
Overview

To write to Iceberg tables in Spark, first configure Spark catalogs. Some write operations require Iceberg SQL extensions.
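As a minimal sketch, assuming a Hive-backed catalog named prod (the catalog name and type are placeholders; adjust for your environment), the session configuration might look like:

```properties
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.prod.type=hive
```

The extensions entry enables the row-level SQL commands (MERGE INTO, UPDATE, DELETE FROM) covered below.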

Feature Support

| Feature | Spark Support | Notes |
| --- | --- | --- |
| SQL INSERT INTO | ✔️ All versions | Requires spark.sql.storeAssignmentPolicy=ANSI |
| SQL MERGE INTO | ✔️ All versions | Requires Iceberg Spark extensions |
| SQL INSERT OVERWRITE | ✔️ All versions | Requires ANSI assignment policy |
| SQL DELETE FROM | ✔️ All versions | Row-level deletes require extensions |
| SQL UPDATE | ✔️ All versions | Requires Iceberg Spark extensions |
| DataFrame writes | ✔️ All versions | DataFrameWriterV2 recommended |
| DataFrame CTAS/RTAS | ✔️ All versions | Requires DSv2 API |
| DataFrame MERGE | ✔️ Spark 4.0+ | Requires DSv2 API |

Writing with SQL

INSERT INTO

Append new data to a table:
INSERT INTO prod.db.table 
VALUES (1, 'a'), (2, 'b');

MERGE INTO

Perform upserts and conditional updates:
MERGE INTO prod.db.target t
USING (SELECT * FROM updates) s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.data = s.data
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO is recommended over INSERT OVERWRITE: Iceberg rewrites only the data files that contain matching rows, and the result does not silently change if the table's partitioning changes, as a dynamic overwrite can.
Each target row may be updated by at most one source record. If multiple source rows match the same target row, the MERGE fails at runtime.
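If the source may contain duplicate keys, one common remedy is to keep only the latest record per key before merging. A sketch, assuming the updates source carries a ts column to order by:

```sql
MERGE INTO prod.db.target t
USING (
  SELECT id, data
  FROM (
    SELECT id, data,
           row_number() OVER (PARTITION BY id ORDER BY ts DESC) AS rn
    FROM updates
  )
  WHERE rn = 1
) s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.data = s.data
WHEN NOT MATCHED THEN INSERT *;
```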

Snapshot Summary (Spark 4.1+)

MERGE INTO operations provide detailed metrics in snapshot summaries:
| Metric | Description |
| --- | --- |
| spark.merge-into.num-target-rows-copied | Rows copied without modification |
| spark.merge-into.num-target-rows-deleted | Rows deleted |
| spark.merge-into.num-target-rows-updated | Rows updated |
| spark.merge-into.num-target-rows-inserted | Rows inserted |
| spark.merge-into.num-target-rows-matched-updated | Rows updated by MATCHED clause |
| spark.merge-into.num-target-rows-not-matched-by-source-deleted | Rows deleted by NOT MATCHED BY SOURCE |
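The recorded metrics can be read back from the table's snapshots metadata table, whose summary column is a string map. A sketch for the most recent snapshot:

```sql
SELECT committed_at,
       summary['spark.merge-into.num-target-rows-updated'] AS rows_updated
FROM prod.db.target.snapshots
ORDER BY committed_at DESC
LIMIT 1;
```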

INSERT OVERWRITE

Replace table or partition data:
-- Set dynamic mode (recommended)
SET spark.sql.sources.partitionOverwriteMode=dynamic;

INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid;
Dynamic overwrite mode is recommended for Iceberg tables. Static mode requires a PARTITION clause that can only reference table columns, not hidden partitions.
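For contrast, a static-mode overwrite names the partition explicitly in a PARTITION clause. A sketch, assuming the table were instead partitioned by a level column (illustrative only):

```sql
INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid;
```

Because the clause can only reference table columns, this form cannot target hidden partitions such as days(ts).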

DELETE FROM

Remove rows matching a filter:
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01' AND ts < '2020-06-01';
If the filter matches entire partitions, Iceberg performs a metadata-only delete. Otherwise, only affected data files are rewritten.
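As an illustration, assuming the table is partitioned by days(ts) and has a hypothetical user_id column:

```sql
-- Aligned to whole day partitions: metadata-only delete
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01' AND ts < '2020-05-02';

-- Cuts across partitions: affected data files are rewritten
DELETE FROM prod.db.table
WHERE user_id = 42;
```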

UPDATE

Update rows matching a filter:
UPDATE prod.db.table
SET c1 = 'update_c1', c2 = 'update_c2'
WHERE ts >= '2020-05-01' AND ts < '2020-06-01';

Writing to Branches

Write to specific branches for Write-Audit-Publish (WAP) workflows:
INSERT INTO prod.db.table.branch_audit 
VALUES (1, 'a'), (2, 'b');
The branch must exist before writing. Use CREATE BRANCH to create branches.
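For example, creating the audit branch written to above (requires the Iceberg SQL extensions):

```sql
ALTER TABLE prod.db.table CREATE BRANCH `audit`;
```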

Writing with DataFrames

The DataFrameWriterV2 API is recommended for DataFrame writes:

Appending Data

val data: DataFrame = ...
data.writeTo("prod.db.table").append()

Overwriting Data

val data: DataFrame = ...
data.writeTo("prod.db.table").overwritePartitions()

Creating Tables

val data: DataFrame = ...
data.writeTo("prod.db.table").create()

Merging Data (Spark 4.0+)

val source: DataFrame = ...
source.mergeInto("target", $"source.id" === $"target.id")
    .whenMatched($"target.id" === 1)
    .updateAll()
    .whenMatched($"target.id" === 2)
    .delete()
    .whenNotMatched()
    .insertAll()
    .whenNotMatchedBySource($"target.id" === 3)
    .update(Map("status" -> lit("invalid")))
    .merge()

Writing to Branches

val data: DataFrame = ...
data.writeTo("prod.db.table.branch_audit").append()

data.writeTo("prod.db.table.branch_audit").overwritePartitions()
When using the v1 DataFrame API, use saveAsTable or insertInto instead of format("iceberg") to ensure proper catalog integration.
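A sketch of the v1 API against the same placeholder table:

```scala
val data: DataFrame = ...

// insertInto resolves the table through the catalog and appends to it;
// note it matches columns by position
data.write.insertInto("prod.db.table")

// saveAsTable also resolves through the catalog, but matches columns by name
data.write.mode("append").saveAsTable("prod.db.table")
```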

Schema Merge

Enable automatic schema evolution during writes:
1. Enable Schema Evolution

Configure the table to accept schema changes:
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
  'write.spark.accept-any-schema'='true'
);
2. Enable the mergeSchema Option

Enable the merge option in your write:
data.writeTo("prod.db.sample")
    .option("mergeSchema", "true")
    .append()
Schema merge behavior:
  • New columns in source are added to target (set to NULL for existing rows)
  • Missing columns in source are set to NULL (insert) or unchanged (update)
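Putting the two steps together, a sketch that introduces a new column during an append (the source table prod.db.updates and column new_col are placeholders):

```scala
val data = spark.sql("SELECT id, data, 'x' AS new_col FROM prod.db.updates")

// With mergeSchema enabled, new_col is added to prod.db.sample's schema;
// pre-existing rows read it back as NULL
data.writeTo("prod.db.sample")
    .option("mergeSchema", "true")
    .append()
```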

Distribution Modes

Control how Spark distributes data before writing:
| Mode | Description | Use Case |
| --- | --- | --- |
| hash (default) | Hash-based shuffle by partition values | General purpose, balanced performance |
| range | Range-based shuffle with global sorting | Better read performance, higher write cost |
| none | No automatic distribution | Manual control, requires sorting |
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    'write.distribution-mode'='hash'
);
The hash distribution mode (the default since Iceberg 1.2.0) clusters rows by partition value before writing, so manual sorting is unnecessary in most cases.

Controlling File Sizes

Target File Size

Control output file sizes with table properties:
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
    'write.target-file-size-bytes'='536870912'  -- 512 MB
);

Spark Task Size

Adjust Spark’s Adaptive Query Execution (AQE) to control task sizes:
spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728  -- 128 MB
Spark's in-memory row representation is larger than the compressed, columnar on-disk size, so the AQE advisory partition size typically needs to be larger than the target file size.

Fanout Writers

For streaming workloads on partitioned tables:
data.writeTo("prod.db.sample")
    .option("fanout-enabled", "true")
    .append()
Fanout writers keep files open per partition value until the write task finishes. Use only for streaming; not recommended for batch writes.

Write Options

Common write options for DataFrameWriterV2:
| Option | Description | Default |
| --- | --- | --- |
| write-format | File format (parquet, avro, orc) | Table default |
| target-file-size-bytes | Target output file size | Table property |
| compression-codec | Compression codec | Table default |
| distribution-mode | Distribution strategy | hash |
| fanout-enabled | Enable fanout writer | false |
| isolation-level | Isolation level for overwrites | null |
| snapshot-property.&lt;key&gt; | Custom snapshot metadata | - |

Example with Options

df.writeTo("catalog.db.table")
    .option("write-format", "avro")
    .option("target-file-size-bytes", "268435456")  // 256 MB
    .option("compression-codec", "zstd")
    .option("snapshot-property.key", "value")
    .append()

Next Steps

  • Procedures: maintain tables with compaction and cleanup
  • Configuration: configure write performance and behavior
  • Structured Streaming: stream data with continuous writes
  • DDL Operations: create and alter table schemas