The BatchScan interface provides an API for configuring batch-oriented scans over Iceberg tables: selecting a snapshot, filtering rows, projecting columns, and planning the scan into tasks that a processing engine can execute.
Overview
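BatchScan is designed for batch processing workflows and extends the base Scan interface. It offers the same snapshot selection methods as TableScan (useSnapshot, useRef, and asOfTime), but its planning methods return the general ScanTask and ScanTaskGroup<ScanTask> types rather than TableScan's FileScanTask and CombinedScanTask.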
Interface
public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>>
Core Methods
table()
Returns the table from which this scan loads data.
Returns: This scan’s table
Example:
BatchScan scan = table.newBatchScan();
Table sourceTable = scan.table();
useSnapshot()
Creates a new batch scan that will use a snapshot with the given ID.
BatchScan useSnapshot(long snapshotId)
Parameters:
snapshotId - A snapshot ID
Returns: A new scan based on this one that uses the given snapshot ID
Throws: IllegalArgumentException if the snapshot cannot be found
Example:
BatchScan scan = table.newBatchScan()
    .useSnapshot(987654321L);
useRef()
Creates a new batch scan that will use the given reference.
BatchScan useRef(String ref)
Parameters:
ref - A reference name (branch or tag)
Returns: A new scan based on this one that uses the given reference
Throws: IllegalArgumentException if the reference with the given name could not be found
Example:
BatchScan scan = table.newBatchScan()
    .useRef("main");
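Both useSnapshot and useRef end up pinning the scan to a single snapshot ID: a branch or tag name is a named pointer to a snapshot. The sketch below models that resolution with plain Java collections. The SnapshotSelection class and its maps are hypothetical stand-ins for table metadata, not Iceberg API; the only behavior borrowed from the text above is throwing IllegalArgumentException when the ID or name is unknown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of snapshot selection; not part of the Iceberg API.
class SnapshotSelection {
    private final Set<Long> snapshotIds;  // IDs of snapshots present in table metadata
    private final Map<String, Long> refs; // branch or tag name -> snapshot ID

    SnapshotSelection(Set<Long> snapshotIds, Map<String, Long> refs) {
        this.snapshotIds = snapshotIds;
        this.refs = new HashMap<>(refs);
    }

    // Analogous to useSnapshot(id): the ID must name an existing snapshot.
    long resolveSnapshotId(long snapshotId) {
        if (!snapshotIds.contains(snapshotId)) {
            throw new IllegalArgumentException("Cannot find snapshot with ID " + snapshotId);
        }
        return snapshotId;
    }

    // Analogous to useRef(name): a branch or tag resolves to the snapshot it points at.
    long resolveRef(String ref) {
        Long id = refs.get(ref);
        if (id == null) {
            throw new IllegalArgumentException("Cannot find ref " + ref);
        }
        return id;
    }

    public static void main(String[] args) {
        SnapshotSelection table = new SnapshotSelection(
                Set.of(1L, 2L, 3L),
                Map.of("main", 3L, "production", 2L, "v1.0", 1L));
        System.out.println(table.resolveRef("production")); // 2
        System.out.println(table.resolveSnapshotId(3L));    // 3
    }
}
```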
asOfTime()
Creates a new batch scan that will use the most recent snapshot as of the given time.
BatchScan asOfTime(long timestampMillis)
Parameters:
timestampMillis - A timestamp in milliseconds since the epoch
Returns: A new scan based on this one that uses the snapshot that was current at the given time
Throws: IllegalArgumentException if the snapshot cannot be found or time travel is attempted on a tag
Example:
// Scan as of 2 days ago (long arithmetic, so longer spans cannot overflow int)
long twoDaysAgo = System.currentTimeMillis() - (2L * 24 * 60 * 60 * 1000);
BatchScan scan = table.newBatchScan()
    .asOfTime(twoDaysAgo);
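Conceptually, asOfTime picks the last snapshot whose commit time is at or before the requested timestamp. The sketch below models that rule over a hypothetical, oldest-first list of (snapshotId, timestampMillis) history entries. It is a simplified illustration of the selection rule, not Iceberg's implementation; AsOfTimeSketch and HistoryEntry are invented names.

```java
import java.util.List;

// Hypothetical model of time-travel snapshot selection; not the Iceberg API.
class AsOfTimeSketch {
    // One history entry: which snapshot became current at which time.
    static final class HistoryEntry {
        final long snapshotId;
        final long timestampMillis;

        HistoryEntry(long snapshotId, long timestampMillis) {
            this.snapshotId = snapshotId;
            this.timestampMillis = timestampMillis;
        }
    }

    // Returns the ID of the most recent snapshot committed at or before timestampMillis.
    // Assumes history is ordered from oldest to newest.
    static long snapshotIdAsOf(List<HistoryEntry> history, long timestampMillis) {
        Long found = null;
        for (HistoryEntry entry : history) {
            if (entry.timestampMillis <= timestampMillis) {
                found = entry.snapshotId; // later qualifying entries overwrite earlier ones
            }
        }
        if (found == null) {
            throw new IllegalArgumentException("No snapshot exists at or before " + timestampMillis);
        }
        return found;
    }

    public static void main(String[] args) {
        List<HistoryEntry> history = List.of(
                new HistoryEntry(10L, 1_000L),
                new HistoryEntry(20L, 2_000L),
                new HistoryEntry(30L, 3_000L));
        System.out.println(snapshotIdAsOf(history, 2_500L)); // 20
    }
}
```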
snapshot()
Returns the snapshot that will be used by this scan.
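Returns: The Snapshot this scan will use. This is the snapshot selected with useSnapshot, useRef, or asOfTime if one was set, and otherwise the table's current snapshot, which is null for a table with no snapshots.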
Example:
BatchScan scan = table.newBatchScan();
Snapshot snapshot = scan.snapshot();
// snapshot() is null for a table that has no snapshots yet
if (snapshot != null) {
    System.out.println("Using snapshot: " + snapshot.snapshotId());
}
Examples
Basic Batch Scan
import org.apache.iceberg.Table;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
// Create a batch scan
Table table = ...;
BatchScan scan = table.newBatchScan();
// Plan tasks
try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
    for (ScanTask task : tasks) {
        // Process each scan task
    }
}
Batch Scan with Snapshot Selection
// Get the latest snapshot ID (currentSnapshot() is null if the table has no snapshots yet)
long latestSnapshotId = table.currentSnapshot().snapshotId();
// Create scan for specific snapshot
BatchScan scan = table.newBatchScan()
    .useSnapshot(latestSnapshotId);
Snapshot snapshot = scan.snapshot();
System.out.println("Processing snapshot: " + snapshot.snapshotId());
System.out.println("Snapshot timestamp: " + snapshot.timestampMillis());
Time-Based Batch Processing
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
// Process data as of the start of today (UTC)
Instant startOfToday = LocalDate.now(ZoneOffset.UTC)
    .atStartOfDay()
    .toInstant(ZoneOffset.UTC); // LocalDateTime.toInstant requires an offset
BatchScan scan = table.newBatchScan()
    .asOfTime(startOfToday.toEpochMilli());
// Plan and process balanced task groups
try (CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups = scan.planTasks()) {
    for (ScanTaskGroup<ScanTask> group : taskGroups) {
        // Process task group in batch
        for (ScanTask task : group.tasks()) {
            // Process individual task
        }
    }
}
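Because asOfTime takes an absolute instant, "start of today" depends on which time zone defines the day. The stdlib-only helper below makes that choice an explicit parameter; the StartOfDay class and startOfDayMillis method are illustrative names, not part of Iceberg.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Illustrative helper for computing asOfTime arguments; not part of Iceberg.
class StartOfDay {
    // Epoch milliseconds for midnight at the start of `date` in `zone`.
    static long startOfDayMillis(LocalDate date, ZoneId zone) {
        return date.atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2024, 1, 15);
        System.out.println(startOfDayMillis(date, ZoneOffset.UTC)); // 1705276800000
        // New York is UTC-5 in January, so its midnight is 5 hours later in epoch terms.
        System.out.println(startOfDayMillis(date, ZoneId.of("America/New_York")));
    }
}
```

Passing the zone explicitly avoids silently depending on the JVM's default time zone, which can differ between a developer machine and a cluster.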
Branch-Based Batch Scan
import org.apache.iceberg.expressions.Expressions;
// Scan a specific branch for batch processing
BatchScan productionScan = table.newBatchScan()
    .useRef("production");
// Apply a row filter and column projection (startTime is a placeholder defined elsewhere)
BatchScan filteredScan = productionScan
    .filter(Expressions.greaterThan("timestamp", startTime))
    .select("id", "event_type", "payload");
try (CloseableIterable<ScanTask> tasks = filteredScan.planFiles()) {
    tasks.forEach(task -> {
        // Batch process filtered data from the production branch
    });
}
Task Group Processing
// Plan task groups for parallel processing
BatchScan scan = table.newBatchScan()
    .filter(Expressions.equal("date", "2024-01-15"));
try (CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups = scan.planTasks()) {
    // Each task group is an independent unit of work and can be processed in parallel
    taskGroups.forEach(group -> {
        System.out.println("Task group with " + group.tasks().size() + " tasks");
        // Process all tasks in this group
        for (ScanTask task : group.tasks()) {
            // Process task
        }
    });
}
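Since each task group is an independent unit of work, a common pattern is to hand every group to a thread pool and then wait for all results before moving on. The stdlib-only sketch below stands in for real ScanTaskGroup objects with lists of hypothetical per-task record counts; processGroup is a placeholder for reading a group's data, and ParallelGroups is an invented name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of parallel task-group processing; not Iceberg API.
class ParallelGroups {
    // Placeholder for reading one task group; here it just sums record counts.
    static long processGroup(List<Long> taskRecordCounts) {
        long total = 0;
        for (long count : taskRecordCounts) {
            total += count;
        }
        return total;
    }

    // Submits every group to a fixed pool and waits for all of them to finish.
    static long processAll(List<List<Long>> groups, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (List<Long> group : groups) {
                futures.add(executor.submit(() -> processGroup(group)));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get(); // blocks until the group is done; rethrows its failure
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<Long>> groups = List.of(List.of(100L, 50L), List.of(25L), List.of(10L, 10L, 5L));
        System.out.println(processAll(groups, 2)); // 200
    }
}
```

Collecting the futures and calling get() on each means a failure in any group surfaces to the caller instead of being lost inside the pool.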
Historical Batch Processing
import org.apache.iceberg.Snapshot;
// Process every snapshot committed in a date range
// (startDate and endDate are placeholder epoch-millisecond bounds; processBatch is a placeholder method)
Iterable<Snapshot> snapshots = table.snapshots(); // snapshots() returns an Iterable, not a List
for (Snapshot snapshot : snapshots) {
    long timestamp = snapshot.timestampMillis();
    if (timestamp >= startDate && timestamp <= endDate) {
        BatchScan scan = table.newBatchScan()
            .useSnapshot(snapshot.snapshotId());
        try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
            // Process this snapshot in batch
            processBatch(tasks);
        }
    }
}
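The loop above compares commit timestamps against startDate and endDate, which the example leaves undefined. One way to derive them from calendar dates is a half-open interval from the start of the first day to the start of the day after the last, which avoids end-of-day rounding issues. The sketch below isolates that selection step; SnapshotRange and SnapshotInfo are hypothetical stand-ins for Iceberg's Snapshot, and UTC is an assumed choice of time zone.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of selecting snapshots by commit date; not Iceberg API.
class SnapshotRange {
    static final class SnapshotInfo {
        final long snapshotId;
        final long timestampMillis;

        SnapshotInfo(long snapshotId, long timestampMillis) {
            this.snapshotId = snapshotId;
            this.timestampMillis = timestampMillis;
        }
    }

    // IDs of snapshots committed on any day from `first` through `last` (inclusive), in UTC.
    // Uses the half-open interval [start of first, start of the day after last).
    static List<Long> idsCommittedBetween(List<SnapshotInfo> snapshots, LocalDate first, LocalDate last) {
        long startMillis = first.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        long endMillisExclusive = last.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        List<Long> ids = new ArrayList<>();
        for (SnapshotInfo s : snapshots) {
            if (s.timestampMillis >= startMillis && s.timestampMillis < endMillisExclusive) {
                ids.add(s.snapshotId);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        long jan15 = 1705276800000L; // 2024-01-15T00:00:00Z
        long day = 86_400_000L;
        List<SnapshotInfo> snapshots = List.of(
                new SnapshotInfo(1L, jan15 - 1),       // late on 2024-01-14: excluded
                new SnapshotInfo(2L, jan15),           // midnight 2024-01-15: included
                new SnapshotInfo(3L, jan15 + day + 5), // 2024-01-16: included
                new SnapshotInfo(4L, jan15 + 2 * day)); // midnight 2024-01-17: excluded
        System.out.println(idsCommittedBetween(snapshots, LocalDate.of(2024, 1, 15), LocalDate.of(2024, 1, 16))); // [2, 3]
    }
}
```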
Comparing TableScan and BatchScan
// TableScan.planFiles() returns FileScanTask
TableScan tableScan = table.newScan();
try (CloseableIterable<FileScanTask> fileTasks = tableScan.planFiles()) {
    // Process individual files
}
// BatchScan.planFiles() returns ScanTask (more general)
BatchScan batchScan = table.newBatchScan();
try (CloseableIterable<ScanTask> scanTasks = batchScan.planFiles()) {
    // Process scan tasks (may include more than just files)
}
// BatchScan can also group tasks for better parallelism
try (CloseableIterable<ScanTaskGroup<ScanTask>> groups = batchScan.planTasks()) {
    // Process task groups in parallel
}
Task Planning
BatchScan provides two planning methods:
planFiles()
Returns individual scan tasks, where each task reads a single file.
CloseableIterable<ScanTask> planFiles()
planTasks()
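Returns balanced groups of scan tasks for parallel processing: large files are split and small tasks are combined toward a target split size, so a single group may read part of a file, several files, or both.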
CloseableIterable<ScanTaskGroup<ScanTask>> planTasks()
Example:
BatchScan scan = table.newBatchScan();
// Option 1: Process individual tasks (processTask is a placeholder method)
try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
    tasks.forEach(this::processTask);
}
// Option 2: Process task groups (better for parallelism)
// executor is a placeholder ExecutorService; processGroup is a placeholder method
try (CloseableIterable<ScanTaskGroup<ScanTask>> groups = scan.planTasks()) {
    groups.forEach(group -> {
        // Submit each group to a worker
        executor.submit(() -> processGroup(group));
    });
}
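The core idea behind planTasks() can be illustrated by packing per-file tasks into groups close to a target size, charging every file at least a fixed "open cost" so that many tiny files do not all land in one group. The sketch below is a deliberately simplified greedy next-fit packing over hypothetical file sizes; Iceberg's real planner is more elaborate (it also splits large files and considers a lookback window, tuned by table properties such as read.split.target-size and read.split.open-file-cost), so treat TaskGrouping as an illustration of the idea, not Iceberg's algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of grouping tasks by size; not Iceberg's planner.
class TaskGrouping {
    // Greedy next-fit: add tasks to the current group until the next one would push
    // the group's weight past targetSize, then start a new group.
    // Each task weighs max(size, openFileCost), so tiny files still cost something.
    static List<List<Long>> group(List<Long> taskSizes, long targetSize, long openFileCost) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentWeight = 0;
        for (long size : taskSizes) {
            long weight = Math.max(size, openFileCost);
            if (!current.isEmpty() && currentWeight + weight > targetSize) {
                groups.add(current);
                current = new ArrayList<>();
                currentWeight = 0;
            }
            current.add(size);
            currentWeight += weight;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Sizes in MB, with a 128 MB target and a 4 MB open cost.
        List<Long> sizes = List.of(100L, 50L, 60L, 10L, 1L);
        System.out.println(group(sizes, 128, 4)); // [[100], [50, 60, 10, 1]]
    }
}
```

A task larger than the target still gets a group of its own here; splitting such files into byte ranges is the part of real planning this sketch leaves out.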
See Also