| import re |
| import ast |
| import sys |
| import os |
| import logging |
| import asyncio |
| import json |
| import datetime |
| import requests |
|
|
| try: |
| import torch |
| except ImportError: |
| torch = None |
|
|
| try: |
| from transformers import Qwen3VLForConditionalGeneration, AutoProcessor |
| from peft import PeftModel |
| except ImportError: |
| Qwen3VLForConditionalGeneration = None |
| AutoProcessor = None |
| PeftModel = None |
|
|
| try: |
| from my_vision_process import process_vision_info, client |
| except ImportError: |
| process_vision_info = None |
| client = None |
|
|
| from labeling_logic import ( |
| LABELING_PROMPT_TEMPLATE, LABELING_PROMPT_TEMPLATE_NO_COT, |
| SCORE_INSTRUCTIONS_SIMPLE, SCORE_INSTRUCTIONS_REASONING, |
| SCHEMA_SIMPLE, SCHEMA_REASONING, |
| FCOT_MACRO_PROMPT, FCOT_MESO_PROMPT, FCOT_SYNTHESIS_PROMPT, TEXT_ONLY_INSTRUCTIONS, |
| get_formatted_tag_list |
| ) |
| from toon_parser import parse_veracity_toon |
|
|
| |
| try: |
| import google.generativeai as genai_legacy |
| from google.generativeai.types import generation_types |
| except ImportError: |
| genai_legacy = None |
|
|
| try: |
| |
| from google import genai |
| from google.genai.types import ( |
| GenerateContentConfig, |
| HttpOptions, |
| Retrieval, |
| Tool, |
| VertexAISearch, |
| GoogleSearch, |
| Part |
| ) |
| import vertexai |
| except ImportError: |
| genai = None |
| vertexai = None |
|
|
# Lite mode (no local GPU models) is the default: it is only switched off when
# the LITE_MODE env var is set to something other than "true" (case-insensitive).
LITE_MODE = os.getenv("LITE_MODE", "true").lower() == "true"
# Local model handles; populated by load_models() and left as None in lite mode.
processor = base_model = peft_model = active_model = None
logger = logging.getLogger(__name__)
|
|
def load_models(local_model_path: str = "/app/local_model"):
    """Load the local Qwen3-VL base model and its processor onto the GPU.

    No-op when LITE_MODE is enabled or a base model is already loaded.
    Instead of raising, it switches the module into LITE_MODE when torch/CUDA
    or the transformers classes are unavailable, or when loading fails.

    Args:
        local_model_path: Directory passed to ``from_pretrained`` for both the
            model and the processor.
    """
    global LITE_MODE, processor, base_model, peft_model, active_model
    # NOTE(review): peft_model is never populated here, so
    # switch_active_model("custom") always falls back to the base model.

    if LITE_MODE:
        logger.info("LITE_MODE is enabled. Skipping local model loading.")
        return

    if base_model is not None:
        return

    if torch is None or not torch.cuda.is_available():
        logger.warning("CUDA is not available or torch is missing. This application requires a GPU for local models. Switching to LITE_MODE.")
        LITE_MODE = True
        return

    # The optional-import block sets these to None when transformers is missing;
    # report that clearly instead of failing on `None.from_pretrained`.
    if Qwen3VLForConditionalGeneration is None or AutoProcessor is None:
        logger.warning("transformers with Qwen3-VL support is not installed. Switching to LITE_MODE.")
        LITE_MODE = True
        return

    device = torch.device("cuda")
    logger.info(f"CUDA is available. Initializing models on {device}...")

    try:
        import flash_attn  # noqa: F401  -- imported only to probe availability
        attn_implementation = "flash_attention_2"
    except ImportError:
        attn_implementation = "sdpa"

    logger.info(f"Loading base model from {local_model_path}...")
    try:
        loaded_model = Qwen3VLForConditionalGeneration.from_pretrained(
            local_model_path, dtype=torch.bfloat16, device_map="auto", attn_implementation=attn_implementation
        ).eval()
        loaded_processor = AutoProcessor.from_pretrained(local_model_path)
    except Exception as e:
        logger.exception(f"Failed to load local model: {e}")
        LITE_MODE = True
        return

    # Publish the globals only after both pieces loaded, so a processor failure
    # never leaves a base model registered without its processor.
    base_model = loaded_model
    processor = loaded_processor
    active_model = base_model
