import re
import ast
import sys
import os
import logging
import asyncio
import json
import datetime
import requests
try:
import torch
except ImportError:
torch = None
try:
from transformers import Qwen3VLForConditionalGeneration, AutoProcessor
from peft import PeftModel
except ImportError:
Qwen3VLForConditionalGeneration = None
AutoProcessor = None
PeftModel = None
try:
from my_vision_process import process_vision_info, client
except ImportError:
process_vision_info = None
client = None
from labeling_logic import (
LABELING_PROMPT_TEMPLATE, LABELING_PROMPT_TEMPLATE_NO_COT,
SCORE_INSTRUCTIONS_SIMPLE, SCORE_INSTRUCTIONS_REASONING,
SCHEMA_SIMPLE, SCHEMA_REASONING,
FCOT_MACRO_PROMPT, FCOT_MESO_PROMPT, FCOT_SYNTHESIS_PROMPT, TEXT_ONLY_INSTRUCTIONS,
get_formatted_tag_list
)
from toon_parser import parse_veracity_toon
# Google GenAI Imports
try:
import google.generativeai as genai_legacy
from google.generativeai.types import generation_types
except ImportError:
genai_legacy = None
try:
# Modern Google GenAI SDK (v1)
from google import genai
from google.genai.types import (
GenerateContentConfig,
HttpOptions,
Retrieval,
Tool,
VertexAISearch,
GoogleSearch,
Part
)
import vertexai
except ImportError:
genai = None
vertexai = None
# LITE_MODE is on unless the environment variable is set to something other
# than "true" (case-insensitive).
LITE_MODE = os.getenv("LITE_MODE", "true").lower() == "true"

# Handles for the local model stack; all start empty.
processor = base_model = peft_model = active_model = None

logger = logging.getLogger(__name__)
def load_models():
    """Load the local base model and processor from ``/app/local_model``.

    Does nothing when LITE_MODE is set or a base model is already loaded.
    When CUDA/torch or the transformers classes are unavailable, or when
    loading raises, LITE_MODE is switched on instead of propagating the error.

    The model and processor are loaded into locals first and published to the
    module globals only after both succeed, so a failure cannot leave
    ``base_model`` set while ``processor`` is still None.
    """
    global LITE_MODE, processor, base_model, peft_model, active_model

    if LITE_MODE:
        logger.info("LITE_MODE is enabled. Skipping local model loading.")
        return
    if base_model is not None:
        return
    if torch is None or not torch.cuda.is_available():
        logger.warning("CUDA is not available or torch is missing. This application requires a GPU for local models. Switching to LITE_MODE.")
        LITE_MODE = True
        return
    if Qwen3VLForConditionalGeneration is None or AutoProcessor is None:
        # The guarded import at module top sets these to None when it fails.
        logger.error("Failed to load local model: transformers/peft are not installed.")
        LITE_MODE = True
        return

    device = torch.device("cuda")
    logger.info(f"CUDA is available. Initializing models on {device}...")
    local_model_path = "/app/local_model"

    # Use flash attention only when the package can be imported.
    try:
        import flash_attn  # noqa: F401
        attn_implementation = "flash_attention_2"
    except ImportError:
        attn_implementation = "sdpa"

    logger.info(f"Loading base model from {local_model_path}...")
    try:
        loaded_model = Qwen3VLForConditionalGeneration.from_pretrained(
            local_model_path,
            dtype=torch.bfloat16,
            device_map="auto",
            attn_implementation=attn_implementation,
        ).eval()
        loaded_processor = AutoProcessor.from_pretrained(local_model_path)
    except Exception as e:
        logger.error(f"Failed to load local model: {e}")
        LITE_MODE = True
        return

    base_model = loaded_model
    processor = loaded_processor
    active_model = base_model
def switch_active_model(model_name: str):
    """Point ``active_model`` at the PEFT model or the base model.

    The PEFT model is chosen only when ``model_name`` is ``"custom"`` and a
    PEFT model is present; every other case selects the base model.
    """
    global active_model, base_model, peft_model
    wants_custom = model_name == "custom"
    active_model = peft_model if (wants_custom and peft_model is not None) else base_model
