Spaces:
Sleeping
Sleeping
| import torch | |
| from transformers import AutoModel, AutoTokenizer | |
| from modelscope.hub.snapshot_download import snapshot_download | |
| from PIL import Image | |
| from functools import lru_cache | |
| from decord import VideoReader, cpu | |
| import os | |
| import gc | |
| import cv2 | |
| import tempfile | |
| import shutil | |
| import subprocess | |
| import ffmpeg # Added for ffmpeg-python | |
| from yolo_detection import is_image, is_video | |
# --- Video processing constants ---------------------------------------------
# Upper bound on sampled frames per chunk (lowered from 64 to reduce OOM risk).
MAX_NUM_FRAMES = 32

# Compute device: CUDA when PyTorch reports a usable GPU, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Chunk count of the video currently being processed; the video functions
# below rebind this module-level value.
TOTAL_CHUNKS = 1

# When a GPU is present, allocate a tiny tensor on it once at import time
# (presumably to initialise the CUDA context early -- TODO confirm intent).
if DEVICE == "cuda":
    def debug():
        torch.randn(10).cuda()

    debug()

# --- Model configuration -----------------------------------------------------
MODEL_NAME = 'iic/mPLUG-Owl3-7B-240728'
MODEL_CACHE_DIR = "/data/models"
os.makedirs(MODEL_CACHE_DIR, exist_ok=True)

# Fetch the model snapshot into the cache directory. If the download raises,
# report it and fall back to the path built from cache dir and model name.
try:
    model_path = snapshot_download(MODEL_NAME, cache_dir=MODEL_CACHE_DIR)
except Exception as download_error:
    print(f"Error downloading model: {download_error}")
    model_path = os.path.join(MODEL_CACHE_DIR, MODEL_NAME)
| # Model configuration and existing functions remain unchanged... | |
def load_model_and_tokenizer():
    """Load the mPLUG model, its tokenizer and the matching processor.

    Weights are read from the module-level ``model_path``. Every call performs
    a fresh load; nothing is memoised in-process by this function.

    Returns:
        tuple: ``(model, tokenizer, processor)`` where ``processor`` is the
        object returned by ``model.init_processor(tokenizer)``.

    Raises:
        Exception: any error raised while loading is printed and re-raised.
    """
    print("Loading/Retrieving mPLUG model from cache...")
    try:
        # Collect garbage *before* emptying the CUDA cache: tensors that are
        # only kept alive by dead reference cycles must be freed first,
        # otherwise their blocks are still occupied when the allocator is
        # asked to hand unused memory back.
        gc.collect()
        if DEVICE == "cuda":
            torch.cuda.empty_cache()

        model = AutoModel.from_pretrained(
            model_path,
            attn_implementation='sdpa',
            trust_remote_code=True,
            torch_dtype=torch.half,
            device_map='auto',
        )
        tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True,
        )
        model.eval()
        processor = model.init_processor(tokenizer)
        return model, tokenizer, processor
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        raise
def process_image(image_path, model, tokenizer, processor, prompt):
    """Run the mPLUG model on a single image and return its text answer.

    Args:
        image_path: Path of the image file to open with PIL.
        model: Loaded model; must expose ``device`` and ``generate(**inputs)``.
        tokenizer: Tokenizer forwarded to ``model.generate``.
        processor: Callable building model inputs from chat messages/images.
        prompt: User prompt text; the ``<|image|>`` tag is appended to it.

    Returns:
        str: First element of the model response, or the literal string
        ``"Error processing image"`` when anything fails (the error itself is
        printed, never raised).
    """
    try:
        # Keep the file handle open only for as long as the model needs the
        # image, then close it deterministically.
        with Image.open(image_path) as image:
            model_messages = [
                {"role": "user", "content": prompt + "<|image|>"},
                # Empty assistant turn that the model is asked to complete.
                {"role": "assistant", "content": ""},
            ]
            inputs = processor(
                model_messages,
                images=[image],
                videos=None,
            )
            # Follow the device the model actually lives on instead of
            # assuming CUDA, so CPU-only hosts work as well.
            inputs.to(model.device)
            inputs.update({
                'tokenizer': tokenizer,
                'max_new_tokens': 100,
                'decode_text': True,
            })
            response = model.generate(**inputs)
        return response[0]
    except Exception as e:
        print(f"Error processing image: {str(e)}")
        return "Error processing image"
