# (Hugging Face Spaces page residue — "Spaces: Sleeping" status header, not part of the app code)
| import os | |
| import re | |
| import ssl | |
| import json | |
| import time | |
| import smtplib | |
| import random | |
| import asyncio | |
| from dataclasses import dataclass, asdict | |
| from typing import List, Optional, Dict, Any, Tuple | |
| from email.message import EmailMessage | |
| import urllib.parse as ul | |
| import httpx | |
| from bs4 import BeautifulSoup | |
| from rapidfuzz import fuzz | |
| import pandas as pd | |
| import gradio as gr | |
# =========================
# Main configuration
# =========================
# Hard budget cap (USD) used for the default search.
DEFAULT_MAX_USD = 90000
# Target neighborhoods: northern CABA / Vicente López corridor.
DEFAULT_NEIGHBORHOODS = [
    "Saavedra", "Nuñez", "La Lucila", "Florida Oeste", "Munro", "Carapachay",
    "Olivos", "Villa Martelli", "Florida", "Vicente López"
]
DEFAULT_TYPES = ["casa", "ph"]  # property types searched: "casa", "ph"
DEFAULT_MIN_ROOMS = 3
# Hard-requirement toggles; each can be lifted by the auto-relax steps below.
REQUIRE_BIDET = True
REQUIRE_PET_FRIENDLY = True
REQUIRE_OUTDOOR = True
# Staged auto-relaxation when a search yields no results. Each step overrides
# only the keys it lists (see run_agent_with_relax for how they are applied).
AUTO_RELAX_ENABLED = True
RELAX_STEPS = [
    {"require_bidet": False},        # 1) drop the bidet requirement
    {"require_pet": False},          # 2) drop the pet-friendly requirement
    {"min_rooms": 2},                # 3) lower minimum rooms to 2
    {"require_outdoor": False},      # 4) make patio/terrace optional
    {"max_price_usd_delta": 10000},  # 5) raise the price cap by +10k USD
]
# Micro-zones that boost a listing's score when mentioned (see residential_score).
MICROZONAS_PRIORITARIAS = [
    "Parque Saavedra", "Parque Sarmiento", "Av. Balbín", "Ruiz Huidobro",
    "Lomas de Nuñez", "Cabildo", "Plaza Alberti",
    "Estación La Lucila", "Rawson", "Paraná", "Maipú",
    "Estación Florida", "Estación Carapachay", "Estación Munro",
    "Ugarte", "San Martín", "Panamericana", "Pelliza", "Melo",
]
# Anti-scraping: rotate realistic browser User-Agents and referers per request.
USER_AGENT_POOL = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
]
REFERER_POOL = ["https://www.google.com/", "https://www.bing.com/", "https://duckduckgo.com/"]
# HTTP client tuning: 25s overall, 12s to establish a connection.
TIMEOUT = httpx.Timeout(25.0, connect=12.0)
RETRIES = 2                   # extra attempts after the first request
BACKOFF_BASE = 0.9            # base for exponential backoff between retries
JITTER_RANGE = (0.13, 0.55)   # random extra sleep added after each request, seconds
# Minimum gap (seconds) between consecutive requests, per domain.
DOMAIN_RATE_LIMIT = {
    "www.zonaprop.com.ar": 0.6,
    "www.argenprop.com": 0.6,
    "www.properati.com.ar": 0.6,
    "inmuebles.mercadolibre.com.ar": 0.7,
    "inmuebles.clarin.com": 0.8,
    "www.soloduenos.com": 0.9,
    "mudafy.com.ar": 0.7,
    "www.remax.com.ar": 0.9,
    "www.enbuenosaires.com": 0.8,
    "www.buscatucasa.com.ar": 0.8,
}
# Optional proxy (set via Secrets); empty string disables it.
PROXY_URL = os.getenv("PROXY_URL", "").strip()
# SMTP / email delivery settings (set via Secrets).
SMTP_HOST = os.getenv("SMTP_HOST", "").strip()
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "").strip()
SMTP_PASS = os.getenv("SMTP_PASS", "").strip()
SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER).strip()
SMTP_USE_SSL = os.getenv("SMTP_USE_SSL", "false").lower() in ("1", "true", "yes")
# Loose sanity check for destination addresses (not full RFC 5322 validation).
EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# =========================
# Models and utilities
# =========================
@dataclass
class Listing:
    """A normalized property listing aggregated from any portal.

    Optional fields are None when the source page did not reveal the value
    ("unknown"), which downstream filtering treats differently from False.

    Bug fix: the ``@dataclass`` decorator was missing. Without it the class
    has no generated ``__init__``, so ``Listing(source=..., ...)`` in
    scrape_search_page and ``asdict(l)`` in run_and_present both raise
    TypeError at runtime.
    """
    source: str                   # portal domain the listing came from
    title: str
    link: str                     # (canonicalized) detail-page URL
    price_usd: Optional[float]    # None when no USD price was parsed
    currency: Optional[str]
    address: Optional[str]
    neighborhood: Optional[str]
    city: Optional[str]
    rooms: Optional[int]
    bedrooms: Optional[int]
    bathrooms: Optional[int]
    has_patio: Optional[bool]     # True = mentioned, None = unknown
    has_terrace: Optional[bool]
    pet_friendly: Optional[bool]
    has_bidet: Optional[bool]
    description: Optional[str]
    score: float                  # relevance score, filled in by compute_score