def inference_step(video_path, prompt, generation_kwargs, sampling_fps, pred_glue=None):
    """Run one generation pass of the active local model over a video and prompt.

    Args:
        video_path: Video reference placed in the chat message.
        prompt: Text prompt placed after the video in the same user turn.
        generation_kwargs: Extra keyword arguments forwarded to ``generate``.
        sampling_fps: Value passed as the message's ``fps`` field.
        pred_glue: Value passed as the message's ``key_time`` field.

    Returns:
        The decoded text of the first generated sequence, prompt tokens removed.

    Raises:
        RuntimeError: If the model, torch, the processor, or the vision
            preprocessing helper is not available.
    """
    global processor, active_model
    if active_model is None or torch is None:
        raise RuntimeError("Models not loaded.")
    if processor is None or process_vision_info is None:
        # process_vision_info falls back to None when its import fails.
        raise RuntimeError("Models not loaded: processor or vision preprocessing is unavailable.")

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "video", "video": video_path, 'key_time': pred_glue, 'fps': sampling_fps,
                    "total_pixels": 128*12 * 28 * 28, "min_pixels": 128 * 28 * 28,
                },
                {"type": "text", "text": prompt},
            ],
        },
    ]
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs, video_kwargs = process_vision_info(messages, return_video_kwargs=True, client=client)
    fps_inputs = video_kwargs['fps'][0]
    inputs = processor(text=[text], images=image_inputs, videos=video_inputs, fps=fps_inputs, padding=True, return_tensors="pt")
    inputs = {k: v.to(active_model.device) for k, v in inputs.items()}

    with torch.no_grad():
        output_ids = active_model.generate(**inputs, **generation_kwargs, use_cache=True)

    # Drop the echoed prompt tokens from the front of each generated sequence.
    generated_ids = [
        sequence[len(prompt_ids):]
        for sequence, prompt_ids in zip(output_ids, inputs['input_ids'])
    ]
    output_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
    return output_text[0]
async def generate_simple_text(prompt: str, model_type: str, config: dict):
    """Send a single text prompt to the selected backend and return its reply.

    Args:
        prompt: The text to send.
        model_type: ``'gemini'`` (legacy SDK), ``'vertex'`` (google-genai
            client) or ``'nrp'`` (HTTP ``/chat/completions`` endpoint).
        config: Backend settings. Reads ``api_key`` and ``model_name``; the
            vertex branch also reads ``project_id`` and ``location``; the nrp
            branch also reads ``base_url``.

    Returns:
        The reply text, or a string starting with ``"Error"`` describing what
        went wrong. Exceptions are logged and converted to such a string.
    """
    loop = asyncio.get_running_loop()
    try:
        if model_type == 'gemini':
            if genai_legacy is None:
                return "Error: Legacy SDK missing."
            genai_legacy.configure(api_key=config.get("api_key"))
            model_name = config.get("model_name") or "gemini-1.5-pro"
            model = genai_legacy.GenerativeModel(model_name)
            response = await loop.run_in_executor(
                None,
                lambda: model.generate_content(prompt, generation_config={"temperature": 0.0})
            )
            return response.text

        if model_type == 'vertex':
            if genai is None:
                return "Error: Vertex SDK missing."
            api_key = config.get("api_key")
            # NOTE(review): some google-genai releases reject project/location
            # combined with api_key - confirm against the pinned SDK version.
            if api_key:
                cl = genai.Client(vertexai=True, project=config['project_id'], location=config['location'], api_key=api_key)
            else:
                cl = genai.Client(vertexai=True, project=config['project_id'], location=config['location'])
            # Same empty-name fallback the gemini branch applies.
            vertex_model = config.get('model_name') or 'gemini-1.5-pro'
            response = await loop.run_in_executor(
                None,
                lambda: cl.models.generate_content(
                    model=vertex_model,
                    contents=prompt,
                    config=GenerateContentConfig(temperature=0.0)
                )
            )
            return response.text

        if model_type == 'nrp':
            api_key = config.get("api_key")
            model_name = config.get("model_name") or "gpt-4"
            # ``or`` also covers an explicit None/"" value, which .get()'s default would not.
            base_url = (config.get("base_url") or "https://api.openai.com/v1").rstrip("/")
            if not api_key:
                return "Error: NRP API key missing."
            headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
            payload = {"model": model_name, "messages": [{"role": "user", "content": prompt}], "temperature": 0.0}

            def do_request():
                resp = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload, timeout=600)
                if resp.status_code == 200:
                    return resp.json()["choices"][0]["message"]["content"]
                return f"Error: {resp.status_code} {resp.text}"

            return await loop.run_in_executor(None, do_request)

        # Report an unrecognised backend instead of returning None implicitly.
        return f"Error: Unsupported model type '{model_type}'."
    except Exception as e:
        logger.error(f"Text Gen Error: {e}")
        return f"Error generating text: {e}"
