"""Local-model inference helpers and remote LLM labeling pipelines.

This module exposes:

* ``load_models`` / ``switch_active_model`` / ``inference_step`` for a local
  Qwen3-VL model (skipped entirely when ``LITE_MODE`` is on).
* ``generate_simple_text`` / ``generate_community_summary`` for one-shot text
  generation through Gemini, Vertex or an OpenAI-compatible ("nrp") endpoint.
* ``run_gemini_labeling_pipeline`` / ``run_vertex_labeling_pipeline`` /
  ``run_nrp_labeling_pipeline``: async generators that yield progress strings
  and, at the end, one result ``dict``.
"""

import re
import ast
import sys
import os
import logging
import asyncio
import json
import datetime
import requests

try:
    import torch
except ImportError:
    torch = None

try:
    from transformers import Qwen3VLForConditionalGeneration, AutoProcessor
    from peft import PeftModel
except ImportError:
    Qwen3VLForConditionalGeneration = None
    AutoProcessor = None
    PeftModel = None

try:
    from my_vision_process import process_vision_info, client
except ImportError:
    process_vision_info = None
    client = None

from labeling_logic import (
    LABELING_PROMPT_TEMPLATE, LABELING_PROMPT_TEMPLATE_NO_COT,
    SCORE_INSTRUCTIONS_SIMPLE, SCORE_INSTRUCTIONS_REASONING,
    SCHEMA_SIMPLE, SCHEMA_REASONING,
    FCOT_MACRO_PROMPT, FCOT_MESO_PROMPT, FCOT_SYNTHESIS_PROMPT, TEXT_ONLY_INSTRUCTIONS,
    get_formatted_tag_list
)
from toon_parser import parse_veracity_toon

# Google GenAI Imports
try:
    import google.generativeai as genai_legacy
    from google.generativeai.types import generation_types
except ImportError:
    genai_legacy = None

try:
    # Modern Google GenAI SDK (v1)
    from google import genai
    from google.genai.types import (
        GenerateContentConfig, HttpOptions, Retrieval, Tool, VertexAISearch, GoogleSearch, Part
    )
    import vertexai
except ImportError:
    genai = None
    vertexai = None

LITE_MODE = os.getenv("LITE_MODE", "true").lower() == "true"
processor = None
base_model = None
peft_model = None
active_model = None

logger = logging.getLogger(__name__)

# Persona suffixes appended when the caller enables agentic tools. They are
# shared by all three labeling pipelines.
_SEARCH_TOOL_PERSONA = (
    "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, "
    "look up current events, or verify entity backgrounds before concluding."
)
_CODE_TOOL_PERSONA = (
    "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, "
    "or statistical verifications."
)

# Top-level keys copied from a JSON object found in the model output into the
# TOON-parsed result before merging.
_JSON_OVERRIDE_KEYS = (
    'veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment',
    'factuality_factors', 'disinformation_analysis', 'tags',
)


def load_models():
    """Load the local base model and processor onto the GPU.

    Does nothing when ``LITE_MODE`` is on or a base model is already loaded.
    Switches ``LITE_MODE`` on (instead of raising) when torch/CUDA is missing
    or loading fails.
    """
    global LITE_MODE, processor, base_model, peft_model, active_model

    if LITE_MODE:
        logger.info("LITE_MODE is enabled. Skipping local model loading.")
        return
    if base_model is not None:
        return
    if torch is None or not torch.cuda.is_available():
        logger.warning("CUDA is not available or torch is missing. This application requires a GPU for local models. Switching to LITE_MODE.")
        LITE_MODE = True
        return

    device = torch.device("cuda")
    logger.info(f"CUDA is available. Initializing models on {device}...")
    local_model_path = "/app/local_model"

    # Prefer flash-attention when the package is importable.
    try:
        import flash_attn  # noqa: F401  (imported only to probe availability)
        attn_implementation = "flash_attention_2"
    except ImportError:
        attn_implementation = "sdpa"

    logger.info(f"Loading base model from {local_model_path}...")
    try:
        loaded_model = Qwen3VLForConditionalGeneration.from_pretrained(
            local_model_path,
            dtype=torch.bfloat16,
            device_map="auto",
            attn_implementation=attn_implementation
        ).eval()
        loaded_processor = AutoProcessor.from_pretrained(local_model_path)
    except Exception as e:
        logger.error(f"Failed to load local model: {e}")
        LITE_MODE = True
        return

    # Publish the globals only once both pieces loaded, so a processor failure
    # cannot leave a half-initialised state behind.
    base_model = loaded_model
    processor = loaded_processor
    active_model = base_model