def clean_text(s: str) -> str:
    """Collapse whitespace runs to single spaces and trim; None-safe."""
    stripped = (s or "").strip()
    return re.sub(r"\s+", " ", stripped)
def to_float_price(value: str) -> Optional[float]:
    """Parse a USD amount out of an AR-formatted price string.

    Returns None for empty input, non-USD prices (e.g. pesos), or when no
    number is present. Thousands dots are dropped, decimal comma becomes a dot.
    """
    if not value:
        return None
    normalized = value.replace(".", "").replace(",", ".").upper()
    usd_markers = ("USD", "U$S", "US$", "DOLAR", "U$D")
    if not any(marker in normalized for marker in usd_markers):
        return None
    match = re.search(r"(\d+(?:\.\d+)?)", normalized)
    if match is None:
        return None
    return float(match.group(1))
def extract_int_from(text: str, pattern: str) -> Optional[int]:
    """Return the first captured group of `pattern` in `text` as an int, else None."""
    if not text:
        return None
    found = re.search(pattern, text)
    if found is None:
        return None
    return int(found.group(1))
def fuzzy_any(text: str, keywords: List[str], thresh: int = 80) -> bool:
    """True when any keyword fuzzy-matches `text` (partial ratio >= thresh)."""
    if not text:
        return False
    haystack = text.lower()
    for keyword in keywords:
        if fuzz.partial_ratio(haystack, keyword.lower()) >= thresh:
            return True
    return False
def feature_guess(desc: str) -> Tuple[Optional[bool], Optional[bool], Optional[bool], Optional[bool]]:
    """Infer (patio, terrace, pets, bidet) flags from a listing description.

    Each slot is True when mentioned, otherwise None ("not mentioned" rather
    than "absent") — False is never returned.
    """
    keyword_sets = [
        ["patio", "patio propio", "patio descubierto", "fondo", "jardín"],
        ["terraza", "terraza propia", "terraza transitable", "azotea"],
        ["se aceptan mascotas", "pet friendly", "apta mascotas", "mascotas"],
        ["bidet"],
    ]
    patio, terraza, mascotas, bidet = (fuzzy_any(desc, kws) or None for kws in keyword_sets)
    return patio, terraza, mascotas, bidet
def residential_score(address: str, neighborhood: str, desc: str) -> float:
    """Score boost for mentions of prioritized micro-zones, capped at 2.0.

    Each fuzzy match against MICROZONAS_PRIORITARIAS adds 0.5.
    """
    haystack = " ".join([address or "", neighborhood or "", desc or ""])
    lowered = haystack.lower()
    matches = sum(
        1 for zone in MICROZONAS_PRIORITARIAS
        if fuzz.partial_ratio(lowered, zone.lower()) >= 80
    )
    return min(matches * 0.5, 2.0)
def compute_score(lst: Listing, filters: Dict[str, Any]) -> float:
    """Rank a listing against the active filters (higher = better match).

    Components: in-budget point + cheapness bonus, room count, outdoor space,
    pet/bidet soft bonuses, and the micro-zone boost. Rounded to 3 decimals.
    """
    total = 0.0
    budget = filters["max_price_usd"]
    # Price: in-budget listings get a point plus a proportional cheapness bonus.
    if lst.price_usd is not None and lst.price_usd <= budget:
        total += 1.0
        total += (budget - lst.price_usd) / max(budget, 1) * 1.0
    if lst.rooms and lst.rooms >= filters["min_rooms"]:
        total += 1.0
    if filters["require_outdoor"] and (lst.has_patio or lst.has_terrace):
        total += 1.0
    # Soft features: when required, reward only confirmed; otherwise flat bonus.
    total += (0.6 if lst.pet_friendly else 0.0) if filters["require_pet"] else 0.2
    total += (0.6 if lst.has_bidet else 0.0) if filters["require_bidet"] else 0.2
    total += residential_score(lst.address or "", lst.neighborhood or "", lst.description or "")
    return round(total, 3)
