Reading shapefiles into PyIceberg DataFrames efficiently

The primary failure mode in spatial lakehouse ingestion pipelines is unbounded memory allocation during legacy vector parsing. The mandatory Shapefile components (.shp, .shx, .dbf) carry no spatial index. High-level readers such as geopandas.read_file() (with either the fiona or pyogrio engine) deserialize every record into an in-memory geometry object. Routing that full DataFrame into PyIceberg can trigger OOM kills on standard 16–32GB worker nodes. The cause is that PyIceberg appends only pyarrow Tables, so the whole frame must first be converted to Arrow. During that conversion every geometry is serialized to WKB (or WKT) while the original objects are still alive. Production-grade ingestion requires decoupling I/O, geometry serialization, and Iceberg write transactions into a strictly bounded, chunked pipeline.

Root Cause Analysis: Double-Materialization Overhead

Shapefile ingestion fails at two distinct boundaries: full-dataset reads and whole-frame schema handling. The .shp format stores geometries as variable-length binary records. Loading them into a Pandas-backed GeoDataFrame creates one GEOS geometry object per record, so memory grows with total vertex count plus per-object overhead rather than with the attribute table. Converting that frame for PyIceberg then scans and serializes every geometry a second time to derive Arrow types and nullability. PyIceberg does not infer a CRS; a binary WKB column carries none. This double-materialization pattern is unsustainable for municipal-scale parcels, hydrological networks, or cadastral datasets exceeding 500MB.
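To make the vertex-count dependence concrete, the sketch below encodes a LineString as little-endian ISO WKB using only the standard library's struct module. It then compares the 2D and 3D (Z) sizes. The helper names are hypothetical. The byte layout follows the OGC WKB specification: a 1-byte byte-order flag, a 4-byte type code, a 4-byte point count, then 8-byte doubles.

```python
import struct


def linestring_wkb(coords: list[tuple[float, ...]]) -> bytes:
    """Encode a LineString as little-endian ISO WKB (hypothetical helper)."""
    dims = len(coords[0])
    # ISO type codes: 2 = LineString (XY), 1002 = LineString Z (XYZ)
    type_code = 2 if dims == 2 else 1002
    header = struct.pack("<BII", 1, type_code, len(coords))  # 1 = little-endian
    flat = [value for point in coords for value in point]
    return header + struct.pack(f"<{len(flat)}d", *flat)


def wkb_size(n_vertices: int, dims: int = 2) -> int:
    """9-byte header plus 8 bytes per ordinate."""
    return 9 + n_vertices * dims * 8


line_2d = [(float(i), float(i) * 2) for i in range(1000)]
line_3d = [(x, y, 0.0) for x, y in line_2d]

print(len(linestring_wkb(line_2d)), wkb_size(1000, 2))  # 16009 16009
print(len(linestring_wkb(line_3d)), wkb_size(1000, 3))  # 24009 24009
```

Size depends only on vertex count and dimensionality, never on the attribute columns next to it. Dropping Z removes one third of the coordinate bytes. A Shapely object adds per-geometry overhead on top of these raw figures.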

The resolution requires bypassing high-level geometry object instantiation entirely. Instead of materializing shapely objects, the pipeline must stream raw WKB bytes directly from the file cursor, apply explicit schema constraints, and append batches to the Iceberg table using transactional append operations. This approach aligns with established PyIceberg Spatial Workflows that prioritize binary column mapping over object-relational translation.

Pipeline Architecture: Chunked WKB Streaming

The ingestion engine must operate at the record level, not the dataset level. pyogrio.open_arrow() exposes GDAL's Arrow stream, which iterates in batch_size chunks without loading the full attribute table. Each chunk arrives as a pyarrow.RecordBatch whose geometry column already holds WKB bytes produced by GDAL, so no Python geometry objects are created. WKB stores every coordinate as a fixed 8-byte double. That makes it considerably more compact than GeoJSON or WKT text and cheap to pass from Arrow into Parquet.
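pyogrio performs the chunking inside GDAL. The bounded-memory property itself comes from a simple pattern: pull at most batch_size records from a lazy source, process them, drop them, repeat. The stdlib sketch below shows that pattern with itertools.islice. iter_batches is a hypothetical name, and the generator stands in for the pyogrio reader.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def iter_batches(records: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield lists of at most batch_size records, pulling lazily from the source."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# The source is a generator, so records are produced on demand and at most
# batch_size of them are held in memory at any moment.
records = ({"parcel_id": i} for i in range(70_000))
sizes = [len(b) for b in iter_batches(records, 25_000)]
print(sizes)  # [25000, 25000, 20000]
```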

The architecture enforces three strict boundaries:

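  1. Explicit Schema Declaration: Column types are declared upfront in both the Iceberg and the Arrow schema, so nothing is inferred from the data.
  2. Vectorized WKB Handling: Geometry stays in the WKB that GDAL produces. Shapely 2.0+ vectorized routines (shapely.from_wkb(), shapely.to_wkb()) are needed only when geometries must be validated or normalized, and never row by row in Python.
  3. Transactional Append Isolation: Each batch is committed as a discrete Iceberg snapshot. A batch that fails is never visible to readers, while batches committed before it remain in the table.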
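The three boundaries can be modeled without any lakehouse dependencies. The sketch below is only a model: FakeSnapshotTable is a hypothetical stand-in that mimics "one append, one snapshot", not the PyIceberg API. It shows the consequence of per-batch commits. A batch that fails validation is never appended, and everything committed before it stays visible.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSnapshotTable:
    """Hypothetical stand-in for an Iceberg table: each append() is one snapshot."""

    snapshots: list[list[dict]] = field(default_factory=list)

    def append(self, rows: list[dict]) -> None:
        self.snapshots.append(list(rows))

    def visible_rows(self) -> list[dict]:
        return [row for snap in self.snapshots for row in snap]


# Boundary 1: required columns are declared up front, not inferred
REQUIRED = ("parcel_id", "geometry_wkb")


def encode(batch: list[dict]) -> list[dict]:
    """Boundary 2: validate the whole batch before anything is written."""
    for row in batch:
        missing = [name for name in REQUIRED if row.get(name) is None]
        if missing:
            raise ValueError(f"row {row.get('parcel_id')} missing {missing}")
    return [{**row, "geometry_wkb": bytes(row["geometry_wkb"])} for row in batch]


def ingest(batches: list[list[dict]], table: FakeSnapshotTable) -> int:
    """Boundary 3: one commit per batch; returns how many batches were committed."""
    committed = 0
    for idx, batch in enumerate(batches):
        try:
            table.append(encode(batch))
        except ValueError as exc:
            raise RuntimeError(f"aborted at batch {idx}; {committed} batches committed") from exc
        committed += 1
    return committed


ok = [{"parcel_id": 1, "geometry_wkb": b"placeholder"}]  # placeholder bytes, not real WKB
bad = [{"parcel_id": 2, "geometry_wkb": None}]
table = FakeSnapshotTable()
try:
    ingest([ok, ok, bad, ok], table)
except RuntimeError as err:
    print(err)  # aborted at batch 2; 2 batches committed
print(len(table.snapshots))  # 2
```

If the whole file must land atomically instead, the batches would have to share a single commit. That trades per-batch visibility for all-or-nothing semantics.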

Production Implementation

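The following pipeline configures a chunked reader, passes GDAL's WKB through unchanged, and writes to an Iceberg table with explicit partitioning and compression. It assumes a catalog named default is configured for load_catalog() (for example in .pyiceberg.yaml), that the spatial namespace already exists, and that pyogrio 0.8 or later is installed. The source attribute names are placeholders. DBF field names are limited to 10 characters, so confirm them with pyogrio.read_info() before mapping them.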

```python
import logging
import os

import pyarrow as pa
import pyarrow.compute as pc
import pyogrio
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import BinaryType, IntegerType, NestedField, StringType

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

# Source DBF field -> Iceberg column. DBF names are capped at 10 characters,
# so these source names are placeholders; confirm them with pyogrio.read_info().
FIELD_MAP = {"parcel_id": "parcel_id", "owner_name": "owner_name", "zoning_cod": "zoning_code"}

# 1. Explicit Iceberg schema: column types are declared, never inferred
iceberg_schema = Schema(
    NestedField(1, "parcel_id", IntegerType(), required=True),
    NestedField(2, "owner_name", StringType(), required=False),
    NestedField(3, "zoning_code", StringType(), required=False),
    NestedField(4, "geometry_wkb", BinaryType(), required=True),
)

# Matching Arrow schema. Required Iceberg fields must be non-nullable here, and
# IntegerType is 32-bit, so int64 DBF integers are cast down (overflow raises).
arrow_schema = pa.schema([
    pa.field("parcel_id", pa.int32(), nullable=False),
    pa.field("owner_name", pa.string(), nullable=True),
    pa.field("zoning_code", pa.string(), nullable=True),
    pa.field("geometry_wkb", pa.binary(), nullable=False),
])

# 2. Create the table (or load it if it exists) with partitioning and compression
catalog = load_catalog("default")
table = catalog.create_table_if_not_exists(
    identifier="spatial.parcels_raw",
    schema=iceberg_schema,
    partition_spec=PartitionSpec(
        # source_id=3 is zoning_code (field 2 is owner_name)
        PartitionField(
            source_id=3, field_id=1000, transform=IdentityTransform(), name="zoning_code"
        )
    ),
    properties={
        "write.parquet.compression-codec": "zstd",
        "write.parquet.compression-level": "3",
        "write.target-file-size-bytes": "134217728",  # 128MB
        "write.parquet.page-size-bytes": "1048576",    # 1MB
    },
)


def stream_shapefile_to_iceberg(shapefile_path: str, batch_size: int = 25000) -> None:
    """Chunked WKB ingestion: each batch is committed as its own Iceberg snapshot."""
    if not os.path.exists(shapefile_path):
        raise FileNotFoundError(f"Shapefile not found: {shapefile_path}")

    # open_arrow yields (meta, reader); use_pyarrow=True (pyogrio >= 0.8) makes
    # reader a pyarrow.RecordBatchReader. force_2d drops Z at read time.
    with pyogrio.open_arrow(
        shapefile_path,
        columns=list(FIELD_MAP),
        batch_size=batch_size,
        force_2d=True,
        use_pyarrow=True,
    ) as source:
        meta, reader = source
        # Shapefiles have no named geometry column; GDAL calls it "wkb_geometry"
        geom_name = meta["geometry_name"] or "wkb_geometry"

        for batch_idx, batch in enumerate(reader):
            try:
                # Geometry already arrives as WKB from GDAL; no Shapely round-trip
                geom = batch.column(geom_name)
                if isinstance(geom, pa.ExtensionArray):
                    geom = geom.storage  # geoarrow.wkb extension -> plain binary storage
                # Drop null shapes: geometry_wkb is a required column
                keep = pc.is_valid(geom)
                batch, geom = batch.filter(keep), geom.filter(keep)

                # Cast each attribute to its declared type under its Iceberg name
                arrays = [
                    batch.column(src).cast(arrow_schema.field(dst).type)
                    for src, dst in FIELD_MAP.items()
                ]
                arrays.append(geom.cast(pa.binary()))
                arrow_table = pa.Table.from_arrays(arrays, schema=arrow_schema)

                # One append = one snapshot; a failed append commits nothing
                table.append(arrow_table)
                logging.info("Committed batch %d (%d rows)", batch_idx, arrow_table.num_rows)
            except Exception as e:
                logging.error("Batch %d failed: %s", batch_idx, e)
                # Batches before batch_idx are already committed; this one is not
                raise RuntimeError(
                    f"Ingestion aborted at batch {batch_idx}; earlier batches remain committed."
                ) from e
            # Arrow buffers are reference-counted and freed when these names are
            # rebound on the next iteration; no explicit gc.collect() is needed.

    logging.info("Ingestion complete. Expire old snapshots to reclaim superseded files.")
```

Configuration & Tuning Parameters

Production deployments require explicit memory and I/O constraints. The following parameters govern pipeline stability:

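| Parameter | Recommended value | Impact |
| --- | --- | --- |
| batch_size (pyogrio.open_arrow) | 15000–35000 | Balances per-batch overhead against memory pressure. Memory per batch scales with vertices per feature, not row count. On 16GB nodes, vertex-heavy layers can start swapping above roughly 50k. |
| write.target-file-size-bytes | 134217728 (128MB) | Upper bound on data file size (Iceberg's default is 512MB). Each batch is a separate append, so files are usually well below this bound; schedule compaction to merge the small files. |
| Process memory cap | ~85% of container RAM | PyArrow has no environment variable that enforces a hard allocation cap. Enforce the limit at the container (cgroup) level and track pyarrow.total_allocated_bytes() per batch. |
| force_2d (pyogrio) or output_dimension=2 (shapely.to_wkb) | True / 2 | Strips Z-coordinates unless explicitly required. Each vertex shrinks from 24 to 16 bytes, about 33% less coordinate payload. |
| py-io-impl (catalog property) | pyiceberg.io.pyarrow.PyArrowFileIO | Uses PyArrow's native filesystems for Parquet reads and writes; PyIceberg prefers it when pyarrow is installed. |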

These settings follow the conventions of the broader Python Ecosystem & Integration Workflows, which standardize lakehouse I/O across heterogeneous data sources.
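The batch_size range in the tuning table is a starting point. One way to turn it into a number for a specific layer is to budget bytes per feature, as sketched below. The helper name, the 200-byte attribute default, and the amplification factor are all assumptions, not values from pyogrio or PyIceberg. The amplification factor stands for how many copies of a batch are alive at once across the reader buffer, casts, and Parquet encoding. Measure peak RSS on a sample and adjust.

```python
def suggest_batch_size(
    memory_budget_bytes: int,
    avg_vertices_per_feature: float,
    attr_bytes_per_feature: int = 200,
    dims: int = 2,
    amplification: float = 3.0,
) -> int:
    """Hypothetical rule of thumb: fit `amplification` copies of a batch in the budget.

    The WKB estimate ignores the few bytes of per-ring and per-part headers.
    """
    wkb_bytes = 9 + avg_vertices_per_feature * dims * 8
    per_feature = (wkb_bytes + attr_bytes_per_feature) * amplification
    return max(1, int(memory_budget_bytes // per_feature))


print(suggest_batch_size(2 * 1024**3, 1000))    # 44162
print(suggest_batch_size(2 * 1024**3, 20_000))  # 2235
```

Parcels averaging 1,000 vertices land near the top of the tabled range. A hydrography layer averaging 20,000 vertices per line needs batches about twenty times smaller, which is why a single fixed batch_size rarely suits every layer.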

Failure Resolution & Debugging

OOM During Append

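Symptom: Worker process killed with SIGKILL or MemoryError during table.append(). Resolution: Reduce batch_size to 10000. Confirm the container memory limit leaves headroom for Arrow and Parquet encoding buffers, since PyArrow itself enforces no hard cap. Make sure no code path falls back to geopandas.read_file() on the whole file; pyogrio.open_arrow() should be the sole vector reader. Monitor RSS with psutil.Process(os.getpid()).memory_info().rss.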
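If adding psutil is not an option, the standard-library resource module exposes peak RSS on Unix. The sketch below uses hypothetical helper names and is meant to be called after each committed batch. It aborts with a clear error before the container's OOM killer sends SIGKILL. Peak RSS never decreases, so the check identifies the first batch that pushed the process over the soft limit rather than tracking current usage.

```python
import resource
import sys


def peak_rss_bytes() -> int:
    """Peak resident set size of this process (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # Linux reports KiB, macOS reports bytes
    return peak if sys.platform == "darwin" else peak * 1024


def check_memory(batch_idx: int, soft_limit_bytes: int) -> int:
    """Raise MemoryError once peak RSS passes the soft limit; otherwise return it."""
    peak = peak_rss_bytes()
    if peak > soft_limit_bytes:
        raise MemoryError(
            f"peak RSS {peak / 2**30:.2f} GiB exceeded the soft limit after "
            f"batch {batch_idx}; lower batch_size"
        )
    return peak
```

Setting the soft limit a little below the container limit turns an opaque SIGKILL into a logged, catchable error that names the offending batch.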

Schema Drift / Type Mismatch

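Symptom: table.append() fails with a ValueError listing mismatched fields. Typical causes are an int64 parcel_id written to an IntegerType column, or a nullable Arrow field where the table declares the column required. Resolution: PyIceberg checks every append against the table schema and does not downcast. Cast each batch to the declared Arrow schema. If the source shapefile adds columns, evolve the table with table.update_schema().add_column(...).commit() before ingestion. Never rely on implicit type coercion.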
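Drift is cheaper to catch before the first batch is read. The sketch below compares the shapefile's attribute names with the expected source-to-Iceberg mapping. In practice source_fields could come from pyogrio.read_info(path)["fields"]. diff_fields and the example field names are hypothetical.

```python
def diff_fields(source_fields: list[str], expected: dict[str, str]) -> tuple[list[str], list[str]]:
    """Compare shapefile attribute names with the expected source -> Iceberg map.

    Returns (missing, unexpected): expected source fields absent from the file,
    and file fields the Iceberg table does not know about yet.
    """
    present = set(source_fields)
    missing = [name for name in expected if name not in present]
    unexpected = sorted(present - set(expected))
    return missing, unexpected


expected = {"parcel_id": "parcel_id", "owner_name": "owner_name", "zoning_cod": "zoning_code"}
print(diff_fields(["parcel_id", "owner_name", "zoning_cod", "land_use"], expected))
# ([], ['land_use'])
```

Missing fields should stop the run outright. Unexpected ones are candidates for table.update_schema().add_column(...) before ingestion resumes.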

Transaction Timeout / Staging File Accumulation

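Symptom: CommitFailedException (a commit conflict) or timeouts during commit, or steady growth of the table's metadata/ directory. Resolution: Raise commit retries to 5. The Iceberg table property commit.retry.num-retries controls this in the Java implementation; check whether your PyIceberg version honors it. Each batch adds a snapshot, so schedule snapshot expiration and orphan-file cleanup after ingestion, for example with the expire_snapshots and remove_orphan_files procedures in Spark. Files referenced only by old snapshots are retained until those snapshots expire, by design.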
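Where the library does not retry for you, a small wrapper can. The sketch below retries a commit callable with jittered exponential backoff. commit_with_retry is hypothetical. With PyIceberg you might pass a function that calls table.refresh() and then table.append(arrow_table), with retry_on=(CommitFailedException,) from pyiceberg.exceptions. Retry only on errors that guarantee the commit did not land. A timeout can be ambiguous, and retrying it may append the same batch twice.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def commit_with_retry(
    commit: Callable[[], T],
    retry_on: tuple[type[BaseException], ...],
    attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run commit(); on a retryable error back off exponentially (with jitter) and retry."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return commit()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay))  # jitter de-synchronizes competing writers
    raise AssertionError("unreachable")
```

Refreshing the table inside the callable matters. A conflict means another writer advanced the table, so the retry has to build on the latest snapshot.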

Invalid WKB / Geometry Validation

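Symptom: shapely.errors.GEOSException (for example a WKB ParseException) when geometries are decoded, or downstream engines rejecting rows. Resolution: Decode each batch with shapely.from_wkb(wkb, on_invalid="ignore"), which returns None for unparseable WKB, and log the indices of the resulting nulls for manual QA. Use shapely.is_valid() to catch geometrically invalid shapes such as self-intersecting rings. Iceberg does not perform runtime geometry validation; enforce it upstream.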
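A dependency-free pre-check can also flag malformed WKB by record index before anything reaches Iceberg. The sketch below assumes ISO WKB, not EWKB with SRID flags; codes it does not recognize are reported as invalid. It walks each record structurally, checking the byte-order flag, the type code, and that every declared count fits inside the buffer. It does not test geometric validity (ring closure, self-intersection); shapely.is_valid() covers that. All names are hypothetical.

```python
import struct

# ISO WKB dimension blocks: 0 = XY, 1 = XYZ, 2 = XYM, 3 = XYZM
_DIMS = {0: 2, 1: 3, 2: 3, 3: 4}


def _read_u32(buf: bytes, pos: int, fmt: str) -> tuple[int, int]:
    if pos + 4 > len(buf):
        raise ValueError("truncated count")
    return struct.unpack_from(fmt + "I", buf, pos)[0], pos + 4


def _walk(buf: bytes, pos: int) -> int:
    """Walk one ISO WKB geometry starting at pos; return the offset just past it."""
    if pos + 5 > len(buf):
        raise ValueError("truncated header")
    order = buf[pos]
    if order not in (0, 1):
        raise ValueError(f"bad byte-order flag {order}")
    fmt = "<" if order == 1 else ">"
    code, pos = _read_u32(buf, pos + 1, fmt)
    base, block = code % 1000, code // 1000
    if block not in _DIMS:
        raise ValueError(f"unsupported type code {code}")  # includes EWKB flag bits
    point = 8 * _DIMS[block]
    if base == 1:  # Point
        pos += point
    elif base == 2:  # LineString: count of points
        n, pos = _read_u32(buf, pos, fmt)
        pos += n * point
    elif base == 3:  # Polygon: count of rings, each a count of points
        rings, pos = _read_u32(buf, pos, fmt)
        for _ in range(rings):
            n, pos = _read_u32(buf, pos, fmt)
            pos += n * point
    elif base in (4, 5, 6, 7):  # Multi* and GeometryCollection hold nested WKB
        parts, pos = _read_u32(buf, pos, fmt)
        for _ in range(parts):
            pos = _walk(buf, pos)
    else:
        raise ValueError(f"unsupported type code {code}")
    if pos > len(buf):
        raise ValueError("truncated coordinates")
    return pos


def find_invalid_wkb(values) -> list[int]:
    """Indices of records that are null, malformed, or carry trailing bytes."""
    bad = []
    for idx, value in enumerate(values):
        try:
            if value is None or _walk(value, 0) != len(value):
                bad.append(idx)
        except ValueError:
            bad.append(idx)
    return bad


point = struct.pack("<BIdd", 1, 1, 1.0, 2.0)
multipoint = struct.pack("<BII", 1, 4, 2) + point + point
polygon = struct.pack("<BIII", 1, 3, 1, 4) + struct.pack("<8d", 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0)
values = [point, point[:-1], None, multipoint, b"\x02" + point[1:], polygon, point + b"\x00"]
print(find_invalid_wkb(values))  # [1, 2, 4, 6]
```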

For authoritative reference on Arrow geometry interoperability and WKB specification compliance, consult the Pyogrio Documentation and the Apache Iceberg Python API Reference. Implemented this way, the pipeline bounds memory by batch size rather than file size and commits each batch as an isolated snapshot. It also stores geometry as standard WKB that downstream spatial query engines can decode.