# NOTE(review): the lines "Spaces:" / "Running" / "Running" appear to be
# pasted Hugging Face Spaces status residue, not source code — removed so the
# module parses.
from __future__ import annotations

import logging
import os
import queue
import threading

from src.types import DetectionResponse, EngineResult

logger = logging.getLogger(__name__)

# Prefer the new `google.genai` SDK when installed; `genai_new` stays None when
# the import fails so callers can branch on availability.
try:
    from google import genai as genai_new  # type: ignore
except Exception:
    genai_new = None
# Legacy `google.generativeai` module; populated on demand elsewhere in this
# file rather than imported eagerly here.
genai_legacy = None
# System prompt steering Gemini toward short, declarative forensics summaries.
SYSTEM_INSTRUCTION = (
    "You are a deepfake forensics analyst writing reports for security professionals. "
    "Given detection engine outputs, write exactly 2-3 sentences in plain English "
    "explaining why the content is real or fake. "
    "Be specific and name the strongest signals. "
    "Use direct declarative sentences. "
    "Output only the explanation text."
)

DEFAULT_MODEL_CANDIDATES = (
    # Source: https://ai.google.dev/models/gemini (checked March 2026).
    # Prefer current Gemini 3 model codes first, then compatibility fallbacks.
    "gemini-3-pro-preview",
    "gemini-3-flash-preview",
    "gemini-3-pro-image-preview",
    "gemini-3.1-pro-preview",
    "gemini-3.1-pro-preview-customtools",
    "gemini-3.1-flash-lite-preview",
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
)

# GEMINI_MODEL_CANDIDATES may override the defaults with a comma-separated
# list; blank entries are discarded.
_configured_candidates = [
    value.strip()
    for value in os.environ.get("GEMINI_MODEL_CANDIDATES", "").split(",")
    if value.strip()
]
MODEL_CANDIDATES = tuple(_configured_candidates) if _configured_candidates else DEFAULT_MODEL_CANDIDATES

# Per-request deadline in seconds for a single model call.
REQUEST_TIMEOUT_S = float(os.environ.get("GEMINI_REQUEST_TIMEOUT_S", "10"))
# Upper bound on how many model candidates a single request may try (min 1).
MAX_MODEL_ATTEMPTS = max(1, int(os.environ.get("GEMINI_MAX_MODEL_ATTEMPTS", "3")))
# Opt-in flag: let the legacy SDK list available models at first use.
ENABLE_LEGACY_MODEL_DISCOVERY = os.environ.get("GEMINI_DISCOVER_MODELS", "").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}

# Lazily initialized module-level caches.
_new_client = None  # google.genai Client instance, built on first use
_legacy_model = None  # last legacy GenerativeModel that produced text
_legacy_model_name = None  # model id of _legacy_model, for logging
_legacy_candidates = None  # memoized result of _legacy_model_candidates()
| def _get_api_key() -> str: | |
| return os.environ.get("GEMINI_API_KEY", "").strip() | |
| def _run_with_timeout(func, timeout_s: float): | |
| result_q: queue.Queue[tuple[bool, object]] = queue.Queue(maxsize=1) | |
| def _runner() -> None: | |
| try: | |
| result_q.put((True, func())) | |
| except Exception as exc: # pragma: no cover - passthrough | |
| result_q.put((False, exc)) | |
| thread = threading.Thread(target=_runner, daemon=True) | |
| thread.start() | |
| try: | |
| ok, payload = result_q.get(timeout=timeout_s) | |
| except queue.Empty as exc: | |
| raise TimeoutError(f"Gemini request timed out after {timeout_s:.1f}s") from exc | |
| if ok: | |
| return payload | |
| raise payload # type: ignore[misc] | |
def _ensure_new_client():
    """Build (once) and return the google.genai client, or None when unavailable.

    Returns None when the new SDK is not importable, no API key is configured,
    or client construction fails; the failure is logged, not raised.
    """
    global _new_client
    if _new_client is not None:
        return _new_client
    key = _get_api_key()
    if genai_new is None or not key:
        return None
    try:
        _new_client = genai_new.Client(api_key=key)
    except Exception as exc:
        logger.warning("Failed to init google.genai client: %s", exc)
        return None
    return _new_client
