# Author: Lukeetah
# Commit: "Update app.py" (db669b7, verified)
import asyncio
import json
import logging
import os
import random
import re
import smtplib
import ssl
import time
import urllib.parse as ul
from dataclasses import dataclass, asdict
from email.message import EmailMessage
from html import escape as html_escape
from typing import List, Optional, Dict, Any, Tuple

import gradio as gr
import httpx
import pandas as pd
from bs4 import BeautifulSoup
from rapidfuzz import fuzz
# =========================
# Configuración principal
# =========================
# Default search filters; they seed the initial values of the UI widgets.
DEFAULT_MAX_USD = 90000
DEFAULT_NEIGHBORHOODS = [
"Saavedra", "Nuñez", "La Lucila", "Florida Oeste", "Munro", "Carapachay",
"Olivos", "Villa Martelli", "Florida", "Vicente López"
]
DEFAULT_TYPES = ["casa", "ph"] # "casa", "ph"
DEFAULT_MIN_ROOMS = 3
REQUIRE_BIDET = True
REQUIRE_PET_FRIENDLY = True
REQUIRE_OUTDOOR = True
# Auto-relaxation when a search returns nothing (stepwise).
# The steps are consumed in order by run_agent_with_relax.
AUTO_RELAX_ENABLED = True
RELAX_STEPS = [
{"require_bidet": False}, # 1) drop the bidet requirement
{"require_pet": False}, # 2) drop the pet requirement
{"min_rooms": 2}, # 3) lower the room count to 2
{"require_outdoor": False}, # 4) make outdoor space optional
{"max_price_usd_delta": 10000}, # 5) raise the max price by 10k
]
# Micro-zones: each fuzzy match adds to the score in residential_score.
MICROZONAS_PRIORITARIAS = [
"Parque Saavedra", "Parque Sarmiento", "Av. Balbín", "Ruiz Huidobro",
"Lomas de Nuñez", "Cabildo", "Plaza Alberti",
"Estación La Lucila", "Rawson", "Paraná", "Maipú",
"Estación Florida", "Estación Carapachay", "Estación Munro",
"Ugarte", "San Martín", "Panamericana", "Pelliza", "Melo",
]
# Anti-scraping: pools that make_headers picks from at random per request.
USER_AGENT_POOL = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
]
REFERER_POOL = ["https://www.google.com/", "https://www.bing.com/", "https://duckduckgo.com/"]
TIMEOUT = httpx.Timeout(25.0, connect=12.0)
RETRIES = 2 # fetch makes RETRIES + 1 attempts per URL
BACKOFF_BASE = 0.9 # seconds; doubled on each successive retry in fetch
JITTER_RANGE = (0.13, 0.55) # seconds; bounds handed to random.uniform
# Per-domain rate limit: minimum gap in seconds between requests
# (domain_throttle falls back to 0.5 for domains not listed here).
DOMAIN_RATE_LIMIT = {
"www.zonaprop.com.ar": 0.6,
"www.argenprop.com": 0.6,
"www.properati.com.ar": 0.6,
"inmuebles.mercadolibre.com.ar": 0.7,
"inmuebles.clarin.com": 0.8,
"www.soloduenos.com": 0.9,
"mudafy.com.ar": 0.7,
"www.remax.com.ar": 0.9,
"www.enbuenosaires.com": 0.8,
"www.buscatucasa.com.ar": 0.8,
}
# Optional proxy, read from the PROXY_URL environment variable (empty = no proxy).
PROXY_URL = os.getenv("PROXY_URL", "").strip()
# Email settings, read from environment variables.
SMTP_HOST = os.getenv("SMTP_HOST", "").strip()
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "").strip()
SMTP_PASS = os.getenv("SMTP_PASS", "").strip()
SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER).strip() # defaults to SMTP_USER
SMTP_USE_SSL = os.getenv("SMTP_USE_SSL", "false").lower() in ("1", "true", "yes")
# Loose shape check only: local@host.tld with no whitespace and a single "@".
EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# =========================
# Modelos y utilidades
# =========================
@dataclass
class Listing:
    """One property listing gathered from a portal search page.

    ``None`` in an optional field means "unknown". Several fields start as
    ``None`` in ``scrape_search_page`` and are filled in later by
    ``enrich_listing`` from the listing's detail page.
    """

    source: str  # portal domain the listing was scraped from
    title: str  # anchor (or parent block) text, truncated to 160 chars by the scraper
    link: str  # listing URL; replaced by its canonical form in run_agent_once
    price_usd: Optional[float]  # USD amount parsed by to_float_price, or None
    currency: Optional[str]  # "USD" when price_usd is set, otherwise None
    address: Optional[str]  # area/address text guessed from the card or detail page
    neighborhood: Optional[str]  # created as None by scrape_search_page
    city: Optional[str]  # fixed label assigned by scrape_search_page
    rooms: Optional[int]  # parsed from "N ambiente(s)" text by enrich_listing
    bedrooms: Optional[int]  # parsed from "N dorm..." text by enrich_listing
    bathrooms: Optional[int]  # parsed from "N bañ..." text by enrich_listing
    has_patio: Optional[bool]  # True on keyword match in the description, else None
    has_terrace: Optional[bool]  # True on keyword match in the description, else None
    pet_friendly: Optional[bool]  # True on keyword match in the description, else None
    has_bidet: Optional[bool]  # True on keyword match in the description, else None
    description: Optional[str]  # detail-page text extracted by enrich_listing
    score: float  # ranking score assigned by compute_score
