Writing with SQL
Iceberg supports both INSERT INTO and INSERT OVERWRITE.
INSERT INTO
To append new data to a table with a Flink streaming job, use INSERT INTO:
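For example, appending literal values or the result of a query (the catalog, table, and column names here are illustrative):

```sql
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data FROM other_table;
```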
INSERT OVERWRITE
To replace data in the table with the result of a query, use INSERT OVERWRITE in a batch job. Overwrites are atomic operations for Iceberg tables.
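A minimal sketch, assuming a table `sample` with columns `id` and `data`; INSERT OVERWRITE requires the job to run in batch mode:

```sql
SET 'execution.runtime-mode' = 'batch';
INSERT OVERWRITE sample VALUES (1, 'a');
```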
Partitions that have rows produced by the SELECT query will be replaced. Iceberg also supports overwriting a given partition with select values:
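For example, replacing only the partition `data='a'` (table layout assumed):

```sql
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
```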
For a partitioned table, when all partition columns are set in the PARTITION clause, it is a static partition insert. If only some partition columns are set, it is a dynamic partition insert. For an unpartitioned table, its data will be completely overwritten by INSERT OVERWRITE.

UPSERT
Iceberg supports UPSERT based on the primary key when writing data into a v2 format table. There are two ways to enable upsert:
Table-Level Property
Enable UPSERT mode via the table-level property write.upsert.enabled:
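A sketch of setting the property at table creation time, assuming a v2 table with a primary key (names illustrative):

```sql
CREATE TABLE `hive_catalog`.`default`.`sample` (
  `id` INT COMMENT 'unique id',
  `data` STRING NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);
```

The property can also be set on an existing table with `ALTER TABLE ... SET ('write.upsert.enabled'='true')`.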
Write Option
Enable UPSERT mode using upsert-enabled in the write options:
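For example, as a per-statement SQL hint (table names illustrative):

```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source_table;
```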
Writing with DataStream
Appending Data
Flink supports writing DataStream<RowData> and DataStream<Row> to Iceberg tables:
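A minimal append sketch, assuming the Iceberg Flink runtime is on the classpath and the table already exists at the given (illustrative) path:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergAppendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Example input; in practice this would come from Kafka, files, etc.
        DataStream<RowData> input =
            env.fromElements((RowData) GenericRowData.of(1, StringData.fromString("a")));

        // Location of an existing Iceberg table (illustrative path).
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

        FlinkSink.forRowData(input)
            .tableLoader(tableLoader)
            .append();

        env.execute("iceberg-append");
    }
}
```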
Overwrite Data
Set the overwrite flag in the FlinkSink builder to overwrite data:
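A sketch, assuming input is a DataStream<RowData> and tableLoader points at an existing table:

```java
// Replaces the partitions touched by this job's data instead of appending.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .append();
```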
Upsert Data
Set the upsert flag in the FlinkSink builder to upsert data. The table must use the v2 table format and have a primary key:
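A sketch with the same assumed input and tableLoader as a normal append:

```java
// Requires a v2 table whose primary key supplies the equality fields.
// The overwrite and upsert flags cannot be enabled at the same time.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .upsert(true)
    .append();
```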
Branch Writes
Writing to branches in Iceberg tables is supported via the toBranch API:
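A sketch, with the branch name and surrounding setup assumed:

```java
// Commits to the named branch instead of the main branch.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .toBranch("audit-branch")
    .append();
```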
Write Metrics
The Flink Iceberg sink provides metrics for monitoring write operations.

Writer Metrics
Parallel writer metrics are added under the sub group of IcebergStreamWriter.
Tags:
- table: full table name (like iceberg.my_db.my_table)
- subtask_index: writer subtask index, starting from 0
| Metric Name | Type | Description |
|---|---|---|
| lastFlushDurationMs | Gauge | Duration (in ms) that writer subtasks take to flush and upload files during checkpoint |
| flushedDataFiles | Counter | Number of data files flushed and uploaded |
| flushedDeleteFiles | Counter | Number of delete files flushed and uploaded |
| flushedReferencedDataFiles | Counter | Number of data files referenced by the flushed delete files |
| dataFilesSizeHistogram | Histogram | Histogram distribution of data file sizes (in bytes) |
| deleteFilesSizeHistogram | Histogram | Histogram distribution of delete file sizes (in bytes) |
Committer Metrics
Committer metrics are added under the sub group of IcebergFilesCommitter.
Tags:
- table: full table name (like iceberg.my_db.my_table)
| Metric Name | Type | Description |
|---|---|---|
| lastCheckpointDurationMs | Gauge | Duration (in ms) that the committer operator checkpoints its state |
| lastCommitDurationMs | Gauge | Duration (in ms) that the Iceberg table commit takes |
| committedDataFilesCount | Counter | Number of data files committed |
| committedDataFilesRecordCount | Counter | Number of records in the committed data files |
| committedDataFilesByteCount | Counter | Number of bytes in the committed data files |
| committedDeleteFilesCount | Counter | Number of delete files committed |
| committedDeleteFilesRecordCount | Counter | Number of records in the committed delete files |
| committedDeleteFilesByteCount | Counter | Number of bytes in the committed delete files |
| elapsedSecondsSinceLastSuccessfulCommit | Gauge | Elapsed time (in seconds) since last successful Iceberg commit |
Write Options
Flink write options are passed when configuring the FlinkSink:

Distribution Mode
The Flink streaming writer supports both HASH and RANGE distribution modes.
Hash Distribution
HASH distribution shuffles data by partition key (partitioned table) or equality fields (non-partitioned table). Limitations:
- Doesn't handle skewed data well
- Can result in unbalanced traffic distribution if cardinality is low
- Writer parallelism is limited to the cardinality of the hash key
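The mode can be chosen per statement via the distribution-mode write option, for example as a SQL hint (table names illustrative):

```sql
INSERT INTO sample /*+ OPTIONS('distribution-mode'='hash') */
SELECT * FROM source_table;
```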
Range Distribution (Experimental)
RANGE distribution shuffles data by partition key or sort order via a custom range partitioner. It collects traffic statistics to evenly distribute traffic to writer tasks. Range distribution only shuffles the data via the range partitioner; rows are not sorted within a data file.
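A sketch of enabling it from the FlinkSink builder, assuming input and tableLoader are set up as for a normal append:

```java
// DistributionMode.RANGE (org.apache.iceberg.DistributionMode) selects the
// statistics-based range partitioner for the shuffle before the writers.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.RANGE)
    .append();
```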
Use Cases
- Tables partitioned by event time with skewed distribution
- Tables partitioned by country code with varying traffic
- Tables where queries include predicates on non-partition columns
Usage
Enable range distribution in Java:

Sink V2 Implementation
In Flink 1.15, the SinkV2 interface was introduced. A new IcebergSink implementation based on SinkV2 is available.
Writing with SQL
To turn on the SinkV2-based implementation in SQL:

Writing with DataStream
To use the SinkV2-based implementation, replace FlinkSink with IcebergSink:
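A sketch, assuming the same input stream and table loader setup as with FlinkSink:

```java
// Same builder shape as FlinkSink, but backed by the SinkV2 interface.
IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();
```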
Important Notes
Next Steps
- Configuration: configure write options for Flink
- Maintenance: maintain Iceberg tables with Flink