Overview
The FileIO interface is Iceberg’s abstraction for reading and writing data and metadata files. It provides a flexible way to integrate with any storage system while maintaining Iceberg’s guarantees around metadata operations and table commits.
Why FileIO?
Iceberg’s design separates concerns:
Metadata Operations: Planning and committing changes (uses FileIO)
Data Operations: Reading and writing table data (uses the processing engine plus FileIO)
Physical Layout: Absolute paths allow flexibility in file organization
Key Benefits
No File Renaming: FileIO has no rename operation; every file is written once at its final location, so the storage system does not need to support (atomic) renames
Absolute Paths: Metadata tracks full file paths, enabling flexible layouts
Minimal Requirements: The storage system only needs to support read, write, delete, and seek
Custom Storage: Any storage backend can be supported with a FileIO implementation
FileIO Interface
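The core interface is small. The version below is simplified; the actual `org.apache.iceberg.io.FileIO` also defines further default methods, such as `newInputFile(String path, long length)`:
```java
public interface FileIO extends Serializable, Closeable {
  /** Get an InputFile to read bytes from a file. */
  InputFile newInputFile(String path);

  /** Get an OutputFile to write bytes to a file. */
  OutputFile newOutputFile(String path);

  /** Delete a file. */
  void deleteFile(String path);

  /** Initialize the FileIO with catalog properties. */
  default void initialize(Map<String, String> properties) {}

  /** Release resources; a no-op by default. */
  @Override
  default void close() {}
}
```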
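The interface says nothing about how storage must behave, so it can help to model the contract in memory before wiring up a real backend. The sketch below uses hypothetical stand-in types (`MemoryFileIO`, `MemoryOutputStream`) instead of Iceberg's `InputFile`/`OutputFile`, so it runs with only the JDK. It assumes two behaviours typical of object-store implementations: `create()` refuses to replace an existing file, and written bytes become visible only when the stream is closed.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the FileIO contract; not Iceberg's API.
class MemoryFileIO {
  // Files are keyed by their full (absolute) path, as Iceberg metadata tracks them
  private final Map<String, byte[]> files = new ConcurrentHashMap<>();

  boolean exists(String path) {
    return files.containsKey(path);
  }

  byte[] read(String path) {
    byte[] data = files.get(path);
    if (data == null) {
      throw new UncheckedIOException(new FileNotFoundException(path));
    }
    return data.clone();
  }

  // Mirrors OutputFile.create(): never replaces an existing file
  MemoryOutputStream create(String path) {
    if (exists(path)) {
      throw new IllegalStateException("Already exists: " + path);
    }
    return new MemoryOutputStream(path, false);
  }

  // Mirrors OutputFile.createOrOverwrite()
  MemoryOutputStream createOrOverwrite(String path) {
    return new MemoryOutputStream(path, true);
  }

  void deleteFile(String path) {
    files.remove(path);
  }

  // Buffers writes and publishes the whole file in one step on close()
  class MemoryOutputStream extends ByteArrayOutputStream {
    private final String path;
    private final boolean overwrite;
    private boolean closed = false;

    MemoryOutputStream(String path, boolean overwrite) {
      this.path = path;
      this.overwrite = overwrite;
    }

    // Like PositionOutputStream.getPos(): number of bytes written so far
    long getPos() {
      return size();
    }

    @Override
    public void close() {
      if (closed) {
        return;
      }
      closed = true;
      byte[] bytes = toByteArray();
      if (overwrite) {
        files.put(path, bytes);
      } else if (files.putIfAbsent(path, bytes) != null) {
        // Another writer published the same path after create() was called
        throw new IllegalStateException("Already exists: " + path);
      }
    }
  }
}
```

In this model, publishing on close means a reader never observes a half-written file, which is why writing to a temporary name and renaming it is unnecessary.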
Built-in Implementations
Iceberg provides FileIO implementations for common storage systems:
| Implementation | Storage Type | Module |
|---|---|---|
| S3FileIO | Amazon S3 | iceberg-aws |
| GCSFileIO | Google Cloud Storage | iceberg-gcp |
| ADLSFileIO | Azure Data Lake Storage | iceberg-azure |
| OSSFileIO | Alibaba Cloud OSS | iceberg-aliyun |
| HadoopFileIO | Any Hadoop FileSystem | iceberg-core |
| ResolvingFileIO | Multiple storage types | iceberg-core |
Implementing Custom FileIO
Basic Implementation
```java
// CustomFileIO.java
package com.example.storage;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// CustomStorageClient is a placeholder for your storage SDK's client
public class CustomFileIO implements FileIO {
  private Map<String, String> properties;
  // FileIO is Serializable (engines such as Spark ship it to executors), but the client
  // usually is not, so it is transient and re-created lazily after deserialization
  private transient volatile CustomStorageClient client;

  // A public no-arg constructor is required for dynamic loading via io-impl
  public CustomFileIO() {}

  @Override
  public void initialize(Map<String, String> props) {
    this.properties = new HashMap<>(props);
  }

  private CustomStorageClient client() {
    if (client == null) {
      synchronized (this) {
        if (client == null) {
          client = new CustomStorageClient(
              properties.get("custom.storage.endpoint"),
              properties.get("custom.storage.access-key"),
              properties.get("custom.storage.secret-key"));
        }
      }
    }
    return client;
  }

  @Override
  public InputFile newInputFile(String path) {
    return new CustomInputFile(client(), path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return new CustomOutputFile(client(), path);
  }

  @Override
  public void deleteFile(String path) {
    try {
      client().delete(path);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to delete: %s", path);
    }
  }

  @Override
  public void close() {
    if (client != null) {
      client.close();
    }
  }
}
```
```java
// CustomInputFile.java
package com.example.storage;

import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

public class CustomInputFile implements InputFile {
  private final CustomStorageClient client;
  private final String path;
  private Long length; // cached after the first metadata lookup

  public CustomInputFile(CustomStorageClient client, String path) {
    this.client = client;
    this.path = path;
  }

  @Override
  public long getLength() {
    if (length == null) {
      length = client.getObjectMetadata(path).getContentLength();
    }
    return length;
  }

  @Override
  public SeekableInputStream newStream() {
    return new CustomSeekableInputStream(client, path);
  }

  @Override
  public String location() {
    return path;
  }

  @Override
  public boolean exists() {
    return client.exists(path);
  }
}
```
Implementing OutputFile
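```java
// CustomOutputFile.java
package com.example.storage;

import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

// CustomPositionOutputStream (not shown) extends PositionOutputStream and uploads on close()
public class CustomOutputFile implements OutputFile {
  private final CustomStorageClient client;
  private final String path;

  public CustomOutputFile(CustomStorageClient client, String path) {
    this.client = client;
    this.path = path;
  }

  @Override
  public PositionOutputStream create() {
    // OutputFile contract: fail if the file already exists.
    // This check is not atomic; use a conditional write if your store supports one.
    if (client.exists(path)) {
      throw new AlreadyExistsException("File already exists: %s", path);
    }
    return new CustomPositionOutputStream(client, path);
  }

  @Override
  public PositionOutputStream createOrOverwrite() {
    // Object stores usually replace an object when a new upload completes,
    // so a separate (non-atomic) delete beforehand is unnecessary
    return new CustomPositionOutputStream(client, path);
  }

  @Override
  public String location() {
    return path;
  }

  @Override
  public InputFile toInputFile() {
    return new CustomInputFile(client, path);
  }
}
```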
```java
// CustomSeekableInputStream.java
package com.example.storage;

import java.io.IOException;
import java.io.InputStream;
import org.apache.iceberg.io.SeekableInputStream;

public class CustomSeekableInputStream extends SeekableInputStream {
  private final CustomStorageClient client;
  private final String path;
  private InputStream stream;
  private long pos = 0;
  private boolean closed = false;

  public CustomSeekableInputStream(CustomStorageClient client, String path) {
    this.client = client;
    this.path = path;
    this.stream = client.getObject(path); // stream starting at offset 0
  }

  @Override
  public long getPos() throws IOException {
    return pos;
  }

  @Override
  public void seek(long newPos) throws IOException {
    if (newPos < 0) {
      throw new IOException("Cannot seek to negative position: " + newPos);
    }
    if (newPos == pos) {
      return;
    }
    // Close the existing stream and open a new one at the requested offset
    stream.close();
    stream = client.getObject(path, newPos);
    pos = newPos;
  }

  @Override
  public int read() throws IOException {
    int b = stream.read();
    if (b >= 0) {
      pos++;
    }
    return b;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int bytesRead = stream.read(b, off, len);
    if (bytesRead > 0) {
      pos += bytesRead;
    }
    return bytesRead;
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closed = true;
      stream.close();
    }
  }
}
```
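The stream above reopens the object on every `seek`, even when the caller seeks several times before reading (for example, when a columnar reader jumps to a footer and then to a column chunk). A common refinement is a lazy seek: record the target position, and only skip forward or reopen when the next read happens. The sketch below models this with a hypothetical `RangeOpener` callback standing in for `client.getObject(path, offset)`; the 64 KiB skip-versus-reopen threshold is an arbitrary assumption to tune for your store.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for a storage call that opens an object at a byte offset
interface RangeOpener {
  InputStream openAt(long offset) throws IOException;
}

class LazySeekInputStream extends InputStream {
  private static final long MAX_SKIP = 64 * 1024; // assumed threshold: skip forward vs. reopen

  private final RangeOpener opener;
  private InputStream stream; // null until the first read
  private long streamPos = 0; // offset the open stream is positioned at
  private long pos = 0;       // logical position reported to callers
  int opens = 0;              // number of (re)opens, exposed for illustration

  LazySeekInputStream(RangeOpener opener) {
    this.opener = opener;
  }

  long getPos() {
    return pos;
  }

  // Only records the target; no I/O happens here
  void seek(long newPos) {
    if (newPos < 0) {
      throw new IllegalArgumentException("Negative position: " + newPos);
    }
    pos = newPos;
  }

  // Bring the underlying stream to pos, reopening only when necessary
  private void positionStream() throws IOException {
    if (stream != null && pos >= streamPos && pos - streamPos <= MAX_SKIP) {
      long toSkip = pos - streamPos;
      while (toSkip > 0) {
        long skipped = stream.skip(toSkip);
        if (skipped <= 0) {
          break;
        }
        toSkip -= skipped;
        streamPos += skipped;
      }
      if (streamPos == pos) {
        return;
      }
    }
    // Backward seek, long forward seek, or first read: open at the target offset
    if (stream != null) {
      stream.close();
    }
    stream = opener.openAt(pos);
    streamPos = pos;
    opens++;
  }

  @Override
  public int read() throws IOException {
    positionStream();
    int b = stream.read();
    if (b >= 0) {
      pos++;
      streamPos++;
    }
    return b;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    positionStream();
    int n = stream.read(b, off, len);
    if (n > 0) {
      pos += n;
      streamPos += n;
    }
    return n;
  }

  @Override
  public void close() throws IOException {
    if (stream != null) {
      stream.close();
      stream = null;
    }
  }
}
```

In a real implementation the same logic would live inside your `SeekableInputStream` subclass, whose `getPos` and `seek` declare `IOException`.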
Configuration
Loading via Catalog Property
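Set `io-impl` on the catalog; the other catalog properties, including the `custom.storage.*` keys, are passed to `FileIO.initialize`. The catalog itself must also be defined, for example with `SparkCatalog` plus a `type` or `catalog-impl` setting (not shown):
```shell
spark-sql \
  --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.my_catalog.io-impl=com.example.storage.CustomFileIO \
  --conf spark.sql.catalog.my_catalog.custom.storage.endpoint=https://storage.example.com \
  --conf spark.sql.catalog.my_catalog.custom.storage.access-key=ACCESS_KEY \
  --conf spark.sql.catalog.my_catalog.custom.storage.secret-key=SECRET_KEY
```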
Loading via Java API
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

Map<String, String> properties = new HashMap<>();
properties.put("io-impl", "com.example.storage.CustomFileIO");
properties.put("custom.storage.endpoint", "https://storage.example.com");
properties.put("custom.storage.access-key", "ACCESS_KEY");
properties.put("custom.storage.secret-key", "SECRET_KEY");
properties.put("warehouse", "custom://my-bucket/warehouse");
// Also set "type" or "catalog-impl" to choose the catalog implementation

// Hadoop configuration handed to the catalog (and to the FileIO, if it is Configurable)
Configuration hadoopConf = new Configuration();

Catalog catalog = CatalogUtil.buildIcebergCatalog("my_catalog", properties, hadoopConf);
```
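Because `initialize` receives the whole catalog property map, it is a good place to validate configuration and fail fast with a clear message, rather than on the first storage request. A minimal sketch, assuming the three `custom.storage.*` keys used above are required; `CustomStorageConfig` and the optional `custom.storage.max-connections` key (default 50) are hypothetical.

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical holder for the custom.storage.* settings read in initialize()
final class CustomStorageConfig implements Serializable {
  static final String ENDPOINT = "custom.storage.endpoint";
  static final String ACCESS_KEY = "custom.storage.access-key";
  static final String SECRET_KEY = "custom.storage.secret-key";
  static final String MAX_CONNECTIONS = "custom.storage.max-connections"; // hypothetical key
  static final int MAX_CONNECTIONS_DEFAULT = 50;

  final String endpoint;
  final String accessKey;
  final String secretKey;
  final int maxConnections;

  private CustomStorageConfig(String endpoint, String accessKey, String secretKey, int maxConnections) {
    this.endpoint = endpoint;
    this.accessKey = accessKey;
    this.secretKey = secretKey;
    this.maxConnections = maxConnections;
  }

  // Unrelated catalog properties (warehouse, io-impl, ...) are simply ignored
  static CustomStorageConfig fromProperties(Map<String, String> props) {
    String rawMax = props.get(MAX_CONNECTIONS);
    int maxConnections;
    try {
      maxConnections = rawMax == null ? MAX_CONNECTIONS_DEFAULT : Integer.parseInt(rawMax.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid " + MAX_CONNECTIONS + ": " + rawMax, e);
    }
    if (maxConnections <= 0) {
      throw new IllegalArgumentException(MAX_CONNECTIONS + " must be positive: " + maxConnections);
    }
    return new CustomStorageConfig(
        required(props, ENDPOINT), required(props, ACCESS_KEY), required(props, SECRET_KEY),
        maxConnections);
  }

  private static String required(Map<String, String> props, String key) {
    String value = props.get(key);
    if (value == null || value.isBlank()) {
      throw new IllegalArgumentException("Missing required property: " + key);
    }
    return value;
  }

  // Deliberately omits credentials so the config can be logged safely
  @Override
  public String toString() {
    return "CustomStorageConfig{endpoint=" + endpoint + ", maxConnections=" + maxConnections + "}";
  }
}
```

`CustomFileIO.initialize` could then store `CustomStorageConfig.fromProperties(props)` instead of the raw map; the class is `Serializable` so the FileIO stays serializable.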
Advanced Features
Hadoop Configuration Access
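If your FileIO needs the Hadoop configuration, implement Hadoop's `Configurable` interface:
```java
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.FileIO;

public class CustomFileIO implements FileIO, Configurable {
  // Hadoop's Configuration is not java.io.Serializable, so keep it transient
  // and copy the values you need into serializable fields in initialize()
  private transient Configuration conf;
  private String hadoopValue;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void initialize(Map<String, String> properties) {
    // When loaded through CatalogUtil.loadFileIO, setConf() is called before initialize(),
    // but conf can still be null if the engine supplied no Hadoop configuration
    if (conf != null) {
      this.hadoopValue = conf.get("hadoop.property.name");
    }
  }

  // newInputFile, newOutputFile, and deleteFile as in the basic implementation
}
```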
Bulk Delete Operations
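Implement `SupportsBulkOperations` so that many files can be deleted per request. Its `deleteFiles` method reports failures with the unchecked `BulkDeletionFailureException`, so it cannot declare `IOException`:
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;

public class CustomFileIO implements FileIO, SupportsBulkOperations {
  // Assumed per-request key limit of the storage API
  private static final int BATCH_SIZE = 1000;

  @Override
  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
    int failures = 0;
    List<String> batch = new ArrayList<>(BATCH_SIZE);
    for (String path : pathsToDelete) {
      batch.add(path);
      if (batch.size() >= BATCH_SIZE) {
        failures += deleteBatch(batch);
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      failures += deleteBatch(batch);
    }
    // Report every failure at once instead of stopping at the first bad batch
    if (failures > 0) {
      throw new BulkDeletionFailureException(failures);
    }
  }

  // Returns the number of paths in the batch that were not deleted.
  // client.bulkDelete is a placeholder that returns the keys it failed to delete.
  private int deleteBatch(List<String> batch) {
    try {
      return client.bulkDelete(batch).size();
    } catch (IOException e) {
      return batch.size();
    }
  }
}
```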
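When a delete touches many batches, the requests can also be sent in parallel to cut wall-clock time. The sketch below uses a JDK `ExecutorService`; `BatchDeleter` is a hypothetical stand-in for a call such as `client.bulkDelete(batch)` that returns the number of keys it failed to delete, and the batch size and pool size are assumptions to tune for your store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for a storage bulk-delete call; returns the number of failed keys
interface BatchDeleter {
  int delete(List<String> batch) throws Exception;
}

final class ParallelBulkDelete {
  private ParallelBulkDelete() {}

  // Splits paths into batches, deletes them concurrently, and returns the total failures
  static int deleteAll(Iterable<String> paths, int batchSize, ExecutorService executor,
      BatchDeleter deleter) throws InterruptedException {
    List<List<String>> batches = new ArrayList<>();
    List<String> batch = new ArrayList<>(batchSize);
    for (String path : paths) {
      batch.add(path);
      if (batch.size() == batchSize) {
        batches.add(batch);
        batch = new ArrayList<>(batchSize);
      }
    }
    if (!batch.isEmpty()) {
      batches.add(batch);
    }

    List<Future<Integer>> futures = new ArrayList<>();
    for (List<String> b : batches) {
      futures.add(executor.submit(() -> deleter.delete(b)));
    }

    int failures = 0;
    for (int i = 0; i < futures.size(); i++) {
      try {
        failures += futures.get(i).get();
      } catch (ExecutionException e) {
        // The whole request failed: count every key in that batch
        failures += batches.get(i).size();
      }
    }
    return failures;
  }
}
```

With a bounded pool, at most that many requests are in flight at once. Counting failures rather than stopping at the first one lets `deleteFiles` report them all through a single `BulkDeletionFailureException`.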
Prefix Operations
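Implement `SupportsPrefixOperations` to list and delete everything under a prefix. `deletePrefix` below reuses `deleteFiles`, so the class also implements `SupportsBulkOperations`:
```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;

public class CustomFileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations {
  @Override
  public Iterable<FileInfo> listPrefix(String prefix) {
    // FileInfo(location, size, createdAtMillis); assumes getKey() returns the full location
    // and getLastModified() returns epoch milliseconds
    return client.listObjects(prefix).stream()
        .map(obj -> new FileInfo(obj.getKey(), obj.getSize(), obj.getLastModified()))
        .collect(Collectors.toList());
  }

  @Override
  public void deletePrefix(String prefix) {
    List<String> toDelete = client.listObjects(prefix).stream()
        .map(StorageObject::getKey)
        .collect(Collectors.toList());
    // deleteFiles comes from SupportsBulkOperations (see Bulk Delete Operations)
    deleteFiles(toDelete);
  }
}
```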
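Collecting every object into a list, as `listPrefix` does above, can exhaust memory for prefixes that hold millions of files. Object-store list APIs are usually paginated, so a lazier approach fetches one page at a time as the caller iterates. The sketch assumes a hypothetical `PageFetcher` that takes a continuation token (null for the first page) and returns a `Page` of keys plus the next token (null on the last page); in a real FileIO each key would be mapped to a `FileInfo`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical page returned by a paginated list call
final class Page {
  final List<String> keys;
  final String nextToken; // null when this is the last page

  Page(List<String> keys, String nextToken) {
    this.keys = keys;
    this.nextToken = nextToken;
  }
}

// Hypothetical stand-in for a paginated list API; token is null for the first page
interface PageFetcher {
  Page fetch(String token);
}

// Iterates over all keys, fetching the next page only when the current one is exhausted
final class PagedListing implements Iterable<String> {
  private final PageFetcher fetcher;

  PagedListing(PageFetcher fetcher) {
    this.fetcher = fetcher;
  }

  @Override
  public Iterator<String> iterator() {
    return new Iterator<>() {
      private Iterator<String> current = null;
      private String token = null;
      private boolean lastPageFetched = false;

      @Override
      public boolean hasNext() {
        // Keep fetching past empty pages until a key is found or the listing ends
        while ((current == null || !current.hasNext()) && !lastPageFetched) {
          Page page = fetcher.fetch(token);
          current = page.keys.iterator();
          token = page.nextToken;
          lastPageFetched = (token == null);
        }
        return current != null && current.hasNext();
      }

      @Override
      public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```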
Testing Your FileIO
Unit Tests
```java
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.SeekableInputStream;
import org.junit.jupiter.api.Test;

public class CustomFileIOTest {
  // Placeholder values: point these at a test bucket or a local emulator
  private static Map<String, String> testProperties() {
    return Map.of(
        "custom.storage.endpoint", "http://localhost:9000",
        "custom.storage.access-key", "test-access-key",
        "custom.storage.secret-key", "test-secret-key");
  }

  @Test
  public void testWriteAndRead() throws IOException {
    try (FileIO io = new CustomFileIO()) {
      io.initialize(testProperties());
      String path = "test://bucket/file.txt";
      byte[] content = "Hello, Iceberg!".getBytes(StandardCharsets.UTF_8);

      // Write
      OutputFile out = io.newOutputFile(path);
      try (PositionOutputStream stream = out.create()) {
        stream.write(content);
      }

      // Read (SeekableInputStream has no readFully; InputStream.readNBytes needs Java 11+)
      InputFile in = io.newInputFile(path);
      assertEquals(content.length, in.getLength());
      try (SeekableInputStream stream = in.newStream()) {
        assertArrayEquals(content, stream.readNBytes(content.length));
      }

      // Delete
      io.deleteFile(path);
      assertFalse(io.newInputFile(path).exists());
    }
  }

  @Test
  public void testSeek() throws IOException {
    try (FileIO io = new CustomFileIO()) {
      io.initialize(testProperties());
      String path = "test://bucket/seekable.txt";
      byte[] content = "0123456789".getBytes(StandardCharsets.UTF_8);

      try (PositionOutputStream stream = io.newOutputFile(path).createOrOverwrite()) {
        stream.write(content);
      }

      // Seek forwards and backwards; read() returns the byte value as an int
      try (SeekableInputStream stream = io.newInputFile(path).newStream()) {
        stream.seek(5);
        assertEquals('5', stream.read());
        stream.seek(0);
        assertEquals('0', stream.read());
        stream.seek(9);
        assertEquals('9', stream.read());
        assertEquals(10, stream.getPos());
      }
    }
  }
}
```
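Performance Tips
Use Connection Pooling
Reuse HTTP connections for better performance, for example by sharing one pooled client across FileIO instances (`pooledConnectionManager` stands for your HTTP client's connection pool):
```java
private static final CustomStorageClient sharedClient =
    new CustomStorageClient(pooledConnectionManager);
```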
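A static shared client conflicts with the `close()` method of the basic implementation: the first FileIO to close would shut the client down for every other instance. One way to reconcile the two is reference counting, sketched below. `SharedResource` is a hypothetical helper, and `Closeable` stands in for `CustomStorageClient`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical helper: one shared instance, created on first acquire, closed on last release
final class SharedResource<T extends Closeable> {
  private final Supplier<T> factory;
  private T instance;
  private int refCount = 0;

  SharedResource(Supplier<T> factory) {
    this.factory = factory;
  }

  synchronized T acquire() {
    if (refCount == 0) {
      instance = factory.get();
    }
    refCount++;
    return instance;
  }

  synchronized void release() {
    if (refCount == 0) {
      throw new IllegalStateException("release() without a matching acquire()");
    }
    refCount--;
    if (refCount == 0) {
      try {
        instance.close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      } finally {
        instance = null;
      }
    }
  }
}
```

Under this scheme, `CustomFileIO.initialize` would call `acquire()` and `close()` would call `release()`, guarded so that closing the same FileIO twice releases only once.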
Implement Bulk Operations
Batch deletes and list operations when possible to reduce API calls.
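Use Buffered Streams
Use buffered streams for better throughput. `BufferedSeekableInputStream` here is a hypothetical wrapper you would provide yourself:
```java
@Override
public SeekableInputStream newStream() {
  return new BufferedSeekableInputStream(
      new CustomSeekableInputStream(client, path),
      8192 // 8 KB buffer
  );
}
```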
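One possible shape for such a buffered wrapper, as a sketch: it keeps a window of bytes in memory so that seeks landing inside the window, and small sequential reads, are served without another storage request. To run with only the JDK it reads through a hypothetical `RangeReader` (one ranged GET per call) instead of wrapping a `SeekableInputStream`; the 8 KiB size in the usage below simply matches the snippet above and is not a tuned value.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical ranged read: copies up to len bytes at offset into dst; returns -1 at EOF, never 0
interface RangeReader {
  int readAt(long offset, byte[] dst, int len) throws IOException;
}

class BufferedSeekableInputStream extends InputStream {
  private final RangeReader reader;
  private final byte[] buffer;
  private long bufferStart = 0; // file offset of buffer[0]
  private int bufferLen = 0;    // number of valid bytes in buffer
  private long pos = 0;
  int fetches = 0;              // ranged reads issued, exposed for illustration

  BufferedSeekableInputStream(RangeReader reader, int bufferSize) {
    this.reader = reader;
    this.buffer = new byte[bufferSize];
  }

  long getPos() {
    return pos;
  }

  // Seeking is free; the buffer is refilled only if the next read falls outside it
  void seek(long newPos) {
    if (newPos < 0) {
      throw new IllegalArgumentException("Negative position: " + newPos);
    }
    pos = newPos;
  }

  private boolean inBuffer() {
    return pos >= bufferStart && pos < bufferStart + bufferLen;
  }

  // Loads a new window starting at pos; returns false at end of file
  private boolean fill() throws IOException {
    fetches++;
    int n = reader.readAt(pos, buffer, buffer.length);
    bufferStart = pos;
    bufferLen = Math.max(n, 0);
    return n > 0;
  }

  @Override
  public int read() throws IOException {
    if (!inBuffer() && !fill()) {
      return -1;
    }
    return buffer[(int) (pos++ - bufferStart)] & 0xFF;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    if (!inBuffer() && !fill()) {
      return -1;
    }
    int offsetInBuffer = (int) (pos - bufferStart);
    int n = Math.min(len, bufferLen - offsetInBuffer);
    System.arraycopy(buffer, offsetInBuffer, b, off, n);
    pos += n;
    return n;
  }
}
```

A production version would likely bypass the buffer for reads larger than the buffer itself, since copying them through it adds work without saving requests.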
Best Practices
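Thread Safety: FileIO instances are shared across threads (for example, Iceberg reads manifests in parallel using the table's FileIO), so make them thread-safe, or document any thread-safety requirements
Resource Cleanup: Release clients, connection pools, and thread pools in close(); every stream you return must release its connection when it is closed
Error Handling: Wrap checked storage exceptions in Iceberg's unchecked exceptions, such as RuntimeIOException, and throw NotFoundException for missing files
Retry Logic: Retry transient failures (throttling, timeouts) with backoff
Metrics: Add instrumentation for monitoring (optional)
Documentation: Document custom properties and configuration requirements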
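For the retry practice above, a minimal sketch of retrying with capped exponential backoff and full jitter. Which exceptions count as transient depends on your storage SDK, so a caller-supplied predicate decides; `Retry.withBackoff`, the attempt count, and the delays are assumptions. Idempotent calls such as reads, metadata lookups, and deletes are the safest to retry this way.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

final class Retry {
  private Retry() {}

  // Runs call up to maxAttempts times; between attempts sleeps a random time in
  // [0, min(maxDelayMs, baseDelayMs * 2^(attempt-1))] ("full jitter")
  static <T> T withBackoff(Callable<T> call, Predicate<Exception> isTransient,
      int maxAttempts, long baseDelayMs, long maxDelayMs) throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        if (attempt >= maxAttempts || !isTransient.test(e)) {
          throw e; // out of attempts, or a permanent error such as a bad request
        }
        long backoff = Math.min(maxDelayMs, baseDelayMs << Math.min(attempt - 1, 30));
        // Random jitter keeps many clients from retrying in lockstep
        Thread.sleep(ThreadLocalRandom.current().nextLong(backoff + 1));
      }
    }
  }
}
```

Usage inside a FileIO might look like `Retry.withBackoff(() -> client.exists(path), e -> e instanceof IOException, 5, 100, 5000)`, where `client.exists` is the placeholder client call from the basic implementation.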
Common Use Cases
Cloud Storage Integration
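Implement a FileIO for cloud object stores that have no built-in implementation. Note that many of the services listed below also expose S3-compatible APIs and may work with S3FileIO by setting `s3.endpoint`; a custom FileIO is mainly useful when you need their native API: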
Oracle Cloud Infrastructure (OCI) Object Storage
Cloudflare R2
Wasabi
MinIO
Ceph RADOS Gateway
On-Premises Storage
Integrate with enterprise storage systems:
NetApp StorageGRID
Pure Storage FlashBlade
IBM Cloud Object Storage
Scality RING
Custom Protocols
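Support custom URI schemes by validating the scheme before creating files:
```java
@Override
public InputFile newInputFile(String path) {
  // Only handle locations that use the custom:// scheme
  if (!path.startsWith("custom://")) {
    throw new IllegalArgumentException("Unsupported location: " + path);
  }
  return new CustomInputFile(client, path);
}
```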
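Checking prefixes with `startsWith` is easy to get subtly wrong, for example with an upper-case scheme or a missing bucket. A slightly sturdier approach parses each location once into scheme, bucket, and key. The sketch assumes locations of the form `custom://bucket/key`; `CustomLocation` is a hypothetical helper, not an Iceberg class. It splits the string by hand rather than using `java.net.URI`, because `URI` rejects unencoded characters such as spaces that object keys may contain.

```java
import java.util.Locale;

// Hypothetical parsed form of a location such as custom://bucket/path/to/file.parquet
final class CustomLocation {
  static final String SCHEME = "custom";

  final String bucket;
  final String key;

  private CustomLocation(String bucket, String key) {
    this.bucket = bucket;
    this.key = key;
  }

  static CustomLocation parse(String location) {
    int sep = location.indexOf("://");
    if (sep < 0) {
      throw new IllegalArgumentException("Missing scheme: " + location);
    }
    // URI schemes are case-insensitive
    String scheme = location.substring(0, sep).toLowerCase(Locale.ROOT);
    if (!scheme.equals(SCHEME)) {
      throw new IllegalArgumentException("Unsupported scheme '" + scheme + "' in: " + location);
    }
    String rest = location.substring(sep + 3);
    int slash = rest.indexOf('/');
    String bucket = slash < 0 ? rest : rest.substring(0, slash);
    String key = slash < 0 ? "" : rest.substring(slash + 1);
    if (bucket.isEmpty()) {
      throw new IllegalArgumentException("Missing bucket: " + location);
    }
    return new CustomLocation(bucket, key);
  }
}
```

`newInputFile` and `newOutputFile` could call `CustomLocation.parse(path)` and pass the bucket and key to the client, while still using the original string for `location()`.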
Debugging
Enable debug logging:
```java
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomFileIO implements FileIO {
  private static final Logger LOG = LoggerFactory.getLogger(CustomFileIO.class);

  @Override
  public InputFile newInputFile(String path) {
    LOG.debug("Creating InputFile for path: {}", path);
    return new CustomInputFile(client, path);
  }

  // Other methods as in the basic implementation
}
```
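Configure the logging level. Spark 3.3 and later use Log4j 2, so add a logger for your package to `conf/log4j2.properties`:
```properties
logger.customio.name = com.example.storage
logger.customio.level = debug
```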
Next Steps
AWS S3 FileIO: see a production FileIO implementation for S3
Custom Catalog: build a custom catalog with your FileIO