Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import subprocess | |
| import shutil | |
| import time | |
| import re | |
| import threading | |
| from typing import Dict, List, Set, Optional | |
| from huggingface_hub import HfApi, list_repo_files | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.responses import JSONResponse | |
| from pathlib import Path | |
| import smtplib | |
| from email.message import EmailMessage | |
| import tempfile | |
| import rarfile | |
| import zipfile | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import torch | |
| from transformers import AutoProcessor, AutoModelForCausalLM | |
# Initialize FastAPI
app = FastAPI()

# ==== CONFIGURATION ====
# Credentials and source dataset, both overridable via environment variables.
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")

# Path Configuration — working directories, created eagerly at import time.
DOWNLOAD_FOLDER = "downloads"                 # raw .rar downloads land here
EXTRACT_FOLDER = "extracted"                  # unpacked course archives
FRAMES_OUTPUT_FOLDER = "extracted_frames"     # per-video PNG frame dumps
ANALYSIS_OUTPUT_FOLDER = "analysis_results"   # per-video JSON analysis output
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ANALYSIS_OUTPUT_FOLDER, exist_ok=True)

# State Files — persisted between runs so each invocation can resume.
DOWNLOAD_STATE_FILE = "download_progress.json"   # next index into the repo's RAR list
PROCESS_STATE_FILE = "process_progress.json"     # names of RARs already processed
FAILED_FILES_LOG = "failed_files.log"            # append-only failure journal

# Processing Parameters
CHUNK_SIZE = 1        # NOTE(review): not referenced in this file — TODO confirm still needed
PROCESSING_DELAY = 2  # NOTE(review): not referenced in this file — TODO confirm still needed
MAX_RETRIES = 3       # NOTE(review): not referenced here; download_with_retry has its own default — confirm
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction

# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)

# Global State — mutated by the pipeline functions and served verbatim by the
# status endpoint.  Counters accumulate over the process lifetime.
processing_status = {
    "is_running": False,           # True while main_processing_loop is active
    "current_file": None,          # basename of the RAR currently being processed
    "total_files": 0,              # RAR count discovered in the source repo
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,        # archives successfully unpacked
    "extracted_videos": 0,         # videos that yielded at least one frame
    "extracted_frames_count": 0,
    "analyzed_frames_count": 0,
    "last_update": None,           # timestamp of the last log_message call
    "logs": []                     # rolling buffer, capped at 100 entries
}
# Model bootstrap — runs at import time.
# NOTE(review): torch and subprocess are re-imported here (already imported at
# the top of the file); sys is imported but never used in this file — confirm.
import torch
import subprocess
import sys

device = "cpu"  # Explicitly ensure CPU usage

try:
    # Load the model, forcing the 'eager' (CPU-compatible) attention implementation
    # NOTE(review): despite the comment above, no attn_implementation argument is
    # actually passed to from_pretrained — confirm whether it was lost in an edit.
    vision_language_model_large = AutoModelForCausalLM.from_pretrained(
        "microsoft/Florence-2-Base",  # NOTE(review): *Base* checkpoint despite the "_large" variable names
        trust_remote_code=True
    ).to(device).eval()
    vision_language_processor_large = AutoProcessor.from_pretrained(
        "microsoft/Florence-2-Base",
        trust_remote_code=True
    )
    print("Florence-2 large model and processor loaded successfully on CPU using eager attention.")
except Exception as e:
    # Leave both globals as None; analyze_frame_with_florence2 checks for this
    # and degrades to an error description instead of crashing.
    print(f"Error loading Florence-2 model on CPU: {e}")
    print("Please ensure you have enough RAM and a compatible PyTorch version.")
    vision_language_model_large = None
    vision_language_processor_large = None
