Spaces:
Sleeping
Sleeping
| """Import GBIF occurrence CSV/TSV downloads (tab-separated) into our parquet cache format. | |
| GBIF standard download columns differ slightly from our API-derived fields: | |
| - 'datasetKey' instead of 'datasetName' | |
| - 'countryCode' (ISO 2-letter) instead of 'country' (full name) | |
| - No 'country' full-name column | |
| """ | |
| import pandas as pd | |
| from pathlib import Path | |
| from src.utils import CURRENT_YEAR | |
# Raw GBIF download column name -> column name used in our cache schema.
GBIF_CSV_RENAME = {"datasetKey": "datasetName"}

# Columns (and their order) of every DataFrame this module hands back.
TARGET_FIELDS = [
    # identifiers
    "gbifID",
    "occurrenceID",
    # coordinates
    "decimalLatitude",
    "decimalLongitude",
    # event date
    "year",
    "month",
    "day",
    "eventDate",
    # provenance
    "datasetName",
    "institutionCode",
    "recordedBy",
    # taxonomy
    "scientificName",
    "species",
    # geography
    "country",
    "countryCode",
    "stateProvince",
    "locality",
    "coordinateUncertaintyInMeters",
]

# Columns worth reading from the raw file: the target schema plus the
# raw-file alias that gets renamed to a target column.
_READ_COLS = {*TARGET_FIELDS, "datasetKey"}

# Number of rows pandas parses per chunk.
_CHUNK_SIZE = 200_000
def _process_chunk(chunk: pd.DataFrame, exclude_current_year: bool) -> pd.DataFrame:
    """Normalise one raw chunk of a GBIF download to the TARGET_FIELDS schema.

    The input frame is never modified; every step builds a new frame.

    Args:
        chunk: Raw rows as read from the download file.
        exclude_current_year: When true, keep only rows whose ``year`` is
            strictly less than ``CURRENT_YEAR``. Rows whose year is missing
            or not numeric fail that comparison and are dropped as well.

    Returns:
        A new DataFrame with exactly the TARGET_FIELDS columns, in that
        order. Rows without numeric latitude *and* longitude are removed;
        target columns absent from the input are filled with ``None``.
    """
    # Map datasetKey -> datasetName, unless a real datasetName column exists,
    # in which case the key column is simply discarded.
    if "datasetKey" in chunk.columns:
        if "datasetName" in chunk.columns:
            chunk = chunk.drop(columns=["datasetKey"])
        else:
            chunk = chunk.rename(columns=GBIF_CSV_RENAME)

    # Numeric conversions; unparseable values become NaN. chunk.get() yields
    # None for an absent column, which to_numeric coerces to NaN, so such a
    # column ends up entirely NaN.
    numeric = {
        col: pd.to_numeric(chunk.get(col), errors="coerce")
        for col in ("decimalLatitude", "decimalLongitude", "year", "month", "day")
    }
    if "coordinateUncertaintyInMeters" in chunk.columns:
        numeric["coordinateUncertaintyInMeters"] = pd.to_numeric(
            chunk["coordinateUncertaintyInMeters"], errors="coerce"
        )
    # assign() returns a new frame, so the caller's chunk is left untouched.
    chunk = chunk.assign(**numeric)

    chunk = chunk.dropna(subset=["decimalLatitude", "decimalLongitude"])

    if exclude_current_year:
        # NaN < CURRENT_YEAR is False, so rows without a usable year go too.
        chunk = chunk[chunk["year"] < CURRENT_YEAR]

    # Add the target columns the file did not provide. Using assign() rather
    # than item assignment avoids writing into a filtered slice, which is what
    # triggers pandas' SettingWithCopyWarning.
    missing = {col: None for col in TARGET_FIELDS if col not in chunk.columns}
    if missing:
        chunk = chunk.assign(**missing)

    return chunk[TARGET_FIELDS].copy()
def import_gbif_csv(filepath: str | Path, exclude_current_year: bool = True) -> pd.DataFrame:
    """Read a GBIF CSV/TSV download and return a cleaned DataFrame.

    Only the columns listed in ``_READ_COLS`` are loaded, the file is parsed
    in chunks of ``_CHUNK_SIZE`` rows, and malformed lines are skipped.

    Args:
        filepath: Path of the download file.
        exclude_current_year: Forwarded to ``_process_chunk`` for every chunk.

    Returns:
        All processed chunks concatenated under a fresh integer index. An
        empty DataFrame with the TARGET_FIELDS columns is returned when the
        file contains nothing to parse or yields no chunks.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"Datei nicht gefunden: {filepath}")

    # Sniff the separator from the first non-blank line: a tab anywhere in it
    # means tab-separated, otherwise comma-separated.
    with open(filepath, "r", encoding="utf-8", errors="replace") as f:
        header_line = next((line for line in f if line.strip()), "")
    sep = "\t" if "\t" in header_line else ","

    try:
        reader = pd.read_csv(
            filepath,
            sep=sep,
            encoding="utf-8",
            encoding_errors="replace",
            low_memory=False,
            dtype=str,
            on_bad_lines="skip",
            # A callable lets pandas match against the header names it parsed
            # itself, so quoting or a byte-order mark in the header line
            # cannot make a wanted column silently disappear.
            usecols=lambda name: name in _READ_COLS,
            chunksize=_CHUNK_SIZE,
        )
    except pd.errors.EmptyDataError:
        # Nothing to parse: same result as a file that yields no chunks.
        return pd.DataFrame(columns=TARGET_FIELDS)

    # The context manager closes the file even if a chunk fails to process.
    with reader:
        parts = [_process_chunk(chunk, exclude_current_year) for chunk in reader]

    if not parts:
        return pd.DataFrame(columns=TARGET_FIELDS)
    return pd.concat(parts, ignore_index=True)