# Spaces: Running
import json
import os
import subprocess
import tempfile
import time
from datetime import datetime

import cv2
import dlib
import face_recognition
import face_recognition_models
import google.generativeai as genai
import gradio as gr
import numpy as np
import pandas as pd
import requests
from fpdf import FPDF
from office365.sharepoint.client_context import ClientContext
from PIL import Image
from scipy.spatial import distance as dist
# --- 1. INITIAL CONFIGURATION ---
# Internal blacklist CSV. It is created with only its header row when missing
# so that later reads of the file do not fail on a fresh deployment.
LISTA_NEGRA_FILE = "lista_negra.csv"
if not os.path.exists(LISTA_NEGRA_FILE):
    pd.DataFrame(columns=["cedula", "nombre", "motivo"]).to_csv(LISTA_NEGRA_FILE, index=False)

# Surface a missing key at start-up instead of only at the first OCR request.
if not os.getenv("GEMINI_API_KEY"):
    print("ADVERTENCIA: GEMINI_API_KEY no está configurada; la lectura de cédulas fallará.")
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# Default model name, kept when the listing below fails or offers no match.
modelo_activo = "gemini-1.5-pro"
try:
    modelos_permitidos = [
        m.name
        for m in genai.list_models()
        if 'generateContent' in m.supported_generation_methods
    ]
    # Take the first listed model whose name contains 'flash' or 'pro'.
    for m in modelos_permitidos:
        if 'flash' in m or 'pro' in m:
            modelo_activo = m.replace('models/', '')
            break
except Exception as exc:
    # Best effort: keep the default model, but do not hide the failure.
    print(f"ADVERTENCIA: no se pudo listar los modelos de Gemini ({exc}); se usa '{modelo_activo}'.")

ai_model = genai.GenerativeModel(modelo_activo)

# dlib face detector and landmark predictor used by the liveness check.
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(face_recognition_models.pose_predictor_model_location())
# --- 2. SUPPORT FUNCTIONS AND APIS ---
def get_ear(eye):
    """Return the eye aspect ratio computed from six landmark points.

    ``eye`` must be indexable at positions 0..5, each entry being a point.
    The result is the sum of the distances between points 1-5 and 2-4,
    divided by twice the distance between points 0-3.
    NOTE(review): presumably 0 and 3 are the eye corners and the other
    pairs are the eyelids -- confirm against the landmark model in use.
    """
    gap_1_5 = dist.euclidean(eye[1], eye[5])
    gap_2_4 = dist.euclidean(eye[2], eye[4])
    span_0_3 = dist.euclidean(eye[0], eye[3])
    numerator = gap_1_5 + gap_2_4
    denominator = 2.0 * span_0_3
    return numerator / denominator
def send_teams_alert(mensaje, nivel="INFO"):
    """Post a notification to the Teams webhook, if one is configured.

    The webhook URL is read from the ``TEAMS_WEBHOOK_URL`` environment
    variable; when it is unset or empty the function does nothing.

    Args:
        mensaje: Text placed in the body of the notification.
        nivel: Label shown in square brackets in the notification header.

    Returns:
        None. Delivery is best effort: failures are printed, never raised,
        so an unreachable webhook cannot break the onboarding flow.
    """
    url = os.getenv("TEAMS_WEBHOOK_URL")
    if not url:
        return
    payload = {"text": f"🔔 **SafeScan Notification** [{nivel}]\n\n{mensaje}"}
    try:
        # Without a timeout an unresponsive webhook would block the request
        # that triggered the alert indefinitely.
        respuesta = requests.post(url, json=payload, timeout=10)
        # Treat non-2xx answers as failures so they get reported below.
        respuesta.raise_for_status()
    except Exception as exc:
        print(f"No se pudo enviar la alerta a Teams: {exc}")