def clean_text(s: str) -> str:
    """Trim *s* and collapse every internal whitespace run to one space.

    A falsy *s* (``None`` or ``""``) yields ``""``.
    """
    trimmed = s.strip() if s else ""
    return re.sub(r"\s+", " ", trimmed)
def to_float_price(value: str) -> Optional[float]:
    """Parse a USD amount from a short price string.

    The text is read with Argentine number formatting: "." groups thousands
    and "," marks decimals, so ``"USD 85.500,50"`` becomes ``85500.5``.

    Args:
        value: Price text such as ``"U$S 90.000"`` or ``"Dólares 85.000"``.

    Returns:
        The amount as a float, or ``None`` when *value* is empty, carries no
        recognised dollar marker, or contains no number.
    """
    if not value:
        return None
    txt = value.replace(".", "").replace(",", ".").upper()
    # "DÓLAR" (accented) is listed because the card extractor's regex captures
    # "Dólares ..." and those prices would otherwise be discarded here.
    usd_markers = ("USD", "U$S", "US$", "DOLAR", "DÓLAR", "U$D")
    if not any(marker in txt for marker in usd_markers):
        return None
    m = re.search(r"(\d+(?:\.\d+)?)", txt)
    return float(m.group(1)) if m else None
def extract_int_from(text: str, pattern: str) -> Optional[int]:
    """Return group 1 of the first *pattern* match in *text* as an int.

    Returns ``None`` when *text* is empty/``None`` or nothing matches.
    """
    found = re.search(pattern, text) if text else None
    if found is None:
        return None
    return int(found.group(1))
def fuzzy_any(text: str, keywords: List[str], thresh: int = 80) -> bool:
    """Tell whether any keyword fuzzily matches *text*.

    Both sides are lower-cased and compared with ``fuzz.partial_ratio``; a
    score of at least *thresh* counts as a match. Empty text never matches.
    """
    if not text:
        return False
    haystack = text.lower()
    for keyword in keywords:
        if fuzz.partial_ratio(haystack, keyword.lower()) >= thresh:
            return True
    return False
def feature_guess(desc: str) -> Tuple[Optional[bool], Optional[bool], Optional[bool], Optional[bool]]:
    """Guess (patio, terrace, pets, bidet) flags from a description.

    Each flag is ``True`` when one of its keywords fuzzily matches *desc* and
    ``None`` otherwise, so a missing mention reads as unknown, never as False.
    """
    vocabulary = (
        ["patio", "patio propio", "patio descubierto", "fondo", "jardín"],
        ["terraza", "terraza propia", "terraza transitable", "azotea"],
        ["se aceptan mascotas", "pet friendly", "apta mascotas", "mascotas"],
        ["bidet"],
    )
    hits = [fuzzy_any(desc, words) for words in vocabulary]
    patio, terraza, mascotas, bidet = (True if hit else None for hit in hits)
    return patio, terraza, mascotas, bidet
def residential_score(address: str, neighborhood: str, desc: str) -> float:
    """Score boost for listings that mention a priority micro-zone.

    Adds 0.5 for every entry of ``MICROZONAS_PRIORITARIAS`` that fuzzily
    matches (partial ratio >= 80) the combined address, neighbourhood and
    description text; the total is capped at 2.0.
    """
    haystack = " ".join(part or "" for part in (address, neighborhood, desc)).lower()
    matches = sum(
        1
        for zone in MICROZONAS_PRIORITARIAS
        if fuzz.partial_ratio(haystack, zone.lower()) >= 80
    )
    return min(matches * 0.5, 2.0)
def compute_score(lst: Listing, filters: Dict[str, Any]) -> float:
    """Rank a listing against the active filters (higher is better).

    Contributions: 1.0 plus a 0..1 discount fraction when the price is within
    budget; 1.0 for enough rooms; 1.0 for outdoor space when required; 0.6 per
    satisfied pet/bidet requirement (0.2 flat when not required); and the
    micro-zone boost from ``residential_score``. Rounded to three decimals.
    """
    budget = filters["max_price_usd"]
    total = 0.0

    within_budget = lst.price_usd is not None and lst.price_usd <= budget
    if within_budget:
        total += 1.0
        total += (budget - lst.price_usd) / max(budget, 1)

    if lst.rooms and lst.rooms >= filters["min_rooms"]:
        total += 1.0

    has_outdoor = lst.has_patio or lst.has_terrace
    if filters["require_outdoor"] and has_outdoor:
        total += 1.0

    # A required feature earns 0.6 when present; an optional one a flat 0.2.
    for required_key, present in (
        ("require_pet", lst.pet_friendly),
        ("require_bidet", lst.has_bidet),
    ):
        if not filters[required_key]:
            total += 0.2
        elif present:
            total += 0.6
        else:
            total += 0.0

    total += residential_score(lst.address or "", lst.neighborhood or "", lst.description or "")
    return round(total, 3)
# =========================
# Anti-scraping helpers
# =========================
# Maps a domain to the timestamp recorded by domain_throttle for its latest request.
_last_hit: Dict[str, float] = {}
def make_headers() -> Dict[str, str]:
    """Build browser-like request headers with a random User-Agent and Referer."""
    user_agent = random.choice(USER_AGENT_POOL)
    referer = random.choice(REFERER_POOL)
    headers: Dict[str, str] = {}
    headers["User-Agent"] = user_agent
    headers["Accept-Language"] = "es-AR,es;q=0.9,en;q=0.8"
    headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    headers["Referer"] = referer
    headers["Cache-Control"] = "no-cache"
    headers["Pragma"] = "no-cache"
    headers["DNT"] = "1"
    return headers
