# ArtenTracker / src /csv_importer.py
# Johannes
# Initial deployment (no data - downloaded from HF Dataset at startup)
# 0d4a0ba
"""Import GBIF occurrence CSV/TSV downloads (tab-separated) into our parquet cache format.
GBIF standard download columns differ slightly from our API-derived fields:
- 'datasetKey' instead of 'datasetName'
- 'countryCode' (ISO 2-letter) instead of 'country' (full name)
- No 'country' full-name column
"""
import pandas as pd
from pathlib import Path
from src.utils import CURRENT_YEAR
# Raw GBIF download column -> name used in our schema.
GBIF_CSV_RENAME = {"datasetKey": "datasetName"}

# Output schema: the columns (and their order) of every processed frame.
TARGET_FIELDS = [
    # identifiers
    "gbifID",
    "occurrenceID",
    # coordinates
    "decimalLatitude",
    "decimalLongitude",
    # observation date
    "year",
    "month",
    "day",
    "eventDate",
    # provenance
    "datasetName",
    "institutionCode",
    "recordedBy",
    # taxonomy
    "scientificName",
    "species",
    # location
    "country",
    "countryCode",
    "stateProvince",
    "locality",
    "coordinateUncertaintyInMeters",
]

# Everything worth reading from the raw file: the schema plus the rename source.
_READ_COLS = {*TARGET_FIELDS, "datasetKey"}

# Rows per chunk when streaming the download through pandas.
_CHUNK_SIZE = 200_000
def _process_chunk(chunk: pd.DataFrame, exclude_current_year: bool) -> pd.DataFrame:
    """Normalise one raw chunk of a GBIF download to the ``TARGET_FIELDS`` schema.

    The input frame is never modified; all work happens on a copy.

    Args:
        chunk: Raw rows as read from the download. May contain any subset of
            the ``TARGET_FIELDS`` columns, plus optionally ``datasetKey``.
        exclude_current_year: When true, keep only rows whose ``year`` is
            strictly below ``CURRENT_YEAR``. Rows with a missing or
            unparseable year are dropped as well, because NaN never compares
            as smaller.

    Returns:
        A new DataFrame with exactly the ``TARGET_FIELDS`` columns, in that
        order. Rows lacking a numeric latitude or longitude are removed.
        Latitude, longitude, year, month and day are numeric (all-NaN when
        the column was absent); any other absent column is filled with
        ``None``.
    """
    # A real datasetName column wins; datasetKey only stands in when it is missing.
    if "datasetKey" in chunk.columns and "datasetName" not in chunk.columns:
        chunk = chunk.rename(columns=GBIF_CSV_RENAME)

    # Copy just the schema columns. This discards a redundant datasetKey and
    # guarantees that the assignments below never write into the caller's frame.
    present = [col for col in TARGET_FIELDS if col in chunk.columns]
    out = chunk[present].copy()

    # Unparseable values become NaN; a column missing from the file becomes all-NaN.
    for col in ("decimalLatitude", "decimalLongitude", "year", "month", "day"):
        if col in out.columns:
            out[col] = pd.to_numeric(out[col], errors="coerce")
        else:
            out[col] = float("nan")
    if "coordinateUncertaintyInMeters" in out.columns:
        out["coordinateUncertaintyInMeters"] = pd.to_numeric(
            out["coordinateUncertaintyInMeters"], errors="coerce"
        )

    # Complete the schema *before* filtering, so no column is ever assigned on
    # a filtered slice (that is what triggers SettingWithCopyWarning).
    for col in TARGET_FIELDS:
        if col not in out.columns:
            out[col] = None

    # Build one row mask: usable coordinates, and optionally a past year.
    keep = out["decimalLatitude"].notna() & out["decimalLongitude"].notna()
    if exclude_current_year:
        keep &= out["year"] < CURRENT_YEAR

    return out.loc[keep, TARGET_FIELDS].copy()
def import_gbif_csv(filepath: str | Path, exclude_current_year: bool = True) -> pd.DataFrame:
    """Read a GBIF CSV/TSV download and return a cleaned DataFrame.

    Only the columns listed in ``_READ_COLS`` are loaded (a large memory
    saving for wide downloads), the file is streamed in chunks of
    ``_CHUNK_SIZE`` rows so its size does not matter, and malformed lines are
    skipped.

    Args:
        filepath: Path to the tab- or comma-separated download.
        exclude_current_year: Passed on to ``_process_chunk``; when true,
            only rows with a year before ``CURRENT_YEAR`` are kept.

    Returns:
        A DataFrame with the ``TARGET_FIELDS`` columns and a fresh 0..n-1
        index. An empty or header-only file yields an empty frame with those
        columns.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"Datei nicht gefunden: {filepath}")

    # Sniff the separator from the first non-blank line; read_csv skips blank
    # lines by default, so this is the line it will treat as the header.
    with open(filepath, "r", encoding="utf-8", errors="replace") as f:
        header_line = next((line for line in f if line.strip()), "")
    if not header_line:
        # Nothing to parse at all: behave like the header-only case below.
        return pd.DataFrame(columns=TARGET_FIELDS)
    sep = "\t" if "\t" in header_line else ","

    # Let pandas decide which columns exist: a callable ``usecols`` is matched
    # against the column names pandas itself parsed, so a UTF-8 BOM or quoted
    # header names cannot make a wanted column silently disappear.
    with pd.read_csv(
        filepath,
        sep=sep,
        encoding="utf-8",
        encoding_errors="replace",
        low_memory=False,
        dtype=str,
        on_bad_lines="skip",
        usecols=lambda name: name in _READ_COLS,
        chunksize=_CHUNK_SIZE,
    ) as reader:
        # The context manager closes the file handle even if a chunk fails.
        parts = [_process_chunk(chunk, exclude_current_year) for chunk in reader]

    if not parts:
        return pd.DataFrame(columns=TARGET_FIELDS)
    return pd.concat(parts, ignore_index=True)