import gradio as gr import pandas as pd import requests import re import sqlite3 import os import numpy as np from datetime import datetime, timedelta from urllib.parse import urlparse, urlunparse, quote # --- 1. CONFIGURACIÓN --- API_KEY = os.getenv("GOOGLE_API_KEY") SEARCH_ENGINE_ID = os.getenv("SEARCH_ENGINE_ID") DB_NAME = "data_cache_v14.db" # --- 2. GESTIÓN DE BASE DE DATOS --- def iniciar_db(): conn = sqlite3.connect(DB_NAME) c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS inmuebles (id INTEGER PRIMARY KEY AUTOINCREMENT, query_busqueda TEXT, titulo TEXT, precio REAL, area REAL, habs INTEGER, banos INTEGER, garajes INTEGER, estrato INTEGER, seguridad INTEGER, antiguedad INTEGER, lat TEXT, lon TEXT, fuente TEXT, url TEXT, es_directo INTEGER, fecha_registro DATE)''') conn.commit() conn.close() def guardar_cache(query, datos): if not datos: return conn = sqlite3.connect(DB_NAME) c = conn.cursor() hoy = datetime.now().date() for d in datos: c.execute('''INSERT INTO inmuebles (query_busqueda, titulo, precio, area, habs, banos, garajes, estrato, seguridad, antiguedad, lat, lon, fuente, url, es_directo, fecha_registro) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', (query, d['titulo'], d['precio'], d['area'], d['habs'], d['banos'], d['garajes'], d['estrato'], d['seguridad'], d['antiguedad'], d['lat'], d['lon'], d['fuente'], d['url'], d['es_directo'], hoy)) conn.commit() conn.close() def leer_cache(query): conn = sqlite3.connect(DB_NAME) limite = (datetime.now() - timedelta(days=7)).date() df = pd.read_sql_query("SELECT * FROM inmuebles WHERE query_busqueda = ? AND fecha_registro >= ?", conn, params=(query, limite)) conn.close() return df if not df.empty else None # --- 3. EXTRACCIÓN Y VALIDACIÓN --- def analizar_tipo_url(url): url = url.lower() positivos = ['/inmueble/', '/proyecto/', '/propiedad/', 'detalle', 'p-', 'id-', 'cod-', 'mco-', 'mla-'] if any(p in url for p in positivos): return 1 negativos = ['listado', 'resultados', 'buscar', 'search', 'ordenar', 'filtrar', 'page'] if any(n in url for n in negativos): return 0 return 0 def limpiar_url(url): try: parsed = urlparse(url) clean = urlunparse((parsed.scheme, parsed.netloc, parsed.path, '', '', '')) return clean except: return url def extraer_coordenadas(item): lat, lon = None, None pagemap = item.get('pagemap', {}) metatags = pagemap.get('metatags', [{}])[0] if 'og:latitude' in metatags: lat = metatags.get('og:latitude') lon = metatags.get('og:longitude') elif 'geo.position' in metatags: try: parts = metatags['geo.position'].split(';') lat, lon = parts[0], parts[1] except: pass if not lat and 'geocoordinates' in pagemap: geo = pagemap['geocoordinates'][0] lat = geo.get('latitude') lon = geo.get('longitude') return lat, lon def parsear_texto_completo(texto): texto = texto.lower() precio = 0 match_precio = re.search(r'\$\s?([\d.,]+)', texto) if match_precio: s = match_precio.group(1).replace('.','').replace(',','').strip() try: precio = float(s) except: pass area = 0 match_area = re.search(r'(\d+[\.,]?\d*)\s?(m2|mt|mts|metro)', texto) if match_area: s_area = match_area.group(1).replace(',', '.') try: area = float(s_area) except: pass habs = 0 match_habs = re.search(r'(\d+)\s?(hab|alcoba|dormitorio)', texto) if match_habs: try: habs = int(match_habs.group(1)) except: pass banos = 0 match_banos = re.search(r'(\d+)\s?(baño|bano)', texto) if match_banos: try: banos = int(match_banos.group(1)) except: pass garajes = 0 match_garaje = re.search(r'(\d+)\s?(parqueadero|garaje)', texto) if match_garaje: try: garajes = int(match_garaje.group(1)) except: pass elif "garaje" in texto or "parqueadero" in texto: garajes = 1 estrato = 0 match_estrato = re.search(r'estrato\s?:?\s?(\d)', texto) if match_estrato: try: estrato = int(match_estrato.group(1)) except: pass antiguedad = -1 if "estrenar" in texto or "nuevo" in texto or "sobre planos" in texto: antiguedad = 0 else: match_anos = re.search(r'(\d+)\s?(año|ano)', texto) if match_anos: try: antiguedad = int(match_anos.group(1)) except: pass seguridad = 0 if any(k in texto for k in ['conjunto', 'vigilancia', 'porteria', 'seguridad', 'club house', 'cerrado']): seguridad = 1 return precio, area, habs, banos, garajes, estrato, seguridad, antiguedad # --- 4. CONEXIÓN GOOGLE API --- def buscar_google(query): if not API_KEY or not SEARCH_ENGINE_ID: return [] url = "https://www.googleapis.com/customsearch/v1" query_optimizada = f"{query} detalle" query_optimizada = query_optimizada.replace(",", " OR ") params = {'key': API_KEY, 'cx': SEARCH_ENGINE_ID, 'q': query_optimizada, 'num': 10} try: resp = requests.get(url, params=params) if resp.status_code == 429: return [{"error": "quota"}] data = resp.json() resultados = [] if 'items' in data: for item in data['items']: raw_link = item.get('link', '') es_directo = analizar_tipo_url(raw_link) final_link = limpiar_url(raw_link) texto = f"{item.get('title')} {item.get('snippet')}" precio, area, habs, banos, garajes, estrato, seguridad, antiguedad = parsear_texto_completo(texto) lat, lon = extraer_coordenadas(item) fuente = "Web" if "fincaraiz" in raw_link: fuente = "Finca Raíz" elif "metrocuadrado" in raw_link: fuente = "Metrocuadrado" elif "wasi" in raw_link: fuente = "Wasi" if precio > 0 or area > 0: resultados.append({ 'titulo': item.get('title'), 'precio': precio, 'area': area, 'habs': habs, 'banos': banos, 'garajes': garajes, 'estrato': estrato, 'seguridad': seguridad, 'antiguedad': antiguedad, 'lat': lat, 'lon': lon, 'fuente': fuente, 'url': final_link, 'es_directo': es_directo }) return resultados except Exception as e: print(f"Error: {e}") return [] # --- 5. ALGORITMO DE AFINIDAD --- def calcular_scores(df, p_ref, a_ref, h_ref, b_ref, g_ref, e_ref, antiguedad_ref, buscar_condominio): if df.empty: return df df_f = df.copy() # Precios y Áreas df_f['diff_p'] = abs(df_f['precio'] - p_ref) / p_ref score_p = np.maximum(0, 1 - df_f['diff_p']) df_f['diff_a'] = df_f['area'].apply(lambda x: abs(x - a_ref)/a_ref if x > 0 else 1.0) score_a = np.maximum(0, 1 - df_f['diff_a']) # Habitaciones/Baños score_h = df_f['habs'].apply(lambda x: 1.0 if x == h_ref else (0.9 if x==0 else (0.5 if abs(x-h_ref)<=1 else 0))) score_b = df_f['banos'].apply(lambda x: 1.0 if x == b_ref else (0.9 if x==0 else (0.6 if abs(x-b_ref)<=1 else 0.2))) # Garajes/Estrato score_g = df_f['garajes'].apply(lambda x: 1.0 if x >= g_ref else (0.5 if x < g_ref and x > 0 else 0.8 if x==0 else 0)) score_e = df_f['estrato'].apply(lambda x: 1.0 if x == e_ref else (0.9 if x==0 else (0.5 if abs(x-e_ref)<=1 else 0))) # Antigüedad/Condominio def calc_edad(x, ref): if x == -1: return 0.8 if ref == 0: return 1.0 if x == 0 else max(0, 1 - (x/20)) return max(0, 1 - (abs(x - ref) / 20)) score_ant = df_f['antiguedad'].apply(lambda x: calc_edad(x, antiguedad_ref)) def calc_condo(x, quiere_condo): if not quiere_condo: return 1.0 return 1.0 if x == 1 else 0.2 score_c = df_f['seguridad'].apply(lambda x: calc_condo(x, buscar_condominio)) # SCORE FINAL df_f['score'] = (score_p * 25) + (score_a * 20) + (score_h * 10) + \ (score_b * 10) + (score_g * 10) + (score_e * 10) + \ (score_ant * 10) + (score_c * 5) df_f['score'] = df_f['score'].clip(0, 100).round(1) return df_f.sort_values(by='score', ascending=False) # --- 6. MOTOR PRINCIPAL --- def motor(zona, tipo, precio, area, habs, banos, garajes, estrato, antiguedad, es_condominio): css_injection = """
""" if not API_KEY: return f"{css_injection}

⚠️ Error: Faltan API Keys.

" q = f"venta {tipo} {zona.lower()}" df = leer_cache(q) origen = "⚡ Caché" if df is None: origen = "🌐 Google API" lista = buscar_google(q) if lista and "error" in lista[0]: return f"{css_injection}

⚠️ Límite de Cuota Excedido.

" if lista: guardar_cache(q, lista) df = pd.DataFrame(lista) if df is None or df.empty: return f"{css_injection}

❌ No se encontraron datos válidos.

" df_similares = calcular_scores(df, precio, area, habs, banos, garajes, estrato, antiguedad, es_condominio) # Matemáticas Blindadas df_calc = df_similares[(df_similares['score'] >= 40) & (df_similares['area'] > 10) & (df_similares['precio'] > 1000000)] if df_calc.empty: df_calc = df_similares[df_similares['area'] > 10] prom_precio = df_calc['precio'].mean() if not df_calc.empty else 0 if not df_calc.empty: df_calc['m2_individual'] = df_calc['precio'] / df_calc['area'] prom_m2 = df_calc['m2_individual'].mean() else: prom_m2 = 0 html = f"{css_injection}" html += f"""

