Overview
Apache Iceberg provides a flexible architecture for building custom catalogs that integrate with your organization’s metadata infrastructure. This guide covers implementing custom components for complete control over metadata management.
Custom TableOperations
The TableOperations interface handles low-level metadata read/write operations. Extend BaseMetastoreTableOperations to implement your custom metadata backend:
```java
class CustomTableOperations extends BaseMetastoreTableOperations {
  private String dbName;
  private String tableName;
  private Configuration conf;
  private FileIO fileIO;

  protected CustomTableOperations(Configuration conf, String dbName, String tableName) {
    this.conf = conf;
    this.dbName = dbName;
    this.tableName = tableName;
  }

  // Read the current metadata location from your custom metastore
  @Override
  public void doRefresh() {
    // Example: query your custom service for the metadata location
    String metadataLocation = CustomService.getMetadataForTable(conf, dbName, tableName);
    // Load table metadata from that location
    refreshFromMetadataLocation(metadataLocation);
  }

  // Atomically commit new metadata
  @Override
  public void doCommit(TableMetadata base, TableMetadata metadata) {
    String oldMetadataLocation = base.metadataFileLocation();
    // Write the new metadata file using the inherited helper method
    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
    // Atomically swap the pointer in your metastore, conditional on the old location
    CustomService.updateMetadataLocation(dbName, tableName, oldMetadataLocation, newMetadataLocation);
  }

  // Provide a FileIO for reading and writing metadata files
  @Override
  public FileIO io() {
    if (fileIO == null) {
      fileIO = new HadoopFileIO(conf);
    }
    return fileIO;
  }
}
```
Key Methods
| Method | Purpose |
|--------|---------|
| `doRefresh()` | Retrieve the current metadata location from your metastore |
| `doCommit()` | Atomically update the metadata location (must implement CAS) |
| `io()` | Provide the FileIO implementation for metadata files |
| `current()` | Return the current table metadata (inherited) |
| `refresh()` | Refresh metadata from the metastore (inherited) |
The doCommit() method must implement atomic compare-and-swap (CAS) semantics to prevent concurrent modification issues.
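A minimal in-memory sketch of that contract, assuming a hypothetical `MetadataPointerStore` standing in for the `CustomService` backend (none of these names are Iceberg API); a real metastore would implement the same check with a conditional `UPDATE`, an optimistic-lock column, or a distributed lock:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical pointer store illustrating the compare-and-swap contract that
// an updateMetadataLocation-style call must satisfy.
class MetadataPointerStore {
  private final ConcurrentMap<String, String> pointers = new ConcurrentHashMap<>();

  String currentLocation(String table) {
    return pointers.get(table);
  }

  void createTable(String table, String initialLocation) {
    if (pointers.putIfAbsent(table, initialLocation) != null) {
      throw new IllegalStateException("Table already exists: " + table);
    }
  }

  // Succeeds only if the stored pointer still equals expectedLocation,
  // rejecting any writer that committed against stale metadata.
  void updateMetadataLocation(String table, String expectedLocation, String newLocation) {
    boolean swapped = pointers.replace(table, expectedLocation, newLocation);
    if (!swapped) {
      throw new IllegalStateException(
          "Concurrent modification: expected " + expectedLocation
              + " but found " + pointers.get(table));
    }
  }
}
```

A writer that lost the race gets an exception instead of silently overwriting the other commit, which is exactly the behavior `doCommit()` needs to surface.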
Custom Catalog Implementation
Extend BaseMetastoreCatalog to create a full catalog implementation:
```java
public class CustomCatalog extends BaseMetastoreCatalog {
  private Configuration configuration;

  // A no-arg constructor is required for dynamic loading
  public CustomCatalog() {
  }

  public CustomCatalog(Configuration configuration) {
    this.configuration = configuration;
  }

  @Override
  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
    String dbName = tableIdentifier.namespace().level(0);
    String tableName = tableIdentifier.name();
    return new CustomTableOperations(configuration, dbName, tableName);
  }

  @Override
  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
    String warehouseLocation = configuration.get("custom.iceberg.warehouse.location");
    if (warehouseLocation == null) {
      throw new RuntimeException("custom.iceberg.warehouse.location not set!");
    }
    return String.format(
        "%s/%s.db/%s",
        warehouseLocation,
        tableIdentifier.namespace().levels()[0],
        tableIdentifier.name());
  }

  @Override
  public boolean dropTable(TableIdentifier identifier, boolean purge) {
    // Implement table deletion in your metastore
    return CustomService.deleteTable(identifier.namespace().level(0), identifier.name());
  }

  @Override
  public void renameTable(TableIdentifier from, TableIdentifier to) {
    Preconditions.checkArgument(
        from.namespace().level(0).equals(to.namespace().level(0)),
        "Cannot move table between databases");
    CustomService.renameTable(from.namespace().level(0), from.name(), to.name());
  }

  @Override
  public void initialize(String name, Map<String, String> properties) {
    // Initialize the catalog with properties passed from the compute engine
    this.configuration = new Configuration();
    properties.forEach(configuration::set);
  }
}
```
Loading Custom Catalogs
Spark
Specify the `catalog-impl` property to load your custom catalog:

```shell
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{icebergVersion} \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.catalog-impl=com.example.CustomCatalog \
    --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/warehouse \
    --conf spark.sql.catalog.my_catalog.custom.property=value
```
Flink
```sql
CREATE CATALOG my_catalog WITH (
  'type' = 'iceberg',
  'catalog-impl' = 'com.example.CustomCatalog',
  'warehouse' = 's3://my-bucket/warehouse',
  'custom.property' = 'value'
);
```
Java API
```java
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.CATALOG_IMPL, "com.example.CustomCatalog");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/warehouse");
properties.put("custom.property", "value");

Configuration hadoopConf = new Configuration();
Catalog catalog = CatalogUtil.buildIcebergCatalog("custom_catalog", properties, hadoopConf);
```
Custom FileIO Implementation
Implement FileIO to customize how Iceberg reads and writes data files:
```java
public class CustomFileIO implements FileIO {
  private Configuration conf = new Configuration();

  // A no-arg constructor is required
  public CustomFileIO() {
  }

  @Override
  public InputFile newInputFile(String path) {
    return new CustomInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return new CustomOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    Path toDelete = new Path(path);
    FileSystem fs = Util.getFs(toDelete, conf);
    try {
      fs.delete(toDelete, false /* not recursive */);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
    }
  }

  @Override
  public void initialize(Map<String, String> properties) {
    // Initialize with catalog properties
  }
}
```
Using Custom FileIO
Specify `io-impl` in the catalog properties:

```shell
spark-sql \
    --conf spark.sql.catalog.my_catalog.io-impl=com.example.CustomFileIO
```
If your FileIO needs Hadoop configuration, implement org.apache.hadoop.conf.Configurable.
Custom LocationProvider
Control where Iceberg writes data files by implementing LocationProvider:
```java
public class CustomLocationProvider implements LocationProvider {
  private String tableLocation;

  // Either this two-arg constructor or a no-arg constructor is required
  public CustomLocationProvider(String tableLocation, Map<String, String> properties) {
    this.tableLocation = tableLocation;
  }

  @Override
  public String newDataLocation(String filename) {
    // Generate a custom file path
    return String.format("%s/%s/%s", tableLocation, UUID.randomUUID().toString(), filename);
  }

  @Override
  public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
    // Generate a custom partitioned file path; this example ignores the partition data
    return newDataLocation(filename);
  }
}
```
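As a concrete variant, object stores benefit from spreading files across many key prefixes. Below is a hypothetical hash-prefix path scheme such a provider might compute; the `HashPrefixPaths` helper is illustrative only, though Iceberg's built-in object storage location provider (enabled with `write.object-storage.enabled`) applies a similar idea:

```java
// Hypothetical path scheme: prefix each filename with a short hash so data files
// spread across many object-store key prefixes instead of one hot prefix.
class HashPrefixPaths {
  static String newDataLocation(String tableLocation, String filename) {
    // Deterministic 16-bit bucket derived from the filename
    int prefix = (filename.hashCode() & 0x7fffffff) % 0x10000;
    // e.g. <tableLocation>/data/3fa2/<filename>
    return String.format("%s/data/%04x/%s", tableLocation, prefix, filename);
  }
}
```

The same filename always maps to the same prefix, so the scheme stays compatible with metadata that records absolute file paths.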
Setting LocationProvider
Set at table creation time:
```sql
CREATE TABLE hive.default.my_table (
  id bigint,
  data string,
  category string
)
USING iceberg
OPTIONS (
  'write.location-provider.impl' = 'com.example.CustomLocationProvider'
)
PARTITIONED BY (category);
```
Hadoop Configuration Access
If your custom components need Hadoop configuration:
```java
public class CustomCatalog extends BaseMetastoreCatalog implements Configurable {
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }
}
```
Best Practices
Implement Atomic Operations
Your doCommit() method must use compare-and-swap semantics to prevent lost updates. Use optimistic locking, conditional updates, or distributed locks.
Multiple writers may attempt to commit simultaneously. Your metastore must handle concurrent commits gracefully with proper retry logic.
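The retry half of that contract can be sketched in isolation. The wrapper below is hypothetical, not Iceberg API; Iceberg's own commit path already retries automatically, tunable via the `commit.retry.*` table properties:

```java
import java.util.function.Supplier;

// Minimal exponential-backoff retry sketch for commit-style operations.
// IllegalStateException stands in for a CAS/commit failure from the metastore.
class CommitRetry {
  static <T> T withRetries(Supplier<T> action, int maxAttempts) {
    long backoffMs = 10;
    for (int attempt = 1; ; attempt++) {
      try {
        return action.get();  // attempt the commit
      } catch (IllegalStateException e) {
        if (attempt >= maxAttempts) {
          throw e;  // give up after the final attempt
        }
        try {
          Thread.sleep(backoffMs);  // back off before re-reading and retrying
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e;
        }
        backoffMs *= 2;
      }
    }
  }
}
```

A real retry must re-read the current metadata before re-attempting, so the rebuilt commit applies on top of the winner's changes rather than replaying the stale base.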
Cache Metadata Locations
Cache metadata locations where possible to reduce metastore load, and implement the invalidateTable() method to clear stale entries.
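A minimal sketch of such a cache, assuming a hypothetical per-table map keyed by table name (none of these names are Iceberg API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-table cache of metadata locations. invalidate() is what a
// catalog's invalidateTable() would call to force the next read to hit the metastore.
class MetadataLocationCache {
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  String locationFor(String table, Function<String, String> metastoreLookup) {
    return cache.computeIfAbsent(table, metastoreLookup);  // metastore hit only on a miss
  }

  void invalidate(String table) {
    cache.remove(table);  // next locationFor() re-reads from the metastore
  }
}
```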
Support Namespace Operations
If your catalog supports namespaces, implement the SupportsNamespaces interface for better organization.
Complete Example
For complete working examples, see the reference implementations linked under Next Steps.
Next Steps
JDBC Catalog: a production-ready custom catalog implementation
Custom FileIO: a deeper dive into FileIO implementations