Batch Mode Maintenance
Rewrite Files Action
Iceberg provides an API to rewrite small files into larger files by submitting Flink batch jobs. The behavior is the same as Spark's rewriteDataFiles action.
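A minimal sketch of the batch rewrite, assuming a resolved Table handle (the Actions entry point is in the iceberg-flink module; the target-size tuning call is optional):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.actions.Actions;

public class CompactSmallFiles {
  // Submits a Flink batch job that compacts small files into larger ones.
  public static void compact(Table table) {
    Actions.forTable(table)
        .rewriteDataFiles()
        .targetSizeInBytes(512 * 1024 * 1024L)  // optional: target output file size
        .execute();
  }
}
```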
Streaming Mode Maintenance
Overview
The TableMaintenance API in Apache Iceberg empowers Flink jobs to execute maintenance tasks natively, either embedded within existing streaming pipelines or deployed as standalone Flink jobs. This eliminates dependencies on external systems like Spark.
Benefits:
- Streamlined architecture
- Reduced operational costs
- Enhanced automation capabilities
- No dependency on Spark infrastructure
Supported Features
ExpireSnapshots
Removes old snapshots and their files. Internally uses cleanExpiredFiles(true) when committing.
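As a sketch, an ExpireSnapshots task is defined through its builder (the package name is assumed from recent Iceberg releases; the builder methods match the options table later in this document):

```java
import java.time.Duration;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;

// Expire snapshots older than 7 days, but always keep the last 10.
ExpireSnapshots.Builder expireSnapshots =
    ExpireSnapshots.builder()
        .maxSnapshotAge(Duration.ofDays(7))
        .retainLast(10)
        .deleteBatchSize(1000);  // number of files deleted per batch
```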
RewriteDataFiles
Compacts small files to optimize file sizes. Supports partial progress commits and limiting the maximum rewritten bytes per run.
DeleteOrphanFiles
Removes files which are not referenced in any metadata file of an Iceberg table.
Lock Management
The TriggerLockFactory is essential for coordinating maintenance tasks. It prevents concurrent maintenance operations on the same table.
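As a sketch, the two factory implementations described below can be constructed roughly as follows. The constructor parameters, property keys, and package names are assumptions based on recent Iceberg releases and may differ in your version:

```java
import java.util.Map;
import org.apache.iceberg.flink.maintenance.api.JdbcLockFactory;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.flink.maintenance.api.ZkLockFactory;

// JDBC-based locking: lock state is kept in a database table.
TriggerLockFactory jdbcLockFactory =
    new JdbcLockFactory(
        "jdbc:postgresql://localhost:5432/iceberg",  // JDBC URI
        "db.table-maintenance",                      // lock id, unique per table
        Map.of("jdbc.user", "flink", "jdbc.password", "secret"));

// ZooKeeper-based locking.
TriggerLockFactory zkLockFactory =
    new ZkLockFactory(
        "localhost:2181",        // ZooKeeper quorum
        "/iceberg/maintenance",  // lock path
        60_000,                  // session timeout (ms)
        15_000,                  // connection timeout (ms)
        3_000,                   // base sleep time between retries (ms)
        3);                      // max retries
```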
JDBC Lock Factory
Uses a database table to manage distributed locks.
ZooKeeper Lock Factory
Uses Apache ZooKeeper for distributed locks.
Quick Start Example
A maintenance pipeline combines a lock factory with one or more maintenance tasks; the Complete Production Example section below shows a full job.
Configuration Options
TableMaintenance Builder
| Method | Description | Default |
|---|---|---|
| uidSuffix(String) | Unique identifier suffix for the job | Random UUID |
| rateLimit(Duration) | Minimum interval between task executions | 60 seconds |
| lockCheckDelay(Duration) | Delay for checking lock availability | 30 seconds |
| parallelism(int) | Default parallelism for maintenance tasks | System default |
| maxReadBack(int) | Max snapshots to check during initialization | 100 |
Common Task Options
| Method | Description | Default | Type |
|---|---|---|---|
| scheduleOnCommitCount(int) | Trigger after N commits | No scheduling | int |
| scheduleOnDataFileCount(int) | Trigger after N data files | No scheduling | int |
| scheduleOnDataFileSize(long) | Trigger after total data file size (bytes) | No scheduling | long |
| scheduleOnPosDeleteFileCount(int) | Trigger after N positional delete files | No scheduling | int |
| scheduleOnPosDeleteRecordCount(long) | Trigger after N positional delete records | No scheduling | long |
| scheduleOnEqDeleteFileCount(int) | Trigger after N equality delete files | No scheduling | int |
| scheduleOnEqDeleteRecordCount(long) | Trigger after N equality delete records | No scheduling | long |
| scheduleOnInterval(Duration) | Trigger after time interval | No scheduling | Duration |
ExpireSnapshots Options
| Method | Description | Default | Type |
|---|---|---|---|
| maxSnapshotAge(Duration) | Maximum age of snapshots to retain | 5 days | Duration |
| retainLast(int) | Minimum number of snapshots to retain | 1 | int |
| deleteBatchSize(int) | Number of files to delete in each batch | 1000 | int |
| planningWorkerPoolSize(int) | Number of worker threads for planning | Shared pool | int |
| cleanExpiredMetadata(boolean) | Remove expired metadata files | false | boolean |
RewriteDataFiles Options
| Method | Description | Default | Type |
|---|---|---|---|
| targetFileSizeBytes(long) | Target size for rewritten files | Table property or 512MB | long |
| minFileSizeBytes(long) | Minimum size for compaction | 75% of target | long |
| maxFileSizeBytes(long) | Maximum size for compaction | 180% of target | long |
| minInputFiles(int) | Minimum files to trigger rewrite | 5 | int |
| deleteFileThreshold(int) | Min delete-file count per data file | Integer.MAX_VALUE | int |
| rewriteAll(boolean) | Rewrite all data files | false | boolean |
| maxFileGroupSizeBytes(long) | Maximum total size of a file group | 100GB | long |
| partialProgressEnabled(boolean) | Enable partial progress commits | false | boolean |
| partialProgressMaxCommits(int) | Maximum commits for partial progress | 10 | int |
| maxRewriteBytes(long) | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
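As a sketch, a RewriteDataFiles task combining several of the options above (package name assumed, builder methods taken from the table):

```java
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;

RewriteDataFiles.Builder rewriteDataFiles =
    RewriteDataFiles.builder()
        .targetFileSizeBytes(256L * 1024 * 1024)   // aim for 256 MB output files
        .minInputFiles(5)                          // skip groups with fewer than 5 files
        .partialProgressEnabled(true)              // commit file groups as they finish
        .maxRewriteBytes(5L * 1024 * 1024 * 1024); // cap work per run at 5 GB
```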
DeleteOrphanFiles Options
| Method | Description | Default | Type |
|---|---|---|---|
| location(String) | Location to start recursive listing | Table location | String |
| usePrefixListing(boolean) | Use prefix-based file listing | false | boolean |
| minAge(Duration) | Minimum age for a file to be considered orphaned | 3 days | Duration |
| planningWorkerPoolSize(int) | Number of worker threads for planning | Shared pool | int |
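As a sketch, a DeleteOrphanFiles task using the options above (package name assumed):

```java
import java.time.Duration;
import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;

DeleteOrphanFiles.Builder deleteOrphanFiles =
    DeleteOrphanFiles.builder()
        .minAge(Duration.ofDays(3))              // only remove files older than 3 days
        .scheduleOnInterval(Duration.ofDays(1)); // run at most once a day
```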
Post-Commit Integration
Automatic execution of maintenance tasks after data is committed using addPostCommitTopology(...).
SQL Examples
Maintenance can also be enabled through Flink SQL when writing to the table; the available sink options depend on your Iceberg version.
Best Practices
Resource Management
- Use dedicated slot sharing groups for maintenance tasks
- Set appropriate parallelism based on cluster resources
- Enable checkpointing for fault tolerance
Scheduling Strategy
- Avoid overly frequent executions with rateLimit
- Use scheduleOnCommitCount for write-heavy tables
- Use scheduleOnDataFileCount for fine-grained control
Performance Tuning
- Adjust deleteBatchSize based on storage performance
- Enable partialProgressEnabled for large rewrite operations
- Set reasonable maxRewriteBytes limits
- Set appropriate maxFileGroupSizeBytes for parallel processing
Troubleshooting
OutOfMemoryError during file deletion
Cause: a large number of files is being deleted in a single batch. Solution: reduce deleteBatchSize.
Lock conflicts
Cause: multiple jobs are attempting maintenance on the same table simultaneously. Solution: increase lockCheckDelay and rateLimit.
Slow rewrite operations
Cause: too much data is being rewritten in a single run. Solution: enable partialProgressEnabled and set a maxRewriteBytes limit.
Complete Production Example
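The pieces above combine into a standalone maintenance job along the following lines. Class names follow the TableMaintenance API described in this document; constructor details (notably JdbcLockFactory), package names, and the warehouse path are illustrative assumptions:

```java
import java.time.Duration;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
import org.apache.iceberg.flink.maintenance.api.JdbcLockFactory;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;

public class TableMaintenanceJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000);  // fault tolerance for the maintenance operators

    TableLoader tableLoader =
        TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/table");

    // Coordinate with any other maintenance jobs on this table.
    TriggerLockFactory lockFactory =
        new JdbcLockFactory(
            "jdbc:postgresql://localhost:5432/iceberg",
            "db.table-maintenance",
            Map.of());

    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .uidSuffix("db-table-maintenance")
        .rateLimit(Duration.ofMinutes(10))      // at most one task start per 10 minutes
        .lockCheckDelay(Duration.ofSeconds(30))
        .add(
            ExpireSnapshots.builder()
                .scheduleOnCommitCount(10)
                .maxSnapshotAge(Duration.ofDays(5))
                .retainLast(10))
        .add(
            RewriteDataFiles.builder()
                .scheduleOnDataFileCount(100)
                .targetFileSizeBytes(256L * 1024 * 1024)
                .partialProgressEnabled(true)
                .partialProgressMaxCommits(10)
                .maxRewriteBytes(10L * 1024 * 1024 * 1024))
        .append();

    env.execute("Iceberg table maintenance: db.table");
  }
}
```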
Next Steps
- Configuration: configure Flink for Iceberg
- Writes: learn about writing data with Flink