async def domain_throttle(domain: str):
    """Wait until *domain* may be hit again, then add random jitter.

    Consecutive requests to one domain are spaced at least
    ``DOMAIN_RATE_LIMIT[domain]`` seconds apart (0.5 s for unlisted domains).

    The next slot is reserved in ``_last_hit`` *before* sleeping. There is no
    ``await`` between reading and writing the dict, so concurrent tasks for
    the same domain each receive a distinct, increasing slot. Writing the
    timestamp only after the sleep let every concurrent task compute the same
    wait and fire together, which defeated the limit.
    """
    gap = DOMAIN_RATE_LIMIT.get(domain, 0.5)
    # Monotonic clock: interval maths must not be affected by wall-clock jumps.
    now = time.monotonic()
    previous = _last_hit.get(domain, float("-inf"))
    slot = max(now, previous + gap)
    _last_hit[domain] = slot
    if slot > now:
        await asyncio.sleep(slot - now)
    await asyncio.sleep(random.uniform(*JITTER_RANGE))
async def fetch(url: str) -> Optional[str]:
    """GET *url* and return the response body, or ``None`` on failure.

    Makes up to ``RETRIES + 1`` attempts. Each attempt is rate-limited through
    ``domain_throttle`` and uses fresh randomised headers. Any exception or
    non-200/empty response counts as a failed attempt; failures are logged at
    DEBUG level and followed by exponential backoff, except after the last
    attempt, where sleeping would only delay the ``None`` result.

    Args:
        url: Absolute URL to download.

    Returns:
        The response text for a 200 response with a non-empty body, else None.
    """
    client_kwargs: Dict[str, Any] = {
        "follow_redirects": True,
        "http2": True,
        "timeout": TIMEOUT,
    }
    if PROXY_URL:
        # Only pass the proxy argument when a proxy is configured, so the
        # no-proxy path does not depend on which keyword httpx accepts.
        # NOTE(review): newer httpx releases replace `proxies=` with `proxy=`
        # -- confirm against the pinned httpx version.
        client_kwargs["proxies"] = {"all://": PROXY_URL}
    domain = ul.urlparse(url).netloc
    log = logging.getLogger(__name__)
    for attempt in range(RETRIES + 1):
        await domain_throttle(domain)
        try:
            async with httpx.AsyncClient(**client_kwargs) as client:
                r = await client.get(url, headers=make_headers())
                # Short HTML is accepted; some portals serve minimal SSR pages.
                if r.status_code == 200 and r.text:
                    return r.text
                log.debug("fetch %s: HTTP %s (attempt %d)", url, r.status_code, attempt + 1)
        except Exception as exc:  # best-effort scraper: never propagate
            log.debug("fetch %s failed (attempt %d): %r", url, attempt + 1, exc)
        if attempt < RETRIES:
            await asyncio.sleep(BACKOFF_BASE * (2 ** attempt) + random.uniform(0, 0.35))
    return None
# =========================
# Portales
# =========================
class Portal:
    """A portal to scrape: its domain plus a function that builds search URLs."""

    def __init__(self, domain: str, search_builder):
        # search_builder is called as fn(neighs, max_usd, types) -> list of URLs.
        self.search_builder = search_builder
        self.domain = domain
def sb_qparam(base: str, param: str = "q"):
    """Create a search-URL builder for portals that take a free-text query.

    The returned ``_builder(neighs, max_usd, types)`` yields three URLs per
    neighbourhood (one per outdoor synonym), each of the form
    ``{base}?{param}=<percent-encoded query>``. The room and pet wording is
    picked at random for every URL.
    """
    outdoor_terms = ["patio", "terraza", "exterior"]
    pet_terms = ["mascotas", "pet friendly"]
    room_terms = ["3 ambientes", "tres ambientes"]

    def _builder(neighs: List[str], max_usd: int, types: List[str]) -> List[str]:
        kinds = " o ".join(types)
        built = []
        for hood in neighs:
            for outdoor in outdoor_terms:
                # Draw rooms before pets so the random sequence is unchanged.
                rooms = random.choice(room_terms)
                pets = random.choice(pet_terms)
                query = f"{kinds} venta {hood} hasta {max_usd} dolares {rooms} {outdoor} {pets} bidet"
                built.append(base + "?" + param + "=" + ul.quote(query))
        return built

    return _builder
