Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| # Ensure 'src' is in sys.path so sibling imports work | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| if current_dir not in sys.path: | |
| sys.path.append(current_dir) | |
| import asyncio | |
| import subprocess | |
| from pathlib import Path | |
| import logging | |
| import csv | |
| import io | |
| import datetime | |
| import json | |
| import hashlib | |
| import re | |
| from fastapi import FastAPI, Request, Form, UploadFile, File, Body, HTTPException | |
| from fastapi.responses import HTMLResponse, StreamingResponse, PlainTextResponse, Response, FileResponse, JSONResponse | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import yt_dlp | |
| import inference_logic | |
| import factuality_logic | |
| import transcription | |
| import user_analysis_logic | |
| import agent_logic | |
| import common_utils | |
| from toon_parser import parse_veracity_toon | |
| from labeling_logic import PROMPT_VARIANTS, LABELING_PROMPT_TEMPLATE, LABELING_PROMPT_TEMPLATE_NO_COT, FCOT_MACRO_PROMPT | |
| import benchmarking | |
# Root logging configuration for this process.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# LITE_MODE is on by default; any value other than "true" (case-insensitive) turns it off.
LITE_MODE = os.getenv("LITE_MODE", "true").lower() == "true"

app = FastAPI()
# Permissive CORS: every origin, method and header is accepted, with credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# --- CRITICAL: Mount A2A Agent Application FIRST ---
# Outcome of mounting the agent sub-app, reported by the health check:
# "success", "fallback_none", or "error_<message>".
agent_mount_status = "pending"
try:
    logger.info("Attempting to build A2A Agent App...")
    a2a_agent_app = agent_logic.create_a2a_app()
    if a2a_agent_app:
        app.mount("/a2a", a2a_agent_app)
        agent_mount_status = "success"
        logger.info("✅ A2A Agent App successfully mounted at /a2a")
    else:
        logger.warning("⚠️ Agent factory returned None. Mounting internal fallback.")
        fallback = FastAPI()

        # NOTE(review): no route registration for this handler is visible here —
        # confirm it is attached to `fallback`, otherwise the fallback app serves nothing.
        async def fallback_endpoint(request: Request):
            return {"jsonrpc": "2.0", "result": {"text": "Fallback Agent (Factory returned None)", "data": {"status": "fallback"}}}

        app.mount("/a2a", fallback)
        agent_mount_status = "fallback_none"
except Exception as e:
    logger.critical(f"❌ Failed to mount A2A Agent: {e}", exc_info=True)
    # Python deletes the `except ... as e` name when this clause ends. The handler
    # below runs later, per request, so it must use a captured copy of the message
    # instead of closing over `e` (which would raise NameError at call time).
    mount_error = str(e)
    fallback = FastAPI()

    # NOTE(review): see above — confirm this handler is registered on `fallback`.
    async def fallback_endpoint(request: Request):
        return {"jsonrpc": "2.0", "result": {"text": f"Emergency Agent (Mount Error: {mount_error})", "data": {"status": "error"}}}

    app.mount("/a2a", fallback)
    agent_mount_status = f"error_{mount_error}"
# --- Static Files & Frontend ---
# Serve the first pre-built frontend directory that exists. Otherwise fall back
# to a local "static" directory, creating it if necessary.
STATIC_DIR = next(
    (
        candidate
        for candidate in ("/app/static", "/usr/share/vchat/static", "frontend/dist")
        if os.path.isdir(candidate)
    ),
    "static",
)
if not os.path.isdir(STATIC_DIR):
    os.makedirs(STATIC_DIR, exist_ok=True)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# Explicitly expose <STATIC_DIR>/assets at /assets for Vite-built frontends.
assets_path = os.path.join(STATIC_DIR, "assets")
if os.path.exists(assets_path):
    app.mount("/assets", StaticFiles(directory=assets_path), name="assets")
# Safely Ensure all data directories exist.
# A failure to create a directory is logged and skipped, so startup continues
# on read-only or restricted filesystems.
data_dirs = [
    "data", "data/videos", "data/labels", "data/prompts",
    "data/responses", "metadata", "data/profiles",
    "data/comments", "data/mnl_labeled", "data/models/sandbox_autogluon"
]
for d in data_dirs:
    try:
        os.makedirs(d, exist_ok=True)
    except PermissionError as e:
        logger.warning(f"Permission denied creating directory {d}. Skipping. Error: {e}")
    except Exception as e:
        logger.warning(f"Failed to create directory {d}: {e}")
# Expose data/videos at /videos, but only if the directory really exists
# (its creation above may have failed).
if os.path.isdir("data/videos"):
    app.mount("/videos", StaticFiles(directory="data/videos"), name="videos")
# Jinja2 templates are looked up in the same directory as the static frontend.
templates = Jinja2Templates(directory=STATIC_DIR)

# Lift the csv module's per-field size cap so very long cells (e.g. raw model
# output) can be read. sys.maxsize overflows a C long on platforms where that
# type is 32 bits, so fall back to the largest 32-bit signed value.
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    csv.field_size_limit(2**31 - 1)

# Cooperative stop flag; the stop-processing endpoint sets it to True.
STOP_QUEUE_SIGNAL = False
# --- CONSTANTS ---
# Header of data/batch_queue.csv.
QUEUE_COLUMNS = [
    "link",
    "ingest_timestamp",
    "status",
    "task_type",
]

# Header of data/manual_dataset.csv (human-entered or promoted ground truth).
GROUND_TRUTH_FIELDS = [
    # identity
    "id", "link", "timestamp", "caption",
    # veracity vectors
    "visual_integrity_score", "audio_integrity_score", "source_credibility_score",
    "logical_consistency_score", "emotional_manipulation_score",
    # cross-modal alignment
    "video_audio_score", "video_caption_score", "audio_caption_score",
    # verdict
    "final_veracity_score", "final_reasoning",
    # engagement stats
    "stats_likes", "stats_shares", "stats_comments", "stats_platform",
    # labels / provenance
    "tags", "classification", "source",
]

# Header of data/dataset.csv (AI-produced labels plus the run configuration).
DATASET_COLUMNS = [
    # identity
    "id", "link", "timestamp", "caption",
    # scores
    "final_veracity_score", "visual_score", "audio_score", "source_score", "logic_score", "emotion_score",
    # cross-modal alignment
    "align_video_audio", "align_video_caption", "align_audio_caption",
    # verdict and raw model output
    "classification", "reasoning", "tags", "raw_toon",
    # run configuration
    "config_type", "config_model", "config_prompt", "config_reasoning", "config_params",
]
def ensure_csv_schema(file_path: Path, fieldnames: list):
    """Migrate an existing CSV in place so that its header is exactly ``fieldnames``.

    Does nothing when the file is missing, empty, or already contains every
    column in ``fieldnames``. Otherwise every row is re-read and the file is
    rewritten with ``fieldnames`` as the header: absent columns become empty
    strings and columns not listed in ``fieldnames`` are dropped.

    Header names are compared after stripping whitespace. A leading UTF-8 BOM
    is ignored. Errors are logged rather than raised, so a bad file never
    blocks the caller.

    Args:
        file_path: CSV file to migrate.
        fieldnames: Desired header, in output order.
    """
    if not file_path.exists():
        return
    try:
        # utf-8-sig strips a BOM. With plain utf-8 the first header would read as
        # "\ufeffid", be reported missing, and its whole column would be blanked
        # by the rewrite below.
        with open(file_path, 'r', newline='', encoding='utf-8-sig', errors='replace') as f:
            header = next(csv.reader(f), None)
        if not header:
            return
        existing = {h.strip() for h in header}
        if all(col in existing for col in fieldnames):
            return
        with open(file_path, 'r', newline='', encoding='utf-8-sig', errors='replace') as f:
            # Strip keys the same way the header check does, so " link" maps to "link".
            # Overflow cells land under the None key and are ignored by the writer.
            rows = [
                {(k.strip() if isinstance(k, str) else k): v for k, v in row.items()}
                for row in csv.DictReader(f)
            ]
        with open(file_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rows)
    except Exception as e:
        logger.error(f"Schema migration error: {e}")
