# Media_Optimization / qwen3_omni_asyncllm_modal_client.py
# Uploaded via huggingface_hub (commit 3ec62b4, verified).
"""
Qwen3 Omni helpers for video-analysis workflows.
Pipeline:
1. Upload a local video to a Modal volume (shared with the server).
2. Call Qwen3 Omni's `prepare_video(video_path)` once (cold start + preprocessing).
3. Fire multiple `generate(video_path=..., messages=...)` requests concurrently.
4. Measure:
- upload time
- cold-start + prepare_video time
- processing time (all prompts)
- total wall time
"""
import re
import json
import modal
import asyncio
import logging
import time
import os
from pathlib import Path
from dotenv import load_dotenv
from json_repair import repair_json
from typing import List, Optional, Dict, Any, Union, Tuple
# --------------------------------------------------------------------------- #
# Environment & Logger
# --------------------------------------------------------------------------- #
# Force the Modal SDK to target the "dev" environment before the Cls/Volume
# lookups below run at import time.
os.environ["MODAL_ENVIRONMENT"] = "dev"
# Load secrets from ~/.environment_variables/<parent-dir-name>/.env, where
# <parent-dir-name> is the directory containing this file.
load_dotenv(
    Path.home() / ".environment_variables" / Path(__file__).parent.name / ".env"
)
logger = logging.getLogger("media-analysis-qwen3-omni")
if not logger.handlers:  # guard against duplicate handlers on re-import
    _handler = logging.StreamHandler()
    _formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    _handler.setFormatter(_formatter)
    logger.addHandler(_handler)
    logger.setLevel(logging.INFO)
# ASCII control characters (except \t, \n, \r) that break json.loads.
_BAD_JSON_CHARS_RE = re.compile(r"[\x00-\x08\x0b-\x0c\x0e-\x1f]")
def _safe_json_loads(raw: str):
"""Robustly parse *raw* text into JSON."""
try:
return json.loads(raw)
except json.JSONDecodeError as first_err:
cleaned = _BAD_JSON_CHARS_RE.sub("", raw)
if cleaned != raw:
try:
return json.loads(cleaned)
except json.JSONDecodeError:
pass
try:
repaired = repair_json(raw)
return json.loads(repaired)
except Exception:
raise first_err
# --------------------------------------------------------------------------- #
# Modal remote class + video volume
# --------------------------------------------------------------------------- #
# Handle to the deployed Modal class; the server app must already be deployed
# under these names in the "dev" environment.
qwen3_omni_cls = modal.Cls.from_name(
    "Qwen3-Omni-30B-A3B-Thinking-Media-Analysis",  # app name
    "Qwen3Omni30BA3BThinking",  # class name
)
# Shared remote instance used by every request in this module.
qwen3_omni = qwen3_omni_cls()
# Volume must be mounted in the server app at /root/persistent
video_volume = modal.Volume.from_name("qwen3-omni-media-analysis-files", create_if_missing=True)
def upload_media_to_volume(
    local_path: Union[str, Path], remote_subdir: str
) -> str:
    """Copy a local media file into the shared Modal volume.

    Returns the path the *server* container sees, e.g.
    ``/root/persistent/videos/foo.mp4``; that path doubles as the key for
    server-side caching.
    """
    source = Path(local_path)
    if not source.exists():
        raise FileNotFoundError(f"Media file not found: {source}")

    # Key inside the volume (client view) vs. path under the server's mount.
    volume_path = f"{remote_subdir}/{source.name}"
    container_path = f"/root/persistent/{volume_path}"
    logger.info("Uploading %s → %s", source, container_path)

    # A single batch_upload suffices; no client-side commit() is needed.
    with video_volume.batch_upload(force=True) as batch:
        batch.put_file(str(source), volume_path)
    logger.info("Upload complete")
    return container_path
