import os, json, logging, time, base64, gc, asyncio, concurrent.futures

import cv2, numpy as np, torch

from pathlib import Path
from typing import List, Dict, Any, Optional, AsyncGenerator
from collections import Counter
from dataclasses import dataclass

from dotenv import load_dotenv

load_dotenv()

# Endpoint and credentials come from the environment (.env), with the original
# literals kept as fallbacks so the module still imports without configuration.
DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL", "https://ds2api-tau-woad.vercel.app/v1/chat/completions")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "sk-ds2api-key-1-your-custom-key")
DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "deepseek-chat")
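# A minimal .env sketch matching the lookups above (values are illustrative;
# replace with your own endpoint and key):
#   DEEPSEEK_API_URL=https://ds2api-tau-woad.vercel.app/v1/chat/completions
#   DEEPSEEK_API_KEY=sk-...
#   DEEPSEEK_MODEL=deepseek-chat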

BASE_DIR = Path("video_analysis_pro")
OUTPUT_DIR, CACHE_DIR, REPORTS_DIR = BASE_DIR / "output", BASE_DIR / "cache", BASE_DIR / "reports"
for d in [BASE_DIR, OUTPUT_DIR, CACHE_DIR, REPORTS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ZenithEngine")

# Optional heavy dependencies: degrade gracefully when they are not installed.
try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
except ImportError:
    YOLO_AVAILABLE = False

try:
    from faster_whisper import WhisperModel
    WHISPER_AVAILABLE = True
except ImportError:
    WHISPER_AVAILABLE = False


@dataclass
class Frame:
    path: Path
    timestamp: float
    metrics: Optional[Dict[str, float]] = None
    vision_content: str = ""


class DeepSeekClient:
    def __init__(self):
        self.api_url = DEEPSEEK_API_URL
        self.api_key = DEEPSEEK_API_KEY
        logger.info(f"✅ DeepSeek client initialized with URL: {self.api_url}")

    async def stream_content(self, model: str, messages: List[Dict[str, Any]], options: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        # Normalize messages into the OpenAI-compatible shape the endpoint
        # expects: multimodal content lists are split into one text part plus
        # any base64 data-URL images; plain strings pass through untouched.
        formatted_messages = []
        for msg in messages:
            role = msg["role"]
            content = msg.get("content", "")

            if isinstance(content, list):
                text_parts = []
                image_parts = []
                for part in content:
                    if part["type"] == "text":
                        text_parts.append(part["text"])
                    elif part["type"] == "image_url":
                        url = part["image_url"]["url"]
                        if url.startswith("data:"):
                            image_parts.append({"type": "image_url", "image_url": {"url": url}})

                if image_parts:
                    formatted_messages.append({
                        "role": role,
                        "content": [{"type": "text", "text": " ".join(text_parts)}] + image_parts
                    })
                else:
                    formatted_messages.append({"role": role, "content": " ".join(text_parts)})
            else:
                formatted_messages.append({"role": role, "content": content})

        payload = {
            "model": model,
            "messages": formatted_messages,
            "temperature": options.get("temperature", 0.7),
            "stream": True
        }

        import httpx
        async with httpx.AsyncClient(timeout=None) as client:
            try:
                async with client.stream(
                    "POST", self.api_url,
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json"
                    },
                    json=payload
                ) as response:
                    if response.status_code != 200:
                        error_text = await response.aread()
                        logger.error(f"❌ DeepSeek API error (HTTP {response.status_code}): {error_text.decode()}")
                        yield {"error": f"DeepSeek API error: {response.status_code}"}
                        return

                    # Relay the SSE stream line by line until [DONE].
                    async for line in response.aiter_lines():
                        if line.startswith("data: "):
                            data_str = line[6:]
                            if data_str.strip() == "[DONE]":
                                break
                            try:
                                data = json.loads(data_str)
                                if "choices" in data and len(data["choices"]) > 0:
                                    delta = data["choices"][0].get("delta", {})
                                    content = delta.get("content", "")
                                    if content:
                                        # Re-wrap each delta in a Gemini-style
                                        # candidates/parts envelope expected by
                                        # the downstream consumer.
                                        yield {
                                            "response": {
                                                "candidates": [{
                                                    "content": {
                                                        "parts": [{"text": content}]
                                                    }
                                                }]
                                            }
                                        }
                            except json.JSONDecodeError:
                                continue
            except Exception as e:
                logger.error(f"❌ Error while streaming from DeepSeek: {str(e)}")
                yield {"error": str(e)}


class VideoProcessor:
    @staticmethod
    def get_frame_metrics(frame: np.ndarray) -> dict:
        # Cheap per-frame statistics: brightness/contrast from grayscale,
        # saturation from the HSV S channel, sharpness from the variance of
        # the Laplacian. Any failure degrades to zeroed metrics.
        try:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            return {"brightness": float(np.mean(gray)), "contrast": float(np.std(gray)),
                    "saturation": float(np.mean(hsv[:, :, 1])), "sharpness": float(cv2.Laplacian(gray, cv2.CV_64F).var())}
        except Exception:
            return {"brightness": 0, "contrast": 0, "saturation": 0, "sharpness": 0}

    def __init__(self, video_path: Path, output_dir: Path):
        self.video_path, self.output_dir = video_path, output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def extract_keyframes(self, max_frames: int = 30) -> List[Frame]:
        """
        Smart keyframe extraction with adaptive sampling.
        - Short videos (<2 min): 1 frame every 3-4 s
        - Medium videos (2-10 min): 1 frame every 10-15 s
        - Long videos (>10 min): 1 frame every 20-30 s
        """
        try:
            from decord import VideoReader, cpu
            vr = VideoReader(str(self.video_path), ctx=cpu(0))
            total = len(vr)
            fps = vr.get_avg_fps()
            duration_seconds = total / fps

            if duration_seconds < 120:
                target_interval = 3
            elif duration_seconds < 600:
                target_interval = 12
            else:
                target_interval = 25

            optimal_frames = min(int(duration_seconds / target_interval), max_frames)
            optimal_frames = max(optimal_frames, 10)

            step = max(1, total // optimal_frames)
            indices = list(range(0, total, step))[:optimal_frames]
            frames_data = vr.get_batch(indices).asnumpy()

            extracted = []
            for i, idx in enumerate(indices):
                # decord returns RGB; OpenCV writes BGR.
                img = cv2.cvtColor(frames_data[i], cv2.COLOR_RGB2BGR)
                ts = idx / fps
                p = self.output_dir / f"f_{idx}.jpg"
                cv2.imwrite(str(p), img, [cv2.IMWRITE_JPEG_QUALITY, 70])
                extracted.append(Frame(path=p, timestamp=ts, metrics=self.get_frame_metrics(img)))

            logger.info(f"✅ Adaptive extraction: {len(extracted)} frames for {duration_seconds:.1f}s of video (1 frame/{target_interval}s)")
            return extracted
        except Exception as e:
            logger.warning(f"Decord failed, falling back to CV2: {e}")
            cap = cv2.VideoCapture(str(self.video_path))
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 1000
            duration_seconds = total / fps

            if duration_seconds < 120:
                target_interval = 3
            elif duration_seconds < 600:
                target_interval = 12
            else:
                target_interval = 25

            optimal_frames = min(int(duration_seconds / target_interval), max_frames)
            optimal_frames = max(optimal_frames, 10)

            step = max(1, total // optimal_frames)
            extracted = []
            for idx in range(0, total, step):
                if len(extracted) >= optimal_frames:
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, img = cap.read()
                if ret:
                    ts = idx / fps
                    p = self.output_dir / f"f_{idx}.jpg"
                    cv2.imwrite(str(p), img, [cv2.IMWRITE_JPEG_QUALITY, 70])
                    extracted.append(Frame(path=p, timestamp=ts, metrics=self.get_frame_metrics(img)))
            cap.release()
            logger.info(f"✅ Adaptive CV2 extraction: {len(extracted)} frames for {duration_seconds:.1f}s of video")
            return extracted


class AudioProcessor:
    def __init__(self):
        self.model = None

    def initialize(self):
        # Lazy-load Whisper on first use; the tiny model with int8 compute
        # keeps the footprint small on both CPU and GPU.
        if WHISPER_AVAILABLE and self.model is None:
            try:
                device = "cuda" if torch.cuda.is_available() else "cpu"
                self.model = WhisperModel("tiny", device=device, compute_type="int8")
            except Exception:
                pass

    def transcribe(self, p: Path) -> str:
        self.initialize()
        if not self.model:
            return "Transcription unavailable"
        try:
            segments, info = self.model.transcribe(str(p), beam_size=5)
            transcript = " ".join([s.text for s in segments])
            return f"[Detected source language: {info.language.upper()}] {transcript}"
        except Exception:
            return "Transcription error"


class VideoDownloader:
    @staticmethod
    def download(url: str, output_dir: Path) -> Optional[Path]:
        import yt_dlp
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'outtmpl': str(output_dir / 'downloaded_video.%(ext)s'),
            'noplaylist': True, 'quiet': True, 'no_warnings': True, 'nocheckcertificate': True,
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'referer': 'https://www.google.com/',
            'http_headers': {'Accept': '*/*', 'Accept-Language': 'en-US,en;q=0.9'}
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                return Path(ydl.prepare_filename(info))
        except Exception:
            return None


class ZenithAnalyzer:
    def __init__(self):
        self.deepseek = DeepSeekClient()
        self.audio_proc = AudioProcessor()
        self.yolo = YOLO("yolov8n.pt") if YOLO_AVAILABLE else None

    async def extract_frames_only(self, video_path: Path, session_id: str) -> List[str]:
        session_dir = OUTPUT_DIR / f"session_{session_id}"
        session_dir.mkdir(parents=True, exist_ok=True)
        proc = VideoProcessor(video_path, session_dir)
        frames = proc.extract_keyframes()
        return [f"/output/session_{session_id}/{f.path.name}" for f in frames[:12]]

    async def run_full_analysis(self, video_path: Path, session_id: str, custom_prompt: Optional[str] = None) -> AsyncGenerator[Dict[str, Any], None]:
        session_dir = OUTPUT_DIR / f"session_{session_id}"
        session_dir.mkdir(parents=True, exist_ok=True)
        cache_file = session_dir / "analysis_cache.json"

        # Reuse frames extracted by a previous run of the same session.
        existing_frames = list(session_dir.glob("f_*.jpg"))
        if not existing_frames:
            yield {"status": "sampling", "message": "Analyzing sequences..."}
            proc = VideoProcessor(video_path, session_dir)
            frames = proc.extract_keyframes()
        else:
            def get_idx(p):
                try:
                    return int(p.stem.split('_')[1])
                except Exception:
                    return 0
            existing_paths = sorted(existing_frames, key=get_idx)
            frames = []
            for p in existing_paths:
                img = cv2.imread(str(p))
                metrics = VideoProcessor.get_frame_metrics(img) if img is not None else {"brightness": 0, "contrast": 0, "saturation": 0, "sharpness": 0}
                frames.append(Frame(path=p, timestamp=0.0, metrics=metrics))
            yield {"status": "sampling", "message": "Retrieving existing sequences..."}

        frame_urls = [f"/output/session_{session_id}/{f.path.name}" for f in frames[:12]]
        yield {"status": "frames_ready", "frames": frame_urls, "message": "Sequences ready."}

        # Transcript and vision data are cached per session to avoid rework.
        cached_data = {}
        if cache_file.exists():
            try:
                with open(cache_file, "r") as f:
                    cached_data = json.load(f)
                logger.info(f"✅ Cache found for session {session_id}")
            except Exception:
                pass

        if "transcript" in cached_data and "vision_info" in cached_data:
            transcript = cached_data["transcript"]
            v_info = cached_data["vision_info"]
            yield {"status": "fusion", "message": "Using cached data..."}
        else:
            yield {"status": "audio", "message": "Audio & visual processing..."}
            loop = asyncio.get_running_loop()
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Transcribe audio in a worker thread while YOLO runs below.
                audio_task = loop.run_in_executor(executor, self.audio_proc.transcribe, video_path)

                if self.yolo:
                    all_paths = [str(f.path) for f in frames]
                    batch_size = 20
                    for i in range(0, len(all_paths), batch_size):
                        batch = all_paths[i:i + batch_size]
                        # Bind the batch as a default argument so the lambda
                        # does not close over the loop variable.
                        results = await loop.run_in_executor(executor, lambda b=batch: self.yolo(b, verbose=False, imgsz=256, stream=False))
                        for j, res in enumerate(results):
                            idx = i + j
                            objs = [res.names[int(b.cls[0])] for b in res.boxes if float(b.conf[0]) > 0.35]
                            ambiance = f"Ambiance: {'Dark' if frames[idx].metrics['brightness'] < 50 else 'Bright'}"
                            frames[idx].vision_content = f"{ambiance}, Objects: " + ", ".join([f"{v}x {k}" for k, v in Counter(objs).items()])

                transcript = await audio_task

            v_info = "\n".join([f"[{f.timestamp:.1f}s] {f.vision_content}" for f in frames[:40]])

            try:
                with open(cache_file, "w") as f:
                    json.dump({"transcript": transcript, "vision_info": v_info}, f)
            except Exception:
                pass

        yield {"status": "fusion", "message": "Artificial intelligence in action..."}

        base_instruction = custom_prompt if custom_prompt else "Summarize and continue the analysis of the media"

        prompt = f"""You are the Zenith AI unit, a multimedia data analysis system.
USER INSTRUCTION: {base_instruction}

INPUT DATA:
- TRANSCRIPT: {transcript}
- VISUAL DATA: {v_info}

Produce a TECHNICAL, FACTUAL and STRUCTURED report in Markdown."""

        # Attach up to 8 frames, sampled evenly across the keyframe list.
        num_images_to_send = min(8, len(frames))
        if len(frames) > 0:
            step = max(1, len(frames) // num_images_to_send)
            selected_frames = [frames[i] for i in range(0, len(frames), step)][:num_images_to_send]
        else:
            selected_frames = []

        def encode_f(f):
            img = cv2.imread(str(f.path))
            if img is None:
                return None
            # Downscale to 800x450 and re-encode at JPEG quality 65 to keep
            # the request payload small.
            img = cv2.resize(img, (800, 450), interpolation=cv2.INTER_AREA)
            _, buf = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 65])
            return {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}"}}

        with concurrent.futures.ThreadPoolExecutor() as executor:
            images = [im for im in executor.map(encode_f, selected_frames) if im is not None]

        messages = [{"role": "user", "content": [{"type": "text", "text": prompt}] + images}]

        yield {"status": "generating", "message": "Generating the AI report..."}
        async for chunk in self.deepseek.stream_content(DEEPSEEK_MODEL, messages, {"temperature": 0.7}):
            if "error" in chunk:
                yield {"error": chunk["error"]}
                break
            resp = chunk.get("response", {})
            candidates = resp.get("candidates", [])
            if candidates:
                for part in candidates[0].get("content", {}).get("parts", []):
                    text = part.get("text", "")
                    if text:
                        yield {"status": "streaming", "text": text}

        # Free memory between runs.
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        yield {"status": "completed", "message": "Analysis complete."}