|
|
def switch_active_model(model_name: str):
    """Choose which loaded local model ``inference_step`` will use.

    ``"custom"`` selects the PEFT adapter model when one is loaded. Any other
    name, or ``"custom"`` with no adapter loaded, selects the base model.
    """
    global active_model
    wants_adapter = model_name == "custom" and peft_model is not None
    active_model = peft_model if wants_adapter else base_model
|
|
def inference_step(video_path, prompt, generation_kwargs, sampling_fps, pred_glue=None):
    """Run one video+text generation with the currently active local model.

    Args:
        video_path: Video reference forwarded to ``process_vision_info``.
        prompt: User text appended after the video in the chat message.
        generation_kwargs: Extra keyword arguments for ``active_model.generate``.
        sampling_fps: Frame-sampling rate placed in the video message entry.
        pred_glue: Forwarded as the video entry's ``key_time`` value
            (semantics defined by ``my_vision_process`` -- TODO confirm).

    Returns:
        The decoded text generated after the prompt (first batch item only).

    Raises:
        RuntimeError: If no local model is loaded, torch is missing, or the
            ``my_vision_process`` helper could not be imported.
    """
    if active_model is None or torch is None:
        raise RuntimeError("Models not loaded.")
    # The optional import sets process_vision_info to None when unavailable;
    # fail clearly here instead of with "'NoneType' object is not callable".
    if process_vision_info is None:
        raise RuntimeError("Vision preprocessing helper (my_vision_process) is not available.")

    messages = [
        {"role": "user", "content": [
            {"type": "video", "video": video_path, 'key_time': pred_glue, 'fps': sampling_fps,
             # Pixel budgets in units of 28x28 patches (values kept from the original tuning).
             "total_pixels": 128 * 12 * 28 * 28, "min_pixels": 128 * 28 * 28},
            {"type": "text", "text": prompt},
        ]},
    ]
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs, video_kwargs = process_vision_info(messages, return_video_kwargs=True, client=client)
    fps_inputs = video_kwargs['fps'][0]
    inputs = processor(text=[text], images=image_inputs, videos=video_inputs, fps=fps_inputs, padding=True, return_tensors="pt")
    inputs = {k: v.to(active_model.device) for k, v in inputs.items()}

    with torch.no_grad():
        output_ids = active_model.generate(**inputs, **generation_kwargs, use_cache=True)

    # Strip the echoed prompt tokens so only newly generated tokens are decoded.
    generated_ids = [out[len(inp):] for inp, out in zip(inputs['input_ids'], output_ids)]
    output_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
    return output_text[0]
|
|
async def generate_simple_text(prompt: str, model_type: str, config: dict):
    """Generate a plain-text completion for ``prompt`` with the chosen backend.

    Args:
        prompt: User prompt text.
        model_type: ``'gemini'`` (legacy SDK), ``'vertex'`` (google-genai on
            Vertex AI) or ``'nrp'`` (OpenAI-compatible chat completions).
        config: Backend settings. Keys used are ``api_key`` and ``model_name``;
            Vertex also uses ``project_id``/``location``; NRP also uses
            ``base_url``.

    Returns:
        The model's text. On failure it returns a string starting with
        ``"Error"`` instead of raising, including for an unsupported
        ``model_type``.
    """
    loop = asyncio.get_running_loop()
    try:
        if model_type == 'gemini':
            if genai_legacy is None:
                return "Error: Legacy SDK missing."
            genai_legacy.configure(api_key=config.get("api_key"))
            model_name = config.get("model_name") or "gemini-1.5-pro"
            model = genai_legacy.GenerativeModel(model_name)
            response = await loop.run_in_executor(
                None,
                lambda: model.generate_content(prompt, generation_config={"temperature": 0.0})
            )
            return response.text

        if model_type == 'vertex':
            if genai is None:
                return "Error: Vertex SDK missing."
            api_key = config.get("api_key")
            if api_key:
                # google-genai treats api_key and project/location as mutually
                # exclusive for Vertex AI (express mode), so send the key alone.
                cl = genai.Client(vertexai=True, api_key=api_key)
            else:
                project_id = config.get("project_id")
                if not project_id:
                    return "Error: Vertex project_id missing."
                cl = genai.Client(vertexai=True, project=project_id, location=config.get("location") or "us-central1")
            model_name = config.get("model_name") or "gemini-1.5-pro"
            response = await loop.run_in_executor(
                None,
                lambda: cl.models.generate_content(
                    model=model_name,
                    contents=prompt,
                    config=GenerateContentConfig(temperature=0.0)
                )
            )
            return response.text

        if model_type == 'nrp':
            api_key = config.get("api_key")
            if not api_key:
                return "Error: NRP API key missing."
            model_name = config.get("model_name") or "gpt-4"
            # `or` so an explicit None/"" falls back instead of crashing on .rstrip().
            base_url = (config.get("base_url") or "https://api.openai.com/v1").rstrip("/")
            headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
            payload = {"model": model_name, "messages": [{"role": "user", "content": prompt}], "temperature": 0.0}

            def do_request():
                resp = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload, timeout=600)
                if resp.status_code == 200:
                    return resp.json()["choices"][0]["message"]["content"]
                return f"Error: {resp.status_code} {resp.text}"

            return await loop.run_in_executor(None, do_request)

        return f"Error: Unsupported model type '{model_type}'."
    except Exception as e:
        logger.error(f"Text Gen Error: {e}")
        return f"Error generating text: {e}"