# =========================
# Anti-scraping helpers
# =========================
# Per-domain timestamp of the most recent request; used by domain_throttle.
_last_hit: Dict[str, float] = {}
def make_headers() -> Dict[str, str]:
    """Build browser-like request headers with randomized UA and referer."""
    headers = {
        "User-Agent": random.choice(USER_AGENT_POOL),
        "Accept-Language": "es-AR,es;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Referer": random.choice(REFERER_POOL),
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
        "DNT": "1",
    }
    return headers
async def domain_throttle(domain: str):
    """Sleep until the per-domain minimum gap has elapsed, then add jitter.

    Updates the shared _last_hit timestamp for the domain.
    """
    min_gap = DOMAIN_RATE_LIMIT.get(domain, 0.5)
    next_allowed = _last_hit.get(domain, 0.0) + min_gap
    remaining = next_allowed - time.time()
    if remaining > 0:
        await asyncio.sleep(remaining)
    _last_hit[domain] = time.time()
    # Small random pause makes request timing less mechanical.
    await asyncio.sleep(random.uniform(*JITTER_RANGE))
async def fetch(url: str) -> Optional[str]:
    """GET `url` with per-domain throttling, rotating headers, retries + backoff.

    Returns the body on HTTP 200 with non-empty text; None after all attempts
    fail. A fresh AsyncClient is opened per attempt (simple, no connection reuse).

    NOTE(review): httpx deprecated `proxies=` in favor of `proxy=` in newer
    releases — confirm the pinned httpx version still accepts this argument.
    """
    proxies = {"all://": PROXY_URL} if PROXY_URL else None
    domain = ul.urlparse(url).netloc
    for i in range(RETRIES + 1):
        await domain_throttle(domain)
        try:
            async with httpx.AsyncClient(follow_redirects=True, http2=True, proxies=proxies, timeout=TIMEOUT) as client:
                r = await client.get(url, headers=make_headers())
                # Short HTML is accepted; some portals serve minimal SSR markup.
                if r.status_code == 200 and r.text:
                    return r.text
            # Non-200 (or empty body): exponential backoff before retrying.
            await asyncio.sleep(BACKOFF_BASE * (2 ** i) + random.uniform(0, 0.35))
        except Exception:
            # Network/protocol errors are retried with the same backoff.
            await asyncio.sleep(BACKOFF_BASE * (2 ** i) + random.uniform(0, 0.35))
    return None
| # ========================= | |
| # Portales | |
| # ========================= | |
class Portal:
    """A real-estate portal: its domain plus a search-URL builder callable."""

    def __init__(self, domain: str, search_builder):
        # search_builder: fn(neighborhoods, max_usd, types) -> list of search URLs
        self.domain = domain
        self.search_builder = search_builder
def sb_qparam(base: str, param: str = "q"):
    """Make a search-URL builder that puts a free-text query in `param` of `base`.

    The returned builder emits one URL per (neighborhood, outdoor-synonym)
    pair; room and pet phrasings are randomized so repeated runs vary queries.
    """
    def _builder(neighs: List[str], max_usd: int, types: List[str]) -> List[str]:
        outdoor_synonyms = ["patio", "terraza", "exterior"]
        pet_synonyms = ["mascotas", "pet friendly"]
        room_phrases = ["3 ambientes", "tres ambientes"]
        type_part = " o ".join(types)
        urls: List[str] = []
        for barrio in neighs:
            for outdoor in outdoor_synonyms:
                query = (
                    f"{type_part} venta {barrio} hasta {max_usd} dolares "
                    f"{random.choice(room_phrases)} {outdoor} {random.choice(pet_synonyms)} bidet"
                )
                urls.append(f"{base}?{param}={ul.quote(query)}")
        return urls
    return _builder
