# Hugging Face Space worker: downloads course RAR archives from a dataset
# repo, extracts video frames, and captions them with a vision-language model.
| import os | |
| import json | |
| import requests | |
| import subprocess | |
| import shutil | |
| import time | |
| import re | |
| import threading | |
| from typing import Dict, List, Set, Optional | |
| from huggingface_hub import HfApi, list_repo_files | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.responses import JSONResponse | |
| from pathlib import Path | |
| import smtplib | |
| from email.message import EmailMessage | |
| import tempfile | |
| import rarfile | |
| import zipfile | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import torch | |
| from transformers import AutoProcessor, AutoModelForCausalLM | |
| # from transformers import AutoProcessor, AutoModelForCausalLM | |
| # import os | |
| # # Try to ensure unrar is available | |
| # def setup_unrar(): | |
| # # Try system installation (may not work on all Spaces) | |
| # if not os.path.exists("/usr/bin/unrar"): | |
| # os.system("apt-get update > /dev/null 2>&1 && apt-get install -y unrar > /dev/null 2>&1 || true") | |
| # # Setup Python alternatives | |
| # try: | |
| # import rarfile | |
| # if os.path.exists("/usr/bin/unrar"): | |
| # rarfile.UNRAR_TOOL = "/usr/bin/unrar" | |
| # except ImportError: | |
| # os.system("pip install rarfile") | |
| # try: | |
| # import patoolib | |
| # except ImportError: | |
| # os.system("pip install patool") | |
| # setup_unrar() | |
# Configure rarfile for hosted environments without an external `unrar` binary.
# NOTE(review): setting UNRAR_TOOL to None is intended to force rarfile's
# internal extractor — confirm this behavior against the installed rarfile
# version (some versions treat None as "auto-detect").
rarfile.UNRAR_TOOL = None  # Forces use of internal Python extractor
rarfile.PATH_SEP = '/'  # Safe default
def log_message(message: str):
    """Print *message* prefixed with a timestamp and mirror it into the
    shared status log when that global already exists.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}"
    print(entry)
    # The status dict is defined later in the module; guard for early calls.
    if 'processing_status' in globals():
        processing_status["logs"].append(entry)
        processing_status["last_update"] = stamp
        # Keep only the 100 most recent entries.
        if len(processing_status["logs"]) > 100:
            processing_status["logs"] = processing_status["logs"][-100:]
# Attempt to install flash-attn (optional speedup; failure is non-fatal).
# FLASH_ATTENTION_SKIP_CUDA_BUILD=TRUE skips compiling CUDA kernels at install time.
try:
    # BUG FIX: merge with os.environ instead of replacing the environment.
    # Passing env={...} alone wipes PATH (and everything else), which makes
    # the `pip` lookup inside the spawned shell unreliable.
    subprocess.run(
        'pip install flash-attn --no-build-isolation',
        env={**os.environ, 'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"},
        check=True,
        shell=True,
    )
except subprocess.CalledProcessError as e:
    print(f"Error installing flash-attn: {e}")
    print("Continuing without flash-attn.")
# Determine the device to use (note: re-set to plain "cpu" further down,
# immediately before the model load).
device = "cuda" if torch.cuda.is_available() else "cpu"
# Initialize FastAPI
app = FastAPI()
# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")  # Hugging Face auth token (empty = anonymous)
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")  # dataset repo holding the RARs
# Path Configuration — working directories created eagerly at import time.
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ANALYSIS_OUTPUT_FOLDER = "analysis_results"
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ANALYSIS_OUTPUT_FOLDER, exist_ok=True)
# State Files — JSON/append-only logs that let runs resume across restarts.
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"
# Processing Parameters
# NOTE(review): CHUNK_SIZE, PROCESSING_DELAY, and MAX_RETRIES are not
# referenced in the visible code — confirm whether they are dead config.
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing
# Frame Extraction Parameters
DEFAULT_FPS = 0.1  # Default frames per second for extraction (1 frame / 10s)
# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)
# Global State: shared, mutable status dict.  Mutated by log_message(), the
# processing loop, and process_rar_file(); served verbatim by the status
# endpoint.  NOTE(review): accessed without locks — confirm single-threaded use.
processing_status = {
    "is_running": False,          # set/cleared by main_processing_loop
    "current_file": None,         # archive currently being processed
    "total_files": 0,             # RAR count discovered in the source repo
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "extracted_frames_count": 0,
    "analyzed_frames_count": 0,
    "last_update": None,          # timestamp of the latest log entry
    "logs": []                    # bounded to the most recent 100 lines
}
import torch
import subprocess
import sys
device = "cpu"  # Explicitly ensure CPU usage for model inference

# Load the Florence-2 vision-language model and its processor.  Any failure
# (missing weights, missing dependency, no network) falls back to None so that
# analyze_single_frame() can report "Model not loaded" instead of crashing.
try:
    vision_model = AutoModelForCausalLM.from_pretrained(
        'microsoft/Florence-2-base', trust_remote_code=True,
        attn_implementation="eager").to(device).eval()
    vision_processor = AutoProcessor.from_pretrained('microsoft/Florence-2-base', trust_remote_code=True)
except Exception as e:
    print(f"Error loading base model: {e}")
    # BUG FIX: this branch previously assigned vision_language_model_base /
    # vision_language_processor_base, leaving vision_model / vision_processor
    # undefined after a load failure and causing a NameError at every later
    # `if not vision_model` check.
    vision_model = None
    vision_processor = None
# Preprompt templates keyed by detected content category.
PREPROMPT_TEMPLATES = {
    "default": "This image shows: ",
    "design": "This design tutorial frame shows: ",
    "ui": "This user interface demonstrates: ",
    "motion": "This motion design example illustrates: "
}

# Keyword groups checked in priority order against the video filename.
_CATEGORY_KEYWORDS = (
    ("ui", ("ui", "interface", "ux")),
    ("design", ("design", "tutorial")),
    ("motion", ("motion", "animation")),
)


def get_preprompt(video_filename: str) -> str:
    """Pick the preprompt template whose keywords match the filename.

    Falls back to the "default" template when no keyword group matches.
    """
    lowered = video_filename.lower()
    for category, keywords in _CATEGORY_KEYWORDS:
        if any(word in lowered for word in keywords):
            return PREPROMPT_TEMPLATES[category]
    return PREPROMPT_TEMPLATES["default"]
def log_message(message: str):
    """Timestamped logger that always mirrors into processing_status.

    NOTE(review): this re-definition shadows the guarded log_message declared
    earlier in this module — confirm the duplication is intentional.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{now}] {message}"
    print(line)
    processing_status["logs"].append(line)
    processing_status["last_update"] = now
    # Cap the in-memory log at the 100 most recent entries.
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]
| import patoolib | |
| from patoolib.util import PatoolError | |
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR/ZIP archive into *output_dir* using patoolib.

    Multi-part RARs are redirected to their ``.part01.rar`` member before
    extraction; a failed extraction is retried up to *max_retries* times.

    Returns True on success, False otherwise.
    """
    filename = os.path.basename(rar_path)
    # Multi-part RARs must be extracted starting from the first part.
    # NOTE(review): assumes parts are named "<base>.part01.rar"; archives using
    # ".part1.rar" style naming would be skipped — confirm against the dataset.
    if is_multipart_rar(filename):
        base_name = get_rar_part_base(filename)
        first_part = f"{base_name}.part01.rar"
        first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
        if not os.path.exists(first_part_path):
            log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
            return False
        rar_path = first_part_path
        log_message(f"📦 Processing multi-part RAR starting with: {first_part}")
    for attempt in range(max_retries):
        try:
            os.makedirs(output_dir, exist_ok=True)
            patoolib.extract_archive(rar_path, outdir=output_dir, verbosity=-1)
            # BUG FIX: the success message previously logged a literal
            # "(unknown)" instead of the extracted archive's name.
            log_message(f"✅ Successfully extracted: {os.path.basename(rar_path)} using patoolib")
            return True
        except PatoolError as e:
            log_message(f"⚠️ patoolib extraction attempt {attempt + 1} failed: {str(e)}")
            time.sleep(1)
        except Exception as e:
            log_message(f"❌ patoolib exception: {str(e)}")
            time.sleep(1)
    return False
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG."""
    with open(FAILED_FILES_LOG, "a") as f:
        # BUG FIX: the record previously wrote a literal "(unknown)" instead of
        # the failing filename, making the log useless for debugging/retries.
        f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S")} - {filename}: {error}\n')
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return total/free/used space of *path*'s filesystem, in GiB.

    "free" reflects the space available to unprivileged processes
    (``f_bavail``), so "used" includes any root-reserved blocks.
    """
    gib = 1024 ** 3
    stats = os.statvfs(path)
    total_gb = stats.f_frsize * stats.f_blocks / gib
    free_gb = stats.f_frsize * stats.f_bavail / gib
    return {"total": total_gb, "free": free_gb, "used": total_gb - free_gb}
def check_disk_space(path: str = ".") -> bool:
    """Return True when *path* has at least MIN_FREE_SPACE_GB free.

    Logs a low-space warning before returning False.
    """
    usage = get_disk_usage(path)
    free_gb = usage["free"]
    if free_gb >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'⚠️ Low disk space: {free_gb:.2f}GB free, {usage["used"]:.2f}GB used')
    return False
def cleanup_temp_files():
    """Delete downloaded archives except the file currently being processed.

    Deletion is best-effort: individual failures are ignored so cleanup never
    aborts the caller.
    """
    log_message("🧹 Cleaning up temporary files...")
    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"🗑️ Removed old download: {file}")
            except OSError:
                # BUG FIX: was a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit; only filesystem errors are
                # expected here and deletion remains best-effort.
                pass
def load_json_state(file_path: str, default_value):
    """Load persisted JSON state from *file_path*.

    Returns *default_value* when the file is missing or unparsable; a
    corrupted file is additionally logged.
    """
    if not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r") as f:
            return json.load(f)
    except json.JSONDecodeError:
        log_message(f"⚠️ Corrupted state file: {file_path}")
        return default_value
def save_json_state(file_path: str, data):
    """Persist *data* as pretty-printed (indent=2) JSON at *file_path*."""
    payload = json.dumps(data, indent=2)
    with open(file_path, "w") as f:
        f.write(payload)
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream-download *url* to *dest_path* with retries and disk-space guards.

    Returns True on success; False on low disk space, a file larger than the
    remaining space, or after *max_retries* failed attempts.
    NOTE(review): a partially written dest_path may be left behind on failure.
    """
    # Refuse to start when disk space is low; try a cleanup pass first.
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup")
            return False
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()
                # If the server reports a size, pre-check it against free space.
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # Leave 0.5GB buffer
                        log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False
                # Stream to disk in 8 KiB chunks to bound memory use.
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                return True
        except Exception as e:
            # Exponential backoff between attempts (1s, 2s, 4s, ...).
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"❌ Download failed after {max_retries} attempts: {e}")
            return False
    return False
