| | import logging |
| | import os |
| | import asyncio |
| | import hashlib |
| | import datetime |
| | import json |
| | import re |
| | import csv |
| | from pathlib import Path |
| | from typing import Any, Dict |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
try:
    # Patch the running event loop to allow nested asyncio.run() calls
    # (needed when hosted inside Jupyter or another loop-owning framework).
    # ImportError: nest_asyncio not installed; ValueError: apply() rejects
    # loops it cannot patch (e.g. uvloop). Both are safe to ignore here.
    import nest_asyncio
    nest_asyncio.apply()
except (ValueError, ImportError):
    pass
| |
|
| | import common_utils |
| | import inference_logic |
| |
|
| | |
| |
|
def analyze_video_veracity(video_url: str, specific_question: str = "", agent_config: dict = None) -> dict:
    """Synchronous tool entry point to analyze video veracity.

    Bridges sync callers into the async pipeline. If this thread already has
    a running event loop (notebook, async server), the coroutine is executed
    on a fresh loop inside a worker thread so we don't block or re-enter the
    current one; otherwise it runs directly via asyncio.run().

    Args:
        video_url: URL of the video to analyze.
        specific_question: Optional user question passed as context.
        agent_config: Optional provider/model configuration dict.

    Returns:
        dict with analysis results ("text"/"data") or an "error" key.
    """
    if agent_config is None:
        agent_config = {}
    # asyncio.get_event_loop() is deprecated outside a running loop (3.10+)
    # and fails in 3.12+; detect a running loop explicitly instead.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None
    coro = _analyze_video_async(video_url, specific_question, agent_config)
    if running_loop is not None:
        # Run on a dedicated loop in a worker thread and block for the result.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as pool:
            return pool.submit(asyncio.run, coro).result()
    return asyncio.run(coro)