async def generate_community_summary(comments: list, model_type: str, config: dict):
    """Summarise what commenters say about a post's veracity.

    At most the first 15 comments are rendered as ``- author: text`` bullets,
    embedded in a fixed analyst prompt and sent through generate_simple_text.
    Returns a fixed placeholder string when there are no comments.
    """
    if not comments:
        return "No comments available."

    bullets = []
    for entry in comments[:15]:
        author = entry.get('author', 'User')
        body = entry.get('text', '')
        bullets.append(f"- {author}: {body}")
    c_text = "\n".join(bullets)

    prompt_parts = [
        "You are a Community Context Analyst. Analyze the following user comments regarding a social media post.\n",
        "Your goal is to extract 'Community Notes' - specifically looking for fact-checking, debunking, or additional context provided by users.\n",
        f"COMMENTS:\n{c_text}\n\n",
        "OUTPUT:\n",
        "Provide a concise 1-paragraph summary of the community consensus regarding the veracity of the post.",
    ]
    prompt = "".join(prompt_parts)
    return await generate_simple_text(prompt, model_type, config)
def extract_json_from_text(text):
    """Return the first JSON object embedded in ``text``, or ``{}`` if none.

    Each ``{`` is tried in turn as the start of a JSON value, and the first
    one that decodes to a dict wins. Unlike a greedy first-``{``-to-last-``}``
    match, this still succeeds when prose containing a ``}`` follows the
    object.

    Args:
        text: Arbitrary model output; anything that is not a ``str`` yields ``{}``.

    Returns:
        The decoded dict, or an empty dict when nothing decodes.
    """
    if not isinstance(text, str):
        return {}

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _end = decoder.raw_decode(text, start)
        except ValueError:
            # Not valid JSON from this brace; try the next one.
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)
    return {}
def validate_parsed_data(data, is_text_only):
    """List the required fields that are absent or blank in ``data``.

    A score counts as blank when it is falsy, ``"0"`` or ``"n/a"`` (any case).
    Sections that are missing or not dicts are treated as empty, so malformed
    model output is reported as missing fields instead of raising.

    Args:
        data: Parsed labeling result (expected to be a dict of sections).
        is_text_only: When true, the visual/audio integrity vectors and the
            ``video_audio_score`` / ``video_caption_score`` modalities are not
            required.

    Returns:
        A list of tags such as ``"summary"``, ``"vector:<name>"`` or
        ``"modality:<name>"``; empty when everything required is present.
    """
    if not isinstance(data, dict):
        data = {}

    def section(name):
        value = data.get(name, {})
        return value if isinstance(value, dict) else {}

    def is_blank(value):
        return not value or str(value) == '0' or str(value).lower() == 'n/a'

    missing = []
    if not data.get('video_context_summary'):
        missing.append("summary")

    reasoning = section('final_assessment').get('reasoning')
    if not reasoning or len(str(reasoning)) < 5:
        missing.append("final:reasoning")

    vectors = section('veracity_vectors')
    required_vectors = [
        'visual_integrity_score', 'audio_integrity_score', 'source_credibility_score',
        'logical_consistency_score', 'emotional_manipulation_score',
    ]
    skipped_vectors = ('visual_integrity_score', 'audio_integrity_score') if is_text_only else ()
    for key in required_vectors:
        if key in skipped_vectors:
            continue
        if is_blank(vectors.get(key)):
            missing.append(f"vector:{key}")

    modalities = section('modalities')
    skipped_modalities = ('video_audio_score', 'video_caption_score') if is_text_only else ()
    for key in ['video_audio_score', 'video_caption_score', 'audio_caption_score']:
        if key in skipped_modalities:
            continue
        if is_blank(modalities.get(key)):
            missing.append(f"modality:{key}")

    if not section('factuality_factors').get('claim_accuracy'):
        missing.append("factuality:claim_accuracy")
    if not section('disinformation_analysis').get('classification'):
        missing.append("disinfo:classification")
    return missing
def smart_merge(base, new_data):
    """Fold ``new_data`` into ``base`` in place without clobbering good values.

    Keys absent from ``base`` are copied over. Nested dicts are merged
    recursively. For any other existing key, the new value replaces the old
    one only when the old one is blank (falsy, ``"0"`` or ``"n/a"``) and the
    new one is not. A non-dict ``new_data`` is returned as-is when truthy,
    otherwise ``base`` is returned; a non-dict ``base`` yields ``new_data``.
    """
    def usable(value):
        text = str(value)
        return bool(value) and text != "0" and text.lower() != "n/a"

    if not isinstance(new_data, dict):
        if new_data:
            return new_data
        return base
    if not isinstance(base, dict):
        return new_data

    for key in new_data:
        incoming = new_data[key]
        if key not in base:
            base[key] = incoming
            continue
        current = base[key]
        if isinstance(current, dict) and isinstance(incoming, dict):
            smart_merge(current, incoming)
            continue
        if usable(incoming) and not usable(current):
            base[key] = incoming
    return base
