
Java API Quickstart

This guide provides a comprehensive introduction to the Apache Iceberg Java API. You’ll learn how to programmatically create and manage Iceberg tables, work with schemas and partitions, and perform read and write operations.
The Java API is the reference implementation for Iceberg. All code examples in this guide use real patterns from the Iceberg source code.

Dependencies

Add the required dependencies to your project:
<dependencies>
  <!-- Core Iceberg API -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-core</artifactId>
    <version>1.7.1</version>
  </dependency>
  
  <!-- For Parquet file format -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-parquet</artifactId>
    <version>1.7.1</version>
  </dependency>
  
  <!-- For Hive Metastore catalog -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-hive-metastore</artifactId>
    <version>1.7.1</version>
  </dependency>
  
  <!-- For data operations -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-data</artifactId>
    <version>1.7.1</version>
  </dependency>
</dependencies>

Working with Catalogs

Catalogs are the entry point for working with Iceberg tables. They provide methods to create, load, rename, and drop tables.

Hive Catalog

The Hive catalog uses a Hive Metastore to track Iceberg tables:
import org.apache.iceberg.hive.HiveCatalog;
import java.util.HashMap;
import java.util.Map;

HiveCatalog catalog = new HiveCatalog();

// Configure the catalog
Map<String, String> properties = new HashMap<>();
properties.put("warehouse", "/path/to/warehouse");
properties.put("uri", "thrift://localhost:9083");

catalog.initialize("hive", properties);
You can optionally use Spark’s Hadoop configuration. Call setConf before initialize so the catalog is initialized with it:
catalog.setConf(spark.sparkContext().hadoopConfiguration());

Hadoop Catalog

The Hadoop catalog is a file-based catalog that keeps table metadata directly in a filesystem such as HDFS:
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;

Configuration conf = new Configuration();
String warehousePath = "hdfs://host:8020/warehouse";
HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
The Hadoop catalog is not safe for concurrent writes on local filesystems or S3. Use it with HDFS or an atomic-rename-capable filesystem.

Catalog Operations

All catalogs implement the Catalog interface:
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

// Create a table identifier
TableIdentifier name = TableIdentifier.of("logging", "logs");

// Create a table (schema and spec are built in the sections that follow)
Table table = catalog.createTable(name, schema, spec);

// Load an existing table
Table loaded = catalog.loadTable(name);

// List tables in a namespace
List<TableIdentifier> tables = catalog.listTables(Namespace.of("logging"));

// Rename a table
TableIdentifier newName = TableIdentifier.of("logging", "application_logs");
catalog.renameTable(name, newName);

// Drop a table
catalog.dropTable(name);

Creating Schemas

Schemas define the structure of your table. Iceberg schemas are strongly typed and support nested structures.

Basic Schema

Create a schema using the Schema class:
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.LongType.get()),
    Types.NestedField.required(2, "data", Types.StringType.get()),
    Types.NestedField.optional(3, "category", Types.StringType.get()),
    Types.NestedField.required(4, "event_time", 
        Types.TimestampType.withZone())
);
About field IDs:
  • Field IDs must be unique within a schema, including fields nested in structs, lists, and maps
  • They enable schema evolution and column mapping
  • Iceberg automatically reassigns IDs when creating tables
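To see why stable IDs matter for schema evolution, here is a toy model in plain Java. It does not use Iceberg classes; `FieldIdDemo` and its methods are made up for illustration. Values are stored by ID, so renaming a column only changes the ID-to-name mapping and existing values stay readable.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Iceberg code): values are keyed by field ID, names live only in the schema. */
final class FieldIdDemo {
    // Schema: field ID -> current column name.
    private final Map<Integer, String> namesById = new HashMap<>();
    // One stored row: field ID -> value, standing in for what a data file holds.
    private final Map<Integer, Object> rowById = new HashMap<>();

    void addColumn(int id, String name) {
        // IDs must be unique, so a second column with the same ID is rejected.
        if (namesById.putIfAbsent(id, name) != null) {
            throw new IllegalArgumentException("Duplicate field ID: " + id);
        }
    }

    void renameColumn(String oldName, String newName) {
        // Only the name changes; the ID and the stored data are untouched.
        namesById.replaceAll((id, name) -> name.equals(oldName) ? newName : name);
    }

    void put(String name, Object value) {
        rowById.put(idOf(name), value);
    }

    Object get(String name) {
        return rowById.get(idOf(name));
    }

    private int idOf(String name) {
        return namesById.entrySet().stream()
            .filter(e -> e.getValue().equals(name))
            .map(Map.Entry::getKey)
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("Unknown column: " + name));
    }
}
```

A value written under `user_id` is still returned after the column is renamed to `account_id`, because both names resolve to the same ID.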

Complex Schema

Iceberg supports nested types (structs, lists, maps):
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", 
        Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    
    // List type
    Types.NestedField.optional(4, "call_stack", 
        Types.ListType.ofRequired(5, Types.StringType.get())),
    
    // Struct type
    Types.NestedField.optional(6, "user", 
        Types.StructType.of(
            Types.NestedField.required(7, "id", Types.LongType.get()),
            Types.NestedField.required(8, "name", Types.StringType.get())
        )),
    
    // Map type
    Types.NestedField.optional(9, "properties",
        Types.MapType.ofRequired(10, 11, 
            Types.StringType.get(), 
            Types.StringType.get()))
);

Supported Types

Iceberg supports the following primitive types:
Types.BooleanType.get()           // Boolean
Types.IntegerType.get()           // 32-bit integer
Types.LongType.get()              // 64-bit integer
Types.FloatType.get()             // 32-bit float
Types.DoubleType.get()            // 64-bit float
Types.DecimalType.of(9, 2)        // Decimal with precision and scale
Types.StringType.get()            // UTF-8 string
Types.BinaryType.get()            // Binary data
Types.FixedType.ofLength(16)      // Fixed-length binary
Types.UUIDType.get()              // UUID
Types.DateType.get()              // Date (no time)
Types.TimeType.get()              // Time (no date)
Types.TimestampType.withZone()    // Timestamp with time zone
Types.TimestampType.withoutZone() // Timestamp without time zone

Converting Schemas

Convert schemas from other formats:
import org.apache.avro.Schema.Parser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;

// Avro and Iceberg both define a class named Schema, so the Avro type is fully qualified here
org.apache.avro.Schema avroSchema = new Parser().parse(
    "{\"type\": \"record\", \"name\": \"User\", \"fields\": [...]}"
);
Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);

Creating Partition Specs

Partition specs define how Iceberg groups records into data files. Unlike Hive, Iceberg uses hidden partitioning: partition values are derived from column values, so queries do not need to reference partition columns.

Basic Partitioning

import org.apache.iceberg.PartitionSpec;

// Unpartitioned table
PartitionSpec unpartitioned = PartitionSpec.unpartitioned();

// Partition by identity (exact value)
PartitionSpec byCategory = PartitionSpec.builderFor(schema)
    .identity("category")
    .build();

// Partition by hour of timestamp
PartitionSpec byHour = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .build();

Advanced Partitioning

Iceberg supports several partition transforms. Use at most one time-based transform per source column, because the builder rejects redundant combinations such as day and hour on the same timestamp:
PartitionSpec spec = PartitionSpec.builderFor(schema)
    // Time-based transforms: choose one of year, month, day, or hour
    .day("event_time")          // Partition by day
    // .year("event_time")      // Partition by year
    // .month("event_time")     // Partition by month
    // .hour("event_time")      // Partition by hour
    
    // Bucketing (hash-based)
    .bucket("id", 16)        // Hash into 16 buckets
    
    // Truncation (prefix)
    .truncate("message", 10) // First 10 characters
    
    // Identity (exact value)
    .identity("level")
    
    .build();
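To make the transforms concrete, the following plain-Java sketch computes the values that the time-based and truncate transforms derive from a column value, following the definitions in the Iceberg table spec (years, months, days, and hours counted from 1970-01-01 UTC). It is not Iceberg's implementation: `TransformSketch` and its method names are hypothetical, timestamps are assumed to be microseconds since the epoch, and the bucket transform is left out because it depends on a specific hash function.

```java
import java.time.LocalDate;

/** Plain-Java sketch (not Iceberg code) of the values partition transforms produce. */
final class TransformSketch {
    private static final long MICROS_PER_HOUR = 3_600_000_000L;
    private static final long MICROS_PER_DAY = 24 * MICROS_PER_HOUR;

    /** Hours since 1970-01-01T00:00:00Z. floorDiv keeps pre-1970 values consistent. */
    static int hour(long epochMicros) {
        return (int) Math.floorDiv(epochMicros, MICROS_PER_HOUR);
    }

    /** Days since 1970-01-01. */
    static int day(long epochMicros) {
        return (int) Math.floorDiv(epochMicros, MICROS_PER_DAY);
    }

    /** Months since 1970-01. */
    static int month(long epochMicros) {
        LocalDate date = LocalDate.ofEpochDay(day(epochMicros));
        return (date.getYear() - 1970) * 12 + (date.getMonthValue() - 1);
    }

    /** Years since 1970. */
    static int year(long epochMicros) {
        return LocalDate.ofEpochDay(day(epochMicros)).getYear() - 1970;
    }

    /** First `width` Unicode code points of a string (the whole string if it is shorter). */
    static String truncate(String value, int width) {
        int codePoints = value.codePointCount(0, value.length());
        int end = value.offsetByCodePoints(0, Math.min(width, codePoints));
        return value.substring(0, end);
    }
}
```

For the timestamp 2024-01-01T10:00:00Z (1704103200000000 microseconds), this yields hour 473362, day 19723, month 648, and year 54. Rows whose timestamps share one of these values land in the same partition for that transform.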

Real-World Example

Partition a logs table by hour and log level:
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get())
);

PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
Queries do not need to reference partition columns. Iceberg derives partition filters from the predicates on event_time and level, so this query reads only the matching partitions:
SELECT * FROM logs 
WHERE event_time BETWEEN '2024-01-01 10:00:00' AND '2024-01-01 12:00:00'
  AND level = 'ERROR';
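The pruning behind a query like this can be pictured with a small plain-Java sketch. It is an illustration of the idea, not Iceberg's planner: `HourPruning` is a hypothetical name, timestamps are assumed to be UTC microseconds since the epoch, and only the hour partition field is modelled. A range predicate on event_time is turned into a range of hour values, and any partition outside that range can be skipped without opening its files.

```java
/** Sketch (not Iceberg code): can an hourly partition contain rows in a timestamp range? */
final class HourPruning {
    private static final long MICROS_PER_HOUR = 3_600_000_000L;

    /** Hours since 1970-01-01T00:00:00Z for a timestamp in epoch microseconds. */
    static long hourOf(long epochMicros) {
        return Math.floorDiv(epochMicros, MICROS_PER_HOUR);
    }

    /**
     * Returns true if the partition with the given hour value may hold rows with
     * startMicros <= event_time <= endMicros. False means the partition can be skipped.
     */
    static boolean mayMatch(long partitionHour, long startMicros, long endMicros) {
        return partitionHour >= hourOf(startMicros) && partitionHour <= hourOf(endMicros);
    }
}
```

With the range 10:00 to 12:00 on 2024-01-01, only the three hour partitions 473362 through 473364 can match. Rows inside those partitions are still filtered by the original predicate.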

Creating Tables

Combine schemas and partition specs to create tables:
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

// Define the table
TableIdentifier name = TableIdentifier.of("logging", "logs");

// Create the table
Table table = catalog.createTable(name, schema, spec);

System.out.println("Created table at: " + table.location());

With Properties

Specify table properties at creation time:
import java.util.HashMap;
import java.util.Map;

Map<String, String> properties = new HashMap<>();
properties.put("write.format.default", "parquet");
properties.put("write.parquet.compression-codec", "zstd");
properties.put("commit.retry.num-retries", "3");

Table table = catalog.createTable(name, schema, spec, properties);

Table Metadata

The Table interface provides access to table metadata:
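import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;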

Table table = catalog.loadTable(TableIdentifier.of("db", "table"));

// Get schema
Schema schema = table.schema();
System.out.println("Schema: " + schema);

// Get partition spec
PartitionSpec spec = table.spec();
System.out.println("Partition spec: " + spec);

// Get properties
Map<String, String> properties = table.properties();
System.out.println("Properties: " + properties);

// Get current snapshot
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
    System.out.println("Snapshot ID: " + snapshot.snapshotId());
    System.out.println("Timestamp: " + snapshot.timestampMillis());
}

// Get all snapshots
Iterable<Snapshot> snapshots = table.snapshots();
for (Snapshot s : snapshots) {
    System.out.println("Snapshot: " + s.snapshotId());
}

// Get table location
String location = table.location();
System.out.println("Location: " + location);

Scanning Tables

Scan tables to read data.

File-Level Scanning

Get the list of files to read:
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.FileScanTask;

// Create a scan
TableScan scan = table.newScan();

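// Apply filters (successive filter calls are combined with AND)
scan = scan.filter(Expressions.equal("level", "ERROR"));
// Timestamp literals are microseconds since epoch: this is 2024-01-01T00:00:00Z
scan = scan.filter(Expressions.greaterThan("event_time", 1704067200000000L));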

// Project columns
scan = scan.select("level", "message", "event_time");

// Get the projected schema
Schema projection = scan.schema();

// Plan and iterate files
try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
    for (FileScanTask task : tasks) {
        System.out.println("File: " + task.file().path());
        System.out.println("Records: " + task.file().recordCount());
        System.out.println("Size: " + task.file().fileSizeInBytes());
    }
}
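Iceberg stores timestamp values as microseconds since the Unix epoch, so a long compared against a timestamp column in a filter is read as microseconds, not milliseconds. A small helper along the following lines can make the conversion explicit. The class and method names are illustrative and not part of Iceberg.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Helper sketch (not part of Iceberg) for building timestamp filter values. */
final class TimestampMicros {
    /** Converts an Instant to microseconds since 1970-01-01T00:00:00Z. */
    static long toEpochMicros(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    /** Converts epoch milliseconds (e.g. System.currentTimeMillis()) to microseconds. */
    static long millisToMicros(long epochMillis) {
        // multiplyExact throws instead of silently overflowing.
        return Math.multiplyExact(epochMillis, 1_000L);
    }
}
```

For example, `TimestampMicros.toEpochMicros(Instant.parse("2024-01-01T00:00:00Z"))` returns 1704067200000000, which can be passed as the value in `Expressions.greaterThan("event_time", ...)`.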

Time Travel

Read from historical snapshots:
// Read from a specific snapshot (snapshotId must refer to an existing snapshot)
TableScan snapshotScan = table.newScan()
    .useSnapshot(snapshotId);

// Read as of a timestamp (milliseconds since epoch)
long oneHourAgo = System.currentTimeMillis() - 3600000;
TableScan timeScan = table.newScan()
    .asOfTime(oneHourAgo);

Row-Level Scanning

Read actual row data using IcebergGenerics:
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.expressions.Expressions;

// Read records
try (CloseableIterable<Record> records = IcebergGenerics.read(table)
        .where(Expressions.lessThan("id", 100))
        .select("id", "name", "email")
        .build()) {
    
    for (Record record : records) {
        // getField returns Object, so each value is cast to its column type
        Long id = (Long) record.getField("id");
        String name = (String) record.getField("name");
        String email = (String) record.getField("email");
        
        System.out.println(id + ": " + name + " <" + email + ">");
    }
}

Filter Expressions

Iceberg supports rich filter expressions:
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Expression;

// Comparison operators
Expression filter = Expressions.equal("status", "active");
filter = Expressions.notEqual("status", "deleted");
filter = Expressions.lessThan("age", 30);
filter = Expressions.lessThanOrEqual("age", 30);
filter = Expressions.greaterThan("age", 18);
filter = Expressions.greaterThanOrEqual("age", 18);

// Null checks
filter = Expressions.isNull("deleted_at");
filter = Expressions.isNotNull("created_at");

// String operations
filter = Expressions.startsWith("name", "John");
filter = Expressions.notStartsWith("name", "Test");

// Set membership
filter = Expressions.in("status", "active", "pending", "approved");
filter = Expressions.notIn("status", "deleted", "archived");

// Logical operators
filter = Expressions.and(
    Expressions.greaterThan("age", 18),
    Expressions.equal("status", "active")
);
filter = Expressions.or(
    Expressions.equal("type", "admin"),
    Expressions.equal("type", "moderator")
);
filter = Expressions.not(Expressions.equal("status", "deleted"));

Update Operations

All update operations use a builder pattern with a commit() call.

Appending Data

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;

// Create data file metadata
DataFile dataFile = DataFiles.builder(spec)
    .withPath("/path/to/data-file.parquet")
    .withFileSizeInBytes(1048576)  // 1 MB
    .withRecordCount(1000)
    .withPartitionPath("event_time_hour=2024-01-01-10/level=ERROR")
    .build();

// Append to table
table.newAppend()
    .appendFile(dataFile)
    .commit();
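The partition path passed to withPartitionPath has to match the table's partition spec. As a sketch, the path used above could be built from an event time and a level as follows. `PartitionPaths` is a hypothetical helper, not an Iceberg class. It assumes the spec is hour("event_time") followed by identity("level"), that hours are rendered in UTC as yyyy-MM-dd-HH, and that the level value contains no characters that would need escaping in a path.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Helper sketch (not part of Iceberg) that formats an hour/level partition path. */
final class PartitionPaths {
    // Hour partitions are rendered as year-month-day-hour in UTC.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH").withZone(ZoneOffset.UTC);

    /** Builds a path such as event_time_hour=2024-01-01-10/level=ERROR. */
    static String hourAndLevel(Instant eventTime, String level) {
        return "event_time_hour=" + HOUR.format(eventTime) + "/level=" + level;
    }
}
```

Calling `PartitionPaths.hourAndLevel(Instant.parse("2024-01-01T10:15:30Z"), "ERROR")` produces the same string as the literal in the append example.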

Schema Evolution

Modify the schema without rewriting data:
// Add columns
table.updateSchema()
    .addColumn("user_id", Types.LongType.get())
    .addColumn("tags", Types.ListType.ofOptional(100, Types.StringType.get()))
    .commit();

// Rename columns
table.updateSchema()
    .renameColumn("user_id", "account_id")
    .commit();

// Update column types (must be compatible)
table.updateSchema()
    .updateColumn("count", Types.LongType.get())
    .commit();

// Delete columns
table.updateSchema()
    .deleteColumn("deprecated_field")
    .commit();

// Make required column optional
table.updateSchema()
    .makeColumnOptional("optional_field")
    .commit();

// Update column documentation
table.updateSchema()
    .updateColumnDoc("email", "User's primary email address")
    .commit();

Partition Evolution

Change how new data is partitioned:
// Add new partition field
table.updateSpec()
    .addField(Expressions.bucket("user_id", 16))
    .commit();

// Remove partition field
table.updateSpec()
    .removeField("event_time_day")
    .commit();

// Rename partition field
table.updateSpec()
    .renameField("event_time_day", "date")
    .commit();

Updating Properties

table.updateProperties()
    .set("write.format.default", "parquet")
    .set("write.parquet.compression-codec", "zstd")
    .remove("old.property")
    .commit();

Deleting Data

import org.apache.iceberg.expressions.Expressions;

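// Delete data files whose rows all match a filter. This is a metadata-only
// delete: the commit fails if a file holds both matching and non-matching rows.
// cutoffTime is a timestamp in microseconds since epoch.
table.newDelete()
    .deleteFromRowFilter(Expressions.lessThan("event_time", cutoffTime))
    .commit();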

// Delete specific files
table.newDelete()
    .deleteFile(dataFile)
    .commit();

Expiring Snapshots

Remove old snapshots to save space:
long olderThan = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L; // 7 days

table.expireSnapshots()
    .expireOlderThan(olderThan)
    .retainLast(10)  // Keep at least 10 snapshots
    .commit();
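How the two settings interact can be sketched with a simplified model in plain Java. This is an approximation for a single linear history, not Iceberg's implementation (which also takes branches and tags into account), and `ExpirySketch` is a hypothetical name. A snapshot is expired only if it is older than the cutoff and is not among the most recent retainLast snapshots.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model (not Iceberg code) of expireOlderThan combined with retainLast. */
final class ExpirySketch {
    /**
     * @param timestampsMillis snapshot commit times, oldest first (linear history assumed)
     * @param olderThanMillis  cutoff: only snapshots strictly older than this may expire
     * @param retainLast       number of most recent snapshots that are always kept
     * @return commit times of the snapshots that would be expired
     */
    static List<Long> expired(List<Long> timestampsMillis, long olderThanMillis, int retainLast) {
        List<Long> result = new ArrayList<>();
        // Snapshots at or after this index are protected by retainLast.
        int protectedFrom = Math.max(0, timestampsMillis.size() - retainLast);
        for (int i = 0; i < protectedFrom; i++) {
            long timestamp = timestampsMillis.get(i);
            if (timestamp < olderThanMillis) {
                result.add(timestamp);
            }
        }
        return result;
    }
}
```

With commit times 100, 200, 300, 400, a cutoff of 350, and retainLast of 2, the model expires the snapshots at 100 and 200. The snapshot at 300 is older than the cutoff but survives because it is one of the last two.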

Transactions

Group multiple operations into a single atomic commit:
import org.apache.iceberg.Transaction;

// Create a transaction
Transaction txn = table.newTransaction();

// Add operations to the transaction
txn.newDelete()
    .deleteFromRowFilter(Expressions.equal("status", "deleted"))
    .commit();

txn.newAppend()
    .appendFile(newDataFile)
    .commit();

txn.updateProperties()
    .set("last.update.time", String.valueOf(System.currentTimeMillis()))
    .commit();

// Commit all operations atomically
txn.commitTransaction();

Branching and Tagging

Create branches and tags for experimentation and auditing.

Creating Branches

String branch = "test-branch";

// Create a branch with retention settings
table.manageSnapshots()
    .createBranch(branch, table.currentSnapshot().snapshotId())
    .setMinSnapshotsToKeep(branch, 2)
    .setMaxSnapshotAgeMs(branch, 3600000)      // 1 hour
    .setMaxRefAgeMs(branch, 604800000)         // 7 days
    .commit();
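The retention arguments are plain millisecond counts, which are easy to mistype. One option, shown here as a sketch with hypothetical constant names, is to derive them from java.time.Duration so the intent is visible in the code.

```java
import java.time.Duration;

/** Sketch: readable millisecond values for branch and tag retention settings. */
final class RetentionMillis {
    // Same value as the literal 3600000 (1 hour).
    static final long MAX_SNAPSHOT_AGE_MS = Duration.ofHours(1).toMillis();
    // Same value as the literal 604800000 (7 days).
    static final long MAX_REF_AGE_MS = Duration.ofDays(7).toMillis();
    // Same value as the literal 86400000 (1 day).
    static final long TAG_MAX_REF_AGE_MS = Duration.ofDays(1).toMillis();
}
```

These constants can be passed wherever the examples use numeric literals, for instance `setMaxRefAgeMs(branch, RetentionMillis.MAX_REF_AGE_MS)`.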

Creating Tags

String tag = "v1.0.0";

table.manageSnapshots()
    .createTag(tag, snapshotId)
    .setMaxRefAgeMs(tag, 86400000)  // 1 day
    .commit();

Writing to Branches

// Append to a branch
table.newAppend()
    .appendFile(dataFile)
    .toBranch("test-branch")
    .commit();

// Row delta on a branch
table.newRowDelta()
    .addRows(dataFile)
    .addDeletes(deleteFile)
    .toBranch("test-branch")
    .commit();

Reading from Branches

// Read from branch head
TableScan branchScan = table.newScan().useRef("test-branch");

// Read from tag
TableScan tagScan = table.newScan().useRef("v1.0.0");

Managing Branches and Tags

// Fast-forward test-branch to the current head of main
table.manageSnapshots()
    .fastForwardBranch("test-branch", "main")
    .commit();

// Replace branch/tag snapshot
table.manageSnapshots()
    .replaceBranch("test-branch", newSnapshotId)
    .commit();

// Remove branch
table.manageSnapshots()
    .removeBranch("test-branch")
    .commit();

// Remove tag
table.manageSnapshots()
    .removeTag("v1.0.0")
    .commit();

Best Practices

Use try-with-resources for scans and iterables:
try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
    for (Record record : records) {
        // Process record
    }
}
Choose partition granularity with care:
  • Too fine: Many small files, slow metadata operations
  • Too coarse: Poor query pruning, unnecessary data is read
  • Target: 500MB-1GB per partition
// Good: Daily partitions for moderate data volume
PartitionSpec.builderFor(schema).day("event_time").build();

// Good: Hourly + bucketing for high volume
PartitionSpec.builderFor(schema)
    .hour("event_time")
    .bucket("user_id", 16)
    .build();
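A rough way to compare candidate specs against the 500MB-1GB target is to estimate the data volume per partition. The sketch below is back-of-the-envelope arithmetic only. It assumes data is spread evenly over time and over buckets, and `PartitionSizing` with its methods is hypothetical.

```java
/** Back-of-the-envelope sizing sketch, assuming evenly distributed data. */
final class PartitionSizing {
    static final long MB = 1024L * 1024L;

    /**
     * @param bytesPerDay          total data written per day
     * @param timePartitionsPerDay 1 for daily partitions, 24 for hourly
     * @param buckets              number of buckets, or 1 if bucketing is not used
     */
    static long bytesPerPartition(long bytesPerDay, int timePartitionsPerDay, int buckets) {
        return bytesPerDay / ((long) timePartitionsPerDay * buckets);
    }

    /** True if the estimate falls in the 500MB-1GB target range. */
    static boolean inTargetRange(long bytes) {
        return bytes >= 500 * MB && bytes <= 1024 * MB;
    }
}
```

For a table receiving 20 GB per day, daily partitions come out at about 20 GB each (too coarse), hourly partitions at about 853 MB (within the target), and hourly partitions with 16 buckets at about 53 MB (too fine).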
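Iceberg uses optimistic concurrency. A commit is retried automatically when another writer commits first, and throws CommitFailedException once the retries are exhausted:
import org.apache.iceberg.exceptions.CommitFailedException;

try {
    table.newAppend()
        .appendFile(dataFile)
        .commit();
} catch (CommitFailedException e) {
    // Another writer committed first and the automatic retries
    // (commit.retry.num-retries) were exhausted: handle or rethrow
    throw e;
}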
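When a commit still fails after Iceberg's own retries, an application may want to rebuild the operation and try again. The following is a generic retry-with-backoff sketch in plain Java, under stated assumptions: `CommitRetry` and `ConflictException` are hypothetical stand-ins (in real code the caught type would be Iceberg's CommitFailedException), and the backoff policy is only one reasonable choice.

```java
import java.util.function.Supplier;

/** Generic retry sketch (not Iceberg code) with exponential backoff. */
final class CommitRetry {
    /** Hypothetical stand-in for a commit conflict. */
    static final class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    /**
     * Runs the attempt up to maxAttempts times, doubling the wait after each conflict.
     * The last conflict is rethrown if every attempt fails.
     */
    static <T> T withRetries(Supplier<T> attempt, int maxAttempts, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attemptNo = 1; ; attemptNo++) {
            try {
                return attempt.get();
            } catch (ConflictException e) {
                if (attemptNo >= maxAttempts) {
                    throw e;
                }
                sleep(backoffMs);
                backoffMs *= 2;
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }
}
```

The supplier should rebuild the whole operation on each call (for example, refresh the table and create a new append) rather than reuse a failed one.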
Regularly run maintenance operations:
  • Expire snapshots: Remove old history
  • Remove orphan files: Clean up unreferenced files
  • Compact files: Merge small files
  • Rewrite manifests: Optimize metadata
// Expire old snapshots
long weekAgo = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L;
table.expireSnapshots()
    .expireOlderThan(weekAgo)
    .retainLast(10)
    .commit();

Next Steps

  • API Documentation: Complete Java API reference
  • Schema Evolution: Learn safe schema evolution
  • Performance Tuning: Optimize table performance
  • Configuration: Table and catalog configuration