def process_video_chunk(video_frames, model, tokenizer, processor, prompt):
    """Run the mPLUG model on one chunk of video frames.

    Args:
        video_frames: Sequence of frames for this chunk. When it is empty the
            prompt is sent without a ``<|video|>`` tag and without videos.
        model: Loaded model; must expose ``device`` and ``generate(**inputs)``.
        tokenizer: Tokenizer forwarded to ``model.generate``.
        processor: Callable building model inputs from chat messages/videos.
        prompt: User prompt text.

    Returns:
        The first element of the model response.
    """
    content = prompt
    videos = None
    if video_frames:
        # One <|video|> tag per attached clip; this function attaches one clip.
        content += "<|video|>"
        videos = [video_frames]

    model_messages = [
        {"role": "user", "content": content},
        # Empty assistant turn that the model is asked to complete.
        {"role": "assistant", "content": ""},
    ]

    inputs = processor(
        model_messages,
        images=None,
        videos=videos,
    )
    # Follow the device the model actually lives on instead of assuming CUDA,
    # so CPU-only hosts work as well.
    inputs.to(model.device)
    inputs.update({
        'tokenizer': tokenizer,
        'max_new_tokens': 100,
        'decode_text': True,
    })

    response = model.generate(**inputs)
    return response[0]
def _is_nonempty_file(path):
    """Return True when *path* exists and holds at least one byte."""
    return os.path.exists(path) and os.path.getsize(path) > 0


