import gradio as gr
import tempfile
import json
import shutil
import os
import cv2
import numpy as np
import torch
import importlib.util  # submodule import required for spec_from_file_location below
import requests
import textwrap
# Optional PDF reporting: import reportlab safely and set a flag.
# REPORTLAB_AVAILABLE will be used by _write_pdf to select the PDF code path.
try:
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
REPORTLAB_AVAILABLE = True
except Exception:
REPORTLAB_AVAILABLE = False
# ZeroGPU: decorator to mark GPU functions. Local fallback if unavailable.
try:
    import spaces  # provided on HF Spaces
GPU_DECORATOR = spaces.GPU
except Exception:
def GPU_DECORATOR(func=None, **kwargs):
        # Allows both @GPU_DECORATOR and @GPU_DECORATOR(...) usage
if func is None:
def wrap(f):
return f
return wrap
return func
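# A minimal sketch of why the fallback takes this shape: spaces.GPU can be
# applied bare or with arguments, and the local stand-in mirrors both call
# forms (the duration= keyword here is illustrative, taken from the spaces API):
#
#     @GPU_DECORATOR
#     def gpu_task(x):
#         return x
#
#     @GPU_DECORATOR(duration=120)
#     def long_gpu_task(x):
#         return x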
# ────────────────────────────
# Configuration
# ────────────────────────────
os.environ["OMP_NUM_THREADS"] = "1"  # avoids OpenMP warnings
# Do NOT initialize the model at import time in Spaces with Stateless GPU.
# Lazily load the YOLO model inside worker processes (functions decorated
# with @GPU_DECORATOR) to avoid initializing CUDA in the main process.
model = None
def get_model():
"""Lazily load and return the YOLO model.
Important: call this from inside worker processes (e.g. functions decorated
with @GPU_DECORATOR) so CUDA is initialized in the worker and not in the
main process (required for HF Spaces with Stateless GPU).
"""
global model
if model is None:
# Import YOLO inside the function to avoid importing ultralytics (which
# may touch CUDA) at module import time.
from ultralytics import YOLO
model = YOLO("best2.pt")
return model
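# Usage sketch: because the instance is cached in the module-global `model`,
# repeated calls are cheap after the first load.
#
#     # m = get_model()          # loads best2.pt on first call
#     # assert m is get_model()  # subsequent calls reuse the same instance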
# ────────────────────────────
# Simple metrics (persisted in /tmp)
# ────────────────────────────
METRICS_PATH = os.path.join(tempfile.gettempdir(), "blade_metrics.json")
def _load_metrics():
try:
if os.path.exists(METRICS_PATH):
with open(METRICS_PATH, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return {
"total_jobs": 0,
"videos": 0,
"images": 0,
"detections_total": 0,
"per_label": {},
"last_job": None,
}
def _save_metrics(m):
try:
with open(METRICS_PATH, "w", encoding="utf-8") as f:
json.dump(m, f, ensure_ascii=False, indent=2)
except Exception:
pass
def _record_metrics(job_type, counts):
m = _load_metrics()
m["total_jobs"] += 1
if job_type == "video":
m["videos"] += 1
elif job_type == "image":
m["images"] += 1
dets = int(sum(counts.values())) if isinstance(counts, dict) else 0
m["detections_total"] += dets
# per label aggregate
if isinstance(counts, dict):
per = m.get("per_label", {})
for k, v in counts.items():
per[k] = int(per.get(k, 0)) + int(v)
m["per_label"] = per
m["last_job"] = {"type": job_type, "detections": dets}
_save_metrics(m)
def get_metrics():
"""Devuelve el snapshot actual de métricas."""
return _load_metrics()
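# Illustrative snapshot shape returned by get_metrics() (all values and label
# names below are hypothetical):
#
#     # {
#     #   "total_jobs": 3, "videos": 1, "images": 2,
#     #   "detections_total": 7,
#     #   "per_label": {"erosion": 4, "crack": 3},
#     #   "last_job": {"type": "image", "detections": 2}
#     # }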
# ────────────────────────────
# Inference functions
# ────────────────────────────
@GPU_DECORATOR
def infer_media(media_path, conf=0.25, iou=0.45, out_res="720p", preset="default"):
"""
Procesa un fichero de vídeo o imagen con parámetros configurables.
Retornos:
- Vídeo: {"video": out_vid_path, "classes": {label: count, ...}}
- Imagen: {"path": out_img_path, "classes": {label: count, ...}}
"""
if not media_path:
# Si no hay entrada (p.ej., se pulsó el botón en la otra pestaña), no fallar.
return {}
# Ensure the model is loaded inside the worker process (do not load at import time)
global model
model = get_model()
    # Device: use the GPU when available (ZeroGPU)
    device = 0 if torch.cuda.is_available() else "cpu"
    use_half = device != "cpu"
ext = os.path.splitext(media_path)[1].lower()
tmpdir = tempfile.mkdtemp()
    # Target resolution
res_map = {"360p": (640, 360), "480p": (854, 480), "720p": (1280, 720)}
target_size = res_map.get(out_res)
    # ─ Video ───────────────────────────────────────────────────────
if ext in [".mp4", ".mov", ".avi", ".mkv"]:
in_vid = os.path.join(tmpdir, "in.mp4")
out_vid = os.path.join(tmpdir, "out.mp4")
shutil.copy(media_path, in_vid)
        # Video FPS (optionally use the real value when available)
cap = cv2.VideoCapture(in_vid)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
try:
fps = float(fps)
if fps <= 0 or fps != fps: # NaN check
fps = 30
except Exception:
fps = 30
writer = None
counts = {}
        # Stream frames with annotations and per-class counting.
        # half=use_half enables FP16 inference on GPU (use_half was previously unused).
        results = model.predict(source=in_vid, conf=conf, iou=iou, stream=True, device=device, half=use_half)
for r in results:
            # accumulate counts
for b in r.boxes:
label = model.names[int(b.cls[0])]
counts[label] = counts.get(label, 0) + 1
            annotated = r.plot()  # annotated frame
if target_size:
annotated = cv2.resize(annotated, target_size)
if writer is None:
h, w = annotated.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(out_vid, fourcc, fps, (w, h))
writer.write(annotated)
if writer:
writer.release()
if cap:
cap.release()
        # record metrics
_record_metrics("video", counts)
return {"video": out_vid, "classes": counts}
    # ─ Image ──────────────────────────────────────────────────────
elif ext in [".jpg", ".jpeg", ".png", ".bmp"]:
img = cv2.imread(media_path)
        results = model.predict(source=media_path, conf=conf, iou=iou, save=False, device=device, half=use_half)
        counts = {}
        # Draw boxes manually and count per class
for box in results[0].boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
cls_id = int(box.cls[0])
label = model.names[cls_id]
counts[label] = counts.get(label, 0) + 1
            # rectangle
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # label text
cv2.putText(img, label, (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
if target_size:
img = cv2.resize(img, target_size)
out_path = os.path.join(tmpdir, "annotated.png")
cv2.imwrite(out_path, img)
        # record metrics
_record_metrics("image", counts)
return {"path": out_path, "classes": counts}
    else:
        raise ValueError(f"Unsupported format: {ext}")
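# Caller sketch (hypothetical paths and labels): the return dict is keyed by
# media type, so callers dispatch on "video" vs "path".
#
#     # res = infer_media("inspection.mp4", conf=0.3, out_res="480p")
#     # res["video"]    # annotated MP4 path (video input)
#     # res["classes"]  # e.g. {"erosion": 12, "crack": 2} (counts per label)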
def show_classes():
"""Devuelve las clases que el modelo conoce. Carga el modelo si no está listo."""
try:
m = get_model()
names = m.names
if isinstance(names, dict):
class_list = [names[k] for k in sorted(names)]
else:
class_list = names
return ", ".join(class_list)
except Exception:
return "Model not loaded"
# Helpers to extract the output resource from the result dict
def _extract_video(d):
return (d.get("video") if isinstance(d, dict) else d)
def _extract_path(d):
return (d.get("path") if isinstance(d, dict) else d)
def analyze_image_with_gpt(image_path, detections_summary=""):
"""
Analiza una imagen directamente con GPT-4 Vision para obtener observaciones visuales
que el modelo YOLO podría haber perdido.
"""
try:
GPTClass = _load_gptoss_wrapper()
if not GPTClass:
return "Análisis de IA no disponible (GPT wrapper no configurado)"
        # Build a Spanish-language prompt for direct visual analysis with GPT-4 Vision
prompt = f"""Eres un experto en inspección de palas de aerogeneradores. Analiza visualmente esta imagen de una pala de aerogenerador y proporciona un análisis detallado en español.
DETECCIONES AUTOMÁTICAS DEL MODELO YOLO:
{detections_summary if detections_summary else "No se detectaron defectos automáticamente"}
INSTRUCCIONES PARA TU ANÁLISIS VISUAL:
Observa cuidadosamente la imagen y describe:
1. **Condición general de la superficie**: Color, textura, acabado, limpieza
2. **Borde de ataque (leading edge)**: Estado, erosión, daños, desgaste
3. **Borde de salida (trailing edge)**: Integridad, grietas, deformaciones
4. **Superficie principal**: Grietas, decoloración, impactos, reparaciones previas
5. **Elementos estructurales**: Uniones, tornillos, conexiones visibles
6. **Contaminación**: Suciedad, hielo, vegetación, residuos
7. **Daños específicos**: Impactos de rayos, aves, granizo, desgaste UV
COMPARACIÓN CON DETECCIONES AUTOMÁTICAS:
- Confirma o refuta las detecciones del modelo YOLO
- Identifica defectos que YOLO pudo haber perdido
- Evalúa la severidad de los defectos detectados
CONTEXTO DE DEFECTOS COMUNES:
- **Dirt/Suciedad**: Acumulación que reduce eficiencia aerodinámica
- **Erosion**: Desgaste del borde de ataque por partículas
- **Cracks/Grietas**: Fisuras estructurales críticas
- **Lightning damage**: Daños por descargas eléctricas
- **Ice**: Formación de hielo estacional
- **Bird strikes**: Impactos de aves
- **UV degradation**: Decoloración por radiación solar
IMPORTANTE:
- Responde SOLO en español
- Describe específicamente lo que VES en la imagen
- Sé preciso sobre ubicaciones (izquierda, derecha, centro, bordes)
- Menciona colores, texturas, patrones específicos
- Evalúa la severidad de cada problema observado
Formato de respuesta:
## 🔍 Análisis Visual Directo de la Pala
**Estado General:** [tu evaluación visual del estado]
**Observaciones Específicas:**
[describe detalladamente lo que ves en cada área]
**Defectos Identificados Visualmente:**
[lista específica de problemas que observas]
**Comparación con Detección Automática:**
[confirma/refuta/complementa las detecciones YOLO]
**Severidad y Prioridades:**
[evalúa qué problemas son más críticos]
**Recomendaciones de Mantenimiento:**
[acciones específicas basadas en lo observado]
"""
        # Configure the vision model
vision_model_id = os.getenv("VISION_MODEL_ID", "Qwen/Qwen2-VL-7B-Instruct")
model_id = os.getenv("MODEL_ID", vision_model_id)
wrapper = GPTClass(model=model_id)
        # Try image analysis (GPT-4 Vision or Qwen2-VL)
        try:
            print(f"DEBUG: attempting image analysis with model: {model_id}")
analysis = wrapper.analyze_image(image_path, prompt, max_tokens=1200, temperature=0.2)
return analysis
        except RuntimeError as vision_error:
            # If vision analysis is unavailable, fall back to feature-based analysis
            print(f"DEBUG: vision analysis unavailable: {vision_error}")
return _fallback_technical_analysis(image_path, detections_summary, wrapper)
except Exception as e:
return f"Error en el análisis de IA: {str(e)}"
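# Fallback flow sketch: this function assumes the wrapper exposes an
# analyze_image endpoint; when that raises RuntimeError it degrades to the
# feature-based path below, reusing the same wrapper's text-only generate().
#
#     # text = analyze_image_with_gpt("blade.jpg", "crack: 2")  # hypothetical inputs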
def _fallback_technical_analysis(image_path, detections_summary, wrapper):
"""
Análisis de respaldo basado en características técnicas cuando GPT-4 Vision no está disponible.
"""
try:
        # Extract basic visual features from the image
visual_features = compute_visual_features(image_path, [])
        # Build a detailed technical description
technical_desc = "Análisis basado en características técnicas de la imagen:\n"
if visual_features:
brightness = visual_features.get("brightness", 0)
contrast = visual_features.get("contrast", 0)
blur = visual_features.get("blur", 0)
dominant_rgb = visual_features.get("dominant_rgb", [])
width = visual_features.get("width", 0)
height = visual_features.get("height", 0)
technical_desc += f"- Resolución: {width}x{height} píxeles\n"
technical_desc += f"- Brillo promedio: {brightness:.1f}/255 "
technical_desc += ("(imagen brillante)" if brightness > 130 else "(imagen tenue)" if brightness < 80 else "(iluminación normal)")
technical_desc += f"\n- Contraste: {contrast:.1f} "
technical_desc += ("(alto contraste)" if contrast > 60 else "(bajo contraste)" if contrast < 30 else "(contraste normal)")
technical_desc += f"\n- Nitidez: {blur:.1f} "
technical_desc += ("(imagen nítida)" if blur > 100 else "(imagen borrosa)")
if dominant_rgb:
technical_desc += f"\n- Color dominante: RGB{dominant_rgb}"
                # Interpret dominant colors
r, g, b = dominant_rgb
if r > 150 and g > 150 and b > 150:
technical_desc += " (tonos claros/blancos - superficie limpia)"
elif r < 100 and g < 100 and b < 100:
technical_desc += " (tonos oscuros - posible suciedad o sombras)"
elif r > g and r > b:
technical_desc += " (tonos rojizos - posible oxidación)"
elif g > r and g > b:
technical_desc += " (tonos verdosos - posible vegetación/algas)"
elif b > r and b > g:
technical_desc += " (tonos azulados - superficie normal)"
        # Prompt adapted for technical (non-visual) analysis
fallback_prompt = f"""Eres un experto en inspección de palas de aerogeneradores. Basándote en los datos técnicos de la imagen y las detecciones automáticas, proporciona un análisis detallado en español.
{technical_desc}
DETECCIONES AUTOMÁTICAS DEL MODELO YOLO:
{detections_summary if detections_summary else "No se detectaron defectos automáticamente"}
NOTA: Este análisis se basa en características técnicas extraídas de la imagen ya que el análisis visual directo no está disponible.
Proporciona un análisis experto interpretando estos datos técnicos en el contexto de inspección de palas de aerogeneradores.
Formato de respuesta:
## 🔍 Análisis Técnico de la Pala
**Estado General:** [evaluación basada en datos técnicos]
**Interpretación de Características:**
[qué indican los valores técnicos sobre la condición]
**Análisis de Detecciones:**
[interpretación de cada defecto detectado por YOLO]
**Recomendaciones:**
[acciones específicas recomendadas]
"""
analysis = wrapper.generate(fallback_prompt, max_tokens=800, temperature=0.3)
return f"⚠️ **Análisis técnico** (análisis visual directo no disponible)\n\n{analysis}"
except Exception as e:
return f"Error en análisis de respaldo: {str(e)}"
def _check_token(token: str):
"""Token gate for public app. Expected token via env APP_ACCESS_TOKEN or KESHERAT_TOKEN.
Defaults to 'KESHERAT' if none provided.
Returns visibility updates for [gate_group, app_group, gate_status]."""
expected = os.getenv("APP_ACCESS_TOKEN") or os.getenv("KESHERAT_TOKEN") or "KESHERAT"
ok = str(token or "").strip() == str(expected).strip()
if ok:
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False, value="")
else:
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=True, value="Token inválido. Intenta nuevamente.")
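# Configuration sketch: in a deployed Space the expected token would normally
# be provided as a secret rather than relying on the "KESHERAT" default, e.g.
#
#     # os.environ["APP_ACCESS_TOKEN"] = "my-secret"  # local testing only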
def compute_visual_features(image_path, detections=None):
"""Compute simple visual features and return a short description plus numeric metrics.
Returns a dict with keys:
- width, height
- brightness (mean grayscale)
- contrast (std grayscale)
- blur (variance of Laplacian; lower = blurrier)
- dominant_rgb (tuple)
- object_count
- avg_bbox_area
- description (short natural language sentence)
"""
try:
img = cv2.imread(image_path)
if img is None:
return {}
h, w = img.shape[:2]
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
lap = cv2.Laplacian(gray, cv2.CV_64F)
blur = float(np.var(lap))
# Mean color as a simple dominant color proxy (convert BGR -> RGB)
mean_bgr = cv2.mean(img)[:3]
dominant_rgb = (int(mean_bgr[2]), int(mean_bgr[1]), int(mean_bgr[0]))
obj_counts = 0
avg_bbox_area = 0.0
if detections:
obj_counts = len(detections)
areas = []
for d in detections:
bbox = d.get("bbox", [0, 0, 0, 0])
try:
area = max(0, (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))
except Exception:
area = 0
areas.append(area)
if areas:
avg_bbox_area = float(sum(areas) / len(areas))
# Human-friendly descriptors
bright_desc = "bright" if brightness > 130 else ("dim" if brightness < 80 else "moderately lit")
contrast_desc = "high contrast" if contrast > 60 else ("low contrast" if contrast < 30 else "moderate contrast")
blur_desc = "blurry" if blur < 100 else "sharp"
desc = f"Image appears {bright_desc}, with {contrast_desc}, and is {blur_desc}. Dominant color approx RGB{dominant_rgb}. Detected {obj_counts} objects in view."
return {
"width": w,
"height": h,
"brightness": brightness,
"contrast": contrast,
"blur": blur,
"dominant_rgb": dominant_rgb,
"object_count": obj_counts,
"avg_bbox_area": avg_bbox_area,
"description": desc,
}
except Exception:
return {}
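# Usage sketch (hypothetical file): these metrics feed both the GPT fallback
# prompt and the per-frame report entries; the thresholds mirror the
# descriptors above.
#
#     # feats = compute_visual_features("frame_0.jpg", detections=[])
#     # feats.get("blur", 0) < 100        # treated as blurry
#     # feats.get("brightness", 0) > 130  # treated as bright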
# ────────────────────────────
# Helpers for multimodal reporting (PDF/MD/JSON)
# ────────────────────────────
def _write_pdf(path: str, title: str, narrative: str, frames):
"""
Write a wrapped, layout-friendly PDF. This version increases margins,
reduces font sizes, and wraps long lines to avoid cutting text.
"""
if REPORTLAB_AVAILABLE:
c = canvas.Canvas(path, pagesize=A4)
width, height = A4
margin = 60
y = height - margin
# Fonts and sizes
title_font = "Helvetica-Bold"
body_font = "Helvetica"
small_font = "Helvetica"
title_size = 13
body_size = 9
small_size = 8
line_height = body_size * 1.18
small_line_height = small_size * 1.12
def wrap_text(text, font_size, max_width):
approx_char_width = font_size * 0.55
max_chars = max(30, int(max_width / approx_char_width))
out = []
for para in str(text or "").splitlines():
wrapped = textwrap.wrap(para, width=max_chars)
out.extend(wrapped if wrapped else [""])
return out
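        # Worked example of the width heuristic: with A4 (~595 pt wide) and
        # 60 pt margins, max_width ≈ 475 pt; at body_size 9 the approximate
        # character width is 9 * 0.55 = 4.95 pt, giving ~96 characters per
        # wrapped line (never below the floor of 30).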
# Title
c.setFont(title_font, title_size)
for tline in wrap_text(title, title_size, width - 2 * margin):
if y < margin + title_size * 1.5:
c.showPage()
y = height - margin
c.setFont(title_font, title_size)
c.drawString(margin, y, tline)
y -= title_size * 1.25
y -= 6
# Narrative
c.setFont(body_font, body_size)
for line in wrap_text(narrative or "", body_size, width - 2 * margin):
if y < margin + line_height:
c.showPage()
y = height - margin
c.setFont(body_font, body_size)
c.drawString(margin, y, line)
y -= line_height
y -= 8
c.setFont("Helvetica-Bold", 11)
if y < margin + 30:
c.showPage()
y = height - margin
c.setFont("Helvetica-Bold", 11)
c.drawString(margin, y, "Per-frame detections:")
y -= 14
c.setFont(small_font, small_size)
for f in frames:
if y < margin + 90:
c.showPage()
y = height - margin
c.setFont(small_font, small_size)
c.drawString(margin, y, f"Frame {f.get('frame_index')}:")
y -= small_line_height
dets = f.get("detections", [])
if not dets:
if y < margin + small_line_height:
c.showPage()
y = height - margin
c.setFont(small_font, small_size)
c.drawString(margin + 12, y, "No detections")
y -= small_line_height
else:
for d in dets:
det_text = f"- {d.get('label')} | conf={d.get('confidence')} | bbox={d.get('bbox')}"
text_max_width = width - 2 * margin - 140
for dl in wrap_text(det_text, small_size, text_max_width):
if y < margin + small_line_height:
c.showPage()
y = height - margin
c.setFont(small_font, small_size)
c.drawString(margin + 12, y, dl)
y -= small_line_height
try:
img_path = d.get("image")
if img_path and os.path.exists(img_path):
img_w = 110
img_h = 65
if y < margin + img_h + 20:
c.showPage()
y = height - margin
c.setFont(small_font, small_size)
x_img = width - margin - img_w
y_img = y - img_h + 6
c.drawImage(img_path, x_img, y_img, width=img_w, height=img_h, preserveAspectRatio=True, mask='auto')
crop_desc = None
if isinstance(d.get("crop_visual"), dict):
crop_desc = d["crop_visual"].get("description")
if crop_desc:
cd_lines = wrap_text(crop_desc, small_size, img_w)
text_y = y_img - 12
for cd in cd_lines:
if text_y < margin + 20:
c.showPage()
y = height - margin
text_y = y - img_h - 12
c.setFont(small_font, small_size)
c.drawString(x_img, text_y, cd)
text_y -= small_line_height
y = y - img_h - 8
except Exception:
pass
c.save()
return
# Fallback plain-text write if ReportLab unavailable
with open(path, "w", encoding="utf-8") as f:
f.write(title + "\n\n")
f.write((narrative or "") + "\n\n")
f.write("Per-frame detections:\n")
for fr in frames:
f.write(f"Frame {fr.get('frame_index')}:\n")
dets = fr.get("detections", [])
if not dets:
f.write(" No detections\n")
else:
for d in dets:
f.write(f" - {d}\n")
def _load_gptoss_wrapper():
"""
Load the blade-inspection-demo/gptoss_wrapper.py module by filepath so we don't rely on package imports.
"""
    wrapper_path = None
    try:
base = os.path.dirname(__file__)
wrapper_path = os.path.join(base, "blade-inspection-demo", "gptoss_wrapper.py")
if not os.path.exists(wrapper_path):
# fallback: maybe file already at project root
wrapper_path = os.path.join(base, "gptoss_wrapper.py")
spec = importlib.util.spec_from_file_location("gptoss_wrapper", wrapper_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return getattr(module, "GPTOSSWrapper", None)
except Exception as e:
# Print diagnostic info to Space logs so we can see why the wrapper failed to import.
print(f"DEBUG: failed to load GPT wrapper from {wrapper_path}: {e}")
import traceback
traceback.print_exc()
return None
def _build_prompt(frames):
"""
Build a compact prompt that summarizes the entire video while keeping prompt
size bounded. We include:
- video-level totals (frames, total detections, counts per class)
- a concise list of frames that contain detections (frame index + short det summary)
- an optional compact aggregate of visual metrics for the whole video
The detailed per-frame visual descriptions remain in the report files (MD/PDF/JSON)
but are not expanded fully in the prompt to avoid token limits.
"""
# Configs (env vars)
try:
max_prompt_frames = int(os.getenv("MAX_PROMPT_FRAMES", "200"))
except Exception:
max_prompt_frames = 200
total_frames = len(frames)
total_detections = sum(len(f.get("detections", [])) for f in frames)
# Aggregate counts per label and collect frames with detections
counts = {}
frames_with_dets = []
for f in frames:
dets = f.get("detections", [])
if dets:
frames_with_dets.append(f)
for d in dets:
counts[d.get("label")] = counts.get(d.get("label"), 0) + 1
lines = []
lines.append("You are an expert inspection assistant for wind turbine blade images/videos.")
lines.append(f"This video contains {total_frames} frames and {total_detections} total detections.")
if counts:
lines.append("Total detections by class: " + ", ".join([f"{k}({v})" for k, v in counts.items()]))
else:
lines.append("No detections were found in analyzed frames.")
lines.append("")
lines.append("Instructions: Based on the aggregate information and the selected frame summaries below, produce a concise inspection report that includes:")
lines.append("- Summary of main findings")
lines.append("- Suggested severity (low/medium/high) when appropriate")
lines.append("- Recommended next steps for inspection/repair")
lines.append("")
# Include up to max_prompt_frames frames that have detections (prioritize them)
include_list = frames_with_dets[:max_prompt_frames]
lines.append(f"Included frame summaries (showing frames with detections, up to {max_prompt_frames} entries):")
if not include_list:
lines.append("No frames with detections to list (video may be clear or detections are below threshold).")
else:
for f in include_list:
fid = f.get("frame_index")
dets = f.get("detections", [])
det_texts = []
for d in dets:
conf = d.get("confidence")
conf_s = f"{conf:.2f}" if isinstance(conf, float) else str(conf)
det_texts.append(f"{d.get('label')}({conf_s})")
# compact visual metrics (if present)
visual = f.get("visual") or {}
metric_parts = []
if visual.get("brightness") is not None:
metric_parts.append(f"bright={visual['brightness']:.0f}")
if visual.get("contrast") is not None:
metric_parts.append(f"contrast={visual['contrast']:.0f}")
if visual.get("blur") is not None:
metric_parts.append(f"blur_var={visual['blur']:.0f}")
if visual.get("dominant_rgb"):
metric_parts.append(f"dominant_rgb={visual['dominant_rgb']}")
metrics = "; ".join(metric_parts)
if metrics:
lines.append(f"Frame {fid}: " + ", ".join(det_texts) + f" [{metrics}]")
else:
lines.append(f"Frame {fid}: " + ", ".join(det_texts))
lines.append("")
lines.append("NOTE: Full per-frame visual descriptions and images are attached in the generated report files. If you need a fully exhaustive token-by-token per-frame prompt, set FULL_FRAME_REPORT and increase MAX_PROMPT_FRAMES (may exceed model token limits).")
lines.append("")
lines.append("Produce the report in plain text, 6-10 short paragraphs. Also include 1-2 short sentences summarizing why the listed frames are noteworthy (e.g., what the detection likely means).")
return "\n".join(lines)
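# Output sketch: a frame summary line produced by the loop above looks like
# this (label, confidence, and metric values are hypothetical):
#
#     # Frame 42: crack(0.87), erosion(0.61) [bright=142; contrast=55; blur_var=210]
#
# MAX_PROMPT_FRAMES (default 200) bounds how many such lines enter the prompt.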
@GPU_DECORATOR
def generar_analisis_fuerte(media_path):
"""Generate strong analysis (PDF/MD/JSON) from a given media file path."""
if not media_path:
return {"status": "no_input", "report_pdf": None, "report_md": None, "report_json": None}
# Ensure the model is loaded inside the worker process
global model
model = get_model()
tmpdir = tempfile.mkdtemp()
frames = []
try:
ext = os.path.splitext(media_path)[1].lower()
# attempt to extract up to 3 frames/detections using the loaded YOLO model
if ext in [".mp4", ".mov", ".avi", ".mkv"]:
cap = cv2.VideoCapture(media_path)
idx = 0
# Process all frames in the video. This may be expensive for long videos.
# To limit processing, set the environment variable MAX_FRAMES to a positive integer.
max_frames_env = os.getenv("MAX_FRAMES", "0")
try:
max_frames = int(max_frames_env)
except Exception:
max_frames = 0
if max_frames > 0:
print(f"DEBUG: processing up to {max_frames} frames (MAX_FRAMES set)")
else:
print("DEBUG: processing all video frames for strong analysis (may be slow)...")
# Sampling: process only every FRAME_STEP-th frame to reduce GPU load.
try:
frame_step = int(os.getenv("FRAME_STEP", "5"))
if frame_step < 1:
frame_step = 1
except Exception:
frame_step = 5
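            # Example: with the default FRAME_STEP=5 on a 30 fps clip, YOLO
            # runs on frames 0, 5, 10, ... (about 6 detection passes per
            # second of footage); skipped frames still get the cheap visual
            # summary computed in the else branch below.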
while True:
ret, frame = cap.read()
if not ret:
break
# Save every frame image to disk (keeps consistent indexing) but only run
# detection on sampled frames to lower compute usage.
tmpf = os.path.join(tmpdir, f"frame_{idx}.jpg")
cv2.imwrite(tmpf, frame)
if idx % frame_step == 0:
# Run detection on sampled frame
results = model.predict(source=tmpf, conf=0.25, iou=0.45)
dets = []
if results and len(results) > 0:
det_i = 0
full_img = cv2.imread(tmpf)
h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0))
for box in results[0].boxes:
try:
cls_id = int(box.cls[0])
label = model.names[cls_id]
except Exception:
label = "object"
try:
x1, y1, x2, y2 = map(int, box.xyxy[0])
except Exception:
x1 = y1 = x2 = y2 = 0
try:
confv = float(box.conf[0])
except Exception:
confv = None
det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]}
# Save cropped detection image if possible
try:
if full_img is not None and x2 > x1 and y2 > y1:
# clamp coords
x1c = max(0, min(x1, w_full - 1))
x2c = max(0, min(x2, w_full))
y1c = max(0, min(y1, h_full - 1))
y2c = max(0, min(y2, h_full))
if x2c > x1c and y2c > y1c:
crop = full_img[y1c:y2c, x1c:x2c]
crop_path = os.path.join(tmpdir, f"frame_{idx}_det_{det_i}.jpg")
cv2.imwrite(crop_path, crop)
det["image"] = crop_path
# compute visual features for the crop and attach
det["crop_visual"] = compute_visual_features(crop_path, [det])
except Exception:
pass
dets.append(det)
det_i += 1
# Compute simple visual features for this saved frame
visual = compute_visual_features(tmpf, dets)
frames.append({"frame_index": idx, "detections": dets, "visual": visual, "image_path": tmpf})
else:
# Non-sampled frame: still compute a cheap visual summary (no detections)
visual = compute_visual_features(tmpf, [])
frames.append({"frame_index": idx, "detections": [], "visual": visual, "image_path": tmpf})
idx += 1
if max_frames > 0 and idx >= max_frames:
break
cap.release()
else:
# single image
results = model.predict(source=media_path, conf=0.25, iou=0.45)
dets = []
if results and len(results) > 0:
full_img = cv2.imread(media_path)
h_full, w_full = (full_img.shape[:2] if full_img is not None else (0, 0))
det_i = 0
for box in results[0].boxes:
try:
cls_id = int(box.cls[0])
label = model.names[cls_id]
except Exception:
label = "object"
try:
x1, y1, x2, y2 = map(int, box.xyxy[0])
except Exception:
x1 = y1 = x2 = y2 = 0
try:
confv = float(box.conf[0])
except Exception:
confv = None
det = {"label": label, "confidence": confv, "bbox": [x1, y1, x2, y2]}
# Save cropped detection image if possible
try:
if full_img is not None and x2 > x1 and y2 > y1:
x1c = max(0, min(x1, w_full - 1))
x2c = max(0, min(x2, w_full))
y1c = max(0, min(y1, h_full - 1))
y2c = max(0, min(y2, h_full))
if x2c > x1c and y2c > y1c:
crop = full_img[y1c:y2c, x1c:x2c]
crop_path = os.path.join(tmpdir, f"frame_0_det_{det_i}.jpg")
cv2.imwrite(crop_path, crop)
det["image"] = crop_path
det["crop_visual"] = compute_visual_features(crop_path, [det])
except Exception:
pass
dets.append(det)
det_i += 1
# Compute visual features for single image
visual = compute_visual_features(media_path, dets)
frames.append({"frame_index": 0, "detections": dets, "visual": visual, "image_path": media_path})
prompt = _build_prompt(frames)
GPTClass = _load_gptoss_wrapper()
narrative = None
if GPTClass:
try:
# Allow overriding model via env var MODEL_ID (e.g. "openai/gpt-oss-120b:fireworks-ai")
model_id = os.getenv("MODEL_ID", "gpt-oss-120")
print(f"DEBUG: [gpt] using model_id={model_id}, HF_USE_ROUTER={os.getenv('HF_USE_ROUTER')}")
wrapper = GPTClass(model=model_id)
# DEBUG: print prompt (truncated) so Space logs show the request
try:
print("DEBUG: [gpt] sending prompt (truncated 2000 chars):")
print(prompt[:2000])
except Exception:
print("DEBUG: [gpt] (failed to print prompt)")
narrative = wrapper.generate(prompt)
# DEBUG: print a truncated portion of the response
try:
print("DEBUG: [gpt] response (truncated 2000 chars):")
print((narrative or "")[:2000])
except Exception:
print("DEBUG: [gpt] (failed to print response)")
except Exception as e:
narrative = f"(GPT call failed) {e}"
print("DEBUG: [gpt] call failed:", e)
else:
narrative = "(GPT wrapper unavailable) Fallback summary:\n"
counts = {}
for f in frames:
for d in f.get("detections", []):
counts[d["label"]] = counts.get(d["label"], 0) + 1
narrative += "Detected classes: " + ", ".join([f"{k}({v})" for k, v in counts.items()]) if counts else "No detections"
# Write Markdown
report_md = os.path.join(tmpdir, "report.md")
with open(report_md, "w", encoding="utf-8") as md:
            md.write("# Informe de inspección (Generar análisis fuerte)\n\n")
md.write(narrative or "Sin narrativa disponible.\n\n")
md.write("\n## Per-frame detections\n\n")
for f in frames:
fid = f.get("frame_index")
md.write(f"- Frame {fid}:\n")
dets = f.get("detections", [])
if not dets:
md.write(" No detections\n")
else:
for i, d in enumerate(dets):
md.write(f" - {d.get('label')}({d.get('confidence')}) bbox={d.get('bbox')}\n")
if d.get("image"):
# Embed the cropped detection image
md.write(f" ![frame{fid}_det{i}]({d.get('image')})\n")
# Add crop visual description if available
cviz = d.get("crop_visual")
if cviz and cviz.get("description"):
md.write(f" Description: {cviz.get('description')}\n")
# Write JSON
report_json = os.path.join(tmpdir, "report.json")
with open(report_json, "w", encoding="utf-8") as jf:
json.dump({"narrative": narrative, "frames": frames}, jf, indent=2)
# Write PDF
report_pdf = os.path.join(tmpdir, "report.pdf")
        _write_pdf(report_pdf, "Informe de inspección - Generar análisis fuerte", narrative, frames)
return {"status": "done", "report_pdf": report_pdf, "report_md": report_md, "report_json": report_json}
except Exception as e:
return {"status": f"error: {e}", "report_pdf": None, "report_md": None, "report_json": None}
# ────────────────────────────
# Gradio UI
# ────────────────────────────
with gr.Blocks(
title="KESHERAT AI",
theme=gr.themes.Soft(),
) as demo:
    gr.HTML("""
    <link rel="stylesheet" href="/file=assets/kesheret.css" />
    <div class="kesheret-header"><h1>KESHERAT AI</h1><p>KESHERAT AI</p></div>
<style>
/* FORCE DARK BLUE - INLINE STYLES HAVE HIGHEST PRIORITY */
button, .gr-button, .gradio-button, [data-testid*="button"] {
background: #031F33 !important;
background-color: #031F33 !important;
background-image: none !important;
border: 1px solid #031F33 !important;
color: white !important;
}
button:hover, .gr-button:hover, .gradio-button:hover, [data-testid*="button"]:hover {
background: #004D85 !important;
background-color: #004D85 !important;
border-color: #004D85 !important;
}
/* Tab buttons */
.tab-nav button, button[role="tab"] {
background: #031F33 !important;
background-color: #031F33 !important;
color: white !important;
}
.tab-nav button.selected, .tab-nav button:hover, button[role="tab"]:hover {
background: #004D85 !important;
background-color: #004D85 !important;
}
/* Override any purple colors */
[style*="rgb(139, 69, 255)"], [style*="rgb(168, 85, 247)"], [style*="#8b45ff"], [style*="#a855f7"] {
background: #031F33 !important;
background-color: #031F33 !important;
}
</style>
<script>
// JavaScript to force dark blue colors after page loads
function forceDarkBlue() {
const buttons = document.querySelectorAll('button, .gr-button, .gradio-button, [data-testid*="button"]');
buttons.forEach(btn => {
btn.style.setProperty('background', '#031F33', 'important');
btn.style.setProperty('background-color', '#031F33', 'important');
btn.style.setProperty('background-image', 'none', 'important');
btn.style.setProperty('border', '1px solid #031F33', 'important');
btn.style.setProperty('color', 'white', 'important');
});
// Override any purple elements
const purpleElements = document.querySelectorAll('[style*="rgb(139, 69, 255)"], [style*="rgb(168, 85, 247)"], [style*="#8b45ff"], [style*="#a855f7"]');
purpleElements.forEach(el => {
el.style.setProperty('background', '#031F33', 'important');
el.style.setProperty('background-color', '#031F33', 'important');
});
}
// Run immediately and on DOM changes
forceDarkBlue();
setTimeout(forceDarkBlue, 1000);
setTimeout(forceDarkBlue, 3000);
// Watch for DOM changes and reapply
const observer = new MutationObserver(forceDarkBlue);
observer.observe(document.body, { childList: true, subtree: true });
</script>
""")
# Access gate (token) — app is public but gated by token
with gr.Group(visible=True) as gate_group:
gr.Markdown("### Acceso — Introduce tu token de seguridad")
gate_token = gr.Textbox(label="Token", type="password", placeholder="Introduce tu token")
gate_status = gr.Markdown(visible=False)
btn_enter = gr.Button("Entrar")
with gr.Group(visible=False) as app_group:
# Input section: tabs for different media types
with gr.Tabs() as media_tabs:
# Video tab: only video input
with gr.TabItem("Vídeo"):
video_input = gr.Video(label="Sube tu vídeo de inspección")
# Imagen tab: only image input
with gr.TabItem("Imagen"):
image_input = gr.Image(type="filepath", label="Sube una imagen de inspección")
# Configuración tab: only classes tools
with gr.TabItem("Configuración"):
btn_classes = gr.Button("Mostrar clases del modelo")
txt_classes = gr.Textbox(label="Clases cargadas", interactive=False)
btn_classes.click(fn=show_classes, outputs=txt_classes)
# Reportes tab: only report tools
with gr.TabItem("Reportes"):
                btn_report = gr.Button("Generar análisis fuerte")
status = gr.Textbox(label="Estado", interactive=False)
pdf_out = gr.File(label="Reporte PDF")
md_out = gr.File(label="Reporte Markdown")
json_out = gr.File(label="Reporte JSON")
def _on_report(vid, img):
path = None
if vid:
path = vid
elif img:
path = img if isinstance(img, str) else getattr(img, "name", None)
if not path:
return "No media provided", None, None, None
res = generar_analisis_fuerte(path)
                    return res.get("status", "error"), res.get("report_pdf"), res.get("report_md"), res.get("report_json")
btn_report.click(fn=_on_report, inputs=[video_input, image_input], outputs=[status, pdf_out, md_out, json_out])
# Métricas tab: only metrics tools
with gr.TabItem("Métricas"):
btn_metrics = gr.Button("Ver métricas")
out_metrics = gr.JSON(label="Métricas", visible=True)
btn_metrics.click(fn=get_metrics, outputs=out_metrics, api_name="metrics")
# Detection button (always visible after token)
btn_detect = gr.Button("Detectar defectos", variant="primary")
# Output section: results appear here after detection
output_video = gr.Video(label="Vídeo anotado", visible=False)
output_image = gr.Image(label="Imagen anotada", visible=False)
# Analysis text below the image
analysis_text = gr.Markdown(label="Análisis de IA", visible=False)
# Hidden JSON components for API chaining
json_video = gr.JSON(visible=False)
json_image = gr.JSON(visible=False)
# Functions to show/hide outputs based on active tab and update content
def _update_video_output(json_result):
if json_result and json_result.get("video"):
return gr.Video(value=json_result["video"], visible=True), gr.Image(visible=False), gr.Markdown(visible=False)
return gr.Video(visible=False), gr.Image(visible=False), gr.Markdown(visible=False)
def _update_image_output(json_result):
if json_result and json_result.get("path"):
                # Build a detections summary for the GPT analysis
classes = json_result.get("classes", {})
if classes:
detections_summary = "Detecciones automáticas: " + ", ".join([f"{k}: {v}" for k, v in classes.items()])
else:
detections_summary = "No se detectaron defectos automáticamente"
                # Get the GPT analysis
analysis = analyze_image_with_gpt(json_result["path"], detections_summary)
return (
gr.Video(visible=False),
gr.Image(value=json_result["path"], visible=True),
gr.Markdown(value=analysis, visible=True)
)
return gr.Video(visible=False), gr.Image(visible=False), gr.Markdown(visible=False)
# Wire up the detection events with proper output visibility
ev_video = btn_detect.click(fn=infer_media, inputs=video_input, outputs=json_video, api_name="infer_media")
ev_video.then(_update_video_output, inputs=json_video, outputs=[output_video, output_image, analysis_text])
ev_image = btn_detect.click(fn=infer_media, inputs=image_input, outputs=json_image, api_name="infer_media_1")
ev_image.then(_update_image_output, inputs=json_image, outputs=[output_video, output_image, analysis_text])
# Wire the gate
btn_enter.click(fn=_check_token, inputs=[gate_token], outputs=[gate_group, app_group, gate_status])
# Enable the queue for ZeroGPU
demo.queue()
if __name__ == "__main__":
    # Allow download access to the temp directory to avoid 403 errors
demo.launch(allowed_paths=[tempfile.gettempdir()])