"""Import GBIF occurrence CSV/TSV downloads (tab-separated) into our parquet cache format. GBIF standard download columns differ slightly from our API-derived fields: - 'datasetKey' instead of 'datasetName' - 'countryCode' (ISO 2-letter) instead of 'country' (full name) - No 'country' full-name column """ import pandas as pd from pathlib import Path from src.utils import CURRENT_YEAR GBIF_CSV_RENAME = { "datasetKey": "datasetName", } TARGET_FIELDS = [ "gbifID", "occurrenceID", "decimalLatitude", "decimalLongitude", "year", "month", "day", "eventDate", "datasetName", "institutionCode", "recordedBy", "scientificName", "species", "country", "countryCode", "stateProvince", "locality", "coordinateUncertaintyInMeters", ] # Columns we need from the raw file (including aliases) _READ_COLS = set(TARGET_FIELDS) | {"datasetKey"} _CHUNK_SIZE = 200_000 def _process_chunk(chunk: pd.DataFrame, exclude_current_year: bool) -> pd.DataFrame: # Rename datasetKey -> datasetName only when datasetName isn't also present if "datasetKey" in chunk.columns and "datasetName" in chunk.columns: chunk = chunk.drop(columns=["datasetKey"]) elif "datasetKey" in chunk.columns: chunk = chunk.rename(columns=GBIF_CSV_RENAME) # Numeric conversions chunk["decimalLatitude"] = pd.to_numeric(chunk.get("decimalLatitude"), errors="coerce") chunk["decimalLongitude"] = pd.to_numeric(chunk.get("decimalLongitude"), errors="coerce") chunk["year"] = pd.to_numeric(chunk.get("year"), errors="coerce") chunk["month"] = pd.to_numeric(chunk.get("month"), errors="coerce") chunk["day"] = pd.to_numeric(chunk.get("day"), errors="coerce") if "coordinateUncertaintyInMeters" in chunk.columns: chunk["coordinateUncertaintyInMeters"] = pd.to_numeric( chunk["coordinateUncertaintyInMeters"], errors="coerce" ) chunk = chunk.dropna(subset=["decimalLatitude", "decimalLongitude"]) if exclude_current_year and "year" in chunk.columns: chunk = chunk[chunk["year"] < CURRENT_YEAR] for col in TARGET_FIELDS: if col not in chunk.columns: chunk[col] = None return chunk[TARGET_FIELDS].copy() def import_gbif_csv(filepath: str | Path, exclude_current_year: bool = True) -> pd.DataFrame: """Read a GBIF CSV/TSV download and return a cleaned DataFrame. Reads only the columns we need (huge memory saving for large files), processes in chunks to handle files of any size, skips malformed lines. """ filepath = Path(filepath) if not filepath.exists(): raise FileNotFoundError(f"Datei nicht gefunden: {filepath}") # Detect separator and available columns with open(filepath, "r", encoding="utf-8", errors="replace") as f: header_line = f.readline() sep = "\t" if "\t" in header_line else "," available_cols = [c.strip() for c in header_line.split(sep)] usecols = [c for c in available_cols if c in _READ_COLS] chunks = pd.read_csv( filepath, sep=sep, encoding="utf-8", encoding_errors="replace", low_memory=False, dtype=str, on_bad_lines="skip", usecols=usecols, chunksize=_CHUNK_SIZE, ) parts = [_process_chunk(chunk, exclude_current_year) for chunk in chunks] if not parts: return pd.DataFrame(columns=TARGET_FIELDS) return pd.concat(parts, ignore_index=True)