def get_processed_indices():
    """Collect the ids and normalized links of every labeled row.

    Both the AI dataset (data/dataset.csv) and the manual dataset
    (data/manual_dataset.csv) are scanned. Blank ids and links are ignored.

    Returns:
        tuple[set, set]: ``(processed_ids, processed_links)``.
    """
    ids_seen, links_seen = set(), set()
    for source in ("data/dataset.csv", "data/manual_dataset.csv"):
        for record in common_utils.robust_read_csv(Path(source)):
            row_id = record.get('id')
            if row_id:
                ids_seen.add(row_id)
            row_link = record.get('link')
            if row_link:
                links_seen.add(common_utils.normalize_link(row_link))
    return ids_seen, links_seen
def check_if_processed(link: str, processed_ids=None, processed_links=None) -> bool:
    """Return True if ``link`` already appears in either labeled dataset.

    A link matches by its extracted tweet id or by its normalized form. When
    checking many links, pass the precomputed sets from
    ``get_processed_indices()``. If either set is None, both are reloaded from disk.
    """
    target_id = common_utils.extract_tweet_id(link)
    link_clean = common_utils.normalize_link(link)
    if processed_ids is None or processed_links is None:
        p_ids, p_links = get_processed_indices()
    else:
        p_ids, p_links = processed_ids, processed_links
    # bool() honours the annotation: the bare and/or chain could return None or "".
    return bool((target_id and target_id in p_ids) or (link_clean and link_clean in p_links))
def update_queue_status(link: str, status: str, task_type: str = None):
    """Set ``status`` on every batch-queue row whose link matches ``link``.

    Links are compared after ``common_utils.normalize_link``. When
    ``task_type`` is given, only rows of that task type are changed. Rows with a
    blank ``task_type`` are treated as "Ingest". The queue file is rewritten
    only if something changed, and any ``QUEUE_COLUMNS`` missing from its
    header are appended.
    """
    queue_file = Path("data/batch_queue.csv")
    if not queue_file.exists():
        return
    wanted = common_utils.normalize_link(link)
    changed = False
    rows = []
    with open(queue_file, 'r', encoding='utf-8') as fh:
        reader = csv.DictReader(fh)
        header = list(reader.fieldnames or QUEUE_COLUMNS)
        header += [col for col in QUEUE_COLUMNS if col not in header]
        for entry in reader:
            if not entry.get("task_type"):
                entry["task_type"] = "Ingest"
            same_link = common_utils.normalize_link(entry.get("link", "")) == wanted
            if same_link and (task_type is None or entry["task_type"] == task_type):
                entry["status"] = status
                changed = True
            rows.append(entry)
    if not changed:
        return
    with open(queue_file, 'w', newline='', encoding='utf-8') as fh:
        writer = csv.DictWriter(fh, fieldnames=header, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(rows)
def log_queue_error(link: str, error_msg: str, task_type: str = None):
    """Append a failure record to data/queue_errors.csv, then mark the queue row "Error".

    The ``link,timestamp,error`` header is written when the log file is new or empty.
    """
    error_log = Path("data/queue_errors.csv")
    with open(error_log, 'a', newline='', encoding='utf-8') as fh:
        out = csv.writer(fh)
        # Append mode has already created the file, so emptiness is what signals "new".
        needs_header = (not error_log.exists()) or error_log.stat().st_size == 0
        if needs_header:
            out.writerow(["link", "timestamp", "error"])
        out.writerow([link, datetime.datetime.now().isoformat(), error_msg])
    update_queue_status(link, "Error", task_type)
async def startup_event():
    """Bring CSV headers up to date and, unless LITE_MODE, preload inference models.

    Model loading is best-effort: a failure is logged with its traceback and
    startup continues.
    """
    ensure_csv_schema(Path("data/dataset.csv"), DATASET_COLUMNS)
    ensure_csv_schema(Path("data/manual_dataset.csv"), GROUND_TRUTH_FIELDS)
    ensure_csv_schema(Path("data/batch_queue.csv"), QUEUE_COLUMNS)
    if not LITE_MODE:
        try:
            inference_logic.load_models()
        except Exception:
            # Stay best-effort, but make the failure visible instead of silently passing.
            logger.exception("Failed to load inference models; continuing without them")
async def health_check():
    """Liveness probe that also reports the outcome of the A2A agent mount."""
    payload = {"status": "ok"}
    payload["agent_mount"] = agent_mount_status
    return payload
async def get_benchmark_stats():
    """Return the result of ``benchmarking.calculate_benchmarks()`` unchanged."""
    stats = benchmarking.calculate_benchmarks()
    return stats
async def get_benchmark_leaderboard():
    """Return the result of ``benchmarking.generate_leaderboard()`` unchanged."""
    board = benchmarking.generate_leaderboard()
    return board
async def list_prompts():
    """List prompt variants as ``{"id": key, "name": description}`` entries."""
    catalog = []
    for variant_id, variant in PROMPT_VARIANTS.items():
        catalog.append({"id": variant_id, "name": variant['description']})
    return catalog
async def list_configured_tags():
    """Return the saved tag configuration from data/tags.json, or ``{}`` if none exists."""
    path = Path("data/tags.json")
    if not path.exists():
        return {}
    # Decode explicitly as UTF-8, the encoding the save endpoint writes, instead
    # of relying on the platform default encoding.
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)
async def save_configured_tags(tags: dict = Body(...)):
    """Overwrite data/tags.json with the posted tag configuration (indented JSON, UTF-8)."""
    target = Path("data/tags.json")
    with open(target, 'w', encoding='utf-8') as out:
        json.dump(tags, out, indent=2)
    return {"status": "success"}
async def list_all_tags():
    """Count tag usage in data/dataset.csv, most frequent first.

    Tags come from the comma-separated ``tags`` column, trimmed of whitespace,
    with blanks skipped. Equal counts keep first-seen order, because ``sorted``
    is stable.

    Returns:
        list[dict]: ``[{"name": tag, "count": n}, ...]``.
    """
    counts = {}
    dataset_file = Path("data/dataset.csv")
    if dataset_file.exists():
        for record in common_utils.robust_read_csv(dataset_file):
            raw = record.get("tags", "")
            if not raw:
                continue
            for tag in (piece.strip() for piece in raw.split(',')):
                if tag:
                    counts[tag] = counts.get(tag, 0) + 1
    ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
    return [{"name": name, "count": total} for name, total in ranked]
