# deepdetection/src/explainability/explainer.py
# Last change (akagtag, commit 3c2e1da): refresh Gemini candidate order to
# current Gemini 3 model codes.
from __future__ import annotations
import logging
import os
import queue
import threading
from src.types import DetectionResponse, EngineResult
logger = logging.getLogger(__name__)
# Prefer the new google-genai SDK when it is installed; `genai_new` is None when
# the import fails so callers can detect availability with a simple None check.
try:
    from google import genai as genai_new  # type: ignore
except Exception:
    genai_new = None
# Legacy google-generativeai module; imported lazily on first use.
genai_legacy = None
# System prompt sent with every explanation request; constrains the model to a
# short (2-3 sentence), declarative forensics summary with no extra output.
SYSTEM_INSTRUCTION = (
    "You are a deepfake forensics analyst writing reports for security professionals. "
    "Given detection engine outputs, write exactly 2-3 sentences in plain English "
    "explaining why the content is real or fake. "
    "Be specific and name the strongest signals. "
    "Use direct declarative sentences. "
    "Output only the explanation text."
)
# Ordered fallback list of Gemini model codes to try for explanations; can be
# overridden via the GEMINI_MODEL_CANDIDATES environment variable below.
DEFAULT_MODEL_CANDIDATES = (
    # Source: https://ai.google.dev/models/gemini (checked March 2026).
    # Prefer current Gemini 3 model codes first, then compatibility fallbacks.
    "gemini-3-pro-preview",
    "gemini-3-flash-preview",
    "gemini-3-pro-image-preview",
    "gemini-3.1-pro-preview",
    "gemini-3.1-pro-preview-customtools",
    "gemini-3.1-flash-lite-preview",
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    "gemini-2.5-flash-lite",
)
# Comma-separated override list; blank entries are dropped.
_configured_candidates = [
    value.strip()
    for value in os.environ.get("GEMINI_MODEL_CANDIDATES", "").split(",")
    if value.strip()
]
MODEL_CANDIDATES = tuple(_configured_candidates) if _configured_candidates else DEFAULT_MODEL_CANDIDATES
# Per-call timeout (seconds) handed to _run_with_timeout and the legacy SDK.
REQUEST_TIMEOUT_S = float(os.environ.get("GEMINI_REQUEST_TIMEOUT_S", "10"))
# Upper bound on how many candidate models one request may try (clamped >= 1).
MAX_MODEL_ATTEMPTS = max(1, int(os.environ.get("GEMINI_MAX_MODEL_ATTEMPTS", "3")))
# Opt-in flag: when truthy, the legacy SDK lists live models at runtime to
# reorder/extend the candidate list (see _legacy_model_candidates).
ENABLE_LEGACY_MODEL_DISCOVERY = os.environ.get("GEMINI_DISCOVER_MODELS", "").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
# Lazily-initialized module-level caches shared across calls.
_new_client = None  # google.genai Client instance, created once on demand
_legacy_model = None  # last legacy GenerativeModel that produced text
_legacy_model_name = None  # model code of _legacy_model (for logging)
_legacy_candidates = None  # memoized tuple from _legacy_model_candidates()
def _get_api_key() -> str:
return os.environ.get("GEMINI_API_KEY", "").strip()
def _run_with_timeout(func, timeout_s: float):
result_q: queue.Queue[tuple[bool, object]] = queue.Queue(maxsize=1)
def _runner() -> None:
try:
result_q.put((True, func()))
except Exception as exc: # pragma: no cover - passthrough
result_q.put((False, exc))
thread = threading.Thread(target=_runner, daemon=True)
thread.start()
try:
ok, payload = result_q.get(timeout=timeout_s)
except queue.Empty as exc:
raise TimeoutError(f"Gemini request timed out after {timeout_s:.1f}s") from exc
if ok:
return payload
raise payload # type: ignore[misc]
def _ensure_new_client():
    """Create (once) and return the google.genai client, or None if unavailable.

    Returns None when the new SDK is not importable, no API key is set, or
    client construction fails; a successful client is cached in _new_client.
    """
    global _new_client
    if _new_client is None and genai_new is not None:
        key = _get_api_key()
        if key:
            try:
                _new_client = genai_new.Client(api_key=key)
            except Exception as exc:
                logger.warning("Failed to init google.genai client: %s", exc)
    return _new_client
