
Catalogs

Spark manages tables through pluggable catalogs, configured via Spark properties under spark.sql.catalog.<name>.

Catalog Types

Iceberg provides two catalog implementations:
Implementation | Description | Use Case
SparkCatalog | Dedicated Iceberg catalog | Hive Metastore or Hadoop warehouse
SparkSessionCatalog | Adds Iceberg support to the built-in catalog | Mixed Iceberg and non-Iceberg tables

Hive Metastore Catalog

Configure a Hive-based catalog:
spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://metastore-host:port
# Omit uri to use hive.metastore.uris from hive-site.xml

REST Catalog

Configure a REST catalog:
spark.sql.catalog.rest_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type=rest
spark.sql.catalog.rest_prod.uri=http://localhost:8080

Hadoop Catalog

Configure a directory-based catalog:
spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type=hadoop
spark.sql.catalog.hadoop_prod.warehouse=hdfs://nn:8020/warehouse/path
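Once a catalog is configured, its tables can be created and queried with fully qualified names. A minimal sketch, assuming the hadoop_prod catalog above and an illustrative db.events table:

```scala
// Assumes a SparkSession started with the hadoop_prod catalog configured as above
spark.sql("CREATE TABLE hadoop_prod.db.events (id BIGINT, data STRING) USING iceberg")
spark.sql("INSERT INTO hadoop_prod.db.events VALUES (1, 'a')")
spark.sql("SELECT * FROM hadoop_prod.db.events").show()
```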

Catalog Configuration

Common configuration properties:
Property | Values | Description
spark.sql.catalog.<name>.type | hive, hadoop, rest, glue, jdbc, nessie | Catalog implementation type
spark.sql.catalog.<name>.catalog-impl | Class name | Custom catalog implementation
spark.sql.catalog.<name>.io-impl | Class name | Custom FileIO implementation
spark.sql.catalog.<name>.warehouse | Path | Warehouse directory base path
spark.sql.catalog.<name>.uri | URI | Metastore URI (Hive) or REST URL
spark.sql.catalog.<name>.default-namespace | Namespace | Default current namespace
spark.sql.catalog.<name>.cache-enabled | true/false | Enable catalog cache (default: true)
spark.sql.catalog.<name>.cache.expiration-interval-ms | Milliseconds | Cache expiration time (default: 30000)

Table Defaults and Overrides

Set default or enforced table properties:
# Default property: applied at table creation, can be overridden per table
spark.sql.catalog.my_catalog.table-default.write.format.default=orc

# Enforced property: always applied, cannot be overridden per table
spark.sql.catalog.my_catalog.table-override.write.metadata.compression-codec=gzip
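With the settings above, a CREATE TABLE can still replace a table-default value, while a table-override always wins. A sketch under those assumptions (my_catalog.db.t is an illustrative table):

```scala
// write.format.default=orc is only a default: this table uses parquet instead
spark.sql("""
  CREATE TABLE my_catalog.db.t (id BIGINT) USING iceberg
  TBLPROPERTIES ('write.format.default' = 'parquet')
""")
// write.metadata.compression-codec=gzip is enforced: a conflicting value in
// TBLPROPERTIES would be replaced by the catalog's override
```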

View Defaults and Overrides

Similar configuration for views:
spark.sql.catalog.my_catalog.view-default.key=value
spark.sql.catalog.my_catalog.view-override.key=value

Using Catalogs

Reference tables with catalog names:
SELECT * FROM hive_prod.db.table;
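A catalog (and namespace) can also be set as the current one, so table names need not be fully qualified. A sketch, assuming the hive_prod catalog is configured:

```scala
spark.sql("USE hive_prod")           // set the current catalog
spark.sql("USE hive_prod.db")        // set the current catalog and namespace
spark.sql("SELECT * FROM table")     // resolves to hive_prod.db.table
```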

Replacing the Session Catalog

Add Iceberg support to Spark’s built-in catalog:
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
This allows using the same Hive Metastore for both Iceberg and non-Iceberg tables. Non-Iceberg tables are handled by the built-in catalog.

Catalog-Specific Hadoop Configuration

Set per-catalog Hadoop properties:
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint=http://aws-local:9000
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key=mykey
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key=mysecret
Catalog-specific properties take precedence over global spark.hadoop.* properties.

Loading Custom Catalogs

Use a custom catalog implementation:
spark.sql.catalog.custom_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl=com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config=my-value

SQL Extensions

Enable Iceberg SQL extensions for advanced features:
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Extensions enable:
  • CALL stored procedures
  • ALTER TABLE ... ADD/DROP PARTITION FIELD
  • ALTER TABLE ... WRITE ORDERED BY
  • ALTER TABLE ... SET IDENTIFIER FIELDS
  • Branching and tagging DDL
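With the extensions enabled, procedure calls and Iceberg-specific DDL become available in SQL. A sketch (catalog, table, and snapshot ID are illustrative):

```scala
// Stored procedure: roll a table back to an earlier snapshot
spark.sql("CALL my_catalog.system.rollback_to_snapshot('db.t', 12345)")
// Partition evolution DDL: add a new partition field without rewriting data
spark.sql("ALTER TABLE my_catalog.db.t ADD PARTITION FIELD days(ts)")
```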

Runtime Configuration

Configuration Precedence

Settings are applied in the following order (highest to lowest priority):
  1. DataSource Read/Write Options - .option(...) in code
  2. Spark Session Configuration - spark.conf.set(...) or spark-defaults.conf
  3. Table Properties - ALTER TABLE SET TBLPROPERTIES
  4. Default Value
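For example, using the compression codec setting at each level (names are illustrative):

```scala
// 3. Table property: lowest of the three levels
spark.sql("ALTER TABLE catalog.db.t SET TBLPROPERTIES ('write.parquet.compression-codec' = 'snappy')")
// 2. Session configuration overrides the table property
spark.conf.set("spark.sql.iceberg.compression-codec", "gzip")
// 1. Per-operation write option wins over both
df.writeTo("catalog.db.t").option("compression-codec", "zstd").append()
```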

Spark SQL Options

Global Iceberg behaviors via Spark configuration:
val spark = SparkSession.builder()
  .appName("IcebergExample")
  .config("spark.sql.catalog.my_catalog", 
          "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.extensions", 
          "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.iceberg.vectorization.enabled", "false")
  .getOrCreate()

Common SQL Options

Option | Default | Description
spark.sql.iceberg.vectorization.enabled | Table default | Enable vectorized reads
spark.sql.iceberg.parquet.reader-type | ICEBERG | Parquet reader (ICEBERG, COMET)
spark.sql.iceberg.check-nullability | true | Validate write schema nullability
spark.sql.iceberg.check-ordering | true | Validate write schema column order
spark.sql.iceberg.aggregate-push-down.enabled | true | Push down aggregates (MAX, MIN, COUNT)
spark.sql.iceberg.distribution-mode | See Writes | Write distribution strategy
spark.wap.id | null | Write-Audit-Publish snapshot ID
spark.wap.branch | null | WAP branch name
spark.sql.iceberg.compression-codec | Table default | Write compression codec
spark.sql.iceberg.compression-level | Table default | Compression level
spark.sql.iceberg.data-planning-mode | AUTO | Data file scan planning (AUTO, LOCAL, DISTRIBUTED)
spark.sql.iceberg.delete-planning-mode | AUTO | Delete file scan planning
spark.sql.iceberg.locality.enabled | false | Report locality for task placement
spark.sql.iceberg.executor-cache.enabled | true | Enable executor-side cache
spark.sql.iceberg.merge-schema | false | Enable schema evolution on write
spark.sql.iceberg.report-column-stats | true | Report Puffin statistics to Spark CBO
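The spark.wap.branch setting supports a write-audit-publish workflow: writes are staged on a branch, audited, and only then published. A sketch, assuming an audit_branch branch and illustrative names (the table may also need write.wap.enabled set and the branch created beforehand):

```scala
// Stage writes on an audit branch instead of the main table history
spark.conf.set("spark.wap.branch", "audit_branch")
df.writeTo("catalog.db.t").append()   // this commit lands on audit_branch
// After validation, fast-forward main to publish the audited changes
spark.sql("CALL catalog.system.fast_forward('db.t', 'main', 'audit_branch')")
```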

Read Options

Options for DataFrame reads:
spark.read
    .option("snapshot-id", 10963874102873L)
    .table("catalog.db.table")
Option | Default | Description
snapshot-id | Latest | Snapshot ID to read
as-of-timestamp | Latest | Timestamp in milliseconds
branch | - | Branch name to read
tag | - | Tag name to read
split-size | Table property | Override split target size
lookback | Table property | Override planning lookback
file-open-cost | Table property | Override file open cost
vectorization-enabled | Table property | Enable vectorized reads
batch-size | Table property | Vectorization batch size
stream-from-timestamp | - | Streaming start timestamp
streaming-max-files-per-micro-batch | INT_MAX | Max files per streaming micro-batch
streaming-max-rows-per-micro-batch | INT_MAX | Soft max rows per micro-batch
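These options enable time travel from DataFrames. A sketch (IDs, timestamps, and names are illustrative):

```scala
// Read a specific snapshot
spark.read.option("snapshot-id", 10963874102873L).table("catalog.db.t")
// Read the table as of a timestamp (milliseconds since epoch)
spark.read.option("as-of-timestamp", "1678000000000").table("catalog.db.t")
// Read a named branch or tag
spark.read.option("branch", "audit_branch").table("catalog.db.t")
```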

Write Options

Options for DataFrame writes:
df.writeTo("catalog.db.table")
    .option("write-format", "avro")
    .option("target-file-size-bytes", "268435456")
    .option("compression-codec", "zstd")
    .option("snapshot-property.key", "value")
    .append()
Option | Default | Description
write-format | Table default | File format (parquet, avro, orc)
target-file-size-bytes | Table property | Target file size
compression-codec | Table default | Compression codec
compression-level | Table default | Compression level
compression-strategy | Table default | ORC compression strategy
distribution-mode | See Writes | Distribution mode
fanout-enabled | false | Enable fanout writer
check-nullability | true | Validate field nullability
check-ordering | true | Validate column order
isolation-level | null | Isolation level (serializable, snapshot)
validate-from-snapshot-id | null | Base snapshot for conflict detection
snapshot-property.<key> | - | Custom snapshot metadata
delete-granularity | file | Delete granularity
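For example, merge-schema lets a write add columns that the table does not yet have. A sketch, assuming the incoming DataFrame carries an extra column:

```scala
// df has a column that is missing from the table schema
df.writeTo("catalog.db.t")
  .option("merge-schema", "true")   // add missing columns to the table before writing
  .append()
```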

Commit Metadata

Add custom metadata to snapshots:
import java.util.Map;

import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;

Map<String, String> properties = Maps.newHashMap();
properties.put("property_key", "property_value");

// Runs the callable and attaches the properties to the snapshot it commits
CommitMetadata.withCommitProperties(properties,
    () -> {
        spark.sql("DELETE FROM table WHERE id = 1");
        return 0;
    },
    RuntimeException.class);

Next Steps

Getting Started

Set up your first Iceberg table

Write Data

Configure write performance and distribution

Procedures

Use stored procedures for maintenance

Structured Streaming

Configure streaming reads and writes