# Registered portals: each domain paired with a query-string search builder.
PORTALS: List[Portal] = [
    Portal("www.zonaprop.com.ar", sb_qparam("https://www.zonaprop.com.ar/propiedades.html", "q")),
    Portal("www.argenprop.com", sb_qparam("https://www.argenprop.com/propiedades", "text")),
    Portal("www.properati.com.ar", sb_qparam("https://www.properati.com.ar/s/venta/propiedades", "q")),
    Portal("inmuebles.mercadolibre.com.ar", sb_qparam("https://inmuebles.mercadolibre.com.ar", "as_word")),
    Portal("inmuebles.clarin.com", sb_qparam("https://inmuebles.clarin.com/listado", "q")),
    Portal("www.soloduenos.com", sb_qparam("https://www.soloduenos.com/buscar", "q")),
    Portal("mudafy.com.ar", sb_qparam("https://mudafy.com.ar/propiedades", "q")),
    Portal("www.remax.com.ar", sb_qparam("https://www.remax.com.ar/listings", "q")),
    Portal("www.enbuenosaires.com", sb_qparam("https://www.enbuenosaires.com/buscar", "q")),
    Portal("www.buscatucasa.com.ar", sb_qparam("https://www.buscatucasa.com.ar/buscar", "q")),
]
# Substrings that make an anchor's href look like a listing detail page
# (used by generic_card_extractor; compared case-insensitively).
ANCHOR_TOKENS = [
    "propiedad", "inmueble", "inmuebles", "departamento", "casa", "ph",
    "detalle", "item", "listing", "publicacion", "aviso", "MLA-"
]
def generic_card_extractor(soup: BeautifulSoup, domain: str) -> List[Dict[str, Any]]:
    """Heuristically extract listing "cards" from a portal search page.

    Scans same-domain anchors whose href looks like a listing detail URL,
    pulling title/price/neighborhood text from the anchor's parent block.
    Returns at most 50 dicts with keys: title, link, price_text, addr_text.

    Improvements: the lowercased token list and the two regexes were rebuilt
    on every anchor; they are loop-invariant and are now hoisted/compiled once.
    """
    tokens_lower = [t.lower() for t in ANCHOR_TOKENS]
    price_re = re.compile(r"(U\$S|USD|US\$|D[oó]lares?)\s*([\d\.\,]+)", re.IGNORECASE)
    addr_re = re.compile(
        r"(Saavedra|Nu[eñ]ez|La Lucila|Florida(?: Oeste)?|Munro|Carapachay|Olivos|Martelli|Vicente L[oó]pez)[^|,]*",
        re.IGNORECASE,
    )
    seen = set()
    cards: List[Dict[str, Any]] = []
    for a in soup.select("a[href]"):
        href = a.get("href", "")
        if not href:
            continue
        # Normalize protocol-relative / root-relative links to absolute URLs.
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = f"https://{domain}{href}"
        # Same-domain links only.
        if domain not in href:
            continue
        # Skip account/help/non-listing routes.
        if any(x in href for x in ["/login", "/perfil", "/ayuda", "/faq", "/favorito", "/mi-cuenta"]):
            continue
        # Heuristic: the href must "look like" a listing detail page.
        if not any(tok in href.lower() for tok in tokens_lower):
            continue
        # Deduplicate.
        if href in seen:
            continue
        seen.add(href)
        title = clean_text(a.get_text(" ", strip=True))
        parent = a.find_parent()
        if len(title) < 8 and parent:
            # Some sites keep the title in the surrounding container.
            title = clean_text(parent.get_text(" ", strip=True))[:160]
        # Nearby block text supplies price and neighborhood hints.
        block_text = clean_text(parent.get_text(" ", strip=True)) if parent else ""
        m = price_re.search(block_text)
        price_text = m.group(0) if m else ""
        addr_m = addr_re.search(block_text)
        addr_text = addr_m.group(0) if addr_m else ""
        cards.append({
            "title": title[:160],
            "link": href,
            "price_text": price_text,
            "addr_text": addr_text
        })
    return cards[:50]
async def scrape_search_page(url: str, domain: str) -> List[Listing]:
    """Fetch one search-results page and convert its cards into Listings."""
    html = await fetch(url)
    if not html:
        return []
    cards = generic_card_extractor(BeautifulSoup(html, "lxml"), domain)
    listings: List[Listing] = []
    for card in cards:
        usd_price = to_float_price(card.get("price_text", ""))
        listings.append(Listing(
            source=domain,
            title=clean_text(card.get("title", ""))[:160],
            link=card.get("link", ""),
            price_usd=usd_price,
            currency="USD" if usd_price is not None else None,
            address=card.get("addr_text", ""),
            neighborhood=None,
            city="Vicente López / CABA",
            rooms=None,
            bedrooms=None,
            bathrooms=None,
            has_patio=None,
            has_terrace=None,
            pet_friendly=None,
            has_bidet=None,
            description=None,
            score=0.0,
        ))
    return listings
async def scrape_portal(portal: Portal, neighborhoods: List[str], max_usd: int, types: List[str]) -> List[Listing]:
    """Scrape the first few generated search URLs for one portal (best-effort)."""
    search_urls = portal.search_builder(neighborhoods, max_usd, types)
    collected: List[Listing] = []
    for url in search_urls[:6]:  # cap at the first 6 permuted queries
        try:
            collected.extend(await scrape_search_page(url, portal.domain))
        except Exception:
            # Best-effort: one failing page must not abort the whole portal.
            continue
    return collected
