Spaces:
Sleeping
Sleeping
import logging
import os
import asyncio
import hashlib
import datetime
import json
import re
import csv
from pathlib import Path
from typing import Any, Dict

# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)

# Apply nested asyncio if possible: nest_asyncio allows asyncio.run() to be
# invoked from within an already-running event loop (e.g. notebooks).
# Optional dependency — silently skipped when absent or when patching fails.
try:
    import nest_asyncio
    nest_asyncio.apply()
except (ValueError, ImportError):
    pass

# Project-local modules: asset download/parsing helpers and the model
# inference pipelines used by the agent tool below.
import common_utils
import inference_logic
| # --- Tool Definition & Agent Logic --- | |
def analyze_video_veracity(video_url: str, specific_question: str = "", agent_config: dict = None) -> dict:
    """Tool to analyze video veracity.

    Synchronous entry point bridging into the async pipeline. Safe to call
    both from plain synchronous code and from inside a running event loop:
    in the latter case the pipeline runs on a private loop in a worker
    thread so the caller's loop is not re-entered.

    Args:
        video_url: Link to the video to verify.
        specific_question: Optional extra question/context for the agent.
        agent_config: Optional provider/model configuration; defaults to {}.

    Returns:
        The dict produced by _analyze_video_async — either
        {"text": ..., "data": ...} on success or {"error": ...}.
    """
    if agent_config is None:
        agent_config = {}
    try:
        # asyncio.get_event_loop() is deprecated and, since Python 3.12,
        # raises when no loop exists in the current thread. Probe for a
        # *running* loop explicitly instead.
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread — run the coroutine directly.
        return asyncio.run(_analyze_video_async(video_url, specific_question, agent_config))
    # A loop is already running here; asyncio.run() would fail, so execute
    # the coroutine on a fresh loop inside a worker thread and block on it.
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor() as pool:
        return pool.submit(
            asyncio.run, _analyze_video_async(video_url, specific_question, agent_config)
        ).result()
