import logging import os import asyncio import hashlib import datetime import json import re import csv from pathlib import Path from typing import Any, Dict logger = logging.getLogger(__name__) # Apply nested asyncio if possible try: import nest_asyncio nest_asyncio.apply() except (ValueError, ImportError): pass import common_utils import inference_logic # --- Tool Definition & Agent Logic --- def analyze_video_veracity(video_url: str, specific_question: str = "", agent_config: dict = None) -> dict: """Tool to analyze video veracity.""" if agent_config is None: agent_config = {} loop = asyncio.get_event_loop() if loop.is_running(): import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as pool: return pool.submit(asyncio.run, _analyze_video_async(video_url, specific_question, agent_config)).result() else: return asyncio.run(_analyze_video_async(video_url, specific_question, agent_config)) async def _analyze_video_async(video_url: str, context: str, agent_config: dict) -> dict: try: use_search = agent_config.get("use_search", False) use_code = agent_config.get("use_code", False) provider = agent_config.get("provider", "vertex") api_key = agent_config.get("api_key", os.getenv("GEMINI_API_KEY", "")) project_id = agent_config.get("project_id", os.getenv("VERTEX_PROJECT_ID", "")) location = agent_config.get("location", os.getenv("VERTEX_LOCATION", "us-central1")) model_name = agent_config.get("model_name", os.getenv("VERTEX_MODEL_NAME", "gemini-1.5-pro")) reasoning_method = agent_config.get("reasoning_method", "cot") prompt_template = agent_config.get("prompt_template", "standard") request_id = hashlib.md5(f"{video_url}_{datetime.datetime.now()}".encode()).hexdigest()[:10] assets = await common_utils.prepare_video_assets(video_url, request_id) # We need the prompt instructions try: from labeling_logic import PROMPT_VARIANTS sel_p = PROMPT_VARIANTS.get(prompt_template, PROMPT_VARIANTS['standard']) system_persona_txt = sel_p['instruction'] except Exception: system_persona_txt = "You are a Factuality Agent." system_persona = f"You are the LiarMP4 Verifier. Context: {context}\n\nPersona: {system_persona_txt}" trans = common_utils.parse_vtt(assets['transcript']) if assets.get('transcript') else "No transcript." final_result = None raw_toon_text = "" pipeline_logs =[] if provider == "gemini": if not api_key: return {"error": "Gemini API Key missing. Please provide it in the Inference Config."} gemini_config = {"api_key": api_key, "model_name": model_name, "max_retries": 3, "use_search": use_search, "use_code": use_code} async for chunk in inference_logic.run_gemini_labeling_pipeline( video_path=assets.get('video'), caption=assets.get('caption', ''), transcript=trans, gemini_config=gemini_config, include_comments=False, reasoning_method=reasoning_method, system_persona=system_persona, request_id=request_id ): if isinstance(chunk, str): pipeline_logs.append(chunk.strip()) elif isinstance(chunk, dict) and "parsed_data" in chunk: final_result = chunk["parsed_data"] raw_toon_text = chunk.get("raw_toon", "") else: if not project_id: return {"error": "Vertex Project ID missing. Please provide it in the Inference Config."} vertex_config = { "project_id": project_id, "location": location, "model_name": model_name, "max_retries": 3, "use_search": use_search, "use_code": use_code, "api_key": api_key } async for chunk in inference_logic.run_vertex_labeling_pipeline( video_path=assets.get('video'), caption=assets.get('caption', ''), transcript=trans, vertex_config=vertex_config, include_comments=False, reasoning_method=reasoning_method, system_persona=system_persona, request_id=request_id ): if isinstance(chunk, str): pipeline_logs.append(chunk.strip()) elif isinstance(chunk, dict) and "parsed_data" in chunk: final_result = chunk["parsed_data"] raw_toon_text = chunk.get("raw_toon", "") if final_result: # 1. Compare to GT Database gt_score = None manual_path = Path("data/manual_dataset.csv") if manual_path.exists(): for row in common_utils.robust_read_csv(manual_path): if common_utils.normalize_link(row.get('link', '')) == common_utils.normalize_link(video_url): try: gt_score = float(row.get('final_veracity_score', 0)) except: pass break # 2. Extract Data ai_score_val = final_result.get('final_assessment', {}).get('veracity_score_total', 0) try: ai_score = float(ai_score_val) except: ai_score = 0 reasoning = final_result.get('final_assessment', {}).get('reasoning', 'No reasoning provided.') vec = final_result.get('veracity_vectors', {}) mod = final_result.get('modalities', {}) fact = final_result.get('factuality_factors', {}) reply_text = f"[ANALYSIS COMPLETE]\nVideo: {video_url}\n\n" reply_text += "--- AGENT PIPELINE LOGS ---\n" reply_text += "\n".join([log for log in pipeline_logs if log]) + "\n\n" reply_text += f"Final Veracity Score: {ai_score}/100\n" reply_text += f"Reasoning: {reasoning}\n\n" reply_text += "--- VERACITY VECTORS ---\n" reply_text += f"Visual Integrity : {vec.get('visual_integrity_score', 'N/A')}\n" reply_text += f"Audio Integrity : {vec.get('audio_integrity_score', 'N/A')}\n" reply_text += f"Source Credibility : {vec.get('source_credibility_score', 'N/A')}\n" reply_text += f"Logical Consistency : {vec.get('logical_consistency_score', 'N/A')}\n" reply_text += f"Emotional Manipulation : {vec.get('emotional_manipulation_score', 'N/A')}\n\n" reply_text += "--- MODALITIES ---\n" reply_text += f"Video-Audio : {mod.get('video_audio_score', 'N/A')}\n" reply_text += f"Video-Caption : {mod.get('video_caption_score', 'N/A')}\n" reply_text += f"Audio-Caption : {mod.get('audio_caption_score', 'N/A')}\n" reply_text += "\n--- FACTUALITY FACTORS ---\n" reply_text += f"Claim Accuracy : {fact.get('claim_accuracy', 'N/A')}\n" reply_text += f"Evidence Gap : {fact.get('evidence_gap', 'N/A')}\n" reply_text += f"Grounding Check : {fact.get('grounding_check', 'N/A')}\n" if gt_score is not None: delta = abs(ai_score - gt_score) reply_text += f"\n--- GROUND TRUTH COMPARISON ---\n" reply_text += f"Verified GT Score : {gt_score}/100\n" reply_text += f"AI Generated Score : {ai_score}/100\n" reply_text += f"Accuracy Delta : {delta} points\n" reply_text += "\n--- RAW TOON OUTPUT ---\n" reply_text += f"{raw_toon_text}\n\n" config_params_str = json.dumps({"agent_active": True, "use_search": use_search, "use_code": use_code}) # 3. Save to Dataset properly to track agent config accuracy d_path = Path("data/dataset.csv") try: with open(d_path, 'a', newline='', encoding='utf-8') as f: row = { "id": request_id, "link": video_url, "timestamp": datetime.datetime.now().isoformat(), "caption": assets.get('caption', ''), "final_veracity_score": ai_score, "visual_score": final_result.get('veracity_vectors', {}).get('visual_integrity_score', 0), "audio_score": final_result.get('veracity_vectors', {}).get('audio_integrity_score', 0), "source_score": final_result.get('veracity_vectors', {}).get('source_credibility_score', 0), "logic_score": final_result.get('veracity_vectors', {}).get('logical_consistency_score', 0), "emotion_score": final_result.get('veracity_vectors', {}).get('emotional_manipulation_score', 0), "align_video_audio": final_result.get('modalities', {}).get('video_audio_score', 0), "align_video_caption": final_result.get('modalities', {}).get('video_caption_score', 0), "align_audio_caption": final_result.get('modalities', {}).get('audio_caption_score', 0), "classification": final_result.get('disinformation_analysis', {}).get('classification', 'None'), "reasoning": reasoning, "tags": ",".join(final_result.get('tags',[])), "raw_toon": raw_toon_text, "config_type": "A2A Agent", "config_model": model_name, "config_prompt": prompt_template, "config_reasoning": reasoning_method, "config_params": config_params_str } writer = csv.DictWriter(f, fieldnames=[ "id", "link", "timestamp", "caption", "final_veracity_score", "visual_score", "audio_score", "source_score", "logic_score", "emotion_score", "align_video_audio", "align_video_caption", "align_audio_caption", "classification", "reasoning", "tags", "raw_toon", "config_type", "config_model", "config_prompt", "config_reasoning", "config_params" ], extrasaction='ignore') if not d_path.exists() or d_path.stat().st_size == 0: writer.writeheader() writer.writerow(row) except Exception as e: logger.error(f"Failed writing A2A to dataset: {e}") # 4. Save Raw JSON AI-generated file exactly like the ingest queue try: ts_clean = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") flat_parsed = final_result.copy() flat_parsed["raw_toon"] = raw_toon_text flat_parsed["meta_info"] = { "id": request_id, "timestamp": datetime.datetime.now().isoformat(), "link": video_url, "prompt_used": "A2A Agent Prompt", "model_selection": provider, "config_type": "GenAI A2A", "config_model": model_name, "config_prompt": prompt_template, "config_reasoning": reasoning_method, "config_params": {"agent_active": True, "use_search": use_search, "use_code": use_code} } with open(Path(f"data/labels/{request_id}_{ts_clean}.json"), 'w', encoding='utf-8') as f: json.dump(flat_parsed, f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"Failed saving A2A raw JSON sidecar: {e}") reply_text += f"\n[Pipeline] Successfully parsed context, analyzed factuality, and saved raw AI Label File to Data Manager (Provider: {provider}, Model: {model_name}, Search: {use_search})." return {"text": reply_text, "data": final_result} return {"error": "Inference yielded no data or credentials missing."} except Exception as e: logger.error(f"[Tool Error] {e}") return {"error": str(e)} # --- Custom A2A App --- def create_a2a_app(): """Creates a robust Starlette/FastAPI app that implements core A2A JSON-RPC behavior.""" from fastapi import FastAPI, Request a2a_app = FastAPI(title="LiarMP4 A2A Agent") @a2a_app.post("/") @a2a_app.post("/jsonrpc") async def jsonrpc_handler(request: Request): try: data = await request.json() method = data.get("method", "agent.process") params = data.get("params", {}) input_text = "" agent_config = {} if isinstance(params, dict): input_text = params.get("input", params.get("text", params.get("query", params.get("prompt", "")))) agent_config = params.get("agent_config", {}) if not input_text and "url" in params: input_text = params["url"] elif isinstance(params, list) and len(params) > 0: if isinstance(params[0], dict): input_text = params[0].get("text", params[0].get("input", "")) else: input_text = str(params[0]) elif isinstance(params, str): input_text = params # Accept an array of standard agentic invocation methods accepted_methods =["agent.process", "agent.generate", "model.generate", "a2a.generate", "a2a.interact", "agent.interact"] if method in accepted_methods or not method: # Dynamic Setup & Config Management via Agent Conversation update_config = {} low_input = str(input_text).lower() if "set provider to " in low_input: val = low_input.split("set provider to ")[-1].strip().split()[0] if val in["gemini", "vertex"]: update_config["provider"] = val if "set api key to " in low_input: val = input_text.split("set api key to ")[-1].strip().split()[0] update_config["api_key"] = val if "set project id to " in low_input: val = input_text.split("set project id to ")[-1].strip().split()[0] update_config["project_id"] = val if update_config: return { "jsonrpc": "2.0", "id": data.get("id", 1), "result": { "text": f"✅ Agent configuration updated automatically ({', '.join(update_config.keys())}). You can now provide a video link or further instructions.", "update_config": update_config } } urls = re.findall(r'(https?://[^\s]+)', str(input_text)) if urls: url = urls[0] logger.info(f"Agent Processing Video URL: {url} | Config: {agent_config}") res = await _analyze_video_async(url, str(input_text), agent_config) if "error" in res: reply = f"Error analyzing video: {res['error']}" else: reply = res.get("text", "Processing finished but no reply generated.") else: # Agent Setup Guidance Logic provider = agent_config.get("provider", "vertex") api_key = agent_config.get("api_key", "") project_id = agent_config.get("project_id", "") base_capabilities = ( "**Agent Capabilities:**\n" "- Process raw video & audio modalities via A2A\n" "- Fetch & analyze comment sentiment and community context\n" "- Run full Factuality pipeline (FCoT) & Generate Veracity Vectors\n" "- Automatically save raw AI Labeled JSON files & sync to Data Manager\n" "- Verify and compare AI outputs against Ground Truth\n" "- Reprompt dynamically for missing scores or incomplete data\n\n" "**Easy Command:**\n" "Use `Run full pipeline on[URL]` to analyze a video, extract all vectors (source, logic, emotion, etc.), and save aligned files." ) if provider == 'vertex' and not project_id: reply = f"Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Vertex AI** yet. Please enter your Google Cloud Project ID in the 'Inference Config' panel on the left, or tell me directly: *'set project id to[YOUR_PROJECT]'*.\n\n{base_capabilities}" elif provider == 'gemini' and not api_key: reply = f"👋 Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Gemini** yet. Please enter your API Key in the 'Inference Config' panel on the left, or tell me directly: *'set api key to[YOUR_KEY]'*.\n\n{base_capabilities}" else: reply = f"✅ I am the LiarMP4 Verifier, fully configured ({provider.capitalize()}) and ready!\n\n{base_capabilities}" return { "jsonrpc": "2.0", "id": data.get("id", 1), "result": { "text": reply, "data": {"status": "success", "agent": "LiarMP4_A2A"} } } else: logger.warning(f"A2A Agent rejected unknown method: {method}") return { "jsonrpc": "2.0", "id": data.get("id", 1), "error": { "code": -32601, "message": f"Method '{method}' not found. Supported: {', '.join(accepted_methods)}" } } except Exception as e: logger.error(f"A2A Parse Error: {e}") return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}} logger.info("✅ A2A Custom Agent App created successfully.") return a2a_app