"""FastAPI application: auth, session logs, reports and restaurant reviews.

The module wires together the database models, two outbound HTTP services
(the clustering/processing API at ``API_BASE_URL`` and the Requesty chat
router) and the public HTTP routes.
"""

import logging
import os
from pathlib import Path

import requests
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session

from database import Base, engine, SessionLocal
from models import User, Log, CtrReport, Restaurant, Review
from schemas import SignupRequest, LoginRequest, LogRequest
from auth import hash_password, verify_password, create_token
from deps import get_db, get_current_user
from config import API_BASE_URL
from utils import extract_points

logger = logging.getLogger(__name__)

app = FastAPI()

# NOTE(review): the .env file is loaded only after `config` (and the other
# local modules) have been imported. If any of them read environment
# variables at import time they will not see values from .env — confirm.
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(env_path)

Base.metadata.create_all(bind=engine)


def _parse_json_object(response: requests.Response, error_label: str) -> dict:
    """Decode *response* as a JSON object or raise a 502 ``HTTPException``.

    ``Response.json()`` raises a ``ValueError`` subclass when the body is not
    valid JSON; without this guard that would surface as an unhandled 500.
    """
    try:
        payload = response.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"{error_label} service returned invalid JSON",
        ) from exc
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=502,
            detail=f"{error_label} service returned an unexpected payload",
        )
    return payload


def get_representatives(
    texts: list[str],
    eps: float,
    min_samples: int,
    error_label: str,
) -> list[str]:
    """Ask the ``/get_representatives`` service for representative texts.

    Args:
        texts: Candidate texts; an empty list short-circuits to ``[]``
            without calling the service.
        eps: Forwarded unchanged as the ``eps`` field of the request.
        min_samples: Forwarded unchanged as the ``min_samples`` field.
        error_label: Prefix used in error details (e.g.
            ``"Representative review"``).

    Returns:
        The ``representatives`` field of the response, or ``[]`` if absent.

    Raises:
        HTTPException: 502 when the request fails, the service answers with
            a non-200 status, or the body is not a JSON object.
    """
    if not texts:
        return []

    try:
        response = requests.post(
            f"{API_BASE_URL}/get_representatives",
            json={
                "texts": texts,
                "eps": eps,
                "min_samples": min_samples,
            },
            timeout=60,
        )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail=f"{error_label} service failed: {exc}",
        ) from exc

    if response.status_code != 200:
        raise HTTPException(
            status_code=502,
            detail=f"{error_label} service error",
        )

    payload = _parse_json_object(response, error_label)
    return payload.get("representatives", [])


def requesty_chat(prompt: str) -> str:
    """Send *prompt* to the Requesty chat-completions endpoint.

    Configuration is read from the environment: ``REQUESTY_API_KEY``
    (required), ``REQUESTY_API_URL`` (optional base URL), and the optional
    ``REQUESTY_HTTP_REFERER`` / ``REQUESTY_X_TITLE`` header values.

    Returns:
        The stripped ``content`` of the first choice's message; an empty
        string when the content is missing or null.

    Raises:
        HTTPException: 500 when the API key is not configured; 502 when the
            request fails, the status is not 200, the body is not a JSON
            object, or no choices are returned.
    """
    api_key = os.getenv("REQUESTY_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=500,
            detail="Requesty API key not configured",
        )

    base_url = os.getenv(
        "REQUESTY_API_URL",
        "https://router.requesty.ai/v1",
    ).rstrip("/")

    headers = {
        "Authorization": f"Bearer {api_key}",
    }
    referer = os.getenv("REQUESTY_HTTP_REFERER")
    title = os.getenv("REQUESTY_X_TITLE")
    if referer:
        headers["HTTP-Referer"] = referer
    if title:
        headers["X-Title"] = title

    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json={
                "model": "openai/gpt-4o",
                "temperature": 0.2,
                "max_tokens": 256,
                "messages": [
                    {
                        "role": "system",
                        "content": (
                            "You find correlations between insights and reviews. "
                            "Return 3-6 short numbered points, each under 20 words."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
            },
            timeout=60,
        )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Requesty service failed: {exc}",
        ) from exc

    if response.status_code != 200:
        detail = response.text.strip()
        if detail:
            raise HTTPException(
                status_code=502,
                detail=f"Requesty service error: {detail}",
            )
        raise HTTPException(status_code=502, detail="Requesty service error")

    payload = _parse_json_object(response, "Requesty")
    choices = payload.get("choices", [])
    if not choices:
        raise HTTPException(
            status_code=502,
            detail="Requesty service returned no choices",
        )

    # Both "message" and "content" may be present but null in the payload;
    # `or` covers that case, which a `.get(..., default)` would not.
    message = choices[0].get("message") or {}
    content = message.get("content") or ""
    return content.strip()


def _get_restaurant_or_404(db: Session, restaurant_id: int) -> Restaurant:
    """Return the restaurant with *restaurant_id* or raise a 404."""
    restaurant = (
        db.query(Restaurant)
        .filter(Restaurant.id == restaurant_id)
        .first()
    )
    if not restaurant:
        raise HTTPException(status_code=404, detail="Restaurant not found")
    return restaurant


def _serialize_review(review: Review) -> dict:
    """Convert a ``Review`` row into the dict shape used in responses."""
    return {
        "id": review.id,
        "user_name": review.user_name,
        "rating": review.rating,
        "review": review.review,
        "created_at": review.created_at,
    }


def _collect_unique_points(reports: list[CtrReport]) -> list[str]:
    """Flatten the insight points of *reports*, dropping duplicates.

    Reports with empty ``insights`` are skipped; first-seen order is kept.
    """
    points = []
    for report in reports:
        if report.insights:
            points.extend(extract_points(report.insights))
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(points))


