import argparse
import csv
import html
import json
import os
import re
import sqlite3
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
import chromadb
import httpx
from dotenv import load_dotenv
from openai import OpenAI
# Load variables from a local .env file first, so that every environment
# lookup below can see them.
load_dotenv()

# --- Filesystem layout -------------------------------------------------
BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR / "data"
SPECIES_CSV = BASE_DIR / "unique_species_labels.csv"

# Storage locations; each one can be overridden through the environment.
RAG_DB_PATH = Path(os.environ.get("RAG_DB_PATH", str(DATA_DIR / "plant_rag")))
SQLITE_DB_PATH = Path(
    os.environ.get("PLANTS_SQLITE_PATH", str(DATA_DIR / "plants.db"))
)

# Model identifier, taken from OPENAI_MODEL when that variable is set.
DEFAULT_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

# Names of the care-profile fields. They mirror the care-related columns
# of the `plants` table created by init_db().
PROFILE_KEYS = (
    "annaffiatura_gg",
    "annaffiatura_time",
    "luce",
    "temperatura",
    "umidita",
    "altezza_media",
    "pulizia",
    "terriccio",
    "concimazione",
    "prevenzione",
)

# --- External reference sites ------------------------------------------
# Search URL templates; `{query}` is the placeholder for the search term.
RHS_SEARCH_URL = "https://www.rhs.org.uk/plants/search-results?query={query}"
MISSOURI_SEARCH_URL = (
    "https://www.missouribotanicalgarden.org/PlantFinder/PlantFinderSearch.aspx?basic={query}"
)
EPPO_SEARCH_URL = "https://gd.eppo.int/search?query={query}"

# NOTE(review): presumably a timeout in seconds for the HTTP client; the
# code that consumes it is not visible in this part of the file.
HTTP_TIMEOUT = 12.0

# User-Agent value, overridable through EXTERNAL_SOURCES_USER_AGENT.
HTTP_USER_AGENT = os.environ.get(
    "EXTERNAL_SOURCES_USER_AGENT",
    "clorofilla/1.0 (contact: local-dev)",
)
def init_db(conn: sqlite3.Connection) -> None:
    """Create the application tables if needed and migrate older schemas.

    The function is idempotent: it can be called on a fresh database, on an
    up-to-date one, or on a database created before the ``image_paths``
    column existed.

    Args:
        conn: Open SQLite connection. The changes are committed on it.

    Raises:
        sqlite3.Error: If any statement fails (for example a locked or
            read-only database). Migration errors are no longer swallowed.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS plants (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            species_name TEXT NOT NULL UNIQUE,
            indexed INTEGER NOT NULL DEFAULT 0,
            image_paths TEXT,
            annaffiatura_gg INTEGER,
            annaffiatura_time TEXT,
            luce TEXT,
            temperatura TEXT,
            umidita TEXT,
            altezza_media TEXT,
            pulizia TEXT,
            terriccio TEXT,
            concimazione TEXT,
            prevenzione TEXT,
            updated_at TEXT NOT NULL
        )
        """
    )

    # Migration for existing DBs created before image_paths support.
    # Inspect the schema instead of blindly running ALTER TABLE and hiding
    # every exception: only a really missing column triggers the ALTER, and
    # genuine failures reach the caller.
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    existing_columns = {row[1] for row in conn.execute("PRAGMA table_info(plants)")}
    if "image_paths" not in existing_columns:
        conn.execute("ALTER TABLE plants ADD COLUMN image_paths TEXT")

    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS leafsnap_aliases (
            leafsnap_label TEXT PRIMARY KEY,
            db_species_name TEXT NOT NULL
        )
        """
    )
    conn.commit()
def load_species() -> list[str]:
    """Read the species names listed in ``SPECIES_CSV``.

    Only the ``species_name`` column is used. Values are stripped of
    surrounding whitespace and blank or missing values are skipped.

    Returns:
        The non-empty species names, in file order.

    Raises:
        OSError: If the CSV file cannot be opened.
    """
    # newline="" is what the csv module expects so that it can handle line
    # endings (including ones embedded in quoted fields) by itself.
    # utf-8-sig decodes plain UTF-8 as well, but also drops a leading BOM;
    # without it a BOM-prefixed file yields the header "\ufeffspecies_name"
    # and every row would be silently skipped.
    with open(SPECIES_CSV, "r", encoding="utf-8-sig", newline="") as handle:
        names = (
            (row.get("species_name") or "").strip()
            for row in csv.DictReader(handle)
        )
        return [name for name in names if name]
def get_rag_collection():
    """Open the Chroma store at ``RAG_DB_PATH`` and return its "plants" collection.

    The collection is looked up with ``get_collection``; this function does
    not create it.
    """
    store_path = str(RAG_DB_PATH)
    return chromadb.PersistentClient(path=store_path).get_collection(name="plants")
def get_rag_context(collection, species_name: str, max_chars: int = 9000) -> str:
    """Build a text context from the documents stored for one species.

    Args:
        collection: Object exposing ``get(where=..., limit=...)`` and
            returning a mapping with a ``"documents"`` list (the "plants"
            collection returned by ``get_rag_collection``).
        species_name: Value matched exactly against the ``species_name``
            metadata field.
        max_chars: Maximum number of characters kept from the joined
            documents before the truncation marker is appended.

    Returns:
        The documents joined by blank lines, truncated to ``max_chars``
        characters plus a trailing ``"\\n..."`` marker when longer, or an
        empty string when nothing usable was found.
    """
    results = collection.get(
        where={"species_name": {"$eq": species_name}},
        limit=20,
    )
    raw_docs = (results or {}).get("documents") or []
    # Entries without text can come back as None; joining them would raise
    # TypeError, so keep only non-empty strings.
    docs = [doc for doc in raw_docs if isinstance(doc, str) and doc]
    if not docs:
        return ""

    context = "\n\n".join(docs)
    if len(context) > max_chars:
        context = context[:max_chars] + "\n..."
    return context
def _clean_json_payload(raw_text: str) -> str:
    """Strip a surrounding Markdown code fence from a JSON payload.

    ``None`` or empty input yields an empty string. Text that does not start
    with a ``` fence is returned with only its outer whitespace removed.
    For fenced text, the backticks at both ends and a leading ``json``
    language tag are dropped as well.
    """
    payload = (raw_text or "").strip()
    if not payload.startswith("```"):
        return payload

    # str.strip("`") removes every backtick at either end, which drops both
    # the opening and the closing fence in one step.
    unfenced = payload.strip("`")
    return unfenced.removeprefix("json").strip()
