This page describes configuration options for Iceberg catalogs and read/write operations in Flink.
Catalog Configuration
A catalog is created and named by executing the following query:
```sql
CREATE CATALOG <catalog_name> WITH (
  'type' = 'iceberg',
  '<config_key>' = '<config_value>'
);
```
Global Properties
The following properties can be set globally and are not limited to a specific catalog implementation:
| Property | Required | Values | Description |
| --- | --- | --- | --- |
| type | ✔️ | iceberg | Must be `iceberg` |
| catalog-type | | hive, hadoop, rest, glue, jdbc, nessie | The underlying Iceberg catalog implementation |
| catalog-impl | | | The fully-qualified class name of a custom catalog implementation |
| property-version | | | Version number to describe the property version. Current version is 1 |
| cache-enabled | | true or false | Whether to enable catalog cache; default is true |
| cache.expiration-interval-ms | | | How long catalog entries are cached (ms); -1 disables expiration; default is -1 |
Hive Catalog
The following properties can be set if using the Hive catalog:
| Property | Required | Description |
| --- | --- | --- |
| uri | ✔️ | The Hive metastore's thrift URI |
| clients | | The Hive metastore client pool size; default is 2 |
| warehouse | | The Hive warehouse location |
| hive-conf-dir | | Path to a directory containing a hive-site.xml configuration file |
| hadoop-conf-dir | | Path to a directory containing core-site.xml and hdfs-site.xml |
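For instance, a Hive-backed catalog could be created like this (the catalog name, thrift URI, and warehouse path below are placeholders for illustration, not values from this page):

```sql
CREATE CATALOG hive_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  'clients' = '5',
  'warehouse' = 'hdfs://nn:8020/warehouse/path'
);
```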
Hadoop Catalog
The following properties can be set if using the Hadoop catalog:
| Property | Required | Description |
| --- | --- | --- |
| warehouse | ✔️ | The HDFS directory to store metadata files and data files |
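A Hadoop catalog only needs the warehouse location; the path below is a placeholder:

```sql
CREATE CATALOG hadoop_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'hdfs://nn:8020/warehouse/path'
);
```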
REST Catalog
The following properties can be set if using the REST catalog:
| Property | Required | Description |
| --- | --- | --- |
| uri | ✔️ | The URL to the REST Catalog |
| credential | | A credential to exchange for a token in the OAuth2 client credentials flow |
| token | | A token which will be used to interact with the server |
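A REST catalog only requires a server URL; the endpoint below is a placeholder:

```sql
CREATE CATALOG rest_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'rest',
  'uri' = 'https://localhost/'
);
```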
Read Options
Flink read options can be passed in multiple ways:
DataStream API
```java
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L))
    .build()
```
SQL Hints
```sql
SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
```
Flink Configuration
```java
env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
```
Priority: Read option > Flink configuration > Table property
Available Read Options
| Read Option | Flink Configuration | Table Property | Default | Description |
| --- | --- | --- | --- | --- |
| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id |
| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column names in a case-sensitive way |
| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time (ms) |
| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution |
| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time (ms) |
| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id |
| end-snapshot-id | N/A | N/A | Latest snapshot | Specifies the end snapshot |
| branch | N/A | N/A | main | Specifies the branch to read from in batch mode |
| tag | N/A | N/A | null | Specifies the tag to read from in batch mode |
| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads |
| end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads |
| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits |
| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits |
| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file |
| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode |
| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots |
| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Load column stats with each data file |
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration |
| limit | connector.iceberg.limit | N/A | -1 | Limits the number of output rows |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning |
| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column for watermark generation |
| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit |
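As one illustration of the tag options above, an incremental batch read between two tags might look like the following hint (the table and tag names are hypothetical):

```sql
SELECT * FROM my_table /*+ OPTIONS('start-tag'='v1', 'end-tag'='v2') */;
```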
Write Options
Flink write options can be passed when configuring the FlinkSink:
DataStream API
```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```
SQL Hints
```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source;
```
Available Write Options
| Flink Option | Default | Description |
| --- | --- | --- |
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
| overwrite-enabled | false | Overwrite the table's data |
| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
| range-distribution-statistics-type | Auto | Range distribution data statistics collection type: Map, Sketch, Auto |
| range-distribution-sort-key-base-weight | 0.0 | Base weight for every sort key relative to target traffic weight per writer task |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
| uid-suffix | As per table property | Overrides the uid suffix used in the underlying IcebergSink |
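Several of these options can also be combined in a SQL hint. A sketch, with hypothetical table names and parallelism:

```sql
INSERT INTO my_table /*+ OPTIONS('distribution-mode'='hash', 'write-parallelism'='4') */
SELECT * FROM source;
```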
Range Distribution Statistics Type
- **Map**: Collects an accurate sampling count for every single key. Use for low-cardinality scenarios (hundreds or thousands)
- **Sketch**: Constructs uniform random sampling via reservoir sampling. Use for high-cardinality scenarios (millions)
- **Auto**: Starts with Map statistics. If cardinality exceeds 10,000, switches to Sketch automatically
Range Distribution Sort Key Base Weight
If the sort order contains partition columns, each sort key maps to one partition and one data file. This relative weight avoids producing too many small files for sort keys with low traffic.
A value of 0.02 means each key has a base weight of 2% of the target traffic weight per writer task.
This is only applicable to StatisticsType.Map for low-cardinality scenarios.
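Putting the two range-distribution options above together, a write hint might look like this (the table and source names are hypothetical, and the values are illustrative only):

```sql
INSERT INTO my_table /*+ OPTIONS(
    'distribution-mode'='range',
    'range-distribution-statistics-type'='Map',
    'range-distribution-sort-key-base-weight'='0.02'
) */
SELECT * FROM source;
```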
Examples
```sql
-- Set execution mode to batch
SET execution.runtime-mode = batch;

-- Read from a specific snapshot
SELECT * FROM my_table /*+ OPTIONS('snapshot-id'='3821550127947089987') */;

-- Time travel to a specific timestamp
SELECT * FROM my_table /*+ OPTIONS('as-of-timestamp'='1609459200000') */;

-- Set execution mode to streaming
SET execution.runtime-mode = streaming;

-- Enable dynamic table options
SET table.dynamic-table-options.enabled = true;

-- Stream from the latest snapshot
SELECT * FROM my_table /*+ OPTIONS(
    'streaming'='true',
    'monitor-interval'='10s'
) */;

-- Write with a specific format
INSERT INTO my_table /*+ OPTIONS(
    'write-format'='orc',
    'compression-codec'='zstd'
) */
SELECT * FROM source;

-- Write with upsert enabled
INSERT INTO my_table /*+ OPTIONS('upsert-enabled'='true') */
SELECT * FROM source;
```
Next Steps
- Queries: Learn about reading data with Flink
- Writes: Learn about writing data with Flink