GreenAssistent / api.py
outshine84
cache schede e miglioramenti grafici barra
a1f4989
import importlib
import json
import logging
import os
import re
import shutil
import sqlite3
import tempfile
import threading
from contextlib import closing
from datetime import datetime, timedelta
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from typing import Any
from urllib.parse import urlparse, unquote
from uuid import uuid4

import chromadb
import cloudinary
import cloudinary.uploader
import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, HTTPException, Query, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from openai import OpenAI
from pydantic import BaseModel, Field
# Load variables from a .env file before the os.getenv() lookups below run.
load_dotenv()
def _default_asset_path(filename: str) -> str:
if os.getenv("SPACE_ID") and Path("/data").exists():
return str(Path("/data") / "greenassistent-assets" / filename)
return str(Path("data") / filename)
# Asset locations and model names. Every value can be overridden through the
# environment variable named in the corresponding os.getenv() call.
INDEX_PATH = os.getenv("PLANCLEF_INDEX_PATH", _default_asset_path("planclef.faiss"))
CACHE_PATH = os.getenv("PLANCLEF_CACHE_PATH", _default_asset_path("planclef_cache.pt"))
# Passed as ``model_name`` to add_to_faiss() when a species build job runs.
MODEL_NAME = os.getenv("PLANCLEF_MODEL_NAME", "ViT-B-32")
LEAFSNAP_INDEX_PATH = os.getenv("LEAFSNAP_INDEX_PATH", _default_asset_path("leafsnap.faiss"))
LEAFSNAP_CACHE_PATH = os.getenv("LEAFSNAP_CACHE_PATH", _default_asset_path("leafsnap_cache.pt"))
# Directory handed to chromadb.PersistentClient in get_rag_collection().
RAG_DB_PATH = os.getenv("RAG_DB_PATH", _default_asset_path("plant_rag"))
WIKI_USER_AGENT = os.getenv(
    "WIKI_USER_AGENT",
    "clorofilla/1.0 (contact: local-dev)",
)
# Chat model used for the generated care profile of draft plants.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
def _default_plants_db_path() -> str:
# On Hugging Face Spaces with persistent storage enabled, /data survives restarts.
if os.getenv("SPACE_ID") and Path("/data").exists():
return "/data/plants.db"
return "data/plants.db"
def _default_user_plants_db_path() -> str:
# Keep user-saved plants in a dedicated sqlite file to avoid coupling with plants catalog growth.
if os.getenv("SPACE_ID") and Path("/data").exists():
return "/data/user_plants.db"
return "data/user_plants.db"
# SQLite file locations; each can be overridden through the environment.
PLANTS_SQLITE_PATH = os.getenv("PLANTS_SQLITE_PATH", _default_plants_db_path())
USER_PLANTS_SQLITE_PATH = os.getenv("USER_PLANTS_SQLITE_PATH", _default_user_plants_db_path())
# Optional MySQL DSN (mysql://user:pass@host:3306/database). When non-empty,
# get_user_plants_db_connection() uses MySQL instead of the SQLite file.
MY_SQL_CONNECTION_STRING = os.getenv("MY_SQL", "").strip()
class _MySQLResult:
def __init__(self, rows: list[dict[str, Any]] | None = None, lastrowid: int = 0):
self._rows = rows or []
self.lastrowid = int(lastrowid or 0)
def fetchone(self):
return self._rows[0] if self._rows else None
def fetchall(self):
return self._rows
class _MySQLCompatConnection:
def __init__(self, dsn: str):
pymysql_mod, dict_cursor = _load_pymysql()
if pymysql_mod is None or dict_cursor is None:
raise RuntimeError("MY_SQL impostato ma pymysql non disponibile. Installa pymysql.")
params = _parse_mysql_dsn(dsn)
self._conn = pymysql_mod.connect(
host=params["host"],
port=params["port"],
user=params["user"],
password=params["password"],
database=params["database"],
charset="utf8mb4",
autocommit=False,
cursorclass=dict_cursor,
)
def execute(self, query: str, params: tuple | list | None = None):
converted = _to_mysql_query(query)
with self._conn.cursor() as cur:
cur.execute(converted, tuple(params or ()))
rows = cur.fetchall() if cur.description else []
return _MySQLResult(rows=rows, lastrowid=cur.lastrowid or 0)
def executemany(self, query: str, params_seq: list[tuple] | tuple):
converted = _to_mysql_query(query)
with self._conn.cursor() as cur:
cur.executemany(converted, params_seq)
return _MySQLResult(rows=[], lastrowid=cur.lastrowid or 0)
def commit(self):
self._conn.commit()
def rollback(self):
self._conn.rollback()
def close(self):
self._conn.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
try:
if exc_type:
self.rollback()
else:
self.commit()
finally:
self.close()
def _parse_mysql_dsn(dsn: str) -> dict[str, Any]:
parsed = urlparse(dsn)
if parsed.scheme not in {"mysql", "mysql+pymysql"}:
raise RuntimeError("MY_SQL non valido: usa formato mysql://user:pass@host:3306/database")
host = parsed.hostname or "localhost"
port = int(parsed.port or 3306)
user = unquote(parsed.username or "")
password = unquote(parsed.password or "")
database = (parsed.path or "").lstrip("/")
if not user or not database:
raise RuntimeError("MY_SQL non valido: user e database sono obbligatori")
return {
"host": host,
"port": port,
"user": user,
"password": password,
"database": database,
}
def _load_pymysql():
try:
pymysql_mod = importlib.import_module("pymysql")
cursors_mod = importlib.import_module("pymysql.cursors")
dict_cursor = getattr(cursors_mod, "DictCursor", None)
return pymysql_mod, dict_cursor
except Exception:
return None, None
def _to_mysql_query(query: str) -> str:
converted = query.replace("?", "%s")
converted = converted.replace("INSERT OR IGNORE", "INSERT IGNORE")
return converted
def _is_mysql_enabled() -> bool:
    """Return True when a MySQL connection string has been configured."""
    configured_dsn = MY_SQL_CONNECTION_STRING
    return bool(configured_dsn)
def _is_mysql_conn(conn: Any) -> bool:
    """Return True when *conn* is the MySQL adapter rather than a sqlite3 connection."""
    adapter_type = _MySQLCompatConnection
    return isinstance(conn, adapter_type)
# Cloudinary configuration (optional - photo upload disabled if not set)
CLOUDINARY_CLOUD_NAME = os.getenv("CLOUDINARY_CLOUD_NAME", "")
CLOUDINARY_API_KEY = os.getenv("CLOUDINARY_API_KEY", "")
CLOUDINARY_API_SECRET = os.getenv("CLOUDINARY_API_SECRET", "")
# Only configure the SDK when all three credentials are present.
if CLOUDINARY_CLOUD_NAME and CLOUDINARY_API_KEY and CLOUDINARY_API_SECRET:
    cloudinary.config(
        cloud_name=CLOUDINARY_CLOUD_NAME,
        api_key=CLOUDINARY_API_KEY,
        api_secret=CLOUDINARY_API_SECRET,
        secure=True,
    )
# Comma-separated GOOGLE_CLIENT_ID values, stripped, with empty entries dropped.
GOOGLE_CLIENT_IDS = [
    value.strip()
    for value in os.getenv("GOOGLE_CLIENT_ID", "").split(",")
    if value.strip()
]
# Boolean flag: true for "1", "true", "yes" or "on" (case-insensitive).
REQUIRE_GOOGLE_AUTH = os.getenv("REQUIRE_GOOGLE_AUTH", "0").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
# Comma-separated ADMIN_USERS values, stripped and lowercased, as a set.
ADMIN_USERS = {
    value.strip().lower()
    for value in os.getenv("ADMIN_USERS", "").split(",")
    if value.strip()
}
# Directory containing the built PWA front end (index.html and static files).
PWA_DIST_DIR = Path(os.getenv("PWA_DIST_DIR", "pwa-app/dist"))
# Enabled by default; gates get_cached_plant_card() / upsert_cached_plant_card().
PLANT_CARD_CACHE_ENABLED = os.getenv("PLANT_CARD_CACHE_ENABLED", "1").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
# Module-level handles; both are reset to None after a species build so they
# are reloaded on next use.
index: Any = None
rag_collection: Any = None
logger = logging.getLogger("ai_green_assistant.api")
# In-memory registry of species build jobs, keyed by the stripped, lowercased
# species name. Access is guarded by species_build_jobs_lock.
species_build_jobs: dict[str, dict[str, Any]] = {}
species_build_jobs_lock = threading.Lock()
def configure_logging() -> None:
    """Attach a rotating file handler and a console handler to ``ai_green_assistant``.

    Handlers go on the parent logger so every child logger inherits them. The
    function is a no-op when that logger already has handlers.

    Environment variables read: ``LOG_LEVEL`` (default ``INFO``), ``LOG_DIR``
    (default ``logs``, created if missing) and ``LOG_FILE`` (default
    ``api.log``). The file rotates at midnight and keeps 14 backups.
    """
    package_logger = logging.getLogger("ai_green_assistant")
    if package_logger.handlers:
        # Already configured: avoid attaching duplicate handlers.
        return

    # Unknown level names fall back to INFO.
    level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)

    logs_folder = Path(os.getenv("LOG_DIR", "logs"))
    logs_folder.mkdir(parents=True, exist_ok=True)
    target_file = logs_folder / os.getenv("LOG_FILE", "api.log")

    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    rotating_handler = TimedRotatingFileHandler(
        filename=target_file,
        when="midnight",
        interval=1,
        backupCount=14,
        encoding="utf-8",
        utc=False,
    )
    stream_handler = logging.StreamHandler()

    package_logger.setLevel(level)
    package_logger.propagate = True
    for handler in (rotating_handler, stream_handler):
        handler.setFormatter(formatter)
        handler.setLevel(level)
        package_logger.addHandler(handler)
# Configure logging as soon as the module is imported.
configure_logging()
def _truncate(value: Any, max_len: int = 500) -> str:
text = str(value or "")
if len(text) <= max_len:
return text
return text[:max_len] + "..."
def _log_api(endpoint: str, event: str, payload: dict[str, Any]) -> None:
    """Log one API event as ``endpoint | event | payload`` at INFO level.

    The payload is JSON-encoded (non-serialisable values go through ``str``);
    if encoding still fails, its plain ``str()`` form is logged instead.
    """
    try:
        rendered = json.dumps(payload, ensure_ascii=False, default=str)
    except Exception:
        rendered = str(payload)
    logger.info("%s | %s | %s", endpoint, event, rendered)
def _response_payload_for_log(response: Any) -> dict[str, Any]:
payload: dict[str, Any] = {
"status_code": getattr(response, "status_code", None),
"content_type": getattr(response, "media_type", None) or getattr(response, "headers", {}).get("content-type", ""),
}
body = getattr(response, "body", None)
if not isinstance(body, (bytes, bytearray)) or not body:
return payload
text = body.decode("utf-8", errors="replace")
content_type = str(payload["content_type"] or "").lower()
if "application/json" in content_type:
try:
payload["body"] = json.loads(text)
except Exception:
payload["body"] = _truncate(text)
return payload
if content_type.startswith("text/") or "xml" in content_type or "javascript" in content_type:
payload["body"] = _truncate(text)
return payload
def _serve_pwa_index() -> HTMLResponse:
    """Return the front-end entry page.

    Serves ``index.html`` from ``PWA_DIST_DIR`` when present, otherwise the
    ``ui.html`` file located next to this module.

    Raises:
        HTTPException: 503 when neither file exists.
    """
    candidates = (
        PWA_DIST_DIR / "index.html",
        Path(__file__).with_name("ui.html"),
    )
    for page in candidates:
        if page.exists():
            return HTMLResponse(content=page.read_text(encoding="utf-8"))
    raise HTTPException(status_code=503, detail="Frontend non disponibile.")
def _serve_pwa_file(filename: str, media_type: str | None = None) -> FileResponse:
    """Return a static file from ``PWA_DIST_DIR``.

    Raises:
        HTTPException: 404 when the path does not exist or is not a regular file.
    """
    target = PWA_DIST_DIR / filename
    if not (target.exists() and target.is_file()):
        raise HTTPException(status_code=404, detail=f"File statico non trovato: {filename}")
    return FileResponse(path=str(target), media_type=media_type)
def _format_datetime_display(value: Any) -> Any:
raw_value = str(value or "").strip()
if not raw_value:
return value
try:
parsed = datetime.fromisoformat(raw_value.replace("Z", "+00:00"))
except ValueError:
return value
return parsed.strftime("%d/%m/%Y %H:%M:%S")
def _normalize_image_path(raw_path: str) -> str:
"""Normalize image path to be relative to data/images."""
normalized = str(raw_path or "").replace("\\", "/").strip().lstrip("/")
if normalized.lower().startswith("data/"):
normalized = normalized[5:]
if normalized.lower().startswith("images/"):
normalized = normalized[7:]
return normalized
# ---------------------------------------------------------------------------
# GPT-4o vision fallback helpers
# ---------------------------------------------------------------------------
# A top score below this threshold triggers the GPT fallback.
FAISS_CONFIDENCE_THRESHOLD = float(os.getenv("FAISS_CONFIDENCE_THRESHOLD", "0.82"))
# Minimum top-vs-second score gap; a smaller gap counts as ambiguous.
FAISS_AMBIGUITY_MARGIN = float(os.getenv("FAISS_AMBIGUITY_MARGIN", "0.015"))
# Same idea, applied when both leading scores are <= 0.1.
RRF_AMBIGUITY_MARGIN = float(os.getenv("RRF_AMBIGUITY_MARGIN", "0.0025"))
# When true, the GPT fallback runs regardless of the scores.
FORCE_OPENAI_FALLBACK = os.getenv("FORCE_OPENAI_FALLBACK", "0").strip().lower() in {
    "1", "true", "yes", "on"
}
def _should_trigger_gpt_fallback(top_score: float, results: list[tuple[str, float, list]]) -> tuple[bool, str]:
    """Decide whether the GPT vision fallback should run.

    Returns ``(should_run, reason)``. The fallback runs when it is forced via
    ``FORCE_OPENAI_FALLBACK``, when *top_score* is below
    ``FAISS_CONFIDENCE_THRESHOLD``, or when the gap between the two best
    results is below the applicable ambiguity margin. The RRF margin applies
    when both leading scores are <= 0.1; the FAISS margin applies otherwise.
    """
    if FORCE_OPENAI_FALLBACK:
        return True, "forced_by_env"
    if top_score < FAISS_CONFIDENCE_THRESHOLD:
        return True, "low_top_score"
    if len(results) < 2:
        return False, "single_result"

    best = float(results[0][1])
    runner_up = float(results[1][1])
    gap = max(0.0, best - runner_up)

    if best <= 0.1 and runner_up <= 0.1:
        margin, reason = RRF_AMBIGUITY_MARGIN, "ambiguous_rrf_gap"
    else:
        margin, reason = FAISS_AMBIGUITY_MARGIN, "ambiguous_similarity_gap"

    if gap < margin:
        return True, reason
    return False, "high_confidence"