def is_multipart_rar(filename: str) -> bool:
    """Return True when *filename* looks like a multi-part RAR segment."""
    lowered = filename.lower()
    return lowered.endswith(".rar") and ".part" in lowered
def get_rar_part_base(filename: str) -> str:
    """Return the archive base name with any ``.partNN.rar`` (or plain
    ``.rar``) suffix removed.

    BUG FIX: matching is now case-insensitive throughout.  Previously the
    ``.part`` membership test lowercased the name but ``str.split(".part")``
    did not, so e.g. "COURSE.PART01.RAR" was returned unchanged; the plain
    ``.replace(".rar", "")`` fallback had the same problem (and removed
    ".rar" anywhere in the name rather than only as a suffix).
    """
    if ".part" in filename.lower():
        # Strip everything from the first ".part" (any case) onward.
        return re.split(r"\.part", filename, maxsplit=1, flags=re.IGNORECASE)[0]
    # Plain single RAR: drop a trailing ".rar" in any case.
    return re.sub(r"\.rar$", "", filename, flags=re.IGNORECASE)
def ensure_dir(path):
    """Create *path* (including parents) if it does not already exist."""
    Path(path).mkdir(parents=True, exist_ok=True)
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Extract frames from *video_path* into *output_dir* at roughly *fps*
    frames per second, saved as zero-padded PNGs (0001.png, ...).

    Returns the number of frames written.  NOTE(review): output is currently
    capped at 10 frames per video ("for testing") — confirm before production.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    if not video_fps or video_fps <= 0:
        video_fps = 30  # fallback if FPS metadata is unavailable
        log_message(f"[WARN] Using fallback FPS: {video_fps}")
    # BUG FIX: clamp the sampling interval to >= 1.  When fps > video_fps the
    # old code computed interval 0 and crashed with ZeroDivisionError on
    # `frame_idx % 0`.
    frame_interval = max(1, int(round(video_fps / fps)))
    frame_idx = 0
    saved_idx = 1
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log_message(f"[DEBUG] Total frames in video: {total_frames}")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx % frame_interval == 0:
            if saved_idx <= 10:  # Limit to 10 frames for testing
                frame_name = f"{saved_idx:04d}.png"
                cv2.imwrite(str(Path(output_dir) / frame_name), frame)
                saved_idx += 1
            else:
                break  # Stop extracting after 10 frames
        frame_idx += 1
    cap.release()
    log_message(f"Extracted {saved_idx-1} frames from {video_path} to {output_dir}")
    return saved_idx - 1
