# NOTE(review): removed stray "Spaces: Sleeping" status lines — a copy/paste
# artifact from the hosting UI, not valid Python before the module docstring.
"""
Qwen3 Omni helpers for media-analysis (video and image) workflows.
Pipeline:
1. Upload a local media file to a Modal volume (shared with the server).
2. Call Qwen3 Omni's `prepare_video(...)` (or `prepare_image(...)`) once
   (cold start + preprocessing).
3. Fire multiple `generate(media_path=..., messages=...)` requests concurrently.
4. Measure:
   - upload time
   - cold-start + prepare time
   - processing time (all prompts)
   - total wall time
"""
| import re | |
| import json | |
| import modal | |
| import asyncio | |
| import logging | |
| import time | |
| import os | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from json_repair import repair_json | |
| from typing import List, Optional, Dict, Any, Union, Tuple | |
# --------------------------------------------------------------------------- #
# Environment & Logger
# --------------------------------------------------------------------------- #
# Select the Modal environment before any remote lookups below are executed.
# NOTE(review): `modal` is imported above this assignment — this relies on modal
# reading MODAL_ENVIRONMENT lazily (at lookup/call time); confirm that holds.
os.environ["MODAL_ENVIRONMENT"] = "dev"
# Load secrets from ~/.environment_variables/<this-file's-parent-dir-name>/.env
load_dotenv(
    Path.home() / ".environment_variables" / Path(__file__).parent.name / ".env"
)
logger = logging.getLogger("media-analysis-qwen3-omni")
# Guard against duplicate handlers when this module is imported more than once.
if not logger.handlers:
    _handler = logging.StreamHandler()
    _formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    _handler.setFormatter(_formatter)
    logger.addHandler(_handler)
logger.setLevel(logging.INFO)
| _BAD_JSON_CHARS_RE = re.compile(r"[\x00-\x08\x0b-\x0c\x0e-\x1f]") | |
| def _safe_json_loads(raw: str): | |
| """Robustly parse *raw* text into JSON.""" | |
| try: | |
| return json.loads(raw) | |
| except json.JSONDecodeError as first_err: | |
| cleaned = _BAD_JSON_CHARS_RE.sub("", raw) | |
| if cleaned != raw: | |
| try: | |
| return json.loads(cleaned) | |
| except json.JSONDecodeError: | |
| pass | |
| try: | |
| repaired = repair_json(raw) | |
| return json.loads(repaired) | |
| except Exception: | |
| raise first_err | |
# --------------------------------------------------------------------------- #
# Modal remote class + video volume
# --------------------------------------------------------------------------- #
# Handle to the already-deployed Qwen3 Omni Modal class, looked up by name.
qwen3_omni_cls = modal.Cls.from_name(
    "Qwen3-Omni-30B-A3B-Thinking-Media-Analysis", # app name
    "Qwen3Omni30BA3BThinking", # class name
)
# Shared instance used by all request helpers in this module.
qwen3_omni = qwen3_omni_cls()
# Volume must be mounted in the server app at /root/persistent
video_volume = modal.Volume.from_name("qwen3-omni-media-analysis-files", create_if_missing=True)
def upload_media_to_volume(
    local_path: Union[str, Path], remote_subdir: str
) -> str:
    """Copy a local media file into the shared Modal volume.

    Returns the container-visible path, e.g. ``/root/persistent/videos/foo.mp4``;
    the server also uses this path as its cache key.
    """
    source = Path(local_path)
    if not source.exists():
        raise FileNotFoundError(f"Media file not found: {source}")
    volume_path = f"{remote_subdir}/{source.name}"
    container_path = f"/root/persistent/{volume_path}"
    # The client writes to volume_path; the server observes the same file
    # under container_path (volume mounted at /root/persistent).
    logger.info("Uploading %s → %s", source, container_path)
    # One batch_upload is enough; no client-side commit() required.
    with video_volume.batch_upload(force=True) as batch:
        batch.put_file(str(source), volume_path)
    logger.info("Upload complete")
    return container_path
# --------------------------------------------------------------------------- #
# Core Qwen3 Omni call (single request)
# --------------------------------------------------------------------------- #
def _strip_code_fences(text: str) -> str:
    """Remove a surrounding ```json ... ``` (or bare ``` ... ```) Markdown fence."""
    cleaned = text.strip()
    if cleaned.startswith("```json"):
        cleaned = cleaned.removeprefix("```json").strip()
    elif cleaned.startswith("```"):
        cleaned = cleaned.removeprefix("```").strip()
    if cleaned.endswith("```"):
        cleaned = cleaned.removesuffix("```").strip()
    return cleaned