async def _analyze_video_async(video_url: str, context: str, agent_config: dict) -> dict:
    """Core async pipeline: fetch assets, run inference, persist results.

    Args:
        video_url: Link to the video under analysis.
        context: Free-form caller context folded into the system persona.
        agent_config: Provider/model settings; missing keys fall back to
            environment variables or hard defaults.

    Returns:
        {"text": <human-readable report>, "data": <parsed result dict>} on
        success, otherwise {"error": <message>}.
    """
    try:
        use_search = agent_config.get("use_search", False)
        use_code = agent_config.get("use_code", False)
        provider = agent_config.get("provider", "vertex")
        api_key = agent_config.get("api_key", os.getenv("GEMINI_API_KEY", ""))
        project_id = agent_config.get("project_id", os.getenv("VERTEX_PROJECT_ID", ""))
        location = agent_config.get("location", os.getenv("VERTEX_LOCATION", "us-central1"))
        model_name = agent_config.get("model_name", os.getenv("VERTEX_MODEL_NAME", "gemini-1.5-pro"))
        reasoning_method = agent_config.get("reasoning_method", "cot")
        prompt_template = agent_config.get("prompt_template", "standard")

        # Short, unique-enough request id (md5 of url + wall-clock time).
        request_id = hashlib.md5(f"{video_url}_{datetime.datetime.now()}".encode()).hexdigest()[:10]
        assets = await common_utils.prepare_video_assets(video_url, request_id)

        # We need the prompt instructions; fall back to a minimal persona
        # when the labeling module or the requested template is unavailable.
        try:
            from labeling_logic import PROMPT_VARIANTS
            sel_p = PROMPT_VARIANTS.get(prompt_template, PROMPT_VARIANTS['standard'])
            system_persona_txt = sel_p['instruction']
        except Exception:
            system_persona_txt = "You are a Factuality Agent."
        system_persona = f"You are the LiarMP4 Verifier. Context: {context}\n\nPersona: {system_persona_txt}"

        trans = common_utils.parse_vtt(assets['transcript']) if assets.get('transcript') else "No transcript."

        final_result = None
        raw_toon_text = ""
        pipeline_logs = []

        async def _consume(stream):
            # Both provider pipelines yield str log lines plus one final dict
            # payload; collect the logs and capture the parsed result and the
            # raw TOON text. (Was duplicated verbatim for each provider.)
            nonlocal final_result, raw_toon_text
            async for chunk in stream:
                if isinstance(chunk, str):
                    pipeline_logs.append(chunk.strip())
                elif isinstance(chunk, dict) and "parsed_data" in chunk:
                    final_result = chunk["parsed_data"]
                    raw_toon_text = chunk.get("raw_toon", "")

        if provider == "gemini":
            if not api_key:
                return {"error": "Gemini API Key missing. Please provide it in the Inference Config."}
            gemini_config = {"api_key": api_key, "model_name": model_name, "max_retries": 3, "use_search": use_search, "use_code": use_code}
            await _consume(inference_logic.run_gemini_labeling_pipeline(
                video_path=assets.get('video'),
                caption=assets.get('caption', ''),
                transcript=trans,
                gemini_config=gemini_config,
                include_comments=False,
                reasoning_method=reasoning_method,
                system_persona=system_persona,
                request_id=request_id
            ))
        else:
            if not project_id:
                return {"error": "Vertex Project ID missing. Please provide it in the Inference Config."}
            vertex_config = {
                "project_id": project_id,
                "location": location,
                "model_name": model_name,
                "max_retries": 3,
                "use_search": use_search,
                "use_code": use_code,
                "api_key": api_key
            }
            await _consume(inference_logic.run_vertex_labeling_pipeline(
                video_path=assets.get('video'),
                caption=assets.get('caption', ''),
                transcript=trans,
                vertex_config=vertex_config,
                include_comments=False,
                reasoning_method=reasoning_method,
                system_persona=system_persona,
                request_id=request_id
            ))

        if final_result:
            # 1. Compare to GT Database
            gt_score = None
            manual_path = Path("data/manual_dataset.csv")
            if manual_path.exists():
                for row in common_utils.robust_read_csv(manual_path):
                    if common_utils.normalize_link(row.get('link', '')) == common_utils.normalize_link(video_url):
                        try:
                            gt_score = float(row.get('final_veracity_score', 0))
                        except (TypeError, ValueError):  # narrowed from bare except
                            pass
                        break

            # 2. Extract Data
            ai_score_val = final_result.get('final_assessment', {}).get('veracity_score_total', 0)
            try:
                ai_score = float(ai_score_val)
            except (TypeError, ValueError):  # narrowed from bare except
                ai_score = 0
            reasoning = final_result.get('final_assessment', {}).get('reasoning', 'No reasoning provided.')
            vec = final_result.get('veracity_vectors', {})
            mod = final_result.get('modalities', {})
            fact = final_result.get('factuality_factors', {})

            reply_text = f"[ANALYSIS COMPLETE]\nVideo: {video_url}\n\n"
            reply_text += "--- AGENT PIPELINE LOGS ---\n"
            reply_text += "\n".join([log for log in pipeline_logs if log]) + "\n\n"
            reply_text += f"Final Veracity Score: {ai_score}/100\n"
            reply_text += f"Reasoning: {reasoning}\n\n"
            reply_text += "--- VERACITY VECTORS ---\n"
            reply_text += f"Visual Integrity : {vec.get('visual_integrity_score', 'N/A')}\n"
            reply_text += f"Audio Integrity : {vec.get('audio_integrity_score', 'N/A')}\n"
            reply_text += f"Source Credibility : {vec.get('source_credibility_score', 'N/A')}\n"
            reply_text += f"Logical Consistency : {vec.get('logical_consistency_score', 'N/A')}\n"
            reply_text += f"Emotional Manipulation : {vec.get('emotional_manipulation_score', 'N/A')}\n\n"
            reply_text += "--- MODALITIES ---\n"
            reply_text += f"Video-Audio : {mod.get('video_audio_score', 'N/A')}\n"
            reply_text += f"Video-Caption : {mod.get('video_caption_score', 'N/A')}\n"
            reply_text += f"Audio-Caption : {mod.get('audio_caption_score', 'N/A')}\n"
            reply_text += "\n--- FACTUALITY FACTORS ---\n"
            reply_text += f"Claim Accuracy : {fact.get('claim_accuracy', 'N/A')}\n"
            reply_text += f"Evidence Gap : {fact.get('evidence_gap', 'N/A')}\n"
            reply_text += f"Grounding Check : {fact.get('grounding_check', 'N/A')}\n"
            if gt_score is not None:
                delta = abs(ai_score - gt_score)
                reply_text += "\n--- GROUND TRUTH COMPARISON ---\n"  # was an f-string with no placeholders
                reply_text += f"Verified GT Score : {gt_score}/100\n"
                reply_text += f"AI Generated Score : {ai_score}/100\n"
                reply_text += f"Accuracy Delta : {delta} points\n"
            reply_text += "\n--- RAW TOON OUTPUT ---\n"
            reply_text += f"{raw_toon_text}\n\n"

            config_params_str = json.dumps({"agent_active": True, "use_search": use_search, "use_code": use_code})

            # 3. Save to Dataset properly to track agent config accuracy
            d_path = Path("data/dataset.csv")
            try:
                # Ensure the target directory exists before appending.
                d_path.parent.mkdir(parents=True, exist_ok=True)
                with open(d_path, 'a', newline='', encoding='utf-8') as f:
                    row = {
                        "id": request_id, "link": video_url, "timestamp": datetime.datetime.now().isoformat(),
                        "caption": assets.get('caption', ''),
                        "final_veracity_score": ai_score,
                        "visual_score": final_result.get('veracity_vectors', {}).get('visual_integrity_score', 0),
                        "audio_score": final_result.get('veracity_vectors', {}).get('audio_integrity_score', 0),
                        "source_score": final_result.get('veracity_vectors', {}).get('source_credibility_score', 0),
                        "logic_score": final_result.get('veracity_vectors', {}).get('logical_consistency_score', 0),
                        "emotion_score": final_result.get('veracity_vectors', {}).get('emotional_manipulation_score', 0),
                        "align_video_audio": final_result.get('modalities', {}).get('video_audio_score', 0),
                        "align_video_caption": final_result.get('modalities', {}).get('video_caption_score', 0),
                        "align_audio_caption": final_result.get('modalities', {}).get('audio_caption_score', 0),
                        "classification": final_result.get('disinformation_analysis', {}).get('classification', 'None'),
                        "reasoning": reasoning,
                        "tags": ",".join(final_result.get('tags', [])),
                        "raw_toon": raw_toon_text,
                        "config_type": "A2A Agent",
                        "config_model": model_name,
                        "config_prompt": prompt_template,
                        "config_reasoning": reasoning_method,
                        "config_params": config_params_str
                    }
                    writer = csv.DictWriter(f, fieldnames=[
                        "id", "link", "timestamp", "caption",
                        "final_veracity_score", "visual_score", "audio_score", "source_score", "logic_score", "emotion_score",
                        "align_video_audio", "align_video_caption", "align_audio_caption",
                        "classification", "reasoning", "tags", "raw_toon",
                        "config_type", "config_model", "config_prompt", "config_reasoning", "config_params"
                    ], extrasaction='ignore')
                    # open(..., 'a') creates the file, so exists() is always
                    # True here; the zero-size check decides the header write.
                    if not d_path.exists() or d_path.stat().st_size == 0:
                        writer.writeheader()
                    writer.writerow(row)
            except Exception as e:
                logger.error(f"Failed writing A2A to dataset: {e}")

            # 4. Save Raw JSON AI-generated file exactly like the ingest queue
            try:
                ts_clean = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                flat_parsed = final_result.copy()
                flat_parsed["raw_toon"] = raw_toon_text
                flat_parsed["meta_info"] = {
                    "id": request_id, "timestamp": datetime.datetime.now().isoformat(), "link": video_url,
                    "prompt_used": "A2A Agent Prompt",
                    "model_selection": provider,
                    "config_type": "GenAI A2A",
                    "config_model": model_name,
                    "config_prompt": prompt_template,
                    "config_reasoning": reasoning_method,
                    "config_params": {"agent_active": True, "use_search": use_search, "use_code": use_code}
                }
                labels_dir = Path("data/labels")
                labels_dir.mkdir(parents=True, exist_ok=True)  # avoid FileNotFoundError on fresh deployments
                with open(labels_dir / f"{request_id}_{ts_clean}.json", 'w', encoding='utf-8') as f:
                    json.dump(flat_parsed, f, indent=2, ensure_ascii=False)
            except Exception as e:
                logger.error(f"Failed saving A2A raw JSON sidecar: {e}")

            reply_text += f"\n[Pipeline] Successfully parsed context, analyzed factuality, and saved raw AI Label File to Data Manager (Provider: {provider}, Model: {model_name}, Search: {use_search})."
            return {"text": reply_text, "data": final_result}
        return {"error": "Inference yielded no data or credentials missing."}
    except Exception as e:
        logger.error(f"[Tool Error] {e}")
        return {"error": str(e)}