def _generate_with_new_sdk(prompt: str) -> str:
    """Generate an explanation via the new google-genai SDK.

    Tries candidate models in priority order and returns the first non-empty
    response text, stripped.

    Raises:
        RuntimeError: if no client is available or every model returned empty
            text without erroring.
        Exception: the last per-model error when all attempts failed.
    """
    client = _ensure_new_client()
    if client is None:
        raise RuntimeError("google.genai client unavailable")
    # Prepend the system instruction since this call path sends a single prompt.
    full_prompt = f"{SYSTEM_INSTRUCTION}\n\n{prompt}"
    last_error: Exception | None = None
    # Fix: cap attempts at MAX_MODEL_ATTEMPTS, matching the legacy-SDK path, so
    # a long candidate list cannot stall a single request indefinitely.
    for model_name in MODEL_CANDIDATES[:MAX_MODEL_ATTEMPTS]:
        try:
            response = _run_with_timeout(
                # Bind model_name as a default arg to avoid late-binding surprises.
                lambda name=model_name: client.models.generate_content(
                    model=name,
                    contents=full_prompt,
                ),
                REQUEST_TIMEOUT_S,
            )
            text = getattr(response, "text", None)
            if text and str(text).strip():
                logger.info("Gemini explain model selected (new SDK): %s", model_name)
                return str(text).strip()
        except Exception as exc:
            last_error = exc
            logger.debug("Gemini model %s failed on new SDK: %s", model_name, exc)
    if last_error:
        raise last_error
    raise RuntimeError("No Gemini model succeeded via new SDK")
def _ensure_legacy_configured() -> bool:
    """Lazily import and configure google.generativeai; return True on success."""
    global genai_legacy
    if genai_legacy is None:
        try:
            import google.generativeai as _legacy  # type: ignore
        except Exception:
            return False
        genai_legacy = _legacy
    api_key = _get_api_key()
    if not api_key:
        return False
    try:
        genai_legacy.configure(api_key=api_key)
    except Exception as exc:
        logger.warning("Failed to configure legacy Gemini SDK: %s", exc)
        return False
    return True
def _legacy_model_candidates() -> tuple[str, ...]:
    """Return (and memoize in _legacy_candidates) the legacy-SDK model order.

    Without discovery (or without the legacy SDK) this is just MODEL_CANDIDATES.
    With discovery, live models supporting generateContent are fetched and the
    configured candidates that exist are kept first, followed by the remainder.
    """
    global _legacy_candidates
    if _legacy_candidates is not None:
        return _legacy_candidates
    base = list(MODEL_CANDIDATES)
    if not ENABLE_LEGACY_MODEL_DISCOVERY or genai_legacy is None:
        _legacy_candidates = tuple(base)
        return _legacy_candidates
    try:
        found: list[str] = []
        for model in genai_legacy.list_models(request_options={"timeout": REQUEST_TIMEOUT_S}):
            supported = set(getattr(model, "supported_generation_methods", []) or [])
            raw_name = str(getattr(model, "name", "")).strip()
            if "generateContent" in supported and raw_name:
                # API returns names like "models/gemini-..."; keep the short code.
                found.append(raw_name.split("/", 1)[-1])
        if found:
            preferred = [name for name in base if name in found]
            remainder = [name for name in found if name not in preferred]
            _legacy_candidates = tuple(preferred + remainder)
        else:
            _legacy_candidates = tuple(base)
    except Exception as exc:
        logger.warning("Could not list Gemini models from legacy SDK: %s", exc)
        _legacy_candidates = tuple(base)
    return _legacy_candidates