def _gpt_vision_extract_binomial(raw: str) -> str | None:
    """Return the first ``Genus species`` pair found in *raw*, or ``None``."""
    # Markdown emphasis/code markers are turned into spaces before matching.
    cleaned = raw.replace("*", " ").replace("`", " ").replace("_", " ")
    match = re.search(r"\b([A-Z][a-z\-]+)\s+([a-z][a-z\-]+)\b", cleaned)
    if match is None:
        return None
    return f"{match.group(1)} {match.group(2)}"


def _gpt_vision_ask(client: Any, model_name: str, image_data_url: str, prompt: str) -> str:
    """Send one image plus a text prompt and return the stripped text reply."""
    resp = client.chat.completions.create(
        model=model_name,
        max_tokens=80,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {"url": image_data_url, "detail": "high"},
                    },
                    {
                        "type": "text",
                        "text": prompt,
                    },
                ],
            }
        ],
    )
    return (resp.choices[0].message.content or "").strip()


def _gpt_vision_identify_plant(
    image_path: str,
    api_key: str,
    candidate_species: list[str] | None = None,
) -> tuple[str | None, str]:
    """Ask a vision model to identify the plant species in an image.

    The model is first asked for a free-form binomial. If it answers
    ``unknown`` (or nothing) and *candidate_species* is given, a second
    request constrains the choice to the first 12 candidates; that answer is
    accepted only when it equals one of the candidates (case-insensitive).

    Args:
        image_path: Path of the image file; its suffix selects the MIME type
            (unknown suffixes are sent as ``image/jpeg``).
        api_key: OpenAI API key.
        candidate_species: Optional species names for the second pass.

    Returns:
        ``(scientific binomial name or None, diagnostic reason)``. Any
        exception is logged and reported through the reason string; none is
        propagated.
    """
    import base64

    suffix = Path(image_path).suffix.lower()
    mime_map = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png",
                ".webp": "image/webp", ".gif": "image/gif"}
    mime = mime_map.get(suffix, "image/jpeg")
    try:
        with open(image_path, "rb") as fh:
            b64 = base64.b64encode(fh.read()).decode("utf-8")
        image_data_url = f"data:{mime};base64,{b64}"
        client = OpenAI(api_key=api_key)
        model_name = os.getenv("OPENAI_VISION_MODEL", "gpt-4o")

        raw = _gpt_vision_ask(
            client,
            model_name,
            image_data_url,
            (
                "Identify the plant species in this image. "
                "Reply with ONLY the scientific Latin binomial name (Genus species). "
                "If you cannot identify it, reply exactly: unknown"
            ),
        )
        logger.info(f"GPT vision raw output: {raw[:200] if raw else '<empty>'}")

        if not raw or raw.lower().startswith("unknown"):
            # Second pass: constrain the choice to top FAISS candidates.
            if candidate_species:
                candidates_text = "\n".join(f"- {name}" for name in candidate_species[:12])
                raw2 = _gpt_vision_ask(
                    client,
                    model_name,
                    image_data_url,
                    (
                        "Choose the best matching species from this candidate list. "
                        "Reply with ONLY one exact binomial from the list, or 'unknown'.\n\n"
                        f"Candidates:\n{candidates_text}"
                    ),
                )
                logger.info(f"GPT vision candidate-mode output: {raw2[:200] if raw2 else '<empty>'}")
                picked = _gpt_vision_extract_binomial(raw2)
                # Accept only if it is one of the provided candidates.
                if picked and any(picked.lower() == c.lower() for c in candidate_species):
                    return picked, "ok_candidate_mode"
            return None, "model returned unknown or empty"

        binomial = _gpt_vision_extract_binomial(raw)
        if binomial is None:
            return None, f"no binomial found in model output: {raw[:120]}"
        return binomial, "ok"
    except Exception as exc:
        logger.warning(f"GPT vision fallback failed: {exc}")
        return None, f"exception: {type(exc).__name__}: {exc}"