def switch_active_model(model_name: str):
    """Select the model used by ``inference_step``.

    ``"custom"`` selects the PEFT model when one is loaded; any other name
    (or a missing PEFT model) selects the base model.
    """
    global active_model, base_model, peft_model
    if model_name == "custom" and peft_model is not None:
        active_model = peft_model
    else:
        active_model = base_model


def inference_step(video_path, prompt, generation_kwargs, sampling_fps, pred_glue=None):
    """Run one generation pass of the active local model on a video + prompt.

    Args:
        video_path: Value placed in the ``video`` field of the chat message.
        prompt: Text prompt sent alongside the video.
        generation_kwargs: Extra keyword arguments forwarded to ``generate``.
        sampling_fps: Value placed in the ``fps`` field of the video entry.
        pred_glue: Value placed in the ``key_time`` field of the video entry.

    Returns:
        The decoded text of the first generated sequence.

    Raises:
        RuntimeError: If no model is loaded, torch is missing, or the vision
            preprocessing helper could not be imported.
    """
    global processor, active_model
    if active_model is None or torch is None:
        raise RuntimeError("Models not loaded.")
    if process_vision_info is None:
        raise RuntimeError("Vision preprocessing (my_vision_process) is not available.")

    messages = [
        {"role": "user", "content": [
            {"type": "video", "video": video_path, 'key_time': pred_glue, 'fps': sampling_fps,
             "total_pixels": 128*12 * 28 * 28, "min_pixels": 128 * 28 * 28},
            {"type": "text", "text": prompt},
        ]},
    ]
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs, video_kwargs = process_vision_info(messages, return_video_kwargs=True, client=client)
    fps_inputs = video_kwargs['fps'][0]
    inputs = processor(text=[text], images=image_inputs, videos=video_inputs, fps=fps_inputs,
                       padding=True, return_tensors="pt")
    inputs = {k: v.to(active_model.device) for k, v in inputs.items()}

    with torch.no_grad():
        output_ids = active_model.generate(**inputs, **generation_kwargs, use_cache=True)

    # Drop the prompt tokens so only the newly generated tokens are decoded.
    generated_ids = [
        sequence[len(prompt_ids):]
        for prompt_ids, sequence in zip(inputs['input_ids'], output_ids)
    ]
    output_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
    return output_text[0]