| # --- Custom A2A App --- | |
def create_a2a_app():
    """Creates a robust Starlette/FastAPI app that implements core A2A JSON-RPC behavior."""
    from fastapi import FastAPI, Request
    a2a_app = FastAPI(title="LiarMP4 A2A Agent")

    def _extract_directive(raw_text: str, lowered: str, phrase: str) -> str:
        # Case-insensitive "<phrase> <value>" lookup: locate the phrase in the
        # lowered copy, then slice the ORIGINAL text so the captured value
        # keeps its case (API keys / project IDs are case-sensitive).
        # Previously the original-case text was split on a lowercase phrase,
        # which silently failed on mixed-case commands, and .split()[0]
        # raised IndexError when no value followed the phrase.
        idx = lowered.rfind(phrase)
        if idx == -1:
            return ""
        tokens = raw_text[idx + len(phrase):].strip().split()
        return tokens[0] if tokens else ""

    # BUGFIX: the handler was defined but never registered on the app, so the
    # service exposed no JSON-RPC endpoint at all.
    @a2a_app.post("/")
    async def jsonrpc_handler(request: Request):
        try:
            data = await request.json()
            method = data.get("method", "agent.process")
            params = data.get("params", {})
            input_text = ""
            agent_config = {}
            # Accept the input text under several common param shapes.
            if isinstance(params, dict):
                input_text = params.get("input", params.get("text", params.get("query", params.get("prompt", ""))))
                agent_config = params.get("agent_config", {})
                if not input_text and "url" in params:
                    input_text = params["url"]
            elif isinstance(params, list) and len(params) > 0:
                if isinstance(params[0], dict):
                    input_text = params[0].get("text", params[0].get("input", ""))
                else:
                    input_text = str(params[0])
            elif isinstance(params, str):
                input_text = params
            # Accept an array of standard agentic invocation methods
            accepted_methods = ["agent.process", "agent.generate", "model.generate", "a2a.generate", "a2a.interact", "agent.interact"]
            if method in accepted_methods or not method:
                # Dynamic Setup & Config Management via Agent Conversation
                update_config = {}
                raw_input_text = str(input_text)
                low_input = raw_input_text.lower()
                if "set provider to " in low_input:
                    # Provider names are matched case-insensitively.
                    val = _extract_directive(raw_input_text, low_input, "set provider to ").lower()
                    if val in ["gemini", "vertex"]:
                        update_config["provider"] = val
                if "set api key to " in low_input:
                    val = _extract_directive(raw_input_text, low_input, "set api key to ")
                    if val:
                        update_config["api_key"] = val
                if "set project id to " in low_input:
                    val = _extract_directive(raw_input_text, low_input, "set project id to ")
                    if val:
                        update_config["project_id"] = val
                if update_config:
                    return {
                        "jsonrpc": "2.0", "id": data.get("id", 1),
                        "result": {
                            "text": f"✅ Agent configuration updated automatically ({', '.join(update_config.keys())}). You can now provide a video link or further instructions.",
                            "update_config": update_config
                        }
                    }
                urls = re.findall(r'(https?://[^\s]+)', raw_input_text)
                if urls:
                    url = urls[0]
                    logger.info(f"Agent Processing Video URL: {url} | Config: {agent_config}")
                    res = await _analyze_video_async(url, raw_input_text, agent_config)
                    if "error" in res:
                        reply = f"Error analyzing video: {res['error']}"
                    else:
                        reply = res.get("text", "Processing finished but no reply generated.")
                else:
                    # Agent Setup Guidance Logic
                    provider = agent_config.get("provider", "vertex")
                    api_key = agent_config.get("api_key", "")
                    project_id = agent_config.get("project_id", "")
                    base_capabilities = (
                        "**Agent Capabilities:**\n"
                        "- Process raw video & audio modalities via A2A\n"
                        "- Fetch & analyze comment sentiment and community context\n"
                        "- Run full Factuality pipeline (FCoT) & Generate Veracity Vectors\n"
                        "- Automatically save raw AI Labeled JSON files & sync to Data Manager\n"
                        "- Verify and compare AI outputs against Ground Truth\n"
                        "- Reprompt dynamically for missing scores or incomplete data\n\n"
                        "**Easy Command:**\n"
                        "Use `Run full pipeline on[URL]` to analyze a video, extract all vectors (source, logic, emotion, etc.), and save aligned files."
                    )
                    if provider == 'vertex' and not project_id:
                        reply = f"Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Vertex AI** yet. Please enter your Google Cloud Project ID in the 'Inference Config' panel on the left, or tell me directly: *'set project id to[YOUR_PROJECT]'*.\n\n{base_capabilities}"
                    elif provider == 'gemini' and not api_key:
                        reply = f"👋 Welcome to the LiarMP4 Agent Nexus!\n\nIt looks like you haven't configured **Gemini** yet. Please enter your API Key in the 'Inference Config' panel on the left, or tell me directly: *'set api key to[YOUR_KEY]'*.\n\n{base_capabilities}"
                    else:
                        reply = f"✅ I am the LiarMP4 Verifier, fully configured ({provider.capitalize()}) and ready!\n\n{base_capabilities}"
                return {
                    "jsonrpc": "2.0",
                    "id": data.get("id", 1),
                    "result": {
                        "text": reply,
                        "data": {"status": "success", "agent": "LiarMP4_A2A"}
                    }
                }
            else:
                logger.warning(f"A2A Agent rejected unknown method: {method}")
                return {
                    "jsonrpc": "2.0",
                    "id": data.get("id", 1),
                    "error": {
                        "code": -32601,
                        "message": f"Method '{method}' not found. Supported: {', '.join(accepted_methods)}"
                    }
                }
        except Exception as e:
            logger.error(f"A2A Parse Error: {e}")
            return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}

    logger.info("✅ A2A Custom Agent App created successfully.")
    return a2a_app