|
|
import os |
|
|
import sys |
|
|
import asyncio |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
import logging |
|
|
import csv |
|
|
import io |
|
|
import datetime |
|
|
import json |
|
|
import hashlib |
|
|
import re |
|
|
from fastapi import FastAPI, Request, Form, UploadFile, File, Body, HTTPException |
|
|
from fastapi.responses import HTMLResponse, StreamingResponse, PlainTextResponse, Response, FileResponse, JSONResponse |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import yt_dlp |
|
|
import inference_logic |
|
|
import factuality_logic |
|
|
import transcription |
|
|
from factuality_logic import parse_vtt |
|
|
from toon_parser import parse_veracity_toon |
|
|
|
|
|
|
|
|
# Lift the csv module's per-field size cap. Where sys.maxsize cannot be
# accepted (OverflowError), fall back to the largest signed 32-bit value.
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    csv.field_size_limit(2147483647)

# Croissant support is optional: try `mlcroissant` first, then `croissant`,
# and record whether either import succeeded.
try:
    import mlcroissant as mlc
except ImportError:
    try:
        import croissant as mlc
    except ImportError:
        mlc = None
        CROISSANT_AVAILABLE = False
    else:
        CROISSANT_AVAILABLE = True
else:
    CROISSANT_AVAILABLE = True

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# LITE mode is switched on only by the exact (case-insensitive) value "true".
LITE_MODE = os.getenv("LITE_MODE", "false").lower() == "true"

app = FastAPI()

# NOTE(review): wildcard origins combined with credentials is very permissive;
# confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

# Pick the first existing frontend directory; otherwise use ./static and, if
# that has to be created, drop a placeholder index page into it.
STATIC_DIR = next(
    (
        candidate
        for candidate in ("/usr/share/vchat/static", "frontend/dist")
        if os.path.isdir(candidate)
    ),
    "static",
)
if STATIC_DIR == "static" and not os.path.isdir(STATIC_DIR):
    os.makedirs(STATIC_DIR, exist_ok=True)
    dummy_index = Path(STATIC_DIR) / "index.html"
    if not dummy_index.exists():
        dummy_index.write_text("<html><body>vChat Backend Running. Access via Port 8005 (Go Server).</body></html>")

app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
templates = Jinja2Templates(directory=STATIC_DIR)

# Working directories used by the handlers in this module.
for _data_dir in (
    "data/videos",
    "data",
    "data/labels",
    "data/prompts",
    "data/responses",
    "metadata",
):
    os.makedirs(_data_dir, exist_ok=True)

# Set by /queue/stop and polled by the batch loop in /queue/run.
STOP_QUEUE_SIGNAL = False
|
|
|
|
|
|
|
|
def robust_read_csv(file_path: Path):
    """
    Read a CSV file tolerantly, yielding one dict per parseable data row.

    The file is opened with undecodable bytes replaced and with ``newline=''``
    (as the csv module asks for) so that line breaks embedded in quoted fields
    come back exactly as they were written. Rows on which the parser raises
    ``csv.Error`` are logged and skipped. A missing file yields nothing.

    Args:
        file_path: Location of the CSV file; its first row is used as the header.

    Yields:
        dict: Column name -> cell value for each row. Cells missing from a short
        row are None (csv.DictReader's default).
    """
    if not file_path.exists():
        return

    with open(file_path, 'r', encoding='utf-8', errors='replace', newline='') as f:
        try:
            reader = csv.DictReader(f)
            while True:
                # Only the parser call is guarded per row, so one bad row does
                # not end the iteration.
                try:
                    row = next(reader)
                except StopIteration:
                    break
                except csv.Error as e:
                    logging.warning(f"CSV Parse Error in {file_path}: {e}")
                    continue
                yield row
        except Exception as e:
            logging.error(f"Failed to initialize CSV reader for {file_path}: {e}")
            return
|
|
|
|
|
def ensure_manual_dataset():
    """Create data/manual_dataset.csv with just its header row when the file is absent."""
    dataset_file = Path("data/manual_dataset.csv")
    if dataset_file.exists():
        return

    header = [
        "id", "link", "caption", "collecttime", "source",
        "visual_integrity_score", "audio_integrity_score", "source_credibility_score",
        "logical_consistency_score", "emotional_manipulation_score",
        "video_audio_score", "video_caption_score", "audio_caption_score",
        "final_veracity_score", "final_reasoning",
        "stats_likes", "stats_shares", "stats_comments", "stats_platform", "tags",
    ]
    with dataset_file.open('w', newline='', encoding='utf-8') as handle:
        csv.writer(handle).writerow(header)
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Startup hook: make sure the manual dataset exists, then load models unless in LITE mode."""
    logging.info("Application starting up...")
    ensure_manual_dataset()

    if LITE_MODE:
        logging.info("Running in LITE mode.")
        return

    # A model-loading failure is logged with its traceback; startup continues.
    try:
        inference_logic.load_models()
        transcription.load_model()
    except Exception as load_error:
        logging.critical(f"Could not load models. Error: {load_error}", exc_info=True)
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render index.html from STATIC_DIR, or answer 404 when no frontend has been built."""
    # In LITE mode the custom model is always reported as unavailable.
    custom_model_available = (not LITE_MODE) and inference_logic.peft_model is not None

    index_file = Path(STATIC_DIR) / "index.html"
    if not index_file.exists():
        return HTMLResponse(content="Frontend not found. Please build frontend or access via Go server.", status_code=404)

    context = {
        "request": request,
        "custom_model_available": custom_model_available,
        "lite_mode": LITE_MODE,
    }
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
@app.get("/model-architecture", response_class=PlainTextResponse)
async def get_model_architecture():
    """Return the string form of the loaded base model, or a short status message."""
    if LITE_MODE:
        return "Running in LITE mode."

    loaded_model = inference_logic.base_model
    if not loaded_model:
        return "Model not loaded."
    return str(loaded_model)