async def generate_simple_text(prompt: str, model_type: str, config: dict):
    """Generate text for ``prompt`` with the backend named by ``model_type``.

    Supported ``model_type`` values are ``'gemini'``, ``'vertex'`` and
    ``'nrp'``. Blocking SDK/HTTP calls run in the default executor.

    Returns:
        The generated text, or a string starting with ``"Error"`` when the SDK
        or credentials are missing or the call fails. ``None`` is returned for
        an unrecognised ``model_type``.
    """
    loop = asyncio.get_running_loop()
    try:
        if model_type == 'gemini':
            if genai_legacy is None:
                return "Error: Legacy SDK missing."
            genai_legacy.configure(api_key=config.get("api_key"))
            model_name = config.get("model_name") or "gemini-1.5-pro"
            model = genai_legacy.GenerativeModel(model_name)
            response = await loop.run_in_executor(
                None,
                lambda: model.generate_content(prompt, generation_config={"temperature": 0.0})
            )
            return response.text

        elif model_type == 'vertex':
            if genai is None:
                return "Error: Vertex SDK missing."
            client_kwargs = {
                "vertexai": True,
                "project": config['project_id'],
                "location": config['location'],
            }
            api_key = config.get("api_key")
            if api_key:
                client_kwargs["api_key"] = api_key
            cl = genai.Client(**client_kwargs)
            # Fall back to the default on an empty name, like the gemini branch.
            vertex_model = config.get('model_name') or 'gemini-1.5-pro'
            response = await loop.run_in_executor(
                None,
                lambda: cl.models.generate_content(
                    model=vertex_model,
                    contents=prompt,
                    config=GenerateContentConfig(temperature=0.0)
                )
            )
            return response.text

        elif model_type == 'nrp':
            api_key = config.get("api_key")
            model_name = config.get("model_name", "gpt-4")
            base_url = config.get("base_url", "https://api.openai.com/v1").rstrip("/")
            if not api_key:
                return "Error: NRP API key missing."
            headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
            payload = {"model": model_name, "messages": [{"role": "user", "content": prompt}], "temperature": 0.0}

            def do_request():
                resp = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload, timeout=600)
                if resp.status_code == 200:
                    return resp.json()["choices"][0]["message"]["content"]
                return f"Error: {resp.status_code} {resp.text}"

            return await loop.run_in_executor(None, do_request)
    except Exception as e:
        logger.error(f"Text Gen Error: {e}")
        return f"Error generating text: {e}"
    return None


async def generate_community_summary(comments: list, model_type: str, config: dict):
    """Summarise up to the first 15 comments into a one-paragraph community note.

    Each comment is read as a mapping with optional ``author`` and ``text``
    keys. Returns ``"No comments available."`` when ``comments`` is empty;
    otherwise returns whatever ``generate_simple_text`` returns.
    """
    if not comments:
        return "No comments available."
    c_text = "\n".join([f"- {c.get('author', 'User')}: {c.get('text', '')}" for c in comments[:15]])
    prompt = (
        "You are a Community Context Analyst. Analyze the following user comments regarding a social media post.\n"
        "Your goal is to extract 'Community Notes' - specifically looking for fact-checking, debunking, or additional context provided by users.\n"
        f"COMMENTS:\n{c_text}\n\n"
        "OUTPUT:\n"
        "Provide a concise 1-paragraph summary of the community consensus regarding the veracity of the post."
    )
    return await generate_simple_text(prompt, model_type, config)


def extract_json_from_text(text):
    """Return the JSON object spanning the first ``{`` to the last ``}`` in ``text``.

    Best-effort: returns ``{}`` when there is no brace-delimited span, the
    span is not valid JSON, or ``text`` is not a string.
    """
    try:
        match = re.search(r'\{[\s\S]*\}', text)
        if match:
            return json.loads(match.group(0))
    except (ValueError, TypeError, RecursionError) as e:
        # Model output frequently contains no (or malformed) JSON; not an error.
        logger.debug(f"No parseable JSON in model output: {e}")
    return {}


def _is_meaningful_value(value):
    """Return True unless ``value`` is falsy, ``"0"`` or ``"n/a"`` (any case)."""
    return bool(value) and str(value) != "0" and str(value).lower() != "n/a"


def _as_dict(value):
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def validate_parsed_data(data, is_text_only):
    """List the required fields that are missing or empty in ``data``.

    Sections that are absent or not dictionaries are treated as empty, so
    malformed model output is reported as missing fields instead of raising.

    Args:
        data: Accumulated parsed result.
        is_text_only: When true, visual/audio scores are not required.

    Returns:
        A list of field labels such as ``"summary"`` or
        ``"vector:source_credibility_score"``; empty when everything is present.
    """
    missing = []
    if not data.get('video_context_summary'):
        missing.append("summary")

    final = _as_dict(data.get('final_assessment'))
    if not final.get('reasoning') or len(str(final.get('reasoning', ''))) < 5:
        missing.append("final:reasoning")

    vectors = _as_dict(data.get('veracity_vectors'))
    required_vectors = ['visual_integrity_score', 'audio_integrity_score', 'source_credibility_score',
                        'logical_consistency_score', 'emotional_manipulation_score']
    for k in required_vectors:
        if k in ['visual_integrity_score', 'audio_integrity_score'] and is_text_only:
            continue
        if not _is_meaningful_value(vectors.get(k)):
            missing.append(f"vector:{k}")

    mod = _as_dict(data.get('modalities'))
    for k in ['video_audio_score', 'video_caption_score', 'audio_caption_score']:
        if k in ['video_audio_score', 'video_caption_score'] and is_text_only:
            continue
        if not _is_meaningful_value(mod.get(k)):
            missing.append(f"modality:{k}")

    fact = _as_dict(data.get('factuality_factors'))
    if not fact.get('claim_accuracy'):
        missing.append("factuality:claim_accuracy")

    disinfo = _as_dict(data.get('disinformation_analysis'))
    if not disinfo.get('classification'):
        missing.append("disinfo:classification")

    return missing


