import sqlite3 import re import os import json import shutil from datetime import datetime from pathlib import Path if os.path.exists("/data"): DB_PATH = "/data/personas.db" IMAGES_DIR = Path("/data/images") else: DB_PATH = os.environ.get("DB_PATH", "personas.db") IMAGES_DIR = Path("images") IMAGES_DIR.mkdir(parents=True, exist_ok=True) SEED_FILE = Path(__file__).parent / "seed_data.json" SEED_VERSION = "seed_v1" # ─── Conexión ───────────────────────────────────────────────────────────────── def get_conn(): conn = sqlite3.connect(DB_PATH, check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(): conn = get_conn() conn.execute(""" CREATE TABLE IF NOT EXISTS personas ( id INTEGER PRIMARY KEY AUTOINCREMENT, nombre TEXT, cedula TEXT, cedula_norm TEXT, edad INTEGER, hospital TEXT, condicion TEXT DEFAULT 'Sin información', descripcion TEXT, imagen_path TEXT, fecha_ingreso TEXT, fecha_update TEXT, verificado INTEGER DEFAULT 0, contacto TEXT, notas TEXT, fuente TEXT DEFAULT 'ciudadano' ) """) conn.execute("CREATE TABLE IF NOT EXISTS meta (clave TEXT PRIMARY KEY, valor TEXT)") conn.execute("CREATE INDEX IF NOT EXISTS idx_cedula_norm ON personas(cedula_norm)") conn.execute("CREATE INDEX IF NOT EXISTS idx_nombre ON personas(nombre)") conn.commit() conn.close() _aplicar_seed() # ─── Carga inicial de datos (registros de hospitales + niños sin familiar) ───── def _aplicar_seed(): """ Carga los datos iniciales UNA sola vez (idempotente). Si ya se aplicó esta versión del seed, no hace nada — así no se duplican los registros al reiniciar el Space. """ if not SEED_FILE.exists(): return conn = get_conn() ya = conn.execute("SELECT valor FROM meta WHERE clave = ?", (SEED_VERSION,)).fetchone() if ya: conn.close() return try: with open(SEED_FILE, encoding="utf-8") as f: personas = json.load(f) except Exception: conn.close() return now = datetime.now().strftime("%Y-%m-%d %H:%M") insertados = 0 for p in personas: nombre = (p.get("nombre") or "").strip() or None cedula = (p.get("cedula") or "").strip() if not nombre and not cedula: continue cedula_n = normalizar_cedula(cedula) conn.execute(""" INSERT INTO personas (nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion, imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) """, ( nombre, formatear_cedula_display(cedula_n) if cedula_n else (cedula or None), cedula_n or None, p.get("edad"), p.get("hospital"), p.get("condicion") or "Sin información", None, None, now, now, p.get("contacto"), p.get("notas"), p.get("fuente") or "registro hospital", )) insertados += 1 conn.execute("INSERT OR REPLACE INTO meta (clave, valor) VALUES (?, ?)", (SEED_VERSION, f"{insertados} registros cargados el {now}")) conn.commit() conn.close() # ─── Normalización de cédula ────────────────────────────────────────────────── def normalizar_cedula(cedula: str) -> str: """ Devuelve la cédula en formato estándar venezolano: V-12345678 / E-12345678 o solo dígitos si no tiene prefijo claro. """ if not cedula: return "" c = str(cedula).strip().upper() c = re.sub(r"[\s.]", "", c) match = re.match(r"^([VEP])-?(\d+)$", c) if match: prefijo, numero = match.groups() return f"{prefijo}-{numero}" solo_digitos = re.sub(r"[^0-9]", "", c) return solo_digitos def formatear_cedula_display(cedula_norm: str) -> str: """Para mostrar: V-12.345.678""" if not cedula_norm: return "" match = re.match(r"^([VEP])-(\d+)$", cedula_norm) if match: prefijo, num = match.groups() num_fmt = f"{int(num):,}".replace(",", ".") return f"{prefijo}-{num_fmt}" try: return f"{int(cedula_norm):,}".replace(",", ".") except ValueError: return cedula_norm # ─── Detección de duplicados ────────────────────────────────────────────────── def buscar_duplicados(nombre: str, cedula_norm: str, edad=None) -> list[dict]: """Busca posibles duplicados ANTES de insertar, guiándose ÚNICAMENTE por la cédula.""" conn = get_conn() resultados = [] if cedula_norm: rows = conn.execute( "SELECT * FROM personas WHERE cedula_norm = ? AND cedula_norm != ''", (cedula_norm,) ).fetchall() for r in rows: d = dict(r) d["nivel_alerta"] = "EXACTO" resultados.append(d) conn.close() return resultados # ─── CRUD ───────────────────────────────────────────────────────────────────── def agregar_persona(nombre, cedula, edad, hospital, condicion, descripcion, imagen_path, contacto, notas, fuente="ciudadano") -> int: cedula_n = normalizar_cedula(cedula or "") conn = get_conn() now = datetime.now().strftime("%Y-%m-%d %H:%M") cur = conn.execute(""" INSERT INTO personas (nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion, imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) """, ( nombre or None, formatear_cedula_display(cedula_n) if cedula_n else (cedula or None), cedula_n or None, edad or None, hospital or None, condicion, descripcion or None, imagen_path or None, now, now, contacto or None, notas or None, fuente )) conn.commit() new_id = cur.lastrowid conn.close() return new_id def agregar_personas_bulk(personas: list[dict], hospital_default=None, fuente="registro hospital", saltar_duplicados=True) -> dict: """ Inserta varias personas de una sola vez (carga de listas/imágenes). Salta duplicados EXACTOS por cédula para no ensuciar la base. Devuelve un resumen: {agregados, omitidos, nombres_omitidos}. """ conn = get_conn() now = datetime.now().strftime("%Y-%m-%d %H:%M") # prefetch de cédulas existentes para deduplicar rápido (una sola consulta) existentes = set() if saltar_duplicados: for r in conn.execute("SELECT cedula_norm FROM personas WHERE cedula_norm IS NOT NULL AND cedula_norm != ''"): existentes.add(r[0]) agregados, omitidos, nombres_omitidos = 0, 0, [] for p in personas: nombre = (p.get("nombre") or "").strip() or None cedula = (p.get("cedula") or "").strip() if not nombre and not cedula: continue cedula_n = normalizar_cedula(cedula) if saltar_duplicados and cedula_n and cedula_n in existentes: omitidos += 1 nombres_omitidos.append(nombre or cedula_n) continue edad = p.get("edad") try: edad = int(float(edad)) if edad not in (None, "", "None", "nan") else None except (TypeError, ValueError): edad = None hospital = (p.get("hospital") or hospital_default) or None conn.execute(""" INSERT INTO personas (nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion, imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) """, ( nombre, formatear_cedula_display(cedula_n) if cedula_n else (cedula or None), cedula_n or None, edad, hospital, p.get("condicion") or "Sin información", None, None, now, now, p.get("contacto"), p.get("notas"), p.get("fuente") or fuente, )) if cedula_n: existentes.add(cedula_n) agregados += 1 conn.commit() conn.close() return {"agregados": agregados, "omitidos": omitidos, "nombres_omitidos": nombres_omitidos} def guardar_imagen(imagen_src_path: str, persona_id: int) -> str: ext = Path(imagen_src_path).suffix or ".jpg" dest = IMAGES_DIR / f"{persona_id}{ext}" shutil.copy(imagen_src_path, dest) return str(dest) def buscar(nombre="", cedula="", edad_min=None, edad_max=None, hospital=""): cedula_n = normalizar_cedula(cedula) if cedula else "" conn = get_conn() query = "SELECT * FROM personas WHERE 1=1" params = [] if nombre.strip(): query += " AND UPPER(nombre) LIKE UPPER(?)" params.append(f"%{nombre.strip()}%") if cedula_n: query += " AND (cedula_norm LIKE ? OR cedula LIKE ?)" params += [f"%{cedula_n}%", f"%{cedula.strip()}%"] elif cedula.strip(): query += " AND (cedula LIKE ? OR cedula_norm LIKE ?)" params += [f"%{cedula.strip()}%", f"%{cedula.strip()}%"] if edad_min is not None: query += " AND edad >= ?" params.append(int(edad_min)) if edad_max is not None: query += " AND edad <= ?" params.append(int(edad_max)) if hospital.strip(): query += " AND UPPER(hospital) LIKE UPPER(?)" params.append(f"%{hospital.strip()}%") query += " ORDER BY fecha_update DESC LIMIT 500" rows = conn.execute(query, params).fetchall() conn.close() return [dict(r) for r in rows] def obtener_todos_para_exportar(): """Obtiene toda la base de datos sin límite para exportar.""" conn = get_conn() rows = conn.execute("SELECT * FROM personas ORDER BY fecha_update DESC").fetchall() conn.close() return [dict(r) for r in rows] def total_registros(): conn = get_conn() n = conn.execute("SELECT COUNT(*) FROM personas").fetchone()[0] conn.close() return n def get_hospitales(): conn = get_conn() rows = conn.execute( "SELECT DISTINCT hospital FROM personas WHERE hospital IS NOT NULL ORDER BY hospital" ).fetchall() conn.close() return [r[0] for r in rows]