""" LangChain agent wiring: registers all tools and invokes them per modality. Returns a structured AnalysisResult. """ import json from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.messages import HumanMessage, SystemMessage from app.config import GEMINI_API_KEY, GEMINI_MODEL, GEMINI_MODEL_FALLBACKS from app.models import AnalysisResult def _risk_level(score: float) -> str: if score < 0.3: return "LOW" elif score < 0.6: return "MEDIUM" elif score < 0.85: return "HIGH" return "CRITICAL" def invoke_with_fallback(messages: list) -> str: """Try GEMINI_MODEL then each fallback until one succeeds.""" models_to_try = [GEMINI_MODEL] + GEMINI_MODEL_FALLBACKS last_err = None for model_name in models_to_try: try: llm = ChatGoogleGenerativeAI( model=model_name, google_api_key=GEMINI_API_KEY, temperature=0.1, ) return llm.invoke(messages).content except Exception as e: last_err = e if "429" not in str(e) and "RESOURCE_EXHAUSTED" not in str(e): raise raise RuntimeError(f"All Gemini models exhausted. Last error: {last_err}") def _merge_factcheck(result: AnalysisResult, fc: dict) -> AnalysisResult: """Merge a fact-check result dict into an existing AnalysisResult.""" verdict = fc.get("verdict", "UNVERIFIABLE") content_type = fc.get("content_type", "unknown") simplified = fc.get("simplified_explanation", "") fc_score = float(fc.get("risk_score", 0.5)) fc_threats = fc.get("threat_types", []) combined_score = round(max(result.risk_score, fc_score * 0.6), 3) combined_threats = list(set(result.threat_types + fc_threats)) return AnalysisResult( risk_score=combined_score, risk_level=_risk_level(combined_score), verdict=verdict, content_type=content_type, threat_types=combined_threats, explanation=result.explanation, simplified_explanation=simplified, tool_outputs={**result.tool_outputs, "fact_check": fc}, ) def run_text_agent(text: str, url_flags: dict) -> AnalysisResult: from app.tools.fakenews_tools import classify_and_fact_check system = ( "You are a cybersecurity expert specializing in phishing detection. " "Analyse the provided text for phishing indicators: urgency language, " "impersonation, social engineering, suspicious URLs, credential harvesting. " "Respond ONLY with valid JSON matching this schema: " '{"risk_score": , "threat_types": [], "explanation": }' ) prompt = f"TEXT TO ANALYSE:\n{text}\n\nURL SCAN RESULTS:\n{json.dumps(url_flags)}" raw = invoke_with_fallback([SystemMessage(content=system), HumanMessage(content=prompt)]) raw = raw.strip().strip("```json").strip("```").strip() data = json.loads(raw) score = float(data["risk_score"]) base = AnalysisResult( risk_score=score, risk_level=_risk_level(score), threat_types=data.get("threat_types", []), explanation=data.get("explanation", ""), tool_outputs={"gemini_text": data, "url_scan": url_flags}, ) fc = classify_and_fact_check(text) return _merge_factcheck(base, fc) def run_image_agent(gemini_result: dict, fc_result: dict | None = None) -> AnalysisResult: gemini_score = gemini_result.get("risk_score", 0.0) is_manipulated = gemini_result.get("is_manipulated", False) threat_types = gemini_result.get("threat_types", []) explanation = f"Gemini vision analysis: {gemini_result.get('explanation', '')}" # Visual verdict is authoritative — fact-check cannot override it if is_manipulated or gemini_score >= 0.7: verdict = "FAKE" elif gemini_score <= 0.2: verdict = "REAL" else: verdict = "UNVERIFIABLE" tool_outputs: dict = {"gemini_vision": gemini_result} if fc_result: tool_outputs["fact_check"] = fc_result return AnalysisResult( risk_score=gemini_score, risk_level=_risk_level(gemini_score), verdict=verdict, content_type="unknown", threat_types=threat_types, explanation=explanation, simplified_explanation=gemini_result.get("explanation", ""), tool_outputs=tool_outputs, ) def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisResult: gemini_score = gemini_result.get("risk_score", 0.0) avg_frame = sum(frame_scores) / len(frame_scores) if frame_scores else 0.0 combined = round((gemini_score * 0.6) + (avg_frame * 0.4), 3) explanation = ( f"Gemini video analysis score: {gemini_score:.2f}. " f"Frame-level deepfake average: {avg_frame:.2f} over {len(frame_scores)} frames. " f"{gemini_result.get('explanation', '')}" ) return AnalysisResult( risk_score=combined, risk_level=_risk_level(combined), verdict="FAKE" if combined > 0.5 else "UNVERIFIABLE", content_type="unknown", threat_types=gemini_result.get("threat_types", ["deepfake_video"]), explanation=explanation, simplified_explanation=( "This video shows signs of AI manipulation or deepfake content." if combined > 0.5 else "No definitive deepfake signals detected, but proceed with caution." ), tool_outputs={"gemini_video": gemini_result, "frame_scores": frame_scores}, ) def run_audio_agent(hf_result: dict, gemini_result: dict) -> AnalysisResult: hf_score = hf_result.get("deepfake_score", 0.0) gemini_score = gemini_result.get("risk_score", 0.0) combined = round((hf_score * 0.5) + (gemini_score * 0.5), 3) threat_types = list( set(hf_result.get("threat_types", []) + gemini_result.get("threat_types", [])) ) explanation = ( f"HuggingFace audio deepfake model: {hf_result.get('label', 'N/A')} " f"(confidence {hf_score:.2f}). " f"Gemini audio analysis: {gemini_result.get('explanation', '')}" ) return AnalysisResult( risk_score=combined, risk_level=_risk_level(combined), verdict="FAKE" if combined > 0.5 else "UNVERIFIABLE", content_type="unknown", threat_types=threat_types, explanation=explanation, simplified_explanation=( "This audio appears to be AI-generated or synthetically cloned. Do not trust its authenticity." if combined > 0.5 else "No strong deepfake signals in audio, but remain cautious." ), tool_outputs={"hf_audio": hf_result, "gemini_audio": gemini_result}, )