def log_message(message: str):
    """Print *message* with a timestamp and append it to the in-memory log.

    Side effects: updates processing_status["last_update"] and trims the
    rolling log buffer to its 100 most recent entries.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{now}] {message}"
    print(entry)
    processing_status["logs"].append(entry)
    processing_status["last_update"] = now
    # Keep only the tail of the buffer so long runs don't grow unbounded.
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG.

    Fix: the original wrote a literal "(unknown)" placeholder instead of the
    failing file's name, leaving the *filename* parameter unused and making
    the failure log useless for triage.
    """
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk usage for the filesystem holding *path*.

    Keys: "total", "free", "used" — all in gigabytes.
    """
    stats = os.statvfs(path)
    gib = 1024 ** 3
    total_gb = stats.f_frsize * stats.f_blocks / gib
    free_gb = stats.f_frsize * stats.f_bavail / gib
    return {"total": total_gb, "free": free_gb, "used": total_gb - free_gb}
def check_disk_space(path: str = ".") -> bool:
    """Return True when *path* has at least MIN_FREE_SPACE_GB free."""
    info = get_disk_usage(path)
    free = info["free"]
    if free >= MIN_FREE_SPACE_GB:
        return True
    # Warn with the same free/used breakdown the rest of the log uses.
    log_message(f'⚠️ Low disk space: {free:.2f}GB free, {info["used"]:.2f}GB used')
    return False
def cleanup_temp_files():
    """Delete stale .rar/.zip files from DOWNLOAD_FOLDER.

    The file recorded in processing_status["current_file"] is kept, since it
    may still be in use by the active processing run.

    Fix: the original used a bare ``except:``, which also swallows
    KeyboardInterrupt/SystemExit; narrowed to OSError — the only failure
    os.remove is expected to raise here (deletion stays best-effort).
    """
    log_message("🧹 Cleaning up temporary files...")
    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for name in os.listdir(DOWNLOAD_FOLDER):
        if name == current_file or not name.endswith((".rar", ".zip")):
            continue
        try:
            os.remove(os.path.join(DOWNLOAD_FOLDER, name))
            log_message(f"🗑️ Removed old download: {name}")
        except OSError:
            pass
def load_json_state(file_path: str, default_value):
    """Return the JSON payload stored at *file_path*, or *default_value*.

    The default is returned when the file does not exist or contains invalid
    JSON; the corrupted case is additionally logged as a warning.
    """
    if not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r") as f:
            return json.load(f)
    except json.JSONDecodeError:
        log_message(f"⚠️ Corrupted state file: {file_path}")
        return default_value
def save_json_state(file_path: str, data):
    """Persist *data* to *file_path* as indented JSON, overwriting any file."""
    Path(file_path).write_text(json.dumps(data, indent=2))
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream *url* to *dest_path* with retries and disk-space guards.

    Returns True on success; False when disk space is insufficient (even after
    cleanup), the remote file would not fit, or every attempt fails.
    """
    # Make room first; give up only if cleanup still leaves us under the floor.
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup")
            return False
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    # NOTE(review): requests.get has no timeout= here — a stalled connection
    # can hang this call indefinitely; confirm whether that is acceptable.
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()
                # Check content length if available; refuse files that would
                # leave less than a 0.5GB buffer on disk.
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # Leave 0.5GB buffer
                        log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False
                # Stream to disk in 8KiB chunks so large files never sit in RAM.
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                return True
        except Exception as e:
            # Exponential backoff (1s, 2s, 4s, ...) between attempts.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"❌ Download failed after {max_retries} attempts: {e}")
            return False
    return False
def is_multipart_rar(filename: str) -> bool:
    """Return True when *filename* looks like one part of a split RAR set."""
    lowered = filename.lower()
    return lowered.endswith(".rar") and ".part" in lowered