async def enrich_listing(lst: Listing) -> Listing:
    """Fetch a listing's detail page and fill in missing fields in place.

    Already-known values are kept; page-derived values only fill the gaps.
    Returns the same (mutated) Listing; on fetch failure it is returned as-is.

    Fix: removed a no-op "guard" line that read a misspelled attribute
    (``has_patro``) via ``hasattr`` — the attribute never exists, so the line
    always reassigned ``has_patio`` to itself and only added confusion.
    """
    html = await fetch(lst.link)
    if not html:
        return lst
    soup = BeautifulSoup(html, "lxml")
    # Description: prefer a description-like container, fall back to first <p>,
    # then to a concatenation of the first paragraphs / list items.
    desc_el = soup.find(["div", "section"], attrs={"class": re.compile(r"(description|descripcion|post|body|texto|descripcion-larga)")}) or soup.find("p")
    desc = clean_text(desc_el.get_text(" ", strip=True)) if desc_el else clean_text(" ".join(x.get_text(" ", strip=True) for x in soup.find_all(["p", "li"])[:60]))
    # Feature inference from free text (True = mentioned, None = unknown).
    patio, terraza, mascotas, bidet = feature_guess(desc)
    # Characteristics: gather elements mentioning rooms/bedrooms/bathrooms.
    feat_text = " ".join(
        el.get_text(" ", strip=True) for el in soup.find_all(["li", "span", "div"])
        if el and el.get_text() and any(x in el.get_text().lower() for x in ["ambiente", "dorm", "bañ"])
    ).lower()
    # Also scan title + description as a coarse fallback.
    coarse = (lst.title + " " + desc).lower()
    rooms = extract_int_from(feat_text, r"(\d+)\s*ambiente") or extract_int_from(coarse, r"(\d+)\s*amb")
    bathrooms = extract_int_from(feat_text, r"(\d+)\s*bañ") or extract_int_from(coarse, r"(\d+)\s*bañ")
    bedrooms = extract_int_from(feat_text, r"(\d+)\s*dorm") or extract_int_from(coarse, r"(\d+)\s*dormi")
    # Address: only fill when we do not have one yet.
    addr_guess = soup.find(attrs={"class": re.compile(r"(address|ubicacion|ubicación|location|inmo-location)")})
    if addr_guess and not lst.address:
        lst.address = clean_text(addr_guess.get_text(" ", strip=True))[:200]
    lst.description = desc or lst.description
    # Fill unknowns only; never overwrite a value that is already set.
    lst.has_patio = lst.has_patio if lst.has_patio is not None else patio
    lst.has_terrace = lst.has_terrace if lst.has_terrace is not None else terraza
    lst.pet_friendly = lst.pet_friendly if lst.pet_friendly is not None else mascotas
    lst.has_bidet = lst.has_bidet if lst.has_bidet is not None else bidet
    lst.rooms = lst.rooms or rooms
    lst.bathrooms = lst.bathrooms or bathrooms
    lst.bedrooms = lst.bedrooms or bedrooms
    return lst
| # ========================= | |
| # Orquestación | |
| # ========================= | |
def canon(url: str) -> str:
    """Canonicalize a listing URL: strip tracking params and the fragment."""
    tracking_keys = {"utm_source", "utm_medium", "utm_campaign", "gclid", "s", "utm_term", "utm_content"}
    try:
        parts = ul.urlparse(url)
        kept = [(k, v) for k, v in ul.parse_qsl(parts.query) if k.lower() not in tracking_keys]
        query = ul.urlencode(kept, doseq=True)
        return ul.urlunparse((parts.scheme, parts.netloc, parts.path, "", query, ""))
    except Exception:
        # Canonicalization is best-effort; fall back to the raw URL.
        return url