async def qwen3_omni_request(
    prompt: str,
    media_path: Optional[str],
    system_message: str = (
        "You are a helpful AI assistant for multimedia(image, video and audio) analysis and social media optimization."
    ),
    temperature: float = 0.7,
    top_p: float = 0.8,
    top_k: int = 20,
    max_tokens: int = 2048,
    timeout: Optional[int] = 600,
) -> Tuple[Union[Dict[Any, Any], List[Any], None], Optional[str]]:
    """Call Qwen3 Omni once and parse the response as JSON.

    Args:
        prompt: User message content.
        media_path: Container-visible media path (as returned by
            ``upload_media_to_volume``), or None for text-only requests.
        system_message: System prompt prepended to the conversation.
        temperature / top_p / top_k / max_tokens: Sampling parameters forwarded
            to the remote ``generate`` endpoint.
        timeout: Seconds before ``asyncio.TimeoutError``; None waits forever.

    Returns:
        ``(parsed_json, raw_text)`` — ``parsed_json`` is None if parsing fails.

    Raises:
        asyncio.TimeoutError: if the remote call exceeds *timeout*.
    """
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": prompt},
    ]
    raw_text: str = await asyncio.wait_for(
        qwen3_omni.generate.remote.aio(
            media_path=media_path,
            messages=messages,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            max_tokens=max_tokens,
        ),
        timeout=timeout,
    )
    cleaned = _strip_code_fences(raw_text)
    try:
        # Parsing may be CPU-heavy (repair path); keep the event loop free.
        parsed = await asyncio.to_thread(_safe_json_loads, cleaned)
    except Exception as e:
        logger.warning("JSON parse failed for response: %r", e)
        parsed = None
    return parsed, raw_text
# --------------------------------------------------------------------------- #
# Video-analysis prompts (your exact ones)
# --------------------------------------------------------------------------- #
# PEP 8 (E731): named prompt builders are plain functions, not lambdas.
def MEDIA_SUMMARY_PROMPT(media_type: str) -> str:
    """Prompt asking for a thorough summary plus an engagement judgment."""
    return f"""
Provide a detailed, thorough summary of this {media_type}.
If this {media_type} is posted on social media, will it be engaging or not? Why?
""".strip()


def MEDIA_ENGAGEMENT_PROMPT(media_type: str) -> str:
    """Prompt asking whether promotional content engages and how to improve it."""
    return f"""
This {media_type} is intended as promotional content.
Is it engaging to the viewers for its purpose? Why?
How can we improve it to make it more engaging?
""".strip()
def insights_for_platform(platform: str, media_type: str) -> str:
    """Build the platform-specific engagement-analysis prompt.

    The prompt requests a step-by-step analysis and a strict-JSON answer
    containing an ``analysis`` string and a Good/Medium/Bad ``grade``.
    """
    prompt_text = f"""
I am an influencer on {platform} and have created a {media_type} to be posted on {platform}.
I would like to understand how it will perform on the {platform} platform in terms of social media engagement.
Please provide the analysis step-by-step:
1. Identify elements that might contribute to popularity (views, likes, comments, shares) on {platform}.
2. Suggest improvements to enhance engagement on {platform}.
3. Give an overall grade: Good, Medium, or Bad.
Return strictly in this JSON format:
{{
"analysis": "your full analysis here",
"grade": "Good|Medium|Bad"
}}
""".strip()
    return prompt_text
# PEP 8 (E731): lambdas assigned to names replaced by equivalent functions.
def MEDIA_MOOD_PROMPT(media_type: str) -> str:
    """Prompt requesting the 5 most prominent moods with 1-100 scores."""
    return f"""
Analyze the {media_type} and return the 5 most prominent moods with scores (1 - 100).
Higher score = more prominent.
Return exactly:
{{
"moods_scores_dict": {{"Joy": 95, "Calm": 80, ...}},
"moods_list": ["Joy", "Calm", ...]
}}
""".strip()


def MEDIA_EMOTION_PROMPT(media_type: str) -> str:
    """Prompt scoring the 8 canonical facial emotions (0-100) with rationales."""
    return f"""
Analyze faces in the {media_type} and score these 8 emotions (0 - 100):
Anger, Contempt, Disgust, Fear, Happiness, Sadness, Surprise, Neutral.
If no people → all scores 0.
Return exactly:
{{
"emotion_scores_dict": {{"Anger": 5, "Happiness": 92, ...}},
"emotion_rationales_dict": {{"Anger": "No raised eyebrows or clenched jaw...", ...}}
}}
""".strip()


