File size: 3,441 Bytes
0d4a0ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""Import GBIF occurrence CSV/TSV downloads (tab-separated) into our parquet cache format.

GBIF standard download columns differ slightly from our API-derived fields:
- 'datasetKey' instead of 'datasetName'
- 'countryCode' (ISO 2-letter) instead of 'country' (full name)
- No 'country' full-name column
"""

import csv
from pathlib import Path

import pandas as pd

from src.utils import CURRENT_YEAR

# Header names used by GBIF downloads, mapped to the names of our cache schema.
GBIF_CSV_RENAME = {"datasetKey": "datasetName"}

# Column layout of every cleaned frame, in output order.
TARGET_FIELDS = [
    # identifiers
    "gbifID",
    "occurrenceID",
    # coordinates
    "decimalLatitude",
    "decimalLongitude",
    # date parts
    "year",
    "month",
    "day",
    "eventDate",
    # provenance
    "datasetName",
    "institutionCode",
    "recordedBy",
    # taxonomy
    "scientificName",
    "species",
    # location
    "country",
    "countryCode",
    "stateProvince",
    "locality",
    "coordinateUncertaintyInMeters",
]

# Columns worth reading from the raw file: the target schema plus the
# download-side aliases that get renamed into it.
_READ_COLS = set(TARGET_FIELDS).union(GBIF_CSV_RENAME)

# Number of rows handed to pandas per chunk when reading a download.
_CHUNK_SIZE = 200_000


def _process_chunk(chunk: pd.DataFrame, exclude_current_year: bool) -> pd.DataFrame:
    """Clean one raw chunk and project it onto ``TARGET_FIELDS``.

    Steps: resolve the ``datasetKey``/``datasetName`` alias, coerce the
    coordinate, date-part and uncertainty columns to numbers (unparseable
    values become NaN), drop rows without usable coordinates, optionally
    apply the year filter, and add every missing target column.

    The input frame is not modified; all steps work on new frames.

    Args:
        chunk: Raw rows as read from the download. Any of the target
            columns may be absent.
        exclude_current_year: If true and the chunk has a ``year`` column,
            keep only rows with ``year < CURRENT_YEAR``. Rows whose year is
            missing or unparseable fail that comparison and are dropped too.
            Chunks without a ``year`` column are not filtered.

    Returns:
        A new DataFrame with exactly the ``TARGET_FIELDS`` columns, in order.
    """
    # Rename datasetKey -> datasetName only when datasetName isn't also present
    if "datasetKey" in chunk.columns:
        if "datasetName" in chunk.columns:
            chunk = chunk.drop(columns=["datasetKey"])
        else:
            chunk = chunk.rename(columns=GBIF_CSV_RENAME)

    # Must be recorded before the conversion below, which always creates a
    # "year" column: an all-NaN year would otherwise send every row through
    # the year filter and lose it (NaN < CURRENT_YEAR is False).
    has_year = "year" in chunk.columns

    def _numeric(col: str):
        # Missing columns become an all-NaN float column.
        if col in chunk.columns:
            return pd.to_numeric(chunk[col], errors="coerce")
        return float("nan")

    converted = {
        col: _numeric(col)
        for col in ("decimalLatitude", "decimalLongitude", "year", "month", "day")
    }
    # The uncertainty column is only converted when present; if absent it is
    # added further down as a None-filled column like any other missing field.
    if "coordinateUncertaintyInMeters" in chunk.columns:
        converted["coordinateUncertaintyInMeters"] = _numeric(
            "coordinateUncertaintyInMeters"
        )
    # assign() returns a new frame, so the caller's chunk is never mutated.
    chunk = chunk.assign(**converted)

    chunk = chunk.dropna(subset=["decimalLatitude", "decimalLongitude"])

    if exclude_current_year and has_year:
        chunk = chunk[chunk["year"] < CURRENT_YEAR]

    missing = {col: None for col in TARGET_FIELDS if col not in chunk.columns}
    if missing:
        # assign() again instead of item assignment: the frame may be a
        # filtered slice at this point.
        chunk = chunk.assign(**missing)

    return chunk[TARGET_FIELDS].copy()


def import_gbif_csv(filepath: str | Path, exclude_current_year: bool = True) -> pd.DataFrame:
    """Read a GBIF CSV/TSV download and return a cleaned DataFrame.

    Reads only the columns we need (huge memory saving for large files),
    processes in chunks to handle files of any size, skips malformed lines.

    Args:
        filepath: Path to the download file.
        exclude_current_year: Passed through to the per-chunk cleaning step.

    Returns:
        One DataFrame with the ``TARGET_FIELDS`` columns holding all cleaned
        rows; an empty frame with those columns if the file has no content
        or yields no chunks.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
    """
    filepath = Path(filepath)
    if not filepath.exists():
        raise FileNotFoundError(f"Datei nicht gefunden: {filepath}")

    # Only the separator is sniffed by hand, from the first non-blank line
    # (pandas skips blank lines before the header as well).
    with open(filepath, "r", encoding="utf-8", errors="replace") as f:
        header_line = next((line for line in f if line.strip()), "")
    if not header_line:
        # Nothing to parse; pandas would raise EmptyDataError here.
        return pd.DataFrame(columns=TARGET_FIELDS)

    is_tab_separated = "\t" in header_line
    sep = "\t" if is_tab_separated else ","

    # The reader is used as a context manager so the file handle is released
    # even if processing a chunk raises.
    with pd.read_csv(
        filepath,
        sep=sep,
        # GBIF tab-separated downloads are written without quoting, so a
        # literal '"' inside a field must not open a quoted section
        # (it would swallow the following lines). Comma-separated input keeps
        # the pandas default.
        quoting=csv.QUOTE_NONE if is_tab_separated else csv.QUOTE_MINIMAL,
        encoding="utf-8",
        encoding_errors="replace",
        low_memory=False,
        dtype=str,
        on_bad_lines="skip",
        # Let pandas match against the header names it parsed itself, rather
        # than a hand-split header line that still carries quotes or a BOM.
        usecols=lambda name: name in _READ_COLS,
        chunksize=_CHUNK_SIZE,
    ) as reader:
        parts = [_process_chunk(chunk, exclude_current_year) for chunk in reader]

    if not parts:
        return pd.DataFrame(columns=TARGET_FIELDS)
    return pd.concat(parts, ignore_index=True)