from __future__ import annotations import logging import os import queue import threading from src.types import DetectionResponse, EngineResult logger = logging.getLogger(__name__) try: from google import genai as genai_new # type: ignore except Exception: genai_new = None genai_legacy = None SYSTEM_INSTRUCTION = ( "You are a deepfake forensics analyst writing reports for security professionals. " "Given detection engine outputs, write exactly 2-3 sentences in plain English " "explaining why the content is real or fake. " "Be specific and name the strongest signals. " "Use direct declarative sentences. " "Output only the explanation text." ) DEFAULT_MODEL_CANDIDATES = ( # Source: https://ai.google.dev/models/gemini (checked March 2026). # Prefer current Gemini 3 model codes first, then compatibility fallbacks. "gemini-3-pro-preview", "gemini-3-flash-preview", "gemini-3-pro-image-preview", "gemini-3.1-pro-preview", "gemini-3.1-pro-preview-customtools", "gemini-3.1-flash-lite-preview", "gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.5-flash-lite", ) _configured_candidates = [ value.strip() for value in os.environ.get("GEMINI_MODEL_CANDIDATES", "").split(",") if value.strip() ] MODEL_CANDIDATES = tuple(_configured_candidates) if _configured_candidates else DEFAULT_MODEL_CANDIDATES REQUEST_TIMEOUT_S = float(os.environ.get("GEMINI_REQUEST_TIMEOUT_S", "10")) MAX_MODEL_ATTEMPTS = max(1, int(os.environ.get("GEMINI_MAX_MODEL_ATTEMPTS", "3"))) ENABLE_LEGACY_MODEL_DISCOVERY = os.environ.get("GEMINI_DISCOVER_MODELS", "").strip().lower() in { "1", "true", "yes", "on", } _new_client = None _legacy_model = None _legacy_model_name = None _legacy_candidates = None def _get_api_key() -> str: return os.environ.get("GEMINI_API_KEY", "").strip() def _run_with_timeout(func, timeout_s: float): result_q: queue.Queue[tuple[bool, object]] = queue.Queue(maxsize=1) def _runner() -> None: try: result_q.put((True, func())) except Exception as exc: # pragma: no cover - passthrough result_q.put((False, exc)) thread = threading.Thread(target=_runner, daemon=True) thread.start() try: ok, payload = result_q.get(timeout=timeout_s) except queue.Empty as exc: raise TimeoutError(f"Gemini request timed out after {timeout_s:.1f}s") from exc if ok: return payload raise payload # type: ignore[misc] def _ensure_new_client(): global _new_client if _new_client is not None: return _new_client if genai_new is None: return None api_key = _get_api_key() if not api_key: return None try: _new_client = genai_new.Client(api_key=api_key) return _new_client except Exception as exc: logger.warning("Failed to init google.genai client: %s", exc) return None def _generate_with_new_sdk(prompt: str) -> str: client = _ensure_new_client() if client is None: raise RuntimeError("google.genai client unavailable") full_prompt = f"{SYSTEM_INSTRUCTION}\n\n{prompt}" last_error: Exception | None = None for model_name in MODEL_CANDIDATES: try: response = _run_with_timeout( lambda: client.models.generate_content( model=model_name, contents=full_prompt, ), REQUEST_TIMEOUT_S, ) text = getattr(response, "text", None) if text and str(text).strip(): logger.info("Gemini explain model selected (new SDK): %s", model_name) return str(text).strip() except Exception as exc: last_error = exc logger.debug("Gemini model %s failed on new SDK: %s", model_name, exc) if last_error: raise last_error raise RuntimeError("No Gemini model succeeded via new SDK") def _ensure_legacy_configured() -> bool: global genai_legacy if genai_legacy is None: try: import google.generativeai as _legacy # type: ignore genai_legacy = _legacy except Exception: return False if genai_legacy is None: return False api_key = _get_api_key() if not api_key: return False try: genai_legacy.configure(api_key=api_key) return True except Exception as exc: logger.warning("Failed to configure legacy Gemini SDK: %s", exc) return False def _legacy_model_candidates() -> tuple[str, ...]: global _legacy_candidates if _legacy_candidates is not None: return _legacy_candidates ordered = list(MODEL_CANDIDATES) if not ENABLE_LEGACY_MODEL_DISCOVERY: _legacy_candidates = tuple(ordered) return _legacy_candidates if genai_legacy is None: _legacy_candidates = tuple(ordered) return _legacy_candidates try: discovered: list[str] = [] for model in genai_legacy.list_models(request_options={"timeout": REQUEST_TIMEOUT_S}): methods = set(getattr(model, "supported_generation_methods", []) or []) if "generateContent" not in methods: continue name = str(getattr(model, "name", "")).strip() if not name: continue short = name.split("/", 1)[-1] discovered.append(short) if discovered: preferred = [name for name in ordered if name in discovered] remainder = [name for name in discovered if name not in preferred] _legacy_candidates = tuple(preferred + remainder) else: _legacy_candidates = tuple(ordered) except Exception as exc: logger.warning("Could not list Gemini models from legacy SDK: %s", exc) _legacy_candidates = tuple(ordered) return _legacy_candidates def _generate_with_legacy_sdk(prompt: str) -> str: global _legacy_model, _legacy_model_name if not _ensure_legacy_configured(): raise RuntimeError("legacy Gemini SDK unavailable") if _legacy_model is not None: try: response = _run_with_timeout( lambda: _legacy_model.generate_content( prompt, request_options={"timeout": REQUEST_TIMEOUT_S}, ), REQUEST_TIMEOUT_S + 1.0, ) text = (getattr(response, "text", None) or "").strip() if text: return text except Exception as exc: logger.warning("Cached Gemini model %s failed: %s", _legacy_model_name, exc) _legacy_model = None _legacy_model_name = None last_error: Exception | None = None for model_name in _legacy_model_candidates()[:MAX_MODEL_ATTEMPTS]: try: candidate = genai_legacy.GenerativeModel( model_name=model_name, system_instruction=SYSTEM_INSTRUCTION, ) response = _run_with_timeout( lambda: candidate.generate_content( prompt, request_options={"timeout": REQUEST_TIMEOUT_S}, ), REQUEST_TIMEOUT_S + 1.0, ) text = (getattr(response, "text", None) or "").strip() if text: _legacy_model = candidate _legacy_model_name = model_name logger.info("Gemini explain model selected (legacy SDK): %s", model_name) return text except Exception as exc: last_error = exc logger.debug("Gemini model %s failed on legacy SDK: %s", model_name, exc) if last_error: raise last_error raise RuntimeError("No Gemini model succeeded via legacy SDK") def explain( verdict: str, confidence: float, engine_results: list[EngineResult], generator: str, ) -> str: breakdown = "\n".join( f"- {result.engine}: {result.verdict} ({result.confidence:.0%}) - {result.explanation}" for result in engine_results ) prompt = ( f"Verdict: {verdict} ({confidence:.0%} confidence)\n" f"Attributed generator: {generator}\n" f"Engine breakdown:\n{breakdown}\n\n" "Write the forensics explanation." ) try: if genai_new is not None: return _generate_with_new_sdk(prompt) return _generate_with_legacy_sdk(prompt) except Exception as exc: logger.error("Gemini explain failed: %s", exc) top = engine_results[0] if engine_results else None primary = f"Primary signal came from the {top.engine} engine." if top else "" return ( f"Content classified as {verdict} with {confidence:.0%} confidence. " f"Attributed generator: {generator}. " f"{primary}" ).strip() class Explainer: """Compatibility wrapper for legacy callers expecting an object API.""" def explain(self, response: DetectionResponse) -> str: return explain( verdict=response.verdict, confidence=response.confidence, engine_results=response.engine_breakdown, generator=response.attributed_generator, )