|
|
|
|
|
@app.get("/download-dataset")
async def download_dataset():
    """Send data/dataset.csv as a CSV download, or 404 when it has not been created yet."""
    dataset_file = Path("data/dataset.csv")
    if not dataset_file.exists():
        return Response("Dataset not found.", status_code=404)
    return FileResponse(path=dataset_file, filename="dataset.csv", media_type='text/csv')
|
|
|
|
|
# Latest human-readable download status, overwritten by progress_hook.
progress_message = ""


def progress_hook(d):
    """
    Download progress callback (registered under 'progress_hooks' in the yt-dlp options).

    Stores a status line in the module-level ``progress_message`` for the
    'downloading' and 'finished' states; any other status leaves it untouched.
    """
    global progress_message
    status = d['status']
    if status == 'downloading':
        percent = d.get('_percent_str', 'N/A')
        speed = d.get('_speed_str', 'N/A')
        progress_message = f"Downloading: {percent} at {speed}\r"
    elif status == 'finished':
        progress_message = "\nDownload finished. Preparing video assets...\n"
|
|
|
|
|
async def run_subprocess_async(command: list[str]):
    """
    Run ``command`` without a shell and return its captured stdout as text.

    Output is decoded as UTF-8 with undecodable bytes replaced, so a tool that
    prints non-UTF-8 bytes cannot mask the real failure with a
    UnicodeDecodeError.

    Args:
        command: Program and arguments, passed to the OS as-is.

    Returns:
        str: Everything the process wrote to stdout.

    Raises:
        RuntimeError: If the process exits with a non-zero status; the message
            carries the decoded stderr.
    """
    process = await asyncio.create_subprocess_exec(
        *command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"Process failed:\n{stderr.decode(errors='replace')}")
    return stdout.decode(errors='replace')
|
|
|
|
|
def extract_tweet_id(url: str) -> str | None: |
|
|
match = re.search(r"(?:twitter|x)\.com/[^/]+/status/(\d+)", url) |
|
|
if match: return match.group(1) |
|
|
return None |
|
|
|
|
|
def normalize_link(link: str) -> str:
    """
    Reduce a link to a comparable form.

    Drops everything from the first '?', trims surrounding whitespace and
    trailing slashes, then removes every occurrence of 'http://', 'https://'
    and 'www.'. A falsy input gives "".
    """
    if not link:
        return ""

    without_query = link.partition('?')[0]
    trimmed = without_query.strip().rstrip('/')
    for noise in ('http://', 'https://', 'www.'):
        trimmed = trimmed.replace(noise, '')
    return trimmed
|
|
|
|
|
def check_if_processed(link: str) -> bool:
    """
    Report whether ``link`` already appears in the AI or the manual dataset.

    A row matches when its normalized link equals the normalized ``link``, or
    when ``link`` carries a tweet id and that id equals the row's 'id' cell.
    """
    wanted_id = extract_tweet_id(link)
    wanted_link = normalize_link(link)

    for dataset_file in ("data/dataset.csv", "data/manual_dataset.csv"):
        for record in robust_read_csv(Path(dataset_file)):
            candidate = normalize_link(record.get('link', ''))
            if candidate and candidate == wanted_link:
                return True
            if wanted_id and record.get('id', '') == wanted_id:
                return True
    return False