def save_debug_log(request_id, kind, content, attempt, label=""):
    """Write a prompt or response to a timestamped text file, best-effort.

    Files go to ``data/prompts`` for ``kind == 'prompt'``, ``data/responses``
    for ``kind == 'response'`` and ``data`` for anything else. The directory is
    created on demand so logging works on a fresh checkout. Any failure is
    logged and swallowed; nothing is written when ``request_id`` is falsy.

    Args:
        request_id: Prefix of the file name; falsy disables logging.
        kind: Selects the target directory.
        content: Written via ``str(content)``.
        attempt: Attempt number embedded in the file name.
        label: Optional suffix embedded in the file name.
    """
    if not request_id:
        return
    try:
        dir_map = {'prompt': 'data/prompts', 'response': 'data/responses'}
        directory = dir_map.get(kind, 'data')
        os.makedirs(directory, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_label = f"_{label}" if label else ""
        filename = f"{directory}/{request_id}_{ts}_att{attempt}{safe_label}.txt"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(str(content))
    except Exception as e:
        logger.error(f"Failed to save debug log: {e}")
async def run_gemini_labeling_pipeline(video_path: str, caption: str, transcript: str, gemini_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post through the legacy Gemini SDK, streaming progress as it goes.

    This is an async generator. It yields status strings while working and,
    once validation passes or retries run out, one dict with the keys
    ``raw_toon``, ``parsed_data``, ``prompt_used`` and ``fcot_trace``.
    Failures are reported as a yielded ``"ERROR: ..."`` string, not raised.

    Args:
        video_path: Local video path; when empty or absent on disk the run is
            treated as text-only.
        caption: Caption interpolated into the prompts.
        transcript: Transcript interpolated into the prompts.
        gemini_config: Settings dict; reads ``api_key``, ``max_retries``,
            ``use_search``, ``use_code`` and ``model_name``.
        include_comments: Selects the REASONING schema and score instructions
            when true, the SIMPLE ones otherwise.
        reasoning_method: ``"fcot"`` runs the three-message chat, ``"none"``
            uses the no-CoT template, anything else the default template.
        system_persona: Persona text passed to the prompt templates.
        request_id: When set, prompts and responses are written through
            save_debug_log.
    """
    if genai_legacy is None:
        yield "ERROR: Legacy SDK missing.\n"
        return
    api_key = gemini_config.get("api_key")
    if not api_key:
        # Report the problem instead of ending the stream with no output.
        yield "ERROR: Gemini API Key missing.\n"
        return
    max_retries = int(gemini_config.get("max_retries", 1))
    loop = asyncio.get_running_loop()
    uploaded_file = None
    try:
        genai_legacy.configure(api_key=api_key)
        is_text_only = False
        # True only once the upload has left PROCESSING in a non-FAILED state.
        video_ready = False
        if video_path and os.path.exists(video_path):
            yield f"data: - Uploading video to Gemini API...\n\n"
            uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.upload_file(path=video_path))
            # Poll for the current state; the handle returned by upload_file is
            # not refreshed, so the decision must use the polled file.
            while True:
                curr_file = await loop.run_in_executor(None, lambda: genai_legacy.get_file(uploaded_file.name))
                state_name = curr_file.state.name
                if state_name == "PROCESSING":
                    yield f"data: - Waiting for Gemini to process the video...\n\n"
                    await asyncio.sleep(3)
                    continue
                if state_name == "FAILED":
                    yield f"data: - Gemini Video Processing FAILED.\n\n"
                break
            video_ready = state_name != "FAILED"
        else:
            is_text_only = True

        active_tools = []
        if gemini_config.get("use_search", False):
            active_tools.append({"google_search_retrieval": {}})
            system_persona += "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, look up current events, or verify entity backgrounds before concluding."
        if gemini_config.get("use_code", False):
            active_tools.append({"code_execution": {}})
            system_persona += "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, or statistical verifications."

        model_name = gemini_config.get("model_name") or "gemini-1.5-pro"
        model = genai_legacy.GenerativeModel(model_name, tools=active_tools if active_tools else None)
        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
        tag_list_text = get_formatted_tag_list()

        accumulated_data = {}
        prompt_used = ""
        fcot_trace = {}
        full_raw_text = ""
        if is_text_only:
            system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                # Reprompt: show the model what it produced so far and what is missing.
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing or incomplete fields: {missing}. Initiating Iterative Reprompt (Attempt {attempt}/{max_retries}) to acquire remaining factuality components...\n"
                prompt_text = (
                    f"SYSTEM: Review the previous attempt which failed validation.\n"
                    f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
                    f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
                    f"MISSING FIELDS: {missing}\n"
                    f"INSTRUCTION: Analyze the provided Video and Context again. "
                    f"Generate the missing fields to complete the schema. You MUST provide the missing scores for {missing}.\n"
                    f"Output the FULL VALID TOON OBJECT containing all required fields.\n"
                    f"{toon_schema}"
                )
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                inputs = [prompt_text]
                if video_ready:
                    inputs.append(uploaded_file)
                response = await loop.run_in_executor(None, lambda: model.generate_content(inputs, generation_config={"temperature": 0.2}))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                yield "Starting Fractal Chain of Thought (Gemini FCoT)..."
                chat = model.start_chat(history=[])

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                inputs1 = [macro_prompt]
                if video_ready:
                    inputs1.insert(0, uploaded_file)
                res1 = await loop.run_in_executor(None, lambda: chat.send_message(inputs1))
                macro_hypothesis = res1.text
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))
                micro_observations = res2.text
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))
                raw_text = res3.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT Pipeline:\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                # prompt_used records the prompt without the text-only note below.
                prompt_used = prompt_text
                if is_text_only:
                    prompt_text = "NOTE: Text Analysis Only.\n" + prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                inputs = [prompt_text]
                if video_ready:
                    inputs.append(uploaded_file)
                response = await loop.run_in_executor(None, lambda: model.generate_content(inputs, generation_config={"temperature": 0.1}))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                parsed_step = parse_veracity_toon(raw_text)
                # Any JSON object in the reply takes precedence over the TOON parse.
                json_data = extract_json_from_text(raw_text)
                if json_data:
                    for k in ['veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment', 'factuality_factors', 'disinformation_analysis', 'tags']:
                        if k in json_data:
                            if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                                parsed_step[k].update(json_data[k])
                            else:
                                parsed_step[k] = json_data[k]
                accumulated_data = smart_merge(accumulated_data, parsed_step)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
            if attempt == max_retries:
                yield f"Max retries reached. Saving incomplete data.\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
    except Exception as e:
        yield f"ERROR: {e}"
    finally:
        # Remove the remote upload on every exit path, including errors.
        if uploaded_file is not None:
            try:
                await loop.run_in_executor(None, lambda: genai_legacy.delete_file(name=uploaded_file.name))
            except Exception as cleanup_error:
                logger.warning(f"Failed to delete uploaded Gemini file: {cleanup_error}")
async def run_vertex_labeling_pipeline(video_path: str, caption: str, transcript: str, vertex_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post through the google-genai Vertex client, streaming progress.

    This is an async generator. It yields status strings while working and,
    once validation passes or retries run out, one dict with the keys
    ``raw_toon``, ``parsed_data``, ``prompt_used`` and ``fcot_trace``.
    Failures are logged and reported as a yielded ``"ERROR: ..."`` string.

    Args:
        video_path: Local video path; its bytes are sent inline as
            ``video/mp4``. When empty or absent on disk the run is text-only.
        caption: Caption interpolated into the prompts.
        transcript: Transcript interpolated into the prompts.
        vertex_config: Settings dict; reads ``project_id``, ``location``,
            ``model_name``, ``max_retries``, ``api_key``, ``use_search`` and
            ``use_code``.
        include_comments: Selects the REASONING schema and score instructions
            when true, the SIMPLE ones otherwise.
        reasoning_method: ``"fcot"`` runs the three-message chat, ``"none"``
            uses the no-CoT template, anything else the default template.
        system_persona: Persona text passed to the prompt templates.
        request_id: When set, prompts and responses are written through
            save_debug_log.
    """
    if genai is None:
        yield "ERROR: 'google-genai' not installed.\n"
        return
    project_id = vertex_config.get("project_id")
    location = vertex_config.get("location", "us-central1")
    model_name = vertex_config.get("model_name") or "gemini-1.5-pro"
    max_retries = int(vertex_config.get("max_retries", 1))
    api_key = vertex_config.get("api_key")
    if not project_id:
        # Report the problem instead of ending the stream with no output.
        yield "ERROR: Vertex project_id missing.\n"
        return
    try:
        # Pass api_key directly if available to use API Keys instead of ADC Service Accounts.
        # NOTE(review): some google-genai releases reject project/location
        # combined with api_key - confirm against the pinned SDK version.
        # Named vertex_client so it does not shadow the module-level `client`.
        if api_key:
            vertex_client = genai.Client(vertexai=True, project=project_id, location=location, api_key=api_key)
        else:
            vertex_client = genai.Client(vertexai=True, project=project_id, location=location)

        video_part = None
        is_text_only = False
        if video_path and os.path.exists(video_path):
            with open(video_path, 'rb') as f:
                video_bytes = f.read()
            video_part = Part.from_bytes(data=video_bytes, mime_type="video/mp4")
        else:
            is_text_only = True

        active_tools = []
        if vertex_config.get("use_search", False):
            active_tools.append(Tool(google_search=GoogleSearch()))
            system_persona += "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, look up current events, or verify entity backgrounds before concluding."
        if vertex_config.get("use_code", False):
            # The SDK documents this type as ToolCodeExecution; the older
            # CodeExecution name is kept as a fallback.
            from google.genai import types as genai_types
            code_execution_cls = getattr(genai_types, "ToolCodeExecution", None) or getattr(genai_types, "CodeExecution", None)
            if code_execution_cls is not None:
                active_tools.append(Tool(code_execution=code_execution_cls()))
                system_persona += "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, or statistical verifications."
            else:
                logger.warning("Code execution tool type not found in google-genai; continuing without it.")

        config = GenerateContentConfig(
            temperature=0.1, response_mime_type="text/plain", max_output_tokens=8192,
            tools=active_tools if active_tools else None
        )
        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
        tag_list_text = get_formatted_tag_list()

        accumulated_data = {}
        prompt_used = ""
        fcot_trace = {}
        full_raw_text = ""
        loop = asyncio.get_running_loop()
        if is_text_only:
            system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS

        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                # Reprompt: show the model what it produced so far and what is missing.
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing or incomplete fields: {missing}. Initiating Iterative Reprompt (Attempt {attempt}/{max_retries}) to acquire remaining factuality components...\n"
                prompt_text = (
                    f"SYSTEM: Review the previous attempt which failed validation.\n"
                    f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
                    f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
                    f"MISSING FIELDS: {missing}\n"
                    f"INSTRUCTION: Analyze the provided Video and Context again. "
                    f"Generate the missing fields to complete the schema. You MUST provide the missing scores for {missing}.\n"
                    f"Output the FULL VALID TOON OBJECT containing all required fields.\n"
                    f"{toon_schema}"
                )
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                contents = [prompt_text]
                if video_part:
                    contents.insert(0, video_part)
                response = await loop.run_in_executor(None, lambda: vertex_client.models.generate_content(model=model_name, contents=contents, config=config))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                yield "Starting Fractal Chain of Thought (Vertex FCoT)..."
                chat = vertex_client.chats.create(model=model_name, config=config)

                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                inputs1 = [macro_prompt]
                if video_part:
                    inputs1.insert(0, video_part)
                else:
                    inputs1[0] = "NOTE: Text Only Analysis.\n" + inputs1[0]
                res1 = await loop.run_in_executor(None, lambda: chat.send_message(inputs1))
                macro_hypothesis = res1.text
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))
                micro_observations = res2.text
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))
                raw_text = res3.text
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT (Vertex):\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                if video_part:
                    contents = [video_part, prompt_text]
                else:
                    contents = [f"NOTE: Text Only Analysis (No Video).\n{prompt_text}"]
                prompt_used = prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                yield f"Generating Labels (Vertex {reasoning_method.upper()})..."
                response = await loop.run_in_executor(None, lambda: vertex_client.models.generate_content(model=model_name, contents=contents, config=config))
                raw_text = response.text
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                parsed_step = parse_veracity_toon(raw_text)
                # Any JSON object in the reply takes precedence over the TOON parse.
                json_data = extract_json_from_text(raw_text)
                if json_data:
                    for k in ['veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment', 'factuality_factors', 'disinformation_analysis', 'tags']:
                        if k in json_data:
                            if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                                parsed_step[k].update(json_data[k])
                            else:
                                parsed_step[k] = json_data[k]
                accumulated_data = smart_merge(accumulated_data, parsed_step)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
            if attempt == max_retries:
                yield f"Max retries reached. Saving incomplete data.\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
    except Exception as e:
        yield f"ERROR: {e}"
        logger.error("Vertex Labeling Error", exc_info=True)