🎯 Valuación A.V.M.: {zona}

Fuente: {origen}
""" for _, row in df_similares.iterrows(): score = row['score'] if score >= 80: color_bg = "#16a34a" elif score >= 60: color_bg = "#ca8a04" else: color_bg = "#dc2626" txt_area = f"{row['area']} m²" if row['area'] > 0 else "N/A" # --- LÓGICA DE MAPA INTELIGENTE --- if row['lat'] and row['lon']: # Coordenadas exactas encontradas gmaps_link = f"https://www.google.com/maps/search/?api=1&query={row['lat']},{row['lon']}" btn_mapa = f"📍 Ver Ubicación Exacta" else: # Búsqueda Inversa por Título (Fallback) titulo_safe = quote(f"{row['titulo']} {zona}") gmaps_link = f"https://www.google.com/maps/search/?api=1&query={titulo_safe}" btn_mapa = f"📍 Buscar en Mapa" # ---------------------------------- if row['es_directo'] == 1: btn_link = f"🔗 Ver Inmueble" else: btn_link = f"⚠️ Ver Listado de Zona" extras = [] if row['habs']>0: extras.append(f"🛏️ {row['habs']} Hb") if row['banos']>0: extras.append(f"🚿 {row['banos']} Ba") if row['garajes']>0: extras.append(f"🚗 {row['garajes']} Pq") if row['estrato']>0: extras.append(f"💎 E{row['estrato']}") if row['antiguedad']==0: extras.append("🌟 Nuevo") elif row['antiguedad']>0: extras.append(f"⏱️ {row['antiguedad']}a") str_extras = "  |  ".join(extras) badge_seg = "🛡️ Condominio" if row['seguridad'] else "" html += f"""
{row['titulo']} {badge_seg}
💰 ${row['precio']:,.0f}    📐 {txt_area}
{str_extras}
{btn_mapa}
{score}%
{row['fuente']}
{btn_link}
""" html += "" return html # --- 7. INTERFAZ --- iniciar_db() with gr.Blocks(theme=gr.themes.Base()) as demo: gr.Markdown("# 🏢 Valuador Inteligente V14 (Mapa Activo)") with gr.Row(): with gr.Column(scale=2): inp_zona = gr.Textbox(label="📍 Zona", placeholder="Ej: Salitre, Modelia") with gr.Column(scale=1): inp_tipo = gr.Dropdown(["Apartamento", "Casa", "Lote", "Oficina"], label="Tipo", value="Apartamento") with gr.Row(): inp_precio = gr.Number(label="💰 Precio Ref.", value=300000000) inp_area = gr.Number(label="📐 Área (m²)", value=60) inp_estrato = gr.Dropdown([1,2,3,4,5,6], label="💎 Estrato", value=4) with gr.Row(): inp_habs = gr.Slider(1, 6, value=3, step=1, label="🛏️ Habitaciones") inp_banos = gr.Slider(1, 6, value=2, step=1, label="🚿 Baños") inp_garajes = gr.Slider(0, 3, value=1, step=1, label="🚗 Garajes") with gr.Row(): inp_antiguedad = gr.Number(label="⏱️ Antigüedad (Años)", value=5) inp_condominio = gr.Checkbox(label="🛡️ Buscar en Condominio", value=True) btn = gr.Button("🔍 Calcular Valor", variant="primary") out = gr.HTML(label="Informe") btn.click(motor, inputs=[inp_zona, inp_tipo, inp_precio, inp_area, inp_habs, inp_banos, inp_garajes, inp_estrato, inp_antiguedad, inp_condominio], outputs=out) if __name__ == "__main__": demo.launch()