def _generate_with_legacy_sdk(prompt: str) -> str:
    """Generate an explanation via the legacy google.generativeai SDK.

    Reuses the last model that produced text (cached in _legacy_model); on a
    cache miss or failure it iterates candidate models (bounded by
    MAX_MODEL_ATTEMPTS) until one returns non-empty text.

    Raises:
        RuntimeError: if the legacy SDK cannot be imported/configured, or no
            model produced text without erroring.
        Exception: the last per-model error when every candidate failed.
    """
    global _legacy_model, _legacy_model_name
    if not _ensure_legacy_configured():
        raise RuntimeError("legacy Gemini SDK unavailable")
    # Fast path: reuse the previously successful model if one is cached.
    if _legacy_model is not None:
        try:
            response = _run_with_timeout(
                lambda: _legacy_model.generate_content(
                    prompt,
                    request_options={"timeout": REQUEST_TIMEOUT_S},
                ),
                # Outer watchdog is 1s looser than the SDK timeout, presumably so
                # the SDK-level timeout fires first -- confirm if changing either.
                REQUEST_TIMEOUT_S + 1.0,
            )
            text = (getattr(response, "text", None) or "").strip()
            if text:
                return text
        except Exception as exc:
            # Cached model went bad: drop it and fall through to a full retry.
            logger.warning("Cached Gemini model %s failed: %s", _legacy_model_name, exc)
            _legacy_model = None
            _legacy_model_name = None
    last_error: Exception | None = None
    # Try candidates in priority order, capped at MAX_MODEL_ATTEMPTS.
    for model_name in _legacy_model_candidates()[:MAX_MODEL_ATTEMPTS]:
        try:
            candidate = genai_legacy.GenerativeModel(
                model_name=model_name,
                system_instruction=SYSTEM_INSTRUCTION,
            )
            response = _run_with_timeout(
                lambda: candidate.generate_content(
                    prompt,
                    request_options={"timeout": REQUEST_TIMEOUT_S},
                ),
                REQUEST_TIMEOUT_S + 1.0,
            )
            text = (getattr(response, "text", None) or "").strip()
            if text:
                # Cache the winning model for subsequent calls.
                _legacy_model = candidate
                _legacy_model_name = model_name
                logger.info("Gemini explain model selected (legacy SDK): %s", model_name)
                return text
        except Exception as exc:
            last_error = exc
            logger.debug("Gemini model %s failed on legacy SDK: %s", model_name, exc)
    if last_error:
        raise last_error
    raise RuntimeError("No Gemini model succeeded via legacy SDK")
def explain(
    verdict: str,
    confidence: float,
    engine_results: list[EngineResult],
    generator: str,
) -> str:
    """Return a short plain-English forensics explanation for a detection.

    Builds a prompt from the verdict, attributed generator, and per-engine
    breakdown, then asks Gemini for 2-3 sentences. Never raises: if every
    Gemini path fails, a deterministic template summary is returned instead.

    Args:
        verdict: overall classification label (e.g. real/fake).
        confidence: overall confidence in [0, 1], rendered as a percentage.
        engine_results: per-engine results used for the breakdown section.
        generator: attributed generator name.
    """
    breakdown = "\n".join(
        f"- {result.engine}: {result.verdict} ({result.confidence:.0%}) - {result.explanation}"
        for result in engine_results
    )
    prompt = (
        f"Verdict: {verdict} ({confidence:.0%} confidence)\n"
        f"Attributed generator: {generator}\n"
        f"Engine breakdown:\n{breakdown}\n\n"
        "Write the forensics explanation."
    )
    try:
        if genai_new is not None:
            try:
                return _generate_with_new_sdk(prompt)
            except Exception as exc:
                # Fix: a new-SDK failure previously skipped the legacy SDK
                # entirely; fall through so it can still produce a real answer.
                logger.warning("New-SDK Gemini explain failed, trying legacy SDK: %s", exc)
        return _generate_with_legacy_sdk(prompt)
    except Exception as exc:
        logger.error("Gemini explain failed: %s", exc)
        # Deterministic last-resort summary built from the inputs alone.
        top = engine_results[0] if engine_results else None
        primary = f"Primary signal came from the {top.engine} engine." if top else ""
        return (
            f"Content classified as {verdict} with {confidence:.0%} confidence. "
            f"Attributed generator: {generator}. "
            f"{primary}"
        ).strip()
class Explainer:
    """Compatibility wrapper for legacy callers expecting an object API."""

    def explain(self, response: DetectionResponse) -> str:
        """Delegate to the module-level explain() using fields of *response*."""
        kwargs = {
            "verdict": response.verdict,
            "confidence": response.confidence,
            "engine_results": response.engine_breakdown,
            "generator": response.attributed_generator,
        }
        return explain(**kwargs)