def MEDIA_SENTIMENT_PROMPT(media_type: str) -> str:
    """Prompt for a single overall audience sentiment with rationale."""
    return f"""
Choose one overall audience sentiment after viewing the {media_type}: Positive, Neutral, or Negative.
Return exactly:
{{
"sentiment": "Positive|Neutral|Negative",
"rationale": "Explanation here"
}}
""".strip()


def MEDIA_MEMORABILITY_PROMPT(media_type: str) -> str:
    """Prompt rating memorability on a High/Moderate/Low scale."""
    return f"""
Rate how memorable this {media_type} is: High, Moderate, or Low.
Return exactly:
{{
"memorability": "High|Moderate|Low",
"rationale": "Explanation here"
}}
""".strip()


def MEDIA_QUALITY_PROMPT(media_type: str) -> str:
    """Prompt rating technical/aesthetic quality on a High/Moderate/Low scale."""
    return f"""
Rate {media_type} quality (clarity, lighting, aesthetics): High, Moderate, or Low.
Return exactly:
{{
"quality": "High|Moderate|Low",
"rationale": "Explanation here"
}}
""".strip()


def MEDIA_VIRAL_POTENTIAL_PROMPT(media_type: str) -> str:
    """Prompt rating viral potential (High/Moderate/Low) plus a 0-100 score."""
    # Fix: prose previously said "High/Medium/Low" while the JSON schema below
    # says "High|Moderate|Low" — contradictory labels confuse the model.
    return f"""
Evaluate viral potential on social media (High/Moderate/Low) and give a score 0 - 100.
Return exactly:
{{
"viral_potential": "High|Moderate|Low",
"viral_potential_score": 87,
"rationale": "Explanation here"
}}
""".strip()


# Video-only prompt: summary plus up to 3 timestamped highlight clips.
VIDEO_HIGHLIGHTS_PROMPT = """
Summarize the video in 2 - 3 sentences and pick up to 3 highlight clips (3 - 7s each).
For each highlight:
- timestamps as ["MM:SS.sss", "MM:SS.sss"]
- reason, description, score (0 - 100)
Return exactly:
{
"video_summary": "text",
"highlights": [
{"timestamps": ["00:00.000", "00:05.000"], "reason": "...", "clip_summary": "...", "clip_score": 94},
...
]
}
""".strip()


# Video-only prompt: short hook clips; reuses the highlights JSON shape.
ENGAGING_CLIPS_PROMPT = """
Find 1–10s engaging clips that make viewers want to keep watching.
If none → empty list.
Use the same JSON format as the previous highlights prompt:
{
"video_summary": "text",
"highlights": [...]
}
""".strip()
def get_prompts(media_type: str) -> List[Tuple[str, str]]:
    """Assemble the (name, prompt) pairs to run for *media_type*.

    media_type ∈ {"video", "image"}; videos additionally get the
    timeline-based highlight/clip prompts.
    """
    prompts: List[Tuple[str, str]] = [
        ("Summary", MEDIA_SUMMARY_PROMPT(media_type)),
        ("Engagement", MEDIA_ENGAGEMENT_PROMPT(media_type)),
        ("TikTok Insights", insights_for_platform("TikTok", media_type)),
        ("Instagram Insights", insights_for_platform("Instagram", media_type)),
        ("Mood", MEDIA_MOOD_PROMPT(media_type)),
        ("Emotion", MEDIA_EMOTION_PROMPT(media_type)),
        ("Sentiment", MEDIA_SENTIMENT_PROMPT(media_type)),
        ("Memorability", MEDIA_MEMORABILITY_PROMPT(media_type)),
        ("Quality", MEDIA_QUALITY_PROMPT(media_type)),
        ("Viral Potential", MEDIA_VIRAL_POTENTIAL_PROMPT(media_type)),
    ]
    if media_type == "video":
        # Temporal prompts only make sense for media with a timeline.
        prompts += [
            ("Highlights", VIDEO_HIGHLIGHTS_PROMPT),
            ("Engaging Clips", ENGAGING_CLIPS_PROMPT),
        ]
    return prompts