def analyze_single_frame(image_path: str, preprompt: str = "") -> dict:
    """Caption one frame image with the module-level vision model.

    Returns a dict with keys "image" (file basename), "description" (the
    caption, or "[ERROR] ..." text), and "success" (bool).  Never raises:
    every failure is reported in-band via the returned dict.
    """
    # Model may be None when loading failed at import time.
    if not vision_model or not vision_processor:
        return {
            "image": os.path.basename(image_path),
            "description": "[ERROR] Model not loaded",
            "success": False
        }
    try:
        # Load and downscale to a fixed 224x224 RGB input.
        # NOTE(review): Florence-2 processors normally handle resizing
        # themselves; this manual resize may degrade captions — confirm.
        image = Image.open(image_path).convert("RGB")
        image = image.resize((224, 224))
        # Ensure tokenizer padding config is safe before tokenizing.
        tokenizer = vision_processor.tokenizer
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = "right"
        # Preprocess image + text prompt into model tensors.
        inputs = vision_processor(
            images=[image],
            text=preprompt,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=512
        ).to(device)
        # Safety: add a batch dimension if the processor returned (C, H, W).
        pixel_values = inputs["pixel_values"]
        if pixel_values.dim() == 3:
            pixel_values = pixel_values.unsqueeze(0)
        # Generate caption with beam search; no gradients needed.
        with torch.no_grad():
            outputs = vision_model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                pixel_values=pixel_values,
                max_new_tokens=500,
                num_beams=5,
                early_stopping=False,
                pad_token_id=tokenizer.pad_token_id
            )
        caption = vision_processor.batch_decode(
            outputs,
            skip_special_tokens=True
        )[0].strip()
        return {
            "image": os.path.basename(image_path),
            "description": caption,
            "success": True
        }
    except Exception as e:
        # Any failure (I/O, tokenization, generation) is reported in-band.
        return {
            "image": os.path.basename(image_path),
            "description": f"[ERROR] {str(e)}",
            "success": False
        }
