fix: Gemini vision verdict authoritative, add AI manipulation detection
Browse files- app/agent.py +19 -6
- app/tools/image_tools.py +22 -14
app/agent.py
CHANGED
|
@@ -91,19 +91,32 @@ def run_text_agent(text: str, url_flags: dict) -> AnalysisResult:
|
|
| 91 |
|
| 92 |
def run_image_agent(gemini_result: dict, fc_result: dict | None = None) -> AnalysisResult:
|
| 93 |
gemini_score = gemini_result.get("risk_score", 0.0)
|
|
|
|
| 94 |
threat_types = gemini_result.get("threat_types", [])
|
| 95 |
explanation = f"Gemini vision analysis: {gemini_result.get('explanation', '')}"
|
| 96 |
-
|
| 97 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 98 |
risk_score=gemini_score,
|
| 99 |
risk_level=_risk_level(gemini_score),
|
|
|
|
|
|
|
| 100 |
threat_types=threat_types,
|
| 101 |
explanation=explanation,
|
| 102 |
-
|
|
|
|
| 103 |
)
|
| 104 |
-
if fc_result:
|
| 105 |
-
return _merge_factcheck(base, fc_result)
|
| 106 |
-
return base
|
| 107 |
|
| 108 |
|
| 109 |
def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisResult:
|
|
|
|
| 91 |
|
| 92 |
def run_image_agent(gemini_result: dict, fc_result: dict | None = None) -> AnalysisResult:
|
| 93 |
gemini_score = gemini_result.get("risk_score", 0.0)
|
| 94 |
+
is_manipulated = gemini_result.get("is_manipulated", False)
|
| 95 |
threat_types = gemini_result.get("threat_types", [])
|
| 96 |
explanation = f"Gemini vision analysis: {gemini_result.get('explanation', '')}"
|
| 97 |
+
|
| 98 |
+
# Visual verdict is authoritative — fact-check cannot override it
|
| 99 |
+
if is_manipulated or gemini_score >= 0.7:
|
| 100 |
+
verdict = "FAKE"
|
| 101 |
+
elif gemini_score <= 0.2:
|
| 102 |
+
verdict = "REAL"
|
| 103 |
+
else:
|
| 104 |
+
verdict = "UNVERIFIABLE"
|
| 105 |
+
|
| 106 |
+
tool_outputs: dict = {"gemini_vision": gemini_result}
|
| 107 |
+
if fc_result:
|
| 108 |
+
tool_outputs["fact_check"] = fc_result
|
| 109 |
+
|
| 110 |
+
return AnalysisResult(
|
| 111 |
risk_score=gemini_score,
|
| 112 |
risk_level=_risk_level(gemini_score),
|
| 113 |
+
verdict=verdict,
|
| 114 |
+
content_type="unknown",
|
| 115 |
threat_types=threat_types,
|
| 116 |
explanation=explanation,
|
| 117 |
+
simplified_explanation=gemini_result.get("explanation", ""),
|
| 118 |
+
tool_outputs=tool_outputs,
|
| 119 |
)
|
|
|
|
|
|
|
|
|
|
| 120 |
|
| 121 |
|
| 122 |
def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisResult:
|
app/tools/image_tools.py
CHANGED
|
@@ -88,12 +88,22 @@ def gemini_extract_image_text(image_bytes: bytes, mime_type: str = "image/jpeg")
|
|
| 88 |
def gemini_analyze_image(image_bytes: bytes, mime_type: str = "image/jpeg") -> dict:
|
| 89 |
b64 = base64.b64encode(image_bytes).decode()
|
| 90 |
system = (
|
| 91 |
-
"You are
|
| 92 |
-
"
|
| 93 |
-
"
|
| 94 |
-
"
|
| 95 |
-
"
|
| 96 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 97 |
)
|
| 98 |
message = HumanMessage(
|
| 99 |
content=[
|
|
@@ -102,25 +112,23 @@ def gemini_analyze_image(image_bytes: bytes, mime_type: str = "image/jpeg") -> d
|
|
| 102 |
]
|
| 103 |
)
|
| 104 |
from app.tools.retry_utils import execute_with_retry
|
| 105 |
-
|
| 106 |
-
search_tool = types.Tool(google_search=types.GoogleSearch())
|
| 107 |
-
|
| 108 |
for model in [GEMINI_MODEL] + GEMINI_MODEL_FALLBACKS:
|
| 109 |
try:
|
| 110 |
resp = execute_with_retry(
|
| 111 |
lambda m=model: ChatGoogleGenerativeAI(
|
| 112 |
-
model=m,
|
| 113 |
-
google_api_key=GEMINI_API_KEY,
|
| 114 |
temperature=0.1
|
| 115 |
-
).invoke([message]
|
| 116 |
)
|
| 117 |
raw = resp.content
|
| 118 |
if not isinstance(raw, str):
|
| 119 |
raw = str(raw)
|
| 120 |
-
|
| 121 |
raw = raw.strip().strip("```json").strip("```").strip()
|
| 122 |
return json.loads(raw)
|
| 123 |
except Exception as e:
|
| 124 |
if "429" not in str(e) and "RESOURCE_EXHAUSTED" not in str(e):
|
| 125 |
raise
|
| 126 |
-
return {"risk_score": 0.0, "threat_types": [], "explanation": "Gemini quota exhausted for all models"}
|
|
|
|
|
|
| 88 |
def gemini_analyze_image(image_bytes: bytes, mime_type: str = "image/jpeg") -> dict:
|
| 89 |
b64 = base64.b64encode(image_bytes).decode()
|
| 90 |
system = (
|
| 91 |
+
"You are a forensic image authenticity expert and cybersecurity analyst. "
|
| 92 |
+
"Your PRIMARY job is to detect whether this image has been artificially altered, manipulated, or generated. "
|
| 93 |
+
"Ignore watermarks, logos, and brand names when judging authenticity — focus only on the PIXELS and CONTENT. "
|
| 94 |
+
"\n\nExamine closely for:"
|
| 95 |
+
"\n- Face swaps: a face that looks pasted, skin tone mismatch, edge artifacts around the face/hairline"
|
| 96 |
+
"\n- AI-generated faces: overly smooth skin, symmetrical imperfections, glassy eyes, blurred ear/hair detail"
|
| 97 |
+
"\n- Photoshop/edit artifacts: inconsistent lighting/shadows, copy-paste regions, clone-stamp patterns, unnatural blurs"
|
| 98 |
+
"\n- Composite images: mismatched camera perspectives, different image qualities in same frame"
|
| 99 |
+
"\n- Fake documents: ID cards where the embedded photo doesn't match the person wearing it"
|
| 100 |
+
"\n- Deepfake video frames: temporal inconsistencies, boundary blending around faces"
|
| 101 |
+
"\n\nScoring rules:"
|
| 102 |
+
"\n- If ANY manipulation is detected → risk_score >= 0.7, verdict = FAKE"
|
| 103 |
+
"\n- If the image looks 100% authentic with no manipulation → risk_score <= 0.2"
|
| 104 |
+
"\n- Be skeptical. If uncertain, lean towards higher risk_score."
|
| 105 |
+
"\n\nYou MUST return ONLY valid JSON, nothing else: "
|
| 106 |
+
'{"risk_score": <float 0-1>, "is_manipulated": <bool>, "threat_types": [<strings>], "explanation": <string>}'
|
| 107 |
)
|
| 108 |
message = HumanMessage(
|
| 109 |
content=[
|
|
|
|
| 112 |
]
|
| 113 |
)
|
| 114 |
from app.tools.retry_utils import execute_with_retry
|
| 115 |
+
|
|
|
|
|
|
|
| 116 |
for model in [GEMINI_MODEL] + GEMINI_MODEL_FALLBACKS:
|
| 117 |
try:
|
| 118 |
resp = execute_with_retry(
|
| 119 |
lambda m=model: ChatGoogleGenerativeAI(
|
| 120 |
+
model=m,
|
| 121 |
+
google_api_key=GEMINI_API_KEY,
|
| 122 |
temperature=0.1
|
| 123 |
+
).invoke([message])
|
| 124 |
)
|
| 125 |
raw = resp.content
|
| 126 |
if not isinstance(raw, str):
|
| 127 |
raw = str(raw)
|
|
|
|
| 128 |
raw = raw.strip().strip("```json").strip("```").strip()
|
| 129 |
return json.loads(raw)
|
| 130 |
except Exception as e:
|
| 131 |
if "429" not in str(e) and "RESOURCE_EXHAUSTED" not in str(e):
|
| 132 |
raise
|
| 133 |
+
return {"risk_score": 0.0, "is_manipulated": False, "threat_types": [], "explanation": "Gemini quota exhausted for all models"}
|
| 134 |
+
|