Venezuela / database.py
Yofran23's picture
Actualización automática: database.py
0f0bc2b verified
Raw
History Blame Contribute Delete
11 kB
import sqlite3
import re
import os
import json
import shutil
from datetime import datetime
from pathlib import Path
if os.path.exists("/data"):
DB_PATH = "/data/personas.db"
IMAGES_DIR = Path("/data/images")
else:
DB_PATH = os.environ.get("DB_PATH", "personas.db")
IMAGES_DIR = Path("images")
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
SEED_FILE = Path(__file__).parent / "seed_data.json"
SEED_VERSION = "seed_v1"
# ─── Conexión ─────────────────────────────────────────────────────────────────
def get_conn():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db():
conn = get_conn()
conn.execute("""
CREATE TABLE IF NOT EXISTS personas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT,
cedula TEXT,
cedula_norm TEXT,
edad INTEGER,
hospital TEXT,
condicion TEXT DEFAULT 'Sin información',
descripcion TEXT,
imagen_path TEXT,
fecha_ingreso TEXT,
fecha_update TEXT,
verificado INTEGER DEFAULT 0,
contacto TEXT,
notas TEXT,
fuente TEXT DEFAULT 'ciudadano'
)
""")
conn.execute("CREATE TABLE IF NOT EXISTS meta (clave TEXT PRIMARY KEY, valor TEXT)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_cedula_norm ON personas(cedula_norm)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_nombre ON personas(nombre)")
conn.commit()
conn.close()
_aplicar_seed()
# ─── Carga inicial de datos (registros de hospitales + niños sin familiar) ─────
def _aplicar_seed():
"""
Carga los datos iniciales UNA sola vez (idempotente).
Si ya se aplicó esta versión del seed, no hace nada — así no se duplican
los registros al reiniciar el Space.
"""
if not SEED_FILE.exists():
return
conn = get_conn()
ya = conn.execute("SELECT valor FROM meta WHERE clave = ?", (SEED_VERSION,)).fetchone()
if ya:
conn.close()
return
try:
with open(SEED_FILE, encoding="utf-8") as f:
personas = json.load(f)
except Exception:
conn.close()
return
now = datetime.now().strftime("%Y-%m-%d %H:%M")
insertados = 0
for p in personas:
nombre = (p.get("nombre") or "").strip() or None
cedula = (p.get("cedula") or "").strip()
if not nombre and not cedula:
continue
cedula_n = normalizar_cedula(cedula)
conn.execute("""
INSERT INTO personas
(nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion,
imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
nombre,
formatear_cedula_display(cedula_n) if cedula_n else (cedula or None),
cedula_n or None,
p.get("edad"), p.get("hospital"), p.get("condicion") or "Sin información",
None, None, now, now, p.get("contacto"), p.get("notas"),
p.get("fuente") or "registro hospital",
))
insertados += 1
conn.execute("INSERT OR REPLACE INTO meta (clave, valor) VALUES (?, ?)",
(SEED_VERSION, f"{insertados} registros cargados el {now}"))
conn.commit()
conn.close()
# ─── Normalización de cédula ──────────────────────────────────────────────────
def normalizar_cedula(cedula: str) -> str:
"""
Devuelve la cédula en formato estándar venezolano: V-12345678 / E-12345678
o solo dígitos si no tiene prefijo claro.
"""
if not cedula:
return ""
c = str(cedula).strip().upper()
c = re.sub(r"[\s.]", "", c)
match = re.match(r"^([VEP])-?(\d+)$", c)
if match:
prefijo, numero = match.groups()
return f"{prefijo}-{numero}"
solo_digitos = re.sub(r"[^0-9]", "", c)
return solo_digitos
def formatear_cedula_display(cedula_norm: str) -> str:
"""Para mostrar: V-12.345.678"""
if not cedula_norm:
return ""
match = re.match(r"^([VEP])-(\d+)$", cedula_norm)
if match:
prefijo, num = match.groups()
num_fmt = f"{int(num):,}".replace(",", ".")
return f"{prefijo}-{num_fmt}"
try:
return f"{int(cedula_norm):,}".replace(",", ".")
except ValueError:
return cedula_norm
# ─── Detección de duplicados ──────────────────────────────────────────────────
def buscar_duplicados(nombre: str, cedula_norm: str, edad=None) -> list[dict]:
"""Busca posibles duplicados ANTES de insertar, guiándose ÚNICAMENTE por la cédula."""
conn = get_conn()
resultados = []
if cedula_norm:
rows = conn.execute(
"SELECT * FROM personas WHERE cedula_norm = ? AND cedula_norm != ''",
(cedula_norm,)
).fetchall()
for r in rows:
d = dict(r)
d["nivel_alerta"] = "EXACTO"
resultados.append(d)
conn.close()
return resultados
# ─── CRUD ─────────────────────────────────────────────────────────────────────
def agregar_persona(nombre, cedula, edad, hospital, condicion, descripcion,
imagen_path, contacto, notas, fuente="ciudadano") -> int:
cedula_n = normalizar_cedula(cedula or "")
conn = get_conn()
now = datetime.now().strftime("%Y-%m-%d %H:%M")
cur = conn.execute("""
INSERT INTO personas
(nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion,
imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
nombre or None,
formatear_cedula_display(cedula_n) if cedula_n else (cedula or None),
cedula_n or None,
edad or None, hospital or None, condicion,
descripcion or None, imagen_path or None,
now, now, contacto or None, notas or None, fuente
))
conn.commit()
new_id = cur.lastrowid
conn.close()
return new_id
def agregar_personas_bulk(personas: list[dict], hospital_default=None,
fuente="registro hospital", saltar_duplicados=True) -> dict:
"""
Inserta varias personas de una sola vez (carga de listas/imágenes).
Salta duplicados EXACTOS por cédula para no ensuciar la base.
Devuelve un resumen: {agregados, omitidos, nombres_omitidos}.
"""
conn = get_conn()
now = datetime.now().strftime("%Y-%m-%d %H:%M")
# prefetch de cédulas existentes para deduplicar rápido (una sola consulta)
existentes = set()
if saltar_duplicados:
for r in conn.execute("SELECT cedula_norm FROM personas WHERE cedula_norm IS NOT NULL AND cedula_norm != ''"):
existentes.add(r[0])
agregados, omitidos, nombres_omitidos = 0, 0, []
for p in personas:
nombre = (p.get("nombre") or "").strip() or None
cedula = (p.get("cedula") or "").strip()
if not nombre and not cedula:
continue
cedula_n = normalizar_cedula(cedula)
if saltar_duplicados and cedula_n and cedula_n in existentes:
omitidos += 1
nombres_omitidos.append(nombre or cedula_n)
continue
edad = p.get("edad")
try:
edad = int(float(edad)) if edad not in (None, "", "None", "nan") else None
except (TypeError, ValueError):
edad = None
hospital = (p.get("hospital") or hospital_default) or None
conn.execute("""
INSERT INTO personas
(nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion,
imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
nombre,
formatear_cedula_display(cedula_n) if cedula_n else (cedula or None),
cedula_n or None,
edad, hospital, p.get("condicion") or "Sin información",
None, None, now, now, p.get("contacto"), p.get("notas"),
p.get("fuente") or fuente,
))
if cedula_n:
existentes.add(cedula_n)
agregados += 1
conn.commit()
conn.close()
return {"agregados": agregados, "omitidos": omitidos, "nombres_omitidos": nombres_omitidos}
def guardar_imagen(imagen_src_path: str, persona_id: int) -> str:
ext = Path(imagen_src_path).suffix or ".jpg"
dest = IMAGES_DIR / f"{persona_id}{ext}"
shutil.copy(imagen_src_path, dest)
return str(dest)
def buscar(nombre="", cedula="", edad_min=None, edad_max=None, hospital=""):
cedula_n = normalizar_cedula(cedula) if cedula else ""
conn = get_conn()
query = "SELECT * FROM personas WHERE 1=1"
params = []
if nombre.strip():
query += " AND UPPER(nombre) LIKE UPPER(?)"
params.append(f"%{nombre.strip()}%")
if cedula_n:
query += " AND (cedula_norm LIKE ? OR cedula LIKE ?)"
params += [f"%{cedula_n}%", f"%{cedula.strip()}%"]
elif cedula.strip():
query += " AND (cedula LIKE ? OR cedula_norm LIKE ?)"
params += [f"%{cedula.strip()}%", f"%{cedula.strip()}%"]
if edad_min is not None:
query += " AND edad >= ?"
params.append(int(edad_min))
if edad_max is not None:
query += " AND edad <= ?"
params.append(int(edad_max))
if hospital.strip():
query += " AND UPPER(hospital) LIKE UPPER(?)"
params.append(f"%{hospital.strip()}%")
query += " ORDER BY fecha_update DESC LIMIT 500"
rows = conn.execute(query, params).fetchall()
conn.close()
return [dict(r) for r in rows]
def obtener_todos_para_exportar():
"""Obtiene toda la base de datos sin límite para exportar."""
conn = get_conn()
rows = conn.execute("SELECT * FROM personas ORDER BY fecha_update DESC").fetchall()
conn.close()
return [dict(r) for r in rows]
def total_registros():
conn = get_conn()
n = conn.execute("SELECT COUNT(*) FROM personas").fetchone()[0]
conn.close()
return n
def get_hospitales():
conn = get_conn()
rows = conn.execute(
"SELECT DISTINCT hospital FROM personas WHERE hospital IS NOT NULL ORDER BY hospital"
).fetchall()
conn.close()
return [r[0] for r in rows]