def _insert_draft_plant_if_missing(species_name: str, api_key: str) -> bool:
    """Insert a minimal plant record (indexed=0) if the species is not in plants.db.

    When *api_key* is non-empty, a basic care profile is requested from the
    chat model and stored with the record. If that request fails, the failure
    is logged and the record is inserted with empty care fields.

    Returns:
        True if a new record was inserted, False if the species already
        existed (including when the ``INSERT OR IGNORE`` inserted nothing).
    """
    # sqlite3's own context manager only commits/rolls back; closing() makes
    # sure the connection is actually released.
    with closing(get_plants_db_connection()) as conn:
        row = conn.execute(
            "SELECT id FROM plants WHERE lower(species_name) = lower(?) LIMIT 1",
            (species_name.strip(),),
        ).fetchone()
    if row is not None:
        return False

    # Column order of the INSERT below.
    care_fields = (
        "annaffiatura_gg",
        "annaffiatura_time",
        "luce",
        "temperatura",
        "umidita",
        "altezza_media",
        "pulizia",
        "terriccio",
        "concimazione",
        "prevenzione",
    )

    def _as_column_value(value: Any) -> Any:
        # sqlite3 cannot bind dicts/lists; store such answers as JSON text.
        if value is None or isinstance(value, (str, int, float)):
            return value
        return json.dumps(value, ensure_ascii=False)

    # Generate a basic care profile via GPT
    profile: dict[str, Any] = {}
    if api_key:
        try:
            client = OpenAI(api_key=api_key)
            resp = client.chat.completions.create(
                model=OPENAI_MODEL,
                temperature=0,
                response_format={"type": "json_object"},
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "Sei un botanico professionista. Usa conoscenza generale per stimare "
                            "i campi di cura della pianta. Rispondi SOLO con JSON valido. "
                            "Se non sei ragionevolmente sicuro, usa null."
                        ),
                    },
                    {
                        "role": "user",
                        "content": (
                            f"Specie: {species_name}\n\n"
                            "Compila in JSON con queste chiavi esatte (null se incerto):\n"
                            "annaffiatura_gg (intero o null), annaffiatura_time (mattino|sera|entrambi|null),\n"
                            "luce, temperatura, umidita, altezza_media, pulizia, terriccio, concimazione, prevenzione."
                        ),
                    },
                ],
            )
            data = json.loads((resp.choices[0].message.content or "{}").strip())
            profile = {field: _as_column_value(data.get(field)) for field in care_fields}
            # Only a real integer is accepted; bool is an int subclass and is rejected.
            watering_days = data.get("annaffiatura_gg")
            is_real_int = isinstance(watering_days, int) and not isinstance(watering_days, bool)
            profile["annaffiatura_gg"] = watering_days if is_real_int else None
        except Exception as exc:
            logger.warning(f"GPT care profile generation failed for '{species_name}': {exc}")

    now_iso = datetime.utcnow().isoformat()
    with closing(get_plants_db_connection()) as conn:
        cursor = conn.execute(
            """
            INSERT OR IGNORE INTO plants (
                species_name, indexed, annaffiatura_gg, annaffiatura_time, luce, temperatura,
                umidita, altezza_media, pulizia, terriccio, concimazione, prevenzione, updated_at
            ) VALUES (?, 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                species_name,
                *(profile.get(field) for field in care_fields),
                now_iso,
            ),
        )
        conn.commit()
        # rowcount is 0 when OR IGNORE skipped the row (e.g. a concurrent insert).
        inserted = cursor.rowcount > 0
    if inserted:
        logger.info(f"Draft plant inserted: '{species_name}' (indexed=0)")
    return inserted
def _species_build_status(species_name: str) -> dict[str, Any]:
    """Return a status snapshot for the build job of *species_name*.

    A copy of the in-memory job entry is returned when one exists. Otherwise
    the plant profile is consulted: an indexed profile yields a ``completed``
    snapshot, anything else ``not_started``.
    """
    job_key = species_name.strip().lower()
    with species_build_jobs_lock:
        tracked = species_build_jobs.get(job_key)
        if tracked:
            return dict(tracked)

    snapshot: dict[str, Any] = {
        "species": species_name,
        "status": "not_started",
        "started_at": None,
        "finished_at": None,
        "error": None,
        "result": None,
    }
    profile = get_plant_profile_from_db(species_name)
    if profile and profile.get("indexed"):
        snapshot.update(
            species=profile.get("species_name") or species_name,
            status="completed",
            finished_at=profile.get("updated_at"),
            result={"indexed": True},
        )
    return snapshot
def _set_species_build_job(species_name: str, **updates: Any) -> None:
    """Merge *updates* into the in-memory job entry for *species_name*.

    The entry is created (with only its ``species`` key) when missing. The
    registry is modified while holding ``species_build_jobs_lock``.
    """
    job_key = species_name.strip().lower()
    with species_build_jobs_lock:
        entry = species_build_jobs.setdefault(job_key, {"species": species_name})
        entry.update(updates)
def _run_species_build_job(species_name: str) -> None:
    """Run the asset build for one species and record the outcome.

    Steps: mark the job ``running``, resolve the wiki title, collect image
    URLs, call ``add_to_faiss``, optionally upload assets (when
    ``AUTO_SYNC_HF_ASSETS`` is truthy), reset the in-memory ``index`` /
    ``rag_collection`` handles and mark the job ``completed``. Any exception
    marks the job ``failed`` and is logged; nothing is propagated.
    """
    global index, rag_collection

    truthy_flags = {"1", "true", "yes", "on"}
    _set_species_build_job(
        species_name,
        status="running",
        started_at=datetime.utcnow().isoformat(),
        finished_at=None,
        error=None,
    )
    try:
        from add_species_to_faiss import add_to_faiss, fetch_wiki_image_urls, resolve_title

        wiki_langs = tuple(
            code.strip().lower()
            for code in os.getenv("WIKI_LANGS", "it,en").split(",")
            if code.strip()
        )
        # At least 4 images are requested regardless of the configured value.
        image_cap = max(4, int(os.getenv("RAG_BUILD_MAX_IMAGES", "8")))
        lang, resolved_title = resolve_title(species_name, "", wiki_langs)
        image_urls = fetch_wiki_image_urls(resolved_title, lang, max_images=image_cap)
        if not image_urls:
            logger.warning(
                f"No image URLs found for '{species_name}' on {lang}:{resolved_title}. "
                "Continuing build with textual ingestion only."
            )

        add_result = add_to_faiss(
            species_name,
            image_urls,
            lang=lang,
            resolved_title=resolved_title,
            model_name=MODEL_NAME,
            index_path=Path(INDEX_PATH),
            cache_path=Path(CACHE_PATH),
        )

        hf_synced, hf_error = False, None
        auto_sync = os.getenv("AUTO_SYNC_HF_ASSETS", "1").strip().lower()
        if auto_sync in truthy_flags:
            # Upload failures are recorded in the job result, not raised.
            try:
                from upload_hf_assets import DEFAULT_REPO_ID, upload_assets

                hf_token = os.getenv("HF_TOKEN", "").strip() or None
                uploaded = upload_assets(
                    repo_id=os.getenv("HF_ASSETS_DATASET_REPO", DEFAULT_REPO_ID),
                    private=False,
                    include_plants_db=True,
                    skip_missing=True,
                    token=hf_token,
                )
                hf_synced = uploaded > 0
            except Exception as exc:
                hf_error = str(exc)
                logger.warning(f"HF sync failed for '{species_name}': {exc}")

        # Force lazy reload of in-memory search/rag handles after asset update.
        index = None
        rag_collection = None

        _set_species_build_job(
            species_name,
            status="completed",
            finished_at=datetime.utcnow().isoformat(),
            error=None,
            result={
                "species": species_name,
                "add_result": add_result,
                "hf_synced": hf_synced,
                "hf_error": hf_error,
            },
        )
        logger.info(f"Species build completed for '{species_name}'")
    except Exception as exc:
        _set_species_build_job(
            species_name,
            status="failed",
            finished_at=datetime.utcnow().isoformat(),
            error=f"{type(exc).__name__}: {exc}",
        )
        logger.exception(f"Species build failed for '{species_name}': {exc}")
def _ensure_species_build_job(species_name: str) -> dict[str, Any]:
    """Start a background build for *species_name* unless one is already tracked.

    Nothing is started when the current status is ``queued``, ``running`` or
    ``completed``. Otherwise the job is registered as ``queued`` and a daemon
    thread running ``_run_species_build_job`` is started.

    Returns:
        The status snapshot of the job (see ``_species_build_status``).
    """
    active_states = {"queued", "running", "completed"}
    status = _species_build_status(species_name)
    if status.get("status") in active_states:
        return status

    key = species_name.strip().lower()
    with species_build_jobs_lock:
        # Re-check and register under one lock acquisition so two concurrent
        # callers cannot both start a build thread for the same species.
        existing = species_build_jobs.get(key)
        if existing and existing.get("status") in active_states:
            return dict(existing)
        species_build_jobs[key] = {
            "species": species_name,
            "status": "queued",
            "started_at": None,
            "finished_at": None,
            "error": None,
            "result": None,
        }

    thread = threading.Thread(
        target=_run_species_build_job,
        args=(species_name,),
        daemon=True,
        name=f"species-build-{species_name[:24]}",
    )
    thread.start()
    return _species_build_status(species_name)
def _species_to_folder_name(species_name: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", "_", str(species_name or "").lower()).strip("_")
return normalized
def _first_servable_image_url(image_paths: Any) -> str:
    """Return the first usable URL among *image_paths*, or ``""`` when none is.

    Absolute http(s) URLs are returned as-is. Other entries are normalized
    and returned as ``/images/...`` only when the file exists under
    ``data/images``.
    """
    for raw_path in image_paths:
        if isinstance(raw_path, str) and raw_path.startswith(("http://", "https://")):
            return raw_path
        normalized_path = _normalize_image_path(str(raw_path or ""))
        if not normalized_path:
            continue
        if (Path("data") / "images" / normalized_path).exists():
            return f"/images/{normalized_path}"
    return ""


def _get_species_preview_image_url(species_name: str) -> str:
    """Return a preview image URL for *species_name*, or ``""`` when none is found.

    Lookup order:
      1. image paths stored for the species in the plants database;
      2. ``image_paths`` in the legacy RAG metadata (best effort: failures
         are logged at debug level and ignored);
      3. the first image file (sorted by path) inside
         ``data/images/<species folder>``.
    """
    preview = _first_servable_image_url(_get_species_images_from_db(species_name))
    if preview:
        return preview

    # Backward compatibility: read from legacy RAG metadata if DB is empty.
    try:
        collection = get_rag_collection()
        res = collection.get(
            where={"species_name": {"$eq": species_name}},
            limit=1,
        )
        metadatas = res.get("metadatas", []) if res else []
        metadata = metadatas[0] if metadatas else {}
        image_paths_json = metadata.get("image_paths", "[]") if metadata else "[]"
        try:
            legacy_paths = json.loads(image_paths_json)
        except (json.JSONDecodeError, TypeError):
            legacy_paths = []
        # Anything other than a JSON array cannot be a list of paths.
        if not isinstance(legacy_paths, list):
            legacy_paths = []
        preview = _first_servable_image_url(legacy_paths)
        if preview:
            return preview
    except Exception as exc:
        logger.debug("Legacy RAG image lookup failed for '%s': %s", species_name, exc)

    folder_name = _species_to_folder_name(species_name)
    if not folder_name:
        return ""
    image_dir = Path("data") / "images" / folder_name
    if not image_dir.exists() or not image_dir.is_dir():
        return ""
    candidates = sorted(
        path
        for path in image_dir.iterdir()
        if path.is_file() and path.suffix.lower() in {".jpg", ".jpeg", ".png", ".webp"}
    )
    if not candidates:
        return ""
    return f"/images/{folder_name}/{candidates[0].name}"
def get_rag_collection():
    """Get or initialize the ChromaDB collection for plant RAG.

    The ``plants`` collection is opened from ``RAG_DB_PATH`` on first use and
    kept in the module-level ``rag_collection`` variable afterwards.

    Raises:
        RuntimeError: if the collection cannot be opened; the original
            exception is attached as the cause.
    """
    global rag_collection
    if rag_collection is None:
        try:
            client = chromadb.PersistentClient(path=RAG_DB_PATH)
            rag_collection = client.get_collection(
                name="plants",
            )
        except Exception as e:
            raise RuntimeError(f"Impossibile caricare il database RAG delle piante: {e}") from e
    return rag_collection
def ensure_plant_cards_cache_table(conn: sqlite3.Connection) -> None:
    """Create the ``plant_cards_cache`` table and its index if they are missing.

    The table is keyed by ``(species_name, lang)``. The function commits on
    *conn* and can be called repeatedly.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS plant_cards_cache (
            species_name TEXT NOT NULL,
            lang TEXT NOT NULL,
            title TEXT NOT NULL,
            common_name TEXT,
            summary TEXT NOT NULL,
            markdown TEXT NOT NULL,
            images_json TEXT NOT NULL,
            source TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            PRIMARY KEY (species_name, lang)
        )
        """
    create_index_sql = (
        "CREATE INDEX IF NOT EXISTS idx_plant_cards_cache_updated_at ON plant_cards_cache(updated_at)"
    )
    for statement in (create_table_sql, create_index_sql):
        conn.execute(statement)
    conn.commit()
def get_cached_plant_card(name: str, lang: str) -> dict[str, Any] | None:
    """Return the cached plant card for *name* in *lang*, or ``None``.

    ``None`` is returned when the cache is disabled, *name* is blank, or no
    row matches. Species name and language are matched case-insensitively; a
    missing language defaults to ``it``.

    Returns:
        A dict with ``title``, ``common_name``, ``markdown``, ``summary``,
        ``images`` (list of non-blank strings), ``source`` and
        ``cache_updated_at``.
    """
    if not PLANT_CARD_CACHE_ENABLED:
        return None
    species_name = (name or "").strip()
    lang_code = (lang or "it").strip().lower()
    if not species_name:
        return None

    # closing() releases the connection; sqlite3's own context manager only
    # manages the transaction and would leave it open.
    with closing(get_plants_db_connection()) as conn:
        ensure_plant_cards_cache_table(conn)
        row = conn.execute(
            (
                "SELECT title, common_name, summary, markdown, images_json, source, updated_at "
                "FROM plant_cards_cache "
                "WHERE lower(species_name) = lower(?) AND lower(lang) = lower(?) "
                "LIMIT 1"
            ),
            (species_name, lang_code),
        ).fetchone()
    if row is None:
        return None

    images: list[str] = []
    raw_images = row["images_json"] if "images_json" in row.keys() else "[]"
    try:
        parsed = json.loads(raw_images or "[]")
        if isinstance(parsed, list):
            images = [str(item) for item in parsed if str(item).strip()]
    except Exception:
        # A corrupt images_json degrades to "no images" rather than failing.
        images = []

    return {
        "title": row["title"],
        "common_name": row["common_name"] or "",
        "markdown": row["markdown"],
        "summary": row["summary"],
        "images": images,
        "source": row["source"],
        "cache_updated_at": row["updated_at"],
    }
def upsert_cached_plant_card(name: str, lang: str, payload: dict[str, Any]) -> None:
    """Insert or replace the cached plant card for ``(name, lang)``.

    Does nothing when the cache is disabled or *name* is blank. Missing
    payload fields fall back to: ``title`` -> the species name, ``source`` ->
    ``"rag"``, others -> empty string; ``images`` is stored as a JSON list
    (empty when the payload value is not a list). ``updated_at`` is the
    current UTC time with second precision and a ``Z`` suffix.
    """
    if not PLANT_CARD_CACHE_ENABLED:
        return
    species_name = (name or "").strip()
    lang_code = (lang or "it").strip().lower()
    if not species_name:
        return

    title = str(payload.get("title") or species_name)
    common_name = str(payload.get("common_name") or "")
    summary = str(payload.get("summary") or "")
    markdown = str(payload.get("markdown") or "")
    source = str(payload.get("source") or "rag")
    images = payload.get("images")
    images_json = json.dumps(images if isinstance(images, list) else [], ensure_ascii=False)
    updated_at = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"

    # closing() releases the connection; an uncommitted write is discarded
    # if an exception interrupts the block.
    with closing(get_plants_db_connection()) as conn:
        ensure_plant_cards_cache_table(conn)
        conn.execute(
            (
                "INSERT INTO plant_cards_cache "
                "(species_name, lang, title, common_name, summary, markdown, images_json, source, updated_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) "
                "ON CONFLICT(species_name, lang) DO UPDATE SET "
                "title=excluded.title, "
                "common_name=excluded.common_name, "
                "summary=excluded.summary, "
                "markdown=excluded.markdown, "
                "images_json=excluded.images_json, "
                "source=excluded.source, "
                "updated_at=excluded.updated_at"
            ),
            (species_name, lang_code, title, common_name, summary, markdown, images_json, source, updated_at),
        )
        conn.commit()
# Columns of the ``plants`` table copied into the profile dict returned by
# get_plant_profile_from_db().
PLANT_PROFILE_FIELDS = (
    "species_name",
    "indexed",
    "annaffiatura_gg",
    "annaffiatura_time",
    "luce",
    "temperatura",
    "umidita",
    "altezza_media",
    "pulizia",
    "terriccio",
    "concimazione",
    "prevenzione",
    "updated_at",
)
def get_plants_db_connection() -> sqlite3.Connection:
    """Open the plants catalog database and return the connection.

    If the configured file is missing, the bundled ``data/plants.db`` is
    copied to the configured location first. The returned connection uses
    ``sqlite3.Row`` rows, and the ``plants`` table is given an
    ``image_paths`` column when it lacks one (best effort).

    The caller owns the connection and is responsible for closing it.

    Raises:
        HTTPException: 503 when no database file is available.
    """
    db_path = Path(PLANTS_SQLITE_PATH)
    if not db_path.exists():
        bundled_db = Path("data") / "plants.db"
        if bundled_db.exists() and bundled_db.resolve() != db_path.resolve():
            db_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(bundled_db, db_path)
    if not db_path.exists():
        raise HTTPException(status_code=503, detail="Database plants.db non disponibile.")

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        # Look at the schema first instead of issuing an ALTER that fails on
        # every connection once the column exists.
        columns = {row["name"] for row in conn.execute("PRAGMA table_info(plants)")}
        if columns and "image_paths" not in columns:
            conn.execute("ALTER TABLE plants ADD COLUMN image_paths TEXT")
            conn.commit()
    except sqlite3.Error as exc:
        # Migration is best effort; the connection is still returned.
        logger.debug("plants.image_paths migration skipped: %s", exc)
    return conn
def _get_species_images_from_db(species_name: str) -> list[str]:
    """Return the image paths stored for *species_name* in the plants database.

    The ``image_paths`` column is expected to hold a JSON list. An empty list
    is returned when the species is unknown, the column is empty, or its
    content is not a valid JSON list. Entries are stringified and stripped;
    blank ones are dropped.
    """
    query = "SELECT image_paths FROM plants WHERE lower(species_name) = lower(?) LIMIT 1"
    # closing() releases the connection; sqlite3's own context manager would not.
    with closing(get_plants_db_connection()) as conn:
        row = conn.execute(query, (species_name.strip(),)).fetchone()
    if row is None:
        return []
    raw = row["image_paths"] if "image_paths" in row.keys() else None
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(parsed, list):
        return []
    return [str(v).strip() for v in parsed if str(v).strip()]
def _sqlite_table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
row = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ? LIMIT 1",
(table_name,),
).fetchone()
return row is not None
def _migrate_user_plants_if_needed(user_conn: sqlite3.Connection) -> None:
    """Copy ``user_plants`` rows from the plants database into the user database.

    The copy happens only when all of these hold: *user_conn* is not a MySQL
    connection, the two database paths differ, the plants database file
    exists, the destination ``user_plants`` table exists and is empty, and
    the source database has a ``user_plants`` table with rows. Source tables
    without a ``user_photo_url`` column are copied with NULL in that field.
    """
    if _is_mysql_conn(user_conn):
        return

    target_path = Path(USER_PLANTS_SQLITE_PATH)
    catalog_path = Path(PLANTS_SQLITE_PATH)
    try:
        same_file = target_path.resolve() == catalog_path.resolve()
    except Exception:
        # Fall back to a plain string comparison when resolving fails.
        same_file = str(target_path) == str(catalog_path)
    if same_file:
        return

    if not catalog_path.exists():
        return
    if not _sqlite_table_exists(user_conn, "user_plants"):
        return
    existing_rows = user_conn.execute("SELECT COUNT(1) AS c FROM user_plants").fetchone()["c"]
    if int(existing_rows or 0) > 0:
        return

    legacy_conn = sqlite3.connect(catalog_path)
    legacy_conn.row_factory = sqlite3.Row
    try:
        if not _sqlite_table_exists(legacy_conn, "user_plants"):
            return
        legacy_columns = {
            info["name"] for info in legacy_conn.execute("PRAGMA table_info(user_plants)").fetchall()
        }
        # Older schemas have no photo column; select NULL in its place.
        if "user_photo_url" in legacy_columns:
            photo_expr = "user_photo_url"
        else:
            photo_expr = "NULL AS user_photo_url"
        legacy_rows = legacy_conn.execute(
            "SELECT id, plant_name, user_given_name, user_id, user_email, "
            f"{photo_expr}, created_at FROM user_plants"
        ).fetchall()
        if not legacy_rows:
            return

        copied_fields = (
            "id",
            "plant_name",
            "user_given_name",
            "user_id",
            "user_email",
            "user_photo_url",
            "created_at",
        )
        user_conn.executemany(
            (
                "INSERT OR IGNORE INTO user_plants "
                "(id, plant_name, user_given_name, user_id, user_email, user_photo_url, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)"
            ),
            [tuple(record[field] for field in copied_fields) for record in legacy_rows],
        )
        user_conn.commit()
    finally:
        legacy_conn.close()
def get_user_plants_db_connection() -> sqlite3.Connection:
    """Open the user-plants database with its schema ensured.

    A MySQL adapter connection is returned when a MySQL DSN is configured,
    otherwise a sqlite3 connection (``sqlite3.Row`` rows) to
    ``USER_PLANTS_SQLITE_PATH``. In both cases the ``user_plants``,
    ``registered_users`` and ``recognition_logs`` tables are ensured; for
    SQLite, legacy rows are migrated from the plants database when needed.

    The caller owns the connection. If any setup step fails, the connection
    is closed before the exception propagates.
    """
    use_mysql = _is_mysql_enabled()
    if use_mysql:
        conn = _MySQLCompatConnection(MY_SQL_CONNECTION_STRING)
    else:
        db_path = Path(USER_PLANTS_SQLITE_PATH)
        db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row

    try:
        ensure_user_plants_table(conn)
        ensure_registered_users_table(conn)
        ensure_recognition_logs_table(conn)
        if not use_mysql:
            _migrate_user_plants_if_needed(conn)
    except Exception:
        # Do not leak the freshly opened connection when setup fails.
        conn.close()
        raise
    return conn
def get_plant_profile_from_db(name: str) -> dict[str, Any] | None:
    """Return the care profile of the species *name*, or ``None`` if unknown.

    The species is matched case-insensitively on the stripped name. The
    result holds every key of ``PLANT_PROFILE_FIELDS``; ``indexed`` is
    converted to ``bool`` and ``updated_at`` is formatted for display.
    """
    query = (
        "SELECT species_name, indexed, annaffiatura_gg, annaffiatura_time, luce, temperatura, "
        "umidita, altezza_media, pulizia, terriccio, concimazione, prevenzione, updated_at "
        "FROM plants WHERE lower(species_name) = lower(?) LIMIT 1"
    )
    # closing() releases the connection; sqlite3's own context manager would not.
    with closing(get_plants_db_connection()) as conn:
        row = conn.execute(query, (name.strip(),)).fetchone()
    if row is None:
        return None
    payload = {field: row[field] for field in PLANT_PROFILE_FIELDS}
    payload["indexed"] = bool(payload["indexed"])
    payload["updated_at"] = _format_datetime_display(payload["updated_at"])
    return payload
def ensure_user_plants_table(conn: sqlite3.Connection) -> None:
    """Create ``user_plants`` and ``user_plant_photos`` (plus index) if missing.

    Works for both the MySQL adapter and sqlite3 connections, using the DDL
    dialect of each. On SQLite, pre-existing ``user_plants`` tables also get
    the ``user_photo_url`` column added. Commits on *conn*.
    """
    if _is_mysql_conn(conn):
        mysql_user_plants = """
            CREATE TABLE IF NOT EXISTS user_plants (
                id BIGINT PRIMARY KEY AUTO_INCREMENT,
                plant_name VARCHAR(255) NOT NULL,
                user_given_name VARCHAR(255) NOT NULL,
                user_id VARCHAR(255) NOT NULL,
                user_email VARCHAR(255) NULL,
                user_photo_url TEXT NULL,
                created_at VARCHAR(40) NOT NULL
            )
            """
        mysql_photos = """
            CREATE TABLE IF NOT EXISTS user_plant_photos (
                id BIGINT PRIMARY KEY AUTO_INCREMENT,
                plant_id BIGINT NOT NULL,
                photo_url TEXT NOT NULL,
                created_at VARCHAR(40) NOT NULL,
                FOREIGN KEY (plant_id) REFERENCES user_plants(id) ON DELETE CASCADE
            )
            """
        conn.execute(mysql_user_plants)
        conn.execute(mysql_photos)
        # The statement has no IF NOT EXISTS; a failure (e.g. the index
        # already exists) is ignored.
        try:
            conn.execute(
                "CREATE INDEX idx_user_plant_photos_plant_id ON user_plant_photos(plant_id)"
            )
        except Exception:
            pass
        conn.commit()
        return

    sqlite_user_plants = """
        CREATE TABLE IF NOT EXISTS user_plants (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            plant_name TEXT NOT NULL,
            user_given_name TEXT NOT NULL,
            user_id TEXT NOT NULL,
            user_email TEXT,
            user_photo_url TEXT,
            created_at TEXT NOT NULL
        )
        """
    sqlite_photos = """
        CREATE TABLE IF NOT EXISTS user_plant_photos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            plant_id INTEGER NOT NULL,
            photo_url TEXT NOT NULL,
            created_at TEXT NOT NULL,
            FOREIGN KEY (plant_id) REFERENCES user_plants(id) ON DELETE CASCADE
        )
        """
    conn.execute(sqlite_user_plants)
    # Add user_photo_url column to existing databases (migration)
    try:
        conn.execute("ALTER TABLE user_plants ADD COLUMN user_photo_url TEXT")
        conn.commit()
    except Exception:
        pass  # Column already exists
    conn.execute(sqlite_photos)
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_user_plant_photos_plant_id ON user_plant_photos(plant_id)"
    )
    conn.commit()
def ensure_registered_users_table(conn: sqlite3.Connection) -> None:
    """Create the ``registered_users`` table and its e-mail index if missing.

    Uses MySQL DDL for the MySQL adapter and SQLite DDL otherwise. Commits on
    *conn*.
    """
    if _is_mysql_conn(conn):
        mysql_table = """
            CREATE TABLE IF NOT EXISTS registered_users (
                id BIGINT PRIMARY KEY AUTO_INCREMENT,
                google_sub VARCHAR(255) NOT NULL UNIQUE,
                email VARCHAR(255) NOT NULL,
                registered_at VARCHAR(40) NOT NULL
            )
            """
        conn.execute(mysql_table)
        # The statement has no IF NOT EXISTS; a failure (e.g. the index
        # already exists) is ignored.
        try:
            conn.execute(
                "CREATE INDEX idx_registered_users_email ON registered_users(email)"
            )
        except Exception:
            pass
        conn.commit()
        return

    sqlite_table = """
        CREATE TABLE IF NOT EXISTS registered_users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            google_sub TEXT NOT NULL UNIQUE,
            email TEXT NOT NULL,
            registered_at TEXT NOT NULL
        )
        """
    sqlite_index = "CREATE INDEX IF NOT EXISTS idx_registered_users_email ON registered_users(email)"
    for statement in (sqlite_table, sqlite_index):
        conn.execute(statement)
    conn.commit()
def ensure_recognition_logs_table(conn: sqlite3.Connection) -> None:
    """Create the ``recognition_logs`` table and its three indexes if missing.

    The SQL dialect is selected with ``_is_mysql_conn``. In the non-MySQL
    branch the function also tries to add the ``recognition_ms`` column to a
    pre-existing table. Changes are committed on ``conn`` before returning.

    Args:
        conn: Open database connection.
    """
    if _is_mysql_conn(conn):
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS recognition_logs (
            id BIGINT PRIMARY KEY AUTO_INCREMENT,
            user_id VARCHAR(255) NOT NULL,
            user_email VARCHAR(255) NULL,
            user_type VARCHAR(16) NOT NULL,
            chosen_species VARCHAR(255) NOT NULL,
            image_url TEXT NULL,
            used_openai TINYINT(1) NOT NULL DEFAULT 0,
            recognition_ms INT NULL,
            created_at VARCHAR(40) NOT NULL
            )
            """
        )
        # Plain CREATE INDEX statements: each one is attempted independently and
        # any failure (presumably "index already exists") is ignored.
        for index_ddl in (
            "CREATE INDEX idx_recognition_logs_created_at ON recognition_logs(created_at)",
            "CREATE INDEX idx_recognition_logs_species ON recognition_logs(chosen_species)",
            "CREATE INDEX idx_recognition_logs_user_id ON recognition_logs(user_id)",
        ):
            try:
                conn.execute(index_ddl)
            except Exception:
                pass
        conn.commit()
        return

    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS recognition_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id TEXT NOT NULL,
        user_email TEXT,
        user_type TEXT NOT NULL,
        chosen_species TEXT NOT NULL,
        image_url TEXT,
        used_openai INTEGER NOT NULL DEFAULT 0,
        recognition_ms INTEGER,
        created_at TEXT NOT NULL
        )
        """
    )
    # Migration: add recognition_ms to databases created before the column
    # existed. The statement fails when the column is already there; ignore it.
    try:
        conn.execute("ALTER TABLE recognition_logs ADD COLUMN recognition_ms INTEGER")
        conn.commit()
    except Exception:
        pass
    for index_ddl in (
        "CREATE INDEX IF NOT EXISTS idx_recognition_logs_created_at ON recognition_logs(created_at)",
        "CREATE INDEX IF NOT EXISTS idx_recognition_logs_species ON recognition_logs(chosen_species)",
        "CREATE INDEX IF NOT EXISTS idx_recognition_logs_user_id ON recognition_logs(user_id)",
    ):
        conn.execute(index_ddl)
    conn.commit()
def create_recognition_log(
    chosen_species: str,
    used_openai: bool,
    image_url: str | None,
    recognition_ms: int | None,
    user: dict[str, Any] | None,
) -> dict[str, Any]:
    """Insert one row into ``recognition_logs`` and return it as a dict.

    Args:
        chosen_species: Species picked by the caller; surrounding whitespace is
            stripped and an empty result is rejected.
        used_openai: Whether OpenAI took part in the recognition.
        image_url: Optional image URL; blank values are stored as ``None``.
        recognition_ms: Optional duration in milliseconds; negatives clamp to 0.
        user: Optional user mapping; ``sub`` and ``email`` are read from it.
            Without a usable ``sub`` the entry is recorded for ``"guest"``.

    Returns:
        The stored values plus the new row ``id``.

    Raises:
        HTTPException: 400 when ``chosen_species`` is empty after stripping.
    """
    species = str(chosen_species or "").strip()
    if not species:
        raise HTTPException(status_code=400, detail="Specie scelta obbligatoria.")

    identity = user or {}
    uid = str(identity.get("sub") or "").strip() or "guest"
    email = str(identity.get("email") or "").strip() or None
    kind = "user" if user and uid != "guest" else "guest"
    image = str(image_url or "").strip() or None
    if recognition_ms is None:
        duration_ms = None
    else:
        duration_ms = max(0, int(recognition_ms))
    # UTC timestamp with second precision and a literal "Z" suffix.
    stamp = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"

    insert_sql = (
        "INSERT INTO recognition_logs "
        "(user_id, user_email, user_type, chosen_species, image_url, used_openai, recognition_ms, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    with get_user_plants_db_connection() as conn:
        ensure_recognition_logs_table(conn)
        cursor = conn.execute(
            insert_sql,
            (uid, email, kind, species, image, 1 if used_openai else 0, duration_ms, stamp),
        )
        conn.commit()
        new_id = int(cursor.lastrowid)

    return {
        "id": new_id,
        "user_id": uid,
        "user_email": email,
        "user_type": kind,
        "chosen_species": species,
        "image_url": image,
        "used_openai": bool(used_openai),
        "recognition_ms": duration_ms,
        "created_at": stamp,
    }
def get_recognition_admin_aggregates(conn: sqlite3.Connection, chart_days: int = 30) -> dict[str, Any]:
    """Aggregate ``recognition_logs`` over a recent window for the admin console.

    Args:
        conn: Open database connection; rows are read by column name.
        chart_days: Window length in days. Only 7, 30 and 90 are honoured;
            every other value falls back to 30.

    Returns:
        A dict with the effective ``chart_days``, overall counters, the average
        recognition time (``None`` when no row carries a duration), the top 8
        species and a per-day series in ascending day order.
    """
    ensure_recognition_logs_table(conn)

    if chart_days in (7, 30, 90):
        days = int(chart_days)
    else:
        days = 30
    # The window opens at midnight UTC of the first day, so it spans `days`
    # calendar days including today.
    first_day = datetime.utcnow() - timedelta(days=days - 1)
    window_start = first_day.strftime("%Y-%m-%d") + "T00:00:00Z"

    totals = conn.execute(
        """
        SELECT
        COUNT(1) AS total,
        SUM(CASE WHEN user_type = 'guest' THEN 1 ELSE 0 END) AS guest_total,
        SUM(CASE WHEN user_type = 'user' THEN 1 ELSE 0 END) AS user_total,
        SUM(CASE WHEN used_openai = 1 THEN 1 ELSE 0 END) AS openai_total,
        SUM(CASE WHEN image_url IS NOT NULL AND trim(image_url) <> '' THEN 1 ELSE 0 END) AS with_image_total,
        COUNT(recognition_ms) AS timed_total,
        AVG(recognition_ms * 1.0) AS avg_recognition_ms
        FROM recognition_logs
        WHERE created_at >= ?
        """,
        (window_start,),
    ).fetchone()
    species_rows = conn.execute(
        """
        SELECT chosen_species, COUNT(1) AS count
        FROM recognition_logs
        WHERE created_at >= ?
        GROUP BY chosen_species
        ORDER BY count DESC, chosen_species ASC
        LIMIT 8
        """,
        (window_start,),
    ).fetchall()
    day_rows = conn.execute(
        """
        SELECT
        substr(created_at, 1, 10) AS day,
        COUNT(1) AS total,
        SUM(CASE WHEN used_openai = 1 THEN 1 ELSE 0 END) AS openai
        FROM recognition_logs
        WHERE created_at >= ?
        GROUP BY substr(created_at, 1, 10)
        ORDER BY day DESC
        LIMIT ?
        """,
        (window_start, days),
    ).fetchall()

    def _counter(column: str) -> int:
        # SUM() yields NULL over an empty set; report that as 0.
        return int((totals[column] or 0) if totals else 0)

    average_ms = None
    if totals and int(totals["timed_total"] or 0) > 0 and totals["avg_recognition_ms"] is not None:
        average_ms = float(totals["avg_recognition_ms"])

    # The query returns newest day first; the chart wants oldest first.
    daily_series = []
    for row in reversed(day_rows):
        daily_series.append(
            {
                "day": str(row["day"] or ""),
                "total": int(row["total"] or 0),
                "openai": int(row["openai"] or 0),
            }
        )
    top_species = []
    for row in species_rows:
        top_species.append(
            {
                "species": str(row["chosen_species"] or ""),
                "count": int(row["count"] or 0),
            }
        )

    return {
        "chart_days": days,
        "total": _counter("total"),
        "guest_total": _counter("guest_total"),
        "user_total": _counter("user_total"),
        "openai_total": _counter("openai_total"),
        "with_image_total": _counter("with_image_total"),
        "avg_recognition_ms": average_ms,
        "top_species": top_species,
        "daily_series": daily_series,
    }
def register_google_user_if_needed(user: dict[str, Any]) -> tuple[bool, str]:
    """Record a Google user in ``registered_users`` on first sight.

    Args:
        user: Mapping from which ``sub`` and ``email`` are read.

    Returns:
        ``(True, registered_at)`` when a new row was inserted,
        ``(False, registered_at)`` when the ``sub`` was already stored, and
        ``(False, "")`` when ``sub`` or ``email`` is missing.
    """
    subject = str(user.get("sub") or "").strip()
    address = str(user.get("email") or "").strip()
    if not (subject and address):
        return False, ""

    with get_user_plants_db_connection() as conn:
        ensure_registered_users_table(conn)
        known = conn.execute(
            "SELECT registered_at FROM registered_users WHERE google_sub = ? LIMIT 1",
            (subject,),
        ).fetchone()
        if known:
            return False, str(known["registered_at"] or "")

        # UTC timestamp with second precision and a literal "Z" suffix.
        stamp = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
        conn.execute(
            "INSERT INTO registered_users (google_sub, email, registered_at) VALUES (?, ?, ?)",
            (subject, address, stamp),
        )
        conn.commit()
    return True, stamp
def list_registered_users_for_admin(limit: int = 300) -> list[dict[str, Any]]:
    """Return the most recently registered users for the admin console.

    Args:
        limit: Maximum number of rows; clamped into the range 1..1000.

    Returns:
        One dict per user, newest registration first, with ``email``, the raw
        ``registered_at`` value and a display version produced by
        ``_format_datetime_display``.
    """
    row_cap = min(int(limit), 1000)
    if row_cap < 1:
        row_cap = 1

    query = (
        "SELECT email, registered_at FROM registered_users "
        "ORDER BY registered_at DESC LIMIT ?"
    )
    with get_user_plants_db_connection() as conn:
        ensure_registered_users_table(conn)
        rows = conn.execute(query, (row_cap,)).fetchall()

    users = []
    for row in rows:
        users.append(
            {
                "email": str(row["email"] or ""),
                "registered_at": str(row["registered_at"] or ""),
                "registered_at_display": _format_datetime_display(row["registered_at"]),
            }
        )
    return users
def _is_admin_email(email: str) -> bool:
    """Tell whether ``email`` (trimmed, lower-cased) is listed in ``ADMIN_USERS``.

    Empty or missing addresses are never admins.
    """
    candidate = str(email or "").strip().lower()
    if not candidate:
        return False
    return candidate in ADMIN_USERS
def _require_admin_user(authorization: str | None) -> dict[str, Any]:
    """Resolve the caller from the Authorization header and insist on admin rights.

    Args:
        authorization: Raw ``Authorization`` header value, if any.

    Returns:
        The user mapping produced by ``_get_google_user_from_authorization``.

    Raises:
        HTTPException: 401 when no user could be resolved, 403 when the user's
            e-mail is not accepted by ``_is_admin_email``.
    """
    # require_auth=True: authentication is mandatory here regardless of the
    # global default used by other endpoints.
    caller = _get_google_user_from_authorization(authorization, require_auth=True)
    if not caller:
        raise HTTPException(status_code=401, detail="Accedi con Google.")
    caller_email = str(caller.get("email") or "")
    if _is_admin_email(caller_email):
        return caller
    raise HTTPException(status_code=403, detail="Accesso admin non autorizzato.")
def _get_user_plant_photo_urls(conn: sqlite3.Connection, plant_id: int, fallback_url: str | None) -> list[str]:
rows = conn.execute(
"SELECT photo_url FROM user_plant_photos WHERE plant_id = ? ORDER BY id DESC",
(plant_id,),
).fetchall()
urls = [str(r["photo_url"] or "").strip() for r in rows if str(r["photo_url"] or "").strip()]
if urls:
return urls
fallback = str(fallback_url or "").strip()
return [fallback] if fallback else []
def _user_plant_row_to_payload(conn: sqlite3.Connection, row: sqlite3.Row) -> dict[str, Any]:
    """Convert a ``user_plants`` row into the dict returned by the API.

    Args:
        conn: Open connection, used to look up the plant's photos.
        row: Row exposing at least ``id``, ``plant_name``, ``user_given_name``,
            ``user_email``, ``user_id`` and ``created_at``.

    Returns:
        The payload dict; ``user`` is the e-mail when present, else the user id,
        and ``user_photo_url`` is the first entry of ``user_photos`` (or None).
    """
    pid = int(row["id"])
    # The row may come from a query that did not select user_photo_url.
    if "user_photo_url" in row.keys():
        legacy_photo = row["user_photo_url"]
    else:
        legacy_photo = None
    photos = _get_user_plant_photo_urls(conn, pid, legacy_photo)
    cover = photos[0] if photos else None
    created_raw = row["created_at"]
    return {
        "id": pid,
        "plant_name": row["plant_name"],
        "user_given_name": row["user_given_name"],
        "user": row["user_email"] or row["user_id"],
        "user_photo_url": cover,
        "user_photos": photos,
        "created_at_iso": created_raw,
        "created_at": _format_datetime_display(created_raw),
    }
def create_user_plant(plant_name: str, user_given_name: str, user: dict[str, Any]) -> dict[str, Any]:
    """Save a plant for a user and return the stored record as an API payload.

    Args:
        plant_name: Species name; must be non-empty after stripping.
        user_given_name: Name chosen by the user; must be non-empty after stripping.
        user: Mapping from which ``sub`` and ``email`` are read.

    Returns:
        The payload built by ``_user_plant_row_to_payload`` for the new row.

    Raises:
        HTTPException: 400 for an empty plant name or given name, 401 when the
            user has no ``sub``.
    """
    species = plant_name.strip()
    nickname = user_given_name.strip()
    owner_id = str(user.get("sub") or "").strip()
    owner_email = str(user.get("email") or "").strip()

    if not species:
        raise HTTPException(status_code=400, detail="Nome pianta obbligatorio.")
    if not nickname:
        raise HTTPException(status_code=400, detail="Nome scelto dall'utente obbligatorio.")
    if not owner_id:
        raise HTTPException(status_code=401, detail="Utente Google non valido.")

    # UTC timestamp with second precision and a literal "Z" suffix.
    stamp = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
    insert_sql = (
        "INSERT INTO user_plants "
        "(plant_name, user_given_name, user_id, user_email, created_at) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    select_sql = (
        "SELECT id, plant_name, user_given_name, user_id, user_email, user_photo_url, created_at "
        "FROM user_plants WHERE id = ?"
    )
    with get_user_plants_db_connection() as conn:
        ensure_user_plants_table(conn)
        cursor = conn.execute(insert_sql, (species, nickname, owner_id, owner_email, stamp))
        conn.commit()
        # Re-read the row so the payload reflects exactly what was stored.
        stored = conn.execute(select_sql, (cursor.lastrowid,)).fetchone()
        return _user_plant_row_to_payload(conn, stored)
def list_user_plants(user: dict[str, Any]) -> list[dict[str, Any]]:
    """Return every plant saved by ``user``, newest id first.

    Args:
        user: Mapping from which ``sub`` is read.

    Raises:
        HTTPException: 401 when the user has no ``sub``.
    """
    owner_id = str(user.get("sub") or "").strip()
    if not owner_id:
        raise HTTPException(status_code=401, detail="Utente Google non valido.")

    query = (
        "SELECT id, plant_name, user_given_name, user_id, user_email, user_photo_url, created_at "
        "FROM user_plants WHERE user_id = ? ORDER BY id DESC"
    )
    with get_user_plants_db_connection() as conn:
        ensure_user_plants_table(conn)
        payloads = []
        # Payloads are built while the connection is open: each one triggers a
        # photo lookup on the same connection.
        for record in conn.execute(query, (owner_id,)).fetchall():
            payloads.append(_user_plant_row_to_payload(conn, record))
        return payloads
def delete_user_plant_by_id(user: dict[str, Any], plant_id: int) -> bool:
    """Delete one of the user's saved plants together with its photo rows.

    Args:
        user: Mapping from which ``sub`` is read.
        plant_id: Id of the plant to remove.

    Returns:
        True when the plant existed for this user and was deleted, False when
        no such plant belongs to the user.

    Raises:
        HTTPException: 401 when the user has no ``sub``.
    """
    owner_id = str(user.get("sub") or "").strip()
    if not owner_id:
        raise HTTPException(status_code=401, detail="Utente Google non valido.")

    ownership_key = (plant_id, owner_id)
    with get_user_plants_db_connection() as conn:
        ensure_user_plants_table(conn)
        owned = conn.execute(
            "SELECT id FROM user_plants WHERE id = ? AND user_id = ? LIMIT 1",
            ownership_key,
        ).fetchone()
        if owned is None:
            return False
        # Photo rows are removed explicitly before the plant row itself.
        conn.execute("DELETE FROM user_plant_photos WHERE plant_id = ?", (plant_id,))
        conn.execute("DELETE FROM user_plants WHERE id = ? AND user_id = ?", ownership_key)
        conn.commit()
    return True
def update_user_plant_created_at_by_id(user: dict[str, Any], plant_id: int, created_at_iso: str) -> dict[str, Any] | None:
    """Overwrite ``created_at`` on one of the user's saved plants.

    Args:
        user: Mapping from which ``sub`` is read.
        plant_id: Id of the plant to update.
        created_at_iso: New value stored verbatim in ``created_at``.

    Returns:
        The refreshed payload, or None when the plant does not belong to the
        user (or can no longer be read after the update).

    Raises:
        HTTPException: 401 when the user has no ``sub``.
    """
    owner_id = str(user.get("sub") or "").strip()
    if not owner_id:
        raise HTTPException(status_code=401, detail="Utente Google non valido.")

    ownership_key = (plant_id, owner_id)
    with get_user_plants_db_connection() as conn:
        ensure_user_plants_table(conn)
        owned = conn.execute(
            "SELECT id FROM user_plants WHERE id = ? AND user_id = ? LIMIT 1",
            ownership_key,
        ).fetchone()
        if owned is None:
            return None

        conn.execute(
            "UPDATE user_plants SET created_at = ? WHERE id = ? AND user_id = ?",
            (created_at_iso, plant_id, owner_id),
        )
        conn.commit()

        refreshed = conn.execute(
            (
                "SELECT id, plant_name, user_given_name, user_id, user_email, user_photo_url, created_at "
                "FROM user_plants WHERE id = ? AND user_id = ? LIMIT 1"
            ),
            ownership_key,
        ).fetchone()
        return None if refreshed is None else _user_plant_row_to_payload(conn, refreshed)
def _build_profile_context(profile: dict[str, Any] | None) -> str:
if not profile:
return ""
labels = {
"species_name": "Specie",
"indexed": "Presente in RAG",
"annaffiatura_gg": "Annaffiatura ogni giorni",
"annaffiatura_time": "Momento annaffiatura",
"luce": "Luce",
"temperatura": "Temperatura",
"umidita": "Umidita",
"altezza_media": "Altezza media",
"pulizia": "Pulizia",
"terriccio": "Terriccio",
"concimazione": "Concimazione",
"prevenzione": "Prevenzione",
"updated_at": "Ultimo aggiornamento",
}
lines = []
for field in PLANT_PROFILE_FIELDS:
value = profile.get(field)
if value is None or value == "":
continue
if field == "indexed":
value = "si" if value else "no"
lines.append(f"- {labels[field]}: {value}")
if not lines:
return ""
return "Dati strutturati estratti da plants.db:\n" + "\n".join(lines)
# Application instance that every route and middleware below is attached to.
app = FastAPI(title="PlantCLEF Image Search API")
# Allowed CORS origins come from CORS_ALLOW_ORIGINS as a comma-separated list;
# without the variable, the two local dev-server origins below are used.
cors_origins_raw = os.getenv("CORS_ALLOW_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173")
# Trim each entry and drop empty ones (e.g. from a trailing comma).
cors_origins = [origin.strip() for origin in cors_origins_raw.split(",") if origin.strip()]
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Serve PWA static assets generated by Vite build.
# NOTE(review): check_dir=False presumably lets the app start when the build
# output directory does not exist yet -- confirm against the StaticFiles docs.
app.mount(
    "/assets",
    StaticFiles(directory=str(PWA_DIST_DIR / "assets"), check_dir=False),
    name="pwa-assets",
)
app.mount(
    "/icons",
    StaticFiles(directory=str(PWA_DIST_DIR / "icons"), check_dir=False),
    name="pwa-icons",
)
def get_search_backend_status():
    """Report whether the image-search backend looks usable.

    Tries to import ``torch``, ``faiss`` and ``open_clip`` and checks that the
    files at ``INDEX_PATH`` and ``CACHE_PATH`` exist.

    Returns:
        A dict with ``ready`` (True only when every import succeeded and both
        files exist), ``modules`` (per-module "ok" or the error text) and
        ``files`` (existence flags plus the two paths).
    """
    module_status: dict[str, str] = {}
    for module_name in ("torch", "faiss", "open_clip"):
        try:
            __import__(module_name)
        except Exception as e:
            module_status[module_name] = f"{type(e).__name__}: {e}"
        else:
            module_status[module_name] = "ok"

    index_present = os.path.exists(INDEX_PATH)
    cache_present = os.path.exists(CACHE_PATH)
    file_status = {
        "index_exists": index_present,
        "cache_exists": cache_present,
        "index_path": INDEX_PATH,
        "cache_path": CACHE_PATH,
    }

    imports_ok = all(state == "ok" for state in module_status.values())
    return {
        "ready": imports_ok and index_present and cache_present,
        "modules": module_status,
        "files": file_status,
    }
def get_catalog_and_faiss_stats() -> dict[str, Any]:
    """Collect species counts from the plants catalog and the loaded image index.

    Both halves are best-effort: a failure is reported through ``ok=False`` and
    an ``error`` string instead of being raised, and counters that were not
    reached keep their initial value of 0.

    Returns:
        ``{"catalog": {...}, "faiss": {...}}`` with distinct-species totals for
        the catalog and image/species totals for the PlantCLEF and LeafSnap
        label lists of the index.
    """

    def _distinct_species(labels: list) -> int:
        # Case-insensitive count of non-blank labels.
        return len({str(v).strip().lower() for v in labels if str(v).strip()})

    catalog: dict[str, Any] = {
        "ok": False,
        "error": "",
        "species_db_total": 0,
        "species_rag_total": 0,
    }
    try:
        with get_plants_db_connection() as conn:
            all_row = conn.execute(
                "SELECT COUNT(DISTINCT lower(species_name)) AS c FROM plants"
            ).fetchone()
            catalog["species_db_total"] = int((all_row["c"] if all_row else 0) or 0)
            rag_row = conn.execute(
                "SELECT COUNT(DISTINCT lower(species_name)) AS c FROM plants WHERE indexed = 1"
            ).fetchone()
            catalog["species_rag_total"] = int((rag_row["c"] if rag_row else 0) or 0)
        catalog["ok"] = True
    except Exception as exc:
        catalog["error"] = f"{type(exc).__name__}: {exc}"

    plantclef = {"images_total": 0, "species_total": 0}
    leafsnap = {"images_total": 0, "species_total": 0}
    faiss_ok = False
    faiss_error = ""
    try:
        loaded_index = get_index()
        # Both label lists are read before any counter is written.
        plantclef_labels = list(getattr(loaded_index, "plantclef_labels", []) or [])
        leafsnap_labels = list(getattr(loaded_index, "leafsnap_labels", []) or [])
        plantclef["images_total"] = len(plantclef_labels)
        plantclef["species_total"] = _distinct_species(plantclef_labels)
        leafsnap["images_total"] = len(leafsnap_labels)
        leafsnap["species_total"] = _distinct_species(leafsnap_labels)
        faiss_ok = True
    except Exception as exc:
        faiss_error = f"{type(exc).__name__}: {exc}"

    return {
        "catalog": catalog,
        "faiss": {
            "ok": faiss_ok,
            "error": faiss_error,
            "plantclef": plantclef,
            "leafsnap": leafsnap,
        },
    }
def get_public_app_config() -> dict[str, Any]:
    """Build the configuration dict exposed to the front end.

    Returns:
        ``google_client_id`` (first entry of ``GOOGLE_CLIENT_IDS``, or "" when
        the list is empty) and ``require_google_auth``.
    """
    primary_client_id = GOOGLE_CLIENT_IDS[0] if GOOGLE_CLIENT_IDS else ""
    return {
        "google_client_id": primary_client_id,
        "require_google_auth": REQUIRE_GOOGLE_AUTH,
    }
@app.get("/app-config")
def app_config():
    """Serve the public front-end configuration as JSON."""
    public_config = get_public_app_config()
    return JSONResponse(content=public_config)
class PlantChatRequest(BaseModel):
    # Request body: a plant name, a care question and the Wikipedia language
    # to use for context.
    plant_name: str = Field(
        ...,
        min_length=2,
        description="Nome comune o scientifico della pianta",
    )
    question: str = Field(
        ...,
        min_length=3,
        description="Domanda sulla cura della pianta",
    )
    lang: str = Field(
        "it",
        description="Lingua Wikipedia da usare per il contesto",
    )
class SaveUserPlantRequest(BaseModel):
    # Request body for saving a plant: the species found plus the name the
    # user picked (1 to 80 characters).
    plant_name: str = Field(
        ...,
        min_length=2,
        description="Nome della specie trovata",
    )
    user_given_name: str = Field(
        ...,
        min_length=1,
        max_length=80,
        description="Nome scelto dall'utente",
    )
class UpdateFirstWateringDateRequest(BaseModel):
    # Request body carrying a date shaped like YYYY-MM-DD. The pattern checks
    # only the digit layout, not whether the calendar date exists.
    first_watering_date: str = Field(
        ...,
        description="Data prima innaffiatura in formato YYYY-MM-DD",
        pattern=r"^\d{4}-\d{2}-\d{2}$",
    )
class GoogleAuthRequest(BaseModel):
    # Request body carrying the Google ID token to validate (20+ characters).
    id_token: str = Field(
        ...,
        min_length=20,
        description="Google ID token",
    )
class RecognitionLogRequest(BaseModel):
    # Request body describing one completed recognition to be logged.
    chosen_species: str = Field(
        ...,
        min_length=2,
        max_length=120,
        description="Specie selezionata",
    )
    used_openai: bool = Field(
        default=False,
        description="True se nel riconoscimento e stato usato OpenAI",
    )
    image_url: str | None = Field(
        default=None,
        max_length=1200,
        description="URL immagine se salvata",
    )
    # Duration bounded to 0..300000 ms.
    recognition_ms: int | None = Field(
        default=None,
        ge=0,
        le=300000,
        description="Durata riconoscimento in ms",
    )
def _validate_google_token(id_token: str) -> dict[str, Any]:
    """Validate a Google ID token through the ``tokeninfo`` endpoint.

    Args:
        id_token: The token to verify.

    Returns:
        The JSON payload returned by the endpoint.

    Raises:
        HTTPException: 502 when the HTTP call itself fails, 401 when the
            endpoint does not answer 200 or when ``aud`` is not one of
            ``GOOGLE_CLIENT_IDS`` (checked only if that list is non-empty).
    """
    tokeninfo_url = "https://oauth2.googleapis.com/tokeninfo"
    try:
        with httpx.Client(timeout=8.0) as client:
            response = client.get(tokeninfo_url, params={"id_token": id_token})
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Errore verifica token Google: {e}")

    if response.status_code != 200:
        raise HTTPException(status_code=401, detail="Token Google non valido.")

    claims = response.json()
    token_audience = str(claims.get("aud") or "")
    # With no configured client ids, the audience check is skipped entirely.
    audience_rejected = bool(GOOGLE_CLIENT_IDS) and token_audience not in GOOGLE_CLIENT_IDS
    if audience_rejected:
        raise HTTPException(status_code=401, detail="Token Google con client_id non autorizzato.")
    return claims
def _get_google_user_from_authorization(
authorization: str | None,
require_auth: bool | None = None,
) -> dict[str, Any] | None:
if require_auth is None:
require_auth = REQUIRE_GOOGLE_AUTH
if not authorization:
if require_auth:
raise HTTPException(status_code=401, detail="Authorization Bearer richiesta.")
return None
scheme, _, token = authorization.partition(" ")
if scheme.lower() != "bearer" or not token.strip():
raise HTTPException(status_code=401, detail="Header Authorization non valido.")
validated = _validate_google_token(token.strip())
return {
"sub": validated.get("sub", ""),
"email": validated.get("email", ""),
"name": validated.get("name", ""),
"picture": validated.get("picture", ""),
}
def fetch_wikipedia_text_context(name: str, lang: str):
    """Fetch a Wikipedia summary and long extract for a plant name.

    The title is resolved with ``opensearch`` and, failing that, with the
    full-text ``query`` search. The page summary and the plain-text extract
    are then downloaded for the first match.

    Args:
        name: Common or scientific plant name to search for.
        lang: Wikipedia language code (e.g. ``"it"``, ``"en"``). It becomes the
            sub-domain of the request host, so it is validated first.

    Returns:
        A dict with ``title``, ``summary``, ``extended_text`` (the long extract
        minus the leading summary when it starts with it), ``wikipedia_url``
        and ``thumbnail``.

    Raises:
        HTTPException: 400 when ``lang`` is not a plausible language code,
            404 when no page matches ``name``.
        httpx.HTTPStatusError: When the summary request returns an error status.
    """
    # Local import: the module header imports only urlparse/unquote.
    from urllib.parse import quote

    # `lang` is caller-supplied and interpolated into the host name. Restrict it
    # to letters/digits/hyphens so it cannot redirect requests to another host.
    lang_code = str(lang or "").strip().lower()
    if not re.fullmatch(r"[a-z]{2,12}(?:-[a-z0-9]{1,12}){0,3}", lang_code):
        raise HTTPException(status_code=400, detail=f"Lingua Wikipedia non valida: '{lang}'.")

    base = f"https://{lang_code}.wikipedia.org"
    wiki_headers = {
        "User-Agent": WIKI_USER_AGENT,
        "Accept": "application/json",
    }
    with httpx.Client(timeout=10.0, headers=wiki_headers, follow_redirects=True) as client:
        search_resp = client.get(
            f"{base}/w/api.php",
            params={
                "action": "opensearch",
                "search": name,
                "limit": 1,
                "format": "json",
            },
        )
        titles = []
        if search_resp.status_code == 200:
            search_data = search_resp.json()
            # opensearch answers [query, [titles], ...]; tolerate other shapes
            # (e.g. an error object) instead of failing on the index access.
            if (
                isinstance(search_data, list)
                and len(search_data) > 1
                and isinstance(search_data[1], list)
            ):
                titles = search_data[1]
        if not titles:
            query_resp = client.get(
                f"{base}/w/api.php",
                params={
                    "action": "query",
                    "list": "search",
                    "srsearch": name,
                    "srlimit": 1,
                    "format": "json",
                },
            )
            if query_resp.status_code == 200:
                query_data = query_resp.json()
                items = query_data.get("query", {}).get("search", [])
                if items:
                    titles = [items[0].get("title", "")]
        # Drop blank or non-string titles so an empty match counts as "not found".
        titles = [t for t in titles if isinstance(t, str) and t.strip()]
        if not titles:
            raise HTTPException(status_code=404, detail=f"Nessuna pagina Wikipedia trovata per '{name}'.")

        page_title = titles[0]
        safe_title = page_title.replace(" ", "_")
        # Percent-encode the title as one path segment: "/", "?" or "#" inside
        # a title would otherwise change the URL structure.
        encoded_title = quote(safe_title, safe="")
        summary_resp = client.get(f"{base}/api/rest_v1/page/summary/{encoded_title}")
        summary_resp.raise_for_status()
        summary = summary_resp.json()

        long_resp = client.get(
            f"{base}/w/api.php",
            params={
                "action": "query",
                "prop": "extracts",
                "titles": page_title,
                "explaintext": 1,
                "redirects": 1,
                "format": "json",
            },
        )
        long_text = ""
        if long_resp.status_code == 200:
            long_data = long_resp.json()
            pages = long_data.get("query", {}).get("pages", {})
            if isinstance(pages, dict) and pages:
                first_page = next(iter(pages.values()))
                long_text = (first_page.get("extract") or "").strip()

    title = summary.get("title", page_title)
    extract = summary.get("extract", "Nessuna descrizione disponibile.")
    page_url = summary.get("content_urls", {}).get("desktop", {}).get("page", f"{base}/wiki/{encoded_title}")

    extended_text = ""
    if long_text:
        # Avoid repeating the summary when the long extract begins with it.
        if long_text.startswith(extract):
            extended_text = long_text[len(extract):].strip()
        else:
            extended_text = long_text
    thumbnail = summary.get("thumbnail", {}).get("source", "")
    return {
        "title": title,
        "summary": extract,
        "extended_text": extended_text,
        "wikipedia_url": page_url,
        "thumbnail": thumbnail,
    }
def get_index():
    """Return the shared image-search index, building it on first use.

    The index object is cached in the module-level ``index`` variable. Before
    construction, LeafSnap label aliases are read from the ``leafsnap_aliases``
    table of the plants database on a best-effort basis.

    Returns:
        The cached ``PlentClefIndex`` instance.

    Raises:
        RuntimeError: When the index cannot be imported or constructed; the
            original exception is chained as the cause.
    """
    global index
    if index is None:
        try:
            from plentclef import PlentClefIndex

            leafsnap_aliases: dict[str, str] = {}
            try:
                # `with sqlite3.connect(...)` would only manage the transaction
                # and leave the connection open, so close it explicitly.
                _conn = sqlite3.connect(PLANTS_SQLITE_PATH)
                try:
                    rows = _conn.execute(
                        "SELECT leafsnap_label, db_species_name FROM leafsnap_aliases"
                    ).fetchall()
                finally:
                    _conn.close()
                leafsnap_aliases = {r[0]: r[1] for r in rows}
            except Exception:
                pass  # table may not exist yet; aliases simply won't be applied
            index = PlentClefIndex(
                model_name=MODEL_NAME,
                index_path=INDEX_PATH,
                index_cache=CACHE_PATH,
                leafsnap_index_path=LEAFSNAP_INDEX_PATH,
                leafsnap_cache_path=LEAFSNAP_CACHE_PATH,
                leafsnap_aliases=leafsnap_aliases,
            )
        except Exception as e:
            cause = f"{type(e).__name__}: {e}"
            raise RuntimeError(
                "Impossibile inizializzare il motore di ricerca immagini. "
                "Probabile blocco di sicurezza su librerie native (es. torch/faiss). "
                f"Dettaglio: {cause}."
            ) from e
    return index
@app.post("/search")
async def search_similar(
    file: UploadFile = File(..., description="Immagine della pianta da ricercare"),
    k: int = Query(default=5, ge=1, le=50, description="Numero di risultati da restituire"),
    authorization: str | None = Header(default=None),
):
    """Find the species most similar to an uploaded plant image.

    The upload is written to a temporary file and searched in the image index.
    When ``_should_trigger_gpt_fallback`` asks for it and an OpenAI key is
    configured, a GPT vision pass may prepend its own species to the results.
    Each result is finally annotated with its draft flag and build status.

    Args:
        file: Uploaded image; its content type must start with ``image/``.
        k: Number of results to return (1..50).
        authorization: Optional ``Authorization`` header; validated when present.

    Returns:
        JSONResponse with ``results``, ``gpt_fallback_used`` and
        ``recognition_ms``.

    Raises:
        HTTPException: 400 for a non-image upload, 503 when the search raises
            RuntimeError (e.g. the index cannot be initialised), 500 for any
            other failure during the search.
    """
    started_at = datetime.utcnow()
    # Authentication is optional here; the call still rejects a malformed or
    # invalid Authorization header.
    _get_google_user_from_authorization(authorization, require_auth=False)
    _log_api(
        "/search",
        "input",
        {
            "filename": file.filename,
            "content_type": file.content_type,
            "k": k,
        },
    )
    if not file.content_type or not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="Il file caricato non è un'immagine valida.")

    suffix = os.path.splitext(file.filename or "")[1] or ".jpg"
    # Read the upload before creating the temp file so a failed read cannot
    # leave an orphaned file behind.
    upload_bytes = await file.read()

    gpt_species: str | None = None
    gpt_job_status: dict[str, Any] | None = None
    gpt_fallback_attempted = False
    gpt_fallback_reason = "not_attempted"
    tmp_path: str | None = None
    try:
        # Created inside the try so the finally clause removes the file even
        # when writing it fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(upload_bytes)

        loaded_index = get_index()
        # Pass debug=True to enable detailed logging of FAISS scoring
        debug_candidates = max(
            k,
            min(500, int(os.getenv("SEARCH_DEBUG_CANDIDATES", "50"))),
        )
        results, top_planclef_score = loaded_index.search(
            tmp_path,
            loaded_index.plantclef_labels,
            k=k,
            debug=True,
            search_k=debug_candidates,
            return_scores=True,
        )
        # GPT-4o vision fallback when FAISS confidence is low
        api_key = os.getenv("OPENAI_API_KEY", "").strip()
        should_trigger_gpt, gpt_trigger_basis = _should_trigger_gpt_fallback(top_planclef_score, results)
        if should_trigger_gpt and api_key:
            gpt_fallback_attempted = True
            logger.info(
                "Activating GPT-4o vision fallback: "
                f"basis={gpt_trigger_basis}, top_planclef_score={top_planclef_score:.4f}, "
                f"threshold={FAISS_CONFIDENCE_THRESHOLD}"
            )
            fallback_candidates = [species for species, _, _ in results[:12]]
            gpt_species, gpt_fallback_reason = _gpt_vision_identify_plant(
                tmp_path,
                api_key,
                candidate_species=fallback_candidates,
            )
            if gpt_species:
                logger.info(f"GPT-4o identified: '{gpt_species}'")
                _insert_draft_plant_if_missing(gpt_species, api_key)
                gpt_job_status = _ensure_species_build_job(gpt_species)
                # Prepend GPT result at score 1.0, avoid duplicates
                results = [(gpt_species, 1.0, [])] + [
                    r for r in results if r[0].lower() != gpt_species.lower()
                ]
                results = results[:k]
            else:
                logger.info(f"GPT fallback attempted but no species accepted: {gpt_fallback_reason}")
        elif should_trigger_gpt:
            gpt_fallback_reason = "OPENAI_API_KEY missing"
    except RuntimeError as e:
        raise HTTPException(status_code=503, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)

    # Determine is_draft for each result (indexed=0 in plants.db)
    draft_species: set[str] = set()
    species_names = [r[0] for r in results]
    # With no results the IN (...) list would be empty, which is invalid SQL.
    if species_names:
        try:
            with get_plants_db_connection() as conn:
                placeholders = ",".join("?" * len(species_names))
                rows = conn.execute(
                    f"SELECT species_name, indexed FROM plants WHERE lower(species_name) IN ({placeholders})",
                    [n.lower() for n in species_names],
                ).fetchall()
            indexed_map = {row["species_name"].lower(): bool(row["indexed"]) for row in rows}
            for name in species_names:
                # Species absent from the catalog are treated as not draft.
                if not indexed_map.get(name.lower(), True):
                    draft_species.add(name.lower())
        except Exception as exc:
            logger.warning(f"Could not determine draft status for results: {exc}")

    # Every local below is bound once the search block completes without
    # raising, so no existence checks are needed.
    gpt_fallback_used = gpt_species is not None
    _log_api(
        "/search",
        "results",
        {
            "k": k,
            "top_planclef_score": top_planclef_score,
            "gpt_fallback_attempted": gpt_fallback_attempted,
            "gpt_fallback_used": gpt_fallback_used,
            "gpt_fallback_reason": gpt_fallback_reason,
            "gpt_trigger_basis": gpt_trigger_basis,
            "gpt_job_status": gpt_job_status,
            "species_found": [species for species, _, _ in results],
            "scores": [float(score) for _, score, _ in results],
            "draft_species": list(draft_species),
        },
    )
    return JSONResponse(
        content={
            "results": [
                {
                    "species": species,
                    "score": float(score),
                    "is_draft": species.lower() in draft_species,
                    "build_status": _species_build_status(species),
                }
                for species, score, _ in results
            ],
            "gpt_fallback_used": gpt_fallback_used,
            "recognition_ms": int((datetime.utcnow() - started_at).total_seconds() * 1000),
        }
    )
@app.middleware("http")
async def log_requests(request, call_next):
    """Log every HTTP request, its response and any exception it raises.

    A short random id ties the "request" entry to the matching "response" or
    "error" entry. Exceptions are logged and then re-raised unchanged.
    """
    request_id = uuid4().hex[:8]
    started_at = datetime.utcnow()
    path = request.url.path

    def _elapsed_ms() -> int:
        # Whole milliseconds since the request entered this middleware.
        return int((datetime.utcnow() - started_at).total_seconds() * 1000)

    _log_api(
        path,
        "request",
        {
            "request_id": request_id,
            "method": request.method,
            "query": str(request.url.query or ""),
        },
    )
    try:
        response = await call_next(request)
    except Exception as exc:
        failure = {
            "request_id": request_id,
            "elapsed_ms": _elapsed_ms(),
            "error": f"{type(exc).__name__}: {exc}",
        }
        _log_api(path, "error", failure)
        raise

    outcome = {
        "request_id": request_id,
        "elapsed_ms": _elapsed_ms(),
        **_response_payload_for_log(response),
    }
    _log_api(path, "response", outcome)
    return response
@app.post("/auth/google")
def auth_google(payload: GoogleAuthRequest):
    """Validate a Google ID token and register the user on first login.

    Returns:
        JSONResponse with the user profile, admin and new-user flags, the
        registration timestamp and the token's ``exp`` / ``aud`` claims.
    """
    claims = _validate_google_token(payload.id_token)
    profile = {field: claims.get(field, "") for field in ("sub", "email", "name", "picture")}
    is_new_user, registered_at = register_google_user_if_needed(profile)
    body = {
        "ok": True,
        "user": profile,
        "is_admin": _is_admin_email(str(profile.get("email") or "")),
        "is_new_user": is_new_user,
        "registered_at": registered_at,
        "expires_at": claims.get("exp", ""),
        "aud": claims.get("aud", ""),
    }
    return JSONResponse(content=body)
@app.get("/admin/console")
def get_admin_console(
    authorization: str | None = Header(default=None),
    limit: int = Query(default=300, ge=1, le=1000),
    chart_days: int = Query(default=30, ge=7, le=90),
):
    """Return the data shown on the admin console.

    Args:
        authorization: ``Authorization`` header; must belong to an admin user.
        limit: Maximum number of registered users to list.
        chart_days: Requested chart window; the aggregation helper decides the
            effective value, which is echoed back under ``recognition``.

    Returns:
        JSONResponse with global counters, recognition aggregates, chart
        series, catalog/index inventory and the registered-user list.

    Raises:
        HTTPException: 401/403 from ``_require_admin_user``.
    """
    admin_user = _require_admin_user(authorization)
    users = list_registered_users_for_admin(limit=limit)
    inventory = get_catalog_and_faiss_stats()
    with get_user_plants_db_connection() as conn:
        # Make sure every table queried below exists: on a fresh database the
        # COUNT queries would otherwise fail with "no such table".
        ensure_registered_users_table(conn)
        ensure_user_plants_table(conn)
        ensure_recognition_logs_table(conn)
        total_registered = conn.execute("SELECT COUNT(1) AS c FROM registered_users").fetchone()["c"]
        total_saved_plants = conn.execute("SELECT COUNT(1) AS c FROM user_plants").fetchone()["c"]
        total_external_user_images = conn.execute(
            "SELECT COUNT(1) AS c FROM user_plant_photos WHERE photo_url IS NOT NULL AND trim(photo_url) <> ''"
        ).fetchone()["c"]
        recognition = get_recognition_admin_aggregates(conn, chart_days=chart_days)
    return JSONResponse(
        content={
            "ok": True,
            "admin_email": admin_user.get("email", ""),
            "stats": {
                "registered_users_total": int(total_registered or 0),
                "saved_plants_total": int(total_saved_plants or 0),
                "external_user_images_total": int(total_external_user_images or 0),
            },
            "recognition": {
                "chart_days": recognition["chart_days"],
                "total": recognition["total"],
                "guest_total": recognition["guest_total"],
                "user_total": recognition["user_total"],
                "openai_total": recognition["openai_total"],
                "with_image_total": recognition["with_image_total"],
                "avg_recognition_ms": recognition["avg_recognition_ms"],
            },
            "charts": {
                "top_species": recognition["top_species"],
                "daily_series": recognition["daily_series"],
            },
            "inventory": inventory,
            "users": users,
        }
    )
@app.post("/recognitions/log")
def log_recognition(payload: RecognitionLogRequest, authorization: str | None = Header(default=None)):
    """Store one recognition log entry; authentication is optional."""
    # With require_auth=False a missing header resolves to None.
    current_user = _get_google_user_from_authorization(authorization, require_auth=False)
    log_entry = create_recognition_log(
        payload.chosen_species,
        bool(payload.used_openai),
        payload.image_url,
        payload.recognition_ms,
        current_user,
    )
    return JSONResponse(content={"saved": log_entry})
@app.post("/user/plants")
def save_user_plant(payload: SaveUserPlantRequest, authorization: str | None = Header(default=None)):
    """Save a plant for the signed-in user and return the stored record.

    Raises:
        HTTPException: 401 when no user could be resolved from the header.
    """
    current_user = _get_google_user_from_authorization(authorization)
    if not current_user:
        raise HTTPException(status_code=401, detail="Accedi con Google per salvare una pianta.")

    saved = create_user_plant(payload.plant_name, payload.user_given_name, current_user)
    logged_fields = {key: saved[key] for key in ("plant_name", "user_given_name", "user")}
    _log_api("/user/plants", "saved", logged_fields)
    return JSONResponse(content={"saved": saved})
@app.get("/user/plants")
def get_user_plants(authorization: str | None = Header(default=None)):
    """List the plants saved by the signed-in user.

    Raises:
        HTTPException: 401 when no user could be resolved from the header.
    """
    current_user = _get_google_user_from_authorization(authorization)
    if not current_user:
        raise HTTPException(status_code=401, detail="Accedi con Google per vedere le tue piante.")
    return JSONResponse(content={"items": list_user_plants(current_user)})
@app.delete("/user/plants/{plant_id}")
def delete_user_plant(plant_id: int, authorization: str | None = Header(default=None)):
    """Delete one of the signed-in user's saved plants.

    Raises:
        HTTPException: 401 when no user could be resolved, 404 when the plant
            does not exist for this user.
    """
    current_user = _get_google_user_from_authorization(authorization)
    if not current_user:
        raise HTTPException(status_code=401, detail="Accedi con Google per eliminare una pianta.")

    if not delete_user_plant_by_id(user=current_user, plant_id=plant_id):
        raise HTTPException(status_code=404, detail="Pianta salvata non trovata.")

    _log_api("/user/plants/{plant_id}", "deleted", {"plant_id": plant_id})
    return JSONResponse(content={"deleted": True, "id": plant_id})
@app.patch("/user/plants/{plant_id}/first-watering-date")
def update_user_plant_first_watering_date(
    plant_id: int,
    payload: UpdateFirstWateringDateRequest,
    authorization: str | None = Header(default=None),
):
    """Set the first-watering date of a saved plant.

    The date is stored in the plant's ``created_at`` field as midnight UTC of
    the given day.

    Args:
        plant_id: Id of the saved plant.
        payload: Body carrying ``first_watering_date`` as ``YYYY-MM-DD``.
        authorization: ``Authorization`` header identifying the owner.

    Returns:
        JSONResponse with the updated plant under ``updated``.

    Raises:
        HTTPException: 401 when no user could be resolved, 400 when the date
            is not a real calendar date, 404 when the plant does not exist for
            this user.
    """
    user = _get_google_user_from_authorization(authorization)
    if not user:
        raise HTTPException(status_code=401, detail="Accedi con Google per aggiornare la data.")
    # The request model checks only the digit layout, so values such as
    # 2024-13-45 would pass; parse the string to reject impossible dates.
    try:
        watering_day = datetime.strptime(payload.first_watering_date, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="Data prima innaffiatura non valida.") from None
    created_at_iso = f"{watering_day.isoformat()}T00:00:00Z"
    updated = update_user_plant_created_at_by_id(user=user, plant_id=plant_id, created_at_iso=created_at_iso)
    if updated is None:
        raise HTTPException(status_code=404, detail="Pianta salvata non trovata.")
    _log_api(
        "/user/plants/{plant_id}/first-watering-date",
        "updated",
        {"plant_id": plant_id, "created_at_iso": updated["created_at_iso"]},
    )
    return JSONResponse(content={"updated": updated})
@app.post("/user/plants/{plant_id}/photo")
async def upload_user_plant_photo(
    plant_id: int,
    file: UploadFile = File(...),
    authorization: str | None = Header(default=None),
):
    """Upload a user photo for a saved plant and store it on Cloudinary.

    The upload is spooled to a temporary file, pushed to Cloudinary, and the
    resulting URL is recorded both in ``user_plant_photos`` and on the
    ``user_plants`` row. Returns ``{"updated": <plant payload>}``.

    Raises:
        HTTPException: 401 for a missing/invalid user, 503 when Cloudinary is
            not configured, 400 when the upload is not an image, 404 when the
            plant does not belong to the user, 500 when the Cloudinary upload
            fails, 502 when Cloudinary returns no usable URL.
    """
    user = _get_google_user_from_authorization(authorization)
    if not user:
        raise HTTPException(status_code=401, detail="Accedi con Google per caricare una foto.")
    if not (CLOUDINARY_CLOUD_NAME and CLOUDINARY_API_KEY and CLOUDINARY_API_SECRET):
        raise HTTPException(status_code=503, detail="Servizio foto non configurato.")
    if not file.content_type or not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="Il file caricato non è un'immagine valida.")
    user_id = str(user.get("sub") or "").strip()
    if not user_id:
        raise HTTPException(status_code=401, detail="Utente non valido.")

    # Verify the plant belongs to this user before spending an upload on it.
    with get_user_plants_db_connection() as conn:
        ensure_user_plants_table(conn)
        row = conn.execute(
            "SELECT id FROM user_plants WHERE id = ? AND user_id = ? LIMIT 1",
            (plant_id, user_id),
        ).fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="Pianta non trovata.")

    suffix = os.path.splitext(file.filename or "")[1] or ".jpg"
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    tmp_path = tmp.name
    # The file is created with delete=False, so the cleanup must cover the
    # write as well as the upload; otherwise a failed read leaks the file.
    try:
        with tmp:
            tmp.write(await file.read())
        try:
            result = cloudinary.uploader.upload(
                tmp_path,
                folder="clorofilla/user-plants",
                public_id=f"plant_{plant_id}_user_{user_id[:12]}_{uuid4().hex[:10]}",
                overwrite=False,
                resource_type="image",
                transformation=[{"width": 1200, "crop": "limit", "quality": "auto:good"}],
            )
            photo_url = str(result.get("secure_url") or "").strip()
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Errore upload foto: {e}") from e
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    # Never overwrite the stored photo with an empty URL.
    if not photo_url:
        raise HTTPException(status_code=502, detail="Errore upload foto: URL immagine non disponibile.")

    # Save URL to DB: one history row plus the "current photo" column.
    with get_user_plants_db_connection() as conn:
        ensure_user_plants_table(conn)
        created_at = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
        conn.execute(
            "INSERT INTO user_plant_photos (plant_id, photo_url, created_at) VALUES (?, ?, ?)",
            (plant_id, photo_url, created_at),
        )
        conn.execute(
            "UPDATE user_plants SET user_photo_url = ? WHERE id = ? AND user_id = ?",
            (photo_url, plant_id, user_id),
        )
        conn.commit()
        updated_row = conn.execute(
            "SELECT id, plant_name, user_given_name, user_id, user_email, user_photo_url, created_at "
            "FROM user_plants WHERE id = ?",
            (plant_id,),
        ).fetchone()
        updated_payload = _user_plant_row_to_payload(conn, updated_row)

    _log_api("/user/plants/{plant_id}/photo", "uploaded", {"plant_id": plant_id})
    return JSONResponse(content={"updated": updated_payload})
@app.get("/health")
def health():
    """Report a fixed "ok" status, the model name and search-backend readiness."""
    backend = get_search_backend_status()
    report = {"status": "ok", "model": MODEL_NAME}
    report["search_backend_ready"] = backend["ready"]
    return report
@app.get("/search/status")
def search_status():
    """Return the search backend status exactly as ``get_search_backend_status`` reports it."""
    backend_status = get_search_backend_status()
    return backend_status
@app.get("/sw.js")
def pwa_sw_js():
    """Serve ``sw.js`` through ``_serve_pwa_file`` as JavaScript."""
    response = _serve_pwa_file("sw.js", media_type="application/javascript")
    return response
@app.get("/registerSW.js")
def pwa_register_sw_js():
    """Serve ``registerSW.js`` through ``_serve_pwa_file`` as JavaScript."""
    response = _serve_pwa_file("registerSW.js", media_type="application/javascript")
    return response
@app.get("/manifest.webmanifest")
def pwa_manifest():
    """Serve ``manifest.webmanifest`` through ``_serve_pwa_file`` as a web app manifest."""
    response = _serve_pwa_file("manifest.webmanifest", media_type="application/manifest+json")
    return response
@app.get("/favicon.ico")
def pwa_favicon():
    """Serve ``favicon.ico`` through ``_serve_pwa_file`` as an icon image."""
    response = _serve_pwa_file("favicon.ico", media_type="image/x-icon")
    return response
@app.get("/species/previews")
def species_previews(
    names: list[str] = Query(default=[], description="Nomi specie da risolvere per anteprima immagine"),
    authorization: str | None = Header(default=None),
):
    """Map each requested species name to its preview image URL.

    Authentication is optional (``require_auth=False``). Returns
    ``{"previews": {name: url}}``; an empty ``names`` list yields an empty map.
    """
    _get_google_user_from_authorization(authorization, require_auth=False)
    preview_by_name = {}
    for species in names or []:
        preview_by_name[species] = _get_species_preview_image_url(species)
    return JSONResponse(content={"previews": preview_by_name})
@app.get("/species/common-names")
def species_common_names(
    names: list[str] = Query(default=[], description="Nomi specie di cui ottenere il nome comune"),
    authorization: str | None = Header(default=None),
):
    """Look up the common name of each requested species in the RAG collection.

    Authentication is optional (``require_auth=False``). The endpoint is
    best-effort: it always answers with ``{"common_names": {...}}``. If the
    collection cannot be opened the map is empty; if a single lookup fails
    that species maps to ``""``. Failures are logged instead of being
    silently discarded so outages remain visible.
    """
    _get_google_user_from_authorization(authorization, require_auth=False)
    if not names:
        return JSONResponse(content={"common_names": {}})
    try:
        collection = get_rag_collection()
    except Exception as exc:
        logger.warning("Impossibile aprire la collezione RAG per i nomi comuni: %s", exc)
        return JSONResponse(content={"common_names": {}})

    result_map: dict[str, str] = {}
    for name in names:
        try:
            res = collection.get(
                where={"species_name": {"$eq": name}},
                limit=1,
            )
            metadatas = (res.get("metadatas") or []) if res else []
            # A metadata entry may itself be None; treat it like a missing one.
            meta = (metadatas[0] if metadatas else None) or {}
            result_map[name] = meta.get("common_name", "") or ""
        except Exception as exc:
            logger.warning("Impossibile leggere il nome comune di '%s': %s", name, exc)
            result_map[name] = ""
    return JSONResponse(content={"common_names": result_map})
@app.get("/species/{name}/build-status")
def species_build_status(name: str, authorization: str | None = Header(default=None)):
    """Report the build status of a species and whether its DB profile is indexed.

    Authentication is optional (``require_auth=False``). ``ready`` is true
    only when a profile exists and its ``indexed`` value is truthy.
    """
    _get_google_user_from_authorization(authorization, require_auth=False)
    build_state = _species_build_status(name)
    db_profile = get_plant_profile_from_db(name)
    is_ready = True if (db_profile and db_profile.get("indexed")) else False
    body = {"species": name, "ready": is_ready, "status": build_state}
    return JSONResponse(content=body)
@app.get("/", response_class=HTMLResponse)
def ui():
    """Answer the root path with whatever ``_serve_pwa_index`` produces."""
    index_response = _serve_pwa_index()
    return index_response
@app.get("/images/{full_path:path}")
def get_image(full_path: str):
    """Serve local plant images from the ``data/images`` directory.

    Args:
        full_path: Path of the image relative to ``data/images``; it is first
            passed through ``_normalize_image_path``.

    Raises:
        HTTPException: 403 when the resolved path escapes ``data/images``,
            404 when it is not an existing regular file, 500 on any other
            error.
    """
    try:
        normalized_path = _normalize_image_path(full_path)
        file_path = (Path("data") / "images" / normalized_path).resolve()
        # Security check: compare path components, not string prefixes, so a
        # sibling such as "data/images_private" is not treated as inside
        # "data/images".
        data_images_path = (Path("data") / "images").resolve()
        if not file_path.is_relative_to(data_images_path):
            raise HTTPException(status_code=403, detail="Accesso negato.")
        # is_file() also rejects directories, which FileResponse cannot serve.
        if not file_path.is_file():
            raise HTTPException(status_code=404, detail="Immagine non trovata.")
        return FileResponse(file_path)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Errore nel caricamento immagine: {e}") from e
@app.get("/plant/{name}")
def plant_info(
    name: str,
    lang: str = Query(default="it", description="Codice lingua Wikipedia (es. it, en, fr)"),
    refresh_cache: bool = Query(default=False, description="Forza rigenerazione cache scheda"),
    authorization: str | None = Header(default=None),
):
    """Build, or serve from cache, the information card of a plant species.

    Lookup order:
      1. the cached card, unless ``refresh_cache`` is set;
      2. the RAG collection, whose text is summarised with OpenAI when
         ``OPENAI_API_KEY`` is set and otherwise passed through
         ``_truncate(..., 1200)``;
      3. Wikipedia in ``lang``, then in English;
      4. a placeholder card built from the local plants database.

    Cards whose source is ``rag`` or ``wikipedia`` are written back to the
    cache; a failed cache write is only logged.

    Raises:
        HTTPException: 404 when the species is found nowhere, 502 when the
            OpenAI summary fails, 500 for any other retrieval error.
    """
    # Local import: only this handler needs HTML escaping.
    from html import escape as _escape_html

    _get_google_user_from_authorization(authorization, require_auth=False)
    _log_api("/plant/{name}", "input", {"name": name, "lang": lang, "refresh_cache": refresh_cache})
    normalized_name = (name or "").strip()
    normalized_lang = (lang or "it").strip().lower()

    if not refresh_cache:
        cached_payload = get_cached_plant_card(normalized_name, normalized_lang)
        if cached_payload is not None:
            # The build status is recomputed on every hit rather than taken from the cache.
            cached_payload["build_status"] = _species_build_status(cached_payload.get("title") or normalized_name)
            _log_api(
                "/plant/{name}",
                "cache_hit",
                {
                    "title": cached_payload.get("title", normalized_name),
                    "source": cached_payload.get("source", "rag"),
                    "cache_updated_at": cached_payload.get("cache_updated_at", ""),
                },
            )
            return JSONResponse(content=cached_payload)

    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    try:
        retrieval_mode = "rag"
        collection = get_rag_collection()
        results = collection.get(
            where={"species_name": {"$eq": normalized_name}},
            limit=20,
        )
        if not results or not results.get("documents"):
            # Nothing in the RAG: try Wikipedia in the requested language, then English.
            wiki_data = None
            try:
                retrieval_mode = "wikipedia_fallback"
                wiki_data = fetch_wikipedia_text_context(normalized_name, normalized_lang)
            except Exception:
                if normalized_lang != "en":
                    try:
                        retrieval_mode = "wikipedia_fallback_en"
                        wiki_data = fetch_wikipedia_text_context(normalized_name, "en")
                    except Exception:
                        wiki_data = None
            if wiki_data is not None:
                title = wiki_data["title"]
                extract = wiki_data["summary"]
                common_name = ""
                thumbnail = (wiki_data.get("thumbnail") or "").strip()
                image_paths = [thumbnail] if thumbnail else []
                rag_used = False
            else:
                db_profile = get_plant_profile_from_db(normalized_name)
                if db_profile is None:
                    raise HTTPException(
                        status_code=404,
                        detail=f"Pianta '{normalized_name}' non trovata nella RAG, in Wikipedia o nel database locale.",
                    )
                # Last resort: a placeholder card for a species known only to the local DB.
                retrieval_mode = "db_draft"
                rag_used = False
                title = db_profile.get("species_name") or normalized_name
                common_name = ""
                image_paths = _get_species_images_from_db(title)
                if not db_profile.get("indexed"):
                    _ensure_species_build_job(title)
                if db_profile.get("indexed"):
                    extract = (
                        "Scheda non ancora disponibile dalla base conoscenza RAG. "
                        "Stiamo completando i contenuti per questa specie."
                    )
                else:
                    extract = (
                        "Scheda in costruzione. Questa specie e stata riconosciuta, "
                        "ma i contenuti descrittivi sono ancora in preparazione."
                    )
        else:
            retrieval_mode = "rag"
            rag_used = True
            metadatas = results.get("metadatas") or []
            first_meta = (metadatas[0] if metadatas else None) or {}
            title = first_meta.get("species_name", normalized_name)
            common_name = first_meta.get("common_name", "")
            image_paths = _get_species_images_from_db(normalized_name)
            if not image_paths:
                image_paths_json = first_meta.get("image_paths", "[]")
                try:
                    image_paths = json.loads(image_paths_json)
                except (json.JSONDecodeError, TypeError):
                    image_paths = []
                # Valid JSON is not necessarily a list (e.g. a string or an object).
                if not isinstance(image_paths, list):
                    image_paths = []
            documents = results.get("documents") or []
            # Skip empty/None chunks so the join cannot fail on them.
            combined_text = "\n\n".join(doc for doc in documents[:10] if doc)
            if len(combined_text) > 6000:
                combined_text = combined_text[:6000] + "\n..."
            if api_key:
                try:
                    client = OpenAI(api_key=api_key)
                    completion = client.chat.completions.create(
                        model=OPENAI_MODEL,
                        temperature=0.3,
                        messages=[
                            {
                                "role": "system",
                                "content": (
                                    "Sei un botanico esperto. Genera un riassunto conciso e affascinante "
                                    "della pianta in base al testo fornito. Includi: descrizione, habitat, "
                                    "caratteristiche distintive e usi. Rispondi in italiano."
                                ),
                            },
                            {
                                "role": "user",
                                "content": (
                                    f"Crea un riassunto affascinante della pianta '{title}'.\n\n"
                                    f"Testo di riferimento:\n{combined_text}"
                                ),
                            },
                        ],
                    )
                    extract = completion.choices[0].message.content or ""
                except Exception as e:
                    raise HTTPException(
                        status_code=502,
                        detail=f"Errore nella generazione del riassunto: {e}",
                    ) from e
            else:
                # Fallback local summary to avoid hard failure when key is missing.
                extract = _truncate(re.sub(r"\s+", " ", combined_text), 1200)
        _log_api(
            "/plant/{name}",
            "retrieval",
            {
                "mode": retrieval_mode,
                "rag_used": rag_used,
                "documents_found": len(results.get("documents") or []) if results else 0,
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Errore nel recupero informazioni pianta: {e}") from e

    # Keep at most three images: local files are exposed via /images, remote URLs as-is.
    images: list[str] = []
    data_dir = Path("data")
    for img_path in image_paths[:3]:
        normalized_img_path = _normalize_image_path(img_path)
        local_path = data_dir / "images" / normalized_img_path
        if local_path.exists():
            images.append(f"/images/{normalized_img_path}")
        elif str(img_path).startswith("http"):
            images.append(img_path)

    md_lines = [f"# {title}\n"]
    if common_name:
        md_lines.append(f"**Nome comune:** {common_name}\n")
    if images:
        # Title and URLs come from external data (RAG metadata, Wikipedia, DB):
        # escape them before placing them inside HTML attributes.
        alt_text = _escape_html(str(title), quote=True)
        img_tags = "".join(
            f'<img src="{_escape_html(str(url), quote=True)}" alt="{alt_text}" '
            'width="280" style="margin:4px;border-radius:8px"/>'
            for url in images
        )
        md_lines.append(img_tags + "\n")
    md_lines.append(extract + "\n")

    if rag_used:
        source = "rag"
        source_info = "Fonte: Database RAG"
    elif retrieval_mode.startswith("wikipedia"):
        source = "wikipedia"
        source_info = "Fonte: Wikipedia"
    else:
        source = "db_draft"
        source_info = "Fonte: Database locale"
    md_lines.append(f"\n---\n{source_info}")
    markdown = "\n".join(md_lines)

    payload = {
        "title": title,
        "common_name": common_name,
        "markdown": markdown,
        "summary": extract,
        "images": images,
        "source": source,
        "build_status": _species_build_status(title),
    }
    # Placeholder (db_draft) cards are never cached.
    if payload["source"] in {"rag", "wikipedia"}:
        try:
            upsert_cached_plant_card(normalized_name, normalized_lang, payload)
        except Exception as cache_exc:
            logger.warning(f"Impossibile aggiornare cache scheda per '{normalized_name}': {cache_exc}")
    _log_api(
        "/plant/{name}",
        "output",
        {
            "title": payload["title"],
            "source": payload["source"],
            "images_count": len(payload["images"]),
            "summary_preview": _truncate(payload["summary"]),
        },
    )
    return JSONResponse(content=payload)
@app.get("/plant/{name}/profile")
def plant_profile(name: str, authorization: str | None = Header(default=None)):
    """Return the plants-database profile of a species as JSON.

    Authentication is optional (``require_auth=False``).

    Raises:
        HTTPException: 500 when the lookup raises ``sqlite3.Error``,
            404 when no profile exists for ``name``.
    """
    route = "/plant/{name}/profile"
    _get_google_user_from_authorization(authorization, require_auth=False)
    _log_api(route, "input", {"name": name})

    try:
        db_profile = get_plant_profile_from_db(name)
    except HTTPException:
        raise
    except sqlite3.Error as db_error:
        raise HTTPException(status_code=500, detail=f"Errore accesso plants.db: {db_error}")

    if db_profile is None:
        raise HTTPException(status_code=404, detail=f"Profilo DB non trovato per '{name}'.")

    logged_fields = ("species_name", "indexed", "updated_at")
    _log_api(route, "output", {field: db_profile[field] for field in logged_fields})
    return JSONResponse(content=db_profile)
@app.post("/chat/plant-care")
def plant_care_chat(payload: PlantChatRequest, authorization: str | None = Header(default=None)):
    """Answer a plant-care question with OpenAI, grounded on RAG or Wikipedia text.

    Context comes from the RAG collection when it has documents for
    ``payload.plant_name``; otherwise from Wikipedia in ``payload.lang``.
    The DB profile, when present, is added to the prompt through
    ``_build_profile_context``.

    Returns a JSON body with ``plant``, ``common_name``, ``question``,
    ``answer``, ``source``, ``source_url`` and ``model``.

    Raises:
        HTTPException: 503 when ``OPENAI_API_KEY`` is not set, 500 when the
            context cannot be retrieved, 502 when building the prompt or
            calling OpenAI fails.
    """
    # NOTE(review): the return value is ignored here, while the /user/plants
    # handlers also check it for falsiness. Confirm the helper raises on a
    # missing or invalid token when require_auth is left at its default.
    _get_google_user_from_authorization(authorization)
    _log_api(
        "/chat/plant-care",
        "input",
        {
            "plant_name": payload.plant_name,
            "question": _truncate(payload.question, 300),
            "lang": payload.lang,
        },
    )
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise HTTPException(
            status_code=503,
            detail="OPENAI_API_KEY non configurata. Imposta la variabile ambiente e riprova.",
        )

    try:
        retrieval_mode = "rag"
        profile = get_plant_profile_from_db(payload.plant_name)
        # Try to get context from RAG first
        collection = get_rag_collection()
        results = collection.get(
            where={"species_name": {"$eq": payload.plant_name}},
            limit=15,  # Get multiple chunks for comprehensive context
        )
        if results and results.get("documents"):
            # Use RAG context; skip empty/None chunks so the join cannot fail.
            documents = results.get("documents") or []
            context_text = "\n\n".join(doc for doc in documents if doc)
            if len(context_text) > 8000:
                context_text = context_text[:8000] + "\n..."
            metadatas = results.get("metadatas") or []
            # A metadata entry may itself be None; treat it like a missing one.
            first_meta = (metadatas[0] if metadatas else None) or {}
            plant_title = first_meta.get("species_name", payload.plant_name)
            common_name = first_meta.get("common_name", "") or ""
            source_info = "RAG"
            source_url = ""
        else:
            # Fallback to Wikipedia if not found in RAG
            retrieval_mode = "wikipedia_fallback"
            wiki_data = fetch_wikipedia_text_context(payload.plant_name, payload.lang)
            summary_text = wiki_data.get("summary") or ""
            extended_text = wiki_data.get("extended_text") or ""
            context_text = (summary_text + "\n\n" + extended_text).strip()
            if len(context_text) > 8000:
                context_text = context_text[:8000] + "\n..."
            plant_title = wiki_data["title"]
            common_name = ""
            source_info = "Wikipedia"
            source_url = wiki_data.get("wikipedia_url", "")
        _log_api(
            "/chat/plant-care",
            "retrieval",
            {
                "mode": retrieval_mode,
                "source": source_info,
                "context_length": len(context_text),
                "profile_found": bool(profile),
            },
        )
    except HTTPException:
        # Pass through HTTP errors raised by helpers, as /plant/{name} does.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Errore nel recupero contesto pianta: {e}") from e

    try:
        client = OpenAI(api_key=api_key)
        # Build user message with plant info
        user_message = f"Pianta: {plant_title}"
        if common_name:
            user_message += f" ({common_name})"
        profile_context = _build_profile_context(profile)
        user_message += f"\nDomanda: {payload.question}\n\n"
        if profile_context:
            user_message += f"{profile_context}\n\n"
        user_message += f"Contesto dalla base di dati:\n{context_text}\n\n"
        user_message += (
            "Rispondi con:\n"
            "1) Risposta breve\n"
            "2) Cosa fare oggi\n"
            "3) Errori da evitare"
        )
        completion = client.chat.completions.create(
            model=OPENAI_MODEL,
            temperature=0.3,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Sei un assistente botanico pratico e chiaro. "
                        "Rispondi in italiano con consigli concreti per la cura della pianta "
                        "(irrigazione, luce, terreno, potatura, parassiti, stagionalita). "
                        "Se l'informazione non e certa, dichiaralo esplicitamente. "
                        "Non dare indicazioni mediche per persone o animali."
                    ),
                },
                {
                    "role": "user",
                    "content": user_message,
                },
            ],
        )
        answer = completion.choices[0].message.content or ""
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Errore chiamata OpenAI: {e}") from e

    response_payload = {
        "plant": plant_title,
        "common_name": common_name,
        "question": payload.question,
        "answer": answer.strip(),
        "source": source_info,
        "source_url": source_url,
        "model": OPENAI_MODEL,
    }
    _log_api(
        "/chat/plant-care",
        "output",
        {
            "plant": response_payload["plant"],
            "source": response_payload["source"],
            "model": response_payload["model"],
            "answer_preview": _truncate(response_payload["answer"]),
        },
    )
    return JSONResponse(content=response_payload)
@app.get("/debug/routes")
def debug_routes():
    """Return the ``path`` of every route registered on the application."""
    registered_paths = []
    for route in app.routes:
        registered_paths.append(route.path)
    return registered_paths
if __name__ == "__main__":
    # Script entry point: run the app with uvicorn on all interfaces, port 8000, reload off.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": False}
    uvicorn.run("api:app", **server_options)