def upload_to_sharepoint(file_path):
    """Upload the file at ``file_path`` to the configured SharePoint folder.

    Connection settings are read from the environment variables
    ``SP_SITE_URL``, ``SP_CLIENT_ID``, ``SP_CLIENT_SECRET`` and
    ``SP_FOLDER_NAME``. The file is uploaded under its base name.

    Returns:
        A fixed success message, or ``"Error Sharepoint: <detail>"`` when any
        step raises. The function itself never raises.
    """
    try:
        sitio = os.getenv("SP_SITE_URL")
        id_cliente = os.getenv("SP_CLIENT_ID")
        secreto_cliente = os.getenv("SP_CLIENT_SECRET")
        contexto = ClientContext(sitio).with_client_credentials(id_cliente, secreto_cliente)
        carpeta_destino = contexto.web.get_folder_by_server_relative_url(
            os.getenv("SP_FOLDER_NAME")
        )
        nombre_archivo = os.path.basename(file_path)
        with open(file_path, 'rb') as archivo:
            contenido = archivo.read()
            carpeta_destino.upload_file(nombre_archivo, contenido).execute_query()
    except Exception as error:
        return f"Error Sharepoint: {str(error)}"
    else:
        return "Cargado exitosamente en ActiData/Onboarding"
def check_antecedentes_publicos(cedula):
    """Return the public background-check report for ``cedula``.

    As written, this function contacts no external service: it prints a
    progress line, waits one second and returns the same fixed report for
    every input, with ``Alerta_Bloqueo`` always ``False``.

    Args:
        cedula: ID number; only interpolated into the printed message.

    Returns:
        dict with the keys ``Procuraduria_Estado``,
        ``Policia_Nacional_Estado``, ``Fiscalia_SAE`` and ``Alerta_Bloqueo``.
    """
    print(f"Consultando Policía y Procuraduría para {cedula}...")
    time.sleep(1)
    return dict(
        Procuraduria_Estado="Sin antecedentes (Validado)",
        Policia_Nacional_Estado="Sin requerimientos (Validado)",
        Fiscalia_SAE="Limpio",
        Alerta_Bloqueo=False,
    )
# --- 3. BUSINESS LOGIC ---
def _solo_digitos(texto):
    """Return only the ASCII digits contained in ``texto`` (None gives "")."""
    return "".join(ch for ch in str(texto or "") if ch in "0123456789")


def _texto_pdf(valor):
    """Coerce ``valor`` to text encodable in Latin-1, replacing the rest with '?'."""
    return str(valor).encode("latin-1", "replace").decode("latin-1")


def _leer_cedula(doc_img):
    """Ask the AI model to read the ID image; return (cedula_digits, name_on_document)."""
    prompt_ocr = """Extrae de esta cédula colombiana la siguiente información.
Retorna ESTRICTAMENTE un JSON con las llaves: 'cedula' (solo números sin puntos), 'apellidos' y 'nombres'."""
    # The context manager closes the image file once the model call returns.
    with Image.open(doc_img) as img_pil:
        res_ai = ai_model.generate_content([prompt_ocr, img_pil])
    # Strip the Markdown code fence the model may wrap around the JSON.
    raw_ai = res_ai.text.replace('```json', '').replace('```', '').strip()
    datos_cedula = json.loads(raw_ai)
    if not isinstance(datos_cedula, dict):
        raise ValueError("La respuesta del OCR no es un objeto JSON.")
    # The model output is untrusted: keep digits only, because the value is
    # later compared against the blacklist and embedded in a file name.
    cedula = _solo_digitos(datos_cedula.get("cedula"))
    if not cedula:
        raise ValueError("No se pudo extraer el número de cédula.")
    nombres = str(datos_cedula.get("nombres") or "").strip()
    apellidos = str(datos_cedula.get("apellidos") or "").strip()
    return cedula, f"{nombres} {apellidos}"


