Apache Iceberg provides maintenance operations for Flink to optimize table performance and manage storage efficiently.
Batch Mode Maintenance
Rewrite Files Action
Iceberg provides an API to rewrite small files into larger files by submitting Flink batch jobs. The behavior is the same as Spark’s rewriteDataFiles action.
import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table")
);
// A catalog-backed loader must be opened before the table can be loaded.
tableLoader.open();
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();
Streaming Mode Maintenance
Overview
The TableMaintenance API lets Flink jobs run maintenance tasks natively, either embedded in an existing streaming pipeline or deployed as a standalone Flink job. This removes the dependency on external systems such as Spark.
Benefits:
Streamlined architecture
Reduced operational costs
Enhanced automation capabilities
No dependency on Spark infrastructure
Supported Features
ExpireSnapshots
Removes old snapshots and their files. Internally uses cleanExpiredFiles(true) when committing.
.add(ExpireSnapshots.builder()
    .maxSnapshotAge(Duration.ofDays(7))
    .retainLast(10)
    .deleteBatchSize(1000))
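To make the interaction between maxSnapshotAge and retainLast concrete, here is a minimal sketch in plain Java. It assumes a linear snapshot history and ignores branches and tags; `SnapshotInfo` and `RetentionSketch` are hypothetical names invented for this illustration and are not part of the Iceberg API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical, simplified snapshot: an id and a commit timestamp. */
final class SnapshotInfo {
  final long id;
  final Instant committedAt;

  SnapshotInfo(long id, Instant committedAt) {
    this.id = id;
    this.committedAt = committedAt;
  }
}

/** Illustrative model only; not Iceberg's implementation. */
final class RetentionSketch {
  private RetentionSketch() {}

  /**
   * Selects snapshots that are older than maxSnapshotAge and are not
   * among the retainLast most recent snapshots.
   */
  static List<SnapshotInfo> selectExpired(
      List<SnapshotInfo> snapshots, Duration maxSnapshotAge, int retainLast, Instant now) {
    List<SnapshotInfo> newestFirst = new ArrayList<>(snapshots);
    newestFirst.sort(Comparator.comparing((SnapshotInfo s) -> s.committedAt).reversed());

    Instant cutoff = now.minus(maxSnapshotAge);
    List<SnapshotInfo> expired = new ArrayList<>();
    for (int i = 0; i < newestFirst.size(); i++) {
      SnapshotInfo snapshot = newestFirst.get(i);
      boolean protectedByCount = i < retainLast;
      boolean tooOld = snapshot.committedAt.isBefore(cutoff);
      if (tooOld && !protectedByCount) {
        expired.add(snapshot);
      }
    }
    return expired;
  }
}
```

Under this model, a table with one snapshot per day for 20 days, a maxSnapshotAge of 7 days, and retainLast of 10 would expire the ten oldest snapshots: the count-based protection wins over the age cutoff for the snapshots that are 8 and 9 days old.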
RewriteDataFiles
Compacts small files to optimize file sizes. Supports partial progress commits and limiting maximum rewritten bytes per run.
.add(RewriteDataFiles.builder()
    .targetFileSizeBytes(256 * 1024 * 1024)
    .minFileSizeBytes(32 * 1024 * 1024)
    .partialProgressEnabled(true)
    .partialProgressMaxCommits(5))
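The text above does not spell out how file groups are spread across partial-progress commits. The following is a minimal sketch of one plausible scheme, assuming groups are divided evenly with ceiling division so that no more than the configured number of commits is produced. `PartialProgressSketch` and `splitIntoCommits` are hypothetical names, not Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; the real commit batching may differ. */
final class PartialProgressSketch {
  private PartialProgressSketch() {}

  /** Splits file groups into at most maxCommits batches of near-equal size. */
  static <T> List<List<T>> splitIntoCommits(List<T> fileGroups, int maxCommits) {
    if (maxCommits < 1) {
      throw new IllegalArgumentException("maxCommits must be at least 1");
    }
    List<List<T>> commits = new ArrayList<>();
    if (fileGroups.isEmpty()) {
      return commits;
    }
    // Ceiling division: the smallest batch size that fits within maxCommits batches.
    int groupsPerCommit = (fileGroups.size() + maxCommits - 1) / maxCommits;
    for (int start = 0; start < fileGroups.size(); start += groupsPerCommit) {
      int end = Math.min(start + groupsPerCommit, fileGroups.size());
      commits.add(new ArrayList<>(fileGroups.subList(start, end)));
    }
    return commits;
  }
}
```

The practical point is that a failure late in a run loses only the batches that have not yet been committed, rather than the whole rewrite.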
DeleteOrphanFiles
Removes files which are not referenced in any metadata files of an Iceberg table.
.add(DeleteOrphanFiles.builder()
    .minAge(Duration.ofDays(3))
    .deleteBatchSize(1000))
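Conceptually, orphan detection is a set difference between the files found under the table location and the files referenced by table metadata, filtered by age so that files from in-flight writes are left alone. The sketch below illustrates that idea with plain Java collections; `OrphanFileSketch` and its inputs are hypothetical and stand in for the real file listing and metadata scan.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative model only; not Iceberg's implementation. */
final class OrphanFileSketch {
  private OrphanFileSketch() {}

  /**
   * @param listed     path -> last-modified time for every file found under the table location
   * @param referenced paths referenced by any table metadata file
   * @param minAge     only files older than this are eligible for removal
   */
  static Set<String> findOrphans(
      Map<String, Instant> listed, Set<String> referenced, Duration minAge, Instant now) {
    Instant cutoff = now.minus(minAge);
    Set<String> orphans = new TreeSet<>();
    for (Map.Entry<String, Instant> entry : listed.entrySet()) {
      boolean unreferenced = !referenced.contains(entry.getKey());
      boolean oldEnough = entry.getValue().isBefore(cutoff);
      if (unreferenced && oldEnough) {
        orphans.add(entry.getKey());
      }
    }
    return orphans;
  }
}
```

The age filter is why a generous minAge matters: a file written by a job that has not yet committed is unreferenced but must not be deleted.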
Lock Management
The TriggerLockFactory is essential for coordinating maintenance tasks. It prevents concurrent maintenance operations on the same table.
Why locks are needed:
Prevents concurrent access conflicts
Ensures data consistency
Manages resources effectively
Avoids duplicate work even with a single job
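The guarantee a lock provides here is simple: only one maintenance run may hold it at a time, and a run that cannot acquire it does nothing. The following single-JVM sketch illustrates that behavior with an `AtomicBoolean`. It is a teaching aid under stated assumptions: `InMemoryLockSketch` is a hypothetical class, and real deployments need a distributed lock such as the JDBC or ZooKeeper factories.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical in-memory lock; illustrates semantics only, not a distributed lock. */
final class InMemoryLockSketch {
  private final AtomicBoolean held = new AtomicBoolean(false);

  /** Atomically acquires the lock; returns false if it is already held. */
  boolean tryLock() {
    return held.compareAndSet(false, true);
  }

  boolean isHeld() {
    return held.get();
  }

  void unlock() {
    held.set(false);
  }

  /** Runs the task only if the lock is free, and always releases it afterwards. */
  boolean runExclusively(Runnable task) {
    if (!tryLock()) {
      return false; // another maintenance run is in progress; skip this one
    }
    try {
      task.run();
      return true;
    } finally {
      unlock();
    }
  }
}
```

Releasing in a `finally` block matters: if a task fails without releasing the lock, every later run is blocked until the lock is cleared.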
JDBC Lock Factory
Uses a database table to manage distributed locks:
Map<String, String> jdbcProps = new HashMap<>();
jdbcProps.put("jdbc.user", "flink");
jdbcProps.put("jdbc.password", "flinkpw");
jdbcProps.put("flink-maintenance.lock.jdbc.init-lock-tables", "true");
TriggerLockFactory lockFactory = new JdbcLockFactory(
    "jdbc:postgresql://localhost:5432/iceberg",
    "catalog.db.table",
    jdbcProps
);
ZooKeeper Lock Factory
Uses Apache ZooKeeper for distributed locks:
TriggerLockFactory lockFactory = new ZkLockFactory(
    "localhost:2181",    // ZooKeeper connection string
    "catalog.db.table",  // Lock ID
    60000,               // sessionTimeoutMs
    15000,               // connectionTimeoutMs
    3000,                // baseSleepTimeMs
    3                    // maxRetries
);
Quick Start Example
The following example demonstrates automated maintenance for an Iceberg table:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table")
);
Map<String, String> jdbcProps = new HashMap<>();
jdbcProps.put("jdbc.user", "flink");
jdbcProps.put("jdbc.password", "flinkpw");
TriggerLockFactory lockFactory = new JdbcLockFactory(
    "jdbc:postgresql://localhost:5432/iceberg",
    "catalog.db.table",
    jdbcProps
);
TableMaintenance.forTable(env, tableLoader, lockFactory)
    .uidSuffix("my-maintenance-job")
    .rateLimit(Duration.ofMinutes(10))
    .lockCheckDelay(Duration.ofSeconds(10))
    .add(ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .maxSnapshotAge(Duration.ofMinutes(10))
        .retainLast(5)
        .deleteBatchSize(5)
        .parallelism(8))
    .add(RewriteDataFiles.builder()
        .scheduleOnDataFileCount(10)
        .targetFileSizeBytes(128 * 1024 * 1024)
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10))
    .append();
env.execute("Table Maintenance Job");
Configuration Options
TableMaintenance Builder
| Method | Description | Default |
| --- | --- | --- |
| uidSuffix(String) | Unique identifier suffix for the job | Random UUID |
| rateLimit(Duration) | Minimum interval between task executions | 60 seconds |
| lockCheckDelay(Duration) | Delay for checking lock availability | 30 seconds |
| parallelism(int) | Default parallelism for maintenance tasks | System default |
| maxReadBack(int) | Max snapshots to check during initialization | 100 |
Common Task Options
| Method | Description | Default | Type |
| --- | --- | --- | --- |
| scheduleOnCommitCount(int) | Trigger after N commits | No scheduling | int |
| scheduleOnDataFileCount(int) | Trigger after N data files | No scheduling | int |
| scheduleOnDataFileSize(long) | Trigger after total data file size (bytes) | No scheduling | long |
| scheduleOnPosDeleteFileCount(int) | Trigger after N positional delete files | No scheduling | int |
| scheduleOnPosDeleteRecordCount(long) | Trigger after N positional delete records | No scheduling | long |
| scheduleOnEqDeleteFileCount(int) | Trigger after N equality delete files | No scheduling | int |
| scheduleOnEqDeleteRecordCount(long) | Trigger after N equality delete records | No scheduling | long |
| scheduleOnInterval(Duration) | Trigger after time interval | No scheduling | Duration |
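The scheduling options interact with the builder-level rateLimit. As a rough mental model, the sketch below assumes that a task fires when any one of its configured thresholds is reached, and never sooner than rateLimit after its previous run. Both the OR semantics and the `TriggerSketch` class are assumptions made for illustration; only a commit-count and an interval threshold are modeled.

```java
import java.time.Duration;

/** Hypothetical trigger model; not Iceberg's trigger evaluation code. */
final class TriggerSketch {
  private final int commitCountThreshold;
  private final Duration interval;
  private final Duration rateLimit;
  private int commitsSinceLastRun = 0;
  private long lastRunMillis;

  TriggerSketch(int commitCountThreshold, Duration interval, Duration rateLimit, long startMillis) {
    this.commitCountThreshold = commitCountThreshold;
    this.interval = interval;
    this.rateLimit = rateLimit;
    this.lastRunMillis = startMillis;
  }

  /** Records one table commit. */
  void onCommit() {
    commitsSinceLastRun++;
  }

  /** Returns true and resets the counters if the task should run at nowMillis. */
  boolean shouldFire(long nowMillis) {
    long sinceLastRun = nowMillis - lastRunMillis;
    if (sinceLastRun < rateLimit.toMillis()) {
      return false; // rate limit: too soon after the previous run
    }
    boolean byCommitCount = commitsSinceLastRun >= commitCountThreshold;
    boolean byInterval = sinceLastRun >= interval.toMillis();
    if (byCommitCount || byInterval) {
      commitsSinceLastRun = 0;
      lastRunMillis = nowMillis;
      return true;
    }
    return false;
  }
}
```

In this model, ten commits arriving within five minutes do not trigger a run while a ten-minute rateLimit is in force; the run starts once the limit has elapsed.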
ExpireSnapshots Options
| Method | Description | Default | Type |
| --- | --- | --- | --- |
| maxSnapshotAge(Duration) | Maximum age of snapshots to retain | 5 days | Duration |
| retainLast(int) | Minimum number of snapshots to retain | 1 | int |
| deleteBatchSize(int) | Number of files to delete in each batch | 1000 | int |
| planningWorkerPoolSize(int) | Number of worker threads for planning | Shared pool | int |
| cleanExpiredMetadata(boolean) | Remove expired metadata files | false | boolean |
RewriteDataFiles Options
| Method | Description | Default | Type |
| --- | --- | --- | --- |
| targetFileSizeBytes(long) | Target size for rewritten files | Table property or 512MB | long |
| minFileSizeBytes(long) | Minimum size for compaction | 75% of target | long |
| maxFileSizeBytes(long) | Maximum size for compaction | 180% of target | long |
| minInputFiles(int) | Minimum files to trigger rewrite | 5 | int |
| deleteFileThreshold(int) | Min delete-file count per data file | Integer.MAX_VALUE | int |
| rewriteAll(boolean) | Rewrite all data files | false | boolean |
| maxFileGroupSizeBytes(long) | Maximum total size of a file group | 100GB | long |
| partialProgressEnabled(boolean) | Enable partial progress commits | false | boolean |
| partialProgressMaxCommits(int) | Maximum commits for partial progress | 10 | int |
| maxRewriteBytes(long) | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
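The percentage defaults in the table are easier to reason about as concrete byte counts. The sketch below derives the default minimum and maximum from a target size and flags a file as a rewrite candidate when its size falls outside that range. This is a simplified model: it assumes size is the only criterion and ignores minInputFiles, deleteFileThreshold, and file group limits. `RewriteSizeSketch` is a hypothetical helper.

```java
/** Illustrative size thresholds; not Iceberg's rewrite planner. */
final class RewriteSizeSketch {
  private RewriteSizeSketch() {}

  /** Default minimum: 75% of the target size. */
  static long defaultMinFileSizeBytes(long targetFileSizeBytes) {
    return targetFileSizeBytes * 3 / 4;
  }

  /** Default maximum: 180% of the target size. */
  static long defaultMaxFileSizeBytes(long targetFileSizeBytes) {
    return targetFileSizeBytes * 9 / 5;
  }

  /** A file is a candidate when it is smaller than the minimum or larger than the maximum. */
  static boolean isRewriteCandidate(long fileSizeBytes, long targetFileSizeBytes) {
    return fileSizeBytes < defaultMinFileSizeBytes(targetFileSizeBytes)
        || fileSizeBytes > defaultMaxFileSizeBytes(targetFileSizeBytes);
  }
}
```

With the 512MB default target, the derived minimum is 384MB, so a 10MB file would be picked up while a 500MB file would be left alone.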
DeleteOrphanFiles Options
| Method | Description | Default | Type |
| --- | --- | --- | --- |
| location(String) | Location to start recursive listing | Table location | String |
| usePrefixListing(boolean) | Use prefix-based file listing | false | boolean |
| minAge(Duration) | Remove orphan files created before this timestamp | 3 days ago | Duration |
| planningWorkerPoolSize(int) | Number of worker threads for planning | Shared pool | int |
Post-Commit Integration
Maintenance tasks can run automatically after data is committed, using addPostCommitTopology(...):
Map<String, String> flinkConf = new HashMap<>();
flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), LockConfig.JdbcLockConfig.JDBC);
flinkConf.put(LockConfig.JdbcLockConfig.JDBC_URI_OPTION.key(),
    "jdbc:postgresql://localhost:5432/iceberg");
flinkConf.put(LockConfig.LOCK_ID_OPTION.key(), "catalog.db.table");
IcebergSink.forRowData(dataStream)
    .table(table)
    .tableLoader(tableLoader)
    .setAll(flinkConf)
    .append();
SQL Examples
Enable maintenance using SQL:
-- Enable Iceberg V2 Sink and compaction
SET 'table.exec.iceberg.use.v2.sink' = 'true';
SET 'compaction-enabled' = 'true';
-- Configure JDBC lock
SET 'flink-maintenance.lock.type' = 'jdbc';
SET 'flink-maintenance.lock.lock-id' = 'catalog.db.table';
SET 'flink-maintenance.lock.jdbc.uri' = 'jdbc:postgresql://localhost:5432/iceberg';
SET 'flink-maintenance.lock.jdbc.init-lock-tables' = 'true';
INSERT INTO db.tbl SELECT ...;
Or specify options in table DDL:
CREATE TABLE db.tbl (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'my_catalog',
    'compaction-enabled' = 'true',
    'flink-maintenance.lock.type' = 'jdbc',
    'flink-maintenance.lock.lock-id' = 'catalog.db.table',
    'flink-maintenance.lock.jdbc.uri' = 'jdbc:postgresql://localhost:5432/iceberg'
);
Best Practices
Resource Management
Use dedicated slot sharing groups for maintenance tasks
Set appropriate parallelism based on cluster resources
Enable checkpointing for fault tolerance
Scheduling Strategy
Use rateLimit to avoid overly frequent executions
Use scheduleOnCommitCount for write-heavy tables
Use scheduleOnDataFileCount for fine-grained control
Adjust deleteBatchSize based on storage performance
Enable partialProgressEnabled for large rewrite operations
Set reasonable maxRewriteBytes limits
Set appropriate maxFileGroupSizeBytes for parallel processing
Troubleshooting
OutOfMemoryError during file deletion
Cause: Large number of files being deleted in a single batch.
Solution: Reduce the batch size:
.deleteBatchSize(500) // Reduce from default 1000
Lock conflicts
Cause: Multiple jobs attempting maintenance simultaneously.
Solution: Increase lock check delay and rate limit:
.lockCheckDelay(Duration.ofMinutes(1))
.rateLimit(Duration.ofMinutes(10))
Slow rewrite operations
Cause: Too much data being rewritten in a single run.
Solution: Enable partial progress and limit bytes:
.partialProgressEnabled(true)
.partialProgressMaxCommits(3)
.maxRewriteBytes(1L * 1024 * 1024 * 1024) // 1GB
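A byte budget bounds how much work a single run takes on. The sketch below shows one way such a cap could behave, assuming file groups are considered in order and selection stops at the first group that would exceed the budget. The ordering and stop rule are assumptions, and `RewriteBudgetSketch` is a hypothetical helper rather than Iceberg's planner.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative byte-budget selection; the real planner may choose differently. */
final class RewriteBudgetSketch {
  private RewriteBudgetSketch() {}

  /** Returns the leading group sizes whose running total stays within maxRewriteBytes. */
  static List<Long> takeWithinBudget(List<Long> groupSizesBytes, long maxRewriteBytes) {
    List<Long> selected = new ArrayList<>();
    long usedBytes = 0L;
    for (long size : groupSizesBytes) {
      if (usedBytes + size > maxRewriteBytes) {
        break; // the remaining groups wait for a later run
      }
      selected.add(size);
      usedBytes += size;
    }
    return selected;
  }
}
```

Groups left out of one run are not lost; they remain candidates the next time the task is triggered.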
Complete Production Example
public class TableMaintenanceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds for fault tolerance.
        env.enableCheckpointing(60000);

        // `configuration` (Hadoop configuration) and `properties` (catalog properties)
        // are assumed to be defined elsewhere, as in the earlier examples.
        TableLoader tableLoader = TableLoader.fromCatalog(
            CatalogLoader.hive("my_catalog", configuration, properties),
            TableIdentifier.of("database", "table")
        );

        Map<String, String> jdbcProps = new HashMap<>();
        jdbcProps.put("jdbc.user", "flink");
        jdbcProps.put("jdbc.password", "flinkpw");
        jdbcProps.put("flink-maintenance.lock.jdbc.init-lock-tables", "true");
        TriggerLockFactory lockFactory = new JdbcLockFactory(
            "jdbc:postgresql://localhost:5432/iceberg",
            "catalog.db.table",
            jdbcProps
        );

        TableMaintenance.forTable(env, tableLoader, lockFactory)
            .uidSuffix("production-maintenance")
            .rateLimit(Duration.ofMinutes(15))
            .lockCheckDelay(Duration.ofSeconds(30))
            .parallelism(4)
            .add(ExpireSnapshots.builder()
                .maxSnapshotAge(Duration.ofDays(7))
                .retainLast(10))
            .add(RewriteDataFiles.builder()
                .targetFileSizeBytes(256 * 1024 * 1024)
                .minFileSizeBytes(32 * 1024 * 1024)
                .scheduleOnDataFileCount(20)
                .partialProgressEnabled(true)
                .partialProgressMaxCommits(5)
                .maxRewriteBytes(2L * 1024 * 1024 * 1024)
                .parallelism(6))
            .add(DeleteOrphanFiles.builder()
                .minAge(Duration.ofDays(5)))
            .append();

        env.execute("Iceberg Table Maintenance");
    }
}
Next Steps
Configuration: Configure Flink for Iceberg
Writes: Learn about writing data with Flink