|
|
async def generate_community_summary(comments: list, model_type: str, config: dict):
    """Summarize up to the first 15 user comments as 'Community Notes'-style context.

    Each comment is read via ``.get('author', 'User')`` and ``.get('text', '')``,
    so items are expected to be dict-like. The summary text comes from
    ``generate_simple_text`` using the same backend selection and error strings.

    Returns:
        ``"No comments available."`` when ``comments`` is empty or falsy;
        otherwise the backend's response text.
    """
    if not comments:
        return "No comments available."

    comment_lines = []
    for comment in comments[:15]:
        author = comment.get('author', 'User')
        body = comment.get('text', '')
        comment_lines.append(f"- {author}: {body}")
    comments_block = "\n".join(comment_lines)

    prompt_parts = [
        "You are a Community Context Analyst. Analyze the following user comments regarding a social media post.\n",
        "Your goal is to extract 'Community Notes' - specifically looking for fact-checking, debunking, or additional context provided by users.\n",
        f"COMMENTS:\n{comments_block}\n\n",
        "OUTPUT:\n",
        "Provide a concise 1-paragraph summary of the community consensus regarding the veracity of the post.",
    ]
    return await generate_simple_text("".join(prompt_parts), model_type, config)
|
|
def extract_json_from_text(text):
    """Extract a JSON object embedded in free-form model output.

    It first tries the widest ``{...}`` span (first ``{`` to last ``}``), which
    covers a single object surrounded by prose. If that span is not valid JSON
    (for example, the object is followed by more text containing braces), it
    decodes from each ``{`` in turn and returns the first object that parses.

    Returns:
        The decoded dict, or ``{}`` when ``text`` is not a string or contains no
        parseable JSON object.
    """
    if not isinstance(text, str):
        return {}
    match = re.search(r'\{[\s\S]*\}', text)
    if not match:
        return {}
    try:
        return json.loads(match.group(0))
    except (ValueError, RecursionError):
        pass

    decoder = json.JSONDecoder()
    start = text.find('{')
    while start != -1:
        try:
            # raw_decode stops at the end of the first complete value and
            # ignores any trailing text.
            obj, _ = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except (ValueError, RecursionError):
            pass
        start = text.find('{', start + 1)
    return {}
|
|
def validate_parsed_data(data, is_text_only):
    """Return the required labeling fields that are absent or unusable.

    A field is missing when it is falsy. Scores are also treated as missing
    when they stringify to ``"0"`` or to ``"n/a"`` (case-insensitive). The
    final reasoning must be at least 5 characters long. When ``is_text_only``
    is true, the visual/audio integrity vectors and the video-based modality
    scores are not required.

    Args:
        data: Parsed labeling output as nested dicts. A non-dict ``data``, or a
            non-dict section (for example a string where an object was
            expected), is treated as empty instead of raising.
        is_text_only: Whether the run had no video input.

    Returns:
        Missing-field identifiers (``"summary"``, ``"final:reasoning"``,
        ``"vector:<key>"``, ``"modality:<key>"``, ``"factuality:claim_accuracy"``,
        ``"disinfo:classification"``) in a fixed order; empty when complete.
    """
    if not isinstance(data, dict):
        data = {}

    def _section(name):
        # Model output merged from raw JSON may put a scalar where a dict belongs.
        value = data.get(name)
        return value if isinstance(value, dict) else {}

    def _score_missing(value):
        return not value or str(value) == '0' or str(value).lower() == 'n/a'

    missing = []

    if not data.get('video_context_summary'):
        missing.append("summary")

    final = _section('final_assessment')
    if not final.get('reasoning') or len(str(final.get('reasoning', ''))) < 5:
        missing.append("final:reasoning")

    vectors = _section('veracity_vectors')
    video_only_vectors = {'visual_integrity_score', 'audio_integrity_score'}
    for k in ['visual_integrity_score', 'audio_integrity_score', 'source_credibility_score', 'logical_consistency_score', 'emotional_manipulation_score']:
        if is_text_only and k in video_only_vectors:
            continue
        if _score_missing(vectors.get(k)):
            missing.append(f"vector:{k}")

    mod = _section('modalities')
    video_only_modalities = {'video_audio_score', 'video_caption_score'}
    for k in ['video_audio_score', 'video_caption_score', 'audio_caption_score']:
        if is_text_only and k in video_only_modalities:
            continue
        if _score_missing(mod.get(k)):
            missing.append(f"modality:{k}")

    if not _section('factuality_factors').get('claim_accuracy'):
        missing.append("factuality:claim_accuracy")

    if not _section('disinformation_analysis').get('classification'):
        missing.append("disinfo:classification")

    return missing