async def extension_ingest_link(request: Request):
    """Queue a link posted by the extension, optionally saving its comments.

    Expects a JSON body ``{"link": str, "comments": list}``. The link is appended
    to data/batch_queue.csv as a Pending "Ingest" task unless its normalized form
    is already queued. Non-empty ``comments`` are written to
    data/comments/<id>_ingest.json. Here <id> is the tweet id, or the first 10
    hex chars of the link's md5 when no tweet id is found.

    Returns:
        ``{"status": "success", "link": ..., "comments_saved": n}``.

    Raises:
        HTTPException: 400 when ``link`` is missing. Any other failure returns a
        500 JSON error instead.
    """
    try:
        data = await request.json()
        link = data.get("link")
        # Treat an explicit null like an absent list so len() below cannot fail.
        comments = data.get("comments") or []
        if not link:
            raise HTTPException(status_code=400, detail="Link required")
        q_path = Path("data/batch_queue.csv")
        existing = set()
        if q_path.exists():
            for r in common_utils.robust_read_csv(q_path):
                existing.add(common_utils.normalize_link(r.get('link')))
        normalized = common_utils.normalize_link(link)
        if normalized not in existing:
            with open(q_path, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
                if not q_path.exists() or q_path.stat().st_size == 0:
                    writer.writeheader()
                writer.writerow({"link": link.strip(), "ingest_timestamp": datetime.datetime.now().isoformat(), "status": "Pending", "task_type": "Ingest"})
        if comments:
            tid = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
            context_path = Path(f"data/comments/{tid}_ingest.json")
            with open(context_path, 'w', encoding='utf-8') as f:
                json.dump({
                    "link": link,
                    "timestamp": datetime.datetime.now().isoformat(),
                    "comments": comments
                }, f, indent=2)
            logger.info(f"Saved {len(comments)} comments for ingestion context: {tid}")
        return {"status": "success", "link": link, "comments_saved": len(comments)}
    except HTTPException:
        # Let client errors keep their own status instead of becoming the generic 500 below.
        raise
    except Exception as e:
        logger.error(f"Ingest Error: {e}")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def promote_to_ground_truth(request: Request):
    """Copy AI-labeled rows from data/dataset.csv into data/manual_dataset.csv.

    Body: ``{"ids": [...]}`` or ``{"id": ...}``; ids are compared as strings.
    Ids already in the manual dataset, or absent from the AI dataset, are
    skipped. Each promoted row is mapped onto GROUND_TRUTH_FIELDS with a fresh
    timestamp and ``source="manual_promoted"``.

    Returns:
        ``{"status": "success", "promoted_count": n}``; a 400 JSON error when no
        ids are given; a 500 JSON error on any other failure.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids", [])
        if not target_ids and data.get("id"):
            target_ids = [data.get("id")]
        if not target_ids:
            return JSONResponse({"status": "error", "message": "No IDs provided"}, status_code=400)
        ai_path = Path("data/dataset.csv")
        ai_rows = {}
        if ai_path.exists():
            for row in common_utils.robust_read_csv(ai_path):
                if row.get('id'):
                    ai_rows[str(row['id'])] = row
        manual_path = Path("data/manual_dataset.csv")
        manual_exists = manual_path.exists()
        existing_ids = set()
        if manual_exists:
            for row in common_utils.robust_read_csv(manual_path):
                if row.get('id'):
                    existing_ids.add(str(row['id']))
        new_rows = []
        for tid in target_ids:
            tid_str = str(tid)
            if tid_str in existing_ids:
                continue
            found_row = ai_rows.get(tid_str)
            if not found_row:
                continue
            new_rows.append({
                "id": found_row.get("id"), "link": found_row.get("link"),
                "timestamp": datetime.datetime.now().isoformat(), "caption": found_row.get("caption"),
                "visual_integrity_score": found_row.get("visual_score", 0),
                "audio_integrity_score": found_row.get("audio_score", 0),
                # These four were previously hard-coded to 5 even though the AI row
                # carries the matching scores. Use them, keeping 5 as the placeholder
                # when the AI value is blank.
                "source_credibility_score": found_row.get("source_score") or 5,
                "logical_consistency_score": found_row.get("logic_score", 0),
                "emotional_manipulation_score": found_row.get("emotion_score") or 5,
                "video_audio_score": found_row.get("align_video_audio") or 5,
                "video_caption_score": found_row.get("align_video_caption", 0),
                "audio_caption_score": found_row.get("align_audio_caption") or 5,
                "final_veracity_score": found_row.get("final_veracity_score", 0),
                "final_reasoning": found_row.get("reasoning", ""),
                "stats_likes": 0, "stats_shares": 0, "stats_comments": 0, "stats_platform": "twitter",
                "tags": found_row.get("tags", ""), "classification": found_row.get("classification", "None"),
                "source": "manual_promoted",
            })
            # Guard against the same id appearing twice in one request.
            existing_ids.add(tid_str)
        if not new_rows:
            return {"status": "success", "promoted_count": 0}
        mode = 'a' if manual_exists else 'w'
        with open(manual_path, mode, newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=GROUND_TRUTH_FIELDS, extrasaction='ignore')
            if not manual_exists or manual_path.stat().st_size == 0:
                writer.writeheader()
            writer.writerows(new_rows)
        return {"status": "success", "promoted_count": len(new_rows)}
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def delete_ground_truth(request: Request):
    """Delete rows from data/manual_dataset.csv by id.

    Body: ``{"ids": [...]}`` or ``{"id": ...}``; ids are compared as strings.
    The file is rewritten, with GROUND_TRUTH_FIELDS as its header, only when at
    least one row was removed.

    Returns:
        ``{"status": "success", "deleted_count": n}``, or an error dict if the
        file does not exist.

    Raises:
        HTTPException: 400 when no ids are given. Other failures return a 500
        JSON error.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids", [])
        if not target_ids and data.get("id"):
            target_ids = [data.get("id")]
        if not target_ids:
            raise HTTPException(status_code=400, detail="No IDs provided")
        target_ids = {str(t) for t in target_ids}
        manual_path = Path("data/manual_dataset.csv")
        if not manual_path.exists():
            return {"status": "error", "message": "File not found"}
        rows = []
        deleted_count = 0
        with open(manual_path, 'r', newline='', encoding='utf-8') as f:
            for row in csv.DictReader(f):
                if str(row.get('id')) in target_ids:
                    deleted_count += 1
                else:
                    rows.append(row)
        if deleted_count:
            with open(manual_path, 'w', newline='', encoding='utf-8') as f:
                # extrasaction='ignore' is essential. Without it, a row carrying a
                # column outside GROUND_TRUTH_FIELDS (or overflow cells, keyed None by
                # DictReader) makes DictWriter raise after 'w' has already truncated
                # the file, wiping the ground-truth dataset.
                writer = csv.DictWriter(f, fieldnames=GROUND_TRUTH_FIELDS, extrasaction='ignore')
                writer.writeheader()
                writer.writerows(rows)
        return {"status": "success", "deleted_count": deleted_count}
    except HTTPException:
        raise
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def verify_queue_items(request: Request):
    """Queue ground-truth links for re-labeling as "Verify" tasks.

    Body: ``{"ids": [...], "resample_count": int}``. Each row of
    data/manual_dataset.csv whose id is listed (compared as a string) and that
    has a link is appended to the batch queue ``resample_count`` times.
    ``resample_count`` defaults to 1 and is clamped to 1-100.

    Returns:
        ``{"status": "success", "queued_count": n, "message": ...}``; an error
        dict when nothing matches; a 400 JSON error for missing ids or a
        non-integer ``resample_count``; a 500 JSON error otherwise.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids", [])
        if not target_ids:
            return JSONResponse({"status": "error", "message": "No IDs provided"}, status_code=400)
        try:
            requested = int(data.get("resample_count", 1))
        except (TypeError, ValueError):
            return JSONResponse({"status": "error", "message": "resample_count must be an integer"}, status_code=400)
        resample_count = max(1, min(requested, 100))
        # JSON ids may arrive as numbers while CSV ids are strings, so compare as strings.
        wanted_ids = {str(t) for t in target_ids}
        manual_path = Path("data/manual_dataset.csv")
        links_to_queue = []
        if manual_path.exists():
            for row in common_utils.robust_read_csv(manual_path):
                link = row.get('link')
                # A row without a link cannot be queued (and would crash on .strip()).
                if link and str(row.get('id')) in wanted_ids:
                    links_to_queue.append(link)
        if not links_to_queue:
            return {"status": "error", "message": "No matching links found in Ground Truth."}
        q_path = Path("data/batch_queue.csv")
        added_count = 0
        with open(q_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
            if not q_path.exists() or q_path.stat().st_size == 0:
                writer.writeheader()
            for link in links_to_queue:
                for _ in range(resample_count):
                    writer.writerow({
                        "link": link.strip(),
                        "ingest_timestamp": datetime.datetime.now().isoformat(),
                        "status": "Pending",
                        "task_type": "Verify"
                    })
                    added_count += 1
        return {"status": "success", "queued_count": added_count, "message": f"Added {added_count} items to queue for verification pipeline."}
    except Exception as e:
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def list_profiles():
    """List profile directories under data/profiles with their history sizes.

    ``posts_count`` is the number of data records in ``<user>/history.csv``,
    excluding the header (0 if the file is missing). Records are counted with
    the csv module, so a quoted caption spanning several lines counts once.
    Results are sorted by username. Scanning is best-effort: an error is
    logged and the profiles gathered so far are returned.
    """
    profiles_dir = Path("data/profiles")
    profiles = []
    if not profiles_dir.exists():
        return profiles
    try:
        for d in profiles_dir.iterdir():
            if not d.is_dir():
                continue
            hist = d / "history.csv"
            count = 0
            if hist.exists():
                with open(hist, 'r', newline='', encoding='utf-8', errors='ignore') as f:
                    # Count non-blank CSV records, then drop the header row.
                    count = sum(1 for record in csv.reader(f) if record) - 1
            profiles.append({"username": d.name, "posts_count": max(0, count)})
    except Exception as e:
        logger.warning(f"Failed to list profiles: {e}")
    return sorted(profiles, key=lambda x: x['username'])
async def get_profile_posts(username: str):
    """Return a profile's history rows, each flagged with ``is_labeled``.

    ``is_labeled`` is True when the row's tweet id or normalized link appears in
    either labeled dataset. Returns ``[]`` for an unknown profile, or for a
    username that is not a single path component (such as ".."). Read errors
    are logged and the rows gathered so far are returned.
    """
    # The username becomes part of a filesystem path; refuse names that could
    # step outside data/profiles.
    if not username or username in (".", "..") or "/" in username or "\\" in username:
        return []
    csv_path = Path(f"data/profiles/{username}/history.csv")
    posts = []
    if not csv_path.exists():
        return posts
    p_ids, p_links = get_processed_indices()
    try:
        for row in common_utils.robust_read_csv(csv_path):
            link = row.get('link', '')
            t_id = common_utils.extract_tweet_id(link)
            row['is_labeled'] = bool(t_id and t_id in p_ids) or common_utils.normalize_link(link) in p_links
            posts.append(row)
    except Exception as e:
        logger.warning(f"Failed to read profile history for {username}: {e}")
    return posts
async def ingest_user_history(request: Request):
    """Append a user's scraped posts to data/profiles/<username>/history.csv.

    Body: ``{"username": str, "posts": [{"link", "timestamp", "text",
    "is_reply", "metrics": {"replies", "reposts", "likes", "views"}}, ...]}``.
    Posts without a link are skipped, as are posts whose link is already in the
    history or repeated earlier in this request. Newlines in text are
    flattened to spaces.

    Returns:
        ``{"status": "success", "new_posts": n}``.

    Raises:
        HTTPException: 400 for a missing or invalid username or empty ``posts``;
        500 (with the error message) for any other failure.
    """
    try:
        data = await request.json()
        username = data.get("username")
        posts = data.get("posts", [])
        if not username or not posts:
            raise HTTPException(status_code=400)
        # The username becomes a directory name; refuse anything that is not a
        # single path component so the request cannot write outside data/profiles.
        if not isinstance(username, str) or username in (".", "..") or "/" in username or "\\" in username:
            raise HTTPException(status_code=400, detail="Invalid username")
        profile_dir = Path(f"data/profiles/{username}")
        profile_dir.mkdir(parents=True, exist_ok=True)
        csv_path = profile_dir / "history.csv"
        file_exists = csv_path.exists()
        existing = set()
        if file_exists:
            for row in common_utils.robust_read_csv(csv_path):
                existing.add(row.get('link'))
        # An existing but empty file also needs a header.
        needs_header = not file_exists or csv_path.stat().st_size == 0
        fieldnames = ["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
        with open(csv_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if needs_header:
                writer.writeheader()
            ts = datetime.datetime.now().isoformat()
            count = 0
            for p in posts:
                link = p.get('link')
                if not link or link in existing:
                    continue
                m = p.get('metrics') or {}
                writer.writerow({
                    "link": link, "timestamp": p.get('timestamp'),
                    "text": (p.get('text') or '').replace('\n', ' '), "is_reply": p.get('is_reply', False),
                    "metric_replies": m.get('replies', 0), "metric_reposts": m.get('reposts', 0),
                    "metric_likes": m.get('likes', 0), "metric_views": m.get('views', 0),
                    "ingested_at": ts
                })
                # Remember it so a duplicate later in the same request is skipped.
                existing.add(link)
                count += 1
        return {"status": "success", "new_posts": count}
    except HTTPException:
        # Keep client errors as-is instead of re-wrapping them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def extension_save_comments(request: Request):
    """Write comments scraped for a link to data/comments/<id>.csv, replacing any previous file.

    Body: ``{"link": str, "comments": [{"author", "text", "link"}, ...]}``.
    <id> is the tweet id, or the first 10 hex chars of the link's md5. Every
    comment gets the same save timestamp, and newlines in text are flattened
    to spaces.

    Returns:
        ``{"status": "success", "count": n}``.

    Raises:
        HTTPException: 400 when ``link`` is missing; 500 for other failures.
    """
    try:
        data = await request.json()
        link = data.get("link")
        comments = data.get("comments") or []
        if not link:
            raise HTTPException(status_code=400, detail="Link required")
        tweet_id = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
        csv_path = Path(f"data/comments/{tweet_id}.csv")
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["author", "text", "link", "timestamp"])
            writer.writeheader()
            ts = datetime.datetime.now().isoformat()
            for c in comments:
                writer.writerow({
                    "author": c.get("author", "Unknown"),
                    "text": (c.get("text") or "").replace("\n", " "),
                    "link": c.get("link", ""),
                    "timestamp": ts
                })
        return {"status": "success", "count": len(comments)}
    except HTTPException:
        # Keep the 400 instead of re-wrapping it as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def save_manual_label(request: Request):
    """Store a human-entered label for a link in every place the app keeps labels.

    Expects a JSON body with ``link`` plus the score fields, either nested
    under ``labels`` or at the top level (the body itself is used when
    ``labels`` is absent). Writes, in this order:

    * data/labels/<id>.json: a nested document (veracity_vectors, modalities,
      tags, factuality_factors, disinformation_analysis, final_assessment,
      raw_parsed_structure, meta_info);
    * data/mnl_labeled/<id>.json: the flat ground-truth row;
    * data/manual_dataset.csv: upserted by id (an existing row with the same id
      is replaced, otherwise one is appended) and rewritten with
      GROUND_TRUTH_FIELDS as the header;
    * data/profiles/<author>/history.csv: the link is appended when an author
      can be extracted from it and the link is not already listed.

    Finally the link is marked "Processed" in the batch queue.

    <id> is the tweet id extracted from the link, or the first 10 hex chars of
    the link's md5 when none is found.

    Returns:
        ``{"status": "success", "id": <id>}``; a 400 JSON error if ``link`` is
        missing; a 500 JSON error with the message on any other failure.
    """
    try:
        data = await request.json()
        link = data.get("link")
        if not link:
            return JSONResponse({"status": "error", "message": "Link required"}, status_code=400)
        tweet_id = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
        # Scores may be nested under "labels" or sent flat at the top level.
        labels = data.get("labels", data)
        # Flat row in GROUND_TRUTH_FIELDS shape. Missing scores default to 0,
        # except emotional_manipulation_score, which defaults to 5.
        row = {
            "id": tweet_id, "link": link, "timestamp": datetime.datetime.now().isoformat(),
            "caption": data.get("caption", ""),
            "visual_integrity_score": labels.get("visual_integrity_score", 0),
            "audio_integrity_score": labels.get("audio_integrity_score", 0),
            "source_credibility_score": labels.get("source_credibility_score", 0),
            "logical_consistency_score": labels.get("logical_consistency_score", 0),
            "emotional_manipulation_score": labels.get("emotional_manipulation_score", 5),
            "video_audio_score": labels.get("video_audio_score", 0),
            "video_caption_score": labels.get("video_caption_score", 0),
            "audio_caption_score": labels.get("audio_caption_score", 0),
            "final_veracity_score": labels.get("final_veracity_score", 0),
            "final_reasoning": labels.get("reasoning", labels.get("final_reasoning", "")),
            "stats_likes": 0, "stats_shares": 0, "stats_comments": 0, "stats_platform": "twitter",
            "tags": data.get("tags", labels.get("tags", "")),
            "classification": labels.get("classification", "None"),
            "source": "Manual"
        }
        # NOTE(review): if "tags" arrives as a JSON list, str() yields its repr
        # (e.g. "['a', 'b']") rather than a comma-separated string — confirm the
        # client always sends a string.
        tag_str = str(row["tags"])
        tag_list = [t.strip() for t in tag_str.split(',') if t.strip()]
        # Nested label document; scores are stringified inside it.
        deep_json = {
            "veracity_vectors": {
                "visual_integrity_score": str(row["visual_integrity_score"]),
                "audio_integrity_score": str(row["audio_integrity_score"]),
                "source_credibility_score": str(row["source_credibility_score"]),
                "logical_consistency_score": str(row["logical_consistency_score"]),
                "emotional_manipulation_score": str(row["emotional_manipulation_score"])
            },
            "modalities": {
                "video_audio_score": str(row["video_audio_score"]),
                "video_caption_score": str(row["video_caption_score"]),
                "audio_caption_score": str(row["audio_caption_score"])
            },
            "video_context_summary": row["caption"],
            "tags": tag_list,
            "factuality_factors": {
                "claim_accuracy": "Manual",
                "evidence_gap": "Manual Verification",
                "grounding_check": "Manual Verification"
            },
            "disinformation_analysis": {
                "classification": row["classification"],
                "intent": "Manual Labeling",
                "threat_vector": "Manual Labeling"
            },
            "final_assessment": {
                "veracity_score_total": str(row["final_veracity_score"]),
                "reasoning": row["final_reasoning"]
            },
            "raw_parsed_structure": {
                "summary": {"text": row["caption"]},
                "tags": {"keywords": row["tags"]},
                "final": {"score": str(row["final_veracity_score"]), "reasoning": row["final_reasoning"]}
            },
            "meta_info": {
                "id": tweet_id,
                "timestamp": row["timestamp"],
                "link": link,
                "model_selection": "Manual"
            }
        }
        json_path_direct = Path(f"data/labels/{tweet_id}.json")
        with open(json_path_direct, 'w', encoding='utf-8') as jf:
            json.dump(deep_json, jf, indent=2, ensure_ascii=False)
        with open(Path(f"data/mnl_labeled/{tweet_id}.json"), 'w', encoding='utf-8') as jf:
            json.dump(row, jf, indent=2, ensure_ascii=False)
        # Upsert into the manual dataset: replace the row with the same id, else append.
        manual_path = Path("data/manual_dataset.csv")
        exists = manual_path.exists()
        ensure_csv_schema(manual_path, GROUND_TRUTH_FIELDS)
        rows = []
        found = False
        if exists:
            for r in common_utils.robust_read_csv(manual_path):
                if str(r.get('id')) == str(tweet_id):
                    clean_row = {k: row.get(k, "") for k in GROUND_TRUTH_FIELDS}
                    rows.append(clean_row)
                    found = True
                else:
                    rows.append(r)
        if not found:
            clean_row = {k: row.get(k, "") for k in GROUND_TRUTH_FIELDS}
            rows.append(clean_row)
        with open(manual_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=GROUND_TRUTH_FIELDS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rows)
        # Add to User Profiles Catalog (only when the link yields an author).
        author = common_utils.extract_twitter_username(link)
        if author:
            prof_dir = Path(f"data/profiles/{author}")
            prof_dir.mkdir(parents=True, exist_ok=True)
            hist_path = prof_dir / "history.csv"
            hist_exists = hist_path.exists()
            existing_links = set()
            if hist_exists:
                for r in common_utils.robust_read_csv(hist_path):
                    existing_links.add(r.get('link'))
            if link not in existing_links:
                with open(hist_path, 'a', newline='', encoding='utf-8') as hf:
                    fieldnames = ["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
                    hwriter = csv.DictWriter(hf, fieldnames=fieldnames, extrasaction='ignore')
                    if not hist_exists: hwriter.writeheader()
                    # NOTE(review): the caption is written as-is; newlines are not
                    # flattened here as ingest_user_history does for post text.
                    hwriter.writerow({
                        "link": link,
                        "timestamp": row["timestamp"],
                        "text": row["caption"],
                        "ingested_at": row["timestamp"]
                    })
        update_queue_status(link, "Processed")
        return {"status": "success", "id": tweet_id}
    except Exception as e:
        logger.error(f"Save Manual Error: {e}")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def get_dataset_list():
    """Merge manual and AI-labeled rows, newest first.

    Manual rows are tagged ``source='Manual'`` and take precedence. An AI row
    (tagged ``source='AI'``) is included only when no manual row shares its id.
    Rows are ordered by their raw ``timestamp`` string, descending.
    """
    manual_rows = []
    manual_ids = set()
    manual_file = Path("data/manual_dataset.csv")
    if manual_file.exists():
        for record in common_utils.robust_read_csv(manual_file):
            record['source'] = 'Manual'
            if record.get('id'):
                manual_ids.add(str(record['id']))
            manual_rows.append(record)
    ai_rows = []
    ai_file = Path("data/dataset.csv")
    if ai_file.exists():
        for record in common_utils.robust_read_csv(ai_file):
            if str(record.get('id', '')) in manual_ids:
                continue
            record['source'] = 'AI'
            ai_rows.append(record)
    combined = manual_rows + ai_rows
    combined.sort(key=lambda r: r.get('timestamp', ''), reverse=True)
    return combined
async def get_account_integrity():
    """Rank accounts by a recency-weighted average of their posts' veracity scores.

    Reads ``final_veracity_score`` from data/dataset.csv and
    data/manual_dataset.csv. Every character except digits and dots is stripped
    before parsing. Unparseable scores and scores outside 0-100 are ignored.
    The author is taken from the post link, falling back to the profile whose
    history.csv contains the tweet id. A post present in both files
    contributes two scores. Each account's scores are ordered by timestamp
    string (newest first) and weighted ``0.9 ** i``, so newer posts count more.

    Side effects: creates data/profiles if needed. Any scored post not yet in
    its author's data/profiles/<author>/history.csv is appended there,
    creating the profile directory if necessary.

    Returns:
        list[dict]: ``{"username", "avg_veracity", "posts_labeled"}`` entries,
        sorted by ``avg_veracity`` descending.
    """
    history_fields = ["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
    id_map = {}
    prof_dir = Path("data/profiles")
    prof_dir.mkdir(parents=True, exist_ok=True)
    existing_links_per_user = {}
    # mkdir above guarantees the directory exists, so no separate exists() check is needed.
    for d in prof_dir.iterdir():
        if not d.is_dir():
            continue
        existing_links_per_user[d.name] = set()
        hist_file = d / "history.csv"
        if hist_file.exists():
            for row in common_utils.robust_read_csv(hist_file):
                link = row.get('link', '')
                tid = common_utils.extract_tweet_id(link)
                if tid:
                    id_map[tid] = d.name
                existing_links_per_user[d.name].add(link)
    scores_map = {}
    for fname in ["data/dataset.csv", "data/manual_dataset.csv"]:
        path = Path(fname)
        if not path.exists():
            continue
        for row in common_utils.robust_read_csv(path):
            tid = row.get('id')
            link = row.get('link', '')
            sc = row.get('final_veracity_score', '0')
            ts = row.get('timestamp', '')
            caption = row.get('caption', '')
            # float() raises ValueError on leftovers such as "" or "1.2.3".
            try:
                val = float(re.sub(r'[^\d.]', '', str(sc)))
            except ValueError:
                val = -1
            # Require scores to be between 0 and 100
            if not 0 <= val <= 100:
                continue
            auth = common_utils.extract_twitter_username(link) or id_map.get(tid, "Unknown")
            if not auth or auth == "Unknown":
                continue
            scores_map.setdefault(auth, []).append({'val': val, 'ts': ts})
            # Auto-add missing accounts/links to the Profile catalog
            if auth not in existing_links_per_user:
                existing_links_per_user[auth] = set()
                Path(f"data/profiles/{auth}").mkdir(parents=True, exist_ok=True)
            if link not in existing_links_per_user[auth]:
                existing_links_per_user[auth].add(link)
                hist_path = Path(f"data/profiles/{auth}/history.csv")
                hist_exists = hist_path.exists()
                with open(hist_path, 'a', newline='', encoding='utf-8') as hf:
                    hwriter = csv.DictWriter(hf, fieldnames=history_fields, extrasaction='ignore')
                    if not hist_exists:
                        hwriter.writeheader()
                    hwriter.writerow({
                        "link": link,
                        "timestamp": ts,
                        "text": caption,
                        "ingested_at": ts
                    })
    decay_factor = 0.9
    results = []
    for username, entries in scores_map.items():
        newest_first = sorted(entries, key=lambda x: x['ts'], reverse=True)
        weights = [decay_factor ** i for i in range(len(newest_first))]
        total_weight = sum(weights)
        weighted_sum = sum(item['val'] * w for item, w in zip(newest_first, weights))
        avg_veracity = round(weighted_sum / total_weight, 1) if total_weight > 0 else 0
        results.append({"username": username, "avg_veracity": avg_veracity, "posts_labeled": len(entries)})
    return sorted(results, key=lambda x: x['avg_veracity'], reverse=True)
async def add_queue_item(link: str = Body(..., embed=True)):
    """Append one link to the batch queue as a Pending "Ingest" task.

    Raises a 400 HTTPException if the link normalizes to nothing. Returns status
    "ignored" when the normalized link is already queued.
    """
    queue_file = Path("data/batch_queue.csv")
    queued = (
        {common_utils.normalize_link(r.get('link')) for r in common_utils.robust_read_csv(queue_file)}
        if queue_file.exists()
        else set()
    )
    normalized = common_utils.normalize_link(link)
    if not normalized:
        raise HTTPException(status_code=400, detail="Invalid link")
    if normalized in queued:
        return {"status": "ignored", "message": "Link already in queue"}
    with open(queue_file, 'a', newline='', encoding='utf-8') as fh:
        writer = csv.DictWriter(fh, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
        if not queue_file.exists() or queue_file.stat().st_size == 0:
            writer.writeheader()
        entry = {
            "link": link.strip(),
            "ingest_timestamp": datetime.datetime.now().isoformat(),
            "status": "Pending",
            "task_type": "Ingest",
        }
        writer.writerow(entry)
    return {"status": "success", "link": link}
async def upload_csv(file: UploadFile = File(...)):
    """Queue every link found in an uploaded CSV as a Pending "Ingest" task.

    In each record, the first cell containing "http" is taken as the link.
    Links are skipped when they normalize to nothing, are already in the
    queue, or appeared earlier in the same upload.

    Returns:
        ``{"status": "success", "added_count": n}``.
    """
    contents = await file.read()
    # utf-8-sig drops a leading BOM (common in spreadsheet exports) that would
    # otherwise be glued onto the first link.
    text = contents.decode('utf-8-sig')
    q_path = Path("data/batch_queue.csv")
    existing = set()
    if q_path.exists():
        for r in common_utils.robust_read_csv(q_path):
            existing.add(common_utils.normalize_link(r.get('link')))
    added = 0
    with open(q_path, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
        if not q_path.exists() or q_path.stat().st_size == 0:
            writer.writeheader()
        # csv.reader copes with quoted cells that a plain split(',') would break apart.
        for record in csv.reader(io.StringIO(text)):
            raw = next((cell.strip() for cell in record if 'http' in cell), None)
            if not raw:
                continue
            normalized = common_utils.normalize_link(raw)
            if not normalized or normalized in existing:
                continue
            writer.writerow({"link": raw, "ingest_timestamp": datetime.datetime.now().isoformat(), "status": "Pending", "task_type": "Ingest"})
            # Track within the upload so a repeated link is queued only once.
            existing.add(normalized)
            added += 1
    return {"status": "success", "added_count": added}
async def stop_processing():
    """Set the module-level STOP_QUEUE_SIGNAL flag to True.

    Queue processing presumably polls this flag to stop; confirm in the worker.
    """
    global STOP_QUEUE_SIGNAL
    STOP_QUEUE_SIGNAL = True
    reply = {"status": "success"}
    reply["message"] = "Stopping queue processing..."
    return reply
async def clear_processed_queue():
    """Drop finished items from data/batch_queue.csv and rewrite it.

    A row counts as finished when its status is "Processed", or when it is not a
    "Verify" task and its link is already labeled (see ``check_if_processed``).
    Verify tasks are queued for links that are already labeled, so they are
    removed only once explicitly marked "Processed".
    """
    queue_file = Path("data/batch_queue.csv")
    if not queue_file.exists():
        return {"status": "success", "removed_count": 0}
    known_ids, known_links = get_processed_indices()

    def _finished(entry):
        # Explicitly completed rows always go; otherwise, non-Verify rows go once labeled.
        if entry.get("status", "Pending") == "Processed":
            return True
        return entry.get("task_type", "Ingest") != "Verify" and check_if_processed(entry.get("link"), known_ids, known_links)

    survivors = []
    removed = 0
    for entry in common_utils.robust_read_csv(queue_file):
        if _finished(entry):
            removed += 1
        else:
            survivors.append(entry)
    with open(queue_file, 'w', newline='', encoding='utf-8') as fh:
        writer = csv.DictWriter(fh, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(survivors)
    return {"status": "success", "removed_count": removed}
async def delete_queue_items(request: Request):
    """Delete specific links from the batch queue.

    Expects a JSON body of the form ``{"links": [<url>, ...]}``. Queue rows are
    matched on ``common_utils.normalize_link`` of their link.

    Requested links whose normalization is falsy are discarded first.
    Otherwise that ``None`` would match, and delete, every queue row whose own
    link also fails to normalize.

    Returns:
        ``{"status": "success", "count": <rows removed>}``, or a 500
        ``{"status": "error", "message": ...}`` response on any failure.
    """
    try:
        data = await request.json()
        target_links = {
            normalized
            for normalized in (common_utils.normalize_link(link) for link in data.get("links", []))
            if normalized
        }
        q_path = Path("data/batch_queue.csv")
        if not q_path.exists() or not target_links:
            return {"status": "success", "count": 0}
        kept_rows = []
        deleted_count = 0
        for row in common_utils.robust_read_csv(q_path):
            if common_utils.normalize_link(row.get('link')) in target_links:
                deleted_count += 1
            else:
                kept_rows.append(row)
        with open(q_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(kept_rows)
        return {"status": "success", "count": deleted_count}
    except Exception as e:
        logger.exception("Failed to delete queue items")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def requeue_items(request: Request):
    """Reset selected batch-queue entries back to ``Pending``.

    Expects a JSON body of the form ``{"links": [<url>, ...]}``. Queue rows are
    matched on ``common_utils.normalize_link`` of their link; every row is kept
    and matching rows get ``status = 'Pending'``.

    Requested links whose normalization is falsy are discarded first, so they
    cannot match unrelated rows whose links also fail to normalize.

    Returns:
        ``{"status": "success", "count": <rows requeued>}``, or a 500
        ``{"status": "error", "message": ...}`` response on any failure.
    """
    try:
        data = await request.json()
        target_links = {
            normalized
            for normalized in (common_utils.normalize_link(link) for link in data.get("links", []))
            if normalized
        }
        q_path = Path("data/batch_queue.csv")
        if not q_path.exists() or not target_links:
            return {"status": "success", "count": 0}
        rows = []
        requeued_count = 0
        for row in common_utils.robust_read_csv(q_path):
            if common_utils.normalize_link(row.get('link')) in target_links:
                row['status'] = 'Pending'
                requeued_count += 1
            rows.append(row)
        with open(q_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=QUEUE_COLUMNS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rows)
        return {"status": "success", "count": requeued_count}
    except Exception as e:
        logger.exception("Failed to requeue items")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def delete_dataset_items(request: Request):
    """Delete rows from ``data/dataset.csv`` by id.

    Expects ``{"ids": [...]}``. Ids are compared as strings against the ``id``
    column, and the surviving rows are rewritten under the ``DATASET_COLUMNS``
    header.

    Returns:
        ``{"status": "success", "deleted_count": n}``. When the dataset file
        does not exist yet, a legacy ``count`` key is also included. A 400
        ``{"status": "error", ...}`` response is returned when no ids are
        supplied, and a 500 one on any other failure.
    """
    try:
        data = await request.json()
        target_ids = data.get("ids", [])
        if not target_ids:
            # Return the 400 directly: an HTTPException raised inside this try
            # would be caught by the generic handler below and become a 500.
            return JSONResponse({"status": "error", "message": "No ids provided"}, status_code=400)
        target_ids = {str(t) for t in target_ids}
        path = Path("data/dataset.csv")
        if not path.exists():
            return {"status": "success", "deleted_count": 0, "count": 0}
        rows = []
        deleted_count = 0
        for row in common_utils.robust_read_csv(path):
            if str(row.get('id')) in target_ids:
                deleted_count += 1
            else:
                rows.append(row)
        with open(path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=DATASET_COLUMNS, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(rows)
        return {"status": "success", "deleted_count": deleted_count}
    except Exception as e:
        logger.exception("Failed to delete dataset items")
        return JSONResponse({"status": "error", "message": str(e)}, status_code=500)
async def analyze_user_context(request: Request):
    """Generate a user-profile report for the username given in the JSON body.

    Expects ``{"username": "<handle>"}``.

    Returns:
        ``{"status": "success", "report": ...}``. A 400 ``{"error": ...}``
        response is returned when no username is supplied, and a 500
        ``{"error": ...}`` one if report generation fails.
    """
    try:
        data = await request.json()
        username = data.get("username")
        if not username:
            # Reject early rather than asking the report generator to profile None.
            return JSONResponse({"error": "username is required"}, status_code=400)
        rep = await user_analysis_logic.generate_user_profile_report(username)
        return {"status": "success", "report": rep}
    except Exception as e:
        logger.exception("User context analysis failed")
        return JSONResponse({"error": str(e)}, status_code=500)
async def read_root(request: Request):
    """Serve the dashboard page rendered from the ``index.html`` template."""
    template_context = {"request": request}
    return templates.TemplateResponse("index.html", template_context)
async def get_queue_list():
    """Return the batch queue as a list of dicts for the dashboard.

    Each entry carries:
      * ``link``;
      * ``timestamp``, taken from the ``ingest_timestamp`` column;
      * ``status``;
      * ``task_type``, defaulting to ``Ingest`` when blank;
      * ``comments``, the list from ``data/comments/<tid>_ingest.json`` when
        that file exists.

    A ``Pending`` non-``Verify`` item whose link is already in the processed
    indices is reported as ``Processed``; the queue file itself is not
    modified. Rows without a link are skipped, matching the queue worker,
    which also ignores them.
    """
    q_path = Path("data/batch_queue.csv")
    if not q_path.exists():
        return []
    items = []
    p_ids, p_links = get_processed_indices()
    for row in common_utils.robust_read_csv(q_path):
        link = row.get("link") if row else None
        if not link:
            # A link-less row would crash the tid fallback below (None.encode()).
            continue
        status = row.get("status", "Pending")
        task_type = row.get("task_type") or "Ingest"
        if status == "Pending" and task_type != "Verify" and check_if_processed(link, p_ids, p_links):
            status = "Processed"
        # Fetch associated comments to display in the dropdown.
        comments = []
        tid = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
        c_path = Path(f"data/comments/{tid}_ingest.json")
        if c_path.exists():
            try:
                with open(c_path, 'r', encoding='utf-8') as f:
                    c_data = json.load(f)
                comments = c_data.get('comments', [])
            except Exception as e:
                # Best effort: an unreadable comments file must not break the listing.
                logger.warning(f"Could not read comments file {c_path}: {e}")
        items.append({
            "link": link,
            "timestamp": row.get("ingest_timestamp", ""),
            "status": status,
            "task_type": task_type,
            "comments": comments,
        })
    return items
async def run_queue_processing(
    model_selection: str = Form(...),
    gemini_api_key: str = Form(""), gemini_model_name: str = Form(""),
    vertex_project_id: str = Form(""), vertex_location: str = Form(""), vertex_model_name: str = Form(""), vertex_api_key: str = Form(""),
    nrp_api_key: str = Form(""), nrp_model_name: str = Form(""), nrp_base_url: str = Form("https://ellm.nrp-nautilus.io/v1"),
    include_comments: bool = Form(False), reasoning_method: str = Form("cot"), prompt_template: str = Form("standard"),
    custom_query: str = Form(""), max_reprompts: int = Form(1),
    use_search: bool = Form(False), use_code: bool = Form(False)
):
    """Label every ``Pending`` item in the batch queue, streaming progress as SSE.

    The form fields choose:
      * the provider (``gemini``, ``vertex`` or ``nrp``) and its credentials;
      * the prompt variant from ``PROMPT_VARIANTS``, falling back to ``standard``;
      * an optional batch-wide ``custom_query`` appended to the system persona;
      * the retry and tool flags copied into each provider config.

    For each queued item the stream reports three stages: download, optional
    community-context generation from ``data/comments/<tid>_ingest.json``, and
    inference. A successful result is then:
      * appended to ``data/dataset.csv``;
      * written as a JSON sidecar under ``data/labels/``;
      * added to ``data/profiles/<author>/history.csv``;
      * marked ``Processed`` in the queue.
    ``Verify`` items are also compared against their ``data/manual_dataset.csv``
    row and logged to ``data/comparison.csv``.

    An unexpected exception while handling one item is logged, recorded via
    ``log_queue_error`` and reported as ``[FAIL]``, and the batch continues
    with the next item instead of the whole stream aborting. Setting the
    module-level ``STOP_QUEUE_SIGNAL`` ends the batch before the next item.

    Returns:
        A ``text/event-stream`` ``StreamingResponse`` that ends with an
        ``event: close`` message.
    """
    global STOP_QUEUE_SIGNAL
    STOP_QUEUE_SIGNAL = False

    gemini_config = {"api_key": gemini_api_key, "model_name": gemini_model_name, "max_retries": max_reprompts, "use_search": use_search, "use_code": use_code}
    vertex_config = {"project_id": vertex_project_id, "location": vertex_location, "model_name": vertex_model_name, "api_key": vertex_api_key, "max_retries": max_reprompts, "use_search": use_search, "use_code": use_code}
    nrp_config = {"api_key": nrp_api_key, "model_name": nrp_model_name, "base_url": nrp_base_url, "max_retries": max_reprompts, "use_search": use_search, "use_code": use_code}

    sel_p = PROMPT_VARIANTS.get(prompt_template, PROMPT_VARIANTS['standard'])
    system_persona_txt = sel_p['instruction']
    if custom_query.strip():
        system_persona_txt += f"\n\nSPECIAL INSTRUCTION FOR THIS BATCH: {custom_query}"

    if model_selection == 'vertex':
        active_config, active_model_name = vertex_config, vertex_model_name
    elif model_selection == 'nrp':
        active_config, active_model_name = nrp_config, nrp_model_name
    else:
        active_config, active_model_name = gemini_config, gemini_model_name

    config_params_dict = {
        "reprompts": max_reprompts,
        "include_comments": include_comments,
        "agent_active": False,
        "use_search": use_search,
        "use_code": use_code
    }
    config_params_str = json.dumps(config_params_dict)

    def s_float(value):
        """Coerce *value* to float; blanks, ``None`` and junk become 0.0."""
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def find_ground_truth(link):
        """Return the ``data/manual_dataset.csv`` row matching *link*, or None."""
        manual_path = Path("data/manual_dataset.csv")
        if not manual_path.exists():
            return None
        target = common_utils.normalize_link(link)
        for row in common_utils.robust_read_csv(manual_path):
            if common_utils.normalize_link(row.get('link', '')) == target:
                return row
        return None

    def record_author_history(link, caption):
        """Append *link* to its author's ``history.csv`` unless already listed."""
        author = common_utils.extract_twitter_username(link)
        if not author:
            return
        prof_dir = Path(f"data/profiles/{author}")
        prof_dir.mkdir(parents=True, exist_ok=True)
        hist_path = prof_dir / "history.csv"
        hist_exists = hist_path.exists()
        existing_links = set()
        if hist_exists:
            for r in common_utils.robust_read_csv(hist_path):
                existing_links.add(r.get('link'))
        if link in existing_links:
            return
        with open(hist_path, 'a', newline='', encoding='utf-8') as hf:
            fieldnames = ["link", "timestamp", "text", "is_reply", "metric_replies", "metric_reposts", "metric_likes", "metric_views", "ingested_at"]
            hwriter = csv.DictWriter(hf, fieldnames=fieldnames, extrasaction='ignore')
            if not hist_exists:
                hwriter.writeheader()
            hwriter.writerow({
                "link": link,
                "timestamp": datetime.datetime.now().isoformat(),
                "text": caption,
                "ingested_at": datetime.datetime.now().isoformat()
            })

    async def process_item(link, task_type, p_ids, p_links):
        """Download, label and persist one queue item, yielding SSE messages.

        On success, adds the item's id and normalized link to *p_ids* and
        *p_links*, so later items in the same batch see it as processed.
        """
        gt_data = find_ground_truth(link) if task_type == "Verify" else None

        yield f"data:[START] {link} (Type: {task_type})\n\n"
        tid = common_utils.extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:10]
        assets = await common_utils.prepare_video_assets(link, tid)
        if not assets or (not assets.get('video') and not assets.get('caption')):
            log_queue_error(link, "Download/Fetch Error", task_type)
            yield "data: - Download Error.\n\n"
            return

        # A video-only asset bundle may lack 'caption'; don't KeyError on it.
        caption = assets.get('caption', "")
        trans = common_utils.parse_vtt(assets['transcript']) if assets.get('transcript') else "No transcript (Audio/Video missing)."
        video_file = assets.get('video') or None
        if video_file:
            yield "data: - Video found. Inferencing...\n\n"
        else:
            yield "data: - No video found. Text-only analysis.\n\n"

        current_system_persona = system_persona_txt
        comments_path = Path(f"data/comments/{tid}_ingest.json")
        if comments_path.exists():
            try:
                with open(comments_path, 'r', encoding='utf-8') as f:
                    c_data = json.load(f)
                comments = c_data.get('comments', [])
                if comments:
                    yield f"data: - Found {len(comments)} comments. Generating Community Context...\n\n"
                    community_summary = await inference_logic.generate_community_summary(comments, model_selection, active_config)
                    current_system_persona += f"\n\n### COMMUNITY NOTES / CONTEXT (from Comments):\n{community_summary}\n\nUse this community context to cross-reference claims but remain objective."
                    yield "data: - Context Generated.\n\n"
            except Exception as e:
                # Community context is optional; fall back to the plain persona.
                logger.error(f"Error processing comments for context: {e}")

        res_data = None
        pipeline = None
        if model_selection == 'gemini':
            pipeline = inference_logic.run_gemini_labeling_pipeline(video_file, caption, trans, gemini_config, include_comments, reasoning_method, current_system_persona, request_id=tid)
        elif model_selection == 'vertex':
            pipeline = inference_logic.run_vertex_labeling_pipeline(video_file, caption, trans, vertex_config, include_comments, reasoning_method, current_system_persona, request_id=tid)
        elif model_selection == 'nrp':
            pipeline = inference_logic.run_nrp_labeling_pipeline(video_file, caption, trans, nrp_config, include_comments, reasoning_method, current_system_persona, request_id=tid)
        if pipeline is not None:
            # String chunks are progress messages; any other chunk is a result
            # (the last one wins).
            async for chunk in pipeline:
                if isinstance(chunk, str):
                    yield f"data: - {chunk}\n\n"
                else:
                    res_data = chunk

        if not (res_data and "parsed_data" in res_data):
            if pipeline is None:
                err_msg = f"Unknown model selection: {model_selection}"
            else:
                err_msg = (res_data.get('error') if isinstance(res_data, dict) else None) or "Inference failed"
            log_queue_error(link, err_msg, task_type)
            yield f"data: [FAIL] {err_msg}.\n\n"
            return

        # Tolerate partially-formed model output instead of crashing the stream.
        parsed = res_data["parsed_data"] or {}
        vectors = parsed.get('veracity_vectors') or {}
        modalities = parsed.get('modalities') or {}
        assessment = parsed.get('final_assessment') or {}
        disinfo = parsed.get('disinformation_analysis') or {}
        raw_toon = res_data.get("raw_toon", "")

        d_path = Path("data/dataset.csv")
        ensure_csv_schema(d_path, DATASET_COLUMNS)
        exists = d_path.exists()
        ai_score = s_float(assessment.get('veracity_score_total', 0))

        if task_type == "Verify" and gt_data is not None:
            # Blank or non-numeric manual scores are read as 0.0 rather than raising.
            gt_final = s_float(gt_data.get('final_veracity_score', 0))
            delta = abs(ai_score - gt_final)
            # Single-newline "data:" lines are joined by the SSE client into one
            # multi-line event; the final double newline terminates it.
            yield "data: -[VERIFICATION PIPELINE] Configuration Analysis:\n"
            yield f"data: Model: {active_model_name} | Provider: {model_selection}\n"
            yield f"data: Reasoning: {reasoning_method} | Prompt: {prompt_template} | Reprompts: {max_reprompts}\n"
            yield "data: -[VERIFICATION SCORES COMPARISON (AI vs Ground Truth)]\n"
            # (label, AI-side dict, key shared by the AI result and the GT row)
            comparisons = (
                ("Visual Integrity ", vectors, 'visual_integrity_score'),
                ("Audio Integrity ", vectors, 'audio_integrity_score'),
                ("Source Credibility ", vectors, 'source_credibility_score'),
                ("Logical Consistency", vectors, 'logical_consistency_score'),
                ("Emotional Manipul. ", vectors, 'emotional_manipulation_score'),
                ("Video-Audio Align ", modalities, 'video_audio_score'),
                ("Video-Caption Align", modalities, 'video_caption_score'),
                ("Audio-Caption Align", modalities, 'audio_caption_score'),
            )
            for label, ai_source, key in comparisons:
                yield f"data: {label}: AI {s_float(ai_source.get(key))} | GT {s_float(gt_data.get(key))}\n"
            yield f"data: FINAL VERACITY : AI {ai_score} | GT {gt_final} | Delta: {delta}\n\n"

            comp_path = Path("data/comparison.csv")
            comp_exists = comp_path.exists()
            with open(comp_path, 'a', newline='', encoding='utf-8') as cf:
                cw = csv.DictWriter(cf, fieldnames=["id", "link", "timestamp", "gt_score", "ai_score", "delta", "model", "prompt", "reasoning_method"])
                if not comp_exists:
                    cw.writeheader()
                cw.writerow({
                    "id": tid, "link": link, "timestamp": datetime.datetime.now().isoformat(),
                    "gt_score": gt_final, "ai_score": ai_score, "delta": delta,
                    "model": active_model_name, "prompt": prompt_template, "reasoning_method": reasoning_method
                })

        try:
            row = {
                "id": tid, "link": link, "timestamp": datetime.datetime.now().isoformat(),
                "caption": caption,
                "final_veracity_score": ai_score,
                "visual_score": vectors.get('visual_integrity_score', 0),
                "audio_score": vectors.get('audio_integrity_score', 0),
                "source_score": vectors.get('source_credibility_score', 0),
                "logic_score": vectors.get('logical_consistency_score', 0),
                "emotion_score": vectors.get('emotional_manipulation_score', 0),
                "align_video_audio": modalities.get('video_audio_score', 0),
                "align_video_caption": modalities.get('video_caption_score', 0),
                "align_audio_caption": modalities.get('audio_caption_score', 0),
                "classification": disinfo.get('classification', 'None'),
                "reasoning": assessment.get('reasoning', ''),
                "tags": ",".join(parsed.get('tags') or []),
                "raw_toon": raw_toon,
                "config_type": "GenAI",
                "config_model": active_model_name,
                "config_prompt": prompt_template,
                "config_reasoning": reasoning_method,
                "config_params": config_params_str
            }
            with open(d_path, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=DATASET_COLUMNS, extrasaction='ignore')
                if not exists:
                    writer.writeheader()
                writer.writerow(row)
        except Exception as csv_err:
            logger.error(f"CSV Write Failed: {csv_err}")

        try:
            # One clock read so the sidecar's timestamp and filename agree.
            now = datetime.datetime.now()
            ts = now.isoformat()
            ts_clean = now.strftime("%Y%m%d_%H%M%S")
            flat_parsed = parsed.copy()
            flat_parsed["raw_toon"] = raw_toon
            flat_parsed["meta_info"] = {
                "id": tid, "timestamp": ts, "link": link,
                "prompt_used": res_data.get("prompt_used", ""),
                "model_selection": model_selection,
                "config_type": "GenAI",
                "config_model": active_model_name,
                "config_prompt": prompt_template,
                "config_reasoning": reasoning_method,
                "config_params": config_params_dict
            }
            labels_dir = Path("data/labels")
            labels_dir.mkdir(parents=True, exist_ok=True)
            with open(labels_dir / f"{tid}_{ts_clean}.json", 'w', encoding='utf-8') as f:
                json.dump(flat_parsed, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Sidecar Error: {e}")

        # Add to User Profiles Catalog (bookkeeping; must not fail the item).
        try:
            record_author_history(link, caption)
        except Exception as e:
            logger.error(f"Profile history update failed for {link}: {e}")

        p_ids.add(tid)
        p_links.add(common_utils.normalize_link(link))
        update_queue_status(link, "Processed", task_type)
        yield "data:[SUCCESS] Saved.\n\n"

    async def queue_stream():
        q_path = Path("data/batch_queue.csv")
        rows = common_utils.robust_read_csv(q_path) if q_path.exists() else []
        items = [r for r in rows if r.get("link") and r.get("status", "Pending") == "Pending"]
        p_ids, p_links = get_processed_indices()
        yield f"data:[SYSTEM] Persona: {sel_p['description']}\n\n"
        for item in items:
            link = item.get("link")
            task_type = item.get("task_type") or "Ingest"
            if STOP_QUEUE_SIGNAL:
                yield "data:[SYSTEM] Stopping by user request.\n\n"
                break
            if task_type != "Verify" and check_if_processed(link, p_ids, p_links):
                update_queue_status(link, "Processed", task_type)
                continue
            try:
                async for message in process_item(link, task_type, p_ids, p_links):
                    yield message
            except Exception as e:
                # Isolate per-item failures so one bad item doesn't end the batch.
                logger.exception(f"Queue item failed: {link}")
                # Collapse whitespace so the message cannot break SSE framing.
                detail = " ".join(str(e).split()) or type(e).__name__
                log_queue_error(link, f"Unexpected error: {detail}", task_type)
                yield f"data: [FAIL] {detail}.\n\n"
            await asyncio.sleep(0.5)
        yield "event: close\ndata: Done\n\n"

    return StreamingResponse(queue_stream(), media_type="text/event-stream")