Spaces:
Running
Running
| """ | |
| database.py — SQLite CRUD para animais e avistamentos. | |
| Usa /data/ em producao (HF Storage Bucket) ou ./data/ localmente. | |
| """ | |
import contextlib
import json
import os
import sqlite3
import uuid
from pathlib import Path
from typing import Optional

import numpy as np
from PIL import Image
# --- Paths -------------------------------------------------------------------
# Storage root: the /data volume is used when it exists and is writable;
# otherwise everything lives under ./data relative to the working directory.
_hf_data = Path("/data")
_hf_data_usable = _hf_data.exists() and os.access(_hf_data, os.W_OK)
DATA_DIR = _hf_data if _hf_data_usable else Path("./data")

# SQLite database file and root folder for saved photos.
DB_PATH = DATA_DIR / "viralata.db"
PHOTOS_DIR = DATA_DIR / "photos"

# Optional schema script, looked up at <two directories above this file>/db/schema.sql.
SCHEMA = Path(__file__).parent.parent / "db" / "schema.sql"
class Database:
    """SQLite persistence layer for animals, their sightings and sighting photos.

    Each method opens its own short-lived connection, runs its statements in a
    transaction and closes the connection afterwards.

    Args:
        data_dir: Directory that holds ``viralata.db`` and the ``photos/``
            tree. Defaults to the module-level ``DATA_DIR`` / ``DB_PATH`` /
            ``PHOTOS_DIR`` constants.
        schema_path: SQL script executed at start-up when the file exists.
            Defaults to the module-level ``SCHEMA``. When the file is missing
            a minimal built-in schema is created instead.
    """

    def __init__(self, data_dir: Optional[Path] = None,
                 schema_path: Optional[Path] = None):
        if data_dir is None:
            self._data_dir = DATA_DIR
            self._db_path = DB_PATH
            self._photos_dir = PHOTOS_DIR
        else:
            self._data_dir = Path(data_dir)
            self._db_path = self._data_dir / "viralata.db"
            self._photos_dir = self._data_dir / "photos"
        self._schema_path = SCHEMA if schema_path is None else Path(schema_path)

        self._data_dir.mkdir(parents=True, exist_ok=True)
        self._photos_dir.mkdir(parents=True, exist_ok=True)
        self._init_db()

    # --- Internal ------------------------------------------------------------

    def _conn(self) -> sqlite3.Connection:
        """Open a new connection with ``sqlite3.Row`` rows and foreign keys on.

        The caller owns the returned connection and must close it; prefer
        ``_session()``, which does that automatically.
        """
        conn = sqlite3.connect(str(self._db_path))
        conn.row_factory = sqlite3.Row
        try:
            conn.execute("PRAGMA journal_mode=WAL")
        except sqlite3.OperationalError:
            # Best effort: keep the current journal mode if WAL cannot be set.
            pass
        conn.execute("PRAGMA foreign_keys=ON")
        return conn

    @contextlib.contextmanager
    def _session(self):
        """Yield a connection; commit on success, roll back on error, always close.

        ``with connection:`` on its own only ends the transaction and leaves
        the connection open, so the explicit ``close()`` is what prevents a
        leaked handle on every call.
        """
        conn = self._conn()
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the tables (schema file or built-in fallback) and run migrations."""
        with self._session() as conn:
            if self._schema_path.exists():
                conn.executescript(self._schema_path.read_text(encoding="utf-8"))
            else:
                conn.executescript(
                    "CREATE TABLE IF NOT EXISTS animals ("
                    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
                    " species TEXT NOT NULL,"
                    " description TEXT,"
                    " embedding BLOB,"
                    " first_seen DATETIME DEFAULT CURRENT_TIMESTAMP,"
                    " last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,"
                    " sighting_count INTEGER DEFAULT 1"
                    ");"
                    "CREATE TABLE IF NOT EXISTS sightings ("
                    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
                    " animal_id INTEGER NOT NULL REFERENCES animals(id),"
                    " photo_path TEXT,"
                    " latitude REAL,"
                    " longitude REAL,"
                    " notes TEXT,"
                    " created_at DATETIME DEFAULT CURRENT_TIMESTAMP"
                    ");"
                )

            # Incremental migrations: each column is added only when missing,
            # so this is safe to run on every start-up.
            sighting_cols = {
                row[1] for row in conn.execute("PRAGMA table_info(sightings)").fetchall()
            }
            if "is_help_event" not in sighting_cols:
                conn.execute(
                    "ALTER TABLE sightings ADD COLUMN is_help_event INTEGER NOT NULL DEFAULT 0"
                )
            if "help_type" not in sighting_cols:
                conn.execute("ALTER TABLE sightings ADD COLUMN help_type TEXT")

            animal_cols = {
                row[1] for row in conn.execute("PRAGMA table_info(animals)").fetchall()
            }
            if "name" not in animal_cols:
                conn.execute("ALTER TABLE animals ADD COLUMN name TEXT")

    # --- Photos --------------------------------------------------------------

    def save_photo(self, image: "Image.Image", animal_id: Optional[int] = None) -> str:
        """Save *image* as a JPEG and return its path relative to the data dir.

        Photos are grouped per animal (``photos/animal_42/<hex>.jpg``); without
        an ``animal_id`` they go into a folder named after today's date.

        Args:
            image: Image to store. Modes JPEG cannot hold (e.g. RGBA, P, LA)
                are converted to RGB first, which discards any alpha channel.
            animal_id: Animal the photo belongs to, if known.

        Returns:
            The relative path, always with forward slashes.
        """
        from datetime import datetime

        if animal_id is not None:
            subdir = self._photos_dir / f"animal_{animal_id}"
        else:
            # Fallback: one folder per day.
            subdir = self._photos_dir / datetime.now().strftime("%Y-%m-%d")
        subdir.mkdir(parents=True, exist_ok=True)

        path = subdir / f"{uuid.uuid4().hex}.jpg"
        if image.mode not in ("L", "RGB", "CMYK"):
            # Saving e.g. an RGBA or palette image as JPEG raises an error.
            image = image.convert("RGB")
        image.save(str(path), format="JPEG", quality=85)
        return path.relative_to(self._data_dir).as_posix()

    def photo_url(self, relative_path: Optional[str]) -> Optional[str]:
        """Return the path of a stored photo as a string, or None.

        None is returned when *relative_path* is empty or the file does not
        exist under the data directory.
        """
        if not relative_path:
            return None
        full = self._data_dir / relative_path
        return str(full) if full.exists() else None

    # --- Animals -------------------------------------------------------------

    def create_animal(self, description: dict, embedding: list,
                      name: Optional[str] = None) -> int:
        """Insert a new animal and return its row id.

        Args:
            description: Attribute dict, stored as JSON. Its ``species`` entry
                fills the ``species`` column; a missing or empty value falls
                back to ``"dog"`` because that column is NOT NULL.
            embedding: Vector stored as raw float32 bytes.
            name: Optional name; an empty string is stored as NULL.
        """
        emb_blob = np.array(embedding, dtype=np.float32).tobytes()
        with self._session() as conn:
            cur = conn.execute(
                "INSERT INTO animals (species, name, description, embedding) VALUES (?, ?, ?, ?)",
                (
                    description.get("species") or "dog",
                    name or None,
                    json.dumps(description, ensure_ascii=False),
                    emb_blob,
                ),
            )
            return cur.lastrowid

    def update_animal_name(self, animal_id: int, name: Optional[str]):
        """Set the animal's name; an empty or None name is stored as NULL."""
        with self._session() as conn:
            conn.execute(
                "UPDATE animals SET name = ? WHERE id = ?", (name or None, animal_id)
            )

    def update_animal(self, animal_id: int):
        """Record a repeat sighting: bump ``sighting_count`` and refresh ``last_seen``."""
        with self._session() as conn:
            conn.execute(
                "UPDATE animals SET sighting_count = sighting_count + 1,"
                " last_seen = CURRENT_TIMESTAMP WHERE id = ?",
                (animal_id,),
            )

    def get_animal(self, animal_id: int) -> Optional[dict]:
        """Return the full ``animals`` row as a dict, or None when the id is unknown."""
        with self._session() as conn:
            row = conn.execute(
                "SELECT * FROM animals WHERE id = ?", (animal_id,)
            ).fetchone()
            return dict(row) if row else None

    def get_all_animals_with_embeddings(self) -> list:
        """Return id, species, description and decoded embedding for every animal.

        Only animals with a non-NULL embedding are included. ``embedding`` is
        a list of floats, or None when the stored blob cannot be decoded as
        float32 data.
        """
        with self._session() as conn:
            records = [
                dict(row)
                for row in conn.execute(
                    "SELECT id, species, description, embedding FROM animals"
                    " WHERE embedding IS NOT NULL"
                ).fetchall()
            ]
        for record in records:
            blob = record.pop("embedding")
            try:
                record["embedding"] = np.frombuffer(blob, dtype=np.float32).tolist()
            except (TypeError, ValueError):
                # Not bytes-like, or length is not a multiple of 4 bytes.
                record["embedding"] = None
        return records

    # --- Sightings -----------------------------------------------------------

    def add_sighting(
        self,
        animal_id: int,
        photo_path: Optional[str],
        lat: Optional[float],
        lng: Optional[float],
        notes: Optional[str],
        is_help_event: bool = False,
        help_type: Optional[str] = None,
    ):
        """Insert one sighting row for *animal_id*.

        Empty or None ``notes`` are stored as an empty string, and
        ``is_help_event`` is stored as 1 or 0.
        """
        with self._session() as conn:
            conn.execute(
                "INSERT INTO sightings"
                " (animal_id, photo_path, latitude, longitude, notes, is_help_event, help_type)"
                " VALUES (?, ?, ?, ?, ?, ?, ?)",
                (animal_id, photo_path, lat, lng, notes or "",
                 1 if is_help_event else 0, help_type),
            )

    def get_animal_sightings(self, animal_id: int) -> list:
        """Return the animal's regular sightings (``is_help_event = 0``), newest first.

        Each dict also carries ``days_ago``, the whole number of days since
        the sighting was created.
        """
        with self._session() as conn:
            rows = conn.execute(
                "SELECT *, CAST(julianday('now') - julianday(created_at) AS INTEGER) AS days_ago"
                " FROM sightings WHERE animal_id = ? AND is_help_event = 0"
                " ORDER BY created_at DESC",
                (animal_id,),
            ).fetchall()
            return [dict(r) for r in rows]

    def get_help_events(self, animal_id: int) -> list:
        """Return only the help records (``is_help_event = 1``), newest first.

        Each dict gets a ``photo_url`` key, which is "" when no photo file exists.
        """
        with self._session() as conn:
            events = [
                dict(row)
                for row in conn.execute(
                    "SELECT * FROM sightings WHERE animal_id = ? AND is_help_event = 1"
                    " ORDER BY created_at DESC",
                    (animal_id,),
                ).fetchall()
            ]
        for event in events:
            event["photo_url"] = self.photo_url(event.get("photo_path")) or ""
        return events

    def get_animal_detail(self, animal_id: int) -> Optional[dict]:
        """Return the animal plus its regular sightings and help events.

        Returns:
            ``{"animal": ..., "sightings": [...], "help_events": [...]}``, or
            None when the animal does not exist. Every sighting and help event
            carries a ``photo_url`` key ("" when there is no photo file).
        """
        animal = self.get_animal(animal_id)
        if not animal:
            return None
        sightings = self.get_animal_sightings(animal_id)
        help_events = self.get_help_events(animal_id)
        for s in sightings:
            s["photo_url"] = self.photo_url(s.get("photo_path")) or ""
        return {"animal": animal, "sightings": sightings, "help_events": help_events}

    # --- Map data ------------------------------------------------------------

    @staticmethod
    def _short_description(raw: Optional[str]):
        """Summarise a stored description JSON; "" when it cannot be parsed.

        Uses ``description_text`` when present, otherwise joins the non-empty
        ``size``, ``primary_color`` and ``breed_estimate`` values with spaces.
        """
        try:
            desc_obj = json.loads(raw or "{}")
            return desc_obj.get("description_text") or " ".join(
                filter(None, [
                    desc_obj.get("size", ""),
                    desc_obj.get("primary_color", ""),
                    desc_obj.get("breed_estimate", ""),
                ])
            ).strip()
        except (AttributeError, TypeError, ValueError):
            # Invalid JSON, a non-object JSON value, or non-string fields.
            return ""

    def get_map_data(self, species: str = "all", timeframe: str = "all") -> list:
        """Return one map marker per animal that has a sighting with coordinates.

        Args:
            species: ``"dog"`` or ``"cat"`` restricts the result to that
                species; any other value applies no species filter.
            timeframe: ``"today"`` keeps animals last seen on the current
                date, ``"week"`` those seen in the last 7 days; any other
                value applies no time filter.

        Returns:
            Dicts ordered by most recently seen, with keys ``id``, ``species``,
            ``count``, ``last_seen`` (dd/mm/YYYY), ``days_since``, ``lat`` and
            ``lng`` (from the latest sighting that has a latitude),
            ``last_photo``, ``desc`` and ``photo_url``.
        """
        filters = []
        params = []
        if species in ("dog", "cat"):
            filters.append("a.species = ?")
            params.append(species)
        if timeframe == "today":
            filters.append("date(a.last_seen) = date('now')")
        elif timeframe == "week":
            filters.append("a.last_seen >= datetime('now', '-7 days')")
        where = ("WHERE " + " AND ".join(filters)) if filters else ""

        sql = (
            "SELECT"
            " a.id,"
            " a.species,"
            " a.sighting_count AS count,"
            " a.description,"
            " strftime('%d/%m/%Y', a.last_seen) AS last_seen,"
            " CAST(julianday('now') - julianday(a.last_seen) AS INTEGER) AS days_since,"
            " s.latitude AS lat,"
            " s.longitude AS lng,"
            " (SELECT photo_path FROM sightings"
            " WHERE animal_id = a.id AND photo_path IS NOT NULL"
            " ORDER BY created_at DESC LIMIT 1) AS last_photo"
            " FROM animals a"
            " JOIN sightings s ON s.id = ("
            " SELECT id FROM sightings"
            " WHERE animal_id = a.id AND latitude IS NOT NULL"
            " ORDER BY created_at DESC LIMIT 1"
            " )"
            " " + where +
            " ORDER BY a.last_seen DESC"
        )
        with self._session() as conn:
            markers = [dict(row) for row in conn.execute(sql, params).fetchall()]

        for marker in markers:
            # Replace the raw JSON column with a short display text.
            marker["desc"] = self._short_description(marker.pop("description", None))
            marker["photo_url"] = self.photo_url(marker.get("last_photo")) or ""
        return markers

    # --- Animals list --------------------------------------------------------

    def get_recent_animals(self, limit: int = 30) -> list:
        """Return up to *limit* animals, most recently seen first.

        Each dict holds every ``animals`` column plus ``days_since``,
        ``last_seen_short`` (dd/mm), ``last_photo_path`` and ``last_photo_url``
        ("" when no photo file exists).
        """
        sql = (
            "SELECT"
            " a.*,"
            " CAST(julianday('now') - julianday(a.last_seen) AS INTEGER) AS days_since,"
            " strftime('%d/%m', a.last_seen) AS last_seen_short,"
            " (SELECT photo_path FROM sightings"
            " WHERE animal_id = a.id AND photo_path IS NOT NULL"
            " ORDER BY created_at DESC LIMIT 1) AS last_photo_path"
            " FROM animals a"
            " ORDER BY a.last_seen DESC"
            " LIMIT ?"
        )
        with self._session() as conn:
            animals = [dict(row) for row in conn.execute(sql, (limit,)).fetchall()]
        for animal in animals:
            animal["last_photo_url"] = self.photo_url(animal.get("last_photo_path")) or ""
        return animals

    def total_sightings(self) -> int:
        """Return the number of rows in ``sightings`` (help events included)."""
        with self._session() as conn:
            return conn.execute("SELECT COUNT(*) FROM sightings").fetchone()[0]

    def total_animals(self) -> int:
        """Return the number of rows in ``animals``."""
        with self._session() as conn:
            return conn.execute("SELECT COUNT(*) FROM animals").fetchone()[0]