
Overview

Apache Iceberg provides a flexible architecture for building custom catalogs that integrate with your organization’s metadata infrastructure. This guide covers implementing custom components for complete control over metadata management.
To work with encrypted tables, custom catalogs must address specific security requirements.

Custom TableOperations

The TableOperations interface handles low-level metadata read/write operations. Extend BaseMetastoreTableOperations to implement your custom metadata backend:
class CustomTableOperations extends BaseMetastoreTableOperations {
  private String dbName;
  private String tableName;
  private Configuration conf;
  private FileIO fileIO;

  protected CustomTableOperations(Configuration conf, String dbName, String tableName) {
    this.conf = conf;
    this.dbName = dbName;
    this.tableName = tableName;
  }

  // Read metadata location from your custom metastore
  @Override
  public void doRefresh() {
    // Example: Query your custom service for metadata location
    String metadataLocation = CustomService.getMetadataForTable(conf, dbName, tableName);
    
    // Update from the metadata file location
    refreshFromMetadataLocation(metadataLocation);
  }

  // Atomically commit new metadata
  @Override
  public void doCommit(TableMetadata base, TableMetadata metadata) {
    // Location of the metadata file this committer last read (null on table create)
    String oldMetadataLocation = base != null ? base.metadataFileLocation() : null;

    // Write the new metadata file using the inherited helper method
    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);

    // Atomically swap the pointer in your metastore; this call must fail if the
    // stored location no longer equals oldMetadataLocation (compare-and-swap)
    CustomService.updateMetadataLocation(
      dbName, tableName, oldMetadataLocation, newMetadataLocation
    );
  }

  // Provide FileIO for reading/writing metadata files
  @Override
  public FileIO io() {
    if (fileIO == null) {
      fileIO = new HadoopFileIO(conf);
    }
    return fileIO;
  }
}

Key Methods

Method       Purpose
doRefresh()  Retrieve the current metadata location from your metastore
doCommit()   Atomically update the metadata location (must implement CAS)
io()         Provide the FileIO implementation for metadata files
current()    Return the current table metadata (inherited)
refresh()    Refresh metadata from the metastore (inherited)
The doCommit() method must implement atomic compare-and-swap (CAS) semantics to prevent concurrent modification issues.
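To make the compare-and-swap contract concrete, here is a minimal, self-contained sketch using an in-memory pointer as a stand-in for a real metastore. `InMemoryPointerStore` and its method names are illustrative, not Iceberg APIs; a real `doCommit()` would delegate the swap to a conditional update in your backing store.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for a metastore's metadata-location pointer.
public class InMemoryPointerStore {
  private final AtomicReference<String> metadataLocation = new AtomicReference<>();

  // Succeeds only if the stored pointer still equals what the committer read.
  public boolean swapMetadataLocation(String expected, String next) {
    return metadataLocation.compareAndSet(expected, next);
  }

  public String currentLocation() {
    return metadataLocation.get();
  }

  public static void main(String[] args) {
    InMemoryPointerStore store = new InMemoryPointerStore();

    // First committer read null (new table) and wins the swap.
    boolean first = store.swapMetadataLocation(null, "v1.metadata.json");
    // Second committer also read null; its swap must fail, forcing refresh + retry.
    boolean second = store.swapMetadataLocation(null, "v1b.metadata.json");

    System.out.println(first + " " + second + " " + store.currentLocation());
    // prints "true false v1.metadata.json"
  }
}
```

The losing writer must refresh to the winner's metadata, re-apply its changes, and attempt the swap again.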

Custom Catalog Implementation

Extend BaseMetastoreCatalog to create a full catalog implementation:
public class CustomCatalog extends BaseMetastoreCatalog {
  private Configuration configuration;

  // Must have a no-arg constructor for dynamic loading
  public CustomCatalog() {
  }

  public CustomCatalog(Configuration configuration) {
    this.configuration = configuration;
  }

  @Override
  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
    String dbName = tableIdentifier.namespace().level(0);
    String tableName = tableIdentifier.name();
    return new CustomTableOperations(configuration, dbName, tableName);
  }

  @Override
  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
    String warehouseLocation = configuration.get("custom.iceberg.warehouse.location");
    
    if (warehouseLocation == null) {
      throw new RuntimeException("custom.iceberg.warehouse.location not set!");
    }

    return String.format(
      "%s/%s.db/%s",
      warehouseLocation,
      tableIdentifier.namespace().level(0),
      tableIdentifier.name()
    );
  }

  @Override
  public boolean dropTable(TableIdentifier identifier, boolean purge) {
    // Delete the table from your metastore; when purge is true, data and
    // metadata files should also be removed
    return CustomService.deleteTable(
      identifier.namespace().level(0),
      identifier.name()
    );
  }

  @Override
  public void renameTable(TableIdentifier from, TableIdentifier to) {
    Preconditions.checkArgument(
      from.namespace().level(0).equals(to.namespace().level(0)),
      "Cannot move table between databases"
    );
    
    CustomService.renameTable(
      from.namespace().level(0),
      from.name(),
      to.name()
    );
  }

  @Override
  public void initialize(String name, Map<String, String> properties) {
    // Initialize catalog with properties from compute engine
    this.configuration = new Configuration();
    properties.forEach((key, value) -> configuration.set(key, value));
  }
}

Loading Custom Catalogs

Spark

Specify the catalog-impl property to load your custom catalog in Spark:
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{icebergVersion} \
  --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.my_catalog.catalog-impl=com.example.CustomCatalog \
  --conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/warehouse \
  --conf spark.sql.catalog.my_catalog.custom.property=value

Flink

In Flink SQL, pass catalog-impl when creating the catalog:
CREATE CATALOG my_catalog WITH (
  'type' = 'iceberg',
  'catalog-impl' = 'com.example.CustomCatalog',
  'warehouse' = 's3://my-bucket/warehouse',
  'custom.property' = 'value'
);

Java API

Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.CATALOG_IMPL, "com.example.CustomCatalog");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/warehouse");
properties.put("custom.property", "value");

Configuration hadoopConf = new Configuration();
Catalog catalog = CatalogUtil.buildIcebergCatalog(
  "custom_catalog",
  properties,
  hadoopConf
);

Custom FileIO Implementation

Implement FileIO to customize how Iceberg reads and writes data files:
public class CustomFileIO implements FileIO {
  
  // Must have a no-arg constructor
  public CustomFileIO() {
  }

  @Override
  public InputFile newInputFile(String path) {
    return new CustomInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return new CustomOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    Path toDelete = new Path(path);
    // Util.getFs requires a Hadoop Configuration; a real implementation should
    // reuse one (see the Configurable note below) rather than create it per call
    FileSystem fs = Util.getFs(toDelete, new Configuration());
    try {
      fs.delete(toDelete, false /* not recursive */);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
    }
  }

  @Override
  public void initialize(Map<String, String> properties) {
    // Initialize with catalog properties
  }
}

Using Custom FileIO

Specify io-impl in catalog properties:
spark-sql \
  --conf spark.sql.catalog.my_catalog.io-impl=com.example.CustomFileIO
If your FileIO needs Hadoop configuration, implement org.apache.hadoop.conf.Configurable.

Custom LocationProvider

Control where Iceberg writes data files by implementing LocationProvider:
public class CustomLocationProvider implements LocationProvider {
  private String tableLocation;

  // Must have this constructor or a no-arg constructor
  public CustomLocationProvider(String tableLocation, Map<String, String> properties) {
    this.tableLocation = tableLocation;
  }

  @Override
  public String newDataLocation(String filename) {
    // Generate custom file path
    return String.format(
      "%s/%s/%s",
      tableLocation,
      UUID.randomUUID().toString(),
      filename
    );
  }

  @Override
  public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
    // Include the Hive-style partition path (e.g. category=books) in the location
    return String.format(
      "%s/%s/%s",
      tableLocation,
      spec.partitionToPath(partitionData),
      filename
    );
  }
}
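To make the resulting layout concrete, here is a self-contained sketch of the path shape a partition-aware provider typically produces. `partitionPath()` is an illustrative stand-in for Iceberg's PartitionSpec.partitionToPath(); the bucket and file names are examples only.

```java
// Illustrates Hive-style key=value partition segments in a data file location.
public class PartitionPaths {
  public static String partitionPath(String field, String value) {
    return field + "=" + value;
  }

  public static String newDataLocation(String tableLocation, String partitionPath, String filename) {
    return String.format("%s/data/%s/%s", tableLocation, partitionPath, filename);
  }

  public static void main(String[] args) {
    String path = newDataLocation(
        "s3://my-bucket/warehouse/db.db/my_table",
        partitionPath("category", "books"),
        "00000-0-data.parquet");
    System.out.println(path);
    // prints "s3://my-bucket/warehouse/db.db/my_table/data/category=books/00000-0-data.parquet"
  }
}
```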

Setting LocationProvider

Set at table creation time:
CREATE TABLE hive.default.my_table (
  id bigint,
  data string,
  category string
)
USING iceberg
PARTITIONED BY (category)
TBLPROPERTIES (
  'write.location-provider.impl' = 'com.example.CustomLocationProvider'
);

Hadoop Configuration Access

If your custom components need Hadoop configuration:
public class CustomCatalog extends BaseMetastoreCatalog 
    implements Configurable {
  
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }
}

Best Practices

Your doCommit() method must use compare-and-swap semantics to prevent lost updates. Use optimistic locking, conditional updates, or distributed locks.
Multiple writers may attempt to commit simultaneously. Your metastore must handle concurrent commits gracefully with proper retry logic.
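A bounded retry loop with backoff is one common shape for this logic. The sketch below is self-contained and illustrative: `commitWithRetries` and its parameters are hypothetical names, and in a real catalog each attempt would refresh the table metadata, re-apply the pending changes, and rethrow a commit failure once retries are exhausted.

```java
import java.util.function.BooleanSupplier;

// Hedged sketch of commit retry logic around a compare-and-swap attempt.
public class CommitRetry {
  public static boolean commitWithRetries(BooleanSupplier attempt, int maxRetries) {
    long backoffMs = 100;
    for (int i = 0; i <= maxRetries; i++) {
      if (attempt.getAsBoolean()) {
        return true; // the CAS succeeded
      }
      // Another writer won; back off, then (in a real impl) refresh and re-apply.
      try {
        Thread.sleep(backoffMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
      backoffMs = Math.min(backoffMs * 2, 2000); // exponential backoff, capped
    }
    return false; // caller should surface a commit failure
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Simulate a CAS that fails twice before succeeding.
    boolean committed = commitWithRetries(() -> ++calls[0] >= 3, 5);
    System.out.println(committed + " after " + calls[0] + " attempts");
    // prints "true after 3 attempts"
  }
}
```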
Cache metadata locations when possible to reduce metastore load. Implement the invalidateTable() method to clear caches.
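One simple caching shape is a concurrent map keyed by table, cleared on invalidation. This is a self-contained sketch with illustrative names; in a real catalog, invalidate() would be called from Catalog.invalidateTable(TableIdentifier).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of a metadata-location cache with explicit invalidation.
public class MetadataLocationCache {
  private final Map<String, String> locations = new ConcurrentHashMap<>();

  public String get(String tableKey) {
    return locations.get(tableKey); // null on miss; caller falls back to metastore
  }

  public void put(String tableKey, String metadataLocation) {
    locations.put(tableKey, metadataLocation);
  }

  // Clear a stale entry so the next read goes back to the metastore.
  public void invalidate(String tableKey) {
    locations.remove(tableKey);
  }
}
```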
If your catalog supports namespaces, implement the SupportsNamespaces interface for better organization.

Complete Example

For complete working examples, see the reference implementations linked under Next Steps.

Next Steps

JDBC Catalog

See a production-ready custom catalog implementation

Custom FileIO

Deep dive into FileIO implementations