|
|
def smart_merge(base, new_data):
    """Recursively fill gaps in ``base`` with values from ``new_data``.

    When both arguments are dicts, ``base`` is mutated in place and returned:
    - keys absent from ``base`` are copied over;
    - nested dicts on both sides are merged recursively;
    - otherwise an existing value is replaced only when it is "empty" (falsy,
      ``"0"``, or ``"n/a"`` case-insensitively) and the incoming value is not.
    A non-dict ``new_data`` replaces ``base`` when truthy, and is otherwise
    ignored. A non-dict ``base`` is replaced by a dict ``new_data``.
    """
    def _has_value(candidate):
        return bool(candidate) and str(candidate) != "0" and str(candidate).lower() != "n/a"

    if not isinstance(new_data, dict):
        return new_data or base
    if not isinstance(base, dict):
        return new_data

    for key, incoming in new_data.items():
        if key not in base:
            base[key] = incoming
            continue
        current = base[key]
        if isinstance(current, dict) and isinstance(incoming, dict):
            smart_merge(current, incoming)
        elif not _has_value(current) and _has_value(incoming):
            base[key] = incoming
    return base
|
|
def save_debug_log(request_id, kind, content, attempt, label=""):
    """Best-effort dump of a prompt or response to a timestamped text file.

    Files are written relative to the current working directory:
    ``data/prompts`` for kind ``'prompt'``, ``data/responses`` for kind
    ``'response'``, and ``data`` for anything else. The file name is
    ``{request_id}_{YYYYmmdd_HHMMSS}_att{attempt}[_{label}].txt``.
    Does nothing when ``request_id`` is falsy. Failures are logged, never raised.
    """
    if not request_id:
        return
    try:
        dir_map = {'prompt': 'data/prompts', 'response': 'data/responses'}
        directory = dir_map.get(kind, 'data')
        # Create the directory on first use instead of failing every write.
        os.makedirs(directory, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Neutralise path separators so an id/label cannot escape `directory`.
        safe_id = str(request_id).replace("/", "_").replace("\\", "_")
        safe_label = ("_" + str(label).replace("/", "_").replace("\\", "_")) if label else ""
        filename = os.path.join(directory, f"{safe_id}_{ts}_att{attempt}{safe_label}.txt")
        with open(filename, "w", encoding="utf-8") as f:
            f.write(str(content))
    except Exception as e:
        logger.error(f"Failed to save debug log: {e}")
|
|
async def run_gemini_labeling_pipeline(video_path: str, caption: str, transcript: str, gemini_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a video (or text-only post) for veracity with the Gemini API (legacy SDK).

    Async generator. It yields progress strings while working. Once the merged
    result passes ``validate_parsed_data``, or retries run out, it yields one
    dict with keys ``raw_toon``, ``parsed_data``, ``prompt_used`` and
    ``fcot_trace``. Errors are yielded as ``"ERROR: ..."`` strings and logged,
    not raised. An uploaded video file is deleted when the generator finishes,
    fails, or is closed early.

    Args:
        video_path: Local video to upload. If it is missing, does not exist, or
            Gemini fails to process it, the run is text-only.
        caption: Post caption injected into the prompts.
        transcript: Post transcript injected into the prompts.
        gemini_config: ``api_key`` (required), ``model_name``, ``max_retries``
            (reprompts after the first attempt, default 1), ``use_search``,
            ``use_code``.
        include_comments: Selects the reasoning schema/instructions instead of
            the simple ones.
        reasoning_method: ``"fcot"`` (three-turn chat), ``"none"`` (no-CoT
            template), or anything else (CoT template).
        system_persona: Persona text; tool and text-only instructions are appended.
        request_id: When set, prompts and responses are saved via ``save_debug_log``.
    """
    if genai_legacy is None:
        yield "ERROR: Legacy SDK missing.\n"
        return
    api_key = gemini_config.get("api_key")
    if not api_key:
        # Report the problem rather than ending the stream with no output.
        yield "ERROR: Gemini API Key missing.\n"
        return
    max_retries = int(gemini_config.get("max_retries", 1))

    loop = asyncio.get_running_loop()
    uploaded_file = None
    try:
        genai_legacy.configure(api_key=api_key)
        is_text_only = False
        video_file = None  # processed upload to attach to prompts, if usable
        if video_path and os.path.exists(video_path):
            yield "data: - Uploading video to Gemini API...\n\n"
            uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.upload_file(path=video_path))
            file_name = uploaded_file.name

            # Poll until processing finishes, keeping the *latest* handle so the
            # FAILED check sees the current state rather than the upload-time one.
            while True:
                uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.get_file(file_name))
                if uploaded_file.state.name != "PROCESSING":
                    break
                yield "data: - Waiting for Gemini to process the video...\n\n"
                await asyncio.sleep(3)

            if uploaded_file.state.name == "FAILED":
                yield "data: - Gemini Video Processing FAILED.\n\n"
                # The model will not see the video, so validate as text-only.
                is_text_only = True
            else:
                video_file = uploaded_file
        else:
            is_text_only = True

        active_tools = []
        if gemini_config.get("use_search", False):
            active_tools.append({"google_search_retrieval": {}})
            system_persona += "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, look up current events, or verify entity backgrounds before concluding."
        if gemini_config.get("use_code", False):
            active_tools.append({"code_execution": {}})
            system_persona += "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, or statistical verifications."

        model_name = gemini_config.get("model_name") or "gemini-1.5-pro"
        model = genai_legacy.GenerativeModel(model_name, tools=active_tools or None)

        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
        tag_list_text = get_formatted_tag_list()

        accumulated_data = {}
        prompt_used = ""
        fcot_trace = {}
        full_raw_text = ""
        if is_text_only:
            system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                # Reprompt with the partial data and the list of missing fields.
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing or incomplete fields: {missing}. Initiating Iterative Reprompt (Attempt {attempt}/{max_retries}) to acquire remaining factuality components...\n"
                prompt_text = (
                    f"SYSTEM: Review the previous attempt which failed validation.\n"
                    f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
                    f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
                    f"MISSING FIELDS: {missing}\n"
                    f"INSTRUCTION: Analyze the provided Video and Context again. "
                    f"Generate the missing fields to complete the schema. You MUST provide the missing scores for {missing}.\n"
                    f"Output the FULL VALID TOON OBJECT containing all required fields.\n"
                    f"{toon_schema}"
                )
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                inputs = [prompt_text]
                if video_file is not None:
                    inputs.append(video_file)
                response = await loop.run_in_executor(None, lambda: model.generate_content(inputs, generation_config={"temperature": 0.2}))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                # Fractal CoT: macro hypothesis -> meso analysis -> synthesis, in one chat.
                yield "Starting Fractal Chain of Thought (Gemini FCoT)..."
                chat = model.start_chat(history=[])

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                inputs1 = [macro_prompt]
                if video_file is not None:
                    inputs1.insert(0, video_file)
                res1 = await loop.run_in_executor(None, lambda: chat.send_message(inputs1))
                macro_hypothesis = res1.text
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))
                micro_observations = res2.text
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))
                raw_text = res3.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT Pipeline:\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                prompt_used = prompt_text
                if is_text_only:
                    prompt_text = "NOTE: Text Analysis Only.\n" + prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                inputs = [prompt_text]
                if video_file is not None:
                    inputs.append(video_file)
                response = await loop.run_in_executor(None, lambda: model.generate_content(inputs, generation_config={"temperature": 0.1}))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                parsed_step = parse_veracity_toon(raw_text)
                # Any embedded JSON overrides/extends the TOON-parsed sections.
                json_data = extract_json_from_text(raw_text)
                if json_data:
                    for k in ['veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment', 'factuality_factors', 'disinformation_analysis', 'tags']:
                        if k in json_data:
                            if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                                parsed_step[k].update(json_data[k])
                            else:
                                parsed_step[k] = json_data[k]
                accumulated_data = smart_merge(accumulated_data, parsed_step)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break

            if attempt == max_retries:
                yield "Max retries reached. Saving incomplete data.\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
    except Exception as e:
        logger.error("Gemini Labeling Error", exc_info=True)
        yield f"ERROR: {e}"
    finally:
        # Always remove the uploaded file: on success, on error, and when the
        # consumer stops iterating early (aclose runs this block).
        if uploaded_file is not None:
            try:
                await loop.run_in_executor(None, lambda: genai_legacy.delete_file(name=uploaded_file.name))
            except Exception:
                logger.warning("Failed to delete uploaded Gemini file", exc_info=True)
|
|
async def run_vertex_labeling_pipeline(video_path: str, caption: str, transcript: str, vertex_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a video (or text-only post) for veracity with Gemini on Vertex AI (google-genai).

    Async generator. It yields progress strings while working, then a single
    result dict with keys ``raw_toon``, ``parsed_data``, ``prompt_used`` and
    ``fcot_trace``. Errors are yielded as ``"ERROR: ..."`` strings and logged,
    not raised.

    Args:
        video_path: Local video, sent inline as ``video/mp4`` bytes. When it is
            missing or does not exist, the run is text-only.
        caption: Post caption injected into the prompts.
        transcript: Post transcript injected into the prompts.
        vertex_config: ``project_id`` (required unless ``api_key`` is given),
            ``location`` (default ``us-central1``), ``model_name``,
            ``max_retries`` (reprompts after the first attempt, default 1),
            ``api_key``, ``use_search``, ``use_code``.
        include_comments: Selects the reasoning schema/instructions.
        reasoning_method: ``"fcot"``, ``"none"``, or anything else for CoT.
        system_persona: Persona text; tool and text-only instructions are appended.
        request_id: When set, prompts and responses are saved via ``save_debug_log``.
    """
    if genai is None:
        yield "ERROR: 'google-genai' not installed.\n"
        return

    project_id = vertex_config.get("project_id")
    location = vertex_config.get("location") or "us-central1"
    model_name = vertex_config.get("model_name") or "gemini-1.5-pro"
    max_retries = int(vertex_config.get("max_retries", 1))
    api_key = vertex_config.get("api_key")

    if not project_id and not api_key:
        # Report the problem rather than ending the stream with no output.
        yield "ERROR: Vertex project ID missing.\n"
        return

    try:
        if api_key:
            # google-genai treats api_key and project/location as mutually
            # exclusive for Vertex AI (express mode), so send the key alone.
            genai_client = genai.Client(vertexai=True, api_key=api_key)
        else:
            genai_client = genai.Client(vertexai=True, project=project_id, location=location)

        video_part = None
        is_text_only = False
        if video_path and os.path.exists(video_path):
            with open(video_path, 'rb') as f:
                video_bytes = f.read()
            video_part = Part.from_bytes(data=video_bytes, mime_type="video/mp4")
        else:
            is_text_only = True

        active_tools = []
        if vertex_config.get("use_search", False):
            active_tools.append(Tool(google_search=GoogleSearch()))
            system_persona += "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, look up current events, or verify entity backgrounds before concluding."
        if vertex_config.get("use_code", False):
            try:
                from google.genai.types import CodeExecution
                active_tools.append(Tool(code_execution=CodeExecution()))
                system_persona += "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, or statistical verifications."
            except ImportError:
                # Older google-genai without CodeExecution: run without the tool.
                pass

        config = GenerateContentConfig(
            temperature=0.1, response_mime_type="text/plain", max_output_tokens=8192,
            tools=active_tools or None
        )

        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
        tag_list_text = get_formatted_tag_list()

        accumulated_data = {}
        prompt_used = ""
        fcot_trace = {}
        full_raw_text = ""
        loop = asyncio.get_running_loop()

        if is_text_only:
            system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                # Reprompt with the partial data and the list of missing fields.
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing or incomplete fields: {missing}. Initiating Iterative Reprompt (Attempt {attempt}/{max_retries}) to acquire remaining factuality components...\n"

                prompt_text = (
                    f"SYSTEM: Review the previous attempt which failed validation.\n"
                    f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
                    f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
                    f"MISSING FIELDS: {missing}\n"
                    f"INSTRUCTION: Analyze the provided Video and Context again. "
                    f"Generate the missing fields to complete the schema. You MUST provide the missing scores for {missing}.\n"
                    f"Output the FULL VALID TOON OBJECT containing all required fields.\n"
                    f"{toon_schema}"
                )

                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                contents = [prompt_text]
                if video_part:
                    contents.insert(0, video_part)

                response = await loop.run_in_executor(None, lambda: genai_client.models.generate_content(model=model_name, contents=contents, config=config))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                # Fractal CoT: macro hypothesis -> meso analysis -> synthesis, in one chat.
                yield "Starting Fractal Chain of Thought (Vertex FCoT)..."
                chat = genai_client.chats.create(model=model_name, config=config)

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                inputs1 = [macro_prompt]
                if video_part:
                    inputs1.insert(0, video_part)
                else:
                    inputs1[0] = "NOTE: Text Only Analysis.\n" + inputs1[0]

                res1 = await loop.run_in_executor(None, lambda: chat.send_message(inputs1))
                macro_hypothesis = res1.text
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))
                micro_observations = res2.text
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))
                raw_text = res3.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT (Vertex):\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                if video_part:
                    contents = [video_part, prompt_text]
                else:
                    contents = [f"NOTE: Text Only Analysis (No Video).\n{prompt_text}"]
                prompt_used = prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                yield f"Generating Labels (Vertex {reasoning_method.upper()})..."
                response = await loop.run_in_executor(None, lambda: genai_client.models.generate_content(model=model_name, contents=contents, config=config))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                parsed_step = parse_veracity_toon(raw_text)
                # Any embedded JSON overrides/extends the TOON-parsed sections.
                json_data = extract_json_from_text(raw_text)
                if json_data:
                    for k in ['veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment', 'factuality_factors', 'disinformation_analysis', 'tags']:
                        if k in json_data:
                            if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                                parsed_step[k].update(json_data[k])
                            else:
                                parsed_step[k] = json_data[k]
                accumulated_data = smart_merge(accumulated_data, parsed_step)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break

            if attempt == max_retries:
                yield "Max retries reached. Saving incomplete data.\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break

    except Exception as e:
        yield f"ERROR: {e}"
        logger.error("Vertex Labeling Error", exc_info=True)
