## Feature Support

Iceberg provides comprehensive Flink integration with both SQL and DataStream APIs:

| Feature | Support | Notes |
|---|---|---|
| SQL create catalog | ✔️ | |
| SQL create database | ✔️ | |
| SQL create table | ✔️ | |
| SQL create table like | ✔️ | |
| SQL alter table | ✔️ | Only altering table properties is supported; column and partition changes are not supported |
| SQL drop table | ✔️ | |
| SQL select | ✔️ | Supports both streaming and batch mode |
| SQL insert into | ✔️ | Supports both streaming and batch mode |
| SQL insert overwrite | ✔️ | |
| DataStream read | ✔️ | |
| DataStream append | ✔️ | |
| DataStream overwrite | ✔️ | |
| Metadata tables | ✔️ | |
| Rewrite files action | ✔️ | |
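A minimal end-to-end flow using the SQL features above might look like the following sketch; the catalog name, warehouse path, and table names are illustrative placeholders, not values from this document:

```sql
-- Register a Hadoop catalog (warehouse path is a placeholder)
CREATE CATALOG hadoop_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'hdfs://nn:8020/warehouse/path'
);

USE CATALOG hadoop_catalog;
CREATE DATABASE IF NOT EXISTS db;
CREATE TABLE db.sample (id BIGINT, data STRING);

-- Works in both streaming and batch mode
INSERT INTO db.sample VALUES (1, 'a');
SELECT * FROM db.sample;
```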
## Getting Started

### Prerequisites
Download Flink from the Apache download page. Iceberg uses Scala 2.12 when compiling the Apache `iceberg-flink-runtime` jar, so it's recommended to use a Flink distribution bundled with Scala 2.12.
### Flink SQL Client
Start the Flink SQL client with the Iceberg runtime jar on the classpath.

By default, Iceberg ships with Hadoop jars for the Hadoop catalog. To use the Hive catalog, also load the Hive jars when opening the Flink SQL client.
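A typical invocation, run from the Flink distribution directory, might look like the sketch below; the jar path and version are placeholders to substitute with your own:

```shell
# Make Hadoop jars visible to Flink (HADOOP_HOME must point at your
# Hadoop installation; this is a setup assumption, not a fixed path).
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the SQL client, loading the Iceberg runtime jar with -j.
# The jar file name/version below is illustrative.
./bin/sql-client.sh embedded \
    -j /path/to/iceberg-flink-runtime-1.16-1.4.0.jar \
    shell
```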
### Flink Python API
Install the Apache Flink dependency using `pip`:
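For example (the version below is illustrative; match it to your Flink cluster version):

```shell
# Installs PyFlink; pin the version to your Flink deployment.
pip install apache-flink==1.16.2
```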
Then create a `StreamTableEnvironment` and execute Flink SQL statements:
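A minimal sketch using PyFlink's `StreamTableEnvironment`, assuming `apache-flink` is installed; the jar path and database name are illustrative placeholders:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Make the Iceberg runtime jar available to the job
# (path below is a placeholder, not a real location).
env.add_jars("file:///path/to/iceberg-flink-runtime.jar")

table_env = StreamTableEnvironment.create(env)

# Any Flink SQL statement, including Iceberg DDL,
# can be run through execute_sql().
table_env.execute_sql("CREATE DATABASE IF NOT EXISTS db")
```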
## Type Conversion

Iceberg's integration for Flink automatically converts between Flink and Iceberg types.

### Flink to Iceberg
| Flink | Iceberg | Notes |
|---|---|---|
| boolean | boolean | |
| tinyint | integer | |
| smallint | integer | |
| integer | integer | |
| bigint | long | |
| float | float | |
| double | double | |
| char | string | |
| varchar | string | |
| string | string | |
| binary | fixed | |
| varbinary | binary | |
| decimal | decimal | |
| date | date | |
| time | time | |
| timestamp | timestamp without timezone | |
| timestamp_ltz | timestamp with timezone | |
| array | list | |
| map | map | |
| multiset | map | |
| row | struct | |
| raw | Not supported | |
| interval | Not supported | |
| structured | Not supported | |
| timestamp with zone | Not supported | |
| distinct | Not supported | |
| null | Not supported | |
| symbol | Not supported | |
| logical | Not supported | |
### Iceberg to Flink
| Iceberg | Flink | Notes |
|---|---|---|
| boolean | boolean | |
| struct | row | |
| list | array | |
| map | map | |
| integer | integer | |
| long | bigint | |
| float | float | |
| double | double | |
| date | date | |
| time | time | |
| timestamp without timezone | timestamp(6) | |
| timestamp with timezone | timestamp_ltz(6) | |
| string | varchar(2147483647) | |
| uuid | binary(16) | |
| fixed(N) | binary(N) | |
| binary | varbinary(2147483647) | |
| decimal(P, S) | decimal(P, S) | |
| nanosecond timestamp | timestamp(9) | |
| nanosecond timestamp with timezone | timestamp_ltz(9) | |
| unknown | null | |
| variant | Not supported | |
| geometry | Not supported | |
| geography | Not supported | |
## Next Steps

- **Connector Setup**: Configure the Flink connector for Iceberg tables
- **DDL Operations**: Create and manage Iceberg tables with Flink DDL
- **Queries**: Read data from Iceberg tables using Flink
- **Writes**: Write data to Iceberg tables with Flink