Spaces:
Runtime error
Runtime error
from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import math
import os
import sqlite3
import sys
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple

import uvicorn
from fastapi import (
    APIRouter,
    Depends,
    FastAPI,
    HTTPException,
    Request,
    Security,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext

# NOTE(review): BaseSettings and Field(regex=...) are pydantic v1 APIs; under
# pydantic v2 they fail at import / class creation, so pin "pydantic<2" (or
# migrate to pydantic-settings and Field(pattern=...)).
from pydantic import BaseModel, BaseSettings, Field
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    Integer,
    LargeBinary,
    MetaData,
    String,
    Table,
    create_engine,
    select,
)
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

try:
    from sentence_transformers import SentenceTransformer

    _HAS_SBERT = True
except Exception:  # optional dependency; EmbeddingProvider falls back to hashing
    _HAS_SBERT = False
class Settings(BaseSettings):
    """Runtime configuration for the service.

    pydantic reads field overrides from the environment and from ``.env``
    (see ``Config``); several defaults below are themselves taken from
    ``SAMUEL_*`` variables when the class is defined.
    """

    # Service identity and HTTP binding.
    APP_NAME: str = "Samuel‑Prod"
    DEBUG: bool = os.environ.get("SAMUEL_DEBUG", "0") == "1"
    HOST: str = "0.0.0.0"
    PORT: int = int(os.environ.get("SAMUEL_PORT", "8000"))

    # Database connection URL (async driver).
    DB_URL: str = "sqlite+aiosqlite:///samuel.db"

    # JWT signing. NOTE(review): the fallback secret is a placeholder and must
    # be overridden via JWT_SECRET_KEY outside local development.
    JWT_SECRET_KEY: str = os.environ.get("JWT_SECRET_KEY", "CHANGE_ME_IN_PROD")
    JWT_ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 24 * 60  # one day

    # Memory subsystem: embedding size and short-term TTL (seconds).
    VECTOR_DIM: int = int(os.environ.get("SAMUEL_VECTOR_DIM", "384"))
    SHORT_TERM_TTL: int = int(os.environ.get("SAMUEL_SHORT_TERM_TTL", "3600"))

    # Per-user request budget for RateLimiter, and allowed CORS origins.
    RATE_LIMIT: int = int(os.environ.get("SAMUEL_RATE_LIMIT", "30"))
    CORS_ORIGINS: List[str] = ["*"]

    class Config:
        env_file = ".env"
        case_sensitive = False


settings = Settings()
# Module-wide logger: timestamped records written to stdout, at DEBUG level
# when settings.DEBUG is on and INFO otherwise.
log = logging.getLogger("samuel")
_LOG_FORMAT = "[%(asctime)s] %(levelname)s %(name)s %(message)s"
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt=_LOG_FORMAT))
log.addHandler(handler)
log.setLevel(logging.DEBUG if settings.DEBUG else logging.INFO)
metadata = MetaData()


def _utc_now_column(name: str) -> Column:
    """Build a DateTime column that defaults to the (naive UTC) insert time."""
    return Column(name, DateTime, default=datetime.utcnow)


# Registered accounts. hashed_password is a LargeBinary column, so values
# written to it should be bytes.
users_table = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String, nullable=False, unique=True),
    Column("email", String, nullable=False, unique=True),
    Column("hashed_password", LargeBinary, nullable=False),
    Column("is_active", Boolean, default=True),
    _utc_now_column("created_at"),
)

# Per-user memory entries written by MemoryManager. "metadata" stores a JSON
# string; expires_at stays NULL for entries added without a TTL.
memory_table = Table(
    "memory",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("user_id", Integer, nullable=False),
    Column("kind", String, nullable=False),
    Column("content", String, nullable=False),
    Column("metadata", String, default="{}"),
    _utc_now_column("created_at"),
    Column("expires_at", DateTime, nullable=True),
)

