| import sqlite3 |
| import re |
| import os |
| import json |
| import shutil |
| from datetime import datetime |
| from pathlib import Path |
|
|
| if os.path.exists("/data"): |
| DB_PATH = "/data/personas.db" |
| IMAGES_DIR = Path("/data/images") |
| else: |
| DB_PATH = os.environ.get("DB_PATH", "personas.db") |
| IMAGES_DIR = Path("images") |
|
|
| IMAGES_DIR.mkdir(parents=True, exist_ok=True) |
| SEED_FILE = Path(__file__).parent / "seed_data.json" |
| SEED_VERSION = "seed_v1" |
|
|
|
|
| |
|
|
| def get_conn(): |
| conn = sqlite3.connect(DB_PATH, check_same_thread=False) |
| conn.row_factory = sqlite3.Row |
| conn.execute("PRAGMA journal_mode=WAL") |
| conn.execute("PRAGMA foreign_keys=ON") |
| return conn |
|
|
|
|
| def init_db(): |
| conn = get_conn() |
| conn.execute(""" |
| CREATE TABLE IF NOT EXISTS personas ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| nombre TEXT, |
| cedula TEXT, |
| cedula_norm TEXT, |
| edad INTEGER, |
| hospital TEXT, |
| condicion TEXT DEFAULT 'Sin información', |
| descripcion TEXT, |
| imagen_path TEXT, |
| fecha_ingreso TEXT, |
| fecha_update TEXT, |
| verificado INTEGER DEFAULT 0, |
| contacto TEXT, |
| notas TEXT, |
| fuente TEXT DEFAULT 'ciudadano' |
| ) |
| """) |
| conn.execute("CREATE TABLE IF NOT EXISTS meta (clave TEXT PRIMARY KEY, valor TEXT)") |
| conn.execute("CREATE INDEX IF NOT EXISTS idx_cedula_norm ON personas(cedula_norm)") |
| conn.execute("CREATE INDEX IF NOT EXISTS idx_nombre ON personas(nombre)") |
| conn.commit() |
| conn.close() |
| _aplicar_seed() |
|
|
|
|
| |
|
|
| def _aplicar_seed(): |
| """ |
| Carga los datos iniciales UNA sola vez (idempotente). |
| Si ya se aplicó esta versión del seed, no hace nada — así no se duplican |
| los registros al reiniciar el Space. |
| """ |
| if not SEED_FILE.exists(): |
| return |
| conn = get_conn() |
| ya = conn.execute("SELECT valor FROM meta WHERE clave = ?", (SEED_VERSION,)).fetchone() |
| if ya: |
| conn.close() |
| return |
| try: |
| with open(SEED_FILE, encoding="utf-8") as f: |
| personas = json.load(f) |
| except Exception: |
| conn.close() |
| return |
|
|
| now = datetime.now().strftime("%Y-%m-%d %H:%M") |
| insertados = 0 |
| for p in personas: |
| nombre = (p.get("nombre") or "").strip() or None |
| cedula = (p.get("cedula") or "").strip() |
| if not nombre and not cedula: |
| continue |
| cedula_n = normalizar_cedula(cedula) |
| conn.execute(""" |
| INSERT INTO personas |
| (nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion, |
| imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente) |
| VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) |
| """, ( |
| nombre, |
| formatear_cedula_display(cedula_n) if cedula_n else (cedula or None), |
| cedula_n or None, |
| p.get("edad"), p.get("hospital"), p.get("condicion") or "Sin información", |
| None, None, now, now, p.get("contacto"), p.get("notas"), |
| p.get("fuente") or "registro hospital", |
| )) |
| insertados += 1 |
|
|
| conn.execute("INSERT OR REPLACE INTO meta (clave, valor) VALUES (?, ?)", |
| (SEED_VERSION, f"{insertados} registros cargados el {now}")) |
| conn.commit() |
| conn.close() |
|
|
|
|
| |
|
|
| def normalizar_cedula(cedula: str) -> str: |
| """ |
| Devuelve la cédula en formato estándar venezolano: V-12345678 / E-12345678 |
| o solo dígitos si no tiene prefijo claro. |
| """ |
| if not cedula: |
| return "" |
| c = str(cedula).strip().upper() |
| c = re.sub(r"[\s.]", "", c) |
| match = re.match(r"^([VEP])-?(\d+)$", c) |
| if match: |
| prefijo, numero = match.groups() |
| return f"{prefijo}-{numero}" |
| solo_digitos = re.sub(r"[^0-9]", "", c) |
| return solo_digitos |
|
|
|
|
| def formatear_cedula_display(cedula_norm: str) -> str: |
| """Para mostrar: V-12.345.678""" |
| if not cedula_norm: |
| return "" |
| match = re.match(r"^([VEP])-(\d+)$", cedula_norm) |
| if match: |
| prefijo, num = match.groups() |
| num_fmt = f"{int(num):,}".replace(",", ".") |
| return f"{prefijo}-{num_fmt}" |
| try: |
| return f"{int(cedula_norm):,}".replace(",", ".") |
| except ValueError: |
| return cedula_norm |
|
|
|
|
| |
|
|
| def buscar_duplicados(nombre: str, cedula_norm: str, edad=None) -> list[dict]: |
| """Busca posibles duplicados ANTES de insertar, guiándose ÚNICAMENTE por la cédula.""" |
| conn = get_conn() |
| resultados = [] |
|
|
| if cedula_norm: |
| rows = conn.execute( |
| "SELECT * FROM personas WHERE cedula_norm = ? AND cedula_norm != ''", |
| (cedula_norm,) |
| ).fetchall() |
| for r in rows: |
| d = dict(r) |
| d["nivel_alerta"] = "EXACTO" |
| resultados.append(d) |
|
|
| conn.close() |
| return resultados |
|
|
|
|
| |
|
|
| def agregar_persona(nombre, cedula, edad, hospital, condicion, descripcion, |
| imagen_path, contacto, notas, fuente="ciudadano") -> int: |
| cedula_n = normalizar_cedula(cedula or "") |
| conn = get_conn() |
| now = datetime.now().strftime("%Y-%m-%d %H:%M") |
| cur = conn.execute(""" |
| INSERT INTO personas |
| (nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion, |
| imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente) |
| VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) |
| """, ( |
| nombre or None, |
| formatear_cedula_display(cedula_n) if cedula_n else (cedula or None), |
| cedula_n or None, |
| edad or None, hospital or None, condicion, |
| descripcion or None, imagen_path or None, |
| now, now, contacto or None, notas or None, fuente |
| )) |
| conn.commit() |
| new_id = cur.lastrowid |
| conn.close() |
| return new_id |
|
|
|
|
| def agregar_personas_bulk(personas: list[dict], hospital_default=None, |
| fuente="registro hospital", saltar_duplicados=True) -> dict: |
| """ |
| Inserta varias personas de una sola vez (carga de listas/imágenes). |
| Salta duplicados EXACTOS por cédula para no ensuciar la base. |
| Devuelve un resumen: {agregados, omitidos, nombres_omitidos}. |
| """ |
| conn = get_conn() |
| now = datetime.now().strftime("%Y-%m-%d %H:%M") |
|
|
| |
| existentes = set() |
| if saltar_duplicados: |
| for r in conn.execute("SELECT cedula_norm FROM personas WHERE cedula_norm IS NOT NULL AND cedula_norm != ''"): |
| existentes.add(r[0]) |
|
|
| agregados, omitidos, nombres_omitidos = 0, 0, [] |
| for p in personas: |
| nombre = (p.get("nombre") or "").strip() or None |
| cedula = (p.get("cedula") or "").strip() |
| if not nombre and not cedula: |
| continue |
| cedula_n = normalizar_cedula(cedula) |
|
|
| if saltar_duplicados and cedula_n and cedula_n in existentes: |
| omitidos += 1 |
| nombres_omitidos.append(nombre or cedula_n) |
| continue |
|
|
| edad = p.get("edad") |
| try: |
| edad = int(float(edad)) if edad not in (None, "", "None", "nan") else None |
| except (TypeError, ValueError): |
| edad = None |
|
|
| hospital = (p.get("hospital") or hospital_default) or None |
| conn.execute(""" |
| INSERT INTO personas |
| (nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion, |
| imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente) |
| VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) |
| """, ( |
| nombre, |
| formatear_cedula_display(cedula_n) if cedula_n else (cedula or None), |
| cedula_n or None, |
| edad, hospital, p.get("condicion") or "Sin información", |
| None, None, now, now, p.get("contacto"), p.get("notas"), |
| p.get("fuente") or fuente, |
| )) |
| if cedula_n: |
| existentes.add(cedula_n) |
| agregados += 1 |
|
|
| conn.commit() |
| conn.close() |
| return {"agregados": agregados, "omitidos": omitidos, "nombres_omitidos": nombres_omitidos} |
|
|
|
|
| def guardar_imagen(imagen_src_path: str, persona_id: int) -> str: |
| ext = Path(imagen_src_path).suffix or ".jpg" |
| dest = IMAGES_DIR / f"{persona_id}{ext}" |
| shutil.copy(imagen_src_path, dest) |
| return str(dest) |
|
|
|
|
| def buscar(nombre="", cedula="", edad_min=None, edad_max=None, hospital=""): |
| cedula_n = normalizar_cedula(cedula) if cedula else "" |
| conn = get_conn() |
| query = "SELECT * FROM personas WHERE 1=1" |
| params = [] |
|
|
| if nombre.strip(): |
| query += " AND UPPER(nombre) LIKE UPPER(?)" |
| params.append(f"%{nombre.strip()}%") |
|
|
| if cedula_n: |
| query += " AND (cedula_norm LIKE ? OR cedula LIKE ?)" |
| params += [f"%{cedula_n}%", f"%{cedula.strip()}%"] |
| elif cedula.strip(): |
| query += " AND (cedula LIKE ? OR cedula_norm LIKE ?)" |
| params += [f"%{cedula.strip()}%", f"%{cedula.strip()}%"] |
|
|
| if edad_min is not None: |
| query += " AND edad >= ?" |
| params.append(int(edad_min)) |
| if edad_max is not None: |
| query += " AND edad <= ?" |
| params.append(int(edad_max)) |
| if hospital.strip(): |
| query += " AND UPPER(hospital) LIKE UPPER(?)" |
| params.append(f"%{hospital.strip()}%") |
|
|
| query += " ORDER BY fecha_update DESC LIMIT 500" |
| rows = conn.execute(query, params).fetchall() |
| conn.close() |
| return [dict(r) for r in rows] |
|
|
|
|
| def obtener_todos_para_exportar(): |
| """Obtiene toda la base de datos sin límite para exportar.""" |
| conn = get_conn() |
| rows = conn.execute("SELECT * FROM personas ORDER BY fecha_update DESC").fetchall() |
| conn.close() |
| return [dict(r) for r in rows] |
|
|
|
|
| def total_registros(): |
| conn = get_conn() |
| n = conn.execute("SELECT COUNT(*) FROM personas").fetchone()[0] |
| conn.close() |
| return n |
|
|
|
|
| def get_hospitales(): |
| conn = get_conn() |
| rows = conn.execute( |
| "SELECT DISTINCT hospital FROM personas WHERE hospital IS NOT NULL ORDER BY hospital" |
| ).fetchall() |
| conn.close() |
| return [r[0] for r in rows] |
|
|