import ast
import asyncio
import json
import logging
import os
import re
import sys

import torch
from peft import PeftModel
from transformers import Qwen3VLForConditionalGeneration, AutoProcessor

from my_vision_process import process_vision_info, client
from labeling_logic import (
    LABELING_PROMPT_TEMPLATE, SCORE_INSTRUCTIONS_SIMPLE, SCORE_INSTRUCTIONS_REASONING,
    SCHEMA_SIMPLE, SCHEMA_REASONING,
    FCOT_MACRO_PROMPT, FCOT_MESO_PROMPT, FCOT_SYNTHESIS_PROMPT,
)
from toon_parser import parse_veracity_toon
from agents import get_video_analysis_system

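# Optional Google SDKs. The legacy `google-generativeai` package backs the
# Gemini API pipelines below; the newer `google-genai` package backs the
# Vertex AI pipelines. Each handle is set to None when the package is not
# installed so callers can feature-detect at runtime.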
try:
    import google.generativeai as genai_legacy
    from google.generativeai.types import generation_types
except ImportError:
    genai_legacy = None

try:
    from google import genai
    from google.genai.types import (
        GenerateContentConfig,
        HttpOptions,
        Retrieval,
        Tool,
        VertexAISearch,
        GoogleSearch,
        Part,
    )
    import vertexai
except ImportError:
    genai = None
    vertexai = None

LITE_MODE = os.getenv("LITE_MODE", "false").lower() == "true"

processor = None
base_model = None
peft_model = None
active_model = None

logger = logging.getLogger(__name__)

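
# Local Qwen3-VL model loading. Models are loaded lazily and only when a GPU
# is present; otherwise the app falls back to LITE_MODE (remote APIs only).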
def load_models():
    global LITE_MODE, processor, base_model, peft_model, active_model

    if LITE_MODE:
        logger.info("LITE_MODE is enabled. Skipping local model loading.")
        return

    if base_model is not None:
        return

    if not torch.cuda.is_available():
        logger.warning("CUDA is not available. This application requires a GPU for local models. Switching to LITE_MODE.")
        LITE_MODE = True
        return

    device = torch.device("cuda")
    logger.info(f"CUDA is available. Initializing models on {device}...")
    local_model_path = "/app/local_model"

    # Prefer FlashAttention 2 when installed; otherwise fall back to PyTorch SDPA.
    try:
        import flash_attn  # noqa: F401
        attn_implementation = "flash_attention_2"
    except ImportError:
        attn_implementation = "sdpa"

    logger.info(f"Loading base model from {local_model_path}...")
    try:
        base_model = Qwen3VLForConditionalGeneration.from_pretrained(
            local_model_path, dtype=torch.bfloat16, device_map="auto", attn_implementation=attn_implementation
        ).eval()
        processor = AutoProcessor.from_pretrained(local_model_path)
        active_model = base_model
    except Exception as e:
        logger.error(f"Failed to load local model: {e}")
        LITE_MODE = True

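
# Select which local model serves inference: the PEFT-adapted model when
# requested and available, otherwise the base model.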
def switch_active_model(model_name: str):
    global active_model, base_model, peft_model
    if model_name == "custom" and peft_model is not None:
        active_model = peft_model
    else:
        active_model = base_model

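
# Single synchronous generation pass over a video with the active local model.
# `pred_glue` carries key timestamps predicted by a previous perception pass
# (see run_inference_pipeline) and is forwarded to the vision preprocessor as
# a `key_time` hint.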
def inference_step(video_path, prompt, generation_kwargs, sampling_fps, pred_glue=None):
    global processor, active_model
    if active_model is None:
        raise RuntimeError("Models not loaded.")

    messages = [
        {"role": "user", "content": [
            {"type": "video", "video": video_path, "key_time": pred_glue, "fps": sampling_fps,
             "total_pixels": 128 * 12 * 28 * 28, "min_pixels": 128 * 28 * 28},
            {"type": "text", "text": prompt},
        ]},
    ]
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs, video_kwargs = process_vision_info(messages, return_video_kwargs=True, client=client)
    fps_inputs = video_kwargs["fps"][0]
    inputs = processor(text=[text], images=image_inputs, videos=video_inputs, fps=fps_inputs, padding=True, return_tensors="pt")
    inputs = {k: v.to(active_model.device) for k, v in inputs.items()}

    with torch.no_grad():
        output_ids = active_model.generate(**inputs, **generation_kwargs, use_cache=True)

    # Strip the prompt tokens so only newly generated tokens are decoded.
    generated_ids = [output_ids[i][len(inputs["input_ids"][i]):] for i in range(len(output_ids))]
    output_text = processor.batch_decode(generated_ids, skip_special_tokens=True)
    return output_text[0]

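
# Iterative perception loop: each intermediate pass asks the model for
# <glue>...</glue> key-time predictions that are fed back as `key_time` hints
# for the next pass; the last pass uses the final-answer prompt.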
async def run_inference_pipeline(video_path, question, generation_config, prompts):
    num_perceptions = int(generation_config.pop("num_perceptions", 3))
    sampling_fps = float(generation_config.pop("sampling_fps", 2.0))
    if generation_config.get("temperature", 0.0) > 0.0:
        generation_config["do_sample"] = True
    pred_glue = None
    final_answer = "No answer."

    for perception in range(num_perceptions):
        yield f"Perception {perception + 1}/{num_perceptions}\n"
        # Intermediate passes use the glue prompt; the last pass uses the final prompt.
        current_prompt = prompts["glue"].replace("[QUESTION]", question) if perception < num_perceptions - 1 else prompts["final"].replace("[QUESTION]", question)
        ans = inference_step(video_path, current_prompt, generation_kwargs=generation_config, pred_glue=pred_glue, sampling_fps=sampling_fps)
        yield f"Model Output: {ans}\n"
        final_answer = ans
        match_glue = re.search(r'<glue>(.*?)</glue>', ans, re.DOTALL)
        if match_glue:
            # The glue payload is expected to be a Python literal; fall back to
            # None if the model emitted something unparseable.
            try:
                pred_glue = ast.literal_eval(match_glue.group(1).strip())
            except (ValueError, SyntaxError):
                pred_glue = None
        else:
            pred_glue = None
    yield f"\nFinal Answer\n{final_answer}\n"

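
# One-shot formatting repair: re-prompts a text-only model to rewrite output
# that failed TOON parsing into the target schema, without adding content.
# Returns the original text unchanged if the repair call itself fails.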
async def attempt_toon_repair(original_text: str, schema: str, client, model_type: str, config: dict):
    logger.info("Attempting TOON Repair via separate AI call...")
    repair_prompt = (
        "SYSTEM: You are a data formatting expert. The following output from a previous model "
        "failed to parse correctly.\n"
        "YOUR TASK: Rewrite the data strictly into the following TOON schema. Do not add new content, "
        "just format it. If scores are missing, infer them from the text or default to 0.\n\n"
        f"TARGET SCHEMA:\n{schema}\n\n"
        f"BAD OUTPUT:\n{original_text}\n"
    )

    try:
        loop = asyncio.get_running_loop()
        repaired_text = ""

        if model_type == 'gemini':
            model = genai_legacy.GenerativeModel("models/gemini-2.0-flash-exp")
            response = await loop.run_in_executor(
                None,
                lambda: model.generate_content(repair_prompt, generation_config={"temperature": 0.0})
            )
            repaired_text = response.text

        elif model_type == 'vertex':
            cl = client if client else genai.Client(vertexai=True, project=config['project_id'], location=config['location'])
            response = await loop.run_in_executor(
                None,
                lambda: cl.models.generate_content(
                    # Same default model as run_vertex_labeling_pipeline, so a
                    # config without an explicit model_name does not raise.
                    model=config.get('model_name', "gemini-1.5-pro-preview-0409"),
                    contents=repair_prompt,
                    config=GenerateContentConfig(temperature=0.0)
                )
            )
            repaired_text = response.text

        logger.info(f"Repair successful. New text length: {len(repaired_text)}")
        return repaired_text
    except Exception as e:
        logger.error(f"Repair failed: {e}")
        return original_text

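
# Labeling pipeline backed by the legacy Gemini API (file upload + generate).
# Yields progress strings while running and a result dict on completion.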
async def run_gemini_labeling_pipeline(video_path: str, caption: str, transcript: str, gemini_config: dict, include_comments: bool, reasoning_method: str = "cot"):
    if genai_legacy is None:
        yield "ERROR: Legacy SDK missing.\n"
        return

    api_key = gemini_config.get("api_key")
    if not api_key:
        yield "ERROR: Missing Gemini API key.\n"
        return

    try:
        genai_legacy.configure(api_key=api_key)
        loop = asyncio.get_running_loop()
        uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.upload_file(path=video_path))
        # Poll until the uploaded video finishes processing; the file handle
        # must be re-fetched, since its state never updates on its own.
        while uploaded_file.state.name == "PROCESSING":
            await asyncio.sleep(2)
            uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.get_file(uploaded_file.name))

        model = genai_legacy.GenerativeModel("models/gemini-2.0-flash-exp")
        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE

        raw_text = ""
        prompt_used = ""

        if reasoning_method == "fcot":
            # FCoT is not implemented for the legacy Gemini path; raw_text stays
            # empty and the empty-response handling below reports it.
            pass
        elif reasoning_method == "agentic":
            yield "Starting Multi-Agent Analysis..."

            model_name = gemini_config.get("model_name", "models/gemini-2.1-flash-lite")
            system = get_video_analysis_system(api_key=api_key, model_name=model_name)

            results = await loop.run_in_executor(None, lambda: system.run(
                video_path=video_path,
                caption=caption,
                transcript=transcript
            ))

            raw_text = "\n\n".join([str(res) for res in results])
            prompt_used = "Multi-Agent System Execution"
        else:
            prompt_text = LABELING_PROMPT_TEMPLATE.format(
                caption=caption,
                transcript=transcript,
                toon_schema=toon_schema,
                score_instructions=score_instructions
            )
            prompt_used = prompt_text
            yield "Generating Labels (Standard CoT)..."
            response = await loop.run_in_executor(None, lambda: model.generate_content([prompt_text, uploaded_file], generation_config={"temperature": 0.1}))
            raw_text = response.text

        if not raw_text:
            yield "Model returned empty response (possibly triggered safety filter)."
            yield {"error": "Empty Response"}
            return

        parsed_data = parse_veracity_toon(raw_text)

        # A visual_integrity_score of '0' is treated as a parse-failure sentinel.
        is_zero = parsed_data.get('veracity_vectors', {}).get('visual_integrity_score') == '0'
        if is_zero:
            yield "Parsing incomplete (score 0). Initiating Auto-Repair..."
            raw_text = await attempt_toon_repair(raw_text, toon_schema, None, 'gemini', gemini_config)
            parsed_data = parse_veracity_toon(raw_text)

        yield {"raw_toon": raw_text, "parsed_data": parsed_data, "prompt_used": prompt_used}
        await loop.run_in_executor(None, lambda: genai_legacy.delete_file(name=uploaded_file.name))

    except Exception as e:
        yield f"ERROR: {e}"

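
# Labeling pipeline backed by Vertex AI via the google-genai client. Supports
# a three-step Fractal Chain of Thought (macro -> meso -> synthesis) chat flow
# in addition to the standard single-shot CoT prompt.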
async def run_vertex_labeling_pipeline(video_path: str, caption: str, transcript: str, vertex_config: dict, include_comments: bool, reasoning_method: str = "cot"):
    if genai is None:
        yield "ERROR: 'google-genai' not installed.\n"
        return

    project_id = vertex_config.get("project_id")
    location = vertex_config.get("location", "us-central1")
    model_name = vertex_config.get("model_name", "gemini-1.5-pro-preview-0409")

    if not project_id:
        yield "ERROR: Missing Vertex AI project_id.\n"
        return

    try:
        client = genai.Client(vertexai=True, project=project_id, location=location)
        with open(video_path, 'rb') as f:
            video_bytes = f.read()
        video_part = Part.from_bytes(data=video_bytes, mime_type="video/mp4")

        toon_schema = SCHEMA_REASONING if include_comments else SCHEMA_SIMPLE
        score_instructions = SCORE_INSTRUCTIONS_REASONING if include_comments else SCORE_INSTRUCTIONS_SIMPLE

        raw_text = ""
        prompt_used = ""

        loop = asyncio.get_running_loop()
        config = GenerateContentConfig(
            temperature=0.1,
            response_mime_type="text/plain",
            max_output_tokens=8192,
            tools=[Tool(google_search=GoogleSearch())]
        )

        if reasoning_method == "fcot":
            yield "Starting Fractal Chain of Thought (Vertex FCoT)..."
            chat = client.chats.create(model=model_name, config=config)

            # Step 1 (macro): form an overall hypothesis from the full video.
            yield "FCoT Step 1: Macro-Scale..."
            macro_prompt = FCOT_MACRO_PROMPT.format(caption=caption, transcript=transcript)
            res1 = await loop.run_in_executor(None, lambda: chat.send_message([video_part, macro_prompt]))
            macro_hypothesis = res1.text
            yield f"Hypothesis: {macro_hypothesis[:80]}...\n"

            # Step 2 (meso): probe the macro hypothesis at segment level. The
            # response is not used directly; it feeds the chat history.
            yield "FCoT Step 2: Meso-Scale..."
            meso_prompt = FCOT_MESO_PROMPT.format(macro_hypothesis=macro_hypothesis)
            res2 = await loop.run_in_executor(None, lambda: chat.send_message(meso_prompt))

            # Step 3 (synthesis): produce the final TOON-formatted labels.
            yield "FCoT Step 3: Synthesis..."
            synthesis_prompt = FCOT_SYNTHESIS_PROMPT.format(toon_schema=toon_schema, score_instructions=score_instructions)
            res3 = await loop.run_in_executor(None, lambda: chat.send_message(synthesis_prompt))

            raw_text = res3.text
            prompt_used = f"FCoT (Vertex):\nMacro: {macro_prompt}\nMeso: {meso_prompt}\nSynthesis: {synthesis_prompt}"

        else:
            prompt_text = LABELING_PROMPT_TEMPLATE.format(
                caption=caption,
                transcript=transcript,
                toon_schema=toon_schema,
                score_instructions=score_instructions
            )
            prompt_used = prompt_text
            yield "Generating Labels (Vertex CoT)..."

            response = await loop.run_in_executor(
                None,
                lambda: client.models.generate_content(
                    model=model_name,
                    contents=[video_part, prompt_text],
                    config=config
                )
            )
            raw_text = response.text

        if not raw_text:
            yield "Model returned empty response."
            yield {"error": "Empty Response"}
            return

        parsed_data = parse_veracity_toon(raw_text)

        # A visual_integrity_score of '0' is treated as a parse-failure sentinel.
        is_zero = parsed_data.get('veracity_vectors', {}).get('visual_integrity_score') == '0'
        if is_zero:
            yield "Parsing incomplete (score 0). Initiating Auto-Repair..."
            raw_text = await attempt_toon_repair(raw_text, toon_schema, client, 'vertex', vertex_config)
            parsed_data = parse_veracity_toon(raw_text)

        yield {"raw_toon": raw_text, "parsed_data": parsed_data, "prompt_used": prompt_used}

    except Exception as e:
        yield f"ERROR: {e}"
        logger.error("Vertex Labeling Error", exc_info=True)

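
# Minimal ad-hoc Q&A over a video via the legacy Gemini API.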
async def run_gemini_pipeline(video_path, question, checks, gemini_config, generation_config=None):
    if genai_legacy is None:
        return
    api_key = gemini_config.get("api_key")
    if not api_key:
        return
    genai_legacy.configure(api_key=api_key)
    loop = asyncio.get_running_loop()
    uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.upload_file(path=video_path))
    # Poll until processing completes, re-fetching the file handle each time.
    while uploaded_file.state.name == "PROCESSING":
        await asyncio.sleep(2)
        uploaded_file = await loop.run_in_executor(None, lambda: genai_legacy.get_file(uploaded_file.name))
    model = genai_legacy.GenerativeModel(gemini_config.get("model_name", "models/gemini-1.5-pro-latest"))
    response = await loop.run_in_executor(None, lambda: model.generate_content([question, uploaded_file]))
    yield response.text
    await loop.run_in_executor(None, lambda: genai_legacy.delete_file(name=uploaded_file.name))

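
# Minimal ad-hoc Q&A over a video via Vertex AI.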
async def run_vertex_pipeline(video_path, question, checks, vertex_config, generation_config=None):
    if genai is None:
        return
    client = genai.Client(vertexai=True, project=vertex_config['project_id'], location=vertex_config['location'])
    with open(video_path, 'rb') as f:
        video_bytes = f.read()
    video_part = Part.from_bytes(data=video_bytes, mime_type="video/mp4")
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        lambda: client.models.generate_content(
            model=vertex_config.get("model_name", "gemini-2.5-flash-lite"),
            contents=[video_part, question]
        )
    )
    yield response.text