Iceberg supports both streaming and batch reads with Apache Flink’s DataStream API and Table API.

Reading with SQL

To switch between streaming and batch execution, set the runtime mode for the current session:
-- Execute the Flink job in streaming mode for the current session context
SET execution.runtime-mode = streaming;

-- Execute the Flink job in batch mode for the current session context
SET execution.runtime-mode = batch;

Batch Read

Submit a Flink batch job:
-- Execute the Flink job in batch mode for the current session context
SET execution.runtime-mode = batch;
SELECT * FROM sample;

Streaming Read

Iceberg supports processing incremental data in Flink streaming jobs that start from a historical snapshot:
-- Submit the Flink job in streaming mode for the current session.
SET execution.runtime-mode = streaming;

-- Enable dynamic table options, because the streaming read passes its options through a SQL hint.
SET table.dynamic-table-options.enabled=true;

-- Read all records from the current snapshot, then read incremental data.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read incremental data starting from start-snapshot-id (records from this snapshot are excluded).
SELECT * FROM sample /*+ OPTIONS(
  'streaming'='true',
  'monitor-interval'='1s',
  'start-snapshot-id'='3821550127947089987'
)*/ ;
See read options for more configuration details.
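When a query like the ones above is assembled in a Java program rather than typed into the SQL client, the hint can be rendered from a map of read options. The following is a minimal sketch under stated assumptions: `ReadHints` and `selectWithOptions` are hypothetical names, not part of Flink or Iceberg, and the code only builds the SQL string. The session still needs `table.dynamic-table-options.enabled` set to true for the hint to take effect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not a Flink or Iceberg API): renders read options as an OPTIONS hint.
final class ReadHints {
    private ReadHints() {}

    // Builds a SELECT over `table` with one 'key'='value' pair per option, in map order.
    static String selectWithOptions(String table, Map<String, String> options) {
        if (options.isEmpty()) {
            return "SELECT * FROM " + table;
        }
        String hint = options.entrySet().stream()
            .map(e -> "'" + escape(e.getKey()) + "'='" + escape(e.getValue()) + "'")
            .collect(Collectors.joining(", ", "/*+ OPTIONS(", ") */"));
        return "SELECT * FROM " + table + " " + hint;
    }

    // SQL string literals escape a single quote by doubling it.
    private static String escape(String s) {
        return s.replace("'", "''");
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("streaming", "true");
        options.put("monitor-interval", "1s");
        options.put("start-snapshot-id", "3821550127947089987");
        System.out.println(selectWithOptions("sample", options));
    }
}
```

The resulting string can be passed to whatever mechanism submits SQL in the application. The option names themselves are taken from the examples above.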

FLIP-27 Source for SQL

To opt in or out of the FLIP-27 source:
-- Opt out of the FLIP-27 source.
-- The default is false for Flink 1.19 and below, and true for Flink 1.20 and above.
SET table.exec.iceberg.use-flip27-source = false;

Reading Branches and Tags

Branches and tags can be read via SQL by specifying options:
-- Read from branch b1
SELECT * FROM table /*+ OPTIONS('branch'='b1') */ ;

-- Read from tag t1
SELECT * FROM table /*+ OPTIONS('tag'='t1') */;

-- Incremental scan from tag t1 to tag t2
SELECT * FROM table /*+ OPTIONS(
  'streaming'='true',
  'monitor-interval'='1s',
  'start-tag'='t1',
  'end-tag'='t2'
) */;

Reading with DataStream

Batch Read

This example reads all records from an Iceberg table and prints them to stdout:
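import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();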
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

Streaming Read

This example reads incremental records starting from a specific snapshot:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(true)
     .startSnapshotId(3821550127947089987L)
     .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
See FlinkSource#Builder for more options.

FLIP-27 Source (DataStream)

The FLIP-27 source interface was introduced in Flink 1.12. It aims to solve several shortcomings of the old SourceFunction interface.

Batch Read with FLIP-27

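This example reads all records from an Iceberg table with the FLIP-27 source and prints them to stdout:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();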
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .build();

DataStream<RowData> batch = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

Streaming Read with FLIP-27

This example starts a streaming read from the latest table snapshot (inclusive). Every 60 seconds, it polls the Iceberg table to discover new append-only snapshots:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();

DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
CDC read is not supported yet.
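The polling behavior described above (start at the latest snapshot, inclusive, then pick up newly committed snapshots on each monitor tick) can be pictured with a toy model. This is only a sketch of the idea, not Iceberg's implementation: `SnapshotPoller` is a hypothetical class, the snapshot log is a plain list of IDs, and it assumes the log is append-only and never shrinks.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Iceberg code): tracks which snapshots a streaming reader has already seen.
final class SnapshotPoller {
    // Index in the snapshot log of the first snapshot that has not been emitted yet.
    private int nextIndex;

    // Start from the latest snapshot, inclusive; an empty log starts at index 0.
    SnapshotPoller(List<Long> snapshotLog) {
        this.nextIndex = Math.max(0, snapshotLog.size() - 1);
    }

    // One monitor tick: return the snapshots committed since the previous poll.
    List<Long> poll(List<Long> snapshotLog) {
        List<Long> discovered =
            new ArrayList<>(snapshotLog.subList(nextIndex, snapshotLog.size()));
        nextIndex = snapshotLog.size();
        return discovered;
    }

    public static void main(String[] args) {
        List<Long> log = new ArrayList<>(List.of(1L, 2L, 3L));
        SnapshotPoller poller = new SnapshotPoller(log);
        System.out.println(poller.poll(log)); // [3]  (latest snapshot, inclusive)

        log.add(4L);
        log.add(5L);
        System.out.println(poller.poll(log)); // [4, 5]  (new commits since last tick)
        System.out.println(poller.poll(log)); // []  (nothing new)
    }
}
```

In the real source, the interval between ticks is what `monitorInterval` controls. The model leaves timing out and shows only the bookkeeping.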

Reading Branches and Tags with DataStream

Branches and tags can also be read via the DataStream API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

// Read from branch
DataStream<RowData> batch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .branch("test-branch")
    .streaming(false)
    .build();

// Read from tag (uses a distinct variable name, because batch is already declared above)
DataStream<RowData> tagBatch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .tag("test-tag")
    .streaming(false)
    .build();

// Streaming read from start-tag
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startTag("test-tag")
    .build();

Inspecting Tables

Iceberg provides metadata tables for inspecting a table’s history, snapshots, and other metadata.
Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table$history.
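The naming rule can be captured in a small helper when queries are generated from Java. This is a minimal sketch: `MetadataTables` is a hypothetical class, and the set of accepted names is limited to the metadata tables covered in this section (Iceberg may expose others).

```java
import java.util.Set;

// Hypothetical helper (not an Iceberg API): derives metadata table names from a base table name.
final class MetadataTables {
    // Only the metadata tables named in this section; treat the list as an assumption.
    private static final Set<String> KNOWN =
        Set.of("history", "snapshots", "files", "manifests", "partitions", "refs");

    private MetadataTables() {}

    // Appends "$<metadataTable>" to the table name, e.g. db.table -> db.table$history.
    static String metadataTableName(String table, String metadataTable) {
        if (!KNOWN.contains(metadataTable)) {
            throw new IllegalArgumentException("Unknown metadata table: " + metadataTable);
        }
        return table + "$" + metadataTable;
    }

    // Builds the inspection query for the given metadata table.
    static String selectAll(String table, String metadataTable) {
        return "SELECT * FROM " + metadataTableName(table, metadataTable) + ";";
    }

    public static void main(String[] args) {
        System.out.println(metadataTableName("db.table", "history")); // db.table$history
        System.out.println(selectAll("prod.db.table", "snapshots"));
    }
}
```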

History

To show table history:
SELECT * FROM prod.db.table$history;
| made_current_at | snapshot_id | parent_id | is_current_ancestor |
| --- | --- | --- | --- |
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
This shows a commit that was rolled back. Snapshots 296410040247533544 and 2999875608062437330 have the same parent snapshot, 5179299526185056830. Snapshot 296410040247533544 was rolled back and is not an ancestor of the current table state.
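One way to see why the rolled-back snapshot is flagged as a non-ancestor is to follow parent_id pointers back from the current snapshot. The sketch below does this for the four history rows shown above. `SnapshotAncestry` is a hypothetical class used for illustration only, and it is not how Iceberg computes the column internally.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: recomputes is_current_ancestor from snapshot_id -> parent_id pairs.
final class SnapshotAncestry {
    private SnapshotAncestry() {}

    // Returns the current snapshot plus every snapshot reachable through parent pointers.
    static Set<Long> currentAncestors(Map<Long, Long> parentOf, long current) {
        Set<Long> ancestors = new LinkedHashSet<>();
        Long id = current;
        // Stop at the root (null parent); add() returning false guards against cycles.
        while (id != null && ancestors.add(id)) {
            id = parentOf.get(id);
        }
        return ancestors;
    }

    public static void main(String[] args) {
        // snapshot_id -> parent_id, copied from the history rows above.
        Map<Long, Long> parentOf = new LinkedHashMap<>();
        parentOf.put(5781947118336215154L, null);
        parentOf.put(5179299526185056830L, 5781947118336215154L);
        parentOf.put(296410040247533544L, 5179299526185056830L);
        parentOf.put(2999875608062437330L, 5179299526185056830L);

        Set<Long> ancestors = currentAncestors(parentOf, 2999875608062437330L);
        for (Long id : parentOf.keySet()) {
            System.out.println(id + " is_current_ancestor=" + ancestors.contains(id));
        }
    }
}
```

Starting from 2999875608062437330, the walk visits 5179299526185056830 and then 5781947118336215154. Snapshot 296410040247533544 is never reached, which matches its false flag in the history output.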

Snapshots

To show the valid snapshots for a table:
SELECT * FROM prod.db.table$snapshots;
| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
| --- | --- | --- | --- | --- | --- |
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://…/table/metadata/snap-57897183625154-1.avro | added-records: 2478404, total-records: 2478404 |

Files

To show a table’s current data files:
SELECT * FROM prod.db.table$files;

Manifests

To show a table’s current file manifests:
SELECT * FROM prod.db.table$manifests;

Partitions

To show a table’s current partitions:
SELECT * FROM prod.db.table$partitions;

References

To show a table’s known snapshot references:
SELECT * FROM prod.db.table$refs;
| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
| --- | --- | --- | --- | --- | --- |
| main | BRANCH | 4686954189838128572 | 10 | 20 | 30 |
| testTag | TAG | 4686954189838128572 | 10 | null | null |