def get_rar_part_base(filename: str) -> str:
    """Return the base archive name shared by all parts of a RAR set.

    "course.part01.rar" -> "course"; a plain "course.rar" -> "course".

    Fix: the original split on the literal ".part" while its membership test
    was case-insensitive, so names like "X.PART01.rar" slipped past the split
    and came back unchanged.  The cut point is now located case-insensitively.
    """
    lowered = filename.lower()
    if ".part" in lowered:
        return filename[:lowered.index(".part")]
    return filename.replace(".rar", "")
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR archive with retries, handling multi-part sets.

    Tests the archive (``unrar t``) before each extraction attempt, enables
    broken-file keeping (-kb) on retries, and returns True only when unrar
    exits cleanly.
    """
    filename = os.path.basename(rar_path)
    # For multi-part RARs, unrar must be pointed at the first part; later
    # parts are picked up automatically.
    # NOTE(review): assumes the naming scheme "<base>.part01.rar" — sets named
    # .part1.rar / .part001.rar would not be found; confirm the source data.
    if is_multipart_rar(filename):
        base_name = get_rar_part_base(filename)
        first_part = f"{base_name}.part01.rar"
        first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
        if not os.path.exists(first_part_path):
            log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
            return False
        rar_path = first_part_path
        log_message(f"📦 Processing multi-part RAR starting with: {first_part}")
    for attempt in range(max_retries):
        try:
            # Test RAR first — a failing integrity check skips extraction.
            test_cmd = ["unrar", "t", rar_path]
            test_result = subprocess.run(test_cmd, capture_output=True, text=True)
            if test_result.returncode != 0:
                log_message(f"⚠️ RAR test failed: {test_result.stderr}")
                if attempt == max_retries - 1:
                    return False
                continue
            # Extract RAR; -o+ overwrites existing files without prompting.
            cmd = ["unrar", "x", "-o+", rar_path, output_dir]
            if attempt > 0:  # Try recovery on subsequent attempts (-kb keeps broken files)
                cmd.insert(2, "-kb")
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"✅ Successfully extracted: {os.path.basename(rar_path)}")
                return True
            else:
                error_msg = result.stderr or result.stdout
                log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
                # NOTE(review): the "CRC failed" test is case-sensitive while
                # "checksum error" is matched on the lowered string — confirm
                # which casings unrar actually emits.
                if "checksum error" in error_msg.lower() or "CRC failed" in error_msg:
                    log_message(f"⚠️ Data corruption detected, attempt {attempt + 1}")
                elif result.returncode == 10:
                    # Exit code 10: no files matched — retrying cannot help.
                    log_message(f"⚠️ No files to extract (exit code 10)")
                    return False
                elif result.returncode == 1:
                    log_message(f"⚠️ Non-fatal error (exit code 1)")
        except Exception as e:
            log_message(f"❌ Extraction exception: {str(e)}")
            if attempt == max_retries - 1:
                return False
        # Brief pause before the next attempt.
        time.sleep(1)
    return False
def ensure_dir(path):
    """Create *path* (including parents) if it does not already exist."""
    Path(path).mkdir(parents=True, exist_ok=True)
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Extract frames from *video_path* into *output_dir* at ~*fps* frames/sec.

    Frames are written as zero-padded PNGs (0001.png, 0002.png, ...).
    Returns the number of frames saved; 0 when the video cannot be opened.

    Fix: when the requested *fps* exceeds the video's own frame rate the
    original computed ``int(round(video_fps / fps)) == 0`` and crashed with
    ZeroDivisionError on the modulo; the interval is now clamped to at least
    1 (i.e. every frame is kept).  A non-positive *fps* likewise keeps all
    frames instead of dividing by zero.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    if not video_fps or video_fps <= 0:
        video_fps = 30  # fallback if FPS metadata is not available
        log_message(f"[WARN] Using fallback FPS: {video_fps}")
    # Clamp to >= 1: requesting more fps than the source has means "keep all".
    frame_interval = max(1, int(round(video_fps / fps))) if fps > 0 else 1
    frame_idx = 0
    saved_idx = 1
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log_message(f"[DEBUG] Total frames in video: {total_frames}")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx % frame_interval == 0:
            frame_name = f"{saved_idx:04d}.png"
            cv2.imwrite(str(Path(output_dir) / frame_name), frame)
            saved_idx += 1
        frame_idx += 1
    cap.release()
    log_message(f"Extracted {saved_idx-1} frames from {video_path} to {output_dir}")
    return saved_idx - 1
def analyze_frame_with_florence2(image_path: str, prompt: str = "<CAPTION>") -> Dict:
    """Caption one image with Florence-2.

    Returns ``{"image": <basename>, "description": <caption or error>}``.
    Degrades gracefully (error string in "description") when the model
    failed to load at import time.

    Fixes: callers pass ``prompt=None`` straight through (see analyze_frames),
    which the original forwarded to the processor as-is; None now falls back
    to the default task.  The post-processing ``task`` was also hard-coded to
    "<CAPTION>" even when a different prompt was supplied — it now matches
    the prompt actually used.
    """
    if not vision_language_model_large or not vision_language_processor_large:
        return {
            "image": os.path.basename(image_path),
            "description": "[ERROR] Vision model not loaded."
        }
    task = prompt or "<CAPTION>"
    image = Image.open(image_path).convert("RGB")
    inputs = vision_language_processor_large(images=image, text=task, return_tensors="pt").to(device)
    with torch.no_grad():
        generated_ids = vision_language_model_large.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=512,
            do_sample=False,   # deterministic beam search output
            num_beams=3
        )
    generated_text = vision_language_processor_large.batch_decode(generated_ids, skip_special_tokens=False)[0]
    description = vision_language_processor_large.post_process_generation(
        generated_text,
        task=task,
        image_size=(image.width, image.height)
    )[task]
    return {
        "image": os.path.basename(image_path),
        "description": description
    }
def summarize_activities(frame_analyses: List[Dict]) -> Dict:
    """Summarize activities from per-frame analyses.

    Placeholder: real summarization is not implemented yet, so an empty dict
    is returned regardless of input.  A commented-out sketch of the intended
    schema (steps / high_level_goal / creative_actions / objects / final_goal)
    was removed; recover it from version control if needed.
    """
    return {}
def analyze_frames(frames_dir: str, output_json_path: str, prompt: Optional[str] = None) -> int:
    """Run Florence-2 over every PNG in *frames_dir* and write JSON results.

    The output file holds {"frame_analyses": [...], "summary": {...}}.
    Returns the number of frames analyzed (a write failure is logged but
    does not change the count).

    Fix: a ``None`` prompt was forwarded verbatim to
    analyze_frame_with_florence2, overriding that function's "<CAPTION>"
    default; it is normalized here instead.
    """
    log_message(f"[INFO] Analyzing frames in {frames_dir}...")
    task_prompt = prompt or "<CAPTION>"
    frames_dir = Path(frames_dir).resolve()
    output_json_path = Path(output_json_path).resolve()
    ensure_dir(frames_dir)
    ensure_dir(output_json_path.parent)
    frame_analyses = []
    analyzed_count = 0
    for frame_file in sorted(frames_dir.glob("*.png")):
        frame_analyses.append(analyze_frame_with_florence2(str(frame_file), task_prompt))
        analyzed_count += 1
    # Generate summary
    summary = summarize_activities(frame_analyses)
    # Save results
    results = {
        "frame_analyses": frame_analyses,
        "summary": summary
    }
    try:
        with open(output_json_path, "w") as f:
            json.dump(results, f, indent=2)
        log_message(f"[SUCCESS] Analysis results saved to {output_json_path}")
    except Exception as e:
        log_message(f"[ERROR] Failed to write output JSON: {e}")
    return analyzed_count
def process_rar_file(rar_path: str) -> bool:
    """Extract one RAR archive, then frame-extract and analyze its videos.

    Updates the global processing_status counters as it goes and always
    clears "current_file" on exit.  Returns True on success, False on any
    failure (which is also appended to the failed-files log).

    Fix: the "Processing:" log line printed a literal "(unknown)"
    placeholder instead of the file actually being processed.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename
    # Multi-part archives share one course name across all .partNN files.
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
    try:
        log_message(f"🔄 Processing: {filename}")
        # Start from a clean slate if a previous run left artifacts behind.
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")
        # Inventory the extracted tree and collect video files.
        file_count = 0
        video_files_found = []
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                file_count += 1
                if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
                    video_files_found.append(os.path.join(root, file))
        processing_status["extracted_courses"] += 1
        log_message(f"✅ Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")
        # Per-video: extract frames, then run the vision model on them.
        for video_path in video_files_found:
            video_filename = Path(video_path).name
            # Unique frames directory per video (dots mangled to underscores).
            frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{course_name}_{video_filename.replace('.', '_')}_frames")
            ensure_dir(frames_output_dir)
            extracted_frames_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_frames_count"] += extracted_frames_count
            if extracted_frames_count > 0:
                processing_status["extracted_videos"] += 1
                log_message(f"[INFO] Extracted {extracted_frames_count} frames from {video_filename}")
                analysis_output_json = os.path.join(ANALYSIS_OUTPUT_FOLDER, f"{course_name}_{video_filename.replace('.', '_')}_analysis.json")
                analyzed_frames = analyze_frames(frames_output_dir, analysis_output_json)
                processing_status["analyzed_frames_count"] += analyzed_frames
                log_message(f"[INFO] Analyzed {analyzed_frames} frames from {video_filename}")
            else:
                log_message(f"[WARN] No frames extracted from {video_filename}")
        return True
    except Exception as e:
        error_msg = str(e)
        log_message(f"❌ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False
    finally:
        processing_status["current_file"] = None
def main_processing_loop(start_index: int = 0):
    """Process the next pending RAR from the source repo (one file per run).

    Resumes from persisted state files (or *start_index* when > 0), downloads
    the next RAR, extracts/analyzes it, then advances and saves the index so
    the next invocation continues where this one stopped.

    Fixes: six log lines printed a literal "(unknown)" placeholder instead of
    the current filename, and a bare ``except:`` around the download cleanup
    was narrowed to OSError.
    """
    processing_status["is_running"] = True
    try:
        # Load state; explicit start_index overrides the persisted index.
        processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
        next_index = start_index if start_index > 0 else download_state["next_download_index"]
        log_message(f"📊 Starting from index {next_index}")
        log_message(f"📊 Previously processed: {len(processed_rars)} files")
        # Get file list from the source dataset repo.
        try:
            files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
            rar_files = sorted([f for f in files if f.endswith(".rar")])
            processing_status["total_files"] = len(rar_files)
            log_message(f"📁 Found {len(rar_files)} RAR files in repository")
            if next_index >= len(rar_files):
                log_message("✅ All files have been processed!")
                return
        except Exception as e:
            log_message(f"❌ Failed to get file list: {str(e)}")
            return
        # Process only one file per run.
        if next_index < len(rar_files):
            rar_file = rar_files[next_index]
            filename = os.path.basename(rar_file)
            if filename in processed_rars:
                log_message(f"⏭️ Skipping already processed: {filename}")
                processing_status["processed_files"] += 1
                # Advance past the already-done file and stop this run.
                next_index += 1
                save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
                log_message(f"📊 Moving to next file. Progress: {next_index}/{len(rar_files)}")
                return
            log_message(f"📥 Downloading: {filename}")
            dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
            download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
            if download_with_retry(download_url, dest_path):
                if process_rar_file(dest_path):
                    processed_rars.append(filename)
                    save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                    log_message(f"✅ Successfully processed: {filename}")
                    processing_status["processed_files"] += 1
                else:
                    log_message(f"❌ Failed to process: {filename}")
                    processing_status["failed_files"] += 1
                # The archive is no longer needed once handled (either way).
                try:
                    os.remove(dest_path)
                    log_message(f"🗑️ Cleaned up download: {filename}")
                except OSError:
                    pass
            else:
                log_message(f"❌ Failed to download: {filename}")
                processing_status["failed_files"] += 1
            # Persist the index so the next run picks up the following file.
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
            # Status update
            log_message(f"📊 Progress: {next_index}/{len(rar_files)} files processed")
            log_message(f'📊 Extracted: {processing_status["extracted_courses"]} courses')
            log_message(f'📊 Videos Processed: {processing_status["extracted_videos"]}')
            log_message(f'📊 Frames Extracted: {processing_status["extracted_frames_count"]}')
            log_message(f'📊 Frames Analyzed: {processing_status["analyzed_frames_count"]}')
            log_message(f'📊 Failed: {processing_status["failed_files"]} files')
            if next_index < len(rar_files):
                log_message(f"🔄 Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
            else:
                log_message("🎉 All files have been processed!")
        else:
            log_message("✅ All files have been processed!")
        log_message("🎉 Processing complete!")
        log_message(f'📊 Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, {processing_status["extracted_frames_count"]} frames extracted, {processing_status["analyzed_frames_count"]} frames analyzed')
    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
| # FastAPI Endpoints | |
# NOTE(review): no @app.post(...) decorator is visible on this handler —
# confirm the route is registered elsewhere (e.g. app.add_api_route); as
# written this coroutine is never reachable over HTTP.
async def analyze_video_endpoint(
    file: UploadFile = File(...),
    fps: int = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None)
):
    """Accept one uploaded video, extract frames at *fps*, and return the
    per-frame Florence-2 analyses plus an activity summary."""
    if not file.filename.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": [".mp4", ".avi", ".mov", ".mkv"]
        })
    with tempfile.TemporaryDirectory() as temp_dir:
        workdir = Path(temp_dir)
        # Spool the upload to disk so OpenCV can read it.
        video_path = workdir / file.filename
        with open(video_path, "wb") as out:
            shutil.copyfileobj(file.file, out)
        frames_dir = workdir / "frames"
        frame_count = extract_frames(video_path, frames_dir, fps)
        frame_analyses = [
            analyze_frame_with_florence2(str(frame_path), prompt)
            for frame_path in sorted(frames_dir.glob("*.png"))
        ]
        summary = summarize_activities(frame_analyses)
        return JSONResponse(content={
            "video_filename": file.filename,
            "frame_count": frame_count,
            "fps": fps,
            "frame_analyses": frame_analyses,
            "summary": summary
        })
