Automating Metadata Extraction for Batch Raster Jobs

Quick answer

Use rasterio inside a ThreadPoolExecutor to read only file headers — no pixel data — and collect results as a JSON list. The loop below processes thousands of GeoTIFFs in seconds:

from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import rasterio

def extract_single(path: Path) -> dict:
    with rasterio.open(path) as src:
        return {
            "file": str(path),
            "crs": src.crs.to_string() if src.crs else None,
            "bounds": src.bounds._asdict(),
            "res": src.res,
            "count": src.count,
            "dtype": src.dtypes[0],
            "nodata": src.nodata,
        }

paths = list(Path("tiles/").glob("**/*.tif"))
with ThreadPoolExecutor(max_workers=8) as pool:
    results = [f.result() for f in as_completed(pool.submit(extract_single, p) for p in paths)]

Context

When a remote sensing project graduates from one-off inspection scripts to a repeatable data pipeline, you face hundreds or thousands of raster files from heterogeneous sensors, acquisition dates, and CRS definitions. Manually calling rasterio.open() on each file sequentially is accurate but far too slow for production; a 10 000-tile Sentinel-2 archive can take over 30 minutes at 3 files/second on a local NVMe drive.

This page covers the batch extraction pattern that belongs to Extracting and Parsing Raster Metadata — the art of reading spatial, spectral, and provenance attributes from file headers as fast as the storage subsystem allows. For the broader architectural picture, including how extracted metadata slots into spatiotemporal catalogs, see Core Raster Fundamentals & STAC Mapping.

The output of a well-designed batch extractor feeds directly into downstream steps: querying STAC catalogs programmatically to find assets by bounding box and date, or handling pixel resolution and scaling decisions that depend on the per-file res tuple you extract here.


Environment & setup

Package Minimum version Role
rasterio 1.3 Header-only reads, CRS access, affine transform
pyarrow 14.0 Parquet output for large batches
pandas 2.0 DataFrame construction before Parquet write
pip install "rasterio>=1.3" "pyarrow>=14.0" "pandas>=2.0"

Pipeline architecture

The diagram below shows how file paths flow through the thread pool, each worker reading only the header, then results converge into a normalised output file.

Batch metadata extraction pipeline File paths are dispatched to a ThreadPoolExecutor. Each worker opens a GeoTIFF header with rasterio, extracts CRS, bounds, resolution, band count, and tags, then returns a dict. Results are collected by as_completed and written to a JSON or Parquet file. File list Path("tiles/ **/*.tif") ThreadPoolExecutor worker · rasterio.open() worker · rasterio.open() worker · rasterio.open() Collect & normalise as_completed() Output JSON / Parquet Fields per file: crs · bounds · res count · dtype · nodata · tags

Complete working example

The script below is production-ready: it reads only headers, handles errors per file without aborting the batch, normalises the output schema, and supports both JSON and Parquet sinks.

import json
import logging
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any

import rasterio
from rasterio.errors import RasterioError

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

# ---------------------------------------------------------------------------
# Single-file extractor — runs inside every thread
# ---------------------------------------------------------------------------

def extract_raster_metadata(filepath: Path) -> dict[str, Any]:
    """Read only the file header; never load pixel arrays into memory."""
    record: dict[str, Any] = {
        "filepath": str(filepath),
        "status": "success",
        "crs": None,
        "bounds": None,
        "res": None,
        "count": None,
        "dtype": None,
        "nodata": None,
        "transform": None,
        "dataset_tags": {},
        "band_tags": {},
    }
    try:
        with rasterio.open(filepath) as src:
            # .crs, .bounds, .res never trigger pixel reads
            record["crs"] = src.crs.to_string() if src.crs else None
            record["bounds"] = src.bounds._asdict()
            record["res"] = list(src.res)            # (x_res, y_res) in CRS units
            record["count"] = src.count
            record["dtype"] = src.dtypes[0]          # first band's dtype; all bands if uniform
            record["nodata"] = src.nodata
            # Affine transform as 6-element GDAL list: [xoff, xscale, 0, yoff, 0, yscale]
            record["transform"] = [round(v, 9) for v in src.transform.to_gdal()]
            # Dataset-level GDAL metadata tags (acquisition date, sensor name, etc.)
            record["dataset_tags"] = dict(src.tags())
            # Per-band tags: dict keyed by 1-based band index
            record["band_tags"] = {
                i: dict(src.tags(i)) for i in src.indexes if src.tags(i)
            }
    except RasterioError as exc:
        # Corrupted header, unsupported driver, missing auxiliary files
        record["status"] = "error"
        record["error_message"] = str(exc)
        logging.warning("RasterioError on %s: %s", filepath, exc)
    except Exception as exc:  # noqa: BLE001
        # Permissions, missing file, unexpected GDAL state
        record["status"] = "fatal"
        record["error_message"] = str(exc)
        logging.error("Fatal error on %s: %s", filepath, exc)
    return record


# ---------------------------------------------------------------------------
# Batch runner
# ---------------------------------------------------------------------------