def _generate_with_new_sdk(prompt: str) -> str:
    """Generate an explanation via the new google.genai SDK.

    Tries model candidates in priority order and returns the first non-empty
    response text, stripped.

    Raises:
        RuntimeError: when the client is unavailable or no model returned text.
        Exception: the last per-model error when every attempt raised.
    """
    client = _ensure_new_client()
    if client is None:
        raise RuntimeError("google.genai client unavailable")
    full_prompt = f"{SYSTEM_INSTRUCTION}\n\n{prompt}"
    last_error: Exception | None = None
    # Cap attempts so a long candidate list cannot stall the request path;
    # the legacy-SDK path applies the same MAX_MODEL_ATTEMPTS bound.
    for model_name in MODEL_CANDIDATES[:MAX_MODEL_ATTEMPTS]:
        try:
            response = _run_with_timeout(
                # Bind model_name as a default to avoid late-binding surprises.
                lambda m=model_name: client.models.generate_content(
                    model=m,
                    contents=full_prompt,
                ),
                REQUEST_TIMEOUT_S,
            )
            text = getattr(response, "text", None)
            if text and str(text).strip():
                logger.info("Gemini explain model selected (new SDK): %s", model_name)
                return str(text).strip()
        except Exception as exc:
            last_error = exc
            logger.debug("Gemini model %s failed on new SDK: %s", model_name, exc)
    if last_error:
        raise last_error
    raise RuntimeError("No Gemini model succeeded via new SDK")
def _ensure_legacy_configured() -> bool:
    """Import (once) and configure the legacy google.generativeai SDK.

    Returns True only when the module is importable, an API key is present,
    and configure() succeeded; failures are logged or swallowed, never raised.
    """
    global genai_legacy
    if genai_legacy is None:
        try:
            import google.generativeai as _legacy  # type: ignore

            genai_legacy = _legacy
        except Exception:
            return False
    key = _get_api_key()
    if not key:
        return False
    try:
        genai_legacy.configure(api_key=key)
    except Exception as exc:
        logger.warning("Failed to configure legacy Gemini SDK: %s", exc)
        return False
    return True
def _legacy_model_candidates() -> tuple[str, ...]:
    """Return (and memoize) the model names to try on the legacy SDK.

    Starts from MODEL_CANDIDATES. When discovery is enabled and the legacy SDK
    is loaded, reorders to put configured preferences first and appends any
    additional generateContent-capable models the API reports. Any listing
    failure falls back to the configured order.
    """
    global _legacy_candidates
    if _legacy_candidates is not None:
        return _legacy_candidates
    base = list(MODEL_CANDIDATES)
    if not ENABLE_LEGACY_MODEL_DISCOVERY or genai_legacy is None:
        _legacy_candidates = tuple(base)
        return _legacy_candidates
    try:
        available: list[str] = []
        for model in genai_legacy.list_models(request_options={"timeout": REQUEST_TIMEOUT_S}):
            supported = set(getattr(model, "supported_generation_methods", []) or [])
            if "generateContent" not in supported:
                continue
            full_name = str(getattr(model, "name", "")).strip()
            if not full_name:
                continue
            # The API reports names like "models/gemini-x"; keep the short id.
            available.append(full_name.split("/", 1)[-1])
        if available:
            head = [name for name in base if name in available]
            tail = [name for name in available if name not in head]
            _legacy_candidates = tuple(head + tail)
        else:
            _legacy_candidates = tuple(base)
    except Exception as exc:
        logger.warning("Could not list Gemini models from legacy SDK: %s", exc)
        _legacy_candidates = tuple(base)
    return _legacy_candidates
