Spaces:
Running
Running
| """ | |
| app.py β PawMap | |
| Build Small Hackathon Β· Backyard AI Track Β· Junho 2026 | |
| Custom frontend via gradio.Server | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import tempfile | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from gradio import Server | |
| from gradio.data_classes import FileData | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi import Query | |
| from fastapi.staticfiles import StaticFiles | |
| from core.ai import AnimalAI | |
| from core.database import Database, DATA_DIR, PHOTOS_DIR | |
| from core.matcher import AnimalMatcher | |
| from core.seed import seed_if_empty | |
| from core.tracer import log_trace | |
| logging.basicConfig(level=logging.INFO) | |
| db = Database() | |
| ai = AnimalAI() | |
| matcher = AnimalMatcher() | |
| seed_if_empty(db) # popula o mapa com dados de demo se o banco estiver vazio | |
| def _photo_url(photo_path: str) -> str: | |
| """Convert DB-relative photo path to a URL served by the /photos/ static mount. | |
| photo_path is relative to DATA_DIR (e.g. 'photos/animal_42/abc.jpg'). | |
| The static mount serves PHOTOS_DIR at /photos/, so we strip the 'photos/' prefix. | |
| """ | |
| if not photo_path: | |
| return "" | |
| # Normalise separators | |
| p = photo_path.replace("\\", "/") | |
| if p.startswith("photos/"): | |
| p = p[len("photos/"):] | |
| return f"/photos/{p}" | |
| # In-memory session store for analyze β confirm two-step flow | |
| _pending: dict[str, dict] = {} | |
| app = Server() | |
| # Serve photos as static files at /photos/... | |
| PHOTOS_DIR.mkdir(parents=True, exist_ok=True) | |
| app.mount("/photos", StaticFiles(directory=str(PHOTOS_DIR)), name="photos") | |
| # Serve frontend assets (CSS, JS, images) at /static/... | |
| STATIC_DIR = Path(__file__).parent / "static" | |
| STATIC_DIR.mkdir(exist_ok=True) | |
| app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") | |
| # βββ Frontend βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def homepage(): | |
| html_path = Path(__file__).parent / "index.html" | |
| return html_path.read_text(encoding="utf-8") | |
| # βββ Data APIs (FastAPI routes, no queuing needed) ββββββββββββββββββββββββββββ | |
| async def get_map_data( | |
| species: str = Query("all"), | |
| timeframe: str = Query("all"), | |
| ): | |
| data = db.get_map_data(species, timeframe) | |
| for item in data: | |
| item["photo_url"] = _photo_url(item.pop("last_photo", "") or "") | |
| return JSONResponse(content=data) | |
| async def get_animals(): | |
| animals = db.get_recent_animals(limit=30) | |
| for a in animals: | |
| a["photo_url"] = _photo_url(a.pop("last_photo_path", "") or "") | |
| a.pop("embedding", None) | |
| return JSONResponse(content=animals) | |
| async def get_animal(animal_id: int): | |
| detail = db.get_animal_detail(animal_id) | |
| if not detail: | |
| return JSONResponse(content={"error": "not found"}, status_code=404) | |
| for s in detail.get("sightings", []): | |
| s["photo_url"] = _photo_url(s.get("photo_path") or "") | |
| for h in detail.get("help_events", []): | |
| h["photo_url"] = _photo_url(h.get("photo_path") or "") | |
| detail.get("animal", {}).pop("embedding", None) | |
| return JSONResponse(content=detail) | |
| # βββ ML APIs (queued via Gradio) ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def analyze_image(image_path: FileData) -> dict: | |
| """ | |
| Step 1: Analyze photo with AI, find similar animals. | |
| Returns session_id + AI description + top matches (no DB write yet). | |
| """ | |
| from PIL import Image as PILImage | |
| img = PILImage.open(image_path["path"]).convert("RGB") | |
| description = ai.analyze_image(img) | |
| # RejeiΓ§Γ£o: a IA nΓ£o detectou nenhum animal na foto | |
| if description.get("is_animal") is False: | |
| return { | |
| "error": "Nenhum cΓ£o ou gato identificado na foto. Por favor, fotografe um animal de rua.", | |
| "session_id": "", | |
| "description": {}, | |
| "similar": [], | |
| } | |
| embedding = ai.get_embedding(description) | |
| candidates = db.get_all_animals_with_embeddings() | |
| top_matches = matcher.find_top_matches(embedding, candidates, top_n=3) | |
| # Enrich matches with photo URLs and sighting info | |
| similar = [] | |
| for m in top_matches: | |
| sightings = db.get_animal_sightings(m["id"]) | |
| photo_path = next( | |
| (s["photo_path"] for s in sightings if s.get("photo_path")), None | |
| ) | |
| latest = sightings[0] if sightings else {} | |
| similar.append({ | |
| "id": m["id"], | |
| "score_pct": round(m["score"] * 100), | |
| "photo_url": _photo_url(photo_path) if photo_path else "", | |
| "days_ago": latest.get("days_ago", ""), | |
| }) | |
| # Save image to temp file for the confirm step | |
| tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, dir=DATA_DIR) | |
| img.save(tmp.name, format="JPEG", quality=85) | |
| tmp.close() | |
| session_id = uuid.uuid4().hex | |
| _pending[session_id] = { | |
| "temp_path": tmp.name, | |
| "description": description, | |
| "embedding": embedding, | |
| "timestamp": time.time(), | |
| } | |
| _cleanup_sessions() | |
| log_trace({ | |
| "event": "analyze", | |
| "session_id": session_id, | |
| "description": {k: v for k, v in description.items() if k not in ("_ai_success",)}, | |
| "top_matches": [{"id": m["id"], "score_pct": m["score_pct"]} for m in similar], | |
| }) | |
| return { | |
| "session_id": session_id, | |
| "description": description, | |
| "similar": similar, | |
| } | |
| def confirm_sighting( | |
| session_id: str, | |
| gps_json: str = "", | |
| notes: str = "", | |
| condition: str = "", | |
| animal_name: str = "", | |
| ) -> dict: | |
| """ | |
| Step 2: User reviewed/edited the AI results β save sighting to DB. | |
| """ | |
| import datetime | |
| from PIL import Image as PILImage | |
| session = _pending.pop(session_id, None) | |
| if not session: | |
| return {"error": "SessΓ£o expirada. Tire a foto novamente."} | |
| img = PILImage.open(session["temp_path"]).convert("RGB") | |
| description = session["description"] | |
| embedding = session["embedding"] | |
| # Clean up temp file | |
| try: | |
| os.unlink(session["temp_path"]) | |
| except Exception: | |
| pass | |
| # Parse GPS | |
| try: | |
| coords = json.loads(gps_json) if gps_json and gps_json.strip() else {} | |
| except Exception: | |
| coords = {} | |
| lat = round(float(coords["lat"]), 5) if coords.get("lat") else None | |
| lng = round(float(coords["lng"]), 5) if coords.get("lng") else None | |
| # Append condition to notes | |
| full_notes = notes | |
| if condition: | |
| full_notes = (notes + f" [CondiΓ§Γ£o: {condition}]").strip() | |
| candidates = db.get_all_animals_with_embeddings() | |
| match = matcher.find_match(embedding, candidates) | |
| clean_name = animal_name.strip() or None | |
| if match: | |
| animal_id, _ = match | |
| photo_path = db.save_photo(img, animal_id=animal_id) | |
| db.add_sighting(animal_id, photo_path, lat, lng, full_notes) | |
| db.update_animal(animal_id) | |
| if clean_name: | |
| db.update_animal_name(animal_id, clean_name) | |
| animal = db.get_animal(animal_id) | |
| count = animal["sighting_count"] | |
| species = animal["species"] | |
| desc_obj = json.loads(animal.get("description") or "{}") | |
| is_new = False | |
| else: | |
| animal_id = db.create_animal(description, embedding, name=clean_name) | |
| photo_path = db.save_photo(img, animal_id=animal_id) | |
| db.add_sighting(animal_id, photo_path, lat, lng, full_notes) | |
| count = 1 | |
| species = description.get("species", "dog") | |
| desc_obj = description | |
| is_new = True | |
| # Display name: user-given > AI-generated fallback | |
| animal_row = db.get_animal(animal_id) | |
| saved_name = animal_row.get("name") if animal_row else None | |
| breed = desc_obj.get("breed_estimate", "") | |
| color = desc_obj.get("primary_color", "") | |
| name = saved_name or " ".join(filter(None, [ | |
| "Dog" if species == "dog" else "Cat", | |
| color.capitalize() if color else "", | |
| breed if breed and breed.lower() not in ("srd", "unknown", "") else "", | |
| ])).strip() or ("Dog" if species == "dog" else "Cat") | |
| result = { | |
| "animal_id": animal_id, | |
| "is_new": is_new, | |
| "count": count, | |
| "species": species, | |
| "name": name, | |
| "photo_url": _photo_url(photo_path) if photo_path else "", | |
| "location": f"Lat {lat:.4f}, Lng {lng:.4f}" if lat and lng else "LocalizaΓ§Γ£o nΓ£o registrada", | |
| "time": datetime.datetime.now().strftime("%H:%M"), | |
| } | |
| log_trace({ | |
| "event": "confirm", | |
| "session_id": session_id, | |
| "animal_id": animal_id, | |
| "is_new": is_new, | |
| "species": species, | |
| "sighting_count": count, | |
| "gps": {"lat": lat, "lng": lng}, | |
| "description": desc_obj, | |
| }) | |
| return result | |
| def _cleanup_sessions(): | |
| cutoff = time.time() - 1800 # 30 min | |
| for k in list(_pending.keys()): | |
| if _pending[k]["timestamp"] < cutoff: | |
| try: | |
| os.unlink(_pending[k]["temp_path"]) | |
| except Exception: | |
| pass | |
| _pending.pop(k, None) | |
| # βββ Help βββββββββββββββββββββββββββββββββββββββββββββ | |
| async def mark_helped(animal_id: int): | |
| """Legacy β mantido por compatibilidade. Prefira submit_help_proof.""" | |
| animal = db.get_animal(animal_id) | |
| if not animal: | |
| return JSONResponse(content={"error": "not found"}, status_code=404) | |
| db.add_sighting(animal_id, None, None, None, "", is_help_event=True, help_type="other") | |
| db.update_animal(animal_id) | |
| return JSONResponse(content={"ok": True}) | |
| def submit_help_proof( | |
| animal_id: int, | |
| help_type: str = "other", | |
| notes: str = "", | |
| image_path: FileData = None, | |
| ) -> dict: | |
| """ | |
| Registra que alguΓ©m ajudou o animal, com foto de prova opcional. | |
| A IA verifica se a foto Γ© do mesmo animal e detecta melhora de condiΓ§Γ£o. | |
| """ | |
| from PIL import Image as PILImage | |
| import json as _json | |
| photo_path = None | |
| ai_verified = False | |
| condition_update = None | |
| match_score = None | |
| if image_path and image_path.get("path"): | |
| img = PILImage.open(image_path["path"]).convert("RGB") | |
| # Analisa a foto com IA | |
| description = ai.analyze_image(img) | |
| if description.get("is_animal") is not False and description.get("_ai_success"): | |
| embedding = ai.get_embedding(description) | |
| candidates = db.get_all_animals_with_embeddings() | |
| # Verifica se Γ© o mesmo animal | |
| match = matcher.find_match(embedding, candidates) | |
| if match: | |
| matched_id, score = match | |
| ai_verified = (matched_id == animal_id) | |
| match_score = round(score * 100) | |
| # Detecta melhora de condiΓ§Γ£o | |
| animal_data = db.get_animal(animal_id) | |
| if animal_data: | |
| prev_desc = _json.loads(animal_data.get("description") or "{}") | |
| prev_condition = prev_desc.get("condition", "") | |
| new_condition = description.get("condition", "") | |
| condition_rank = {"injured": 0, "thin": 1, "healthy": 2} | |
| if (condition_rank.get(new_condition, -1) > | |
| condition_rank.get(prev_condition, -1)): | |
| condition_update = new_condition | |
| photo_path = db.save_photo(img, animal_id=animal_id) | |
| db.add_sighting( | |
| animal_id, | |
| photo_path, | |
| None, None, | |
| notes, | |
| is_help_event=True, | |
| help_type=help_type, | |
| ) | |
| db.update_animal(animal_id) | |
| log_trace({ | |
| "event": "help_proof", | |
| "animal_id": animal_id, | |
| "help_type": help_type, | |
| "has_photo": photo_path is not None, | |
| "ai_verified": ai_verified, | |
| "match_score": match_score, | |
| "condition_update": condition_update, | |
| }) | |
| return { | |
| "ok": True, | |
| "animal_id": animal_id, | |
| "help_type": help_type, | |
| "ai_verified": ai_verified, | |
| "match_score": match_score, | |
| "condition_update": condition_update, | |
| "photo_url": _photo_url(photo_path) if photo_path else "", | |
| } | |
| # βββ Admin ββββββββββββββββββββββββββββββββββββββββββββ | |
| async def push_traces(): | |
| """Publica data/traces.jsonl como dataset no HF Hub. | |
| Acesse esta URL no browser para disparar o upload. | |
| Requer HF_TOKEN e HF_DATASET_ID nos Secrets do Space. | |
| """ | |
| from core.tracer import push_to_hub, TRACES_PATH | |
| if not TRACES_PATH.exists(): | |
| return JSONResponse(content={"ok": False, "error": "Nenhum trace encontrado ainda."}) | |
| lines = TRACES_PATH.read_text().strip().splitlines() | |
| push_to_hub() | |
| return JSONResponse(content={"ok": True, "traces_published": len(lines)}) | |
| # βββ Launch ββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| DATA_DIR.mkdir(parents=True, exist_ok=True) | |
| PHOTOS_DIR.mkdir(parents=True, exist_ok=True) | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT", 7860)), | |
| show_error=True, | |
| ) | |