async def run_agent_once(neighborhoods: List[str], max_price_usd: int, types: List[str],
                         min_rooms: int, require_outdoor: bool, require_bidet: bool, require_pet: bool) -> Tuple[List[Listing], str]:
    """Run one full search pass: scrape all portals, dedup, enrich, filter, score.

    Returns (listings sorted by score desc then price asc, trace summary line).

    Fix: removed dead code in the filter — the original computed a
    title/description "type match" and then discarded the result with ``pass``,
    so property-type matching was never enforced (preserved behavior, noted).
    """
    filters = dict(
        max_price_usd=max_price_usd,
        min_rooms=min_rooms,
        require_outdoor=require_outdoor,
        require_bidet=require_bidet,
        require_pet=require_pet,
    )
    # 1) Scrape every portal concurrently.
    tasks = [scrape_portal(p, neighborhoods, max_price_usd, types) for p in PORTALS]
    batch = await asyncio.gather(*tasks)
    listings = [l for sub in batch for l in sub]
    # 2) Dedup by canonical URL (tracking params stripped).
    seen = set()
    unique: List[Listing] = []
    for l in listings:
        key = canon(l.link)
        if key in seen:
            continue
        seen.add(key)
        l.link = key
        unique.append(l)
    # 3) Enrich each unique listing (bounded concurrency + jitter).
    sem = asyncio.Semaphore(8)
    async def guard(item: Listing):
        async with sem:
            enriched = await enrich_listing(item)
            await asyncio.sleep(random.uniform(*JITTER_RANGE))
            return enriched
    enriched = await asyncio.gather(*[guard(l) for l in unique])
    # 4) Filter (tolerant: an unknown/None flag only blocks when required).
    def passes(l: Listing) -> bool:
        if l.price_usd is None or l.price_usd > max_price_usd:
            return False
        if l.rooms is not None and l.rooms < min_rooms:
            return False
        if require_outdoor and not ((l.has_patio is True) or (l.has_terrace is True)):
            return False
        if require_bidet and l.has_bidet is not True:
            return False
        if require_pet and l.pet_friendly is not True:
            return False
        # NOTE: property-type matching is deliberately NOT enforced here —
        # the original computed it and discarded the result.
        return True
    filtered = [l for l in enriched if passes(l)]
    # 5) Score and sort: best score first, then cheapest.
    for l in filtered:
        l.score = compute_score(l, filters)
    filtered.sort(key=lambda x: (-x.score, x.price_usd or 1e9))
    trace = f"Portales: {len(PORTALS)} | Crudos: {len(listings)} | Únicos: {len(unique)} | Enriquecidos: {len(enriched)} | Final: {len(filtered)}"
    return filtered, trace
async def run_agent_with_relax(neighborhoods: List[str], max_price_usd: int, types: List[str],
                               min_rooms: int, require_outdoor: bool, require_bidet: bool, require_pet: bool,
                               auto_relax: bool = True) -> Tuple[List[Listing], List[str]]:
    """Run the agent once; when empty and `auto_relax`, retry through RELAX_STEPS.

    Each relax step overrides the BASE filters with only its own keys, so
    steps are applied independently rather than cumulatively — except the
    price bump, which persists for any later steps once applied. Stops at the
    first step that yields results. Returns (results, log lines).
    """
    log = []
    results, trace = await run_agent_once(neighborhoods, max_price_usd, types, min_rooms, require_outdoor, require_bidet, require_pet)
    log.append(f"[Base] {trace}")
    if results or not auto_relax:
        return results, log
    # No results: walk the staged relaxations until one yields listings.
    base = dict(
        neighborhoods=neighborhoods, max_price_usd=max_price_usd, types=types,
        min_rooms=min_rooms, require_outdoor=require_outdoor, require_bidet=require_bidet, require_pet=require_pet
    )
    price = max_price_usd
    for i, step in enumerate(RELAX_STEPS, 1):
        # Missing keys fall back to the ORIGINAL (base) values each iteration.
        mr = step.get("min_rooms", base["min_rooms"])
        ro = step.get("require_outdoor", base["require_outdoor"])
        rb = step.get("require_bidet", base["require_bidet"])
        rp = step.get("require_pet", base["require_pet"])
        if "max_price_usd_delta" in step:
            price = max_price_usd + step["max_price_usd_delta"]
        log.append(f"[Relax {i}] rooms={mr} outdoor={ro} bidet={rb} pet={rp} price_max=USD {price}")
        results, trace = await run_agent_once(neighborhoods, price, types, mr, ro, rb, rp)
        log.append(f"[Relax {i}] {trace}")
        if results:
            return results, log
    return results, log
def listings_to_df(listings: List[Listing]) -> pd.DataFrame:
    """Flatten Listings into a display-ready DataFrame with Spanish column names."""
    column_order = ["Fuente", "Título", "Precio USD", "Ambientes", "Dormitorios", "Baños",
                    "Patio", "Terraza", "Mascotas", "Bidet", "Dirección/Área", "Link", "Score"]
    rows = [{
        "Fuente": item.source.replace("www.", ""),
        "Título": item.title,
        "Precio USD": item.price_usd,
        "Ambientes": item.rooms,
        "Dormitorios": item.bedrooms,
        "Baños": item.bathrooms,
        "Patio": item.has_patio,
        "Terraza": item.has_terrace,
        "Mascotas": item.pet_friendly,
        "Bidet": item.has_bidet,
        "Dirección/Área": item.address,
        "Link": item.link,
        "Score": item.score,
    } for item in listings]
    df = pd.DataFrame(rows)
    # Reordering an empty frame would fail (no columns exist yet).
    return df if df.empty else df[column_order]
