Iceberg supports both streaming and batch reads in Flink. Execute the following SQL commands to switch the execution mode:
```sql
-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
```
Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot:
```sql
-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;

-- Enable this switch because streaming read SQL will provide options in SQL hint.
SET table.dynamic-table-options.enabled=true;

-- Read all records from current snapshot, then read incremental data.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ ;

-- Read incremental data starting from snapshot-id (records from this snapshot excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987') */ ;
```
```sql
-- Opt out of the FLIP-27 source.
-- Default is false for Flink 1.19 and below, and true for Flink 1.20 and above.
SET table.exec.iceberg.use-flip27-source = false;
```
Branches and tags can be read via SQL by specifying options:
```sql
-- Read from branch b1
SELECT * FROM table /*+ OPTIONS('branch'='b1') */ ;

-- Read from tag t1
SELECT * FROM table /*+ OPTIONS('tag'='t1') */ ;

-- Incremental scan from tag t1 to tag t2
SELECT * FROM table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='t1', 'end-tag'='t2') */ ;
```
This example starts streaming read from the latest table snapshot (inclusive). Every 60s, it polls the Iceberg table to discover new append-only snapshots:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();

DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
```
To inspect a table’s history, snapshots, and other metadata, Iceberg supports metadata tables.
Metadata tables are identified by appending the metadata table name to the original table name. For example, the history of `db.table` is read using `db.table$history`.
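As a sketch, assuming a database named `db` containing a table named `table`, and that your Iceberg/Flink version supports both the `history` and `snapshots` metadata tables, the queries would look like:

```sql
-- Show the table's commit history.
SELECT * FROM db.table$history;

-- Show the table's valid snapshots.
SELECT * FROM db.table$snapshots;
```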
This shows a commit that was rolled back: snapshots 296410040247533544 and 2999875608062437330 have the same parent snapshot, but snapshot 296410040247533544 was rolled back and is therefore not an ancestor of the current table state.