# --------------------------------------------------------------------------- #
# Core Qwen3 Omni call (single request)
# --------------------------------------------------------------------------- #
async def qwen3_omni_request(
    prompt: str,
    media_path: Optional[str],
    system_message: str = (
        "You are a helpful AI assistant for multimedia(image, video and audio) analysis and social media optimization."
    ),
    temperature: float = 0.7,
    top_p: float = 0.8,
    top_k: int = 20,
    max_tokens: int = 2048,
    timeout: Optional[int] = 600,
) -> Tuple[Union[Dict[Any, Any], List[Any], None], Optional[str]]:
    """Issue one generate() call to Qwen3 Omni and parse the reply as JSON.

    Returns ``(parsed_json, raw_text)``; ``parsed_json`` is ``None`` when the
    reply cannot be parsed (or repaired) into JSON.
    """

    def _strip_fences(text: str) -> str:
        # Remove a leading ```json / ``` fence and a trailing ``` fence.
        text = text.strip()
        for opener in ("```json", "```"):
            if text.startswith(opener):
                text = text.removeprefix(opener).strip()
                break
        if text.endswith("```"):
            text = text.removesuffix("```").strip()
        return text

    chat = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": prompt},
    ]
    raw_text: str = await asyncio.wait_for(
        qwen3_omni.generate.remote.aio(
            media_path=media_path,
            messages=chat,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            max_tokens=max_tokens,
        ),
        timeout=timeout,
    )

    candidate = _strip_fences(raw_text)
    try:
        # Parsing long replies can be CPU-heavy; keep the event loop free.
        parsed = await asyncio.to_thread(_safe_json_loads, candidate)
    except Exception as e:
        logger.warning("JSON parse failed for response: %r", e)
        parsed = None
    return parsed, raw_text
# --------------------------------------------------------------------------- #
# Video-analysis prompts (your exact ones)
# --------------------------------------------------------------------------- #
def MEDIA_SUMMARY_PROMPT(media_type: str) -> str:
    """Prompt asking for a detailed summary plus an engagement verdict."""
    return f"""
Provide a detailed, thorough summary of this {media_type}.
If this {media_type} is posted on social media, will it be engaging or not? Why?
""".strip()


def MEDIA_ENGAGEMENT_PROMPT(media_type: str) -> str:
    """Prompt asking whether promotional content engages viewers and how to improve it."""
    return f"""
This {media_type} is intended as promotional content.
Is it engaging to the viewers for its purpose? Why?
How can we improve it to make it more engaging?
""".strip()
def insights_for_platform(platform: str, media_type: str) -> str:
    """Build the platform-specific engagement-analysis prompt (JSON answer expected)."""
    template = f"""
I am an influencer on {platform} and have created a {media_type} to be posted on {platform}.
I would like to understand how it will perform on the {platform} platform in terms of social media engagement.
Please provide the analysis step-by-step:
1. Identify elements that might contribute to popularity (views, likes, comments, shares) on {platform}.
2. Suggest improvements to enhance engagement on {platform}.
3. Give an overall grade: Good, Medium, or Bad.
Return strictly in this JSON format:
{{
"analysis": "your full analysis here",
"grade": "Good|Medium|Bad"
}}
"""
    return template.strip()
def MEDIA_MOOD_PROMPT(media_type: str) -> str:
    """Prompt for the 5 most prominent moods with 1-100 scores (JSON answer)."""
    return f"""
Analyze the {media_type} and return the 5 most prominent moods with scores (1 - 100).
Higher score = more prominent.
Return exactly:
{{
"moods_scores_dict": {{"Joy": 95, "Calm": 80, ...}},
"moods_list": ["Joy", "Calm", ...]
}}
""".strip()


def MEDIA_EMOTION_PROMPT(media_type: str) -> str:
    """Prompt scoring 8 facial emotions 0-100, with rationales (JSON answer)."""
    return f"""
Analyze faces in the {media_type} and score these 8 emotions (0 - 100):
Anger, Contempt, Disgust, Fear, Happiness, Sadness, Surprise, Neutral.
If no people → all scores 0.
Return exactly:
{{
"emotion_scores_dict": {{"Anger": 5, "Happiness": 92, ...}},
"emotion_rationales_dict": {{"Anger": "No raised eyebrows or clenched jaw...", ...}}
}}
""".strip()


def MEDIA_SENTIMENT_PROMPT(media_type: str) -> str:
    """Prompt for a single overall audience sentiment (JSON answer)."""
    return f"""
Choose one overall audience sentiment after viewing the {media_type}: Positive, Neutral, or Negative.
Return exactly:
{{
"sentiment": "Positive|Neutral|Negative",
"rationale": "Explanation here"
}}
""".strip()


def MEDIA_MEMORABILITY_PROMPT(media_type: str) -> str:
    """Prompt rating memorability High/Moderate/Low (JSON answer)."""
    return f"""
Rate how memorable this {media_type} is: High, Moderate, or Low.
Return exactly:
{{
"memorability": "High|Moderate|Low",
"rationale": "Explanation here"
}}
""".strip()