def _analizar_video(video):
    """Transcode ``video`` and scan it; return (frames_with_face, blink, motion, best_frame)."""
    # A unique temporary file per call: a fixed shared name would let
    # concurrent requests overwrite or read each other's video.
    descriptor, video_mp4 = tempfile.mkstemp(suffix=".mp4")
    os.close(descriptor)
    cap = None
    try:
        conversion = subprocess.run(
            ["ffmpeg", "-y", "-i", video, "-vcodec", "libx264", video_mp4],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if conversion.returncode != 0:
            raise RuntimeError("ffmpeg no pudo convertir el video.")

        cap = cv2.VideoCapture(video_mp4)
        blink, motion, best_frame = False, False, None
        caras_detectadas = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            rects = detector(gray, 1)
            if len(rects) > 0:
                caras_detectadas += 1
                # Keep the most recent frame that contains a face; it is the
                # one later compared against the document photo.
                best_frame = frame.copy()
            for rect in rects:
                shape = predictor(gray, rect)
                coords = np.array([[p.x, p.y] for p in shape.parts()])
                ear_left = get_ear(coords[36:42])
                ear_right = get_ear(coords[42:48])
                # A mean eye aspect ratio below 0.20 counts as a blink.
                if ((ear_left + ear_right) / 2.0) < 0.20:
                    blink = True
                # Ratio of the distances from landmark 30 to landmarks 0 and
                # 16; a strongly unbalanced ratio counts as a head turn.
                # NOTE(review): presumably nose tip vs. jaw ends -- confirm.
                yaw = dist.euclidean(coords[30], coords[0]) / (dist.euclidean(coords[30], coords[16]) + 1e-6)
                if yaw > 1.6 or yaw < 0.6:
                    motion = True
        return caras_detectadas, blink, motion, best_frame
    finally:
        if cap is not None:
            cap.release()
        try:
            os.remove(video_mp4)
        except OSError:
            # Cleanup is best effort; a leftover temp file must not fail the request.
            pass


def _generar_pdf(res, full_name, cedula):
    """Write the PDF certificate for ``res`` and return the path of the file."""
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", "B", 10)
    pdf.cell(95, 10, "[LOGO SAE]", align='L')
    pdf.cell(95, 10, "[LOGO ACTIVOS POR COLOMBIA]", align='R', ln=True)
    pdf.line(10, 25, 200, 25)
    pdf.ln(10)
    pdf.set_text_color(0, 51, 102)
    pdf.set_font("Arial", "B", 16)
    pdf.cell(200, 10, "Certificado de Seguridad y SARLAFT", ln=True, align='C')
    pdf.set_font("Arial", "B", 12)
    # User-supplied and OCR text may contain characters the built-in fonts
    # cannot encode; they are replaced instead of aborting the report.
    pdf.cell(200, 10, _texto_pdf(f"Usuario: {full_name}"), ln=True, align='C')
    pdf.set_text_color(0, 0, 0)
    pdf.set_font("Arial", size=12)
    pdf.ln(10)
    for key, value in res.items():
        pdf.cell(200, 10, _texto_pdf(f"{key}: {value}"), ln=True)
    # ``cedula`` contains digits only, so it cannot alter the target directory.
    path_pdf = f"Reporte_Onboarding_{cedula}.pdf"
    pdf.output(path_pdf)
    return path_pdf


def run_full_onboarding(video, doc_img, full_name):
    """Run the complete onboarding audit for one applicant.

    Steps, in order: read the ID document with the AI model, check the
    internal blacklist, check public records, run the liveness test on the
    video, compare the face on the document with a video frame, write the
    PDF report and upload it to SharePoint.

    Args:
        video: Path of the recorded video.
        doc_img: Path of the ID document image.
        full_name: Name typed by the applicant; used only in the alert text
            and in the PDF heading.

    Returns:
        ``(pdf_path, result)``. ``pdf_path`` is ``None`` whenever the flow
        stops before the report is written; ``result`` is a dict whose
        ``"Estatus"`` is ``ERROR``, ``BLOQUEADO``, ``RECHAZADO`` or
        ``APROBADO``.
    """
    if not video or not doc_img or not full_name:
        return None, {"Estatus": "ERROR", "Detalle": "Campos incompletos."}

    try:
        ocr_res, nombre_completo_doc = _leer_cedula(doc_img)
    except Exception as e:
        return None, {"Estatus": "ERROR", "Detalle": f"Fallo en lectura de la Cédula: {str(e)}"}

    # Read every column as text so IDs are compared exactly as written, then
    # normalise both sides to digits so separators do not defeat the match.
    df_bn = pd.read_csv(LISTA_NEGRA_FILE, dtype=str)
    cedulas_bloqueadas = {_solo_digitos(valor) for valor in df_bn['cedula'].dropna()}
    if ocr_res in cedulas_bloqueadas:
        return None, {"Estatus": "BLOQUEADO", "Motivo": "Cédula detectada en Lista Negra Interna."}

    antecedentes = check_antecedentes_publicos(ocr_res)
    if antecedentes.get("Alerta_Bloqueo"):
        send_teams_alert(f"⚠️ BLOQUEO SARLAFT: {full_name} ({ocr_res}).", "ALERTA")
        return None, {"Estatus": "BLOQUEADO", "Motivo": "Reporte positivo en listas públicas.", "Detalle": antecedentes}

    try:
        caras_detectadas, blink, motion, best_frame = _analizar_video(video)
    except Exception as e:
        return None, {"Estatus": "ERROR", "Detalle": f"Error procesando el video: {str(e)}"}

    if caras_detectadas == 0:
        return None, {"Estatus": "RECHAZADO", "Detalle": "No se detectó un rostro claro en el video."}
    if not (blink and motion):
        return None, {"Estatus": "RECHAZADO", "Detalle": "Prueba de vida fallida (Faltó parpadeo o giro frontal)."}

    try:
        doc_bytes = face_recognition.load_image_file(doc_img)
        doc_encodings = face_recognition.face_encodings(doc_bytes)
        live_encodings = face_recognition.face_encodings(cv2.cvtColor(best_frame, cv2.COLOR_BGR2RGB))
        if not doc_encodings or not live_encodings:
            return None, {"Estatus": "ERROR", "Detalle": "No se pudo mapear la biometría."}
        match = bool(face_recognition.compare_faces([doc_encodings[0]], live_encodings[0], tolerance=0.6)[0])
    except Exception as e:
        return None, {"Estatus": "ERROR", "Detalle": f"Error biométrico: {str(e)}"}

    res = {
        "Estatus": "APROBADO" if match else "RECHAZADO",
        "Cédula_Extraída": ocr_res,
        "Nombre_en_Cédula": nombre_completo_doc,
        "Coincidencia_Facial": match,
        "Antecedentes_Policía": antecedentes["Policia_Nacional_Estado"],
        "Antecedentes_Procuraduría": antecedentes["Procuraduria_Estado"],
        "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    path_pdf = _generar_pdf(res, full_name, ocr_res)
    # The upload outcome is added after the PDF is written, so it appears in
    # the returned dict but not in the report itself.
    res["Auditoría_SharePoint"] = upload_to_sharepoint(path_pdf)
    return path_pdf, res
# --- 4. GUIDED (WIZARD) GRAPHICAL INTERFACE WITH CUSTOM CSS ---
# Stylesheet passed to the Gradio app. It targets the element whose id is
# "guia_rostro" and uses its ::after pseudo-element to draw a dashed green
# oval, labelled 'UBICA TU ROSTRO AQUÍ', centred over that element. The
# overlay ignores pointer events and pulses via the "pulse" keyframes below.
custom_css = """
#guia_rostro {
    position: relative;
}
#guia_rostro::after {
    content: 'UBICA TU ROSTRO AQUÍ';
    position: absolute;
    top: 45%;
    left: 50%;
    transform: translate(-50%, -50%);
    width: 220px;
    height: 300px;
    border: 4px dashed #00ff00;
    border-radius: 50%;
    pointer-events: none;
    z-index: 999;
    display: flex;
    align-items: flex-start;
    justify-content: center;
    padding-top: 20px;
    color: #00ff00;
    font-weight: bold;
    font-size: 14px;
    text-shadow: 1px 1px 2px black;
    background-color: rgba(0,255,0,0.05);
    animation: pulse 2s infinite;
}
@keyframes pulse {
    0% { box-shadow: 0 0 0 0 rgba(0, 255, 0, 0.4); }
    70% { box-shadow: 0 0 0 15px rgba(0, 255, 0, 0); }
    100% { box-shadow: 0 0 0 0 rgba(0, 255, 0, 0); }
}
"""
# ADJUSTMENT FOR GRADIO 6.0: Blocks() is left without arguments
# Three-step wizard: (1) name + liveness video, (2) ID document photo,
# (3) audit results. Each step is a Column whose visibility is toggled by
# the button handlers defined below.
with gr.Blocks() as demo:
    gr.Markdown("<h1 style='text-align: center; color: #003366;'>🛡️ SafeScan: Onboarding Institucional</h1>")

    with gr.Column(visible=True) as paso_1:
        gr.Markdown("### PASO 1: Verificación de Identidad (Prueba de Vida)")
        gr.Markdown("""
        **Por favor sigue estas instrucciones:**
        1. 💡 Buena iluminación frontal.
        2. 🎯 **Ubica tu rostro dentro del óvalo verde punteado**.
        3. 🔴 Presiona el botón de grabar.
        4. 👀 **Parpadea** un par de veces.
        5. ↔️ **Gira lentamente** la cabeza de lado a lado.
        """)
        n_in = gr.Textbox(label="Ingresa tu Nombre Completo", placeholder="Ej. Juan Carlos Albornoz")
        # elem_id ties this component to the "#guia_rostro" overlay rules.
        v_in = gr.Video(label="Cámara", sources=["webcam"], include_audio=False, elem_id="guia_rostro")
        btn_paso1 = gr.Button("Siguiente: Escanear Cédula ➡️", variant="primary")

    with gr.Column(visible=False) as paso_2:
        gr.Markdown("### PASO 2: Captura de Documento")
        gr.Markdown("""
        **Instrucciones para la Cédula:**
        1. 🪪 Sostén tu cédula frente a la cámara web.
        2. 📸 Asegúrate de que los datos y tu foto se vean nítidos antes de capturar.
        """)
        i_in = gr.Image(label="Tomar Foto de Cédula", sources=["webcam"], type="filepath")
        with gr.Row():
            btn_atras = gr.Button("⬅️ Volver al Paso 1")
            btn_paso2 = gr.Button("✅ Ejecutar Auditoría Completa", variant="primary")

    with gr.Column(visible=False) as paso_3:
        gr.Markdown("### PASO 3: Resultados de la Auditoría")
        with gr.Row():
            with gr.Column():
                out_pdf = gr.File(label="Reporte PDF Institucional")
            with gr.Column():
                out_json = gr.JSON(label="Resultados")
        btn_reiniciar = gr.Button("🔄 Iniciar Nuevo Registro")

    def ir_a_paso2(nombre, video):
        """Advance to step 2 only when both the name and the video are present."""
        if not nombre or not video:
            gr.Warning("Debes ingresar tu nombre y grabar el video.")
            return gr.update(visible=True), gr.update(visible=False)
        return gr.update(visible=False), gr.update(visible=True)

    def volver_a_paso1():
        """Show step 1 again and hide step 2."""
        return gr.update(visible=True), gr.update(visible=False)

    def procesar_todo(video, imagen, nombre):
        """Run the audit and show step 3; stay on step 2 if the ID photo is missing."""
        if not imagen:
            gr.Warning("Debes tomar la foto de la cédula.")
            return gr.update(visible=True), gr.update(visible=False), None, None
        pdf, json_res = run_full_onboarding(video, imagen, nombre)
        return gr.update(visible=False), gr.update(visible=True), pdf, json_res

    def reiniciar():
        """Return to step 1 and clear every input and every previous result."""
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(visible=False),
            None,
            None,
            None,
            # Also clear the report and JSON so the previous applicant's data
            # does not stay loaded in the hidden step-3 components.
            None,
            None,
        )

    btn_paso1.click(ir_a_paso2, inputs=[n_in, v_in], outputs=[paso_1, paso_2])
    btn_atras.click(volver_a_paso1, outputs=[paso_1, paso_2])
    btn_paso2.click(procesar_todo, inputs=[v_in, i_in, n_in], outputs=[paso_2, paso_3, out_pdf, out_json])
    btn_reiniciar.click(
        reiniciar,
        outputs=[paso_1, paso_2, paso_3, n_in, v_in, i_in, out_pdf, out_json],
    )

# ADJUSTMENT FOR GRADIO 6.0: theme and css are passed to launch()
demo.launch(server_name="0.0.0.0", server_port=7860, theme=gr.themes.Soft(), css=custom_css)