| """ |
| LangChain agent wiring: registers all tools and invokes them per modality. |
| Returns a structured AnalysisResult. |
| """ |
| import json |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_core.messages import HumanMessage, SystemMessage |
| from app.config import GEMINI_API_KEY, GEMINI_MODEL, GEMINI_MODEL_FALLBACKS |
| from app.models import AnalysisResult |
|
|
|
|
| def _risk_level(score: float) -> str: |
| if score < 0.3: |
| return "LOW" |
| elif score < 0.6: |
| return "MEDIUM" |
| elif score < 0.85: |
| return "HIGH" |
| return "CRITICAL" |
|
|
|
|
def invoke_with_fallback(messages: list) -> str:
    """Invoke Gemini, falling through GEMINI_MODEL_FALLBACKS on quota errors.

    Each candidate model is tried in order. A quota error (message contains
    "429" or "RESOURCE_EXHAUSTED") moves on to the next candidate; any other
    exception is re-raised immediately.

    Args:
        messages: LangChain message list passed straight to ``llm.invoke``.

    Returns:
        The text content of the first successful model response.

    Raises:
        RuntimeError: when every candidate model hits its quota.
    """
    quota_error = None
    for candidate in (GEMINI_MODEL, *GEMINI_MODEL_FALLBACKS):
        try:
            client = ChatGoogleGenerativeAI(
                model=candidate,
                google_api_key=GEMINI_API_KEY,
                temperature=0.1,
            )
            return client.invoke(messages).content
        except Exception as err:
            # Only quota exhaustion justifies trying the next model.
            message = str(err)
            if "429" not in message and "RESOURCE_EXHAUSTED" not in message:
                raise
            quota_error = err
    raise RuntimeError(f"All Gemini models exhausted. Last error: {quota_error}")
|
|
|
|
def _merge_factcheck(result: AnalysisResult, fc: dict) -> AnalysisResult:
    """Fold a fact-check dict into an AnalysisResult, returning a new result.

    The fact-check score is down-weighted (x0.6) and can only raise the
    combined risk score, never lower it. Threat types are unioned; verdict,
    content type, and simplified explanation are taken from the fact-check.
    """
    fact_score = float(fc.get("risk_score", 0.5))
    merged_score = round(max(result.risk_score, fact_score * 0.6), 3)
    merged_threats = list({*result.threat_types, *fc.get("threat_types", [])})

    return AnalysisResult(
        risk_score=merged_score,
        risk_level=_risk_level(merged_score),
        verdict=fc.get("verdict", "UNVERIFIABLE"),
        content_type=fc.get("content_type", "unknown"),
        threat_types=merged_threats,
        explanation=result.explanation,
        simplified_explanation=fc.get("simplified_explanation", ""),
        tool_outputs={**result.tool_outputs, "fact_check": fc},
    )
|
|
|
|
def run_text_agent(text: str, url_flags: dict) -> AnalysisResult:
    """Run the phishing-detection LLM over *text*, then merge a fact-check pass.

    Args:
        text: Raw user-supplied text to analyse.
        url_flags: URL scan results, serialized into the prompt and kept in
            ``tool_outputs["url_scan"]``.

    Returns:
        AnalysisResult combining Gemini's phishing assessment with the
        fact-check verdict from ``classify_and_fact_check``.

    Raises:
        json.JSONDecodeError: if the model response is not valid JSON.
        KeyError: if the response JSON lacks "risk_score".
    """
    from app.tools.fakenews_tools import classify_and_fact_check

    system = (
        "You are a cybersecurity expert specializing in phishing detection. "
        "Analyse the provided text for phishing indicators: urgency language, "
        "impersonation, social engineering, suspicious URLs, credential harvesting. "
        "Respond ONLY with valid JSON matching this schema: "
        '{"risk_score": <float 0-1>, "threat_types": [<strings>], "explanation": <string>}'
    )
    prompt = f"TEXT TO ANALYSE:\n{text}\n\nURL SCAN RESULTS:\n{json.dumps(url_flags)}"
    raw = invoke_with_fallback([SystemMessage(content=system), HumanMessage(content=prompt)])

    # Remove optional Markdown code fences around the JSON payload.
    # BUG FIX: the previous ``raw.strip("```json")`` treated its argument as a
    # *character set*, so it could also eat legitimate leading/trailing
    # backtick/j/s/o/n characters of the payload itself. Use explicit
    # prefix/suffix removal instead.
    raw = raw.strip()
    if raw.startswith("```"):
        raw = raw.removeprefix("```json").removeprefix("```")
    raw = raw.removesuffix("```").strip()

    data = json.loads(raw)
    score = float(data["risk_score"])

    base = AnalysisResult(
        risk_score=score,
        risk_level=_risk_level(score),
        threat_types=data.get("threat_types", []),
        explanation=data.get("explanation", ""),
        tool_outputs={"gemini_text": data, "url_scan": url_flags},
    )

    fc = classify_and_fact_check(text)
    return _merge_factcheck(base, fc)