|
|
async def run_nrp_labeling_pipeline(video_path: str, caption: str, transcript: str, nrp_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post for veracity through an OpenAI-compatible chat-completions API.

    The endpoint is used text-only: ``video_path`` is accepted for signature
    parity with the other pipelines but is never read. This is an async
    generator. It yields progress strings, then a single result dict with keys
    ``raw_toon``, ``parsed_data``, ``prompt_used`` and ``fcot_trace``. Errors
    are yielded as ``"ERROR: ..."`` strings and logged, not raised.

    Args:
        video_path: Unused.
        caption: Post caption injected into the prompts.
        transcript: Post transcript injected into the prompts.
        nrp_config: ``api_key`` (required), ``model_name`` (default ``gpt-4``),
            ``base_url`` (default ``https://api.openai.com/v1``), ``max_retries``
            (reprompts after the first attempt, default 1), ``use_search``,
            ``use_code``.
        include_comments: Selects the reasoning schema/instructions.
        reasoning_method: ``"fcot"``, ``"none"``, or anything else for CoT.
        system_persona: Sent as the system message; instructions are appended.
        request_id: Used in log lines and, when set, for ``save_debug_log`` files.
    """
    api_key = nrp_config.get("api_key")
    model_name = nrp_config.get("model_name") or "gpt-4"
    # `or` so an explicit None/"" falls back instead of crashing on .rstrip().
    base_url = (nrp_config.get("base_url") or "https://api.openai.com/v1").rstrip("/")
    max_retries = int(nrp_config.get("max_retries", 1))

    if not api_key:
        yield "ERROR: NRP API Key missing.\n"
        return

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    is_text_only = True
    system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

    # NOTE(review): no tools are sent in the request payload, so these
    # instructions ask the model to use tools it does not have.
    if nrp_config.get("use_search", False):
        system_persona += "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, look up current events, or verify entity backgrounds before concluding."
    if nrp_config.get("use_code", False):
        system_persona += "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, or statistical verifications."

    toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
    score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
    tag_list_text = get_formatted_tag_list()

    accumulated_data = {}
    prompt_used = ""
    fcot_trace = {}
    full_raw_text = ""
    loop = asyncio.get_running_loop()

    async def _call_nrp(messages, attempt_label=""):
        """POST ``messages`` to ``{base_url}/chat/completions`` and return the reply text.

        The blocking ``requests`` call runs in the default executor. Raises
        ``Exception`` carrying the status and body on any non-200 response.
        """
        payload = {
            "model": model_name,
            "messages": messages,
            "temperature": 0.1
        }

        logger.info("[%s] NRP API Call (%s) - URL: %s/chat/completions", request_id, attempt_label, base_url)
        logger.info("[%s] NRP API Call - Model: %s", request_id, model_name)
        logger.info("[%s] NRP API Call - Messages count: %s", request_id, len(messages))

        def do_request():
            start_time = datetime.datetime.now()
            logger.info("[%s] Dispatching requests.post (timeout=600s)...", request_id)

            resp = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload, timeout=600)

            elapsed = (datetime.datetime.now() - start_time).total_seconds()
            logger.info("[%s] NRP API Response received in %.2fs. Status Code: %s", request_id, elapsed, resp.status_code)

            if resp.status_code != 200:
                logger.error("[%s] API Error %s: %s", request_id, resp.status_code, resp.text)
                raise Exception(f"API Error {resp.status_code}: {resp.text}")

            resp_json = resp.json()
            logger.info("[%s] NRP API Usage: %s", request_id, resp_json.get("usage", {}))

            return resp_json["choices"][0]["message"]["content"]

        return await loop.run_in_executor(None, do_request)

    try:
        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                # Reprompt with the partial data and the list of missing fields.
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing fields: {missing}. Initiating Reprompt (Attempt {attempt}/{max_retries})...\n"

                prompt_text = (
                    f"SYSTEM: Review the previous attempt which failed validation.\n"
                    f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
                    f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
                    f"MISSING FIELDS: {missing}\n"
                    f"INSTRUCTION: Generate the missing fields to complete the schema. You MUST provide the missing scores for {missing}.\n"
                    f"Output the FULL VALID TOON OBJECT containing all required fields.\n"
                    f"{toon_schema}"
                )

                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')

                yield f" - Sending Reprompt request to NRP API (Model: {model_name}, Timeout: 600s)...\n"
                raw_text = await _call_nrp([
                    {"role": "system", "content": system_persona},
                    {"role": "user", "content": prompt_text}
                ], attempt_label=f"reprompt_{attempt}")
                yield " - Received Reprompt response from NRP API.\n\n"

                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                # Fractal CoT: each stage replays the full conversation so far.
                yield "Starting Fractal Chain of Thought (NRP FCoT)...\n"

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                macro_prompt = "NOTE: Text Only Analysis.\n" + macro_prompt
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')

                macro_messages = [{"role": "system", "content": system_persona}, {"role": "user", "content": macro_prompt}]
                yield " - Stage 1: Sending Macro Hypothesis request to NRP API (Timeout: 600s)...\n"
                macro_hypothesis = await _call_nrp(macro_messages, attempt_label="fcot_macro")
                yield " - Stage 1: Received Macro Hypothesis response.\n"

                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                meso_messages = macro_messages + [{"role": "assistant", "content": macro_hypothesis}, {"role": "user", "content": meso_prompt}]

                yield " - Stage 2: Sending Meso Analysis request to NRP API (Timeout: 600s)...\n"
                micro_observations = await _call_nrp(meso_messages, attempt_label="fcot_meso")
                yield " - Stage 2: Received Meso Analysis response.\n"

                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                synthesis_messages = meso_messages + [{"role": "assistant", "content": micro_observations}, {"role": "user", "content": synthesis_prompt}]

                yield " - Stage 3: Sending Synthesis/Formatting request to NRP API (Timeout: 600s)...\n"
                raw_text = await _call_nrp(synthesis_messages, attempt_label="fcot_synthesis")
                yield " - Stage 3: Received Synthesis response.\n\n"

                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT (NRP):\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                prompt_text = f"NOTE: Text Only Analysis (No Video).\n{prompt_text}"
                prompt_used = prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                yield f"Generating Labels (NRP {reasoning_method.upper()})...\n"
                yield f" - Sending Standard request to NRP API (Model: {model_name}, Timeout: 600s)...\n"

                raw_text = await _call_nrp([
                    {"role": "system", "content": system_persona},
                    {"role": "user", "content": prompt_text}
                ], attempt_label=f"standard_{reasoning_method}")

                yield " - Received response from NRP API.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                parsed_step = parse_veracity_toon(raw_text)
                # Any embedded JSON overrides/extends the TOON-parsed sections.
                json_data = extract_json_from_text(raw_text)
                if json_data:
                    for k in ['veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment', 'factuality_factors', 'disinformation_analysis', 'tags']:
                        if k in json_data:
                            if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                                parsed_step[k].update(json_data[k])
                            else:
                                parsed_step[k] = json_data[k]
                accumulated_data = smart_merge(accumulated_data, parsed_step)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break

            if attempt == max_retries:
                yield "Max retries reached. Saving incomplete data.\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break

    except Exception as e:
        yield f"ERROR: {e}"
        logger.error("NRP Labeling Error", exc_info=True)
|
|