def MEDIA_QUALITY_PROMPT(media_type: str) -> str:
    """Prompt rating technical/aesthetic quality High/Moderate/Low (JSON answer)."""
    return f"""
Rate {media_type} quality (clarity, lighting, aesthetics): High, Moderate, or Low.
Return exactly:
{{
"quality": "High|Moderate|Low",
"rationale": "Explanation here"
}}
""".strip()


def MEDIA_VIRAL_POTENTIAL_PROMPT(media_type: str) -> str:
    """Prompt rating viral potential plus a 0-100 score (JSON answer)."""
    return f"""
Evaluate viral potential on social media (High/Medium/Low) and give a score 0 - 100.
Return exactly:
{{
"viral_potential": "High|Moderate|Low",
"viral_potential_score": 87,
"rationale": "Explanation here"
}}
""".strip()


# Video-only prompt: summary plus up to 3 timestamped highlight clips.
VIDEO_HIGHLIGHTS_PROMPT = """
Summarize the video in 2 - 3 sentences and pick up to 3 highlight clips (3 - 7s each).
For each highlight:
- timestamps as ["MM:SS.sss", "MM:SS.sss"]
- reason, description, score (0 - 100)
Return exactly:
{
"video_summary": "text",
"highlights": [
{"timestamps": ["00:00.000", "00:05.000"], "reason": "...", "clip_summary": "...", "clip_score": 94},
...
]
}
""".strip()


# Video-only prompt: short engaging clips, same schema as the highlights prompt.
ENGAGING_CLIPS_PROMPT = """
Find 1–10s engaging clips that make viewers want to keep watching.
If none → empty list.
Use the same JSON format as the previous highlights prompt:
{
"video_summary": "text",
"highlights": [...]
}
""".strip()
def get_prompts(media_type: str) -> List[Tuple[str, str]]:
    """Build the ordered (name, prompt) list for *media_type* ("video" or "image").

    Every media type receives the common analysis prompts; videos additionally
    get the highlight / engaging-clip extraction prompts.
    """
    prompts: List[Tuple[str, str]] = [
        ("Summary", MEDIA_SUMMARY_PROMPT(media_type)),
        ("Engagement", MEDIA_ENGAGEMENT_PROMPT(media_type)),
        ("TikTok Insights", insights_for_platform("TikTok", media_type)),
        ("Instagram Insights", insights_for_platform("Instagram", media_type)),
        ("Mood", MEDIA_MOOD_PROMPT(media_type)),
        ("Emotion", MEDIA_EMOTION_PROMPT(media_type)),
        ("Sentiment", MEDIA_SENTIMENT_PROMPT(media_type)),
        ("Memorability", MEDIA_MEMORABILITY_PROMPT(media_type)),
        ("Quality", MEDIA_QUALITY_PROMPT(media_type)),
        ("Viral Potential", MEDIA_VIRAL_POTENTIAL_PROMPT(media_type)),
    ]
    # Clip-extraction prompts only make sense for temporal media.
    if media_type == "video":
        prompts += [
            ("Highlights", VIDEO_HIGHLIGHTS_PROMPT),
            ("Engaging Clips", ENGAGING_CLIPS_PROMPT),
        ]
    return prompts
# --------------------------------------------------------------------------- #
# Concurrency helper
# --------------------------------------------------------------------------- #
async def _run_prompt_with_semaphore(
    name: str,
    prompt: str,
    media_path: Optional[str],
    semaphore: asyncio.Semaphore,
    **kwargs: Any,
) -> Tuple[str, Union[Dict[str, Any], List[Any], None]]:
    """Execute one prompt while holding *semaphore*; yield (name, parsed_json)."""
    async with semaphore:
        parsed_json, _raw = await qwen3_omni_request(
            prompt=prompt, media_path=media_path, **kwargs
        )
    return name, parsed_json
# --------------------------------------------------------------------------- #
# End-to-end: local video/image → volume → prepare_video → all prompts
# --------------------------------------------------------------------------- #
# Map file suffix (lowercase) → logical media type.
_MEDIA_TYPE_BY_SUFFIX: Dict[str, str] = {
    ".mp4": "video", ".mov": "video", ".avi": "video", ".mkv": "video",
    ".jpg": "image", ".jpeg": "image", ".png": "image", ".webp": "image",
}