# Listings collected by HouseHunter, keyed by the adapter-provided id.
listings_table = Table(
    "listings",
    metadata,
    Column("id", String, primary_key=True),
    Column("source", String),
    Column("title", String),
    Column("price", Integer),
    Column("surface", Integer),
    Column("rooms", Integer),
    Column("url", String),
    Column("score", Float, default=0.0),
    _utc_now_column("fetched_at"),
)
# Async database engine for settings.DB_URL (SQL echo when DEBUG is on) and a
# factory producing AsyncSession objects that keep attributes after commit.
engine: AsyncEngine = create_async_engine(
    settings.DB_URL,
    future=True,
    echo=settings.DEBUG,
)
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# bcrypt password hashing, and the OAuth2 bearer-token extractor whose
# tokenUrl points clients at /auth/token.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
def verify_password(plain: str, hashed: bytes) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain: Password as supplied by the user.
        hashed: Stored hash (presumably from ``users.hashed_password``).

    Returns:
        True when *plain* matches *hashed* under ``pwd_context``.
    """
    matched = pwd_context.verify(plain, hashed)
    return matched
def get_password_hash(password: str) -> bytes:
    """Hash *password* with ``pwd_context`` and return the hash as bytes.

    passlib returns the hash as ``str``. It is encoded here so the value
    matches the declared ``-> bytes`` return type and the ``LargeBinary``
    column ``users.hashed_password``; a ``str`` cannot be bound to a binary
    column. passlib's ``verify`` accepts the bytes form unchanged.
    """
    # passlib hashes use the ASCII-only modular crypt format.
    return pwd_context.hash(password).encode("ascii")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Encode *data* as a signed JWT.

    Args:
        data: Claims to embed. The dict is copied, never modified.
        expires_delta: Token lifetime. When omitted (or zero), it defaults to
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``.

    Returns:
        The encoded token string.
    """
    to_encode = dict(data)
    # RFC 7519 defines "sub" as a string, and python-jose's decode() rejects a
    # non-string subject. get_current_user parses "sub" back with int(), so a
    # numeric user id is stored here in its string form.
    if to_encode.get("sub") is not None:
        to_encode["sub"] = str(to_encode["sub"])
    expire = datetime.utcnow() + (
        expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
async def get_user_by_username(db: AsyncSession, username: str) -> Optional[dict]:
    """Fetch the ``users`` row for *username* as a plain dict, or None if absent."""
    result = await db.execute(
        select(users_table).where(users_table.c.username == username)
    )
    row = result.first()
    # SQLAlchemy 1.4/2.0 Row objects are tuple-like, so dict(row) fails.
    # Row._mapping is the column-name -> value view.
    return dict(row._mapping) if row is not None else None
async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[dict]:
    """Fetch the ``users`` row with primary key *user_id* as a dict, or None."""
    result = await db.execute(select(users_table).where(users_table.c.id == user_id))
    row = result.first()
    # SQLAlchemy 1.4/2.0 Row objects are tuple-like, so dict(row) fails.
    # Row._mapping is the column-name -> value view.
    return dict(row._mapping) if row is not None else None
async def _get_db_session():
    """FastAPI dependency that yields one ``AsyncSession`` per request.

    The session is closed when the request finishes, which returns its
    connection to the pool. Using ``Depends(async_session)`` directly created
    sessions that were never closed.
    """
    async with async_session() as session:
        yield session


async def get_current_user(
    token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(_get_db_session)
) -> dict:
    """Resolve the bearer *token* to an active user row.

    Returns:
        The user row as a dict.

    Raises:
        HTTPException: 401 with ``WWW-Authenticate: Bearer`` in any of these
            cases: the token fails JWT decoding or validation; its ``sub``
            claim is missing or is not an integer id; or the user does not
            exist or is inactive.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(
            token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
        )
    except JWTError:
        raise credentials_exception from None
    try:
        # A missing "sub" gives int(None) -> TypeError. That error was
        # previously uncaught and surfaced as a 500 instead of a 401.
        user_id = int(payload.get("sub"))
    except (TypeError, ValueError):
        raise credentials_exception from None
    user = await get_user_by_id(db, user_id)
    if user is None or not user["is_active"]:
        raise credentials_exception
    return user
class RateLimiter:
    """Fixed-window request limiter keyed by user id.

    Each user gets ``max_requests`` calls per window of ``period_seconds``.
    A window opens on the user's first call. A new one opens on the first
    call after the previous window has ended.
    """

    def __init__(self, max_requests: int, period_seconds: int = 60):
        self.max = max_requests
        self.period = period_seconds
        # user_id -> (calls still allowed in the window, window end in epoch seconds)
        self.tokens: Dict[int, Tuple[int, float]] = {}
        self.lock = asyncio.Lock()

    async def allow(self, user_id: int) -> bool:
        """Consume one call for *user_id*; return False once the window is exhausted."""
        async with self.lock:
            now = time.time()
            entry = self.tokens.get(user_id)
            if entry is None or now > entry[1]:
                # No prior window, or it has ended: start a fresh one.
                left, window_end = self.max, now + self.period
            else:
                left, window_end = entry
            if left > 0:
                self.tokens[user_id] = (left - 1, window_end)
                return True
            return False
# Shared limiter: settings.RATE_LIMIT calls per user per 60-second window.
rate_limiter = RateLimiter(max_requests=settings.RATE_LIMIT)
class EventBus:
    """Minimal in-process publish/subscribe hub.

    Subscribers are plain callables or coroutine functions. ``publish`` calls
    every subscriber of a topic. Coroutines they return are scheduled as
    background tasks, not awaited.
    """

    def __init__(self) -> None:
        self._subs: Dict[str, List[Callable[..., Any]]] = defaultdict(list)
        # Strong references to in-flight subscriber tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced fire-and-forget
        # task may be garbage-collected before it finishes.
        self._tasks: set = set()

    def subscribe(self, topic: str, coro: Callable[..., Any]) -> None:
        """Register *coro* (a sync callable or coroutine function) for *topic*."""
        self._subs[topic].append(coro)

    async def publish(self, topic: str, *args, **kwargs) -> None:
        """Invoke every subscriber of *topic* with ``*args, **kwargs``.

        A sync subscriber that raises is logged and skipped. An async
        subscriber's coroutine runs as a task, and any exception it raises is
        logged when the task finishes.
        """
        # Iterate over a snapshot so subscribers may subscribe during dispatch.
        for fn in list(self._subs.get(topic, ())):
            try:
                res = fn(*args, **kwargs)
            except Exception:
                log.exception("EventBus error on %s", topic)
                continue
            if asyncio.iscoroutine(res):
                task = asyncio.create_task(res)
                self._tasks.add(task)
                task.add_done_callback(
                    lambda done, topic=topic: self._on_task_done(topic, done)
                )

    def _on_task_done(self, topic: str, task: asyncio.Task) -> None:
        """Release a finished subscriber task and log its exception, if any."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log.error("EventBus error on %s", topic, exc_info=exc)
class WSManager:
    """Registry of accepted WebSocket connections with JSON fan-out."""

    def __init__(self) -> None:
        self._conns: List[WebSocket] = []
        self._lock = asyncio.Lock()

    async def connect(self, ws: WebSocket) -> None:
        """Accept the handshake on *ws*, then start tracking it."""
        await ws.accept()
        async with self._lock:
            self._conns.append(ws)

    async def disconnect(self, ws: WebSocket) -> None:
        """Stop tracking *ws*; untracked sockets are ignored."""
        async with self._lock:
            try:
                self._conns.remove(ws)
            except ValueError:
                pass

    async def broadcast(self, payload: dict) -> None:
        """Send *payload* as JSON text to every tracked socket.

        Values that JSON cannot encode are stringified (``default=str``).
        A socket whose send raises is dropped from the registry.
        """
        message = json.dumps(payload, ensure_ascii=False, default=str)
        # Snapshot the registry under the lock; sends happen outside it.
        async with self._lock:
            targets = self._conns[:]
        for target in targets:
            try:
                await target.send_text(message)
            except Exception:
                await self.disconnect(target)


ws_manager = WSManager()
class EmbeddingProvider:
    """Turns texts into float vectors for the memory vector index.

    Uses the ``all-MiniLM-L6-v2`` SentenceTransformer when the package is
    importable and the model loads. Otherwise it falls back to a
    deterministic hashing embedding (``_hash_embed``) with
    ``settings.VECTOR_DIM`` dimensions.
    """

    def __init__(self) -> None:
        self.model = None
        if _HAS_SBERT:
            try:
                self.model = SentenceTransformer("all-MiniLM-L6-v2")
                log.info("Loaded SBERT")
            except Exception:
                # Keep the traceback: a bare info line hid why the model failed.
                log.warning("SBERT load failed", exc_info=True)
        if self.model is None:
            log.info("Using hash fallback")

    def dim(self) -> int:
        """Dimension of the vectors produced by ``embed``."""
        if self.model is not None:
            try:
                # The model may report None; fall back to the configured size.
                return self.model.get_sentence_embedding_dimension() or settings.VECTOR_DIM
            except Exception:
                return settings.VECTOR_DIM
        return settings.VECTOR_DIM

    def embed(self, texts: List[str]) -> List[List[float]]:
        """Embed each string in *texts*, returning one vector per input in order."""
        if self.model is not None:
            arr = self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
            return arr.tolist()
        size = self.dim()  # loop-invariant
        return [self._hash_embed(txt, size) for txt in texts]

    @staticmethod
    def _hash_embed(text: str, dim: int) -> List[float]:
        """Deterministic bag-of-words embedding built with the hashing trick.

        Each lower-cased, whitespace-separated token adds 1.0 to one of *dim*
        buckets, chosen by a BLAKE2b digest of the token. The builtin
        ``hash()`` is salted per process (PYTHONHASHSEED); BLAKE2b is not, so
        the same text always gives the same vector. Texts that share words
        also get a positive cosine similarity.
        """
        vec = [0.0] * dim
        if dim <= 0:
            return vec
        for token in text.lower().split():
            digest = hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest()
            vec[int.from_bytes(digest, "big") % dim] += 1.0
        return vec
class SimpleVectorStore:
    """In-memory cosine-similarity index mapping integer ids to vectors.

    Vectors are L2-normalised on insert and queries are normalised too, so
    search scores are cosine similarities. ``dim`` is recorded but not
    enforced.
    """

    def __init__(self, dim: int = settings.VECTOR_DIM) -> None:
        self.dim = dim
        self._vectors: Dict[int, List[float]] = {}
        self._lock = asyncio.Lock()

    async def _norm(self, v: List[float]) -> List[float]:
        """Return *v* scaled to unit length; an all-zero vector stays all zeros."""
        length = math.sqrt(sum(component * component for component in v))
        if not length:
            length = 1.0
        return [component / length for component in v]

    async def add(self, mid: int, vec: List[float]) -> None:
        """Insert or replace the vector stored under *mid*."""
        unit = await self._norm(vec)
        async with self._lock:
            self._vectors[mid] = unit

    async def remove(self, mid: int) -> None:
        """Forget *mid*; unknown ids are ignored."""
        async with self._lock:
            if mid in self._vectors:
                del self._vectors[mid]

    async def search(self, vec: List[float], top_k: int = 5) -> List[Tuple[int, float]]:
        """Return up to *top_k* ``(id, score)`` pairs, best score first."""
        query = await self._norm(vec)
        async with self._lock:
            scored = [
                (mid, sum(q * s for q, s in zip(query, stored)))
                for mid, stored in self._vectors.items()
            ]
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
        return ranked[:top_k]
class MemoryManager:
    """Per-user memory: rows in the ``memory`` table plus an in-memory vector index.

    Entries added with a TTL are tracked in ``_short_term``. Once expired,
    a background sweep that runs every 60 seconds removes them from the
    vector index. Their database rows are kept.
    """

    # Vector hits fetched per requested result in semantic_search. The index
    # holds every user's entries, and hits are filtered by owner afterwards.
    _SEARCH_OVERSAMPLE = 10

    def __init__(self, async_engine: AsyncEngine):
        self.engine = async_engine
        self.vec = SimpleVectorStore(dim=settings.VECTOR_DIM)
        self.emb = EmbeddingProvider()
        # memory id -> expiry time (naive-UTC datetime converted with .timestamp())
        self._short_term: Dict[int, float] = {}
        self._cleanup_task: Optional[asyncio.Task] = None
        self._lock = asyncio.Lock()

    async def start(self) -> None:
        """Start the background expiry sweep unless it is already running."""
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def shutdown(self) -> None:
        """Cancel the expiry sweep and wait for it; ``start`` may be called again later."""
        task, self._cleanup_task = self._cleanup_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _cleanup_loop(self) -> None:
        """Call ``_cleanup_once`` every 60 seconds until cancelled."""
        while True:
            await asyncio.sleep(60)
            try:
                await self._cleanup_once()
            except asyncio.CancelledError:
                raise
            except Exception:
                # Without this, one failed sweep would end the task silently
                # and expired entries would never be evicted again.
                log.exception("Memory cleanup failed")

    async def _cleanup_once(self) -> None:
        """Evict every expired short-term entry from the vector index."""
        # Same clock as add(): naive utcnow() converted with .timestamp(), so
        # both sides of the comparison are interpreted identically.
        now = datetime.utcnow().timestamp()
        async with self._lock:
            expired = [mid for mid, ts in self._short_term.items() if ts < now]
            for mid in expired:
                await self.vec.remove(mid)
                del self._short_term[mid]

    async def add(
        self,
        user_id: int,
        kind: str,
        content: str,
        metadata: Optional[dict] = None,
        ttl_seconds: Optional[int] = None,
    ) -> int:
        """Store a memory entry, index its embedding, and return the new row id.

        Args:
            user_id: Owner of the entry.
            kind: Category label (HouseHunter uses ``"offer"``).
            content: Text that is stored and embedded.
            metadata: JSON-serialisable extras, stored as a JSON string.
            ttl_seconds: If given and non-zero, the entry expires that many
                seconds from now: ``expires_at`` is set on the row and the
                entry is later evicted from the vector index. Otherwise the
                entry never expires.

        An embedding failure is logged. The entry is still persisted, just
        not indexed.
        """
        metadata = metadata or {}
        now = datetime.utcnow()
        # NOTE(review): the previous expression also referenced
        # settings.SHORT_TERM_TTL, but only on a branch where ttl_seconds was
        # already truthy, so it never took effect. Confirm whether entries
        # without an explicit TTL should default to it.
        expires = now + timedelta(seconds=ttl_seconds) if ttl_seconds else None
        async with self.engine.begin() as conn:  # commits when the block exits cleanly
            res = await conn.execute(
                memory_table.insert().values(
                    user_id=user_id,
                    kind=kind,
                    content=content,
                    metadata=json.dumps(metadata, ensure_ascii=False),
                    created_at=now,
                    expires_at=expires,
                )
            )
            mid = res.inserted_primary_key[0]
        try:
            vec = self.emb.embed([content])[0]
            await self.vec.add(mid, vec)
        except Exception:
            log.exception("Embedding failed")
        if expires is not None:
            async with self._lock:
                self._short_term[mid] = expires.timestamp()
        return mid

    async def get(self, mid: int) -> Optional[dict]:
        """Return the ``memory`` row with id *mid* as a dict, or None."""
        async with self.engine.connect() as conn:
            result = await conn.execute(
                select(memory_table).where(memory_table.c.id == mid)
            )
            row = result.first()
        # SQLAlchemy 1.4/2.0 rows are tuple-like; _mapping is the dict view.
        return dict(row._mapping) if row is not None else None

    async def recent(self, user_id: int, limit: int = 50) -> List[dict]:
        """Return up to *limit* of *user_id*'s entries as dicts, newest id first."""
        stmt = (
            select(memory_table)
            .where(memory_table.c.user_id == user_id)
            .order_by(memory_table.c.id.desc())
            .limit(limit)
        )
        async with self.engine.connect() as conn:
            rows = (await conn.execute(stmt)).fetchall()
        return [dict(row._mapping) for row in rows]

    async def semantic_search(
        self, user_id: int, q: str, top_k: int = 5
    ) -> List[Tuple[Optional[dict], float]]:
        """Return up to *top_k* ``(row, score)`` pairs for *user_id*, best first.

        The vector index is shared by all users, so hits are over-fetched and
        filtered to rows owned by *user_id*. Rows that no longer exist are
        skipped. Heavy traffic from other users can therefore leave fewer
        than *top_k* results. The method falls back to a case-insensitive
        substring scan of the user's 200 most recent entries (score 0.0) in
        two cases: embedding or search raises, or the index has no entries
        for this user (for example, right after a restart).
        """
        if top_k <= 0:
            return []
        try:
            qv = self.emb.embed([q])[0]
            hits = await self.vec.search(qv, top_k * self._SEARCH_OVERSAMPLE)
            found: List[Tuple[Optional[dict], float]] = []
            for mid, score in hits:
                row = await self.get(mid)
                if row is None or row["user_id"] != user_id:
                    continue
                found.append((row, score))
                if len(found) == top_k:
                    break
            if found:
                return found
        except Exception:
            log.exception("Semantic search failed; using substring fallback")
        needle = q.lower()
        recent = await self.recent(user_id, limit=200)
        return [(r, 0.0) for r in recent if needle in r["content"].lower()][:top_k]
class SearchCriteria(BaseModel):
    """Search request for HouseHunter; every filter is optional."""

    # Price and surface ranges. HouseHunter scores a listing by its closeness
    # to the midpoint when both ends of a range are set.
    min_price: Optional[float] = None
    max_price: Optional[float] = None
    min_surface: Optional[float] = None
    max_surface: Optional[float] = None
    # Exact room count wanted.
    rooms: Optional[int] = None
    # Location and free-text filters.
    city: Optional[str] = None
    neighborhoods: Optional[List[str]] = []  # pydantic copies field defaults per instance
    keywords: Optional[str] = None
    has_photos: Optional[bool] = True
    # Maximum number of results requested.
    max_results: int = 50
class Listing(BaseModel):
    """One property listing as produced by a HouseHunter adapter."""

    id: str  # adapter-specific id; primary key of the ``listings`` table
    title: str
    price: float
    surface: float
    rooms: int
    score: float = 0.0  # filled in by HouseHunter._score
    source: str  # adapter name, e.g. "stub"
    url: str
class HHDB:
    """Storage for scored listings in the ``listings`` table."""

    # Columns overwritten when an incoming listing's id already exists.
    _UPSERT_COLUMNS = (
        "source",
        "title",
        "price",
        "surface",
        "rooms",
        "url",
        "score",
        "fetched_at",
    )

    def __init__(self, async_engine: AsyncEngine):
        self.engine = async_engine

    async def upsert(self, l: Listing) -> None:
        """Insert listing *l*, or overwrite the existing row with the same id."""
        # Table.insert() builds a generic INSERT that has no
        # on_conflict_do_update() (AttributeError). The ON CONFLICT form only
        # exists on the dialect-specific insert. SQLite matches the default
        # sqlite+aiosqlite DB_URL.
        stmt = sqlite_insert(listings_table).values(
            id=l.id,
            source=l.source,
            title=l.title,
            price=l.price,
            surface=l.surface,
            rooms=l.rooms,
            url=l.url,
            score=l.score,
            fetched_at=datetime.utcnow(),
        )
        # ``excluded`` is the row that failed to insert, so the update reuses
        # exactly the same values, including a single fetched_at timestamp.
        stmt = stmt.on_conflict_do_update(
            index_elements=[listings_table.c.id],
            set_={name: stmt.excluded[name] for name in self._UPSERT_COLUMNS},
        )
        async with self.engine.begin() as conn:  # commits when the block exits cleanly
            await conn.execute(stmt)

    async def top(self, limit: int = 20) -> List[dict]:
        """Return up to *limit* stored listings as dicts, highest score first."""
        stmt = (
            select(listings_table)
            .order_by(listings_table.c.score.desc())
            .limit(limit)
        )
        async with self.engine.connect() as conn:
            rows = (await conn.execute(stmt)).fetchall()
        # SQLAlchemy 1.4/2.0 rows are tuple-like; _mapping is the dict view.
        return [dict(row._mapping) for row in rows]
class HouseHunter:
    """Runs listing adapters, then scores and distributes their results.

    Each scored listing is upserted through ``HHDB``, published on the event
    bus topic ``"offers.new"`` and recorded as a one-hour memory entry.
    """

    def __init__(self, bus: EventBus, memory: MemoryManager, hhdb: HHDB):
        self.bus = bus
        self.memory = memory
        self.hhdb = hhdb
        # Async callables: SearchCriteria -> List[Listing].
        self.adapters = [self._adapter_stub]

    async def _adapter_stub(self, criteria: SearchCriteria) -> List[Listing]:
        """Demo adapter: six fake listings priced upward from ``min_price`` (default 50000)."""
        await asyncio.sleep(0.15)  # simulate network latency
        base_price = criteria.min_price or 50000
        return [
            Listing(
                id=f"fake-{int(time.time())}-{i}",
                title=f"Demo depto {i} {criteria.keywords or ''}",
                price=base_price + i * 30000,
                surface=40 + 5 * i,
                rooms=1 + (i % 3),
                source="stub",
                url="https://example.com",
            )
            for i in range(6)
        ]

    @staticmethod
    def _range_score(value: float, low: Optional[float], high: Optional[float]) -> float:
        """Closeness of *value* to the midpoint of [low, high].

        The score is 1.0 at the midpoint and falls linearly to 0.0 one full
        span away. It is a neutral 0.5 unless both bounds are given (0 counts
        as a bound).
        """
        if low is None or high is None:
            return 0.5
        mid = (low + high) / 2
        span = max(1.0, high - low)
        return max(0.0, 1.0 - abs(value - mid) / span)

    def _score(self, l: Listing, criteria: SearchCriteria) -> float:
        """Weighted fit of listing *l* to *criteria*, rounded to 3 decimals.

        Weights:
            price (0.4) and surface (0.25): closeness to the midpoint of the
                requested range.
            rooms (0.15): 1.0 on an exact match.
        An unset criterion contributes a neutral 0.5. The "photos" and
        "proximity" weights are declared but not yet applied, so the maximum
        score is 0.8.
        """
        weights = {"price": 0.4, "surface": 0.25, "rooms": 0.15, "photos": 0.1, "proximity": 0.1}
        price_score = self._range_score(l.price, criteria.min_price, criteria.max_price)
        surface_score = self._range_score(l.surface, criteria.min_surface, criteria.max_surface)
        # ``is not None``: the previous truthiness tests silently ignored a
        # bound or room count of 0.
        rooms_score = 1.0 if criteria.rooms is not None and l.rooms == criteria.rooms else 0.5
        score = (
            price_score * weights["price"]
            + surface_score * weights["surface"]
            + rooms_score * weights["rooms"]
        )
        return round(score, 3)

    async def search(self, user_id: int, criteria: SearchCriteria) -> List[dict]:
        """Query all adapters concurrently, then score, store, publish and remember each listing.

        Returns listing dicts sorted by descending score, at most
        ``criteria.max_results`` of them. An adapter that raises is logged and
        skipped rather than failing the whole search.
        """
        outcomes = await asyncio.gather(
            *(adapter(criteria) for adapter in self.adapters), return_exceptions=True
        )
        results: List[Listing] = []
        for adapter, outcome in zip(self.adapters, outcomes):
            if isinstance(outcome, BaseException):
                log.error("Listing adapter %r failed", adapter, exc_info=outcome)
                continue
            for listing in outcome:
                listing.score = self._score(listing, criteria)
                await self.hhdb.upsert(listing)
                results.append(listing)
                # publish() only dispatches to subscribers and returns, so
                # awaiting it is cheap. The previous bare create_task() left
                # the task unreferenced, and it could be garbage-collected.
                await self.bus.publish(
                    "offers.new",
                    {
                        "id": listing.id,
                        "title": listing.title,
                        "score": listing.score,
                        "source": listing.source,
                        "url": listing.url,
                    },
                )
                await self.memory.add(
                    user_id,
                    "offer",
                    f"Oferta: {listing.title} — score {listing.score}",
                    {"source": listing.source, "url": listing.url},
                    ttl_seconds=3600,
                )
        results.sort(key=lambda item: item.score, reverse=True)
        return [item.dict() for item in results[: max(criteria.max_results, 0)]]
class LLMOrchestrator:
    """Stand-in for an LLM backend: echoes the prompt plus a canned suffix."""

    def __init__(self) -> None:
        self.backend = "local"

    async def ask(self, prompt: str, stream: Optional[Callable[[str], Any]] = None) -> str:
        """Return a simulated answer, optionally streaming it word by word.

        The answer is *prompt* followed by " — respuesta simulada", with
        whitespace runs collapsed to single spaces. Each word is passed to
        *stream*. An exception from the callback is logged, not raised. The
        method sleeps 50 ms after each word.
        """
        words = (prompt + " — respuesta simulada").split()
        for word in words:
            if stream:
                try:
                    stream(word)
                except Exception:
                    log.exception("LLM stream failed")
            await asyncio.sleep(0.05)
        return " ".join(words)
class Token(BaseModel):
    """OAuth2 bearer-token response body."""

    access_token: str  # encoded JWT (presumably from create_access_token)
    token_type: str = "bearer"
class UserCreate(BaseModel):
    """Sign-up payload: a 3-30 character username and an email address."""

    username: str = Field(..., min_length=3, max_length=30)
    # The pattern is a complete raw string anchored at both ends. A truncated
    # copy of this line (missing the closing `$")`) failed at startup with
    # "SyntaxError: unterminated string literal".
    # NOTE(review): `regex=` is the pydantic v1 keyword (v2 renamed it `pattern`).
    # NOTE(review): the source was cut off after this line; confirm no other
    # fields (e.g. a password) were lost.
    email: str = Field(..., regex=r"^[\w\.-]+@[\w\.-]+\.\w+$")
| Falló, dame algo mejor. | |
| runtime error | |
| Exit code: 1. Reason: File "/home/user/app/app.py", line 596 | |
| email: str = Field(..., regex=r"^[\w.-]+@[\w.-]+.\w+undefined | |
| ^ | |
| SyntaxError: unterminated string literal (detected at line 596) | |
| Container logs: | |
| ===== Application Startup at 2025-08-09 17:36:54 ===== | |
| File "/home/user/app/app.py", line 596 | |
| email: str = Field(..., regex=r"^[\w.-]+@[\w.-]+.\w+undefined | |
| ^ | |
| SyntaxError: unterminated string literal (detected at line 596) | |
| File "/home/user/app/app.py", line 596 | |
| email: str = Field(..., regex=r"^[\w.-]+@[\w.-]+.\w+undefined | |
| ^ | |
| SyntaxError: unterminated string literal (detected at line 596) | |
| 🤔 Analysis: | |
| *The error is due to a malformed regex string: missing closing quote. In UserCreate model: regex pattern should be r"^[\w.-]+@[\w.-]+.\w+". The previous code had that correct, but maybe truncated. Need to fix. Also ensure imports include Float. Already imported Float. Also need to import defaultdict (used in EventBus). Already imported from collections import defaultdict? Actually earlier we imported defaultdict in top? Yes, from collections import defaultdict earlier? I didn't import it now. At top we have "from collections import defaultdict"? In the code I wrote: "from collections import defaultdict"? Actually I omitted that import. Need to add import. Also need to import List from typing already there. Let's fix the regex and add missing import. Also need to ensure that on_conflict_do_update works in SQLite using SQLAlchemy 2.0. It should be fine. Let's produce corrected code.* --- **💬 Response:** ```python from __future__ import annotations import asyncio import json import logging import math import os import sys import time from collections import defaultdict from datetime import datetime, timedelta from typing import Any, Callable, Dict, List, Optional, Tuple import uvicorn from fastapi import ( APIRouter, Depends, FastAPI, HTTPException, Request, Security, status, WebSocket, WebSocketDisconnect, ) from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel, BaseSettings, Field from sqlalchemy import ( Boolean, DateTime, Float, Integer, LargeBinary, MetaData, String, Table, create_engine, select, ) from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker try: from sentence_transformers import SentenceTransformer _HAS_SBERT = True except Exception: _HAS_SBERT = False class 
Settings(BaseSettings): APP_NAME: str = "Samuel‑Prod" DEBUG: bool = os.getenv("SAMUEL_DEBUG", "0") == "1" HOST: str = "0.0.0.0" PORT: int = int(os.getenv("SAMUEL_PORT", "8000")) DB_URL: str = "sqlite+aiosqlite:///samuel.db" JWT_SECRET_KEY: str = os.getenv("JWT_SECRET_KEY", "CHANGE_ME_IN_PROD") JWT_ALGORITHM: str = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 VECTOR_DIM: int = int(os.getenv("SAMUEL_VECTOR_DIM", "384")) SHORT_TERM_TTL: int = int(os.getenv("SAMUEL_SHORT_TERM_TTL", "3600")) RATE_LIMIT: int = int(os.getenv("SAMUEL_RATE_LIMIT", "30")) CORS_ORIGINS: List[str] = ["*"] class Config: env_file = ".env" case_sensitive = False settings = Settings() log = logging.getLogger("samuel") handler = logging.StreamHandler(sys.stdout) handler.setFormatter( logging.Formatter("[%(asctime)s] %(levelname)s %(name)s %(message)s") ) log.addHandler(handler) log.setLevel(logging.DEBUG if settings.DEBUG else logging.INFO) metadata = MetaData() users_table = Table( "users", metadata, Column("id", Integer, primary_key=True), Column("username", String, unique=True, nullable=False), Column("email", String, unique=True, nullable=False), Column("hashed_password", LargeBinary, nullable=False), Column("is_active", Boolean, default=True), Column("created_at", DateTime, default=datetime.utcnow), ) memory_table = Table( "memory", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("user_id", Integer, nullable=False), Column("kind", String, nullable=False), Column("content", String, nullable=False), Column("metadata", String, default="{}"), Column("created_at", DateTime, default=datetime.utcnow), Column("expires_at", DateTime, nullable=True), ) listings_table = Table( "listings", metadata, Column("id", String, primary_key=True), Column("source", String), Column("title", String), Column("price", Integer), Column("surface", Integer), Column("rooms", Integer), Column("url", String), Column("score", Float, default=0.0), Column("fetched_at", DateTime, 
default=datetime.utcnow), ) engine: AsyncEngine = create_async_engine( settings.DB_URL, echo=settings.DEBUG, future=True ) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token") def verify_password(plain: str, hashed: bytes) -> bool: return pwd_context.verify(plain, hashed) def get_password_hash(password: str) -> bytes: return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() expire = datetime.utcnow() + ( expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) ) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) async def get_user_by_username(db: AsyncSession, username: str) -> Optional[dict]: stmt = select(users_table).where(users_table.c.username == username) result = await db.execute(stmt) row = result.fetchone() return dict(row) if row else None async def get_user_by_id(db: AsyncSession, user_id: int) -> Optional[dict]: stmt = select(users_table).where(users_table.c.id == user_id) result = await db.execute(stmt) row = result.fetchone() return dict(row) if row else None async def get_current_user( token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(async_session) ) -> dict: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode( token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM] ) user_id: int = int(payload.get("sub")) if user_id is None: raise credentials_exception except (JWTError, ValueError): raise credentials_exception user = await get_user_by_id(db, user_id) if user is None or not user["is_active"]: raise credentials_exception return user class RateLimiter: def 
__init__(self, max_requests: int, period_seconds: int = 60): self.max = max_requests self.period = period_seconds self.tokens: Dict[int, Tuple[int, float]] = {} self.lock = asyncio.Lock() async def allow(self, user_id: int) -> bool: async with self.lock: now = time.time() remaining, reset = self.tokens.get(user_id, (self.max, now + self.period)) if now > reset: remaining = self.max reset = now + self.period if remaining <= 0: return False self.tokens[user_id] = (remaining - 1, reset) return True rate_limiter = RateLimiter(settings.RATE_LIMIT) class EventBus: def __init__(self) -> None: self._subs: Dict[str, List[Callable[..., Any]]] = defaultdict(list) def subscribe(self, topic: str, coro: Callable[..., Any]) -> None: self._subs[topic].append(coro) async def publish(self, topic: str, *args, **kwargs) -> None: for fn in self._subs.get(topic, []): try: res = fn(*args, **kwargs) if asyncio.iscoroutine(res): asyncio.create_task(res) except Exception: log.exception("EventBus error on %s", topic) class WSManager: def __init__(self) -> None: self._conns: List[WebSocket] = [] self._lock = asyncio.Lock() async def connect(self, ws: WebSocket) -> None: await ws.accept() async with self._lock: self._conns.append(ws) async def disconnect(self, ws: WebSocket) -> None: async with self._lock: if ws in self._conns: self._conns.remove(ws) async def broadcast(self, payload: dict) -> None: text = json.dumps(payload, default=str, ensure_ascii=False) async with self._lock: conns = list(self._conns) for ws in conns: try: await ws.send_text(text) except Exception: await self.disconnect(ws) ws_manager = WSManager() class EmbeddingProvider: def __init__(self) -> None: self.model = None if _HAS_SBERT: try: self.model = SentenceTransformer("all-MiniLM-L6-v2") log.info("Loaded SBERT") except Exception: log.info("SBERT load failed") if not self.model: log.info("Using hash fallback") def dim(self) -> int: if self.model: try: return self.model.get_sentence_embedding_dimension() except Exception: 
return settings.VECTOR_DIM return settings.VECTOR_DIM def embed(self, texts: List[str]) -> List[List[float]]: if self.model: arr = self.model.encode( texts, convert_to_numpy=True, show_progress_bar=False ) return arr.tolist() out = [] for txt in texts: h = abs(hash(txt)) vec = [((h >> (i % 32)) & 0xFF) / 255.0 for i in range(self.dim())] out.append(vec) return out class SimpleVectorStore: def __init__(self, dim: int = settings.VECTOR_DIM) -> None: self.dim = dim self._vectors: Dict[int, List[float]] = {} self._lock = asyncio.Lock() async def _norm(self, v: List[float]) -> List[float]: s = math.sqrt(sum(x * x for x in v)) or 1.0 return [x / s for x in v] async def add(self, mid: int, vec: List[float]) -> None: async with self._lock: self._vectors[mid] = await self._norm(vec) async def remove(self, mid: int) -> None: async with self._lock: self._vectors.pop(mid, None) async def search(self, vec: List[float], top_k: int = 5) -> List[Tuple[int, float]]: q = await self._norm(vec) async with self._lock: sims = [ (mid, sum(a * b for a, b in zip(q, stored))) for mid, stored in self._vectors.items() ] sims.sort(key=lambda x: x[1], reverse=True) return sims[:top_k] class MemoryManager: def __init__(self, async_engine: AsyncEngine): self.engine = async_engine self.vec = SimpleVectorStore(dim=settings.VECTOR_DIM) self.emb = EmbeddingProvider() self._short_term: Dict[int, float] = {} self._cleanup_task: Optional[asyncio.Task] = None self._lock = asyncio.Lock() async def start(self) -> None: if not self._cleanup_task: self._cleanup_task = asyncio.create_task(self._cleanup_loop()) async def shutdown(self) -> None: if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass async def _cleanup_loop(self) -> None: while True: await asyncio.sleep(60) await self._cleanup_once() async def _cleanup_once(self) -> None: now = datetime.utcnow().timestamp() async with self._lock: expired = [mid for mid, ts in 
list(self._short_term.items()) if ts < now] for mid in expired: await self.vec.remove(mid) self._short_term.pop(mid, None) async def add( self, user_id: int, kind: str, content: str, metadata: Optional[dict] = None, ttl_seconds: Optional[int] = None, ) -> int: metadata = metadata or {} now = datetime.utcnow() expires = ( now + timedelta(seconds=ttl_seconds or settings.SHORT_TERM_TTL) if ttl_seconds else None ) async with self.engine.begin() as conn: res = await conn.execute( memory_table.insert().values( user_id=user_id, kind=kind, content=content, metadata=json.dumps(metadata, ensure_ascii=False), created_at=now, expires_at=expires, ) ) await conn.commit() mid = res.inserted_primary_key[0] try: vec = self.emb.embed([content])[0] await self.vec.add(mid, vec) except Exception: log.exception("Embedding failed") if expires: async with self._lock: self._short_term[mid] = expires.timestamp() return mid async def get(self, mid: int) -> Optional[dict]: async with self.engine.connect() as conn: stmt = select(memory_table).where(memory_table.c.id == mid) result = await conn.execute(stmt) row = result.fetchone() return dict(row) if row else None async def recent(self, user_id: int, limit: int = 50) -> List[dict]: async with self.engine.connect() as conn: stmt = ( select(memory_table) .where(memory_table.c.user_id == user_id) .order_by(memory_table.c.id.desc()) .limit(limit) ) result = await conn.execute(stmt) rows = result.fetchall() return [dict(r) for r in rows] async def semantic_search( self, user_id: int, q: str, top_k: int = 5 ) -> List[Tuple[Optional[dict], float]]: try: qv = self.emb.embed([q])[0] hits = await self.vec.search(qv, top_k) return [(await self.get(mid), score) for mid, score in hits] except Exception: recent = await self.recent(user_id, limit=200) out = [] for r in recent: if q.lower() in r["content"].lower(): out.append((r, 0.0)) return out[:top_k] class SearchCriteria(BaseModel): min_price: Optional[float] = None max_price: Optional[float] = None 
min_surface: Optional[float] = None max_surface: Optional[float] = None rooms: Optional[int] = None city: Optional[str] = None neighborhoods: Optional[List[str]] = [] keywords: Optional[str] = None has_photos: Optional[bool] = True max_results: int = 50 class Listing(BaseModel): id: str title: str price: float surface: float rooms: int score: float = 0.0 source: str url: str class HHDB: def __init__(self, async_engine: AsyncEngine): self.engine = async_engine async def upsert(self, l: Listing) -> None: async with self.engine.begin() as conn: await conn.execute( listings_table.insert() .values( id=l.id, source=l.source, title=l.title, price=l.price, surface=l.surface, rooms=l.rooms, url=l.url, score=l.score, fetched_at=datetime.utcnow(), ) .on_conflict_do_update( index_elements=[listings_table.c.id], set_={ "source": l.source, "title": l.title, "price": l.price, "surface": l.surface, "rooms": l.rooms, "url": l.url, "score": l.score, "fetched_at": datetime.utcnow(), }, ) ) await conn.commit() async def top(self, limit: int = 20) -> List[dict]: async with self.engine.connect() as conn: stmt = ( select(listings_table) .order_by(listings_table.c.score.desc()) .limit(limit) ) result = await conn.execute(stmt) rows = result.fetchall() return [dict(r) for r in rows] class HouseHunter: def __init__(self, bus: EventBus, memory: MemoryManager, hhdb: HHDB): self.bus = bus self.memory = memory self.hhdb = hhdb self.adapters = [self._adapter_stub] async def _adapter_stub(self, criteria: SearchCriteria) -> List[Listing]: await asyncio.sleep(0.15) base_price = criteria.min_price or 50000 out = [] for i in range(6): price = base_price + i * 30000 out.append( Listing( id=f"fake-{int(time.time())}-{i}", title=f"Demo depto {i} {criteria.keywords or ''}", price=price, surface=40 + 5 * i, rooms=1 + (i % 3), source="stub", url="https://example.com", ) ) return out def _score(self, l: Listing, criteria: SearchCriteria) -> float: score = 0.0 weights = {"price": 0.4, "surface": 0.25, 
"rooms": 0.15, "photos": 0.1, "proximity": 0.1} if criteria.min_price and criteria.max_price: mid = (criteria.min_price + criteria.max_price) / 2 span = max(1.0, criteria.max_price - criteria.min_price) pscore = max(0.0, 1.0 - abs(l.price - mid) / span) else: pscore = 0.5 score += pscore * weights["price"] if criteria.min_surface and criteria.max_surface: mid = (criteria.min_surface + criteria.max_surface) / 2 span = max(1.0, criteria.max_surface - criteria.min_surface) sscore = max(0.0, 1.0 - abs(l.surface - mid) / span) else: sscore = 0.5 score += sscore * weights["surface"] rscore = 1.0 if (criteria.rooms and l.rooms == criteria.rooms) else 0.5 score += rscore * weights["rooms"] return round(score, 3) async def search(self, user_id: int, criteria: SearchCriteria) -> List[dict]: tasks = [adapter(criteria) for adapter in self.adapters] results = [] adapters_out = await asyncio.gather(*tasks, return_exceptions=False) for lst in adapters_out: for l in lst: l.score = self._score(l, criteria) await self.hhdb.upsert(l) results.append(l) payload = { "id": l.id, "title": l.title, "score": l.score, "source": l.source, "url": l.url, } asyncio.create_task(self.bus.publish("offers.new", payload)) await self.memory.add( user_id, "offer", f"Oferta: {l.title} — score {l.score}", {"source": l.source, "url": l.url}, ttl_seconds=3600, ) results.sort(key=lambda x: x.score, reverse=True) return [l.dict() for l in results] class LLMOrchestrator: def __init__(self) -> None: self.backend = "local" async def ask(self, prompt: str, stream: Optional[Callable[[str], Any]] = None) -> str: tokens = (prompt + " — respuesta simulada").split() ans = "" for t in tokens: ans += t + " " if stream: try: stream(t) except Exception: log.exception("LLM stream failed") await asyncio.sleep(0.05) return ans.strip() class Token(BaseModel): access_token: str token_type: str = "bearer" class UserCreate(BaseModel): username: str = Field(..., min_length=3, max_length=30) email: str = Field(..., 
regex=r"^[\w\.-]+@[\w\.-]+\.\w+") | |
| password: str = Field(..., min_length=6) | |
class UserRead(BaseModel):
    """Public representation of a user account (response of ``/auth/signup``)."""

    id: int = Field(...)
    username: str = Field(...)
    email: str = Field(...)
    is_active: bool = Field(...)
    created_at: datetime = Field(...)
class MemIn(BaseModel):
    """Request body for ``POST /memory/add``.

    ``metadata`` is an arbitrary JSON object stored with the content.
    ``ttl_seconds`` is forwarded to ``MemoryManager.add``, which treats
    ``None`` or ``0`` as "no expiry".
    """

    kind: str
    content: str
    # default_factory is the idiomatic way to give every instance its own
    # dict instead of a mutable ``{}`` literal default. The field stays Optional
    # so an explicit JSON null from existing clients is still accepted.
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
    # A negative TTL would produce an expiry timestamp in the past, so reject it.
    ttl_seconds: Optional[int] = Field(None, ge=0)
class MemOut(BaseModel):
    """A stored memory item as returned by ``/memory/get`` and ``/memory/recent``."""

    id: int = Field(...)
    kind: str = Field(...)
    content: str = Field(...)
    metadata: Dict[str, Any] = Field(...)
    created_at: datetime = Field(...)
    expires_at: Optional[datetime] = Field(None)
class MemSearchResult(BaseModel):
    """One hit of ``/memory/search``; ``id`` and ``content`` are optional."""

    id: Optional[int] = Field(None)
    content: Optional[str] = Field(None)
    score: float = Field(...)
class LLMAskIn(BaseModel):
    """Request body for ``POST /agent/ask``."""

    prompt: str = Field(...)
class LLMAskOut(BaseModel):
    """Response body of ``POST /agent/ask``."""

    answer: str = Field(...)
def create_app() -> FastAPI:
    """Build the Samuel FastAPI application and wire all of its services.

    Instantiates the event bus, memory manager, listings store, house hunter
    and LLM orchestrator, registers startup/shutdown hooks and mounts:

    * ``/auth``   - signup and OAuth2 password-flow token issuance,
    * ``/health`` - liveness probe,
    * ``/memory`` - per-user memory add/get/recent/search (auth required),
    * ``/house``  - listing search and top listings (auth required),
    * ``/agent``  - LLM question answering (auth required),
    * ``/ws``     - websocket fan-out of LLM tokens and new offers,
    * ``/``       - ``samuel_app.html`` if that file sits next to this module.

    Depends on module-level names not defined in this block (``settings``,
    ``engine``, ``metadata``, ``users_table``, ``async_session``,
    ``get_current_user``, ``rate_limiter``, ``ws_manager``, ``log``, ...).

    Returns:
        The configured :class:`fastapi.FastAPI` instance.
    """
    from sqlalchemy.exc import IntegrityError

    app = FastAPI(
        title=settings.APP_NAME,
        debug=settings.DEBUG,
        description="Samuel production API",
        version="1.0.0",
    )
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.CORS_ORIGINS,
        allow_credentials=True,
        # "*" allows any method/header. The previous [""] matched nothing,
        # so every CORS preflight request was rejected.
        allow_methods=["*"],
        allow_headers=["*"],
    )

    core_event_bus = EventBus()
    memory_manager = MemoryManager(engine)
    hhdb = HHDB(engine)
    house_hunter = HouseHunter(core_event_bus, memory_manager, hhdb)
    llm = LLMOrchestrator()

    # Strong references to fire-and-forget tasks: the event loop only keeps
    # weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    background_tasks: set = set()

    def _on_background_done(task: asyncio.Task) -> None:
        """Forget a finished task and log its failure instead of losing it."""
        background_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            log.error("Background task failed", exc_info=task.exception())

    def _spawn(coro: Any) -> asyncio.Task:
        """Schedule *coro* on the running loop without awaiting it."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(_on_background_done)
        return task

    def _row_to_mem_out(row: Dict[str, Any]) -> MemOut:
        """Build a ``MemOut`` from a raw ``memory_table`` row dict.

        ``MemoryManager.add`` stores ``metadata`` as a ``json.dumps`` string,
        while ``MemOut.metadata`` must be a dict, so the string is decoded
        here. Empty, unparseable or non-object metadata degrades to ``{}``.
        """
        data = dict(row)
        raw = data.get("metadata")
        if isinstance(raw, (str, bytes)):
            try:
                raw = json.loads(raw) if raw else {}
            except ValueError:
                log.warning("Invalid JSON metadata on memory %s", data.get("id"))
                raw = {}
        data["metadata"] = raw if isinstance(raw, dict) else {}
        return MemOut(**data)

    @app.on_event("startup")
    async def on_startup() -> None:
        # Create any missing tables before the memory manager starts.
        async with engine.begin() as conn:
            await conn.run_sync(metadata.create_all)
        await memory_manager.start()

    @app.on_event("shutdown")
    async def on_shutdown() -> None:
        await memory_manager.shutdown()

    # ------------------------------------------------------------------ auth
    auth_router = APIRouter(prefix="/auth", tags=["auth"])

    # NOTE(review): if ``async_session`` is a plain sessionmaker, FastAPI calls
    # it once per request and nothing closes the session; a yield-based
    # dependency that closes it would be safer. Confirm its definition.
    @auth_router.post("/signup", response_model=UserRead)
    async def signup(user: UserCreate, db: AsyncSession = Depends(async_session)):
        """Register a new active user; 400 if the username or email is taken."""
        stmt = select(users_table).where(
            (users_table.c.username == user.username)
            | (users_table.c.email == user.email)
        )
        if (await db.execute(stmt)).first() is not None:
            raise HTTPException(
                status_code=400, detail="Username or email already registered"
            )
        hashed = get_password_hash(user.password)
        created_at = datetime.utcnow()
        # The SELECT above already auto-began a transaction on ``db``; a
        # nested ``db.begin()`` block is rejected by SQLAlchemy, so insert
        # and commit on the current transaction directly.
        try:
            result = await db.execute(
                users_table.insert().values(
                    username=user.username,
                    email=user.email,
                    hashed_password=hashed,
                    is_active=True,
                    created_at=created_at,
                )
            )
            await db.commit()
        except IntegrityError:
            # e.g. a concurrent signup inserted the same username/email
            # between the check and the insert (assuming unique constraints).
            await db.rollback()
            raise HTTPException(
                status_code=400, detail="Username or email already registered"
            ) from None
        # Echo the timestamp that was actually stored.
        return UserRead(
            id=result.inserted_primary_key[0],
            username=user.username,
            email=user.email,
            is_active=True,
            created_at=created_at,
        )

    @auth_router.post("/token", response_model=Token)
    async def login(
        form_data: OAuth2PasswordRequestForm = Depends(),
        db: AsyncSession = Depends(async_session),
    ):
        """Issue a JWT access token for valid credentials of an active user."""
        stmt = select(users_table).where(users_table.c.username == form_data.username)
        row = (await db.execute(stmt)).first()
        if row is None:
            raise HTTPException(status_code=400, detail="Incorrect username or password")
        # ``Row._mapping`` is the supported dict view on SQLAlchemy 1.4+ rows;
        # ``dict(row)`` fails on SQLAlchemy 2.0 Row objects.
        account = dict(row._mapping)
        if not verify_password(form_data.password, account["hashed_password"]):
            raise HTTPException(status_code=400, detail="Incorrect username or password")
        if not account.get("is_active", True):
            raise HTTPException(status_code=400, detail="Inactive user")
        access_token = create_access_token(data={"sub": str(account["id"])})
        return Token(access_token=access_token)

    app.include_router(auth_router)

    @app.get("/health", tags=["health"])
    async def health():
        return {"status": "ok", "modules": ["memory", "house_hunter", "llm"]}

    # ---------------------------------------------------------------- memory
    memory_router = APIRouter(
        prefix="/memory", tags=["memory"], dependencies=[Depends(get_current_user)]
    )

    @memory_router.post("/add", response_model=Dict[str, int])
    async def mem_add(item: MemIn, user: dict = Depends(get_current_user)):
        """Store a memory for the current user and return its id."""
        mid = await memory_manager.add(
            user_id=user["id"],
            kind=item.kind,
            content=item.content,
            metadata=item.metadata,
            ttl_seconds=item.ttl_seconds,
        )
        return {"id": mid}

    @memory_router.get("/get/{mid}", response_model=MemOut)
    async def mem_get(mid: int, user: dict = Depends(get_current_user)):
        """Return one of the caller's memories; 404 if missing or not theirs."""
        row = await memory_manager.get(mid)
        if not row or row["user_id"] != user["id"]:
            raise HTTPException(status_code=404, detail="Memory not found")
        return _row_to_mem_out(row)

    @memory_router.get("/recent", response_model=List[MemOut])
    async def mem_recent(user: dict = Depends(get_current_user), limit: int = 20):
        """Return the caller's most recent memories, newest first."""
        rows = await memory_manager.recent(user_id=user["id"], limit=limit)
        return [_row_to_mem_out(r) for r in rows]

    @memory_router.get("/search", response_model=List[MemSearchResult])
    async def mem_search(q: str, user: dict = Depends(get_current_user), top_k: int = 5):
        """Search the caller's memories; may return fewer than ``top_k`` hits."""
        hits = await memory_manager.semantic_search(user_id=user["id"], q=q, top_k=top_k)
        # The vector path of semantic_search resolves ids without a user
        # filter, so keep only hits that exist and belong to the caller.
        return [
            MemSearchResult(id=row["id"], content=row["content"], score=score)
            for row, score in hits
            if row is not None and row.get("user_id") == user["id"]
        ]

    app.include_router(memory_router)

    # ----------------------------------------------------------------- house
    house_router = APIRouter(
        prefix="/house", tags=["house"], dependencies=[Depends(get_current_user)]
    )

    @house_router.post("/search", response_model=List[dict])
    async def house_search(criteria: SearchCriteria, user: dict = Depends(get_current_user)):
        """Run a rate-limited listing search for the current user."""
        if not await rate_limiter.allow(user["id"]):
            raise HTTPException(status_code=429, detail="Rate limit exceeded")
        return await house_hunter.search(user_id=user["id"], criteria=criteria)

    @house_router.get("/top", response_model=List[dict])
    async def house_top(limit: int = 20):
        """Return the highest-scored stored listings."""
        return await hhdb.top(limit)

    app.include_router(house_router)

    # ----------------------------------------------------------------- agent
    llm_router = APIRouter(
        prefix="/agent", tags=["agent"], dependencies=[Depends(get_current_user)]
    )

    @llm_router.post("/ask", response_model=LLMAskOut)
    async def llm_ask(payload: LLMAskIn):
        """Answer a prompt, broadcasting each token over the websocket."""

        async def _broadcast_token(tok: str) -> None:
            await ws_manager.broadcast({"type": "llm.token", "token": tok})

        answer = await llm.ask(
            payload.prompt, stream=lambda t: _spawn(_broadcast_token(t))
        )
        return LLMAskOut(answer=answer)

    app.include_router(llm_router)

    # ------------------------------------------------------------- websocket
    # NOTE(review): this socket is unauthenticated, and ws_manager.broadcast
    # presumably reaches every connected client, including other users' LLM
    # tokens. Confirm this is intended.
    @app.websocket("/ws")
    async def websocket_endpoint(ws: WebSocket):
        await ws_manager.connect(ws)
        try:
            # Incoming messages are ignored; the loop only detects disconnects.
            while True:
                await ws.receive_text()
        except WebSocketDisconnect:
            pass
        except Exception:
            log.exception("WebSocket error")
        finally:
            # Also runs on task cancellation, which `except Exception` misses.
            await ws_manager.disconnect(ws)

    async def broadcast_offer(payload: dict):
        await ws_manager.broadcast({"type": "offers.new", "payload": payload})

    core_event_bus.subscribe("offers.new", broadcast_offer)

    HERE = os.path.dirname(os.path.abspath(__file__))
    html_path = os.path.join(HERE, "samuel_app.html")
    if os.path.exists(html_path):

        # Plain ``def``: FastAPI runs it in its threadpool, so the blocking
        # file read does not stall the event loop.
        @app.get("/", include_in_schema=False)
        def index():
            with open(html_path, "r", encoding="utf-8") as f:
                return HTMLResponse(f.read())

    return app
if __name__ == "__main__":
    # ``create_app`` is an application factory. Passing the callable with
    # factory=True makes uvicorn call it explicitly, rather than auto-detecting
    # it with a warning. It also avoids re-importing this module under a
    # hard-coded "app" name.
    uvicorn.run(
        create_app,
        factory=True,
        host=settings.HOST,
        port=settings.PORT,
        log_level="debug" if settings.DEBUG else "info",
    )