|
|
|
|
|
async def prepare_video_assets_async(url: str) -> dict:
    """
    Fetch (or locate) a video and derive the files the labeling pipeline needs.

    Anything not starting with http:// or https:// is treated as a local file
    path. A remote URL is downloaded with yt-dlp unless a re-encoded copy named
    after its tweet id / URL hash already exists under data/videos. The video
    is then re-encoded with ffmpeg (libx264 + aac) to ``<id>_fixed.mp4``, its
    audio is extracted to a 16 kHz mono WAV, and, when no subtitle file was
    found and LITE_MODE is off, a transcript is generated from that audio.

    Args:
        url: Remote video URL or local file path.

    Returns:
        dict with keys "video" (path of the re-encoded file), "transcript"
        (path string or None) and "metadata" (id, link, caption and, for fresh
        downloads, postdatetime).

    Raises:
        FileNotFoundError: If ``url`` is a local path that does not exist.
        RuntimeError: If the ffmpeg re-encode exits non-zero.
    """
    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()
    is_local = not url.startswith(("http://", "https://"))
    video_id = "unknown"
    transcript_path = None

    if is_local:
        original_path = Path(url)
        if not original_path.exists():
            raise FileNotFoundError(f"File not found: {url}")
        video_id = hashlib.md5(str(url).encode('utf-8')).hexdigest()[:16]
        metadata = {"id": video_id, "link": url, "caption": original_path.stem}
    else:
        tweet_id = extract_tweet_id(url)
        video_id = tweet_id if tweet_id else hashlib.md5(url.encode('utf-8')).hexdigest()[:16]
        # NOTE(review): fresh downloads are stored under yt-dlp's own id (see
        # below), so this cache check presumably only hits when the two ids
        # coincide - confirm.
        sanitized_check = Path(f"data/videos/{video_id}_fixed.mp4")

        if sanitized_check.exists():
            original_path = sanitized_check
            metadata = {"id": video_id, "link": url, "caption": "Cached Video"}
        else:
            ydl_opts = {
                'format': 'best[ext=mp4]/best',
                'outtmpl': 'data/videos/%(id)s.%(ext)s',
                'progress_hooks': [progress_hook], 'quiet': True, 'noplaylist': True, 'no_overwrites': True,
                'writesubtitles': True, 'writeautomaticsub': True, 'subtitleslangs': ['en']
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # extract_info blocks, so run it in the default executor.
                info = await loop.run_in_executor(None, lambda: ydl.extract_info(url, download=True))
                original_path = Path(ydl.prepare_filename(info))

            # 'description' / 'title' may be present but None; fall back
            # instead of crashing on None.encode().
            caption_source = info.get("description")
            if caption_source is None:
                caption_source = info.get("title")
            if caption_source is None:
                caption_source = "N/A"
            metadata = {
                "id": info.get("id", video_id), "link": info.get("webpage_url", url),
                "caption": str(caption_source).encode('ascii', 'ignore').decode('ascii').strip()[:500],
                "postdatetime": info.get("upload_date", "N/A")
            }
            video_id = info.get("id", video_id)

        # Prefer an English subtitle file, then any .vtt stored for this id.
        transcript_path = next(Path("data/videos").glob(f"{video_id}*.en.vtt"), None)
        if not transcript_path:
            transcript_path = next(Path("data/videos").glob(f"{video_id}*.vtt"), None)

    sanitized_path = Path(f"data/videos/{video_id}_fixed.mp4")
    if not sanitized_path.exists() and original_path.exists():
        await run_subprocess_async(["ffmpeg", "-i", str(original_path), "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-y", str(sanitized_path)])

    audio_path = sanitized_path.with_suffix('.wav')
    if not audio_path.exists() and sanitized_path.exists():
        try:
            await run_subprocess_async(["ffmpeg", "-i", str(sanitized_path), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", str(audio_path)])
        except Exception as audio_error:
            # Best effort: processing continues without audio. Catching
            # Exception (not a bare except) lets task cancellation propagate.
            logging.warning(f"Audio extraction failed for {sanitized_path}: {audio_error}")

    if not transcript_path and audio_path.exists() and not LITE_MODE:
        transcript_path = await loop.run_in_executor(None, transcription.generate_transcript, str(audio_path))

    return {"video": str(sanitized_path), "transcript": str(transcript_path) if transcript_path else None, "metadata": metadata}
|
|
|
|
|
def safe_int(value):
    """
    Best-effort conversion of a score-like value to a non-negative int.

    Returns the first run of decimal digits found in ``str(value)``, so "8",
    " 8 ", "8/10" and "7.5" give 8, 8, 8 and 7. Deleting every non-digit and
    parsing what is left would instead read "8/10" as 810 and "7.5" as 75.
    Values without any digit (None, "", "n/a") give 0. Signs are ignored.

    Args:
        value: Anything; it is converted with ``str()`` before parsing.

    Returns:
        int: The parsed number, or 0 when nothing can be parsed.
    """
    try:
        first_number = re.search(r'\d+', str(value))
        return int(first_number.group()) if first_number else 0
    except Exception:
        return 0
|
|
|
|
|
async def generate_and_save_croissant_metadata(row_data: dict) -> str:
    """
    Write a small schema.org "Dataset" JSON document describing one labeled video.

    Only the id, link, visual_integrity_score and final_veracity_score of
    ``row_data`` are included. The file goes to metadata/<id>_<timestamp>.json.

    Args:
        row_data: The dataset row for the video.

    Returns:
        str: Path of the written file, or the literal "N/A (Error)" if anything
        failed (the failure is logged with its traceback).
    """
    try:
        sanitized_data = {
            "id": str(row_data.get("id", "")),
            "link": str(row_data.get("link", "")),
            "visual_integrity_score": safe_int(row_data.get("visual_integrity_score")),
            "final_veracity_score": safe_int(row_data.get("final_veracity_score"))
        }
        video_id = sanitized_data["id"]
        timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        croissant_json = {
            "@context": "https://schema.org/",
            "@type": "Dataset",
            "name": f"vchat-label-{video_id}",
            "description": f"Veracity analysis labels for video {video_id}",
            "url": sanitized_data["link"],
            "variableMeasured": sanitized_data
        }
        path = Path("metadata") / f"{video_id}_{timestamp}.json"
        path.write_text(json.dumps(croissant_json, indent=2), encoding='utf-8')
        return str(path)
    except Exception:
        # Callers store the returned marker, so keep it, but leave a trace.
        logging.exception("Failed to write metadata file")
        return "N/A (Error)"
|
|
|
|
|
async def get_labels_for_link(video_url: str, gemini_config: dict, vertex_config: dict, model_selection: str, include_comments: bool, reasoning_method: str = "cot"):
    """
    Download a video, run the selected labeling pipeline and emit the result.

    This is an async generator. It yields plain strings as progress messages
    and finishes with exactly one dict: either
    ``{"csv_row": ..., "full_json": ..., "raw_toon": ...}`` on success or
    ``{"error": "<message>"}`` when any step raised.

    Args:
        video_url: Remote URL or local path of the video.
        gemini_config: Settings handed to the Gemini pipeline.
        vertex_config: Settings handed to the Vertex pipeline.
        model_selection: 'gemini' selects the Gemini pipeline; any other value
            selects Vertex.
        include_comments: Forwarded unchanged to the pipeline.
        reasoning_method: Forwarded unchanged to the pipeline.
    """
    try:
        yield f"Downloading assets for {video_url}..."
        paths = await prepare_video_assets_async(video_url)
        video_path = paths["video"]
        transcript_text = parse_vtt(paths["transcript"]) if paths["transcript"] else "No transcript."
        caption = paths["metadata"].get("caption", "")

        yield f"Assets ready. Running inference ({model_selection}, {reasoning_method.upper()})..."
        final_labels = None
        raw_toon = ""
        prompt_used = ""

        if model_selection == 'gemini':
            pipeline = inference_logic.run_gemini_labeling_pipeline
            config = gemini_config
        else:
            pipeline = inference_logic.run_vertex_labeling_pipeline
            config = vertex_config

        # The pipeline interleaves progress strings with one result dict.
        async for msg in pipeline(video_path, caption, transcript_text, config, include_comments, reasoning_method):
            if isinstance(msg, dict) and "parsed_data" in msg:
                final_labels = msg["parsed_data"]
                raw_toon = msg.get("raw_toon", "")
                prompt_used = msg.get("prompt_used", "")
            elif isinstance(msg, str):
                yield msg

        if not final_labels:
            raise RuntimeError("No labels generated.")

        final_labels["meta_info"] = {
            "prompt_used": prompt_used,
            "model_selection": model_selection,
            "reasoning_method": reasoning_method
        }

        # A section that is present but null is treated like a missing one.
        vec = final_labels.get("veracity_vectors") or {}
        mod = final_labels.get("modalities") or {}
        fin = final_labels.get("final_assessment") or {}

        # NOTE(review): the parser's type for "tags" is not visible here. Accept
        # a ready-made string as well as a list, so join() neither splits a
        # string into characters nor fails on None or non-string items.
        tags = final_labels.get("tags", [])
        if isinstance(tags, str):
            tags_text = tags
        else:
            tags_text = ", ".join(str(tag) for tag in (tags or []))

        row = {
            "id": paths["metadata"]["id"],
            "link": paths["metadata"]["link"],
            "caption": caption,
            "postdatetime": paths["metadata"].get("postdatetime", ""),
            "collecttime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "videotranscriptionpath": paths["transcript"] or "",
            "visual_integrity_score": vec.get("visual_integrity_score", "0"),
            "audio_integrity_score": vec.get("audio_integrity_score", "0"),
            "source_credibility_score": vec.get("source_credibility_score", "0"),
            "logical_consistency_score": vec.get("logical_consistency_score", "0"),
            "emotional_manipulation_score": vec.get("emotional_manipulation_score", "0"),
            "video_audio_score": mod.get("video_audio_score", "0"),
            "video_caption_score": mod.get("video_caption_score", "0"),
            "audio_caption_score": mod.get("audio_caption_score", "0"),
            "final_veracity_score": fin.get("veracity_score_total", "0"),
            "final_reasoning": fin.get("reasoning", ""),
            "tags": tags_text
        }
        yield {"csv_row": row, "full_json": final_labels, "raw_toon": raw_toon}

    except Exception as e:
        # Failures are reported in-band; also log the traceback.
        logging.exception(f"Labeling failed for {video_url}")
        yield {"error": str(e)}
|
|
|
|
|
@app.get("/queue/list")
async def get_queue_list():
    """List every queued link with its ingest timestamp and a Processed/Pending status."""
    entries = []
    for record in robust_read_csv(Path("data/batch_queue.csv")):
        if not record:
            continue
        url = record.get("link")
        if not url:
            continue
        entries.append({
            "link": url,
            "timestamp": record.get("ingest_timestamp", ""),
            "status": "Processed" if check_if_processed(url) else "Pending",
        })
    return entries
|
|
|
|
|
@app.delete("/queue/delete")
async def delete_queue_item(link: str):
    """
    Remove every queue row whose 'link' cell equals ``link`` exactly.

    The queue file is rewritten with its original header. Cells beyond the
    header width (which csv.DictReader stores under the key None) are dropped
    on rewrite instead of making the whole delete fail.

    Returns:
        dict: ``status`` is "success", "not_found" or "error" (the latter with a
        ``message``). Errors are reported in the body, not as an HTTP error.
    """
    queue_path = Path("data/batch_queue.csv")
    if not queue_path.exists():
        return {"status": "error", "message": "Queue file not found"}

    try:
        # Keep the header exactly as stored; fall back to the default columns
        # when the file has no usable first row.
        with open(queue_path, 'r', encoding='utf-8', errors='replace', newline='') as f:
            header = next(csv.reader(f), None) or ["link", "ingest_timestamp"]

        all_rows = list(robust_read_csv(queue_path))
        kept_rows = [row for row in all_rows if row.get("link") != link]

        if len(kept_rows) == len(all_rows):
            return {"status": "not_found", "message": "Link not found in queue"}

        with open(queue_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=header, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(kept_rows)
        return {"status": "success", "link": link}

    except Exception as e:
        return {"status": "error", "message": str(e)}
|
|
|
|
|
@app.post("/queue/stop")
async def stop_queue_processing():
    """Raise the stop flag that the batch loop checks before starting each item."""
    global STOP_QUEUE_SIGNAL
    STOP_QUEUE_SIGNAL = True
    return dict(status="stopping")
|
|
|
|
|
@app.post("/queue/upload_csv")
async def upload_csv_to_queue(file: UploadFile = File(...)):
    """
    Append the links found in an uploaded CSV to data/batch_queue.csv.

    The link column is the one headed "link" or "url" (case-insensitive,
    surrounding spaces ignored); otherwise the first column is used, and a
    first row whose first cell contains "http" is treated as data rather than
    a header. A link is skipped when the same link is already queued or
    appeared earlier in the same upload. Comparison is by exact value of the
    link cell, so a link that is merely a prefix of a queued one is not
    mistaken for a duplicate.

    Returns:
        ``{"status": "success", "added": n}``, ``{"status": "empty file"}``, or a
        400 JSON response describing the error.
    """
    try:
        content = await file.read()
        # utf-8-sig drops a leading byte-order mark so it cannot end up glued to
        # the first header name or the first link.
        decoded = content.decode('utf-8-sig').splitlines()
        reader = csv.reader(decoded)
        links_to_add = []
        header = next(reader, None)
        if not header:
            return {"status": "empty file"}

        link_idx = 0
        header_lower = [h.strip().lower() for h in header]
        if "link" in header_lower:
            link_idx = header_lower.index("link")
        elif "url" in header_lower:
            link_idx = header_lower.index("url")
        elif "http" in header[0]:
            # Headerless file: the first row is already a link.
            links_to_add.append(header[0].strip())
            link_idx = 0

        for row in reader:
            if len(row) > link_idx and row[link_idx].strip():
                links_to_add.append(row[link_idx].strip())

        queue_path = Path("data/batch_queue.csv")
        known_links = {
            (row.get("link") or "").strip() for row in robust_read_csv(queue_path)
        }

        # Decide about the header before opening in append mode creates the file.
        needs_header = not queue_path.exists() or queue_path.stat().st_size == 0

        added_count = 0
        with open(queue_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if needs_header:
                writer.writerow(["link", "ingest_timestamp"])

            for link in links_to_add:
                if link in known_links:
                    continue
                writer.writerow([link, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
                known_links.add(link)
                added_count += 1

        return {"status": "success", "added": added_count}
    except Exception as e:
        return JSONResponse(status_code=400, content={"error": str(e), "status": "failed"})
|
|
|
|
|
async def _save_queue_result(final_data: dict) -> None:
    """Write the label artifacts for one queue item and append its row to data/dataset.csv."""
    row = final_data["csv_row"]
    vid_id = row["id"]
    ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')

    # Parsed labels as JSON plus the raw model output, side by side. UTF-8 is
    # set explicitly so the writes do not depend on the platform default.
    json_path = f"data/labels/{vid_id}_{ts}_labels.json"
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(final_data["full_json"], f, indent=2)

    with open(f"data/labels/{vid_id}_{ts}.toon", 'w', encoding='utf-8') as f:
        f.write(final_data["raw_toon"])

    prompt_content = final_data.get("full_json", {}).get("meta_info", {}).get("prompt_used", "")
    if prompt_content:
        with open(f"data/prompts/{vid_id}_{ts}_prompt.txt", 'w', encoding='utf-8') as f:
            f.write(prompt_content)

    raw_response = final_data.get("raw_toon", "")
    if raw_response:
        with open(f"data/responses/{vid_id}.txt", 'w', encoding='utf-8') as f:
            f.write(raw_response)

    row["metadatapath"] = await generate_and_save_croissant_metadata(row)
    row["json_path"] = json_path

    dpath = Path("data/dataset.csv")
    # An existing but empty file still needs its header row.
    has_header = dpath.exists() and dpath.stat().st_size > 0
    with open(dpath, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=list(row.keys()), extrasaction='ignore')
        if not has_header:
            writer.writeheader()
        writer.writerow(row)


@app.post("/queue/run")
async def run_queue_processing(
    model_selection: str = Form(...),
    gemini_api_key: str = Form(""), gemini_model_name: str = Form(""),
    vertex_project_id: str = Form(""), vertex_location: str = Form(""), vertex_model_name: str = Form(""), vertex_api_key: str = Form(""),
    include_comments: bool = Form(False),
    reasoning_method: str = Form("cot")
):
    """
    Label every not-yet-processed link in the batch queue, streaming progress.

    The response is a text/event-stream. Each link produces a [START] line,
    the labeler's progress messages, then [SUCCESS] or [FAIL]; when the labeler
    reports an error, an [ERROR] line with its message precedes [FAIL].
    Already processed links produce [SKIP]. The loop stops early once
    /queue/stop has raised the stop flag. A failure while saving one item is
    reported as [FAIL] and the batch moves on to the next item.
    """
    global STOP_QUEUE_SIGNAL
    STOP_QUEUE_SIGNAL = False
    gemini_config = {"api_key": gemini_api_key, "model_name": gemini_model_name}
    vertex_config = {"project_id": vertex_project_id, "location": vertex_location, "model_name": vertex_model_name, "api_key": vertex_api_key}

    async def queue_stream():
        queue_path = Path("data/batch_queue.csv")
        items = [
            queued for queued in (row.get("link") for row in robust_read_csv(queue_path)) if queued
        ]

        if not items:
            yield "data: Queue empty.\n\n"
            return

        processed_count = 0
        total = len(items)

        for i, link in enumerate(items):
            if STOP_QUEUE_SIGNAL:
                yield "data: [SYSTEM] Stopped by user.\n\n"
                break

            if check_if_processed(link):
                yield f"data: [SKIP] {link} processed.\n\n"
                continue

            yield f"data: [START] {i+1}/{total}: {link}\n\n"
            final_data = None
            failure_reason = ""
            async for res in get_labels_for_link(link, gemini_config, vertex_config, model_selection, include_comments, reasoning_method):
                if isinstance(res, str):
                    yield f"data: {res}\n\n"
                elif isinstance(res, dict):
                    if "csv_row" in res:
                        final_data = res
                    elif "error" in res:
                        # Collapse whitespace so a multi-line message stays
                        # inside a single SSE data line.
                        failure_reason = " ".join(str(res["error"]).split())

            if not final_data:
                if failure_reason:
                    yield f"data: [ERROR] {failure_reason}\n\n"
                yield f"data: [FAIL] Failed to label.\n\n"
                continue

            try:
                await _save_queue_result(final_data)
            except Exception as save_error:
                logging.exception(f"Could not save labels for {link}")
                reason = " ".join(str(save_error).split())
                yield f"data: [FAIL] Could not save labels: {reason}\n\n"
                continue

            processed_count += 1
            yield f"data: [SUCCESS] Labeled.\n\n"

        yield f"data: Batch Complete. +{processed_count} videos labeled.\n\n"
        yield "event: close\ndata: Done\n\n"

    return StreamingResponse(queue_stream(), media_type="text/event-stream")
|
|
|
|
|
@app.post("/extension/ingest")
async def extension_ingest(request: Request):
    """
    Queue a single link sent by the browser extension.

    Expects a JSON body with a "link" entry. A link already present in the
    queue's link column is reported as a duplicate and not appended; the
    comparison is on the whole (stripped) value, not on substrings.

    Raises:
        HTTPException: 400 when no link is supplied, 500 for any other failure.
    """
    try:
        data = await request.json()
        link = data.get("link")
        if not link:
            raise HTTPException(status_code=400, detail="No link")

        queue_path = Path("data/batch_queue.csv")
        queued_links = {
            (row.get("link") or "").strip() for row in robust_read_csv(queue_path)
        }
        if str(link).strip() in queued_links:
            return {"status": "queued", "msg": "Duplicate"}

        # An existing but empty file still needs its header row.
        needs_header = not queue_path.exists() or queue_path.stat().st_size == 0
        with open(queue_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if needs_header:
                writer.writerow(["link", "ingest_timestamp"])
            writer.writerow([link, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")])

        return {"status": "queued", "link": link}
    except HTTPException:
        # Keep the intended status code (e.g. the 400 above) instead of
        # letting the generic handler turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
@app.post("/extension/save_comments")
async def extension_save_comments(request: Request):
    """
    Append comments collected by the browser extension to data/comments.csv.

    Expects JSON with "link" and a non-empty "comments" list. Each comment is
    either a dict (its "author" and "text" entries are used) or any other
    value, which is stored as text with author "Unknown". Comments whose text
    is empty after stripping are not written.

    Returns:
        dict: ``{"status": "saved", "count": n}`` where n is the number of
        comments received (not the number written).

    Raises:
        HTTPException: 400 when link or comments are missing, 500 otherwise.
    """
    try:
        data = await request.json()
        link = data.get("link")
        comments = data.get("comments", [])
        if not link or not comments:
            raise HTTPException(status_code=400, detail="Missing data")

        csv_path = Path("data/comments.csv")
        # An existing but empty file still needs its header row.
        has_header = csv_path.exists() and csv_path.stat().st_size > 0
        fieldnames = ["link", "author", "comment_text", "timestamp"]

        with open(csv_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
            if not has_header:
                writer.writeheader()

            ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for c in comments:
                row = {"link": link, "timestamp": ts}
                if isinstance(c, dict):
                    row["author"] = c.get("author", "Unknown")
                    # "text" may be present but null; treat that as empty.
                    row["comment_text"] = str(c.get("text") or "").strip()
                else:
                    row["author"] = "Unknown"
                    row["comment_text"] = str(c).strip()

                if row["comment_text"]:
                    writer.writerow(row)

        return {"status": "saved", "count": len(comments)}
    except HTTPException:
        # Preserve the 400 raised above.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
def _upsert_csv_row(path: Path, new_row: dict, row_id: str) -> None:
    """Rewrite ``path`` so each row whose 'id' equals ``row_id`` becomes ``new_row`` (appended if none match)."""
    merged = []
    replaced = False
    # The file is read completely before it is reopened for writing.
    for existing in robust_read_csv(path):
        if existing.get('id') == row_id:
            merged.append(new_row)
            replaced = True
        else:
            merged.append(existing)
    if not replaced:
        merged.append(new_row)

    with open(path, 'w', newline='', encoding='utf-8') as f:
        # The new row's keys define the columns; other columns found in
        # existing rows are dropped.
        writer = csv.DictWriter(f, fieldnames=list(new_row.keys()), extrasaction='ignore')
        writer.writeheader()
        writer.writerows(merged)


def _numeric_score(value):
    """Return ``value`` unchanged when it is already a number, else parse it with safe_int."""
    if isinstance(value, (int, float)):
        return value
    return safe_int(value)


@app.post("/extension/save_manual")
async def extension_save_manual(request: Request):
    """
    Store a manual label set for a link and, if an AI row exists, a comparison.

    Expects JSON with "link" and optionally "labels", "stats", "tags" and
    "caption". The row is upserted into data/manual_dataset.csv keyed by the
    tweet id (or a hash of the link). When data/dataset.csv holds a row with
    the same normalized link or id, AI-vs-manual scores and their differences
    are upserted into data/comparison.csv.

    Returns:
        dict: ``{"status": "saved", "compared": bool}``.

    Raises:
        HTTPException: 400 when no link is supplied, 500 for any other failure.
    """
    try:
        data = await request.json()
        link = data.get("link")
        labels = data.get("labels") or {}
        stats = data.get("stats") or {}
        tags = data.get("tags", "")

        if not link:
            raise HTTPException(status_code=400, detail="No link")

        video_id = extract_tweet_id(link) or hashlib.md5(link.encode()).hexdigest()[:16]

        ensure_manual_dataset()

        row_data = {
            "id": video_id,
            "link": link,
            "caption": data.get("caption", ""),
            "collecttime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "source": "manual_extension",

            "visual_integrity_score": labels.get("visual_integrity_score", 0),
            "audio_integrity_score": labels.get("audio_integrity_score", 0),
            "source_credibility_score": labels.get("source_credibility_score", 0),
            "logical_consistency_score": labels.get("logical_consistency_score", 0),
            "emotional_manipulation_score": labels.get("emotional_manipulation_score", 0),

            "video_audio_score": labels.get("video_audio_score", 0),
            "video_caption_score": labels.get("video_caption_score", 0),
            "audio_caption_score": labels.get("audio_caption_score", 0),

            "final_veracity_score": labels.get("final_veracity_score", 0),
            "final_reasoning": labels.get("reasoning", ""),

            "stats_likes": stats.get("likes", 0),
            "stats_shares": stats.get("shares", 0),
            "stats_comments": stats.get("comments", 0),
            "stats_platform": stats.get("platform", "unknown"),
            "tags": tags
        }

        _upsert_csv_row(Path("data/manual_dataset.csv"), row_data, video_id)

        # Look for the AI-generated row describing the same video.
        ai_data = None
        target_link = normalize_link(link)
        for row in robust_read_csv(Path("data/dataset.csv")):
            if normalize_link(row.get('link', '')) == target_link or row.get('id') == video_id:
                ai_data = row
                break

        if ai_data:
            comparison_row = {
                "id": video_id,
                "link": link,
                "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
            score_columns = (
                ("visual", "visual_integrity_score"),
                ("audio", "audio_integrity_score"),
                ("source", "source_credibility_score"),
                ("logic", "logical_consistency_score"),
                ("final", "final_veracity_score"),
            )
            for short_name, column in score_columns:
                ai_score = safe_int(ai_data.get(column, 0))
                # Manual scores come straight from JSON and may be strings or
                # null; coerce them so the subtraction cannot raise TypeError.
                manual_score = _numeric_score(row_data[column])
                comparison_row[f"ai_{short_name}"] = ai_score
                comparison_row[f"manual_{short_name}"] = manual_score
                comparison_row[f"delta_{short_name}"] = ai_score - manual_score

            _upsert_csv_row(Path("data/comparison.csv"), comparison_row, video_id)

        return {"status": "saved", "compared": bool(ai_data)}
    except HTTPException:
        # Preserve the 400 raised above.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
@app.get("/workflow/status")
async def get_workflow_status():
    """
    Return every known link (queue, AI dataset, manual dataset) with its
    AI-labeling and manual-labeling status.

    Entries are merged on a canonical key: the tweet id when the link has one,
    otherwise the row's id, otherwise the normalized link. Rows with missing
    cells are tolerated.
    """
    all_links = {}

    def cell(row, column):
        # csv.DictReader fills cells missing from a short row with None, so
        # guard before stripping.
        return (row.get(column) or "").strip()

    def get_canonical_key(link, rid=None):
        tid = extract_tweet_id(link)
        if tid:
            return tid
        if rid and str(rid).strip():
            return str(rid).strip()
        return normalize_link(link)

    # 1) Everything waiting in the queue starts out as pending.
    for row in robust_read_csv(Path("data/batch_queue.csv")):
        url = cell(row, "link")
        if url:
            all_links[get_canonical_key(url)] = {
                "link": url, "source": "queue",
                "ai_status": "Pending", "manual_status": "Pending",
                "ai_data": None
            }

    # 2) Rows in the AI dataset mark their entry as labeled.
    for row in robust_read_csv(Path("data/dataset.csv")):
        url = cell(row, "link")
        rid = cell(row, "id")
        key = get_canonical_key(url, rid)

        entry = all_links.setdefault(
            key, {"link": url, "source": "dataset", "manual_status": "Pending"}
        )
        entry["ai_status"] = "Labeled"
        entry["ai_data"] = {
            "visual": row.get("visual_integrity_score"),
            "final": row.get("final_veracity_score"),
            "reasoning": row.get("final_reasoning"),
            "tags": row.get("tags", "")
        }

    # 3) Rows in the manual dataset mark their entry as completed.
    for row in robust_read_csv(Path("data/manual_dataset.csv")):
        url = cell(row, "link")
        rid = cell(row, "id")
        key = get_canonical_key(url, rid)

        if key in all_links:
            all_links[key]["manual_status"] = "Completed"
            all_links[key]["manual_tags"] = row.get("tags", "")
        else:
            all_links[key] = {
                "link": url, "source": "manual_only",
                "ai_status": "Unknown", "manual_status": "Completed",
                "manual_tags": row.get("tags", "")
            }

    return list(all_links.values())
|
|
|
|
|
@app.get("/manage/list")
async def list_data():
    """
    Return all rows of the AI and manual datasets, newest first.

    Each row gains ``source_type`` ("auto" or "manual") and ``json_data`` (the
    parsed content of its ``json_path`` file, or None). Rows without an id get
    one derived from the link. AI rows also get ``manual_verification_status``:
    "Verified" when the manual dataset has the same id or normalized link,
    otherwise "Need Manual".
    """
    data = []

    # Ids and normalized links of everything that was labeled manually.
    manual_index = set()
    for row in robust_read_csv(Path("data/manual_dataset.csv")):
        if row.get('link'):
            manual_index.add(normalize_link(row['link']))
        if row.get('id'):
            manual_index.add(row['id'].strip())

    def read_csv(path, source_type):
        for row in robust_read_csv(path):
            # Short rows carry None cells; treat those like empty strings.
            if not (row.get('id') or '').strip():
                link = row.get('link') or ''
                tid = extract_tweet_id(link)
                row['id'] = tid if tid else hashlib.md5(link.encode()).hexdigest()[:16]

            json_content = None
            json_path = row.get('json_path')
            if json_path and os.path.exists(json_path):
                try:
                    with open(json_path, 'r', encoding='utf-8') as jf:
                        json_content = json.load(jf)
                except (OSError, ValueError) as e:
                    # An unreadable or corrupt label file must not hide the row.
                    logging.warning(f"Could not load {json_path}: {e}")

            row['source_type'] = source_type
            row['json_data'] = json_content

            if source_type == "auto":
                # The index holds stripped ids, so strip before the lookup.
                lid = (row.get('id') or '').strip()
                llink = normalize_link(row.get('link', ''))
                if lid in manual_index or llink in manual_index:
                    row['manual_verification_status'] = "Verified"
                else:
                    row['manual_verification_status'] = "Need Manual"

            data.append(row)

    read_csv(Path("data/dataset.csv"), "auto")
    read_csv(Path("data/manual_dataset.csv"), "manual")
    # `or ''` keeps None cells from breaking the string comparison.
    data.sort(key=lambda x: x.get('collecttime') or '', reverse=True)
    return data
|
|
|
|
|
@app.get("/manage/comparison_data")
async def get_comparison_data():
    """
    Return AI-vs-manual score pairs for every video present in both datasets.

    Rows are joined on their id, falling back to the normalized link when the
    id cell is empty. Each item carries the visual, audio and final scores
    from both sources (parsed with safe_int) and the AI-minus-manual deltas.
    """
    def row_key(row):
        return row.get("id") or normalize_link(row.get("link"))

    ai_data = {row_key(row): row for row in robust_read_csv(Path("data/dataset.csv"))}

    score_columns = (
        ("visual", "visual_integrity_score"),
        ("audio", "audio_integrity_score"),
        ("final", "final_veracity_score"),
    )

    comparisons = []
    for manual in robust_read_csv(Path("data/manual_dataset.csv")):
        key = row_key(manual)
        if key not in ai_data:
            continue
        ai = ai_data[key]

        scores = {
            name: {"ai": safe_int(ai.get(column, 0)), "manual": safe_int(manual.get(column, 0))}
            for name, column in score_columns
        }
        comparisons.append({
            "id": key,
            "link": manual.get("link"),
            "scores": scores,
            "deltas": {name: pair["ai"] - pair["manual"] for name, pair in scores.items()},
        })

    return comparisons
|
|
|
|
|
@app.delete("/manage/delete")
async def delete_data(id: str = "", link: str = ""):
    """Serve ``/manage/delete``: remove an entry from both datasets.

    Rows are removed from ``data/dataset.csv`` and
    ``data/manual_dataset.csv`` when their ``id`` equals ``id`` or their
    normalized link equals the normalized ``link``. Files in ``data/labels``
    and ``metadata`` whose name starts with ``"<id>_"`` are deleted as well;
    when only ``link`` was supplied, the id of the first matching row is used.

    Args:
        id: Identifier of the entry to delete (may be empty if ``link`` is set).
        link: Link of the entry to delete (may be empty if ``id`` is set).

    Returns:
        dict: ``{"status": "deleted", "count": <rows removed>}``.

    Raises:
        HTTPException: 400 when neither ``id`` nor ``link`` is provided.
    """
    if not id and not link:
        raise HTTPException(status_code=400, detail="Must provide ID or Link")

    deleted_count = 0
    target_id = id
    # Loop-invariant: normalize the requested link once.
    wanted_link = normalize_link(link) if link else ""

    def remove_from_csv(path):
        """Rewrite ``path`` without the matching rows."""
        nonlocal deleted_count, target_id
        if not path.exists():
            return

        rows = list(robust_read_csv(path))

        # The header is read separately so the rewritten file keeps the
        # original column order.
        with open(path, 'r', encoding='utf-8', errors='replace') as f:
            fieldnames = csv.DictReader(f).fieldnames

        kept_rows = []
        found_in_file = False
        for row in rows:
            matches_id = bool(id) and row.get('id') == id
            matches_link = bool(link) and normalize_link(row.get('link') or '') == wanted_link
            if matches_id or matches_link:
                found_in_file = True
                deleted_count += 1
                if not target_id:
                    target_id = row.get('id')
            else:
                kept_rows.append(row)

        if not (found_in_file and fieldnames):
            return

        # Write to a sibling temp file and swap it in, so a failure while
        # writing cannot leave the dataset truncated. extrasaction='ignore'
        # stops rows carrying overflow keys from aborting the write.
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
                writer.writeheader()
                writer.writerows(kept_rows)
            tmp_path.replace(path)
        finally:
            tmp_path.unlink(missing_ok=True)

    remove_from_csv(Path("data/dataset.csv"))
    remove_from_csv(Path("data/manual_dataset.csv"))

    if target_id:
        # The id comes from the request, so it is matched as a literal name
        # prefix instead of being interpolated into a glob pattern; an id
        # such as "*" must not select unrelated files.
        prefix = f"{target_id}_"
        for directory in (Path("data/labels"), Path("metadata")):
            if not directory.is_dir():
                continue
            for entry in list(directory.iterdir()):
                if entry.name.startswith(prefix) and entry.is_file():
                    entry.unlink(missing_ok=True)

    return {"status": "deleted", "count": deleted_count}
|
|
|
|
|
@app.post("/label_video")
async def label_video_endpoint(
    video_url: str = Form(...), model_selection: str = Form(...),
    gemini_api_key: str = Form(""), gemini_model_name: str = Form(""),
    vertex_project_id: str = Form(""), vertex_location: str = Form(""), vertex_model_name: str = Form(""), vertex_api_key: str = Form(""),
    include_comments: bool = Form(False),
    reasoning_method: str = Form("cot")
):
    """Serve ``/label_video``: stream labeling progress as server-sent events.

    The form fields are packed into a Gemini and a Vertex configuration dict
    and handed to ``get_labels_for_link``. Each string it yields is forwarded
    as a ``data:`` frame; a dict containing ``"csv_row"`` produces a
    "Done. Labels generated." frame. A final ``close`` event is sent once the
    generator is exhausted.

    Returns:
        StreamingResponse: a ``text/event-stream`` response.
    """
    gemini_settings = dict(api_key=gemini_api_key, model_name=gemini_model_name)
    vertex_settings = dict(
        project_id=vertex_project_id,
        location=vertex_location,
        model_name=vertex_model_name,
        api_key=vertex_api_key,
    )

    async def event_source():
        updates = get_labels_for_link(
            video_url,
            gemini_settings,
            vertex_settings,
            model_selection,
            include_comments,
            reasoning_method,
        )
        async for update in updates:
            if isinstance(update, str):
                # Plain progress text is relayed verbatim.
                yield f"data: {update}\n\n"
            elif isinstance(update, dict) and "csv_row" in update:
                # The result payload itself is not streamed, only a notice.
                yield "data: Done. Labels generated.\n\n"
        yield "event: close\ndata: Done.\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")