# Portals to query. Each entry pairs a domain with a free-text search builder
# bound to that portal's search URL and query-parameter name.
PORTALS: List[Portal] = [
Portal("www.zonaprop.com.ar", sb_qparam("https://www.zonaprop.com.ar/propiedades.html", "q")),
Portal("www.argenprop.com", sb_qparam("https://www.argenprop.com/propiedades", "text")),
Portal("www.properati.com.ar", sb_qparam("https://www.properati.com.ar/s/venta/propiedades", "q")),
Portal("inmuebles.mercadolibre.com.ar", sb_qparam("https://inmuebles.mercadolibre.com.ar", "as_word")),
Portal("inmuebles.clarin.com", sb_qparam("https://inmuebles.clarin.com/listado", "q")),
Portal("www.soloduenos.com", sb_qparam("https://www.soloduenos.com/buscar", "q")),
Portal("mudafy.com.ar", sb_qparam("https://mudafy.com.ar/propiedades", "q")),
Portal("www.remax.com.ar", sb_qparam("https://www.remax.com.ar/listings", "q")),
Portal("www.enbuenosaires.com", sb_qparam("https://www.enbuenosaires.com/buscar", "q")),
Portal("www.buscatucasa.com.ar", sb_qparam("https://www.buscatucasa.com.ar/buscar", "q")),
]
# Substrings that make an href "look like a listing" in generic_card_extractor
# (compared case-insensitively against the whole href).
ANCHOR_TOKENS = [
"propiedad", "inmueble", "inmuebles", "departamento", "casa", "ph",
"detalle", "item", "listing", "publicacion", "aviso", "MLA-"
]
def generic_card_extractor(soup: BeautifulSoup, domain: str) -> List[Dict[str, Any]]:
    """Heuristically pull listing "cards" out of a portal search page.

    Every ``<a href>`` is normalised to an absolute URL and kept when it
    points at *domain* (or one of its subdomains), is not an account/help
    route, contains one of ``ANCHOR_TOKENS`` and has not been seen before.
    Price and area text are looked up in the anchor's parent block.

    The same-site test compares the URL's host rather than searching for
    *domain* anywhere in the href, so off-site links that merely embed the
    portal URL (share buttons, redirectors) are no longer collected.

    Args:
        soup: Parsed search-results page.
        domain: Portal host name, e.g. ``"www.zonaprop.com.ar"``.

    Returns:
        At most 50 dicts with keys ``title``, ``link``, ``price_text`` and
        ``addr_text`` (the last two are ``""`` when nothing was found).
    """
    max_cards = 50
    skip_routes = ("/login", "/perfil", "/ayuda", "/faq", "/favorito", "/mi-cuenta")
    # Loop invariants hoisted out of the per-anchor loop.
    tokens = [t.lower() for t in ANCHOR_TOKENS]
    price_re = re.compile(r"(U\$S|USD|US\$|D[oó]lares?)\s*([\d\.\,]+)", re.IGNORECASE)
    area_re = re.compile(
        r"(Saavedra|Nu[eñ]ez|La Lucila|Florida(?: Oeste)?|Munro|Carapachay|Olivos|Martelli|Vicente L[oó]pez)[^|,]*",
        re.IGNORECASE,
    )
    site = domain.lower()

    seen = set()
    cards: List[Dict[str, Any]] = []
    for a in soup.select("a[href]"):
        href = a.get("href", "")
        if not href:
            continue
        # Normalise to an absolute URL.
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = f"https://{domain}{href}"
        # Same site only (host comparison, subdomains allowed).
        try:
            host = (ul.urlparse(href).hostname or "").lower()
        except ValueError:
            continue
        if host != site and not host.endswith("." + site):
            continue
        # Drop account/help routes.
        if any(route in href for route in skip_routes):
            continue
        # "Looks like a listing" heuristic.
        lowered = href.lower()
        if not any(tok in lowered for tok in tokens):
            continue
        if href in seen:
            continue
        seen.add(href)

        parent = a.find_parent()
        block_text = clean_text(parent.get_text(" ", strip=True)) if parent else ""
        title = clean_text(a.get_text(" ", strip=True))
        if len(title) < 8 and parent:
            # Some sites keep the title in the parent container.
            title = block_text[:160]

        price_m = price_re.search(block_text)
        area_m = area_re.search(block_text)
        cards.append({
            "title": title[:160],
            "link": href,
            "price_text": price_m.group(0) if price_m else "",
            "addr_text": area_m.group(0) if area_m else "",
        })
        if len(cards) >= max_cards:
            break  # same result as truncating afterwards, without the extra work
    return cards
async def scrape_search_page(url: str, domain: str) -> List[Listing]:
    """Download one search page and convert its cards into bare ``Listing``s.

    Only source, title, link, price/currency, address and a fixed city label
    are populated; everything else starts as ``None`` with a score of 0.0.
    Returns an empty list when the page could not be fetched.
    """
    page = await fetch(url)
    if not page:
        return []
    cards = generic_card_extractor(BeautifulSoup(page, "lxml"), domain)

    def as_listing(card: Dict[str, Any]) -> Listing:
        usd = to_float_price(card.get("price_text", ""))
        currency = None if usd is None else "USD"
        return Listing(
            source=domain,
            title=clean_text(card.get("title", ""))[:160],
            link=card.get("link", ""),
            price_usd=usd,
            currency=currency,
            address=card.get("addr_text", ""),
            neighborhood=None,
            city="Vicente López / CABA",
            rooms=None,
            bedrooms=None,
            bathrooms=None,
            has_patio=None,
            has_terrace=None,
            pet_friendly=None,
            has_bidet=None,
            description=None,
            score=0.0,
        )

    return [as_listing(card) for card in cards]