def normalize_profile_data(data: dict) -> dict:
    """Coerce a raw profile mapping into the shape described by PROFILE_KEYS.

    Args:
        data: Raw profile values. Keys outside ``PROFILE_KEYS`` are ignored;
            anything that is not a dict is treated as an empty profile.

    Returns:
        A new dict whose keys follow the order of ``PROFILE_KEYS`` (plus the
        two watering keys, which are always present):

        * ``annaffiatura_gg``: ``int`` or ``None`` when missing or not
          convertible with ``int()``.
        * ``annaffiatura_time``: one of ``"mattino"``, ``"sera"``,
          ``"entrambi"`` (matched after trimming and lower-casing), else
          ``None``.
        * every other key: stripped string, or ``None`` when missing or
          blank.
    """
    source = data if isinstance(data, dict) else {}
    # Iterate the tuple rather than a set so the key order of the result is
    # the declared one and identical across runs.
    normalized = {key: source.get(key) for key in PROFILE_KEYS}

    raw_days = normalized.get("annaffiatura_gg")
    days = None
    if raw_days is not None:
        try:
            days = int(raw_days)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers float infinity; NaN and unparsable
            # strings raise ValueError, unsupported types raise TypeError.
            days = None
    normalized["annaffiatura_gg"] = days

    valid_times = {"mattino", "sera", "entrambi"}
    raw_time = normalized.get("annaffiatura_time")
    slot = raw_time.strip().lower() if isinstance(raw_time, str) else None
    normalized["annaffiatura_time"] = slot if slot in valid_times else None

    for key in PROFILE_KEYS:
        if key in ("annaffiatura_gg", "annaffiatura_time"):
            continue
        value = normalized[key]
        if value is not None:
            # Blank strings collapse to None.
            normalized[key] = str(value).strip() or None
    return normalized
def _html_to_text(value: str) -> str:
txt = re.sub(r"