@app.post("/auth/signup")
def signup(data: SignupRequest, db: Session = Depends(get_db)):
    """Create a user; respond 400 if the email is already registered."""
    existing = db.query(User).filter(User.email == data.email).first()
    if existing:
        raise HTTPException(status_code=400, detail="Email already exists")

    user = User(
        email=data.email,
        password_hash=hash_password(data.password),
        name=data.name,
    )
    db.add(user)
    db.commit()
    db.refresh(user)

    return {"message": "User created"}


@app.post("/auth/login")
def login(data: LoginRequest, db: Session = Depends(get_db)):
    """Verify credentials and return an access token; 401 on mismatch."""
    user = db.query(User).filter(User.email == data.email).first()
    if not user or not verify_password(data.password, user.password_hash):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = create_token(
        {"user_id": user.id, "name": user.name, "email": user.email}
    )
    return {"access_token": token}


def process_session_logs(user_id: int, session_id: str, db_session_factory):
    """Background task: build and store a ``CtrReport`` for one session.

    Loads the session's logs in timestamp order, posts them to the
    ``/processed-logs`` service and stores the returned fields. The task is
    best-effort: any failure is logged and swallowed so it never propagates
    out of the background runner.

    Args:
        user_id: Owner of the logs.
        session_id: Session whose logs are processed.
        db_session_factory: Zero-argument callable returning a new DB
            session; the session is always closed before returning.
    """
    db = db_session_factory()
    try:
        session_logs = (
            db.query(Log)
            .filter(
                Log.user_id == user_id,
                Log.session_id == session_id,
            )
            .order_by(Log.timestamp.asc())
            .all()
        )

        combined_logs = "\n".join(
            f"{entry.timestamp.isoformat()} : {entry.log}"
            for entry in session_logs
        )

        response = requests.post(
            f"{API_BASE_URL}/processed-logs",
            json={"logs": combined_logs},
            timeout=300,  # allow long processing
        )
        if response.status_code != 200:
            logger.error(
                "Processing service error (status %s)",
                response.status_code,
            )
            return

        payload = response.json()
        report = CtrReport(
            user_id=user_id,
            session_id=session_id,
            report=payload.get("report", ""),
            insights=payload.get("insights", ""),
            state_flow=payload.get("state_flow", ""),
            suggestions=payload.get("suggestions", ""),
        )
        db.add(report)
        db.commit()
    except Exception:
        # Top-level boundary of a background task: record the traceback
        # instead of letting the exception escape.
        logger.exception("Background processing failed")
    finally:
        db.close()


