Apache Flink supports creating an Iceberg table directly in Flink SQL, without first creating an explicit Flink catalog. To do so, specify the 'connector'='iceberg' table option in the CREATE TABLE statement.

Overview

In Flink, the SQL CREATE TABLE test (..) WITH ('connector'='iceberg', ...) creates a Flink table in the current Flink catalog (GenericInMemoryCatalog by default). This Flink table is only a mapping to the underlying Iceberg table; the Iceberg table itself is not maintained in the current Flink catalog.
The Flink Iceberg connector allows setting the catalog properties through table properties. See Catalog Configuration for details.
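As a sketch of this pass-through, the WITH clause below mixes connector options with catalog properties; the URIs and paths are illustrative, and 'cache-enabled' is shown as one example of an Iceberg catalog property that can be set this way:

```sql
CREATE TABLE sample_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    -- Catalog properties are passed through as table options:
    'uri'='thrift://localhost:9083',                 -- Hive Metastore URI (illustrative)
    'warehouse'='hdfs://nn:8020/path/to/warehouse',  -- illustrative path
    'cache-enabled'='false'                          -- an Iceberg catalog property
);
```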

Hive Catalog Example

Before executing the following SQL, please make sure you’ve configured the Flink SQL client correctly according to the Flink overview.

Basic Table Creation

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in a Hive catalog:
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

Mapping to Different Table

If you want to create a Flink table mapping to a different Iceberg table managed in a Hive catalog (such as hive_db.hive_iceberg_table in Hive), create the Flink table as follows:
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
The underlying catalog database (hive_db in the above example) will be created automatically if it does not exist when records are written into the Flink table.
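A minimal sketch of that behavior, using the flink_table definition above (the values are illustrative):

```sql
-- The first write creates the hive_db database in the Hive catalog if it
-- does not already exist, then appends the rows to hive_iceberg_table.
INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB');
```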

Hadoop Catalog Example

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in a Hadoop catalog:
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

REST Catalog Example

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in a REST catalog:
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='rest_prod',
    'catalog-type'='rest',
    'uri'='https://localhost/',
    'credential'='xxxx', -- Optional
    'token'='xxxx',      -- Optional
    'scope'='xxxx'       -- Optional
);

Custom Catalog Example

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in a custom catalog of type com.my.custom.CatalogImpl:
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
    -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value'
);
Check sections under the Integrations tab for all custom catalogs.

Complete Example

Here’s a complete example using a Hive catalog:
CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='file:///path/to/warehouse'
);

INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');

SET execution.result-mode=tableau;
SELECT * FROM flink_table;
Result:
+----+------+
| id | data |
+----+------+
|  1 |  AAA |
|  2 |  BBB |
|  3 |  CCC |
+----+------+
3 rows in set

Next Steps

DDL Operations

Learn about DDL commands for Iceberg tables

Configuration

Configure Flink catalog properties