def _detect_media_type(suffix: str) -> str:
    """Classify a file suffix as "video" or "image"; raise ValueError otherwise."""
    media_ext = suffix.lower()
    try:
        return _MEDIA_TYPE_BY_SUFFIX[media_ext]
    except KeyError:
        raise ValueError(f"Unsupported media type: {media_ext}") from None


def _print_timing_report(
    t_upload: float,
    t_prepare: float,
    t_processing: float,
    t_total: float,
    n_prompts: int,
) -> None:
    """Pretty-print the end-to-end timing breakdown."""
    print("\n" + "=" * 70)
    print("QWEN3-OMNI MEDIA ANALYSIS — TIMING REPORT")
    print("=" * 70)
    print(f"Upload time : {t_upload:.2f} s")
    print(f"Cold start + media preprocessing : {t_prepare:.2f} s")
    print(f"Processing (all prompts, hot) : {t_processing:.2f} s")
    print(f"TOTAL end-to-end : {t_total:.2f} s")
    print(f"Avg per prompt (processing only) : {t_processing / n_prompts:.2f} s")
    print("=" * 70)


def _print_result_snippets(results) -> None:
    """Print up to 1000 chars of each parsed result, tagged by prompt name."""
    for name, parsed in results:
        print("\n" + "-" * 40)
        print(f"[{name.upper()}]")
        if parsed is None:
            print("No parsed JSON result.")
            continue
        snippet = json.dumps(parsed, indent=2, ensure_ascii=False)
        print(snippet[:1000] + ("..." if len(snippet) > 1000 else ""))


async def full_media_analysis_for_local_file(
    local_media_path: Union[str, Path],
    concurrency_limit: int = 8,
    max_tokens: int = 2048,
) -> None:
    """Upload a local video/image, prepare it server-side, then run all
    analysis prompts concurrently and print timings + result snippets.

    Args:
        local_media_path: Path to a local video or image file.
        concurrency_limit: Max in-flight generate() calls.
        max_tokens: Per-prompt generation budget.

    Raises:
        FileNotFoundError: if *local_media_path* does not exist.
        ValueError: if the file suffix is not a supported video/image type.
    """
    local_media_path = Path(local_media_path)
    t_total_start = time.time()

    # 1) Classify by suffix, then upload into the shared Modal volume.
    media_type = _detect_media_type(local_media_path.suffix)
    t_upload_start = time.time()
    remote_media_path = upload_media_to_volume(
        local_media_path, remote_subdir=f"{media_type}s"
    )
    t_upload = time.time() - t_upload_start

    # 2) Cold start + server-side preprocessing (keyed by the remote path).
    t_prepare_start = time.time()
    if media_type == "video":
        await qwen3_omni.prepare_video.remote.aio(remote_media_path)
    else:  # "image" — _detect_media_type admits nothing else
        await qwen3_omni.prepare_image.remote.aio(remote_media_path)
    t_prepare = time.time() - t_prepare_start

    # 3) All prompts concurrently, bounded by the semaphore.
    semaphore = asyncio.Semaphore(concurrency_limit)
    prompts = get_prompts(media_type)
    t_proc_start = time.time()
    tasks = [
        _run_prompt_with_semaphore(
            name=name,
            prompt=prompt,
            media_path=remote_media_path,
            semaphore=semaphore,
            max_tokens=max_tokens,
        )
        for name, prompt in prompts
    ]
    results = await asyncio.gather(*tasks)
    t_processing = time.time() - t_proc_start
    t_total = time.time() - t_total_start

    # 4) Report timings and short result snippets.
    _print_timing_report(t_upload, t_prepare, t_processing, t_total, len(prompts))
    _print_result_snippets(results)
if __name__ == "__main__":
    # Example usage — sample media on the author's machine; adjust the paths
    # before running locally.
    local_video_path = (
        "/home/ubuntu/Yuxiang/tensorflow-radintel-ai/micro_apps/media_optimization/sample_media/video_id_652.mp4"
    )
    local_image_path = (
        "/home/ubuntu/Yuxiang/tensorflow-radintel-ai/micro_apps/media_optimization/sample_media/sample_post_1.jpg"
    )
    # NOTE: only the image is analyzed here; local_video_path is defined but
    # unused — swap it in to exercise the video pipeline.
    asyncio.run(full_media_analysis_for_local_file(local_image_path))