| |
|
async def _analyze_video_async(video_url: str, context: str, agent_config: dict) -> dict:
    """Run the end-to-end veracity pipeline for one video URL.

    Steps: prepare assets (video/caption/transcript), stream the Gemini or
    Vertex labeling pipeline, build a human-readable report, then persist a
    row to data/dataset.csv and a raw JSON sidecar under data/labels/.

    Args:
        video_url: URL of the video to analyze.
        context: Free-form user context folded into the system persona.
        agent_config: Provider/model settings; environment variables are the
            fallback for credentials and model selection.

    Returns:
        {"text": <report>, "data": <parsed pipeline output>} on success,
        otherwise {"error": <message>}.
    """
    try:
        use_search = agent_config.get("use_search", False)
        use_code = agent_config.get("use_code", False)
        provider = agent_config.get("provider", "vertex")
        api_key = agent_config.get("api_key", os.getenv("GEMINI_API_KEY", ""))
        project_id = agent_config.get("project_id", os.getenv("VERTEX_PROJECT_ID", ""))
        location = agent_config.get("location", os.getenv("VERTEX_LOCATION", "us-central1"))
        model_name = agent_config.get("model_name", os.getenv("VERTEX_MODEL_NAME", "gemini-1.5-pro"))
        reasoning_method = agent_config.get("reasoning_method", "cot")
        prompt_template = agent_config.get("prompt_template", "standard")

        # Short per-request id used to key files/logs (not security-sensitive).
        request_id = hashlib.md5(f"{video_url}_{datetime.datetime.now()}".encode()).hexdigest()[:10]
        assets = await common_utils.prepare_video_assets(video_url, request_id)

        # Resolve the persona text for the selected prompt template; fall back
        # to a generic persona if the registry/module is unavailable.
        try:
            from labeling_logic import PROMPT_VARIANTS
            sel_p = PROMPT_VARIANTS.get(prompt_template, PROMPT_VARIANTS['standard'])
            system_persona_txt = sel_p['instruction']
        except Exception:
            system_persona_txt = "You are a Factuality Agent."

        system_persona = f"You are the LiarMP4 Verifier. Context: {context}\n\nPersona: {system_persona_txt}"

        trans = common_utils.parse_vtt(assets['transcript']) if assets.get('transcript') else "No transcript."

        final_result = None
        raw_toon_text = ""
        pipeline_logs = []

        # Select the provider pipeline; both generators stream str log lines
        # followed by a dict payload containing "parsed_data".
        if provider == "gemini":
            if not api_key:
                return {"error": "Gemini API Key missing. Please provide it in the Inference Config."}
            gemini_config = {"api_key": api_key, "model_name": model_name, "max_retries": 3, "use_search": use_search, "use_code": use_code}
            pipeline = inference_logic.run_gemini_labeling_pipeline(
                video_path=assets.get('video'),
                caption=assets.get('caption', ''),
                transcript=trans,
                gemini_config=gemini_config,
                include_comments=False,
                reasoning_method=reasoning_method,
                system_persona=system_persona,
                request_id=request_id
            )
        else:
            if not project_id:
                return {"error": "Vertex Project ID missing. Please provide it in the Inference Config."}
            vertex_config = {
                "project_id": project_id,
                "location": location,
                "model_name": model_name,
                "max_retries": 3,
                "use_search": use_search,
                "use_code": use_code,
                "api_key": api_key
            }
            pipeline = inference_logic.run_vertex_labeling_pipeline(
                video_path=assets.get('video'),
                caption=assets.get('caption', ''),
                transcript=trans,
                vertex_config=vertex_config,
                include_comments=False,
                reasoning_method=reasoning_method,
                system_persona=system_persona,
                request_id=request_id
            )

        # Single shared consumer (previously duplicated per provider).
        async for chunk in pipeline:
            if isinstance(chunk, str):
                pipeline_logs.append(chunk.strip())
            elif isinstance(chunk, dict) and "parsed_data" in chunk:
                final_result = chunk["parsed_data"]
                raw_toon_text = chunk.get("raw_toon", "")

        if final_result:
            # Optional ground-truth lookup from the manually-labeled dataset.
            gt_score = None
            manual_path = Path("data/manual_dataset.csv")
            if manual_path.exists():
                for row in common_utils.robust_read_csv(manual_path):
                    if common_utils.normalize_link(row.get('link', '')) == common_utils.normalize_link(video_url):
                        try:
                            gt_score = float(row.get('final_veracity_score', 0))
                        except (TypeError, ValueError):
                            # Unparseable GT score: treat as absent.
                            pass
                        break

            ai_score_val = final_result.get('final_assessment', {}).get('veracity_score_total', 0)
            try:
                ai_score = float(ai_score_val)
            except (TypeError, ValueError):
                ai_score = 0

            reasoning = final_result.get('final_assessment', {}).get('reasoning', 'No reasoning provided.')

            vec = final_result.get('veracity_vectors', {})
            mod = final_result.get('modalities', {})
            fact = final_result.get('factuality_factors', {})

            # --- Human-readable report -------------------------------------
            reply_text = f"[ANALYSIS COMPLETE]\nVideo: {video_url}\n\n"
            reply_text += "--- AGENT PIPELINE LOGS ---\n"
            reply_text += "\n".join([log for log in pipeline_logs if log]) + "\n\n"

            reply_text += f"Final Veracity Score: {ai_score}/100\n"
            reply_text += f"Reasoning: {reasoning}\n\n"

            reply_text += "--- VERACITY VECTORS ---\n"
            reply_text += f"Visual Integrity : {vec.get('visual_integrity_score', 'N/A')}\n"
            reply_text += f"Audio Integrity : {vec.get('audio_integrity_score', 'N/A')}\n"
            reply_text += f"Source Credibility : {vec.get('source_credibility_score', 'N/A')}\n"
            reply_text += f"Logical Consistency : {vec.get('logical_consistency_score', 'N/A')}\n"
            reply_text += f"Emotional Manipulation : {vec.get('emotional_manipulation_score', 'N/A')}\n\n"

            reply_text += "--- MODALITIES ---\n"
            reply_text += f"Video-Audio : {mod.get('video_audio_score', 'N/A')}\n"
            reply_text += f"Video-Caption : {mod.get('video_caption_score', 'N/A')}\n"
            reply_text += f"Audio-Caption : {mod.get('audio_caption_score', 'N/A')}\n"

            reply_text += "\n--- FACTUALITY FACTORS ---\n"
            reply_text += f"Claim Accuracy : {fact.get('claim_accuracy', 'N/A')}\n"
            reply_text += f"Evidence Gap : {fact.get('evidence_gap', 'N/A')}\n"
            reply_text += f"Grounding Check : {fact.get('grounding_check', 'N/A')}\n"

            if gt_score is not None:
                delta = abs(ai_score - gt_score)
                reply_text += f"\n--- GROUND TRUTH COMPARISON ---\n"
                reply_text += f"Verified GT Score : {gt_score}/100\n"
                reply_text += f"AI Generated Score : {ai_score}/100\n"
                reply_text += f"Accuracy Delta : {delta} points\n"

            reply_text += "\n--- RAW TOON OUTPUT ---\n"
            reply_text += f"{raw_toon_text}\n\n"

            config_params_str = json.dumps({"agent_active": True, "use_search": use_search, "use_code": use_code})

            # --- Append a row to the shared dataset CSV --------------------
            d_path = Path("data/dataset.csv")
            try:
                d_path.parent.mkdir(parents=True, exist_ok=True)
                # Decide on the header BEFORE open('a') creates the file;
                # afterwards exists() is always True.
                write_header = not d_path.exists() or d_path.stat().st_size == 0
                with open(d_path, 'a', newline='', encoding='utf-8') as f:
                    row = {
                        "id": request_id, "link": video_url, "timestamp": datetime.datetime.now().isoformat(),
                        "caption": assets.get('caption', ''),
                        "final_veracity_score": ai_score,
                        "visual_score": vec.get('visual_integrity_score', 0),
                        "audio_score": vec.get('audio_integrity_score', 0),
                        "source_score": vec.get('source_credibility_score', 0),
                        "logic_score": vec.get('logical_consistency_score', 0),
                        "emotion_score": vec.get('emotional_manipulation_score', 0),
                        "align_video_audio": mod.get('video_audio_score', 0),
                        "align_video_caption": mod.get('video_caption_score', 0),
                        "align_audio_caption": mod.get('audio_caption_score', 0),
                        "classification": final_result.get('disinformation_analysis', {}).get('classification', 'None'),
                        "reasoning": reasoning,
                        "tags": ",".join(final_result.get('tags', [])),
                        "raw_toon": raw_toon_text,
                        "config_type": "A2A Agent",
                        "config_model": model_name,
                        "config_prompt": prompt_template,
                        "config_reasoning": reasoning_method,
                        "config_params": config_params_str
                    }
                    writer = csv.DictWriter(f, fieldnames=[
                        "id", "link", "timestamp", "caption",
                        "final_veracity_score", "visual_score", "audio_score", "source_score", "logic_score", "emotion_score",
                        "align_video_audio", "align_video_caption", "align_audio_caption",
                        "classification", "reasoning", "tags", "raw_toon",
                        "config_type", "config_model", "config_prompt", "config_reasoning", "config_params"
                    ], extrasaction='ignore')
                    if write_header:
                        writer.writeheader()
                    writer.writerow(row)
            except Exception as e:
                logger.error(f"Failed writing A2A to dataset: {e}")

            # --- Persist the raw parsed JSON sidecar -----------------------
            try:
                ts_clean = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                flat_parsed = final_result.copy()
                flat_parsed["raw_toon"] = raw_toon_text
                flat_parsed["meta_info"] = {
                    "id": request_id, "timestamp": datetime.datetime.now().isoformat(), "link": video_url,
                    "prompt_used": "A2A Agent Prompt",
                    "model_selection": provider,
                    "config_type": "GenAI A2A",
                    "config_model": model_name,
                    "config_prompt": prompt_template,
                    "config_reasoning": reasoning_method,
                    "config_params": {"agent_active": True, "use_search": use_search, "use_code": use_code}
                }
                sidecar = Path(f"data/labels/{request_id}_{ts_clean}.json")
                sidecar.parent.mkdir(parents=True, exist_ok=True)
                with open(sidecar, 'w', encoding='utf-8') as f:
                    json.dump(flat_parsed, f, indent=2, ensure_ascii=False)
            except Exception as e:
                logger.error(f"Failed saving A2A raw JSON sidecar: {e}")

            reply_text += f"\n[Pipeline] Successfully parsed context, analyzed factuality, and saved raw AI Label File to Data Manager (Provider: {provider}, Model: {model_name}, Search: {use_search})."

            return {"text": reply_text, "data": final_result}

        return {"error": "Inference yielded no data or credentials missing."}

    except Exception as e:
        # Top-level boundary: report any failure back to the caller as data.
        logger.error(f"[Tool Error] {e}")
        return {"error": str(e)}
