| | import os |
| | import sys |
| |
|
| | |
# Make sibling modules (inference_logic, common_utils, ...) importable even
# when the app is launched from a different working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.append(current_dir)
| |
|
| | import asyncio |
| | import subprocess |
| | from pathlib import Path |
| | import logging |
| | import csv |
| | import io |
| | import datetime |
| | import json |
| | import hashlib |
| | import re |
| | from fastapi import FastAPI, Request, Form, UploadFile, File, Body, HTTPException |
| | from fastapi.responses import HTMLResponse, StreamingResponse, PlainTextResponse, Response, FileResponse, JSONResponse |
| | from fastapi.templating import Jinja2Templates |
| | from fastapi.staticfiles import StaticFiles |
| | from fastapi.middleware.cors import CORSMiddleware |
| | import yt_dlp |
| | import inference_logic |
| | import factuality_logic |
| | import transcription |
| | import user_analysis_logic |
| | import agent_logic |
| | import common_utils |
| |
|
| | from toon_parser import parse_veracity_toon |
| | from labeling_logic import PROMPT_VARIANTS, LABELING_PROMPT_TEMPLATE, LABELING_PROMPT_TEMPLATE_NO_COT, FCOT_MACRO_PROMPT |
| | import benchmarking |
| |
|
# Root logging configuration; every module-level logger inherits this format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# LITE_MODE (default: enabled) skips heavyweight model loading at startup.
LITE_MODE = os.getenv("LITE_MODE", "true").lower() == "true"
| |
|
app = FastAPI()


# Wide-open CORS: the browser extension and local frontends call this API
# from arbitrary origins.
# NOTE(review): browsers ignore allow_origins=["*"] for credentialed
# requests when allow_credentials=True — confirm this combination is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
| | |
# Mount the A2A agent sub-application. If the factory fails (or returns
# None) a stub is mounted at /a2a so the route always answers; the outcome
# is recorded in `agent_mount_status` for the /health endpoint.
agent_mount_status = "pending"
try:
    logger.info("Attempting to build A2A Agent App...")
    a2a_agent_app = agent_logic.create_a2a_app()
    if a2a_agent_app:
        app.mount("/a2a", a2a_agent_app)
        agent_mount_status = "success"
        logger.info("✅ A2A Agent App successfully mounted at /a2a")
    else:
        logger.warning("⚠️ Agent factory returned None. Mounting internal fallback.")
        from fastapi import FastAPI as InnerFastAPI
        fallback = InnerFastAPI()
        @fallback.post("/")
        @fallback.post("/jsonrpc")
        async def fallback_endpoint(request: Request):
            return {"jsonrpc": "2.0", "result": {"text": "Fallback Agent (Factory returned None)", "data": {"status": "fallback"}}}
        app.mount("/a2a", fallback)
        agent_mount_status = "fallback_none"
except Exception as e:
    logger.critical(f"❌ Failed to mount A2A Agent: {e}", exc_info=True)
    from fastapi import FastAPI as InnerFastAPI
    # BUG FIX: Python 3 unbinds `e` when the except clause exits, so the
    # endpoint below raised NameError at request time when it evaluated
    # str(e) lazily. Capture the message now, while `e` is still bound.
    mount_error_text = str(e)
    fallback = InnerFastAPI()
    @fallback.post("/")
    @fallback.post("/jsonrpc")
    async def fallback_endpoint(request: Request):
        return {"jsonrpc": "2.0", "result": {"text": f"Emergency Agent (Mount Error: {mount_error_text})", "data": {"status": "error"}}}
    app.mount("/a2a", fallback)
    agent_mount_status = f"error_{mount_error_text}"
| |
|
| | |
# Resolve the static asset directory across deployment layouts: Docker
# image, system package, local Vite build, or a bare checkout (created
# empty as a last resort).
STATIC_DIR = "static"
if os.path.isdir("/app/static"):
    STATIC_DIR = "/app/static"
elif os.path.isdir("/usr/share/vchat/static"):
    STATIC_DIR = "/usr/share/vchat/static"
elif os.path.isdir("frontend/dist"):
    STATIC_DIR = "frontend/dist"
elif not os.path.isdir(STATIC_DIR):
    os.makedirs(STATIC_DIR, exist_ok=True)

app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
| |
|
| | |
# Vite emits hashed bundles under assets/; serve them at the root path the
# built index.html references.
assets_path = os.path.join(STATIC_DIR, "assets")
if os.path.exists(assets_path):
    app.mount("/assets", StaticFiles(directory=assets_path), name="assets")
| |
|
| | |
# Working directories for media, labels, profiles, and model artifacts.
data_dirs = [
    "data", "data/videos", "data/labels", "data/prompts",
    "data/responses", "metadata", "data/profiles",
    "data/comments", "data/mnl_labeled", "data/models/sandbox_autogluon"
]


for d in data_dirs:
    try:
        os.makedirs(d, exist_ok=True)
    except PermissionError as e:
        # Read-only deployments may forbid some paths; keep booting.
        logger.warning(f"Permission denied creating directory {d}. Skipping. Error: {e}")
    except Exception as e:
        logger.warning(f"Failed to create directory {d}: {e}")


# Serve downloaded videos directly (only when the directory could be made).
if os.path.isdir("data/videos"):
    app.mount("/videos", StaticFiles(directory="data/videos"), name="videos")
| |
|
# Server-rendered pages share the static directory for templates.
templates = Jinja2Templates(directory=STATIC_DIR)


# Allow very large CSV fields (long captions / raw model output).
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    # 32-bit platforms reject sys.maxsize; use the largest 32-bit C long.
    csv.field_size_limit(2147483647)


# Cooperative stop flag checked by the batch worker; set via POST /queue/stop.
STOP_QUEUE_SIGNAL = False


# Header schema of data/batch_queue.csv.
QUEUE_COLUMNS = ["link", "ingest_timestamp", "status", "task_type"]
| |
|
# Header schema of data/manual_dataset.csv (human ground-truth labels).
GROUND_TRUTH_FIELDS = [
    "id", "link", "timestamp", "caption",
    "visual_integrity_score", "audio_integrity_score", "source_credibility_score",
    "logical_consistency_score", "emotional_manipulation_score",
    "video_audio_score", "video_caption_score", "audio_caption_score",
    "final_veracity_score", "final_reasoning",
    "stats_likes", "stats_shares", "stats_comments", "stats_platform",
    "tags", "classification", "source"
]


