import gradio as gr import tempfile import json import shutil import os import cv2 import numpy as np import torch import importlib import requests import textwrap # Optional PDF reporting: import reportlab safely and set a flag. # REPORTLAB_AVAILABLE will be used by _write_pdf to select the PDF code path. try: from reportlab.lib.pagesizes import A4 from reportlab.pdfgen import canvas REPORTLAB_AVAILABLE = True except Exception: REPORTLAB_AVAILABLE = False # ZeroGPU: decorador para marcar funciones GPU. Fallback local si no existe try: import spaces # provisto en HF Spaces GPU_DECORATOR = spaces.GPU except Exception: def GPU_DECORATOR(func=None, **kwargs): # Permite usar @GPU_DECORATOR o @GPU_DECORATOR(...) if func is None: def wrap(f): return f return wrap return func # ──────────────────────────── # Configuración # ──────────────────────────── os.environ["OMP_NUM_THREADS"] = "1" # evita warnings de OpenMP # Do NOT initialize the model at import time in Spaces with Stateless GPU. # Lazily load the YOLO model inside worker processes (functions decorated # with @GPU_DECORATOR) to avoid initializing CUDA in the main process. model = None def get_model(): """Lazily load and return the YOLO model. Important: call this from inside worker processes (e.g. functions decorated with @GPU_DECORATOR) so CUDA is initialized in the worker and not in the main process (required for HF Spaces with Stateless GPU). """ global model if model is None: # Import YOLO inside the function to avoid importing ultralytics (which # may touch CUDA) at module import time. from ultralytics import YOLO model = YOLO("best2.pt") return model # ──────────────────────────── # Métricas simples (persistidas en /tmp) # ──────────────────────────── METRICS_PATH = os.path.join(tempfile.gettempdir(), "blade_metrics.json") def _load_metrics(): try: if os.path.exists(METRICS_PATH): with open(METRICS_PATH, "r", encoding="utf-8") as f: return json.load(f) except Exception: pass return { "total_jobs": 0, "videos": 0, "images": 0, "detections_total": 0, "per_label": {}, "last_job": None, } def _save_metrics(m): try: with open(METRICS_PATH, "w", encoding="utf-8") as f: json.dump(m, f, ensure_ascii=False, indent=2) except Exception: pass def _record_metrics(job_type, counts): m = _load_metrics() m["total_jobs"] += 1 if job_type == "video": m["videos"] += 1 elif job_type == "image": m["images"] += 1 dets = int(sum(counts.values())) if isinstance(counts, dict) else 0 m["detections_total"] += dets # per label aggregate if isinstance(counts, dict): per = m.get("per_label", {}) for k, v in counts.items(): per[k] = int(per.get(k, 0)) + int(v) m["per_label"] = per m["last_job"] = {"type": job_type, "detections": dets} _save_metrics(m) def get_metrics(): """Devuelve el snapshot actual de métricas.""" return _load_metrics() # ──────────────────────────── # Funciones de Inferencia # ──────────────────────────── @GPU_DECORATOR def infer_media(media_path, conf=0.25, iou=0.45, out_res="720p", preset="default"): """ Procesa un fichero de vídeo o imagen con parámetros configurables. Retornos: - Vídeo: {"video": out_vid_path, "classes": {label: count, ...}} - Imagen: {"path": out_img_path, "classes": {label: count, ...}} """ if not media_path: # Si no hay entrada (p.ej., se pulsó el botón en la otra pestaña), no fallar. return {} # Ensure the model is loaded inside the worker process (do not load at import time) global model model = get_model() # Dispositivo: usar GPU si está disponible (ZeroGPU) device = 0 if torch.cuda.is_available() else "cpu" use_half = device != "cpu" ext = os.path.splitext(media_path)[1].lower() tmpdir = tempfile.mkdtemp() # Resolución objetivo res_map = {"360p": (640, 360), "480p": (854, 480), "720p": (1280, 720)} target_size = res_map.get(out_res) # ─ Vídeo ─────────────────────────────────────────────────────── if ext in [".mp4", ".mov", ".avi", ".mkv"]: in_vid = os.path.join(tmpdir, "in.mp4") out_vid = os.path.join(tmpdir, "out.mp4") shutil.copy(media_path, in_vid) # FPS del vídeo (opcional: tomar real si existe) cap = cv2.VideoCapture(in_vid) fps = cap.get(cv2.CAP_PROP_FPS) or 30 try: fps = float(fps) if fps <= 0 or fps != fps: # NaN check fps = 30 except Exception: fps = 30 writer = None counts = {} # Streaming de frames con anotaciones y conteo por clase results = model.predict(source=in_vid, conf=conf, iou=iou, stream=True, device=device) for r in results: # acumular conteos for b in r.boxes: label = model.names[int(b.cls[0])] counts[label] = counts.get(label, 0) + 1 annotated = r.plot() # frame anotado if target_size: annotated = cv2.resize(annotated, target_size) if writer is None: h, w = annotated.shape[:2] fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(out_vid, fourcc, fps, (w, h)) writer.write(annotated) if writer: writer.release() if cap: cap.release() # registrar métricas _record_metrics("video", counts) return {"video": out_vid, "classes": counts} # ─ Imagen ────────────────────────────────────────────────────── elif ext in [".jpg", ".jpeg", ".png", ".bmp"]: img = cv2.imread(media_path) results = model.predict(source=media_path, conf=conf, iou=iou, save=False, device=device) counts = {} # Dibujamos cajas manualmente y contamos for box in results[0].boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) cls_id = int(box.cls[0]) label = model.names[cls_id] counts[label] = counts.get(label, 0) + 1 # rectángulo cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2) # texto cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) if target_size: img = cv2.resize(img, target_size) out_path = os.path.join(tmpdir, "annotated.png") cv2.imwrite(out_path, img) # registrar métricas _record_metrics("image", counts) return {"path": out_path, "classes": counts} else: raise ValueError(f"Formato no soportado: {ext}") def show_classes(): """Devuelve las clases que el modelo conoce. Carga el modelo si no está listo.""" try: m = get_model() names = m.names if isinstance(names, dict): class_list = [names[k] for k in sorted(names)] else: class_list = names return ", ".join(class_list) except Exception: return "Model not loaded" # Funciones auxiliares para extraer el recurso de salida desde el dict def _extract_video(d): return (d.get("video") if isinstance(d, dict) else d) def _extract_path(d): return (d.get("path") if isinstance(d, dict) else d) def analyze_image_with_gpt(image_path, detections_summary=""): """ Analiza una imagen directamente con GPT-4 Vision para obtener observaciones visuales que el modelo YOLO podría haber perdido. """ try: GPTClass = _load_gptoss_wrapper() if not GPTClass: return "Análisis de IA no disponible (GPT wrapper no configurado)" # Construir prompt en español para análisis visual directo con GPT-4 Vision prompt = f"""Eres un experto en inspección de palas de aerogeneradores. Analiza visualmente esta imagen de una pala de aerogenerador y proporciona un análisis detallado en español. DETECCIONES AUTOMÁTICAS DEL MODELO YOLO: {detections_summary if detections_summary else "No se detectaron defectos automáticamente"} INSTRUCCIONES PARA TU ANÁLISIS VISUAL: Observa cuidadosamente la imagen y describe: 1. **Condición general de la superficie**: Color, textura, acabado, limpieza 2. **Borde de ataque (leading edge)**: Estado, erosión, daños, desgaste 3. **Borde de salida (trailing edge)**: Integridad, grietas, deformaciones 4. **Superficie principal**: Grietas, decoloración, impactos, reparaciones previas 5. **Elementos estructurales**: Uniones, tornillos, conexiones visibles 6. **Contaminación**: Suciedad, hielo, vegetación, residuos 7. **Daños específicos**: Impactos de rayos, aves, granizo, desgaste UV COMPARACIÓN CON DETECCIONES AUTOMÁTICAS: - Confirma o refuta las detecciones del modelo YOLO - Identifica defectos que YOLO pudo haber perdido - Evalúa la severidad de los defectos detectados CONTEXTO DE DEFECTOS COMUNES: - **Dirt/Suciedad**: Acumulación que reduce eficiencia aerodinámica - **Erosion**: Desgaste del borde de ataque por partículas - **Cracks/Grietas**: Fisuras estructurales críticas - **Lightning damage**: Daños por descargas eléctricas - **Ice**: Formación de hielo estacional - **Bird strikes**: Impactos de aves - **UV degradation**: Decoloración por radiación solar IMPORTANTE: - Responde SOLO en español - Describe específicamente lo que VES en la imagen - Sé preciso sobre ubicaciones (izquierda, derecha, centro, bordes) - Menciona colores, texturas, patrones específicos - Evalúa la severidad de cada problema observado Formato de respuesta: ## 🔍 Análisis Visual Directo de la Pala **Estado General:** [tu evaluación visual del estado] **Observaciones Específicas:** [describe detalladamente lo que ves en cada área] **Defectos Identificados Visualmente:** [lista específica de problemas que observas] **Comparación con Detección Automática:** [confirma/refuta/complementa las detecciones YOLO] **Severidad y Prioridades:** [evalúa qué problemas son más críticos] **Recomendaciones de Mantenimiento:** [acciones específicas basadas en lo observado] """ # Configurar modelo de visión vision_model_id = os.getenv("VISION_MODEL_ID", "Qwen/Qwen2-VL-7B-Instruct") model_id = os.getenv("MODEL_ID", vision_model_id) wrapper = GPTClass(model=model_id) # Intentar usar análisis de imágenes (GPT-4 Vision o Qwen2-VL) try: print(f"DEBUG: Intentando análisis de imagen con modelo: {model_id}") analysis = wrapper.analyze_image(image_path, prompt, max_tokens=1200, temperature=0.2) return analysis except RuntimeError as vision_error: # Si el análisis de visión no está disponible, usar análisis basado en características print(f"DEBUG: Análisis de visión no disponible: {vision_error}") return _fallback_technical_analysis(image_path, detections_summary, wrapper) except Exception as e: return f"Error en el análisis de IA: {str(e)}" def _fallback_technical_analysis(image_path, detections_summary, wrapper): """ Análisis de respaldo basado en características técnicas cuando GPT-4 Vision no está disponible. """ try: # Obtener características visuales básicas de la imagen visual_features = compute_visual_features(image_path, []) # Construir descripción técnica detallada technical_desc = "Análisis basado en características técnicas de la imagen:\n" if visual_features: brightness = visual_features.get("brightness", 0) contrast = visual_features.get("contrast", 0) blur = visual_features.get("blur", 0) dominant_rgb = visual_features.get("dominant_rgb", []) width = visual_features.get("width", 0) height = visual_features.get("height", 0) technical_desc += f"- Resolución: {width}x{height} píxeles\n" technical_desc += f"- Brillo promedio: {brightness:.1f}/255 " technical_desc += ("(imagen brillante)" if brightness > 130 else "(imagen tenue)" if brightness < 80 else "(iluminación normal)") technical_desc += f"\n- Contraste: {contrast:.1f} " technical_desc += ("(alto contraste)" if contrast > 60 else "(bajo contraste)" if contrast < 30 else "(contraste normal)") technical_desc += f"\n- Nitidez: {blur:.1f} " technical_desc += ("(imagen nítida)" if blur > 100 else "(imagen borrosa)") if dominant_rgb: technical_desc += f"\n- Color dominante: RGB{dominant_rgb}" # Interpretar colores dominantes r, g, b = dominant_rgb if r > 150 and g > 150 and b > 150: technical_desc += " (tonos claros/blancos - superficie limpia)" elif r < 100 and g < 100 and b < 100: technical_desc += " (tonos oscuros - posible suciedad o sombras)" elif r > g and r > b: technical_desc += " (tonos rojizos - posible oxidación)" elif g > r and g > b: technical_desc += " (tonos verdosos - posible vegetación/algas)" elif b > r and b > g: technical_desc += " (tonos azulados - superficie normal)" # Prompt modificado para análisis técnico fallback_prompt = f"""Eres un experto en inspección de palas de aerogeneradores. Basándote en los datos técnicos de la imagen y las detecciones automáticas, proporciona un análisis detallado en español. {technical_desc} DETECCIONES AUTOMÁTICAS DEL MODELO YOLO: {detections_summary if detections_summary else "No se detectaron defectos automáticamente"} NOTA: Este análisis se basa en características técnicas extraídas de la imagen ya que el análisis visual directo no está disponible. Proporciona un análisis experto interpretando estos datos técnicos en el contexto de inspección de palas de aerogeneradores. Formato de respuesta: ## 🔍 Análisis Técnico de la Pala **Estado General:** [evaluación basada en datos técnicos] **Interpretación de Características:** [qué indican los valores técnicos sobre la condición] **Análisis de Detecciones:** [interpretación de cada defecto detectado por YOLO] **Recomendaciones:** [acciones específicas recomendadas] """ analysis = wrapper.generate(fallback_prompt, max_tokens=800, temperature=0.3) return f"⚠️ **Análisis técnico** (análisis visual directo no disponible)\n\n{analysis}" except Exception as e: return f"Error en análisis de respaldo: {str(e)}" def _check_token(token: str): """Token gate for public app. Expected token via env APP_ACCESS_TOKEN or KESHERAT_TOKEN. Defaults to 'KESHERAT' if none provided. Returns visibility updates for [gate_group, app_group, gate_status].""" expected = os.getenv("APP_ACCESS_TOKEN") or os.getenv("KESHERAT_TOKEN") or "KESHERAT" ok = str(token or "").strip() == str(expected).strip() if ok: return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False, value="") else: return gr.update(visible=True), gr.update(visible=False), gr.update(visible=True, value="Token inválido. Intenta nuevamente.") def compute_visual_features(image_path, detections=None): """Compute simple visual features and return a short description plus numeric metrics. Returns a dict with keys: - width, height - brightness (mean grayscale) - contrast (std grayscale) - blur (variance of Laplacian; lower = blurrier) - dominant_rgb (tuple) - object_count - avg_bbox_area - description (short natural language sentence) """ try: img = cv2.imread(image_path) if img is None: return {} h, w = img.shape[:2] gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) brightness = float(np.mean(gray)) contrast = float(np.std(gray)) lap = cv2.Laplacian(gray, cv2.CV_64F) blur = float(np.var(lap)) # Mean color as a simple dominant color proxy (convert BGR -> RGB) mean_bgr = cv2.mean(img)[:3] dominant_rgb = (int(mean_bgr[2]), int(mean_bgr[1]), int(mean_bgr[0])) obj_counts = 0 avg_bbox_area = 0.0 if detections: obj_counts = len(detections) areas = [] for d in detections: bbox = d.get("bbox", [0, 0, 0, 0]) try: area = max(0, (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])) except Exception: area = 0 areas.append(area) if areas: avg_bbox_area = float(sum(areas) / len(areas)) # Human-friendly descriptors bright_desc = "bright" if brightness > 130 else ("dim" if brightness < 80 else "moderately lit") contrast_desc = "high contrast" if contrast > 60 else ("low contrast" if contrast < 30 else "moderate contrast") blur_desc = "blurry" if blur < 100 else "sharp" desc = f"Image appears {bright_desc}, with {contrast_desc}, and is {blur_desc}. Dominant color approx RGB{dominant_rgb}. Detected {obj_counts} objects in view." return { "width": w, "height": h, "brightness": brightness, "contrast": contrast, "blur": blur, "dominant_rgb": dominant_rgb, "object_count": obj_counts, "avg_bbox_area": avg_bbox_area, "description": desc, } except Exception: return {} # ──────────────────────────── # Helpers for multimodal reporting (PDF/MD/JSON) # ──────────────────────────── def _write_pdf(path: str, title: str, narrative: str, frames): """ Write a wrapped, layout-friendly PDF. This version increases margins, reduces font sizes, and wraps long lines to avoid cutting text. """ if REPORTLAB_AVAILABLE: c = canvas.Canvas(path, pagesize=A4) width, height = A4 margin = 60 y = height - margin # Fonts and sizes title_font = "Helvetica-Bold" body_font = "Helvetica" small_font = "Helvetica" title_size = 13 body_size = 9 small_size = 8 line_height = body_size * 1.18 small_line_height = small_size * 1.12 def wrap_text(text, font_size, max_width): approx_char_width = font_size * 0.55 max_chars = max(30, int(max_width / approx_char_width)) out = [] for para in str(text or "").splitlines(): wrapped = textwrap.wrap(para, width=max_chars) out.extend(wrapped if wrapped else [""]) return out # Title c.setFont(title_font, title_size) for tline in wrap_text(title, title_size, width - 2 * margin): if y < margin + title_size * 1.5: c.showPage() y = height - margin c.setFont(title_font, title_size) c.drawString(margin, y, tline) y -= title_size * 1.25 y -= 6 # Narrative c.setFont(body_font, body_size) for line in wrap_text(narrative or "", body_size, width - 2 * margin): if y < margin + line_height: c.showPage() y = height - margin c.setFont(body_font, body_size) c.drawString(margin, y, line) y -= line_height y -= 8 c.setFont("Helvetica-Bold", 11) if y < margin + 30: c.showPage() y = height - margin c.setFont("Helvetica-Bold", 11) c.drawString(margin, y, "Per-frame detections:") y -= 14 c.setFont(small_font, small_size) for f in frames: if y < margin + 90: c.showPage() y = height - margin c.setFont(small_font, small_size) c.drawString(margin, y, f"Frame {f.get('frame_index')}:") y -= small_line_height dets = f.get("detections", []) if not dets: if y < margin + small_line_height: c.showPage() y = height - margin c.setFont(small_font, small_size) c.drawString(margin + 12, y, "No detections") y -= small_line_height else: for d in dets: det_text = f"- {d.get('label')} | conf={d.get('confidence')} | bbox={d.get('bbox')}" text_max_width = width - 2 * margin - 140 for dl in wrap_text(det_text, small_size, text_max_width): if y < margin + small_line_height: c.showPage() y = height - margin c.setFont(small_font, small_size) c.drawString(margin + 12, y, dl) y -= small_line_height try: img_path = d.get("image") if img_path and os.path.exists(img_path): img_w = 110 img_h = 65 if y < margin + img_h + 20: c.showPage() y = height - margin c.setFont(small_font, small_size) x_img = width - margin - img_w y_img = y - img_h + 6 c.drawImage(img_path, x_img, y_img, width=img_w, height=img_h, preserveAspectRatio=True, mask='auto') crop_desc = None if isinstance(d.get("crop_visual"), dict): crop_desc = d["crop_visual"].get("description") if crop_desc: cd_lines = wrap_text(crop_desc, small_size, img_w) text_y = y_img - 12 for cd in cd_lines: if text_y < margin + 20: c.showPage() y = height - margin text_y = y - img_h - 12 c.setFont(small_font, small_size) c.drawString(x_img, text_y, cd) text_y -= small_line_height y = y - img_h - 8 except Exception: pass c.save() return # Fallback plain-text write if ReportLab unavailable with open(path, "w", encoding="utf-8") as f: f.write(title + "\n\n") f.write((narrative or "") + "\n\n") f.write("Per-frame detections:\n") for fr in frames: f.write(f"Frame {fr.get('frame_index')}:\n") dets = fr.get("detections", []) if not dets: f.write(" No detections\n") else: for d in dets: f.write(f" - {d}\n") def _load_gptoss_wrapper(): """ Load the blade-inspection-demo/gptoss_wrapper.py module by filepath so we don't rely on package imports. """ try: base = os.path.dirname(__file__) wrapper_path = os.path.join(base, "blade-inspection-demo", "gptoss_wrapper.py") if not os.path.exists(wrapper_path): # fallback: maybe file already at project root wrapper_path = os.path.join(base, "gptoss_wrapper.py") spec = importlib.util.spec_from_file_location("gptoss_wrapper", wrapper_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return getattr(module, "GPTOSSWrapper", None) except Exception as e: # Print diagnostic info to Space logs so we can see why the wrapper failed to import. print(f"DEBUG: failed to load GPT wrapper from {wrapper_path}: {e}") import traceback traceback.print_exc() return None def _build_prompt(frames): """ Build a compact prompt that summarizes the entire video while keeping prompt size bounded. We include: - video-level totals (frames, total detections, counts per class) - a concise list of frames that contain detections (frame index + short det summary) - an optional compact aggregate of visual metrics for the whole video The detailed per-frame visual descriptions remain in the report files (MD/PDF/JSON) but are not expanded fully in the prompt to avoid token limits. """ # Configs (env vars) try: max_prompt_frames = int(os.getenv("MAX_PROMPT_FRAMES", "200")) except Exception: max_prompt_frames = 200 total_frames = len(frames) total_detections = sum(len(f.get("detections", [])) for f in frames) # Aggregate counts per label and collect frames with detections counts = {} frames_with_dets = [] for f in frames: dets = f.get("detections", []) if dets: frames_with_dets.append(f) for d in dets: counts[d.get("label")] = counts.get(d.get("label"), 0) + 1 lines = [] lines.append("You are an expert inspection assistant for wind turbine blade images/videos.") lines.append(f"This video contains {total_frames} frames and {total_detections} total detections.") if counts: lines.append("Total detections by class: " + ", ".join([f"{k}({v})" for k, v in counts.items()])) else: lines.append("No detections were found in analyzed frames.") lines.append("") lines.append("Instructions: Based on the aggregate information and the selected frame summaries below, produce a concise inspection report that includes:") lines.append("- Summary of main findings") lines.append("- Suggested severity (low/medium/high) when appropriate") lines.append("- Recommended next steps for inspection/repair") lines.append("") # Include up to max_prompt_frames frames that have detections (prioritize them) include_list = frames_with_dets[:max_prompt_frames] lines.append(f"Included frame summaries (showing frames with detections, up to {max_prompt_frames} entries):") if not include_list: lines.append("No frames with detections to list (video may be clear or detections are below threshold).") else: for f in include_list: fid = f.get("frame_index") dets = f.get("detections", []) det_texts = [] for d in dets: conf = d.get("confidence") conf_s = f"{conf:.2f}" if isinstance(conf, float) else str(conf) det_texts.append(f"{d.get('label')}({conf_s})") # compact visual metrics (if present) visual = f.get("visual") or {} metric_parts = [] if visual.get("brightness") is not None: metric_parts.append(f"bright={visual['brightness']:.0f}") if visual.get("contrast") is not None: metric_parts.append(f"contrast={visual['contrast']:.0f}") if visual.get("blur") is not None: metric_parts.append(f"blur_var={visual['blur']:.0f}") if visual.get("dominant_rgb"): metric_parts.append(f"dominant_rgb={visual['dominant_rgb']}") metrics = "; ".join(metric_parts) if metrics: lines.append(f"Frame {fid}: " + ", ".join(det_texts) + f" [{metrics}]") else: lines.append(f"Frame {fid}: " + ", ".join(det_texts)) lines.append("") lines.append("NOTE: Full per-frame visual descriptions and images are attached in the generated report files. If you need a fully exhaustive token-by-token per-frame prompt, set FULL_FRAME_REPORT and increase MAX_PROMPT_FRAMES (may exceed model token limits).") lines.append("") lines.append("Produce the report in plain text, 6-10 short paragraphs. Also include 1-2 short sentences summarizing why the listed frames are noteworthy (e.g., what the detection likely means).") return "\n".join(lines) @GPU_DECORATOR def generar_analisis_fuerte(media_path): """Generate strong analysis (PDF/MD/JSON) from a given media file path.""" if not media_path: return {"status": "no_input", "report_pdf": None, "report_md": None, "report_json": None} # Ensure the model is loaded inside the worker process global model model = get_model() tmpdir = tempfile.mkdtemp() frames = [] try: ext = os.path.splitext(media_path)[1].lower() # attempt to extract up to 3 frames/detections using the loaded YOLO model if ext in [".mp4", ".mov", ".avi", ".mkv"]: cap = cv2.VideoCapture(media_path) idx = 0 # Process all frames in the video. This may be expensive for long videos. # To limit processing, set the environment variable MAX_FRAMES to a positive integer. max_frames_env = os.getenv("MAX_FRAMES", "0") try: max_frames = int(max_frames_env) except Exception: max_frames = 0 if max_frames > 0: print(f"DEBUG: processing up to {max_frames} frames (MAX_FRAMES set)") else: print("DEBUG: processing all video frames for strong analysis (may be slow)...") # Sampling: process only every FRAME_STEP-th frame to reduce GPU load. try: frame_step = int(os.getenv("FRAME_STEP", "5")) if frame_step < 1: frame_step = 1 except Exception: frame_step = 5 while True: ret, frame = cap.read() if not ret: break # Save every frame image to disk (keeps consistent indexing) but only run # detection on sampled frames to lower compute usage. tmpf = os.path.join(tmpdir, f"frame_{idx}.jpg") cv2.imwrite(tmpf, frame) if idx % frame_step == 0: # Run detection on sampled frame results = model.predict(source=tmpf, conf=0.25, iou=0.45) dets = [] if results and len(results) > 0: det_i = 0 full_img = cv2.imread(tmpf) h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0)) for box in results[0].boxes: try: cls_id = int(box.cls[0]) label = model.names[cls_id] except Exception: label = "object" try: x1, y1, x2, y2 = map(int, box.xyxy[0]) except Exception: x1 = y1 = x2 = y2 = 0 try: confv = float(box.conf[0]) except Exception: confv = None det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]} # Save cropped detection image if possible try: if full_img is not None and x2 > x1 and y2 > y1: # clamp coords x1c = max(0, min(x1, w_full - 1)) x2c = max(0, min(x2, w_full)) y1c = max(0, min(y1, h_full - 1)) y2c = max(0, min(y2, h_full)) if x2c > x1c and y2c > y1c: crop = full_img[y1c:y2c, x1c:x2c] crop_path = os.path.join(tmpdir, f"frame_{idx}_det_{det_i}.jpg") cv2.imwrite(crop_path, crop) det["image"] = crop_path # compute visual features for the crop and attach det["crop_visual"] = compute_visual_features(crop_path, [det]) except Exception: pass dets.append(det) det_i += 1 # Compute simple visual features for this saved frame visual = compute_visual_features(tmpf, dets) frames.append({"frame_index": idx, "detections": dets, "visual": visual, "image_path": tmpf}) else: # Non-sampled frame: still compute a cheap visual summary (no detections) visual = compute_visual_features(tmpf, []) frames.append({"frame_index": idx, "detections": [], "visual": visual, "image_path": tmpf}) idx += 1 if max_frames > 0 and idx >= max_frames: break cap.release() else: # single image results = model.predict(source=media_path, conf=0.25, iou=0.45) dets = [] if results and len(results) > 0: full_img = cv2.imread(media_path) h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0)) det_i = 0 for box in results[0].boxes: try: cls_id = int(box.cls[0]) label = model.names[cls_id] except Exception: label = "object" try: x1, y1, x2, y2 = map(int, box.xyxy[0]) except Exception: x1 = y1 = x2 = y2 = 0 try: confv = float(box.conf[0]) except Exception: confv = None det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]} # Save cropped detection image if possible try: if full_img is not None and x2 > x1 and y2 > y1: x1c = max(0, min(x1, w_full - 1)) x2c = max(0, min(x2, w_full)) y1c = max(0, min(y1, h_full - 1)) y2c = max(0, min(y2, h_full)) if x2c > x1c and y2c > y1c: crop = full_img[y1c:y2c, x1c:x2c] crop_path = os.path.join(tmpdir, f"frame_0_det_{det_i}.jpg") cv2.imwrite(crop_path, crop) det["image"] = crop_path det["crop_visual"] = compute_visual_features(crop_path, [det]) except Exception: pass dets.append(det) det_i += 1 # Compute visual features for single image visual = compute_visual_features(media_path, dets) frames.append({"frame_index": 0, "detections": dets, "visual": visual, "image_path": media_path}) prompt = _build_prompt(frames) GPTClass = _load_gptoss_wrapper() narrative = None if GPTClass: try: # Allow overriding model via env var MODEL_ID (e.g. "openai/gpt-oss-120b:fireworks-ai") model_id = os.getenv("MODEL_ID", "gpt-oss-120") print(f"DEBUG: [gpt] using model_id={model_id}, HF_USE_ROUTER={os.getenv('HF_USE_ROUTER')}") wrapper = GPTClass(model=model_id) # DEBUG: print prompt (truncated) so Space logs show the request try: print("DEBUG: [gpt] sending prompt (truncated 2000 chars):") print(prompt[:2000]) except Exception: print("DEBUG: [gpt] (failed to print prompt)") narrative = wrapper.generate(prompt) # DEBUG: print a truncated portion of the response try: print("DEBUG: [gpt] response (truncated 2000 chars):") print((narrative or "")[:2000]) except Exception: print("DEBUG: [gpt] (failed to print response)") except Exception as e: narrative = f"(GPT call failed) {e}" print("DEBUG: [gpt] call failed:", e) else: narrative = "(GPT wrapper unavailable) Fallback summary:\n" counts = {} for f in frames: for d in f.get("detections", []): counts[d["label"]] = counts.get(d["label"], 0) + 1 narrative += "Detected classes: " + ", ".join([f"{k}({v})" for k, v in counts.items()]) if counts else "No detections" # Write Markdown report_md = os.path.join(tmpdir, "report.md") with open(report_md, "w", encoding="utf-8") as md: md.write("# Informe de inspección (Generar analisis fuerte)\n\n") md.write(narrative or "Sin narrativa disponible.\n\n") md.write("\n## Per-frame detections\n\n") for f in frames: fid = f.get("frame_index") md.write(f"- Frame {fid}:\n") dets = f.get("detections", []) if not dets: md.write(" No detections\n") else: for i, d in enumerate(dets): md.write(f" - {d.get('label')}({d.get('confidence')}) bbox={d.get('bbox')}\n") if d.get("image"): # Embed the cropped detection image md.write(f" })\n") # Add crop visual description if available cviz = d.get("crop_visual") if cviz and cviz.get("description"): md.write(f" Description: {cviz.get('description')}\n") # Write JSON report_json = os.path.join(tmpdir, "report.json") with open(report_json, "w", encoding="utf-8") as jf: json.dump({"narrative": narrative, "frames": frames}, jf, indent=2) # Write PDF report_pdf = os.path.join(tmpdir, "report.pdf") _write_pdf(report_pdf, "Informe de inspección - Generar analisis fuerte", narrative, frames) return {"status": "done", "report_pdf": report_pdf, "report_md": report_md, "report_json": report_json} except Exception as e: return {"status": f"error: {e}", "report_pdf": None, "report_md": None, "report_json": None} # ──────────────────────────── with gr.Blocks( title="KESHERAT AI", theme=gr.themes.Soft(), ) as demo: gr.HTML("""
KESHERAT AI