def run_batch_extraction(
    filepaths: list[Path],
    *,
    max_workers: int | None = None,
    output_json: Path | None = Path("metadata_batch.json"),
    output_parquet: Path | None = None,
) -> list[dict[str, Any]]:
    """
    Extract headers from all raster files in parallel.

    max_workers defaults to min(32, os.cpu_count() * 4) — a reasonable
    starting point for NVMe or fast SAN storage.  For S3 or GCS, reduce
    to 8–12 to avoid HTTP 429 / connection pool exhaustion.
    """
    if max_workers is None:
        max_workers = min(32, (os.cpu_count() or 4) * 4)

    results: list[dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {executor.submit(extract_raster_metadata, fp): fp for fp in filepaths}
        for future in as_completed(future_map):
            results.append(future.result())

    # Sort deterministically so diffs are meaningful
    results.sort(key=lambda r: r["filepath"])

    success = sum(1 for r in results if r["status"] == "success")
    logging.info("Batch complete: %d/%d succeeded.", success, len(results))

    if output_json:
        output_json.write_text(
            json.dumps(results, indent=2, ensure_ascii=False), encoding="utf-8"
        )
        logging.info("JSON written to %s", output_json)

    if output_parquet:
        _write_parquet(results, output_parquet)

    return results


def _write_parquet(records: list[dict[str, Any]], path: Path) -> None:
    """Flatten nested dicts to string columns and write Parquet."""
    import pandas as pd  # lazy import — only required for Parquet output

    df = pd.DataFrame(records)
    # Nested dicts (bounds, dataset_tags, band_tags) must be serialised
    for col in ("bounds", "dataset_tags", "band_tags"):
        if col in df.columns:
            df[col] = df[col].apply(
                lambda v: json.dumps(v) if isinstance(v, dict) else v
            )
    # res is a list — store as string to keep Parquet schema simple
    if "res" in df.columns:
        df["res"] = df["res"].apply(str)

    df.to_parquet(path, index=False)
    logging.info("Parquet written to %s", path)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    tile_dir = Path("tiles/")
    all_tifs = sorted(tile_dir.glob("**/*.tif"))
    run_batch_extraction(
        all_tifs,
        output_json=Path("metadata_batch.json"),
        output_parquet=Path("metadata_batch.parquet"),
    )

Variant patterns

Variant 1 — Cloud object storage (S3 / GCS)

When files live in cloud buckets, open them via rasterio’s GDAL virtual filesystem (/vsis3/, /vsigs/). The header read still avoids a full download because Cloud-Optimized GeoTIFF structure places the TIFF header and IFD at the front of the file, so GDAL fetches only the first HTTP range.

import boto3
import rasterio
from rasterio.session import AWSSession

def extract_s3_metadata(s3_uri: str) -> dict:
    """
    s3_uri format: "s3://bucket/path/to/file.tif"
    Reuses an authenticated boto3 session across threads.
    """
    boto_session = boto3.Session()  # reads credentials from env / instance role
    with rasterio.Env(AWSSession(boto_session)):
        # /vsis3/ path is constructed automatically by rasterio from the s3:// URI
        with rasterio.open(s3_uri) as src:
            return {
                "uri": s3_uri,
                "crs": src.crs.to_string() if src.crs else None,
                "bounds": src.bounds._asdict(),
                "res": list(src.res),
                "count": src.count,
                "dtype": src.dtypes[0],
            }

Reduce max_workers to 8–12 for S3 to avoid HTTP 503 throttling. Enable GDAL_HTTP_UNSAFESSL=YES only if your bucket endpoint uses a self-signed certificate.

Variant 2 — Parquet output for enterprise catalogs

JSON works well for debugging batches under ~50 000 files. Above that, switch to Parquet. The _write_parquet helper in the main script handles this, but you can also build the DataFrame incrementally when RAM is tight:

import pyarrow as pa
import pyarrow.parquet as pq

def stream_to_parquet(records_iter, output_path: Path, batch_size: int = 5_000) -> None:
    """Write results in batches to cap memory usage during conversion."""
    writer = None
    batch: list[dict] = []

    for record in records_iter:
        batch.append(record)
        if len(batch) >= batch_size:
            table = pa.Table.from_pylist(batch)
            if writer is None:
                writer = pq.ParquetWriter(output_path, table.schema)
            writer.write_table(table)
            batch.clear()

    # Flush the final partial batch
    if batch:
        table = pa.Table.from_pylist(batch)
        if writer is None:
            writer = pq.ParquetWriter(output_path, table.schema)
        writer.write_table(table)

    if writer:
        writer.close()

Variant 3 — Resume interrupted batches with a checkpoint file

For archives exceeding 100 000 files, maintain a checkpoint so a crashed run does not reprocess already-extracted files:

import json
from pathlib import Path

def load_checkpoint(checkpoint_path: Path) -> set[str]:
    """Return the set of already-processed file paths."""
    if not checkpoint_path.exists():
        return set()
    with checkpoint_path.open() as f:
        return {line.strip() for line in f if line.strip()}

def save_checkpoint(checkpoint_path: Path, filepath: str) -> None:
    """Append one processed path to the checkpoint file."""
    with checkpoint_path.open("a") as f:
        f.write(filepath + "\n")

# Usage: filter before submitting to the pool
checkpoint = Path("extraction_checkpoint.txt")
already_done = load_checkpoint(checkpoint)
remaining = [p for p in all_tifs if str(p) not in already_done]

Common errors

NotGeoreferencedWarning: Dataset has no geotransform The file lacks an affine transform; src.transform returns the default identity matrix. Fix: filter these files out before the batch or flag record["status"] = "no_georef" and exclude them from catalog ingestion.

CPLE_OpenFailed: No such file or directory The path string was constructed incorrectly (e.g. a mixed-OS path separator). Fix: always use pathlib.Path objects and call str(path) only when passing to rasterio.open().

RasterioIOError: HTTP error 403 Cloud bucket access is denied. Fix: confirm your boto3.Session() picks up the correct credentials; for EC2 instances check the instance profile IAM policy includes s3:GetObject.