def smart_merge(base, new_data):
    """Merge ``new_data`` into ``base`` without overwriting meaningful values.

    Nested dicts are merged recursively. An existing value is replaced only
    when it is empty/``"0"``/``"n/a"`` and the new value is not. ``base`` is
    modified in place and returned when both arguments are dicts.
    """
    if not isinstance(new_data, dict):
        return new_data if new_data else base
    if not isinstance(base, dict):
        return new_data

    for k, v in new_data.items():
        if k not in base:
            base[k] = v
        elif isinstance(base[k], dict) and isinstance(v, dict):
            smart_merge(base[k], v)
        elif not _is_meaningful_value(base[k]) and _is_meaningful_value(v):
            base[k] = v
    return base


def save_debug_log(request_id, kind, content, attempt, label=""):
    """Write ``content`` to a timestamped debug file; never raises.

    Files go to ``data/prompts`` or ``data/responses`` depending on ``kind``
    (``data`` for any other kind). Nothing is written when ``request_id`` is
    falsy. Failures are logged and swallowed.
    """
    if not request_id:
        return
    try:
        dir_map = {'prompt': 'data/prompts', 'response': 'data/responses'}
        directory = dir_map.get(kind, 'data')
        # Create the directory on demand; otherwise every write fails until
        # someone creates it by hand.
        os.makedirs(directory, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_label = f"_{label}" if label else ""
        filename = f"{directory}/{request_id}_{ts}_att{attempt}{safe_label}.txt"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(str(content))
    except Exception as e:
        logger.error(f"Failed to save debug log: {e}")


def _build_reprompt(caption, transcript, accumulated_data, missing, toon_schema, mention_video=True):
    """Build the follow-up prompt asking the model for the still-missing fields."""
    instruction = "INSTRUCTION: "
    if mention_video:
        instruction += "Analyze the provided Video and Context again. "
    instruction += (
        "Generate the missing fields to complete the schema. "
        f"You MUST provide the missing scores for {missing}.\n"
    )
    return (
        "SYSTEM: Review the previous attempt which failed validation.\n"
        f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
        f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
        f"MISSING FIELDS: {missing}\n"
        f"{instruction}"
        "Output the FULL VALID TOON OBJECT containing all required fields.\n"
        f"{toon_schema}"
    )


def _merge_model_output(accumulated_data, raw_text):
    """Parse ``raw_text`` (TOON plus any embedded JSON) and merge it into ``accumulated_data``."""
    parsed_step = parse_veracity_toon(raw_text)
    json_data = extract_json_from_text(raw_text)
    if json_data:
        for k in _JSON_OVERRIDE_KEYS:
            if k in json_data:
                if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                    parsed_step[k].update(json_data[k])
                else:
                    parsed_step[k] = json_data[k]
    return smart_merge(accumulated_data, parsed_step)


def _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace):
    """Build the final result dict yielded by the labeling pipelines."""
    return {
        "raw_toon": full_raw_text,
        "parsed_data": accumulated_data,
        "prompt_used": prompt_used,
        "fcot_trace": fcot_trace,
    }