def process_video_frames(frames_dir: str, video_filename: str, output_file: str) -> bool:
    """Analyze every PNG frame in *frames_dir* and write results to *output_file*.

    The first frame is analyzed as a validation gate: if it fails, processing
    aborts.  Results (metadata + per-frame analyses) are flushed to JSON every
    10 frames and once more at the end.  Returns True on success.
    """
    try:
        frames = sorted(Path(frames_dir).glob("*.png"))
        if not frames:
            print("❌ No frames found in directory")
            return False
        # Validate the first frame before committing to the full run.
        first_frame_result = analyze_single_frame(str(frames[0]), get_preprompt(video_filename))
        print("\n=== FIRST FRAME VALIDATION ===")
        print(f'Image: {first_frame_result["image"]}')
        print(f'Result: {first_frame_result["description"]}')
        print(f'Status: {"Success" if first_frame_result["success"] else "Failed"}\n')
        if not first_frame_result["success"]:
            print("❌ Aborting due to first frame failure")
            return False
        preprompt = get_preprompt(video_filename)
        results = {
            "metadata": {
                "video": video_filename,
                "preprompt": preprompt,
                "total_frames": len(frames),
                "processed_frames": 0,
                "failed_frames": 0
            },
            "frames": []
        }
        # NOTE: the first frame is analyzed a second time inside this loop.
        for i, frame_path in enumerate(frames):
            result = analyze_single_frame(str(frame_path), preprompt)
            results["frames"].append(result)
            if result["success"]:
                results["metadata"]["processed_frames"] += 1
            else:
                results["metadata"]["failed_frames"] += 1
            # Periodic saving so partial progress survives a crash.
            if i % 10 == 0:
                with open(output_file, "w") as f:
                    json.dump(results, f, indent=2)
        # Final save
        with open(output_file, "w") as f:
            json.dump(results, f, indent=2)
        return True
    except Exception as e:
        print(f"❌ Processing failed: {str(e)}")
        return False
