Overview

Iceberg supports Spark Structured Streaming for both reading and writing data. Streaming is built on Spark’s DataSourceV2 API.

Streaming Reads

Read incremental data starting from a historical timestamp:
val df = spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", streamStartTimestamp.toString)
    .load("database.table_name")
Iceberg only processes append snapshots. Overwrite and delete snapshots cause exceptions by default.

Skip Non-Append Snapshots

Ignore overwrite and delete snapshots:
val df = spark.readStream
    .format("iceberg")
    .option("streaming-skip-overwrite-snapshots", "true")
    .option("streaming-skip-delete-snapshots", "true")
    .load("database.table_name")

Limit Input Rate

Control micro-batch sizes:
val df = spark.readStream
    .format("iceberg")
    .option("streaming-max-files-per-micro-batch", "1")
    .option("streaming-max-rows-per-micro-batch", "100000")
    .load("database.table_name")
When both limits are set, the micro-batch stops at whichever limit is reached first.
Rate limiting also works with Trigger.AvailableNow to split one-time processing into multiple batches. Limits are ignored with the deprecated Trigger.Once.
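Combining the rate-limit options with Trigger.AvailableNow yields a bounded catch-up job that drains all currently available snapshots in several micro-batches and then stops. A minimal sketch, assuming illustrative table names and checkpoint path:

```scala
import org.apache.spark.sql.streaming.Trigger

// One-time catch-up: process everything available now, split into
// micro-batches of at most 100 files each, then stop.
val query = spark.readStream
    .format("iceberg")
    .option("streaming-max-files-per-micro-batch", "100")
    .load("database.table_name")
    .writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.AvailableNow())
    .option("checkpointLocation", "/tmp/checkpoints/backfill")
    .toTable("database.table_copy")

query.awaitTermination() // returns once all available data is processed
```

Trigger.AvailableNow requires Spark 3.3 or later.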

Streaming Writes

Write streaming data to Iceberg tables:
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")

Output Modes

Iceberg supports two output modes:
| Mode | Behavior | Use Case |
| --- | --- | --- |
| append | Appends each micro-batch | Continuous data ingestion |
| complete | Replaces table contents each micro-batch | Aggregations, stateful operations |
Iceberg does not support experimental continuous processing mode.
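A complete-mode write rewrites the table on every micro-batch, which suits small aggregate tables. A sketch of a streaming aggregation, assuming illustrative table and column names:

```scala
import org.apache.spark.sql.functions.{col, count}

// Running event counts per type; the whole result is rewritten
// into the target table on each micro-batch.
val counts = spark.readStream
    .format("iceberg")
    .load("database.events")
    .groupBy(col("event_type"))
    .agg(count("*").as("event_count"))

counts.writeStream
    .format("iceberg")
    .outputMode("complete")
    .option("checkpointLocation", "/tmp/checkpoints/counts")
    .toTable("database.event_counts")
```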

Create Table Before Streaming

Ensure the table exists before starting the streaming query:
CREATE TABLE database.table_name (
    id bigint,
    data string,
    ts timestamp
) USING iceberg
PARTITIONED BY (days(ts));

Partitioned Tables

When writing to a partitioned table, Iceberg requires the data within each task to be sorted by partition value. Either cluster the data by the partition columns before writing, or enable the fanout writer to lift this requirement.

Fanout Writer

Avoid repartitioning overhead with the fanout writer:
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")
The fanout writer keeps one file open per partition value until the write task finishes.
Do not use fanout writer for batch workloads. Explicit sorting is more efficient for batch writes.
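For batch writes, satisfy the per-task ordering requirement with an explicit sort instead. A sketch, assuming a table partitioned by days(ts) and illustrative names:

```scala
import org.apache.spark.sql.functions.col

// Sort rows by the source column of the partition transform within
// each task, so each task writes one partition's file at a time.
batchData
    .sortWithinPartitions(col("ts"))
    .writeTo("database.table_name")
    .append()
```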

Maintenance for Streaming Tables

Streaming writes create metadata quickly. Regular maintenance is essential.

Tune Commit Rate

Use appropriate trigger intervals:
data.writeStream
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    // ...
Trigger intervals under 1 minute create excessive metadata.

Expire Old Snapshots

Remove old snapshots regularly:
CALL catalog.system.expire_snapshots(
    'database.table_name',
    older_than => TIMESTAMP '2024-01-01 00:00:00',
    retain_last => 100
);
By default, snapshots older than 5 days are expired. Adjust based on your retention requirements.

Compact Data Files

Streaming writes create many small files. Compact them regularly:
-- Compact small files
CALL catalog.system.rewrite_data_files(
    table => 'database.table_name',
    options => map('min-input-files', '5')
);

Rewrite Manifests

Optimize manifest files for better query performance:
CALL catalog.system.rewrite_manifests('database.table_name');
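These maintenance calls can also be scripted from Scala and run as a scheduled job. A sketch using spark.sql; the catalog name is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Run the three routine maintenance procedures for one table.
def runMaintenance(spark: SparkSession, table: String): Unit = {
  spark.sql(s"CALL catalog.system.expire_snapshots('$table')")
  spark.sql(s"CALL catalog.system.rewrite_data_files(table => '$table', " +
    "options => map('min-input-files', '5'))")
  spark.sql(s"CALL catalog.system.rewrite_manifests('$table')")
}
```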

Complete Example

End-to-end streaming pipeline:
1. Create Table

CREATE TABLE streaming.events (
    event_id string,
    user_id bigint,
    event_type string,
    event_time timestamp
) USING iceberg
PARTITIONED BY (days(event_time));
2. Start Streaming Write

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import java.util.concurrent.TimeUnit
import spark.implicits._

// Schema matching the table created in step 1
val schema = StructType(Seq(
    StructField("event_id", StringType),
    StructField("user_id", LongType),
    StructField("event_type", StringType),
    StructField("event_time", TimestampType)
))

val streamingData = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load()

val parsedData = streamingData
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json($"json", schema).as("data"))
    .select("data.*")

parsedData.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(2, TimeUnit.MINUTES))
    .option("fanout-enabled", "true")
    .option("checkpointLocation", "/tmp/checkpoint")
    .toTable("streaming.events")
3. Schedule Maintenance

Run these procedures on a schedule:
-- Daily: Expire old snapshots
CALL catalog.system.expire_snapshots('streaming.events');

-- Weekly: Compact files
CALL catalog.system.rewrite_data_files('streaming.events');

-- Weekly: Rewrite manifests
CALL catalog.system.rewrite_manifests('streaming.events');

Best Practices

Trigger intervals:
  • Minimum: 1 minute for most workloads
  • Recommended: 2-5 minutes for high-throughput streams
  • High latency OK: 10+ minutes for maximum efficiency

Checkpoints: store them in reliable storage:
.option("checkpointLocation", "s3://bucket/checkpoints/table_name")
Never delete checkpoints while a query is running.

Partitioning for streaming tables:
  • Use time-based partitioning (hours, days)
  • Enable the fanout writer for partitioned tables
  • Avoid high-cardinality partitions
Monitoring: track these metrics:
  • Micro-batch processing time
  • Number of files per micro-batch
  • Table snapshot count
  • File sizes in recent partitions
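Per-batch metrics such as processing time and input row counts can be captured with Spark's StreamingQueryListener. A minimal sketch that logs progress to stdout:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Print one line per completed micro-batch with its id, input rows,
// and trigger execution time in milliseconds.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} rows=${p.numInputRows} " +
      s"durationMs=${p.durationMs.get("triggerExecution")}")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```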
Recommended schedule:

| Task | Frequency | Purpose |
| --- | --- | --- |
| Expire snapshots | Daily | Remove old metadata |
| Compact files | Weekly | Reduce small files |
| Rewrite manifests | Weekly | Optimize scan planning |
| Remove orphans | Monthly | Clean up unused files |

Troubleshooting

Too many small files
Symptoms: slow queries, high metadata overhead.
Solutions:
  • Increase the trigger interval
  • Run rewrite_data_files more frequently
  • Increase target-file-size-bytes

Metadata bloat
Symptoms: slow query planning, large metadata files.
Solutions:
  • Expire snapshots more aggressively
  • Run rewrite_manifests
  • Reduce commit frequency

Read failures on non-append snapshots
Symptoms: streaming query fails on overwrite snapshots.
Solution:
.option("streaming-skip-overwrite-snapshots", "true")

Out-of-memory errors
Symptoms: driver or executor OOM.
Solutions:
  • Reduce micro-batch size with streaming-max-files-per-micro-batch
  • Increase executor memory
  • Enable streaming-skip-delete-snapshots

Next Steps

  • Procedures: learn about maintenance procedures
  • Writes: understand write distribution modes
  • Configuration: configure streaming options
  • Queries: query streaming tables