| # ========================= | |
| # ========================= | |
def build_email(subject: str, sender: str, to_addr: str, body_html: str, attachments: List[Tuple[str, bytes, str]]) -> EmailMessage:
    """Assemble a multipart email: plain-text fallback, HTML body, attachments.

    `attachments` holds (filename, raw bytes, mime type like "text/csv") tuples;
    malformed mime types fall back to application/octet-stream.
    """
    message = EmailMessage()
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = to_addr
    message.set_content("Este mensaje tiene versión HTML y adjuntos.")
    message.add_alternative(body_html, subtype="html")
    for filename, payload, mimetype in attachments:
        if "/" in mimetype:
            maintype, subtype = mimetype.split("/", 1)
        else:
            maintype, subtype = "application", "octet-stream"
        message.add_attachment(payload, maintype=maintype, subtype=subtype, filename=filename)
    return message
def send_email(to_addr: str, subject: str, html_body: str, attachments: List[Tuple[str, bytes, str]]) -> str:
    """Send an HTML email with attachments via the configured SMTP server.

    Returns "OK" on success, or a human-readable error string (never raises).
    """
    if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
        return "Error: SMTP no configurado (SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASS, SMTP_FROM)."
    if not EMAIL_REGEX.match(to_addr or ""):
        return "Error: email destino inválido."
    msg = build_email(subject, SMTP_FROM, to_addr, html_body, attachments)
    use_implicit_tls = SMTP_USE_SSL or SMTP_PORT == 465
    try:
        if use_implicit_tls:
            # Implicit TLS (SMTPS, typically port 465).
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, context=context) as server:
                server.login(SMTP_USER, SMTP_PASS)
                server.send_message(msg)
        else:
            # Plain connection upgraded via STARTTLS.
            with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
                server.ehlo()
                server.starttls()
                server.ehlo()
                server.login(SMTP_USER, SMTP_PASS)
                server.send_message(msg)
        return "OK"
    except Exception as e:
        return f"Error enviando email: {e}"