async def scrape_portal(portal: Portal, neighborhoods: List[str], max_usd: int, types: List[str],
                        max_queries: int = 6) -> List[Listing]:
    """Run a bounded number of search queries against one portal.

    The builder emits URLs grouped by neighbourhood, so taking the first
    *max_queries* only ever searched the first couple of neighbourhoods.
    The budget is now spread evenly across the whole URL list.

    Args:
        portal: Portal to query.
        neighborhoods: Neighbourhood names passed to the portal's URL builder.
        max_usd: Price ceiling passed to the builder.
        types: Property types passed to the builder.
        max_queries: Maximum number of search pages to fetch (default 6).

    Returns:
        Listings from every page that was scraped successfully. A failing
        page is logged and skipped; it never aborts the portal.
    """
    if max_queries <= 0:
        return []
    urls = portal.search_builder(neighborhoods, max_usd, types)
    stride = max(1, len(urls) // max_queries)
    selected = urls[::stride][:max_queries]

    log = logging.getLogger(__name__)
    results: List[Listing] = []
    for u in selected:
        try:
            results.extend(await scrape_search_page(u, portal.domain))
        except Exception:  # best-effort: keep going with the remaining pages
            log.warning("scrape failed for %s", u, exc_info=True)
    return results
async def enrich_listing(lst: Listing) -> Listing:
    """Fill in a listing's missing details from its detail page.

    Fetches ``lst.link`` and derives description, feature flags, room counts
    and address. Values already present on *lst* are never overwritten. The
    listing is updated in place and returned; when the page cannot be fetched
    it is returned unchanged.
    """
    html = await fetch(lst.link)
    if not html:
        return lst
    soup = BeautifulSoup(html, "lxml")

    # Description: first description-like container, else the first <p>,
    # else the text of the first 60 <p>/<li> elements.
    desc_el = soup.find(["div", "section"], attrs={"class": re.compile(r"(description|descripcion|post|body|texto|descripcion-larga)")}) or soup.find("p")
    if desc_el:
        desc = clean_text(desc_el.get_text(" ", strip=True))
    else:
        desc = clean_text(" ".join(x.get_text(" ", strip=True) for x in soup.find_all(["p", "li"])[:60]))

    # Feature flags inferred from the description.
    patio, terraza, mascotas, bidet = feature_guess(desc)

    # Text of elements that mention rooms, bedrooms or bathrooms. The plain
    # text is computed once per element instead of once per check.
    feat_chunks = []
    for el in soup.find_all(["li", "span", "div"]):
        raw = el.get_text()
        if raw and any(x in raw.lower() for x in ("ambiente", "dorm", "bañ")):
            feat_chunks.append(el.get_text(" ", strip=True))
    feat_text = " ".join(feat_chunks).lower()

    # The title and description serve as a fallback source.
    coarse = (lst.title + " " + desc).lower()
    rooms = extract_int_from(feat_text, r"(\d+)\s*ambiente") or extract_int_from(coarse, r"(\d+)\s*amb")
    bathrooms = extract_int_from(feat_text, r"(\d+)\s*bañ") or extract_int_from(coarse, r"(\d+)\s*bañ")
    bedrooms = extract_int_from(feat_text, r"(\d+)\s*dorm") or extract_int_from(coarse, r"(\d+)\s*dormi")

    # Address, only when the search card did not supply one.
    addr_guess = soup.find(attrs={"class": re.compile(r"(address|ubicacion|ubicación|location|inmo-location)")})
    if addr_guess and not lst.address:
        lst.address = clean_text(addr_guess.get_text(" ", strip=True))[:200]

    lst.description = desc or lst.description
    # (Removed a no-op assignment that probed the misspelt attribute "has_patro".)
    lst.has_patio = lst.has_patio if lst.has_patio is not None else patio
    lst.has_terrace = lst.has_terrace if lst.has_terrace is not None else terraza
    lst.pet_friendly = lst.pet_friendly if lst.pet_friendly is not None else mascotas
    lst.has_bidet = lst.has_bidet if lst.has_bidet is not None else bidet
    lst.rooms = lst.rooms or rooms
    lst.bathrooms = lst.bathrooms or bathrooms
    lst.bedrooms = lst.bedrooms or bedrooms
    return lst
# =========================
# Orquestación
# =========================
def canon(url: str) -> str:
    """Canonicalise *url* for de-duplication.

    Drops tracking query parameters (``utm_*``, ``gclid``, ``s``; matched
    case-insensitively) together with path params and the fragment. If the
    URL cannot be processed it is returned untouched.
    """
    tracking = {"utm_source", "utm_medium", "utm_campaign", "gclid", "s", "utm_term", "utm_content"}
    try:
        parts = ul.urlparse(url)
        kept = []
        for key, value in ul.parse_qsl(parts.query):
            if key.lower() in tracking:
                continue
            kept.append((key, value))
        cleaned = parts._replace(params="", query=ul.urlencode(kept, doseq=True), fragment="")
        return ul.urlunparse(cleaned)
    except Exception:
        return url
async def run_agent_once(neighborhoods: List[str], max_price_usd: int, types: List[str],
                         min_rooms: int, require_outdoor: bool, require_bidet: bool, require_pet: bool) -> Tuple[List[Listing], str]:
    """Run one full search pass: scrape, dedup, enrich, filter, score, sort.

    Only listings that already pass the price gate are enriched. Enrichment
    costs one HTTP fetch per listing and never sets a price, so a listing
    with no price or one over budget is rejected by the filter regardless.

    Args:
        neighborhoods: Neighbourhood names to search.
        max_price_usd: Price ceiling in USD (inclusive).
        types: Property types used to build the queries. They are not
            enforced as a filter on the results.
        min_rooms: Minimum rooms; listings with an unknown count pass.
        require_outdoor: Require a detected patio or terrace.
        require_bidet: Require a detected bidet.
        require_pet: Require a detected pet-friendly mention.

    Returns:
        ``(listings, trace)``: the surviving listings sorted by descending
        score then ascending price, and a one-line summary of stage counts.
    """
    filters = dict(
        max_price_usd=max_price_usd,
        min_rooms=min_rooms,
        require_outdoor=require_outdoor,
        require_bidet=require_bidet,
        require_pet=require_pet,
    )

    # 1) Query every portal concurrently.
    batch = await asyncio.gather(
        *(scrape_portal(p, neighborhoods, max_price_usd, types) for p in PORTALS)
    )
    listings = [item for sub in batch for item in sub]

    # 2) De-duplicate on the canonical link.
    seen = set()
    unique: List[Listing] = []
    for item in listings:
        key = canon(item.link)
        if key in seen:
            continue
        seen.add(key)
        item.link = key
        unique.append(item)

    # 3) Enrich only the candidates that can still pass the price filter.
    candidates = [
        item for item in unique
        if item.price_usd is not None and item.price_usd <= max_price_usd
    ]
    sem = asyncio.Semaphore(8)

    async def guard(item: Listing):
        async with sem:
            result = await enrich_listing(item)
            await asyncio.sleep(random.uniform(*JITTER_RANGE))
            return result

    enriched = await asyncio.gather(*[guard(item) for item in candidates])

    # 4) Filter. An unknown room count passes; a required feature must have
    #    been positively detected.
    def passes(item: Listing) -> bool:
        if item.price_usd is None or item.price_usd > max_price_usd:
            return False
        if item.rooms is not None and item.rooms < min_rooms:
            return False
        if require_outdoor and not ((item.has_patio is True) or (item.has_terrace is True)):
            return False
        if require_bidet and item.has_bidet is not True:
            return False
        if require_pet and item.pet_friendly is not True:
            return False
        # Property type is deliberately not enforced here.
        return True

    filtered = [item for item in enriched if passes(item)]

    # 5) Score and sort.
    for item in filtered:
        item.score = compute_score(item, filters)
    filtered.sort(key=lambda x: (-x.score, x.price_usd or 1e9))

    trace = f"Portales: {len(PORTALS)} | Crudos: {len(listings)} | Únicos: {len(unique)} | Enriquecidos: {len(enriched)} | Final: {len(filtered)}"
    return filtered, trace
async def run_agent_with_relax(neighborhoods: List[str], max_price_usd: int, types: List[str],
                               min_rooms: int, require_outdoor: bool, require_bidet: bool, require_pet: bool,
                               auto_relax: bool = True) -> Tuple[List[Listing], List[str]]:
    """Search with the given filters, relaxing them step by step if empty.

    The base search runs first. If it returns nothing and *auto_relax* is
    true, ``RELAX_STEPS`` are applied in order and **cumulatively**: each
    step loosens the filters left by the previous one. Previously every step
    was applied to the original filters, so earlier relaxations were lost.
    A step can only loosen a filter, never tighten it, and a step that
    changes nothing is skipped instead of repeating an identical search.

    Returns:
        ``(listings, log)``: results of the first pass that found anything
        (or the last pass tried), and log lines describing each pass.
    """
    log = []
    results, trace = await run_agent_once(neighborhoods, max_price_usd, types, min_rooms, require_outdoor, require_bidet, require_pet)
    log.append(f"[Base] {trace}")
    if results or not auto_relax:
        return results, log

    # Nothing found: relax progressively, carrying state from step to step.
    current = dict(
        max_price_usd=max_price_usd, min_rooms=min_rooms,
        require_outdoor=require_outdoor, require_bidet=require_bidet, require_pet=require_pet,
    )
    for i, step in enumerate(RELAX_STEPS, 1):
        relaxed = dict(current)
        for flag in ("require_outdoor", "require_bidet", "require_pet"):
            if flag in step:
                # A step may switch a requirement off, never back on.
                relaxed[flag] = bool(relaxed[flag] and step[flag])
        if "min_rooms" in step:
            # Never raise the minimum above what the user asked for.
            relaxed["min_rooms"] = min(relaxed["min_rooms"], step["min_rooms"])
        relaxed["max_price_usd"] += step.get("max_price_usd_delta", 0)

        if relaxed == current:
            log.append(f"[Relax {i}] sin cambios; paso omitido")
            continue
        current = relaxed

        mr, ro = current["min_rooms"], current["require_outdoor"]
        rb, rp = current["require_bidet"], current["require_pet"]
        price = current["max_price_usd"]
        log.append(f"[Relax {i}] rooms={mr} outdoor={ro} bidet={rb} pet={rp} price_max=USD {price}")
        results, trace = await run_agent_once(neighborhoods, price, types, mr, ro, rb, rp)
        log.append(f"[Relax {i}] {trace}")
        if results:
            return results, log
    return results, log
def listings_to_df(listings: List[Listing]) -> pd.DataFrame:
    """Convert listings into the DataFrame shown in the UI and exported as CSV.

    The frame always carries the full, ordered column set, including for an
    empty result, which previously produced a frame with no columns at all.
    The ``Fuente`` column is the source domain with ``"www."`` removed.
    """
    cols = ["Fuente", "Título", "Precio USD", "Ambientes", "Dormitorios", "Baños",
            "Patio", "Terraza", "Mascotas", "Bidet", "Dirección/Área", "Link", "Score"]
    rows = [
        {
            "Fuente": item.source.replace("www.", ""),
            "Título": item.title,
            "Precio USD": item.price_usd,
            "Ambientes": item.rooms,
            "Dormitorios": item.bedrooms,
            "Baños": item.bathrooms,
            "Patio": item.has_patio,
            "Terraza": item.has_terrace,
            "Mascotas": item.pet_friendly,
            "Bidet": item.has_bidet,
            "Dirección/Área": item.address,
            "Link": item.link,
            "Score": item.score,
        }
        for item in listings
    ]
    return pd.DataFrame(rows, columns=cols)
# =========================
# Email
# =========================
def build_email(subject: str, sender: str, to_addr: str, body_html: str, attachments: List[Tuple[str, bytes, str]]) -> EmailMessage:
    """Assemble a message with a plain-text stub, an HTML body and attachments.

    Each attachment is a ``(filename, content, mimetype)`` tuple; a mimetype
    without a ``/`` is sent as ``application/octet-stream``.
    """
    msg = EmailMessage()
    for header, value in (("Subject", subject), ("From", sender), ("To", to_addr)):
        msg[header] = value
    msg.set_content("Este mensaje tiene versión HTML y adjuntos.")
    msg.add_alternative(body_html, subtype="html")
    for filename, content, mimetype in attachments:
        maintype, slash, subtype = mimetype.partition("/")
        if not slash:
            maintype, subtype = "application", "octet-stream"
        msg.add_attachment(content, maintype=maintype, subtype=subtype, filename=filename)
    return msg
def send_email(to_addr: str, subject: str, html_body: str, attachments: List[Tuple[str, bytes, str]]) -> str:
    """Send the results email over SMTP.

    Uses implicit TLS when ``SMTP_USE_SSL`` is set or the port is 465, and
    STARTTLS otherwise. Both paths now share a verifying SSL context;
    ``starttls()`` used to be called without one, which skips certificate
    checks. A socket timeout keeps an unresponsive server from hanging the
    request indefinitely.

    Returns:
        ``"OK"`` on success, otherwise a message starting with ``"Error"``.
        Exceptions are never raised to the caller.
    """
    if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
        return "Error: SMTP no configurado (SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS, SMTP_FROM)."
    if not EMAIL_REGEX.match(to_addr or ""):
        return "Error: email destino inválido."
    msg = build_email(subject, SMTP_FROM, to_addr, html_body, attachments)
    context = ssl.create_default_context()
    timeout_s = 30
    try:
        if SMTP_USE_SSL or SMTP_PORT == 465:
            with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, context=context, timeout=timeout_s) as server:
                server.login(SMTP_USER, SMTP_PASS)
                server.send_message(msg)
        else:
            with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=timeout_s) as server:
                server.ehlo()
                server.starttls(context=context)
                server.ehlo()
                server.login(SMTP_USER, SMTP_PASS)
                server.send_message(msg)
        return "OK"
    except Exception as e:
        return f"Error enviando email: {e}"