| |
|
| | |
def create_a2a_app():
    """Creates a robust Starlette/FastAPI app that implements core A2A JSON-RPC behavior."""
    from fastapi import FastAPI, Request

    a2a_app = FastAPI(title="LiarMP4 A2A Agent")

    def _token_after(text: str, low_text: str, trigger: str) -> str:
        """Return the first whitespace token after the LAST occurrence of *trigger*.

        Detection is case-insensitive (performed on *low_text*), while the
        value is sliced from *text* so API keys / project ids keep their
        original casing. Returns "" if the trigger is absent or nothing
        follows it.
        """
        idx = low_text.rfind(trigger)
        if idx == -1:
            return ""
        tokens = text[idx + len(trigger):].strip().split()
        return tokens[0] if tokens else ""

    @a2a_app.post("/")
    @a2a_app.post("/jsonrpc")
    async def jsonrpc_handler(request: Request):
        try:
            data = await request.json()
            method = data.get("method", "agent.process")
            params = data.get("params", {})

            # Normalize the many shapes clients send params in (dict/list/str).
            input_text = ""
            agent_config = {}
            if isinstance(params, dict):
                input_text = params.get("input", params.get("text", params.get("query", params.get("prompt", ""))))
                agent_config = params.get("agent_config", {})
                if not input_text and "url" in params:
                    input_text = params["url"]
            elif isinstance(params, list) and len(params) > 0:
                if isinstance(params[0], dict):
                    input_text = params[0].get("text", params[0].get("input", ""))
                else:
                    input_text = str(params[0])
            elif isinstance(params, str):
                input_text = params

            accepted_methods = ["agent.process", "agent.generate", "model.generate", "a2a.generate", "a2a.interact", "agent.interact"]

            if method in accepted_methods or not method:
                # Inline configuration commands take precedence over analysis.
                # Previously the api-key/project-id values were extracted by
                # splitting the ORIGINAL-case text on a lowercase literal, so
                # any capitalization in the trigger phrase silently stored the
                # wrong token; _token_after matches case-insensitively while
                # preserving the value's casing.
                update_config = {}
                text_input = str(input_text)
                low_input = text_input.lower()
                if "set provider to " in low_input:
                    val = _token_after(low_input, low_input, "set provider to ")
                    if val in ["gemini", "vertex"]:
                        update_config["provider"] = val
                if "set api key to " in low_input:
                    val = _token_after(text_input, low_input, "set api key to ")
                    if val:
                        update_config["api_key"] = val
                if "set project id to " in low_input:
                    val = _token_after(text_input, low_input, "set project id to ")
                    if val:
                        update_config["project_id"] = val

                if update_config:
                    return {
                        "jsonrpc": "2.0", "id": data.get("id", 1),
                        "result": {
                            "text": f"✅ Agent configuration updated automatically ({', '.join(update_config.keys())}). You can now provide a video link or further instructions.",
                            "update_config": update_config
                        }
                    }

                urls = re.findall(r'(https?://[^\s]+)', text_input)

                if urls:
                    # First URL wins; the full message is forwarded as context.
                    url = urls[0]
                    logger.info(f"Agent Processing Video URL: {url} | Config: {agent_config}")
                    res = await _analyze_video_async(url, text_input, agent_config)

                    if "error" in res:
                        reply = f"Error analyzing video: {res['error']}"
                    else:
                        reply = res.get("text", "Processing finished but no reply generated.")
                else:
                    # No URL: reply with onboarding/capability guidance based
                    # on how far the user's configuration has gotten.
                    provider = agent_config.get("provider", "vertex")
                    api_key = agent_config.get("api_key", "")
                    project_id = agent_config.get("project_id", "")

                    base_capabilities = (
                        "**Agent Capabilities:**\n"
                        "- Process raw video & audio modalities via A2A\n"
                        "- Fetch & analyze comment sentiment and community context\n"
                        "- Run full Factuality pipeline (FCoT) & Generate Veracity Vectors\n"
                        "- Automatically save raw AI Labeled JSON files & sync to Data Manager\n"
                        "- Verify and compare AI outputs against Ground Truth\n"
                        "- Reprompt dynamically for missing scores or incomplete data\n\n"
                        "**Easy Command:**\n"
                        "Use `Run full pipeline on [URL]` to analyze a video, extract all vectors (source, logic, emotion, etc.), and save aligned files."
                    )

                    if provider == 'vertex' and not project_id:
                        reply = f"Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Vertex AI** yet. Please enter your Google Cloud Project ID in the 'Inference Config' panel on the left, or tell me directly: *'set project id to [YOUR_PROJECT]'*.\n\n{base_capabilities}"
                    elif provider == 'gemini' and not api_key:
                        reply = f"👋 Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Gemini** yet. Please enter your API Key in the 'Inference Config' panel on the left, or tell me directly: *'set api key to [YOUR_KEY]'*.\n\n{base_capabilities}"
                    else:
                        reply = f"✅ I am the LiarMP4 Verifier, fully configured ({provider.capitalize()}) and ready!\n\n{base_capabilities}"

                return {
                    "jsonrpc": "2.0",
                    "id": data.get("id", 1),
                    "result": {
                        "text": reply,
                        "data": {"status": "success", "agent": "LiarMP4_A2A"}
                    }
                }
            else:
                # Unknown JSON-RPC method: standard -32601 error.
                logger.warning(f"A2A Agent rejected unknown method: {method}")
                return {
                    "jsonrpc": "2.0",
                    "id": data.get("id", 1),
                    "error": {
                        "code": -32601,
                        "message": f"Method '{method}' not found. Supported: {', '.join(accepted_methods)}"
                    }
                }
        except Exception as e:
            # Malformed body or unexpected failure: standard -32700 error.
            logger.error(f"A2A Parse Error: {e}")
            return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}

    logger.info("✅ A2A Custom Agent App created successfully.")
    return a2a_app
| |
|