import gradio as gr
import tempfile
import json
import shutil
import os
import cv2
import numpy as np
import torch
import importlib.util  # imported explicitly: importlib.util is used by _load_gptoss_wrapper
import requests
import textwrap

# Optional PDF reporting: import reportlab safely and set a flag.
# REPORTLAB_AVAILABLE is used by _write_pdf to select the PDF code path.
try:
    from reportlab.lib.pagesizes import A4
    from reportlab.pdfgen import canvas
    REPORTLAB_AVAILABLE = True
except Exception:
    REPORTLAB_AVAILABLE = False
# ZeroGPU: decorator used to mark GPU functions. Falls back to a no-op locally
# when the `spaces` package is not installed.
try:
    import spaces  # provided on HF Spaces
    GPU_DECORATOR = spaces.GPU
except Exception:
    def GPU_DECORATOR(func=None, **kwargs):
        # Supports both @GPU_DECORATOR and @GPU_DECORATOR(...)
        if func is None:
            def wrap(f):
                return f
            return wrap
        return func
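
# Illustrative sketch (not part of the app): the two call shapes the fallback
# above mirrors. On real HF Spaces, spaces.GPU also accepts kwargs such as
# `duration`; the names `warmup` and `long_job` below are hypothetical.
#
# @GPU_DECORATOR                  # bare form
# def warmup():
#     return torch.cuda.is_available()
#
# @GPU_DECORATOR(duration=120)    # parametrized form
# def long_job(media_path):
#     return infer_media(media_path)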

# ────────────────────────────
# Configuration
# ────────────────────────────
os.environ["OMP_NUM_THREADS"] = "1"  # avoid OpenMP warnings

# Do NOT initialize the model at import time in Spaces with Stateless GPU.
# Lazily load the YOLO model inside worker processes (functions decorated
# with @GPU_DECORATOR) to avoid initializing CUDA in the main process.
model = None


def get_model():
    """Lazily load and return the YOLO model.

    Important: call this from inside worker processes (e.g. functions decorated
    with @GPU_DECORATOR) so CUDA is initialized in the worker and not in the
    main process (required for HF Spaces with Stateless GPU).
    """
    global model
    if model is None:
        # Import YOLO inside the function to avoid importing ultralytics (which
        # may touch CUDA) at module import time.
        from ultralytics import YOLO
        model = YOLO("best2.pt")
    return model
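
# Intended call pattern (sketch; `predict_one` is hypothetical): invoke
# get_model() from inside a GPU-decorated function so the CUDA context is
# created in the worker process rather than the main one.
#
# @GPU_DECORATOR
# def predict_one(image_path):
#     m = get_model()                      # lazy-loads best2.pt on first call
#     return m.predict(source=image_path)  # runs inside the GPU worker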

# ────────────────────────────
# Simple metrics (persisted in /tmp)
# ────────────────────────────
METRICS_PATH = os.path.join(tempfile.gettempdir(), "blade_metrics.json")


def _load_metrics():
    try:
        if os.path.exists(METRICS_PATH):
            with open(METRICS_PATH, "r", encoding="utf-8") as f:
                return json.load(f)
    except Exception:
        pass
    return {
        "total_jobs": 0,
        "videos": 0,
        "images": 0,
        "detections_total": 0,
        "per_label": {},
        "last_job": None,
    }


def _save_metrics(m):
    try:
        with open(METRICS_PATH, "w", encoding="utf-8") as f:
            json.dump(m, f, ensure_ascii=False, indent=2)
    except Exception:
        pass


def _record_metrics(job_type, counts):
    m = _load_metrics()
    m["total_jobs"] += 1
    if job_type == "video":
        m["videos"] += 1
    elif job_type == "image":
        m["images"] += 1
    dets = int(sum(counts.values())) if isinstance(counts, dict) else 0
    m["detections_total"] += dets
    # Per-label aggregate
    if isinstance(counts, dict):
        per = m.get("per_label", {})
        for k, v in counts.items():
            per[k] = int(per.get(k, 0)) + int(v)
        m["per_label"] = per
    m["last_job"] = {"type": job_type, "detections": dets}
    _save_metrics(m)


def get_metrics():
    """Return the current metrics snapshot."""
    return _load_metrics()
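
# Illustration: after a single image job with two "crack" detections, the JSON
# persisted at METRICS_PATH would look like this (label and counts are an
# example, not real output):
#
# {
#   "total_jobs": 1, "videos": 0, "images": 1,
#   "detections_total": 2,
#   "per_label": {"crack": 2},
#   "last_job": {"type": "image", "detections": 2}
# }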


# ────────────────────────────
# Inference functions
# ────────────────────────────
def infer_media(media_path, conf=0.25, iou=0.45, out_res="720p", preset="default"):
    """
    Process a video or image file with configurable parameters.
    ('preset' is accepted for API compatibility but is currently unused.)

    Returns:
        - Video: {"video": out_vid_path, "classes": {label: count, ...}}
        - Image: {"path": out_img_path, "classes": {label: count, ...}}
    """
    if not media_path:
        # No input (e.g. the button was clicked from the other tab): do not fail.
        return {}
    # Ensure the model is loaded inside the worker process (never at import time)
    global model
    model = get_model()
    # Device: use the GPU if available (ZeroGPU)
    device = 0 if torch.cuda.is_available() else "cpu"
    use_half = device != "cpu"  # half precision only makes sense on GPU
    ext = os.path.splitext(media_path)[1].lower()
    tmpdir = tempfile.mkdtemp()
    # Target resolution
    res_map = {"360p": (640, 360), "480p": (854, 480), "720p": (1280, 720)}
    target_size = res_map.get(out_res)
    # ─ Video ───────────────────────────────────────────────────────
    if ext in [".mp4", ".mov", ".avi", ".mkv"]:
        in_vid = os.path.join(tmpdir, "in.mp4")
        out_vid = os.path.join(tmpdir, "out.mp4")
        shutil.copy(media_path, in_vid)
        # Video FPS (use the real value when available)
        cap = cv2.VideoCapture(in_vid)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        try:
            fps = float(fps)
            if fps <= 0 or fps != fps:  # NaN check
                fps = 30
        except Exception:
            fps = 30
        writer = None
        counts = {}
        # Stream annotated frames and count detections per class
        results = model.predict(source=in_vid, conf=conf, iou=iou, stream=True,
                                device=device, half=use_half)
        for r in results:
            # Accumulate counts
            for b in r.boxes:
                label = model.names[int(b.cls[0])]
                counts[label] = counts.get(label, 0) + 1
            annotated = r.plot()  # annotated frame
            if target_size:
                annotated = cv2.resize(annotated, target_size)
            if writer is None:
                h, w = annotated.shape[:2]
                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                writer = cv2.VideoWriter(out_vid, fourcc, fps, (w, h))
            writer.write(annotated)
        if writer:
            writer.release()
        if cap:
            cap.release()
        # Record metrics
        _record_metrics("video", counts)
        return {"video": out_vid, "classes": counts}
    # ─ Image ──────────────────────────────────────────────────────
    elif ext in [".jpg", ".jpeg", ".png", ".bmp"]:
        img = cv2.imread(media_path)
        results = model.predict(source=media_path, conf=conf, iou=iou, save=False,
                                device=device, half=use_half)
        counts = {}
        # Draw boxes manually and count per class
        for box in results[0].boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            cls_id = int(box.cls[0])
            label = model.names[cls_id]
            counts[label] = counts.get(label, 0) + 1
            # Rectangle
            cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Label text
            cv2.putText(img, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
        if target_size:
            img = cv2.resize(img, target_size)
        out_path = os.path.join(tmpdir, "annotated.png")
        cv2.imwrite(out_path, img)
        # Record metrics
        _record_metrics("image", counts)
        return {"path": out_path, "classes": counts}
    else:
        raise ValueError(f"Formato no soportado: {ext}")
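
# Usage sketch (file names are hypothetical); the return shape depends on the
# input type:
#
# infer_media("inspection.jpg", conf=0.3, out_res="480p")
# # -> {"path": "/tmp/.../annotated.png", "classes": {"erosion": 2}}
# infer_media("flight.mp4")
# # -> {"video": "/tmp/.../out.mp4", "classes": {...}}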


def show_classes():
    """Return the classes the model knows. Loads the model if it is not ready."""
    try:
        m = get_model()
        names = m.names
        if isinstance(names, dict):
            class_list = [names[k] for k in sorted(names)]
        else:
            class_list = names
        return ", ".join(class_list)
    except Exception:
        return "Model not loaded"


# Helpers to extract the output resource from the result dict
def _extract_video(d):
    return (d.get("video") if isinstance(d, dict) else d)


def _extract_path(d):
    return (d.get("path") if isinstance(d, dict) else d)


def analyze_image_with_gpt(image_path, detections_summary=""):
    """
    Analyze an image directly with a vision model (GPT-4 Vision or similar) to
    capture visual observations the YOLO model may have missed.
    """
    try:
        GPTClass = _load_gptoss_wrapper()
        if not GPTClass:
            return "Análisis de IA no disponible (GPT wrapper no configurado)"
        # Build a Spanish-language prompt for direct visual analysis (the app's
        # user-facing output is in Spanish).
        prompt = f"""Eres un experto en inspección de palas de aerogeneradores. Analiza visualmente esta imagen de una pala de aerogenerador y proporciona un análisis detallado en español.

DETECCIONES AUTOMÁTICAS DEL MODELO YOLO:
{detections_summary if detections_summary else "No se detectaron defectos automáticamente"}

INSTRUCCIONES PARA TU ANÁLISIS VISUAL:
Observa cuidadosamente la imagen y describe:
1. **Condición general de la superficie**: Color, textura, acabado, limpieza
2. **Borde de ataque (leading edge)**: Estado, erosión, daños, desgaste
3. **Borde de salida (trailing edge)**: Integridad, grietas, deformaciones
4. **Superficie principal**: Grietas, decoloración, impactos, reparaciones previas
5. **Elementos estructurales**: Uniones, tornillos, conexiones visibles
6. **Contaminación**: Suciedad, hielo, vegetación, residuos
7. **Daños específicos**: Impactos de rayos, aves, granizo, desgaste UV

COMPARACIÓN CON DETECCIONES AUTOMÁTICAS:
- Confirma o refuta las detecciones del modelo YOLO
- Identifica defectos que YOLO pudo haber perdido
- Evalúa la severidad de los defectos detectados

CONTEXTO DE DEFECTOS COMUNES:
- **Dirt/Suciedad**: Acumulación que reduce eficiencia aerodinámica
- **Erosion**: Desgaste del borde de ataque por partículas
- **Cracks/Grietas**: Fisuras estructurales críticas
- **Lightning damage**: Daños por descargas eléctricas
- **Ice**: Formación de hielo estacional
- **Bird strikes**: Impactos de aves
- **UV degradation**: Decoloración por radiación solar

IMPORTANTE:
- Responde SOLO en español
- Describe específicamente lo que VES en la imagen
- Sé preciso sobre ubicaciones (izquierda, derecha, centro, bordes)
- Menciona colores, texturas, patrones específicos
- Evalúa la severidad de cada problema observado

Formato de respuesta:
## 🔍 Análisis Visual Directo de la Pala
**Estado General:** [tu evaluación visual del estado]
**Observaciones Específicas:**
[describe detalladamente lo que ves en cada área]
**Defectos Identificados Visualmente:**
[lista específica de problemas que observas]
**Comparación con Detección Automática:**
[confirma/refuta/complementa las detecciones YOLO]
**Severidad y Prioridades:**
[evalúa qué problemas son más críticos]
**Recomendaciones de Mantenimiento:**
[acciones específicas basadas en lo observado]
"""
        # Configure the vision model
        vision_model_id = os.getenv("VISION_MODEL_ID", "Qwen/Qwen2-VL-7B-Instruct")
        model_id = os.getenv("MODEL_ID", vision_model_id)
        wrapper = GPTClass(model=model_id)
        # Try direct image analysis (GPT-4 Vision or Qwen2-VL)
        try:
            print(f"DEBUG: attempting image analysis with model: {model_id}")
            analysis = wrapper.analyze_image(image_path, prompt, max_tokens=1200, temperature=0.2)
            return analysis
        except RuntimeError as vision_error:
            # If vision analysis is unavailable, fall back to feature-based analysis.
            print(f"DEBUG: vision analysis unavailable: {vision_error}")
            return _fallback_technical_analysis(image_path, detections_summary, wrapper)
    except Exception as e:
        return f"Error en el análisis de IA: {str(e)}"


def _fallback_technical_analysis(image_path, detections_summary, wrapper):
    """
    Fallback analysis based on technical image features, used when direct
    vision analysis (e.g. GPT-4 Vision) is unavailable.
    """
    try:
        # Get basic visual features from the image
        visual_features = compute_visual_features(image_path, [])
        # Build a detailed technical description (in Spanish, for the report)
        technical_desc = "Análisis basado en características técnicas de la imagen:\n"
        if visual_features:
            brightness = visual_features.get("brightness", 0)
            contrast = visual_features.get("contrast", 0)
            blur = visual_features.get("blur", 0)
            dominant_rgb = visual_features.get("dominant_rgb", [])
            width = visual_features.get("width", 0)
            height = visual_features.get("height", 0)
            technical_desc += f"- Resolución: {width}x{height} píxeles\n"
            technical_desc += f"- Brillo promedio: {brightness:.1f}/255 "
            technical_desc += ("(imagen brillante)" if brightness > 130 else "(imagen tenue)" if brightness < 80 else "(iluminación normal)")
            technical_desc += f"\n- Contraste: {contrast:.1f} "
            technical_desc += ("(alto contraste)" if contrast > 60 else "(bajo contraste)" if contrast < 30 else "(contraste normal)")
            technical_desc += f"\n- Nitidez: {blur:.1f} "
            technical_desc += ("(imagen nítida)" if blur > 100 else "(imagen borrosa)")
            if dominant_rgb:
                technical_desc += f"\n- Color dominante: RGB{dominant_rgb}"
                # Interpret the dominant color
                r, g, b = dominant_rgb
                if r > 150 and g > 150 and b > 150:
                    technical_desc += " (tonos claros/blancos - superficie limpia)"
                elif r < 100 and g < 100 and b < 100:
                    technical_desc += " (tonos oscuros - posible suciedad o sombras)"
                elif r > g and r > b:
                    technical_desc += " (tonos rojizos - posible oxidación)"
                elif g > r and g > b:
                    technical_desc += " (tonos verdosos - posible vegetación/algas)"
                elif b > r and b > g:
                    technical_desc += " (tonos azulados - superficie normal)"
        # Modified prompt for the technical fallback analysis
        fallback_prompt = f"""Eres un experto en inspección de palas de aerogeneradores. Basándote en los datos técnicos de la imagen y las detecciones automáticas, proporciona un análisis detallado en español.

{technical_desc}

DETECCIONES AUTOMÁTICAS DEL MODELO YOLO:
{detections_summary if detections_summary else "No se detectaron defectos automáticamente"}

NOTA: Este análisis se basa en características técnicas extraídas de la imagen ya que el análisis visual directo no está disponible.
Proporciona un análisis experto interpretando estos datos técnicos en el contexto de inspección de palas de aerogeneradores.

Formato de respuesta:
## 🔍 Análisis Técnico de la Pala
**Estado General:** [evaluación basada en datos técnicos]
**Interpretación de Características:**
[qué indican los valores técnicos sobre la condición]
**Análisis de Detecciones:**
[interpretación de cada defecto detectado por YOLO]
**Recomendaciones:**
[acciones específicas recomendadas]
"""
        analysis = wrapper.generate(fallback_prompt, max_tokens=800, temperature=0.3)
        return f"⚠️ **Análisis técnico** (análisis visual directo no disponible)\n\n{analysis}"
    except Exception as e:
        return f"Error en análisis de respaldo: {str(e)}"


def _check_token(token: str):
    """Token gate for the public app. The expected token comes from env
    APP_ACCESS_TOKEN or KESHERAT_TOKEN and defaults to 'KESHERAT' if neither
    is set.

    Returns visibility updates for [gate_group, app_group, gate_status]."""
    expected = os.getenv("APP_ACCESS_TOKEN") or os.getenv("KESHERAT_TOKEN") or "KESHERAT"
    ok = str(token or "").strip() == str(expected).strip()
    if ok:
        return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False, value="")
    else:
        return gr.update(visible=True), gr.update(visible=False), gr.update(visible=True, value="Token inválido. Intenta nuevamente.")
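
# Configuration sketch: in a Space, set one of the env vars read above, e.g.
#   APP_ACCESS_TOKEN=my-secret   (or KESHERAT_TOKEN=my-secret)
# With neither set, the gate accepts the literal default "KESHERAT".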


def compute_visual_features(image_path, detections=None):
    """Compute simple visual features and return a short description plus numeric metrics.

    Returns a dict with keys:
    - width, height
    - brightness (mean grayscale)
    - contrast (std grayscale)
    - blur (variance of Laplacian; lower = blurrier)
    - dominant_rgb (tuple)
    - object_count
    - avg_bbox_area
    - description (short natural-language sentence)
    """
    try:
        img = cv2.imread(image_path)
        if img is None:
            return {}
        h, w = img.shape[:2]
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        brightness = float(np.mean(gray))
        contrast = float(np.std(gray))
        lap = cv2.Laplacian(gray, cv2.CV_64F)
        blur = float(np.var(lap))
        # Mean color as a simple dominant-color proxy (convert BGR -> RGB)
        mean_bgr = cv2.mean(img)[:3]
        dominant_rgb = (int(mean_bgr[2]), int(mean_bgr[1]), int(mean_bgr[0]))
        obj_counts = 0
        avg_bbox_area = 0.0
        if detections:
            obj_counts = len(detections)
            areas = []
            for d in detections:
                bbox = d.get("bbox", [0, 0, 0, 0])
                try:
                    area = max(0, (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))
                except Exception:
                    area = 0
                areas.append(area)
            if areas:
                avg_bbox_area = float(sum(areas) / len(areas))
        # Human-friendly descriptors
        bright_desc = "bright" if brightness > 130 else ("dim" if brightness < 80 else "moderately lit")
        contrast_desc = "high contrast" if contrast > 60 else ("low contrast" if contrast < 30 else "moderate contrast")
        blur_desc = "blurry" if blur < 100 else "sharp"
        desc = f"Image appears {bright_desc}, with {contrast_desc}, and is {blur_desc}. Dominant color approx RGB{dominant_rgb}. Detected {obj_counts} objects in view."
        return {
            "width": w,
            "height": h,
            "brightness": brightness,
            "contrast": contrast,
            "blur": blur,
            "dominant_rgb": dominant_rgb,
            "object_count": obj_counts,
            "avg_bbox_area": avg_bbox_area,
            "description": desc,
        }
    except Exception:
        return {}
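
# Usage sketch (hypothetical file name): `detections` is optional and uses the
# same dicts produced by generar_analisis_fuerte ({"label", "confidence", "bbox"}).
#
# feats = compute_visual_features("frame_0.jpg", [{"bbox": [10, 10, 60, 40]}])
# print(feats.get("description"))
# # e.g. "Image appears bright, with moderate contrast, and is sharp. ..."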


# ────────────────────────────
# Helpers for multimodal reporting (PDF/MD/JSON)
# ────────────────────────────
def _write_pdf(path: str, title: str, narrative: str, frames):
    """
    Write a wrapped, layout-friendly PDF. This version increases margins,
    reduces font sizes, and wraps long lines to avoid cutting text.
    """
    if REPORTLAB_AVAILABLE:
        c = canvas.Canvas(path, pagesize=A4)
        width, height = A4
        margin = 60
        y = height - margin
        # Fonts and sizes
        title_font = "Helvetica-Bold"
        body_font = "Helvetica"
        small_font = "Helvetica"
        title_size = 13
        body_size = 9
        small_size = 8
        line_height = body_size * 1.18
        small_line_height = small_size * 1.12

        def wrap_text(text, font_size, max_width):
            # Approximate character width to derive a wrap column
            approx_char_width = font_size * 0.55
            max_chars = max(30, int(max_width / approx_char_width))
            out = []
            for para in str(text or "").splitlines():
                wrapped = textwrap.wrap(para, width=max_chars)
                out.extend(wrapped if wrapped else [""])
            return out

        # Title
        c.setFont(title_font, title_size)
        for tline in wrap_text(title, title_size, width - 2 * margin):
            if y < margin + title_size * 1.5:
                c.showPage()
                y = height - margin
                c.setFont(title_font, title_size)
            c.drawString(margin, y, tline)
            y -= title_size * 1.25
        y -= 6
        # Narrative
        c.setFont(body_font, body_size)
        for line in wrap_text(narrative or "", body_size, width - 2 * margin):
            if y < margin + line_height:
                c.showPage()
                y = height - margin
                c.setFont(body_font, body_size)
            c.drawString(margin, y, line)
            y -= line_height
        y -= 8
        c.setFont("Helvetica-Bold", 11)
        if y < margin + 30:
            c.showPage()
            y = height - margin
            c.setFont("Helvetica-Bold", 11)
        c.drawString(margin, y, "Per-frame detections:")
        y -= 14
        c.setFont(small_font, small_size)
        for f in frames:
            if y < margin + 90:
                c.showPage()
                y = height - margin
                c.setFont(small_font, small_size)
            c.drawString(margin, y, f"Frame {f.get('frame_index')}:")
            y -= small_line_height
            dets = f.get("detections", [])
            if not dets:
                if y < margin + small_line_height:
                    c.showPage()
                    y = height - margin
                    c.setFont(small_font, small_size)
                c.drawString(margin + 12, y, "No detections")
                y -= small_line_height
            else:
                for d in dets:
                    det_text = f"- {d.get('label')} | conf={d.get('confidence')} | bbox={d.get('bbox')}"
                    text_max_width = width - 2 * margin - 140
                    for dl in wrap_text(det_text, small_size, text_max_width):
                        if y < margin + small_line_height:
                            c.showPage()
                            y = height - margin
                            c.setFont(small_font, small_size)
                        c.drawString(margin + 12, y, dl)
                        y -= small_line_height
                    try:
                        img_path = d.get("image")
                        if img_path and os.path.exists(img_path):
                            img_w = 110
                            img_h = 65
                            if y < margin + img_h + 20:
                                c.showPage()
                                y = height - margin
                                c.setFont(small_font, small_size)
                            x_img = width - margin - img_w
                            y_img = y - img_h + 6
                            c.drawImage(img_path, x_img, y_img, width=img_w, height=img_h, preserveAspectRatio=True, mask='auto')
                            crop_desc = None
                            if isinstance(d.get("crop_visual"), dict):
                                crop_desc = d["crop_visual"].get("description")
                            if crop_desc:
                                cd_lines = wrap_text(crop_desc, small_size, img_w)
                                text_y = y_img - 12
                                for cd in cd_lines:
                                    if text_y < margin + 20:
                                        c.showPage()
                                        y = height - margin
                                        text_y = y - img_h - 12
                                        c.setFont(small_font, small_size)
                                    c.drawString(x_img, text_y, cd)
                                    text_y -= small_line_height
                            y = y - img_h - 8
                    except Exception:
                        pass
        c.save()
        return
    # Fallback plain-text write if ReportLab is unavailable
    with open(path, "w", encoding="utf-8") as f:
        f.write(title + "\n\n")
        f.write((narrative or "") + "\n\n")
        f.write("Per-frame detections:\n")
        for fr in frames:
            f.write(f"Frame {fr.get('frame_index')}:\n")
            dets = fr.get("detections", [])
            if not dets:
                f.write("  No detections\n")
            else:
                for d in dets:
                    f.write(f"  - {d}\n")


def _load_gptoss_wrapper():
    """
    Load the blade-inspection-demo/gptoss_wrapper.py module by file path so we
    don't rely on package imports.
    """
    wrapper_path = None  # defined up front so the except block can report it
    try:
        base = os.path.dirname(__file__)
        wrapper_path = os.path.join(base, "blade-inspection-demo", "gptoss_wrapper.py")
        if not os.path.exists(wrapper_path):
            # Fallback: the file may live at the project root
            wrapper_path = os.path.join(base, "gptoss_wrapper.py")
        spec = importlib.util.spec_from_file_location("gptoss_wrapper", wrapper_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return getattr(module, "GPTOSSWrapper", None)
    except Exception as e:
        # Print diagnostics to the Space logs so we can see why the import failed.
        print(f"DEBUG: failed to load GPT wrapper from {wrapper_path}: {e}")
        import traceback
        traceback.print_exc()
        return None
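
# Interface this app assumes of the loaded wrapper (inferred from the call
# sites in this file; treat it as a contract sketch, not the wrapper's docs):
#   GPTOSSWrapper(model=<model_id>)
#   .generate(prompt, max_tokens=..., temperature=...) -> str
#   .analyze_image(image_path, prompt, max_tokens=..., temperature=...) -> str
#       raises RuntimeError when vision analysis is unavailable, which
#       analyze_image_with_gpt catches to trigger the technical fallback.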


def _build_prompt(frames):
    """
    Build a compact prompt that summarizes the entire video while keeping the
    prompt size bounded. We include:
    - video-level totals (frames, total detections, counts per class)
    - a concise list of frames that contain detections (frame index + short det summary)
    - an optional compact aggregate of visual metrics for the whole video

    The detailed per-frame visual descriptions remain in the report files
    (MD/PDF/JSON) but are not expanded fully in the prompt to avoid token limits.
    """
    # Config (env vars)
    try:
        max_prompt_frames = int(os.getenv("MAX_PROMPT_FRAMES", "200"))
    except Exception:
        max_prompt_frames = 200
    total_frames = len(frames)
    total_detections = sum(len(f.get("detections", [])) for f in frames)
    # Aggregate counts per label and collect frames with detections
    counts = {}
    frames_with_dets = []
    for f in frames:
        dets = f.get("detections", [])
        if dets:
            frames_with_dets.append(f)
        for d in dets:
            counts[d.get("label")] = counts.get(d.get("label"), 0) + 1
    lines = []
    lines.append("You are an expert inspection assistant for wind turbine blade images/videos.")
    lines.append(f"This video contains {total_frames} frames and {total_detections} total detections.")
    if counts:
        lines.append("Total detections by class: " + ", ".join([f"{k}({v})" for k, v in counts.items()]))
    else:
        lines.append("No detections were found in analyzed frames.")
    lines.append("")
    lines.append("Instructions: Based on the aggregate information and the selected frame summaries below, produce a concise inspection report that includes:")
    lines.append("- Summary of main findings")
    lines.append("- Suggested severity (low/medium/high) when appropriate")
    lines.append("- Recommended next steps for inspection/repair")
    lines.append("")
    # Include up to max_prompt_frames frames that have detections (prioritize them)
    include_list = frames_with_dets[:max_prompt_frames]
    lines.append(f"Included frame summaries (showing frames with detections, up to {max_prompt_frames} entries):")
    if not include_list:
        lines.append("No frames with detections to list (video may be clear or detections are below threshold).")
    else:
        for f in include_list:
            fid = f.get("frame_index")
            dets = f.get("detections", [])
            det_texts = []
            for d in dets:
                conf = d.get("confidence")
                conf_s = f"{conf:.2f}" if isinstance(conf, float) else str(conf)
                det_texts.append(f"{d.get('label')}({conf_s})")
            # Compact visual metrics (if present)
            visual = f.get("visual") or {}
            metric_parts = []
            if visual.get("brightness") is not None:
                metric_parts.append(f"bright={visual['brightness']:.0f}")
            if visual.get("contrast") is not None:
                metric_parts.append(f"contrast={visual['contrast']:.0f}")
            if visual.get("blur") is not None:
                metric_parts.append(f"blur_var={visual['blur']:.0f}")
            if visual.get("dominant_rgb"):
                metric_parts.append(f"dominant_rgb={visual['dominant_rgb']}")
            metrics = "; ".join(metric_parts)
            if metrics:
                lines.append(f"Frame {fid}: " + ", ".join(det_texts) + f" [{metrics}]")
            else:
                lines.append(f"Frame {fid}: " + ", ".join(det_texts))
    lines.append("")
    lines.append("NOTE: Full per-frame visual descriptions and images are attached in the generated report files. If you need a fully exhaustive token-by-token per-frame prompt, set FULL_FRAME_REPORT and increase MAX_PROMPT_FRAMES (may exceed model token limits).")
    lines.append("")
    lines.append("Produce the report in plain text, 6-10 short paragraphs. Also include 1-2 short sentences summarizing why the listed frames are noteworthy (e.g., what the detection likely means).")
    return "\n".join(lines)


def generar_analisis_fuerte(media_path):
    """Generate the strong analysis report (PDF/MD/JSON) from a media file path."""
    if not media_path:
        return {"status": "no_input", "report_pdf": None, "report_md": None, "report_json": None}
    # Ensure the model is loaded inside the worker process
    global model
    model = get_model()
    tmpdir = tempfile.mkdtemp()
    frames = []
    try:
        ext = os.path.splitext(media_path)[1].lower()
        # Extract frames and detections using the loaded YOLO model
        if ext in [".mp4", ".mov", ".avi", ".mkv"]:
            cap = cv2.VideoCapture(media_path)
            idx = 0
            # Process all frames in the video. This may be expensive for long videos.
            # To limit processing, set the environment variable MAX_FRAMES to a positive integer.
            max_frames_env = os.getenv("MAX_FRAMES", "0")
            try:
                max_frames = int(max_frames_env)
            except Exception:
                max_frames = 0
            if max_frames > 0:
                print(f"DEBUG: processing up to {max_frames} frames (MAX_FRAMES set)")
            else:
                print("DEBUG: processing all video frames for strong analysis (may be slow)...")
            # Sampling: run detection only on every FRAME_STEP-th frame to reduce GPU load.
            try:
                frame_step = int(os.getenv("FRAME_STEP", "5"))
                if frame_step < 1:
                    frame_step = 1
            except Exception:
                frame_step = 5
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Save every frame image to disk (keeps consistent indexing) but only run
                # detection on sampled frames to lower compute usage.
                tmpf = os.path.join(tmpdir, f"frame_{idx}.jpg")
                cv2.imwrite(tmpf, frame)
                if idx % frame_step == 0:
                    # Run detection on the sampled frame
                    results = model.predict(source=tmpf, conf=0.25, iou=0.45)
                    dets = []
                    if results and len(results) > 0:
                        det_i = 0
                        full_img = cv2.imread(tmpf)
                        h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0))
                        for box in results[0].boxes:
                            try:
                                cls_id = int(box.cls[0])
                                label = model.names[cls_id]
                            except Exception:
                                label = "object"
                            try:
                                x1, y1, x2, y2 = map(int, box.xyxy[0])
                            except Exception:
                                x1 = y1 = x2 = y2 = 0
                            try:
                                confv = float(box.conf[0])
                            except Exception:
                                confv = None
                            det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]}
                            # Save the cropped detection image if possible
                            try:
                                if full_img is not None and x2 > x1 and y2 > y1:
                                    # Clamp coordinates to the image bounds
                                    x1c = max(0, min(x1, w_full - 1))
                                    x2c = max(0, min(x2, w_full))
                                    y1c = max(0, min(y1, h_full - 1))
                                    y2c = max(0, min(y2, h_full))
                                    if x2c > x1c and y2c > y1c:
                                        crop = full_img[y1c:y2c, x1c:x2c]
                                        crop_path = os.path.join(tmpdir, f"frame_{idx}_det_{det_i}.jpg")
                                        cv2.imwrite(crop_path, crop)
                                        det["image"] = crop_path
                                        # Compute visual features for the crop and attach them
                                        det["crop_visual"] = compute_visual_features(crop_path, [det])
                            except Exception:
                                pass
                            dets.append(det)
                            det_i += 1
                    # Compute simple visual features for this saved frame
                    visual = compute_visual_features(tmpf, dets)
                    frames.append({"frame_index": idx, "detections": dets, "visual": visual, "image_path": tmpf})
                else:
                    # Non-sampled frame: still compute a cheap visual summary (no detections)
                    visual = compute_visual_features(tmpf, [])
                    frames.append({"frame_index": idx, "detections": [], "visual": visual, "image_path": tmpf})
                idx += 1
                if max_frames > 0 and idx >= max_frames:
                    break
            cap.release()
        else:
            # Single image
            results = model.predict(source=media_path, conf=0.25, iou=0.45)
            dets = []
            if results and len(results) > 0:
                full_img = cv2.imread(media_path)
                h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0))
                det_i = 0
                for box in results[0].boxes:
                    try:
                        cls_id = int(box.cls[0])
                        label = model.names[cls_id]
                    except Exception:
                        label = "object"
                    try:
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                    except Exception:
                        x1 = y1 = x2 = y2 = 0
                    try:
                        confv = float(box.conf[0])
                    except Exception:
                        confv = None
                    det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]}
                    # Save the cropped detection image if possible
                    try:
                        if full_img is not None and x2 > x1 and y2 > y1:
                            x1c = max(0, min(x1, w_full - 1))
                            x2c = max(0, min(x2, w_full))
                            y1c = max(0, min(y1, h_full - 1))
                            y2c = max(0, min(y2, h_full))
                            if x2c > x1c and y2c > y1c:
                                crop = full_img[y1c:y2c, x1c:x2c]
                                crop_path = os.path.join(tmpdir, f"frame_0_det_{det_i}.jpg")
                                cv2.imwrite(crop_path, crop)
                                det["image"] = crop_path
                                det["crop_visual"] = compute_visual_features(crop_path, [det])
                    except Exception:
                        pass
                    dets.append(det)
                    det_i += 1
            # Compute visual features for the single image
            visual = compute_visual_features(media_path, dets)
            frames.append({"frame_index": 0, "detections": dets, "visual": visual, "image_path": media_path})
        prompt = _build_prompt(frames)
        GPTClass = _load_gptoss_wrapper()
        narrative = None
        if GPTClass:
            try:
                # Allow overriding the model via env var MODEL_ID (e.g. "openai/gpt-oss-120b:fireworks-ai")
                model_id = os.getenv("MODEL_ID", "gpt-oss-120")
                print(f"DEBUG: [gpt] using model_id={model_id}, HF_USE_ROUTER={os.getenv('HF_USE_ROUTER')}")
                wrapper = GPTClass(model=model_id)
                # DEBUG: print the prompt (truncated) so Space logs show the request
                try:
                    print("DEBUG: [gpt] sending prompt (truncated 2000 chars):")
                    print(prompt[:2000])
                except Exception:
                    print("DEBUG: [gpt] (failed to print prompt)")
                narrative = wrapper.generate(prompt)
                # DEBUG: print a truncated portion of the response
                try:
                    print("DEBUG: [gpt] response (truncated 2000 chars):")
                    print((narrative or "")[:2000])
                except Exception:
                    print("DEBUG: [gpt] (failed to print response)")
            except Exception as e:
                narrative = f"(GPT call failed) {e}"
                print("DEBUG: [gpt] call failed:", e)
        else:
            narrative = "(GPT wrapper unavailable) Fallback summary:\n"
            counts = {}
            for f in frames:
                for d in f.get("detections", []):
                    counts[d["label"]] = counts.get(d["label"], 0) + 1
            if counts:
                narrative += "Detected classes: " + ", ".join([f"{k}({v})" for k, v in counts.items()])
            else:
                narrative += "No detections"
        # Write Markdown
        report_md = os.path.join(tmpdir, "report.md")
        with open(report_md, "w", encoding="utf-8") as md:
            md.write("# Informe de inspección (Generar análisis fuerte)\n\n")
            md.write(narrative or "Sin narrativa disponible.\n\n")
            md.write("\n## Per-frame detections\n\n")
            for f in frames:
                fid = f.get("frame_index")
                md.write(f"- Frame {fid}:\n")
                dets = f.get("detections", [])
                if not dets:
                    md.write("  No detections\n")
                else:
                    for d in dets:
                        md.write(f"  - {d.get('label')}({d.get('confidence')}) bbox={d.get('bbox')}\n")
                        if d.get("image"):
                            # Embed the cropped detection image
                            md.write(f"    ![{d.get('label')}]({d.get('image')})\n")
                        # Add the crop's visual description if available
                        cviz = d.get("crop_visual")
                        if cviz and cviz.get("description"):
                            md.write(f"    Description: {cviz.get('description')}\n")
        # Write JSON
        report_json = os.path.join(tmpdir, "report.json")
        with open(report_json, "w", encoding="utf-8") as jf:
            json.dump({"narrative": narrative, "frames": frames}, jf, indent=2)
        # Write PDF
        report_pdf = os.path.join(tmpdir, "report.pdf")
        _write_pdf(report_pdf, "Informe de inspección - Generar análisis fuerte", narrative, frames)
        return {"status": "done", "report_pdf": report_pdf, "report_md": report_md, "report_json": report_json}
    except Exception as e:
        return {"status": f"error: {e}", "report_pdf": None, "report_md": None, "report_json": None}


# ────────────────────────────
# Gradio UI
# ────────────────────────────
with gr.Blocks(
    title="KESHERAT AI",
    theme=gr.themes.Soft(),
) as demo:
    gr.HTML("""
    <link rel="stylesheet" href="/file=assets/kesheret.css" />
    <div class="kesheret-header"><h1>KESHERAT AI</h1><p>KESHERAT AI</p></div>
    <style>
    /* FORCE DARK BLUE - INLINE STYLES HAVE HIGHEST PRIORITY */
    button, .gr-button, .gradio-button, [data-testid*="button"] {
        background: #031F33 !important;
        background-color: #031F33 !important;
        background-image: none !important;
        border: 1px solid #031F33 !important;
        color: white !important;
    }
    button:hover, .gr-button:hover, .gradio-button:hover, [data-testid*="button"]:hover {
        background: #004D85 !important;
        background-color: #004D85 !important;
        border-color: #004D85 !important;
    }
    /* Tab buttons */
    .tab-nav button, button[role="tab"] {
        background: #031F33 !important;
        background-color: #031F33 !important;
        color: white !important;
    }
    .tab-nav button.selected, .tab-nav button:hover, button[role="tab"]:hover {
        background: #004D85 !important;
        background-color: #004D85 !important;
    }
    /* Override any purple colors */
    [style*="rgb(139, 69, 255)"], [style*="rgb(168, 85, 247)"], [style*="#8b45ff"], [style*="#a855f7"] {
        background: #031F33 !important;
        background-color: #031F33 !important;
    }
    </style>
    <script>
    // JavaScript to force dark blue colors after the page loads
    function forceDarkBlue() {
        const buttons = document.querySelectorAll('button, .gr-button, .gradio-button, [data-testid*="button"]');
        buttons.forEach(btn => {
            btn.style.setProperty('background', '#031F33', 'important');
            btn.style.setProperty('background-color', '#031F33', 'important');
            btn.style.setProperty('background-image', 'none', 'important');
            btn.style.setProperty('border', '1px solid #031F33', 'important');
            btn.style.setProperty('color', 'white', 'important');
        });
        // Override any purple elements
        const purpleElements = document.querySelectorAll('[style*="rgb(139, 69, 255)"], [style*="rgb(168, 85, 247)"], [style*="#8b45ff"], [style*="#a855f7"]');
        purpleElements.forEach(el => {
            el.style.setProperty('background', '#031F33', 'important');
            el.style.setProperty('background-color', '#031F33', 'important');
        });
    }
    // Run immediately and again after delays
    forceDarkBlue();
    setTimeout(forceDarkBlue, 1000);
    setTimeout(forceDarkBlue, 3000);
    // Watch for DOM changes and reapply
    const observer = new MutationObserver(forceDarkBlue);
    observer.observe(document.body, { childList: true, subtree: true });
    </script>
    """)

    # Access gate (token) — the app is public but gated by a token
    with gr.Group(visible=True) as gate_group:
        gr.Markdown("### Acceso — Introduce tu token de seguridad")
        gate_token = gr.Textbox(label="Token", type="password", placeholder="Introduce tu token")
        gate_status = gr.Markdown(visible=False)
        btn_enter = gr.Button("Entrar")

    with gr.Group(visible=False) as app_group:
        # Input section: tabs for the different media types
        with gr.Tabs() as media_tabs:
            # Video tab: only the video input
            with gr.TabItem("Vídeo"):
                video_input = gr.Video(label="Sube tu vídeo de inspección")
            # Image tab: only the image input
            with gr.TabItem("Imagen"):
                image_input = gr.Image(type="filepath", label="Sube una imagen de inspección")
            # Configuration tab: model class tools
            with gr.TabItem("Configuración"):
                btn_classes = gr.Button("Mostrar clases del modelo")
                txt_classes = gr.Textbox(label="Clases cargadas", interactive=False)
                btn_classes.click(fn=show_classes, outputs=txt_classes)
            # Reports tab: report generation tools
            with gr.TabItem("Reportes"):
                btn_report = gr.Button("Generar análisis fuerte")
                status = gr.Textbox(label="Estado", interactive=False)
                pdf_out = gr.File(label="Reporte PDF")
                md_out = gr.File(label="Reporte Markdown")
                json_out = gr.File(label="Reporte JSON")

                def _on_report(vid, img):
                    path = None
                    if vid:
                        path = vid
                    elif img:
                        path = img if isinstance(img, str) else getattr(img, "name", None)
                    if not path:
                        return "No media provided", None, None, None
                    res = generar_analisis_fuerte(path)
                    return (
                        res.get("status", "error"),
                        res.get("report_pdf") or None,
                        res.get("report_md") or None,
                        res.get("report_json") or None,
                    )

                btn_report.click(fn=_on_report, inputs=[video_input, image_input], outputs=[status, pdf_out, md_out, json_out])
            # Metrics tab: metrics tools
            with gr.TabItem("Métricas"):
                btn_metrics = gr.Button("Ver métricas")
                out_metrics = gr.JSON(label="Métricas", visible=True)
                btn_metrics.click(fn=get_metrics, outputs=out_metrics, api_name="metrics")
        # Detection button (always visible after the token gate)
        btn_detect = gr.Button("Detectar defectos", variant="primary")
        # Output section: results appear here after detection
        output_video = gr.Video(label="Vídeo anotado", visible=False)
        output_image = gr.Image(label="Imagen anotada", visible=False)
        # Analysis text below the image
        analysis_text = gr.Markdown(label="Análisis de IA", visible=False)
        # Hidden JSON components for API chaining
        json_video = gr.JSON(visible=False)
        json_image = gr.JSON(visible=False)

        # Show/hide outputs based on the active tab and update their content
        def _update_video_output(json_result):
            if json_result and json_result.get("video"):
                return gr.Video(value=json_result["video"], visible=True), gr.Image(visible=False), gr.Markdown(visible=False)
            return gr.Video(visible=False), gr.Image(visible=False), gr.Markdown(visible=False)

        def _update_image_output(json_result):
            if json_result and json_result.get("path"):
                # Build a detections summary for the GPT analysis
                classes = json_result.get("classes", {})
                if classes:
                    detections_summary = "Detecciones automáticas: " + ", ".join([f"{k}: {v}" for k, v in classes.items()])
                else:
                    detections_summary = "No se detectaron defectos automáticamente"
                # Get the GPT analysis
                analysis = analyze_image_with_gpt(json_result["path"], detections_summary)
                return (
                    gr.Video(visible=False),
                    gr.Image(value=json_result["path"], visible=True),
                    gr.Markdown(value=analysis, visible=True),
                )
            return gr.Video(visible=False), gr.Image(visible=False), gr.Markdown(visible=False)

        # Wire up the detection events with proper output visibility
        ev_video = btn_detect.click(fn=infer_media, inputs=video_input, outputs=json_video, api_name="infer_media")
        ev_video.then(_update_video_output, inputs=json_video, outputs=[output_video, output_image, analysis_text])
        ev_image = btn_detect.click(fn=infer_media, inputs=image_input, outputs=json_image, api_name="infer_media_1")
        ev_image.then(_update_image_output, inputs=json_image, outputs=[output_video, output_image, analysis_text])

    # Wire the gate
    btn_enter.click(fn=_check_token, inputs=[gate_token], outputs=[gate_group, app_group, gate_status])

# Enable the queue for ZeroGPU
demo.queue()

if __name__ == "__main__":
    # Allow download access to the temp directory to avoid 403 errors on outputs
    demo.launch(allowed_paths=[tempfile.gettempdir()])