def _remove_if_present(path):
    """Best-effort delete of *path*; a failed cleanup is deliberately ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def _split_with_ffmpeg_python(video_path, start_time, end_time, output_path, chunk_id):
    """Stream-copy one segment through the ffmpeg-python bindings; True on success."""
    try:
        (
            ffmpeg
            .input(video_path, ss=start_time, to=end_time)
            .output(output_path, c='copy', loglevel="quiet")
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        print(f"ffmpeg-python error for chunk {chunk_id}: {e.stderr.decode() if e.stderr else str(e)}, trying OpenCV method")
        return False
    except Exception as e:  # e.g. ffmpeg binary missing
        print(f"ffmpeg-python failed with general error for chunk {chunk_id}: {str(e)}, trying OpenCV method")
        return False

    if _is_nonempty_file(output_path):
        print(f"Successfully created chunk {chunk_id} using ffmpeg-python")
        return True
    print(f"ffmpeg-python ran but did not create a valid file for chunk {chunk_id}")
    return False


def _split_with_opencv(video_path, start_time, end_time, output_path, chunk_id):
    """Re-encode one segment frame by frame with OpenCV; True on success."""
    try:
        cap = cv2.VideoCapture(video_path)
        writer = None
        try:
            if not cap.isOpened():
                raise IOError(f"Cannot open video file: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                print(f"Warning: Invalid FPS ({fps}) detected for {video_path}. Using default 30.")
                fps = 30.0

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            start_frame = int(start_time * fps)
            end_frame = int(end_time * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

            for current_frame in range(start_frame, end_frame):
                ret, frame = cap.read()
                if not ret:
                    print(f"Warning: Could not read frame {current_frame} for chunk {chunk_id}. Reached end of video early?")
                    break
                writer.write(frame)
        finally:
            # Release handles even when a step above raised, and before any
            # attempt to delete the partially written output file.
            cap.release()
            if writer is not None:
                writer.release()
    except Exception as e:
        print(f"OpenCV method failed for chunk {chunk_id}: {str(e)}, trying subprocess method")
        _remove_if_present(output_path)
        return False

    if _is_nonempty_file(output_path):
        print(f"Successfully created chunk {chunk_id} using OpenCV")
        return True
    print(f"OpenCV method ran but did not create a valid file for chunk {chunk_id}")
    return False


def _split_with_ffmpeg_cli(video_path, start_time, end_time, output_path, chunk_id):
    """Cut one segment with the ffmpeg executable (copy, then re-encode); True on success."""
    # '-y' on both attempts: an earlier method may have left an empty file
    # behind, and without it ffmpeg refuses to (or asks whether to) overwrite.
    head = ['ffmpeg', '-y', '-ss', str(start_time), '-to', str(end_time), '-i', video_path]
    tail = ['-loglevel', 'error', output_path]
    try:
        copy_run = subprocess.run(head + ['-c', 'copy'] + tail,
                                  capture_output=True, text=True, check=False)
        if copy_run.returncode != 0 or not _is_nonempty_file(output_path):
            print(f"Subprocess ffmpeg copy failed for chunk {chunk_id}. Stderr: {copy_run.stderr}. Trying re-encoding.")
            # No '-c copy': let ffmpeg re-encode with its default codecs.
            reencode_run = subprocess.run(head + tail,
                                          capture_output=True, text=True, check=False)
            if reencode_run.returncode != 0:
                raise RuntimeError(f"Subprocess ffmpeg re-encode also failed. Stderr: {reencode_run.stderr}")

        if not _is_nonempty_file(output_path):
            raise RuntimeError("Subprocess ffmpeg failed to create a valid file.")
    except FileNotFoundError:
        print(f"Subprocess failed for chunk {chunk_id}: 'ffmpeg' command not found. Ensure ffmpeg is installed and in PATH.")
        return False
    except Exception as e:
        print(f"Subprocess method failed for chunk {chunk_id}: {str(e)}")
        _remove_if_present(output_path)
        return False

    print(f"Successfully created chunk {chunk_id} using subprocess ffmpeg")
    return True


def split_original_video(video_path, chunk_info):
    """Cut the original video into one file per chunk, with fallbacks.

    For every entry of ``chunk_info`` the segment ``start_time``..``end_time``
    is written to ``./tmp/original_chunk_<chunk_id>.mp4``. Three methods are
    tried in order until one yields a non-empty file: ffmpeg-python stream
    copy, OpenCV re-encoding, and the ``ffmpeg`` executable.

    Only stale ``original_chunk_*.mp4`` files are removed from ``./tmp``
    beforehand. The directory itself is not wiped, because it also holds the
    ``chunk_*.mp4`` and ``thumbnail_*.jpg`` files written by other functions
    in this module, and deleting them would invalidate paths already handed
    out to callers.

    Args:
        video_path: Path of the source video.
        chunk_info: Iterable of dicts with ``chunk_id``, ``start_time`` and
            ``end_time`` (times in seconds).

    Returns:
        list[str]: Paths of the chunk files that were created; chunks for
        which every method failed are skipped with a warning.
    """
    tmp_dir = os.path.join('.', 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)

    try:
        stale_names = [
            name for name in os.listdir(tmp_dir)
            if name.startswith("original_chunk_") and name.endswith(".mp4")
        ]
    except OSError as e:
        print(f"Error listing temporary directory {tmp_dir}: {e}")
        stale_names = []
    for name in stale_names:
        _remove_if_present(os.path.join(tmp_dir, name))

    # Ordered by preference; any() stops at the first method that succeeds.
    methods = (_split_with_ffmpeg_python, _split_with_opencv, _split_with_ffmpeg_cli)

    original_chunks = []
    for chunk in chunk_info:
        chunk_id = chunk['chunk_id']
        start_time = chunk['start_time']
        end_time = chunk['end_time']
        output_path = os.path.join(tmp_dir, f"original_chunk_{chunk_id}.mp4")

        chunk_created = any(
            method(video_path, start_time, end_time, output_path, chunk_id)
            for method in methods
        )
        if chunk_created and os.path.exists(output_path):
            original_chunks.append(output_path)
        else:
            print(f"Warning: Failed to create chunk {chunk_id} using all methods, skipping.")

    return original_chunks
def encode_video_in_chunks(video_path):
    """Sample a video at about one frame per second and yield it chunk by chunk.

    This is a generator. Sampled frame indices are grouped into chunks of at
    most ``MAX_NUM_FRAMES``. For each chunk the frames are written to
    ``./tmp/chunk_<idx>.mp4`` and then yielded. The module-level
    ``TOTAL_CHUNKS`` is set to the number of chunks before the first yield.
    Once all chunks have been consumed, ``split_original_video`` is called
    with the collected chunk metadata.

    Args:
        video_path: Path of the video to read.

    Yields:
        tuple: ``(chunk_idx, frames_pil, info)`` where ``frames_pil`` is a
        list of PIL images and ``info`` is a dict with ``chunk_id``, ``path``,
        ``start_time``, ``end_time`` (seconds), ``start_frame``,
        ``end_frame`` and ``original_fps``.
    """
    global TOTAL_CHUNKS

    vr = VideoReader(video_path, ctx=cpu(0))
    fps = vr.get_avg_fps()
    if not fps > 0:
        # A zero/negative/NaN rate would divide by zero below; use the same
        # default the OpenCV splitter in this module falls back to.
        print(f"Warning: Invalid FPS ({fps}) detected for {video_path}. Using default 30.")
        fps = 30.0

    # One sampled frame per second of video. Clamp to 1 because round() gives
    # 0 for rates below 0.5 fps and range() rejects a zero step.
    sample_step = max(1, round(fps))
    frame_idx = list(range(0, len(vr), sample_step))

    tmp_dir = os.path.join('.', 'tmp')
    os.makedirs(tmp_dir, exist_ok=True)

    chunks = [
        frame_idx[i:i + MAX_NUM_FRAMES]
        for i in range(0, len(frame_idx), MAX_NUM_FRAMES)
    ]
    TOTAL_CHUNKS = len(chunks)
    print(f"Total chunks: {TOTAL_CHUNKS}")

    chunk_info = []
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    for chunk_idx, chunk in enumerate(chunks):
        frames = vr.get_batch(chunk).asnumpy()
        frames_pil = [Image.fromarray(v.astype('uint8')) for v in frames]
        chunk_path = os.path.join(tmp_dir, f"chunk_{chunk_idx}.mp4")

        # Slices of a non-empty index list are never empty, so chunk[0] and
        # chunk[-1] always exist here.
        start_frame, end_frame = chunk[0], chunk[-1]
        info = {
            'chunk_id': chunk_idx,
            'path': chunk_path,
            'start_time': start_frame / fps,
            'end_time': end_frame / fps,
            'start_frame': start_frame,
            'end_frame': end_frame,
            'original_fps': fps,
        }
        chunk_info.append(info)

        height, width, _ = frames[0].shape
        out = cv2.VideoWriter(chunk_path, fourcc, fps, (width, height))
        try:
            for frame in frames:
                # The writer is fed BGR; frames are converted from RGB first.
                out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        finally:
            out.release()
        print(f"Saved chunk {chunk_idx} to {chunk_path}")

        yield chunk_idx, frames_pil, info

    # Runs only after the consumer has exhausted the generator.
    split_original_video(video_path, chunk_info)
def analyze_image_activities(image_path):
    """Return placeholder activity records for a construction-site image.

    The image itself is not inspected yet: two hard-coded sample entries are
    produced, stamped with the current time and the time 30 minutes earlier.

    Args:
        image_path: Path of the image (currently unused).

    Returns:
        list[dict]: Entries with ``time``, ``summary`` and ``objects`` keys,
        or an empty list if building them fails.
    """
    from datetime import datetime, timedelta

    clock_format = "%I:%M %p"
    try:
        # Placeholder data -- to be replaced with real model output.
        latest_entry = {
            'time': datetime.now().strftime(clock_format),
            'summary': 'Excavation work in progress',
            'objects': ['excavator', 'worker', 'dump-truck'],
        }
        half_hour_ago = datetime.now() - timedelta(minutes=30)
        earlier_entry = {
            'time': half_hour_ago.strftime(clock_format),
            'summary': 'Material loading operation',
            'objects': ['loader', 'worker', 'gravel'],
        }
        return [latest_entry, earlier_entry]
    except Exception as err:
        print(f"Error analyzing image: {err}")
        return []
def generate_thumbnails(video_path, num_chunks):
    """Save one JPEG thumbnail per chunk into a freshly emptied ``./tmp``.

    Thumbnails are taken at evenly spaced frames, ``total_frames //
    num_chunks`` apart, starting at frame 0.
    NOTE(review): even spacing only coincides with the real chunk starts when
    all chunks have the same length -- confirm this is acceptable.

    Args:
        video_path: Path to video file.
        num_chunks: Number of chunks to generate thumbnails for. Values
            ``<= 0`` yield an empty list without touching the file system.

    Returns:
        list[dict]: One ``{"path": <jpeg path>, "time": <seconds>}`` entry
        per thumbnail, in chunk order.
    """
    if num_chunks <= 0:
        # Nothing to extract; also avoids dividing by zero below.
        return []

    vr = VideoReader(video_path, ctx=cpu(0))
    total_frames = len(vr)
    fps = vr.get_avg_fps()

    # Start from an empty ./tmp directory.
    tmp_dir = os.path.join('.', 'tmp')
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    os.makedirs(tmp_dir, exist_ok=True)

    frame_step = total_frames // num_chunks

    thumbnails = []
    for chunk_idx in range(num_chunks):
        frame_idx = chunk_idx * frame_step
        if frame_idx >= total_frames:
            continue
        img = Image.fromarray(vr[frame_idx].asnumpy())
        temp_path = os.path.join(tmp_dir, f"thumbnail_{chunk_idx}.jpg")
        img.save(temp_path)
        thumbnails.append({
            "path": temp_path,
            "time": frame_idx / fps,
        })
    return thumbnails
def analyze_video_activities(video_path, model, tokenizer, processor):
    """Describe a video chunk by chunk with the mPLUG model.

    The chunk count is computed first and stored in the module-level
    ``TOTAL_CHUNKS``, one thumbnail per chunk is generated, and every chunk
    is then sent to the model. Each chunk produces exactly one activity
    record, paired with the thumbnail of the same index.

    Args:
        video_path: Path of the video to analyse.
        model: Loaded model passed through to ``process_video_chunk``.
        tokenizer: Tokenizer passed through to ``process_video_chunk``.
        processor: Processor passed through to ``process_video_chunk``.

    Returns:
        list[dict]: Activities sorted by ``timestamp_seconds``. Each has
        ``time`` (``MM:SS``), ``timestamp_seconds``, ``summary`` (the model
        response), ``objects`` (known keywords found in the response),
        ``thumbnail``, ``chunk_id`` and ``chunk_path``.
    """
    global TOTAL_CHUNKS

    prompt = "Analyze this construction site video chunk and describe the activities happening. Focus on construction activities, machinery usage, and worker actions. Include any construction equipment or machinery you can identify."
    # Keywords looked up (as substrings) in the lower-cased model response.
    possible_objects = ("excavator", "bulldozer", "crane", "truck", "loader",
                        "worker", "concrete", "scaffold", "beam", "pipe",
                        "rebar", "formwork", "drill", "grader", "roller")

    # Count chunks up front: roughly one sampled frame per second, grouped
    # MAX_NUM_FRAMES at a time. The step is clamped to 1 because range()
    # rejects a zero step (round() gives 0 below 0.5 fps).
    vr = VideoReader(video_path, ctx=cpu(0))
    sample_step = max(1, round(vr.get_avg_fps()))
    sampled_count = len(range(0, len(vr), sample_step))
    TOTAL_CHUNKS = -(-sampled_count // MAX_NUM_FRAMES)  # ceiling division

    thumbnails = generate_thumbnails(video_path, num_chunks=TOTAL_CHUNKS)

    all_activities = []
    for chunk_idx, video_frames, chunk_info in encode_video_in_chunks(video_path):
        response = process_video_chunk(video_frames, model, tokenizer, processor, prompt)
        print(f"Chunk {chunk_idx}: {response}")

        # Thumbnail N is generated for chunk N, so pair them by index. A
        # fixed MAX_NUM_FRAMES-second time window does not match the evenly
        # spaced thumbnails and would drop or duplicate chunk results.
        thumbnail = thumbnails[chunk_idx] if chunk_idx < len(thumbnails) else None
        if thumbnail is not None:
            raw_seconds = thumbnail['time']
        elif chunk_info:
            raw_seconds = chunk_info['start_time']
        else:
            raw_seconds = chunk_idx * MAX_NUM_FRAMES

        minutes, seconds = divmod(int(raw_seconds), 60)
        lower_response = response.lower()

        all_activities.append({
            'time': f"{minutes:02d}:{seconds:02d}",
            'timestamp_seconds': raw_seconds,  # raw seconds, used for sorting
            'summary': response,
            'objects': [obj for obj in possible_objects if obj in lower_response],
            'thumbnail': thumbnail["path"] if thumbnail is not None else None,
            'chunk_id': chunk_idx,
            'chunk_path': chunk_info['path'] if chunk_info else None,
        })

    all_activities.sort(key=lambda activity: activity['timestamp_seconds'])
    return all_activities