async def run_gemini_labeling_pipeline(video_path: str, caption: str, transcript: str, gemini_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post with the legacy Gemini SDK, reprompting until validation passes.

    Async generator. Yields progress/error strings and, once validation passes
    or retries are exhausted, a result dict with keys ``raw_toon``,
    ``parsed_data``, ``prompt_used`` and ``fcot_trace``.

    Args:
        video_path: Local video to upload; when missing, runs text-only.
        caption: Post caption inserted into the prompts.
        transcript: Transcript inserted into the prompts.
        gemini_config: Reads ``api_key``, ``model_name``, ``max_retries``,
            ``use_search`` and ``use_code``.
        include_comments: Selects the reasoning schema/instructions over the
            simple ones.
        reasoning_method: ``"fcot"``, ``"none"`` or anything else (standard).
        system_persona: Persona text; tool/text-only notes are appended to it.
        request_id: When set, prompts and responses are written as debug logs.
    """
    if genai_legacy is None:
        yield "ERROR: Legacy SDK missing.\n"
        return

    api_key = gemini_config.get("api_key")
    if not api_key:
        # Report the problem instead of ending silently (matches the NRP pipeline).
        yield "ERROR: Gemini API key missing.\n"
        return
    max_retries = int(gemini_config.get("max_retries", 1))

    loop = asyncio.get_running_loop()
    uploaded_file = None
    try:
        genai_legacy.configure(api_key=api_key)

        is_text_only = False
        video_ready = False
        if video_path and os.path.exists(video_path):
            yield "data: - Uploading video to Gemini API...\n\n"
            uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.upload_file(path=video_path))

            # Continuously poll the API for the updated state
            while True:
                curr_file = await loop.run_in_executor(None, lambda: genai_legacy.get_file(uploaded_file.name))
                state_name = curr_file.state.name
                if state_name == "PROCESSING":
                    yield "data: - Waiting for Gemini to process the video...\n\n"
                    await asyncio.sleep(3)
                    continue
                if state_name == "FAILED":
                    yield "data: - Gemini Video Processing FAILED.\n\n"
                # Use the freshly polled state: the object returned by
                # upload_file is never refreshed, so it cannot report FAILED.
                video_ready = state_name != "FAILED"
                break
        else:
            is_text_only = True

        active_tools = []
        if gemini_config.get("use_search", False):
            active_tools.append({"google_search_retrieval": {}})
            system_persona += _SEARCH_TOOL_PERSONA
        if gemini_config.get("use_code", False):
            active_tools.append({"code_execution": {}})
            system_persona += _CODE_TOOL_PERSONA

        model_name = gemini_config.get("model_name") or "gemini-1.5-pro"
        model = genai_legacy.GenerativeModel(model_name, tools=active_tools if active_tools else None)

        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
        tag_list_text = get_formatted_tag_list()

        accumulated_data = {}
        prompt_used = ""
        fcot_trace = {}
        full_raw_text = ""

        if is_text_only:
            system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing or incomplete fields: {missing}. Initiating Iterative Reprompt (Attempt {attempt}/{max_retries}) to acquire remaining factuality components...\n"

                prompt_text = _build_reprompt(caption, transcript, accumulated_data, missing, toon_schema)
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                inputs = [prompt_text]
                if video_ready:
                    inputs.append(uploaded_file)
                response = await loop.run_in_executor(None, lambda: model.generate_content(inputs, generation_config={"temperature": 0.2}))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                yield "Starting Fractal Chain of Thought (Gemini FCoT)..."
                chat = model.start_chat(history=[])

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                inputs1 = [macro_prompt]
                if video_ready:
                    inputs1.insert(0, uploaded_file)
                res1 = await loop.run_in_executor(None, lambda: chat.send_message(inputs1))
                macro_hypothesis = res1.text
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))
                micro_observations = res2.text
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))
                raw_text = res3.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT Pipeline:\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                prompt_used = prompt_text
                if is_text_only:
                    prompt_text = "NOTE: Text Analysis Only.\n" + prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                inputs = [prompt_text]
                if video_ready:
                    inputs.append(uploaded_file)
                response = await loop.run_in_executor(None, lambda: model.generate_content(inputs, generation_config={"temperature": 0.1}))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                accumulated_data = _merge_model_output(accumulated_data, raw_text)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace)
                break
            if attempt == max_retries:
                yield "Max retries reached. Saving incomplete data.\n"
                yield _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace)
                break
    except Exception as e:
        # Log before yielding so the traceback is recorded even if the
        # consumer stops iterating on the error message.
        logger.error("Gemini Labeling Error", exc_info=True)
        yield f"ERROR: {e}"
    finally:
        # Always remove the uploaded video, including on failure.
        if uploaded_file is not None:
            try:
                await loop.run_in_executor(None, lambda: genai_legacy.delete_file(name=uploaded_file.name))
            except Exception as cleanup_error:
                logger.warning(f"Failed to delete uploaded Gemini file: {cleanup_error}")


async def run_vertex_labeling_pipeline(video_path: str, caption: str, transcript: str, vertex_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post through Vertex AI (google-genai SDK), reprompting until validation passes.

    Async generator. Yields progress/error strings and, once validation passes
    or retries are exhausted, a result dict with keys ``raw_toon``,
    ``parsed_data``, ``prompt_used`` and ``fcot_trace``.

    Args:
        video_path: Local video sent inline as ``video/mp4`` bytes; when
            missing, runs text-only.
        caption: Post caption inserted into the prompts.
        transcript: Transcript inserted into the prompts.
        vertex_config: Reads ``project_id``, ``location``, ``model_name``,
            ``max_retries``, ``api_key``, ``use_search`` and ``use_code``.
        include_comments: Selects the reasoning schema/instructions over the
            simple ones.
        reasoning_method: ``"fcot"``, ``"none"`` or anything else (standard).
        system_persona: Persona text; tool/text-only notes are appended to it.
        request_id: When set, prompts and responses are written as debug logs.
    """
    if genai is None:
        yield "ERROR: 'google-genai' not installed.\n"
        return

    project_id = vertex_config.get("project_id")
    location = vertex_config.get("location", "us-central1")
    model_name = vertex_config.get("model_name") or "gemini-1.5-pro"
    max_retries = int(vertex_config.get("max_retries", 1))
    api_key = vertex_config.get("api_key")

    if not project_id:
        # Report the problem instead of ending silently (matches the NRP pipeline).
        yield "ERROR: Vertex project_id missing.\n"
        return

    try:
        # Pass api_key directly if available to use API Keys instead of ADC Service Accounts
        client_kwargs = {"vertexai": True, "project": project_id, "location": location}
        if api_key:
            client_kwargs["api_key"] = api_key
        # Named vertex_client so it does not shadow the module-level `client`.
        vertex_client = genai.Client(**client_kwargs)

        video_part = None
        is_text_only = False
        if video_path and os.path.exists(video_path):
            with open(video_path, 'rb') as f:
                video_bytes = f.read()
            video_part = Part.from_bytes(data=video_bytes, mime_type="video/mp4")
        else:
            is_text_only = True

        active_tools = []
        if vertex_config.get("use_search", False):
            active_tools.append(Tool(google_search=GoogleSearch()))
            system_persona += _SEARCH_TOOL_PERSONA
        if vertex_config.get("use_code", False):
            try:
                from google.genai.types import CodeExecution
                active_tools.append(Tool(code_execution=CodeExecution()))
                system_persona += _CODE_TOOL_PERSONA
            except ImportError:
                # SDK build without CodeExecution: continue without the tool.
                pass

        config = GenerateContentConfig(
            temperature=0.1,
            response_mime_type="text/plain",
            max_output_tokens=8192,
            tools=active_tools if active_tools else None
        )

        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
        tag_list_text = get_formatted_tag_list()

        accumulated_data = {}
        prompt_used = ""
        fcot_trace = {}
        full_raw_text = ""
        loop = asyncio.get_running_loop()

        if is_text_only:
            system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing or incomplete fields: {missing}. Initiating Iterative Reprompt (Attempt {attempt}/{max_retries}) to acquire remaining factuality components...\n"

                # REPROMPT CONSTRUCTION
                prompt_text = _build_reprompt(caption, transcript, accumulated_data, missing, toon_schema)
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                contents = [prompt_text]
                if video_part:
                    contents.insert(0, video_part)
                response = await loop.run_in_executor(None, lambda: vertex_client.models.generate_content(model=model_name, contents=contents, config=config))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                yield "Starting Fractal Chain of Thought (Vertex FCoT)..."
                chat = vertex_client.chats.create(model=model_name, config=config)

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                inputs1 = [macro_prompt]
                if video_part:
                    inputs1.insert(0, video_part)
                else:
                    inputs1[0] = "NOTE: Text Only Analysis.\n" + inputs1[0]
                res1 = await loop.run_in_executor(None, lambda: chat.send_message(inputs1))
                macro_hypothesis = res1.text
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))
                micro_observations = res2.text
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))
                raw_text = res3.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT (Vertex):\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                if video_part:
                    contents = [video_part, prompt_text]
                else:
                    contents = [f"NOTE: Text Only Analysis (No Video).\n{prompt_text}"]
                prompt_used = prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                yield f"Generating Labels (Vertex {reasoning_method.upper()})..."
                response = await loop.run_in_executor(None, lambda: vertex_client.models.generate_content(model=model_name, contents=contents, config=config))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                accumulated_data = _merge_model_output(accumulated_data, raw_text)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace)
                break
            if attempt == max_retries:
                yield "Max retries reached. Saving incomplete data.\n"
                yield _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace)
                break
    except Exception as e:
        # Log before yielding so the traceback is recorded even if the
        # consumer stops iterating on the error message.
        logger.error("Vertex Labeling Error", exc_info=True)
        yield f"ERROR: {e}"


