"""
KESHERAT AI
Sistema Inteligente de Inspección para Turbinas Eólicas
"""
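# Runtime dependencies, per the imports below: gradio, opencv-python (cv2),
# numpy, and requests; reportlab and the HF `spaces` package are optional and
# guarded with try/except fallbacks.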
import gradio as gr
import tempfile
import json
import shutil
import os
import cv2
import numpy as np
import importlib.util
import requests
import textwrap

# Optional PDF reporting: import reportlab safely and set a flag.
# REPORTLAB_AVAILABLE will be used by _write_pdf to select the PDF code path.
try:
    from reportlab.lib.pagesizes import A4
    from reportlab.pdfgen import canvas
    REPORTLAB_AVAILABLE = True
except Exception:
    REPORTLAB_AVAILABLE = False

# ZeroGPU: decorator that marks GPU functions. Falls back to a no-op locally.
try:
    import spaces  # provided on HF Spaces
    GPU_DECORATOR = spaces.GPU
except Exception:
    def GPU_DECORATOR(func=None, **kwargs):
        # Supports both @GPU_DECORATOR and @GPU_DECORATOR(...)
        if func is None:
            def wrap(f):
                return f
            return wrap
        return func

# ────────────────────────────
# Configuration
# ────────────────────────────
os.environ["OMP_NUM_THREADS"] = "1"  # avoids OpenMP warnings

# KESHERAT AI configuration for intelligent detection.
# Queries are organized by category, each with its own color and threshold.
DETECTION_CATEGORIES = {
    "structural": {
        "queries": ["bolt", "screw", "fastener", "tornillo"],
        "color": (0, 255, 0),   # bright green for structural elements
        "name": "Est",          # short display name
        "threshold": 0.15,      # higher threshold to reduce false positives
    },
    "damage": {
        "queries": ["damage", "crack", "break", "daño", "grieta"],
        "color": (0, 0, 255),   # red in OpenCV's BGR order, for damage
        "name": "Daño",
        "threshold": 0.2,       # high threshold for critical damage
    },
    "dirt": {
        "queries": ["dirt", "stain", "contamination", "suciedad", "mancha"],
        "color": (0, 255, 255),  # yellow in OpenCV's BGR order, for dirt
        "name": "Suc",
        "threshold": 0.25,       # high threshold for significant dirt
    },
    "erosion": {
        "queries": ["leading edge erosion", "blade erosion", "surface erosion",
                    "erosión del borde de ataque", "erosión de pala", "desgaste severo"],
        "color": (255, 0, 255),  # magenta for erosion
        "name": "Ero",
        "threshold": 0.35,       # very high threshold, erosion-specific
    },
}

# Dictionary translating technical terms into Spanish for on-image labels.
TRANSLATIONS = {
    # Structural elements
    "bolt": "perno", "screw": "tornillo", "fastener": "sujetador", "tornillo": "tornillo",
    # Damage
    "damage": "daño", "crack": "grieta", "break": "rotura",
    "daño": "daño", "grieta": "grieta",
    # Dirt
    "dirt": "suciedad", "stain": "mancha", "contamination": "contaminación",
    "suciedad": "suciedad", "mancha": "mancha",
    # Erosion-specific
    "leading edge erosion": "erosión del borde",
    "blade erosion": "erosión de pala",
    "surface erosion": "erosión superficial",
    "erosión del borde de ataque": "erosión del borde",
    "erosión de pala": "erosión de pala",
    "desgaste severo": "desgaste severo",
    "erosion": "erosión", "wear": "desgaste", "corrosion": "corrosión",
    "erosión": "erosión", "desgaste": "desgaste",
    # General terms
    "unknown": "desconocido",
}


def update_detection_thresholds(structural_th, damage_th, dirt_th, erosion_th):
    """Update the detection thresholds dynamically."""
    DETECTION_CATEGORIES["structural"]["threshold"] = structural_th
    DETECTION_CATEGORIES["damage"]["threshold"] = damage_th
    DETECTION_CATEGORIES["dirt"]["threshold"] = dirt_th
    DETECTION_CATEGORIES["erosion"]["threshold"] = erosion_th
    return (f"✅ Umbrales actualizados: Estructural={structural_th}, "
            f"Daño={damage_th}, Suciedad={dirt_th}, Erosión={erosion_th}")


def detect_multiple_categories(wrapper, image_path, base_threshold=0.1):
    """
    Run intelligent detection with KESHERAT AI and merge the results, using
    per-category thresholds for better precision. Each wrapper call is expected
    to return {"detections": [{"label", "confidence", "bbox": [x1, y1, x2, y2]}, ...]}.
    """
    all_detections = {}
    total_count = 0
    for category_name, category_info in DETECTION_CATEGORIES.items():
        category_threshold = category_info.get("threshold", base_threshold)
        print(f"🔍 Detectando {category_info['name']} con KESHERAT AI...")
        combined_detections = []

        # 1. Detect with OWL-V2
        try:
            print(f"  🦉 Probando OWL-V2 (umbral: {category_threshold})...")
            owlv2_result = wrapper.detect_objects_owlv2(
                image_path, category_info["queries"], threshold=category_threshold)
            owlv2_detections = owlv2_result.get("detections", [])
            combined_detections.extend(owlv2_detections)
            print(f"  ✅ OWL-V2 encontró {len(owlv2_detections)} detecciones")
        except Exception as e:
            print(f"  ⚠️ OWL-V2 falló: {e}")

        # 2. Detect with Grounding DINO
        try:
            print("  🎯 Probando Grounding DINO...")
            dino_result = wrapper.detect_objects_grounding_dino(
                image_path, category_info["queries"], threshold=category_threshold)
            dino_detections = dino_result.get("detections", [])
            combined_detections.extend(dino_detections)
            print(f"  ✅ Grounding DINO encontró {len(dino_detections)} detecciones")
        except Exception as e:
            print(f"  ⚠️ Grounding DINO falló: {e}")

        # 3. Store the merged results
        if combined_detections:
            all_detections[category_name] = {
                "detections": combined_detections,
                "color": category_info["color"],
                "name": category_info["name"],
                "count": len(combined_detections),
            }
            total_count += len(combined_detections)
            print(f"  🎯 Total combinado para {category_info['name']}: "
                  f"{len(combined_detections)} detecciones")
        else:
            print(f"  ❌ No se encontraron detecciones de {category_info['name']} en ningún modelo")

    print(f"🎯 TOTAL GENERAL (KESHERAT AI): {total_count} detecciones")
    return all_detections


def draw_categorized_detections(img, categorized_detections):
    """
    Draw detections on the image using per-category colors.
    Filters and caps detections to avoid visual clutter.
    """
    MIN_CONFIDENCE_DISPLAY = 0.2      # minimum confidence to display
    MAX_DETECTIONS_PER_CATEGORY = 6   # cap per category

    for _, category_data in categorized_detections.items():
        detections = category_data["detections"]
        color = category_data["color"]
        category_display_name = category_data["name"]

        # Filter by confidence and keep only the strongest detections
        filtered_detections = [d for d in detections
                               if d.get("confidence", 0) >= MIN_CONFIDENCE_DISPLAY]
        filtered_detections.sort(key=lambda x: x.get("confidence", 0), reverse=True)
        filtered_detections = filtered_detections[:MAX_DETECTIONS_PER_CATEGORY]

        for detection in filtered_detections:
            confidence = detection.get("confidence", 0.0)
            bbox = detection.get("bbox", [0, 0, 0, 0])
            x1, y1, x2, y2 = map(int, bbox)

            # Shrink the boxes slightly (7.5% margin on each side)
            width = x2 - x1
            height = y2 - y1
            margin_x = int(width * 0.075)
            margin_y = int(height * 0.075)
            x1 += margin_x
            y1 += margin_y
            x2 -= margin_x
            y2 -= margin_y

            # Thin rectangle
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 1)

            # Label with the specific detected object name, translated to Spanish
            label = detection.get("label", "unknown")
            label_spanish = TRANSLATIONS.get(label, label)
            text = f"{label_spanish}: {confidence:.2f}"

            # Smaller font
            font_scale = 0.4
            thickness = 1

            # Semi-transparent background behind the text
            (text_width, text_height), _ = cv2.getTextSize(
                text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)
            overlay = img.copy()
            cv2.rectangle(overlay, (x1, y1 - text_height - 6),
                          (x1 + text_width + 4, y1), color, -1)
            cv2.addWeighted(overlay, 0.7, img, 0.3, 0, img)

            # Outlined text for legibility: black contour, then white fill
            cv2.putText(img, text, (x1 + 2, y1 - 3), cv2.FONT_HERSHEY_SIMPLEX,
                        font_scale, (0, 0, 0), thickness + 1)
            cv2.putText(img, text, (x1 + 2, y1 - 3), cv2.FONT_HERSHEY_SIMPLEX,
                        font_scale, (255, 255, 255), thickness)
    return img


def get_all_queries():
    """Return all queries from every category as a flat list."""
    all_queries = []
    for category_info in DETECTION_CATEGORIES.values():
        all_queries.extend(category_info["queries"])
    return all_queries


# ────────────────────────────
# Simple metrics (persisted in /tmp)
# ────────────────────────────
METRICS_PATH = os.path.join(tempfile.gettempdir(), "blade_metrics.json")


def _load_metrics():
    try:
        if os.path.exists(METRICS_PATH):
            with open(METRICS_PATH, "r", encoding="utf-8") as f:
                return json.load(f)
    except Exception:
        pass
    return {
        "total_jobs": 0,
        "videos": 0,
        "images": 0,
        "detections_total": 0,
        "per_label": {},
        "last_job": None,
    }


def _save_metrics(m):
    try:
        with open(METRICS_PATH, "w", encoding="utf-8") as f:
            json.dump(m, f, ensure_ascii=False, indent=2)
    except Exception:
        pass


def _record_metrics(job_type, counts):
    m = _load_metrics()
    m["total_jobs"] += 1
    if job_type == "video":
        m["videos"] += 1
    elif job_type == "image":
        m["images"] += 1
    dets = int(sum(counts.values())) if isinstance(counts, dict) else 0
    m["detections_total"] += dets
    # Aggregate per label
    if isinstance(counts, dict):
        per = m.get("per_label", {})
        for k, v in counts.items():
            per[k] = int(per.get(k, 0)) + int(v)
        m["per_label"] = per
    m["last_job"] = {"type": job_type, "detections": dets}
    _save_metrics(m)


def get_metrics():
    """Return the current metrics snapshot (see _load_metrics for the schema)."""
    return _load_metrics()
# ────────────────────────────
# Inference functions
# ────────────────────────────
@GPU_DECORATOR
def infer_media(media_path, conf=0.1, out_res="720p"):
    """
    Process a video or image file using KESHERAT AI for intelligent detection.
    Returns:
      - Video: {"video": out_vid_path, "classes": {label: count, ...}}
      - Image: {"path": out_img_path, "classes": {label: count, ...}}
    """
    if not media_path:
        # No input (e.g. the button was pressed on the other tab): don't fail.
        return {}

    ext = os.path.splitext(media_path)[1].lower()
    tmpdir = tempfile.mkdtemp()

    # Target resolution
    res_map = {"360p": (640, 360), "480p": (854, 480), "720p": (1280, 720)}
    target_size = res_map.get(out_res)

    # ─ Video ───────────────────────────────────────────────────────
    if ext in [".mp4", ".mov", ".avi", ".mkv"]:
        in_vid = os.path.join(tmpdir, "in.mp4")
        out_vid = os.path.join(tmpdir, "out.mp4")
        shutil.copy(media_path, in_vid)

        # Video FPS (use the real value when available)
        cap = cv2.VideoCapture(in_vid)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        cap.release()
        try:
            fps = float(fps)
            if fps <= 0 or fps != fps:  # NaN check
                fps = 30
        except Exception:
            fps = 30

        writer = None
        counts = {}

        # Set up the detection models (OWL-V2 + Grounding DINO)
        try:
            GPTClass = _load_gptoss_wrapper()
            if GPTClass:
                wrapper = GPTClass()
                print("Wrapper de detección configurado correctamente")
            else:
                wrapper = None
                print("No se pudo cargar el wrapper de detección")
        except Exception as e:
            print(f"Error configurando modelos de detección: {e}")
            wrapper = None

        # Process frames with OWL-V2 (every 30th frame, for efficiency)
        cap = cv2.VideoCapture(in_vid)
        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Only run OWL-V2 on every 30th frame to keep things fast
            if wrapper and frame_idx % 30 == 0:
                try:
                    # Save a temporary frame to disk
                    temp_frame_path = os.path.join(tmpdir, f"temp_frame_{frame_idx}.jpg")
                    cv2.imwrite(temp_frame_path, frame)

                    # Detect with OWL-V2
                    detection_result = wrapper.detect_objects_owlv2(
                        temp_frame_path, get_all_queries(), threshold=0.1)
                    detections = detection_result.get("detections", [])

                    # Draw detections
                    for detection in detections:
                        label = detection.get("label", "unknown")
                        confidence = detection.get("confidence", 0.0)
                        bbox = detection.get("bbox", [0, 0, 0, 0])
                        x1, y1, x2, y2 = map(int, bbox)
                        counts[label] = counts.get(label, 0) + 1
                        # Rectangle
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        # Label with confidence
                        text = f"{label} ({confidence:.2f})"
                        cv2.putText(frame, text, (x1, y1 - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                    # Clean up the temporary frame
                    if os.path.exists(temp_frame_path):
                        os.remove(temp_frame_path)
                except Exception as e:
                    print(f"Error procesando frame {frame_idx}: {e}")

            # Resize if requested
            if target_size:
                frame = cv2.resize(frame, target_size)

            # Configure the writer on the first frame
            if writer is None:
                h, w = frame.shape[:2]
                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                writer = cv2.VideoWriter(out_vid, fourcc, fps, (w, h))

            writer.write(frame)
            frame_idx += 1

        if writer:
            writer.release()
        if cap:
            cap.release()

        # Record metrics
        _record_metrics("video", counts)
        return {"video": out_vid, "classes": counts}
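    # Codec note: the "mp4v" FOURCC used above (MPEG-4 Part 2) is what OpenCV
    # can write without extra codecs, but many browsers will not play it
    # inline; if the Gradio player shows an empty video, re-encoding the
    # output to H.264 (e.g. with ffmpeg) is the usual workaround.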
    # ─ Image ───────────────────────────────────────────────────────
    elif ext in [".jpg", ".jpeg", ".png", ".bmp"]:
        img = cv2.imread(media_path)

        # Zero-shot detection across multiple categories
        try:
            GPTClass = _load_gptoss_wrapper()
            if GPTClass:
                wrapper = GPTClass()
                print(f"🔍 Iniciando detección multi-categoría en imagen: {media_path}")

                # New multi-category pipeline
                categorized_detections = detect_multiple_categories(
                    wrapper, media_path, base_threshold=0.1)

                # Draw the categorized detections with their specific colors
                if categorized_detections:
                    img = draw_categorized_detections(img, categorized_detections)

                # Build counts for compatibility with the rest of the code
                counts = {}
                for category_name, category_data in categorized_detections.items():
                    for detection in category_data["detections"]:
                        label = detection.get("label", "unknown")
                        counts[label] = counts.get(label, 0) + 1
                total_detections = sum(counts.values())
                print(f"🎯 Total de detecciones encontradas: {total_detections}")
            else:
                print("Wrapper no disponible, sin detecciones")
                counts = {}
        except Exception as e:
            print(f"Error en detección zero-shot: {e}")
            counts = {}

        if target_size:
            img = cv2.resize(img, target_size)

        out_path = os.path.join(tmpdir, "annotated.png")
        cv2.imwrite(out_path, img)

        # Record metrics
        _record_metrics("image", counts)
        return {"path": out_path, "classes": counts}

    else:
        raise ValueError(f"Formato no soportado: {ext}")


def show_classes():
    """Return KESHERAT AI's detection capabilities, organized by category."""
    result = []
    for category_name, category_info in DETECTION_CATEGORIES.items():
        queries = ", ".join(category_info["queries"])
        result.append(f"{category_info['name']}: {queries}")
    return " | ".join(result)


# Helper functions to extract the output resource from the result dict
def analyze_image_with_ai(image_path, detections_summary=""):
    """
    Analysis based on KESHERAT AI detections.
    Reports the results of the intelligent multimodal analysis.
    """
    if not detections_summary or detections_summary == "No se detectaron defectos automáticamente":
        return """
## 🔍 **Análisis de Inspección - KESHERAT AI**

**Estado General:** No se detectaron defectos significativos con el análisis automático.

**Recomendación:** Continuar con inspección visual manual para verificar áreas que podrían no ser detectables automáticamente.
"""
    return f"""
## 🔍 **Análisis de Inspección - KESHERAT AI**

**Detecciones Automáticas Encontradas:**
{detections_summary}

**Estado General:** Se detectaron elementos estructurales y posibles defectos que requieren atención.

**Recomendaciones:**
- ✅ **Elementos Estructurales**: Verificar estado de tornillos y elementos de fijación detectados
- ⚠️ **Daños Detectados**: Inspeccionar visualmente las áreas marcadas como daños
- 🧹 **Suciedad**: Limpiar áreas con acumulación de suciedad detectada
- 🔧 **Erosión**: Evaluar áreas de erosión para determinar necesidad de reparación

**Nota:** Este análisis utiliza tecnología de IA multimodal avanzada para máxima precisión. Se recomienda inspección visual adicional por personal técnico especializado.
"""


# Function removed - GPT/Qwen analysis is no longer used here
def _check_token(token: str):
    """Token gate for the public app. The expected token comes from the
    APP_ACCESS_TOKEN or KESHERAT_TOKEN env vars, defaulting to 'KESHERAT'.
    Returns visibility updates for [gate_group, app_group, gate_status]."""
    expected = os.getenv("APP_ACCESS_TOKEN") or os.getenv("KESHERAT_TOKEN") or "KESHERAT"
    ok = str(token or "").strip() == str(expected).strip()
    if ok:
        return (gr.update(visible=False), gr.update(visible=True),
                gr.update(visible=False, value=""))
    return (gr.update(visible=True), gr.update(visible=False),
            gr.update(visible=True, value="Token inválido. Intenta nuevamente."))
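# Deployment note: to change the gate token on a public Space, set
# APP_ACCESS_TOKEN (or KESHERAT_TOKEN) in the Space secrets; with neither
# variable defined, _check_token falls back to the literal "KESHERAT".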
def compute_visual_features(image_path, detections=None):
    """Compute simple visual features; return a short description plus numeric metrics.

    Returns a dict with keys:
      - width, height
      - brightness (mean grayscale)
      - contrast (std grayscale)
      - blur (variance of Laplacian; lower = blurrier)
      - dominant_rgb (tuple)
      - object_count
      - avg_bbox_area
      - description (short natural-language sentence)
    """
    try:
        img = cv2.imread(image_path)
        if img is None:
            return {}
        h, w = img.shape[:2]
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        brightness = float(np.mean(gray))
        contrast = float(np.std(gray))
        lap = cv2.Laplacian(gray, cv2.CV_64F)
        blur = float(np.var(lap))

        # Mean color as a simple dominant-color proxy (convert BGR -> RGB)
        mean_bgr = cv2.mean(img)[:3]
        dominant_rgb = (int(mean_bgr[2]), int(mean_bgr[1]), int(mean_bgr[0]))

        obj_counts = 0
        avg_bbox_area = 0.0
        if detections:
            obj_counts = len(detections)
            areas = []
            for d in detections:
                bbox = d.get("bbox", [0, 0, 0, 0])
                try:
                    area = max(0, (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))
                except Exception:
                    area = 0
                areas.append(area)
            if areas:
                avg_bbox_area = float(sum(areas) / len(areas))

        # Human-friendly descriptors (the cutoffs are empirical and may need
        # tuning per camera/resolution)
        bright_desc = "bright" if brightness > 130 else ("dim" if brightness < 80 else "moderately lit")
        contrast_desc = "high contrast" if contrast > 60 else ("low contrast" if contrast < 30 else "moderate contrast")
        blur_desc = "blurry" if blur < 100 else "sharp"
        desc = (f"Image appears {bright_desc}, with {contrast_desc}, and is {blur_desc}. "
                f"Dominant color approx RGB{dominant_rgb}. Detected {obj_counts} objects in view.")

        return {
            "width": w,
            "height": h,
            "brightness": brightness,
            "contrast": contrast,
            "blur": blur,
            "dominant_rgb": dominant_rgb,
            "object_count": obj_counts,
            "avg_bbox_area": avg_bbox_area,
            "description": desc,
        }
    except Exception:
        return {}


# ────────────────────────────
# Helpers for multimodal reporting (PDF/MD/JSON)
# ────────────────────────────
def _write_pdf(path: str, title: str, narrative: str, frames):
    """
    Write a wrapped, layout-friendly PDF. This version increases margins,
    reduces font sizes, and wraps long lines to avoid cutting text.
    """
    if REPORTLAB_AVAILABLE:
        c = canvas.Canvas(path, pagesize=A4)
        width, height = A4
        margin = 60
        y = height - margin

        # Fonts and sizes
        title_font = "Helvetica-Bold"
        body_font = "Helvetica"
        small_font = "Helvetica"
        title_size = 13
        body_size = 9
        small_size = 8
        line_height = body_size * 1.18
        small_line_height = small_size * 1.12

        def wrap_text(text, font_size, max_width):
            # Width is estimated from an average character width; for exact
            # measurement, reportlab's pdfmetrics.stringWidth could be used.
            approx_char_width = font_size * 0.55
            max_chars = max(30, int(max_width / approx_char_width))
            out = []
            for para in str(text or "").splitlines():
                wrapped = textwrap.wrap(para, width=max_chars)
                out.extend(wrapped if wrapped else [""])
            return out

        # Title
        c.setFont(title_font, title_size)
        for tline in wrap_text(title, title_size, width - 2 * margin):
            if y < margin + title_size * 1.5:
                c.showPage()
                y = height - margin
                c.setFont(title_font, title_size)
            c.drawString(margin, y, tline)
            y -= title_size * 1.25
        y -= 6

        # Narrative
        c.setFont(body_font, body_size)
        for line in wrap_text(narrative or "", body_size, width - 2 * margin):
            if y < margin + line_height:
                c.showPage()
                y = height - margin
                c.setFont(body_font, body_size)
            c.drawString(margin, y, line)
            y -= line_height
        y -= 8

        c.setFont("Helvetica-Bold", 11)
        if y < margin + 30:
            c.showPage()
            y = height - margin
            c.setFont("Helvetica-Bold", 11)
        c.drawString(margin, y, "Per-frame detections:")
        y -= 14
        c.setFont(small_font, small_size)

        for f in frames:
            if y < margin + 90:
                c.showPage()
                y = height - margin
                c.setFont(small_font, small_size)
            c.drawString(margin, y, f"Frame {f.get('frame_index')}:")
            y -= small_line_height
            dets = f.get("detections", [])
            if not dets:
                if y < margin + small_line_height:
                    c.showPage()
                    y = height - margin
                    c.setFont(small_font, small_size)
                c.drawString(margin + 12, y, "No detections")
                y -= small_line_height
            else:
                for d in dets:
                    det_text = f"- {d.get('label')} | conf={d.get('confidence')} | bbox={d.get('bbox')}"
                    text_max_width = width - 2 * margin - 140
                    for dl in wrap_text(det_text, small_size, text_max_width):
                        if y < margin + small_line_height:
                            c.showPage()
                            y = height - margin
                            c.setFont(small_font, small_size)
                        c.drawString(margin + 12, y, dl)
                        y -= small_line_height
                    try:
                        img_path = d.get("image")
                        if img_path and os.path.exists(img_path):
                            img_w = 110
                            img_h = 65
                            if y < margin + img_h + 20:
                                c.showPage()
                                y = height - margin
                                c.setFont(small_font, small_size)
                            x_img = width - margin - img_w
                            y_img = y - img_h + 6
                            c.drawImage(img_path, x_img, y_img, width=img_w, height=img_h,
                                        preserveAspectRatio=True, mask='auto')
                            crop_desc = None
                            if isinstance(d.get("crop_visual"), dict):
                                crop_desc = d["crop_visual"].get("description")
                            if crop_desc:
                                cd_lines = wrap_text(crop_desc, small_size, img_w)
                                text_y = y_img - 12
                                for cd in cd_lines:
                                    if text_y < margin + 20:
                                        c.showPage()
                                        y = height - margin
                                        text_y = y - img_h - 12
                                    c.setFont(small_font, small_size)
                                    c.drawString(x_img, text_y, cd)
                                    text_y -= small_line_height
                            y = y - img_h - 8
                    except Exception:
                        pass
        c.save()
        return

    # Fallback plain-text write if ReportLab is unavailable
    with open(path, "w", encoding="utf-8") as f:
        f.write(title + "\n\n")
        f.write((narrative or "") + "\n\n")
        f.write("Per-frame detections:\n")
        for fr in frames:
            f.write(f"Frame {fr.get('frame_index')}:\n")
            dets = fr.get("detections", [])
            if not dets:
                f.write("  No detections\n")
            else:
                for d in dets:
                    f.write(f"  - {d}\n")


def _load_gptoss_wrapper():
    """
    Load the blade-inspection-demo/gptoss_wrapper.py module by filepath so we
    don't rely on package imports.
    """
    wrapper_path = None
    try:
        base = os.path.dirname(__file__)
        wrapper_path = os.path.join(base, "blade-inspection-demo", "gptoss_wrapper.py")
        if not os.path.exists(wrapper_path):
            # Fallback: maybe the file already lives at the project root
            wrapper_path = os.path.join(base, "gptoss_wrapper.py")
        spec = importlib.util.spec_from_file_location("gptoss_wrapper", wrapper_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return getattr(module, "GPTOSSWrapper", None)
    except Exception as e:
        # Print diagnostics to the Space logs so we can see why the import failed.
        print(f"DEBUG: failed to load GPT wrapper from {wrapper_path}: {e}")
        import traceback
        traceback.print_exc()
        return None


def _build_prompt(frames):
    """
    Build a compact prompt that summarizes the entire video while keeping the
    prompt size bounded. We include:
      - video-level totals (frames, total detections, counts per class)
      - a concise list of frames that contain detections (frame index + short det summary)
      - an optional compact aggregate of visual metrics for the whole video
    The detailed per-frame visual descriptions remain in the report files
    (MD/PDF/JSON) but are not expanded fully in the prompt, to stay within
    token limits.
    """
    # Config (env vars)
    try:
        max_prompt_frames = int(os.getenv("MAX_PROMPT_FRAMES", "200"))
    except Exception:
        max_prompt_frames = 200

    total_frames = len(frames)
    total_detections = sum(len(f.get("detections", [])) for f in frames)

    # Aggregate counts per label and collect frames with detections
    counts = {}
    frames_with_dets = []
    for f in frames:
        dets = f.get("detections", [])
        if dets:
            frames_with_dets.append(f)
        for d in dets:
            counts[d.get("label")] = counts.get(d.get("label"), 0) + 1

    lines = []
    lines.append("You are an expert inspection assistant for wind turbine blade images/videos.")
    lines.append(f"This video contains {total_frames} frames and {total_detections} total detections.")
    if counts:
        lines.append("Total detections by class: " + ", ".join([f"{k}({v})" for k, v in counts.items()]))
    else:
        lines.append("No detections were found in analyzed frames.")
    lines.append("")
    lines.append("Instructions: Based on the aggregate information and the selected frame summaries below, produce a concise inspection report that includes:")
    lines.append("- Summary of main findings")
    lines.append("- Suggested severity (low/medium/high) when appropriate")
    lines.append("- Recommended next steps for inspection/repair")
    lines.append("")

    # Include up to max_prompt_frames frames that have detections (prioritize them)
    include_list = frames_with_dets[:max_prompt_frames]
    lines.append(f"Included frame summaries (showing frames with detections, up to {max_prompt_frames} entries):")
    if not include_list:
        lines.append("No frames with detections to list (video may be clear or detections are below threshold).")
    else:
        for f in include_list:
            fid = f.get("frame_index")
            dets = f.get("detections", [])
            det_texts = []
            for d in dets:
                conf = d.get("confidence")
                conf_s = f"{conf:.2f}" if isinstance(conf, float) else str(conf)
                det_texts.append(f"{d.get('label')}({conf_s})")
            # Compact visual metrics (if present)
            visual = f.get("visual") or {}
            metric_parts = []
            if visual.get("brightness") is not None:
                metric_parts.append(f"bright={visual['brightness']:.0f}")
            if visual.get("contrast") is not None:
                metric_parts.append(f"contrast={visual['contrast']:.0f}")
            if visual.get("blur") is not None:
                metric_parts.append(f"blur_var={visual['blur']:.0f}")
            if visual.get("dominant_rgb"):
                metric_parts.append(f"dominant_rgb={visual['dominant_rgb']}")
            metrics = "; ".join(metric_parts)
            if metrics:
                lines.append(f"Frame {fid}: " + ", ".join(det_texts) + f" [{metrics}]")
            else:
                lines.append(f"Frame {fid}: " + ", ".join(det_texts))

    lines.append("")
    lines.append("NOTE: Full per-frame visual descriptions and images are attached in the generated report files. If you need a fully exhaustive token-by-token per-frame prompt, set FULL_FRAME_REPORT and increase MAX_PROMPT_FRAMES (may exceed model token limits).")
    lines.append("")
    lines.append("Produce the report in plain text, 6-10 short paragraphs. Also include 1-2 short sentences summarizing why the listed frames are noteworthy (e.g., what the detection likely means).")
    return "\n".join(lines)
@GPU_DECORATOR
def generar_analisis_fuerte(media_path):
    """Generate a strong analysis (PDF/MD/JSON) from a given media file path.

    Honors the optional env vars MAX_FRAMES (cap on processed frames, 0 = all),
    FRAME_STEP (run detection on every Nth frame only) and MODEL_ID (text model
    for the narrative). Returns {"status", "report_pdf", "report_md", "report_json"}.
    """
    if not media_path:
        return {"status": "no_input", "report_pdf": None, "report_md": None, "report_json": None}

    # Set up OWL-V2 for detection
    try:
        GPTClass = _load_gptoss_wrapper()
        wrapper = GPTClass() if GPTClass else None
    except Exception as e:
        print(f"Error configurando OWL-V2: {e}")
        wrapper = None

    tmpdir = tempfile.mkdtemp()
    frames = []
    try:
        ext = os.path.splitext(media_path)[1].lower()
        # Extract frames and detections from the media
        if ext in [".mp4", ".mov", ".avi", ".mkv"]:
            cap = cv2.VideoCapture(media_path)
            idx = 0
            # Process all frames in the video. This may be expensive for long
            # videos. To limit processing, set MAX_FRAMES to a positive integer.
            max_frames_env = os.getenv("MAX_FRAMES", "0")
            try:
                max_frames = int(max_frames_env)
            except Exception:
                max_frames = 0
            if max_frames > 0:
                print(f"DEBUG: processing up to {max_frames} frames (MAX_FRAMES set)")
            else:
                print("DEBUG: processing all video frames for strong analysis (may be slow)...")

            # Sampling: run detection only on every FRAME_STEP-th frame to reduce GPU load.
            try:
                frame_step = int(os.getenv("FRAME_STEP", "5"))
                if frame_step < 1:
                    frame_step = 1
            except Exception:
                frame_step = 5

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Save every frame image to disk (keeps consistent indexing) but only
                # run detection on sampled frames to lower compute usage.
                tmpf = os.path.join(tmpdir, f"frame_{idx}.jpg")
                cv2.imwrite(tmpf, frame)

                if idx % frame_step == 0:
                    # Run OWL-V2 detection on the sampled frame
                    dets = []
                    if wrapper:
                        try:
                            detection_result = wrapper.detect_objects_owlv2(
                                tmpf, get_all_queries(), threshold=0.1)
                            detections = detection_result.get("detections", [])
                            det_i = 0
                            full_img = cv2.imread(tmpf)
                            h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0))
                            for detection in detections:
                                label = detection.get("label", "unknown")
                                confv = detection.get("confidence", 0.0)
                                bbox = detection.get("bbox", [0, 0, 0, 0])
                                x1, y1, x2, y2 = map(int, bbox)
                                det = {"label": label, "confidence": confv,
                                       "bbox": [x1, y1, x2, y2]}
                                # Save a cropped detection image if possible
                                try:
                                    if full_img is not None and x2 > x1 and y2 > y1:
                                        # Clamp coords
                                        x1c = max(0, min(x1, w_full - 1))
                                        x2c = max(0, min(x2, w_full))
                                        y1c = max(0, min(y1, h_full - 1))
                                        y2c = max(0, min(y2, h_full))
                                        if x2c > x1c and y2c > y1c:
                                            crop = full_img[y1c:y2c, x1c:x2c]
                                            crop_path = os.path.join(
                                                tmpdir, f"frame_{idx}_det_{det_i}.jpg")
                                            cv2.imwrite(crop_path, crop)
                                            det["image"] = crop_path
                                            # Compute visual features for the crop and attach
                                            det["crop_visual"] = compute_visual_features(crop_path, [det])
                                    det_i += 1
                                except Exception:
                                    pass
                                dets.append(det)
                        except Exception as e:
                            print(f"Error en detección OWL-V2 frame {idx}: {e}")
                            dets = []
                    # Compute simple visual features for this saved frame
                    visual = compute_visual_features(tmpf, dets)
                    frames.append({"frame_index": idx, "detections": dets,
                                   "visual": visual, "image_path": tmpf})
                else:
                    # Non-sampled frame: still compute a cheap visual summary (no detections)
                    visual = compute_visual_features(tmpf, [])
                    frames.append({"frame_index": idx, "detections": [],
                                   "visual": visual, "image_path": tmpf})
                idx += 1
                if max_frames > 0 and idx >= max_frames:
                    break
            cap.release()
        else:
            # Single image
            dets = []
            if wrapper:
                try:
                    detection_result = wrapper.detect_objects_owlv2(
                        media_path, get_all_queries(), threshold=0.1)
                    detections = detection_result.get("detections", [])
                    full_img = cv2.imread(media_path)
                    h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0))
                    det_i = 0
                    for detection in detections:
                        label = detection.get("label", "unknown")
                        confv = detection.get("confidence", 0.0)
                        bbox = detection.get("bbox", [0, 0, 0, 0])
                        x1, y1, x2, y2 = map(int, bbox)
                        det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]}
                        # Save a cropped detection image if possible
                        try:
                            if full_img is not None and x2 > x1 and y2 > y1:
                                x1c = max(0, min(x1, w_full - 1))
                                x2c = max(0, min(x2, w_full))
                                y1c = max(0, min(y1, h_full - 1))
                                y2c = max(0, min(y2, h_full))
                                if x2c > x1c and y2c > y1c:
                                    crop = full_img[y1c:y2c, x1c:x2c]
                                    crop_path = os.path.join(tmpdir, f"frame_0_det_{det_i}.jpg")
                                    cv2.imwrite(crop_path, crop)
                                    det["image"] = crop_path
                                    det["crop_visual"] = compute_visual_features(crop_path, [det])
                            det_i += 1
                        except Exception:
                            pass
                        dets.append(det)
                except Exception as e:
                    print(f"Error en detección OWL-V2 imagen: {e}")
                    dets = []
            # Compute visual features for the single image
            visual = compute_visual_features(media_path, dets)
            frames.append({"frame_index": 0, "detections": dets,
                           "visual": visual, "image_path": media_path})

        prompt = _build_prompt(frames)
        GPTClass = _load_gptoss_wrapper()
        narrative = None
        if GPTClass:
            try:
                # Allow overriding the model via the MODEL_ID env var
                # (e.g. "openai/gpt-oss-120b:fireworks-ai")
                model_id = os.getenv("MODEL_ID", "gpt-oss-120")
                print(f"DEBUG: [gpt] using model_id={model_id}, HF_USE_ROUTER={os.getenv('HF_USE_ROUTER')}")
                wrapper = GPTClass(model=model_id)
                # DEBUG: print the prompt (truncated) so the Space logs show the request
                try:
                    print("DEBUG: [gpt] sending prompt (truncated 2000 chars):")
                    print(prompt[:2000])
                except Exception:
                    print("DEBUG: [gpt] (failed to print prompt)")
                narrative = wrapper.generate(prompt)
                # DEBUG: print a truncated portion of the response
                try:
                    print("DEBUG: [gpt] response (truncated 2000 chars):")
                    print((narrative or "")[:2000])
                except Exception:
                    print("DEBUG: [gpt] (failed to print response)")
            except Exception as e:
                narrative = f"(GPT call failed) {e}"
                print("DEBUG: [gpt] call failed:", e)
        else:
            narrative = "(GPT wrapper unavailable) Fallback summary:\n"
            counts = {}
            for f in frames:
                for d in f.get("detections", []):
                    counts[d["label"]] = counts.get(d["label"], 0) + 1
            if counts:
                narrative += "Detected classes: " + ", ".join([f"{k}({v})" for k, v in counts.items()])
            else:
                narrative += "No detections"

        # Write Markdown
        report_md = os.path.join(tmpdir, "report.md")
        with open(report_md, "w", encoding="utf-8") as md:
            md.write("# Informe de inspección (Generar analisis fuerte)\n\n")
            md.write(narrative or "Sin narrativa disponible.\n\n")
            md.write("\n## Per-frame detections\n\n")
            for f in frames:
                fid = f.get("frame_index")
                md.write(f"- Frame {fid}:\n")
                dets = f.get("detections", [])
                if not dets:
                    md.write("  No detections\n")
                else:
                    for d in dets:
                        md.write(f"  - {d.get('label')}({d.get('confidence')}) bbox={d.get('bbox')}\n")
                        if d.get("image"):
                            # Embed the cropped detection image
                            md.write(f"    ![detección]({d.get('image')})\n")
                        # Add the crop's visual description if available
                        cviz = d.get("crop_visual")
                        if cviz and cviz.get("description"):
                            md.write(f"    Description: {cviz.get('description')}\n")

        # Write JSON
        report_json = os.path.join(tmpdir, "report.json")
        with open(report_json, "w", encoding="utf-8") as jf:
            json.dump({"narrative": narrative, "frames": frames}, jf, indent=2)

        # Write PDF
        report_pdf = os.path.join(tmpdir, "report.pdf")
        _write_pdf(report_pdf, "Informe de inspección - Generar analisis fuerte",
                   narrative, frames)

        return {"status": "done", "report_pdf": report_pdf,
                "report_md": report_md, "report_json": report_json}
    except Exception as e:
        return {"status": f"error: {e}", "report_pdf": None,
                "report_md": None, "report_json": None}
# ────────────────────────────
with gr.Blocks(
    title="KESHERAT AI - Inspección Inteligente de Turbinas Eólicas",
    theme=gr.themes.Soft(),
    css="""
    /* ===== Clean redesign ===== */
    /* Global reset */
    * { box-sizing: border-box !important; text-shadow: none !important; }

    /* Clean white background */
    body, html, .gradio-container { background: #ffffff !important; color: #212529 !important; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important; }
    .gradio-container { max-width: 1400px !important; margin: 0 auto !important; padding: 20px !important; }

    /* ===== Blue header ===== */
    .main-header { background: linear-gradient(135deg, #0d6efd 0%, #0b5ed7 100%) !important; color: white !important; padding: 30px !important; border-radius: 15px !important; margin-bottom: 30px !important; text-align: center !important; box-shadow: 0 4px 20px rgba(13, 110, 253, 0.2) !important; }
    .main-header h1 { color: white !important; font-size: 2.5rem !important; font-weight: 700 !important; margin-bottom: 10px !important; text-shadow: 1px 1px 3px rgba(0,0,0,0.2) !important; }
    .main-header p { color: rgba(255,255,255,0.9) !important; font-size: 1.1rem !important; margin: 0 !important; }

    /* ===== Sliders: light-blue cards ===== */
    [data-testid*="slider"] { background: #e3f2fd !important; border: 2px solid #bbdefb !important; border-radius: 12px !important; padding: 20px !important; margin: 10px 0 !important; box-shadow: 0 2px 10px rgba(0,0,0,0.05) !important; }
    [data-testid*="slider"] * { color: #0d47a1 !important; font-weight: 500 !important; }
    [data-testid*="slider"] label { color: #0d47a1 !important; font-size: 1.1rem !important; font-weight: 600 !important; margin-bottom: 8px !important; }
    [data-testid*="slider"] .gr-info { color: #1565c0 !important; font-size: 0.9rem !important; margin-top: 5px !important; }

    /* ===== Buttons ===== */
    .gr-button { background: linear-gradient(135deg, #0d6efd 0%, #0b5ed7 100%) !important; color: white !important; border: none !important; border-radius: 8px !important; padding: 12px 24px !important; font-weight: 600 !important; transition: all 0.3s ease !important; }
    .gr-button:hover { background: linear-gradient(135deg, #0b5ed7 0%, #0a58ca 100%) !important; transform: translateY(-1px) !important; box-shadow: 0 4px 15px rgba(13, 110, 253, 0.3) !important; }

    /* ===== Tabs ===== */
    .gr-tab-nav { background: #f8f9fa !important; border-radius: 10px !important; padding: 5px !important; margin-bottom: 20px !important; }
    .gr-tab-nav button { background: transparent !important; color: #495057 !important; border: none !important; border-radius: 6px !important; padding: 10px 20px !important; font-weight: 500 !important; transition: all 0.3s ease !important; }
    .gr-tab-nav button.selected { background: #0d6efd !important; color: white !important; box-shadow: 0 2px 8px rgba(13, 110, 253, 0.3) !important; }

    /* ===== Inputs and textboxes ===== */
    .gr-textbox, .gr-dropdown, input, textarea, select { background: #ffffff !important; color: #212529 !important; border: 2px solid #e9ecef !important; border-radius: 8px !important; padding: 12px !important; font-size: 1rem !important; }
    .gr-textbox:focus, .gr-dropdown:focus, input:focus, textarea:focus, select:focus { border-color: #0d6efd !important; box-shadow: 0 0 0 3px rgba(13, 110, 253, 0.1) !important; outline: none !important; }

    /* ===== Cards and containers ===== */
    .gr-group, .gr-form, .gr-box { background: #ffffff !important; border: 1px solid #e9ecef !important; border-radius: 12px !important; padding: 20px !important; margin: 10px 0 !important; box-shadow: 0 2px 10px rgba(0,0,0,0.05) !important; }

    /* ===== Markdown and text ===== */
    .gr-markdown h1, .gr-markdown h2, .gr-markdown h3, .gr-markdown h4, .gr-markdown h5, .gr-markdown h6 { color: #212529 !important; font-weight: 600 !important; margin-bottom: 15px !important; }
    .gr-markdown p, .gr-markdown span, .gr-markdown div { color: #495057 !important; line-height: 1.6 !important; }

    /* ===== Notifications ===== */
    .toast, .notification, .alert { background: #ffffff !important; color: #212529 !important; border: 1px solid #dee2e6 !important; border-radius: 8px !important; padding: 15px !important; box-shadow: 0 4px 20px rgba(0,0,0,0.1) !important; }
    .toast.success { background: #d4edda !important; color: #155724 !important; border-color: #c3e6cb !important; }
    .toast.error { background: #f8d7da !important; color: #721c24 !important; border-color: #f5c6cb !important; }
    .toast.warning { background: #fff3cd !important; color: #856404 !important; border-color: #ffeaa7 !important; }
    .toast.info { background: #d1ecf1 !important; color: #0c5460 !important; border-color: #bee5eb !important; }

    /* ===== Login section - improved contrast ===== */
    .section-container { background: #ffffff !important; border: 1px solid #dee2e6 !important; border-radius: 12px !important; padding: 25px !important; margin: 20px 0 !important; box-shadow: 0 4px 15px rgba(0,0,0,0.08) !important; }
    .section-container h2 { color: #212529 !important; font-weight: 700 !important; font-size: 1.5rem !important; margin-bottom: 20px !important; }
    .section-container p { color: #212529 !important; font-weight: 500 !important; font-size: 16px !important; margin-bottom: 20px !important; line-height: 1.5 !important; }

    /* Labels and help text on inputs */
    .gr-textbox label, .gr-file label, .gr-dropdown label { color: #212529 !important; font-weight: 600 !important; font-size: 1rem !important; margin-bottom: 8px !important; }
    .gr-textbox .gr-info, .gr-file .gr-info, .gr-dropdown .gr-info { color: #495057 !important; font-weight: 500 !important; font-size: 0.9rem !important; margin-top: 5px !important; }

    /* ===== Final override: make sure nothing overrides these styles ===== */
    [data-testid*="slider"] { background: #e3f2fd !important; border: 2px solid #bbdefb !important; }
    [data-testid*="slider"] *, [data-testid*="slider"] label, [data-testid*="slider"] .gr-info, [data-testid*="slider"] p, [data-testid*="slider"] span { color: #0d47a1 !important; }
    """,
) as demo:
    # Improved main header
    gr.HTML("""
        <div class="main-header">
            <h1>KESHERAT AI</h1>
            <p>Sistema Inteligente de Inspección para Turbinas Eólicas</p>
        </div>
    """)
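    # Gate flow: only gate_group is visible at startup; _check_token returns
    # three gr.update values that swap visibility between gate_group and
    # app_group and show/hide the gate_status message.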
    # ── Token gate ──
    with gr.Group(visible=True, elem_classes=["section-container"]) as gate_group:
        gr.Markdown("## 🔐 Acceso")
        gr.Markdown(
            "Bienvenido a KESHERAT AI. Para comenzar el análisis inteligente de "
            "turbinas eólicas, introduce tu token de acceso autorizado."
        )
        gr.Markdown(
            "KESHERAT AI utiliza tecnología avanzada de inteligencia artificial para "
            "detectar automáticamente defectos en palas de turbinas eólicas. El sistema "
            "es seguro y todos los análisis se procesan de forma confidencial."
        )
        token_box = gr.Textbox(label="Token de acceso", type="password")
        gate_btn = gr.Button("Entrar")
        gate_status = gr.Markdown(visible=False)

    # ── Main application (revealed once the token is accepted) ──
    with gr.Group(visible=False) as app_group:
        with gr.Tabs():
            with gr.Tab("🎬 Análisis de Vídeo"):
                gr.Markdown("Formatos soportados: MP4, MOV, AVI, MKV | Tamaño máximo recomendado: 500MB")
                video_in = gr.Video(label="Vídeo de inspección")
                video_res = gr.Dropdown(["360p", "480p", "720p"], value="720p",
                                        label="Resolución de salida")
                video_btn = gr.Button("Analizar vídeo")
                video_out = gr.Video(label="Vídeo anotado")
                video_classes = gr.JSON(label="Detecciones por clase")
            with gr.Tab("🖼️ Análisis de Imagen"):
                gr.Markdown("Formatos soportados: JPG, PNG, BMP | Resolución recomendada: mínimo 1024x768px")
                image_in = gr.Image(type="filepath", label="Imagen de inspección")
                image_btn = gr.Button("Analizar imagen")
                image_out = gr.Image(label="Imagen anotada")
                image_classes = gr.JSON(label="Detecciones por clase")
            with gr.Tab("🎚️ Sensibilidad"):
                gr.Markdown(
                    "Ajusta estos valores para controlar qué tan sensible es la detección "
                    "para cada tipo de defecto. Valores más bajos = más sensible (detecta "
                    "más objetos), valores más altos = menos sensible (solo objetos muy claros)."
                )
                gr.Markdown("Los cambios se aplican automáticamente. Valores recomendados para principiantes están preseleccionados.")
                th_structural = gr.Slider(0.05, 0.95, value=0.15, step=0.05, label="Estructural")
                th_damage = gr.Slider(0.05, 0.95, value=0.20, step=0.05, label="Daño")
                th_dirt = gr.Slider(0.05, 0.95, value=0.25, step=0.05, label="Suciedad")
                th_erosion = gr.Slider(0.05, 0.95, value=0.35, step=0.05, label="Erosión")
                th_status = gr.Markdown()
            with gr.Tab("📄 Reportes"):
                gr.Markdown(
                    "Genera reportes profesionales en múltiples formatos para documentar "
                    "los resultados de la inspección. Nota: Primero debes analizar un "
                    "archivo antes de generar reportes."
                )
                report_in = gr.File(label="Archivo a analizar (vídeo o imagen)", type="filepath")
                report_btn = gr.Button("Generar análisis fuerte")
                report_out = gr.JSON(label="Reportes generados (PDF/MD/JSON)")
            with gr.Tab("📊 Métricas"):
                gr.Markdown("Visualiza métricas de uso del sistema, estadísticas de detección y rendimiento general.")
                metrics_btn = gr.Button("Actualizar métricas")
                metrics_out = gr.JSON(label="Métricas")
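        # The slider defaults above mirror the initial DETECTION_CATEGORIES
        # thresholds; releasing any slider pushes all four values into
        # update_detection_thresholds (see the wiring below).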
        gr.Markdown(
            "⏳ Durante el procesamiento: *Analizando tu archivo con tecnología de IA "
            "avanzada*. Tiempo estimado: 30-60 segundos."
        )
        gr.Markdown(
            "**Consejos:**\n"
            "- Usa imágenes nítidas y bien iluminadas para mejores detecciones\n"
            "- Acércate a las áreas de interés para análisis más precisos\n"
            "- Ajusta la sensibilidad según tus necesidades específicas"
        )
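    # Glue helpers (hypothetical names, not part of the original wiring): they
    # unpack the result dict returned by infer_media into the two output
    # components each analyze button updates.
    def _run_video(path, res):
        out = infer_media(path, out_res=res) or {}
        return out.get("video"), out.get("classes", {})

    def _run_image(path):
        out = infer_media(path) or {}
        return out.get("path"), out.get("classes", {})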
    gr.HTML("""
        <div class="main-header">
            <h2>🚀 KESHERAT AI - Sistema Inteligente de Inspección para Turbinas Eólicas</h2>
            <p>Tecnología avanzada de IA para detección automática de defectos</p>
        </div>
    """)

    # ── Event wiring ──
    gate_btn.click(_check_token, inputs=[token_box],
                   outputs=[gate_group, app_group, gate_status])
    video_btn.click(_run_video, inputs=[video_in, video_res],
                    outputs=[video_out, video_classes])
    image_btn.click(_run_image, inputs=[image_in],
                    outputs=[image_out, image_classes])
    for s in (th_structural, th_damage, th_dirt, th_erosion):
        s.release(update_detection_thresholds,
                  inputs=[th_structural, th_damage, th_dirt, th_erosion],
                  outputs=[th_status])
    report_btn.click(generar_analisis_fuerte, inputs=[report_in], outputs=[report_out])
    metrics_btn.click(get_metrics, outputs=[metrics_out])

demo.launch()
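# Local usage: running `python app.py` serves the UI on Gradio's default
# http://127.0.0.1:7860; on a Hugging Face Space this same file is executed
# at startup.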