Automating Metadata Extraction for Batch Raster Jobs
Quick answer
Use rasterio inside a ThreadPoolExecutor to read only file headers — no pixel data — and collect results as a JSON list. The loop below processes thousands of GeoTIFFs in seconds:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import rasterio
def extract_single(path: Path) -> dict:
with rasterio.open(path) as src:
return {
"file": str(path),
"crs": src.crs.to_string() if src.crs else None,
"bounds": src.bounds._asdict(),
"res": src.res,
"count": src.count,
"dtype": src.dtypes[0],
"nodata": src.nodata,
}
paths = list(Path("tiles/").glob("**/*.tif"))
with ThreadPoolExecutor(max_workers=8) as pool:
results = [f.result() for f in as_completed(pool.submit(extract_single, p) for p in paths)]
Context
When a remote sensing project graduates from one-off inspection scripts to a repeatable data pipeline, you face hundreds or thousands of raster files from heterogeneous sensors, acquisition dates, and CRS definitions. Manually calling rasterio.open() on each file sequentially is accurate but far too slow for production; a 10 000-tile Sentinel-2 archive can take over 30 minutes at 3 files/second on a local NVMe drive.
This page covers the batch extraction pattern that belongs to Extracting and Parsing Raster Metadata — the art of reading spatial, spectral, and provenance attributes from file headers as fast as the storage subsystem allows. For the broader architectural picture, including how extracted metadata slots into spatiotemporal catalogs, see Core Raster Fundamentals & STAC Mapping.
The output of a well-designed batch extractor feeds directly into downstream steps: querying STAC catalogs programmatically to find assets by bounding box and date, or handling pixel resolution and scaling decisions that depend on the per-file res tuple you extract here.
Environment & setup
| Package | Minimum version | Role |
|---|---|---|
rasterio |
1.3 | Header-only reads, CRS access, affine transform |
pyarrow |
14.0 | Parquet output for large batches |
pandas |
2.0 | DataFrame construction before Parquet write |
pip install "rasterio>=1.3" "pyarrow>=14.0" "pandas>=2.0"
Pipeline architecture
The diagram below shows how file paths flow through the thread pool, each worker reading only the header, then results converge into a normalised output file.
Complete working example
The script below is production-ready: it reads only headers, handles errors per file without aborting the batch, normalises the output schema, and supports both JSON and Parquet sinks.
import json
import logging
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any
import rasterio
from rasterio.errors import RasterioError
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
# ---------------------------------------------------------------------------
# Single-file extractor — runs inside every thread
# ---------------------------------------------------------------------------
def extract_raster_metadata(filepath: Path) -> dict[str, Any]:
"""Read only the file header; never load pixel arrays into memory."""
record: dict[str, Any] = {
"filepath": str(filepath),
"status": "success",
"crs": None,
"bounds": None,
"res": None,
"count": None,
"dtype": None,
"nodata": None,
"transform": None,
"dataset_tags": {},
"band_tags": {},
}
try:
with rasterio.open(filepath) as src:
# .crs, .bounds, .res never trigger pixel reads
record["crs"] = src.crs.to_string() if src.crs else None
record["bounds"] = src.bounds._asdict()
record["res"] = list(src.res) # (x_res, y_res) in CRS units
record["count"] = src.count
record["dtype"] = src.dtypes[0] # first band's dtype; all bands if uniform
record["nodata"] = src.nodata
# Affine transform as 6-element GDAL list: [xoff, xscale, 0, yoff, 0, yscale]
record["transform"] = [round(v, 9) for v in src.transform.to_gdal()]
# Dataset-level GDAL metadata tags (acquisition date, sensor name, etc.)
record["dataset_tags"] = dict(src.tags())
# Per-band tags: dict keyed by 1-based band index
record["band_tags"] = {
i: dict(src.tags(i)) for i in src.indexes if src.tags(i)
}
except RasterioError as exc:
# Corrupted header, unsupported driver, missing auxiliary files
record["status"] = "error"
record["error_message"] = str(exc)
logging.warning("RasterioError on %s: %s", filepath, exc)
except Exception as exc: # noqa: BLE001
# Permissions, missing file, unexpected GDAL state
record["status"] = "fatal"
record["error_message"] = str(exc)
logging.error("Fatal error on %s: %s", filepath, exc)
return record
# ---------------------------------------------------------------------------
# Batch runner
# ---------------------------------------------------------------------------
def run_batch_extraction(
filepaths: list[Path],
*,
max_workers: int | None = None,
output_json: Path | None = Path("metadata_batch.json"),
output_parquet: Path | None = None,
) -> list[dict[str, Any]]:
"""
Extract headers from all raster files in parallel.
max_workers defaults to min(32, os.cpu_count() * 4) — a reasonable
starting point for NVMe or fast SAN storage. For S3 or GCS, reduce
to 8–12 to avoid HTTP 429 / connection pool exhaustion.
"""
if max_workers is None:
max_workers = min(32, (os.cpu_count() or 4) * 4)
results: list[dict[str, Any]] = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {executor.submit(extract_raster_metadata, fp): fp for fp in filepaths}
for future in as_completed(future_map):
results.append(future.result())
# Sort deterministically so diffs are meaningful
results.sort(key=lambda r: r["filepath"])
success = sum(1 for r in results if r["status"] == "success")
logging.info("Batch complete: %d/%d succeeded.", success, len(results))
if output_json:
output_json.write_text(
json.dumps(results, indent=2, ensure_ascii=False), encoding="utf-8"
)
logging.info("JSON written to %s", output_json)
if output_parquet:
_write_parquet(results, output_parquet)
return results
def _write_parquet(records: list[dict[str, Any]], path: Path) -> None:
"""Flatten nested dicts to string columns and write Parquet."""
import pandas as pd # lazy import — only required for Parquet output
df = pd.DataFrame(records)
# Nested dicts (bounds, dataset_tags, band_tags) must be serialised
for col in ("bounds", "dataset_tags", "band_tags"):
if col in df.columns:
df[col] = df[col].apply(
lambda v: json.dumps(v) if isinstance(v, dict) else v
)
# res is a list — store as string to keep Parquet schema simple
if "res" in df.columns:
df["res"] = df["res"].apply(str)
df.to_parquet(path, index=False)
logging.info("Parquet written to %s", path)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
tile_dir = Path("tiles/")
all_tifs = sorted(tile_dir.glob("**/*.tif"))
run_batch_extraction(
all_tifs,
output_json=Path("metadata_batch.json"),
output_parquet=Path("metadata_batch.parquet"),
)
Variant patterns
Variant 1 — Cloud object storage (S3 / GCS)
When files live in cloud buckets, open them via rasterio’s GDAL virtual filesystem (/vsis3/, /vsigs/). The header read still avoids a full download because Cloud-Optimized GeoTIFF structure places the TIFF header and IFD at the front of the file, so GDAL fetches only the first HTTP range.
import boto3
import rasterio
from rasterio.session import AWSSession
def extract_s3_metadata(s3_uri: str) -> dict:
"""
s3_uri format: "s3://bucket/path/to/file.tif"
Reuses an authenticated boto3 session across threads.
"""
boto_session = boto3.Session() # reads credentials from env / instance role
with rasterio.Env(AWSSession(boto_session)):
# /vsis3/ path is constructed automatically by rasterio from the s3:// URI
with rasterio.open(s3_uri) as src:
return {
"uri": s3_uri,
"crs": src.crs.to_string() if src.crs else None,
"bounds": src.bounds._asdict(),
"res": list(src.res),
"count": src.count,
"dtype": src.dtypes[0],
}
Reduce max_workers to 8–12 for S3 to avoid HTTP 503 throttling. Enable GDAL_HTTP_UNSAFESSL=YES only if your bucket endpoint uses a self-signed certificate.
Variant 2 — Parquet output for enterprise catalogs
JSON works well for debugging batches under ~50 000 files. Above that, switch to Parquet. The _write_parquet helper in the main script handles this, but you can also build the DataFrame incrementally when RAM is tight:
import pyarrow as pa
import pyarrow.parquet as pq
def stream_to_parquet(records_iter, output_path: Path, batch_size: int = 5_000) -> None:
"""Write results in batches to cap memory usage during conversion."""
writer = None
batch: list[dict] = []
for record in records_iter:
batch.append(record)
if len(batch) >= batch_size:
table = pa.Table.from_pylist(batch)
if writer is None:
writer = pq.ParquetWriter(output_path, table.schema)
writer.write_table(table)
batch.clear()
# Flush the final partial batch
if batch:
table = pa.Table.from_pylist(batch)
if writer is None:
writer = pq.ParquetWriter(output_path, table.schema)
writer.write_table(table)
if writer:
writer.close()
Variant 3 — Resume interrupted batches with a checkpoint file
For archives exceeding 100 000 files, maintain a checkpoint so a crashed run does not reprocess already-extracted files:
import json
from pathlib import Path
def load_checkpoint(checkpoint_path: Path) -> set[str]:
"""Return the set of already-processed file paths."""
if not checkpoint_path.exists():
return set()
with checkpoint_path.open() as f:
return {line.strip() for line in f if line.strip()}
def save_checkpoint(checkpoint_path: Path, filepath: str) -> None:
"""Append one processed path to the checkpoint file."""
with checkpoint_path.open("a") as f:
f.write(filepath + "\n")
# Usage: filter before submitting to the pool
checkpoint = Path("extraction_checkpoint.txt")
already_done = load_checkpoint(checkpoint)
remaining = [p for p in all_tifs if str(p) not in already_done]
Common errors
NotGeoreferencedWarning: Dataset has no geotransform
The file lacks an affine transform; src.transform returns the default identity matrix. Fix: filter these files out before the batch or flag record["status"] = "no_georef" and exclude them from catalog ingestion.
CPLE_OpenFailed: No such file or directory
The path string was constructed incorrectly (e.g. a mixed-OS path separator). Fix: always use pathlib.Path objects and call str(path) only when passing to rasterio.open().
RasterioIOError: HTTP error 403
Cloud bucket access is denied. Fix: confirm your boto3.Session() picks up the correct credentials; for EC2 instances check the instance profile IAM policy includes s3:GetObject.
Related
- Extracting and Parsing Raster Metadata — parent page covering single-file inspection, GDAL tag structures, and projection metadata.
- Understanding Cloud-Optimized GeoTIFF Structure — why COG headers are cheap to read over HTTP range requests, which underpins the S3 variant above.
- Querying STAC Catalogs Programmatically — the natural next step once your batch-extracted metadata is normalised: register assets into a STAC catalog and query by bounding box and date.
- Handling Pixel Resolution and Scaling — use the
restuples from batch extraction to drive resampling and window-read decisions. - Spectral Index Calculation Pipelines — batch-extracted band count and dtype tell you which files have the required spectral bands before kicking off an index computation job.