@app.post("/addLog")
def add_log(
    data: LogRequest,
    background_tasks: BackgroundTasks,
    user_id: int = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Store one log entry and schedule background session processing."""
    log = Log(
        user_id=user_id,
        session_id=data.session_id,
        log=data.log,
        timestamp=data.timestamp,
    )
    logger.debug("Received log entry: %s", data.log)
    db.add(log)
    db.commit()

    # trigger async/background processing
    # NOTE(review): `"" in <str>` is true for every string, so processing is
    # scheduled on every log. This looks like a lost end-of-session marker;
    # behaviour is kept as-is until the intended marker is confirmed.
    if "" in data.log:
        background_tasks.add_task(
            process_session_logs,
            user_id,
            data.session_id,
            SessionLocal,  # DB session factory for the background task
        )

    return {"message": "Log stored"}


@app.get("/session-summary")
def get_session_summary(
    session_id: str,
    db: Session = Depends(get_db),
):
    """Return a session's logs plus the fields of its most recent report.

    Report fields are empty strings when no report exists for the session.
    """
    # NOTE(review): unlike /addLog this route has no get_current_user
    # dependency, so any caller can read any session — confirm intended.
    logs = (
        db.query(Log)
        .filter(Log.session_id == session_id)
        .order_by(Log.timestamp.asc())
        .all()
    )

    report = (
        db.query(CtrReport)
        .filter(CtrReport.session_id == session_id)
        .order_by(CtrReport.id.desc())
        .first()
    )

    return {
        "session_id": session_id,
        "logs_count": len(logs),
        "logs": [
            {
                "id": entry.id,
                "log": entry.log,
                "timestamp": entry.timestamp,
            }
            for entry in logs
        ],
        "report": report.report if report else "",
        "insights": report.insights if report else "",
        "suggestions": report.suggestions if report else "",
        "state_flow": report.state_flow if report else "",
    }


@app.get("/sessionid_list")
def get_sessionid_list(db: Session = Depends(get_db)):
    """Return the distinct session ids that have a report, ascending."""
    rows = (
        db.query(CtrReport.session_id)
        .distinct()
        .order_by(CtrReport.session_id.asc())
        .all()
    )
    session_ids = [row[0] for row in rows]

    return {
        "count": len(session_ids),
        "session_ids": session_ids,
    }


@app.get("/restaurants")
def get_restaurants(db: Session = Depends(get_db)):
    """Return every restaurant with its descriptive fields."""
    restaurants = db.query(Restaurant).all()
    result = [
        {
            "id": r.id,
            "name": r.name,
            "cuisine": r.cuisine,
            "location": r.location,
            "price_range": r.price_range,
            "avg_rating": r.avg_rating,
            "description": r.description,
        }
        for r in restaurants
    ]

    return {
        "count": len(result),
        "restaurants": result,
    }


@app.get("/restaurants/{restaurant_id}/reviews")
def get_restaurant_reviews(
    restaurant_id: int,
    db: Session = Depends(get_db),
):
    """Return a restaurant's reviews, newest first; 404 if it is unknown."""
    restaurant = _get_restaurant_or_404(db, restaurant_id)

    reviews = (
        db.query(Review)
        .filter(Review.restaurant_id == restaurant_id)
        .order_by(Review.created_at.desc())
        .all()
    )
    result = [_serialize_review(review) for review in reviews]

    return {
        "restaurant": {
            "id": restaurant.id,
            "name": restaurant.name,
            "cuisine": restaurant.cuisine,
            "location": restaurant.location,
            "avg_rating": restaurant.avg_rating,
        },
        "total_reviews": len(result),
        "reviews": result,
    }


@app.get("/restaurants/{restaurant_id}/get-few-reviews")
def get_few_reviews(
    restaurant_id: int,
    eps: float = 0.55,
    min_samples: int = 2,
    db: Session = Depends(get_db),
):
    """Return the reviews picked as representative by the external service.

    Raises 404 for an unknown restaurant and 502 when the representative
    service fails. A restaurant without reviews yields an empty selection
    without calling the service.
    """
    restaurant = _get_restaurant_or_404(db, restaurant_id)

    reviews = (
        db.query(Review)
        .filter(Review.restaurant_id == restaurant_id)
        .all()
    )
    if not reviews:
        return {
            "restaurant_id": restaurant_id,
            "restaurant_name": restaurant.name,
            "total_reviews": 0,
            "selected_reviews": [],
        }

    representative_texts = get_representatives(
        [r.review for r in reviews],
        eps=eps,
        min_samples=min_samples,
        error_label="Representative review",
    )

    # Map each review text to the FIRST review carrying it, so a returned
    # text resolves to the same row a linear scan would find.
    first_by_text = {}
    for review in reviews:
        first_by_text.setdefault(review.review, review)

    selected_reviews = []
    for text in representative_texts:
        # Guard against non-string (possibly unhashable) items in the payload.
        match = first_by_text.get(text) if isinstance(text, str) else None
        if match is not None:
            selected_reviews.append(_serialize_review(match))

    return {
        "restaurant_id": restaurant.id,
        "restaurant_name": restaurant.name,
        "total_reviews": len(reviews),
        "selected_count": len(selected_reviews),
        "selected_reviews": selected_reviews,
    }


@app.get("/correlated-review-insights")
def correlated_review_insights(
    restaurant_id: int,
    eps: float = 0.41,
    min_samples: int = 2,
    db: Session = Depends(get_db),
):
    """Correlate report insights with a restaurant's reviews via the chat API.

    Representative reviews and representative insight points (at most 25 of
    each) are sent to ``requesty_chat``; the answer is split into at most six
    points, each truncated to 200 characters. When there are no reviews, no
    reports or no insight points, an empty ``correlation_points`` list is
    returned without calling any external service.
    """
    restaurant = _get_restaurant_or_404(db, restaurant_id)

    reviews = (
        db.query(Review)
        .filter(Review.restaurant_id == restaurant_id)
        .all()
    )
    reports = (
        db.query(CtrReport)
        .order_by(CtrReport.id.desc())
        .all()
    )

    empty_result = {
        "restaurant_id": restaurant.id,
        "restaurant_name": restaurant.name,
        "correlation_points": [],
    }
    if not reviews or not reports:
        return empty_result

    unique_points = _collect_unique_points(reports)
    if not unique_points:
        return empty_result

    representative_reviews = get_representatives(
        [r.review for r in reviews],
        eps=eps,
        min_samples=min_samples,
        error_label="Representative review",
    )
    representative_insights = get_representatives(
        unique_points,
        eps=eps,
        min_samples=min_samples,
        error_label="Representative insight",
    )

    # Cap both lists to bound the prompt size.
    trimmed_reviews = representative_reviews[:25]
    trimmed_insights = representative_insights[:25]

    prompt = (
        "Insights:\n"
        + "\n".join(f"- {point}" for point in trimmed_insights)
        + "\n\nReviews:\n"
        + "\n".join(f"- {text}" for text in trimmed_reviews)
        + "\n\nCorrelate them in short numbered points."
    )

    content = requesty_chat(prompt)

    correlation_points = extract_points(content)
    if not correlation_points:
        # Fallback: treat every non-blank line as one point.
        correlation_points = [
            line.strip("- ").strip()
            for line in content.splitlines()
            if line.strip()
        ]
    if not correlation_points and content:
        correlation_points = [content]

    cleaned_points = []
    for point in correlation_points[:6]:
        cleaned = point.strip()
        if len(cleaned) > 200:
            # 197 characters plus the ellipsis keeps the result within 200.
            cleaned = cleaned[:197].rstrip() + "..."
        cleaned_points.append(cleaned)

    return {
        "restaurant_id": restaurant.id,
        "restaurant_name": restaurant.name,
        "reviews_considered": len(trimmed_reviews),
        "insights_considered": len(trimmed_insights),
        "correlation_points": cleaned_points,
    }


@app.get("/few-insights")
def get_all_insights(
    eps: float = 0.41,
    min_samples: int = 2,
    db: Session = Depends(get_db),
):
    """Return representative insight points gathered from all reports.

    Raises 502 when the representative service fails.
    """
    reports = (
        db.query(CtrReport)
        .order_by(CtrReport.id.desc())
        .all()
    )
    if not reports:
        # NOTE(review): this branch carries an extra "insights" key that the
        # normal response below does not; kept for backward compatibility.
        return {
            "count": 0,
            "insights": [],
            "selected_insights": [],
        }

    unique_points = _collect_unique_points(reports)

    # get_representatives returns [] for an empty list without calling out.
    selected_insights = get_representatives(
        unique_points,
        eps=eps,
        min_samples=min_samples,
        error_label="Representative insight",
    )

    return {
        "count": len(selected_insights),
        "selected_insights": selected_insights,
    }