File size: 11,024 Bytes
87a8e86 48b05a3 87a8e86 0c14425 48b05a3 87a8e86 48b05a3 87a8e86 48b05a3 87a8e86 48b05a3 87a8e86 f7ebd8d 87a8e86 48b05a3 87a8e86 48b05a3 87a8e86 0f0bc2b 87a8e86 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 | import sqlite3
import re
import os
import json
import shutil
from datetime import datetime
from pathlib import Path
if os.path.exists("/data"):
DB_PATH = "/data/personas.db"
IMAGES_DIR = Path("/data/images")
else:
DB_PATH = os.environ.get("DB_PATH", "personas.db")
IMAGES_DIR = Path("images")
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
SEED_FILE = Path(__file__).parent / "seed_data.json"
SEED_VERSION = "seed_v1"
# ─── Conexión ─────────────────────────────────────────────────────────────────
def get_conn():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db():
conn = get_conn()
conn.execute("""
CREATE TABLE IF NOT EXISTS personas (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT,
cedula TEXT,
cedula_norm TEXT,
edad INTEGER,
hospital TEXT,
condicion TEXT DEFAULT 'Sin información',
descripcion TEXT,
imagen_path TEXT,
fecha_ingreso TEXT,
fecha_update TEXT,
verificado INTEGER DEFAULT 0,
contacto TEXT,
notas TEXT,
fuente TEXT DEFAULT 'ciudadano'
)
""")
conn.execute("CREATE TABLE IF NOT EXISTS meta (clave TEXT PRIMARY KEY, valor TEXT)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_cedula_norm ON personas(cedula_norm)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_nombre ON personas(nombre)")
conn.commit()
conn.close()
_aplicar_seed()
# ─── Carga inicial de datos (registros de hospitales + niños sin familiar) ─────
def _aplicar_seed():
"""
Carga los datos iniciales UNA sola vez (idempotente).
Si ya se aplicó esta versión del seed, no hace nada — así no se duplican
los registros al reiniciar el Space.
"""
if not SEED_FILE.exists():
return
conn = get_conn()
ya = conn.execute("SELECT valor FROM meta WHERE clave = ?", (SEED_VERSION,)).fetchone()
if ya:
conn.close()
return
try:
with open(SEED_FILE, encoding="utf-8") as f:
personas = json.load(f)
except Exception:
conn.close()
return
now = datetime.now().strftime("%Y-%m-%d %H:%M")
insertados = 0
for p in personas:
nombre = (p.get("nombre") or "").strip() or None
cedula = (p.get("cedula") or "").strip()
if not nombre and not cedula:
continue
cedula_n = normalizar_cedula(cedula)
conn.execute("""
INSERT INTO personas
(nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion,
imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
nombre,
formatear_cedula_display(cedula_n) if cedula_n else (cedula or None),
cedula_n or None,
p.get("edad"), p.get("hospital"), p.get("condicion") or "Sin información",
None, None, now, now, p.get("contacto"), p.get("notas"),
p.get("fuente") or "registro hospital",
))
insertados += 1
conn.execute("INSERT OR REPLACE INTO meta (clave, valor) VALUES (?, ?)",
(SEED_VERSION, f"{insertados} registros cargados el {now}"))
conn.commit()
conn.close()
# ─── Normalización de cédula ──────────────────────────────────────────────────
def normalizar_cedula(cedula: str) -> str:
"""
Devuelve la cédula en formato estándar venezolano: V-12345678 / E-12345678
o solo dígitos si no tiene prefijo claro.
"""
if not cedula:
return ""
c = str(cedula).strip().upper()
c = re.sub(r"[\s.]", "", c)
match = re.match(r"^([VEP])-?(\d+)$", c)
if match:
prefijo, numero = match.groups()
return f"{prefijo}-{numero}"
solo_digitos = re.sub(r"[^0-9]", "", c)
return solo_digitos
def formatear_cedula_display(cedula_norm: str) -> str:
"""Para mostrar: V-12.345.678"""
if not cedula_norm:
return ""
match = re.match(r"^([VEP])-(\d+)$", cedula_norm)
if match:
prefijo, num = match.groups()
num_fmt = f"{int(num):,}".replace(",", ".")
return f"{prefijo}-{num_fmt}"
try:
return f"{int(cedula_norm):,}".replace(",", ".")
except ValueError:
return cedula_norm
# ─── Detección de duplicados ──────────────────────────────────────────────────
def buscar_duplicados(nombre: str, cedula_norm: str, edad=None) -> list[dict]:
"""Busca posibles duplicados ANTES de insertar, guiándose ÚNICAMENTE por la cédula."""
conn = get_conn()
resultados = []
if cedula_norm:
rows = conn.execute(
"SELECT * FROM personas WHERE cedula_norm = ? AND cedula_norm != ''",
(cedula_norm,)
).fetchall()
for r in rows:
d = dict(r)
d["nivel_alerta"] = "EXACTO"
resultados.append(d)
conn.close()
return resultados
# ─── CRUD ─────────────────────────────────────────────────────────────────────
def agregar_persona(nombre, cedula, edad, hospital, condicion, descripcion,
imagen_path, contacto, notas, fuente="ciudadano") -> int:
cedula_n = normalizar_cedula(cedula or "")
conn = get_conn()
now = datetime.now().strftime("%Y-%m-%d %H:%M")
cur = conn.execute("""
INSERT INTO personas
(nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion,
imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
nombre or None,
formatear_cedula_display(cedula_n) if cedula_n else (cedula or None),
cedula_n or None,
edad or None, hospital or None, condicion,
descripcion or None, imagen_path or None,
now, now, contacto or None, notas or None, fuente
))
conn.commit()
new_id = cur.lastrowid
conn.close()
return new_id
def agregar_personas_bulk(personas: list[dict], hospital_default=None,
fuente="registro hospital", saltar_duplicados=True) -> dict:
"""
Inserta varias personas de una sola vez (carga de listas/imágenes).
Salta duplicados EXACTOS por cédula para no ensuciar la base.
Devuelve un resumen: {agregados, omitidos, nombres_omitidos}.
"""
conn = get_conn()
now = datetime.now().strftime("%Y-%m-%d %H:%M")
# prefetch de cédulas existentes para deduplicar rápido (una sola consulta)
existentes = set()
if saltar_duplicados:
for r in conn.execute("SELECT cedula_norm FROM personas WHERE cedula_norm IS NOT NULL AND cedula_norm != ''"):
existentes.add(r[0])
agregados, omitidos, nombres_omitidos = 0, 0, []
for p in personas:
nombre = (p.get("nombre") or "").strip() or None
cedula = (p.get("cedula") or "").strip()
if not nombre and not cedula:
continue
cedula_n = normalizar_cedula(cedula)
if saltar_duplicados and cedula_n and cedula_n in existentes:
omitidos += 1
nombres_omitidos.append(nombre or cedula_n)
continue
edad = p.get("edad")
try:
edad = int(float(edad)) if edad not in (None, "", "None", "nan") else None
except (TypeError, ValueError):
edad = None
hospital = (p.get("hospital") or hospital_default) or None
conn.execute("""
INSERT INTO personas
(nombre, cedula, cedula_norm, edad, hospital, condicion, descripcion,
imagen_path, fecha_ingreso, fecha_update, contacto, notas, fuente)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
nombre,
formatear_cedula_display(cedula_n) if cedula_n else (cedula or None),
cedula_n or None,
edad, hospital, p.get("condicion") or "Sin información",
None, None, now, now, p.get("contacto"), p.get("notas"),
p.get("fuente") or fuente,
))
if cedula_n:
existentes.add(cedula_n)
agregados += 1
conn.commit()
conn.close()
return {"agregados": agregados, "omitidos": omitidos, "nombres_omitidos": nombres_omitidos}
def guardar_imagen(imagen_src_path: str, persona_id: int) -> str:
ext = Path(imagen_src_path).suffix or ".jpg"
dest = IMAGES_DIR / f"{persona_id}{ext}"
shutil.copy(imagen_src_path, dest)
return str(dest)
def buscar(nombre="", cedula="", edad_min=None, edad_max=None, hospital=""):
cedula_n = normalizar_cedula(cedula) if cedula else ""
conn = get_conn()
query = "SELECT * FROM personas WHERE 1=1"
params = []
if nombre.strip():
query += " AND UPPER(nombre) LIKE UPPER(?)"
params.append(f"%{nombre.strip()}%")
if cedula_n:
query += " AND (cedula_norm LIKE ? OR cedula LIKE ?)"
params += [f"%{cedula_n}%", f"%{cedula.strip()}%"]
elif cedula.strip():
query += " AND (cedula LIKE ? OR cedula_norm LIKE ?)"
params += [f"%{cedula.strip()}%", f"%{cedula.strip()}%"]
if edad_min is not None:
query += " AND edad >= ?"
params.append(int(edad_min))
if edad_max is not None:
query += " AND edad <= ?"
params.append(int(edad_max))
if hospital.strip():
query += " AND UPPER(hospital) LIKE UPPER(?)"
params.append(f"%{hospital.strip()}%")
query += " ORDER BY fecha_update DESC LIMIT 500"
rows = conn.execute(query, params).fetchall()
conn.close()
return [dict(r) for r in rows]
def obtener_todos_para_exportar():
"""Obtiene toda la base de datos sin límite para exportar."""
conn = get_conn()
rows = conn.execute("SELECT * FROM personas ORDER BY fecha_update DESC").fetchall()
conn.close()
return [dict(r) for r in rows]
def total_registros():
conn = get_conn()
n = conn.execute("SELECT COUNT(*) FROM personas").fetchone()[0]
conn.close()
return n
def get_hospitales():
conn = get_conn()
rows = conn.execute(
"SELECT DISTINCT hospital FROM personas WHERE hospital IS NOT NULL ORDER BY hospital"
).fetchall()
conn.close()
return [r[0] for r in rows]
|