async def analyze_archive_endpoint(
    file: UploadFile = File(...),
    fps: int = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None)
):
    """Analyze videos from RAR/ZIP archive and return frame-by-frame analysis.

    Fix: the inner ``for file in files`` walk loop shadowed the *file*
    upload parameter, so the final ``file.filename`` access raised
    AttributeError (str has no .filename) whenever the archive held any
    files.  The loop variables are renamed and the archive name is captured
    up front.
    NOTE(review): no @app.post(...) decorator is visible — confirm the route
    is registered elsewhere.
    """
    if not file.filename.lower().endswith((".rar", ".zip")):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": [".rar", ".zip"]
        })
    archive_filename = file.filename
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / archive_filename
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        extract_dir = temp_dir_path / "extracted"
        video_files = []
        if archive_filename.lower().endswith(".rar"):
            with rarfile.RarFile(file_path) as rf:
                rf.extractall(extract_dir)
        else:
            with zipfile.ZipFile(file_path) as zf:
                zf.extractall(extract_dir)
        # Find video files in extracted content.
        for root, dirs, names in os.walk(extract_dir):
            for name in names:
                if name.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
                    video_files.append(Path(root) / name)
        if not video_files:
            return JSONResponse(status_code=400, content={
                "error": "No video files found in archive"
            })
        results = []
        for video_path in video_files:
            video_name = video_path.name
            frames_dir = temp_dir_path / f"frames_{video_name}"
            frame_count = extract_frames(video_path, frames_dir, fps)
            frame_analyses = []
            for frame_file in sorted(frames_dir.glob("*.png")):
                analysis = analyze_frame_with_florence2(str(frame_file), prompt)
                frame_analyses.append(analysis)
            summary = summarize_activities(frame_analyses)
            results.append({
                "video_filename": video_name,
                "frame_count": frame_count,
                "fps": fps,
                "frame_analyses": frame_analyses,
                "summary": summary
            })
        return JSONResponse(content={
            "archive_filename": archive_filename,
            "videos_processed": len(video_files),
            "results": results
        })
async def health_check():
    """Liveness probe; reports the (mocked) model identity."""
    payload = {
        "status": "healthy",
        "model": "Florence-2 (Mock)",
        "note": "Florence-2 model is mocked due to sandbox memory limitations."
    }
    return JSONResponse(content=payload)
async def get_processing_status():
    """Expose the live processing_status dict as a JSON response."""
    current = processing_status
    return JSONResponse(content=current)
# Expose necessary functions and variables
# NOTE(review): "send_email_with_attachment" is exported here but no such
# definition exists in this file (smtplib/EmailMessage are imported yet
# unused) — `from <module> import *` would raise AttributeError; confirm
# whether the helper was removed or lives in another module.
__all__ = [
    "main_processing_loop",
    "processing_status",
    "ANALYSIS_OUTPUT_FOLDER",
    "log_message",
    "send_email_with_attachment",
    "analyze_frames",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir"
]