# Header schema of data/dataset.csv (AI-generated labels plus run config).
DATASET_COLUMNS = [
    "id", "link", "timestamp", "caption",
    "final_veracity_score", "visual_score", "audio_score", "source_score", "logic_score", "emotion_score",
    "align_video_audio", "align_video_caption", "align_audio_caption",
    "classification", "reasoning", "tags", "raw_toon",
    "config_type", "config_model", "config_prompt", "config_reasoning", "config_params"
]
| |
|
def ensure_csv_schema(file_path: Path, fieldnames: list):
    """Migrate an existing CSV in place so its header contains every column
    in `fieldnames`.

    No-op when the file is absent, empty, or already satisfies the schema.
    On migration, rows are re-written under the new header; columns not in
    `fieldnames` are dropped. Any failure is logged and swallowed so that
    startup never aborts on a corrupt data file.
    """
    if not file_path.exists():
        return
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as src:
            anchor = src.tell()
            header_line = src.readline()
            if not header_line:
                return  # empty file: nothing to migrate
            current_cols = [c.strip() for c in header_line.split(',')]
            if all(col in current_cols for col in fieldnames):
                return  # schema already satisfied
            src.seek(anchor)
            existing_rows = list(csv.DictReader(src))

        with open(file_path, 'w', newline='', encoding='utf-8') as dst:
            writer = csv.DictWriter(dst, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(existing_rows)
    except Exception as e:
        logger.error(f"Schema migration error: {e}")
| |
|
def get_processed_indices():
    """Build lookup sets of every tweet id and normalized link already
    present in either the AI dataset or the manual ground-truth dataset.

    Returns:
        (ids, links): two sets usable for O(1) duplicate checks.
    """
    known_ids, known_links = set(), set()
    for source_file in ("data/dataset.csv", "data/manual_dataset.csv"):
        for entry in common_utils.robust_read_csv(Path(source_file)):
            row_id = entry.get('id')
            if row_id:
                known_ids.add(row_id)
            row_link = entry.get('link')
            if row_link:
                known_links.add(common_utils.normalize_link(row_link))
    return known_ids, known_links
| |
|
def check_if_processed(link: str, processed_ids=None, processed_links=None) -> bool:
    """Report whether `link` already appears in the datasets, matching by
    tweet id or by normalized URL.

    Pass precomputed index sets (from get_processed_indices) when calling
    in a loop to avoid re-reading the CSVs for every link.
    """
    tweet_id = common_utils.extract_tweet_id(link)
    normalized = common_utils.normalize_link(link)
    if processed_ids is not None and processed_links is not None:
        ids, links = processed_ids, processed_links
    else:
        ids, links = get_processed_indices()
    return (tweet_id and tweet_id in ids) or (normalized and normalized in links)
| |
|
def update_queue_status(link: str, status: str, task_type: str = None):
    """Set `status` on every queue row whose link matches `link` after
    normalization, optionally restricted to rows of a given `task_type`.

    The queue file is rewritten only when at least one row changed.
    """
    queue_file = Path("data/batch_queue.csv")
    if not queue_file.exists():
        return
    target = common_utils.normalize_link(link)
    buffered = []
    changed = False
    with open(queue_file, 'r', encoding='utf-8') as fh:
        reader = csv.DictReader(fh)
        header = list(reader.fieldnames) if reader.fieldnames else list(QUEUE_COLUMNS)
        # Guarantee the canonical queue columns survive the rewrite.
        for col in QUEUE_COLUMNS:
            if col not in header:
                header.append(col)

        for entry in reader:
            # Legacy rows predate the task_type column; default them.
            if not entry.get("task_type"):
                entry["task_type"] = "Ingest"
            if common_utils.normalize_link(entry.get("link", "")) == target:
                if task_type is None or entry["task_type"] == task_type:
                    entry["status"] = status
                    changed = True
            buffered.append(entry)

    if changed:
        with open(queue_file, 'w', newline='', encoding='utf-8') as fh:
            writer = csv.DictWriter(fh, fieldnames=header, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(buffered)
| |
|
def log_queue_error(link: str, error_msg: str, task_type: str = None):
    """Append an error record to queue_errors.csv and mark the matching
    queue entry as Error."""
    err_file = Path("data/queue_errors.csv")
    with open(err_file, 'a', newline='', encoding='utf-8') as fh:
        writer = csv.writer(fh)
        # The file always exists once open('a') returns; the zero-size
        # check is what actually decides whether the header is needed.
        if not err_file.exists() or err_file.stat().st_size == 0:
            writer.writerow(["link", "timestamp", "error"])
        writer.writerow([link, datetime.datetime.now().isoformat(), error_msg])
    update_queue_status(link, "Error", task_type)
| |
|
@app.on_event("startup")
async def startup_event():
    """Migrate CSV schemas and, outside LITE_MODE, preload inference models."""
    ensure_csv_schema(Path("data/dataset.csv"), DATASET_COLUMNS)
    ensure_csv_schema(Path("data/manual_dataset.csv"), GROUND_TRUTH_FIELDS)
    ensure_csv_schema(Path("data/batch_queue.csv"), QUEUE_COLUMNS)
    if not LITE_MODE:
        try:
            inference_logic.load_models()
        except Exception as e:
            # Best-effort preload: keep serving, but record why it failed
            # instead of silently swallowing the error (was `except: pass`).
            logger.warning(f"Model preload failed at startup: {e}")
| |
|
@app.get("/health")
async def health_check():
    """Liveness probe; also reports how the A2A agent mount went."""
    payload = {"status": "ok", "agent_mount": agent_mount_status}
    return payload
| |
|
@app.get("/benchmarks/stats")
async def get_benchmark_stats():
    """Return aggregate benchmark statistics (delegates to benchmarking)."""
    return benchmarking.calculate_benchmarks()
| |
|
@app.get("/benchmarks/leaderboard")
async def get_benchmark_leaderboard():
    """Return the model leaderboard (delegates to benchmarking)."""
    return benchmarking.generate_leaderboard()
| |
|
@app.get("/config/prompts")
async def list_prompts():
    """Expose the configured prompt variants as [{id, name}, ...]."""
    variants = []
    for prompt_id, spec in PROMPT_VARIANTS.items():
        variants.append({"id": prompt_id, "name": spec['description']})
    return variants
| |
|
@app.get("/config/tags")
async def list_configured_tags():
    """Return the saved tag configuration, or {} when none exists."""
    cfg = Path("data/tags.json")
    if not cfg.exists():
        return {}
    with open(cfg, 'r') as fh:
        return json.load(fh)
| |
|
@app.post("/config/tags")
async def save_configured_tags(tags: dict = Body(...)):
    """Persist the tag configuration to data/tags.json."""
    with open(Path("data/tags.json"), 'w', encoding='utf-8') as fh:
        json.dump(tags, fh, indent=2)
    return {"status": "success"}
| |
|
@app.get("/tags/list")
async def list_all_tags():
    """Tally tag usage across the AI dataset, most frequent first."""
    counts = {}
    dataset_file = Path("data/dataset.csv")
    if dataset_file.exists():
        for entry in common_utils.robust_read_csv(dataset_file):
            raw = entry.get("tags", "")
            if not raw:
                continue
            for token in raw.split(','):
                token = token.strip()
                if token:
                    counts[token] = counts.get(token, 0) + 1
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return [{"name": name, "count": n} for name, n in ranked]
| |
|
@app.post("/extension/ingest")
async def extension_ingest_link(request: Request):
    """Queue a link for ingestion (deduplicated by normalized URL) and
    optionally persist scraped comments as ingestion context.

    Body: {"link": str, "comments": [...]}. Responds 400 when the link
    is missing, 500 on unexpected failures.
    """
    try:
        data = await request.json()
        link = data.get("link")
        comments = data.get("comments", [])
        if not link:
            raise HTTPException(status_code=400, detail="Link required")

        q_path = Path("data/batch_queue.csv")
        existing = set()
        if q_path.exists():
            for r in common_utils.robust_read_csv(q_path):
                existing.add(common_utils.normalize_link(r.get('link')))

        normalized = common_utils.normalize_link(link)
        if normalized not in existing:
            with open(q_path, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
                # The file always exists after open('a'); the zero-size
                # check is what decides whether the header must be written.
                if not q_path.exists() or q_path.stat().st_size == 0:
                    writer.writeheader()
                writer.writerow({"link": link.strip(), "ingest_timestamp": datetime.datetime.now().isoformat(), "status": "Pending", "task_type": "Ingest"})

        if comments:
            # Context file keyed by tweet id when extractable, else a short
            # stable hash of the link.
            tid = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
            context_path = Path(f"data/comments/{tid}_ingest.json")
            with open(context_path, 'w', encoding='utf-8') as f:
                json.dump({
                    "link": link,
                    "timestamp": datetime.datetime.now().isoformat(),
                    "comments": comments
                }, f, indent=2)
            logger.info(f"Saved {len(comments)} comments for ingestion context: {tid}")

        return {"status": "success", "link": link, "comments_saved": len(comments)}
    except HTTPException:
        # BUG FIX: the 400 above was previously caught by the generic
        # handler below and returned as a 500; let FastAPI surface it.
        raise
    except Exception as e:
        logger.error(f"Ingest Error: {e}")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.post("/manual/promote")
async def promote_to_ground_truth(request: Request):
    """Copy AI-labeled rows into the manual ground-truth dataset.

    Body: {"ids": [...]} or {"id": ...}. Rows already present in the
    ground truth are skipped. AI columns are mapped onto the
    GROUND_TRUTH_FIELDS schema; axes the AI dataset does not track
    (source credibility, emotional manipulation, A/V alignment, ...)
    default to a neutral 5.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids",[])
        if not target_ids and data.get("id"): target_ids =[data.get("id")]

        if not target_ids: return JSONResponse({"status": "error", "message": "No IDs provided"}, status_code=400)

        # Index the AI dataset by stringified id for O(1) lookup.
        ai_path = Path("data/dataset.csv")
        ai_rows = {}
        if ai_path.exists():
            for row in common_utils.robust_read_csv(ai_path):
                if row.get('id'): ai_rows[str(row['id'])] = row

        # Collect ids already promoted so duplicates are never appended.
        manual_path = Path("data/manual_dataset.csv")
        manual_exists = manual_path.exists()
        existing_ids = set()
        if manual_exists:
            for row in common_utils.robust_read_csv(manual_path):
                if row.get('id'): existing_ids.add(str(row['id']))

        new_rows =[]
        promoted_count = 0
        for tid in target_ids:
            tid_str = str(tid)
            if tid_str in existing_ids: continue
            found_row = ai_rows.get(tid_str)
            if found_row:
                # Map AI column names onto the ground-truth schema.
                mapped_row = {
                    "id": found_row.get("id"), "link": found_row.get("link"),
                    "timestamp": datetime.datetime.now().isoformat(), "caption": found_row.get("caption"),
                    "visual_integrity_score": found_row.get("visual_score", 0),
                    "audio_integrity_score": found_row.get("audio_score", 0),
                    "source_credibility_score": 5, "logical_consistency_score": found_row.get("logic_score", 0),
                    "emotional_manipulation_score": 5, "video_audio_score": 5,
                    "video_caption_score": found_row.get("align_video_caption", 0), "audio_caption_score": 5,
                    "final_veracity_score": found_row.get("final_veracity_score", 0),
                    "final_reasoning": found_row.get("reasoning", ""),
                    "stats_likes": 0, "stats_shares": 0, "stats_comments": 0, "stats_platform": "twitter",
                    "tags": found_row.get("tags", ""), "classification": found_row.get("classification", "None"),
                    "source": "manual_promoted"
                }
                new_rows.append(mapped_row)
                promoted_count += 1
                # Guard against the same id being requested twice.
                existing_ids.add(tid_str)

        if not new_rows: return {"status": "success", "promoted_count": 0}

        # Append to the ground truth; write the header only for a new file.
        mode = 'a' if manual_exists else 'w'
        with open(manual_path, mode, newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=GROUND_TRUTH_FIELDS, extrasaction='ignore')
            if not manual_exists or manual_path.stat().st_size == 0: writer.writeheader()
            for r in new_rows: writer.writerow(r)

        return {"status": "success", "promoted_count": promoted_count}
    except Exception as e: return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.post("/manual/delete")
async def delete_ground_truth(request: Request):
    """Remove rows from the manual ground-truth dataset by id.

    Body: {"ids": [...]} or {"id": ...}. Returns the number of rows
    deleted; 400 when no ids were supplied.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids", [])
        if not target_ids and data.get("id"):
            target_ids = [data.get("id")]
        if not target_ids:
            raise HTTPException(status_code=400)

        # Ids may arrive as ints from JSON; compare as strings.
        target_ids = [str(t) for t in target_ids]
        manual_path = Path("data/manual_dataset.csv")
        if not manual_path.exists():
            return {"status": "error", "message": "File not found"}

        kept = []
        deleted_count = 0
        with open(manual_path, 'r', encoding='utf-8') as f:
            for row in csv.DictReader(f):
                if str(row.get('id')) in target_ids:
                    deleted_count += 1
                else:
                    kept.append(row)
        with open(manual_path, 'w', newline='', encoding='utf-8') as f:
            # BUG FIX: extrasaction='ignore' added — rows from a file with
            # legacy extra columns previously raised ValueError on rewrite.
            writer = csv.DictWriter(f, fieldnames=GROUND_TRUTH_FIELDS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(kept)

        return {"status": "success", "deleted_count": deleted_count}
    except HTTPException:
        # BUG FIX: the 400 was previously converted into a 500 by the
        # generic handler below; re-raise so the client sees 400.
        raise
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.post("/manual/verify_queue")
async def verify_queue_items(request: Request):
    """Re-queue ground-truth items for the verification pipeline.

    Body: {"ids": [...], "resample_count": int}. Each matching link is
    queued `resample_count` times (clamped to 1..100).
    """
    try:
        data = await request.json()
        # BUG FIX: ids may arrive as JSON numbers; normalize to strings so
        # the comparison against CSV ids below can ever match (consistent
        # with /manual/delete and /manual/promote).
        target_ids = [str(t) for t in data.get("ids", [])]
        resample_count = max(1, min(data.get("resample_count", 1), 100))
        if not target_ids:
            return JSONResponse({"status": "error", "message": "No IDs provided"}, status_code=400)

        manual_path = Path("data/manual_dataset.csv")
        links_to_queue = []
        if manual_path.exists():
            for row in common_utils.robust_read_csv(manual_path):
                if str(row.get('id')) in target_ids:
                    link = row.get('link')
                    # BUG FIX: a row without a link produced None and
                    # crashed on .strip() below; skip such rows.
                    if link:
                        links_to_queue.append(link)

        if not links_to_queue:
            return {"status": "error", "message": "No matching links found in Ground Truth."}

        q_path = Path("data/batch_queue.csv")
        added_count = 0
        with open(q_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
            # File exists after open('a'); the zero-size check drives the header.
            if not q_path.exists() or q_path.stat().st_size == 0:
                writer.writeheader()
            for link in links_to_queue:
                for _ in range(resample_count):
                    writer.writerow({
                        "link": link.strip(),
                        "ingest_timestamp": datetime.datetime.now().isoformat(),
                        "status": "Pending",
                        "task_type": "Verify"
                    })
                    added_count += 1

        return {"status": "success", "queued_count": added_count, "message": f"Added {added_count} items to queue for verification pipeline."}
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.get("/profiles/list")
async def list_profiles():
    """List profile directories with the number of history rows each holds,
    sorted by username."""
    root = Path("data/profiles")
    found = []
    if not root.exists():
        return found
    try:
        for entry in root.iterdir():
            if not entry.is_dir():
                continue
            history = entry / "history.csv"
            n_posts = 0
            if history.exists():
                # Line count minus the header row.
                with open(history, 'r', encoding='utf-8', errors='ignore') as fh:
                    n_posts = sum(1 for _ in fh) - 1
            found.append({"username": entry.name, "posts_count": max(0, n_posts)})
    except Exception:
        pass  # best effort: a broken profile dir must not break the listing
    return sorted(found, key=lambda item: item['username'])
| |
|
@app.get("/profiles/{username}/posts")
async def get_profile_posts(username: str):
    """Return a user's scraped post history, flagging each row that already
    has a label in the datasets (`is_labeled`)."""
    history_file = Path(f"data/profiles/{username}/history.csv")
    results = []
    if not history_file.exists():
        return results
    labeled_ids, labeled_links = get_processed_indices()
    try:
        for entry in common_utils.robust_read_csv(history_file):
            post_link = entry.get('link', '')
            tweet_id = common_utils.extract_tweet_id(post_link)
            labeled = bool(tweet_id and tweet_id in labeled_ids) or \
                common_utils.normalize_link(post_link) in labeled_links
            entry['is_labeled'] = labeled
            results.append(entry)
    except Exception:
        pass  # tolerate malformed rows; return what was parsed so far
    return results
| |
|
@app.post("/extension/ingest_user_history")
async def ingest_user_history(request: Request):
    """Append newly scraped posts to a user's history.csv, skipping links
    already recorded.

    Body: {"username": str, "posts": [{link, timestamp, text, metrics...}]}.
    Responds 400 when username or posts are missing.
    """
    try:
        data = await request.json()
        username = data.get("username")
        posts = data.get("posts", [])
        if not username or not posts:
            raise HTTPException(status_code=400)
        profile_dir = Path(f"data/profiles/{username}")
        profile_dir.mkdir(parents=True, exist_ok=True)
        csv_path = profile_dir / "history.csv"
        file_exists = csv_path.exists()
        existing = set()
        if file_exists:
            for row in common_utils.robust_read_csv(csv_path):
                existing.add(row.get('link'))

        with open(csv_path, 'a', newline='', encoding='utf-8') as f:
            fieldnames = ["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if not file_exists:
                writer.writeheader()
            ts = datetime.datetime.now().isoformat()
            count = 0
            for p in posts:
                # BUG FIX: p['link'] raised KeyError (surfacing as a 500)
                # for posts missing a link; skip link-less posts instead.
                link = p.get('link')
                if link and link not in existing:
                    m = p.get('metrics', {})
                    writer.writerow({
                        "link": link, "timestamp": p.get('timestamp'),
                        "text": p.get('text', '').replace('\n', ' '), "is_reply": p.get('is_reply', False),
                        "metric_replies": m.get('replies', 0), "metric_reposts": m.get('reposts', 0),
                        "metric_likes": m.get('likes', 0), "metric_views": m.get('views', 0),
                        "ingested_at": ts
                    })
                    count += 1
        return {"status": "success", "new_posts": count}
    except HTTPException:
        # BUG FIX: the 400 above was previously re-wrapped as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| |
|
@app.post("/extension/save_comments")
async def extension_save_comments(request: Request):
    """Overwrite the stored comment set for a post.

    Body: {"link": str, "comments": [{author, text, link}]}. Responds 400
    when the link is missing.
    """
    try:
        data = await request.json()
        link = data.get("link")
        comments = data.get("comments", [])
        if not link:
            raise HTTPException(status_code=400)
        # Tweet id when extractable, else a short stable hash of the link.
        tweet_id = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
        csv_path = Path(f"data/comments/{tweet_id}.csv")
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["author", "text", "link", "timestamp"])
            writer.writeheader()
            ts = datetime.datetime.now().isoformat()
            for c in comments:
                writer.writerow({
                    "author": c.get("author", "Unknown"),
                    "text": c.get("text", "").replace("\n", " "),
                    "link": c.get("link", ""),
                    "timestamp": ts
                })
        return {"status": "success", "count": len(comments)}
    except HTTPException:
        # BUG FIX: the 400 above was previously re-wrapped as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
| |
|
@app.post("/extension/save_manual")
@app.post("/manual/save")
async def save_manual_label(request: Request):
    """Persist a human-authored label for a post.

    Writes four artifacts: a deep JSON label (data/labels/<id>.json), a
    flat copy (data/mnl_labeled/<id>.json), an upserted row in
    data/manual_dataset.csv, and — when the author can be extracted from
    the link — a history entry under data/profiles/<author>/. Finally
    flags any matching queue row as Processed.
    """
    try:
        data = await request.json()
        link = data.get("link")
        if not link:
            return JSONResponse({"status": "error", "message": "Link required"}, status_code=400)

        # Stable id: tweet id when parseable, else a short hash of the link.
        tweet_id = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
        # Labels may be nested under "labels" or sent flat at the top level.
        labels = data.get("labels", data)

        # Flat row matching GROUND_TRUTH_FIELDS; engagement stats default to 0.
        row = {
            "id": tweet_id, "link": link, "timestamp": datetime.datetime.now().isoformat(),
            "caption": data.get("caption", ""),
            "visual_integrity_score": labels.get("visual_integrity_score", 0),
            "audio_integrity_score": labels.get("audio_integrity_score", 0),
            "source_credibility_score": labels.get("source_credibility_score", 0),
            "logical_consistency_score": labels.get("logical_consistency_score", 0),
            "emotional_manipulation_score": labels.get("emotional_manipulation_score", 5),
            "video_audio_score": labels.get("video_audio_score", 0),
            "video_caption_score": labels.get("video_caption_score", 0),
            "audio_caption_score": labels.get("audio_caption_score", 0),
            "final_veracity_score": labels.get("final_veracity_score", 0),
            "final_reasoning": labels.get("reasoning", labels.get("final_reasoning", "")),
            "stats_likes": 0, "stats_shares": 0, "stats_comments": 0, "stats_platform": "twitter",
            "tags": data.get("tags", labels.get("tags", "")),
            "classification": labels.get("classification", "None"),
            "source": "Manual"
        }

        # Tags arrive as a comma-separated string; split into a clean list.
        tag_str = str(row["tags"])
        tag_list =[t.strip() for t in tag_str.split(',') if t.strip()]

        # Deep JSON mirrors the structure produced by the AI labeling
        # pipeline so downstream consumers can read both uniformly.
        deep_json = {
            "veracity_vectors": {
                "visual_integrity_score": str(row["visual_integrity_score"]),
                "audio_integrity_score": str(row["audio_integrity_score"]),
                "source_credibility_score": str(row["source_credibility_score"]),
                "logical_consistency_score": str(row["logical_consistency_score"]),
                "emotional_manipulation_score": str(row["emotional_manipulation_score"])
            },
            "modalities": {
                "video_audio_score": str(row["video_audio_score"]),
                "video_caption_score": str(row["video_caption_score"]),
                "audio_caption_score": str(row["audio_caption_score"])
            },
            "video_context_summary": row["caption"],
            "tags": tag_list,
            "factuality_factors": {
                "claim_accuracy": "Manual",
                "evidence_gap": "Manual Verification",
                "grounding_check": "Manual Verification"
            },
            "disinformation_analysis": {
                "classification": row["classification"],
                "intent": "Manual Labeling",
                "threat_vector": "Manual Labeling"
            },
            "final_assessment": {
                "veracity_score_total": str(row["final_veracity_score"]),
                "reasoning": row["final_reasoning"]
            },
            "raw_parsed_structure": {
                "summary": {"text": row["caption"]},
                "tags": {"keywords": row["tags"]},
                "final": {"score": str(row["final_veracity_score"]), "reasoning": row["final_reasoning"]}
            },
            "meta_info": {
                "id": tweet_id,
                "timestamp": row["timestamp"],
                "link": link,
                "model_selection": "Manual"
            }
        }

        json_path_direct = Path(f"data/labels/{tweet_id}.json")
        with open(json_path_direct, 'w', encoding='utf-8') as jf:
            json.dump(deep_json, jf, indent=2, ensure_ascii=False)

        # Flat copy for the manual-label corpus.
        with open(Path(f"data/mnl_labeled/{tweet_id}.json"), 'w', encoding='utf-8') as jf:
            json.dump(row, jf, indent=2, ensure_ascii=False)

        manual_path = Path("data/manual_dataset.csv")
        exists = manual_path.exists()
        ensure_csv_schema(manual_path, GROUND_TRUTH_FIELDS)

        # Upsert: replace the existing row for this id, else append.
        rows =[]
        found = False
        if exists:
            for r in common_utils.robust_read_csv(manual_path):
                if str(r.get('id')) == str(tweet_id):
                    clean_row = {k: row.get(k, "") for k in GROUND_TRUTH_FIELDS}
                    rows.append(clean_row)
                    found = True
                else:
                    rows.append(r)

        if not found:
            clean_row = {k: row.get(k, "") for k in GROUND_TRUTH_FIELDS}
            rows.append(clean_row)

        with open(manual_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=GROUND_TRUTH_FIELDS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rows)

        # Backfill the author's profile history so analytics can attribute
        # this post, but never duplicate an existing history row.
        author = common_utils.extract_twitter_username(link)
        if author:
            prof_dir = Path(f"data/profiles/{author}")
            prof_dir.mkdir(parents=True, exist_ok=True)
            hist_path = prof_dir / "history.csv"
            hist_exists = hist_path.exists()
            existing_links = set()
            if hist_exists:
                for r in common_utils.robust_read_csv(hist_path):
                    existing_links.add(r.get('link'))
            if link not in existing_links:
                with open(hist_path, 'a', newline='', encoding='utf-8') as hf:
                    fieldnames =["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
                    hwriter = csv.DictWriter(hf, fieldnames=fieldnames, extrasaction='ignore')
                    if not hist_exists: hwriter.writeheader()
                    hwriter.writerow({
                        "link": link,
                        "timestamp": row["timestamp"],
                        "text": row["caption"],
                        "ingested_at": row["timestamp"]
                    })

        update_queue_status(link, "Processed")
        return {"status": "success", "id": tweet_id}
    except Exception as e:
        logger.error(f"Save Manual Error: {e}")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.get("/dataset/list")
async def get_dataset_list():
    """Merged dataset view: manual ground-truth rows win over AI rows with
    the same id; newest first."""
    combined = []
    manual_file = Path("data/manual_dataset.csv")
    manual_ids = set()
    if manual_file.exists():
        for entry in common_utils.robust_read_csv(manual_file):
            entry['source'] = 'Manual'
            if entry.get('id'):
                manual_ids.add(str(entry['id']))
            combined.append(entry)

    ai_file = Path("data/dataset.csv")
    if ai_file.exists():
        for entry in common_utils.robust_read_csv(ai_file):
            if str(entry.get('id', '')) in manual_ids:
                continue  # the manual label supersedes the AI one
            entry['source'] = 'AI'
            combined.append(entry)
    return sorted(combined, key=lambda e: e.get('timestamp', ''), reverse=True)
| |
|
@app.get("/analytics/account_integrity")
async def get_account_integrity():
    """Per-account veracity summary.

    Maps tweet ids to usernames via profile histories, collects every
    labeled score per author, backfills missing history rows as a side
    effect, and returns a recency-weighted (exponential decay, factor 0.9)
    average veracity per account, highest first.
    """
    id_map = {}
    prof_dir = Path("data/profiles")
    prof_dir.mkdir(parents=True, exist_ok=True)

    # Remember which links each profile's history already contains so the
    # backfill below never duplicates rows.
    existing_links_per_user = {}
    if prof_dir.exists():
        for d in prof_dir.iterdir():
            if d.is_dir():
                hist_file = d / "history.csv"
                existing_links_per_user[d.name] = set()
                if hist_file.exists():
                    for row in common_utils.robust_read_csv(hist_file):
                        link = row.get('link', '')
                        tid = common_utils.extract_tweet_id(link)
                        if tid: id_map[tid] = d.name
                        existing_links_per_user[d.name].add(link)

    scores_map = {}
    for fname in ["data/dataset.csv", "data/manual_dataset.csv"]:
        path = Path(fname)
        if not path.exists(): continue
        for row in common_utils.robust_read_csv(path):
            tid = row.get('id')
            link = row.get('link', '')
            sc = row.get('final_veracity_score', '0')
            ts = row.get('timestamp', '')
            caption = row.get('caption', '')
            # Strip non-numeric noise ("85/100", "score: 85") before parsing.
            # BUG FIX: was a bare `except:` (also traps SystemExit /
            # KeyboardInterrupt); only conversion failures are expected here.
            try: val = float(re.sub(r'[^\d.]', '', str(sc)))
            except (ValueError, TypeError): val = -1

            if 0 <= val <= 100:
                auth = common_utils.extract_twitter_username(link) or id_map.get(tid, "Unknown")
                if auth and auth != "Unknown":
                    if auth not in scores_map: scores_map[auth] = []
                    scores_map[auth].append({'val': val, 'ts': ts})

                    # Backfill: ensure a profile dir and history row exist
                    # for every labeled post.
                    if auth not in existing_links_per_user:
                        existing_links_per_user[auth] = set()
                        Path(f"data/profiles/{auth}").mkdir(parents=True, exist_ok=True)

                    if link not in existing_links_per_user[auth]:
                        existing_links_per_user[auth].add(link)
                        hist_path = Path(f"data/profiles/{auth}/history.csv")
                        hist_exists = hist_path.exists()
                        with open(hist_path, 'a', newline='', encoding='utf-8') as hf:
                            fieldnames = ["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
                            hwriter = csv.DictWriter(hf, fieldnames=fieldnames, extrasaction='ignore')
                            if not hist_exists: hwriter.writeheader()
                            hwriter.writerow({
                                "link": link,
                                "timestamp": ts,
                                "text": caption,
                                "ingested_at": ts
                            })

    # Recency-weighted average: newest label gets weight 1, each older one
    # decays by 0.9.
    results = []
    for k, v in scores_map.items():
        v_sorted = sorted(v, key=lambda x: x['ts'], reverse=True)
        decay_factor = 0.9
        total_weight = 0
        weighted_sum = 0

        for i, item in enumerate(v_sorted):
            weight = decay_factor ** i
            weighted_sum += item['val'] * weight
            total_weight += weight

        avg_veracity = round(weighted_sum / total_weight, 1) if total_weight > 0 else 0
        results.append({"username": k, "avg_veracity": avg_veracity, "posts_labeled": len(v)})

    return sorted(results, key=lambda x: x['avg_veracity'], reverse=True)
| |
|
@app.post("/queue/add")
async def add_queue_item(link: str = Body(..., embed=True)):
    """Append one link to the batch queue unless it is already queued
    (matched by normalized URL); 400 when the link does not normalize."""
    queue_file = Path("data/batch_queue.csv")
    queued = set()
    if queue_file.exists():
        for entry in common_utils.robust_read_csv(queue_file):
            queued.add(common_utils.normalize_link(entry.get('link')))

    normalized = common_utils.normalize_link(link)
    if not normalized:
        raise HTTPException(status_code=400, detail="Invalid link")
    if normalized in queued:
        return {"status": "ignored", "message": "Link already in queue"}

    with open(queue_file, 'a', newline='', encoding='utf-8') as fh:
        writer = csv.DictWriter(fh, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
        # After open('a') the file exists, so the empty-size test is the
        # one that decides whether to emit the header.
        if not queue_file.exists() or queue_file.stat().st_size == 0:
            writer.writeheader()
        writer.writerow({"link": link.strip(), "ingest_timestamp": datetime.datetime.now().isoformat(), "status": "Pending", "task_type": "Ingest"})
    return {"status": "success", "link": link}
| |
|
@app.post("/queue/upload_csv")
async def upload_csv(file: UploadFile = File(...)):
    """Bulk-add links from an uploaded CSV: the first comma-separated field
    of every line containing 'http' is queued, deduplicated against the
    existing queue and within the upload itself."""
    contents = await file.read()
    # BUG FIX: a non-UTF-8 upload crashed with UnicodeDecodeError (500);
    # degrade gracefully instead of rejecting the whole file.
    lines = contents.decode('utf-8', errors='replace').splitlines()
    q_path = Path("data/batch_queue.csv")
    existing = set()
    if q_path.exists():
        for r in common_utils.robust_read_csv(q_path):
            existing.add(common_utils.normalize_link(r.get('link')))

    added = 0
    with open(q_path, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
        # File exists after open('a'); the zero-size check drives the header.
        if not q_path.exists() or q_path.stat().st_size == 0:
            writer.writeheader()
        for line in lines:
            if 'http' in line:
                raw = line.split(',')[0].strip()
                norm = common_utils.normalize_link(raw)
                if norm not in existing:
                    # BUG FIX: track the link so a duplicate later in the
                    # same upload is not queued twice.
                    existing.add(norm)
                    writer.writerow({"link": raw, "ingest_timestamp": datetime.datetime.now().isoformat(), "status": "Pending", "task_type": "Ingest"})
                    added += 1
    return {"status": "success", "added_count": added}
| |
|
@app.post("/queue/stop")
async def stop_processing():
    """Raise the module-level stop flag so the queue worker halts after its current item."""
    global STOP_QUEUE_SIGNAL
    STOP_QUEUE_SIGNAL = True
    return {"status": "success", "message": "Stopping queue processing..."}
| |
|
@app.post("/queue/clear_processed")
async def clear_processed_queue():
    """Rewrite the batch queue keeping only rows that are not yet processed.

    A row is dropped when its status is "Processed", or — for non-Verify
    tasks — when its link already appears in the processed dataset index.
    Returns how many rows were removed.
    """
    queue_file = Path("data/batch_queue.csv")
    if not queue_file.exists():
        return {"status": "success", "removed_count": 0}

    processed_ids, processed_links = get_processed_indices()
    survivors = []
    dropped = 0
    for entry in common_utils.robust_read_csv(queue_file):
        # Short-circuit mirrors the original if/elif: the dataset index is
        # only consulted when the row is not already marked Processed.
        already_done = entry.get("status", "Pending") == "Processed" or (
            entry.get("task_type", "Ingest") != "Verify"
            and check_if_processed(entry.get("link"), processed_ids, processed_links)
        )
        if already_done:
            dropped += 1
        else:
            survivors.append(entry)

    with open(queue_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(survivors)
    return {"status": "success", "removed_count": dropped}
| |
|
@app.post("/queue/delete")
async def delete_queue_items(request: Request):
    """Remove queue rows whose normalized link matches any of the posted "links"."""
    try:
        payload = await request.json()
        doomed = {common_utils.normalize_link(l) for l in payload.get("links", [])}
        queue_file = Path("data/batch_queue.csv")
        if not queue_file.exists():
            return {"status": "success", "count": 0}

        survivors = []
        removed = 0
        for entry in common_utils.robust_read_csv(queue_file):
            if common_utils.normalize_link(entry.get('link')) in doomed:
                removed += 1
            else:
                survivors.append(entry)

        # Rewrite the queue in place with only the surviving rows.
        with open(queue_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(survivors)
        return {"status": "success", "count": removed}
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.post("/queue/requeue")
async def requeue_items(request: Request):
    """Flip the status of the posted queue links back to "Pending" and rewrite the queue."""
    try:
        payload = await request.json()
        wanted = {common_utils.normalize_link(l) for l in payload.get("links", [])}
        queue_file = Path("data/batch_queue.csv")
        if not queue_file.exists():
            return {"status": "success", "count": 0}

        rewritten = []
        touched = 0
        for entry in common_utils.robust_read_csv(queue_file):
            if common_utils.normalize_link(entry.get('link')) in wanted:
                entry['status'] = 'Pending'
                touched += 1
            # Every row is kept; only matching rows have their status reset.
            rewritten.append(entry)

        with open(queue_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rewritten)
        return {"status": "success", "count": touched}
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.post("/dataset/delete")
async def delete_dataset_items(request: Request):
    """Delete rows from data/dataset.csv whose `id` appears in the posted "ids" list.

    Raises HTTP 400 when no ids are supplied; returns the deleted count on
    success and a JSON 500 payload on unexpected errors.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids", [])
        if not target_ids:
            raise HTTPException(status_code=400, detail="No ids provided")
        target_ids = set(str(t) for t in target_ids)

        path = Path("data/dataset.csv")
        if not path.exists():
            # NOTE(review): this early return uses the key "count" while the
            # success path below uses "deleted_count" — kept as-is in case the
            # front end depends on it; confirm and unify if possible.
            return {"status": "success", "count": 0}

        rows = []
        deleted_count = 0
        for row in common_utils.robust_read_csv(path):
            if str(row.get('id')) in target_ids:
                deleted_count += 1
            else:
                rows.append(row)

        with open(path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=DATASET_COLUMNS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rows)

        return {"status": "success", "deleted_count": deleted_count}
    except HTTPException:
        # BUG FIX: the broad handler below used to swallow the 400 raised
        # above and return it as a 500 — re-raise so FastAPI serves the
        # intended status code.
        raise
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
| |
|
@app.post("/analyze/user_context")
async def analyze_user_context(request: Request):
    """Build and return a profile report for the username in the request body."""
    try:
        payload = await request.json()
        report = await user_analysis_logic.generate_user_profile_report(payload.get("username"))
        return {"status": "success", "report": report}
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
| |
|
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the main single-page UI template."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
| |
|
@app.get("/queue/list")
async def get_queue_list():
    """Return the batch queue as a list of dicts with live status and cached comments.

    A "Pending" ingest row whose link already appears in the processed dataset
    is surfaced as "Processed" without touching the CSV on disk.
    """
    q_path = Path("data/batch_queue.csv")
    items = []
    p_ids, p_links = get_processed_indices()
    for row in common_utils.robust_read_csv(q_path):
        if not row:
            continue
        l = row.get("link")
        if not l:
            # BUG FIX: a row without a link used to crash the whole listing
            # (l.encode() on None below) — skip malformed rows instead.
            continue
        status = row.get("status", "Pending")
        task_type = row.get("task_type") or "Ingest"

        if status == "Pending" and task_type != "Verify" and check_if_processed(l, p_ids, p_links):
            status = "Processed"

        # Best-effort load of the comment sidecar; tid falls back to a hash
        # of the link when no tweet id can be extracted.
        comments = []
        tid = common_utils.extract_tweet_id(l) or hashlib.md5(l.encode()).hexdigest()[:10]
        c_path = Path(f"data/comments/{tid}_ingest.json")
        if c_path.exists():
            try:
                with open(c_path, 'r', encoding='utf-8') as f:
                    c_data = json.load(f)
                comments = c_data.get('comments', [])
            except Exception:
                # A corrupt sidecar must not break the listing.
                pass

        items.append({
            "link": l,
            "timestamp": row.get("ingest_timestamp", ""),
            "status": status,
            "task_type": task_type,
            "comments": comments
        })
    return items
| |
|
@app.post("/queue/run")
async def run_queue_processing(
    model_selection: str = Form(...),
    gemini_api_key: str = Form(""), gemini_model_name: str = Form(""),
    vertex_project_id: str = Form(""), vertex_location: str = Form(""), vertex_model_name: str = Form(""), vertex_api_key: str = Form(""),
    nrp_api_key: str = Form(""), nrp_model_name: str = Form(""), nrp_base_url: str = Form("https://ellm.nrp-nautilus.io/v1"),
    include_comments: bool = Form(False), reasoning_method: str = Form("cot"), prompt_template: str = Form("standard"),
    custom_query: str = Form(""), max_reprompts: int = Form(1),
    use_search: bool = Form(False), use_code: bool = Form(False)
):
    """Process every Pending row in data/batch_queue.csv and stream progress as SSE.

    For each queue item: downloads/prepares video assets, optionally builds a
    community-context addendum from cached comments, runs the selected
    provider's labeling pipeline (gemini / vertex / nrp), and on success
    appends to data/dataset.csv, writes a JSON label sidecar, updates the
    author's profile history, and marks the row Processed. "Verify" rows are
    additionally compared against ground truth from data/manual_dataset.csv
    and logged to data/comparison.csv. Progress is streamed as
    text/event-stream messages; processing stops early when the module-level
    STOP_QUEUE_SIGNAL is raised by /queue/stop.
    """
    global STOP_QUEUE_SIGNAL
    # Reset the stop flag so a previous /queue/stop does not abort this run.
    STOP_QUEUE_SIGNAL = False

    # Per-provider configs; max_retries / search / code-execution flags are
    # passed through to the inference pipelines unchanged.
    gemini_config = {"api_key": gemini_api_key, "model_name": gemini_model_name, "max_retries": max_reprompts, "use_search": use_search, "use_code": use_code}
    vertex_config = {"project_id": vertex_project_id, "location": vertex_location, "model_name": vertex_model_name, "api_key": vertex_api_key, "max_retries": max_reprompts, "use_search": use_search, "use_code": use_code}
    nrp_config = {"api_key": nrp_api_key, "model_name": nrp_model_name, "base_url": nrp_base_url, "max_retries": max_reprompts, "use_search": use_search, "use_code": use_code}

    # Unknown template names fall back to the 'standard' persona variant.
    sel_p = PROMPT_VARIANTS.get(prompt_template, PROMPT_VARIANTS['standard'])
    system_persona_txt = sel_p['instruction']
    if custom_query.strip(): system_persona_txt += f"\n\nSPECIAL INSTRUCTION FOR THIS BATCH: {custom_query}"

    # Select the active provider config; anything that is not vertex/nrp
    # (including unknown values) falls through to gemini.
    if model_selection == 'vertex':
        active_config = vertex_config
        active_model_name = vertex_model_name
    elif model_selection == 'nrp':
        active_config = nrp_config
        active_model_name = nrp_model_name
    else:
        active_config = gemini_config
        active_model_name = gemini_model_name

    # Run configuration persisted alongside each dataset row / sidecar for
    # later reproducibility.
    config_params_dict = {
        "reprompts": max_reprompts,
        "include_comments": include_comments,
        "agent_active": False,
        "use_search": use_search,
        "use_code": use_code
    }
    config_params_str = json.dumps(config_params_dict)

    async def queue_stream():
        # Async generator feeding the StreamingResponse; each yield is one
        # SSE message ("data: ...\n\n").
        q_path = Path("data/batch_queue.csv")
        items =[r for r in common_utils.robust_read_csv(q_path) if r.get("link") and r.get("status", "Pending") == "Pending"]
        p_ids, p_links = get_processed_indices()
        yield f"data:[SYSTEM] Persona: {sel_p['description']}\n\n"

        for item in items:
            link = item.get("link")
            task_type = item.get("task_type") or "Ingest"

            # Cooperative stop: /queue/stop flips this flag between items.
            if STOP_QUEUE_SIGNAL:
                yield f"data:[SYSTEM] Stopping by user request.\n\n"
                break

            # Skip items already present in the dataset (Verify rows always
            # re-run so they can be compared against ground truth).
            if task_type != "Verify" and check_if_processed(link, p_ids, p_links):
                update_queue_status(link, "Processed", task_type)
                continue

            # For Verify tasks, look up the ground-truth row by normalized link.
            gt_data = None
            if task_type == "Verify":
                manual_path = Path("data/manual_dataset.csv")
                if manual_path.exists():
                    for row in common_utils.robust_read_csv(manual_path):
                        if common_utils.normalize_link(row.get('link', '')) == common_utils.normalize_link(link):
                            gt_data = row
                            break

            yield f"data:[START] {link} (Type: {task_type})\n\n"
            # tid: tweet id when extractable, else a short md5 of the link.
            tid = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
            assets = await common_utils.prepare_video_assets(link, tid)

            # Need at least a video or a caption to proceed.
            if not assets or (not assets.get('video') and not assets.get('caption')):
                log_queue_error(link, "Download/Fetch Error", task_type)
                yield f"data: - Download Error.\n\n"
                continue

            trans = common_utils.parse_vtt(assets['transcript']) if assets.get('transcript') else "No transcript (Audio/Video missing)."
            video_file = assets.get('video')
            if not video_file:
                yield f"data: - No video found. Text-only analysis.\n\n"
                video_file = None
            else: yield f"data: - Video found. Inferencing...\n\n"

            # If cached comments exist for this item, summarize them into a
            # "community notes" addendum appended to the system persona.
            comments_path = Path(f"data/comments/{tid}_ingest.json")
            current_system_persona = system_persona_txt
            if comments_path.exists():
                try:
                    with open(comments_path, 'r') as f:
                        c_data = json.load(f)
                    comments = c_data.get('comments',[])
                    if comments:
                        yield f"data: - Found {len(comments)} comments. Generating Community Context...\n\n"
                        community_summary = await inference_logic.generate_community_summary(comments, model_selection, active_config)
                        current_system_persona += f"\n\n### COMMUNITY NOTES / CONTEXT (from Comments):\n{community_summary}\n\nUse this community context to cross-reference claims but remain objective."
                        yield f"data: - Context Generated.\n\n"
                except Exception as e:
                    # Community context is best-effort; log and continue without it.
                    logger.error(f"Error processing comments for context: {e}")

            # Pipelines yield str progress messages and finally a result dict;
            # the last non-str chunk wins as res_data.
            res_data = None
            if model_selection == 'gemini':
                async for chunk in inference_logic.run_gemini_labeling_pipeline(video_file, assets['caption'], trans, gemini_config, include_comments, reasoning_method, current_system_persona, request_id=tid):
                    if isinstance(chunk, str): yield f"data: - {chunk}\n\n"
                    else: res_data = chunk
            elif model_selection == 'vertex':
                async for chunk in inference_logic.run_vertex_labeling_pipeline(video_file, assets['caption'], trans, vertex_config, include_comments, reasoning_method, current_system_persona, request_id=tid):
                    if isinstance(chunk, str): yield f"data: - {chunk}\n\n"
                    else: res_data = chunk
            elif model_selection == 'nrp':
                async for chunk in inference_logic.run_nrp_labeling_pipeline(video_file, assets['caption'], trans, nrp_config, include_comments, reasoning_method, current_system_persona, request_id=tid):
                    if isinstance(chunk, str): yield f"data: - {chunk}\n\n"
                    else: res_data = chunk

            if res_data and "parsed_data" in res_data:
                parsed = res_data["parsed_data"]
                d_path = Path("data/dataset.csv")
                ensure_csv_schema(d_path, DATASET_COLUMNS)
                exists = d_path.exists()

                # Coerce the model's total veracity score; anything unparsable
                # becomes 0.
                ai_score_val = parsed['final_assessment'].get('veracity_score_total', 0)
                try: ai_score = float(ai_score_val)
                except: ai_score = 0

                if task_type == "Verify" and gt_data is not None:
                    gt_final = float(gt_data.get('final_veracity_score', 0))
                    delta = abs(ai_score - gt_final)
                    vec_ai = parsed.get('veracity_vectors', {})
                    mod_ai = parsed.get('modalities', {})

                    # Tolerant float coercion for per-dimension comparisons.
                    def s_float(v):
                        try: return float(v)
                        except: return 0.0

                    # Multi-line SSE event: single \n per line, terminated by
                    # the final \n\n on the FINAL VERACITY line.
                    yield f"data: -[VERIFICATION PIPELINE] Configuration Analysis:\n"
                    yield f"data: Model: {active_model_name} | Provider: {model_selection}\n"
                    yield f"data: Reasoning: {reasoning_method} | Prompt: {prompt_template} | Reprompts: {max_reprompts}\n"
                    yield f"data: -[VERIFICATION SCORES COMPARISON (AI vs Ground Truth)]\n"
                    yield f"data: Visual Integrity : AI {s_float(vec_ai.get('visual_integrity_score'))} | GT {s_float(gt_data.get('visual_integrity_score'))}\n"
                    yield f"data: Audio Integrity : AI {s_float(vec_ai.get('audio_integrity_score'))} | GT {s_float(gt_data.get('audio_integrity_score'))}\n"
                    yield f"data: Source Credibility : AI {s_float(vec_ai.get('source_credibility_score'))} | GT {s_float(gt_data.get('source_credibility_score'))}\n"
                    yield f"data: Logical Consistency: AI {s_float(vec_ai.get('logical_consistency_score'))} | GT {s_float(gt_data.get('logical_consistency_score'))}\n"
                    yield f"data: Emotional Manipul. : AI {s_float(vec_ai.get('emotional_manipulation_score'))} | GT {s_float(gt_data.get('emotional_manipulation_score'))}\n"
                    yield f"data: Video-Audio Align : AI {s_float(mod_ai.get('video_audio_score'))} | GT {s_float(gt_data.get('video_audio_score'))}\n"
                    yield f"data: Video-Caption Align: AI {s_float(mod_ai.get('video_caption_score'))} | GT {s_float(gt_data.get('video_caption_score'))}\n"
                    yield f"data: Audio-Caption Align: AI {s_float(mod_ai.get('audio_caption_score'))} | GT {s_float(gt_data.get('audio_caption_score'))}\n"
                    yield f"data: FINAL VERACITY : AI {ai_score} | GT {gt_final} | Delta: {delta}\n\n"

                    # Append the AI-vs-GT comparison to comparison.csv.
                    comp_path = Path("data/comparison.csv")
                    comp_exists = comp_path.exists()
                    with open(comp_path, 'a', newline='', encoding='utf-8') as cf:
                        cw = csv.DictWriter(cf, fieldnames=["id", "link", "timestamp", "gt_score", "ai_score", "delta", "model", "prompt", "reasoning_method"])
                        if not comp_exists: cw.writeheader()
                        cw.writerow({
                            "id": tid, "link": link, "timestamp": datetime.datetime.now().isoformat(),
                            "gt_score": gt_final, "ai_score": ai_score, "delta": delta,
                            "model": active_model_name, "prompt": prompt_template, "reasoning_method": reasoning_method
                        })

                # Append the full labeled row to dataset.csv; failures are
                # logged but do not abort the stream.
                try:
                    with open(d_path, 'a', newline='', encoding='utf-8') as f:
                        row = {
                            "id": tid, "link": link, "timestamp": datetime.datetime.now().isoformat(),
                            "caption": assets['caption'],
                            "final_veracity_score": ai_score,
                            "visual_score": parsed['veracity_vectors'].get('visual_integrity_score', 0),
                            "audio_score": parsed['veracity_vectors'].get('audio_integrity_score', 0),
                            "source_score": parsed['veracity_vectors'].get('source_credibility_score', 0),
                            "logic_score": parsed['veracity_vectors'].get('logical_consistency_score', 0),
                            "emotion_score": parsed['veracity_vectors'].get('emotional_manipulation_score', 0),
                            "align_video_audio": parsed['modalities'].get('video_audio_score', 0),
                            "align_video_caption": parsed['modalities'].get('video_caption_score', 0),
                            "align_audio_caption": parsed['modalities'].get('audio_caption_score', 0),
                            "classification": parsed['disinformation_analysis'].get('classification', 'None'),
                            "reasoning": parsed['final_assessment'].get('reasoning', ''),
                            "tags": ",".join(parsed.get('tags',[])),
                            "raw_toon": res_data.get("raw_toon", ""),
                            "config_type": "GenAI",
                            "config_model": active_model_name,
                            "config_prompt": prompt_template,
                            "config_reasoning": reasoning_method,
                            "config_params": config_params_str
                        }
                        writer = csv.DictWriter(f, fieldnames=DATASET_COLUMNS, extrasaction='ignore')
                        if not exists: writer.writeheader()
                        writer.writerow(row)
                except Exception as csv_err: logger.error(f"CSV Write Failed: {csv_err}")

                # Write a timestamped JSON sidecar with the full parsed result
                # plus run metadata (best-effort).
                try:
                    ts = datetime.datetime.now().isoformat()
                    ts_clean = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                    flat_parsed = parsed.copy()
                    flat_parsed["raw_toon"] = res_data.get("raw_toon", "")
                    flat_parsed["meta_info"] = {
                        "id": tid, "timestamp": ts, "link": link,
                        "prompt_used": res_data.get("prompt_used", ""),
                        "model_selection": model_selection,
                        "config_type": "GenAI",
                        "config_model": active_model_name,
                        "config_prompt": prompt_template,
                        "config_reasoning": reasoning_method,
                        "config_params": config_params_dict
                    }
                    with open(Path(f"data/labels/{tid}_{ts_clean}.json"), 'w', encoding='utf-8') as f: json.dump(flat_parsed, f, indent=2, ensure_ascii=False)
                except Exception as e: logger.error(f"Sidecar Error: {e}")

                # Record this post in the author's profile history (deduped by
                # exact link) when a username can be extracted.
                author = common_utils.extract_twitter_username(link)
                if author:
                    prof_dir = Path(f"data/profiles/{author}")
                    prof_dir.mkdir(parents=True, exist_ok=True)
                    hist_path = prof_dir / "history.csv"
                    hist_exists = hist_path.exists()
                    existing_links = set()
                    if hist_exists:
                        for r in common_utils.robust_read_csv(hist_path):
                            existing_links.add(r.get('link'))
                    if link not in existing_links:
                        with open(hist_path, 'a', newline='', encoding='utf-8') as hf:
                            fieldnames =["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
                            hwriter = csv.DictWriter(hf, fieldnames=fieldnames, extrasaction='ignore')
                            if not hist_exists: hwriter.writeheader()
                            hwriter.writerow({
                                "link": link,
                                "timestamp": datetime.datetime.now().isoformat(),
                                "text": assets['caption'],
                                "ingested_at": datetime.datetime.now().isoformat()
                            })

                # Update in-memory indices so later items in this run see this
                # link as processed, then mark the queue row.
                p_ids.add(tid)
                p_links.add(common_utils.normalize_link(link))
                update_queue_status(link, "Processed", task_type)
                yield f"data:[SUCCESS] Saved.\n\n"
            else:
                err_msg = res_data.get('error') if isinstance(res_data, dict) else "Inference failed"
                log_queue_error(link, err_msg, task_type)
                yield f"data: [FAIL] {err_msg}.\n\n"
            # Small pacing delay between items.
            await asyncio.sleep(0.5)
        yield "event: close\ndata: Done\n\n"

    return StreamingResponse(queue_stream(), media_type="text/event-stream")