# --------------------------------------------------------------------------- #
# Concurrency helper
# --------------------------------------------------------------------------- #
async def _run_prompt_with_semaphore(
    name: str,
    prompt: str,
    media_path: Optional[str],
    semaphore: asyncio.Semaphore,
    **kwargs: Any,
) -> Tuple[str, Union[Dict[str, Any], List[Any], None]]:
    """Run one prompt while holding *semaphore*; return ``(name, parsed_json)``."""
    async with semaphore:
        parsed_json, _raw = await qwen3_omni_request(
            prompt=prompt,
            media_path=media_path,
            **kwargs,
        )
    # Semaphore is released before returning; result shape is unchanged.
    return name, parsed_json
# --------------------------------------------------------------------------- #
# End-to-end: local video/image → volume → prepare_video → all prompts
# --------------------------------------------------------------------------- #
def _infer_media_type(suffix: str) -> str:
    """Map a lower-cased file suffix to 'video' or 'image'; raise otherwise."""
    if suffix in {".mp4", ".mov", ".avi", ".mkv"}:
        return "video"
    if suffix in {".jpg", ".jpeg", ".png", ".webp"}:
        return "image"
    raise ValueError(f"Unsupported media type: {suffix}")


async def full_media_analysis_for_local_file(
    local_media_path: Union[str, Path],
    concurrency_limit: int = 8,
    max_tokens: int = 2048,
) -> None:
    """Run the full pipeline on one local media file and print a timing report.

    Steps: upload → server-side prepare (cold start) → all prompts concurrently
    (bounded by *concurrency_limit*) → timing report + result snippets.

    Raises:
        ValueError: for unsupported file extensions.
        FileNotFoundError: if *local_media_path* does not exist.
    """
    local_media_path = Path(local_media_path)
    t_total_start = time.time()
    # 1) Upload
    t_upload_start = time.time()
    media_type = _infer_media_type(local_media_path.suffix.lower())
    remote_media_path = upload_media_to_volume(
        local_media_path, remote_subdir=media_type + "s"
    )
    t_upload = time.time() - t_upload_start
    # 2) Cold start + preprocessing (media_type already validated above,
    # so no trailing unreachable error branch is needed).
    t_prepare_start = time.time()
    if media_type == "video":
        await qwen3_omni.prepare_video.remote.aio(remote_media_path)
    else:  # "image"
        await qwen3_omni.prepare_image.remote.aio(remote_media_path)
    t_prepare = time.time() - t_prepare_start
    # 3) Real processing: all prompts concurrently, bounded by a semaphore.
    semaphore = asyncio.Semaphore(concurrency_limit)
    prompts = get_prompts(media_type)
    t_proc_start = time.time()
    tasks = [
        _run_prompt_with_semaphore(
            name=name,
            prompt=prompt,
            media_path=remote_media_path,
            semaphore=semaphore,
            max_tokens=max_tokens,
        )
        for name, prompt in prompts
    ]
    results = await asyncio.gather(*tasks)
    t_processing = time.time() - t_proc_start
    t_total = time.time() - t_total_start
    # 4) Report timings
    print("\n" + "=" * 70)
    print("QWEN3-OMNI MEDIA ANALYSIS — TIMING REPORT")
    print("=" * 70)
    print(f"Upload time : {t_upload:.2f} s")
    print(f"Cold start + media preprocessing : {t_prepare:.2f} s")
    print(f"Processing (all prompts, hot) : {t_processing:.2f} s")
    print(f"TOTAL end-to-end : {t_total:.2f} s")
    print(f"Avg per prompt (processing only) : {t_processing / len(prompts):.2f} s")
    print("=" * 70)
    # 5) Print short snippets of each parsed result
    for name, parsed in results:
        print("\n" + "-" * 40)
        print(f"[{name.upper()}]")
        if parsed is None:
            print("No parsed JSON result.")
            continue
        snippet = json.dumps(parsed, indent=2, ensure_ascii=False)
        print(snippet[:1000] + ("..." if len(snippet) > 1000 else ""))
if __name__ == "__main__":
    # Example usage: pick "video" or "image" from the sample paths below.
    sample_media = {
        "video": "/home/ubuntu/Yuxiang/tensorflow-radintel-ai/micro_apps/media_optimization/sample_media/video_id_652.mp4",
        "image": "/home/ubuntu/Yuxiang/tensorflow-radintel-ai/micro_apps/media_optimization/sample_media/sample_post_1.jpg",
    }
    asyncio.run(full_media_analysis_for_local_file(sample_media["image"]))