async def run_nrp_labeling_pipeline(video_path: str, caption: str, transcript: str, nrp_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post through an OpenAI-compatible chat endpoint (text-only).

    Async generator. Yields progress/error strings and, once validation passes
    or retries are exhausted, a result dict with keys ``raw_toon``,
    ``parsed_data``, ``prompt_used`` and ``fcot_trace``. ``video_path`` is
    accepted for signature parity with the other pipelines but is not used.

    Args:
        video_path: Unused by this pipeline.
        caption: Post caption inserted into the prompts.
        transcript: Transcript inserted into the prompts.
        nrp_config: Reads ``api_key``, ``model_name``, ``base_url``,
            ``max_retries``, ``use_search`` and ``use_code``.
        include_comments: Selects the reasoning schema/instructions over the
            simple ones.
        reasoning_method: ``"fcot"``, ``"none"`` or anything else (standard).
        system_persona: Persona text; text-only/tool notes are appended to it.
        request_id: When set, prompts and responses are written as debug logs.
    """
    api_key = nrp_config.get("api_key")
    model_name = nrp_config.get("model_name", "gpt-4")
    base_url = nrp_config.get("base_url", "https://api.openai.com/v1").rstrip("/")
    max_retries = int(nrp_config.get("max_retries", 1))

    if not api_key:
        yield "ERROR: NRP API Key missing.\n"
        return

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    is_text_only = True
    system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

    if nrp_config.get("use_search", False):
        system_persona += _SEARCH_TOOL_PERSONA
    if nrp_config.get("use_code", False):
        system_persona += _CODE_TOOL_PERSONA

    toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
    score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
    tag_list_text = get_formatted_tag_list()

    accumulated_data = {}
    prompt_used = ""
    fcot_trace = {}
    full_raw_text = ""
    loop = asyncio.get_running_loop()

    async def _call_nrp(messages, attempt_label=""):
        """POST ``messages`` to the chat-completions endpoint and return the reply text."""
        payload = {
            "model": model_name,
            "messages": messages,
            "temperature": 0.1
        }
        logger.info(f"[{request_id}] NRP API Call ({attempt_label}) - URL: {base_url}/chat/completions")
        logger.info(f"[{request_id}] NRP API Call - Model: {model_name}")
        logger.info(f"[{request_id}] NRP API Call - Messages count: {len(messages)}")

        def do_request():
            start_time = datetime.datetime.now()
            logger.info(f"[{request_id}] Dispatching requests.post (timeout=600s)...")
            resp = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload, timeout=600)
            elapsed = (datetime.datetime.now() - start_time).total_seconds()
            logger.info(f"[{request_id}] NRP API Response received in {elapsed:.2f}s. Status Code: {resp.status_code}")

            if resp.status_code != 200:
                logger.error(f"[{request_id}] API Error {resp.status_code}: {resp.text}")
                raise RuntimeError(f"API Error {resp.status_code}: {resp.text}")

            resp_json = resp.json()
            usage = resp_json.get("usage", {})
            logger.info(f"[{request_id}] NRP API Usage: {usage}")
            return resp_json["choices"][0]["message"]["content"]

        return await loop.run_in_executor(None, do_request)

    try:
        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing fields: {missing}. Initiating Reprompt (Attempt {attempt}/{max_retries})...\n"

                prompt_text = _build_reprompt(caption, transcript, accumulated_data, missing, toon_schema, mention_video=False)
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                yield f" - Sending Reprompt request to NRP API (Model: {model_name}, Timeout: 600s)...\n"
                raw_text = await _call_nrp([
                    {"role": "system", "content": system_persona},
                    {"role": "user", "content": prompt_text}
                ], attempt_label=f"reprompt_{attempt}")
                yield " - Received Reprompt response from NRP API.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                yield "Starting Fractal Chain of Thought (NRP FCoT)...\n"

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                macro_prompt = "NOTE: Text Only Analysis.\n" + macro_prompt
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                macro_messages = [{"role": "system", "content": system_persona}, {"role": "user", "content": macro_prompt}]
                yield " - Stage 1: Sending Macro Hypothesis request to NRP API (Timeout: 600s)...\n"
                macro_hypothesis = await _call_nrp(macro_messages, attempt_label="fcot_macro")
                yield " - Stage 1: Received Macro Hypothesis response.\n"
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                # Each stage replays the earlier turns so the endpoint sees the whole conversation.
                meso_messages = macro_messages + [{"role": "assistant", "content": macro_hypothesis}, {"role": "user", "content": meso_prompt}]
                yield " - Stage 2: Sending Meso Analysis request to NRP API (Timeout: 600s)...\n"
                micro_observations = await _call_nrp(meso_messages, attempt_label="fcot_meso")
                yield " - Stage 2: Received Meso Analysis response.\n"
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                synthesis_messages = meso_messages + [{"role": "assistant", "content": micro_observations}, {"role": "user", "content": synthesis_prompt}]
                yield " - Stage 3: Sending Synthesis/Formatting request to NRP API (Timeout: 600s)...\n"
                raw_text = await _call_nrp(synthesis_messages, attempt_label="fcot_synthesis")
                yield " - Stage 3: Received Synthesis response.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT (NRP):\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                prompt_text = f"NOTE: Text Only Analysis (No Video).\n{prompt_text}"
                prompt_used = prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                yield f"Generating Labels (NRP {reasoning_method.upper()})...\n"
                yield f" - Sending Standard request to NRP API (Model: {model_name}, Timeout: 600s)...\n"
                raw_text = await _call_nrp([
                    {"role": "system", "content": system_persona},
                    {"role": "user", "content": prompt_text}
                ], attempt_label=f"standard_{reasoning_method}")
                yield " - Received response from NRP API.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                accumulated_data = _merge_model_output(accumulated_data, raw_text)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace)
                break
            if attempt == max_retries:
                yield "Max retries reached. Saving incomplete data.\n"
                yield _result_payload(full_raw_text, accumulated_data, prompt_used, fcot_trace)
                break
    except Exception as e:
        # Log before yielding so the traceback is recorded even if the
        # consumer stops iterating on the error message.
        logger.error("NRP Labeling Error", exc_info=True)
        yield f"ERROR: {e}"