async def run_nrp_labeling_pipeline(video_path: str, caption: str, transcript: str, nrp_config: dict, include_comments: bool, reasoning_method: str = "cot", system_persona: str = "", request_id: str = None):
    """Label a post through an HTTP ``/chat/completions`` endpoint, text-only.

    This is an async generator. It yields status strings while working and,
    once validation passes or retries run out, one dict with the keys
    ``raw_toon``, ``parsed_data``, ``prompt_used`` and ``fcot_trace``.
    Failures are logged and reported as a yielded ``"ERROR: ..."`` string.

    Args:
        video_path: Not read by this pipeline; the run is always text-only.
        caption: Caption interpolated into the prompts.
        transcript: Transcript interpolated into the prompts.
        nrp_config: Settings dict; reads ``api_key``, ``model_name``,
            ``base_url``, ``max_retries``, ``use_search`` and ``use_code``.
        include_comments: Selects the REASONING schema and score instructions
            when true, the SIMPLE ones otherwise.
        reasoning_method: ``"fcot"`` runs three chained requests, ``"none"``
            uses the no-CoT template, anything else the default template.
        system_persona: Sent as the system message, with extra instructions
            appended.
        request_id: Used in log lines and, when set, for save_debug_log files.
    """
    api_key = nrp_config.get("api_key")
    # ``or`` also covers an explicit None/"" value, which .get()'s default would not.
    model_name = nrp_config.get("model_name") or "gpt-4"
    base_url = (nrp_config.get("base_url") or "https://api.openai.com/v1").rstrip("/")
    max_retries = int(nrp_config.get("max_retries", 1))
    if not api_key:
        yield "ERROR: NRP API Key missing.\n"
        return

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    is_text_only = True
    system_persona += "\n" + TEXT_ONLY_INSTRUCTIONS
    # NOTE(review): these flags only change the prompt wording; the request
    # payload below carries no tool definitions - confirm that is intended.
    if nrp_config.get("use_search", False):
        system_persona += "\n\n**CRITICAL: AGENTIC TOOLS ENABLED**\n- You MUST use the Web Search tool to fact-check the claims, look up current events, or verify entity backgrounds before concluding."
    if nrp_config.get("use_code", False):
        system_persona += "\n- You MUST use the Code Execution tool for any necessary calculations, data processing, or statistical verifications."

    toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
    score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE
    tag_list_text = get_formatted_tag_list()

    accumulated_data = {}
    prompt_used = ""
    fcot_trace = {}
    full_raw_text = ""
    loop = asyncio.get_running_loop()

    async def _call_nrp(messages, attempt_label=""):
        """POST one chat-completion request off the event loop and return the reply text."""
        payload = {
            "model": model_name,
            "messages": messages,
            "temperature": 0.1
        }
        logger.info(f"[{request_id}] NRP API Call ({attempt_label}) - URL: {base_url}/chat/completions")
        logger.info(f"[{request_id}] NRP API Call - Model: {model_name}")
        logger.info(f"[{request_id}] NRP API Call - Messages count: {len(messages)}")

        def do_request():
            start_time = datetime.datetime.now()
            logger.info(f"[{request_id}] Dispatching requests.post (timeout=600s)...")
            resp = requests.post(f"{base_url}/chat/completions", headers=headers, json=payload, timeout=600)
            elapsed = (datetime.datetime.now() - start_time).total_seconds()
            logger.info(f"[{request_id}] NRP API Response received in {elapsed:.2f}s. Status Code: {resp.status_code}")
            if resp.status_code != 200:
                logger.error(f"[{request_id}] API Error {resp.status_code}: {resp.text}")
                raise RuntimeError(f"API Error {resp.status_code}: {resp.text}")
            resp_json = resp.json()
            usage = resp_json.get("usage", {})
            logger.info(f"[{request_id}] NRP API Usage: {usage}")
            return resp_json["choices"][0]["message"]["content"]

        return await loop.run_in_executor(None, do_request)

    try:
        for attempt in range(max_retries + 1):
            raw_text = ""
            if attempt > 0:
                # Reprompt: show the model what it produced so far and what is missing.
                missing = validate_parsed_data(accumulated_data, is_text_only)
                yield f"Validation failed. Missing fields: {missing}. Initiating Reprompt (Attempt {attempt}/{max_retries})...\n"
                prompt_text = (
                    f"SYSTEM: Review the previous attempt which failed validation.\n"
                    f"CONTEXT: Caption: \"{caption}\"\nTranscript: \"{transcript}\"\n"
                    f"PREVIOUS (PARTIAL) DATA: {json.dumps(accumulated_data, indent=2)}\n"
                    f"MISSING FIELDS: {missing}\n"
                    f"INSTRUCTION: Generate the missing fields to complete the schema. You MUST provide the missing scores for {missing}.\n"
                    f"Output the FULL VALID TOON OBJECT containing all required fields.\n"
                    f"{toon_schema}"
                )
                save_debug_log(request_id, 'prompt', prompt_text, attempt, 'reprompt')
                yield f" - Sending Reprompt request to NRP API (Model: {model_name}, Timeout: 600s)...\n"
                raw_text = await _call_nrp([
                    {"role": "system", "content": system_persona},
                    {"role": "user", "content": prompt_text}
                ], attempt_label=f"reprompt_{attempt}")
                yield f" - Received Reprompt response from NRP API.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, 'reprompt')
            elif reasoning_method == "fcot":
                yield "Starting Fractal Chain of Thought (NRP FCoT)...\n"
                macro_prompt = FCOT_MACRO_PROMPT.format(system_persona=system_persona, caption=caption, transcript=transcript)
                macro_prompt = "NOTE: Text Only Analysis.\n" + macro_prompt
                save_debug_log(request_id, 'prompt', macro_prompt, attempt, 'fcot_macro')
                macro_messages = [{"role": "system", "content": system_persona}, {"role": "user", "content": macro_prompt}]
                yield f" - Stage 1: Sending Macro Hypothesis request to NRP API (Timeout: 600s)...\n"
                macro_hypothesis = await _call_nrp(macro_messages, attempt_label="fcot_macro")
                yield f" - Stage 1: Received Macro Hypothesis response.\n"
                save_debug_log(request_id, 'response', macro_hypothesis, attempt, 'fcot_macro')
                fcot_trace['macro'] = macro_hypothesis

                # Each stage resends the whole conversation so far.
                meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
                save_debug_log(request_id, 'prompt', meso_prompt, attempt, 'fcot_meso')
                meso_messages = macro_messages + [{"role": "assistant", "content": macro_hypothesis}, {"role": "user", "content": meso_prompt}]
                yield f" - Stage 2: Sending Meso Analysis request to NRP API (Timeout: 600s)...\n"
                micro_observations = await _call_nrp(meso_messages, attempt_label="fcot_meso")
                yield f" - Stage 2: Received Meso Analysis response.\n"
                save_debug_log(request_id, 'response', micro_observations, attempt, 'fcot_meso')
                fcot_trace['meso'] = micro_observations

                synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text)
                save_debug_log(request_id, 'prompt', synthesis_prompt, attempt, 'fcot_synthesis')
                synthesis_messages = meso_messages + [{"role": "assistant", "content": micro_observations}, {"role": "user", "content": synthesis_prompt}]
                yield f" - Stage 3: Sending Synthesis/Formatting request to NRP API (Timeout: 600s)...\n"
                raw_text = await _call_nrp(synthesis_messages, attempt_label="fcot_synthesis")
                yield f" - Stage 3: Received Synthesis response.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, 'fcot_synthesis')
                prompt_used = f"FCoT (NRP):\nMacro: {macro_hypothesis}\nMeso: {micro_observations}"
            else:
                template = LABELING_PROMPT_TEMPLATE_NO_COT if reasoning_method == "none" else LABELING_PROMPT_TEMPLATE
                prompt_text = template.format(
                    system_persona=system_persona, caption=caption, transcript=transcript,
                    toon_schema=toon_schema, score_instructions=score_instructions, tag_list_text=tag_list_text
                )
                prompt_text = f"NOTE: Text Only Analysis (No Video).\n{prompt_text}"
                prompt_used = prompt_text
                save_debug_log(request_id, 'prompt', prompt_text, attempt, f'standard_{reasoning_method}')
                yield f"Generating Labels (NRP {reasoning_method.upper()})...\n"
                yield f" - Sending Standard request to NRP API (Model: {model_name}, Timeout: 600s)...\n"
                raw_text = await _call_nrp([
                    {"role": "system", "content": system_persona},
                    {"role": "user", "content": prompt_text}
                ], attempt_label=f"standard_{reasoning_method}")
                yield f" - Received response from NRP API.\n\n"
                save_debug_log(request_id, 'response', raw_text, attempt, f'standard_{reasoning_method}')

            if raw_text:
                full_raw_text += f"\n--- Attempt {attempt} ---\n{raw_text}\n"
                parsed_step = parse_veracity_toon(raw_text)
                # Any JSON object in the reply takes precedence over the TOON parse.
                json_data = extract_json_from_text(raw_text)
                if json_data:
                    for k in ['veracity_vectors', 'modalities', 'video_context_summary', 'final_assessment', 'factuality_factors', 'disinformation_analysis', 'tags']:
                        if k in json_data:
                            if isinstance(parsed_step.get(k), dict) and isinstance(json_data[k], dict):
                                parsed_step[k].update(json_data[k])
                            else:
                                parsed_step[k] = json_data[k]
                accumulated_data = smart_merge(accumulated_data, parsed_step)

            missing_fields = validate_parsed_data(accumulated_data, is_text_only)
            if not missing_fields:
                yield f"Validation Passed. All factuality components processed and confidence scores obtained. (Method: {reasoning_method})\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
            if attempt == max_retries:
                yield f"Max retries reached. Saving incomplete data.\n"
                yield {"raw_toon": full_raw_text, "parsed_data": accumulated_data, "prompt_used": prompt_used, "fcot_trace": fcot_trace}
                break
    except Exception as e:
        yield f"ERROR: {e}"
        logger.error("NRP Labeling Error", exc_info=True)
|