def _generate_with_legacy_sdk(prompt: str) -> str:
    """Generate an explanation via the legacy google.generativeai SDK.

    Retries the last model that worked first; otherwise walks the candidate
    list (bounded by MAX_MODEL_ATTEMPTS) and caches the first model that
    returns non-empty text.

    Raises:
        RuntimeError: when the SDK cannot be configured or no model produced text.
        Exception: the last per-model error when every attempt raised.
    """
    global _legacy_model, _legacy_model_name
    if not _ensure_legacy_configured():
        raise RuntimeError("legacy Gemini SDK unavailable")
    # Fast path: reuse the cached model; any failure clears the cache and
    # falls through to the full candidate scan below.
    if _legacy_model is not None:
        try:
            response = _run_with_timeout(
                lambda: _legacy_model.generate_content(
                    prompt,
                    request_options={"timeout": REQUEST_TIMEOUT_S},
                ),
                # Outer watchdog sits slightly above the SDK's own timeout.
                REQUEST_TIMEOUT_S + 1.0,
            )
            text = (getattr(response, "text", None) or "").strip()
            if text:
                return text
        except Exception as exc:
            logger.warning("Cached Gemini model %s failed: %s", _legacy_model_name, exc)
            _legacy_model = None
            _legacy_model_name = None
    last_error: Exception | None = None
    for model_name in _legacy_model_candidates()[:MAX_MODEL_ATTEMPTS]:
        try:
            candidate = genai_legacy.GenerativeModel(
                model_name=model_name,
                system_instruction=SYSTEM_INSTRUCTION,
            )
            response = _run_with_timeout(
                lambda: candidate.generate_content(
                    prompt,
                    request_options={"timeout": REQUEST_TIMEOUT_S},
                ),
                REQUEST_TIMEOUT_S + 1.0,
            )
            text = (getattr(response, "text", None) or "").strip()
            if text:
                # Cache the working model so later calls skip the scan.
                _legacy_model = candidate
                _legacy_model_name = model_name
                logger.info("Gemini explain model selected (legacy SDK): %s", model_name)
                return text
        except Exception as exc:
            last_error = exc
            logger.debug("Gemini model %s failed on legacy SDK: %s", model_name, exc)
    if last_error:
        raise last_error
    raise RuntimeError("No Gemini model succeeded via legacy SDK")
def explain(
    verdict: str,
    confidence: float,
    engine_results: list[EngineResult],
    generator: str,
) -> str:
    """Produce a short forensics explanation for a detection result.

    Prefers the new google.genai SDK, falls back to the legacy SDK when the
    new one is not importable, and on any failure returns a deterministic
    template summary so callers always receive text.
    """
    lines = [
        f"- {result.engine}: {result.verdict} ({result.confidence:.0%}) - {result.explanation}"
        for result in engine_results
    ]
    breakdown = "\n".join(lines)
    prompt = (
        f"Verdict: {verdict} ({confidence:.0%} confidence)\n"
        f"Attributed generator: {generator}\n"
        f"Engine breakdown:\n{breakdown}\n\n"
        "Write the forensics explanation."
    )
    try:
        if genai_new is None:
            return _generate_with_legacy_sdk(prompt)
        return _generate_with_new_sdk(prompt)
    except Exception as exc:
        logger.error("Gemini explain failed: %s", exc)
        # Deterministic fallback built purely from the inputs.
        primary = ""
        if engine_results:
            primary = f"Primary signal came from the {engine_results[0].engine} engine."
        return (
            f"Content classified as {verdict} with {confidence:.0%} confidence. "
            f"Attributed generator: {generator}. "
            f"{primary}"
        ).strip()
class Explainer:
    """Compatibility wrapper for legacy callers expecting an object API."""

    def explain(self, response: DetectionResponse) -> str:
        """Delegate to the module-level explain() using fields of *response*."""
        return explain(
            response.verdict,
            response.confidence,
            response.engine_breakdown,
            response.attributed_generator,
        )