def df_to_csv_bytes(df: pd.DataFrame) -> bytes:
    """Serialize a DataFrame to UTF-8 CSV bytes, without the index column."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode("utf-8")
def json_to_bytes(obj: Any) -> bytes:
    """Serialize a JSON-able object to pretty-printed UTF-8 bytes (non-ASCII kept)."""
    text = json.dumps(obj, ensure_ascii=False, indent=2)
    return text.encode("utf-8")
def render_summary_html(df: pd.DataFrame, neighborhoods: List[str], max_usd: int, min_rooms: int, relax_log: List[str]) -> str:
    """Render the summary/email HTML: header, top-12 listing list, relax-log trace.

    Fix: feature flags used ``bool(r.get(k))`` — for a NaN cell (pandas'
    missing-value float) ``bool(nan)`` is True, so unknown features rendered
    as present. Missing values are now excluded via ``pd.notna``.
    """
    count = len(df)
    head = f"<h2>Resultados</h2><p><b>Zonas:</b> {', '.join(neighborhoods)}<br><b>Precio máx.:</b> USD {max_usd}<br><b>Ambientes mín.:</b> {min_rooms}<br><b>Total:</b> {count}</p>"
    trace = "<pre style='white-space:pre-wrap;font-size:12px;opacity:.85;'>" + "\n".join(relax_log) + "</pre>"
    if count == 0:
        return head + "<p>No se encontraron resultados con los filtros actuales.</p>" + trace
    top_rows = df.sort_values(by=['Score', 'Precio USD'], ascending=[False, True]).head(12)
    items = []
    for _, r in top_rows.iterrows():
        # NaN/None means "unknown" — only list features that are affirmatively truthy.
        flags = " · ".join(
            k for k in ["Patio", "Terraza", "Mascotas", "Bidet"]
            if pd.notna(r.get(k)) and bool(r.get(k))
        ) or "—"
        price = f"USD {int(r['Precio USD'])}" if pd.notna(r['Precio USD']) else "USD —"
        addr = r.get("Dirección/Área") or ""
        items.append(f"<li><b>{r['Título']}</b> — {price} — {addr} — {flags} — <a href='{r['Link']}'>Abrir</a></li>")
    return head + "<ol>" + "\n".join(items) + "</ol>" + trace
# =========================
# UI (Gradio)
# =========================
# Markdown blurb shown at the top of the Gradio app (user-facing, in Spanish).
DESCRIPTION = """
Meta-buscador multi-portales para casas/PH entre Saavedra y La Lucila y alrededores.
• Filtros: USD ≤ 90k, ≥ 3 ambientes, patio/terraza, mascotas, bidet (si figura en descripción).
• Anti-scraping: headers rotativos, referers, HTTP/2, rate limit con jitter, reintentos con backoff.
• Si no hay resultados, activa auto-relajación escalonada (configurable) y documenta los pasos.
"""
async def run_and_present(neighs, max_usd, types, min_rooms, req_outdoor, req_bidet, req_pet, auto_relax, email_to, send_email_flag):
    """Gradio handler: parse inputs, run the agent, optionally email the results.

    Returns (dataframe, JSON string, relax-log line, email status message).
    """
    # Normalize raw widget values.
    neighborhoods = [part.strip() for part in str(neighs).split(",") if part.strip()]
    wanted_types = [part.strip().lower() for part in str(types).split(",") if part.strip()]
    max_usd = int(max_usd)
    min_rooms = int(min_rooms)
    req_outdoor = bool(req_outdoor)
    req_bidet = bool(req_bidet)
    req_pet = bool(req_pet)
    auto_relax = bool(auto_relax)
    results, relax_log = await run_agent_with_relax(
        neighborhoods=neighborhoods, max_price_usd=max_usd, types=wanted_types,
        min_rooms=min_rooms, require_outdoor=req_outdoor, require_bidet=req_bidet, require_pet=req_pet,
        auto_relax=auto_relax
    )
    df = listings_to_df(results)
    json_blob = [asdict(listing) for listing in results]
    email_status = "Email no enviado."
    if send_email_flag:
        if not EMAIL_REGEX.match(email_to or ""):
            email_status = "Error: email destino inválido."
        else:
            html = render_summary_html(df, neighborhoods, max_usd, min_rooms, relax_log)
            attachments: List[Tuple[str, bytes, str]] = []
            if not df.empty:
                attachments.append(("resultados.csv", df_to_csv_bytes(df), "text/csv"))
            attachments.append(("resultados.json", json_to_bytes(json_blob), "application/json"))
            outcome = send_email(
                to_addr=email_to,
                subject="Resultados de casas/PH (≤ USD 90k) – Norte BA",
                html_body=html,
                attachments=attachments
            )
            email_status = "Enviado" if outcome == "OK" else outcome
    # The joined relax log feeds the "Estado" tab.
    return df, json.dumps(json_blob, ensure_ascii=False, indent=2), " | ".join(relax_log), email_status
# Gradio app: widget layout and event wiring.
with gr.Blocks(title="Meta-buscador Inmuebles Norte BA (≤ USD 90k)") as demo:
    gr.Markdown("# Meta-buscador de casas/PH norte BA (≤ 90 000 USD)")
    gr.Markdown(DESCRIPTION)
    # Search criteria inputs.
    with gr.Row():
        neighs = gr.Textbox(label="Barrios (coma separada)", value=", ".join(DEFAULT_NEIGHBORHOODS))
        max_usd = gr.Number(label="Precio máx. (USD)", value=DEFAULT_MAX_USD, precision=0)
    with gr.Row():
        types = gr.Textbox(label="Tipos (coma separada)", value=", ".join(DEFAULT_TYPES))
        min_rooms = gr.Number(label="Mínimo ambientes", value=DEFAULT_MIN_ROOMS, precision=0)
    # Hard-requirement toggles and the auto-relax switch.
    with gr.Row():
        req_outdoor = gr.Checkbox(label="Requerir patio o terraza", value=REQUIRE_OUTDOOR)
        req_bidet = gr.Checkbox(label="Requerir bidet (si aparece en descripción)", value=REQUIRE_BIDET)
        req_pet = gr.Checkbox(label="Requerir pet-friendly (si aparece en descripción)", value=REQUIRE_PET_FRIENDLY)
        auto_relax = gr.Checkbox(label="Auto-relajar si no hay resultados", value=AUTO_RELAX_ENABLED)
    gr.Markdown("### Envío por email al finalizar (opcional)")
    with gr.Row():
        email_to = gr.Textbox(label="Email destino", placeholder="tu@correo.com")
        send_email_flag = gr.Checkbox(label="Enviar email al finalizar", value=True)
    btn = gr.Button("Buscar ahora", variant="primary")
    # Output tabs: results table, raw JSON, run trace, email delivery status.
    with gr.Tabs():
        with gr.Tab("Resultados"):
            table = gr.Dataframe(interactive=False)  # no unusual kwargs (keeps cross-version compatibility)
        with gr.Tab("JSON"):
            j = gr.Code(language="json")
        with gr.Tab("Estado"):
            trace = gr.Markdown("—")
        with gr.Tab("Estado de email"):
            status = gr.Markdown("—")
    # Wire the async handler; outputs map 1:1 onto the four tabs above.
    btn.click(
        run_and_present,
        inputs=[neighs, max_usd, types, min_rooms, req_outdoor, req_bidet, req_pet, auto_relax, email_to, send_email_flag],
        outputs=[table, j, trace, status]
    )
if __name__ == "__main__":
    demo.launch()