def summarize_activities(frame_analyses: List[Dict]) -> Dict:
    """Summarize activities from frame analyses.

    Placeholder implementation: always returns an empty dict.  The API
    endpoints embed this result as the "summary" field of their responses.
    TODO: implement real summarization.
    """
    return {}
def process_rar_file(rar_path: str) -> bool:
    """Extract one RAR archive, then frame-extract and caption every video in it.

    Updates the global processing_status counters as it goes, logs failures to
    the failed-files log, and always clears the in-flight marker on exit.
    Returns True only when every contained video was processed successfully.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename
    # Multi-part archives share one course name across all parts.
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
    try:
        # NOTE(review): "(unknown)" below looks like a lost interpolation of
        # the filename — confirm against version control.
        log_message(f"🔄 Processing: (unknown)")
        # Clean up any existing directory
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)
        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")
        # Collect all video files from the extracted tree.
        video_files = []
        for root, _, files in os.walk(extract_dir):
            for file in files:
                if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
                    video_files.append(os.path.join(root, file))
        processing_status["extracted_courses"] += 1
        log_message(f"✅ Extracted {len(video_files)} videos from \'{course_name}\'")
        # Process each video: extract frames, then caption them.
        for video_path in video_files:
            video_filename = Path(video_path).name
            video_filename_clean = video_filename.replace(".", "_")
            frames_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{course_name}_{video_filename_clean}_frames")
            ensure_dir(frames_dir)
            # Extract frames
            extracted_count = extract_frames(video_path, frames_dir, DEFAULT_FPS)
            if extracted_count == 0:
                raise Exception(f"No frames extracted from {video_filename}")
            processing_status["extracted_frames_count"] += extracted_count
            # Analyze frames
            video_filename_clean = video_filename.replace(".", "_")
            analysis_output = os.path.join(ANALYSIS_OUTPUT_FOLDER, f"{course_name}_{video_filename_clean}_analysis.json")
            if process_video_frames(frames_dir, video_filename, analysis_output):
                processing_status["analyzed_frames_count"] += extracted_count
                processing_status["extracted_videos"] += 1
            else:
                raise Exception(f"Frame analysis failed for {video_filename}")
        return True
    except Exception as e:
        error_msg = str(e)
        log_message(f"❌ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False
    finally:
        # Always clear the in-flight marker, on success or failure.
        processing_status["current_file"] = None
def main_processing_loop(start_index: int = 0):
    """Process exactly ONE RAR file from the source dataset repo per call.

    Resumes from persisted JSON state (next download index + processed list),
    downloads the next archive, runs process_rar_file() on it, then advances
    and saves state so the next invocation picks up the following file.

    start_index: when > 0, overrides the persisted next-download index.
    """
    processing_status["is_running"] = True
    try:
        # Load persisted progress from previous runs.
        processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
        # Use start_index if provided, otherwise use the saved state
        next_index = start_index if start_index > 0 else download_state["next_download_index"]
        log_message(f"📊 Starting from index {next_index}")
        log_message(f"📊 Previously processed: {len(processed_rars)} files")
        # Enumerate the RAR files available in the source dataset repo.
        # NOTE(review): several log strings below contain a literal "(unknown)"
        # where the filename was presumably interpolated — confirm.
        try:
            files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
            rar_files = sorted([f for f in files if f.endswith(".rar")])
            processing_status["total_files"] = len(rar_files)
            log_message(f"📁 Found {len(rar_files)} RAR files in repository")
            if next_index >= len(rar_files):
                log_message("✅ All files have been processed!")
                return
        except Exception as e:
            log_message(f"❌ Failed to get file list: {str(e)}")
            return
        # Process only one file per run
        if next_index < len(rar_files):
            rar_file = rar_files[next_index]
            filename = os.path.basename(rar_file)
            if filename in processed_rars:
                log_message(f"⏭️ Skipping already processed: (unknown)")
                processing_status["processed_files"] += 1
                # Move to next file
                next_index += 1
                save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
                log_message(f"📊 Moving to next file. Progress: {next_index}/{len(rar_files)}")
                return
            log_message(f"📥 Downloading: (unknown)")
            dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
            # Download file
            download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
            if download_with_retry(download_url, dest_path):
                # Process file
                if process_rar_file(dest_path):
                    processed_rars.append(filename)
                    save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                    log_message(f"✅ Successfully processed: (unknown)")
                    processing_status["processed_files"] += 1
                else:
                    log_message(f"❌ Failed to process: (unknown)")
                    processing_status["failed_files"] += 1
                # Clean up downloaded file (best-effort)
                try:
                    os.remove(dest_path)
                    log_message(f"🗑️ Cleaned up download: (unknown)")
                except:
                    pass
            else:
                log_message(f"❌ Failed to download: (unknown)")
                processing_status["failed_files"] += 1
            # Update download state for next run
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
            # Status update
            log_message(f"📊 Progress: {next_index}/{len(rar_files)} files processed")
            log_message(f"📊 Extracted: {processing_status['extracted_courses']} courses")
            log_message(f"📊 Videos Processed: {processing_status['extracted_videos']} videos")
            log_message(f"📊 Frames Extracted: {processing_status['extracted_frames_count']} frames")
            log_message(f"📊 Frames Analyzed: {processing_status['analyzed_frames_count']} frames")
            log_message(f"📊 Failed: {processing_status['failed_files']} files")
            if next_index < len(rar_files):
                log_message(f"🔄 Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
            else:
                log_message("🎉 All files have been processed!")
        else:
            log_message("✅ All files have been processed!")
        log_message(f"🎉 Processing complete!")
        log_message(f"📊 Final stats: {processing_status['extracted_courses']} courses extracted, {processing_status['extracted_videos']} videos processed, {processing_status['extracted_frames_count']} frames extracted, {processing_status['analyzed_frames_count']} frames analyzed")
    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}")
    finally:
        # Always clear the running flag and reclaim disk space.
        processing_status["is_running"] = False
        cleanup_temp_files()
# FastAPI Endpoints
# NOTE(review): no @app.post/@app.get decorators are visible in this chunk;
# confirm these coroutines are registered as routes elsewhere.
async def analyze_video_endpoint(
    file: UploadFile = File(...),
    fps: float = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None)
):
    """Analyze a single uploaded video and return frame-by-frame captions.

    Rejects non-video extensions with HTTP 400.  The upload is staged in a
    temporary directory that is removed when the request finishes.
    """
    if not file.filename.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": [".mp4", ".avi", ".mov", ".mkv"]
        })
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / file.filename
        # Stream the upload to disk.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        frames_dir = temp_dir_path / "frames"
        frame_count = extract_frames(file_path, frames_dir, fps)
        # Caption each extracted frame with the optional custom prompt.
        frame_analyses = []
        for frame_file in sorted(frames_dir.glob("*.png")):
            analysis = analyze_single_frame(str(frame_file), prompt or "")
            frame_analyses.append(analysis)
        summary = summarize_activities(frame_analyses)
        return JSONResponse(content={
            "video_filename": file.filename,
            "frame_count": frame_count,
            "fps": fps,
            "frame_analyses": frame_analyses,
            "summary": summary
        })
async def analyze_archive_endpoint(
    file: UploadFile = File(...),
    fps: float = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None)
):
    """Analyze every video inside an uploaded RAR/ZIP archive.

    Rejects other extensions with HTTP 400; returns 400 when the archive
    contains no videos.  All work happens in a temporary directory that is
    removed when the request finishes.
    NOTE(review): no route decorator is visible here — confirm registration.
    """
    if not file.filename.lower().endswith((".rar", ".zip")):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": [".rar", ".zip"]
        })
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / file.filename
        # Stream the upload to disk.
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        extract_dir = temp_dir_path / "extracted"
        video_files = []
        if file.filename.lower().endswith(".rar"):
            with rarfile.RarFile(file_path) as rf:
                rf.extractall(extract_dir)
        else:
            with zipfile.ZipFile(file_path) as zf:
                zf.extractall(extract_dir)
        # Find video files in the extracted content.
        # BUG FIX: the inner loop variable was named `file`, shadowing the
        # UploadFile parameter and breaking `file.filename` in the final
        # response (AttributeError on a plain str).
        for root, dirs, names in os.walk(extract_dir):
            for name in names:
                if name.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
                    video_files.append(Path(root) / name)
        if not video_files:
            return JSONResponse(status_code=400, content={
                "error": "No video files found in archive"
            })
        results = []
        for video_path in video_files:
            video_name = video_path.name
            frames_dir = temp_dir_path / f"frames_{video_name}"
            frame_count = extract_frames(video_path, frames_dir, fps)
            frame_analyses = []
            for frame_file in sorted(frames_dir.glob("*.png")):
                analysis = analyze_single_frame(str(frame_file), prompt or "")
                frame_analyses.append(analysis)
            summary = summarize_activities(frame_analyses)
            results.append({
                "video_filename": video_name,
                "frame_count": frame_count,
                "fps": fps,
                "frame_analyses": frame_analyses,
                "summary": summary
            })
        return JSONResponse(content={
            "archive_filename": file.filename,
            "videos_processed": len(video_files),
            "results": results
        })
async def health_check():
    """Liveness probe: always reports healthy."""
    # NOTE(review): the payload claims the GIT model, but this module loads
    # microsoft/Florence-2-base — confirm which is intended.
    payload = {
        "status": "healthy",
        "model": "GIT",
        "note": "Now using GIT model."
    }
    return JSONResponse(content=payload)
async def get_processing_status():
    """Get current processing status.

    Returns the live module-level processing_status dict (counters, current
    file, bounded log buffer) serialized as JSON.
    """
    return JSONResponse(content=processing_status)
# Expose necessary functions and variables
# (public API for `from <module> import *` and for sibling modules)
__all__ = [
    "main_processing_loop",
    "processing_status",
    "ANALYSIS_OUTPUT_FOLDER",
    "log_message",
    "analyze_single_frame",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir"
]