def df_to_csv_bytes(df: pd.DataFrame) -> bytes:
    """Serialise *df* to CSV (no index column) encoded as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode("utf-8")
def json_to_bytes(obj: Any) -> bytes:
    """Serialise *obj* to pretty-printed JSON (non-ASCII kept) as UTF-8 bytes."""
    text = json.dumps(obj, indent=2, ensure_ascii=False)
    return bytes(text, "utf-8")
def render_summary_html(df: pd.DataFrame, neighborhoods: List[str], max_usd: int, min_rooms: int, relax_log: List[str]) -> str:
    """Render the HTML body of the results email.

    Shows a header with the filters, the top 12 rows (by score, then price)
    as an ordered list, and the relaxation log. All scraped or user-supplied
    text is HTML-escaped. Price, address and feature flags are separated by
    " — " instead of being run together.

    Args:
        df: Frame produced by ``listings_to_df`` (may be empty).
        neighborhoods: Zones shown in the header.
        max_usd: Price ceiling shown in the header.
        min_rooms: Minimum rooms shown in the header.
        relax_log: Log lines rendered in a ``<pre>`` block.

    Returns:
        An HTML fragment.
    """
    count = len(df)
    zones = html_escape(", ".join(neighborhoods))
    head = f"<h2>Resultados</h2><p><b>Zonas:</b> {zones}<br><b>Precio máx.:</b> USD {max_usd}<br><b>Ambientes mín.:</b> {min_rooms}<br><b>Total:</b> {count}</p>"
    trace = "<pre style='white-space:pre-wrap;font-size:12px;opacity:.85;'>" + html_escape("\n".join(relax_log)) + "</pre>"
    if count == 0:
        return head + "<p>No se encontraron resultados con los filtros actuales.</p>" + trace

    top_rows = df.sort_values(by=["Score", "Precio USD"], ascending=[False, True]).head(12)
    items = []
    for _, r in top_rows.iterrows():
        # NaN is truthy, so test for a real value before reading the flag.
        active = []
        for flag in ("Patio", "Terraza", "Mascotas", "Bidet"):
            value = r.get(flag)
            if pd.notna(value) and bool(value):
                active.append(flag)
        flags = " · ".join(active) or "—"

        price = f"USD {int(r['Precio USD'])}" if pd.notna(r["Precio USD"]) else "USD —"
        details = [price]
        addr = r.get("Dirección/Área")
        if isinstance(addr, str) and addr.strip():
            details.append(html_escape(addr.strip()))
        details.append(flags)
        detail_text = " — ".join(details)

        title = html_escape(str(r["Título"]))
        link = html_escape(str(r["Link"]), quote=True)
        items.append(f"<li><b>{title}</b> — {detail_text} — <a href='{link}'>Abrir</a></li>")
    return head + "<ol>" + "\n".join(items) + "</ol>" + trace
# =========================
# UI (Gradio)
# =========================
# Markdown blurb rendered at the top of the Gradio page (user-facing, Spanish).
DESCRIPTION = """
Meta-buscador multi-portales para casas/PH entre Saavedra y La Lucila y alrededores.
• Filtros: USD ≤ 90k, ≥ 3 ambientes, patio/terraza, mascotas, bidet (si figura en descripción).
• Anti-scraping: headers rotativos, referers, HTTP/2, rate limit con jitter, reintentos con backoff.
• Si no hay resultados, activa auto-relajación escalonada (configurable) y documenta los pasos.
"""
async def run_and_present(neighs, max_usd, types, min_rooms, req_outdoor, req_bidet, req_pet, auto_relax, email_to, send_email_flag):
    """Gradio click handler: run the search and optionally email the results.

    Fixes over the previous version:
      * the email subject reports the price ceiling actually used instead of
        a hard-coded "90k";
      * a cleared number box (``None``) falls back to the default instead of
        raising in ``int()``;
      * the blocking SMTP call runs in a worker thread so it does not stall
        the event loop.

    Returns:
        ``(DataFrame, JSON text, relax-log text, email status)``, matching
        the four output components wired to the button.
    """
    neighs_list = [n.strip() for n in str(neighs).split(",") if n.strip()]
    types_list = [t.strip().lower() for t in str(types).split(",") if t.strip()]
    max_usd = int(max_usd) if max_usd is not None else DEFAULT_MAX_USD
    min_rooms = int(min_rooms) if min_rooms is not None else DEFAULT_MIN_ROOMS
    req_outdoor = bool(req_outdoor)
    req_bidet = bool(req_bidet)
    req_pet = bool(req_pet)
    auto_relax = bool(auto_relax)

    results, relax_log = await run_agent_with_relax(
        neighborhoods=neighs_list, max_price_usd=max_usd, types=types_list,
        min_rooms=min_rooms, require_outdoor=req_outdoor, require_bidet=req_bidet, require_pet=req_pet,
        auto_relax=auto_relax
    )
    df = listings_to_df(results)
    json_blob = [asdict(item) for item in results]

    email_status = "Email no enviado."
    if send_email_flag:
        # Surrounding whitespace from a pasted address would fail validation.
        email_to = (email_to or "").strip()
        if not EMAIL_REGEX.match(email_to):
            email_status = "Error: email destino inválido."
        else:
            html = render_summary_html(df, neighs_list, max_usd, min_rooms, relax_log)
            attachments: List[Tuple[str, bytes, str]] = []
            if not df.empty:
                attachments.append(("resultados.csv", df_to_csv_bytes(df), "text/csv"))
                attachments.append(("resultados.json", json_to_bytes(json_blob), "application/json"))
            subject = f"Resultados de casas/PH (≤ USD {max_usd}) – Norte BA"
            loop = asyncio.get_running_loop()
            status = await loop.run_in_executor(
                None,
                lambda: send_email(
                    to_addr=email_to,
                    subject=subject,
                    html_body=html,
                    attachments=attachments,
                ),
            )
            email_status = "Enviado" if status == "OK" else status

    # The relax log is shown in the status tab.
    return df, json.dumps(json_blob, ensure_ascii=False, indent=2), " | ".join(relax_log), email_status
# Gradio UI: filter inputs, optional email delivery, and four result tabs.
# NOTE(review): the source had lost its indentation; the widget nesting below
# is a reconstruction -- confirm it matches the intended layout.
with gr.Blocks(title="Meta-buscador Inmuebles Norte BA (≤ USD 90k)") as demo:
    gr.Markdown("# Meta-buscador de casas/PH norte BA (≤ 90 000 USD)")
    gr.Markdown(DESCRIPTION)
    # Search filters.
    with gr.Row():
        neighs = gr.Textbox(label="Barrios (coma separada)", value=", ".join(DEFAULT_NEIGHBORHOODS))
        max_usd = gr.Number(label="Precio máx. (USD)", value=DEFAULT_MAX_USD, precision=0)
    with gr.Row():
        types = gr.Textbox(label="Tipos (coma separada)", value=", ".join(DEFAULT_TYPES))
        min_rooms = gr.Number(label="Mínimo ambientes", value=DEFAULT_MIN_ROOMS, precision=0)
    with gr.Row():
        req_outdoor = gr.Checkbox(label="Requerir patio o terraza", value=REQUIRE_OUTDOOR)
        req_bidet = gr.Checkbox(label="Requerir bidet (si aparece en descripción)", value=REQUIRE_BIDET)
        req_pet = gr.Checkbox(label="Requerir pet-friendly (si aparece en descripción)", value=REQUIRE_PET_FRIENDLY)
        auto_relax = gr.Checkbox(label="Auto-relajar si no hay resultados", value=AUTO_RELAX_ENABLED)
    # Optional email delivery.
    gr.Markdown("### Envío por email al finalizar (opcional)")
    with gr.Row():
        email_to = gr.Textbox(label="Email destino", placeholder="tu@correo.com")
        send_email_flag = gr.Checkbox(label="Enviar email al finalizar", value=True)
    btn = gr.Button("Buscar ahora", variant="primary")
    # Outputs, one tab per value returned by run_and_present.
    with gr.Tabs():
        with gr.Tab("Resultados"):
            table = gr.Dataframe(interactive=False) # no unusual args
        with gr.Tab("JSON"):
            j = gr.Code(language="json")
        with gr.Tab("Estado"):
            trace = gr.Markdown("—")
        with gr.Tab("Estado de email"):
            status = gr.Markdown("—")
    # The input order must match run_and_present's positional parameters.
    btn.click(
        run_and_present,
        inputs=[neighs, max_usd, types, min_rooms, req_outdoor, req_bidet, req_pet, auto_relax, email_to, send_email_flag],
        outputs=[table, j, trace, status]
    )
if __name__ == "__main__":
    demo.launch()