|
|
|
|
def run_image_agent(gemini_result: dict, fc_result: dict | None = None) -> AnalysisResult:
    """Build an AnalysisResult from a Gemini vision analysis of an image.

    Verdict policy: manipulated flag set or score >= 0.7 -> FAKE;
    score <= 0.2 -> REAL; anything in between -> UNVERIFIABLE. A truthy
    ``fc_result`` is attached to tool_outputs but does not alter the score.
    """
    score = gemini_result.get("risk_score", 0.0)
    manipulated = gemini_result.get("is_manipulated", False)
    gemini_explanation = gemini_result.get("explanation", "")

    if manipulated or score >= 0.7:
        verdict = "FAKE"
    elif score <= 0.2:
        verdict = "REAL"
    else:
        verdict = "UNVERIFIABLE"

    outputs: dict = {"gemini_vision": gemini_result}
    if fc_result:
        outputs["fact_check"] = fc_result

    return AnalysisResult(
        risk_score=score,
        risk_level=_risk_level(score),
        verdict=verdict,
        content_type="unknown",
        threat_types=gemini_result.get("threat_types", []),
        explanation=f"Gemini vision analysis: {gemini_explanation}",
        simplified_explanation=gemini_explanation,
        tool_outputs=outputs,
    )
|
|
|
|
def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisResult:
    """Combine Gemini video analysis with frame-level deepfake scores.

    The combined score is a weighted blend (60% Gemini, 40% average frame
    score; 0.0 when no frames were scored). A combined score above 0.5
    yields a FAKE verdict, otherwise UNVERIFIABLE.
    """
    gem = gemini_result.get("risk_score", 0.0)
    frame_avg = (sum(frame_scores) / len(frame_scores)) if frame_scores else 0.0
    score = round(gem * 0.6 + frame_avg * 0.4, 3)
    looks_fake = score > 0.5

    detail = (
        f"Gemini video analysis score: {gem:.2f}. "
        f"Frame-level deepfake average: {frame_avg:.2f} over {len(frame_scores)} frames. "
        f"{gemini_result.get('explanation', '')}"
    )
    if looks_fake:
        summary = "This video shows signs of AI manipulation or deepfake content."
    else:
        summary = "No definitive deepfake signals detected, but proceed with caution."

    return AnalysisResult(
        risk_score=score,
        risk_level=_risk_level(score),
        verdict="FAKE" if looks_fake else "UNVERIFIABLE",
        content_type="unknown",
        threat_types=gemini_result.get("threat_types", ["deepfake_video"]),
        explanation=detail,
        simplified_explanation=summary,
        tool_outputs={"gemini_video": gemini_result, "frame_scores": frame_scores},
    )
|
|
|
|
def run_audio_agent(hf_result: dict, gemini_result: dict) -> AnalysisResult:
    """Fuse HuggingFace audio-deepfake and Gemini audio analyses.

    The combined score is a 50/50 average of the two model scores; threat
    types are unioned. Above 0.5 the verdict is FAKE, else UNVERIFIABLE.
    """
    hf_score = hf_result.get("deepfake_score", 0.0)
    gem_score = gemini_result.get("risk_score", 0.0)
    score = round(hf_score * 0.5 + gem_score * 0.5, 3)
    looks_fake = score > 0.5

    merged_threats = list(
        {*hf_result.get("threat_types", []), *gemini_result.get("threat_types", [])}
    )
    detail = (
        f"HuggingFace audio deepfake model: {hf_result.get('label', 'N/A')} "
        f"(confidence {hf_score:.2f}). "
        f"Gemini audio analysis: {gemini_result.get('explanation', '')}"
    )
    if looks_fake:
        summary = "This audio appears to be AI-generated or synthetically cloned. Do not trust its authenticity."
    else:
        summary = "No strong deepfake signals in audio, but remain cautious."

    return AnalysisResult(
        risk_score=score,
        risk_level=_risk_level(score),
        verdict="FAKE" if looks_fake else "UNVERIFIABLE",
        content_type="unknown",
        threat_types=merged_threats,
        explanation=detail,
        simplified_explanation=summary,
        tool_outputs={"hf_audio": hf_result, "gemini_audio": gemini_result},
    )
|
|