zomato-backend / main.py
quantumbit's picture
printing logs
1b7782e
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from pathlib import Path
import os
import requests
from dotenv import load_dotenv
from sqlalchemy.orm import Session
from database import Base, engine,SessionLocal
from models import User, Log, CtrReport
from schemas import SignupRequest, LoginRequest, LogRequest
from auth import hash_password, verify_password, create_token
from deps import get_db, get_current_user
from config import API_BASE_URL
from models import User, Log, CtrReport, Restaurant, Review
from utils import extract_points
# Application object; the route decorators further down register handlers on it.
app = FastAPI()

# Load environment variables from the ".env" file located next to this module.
# NOTE(review): this runs after `from config import API_BASE_URL` above, so if
# config reads the environment at import time it will not see values from this
# file — confirm.
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(env_path)

# Emit CREATE TABLE for the tables registered on Base.metadata at import time
# (SQLAlchemy's create_all skips tables that already exist by default).
Base.metadata.create_all(bind=engine)
def get_representatives(
    texts: list[str],
    eps: float,
    min_samples: int,
    error_label: str
) -> list[str]:
    """Ask the external service to pick representative items from ``texts``.

    POSTs ``texts``, ``eps`` and ``min_samples`` as a JSON body to
    ``{API_BASE_URL}/get_representatives`` and returns the list stored under
    the ``"representatives"`` key of the JSON reply.

    Args:
        texts: Candidate strings. An empty list short-circuits to ``[]``
            without contacting the service.
        eps: Forwarded unchanged as the ``eps`` field of the request.
        min_samples: Forwarded unchanged as the ``min_samples`` field.
        error_label: Human-readable prefix used in error details, e.g.
            ``"Representative review"``.

    Returns:
        The ``"representatives"`` list from the reply, or ``[]`` when the key
        is missing or does not hold a list.

    Raises:
        HTTPException: 502 when the request fails, the service answers with a
            non-200 status, or the body is not a JSON object.
    """
    if not texts:
        return []

    try:
        response = requests.post(
            f"{API_BASE_URL}/get_representatives",
            json={
                "texts": texts,
                "eps": eps,
                "min_samples": min_samples
            },
            timeout=60
        )
    except requests.RequestException as exc:
        raise HTTPException(
            502,
            f"{error_label} service failed: {exc}"
        ) from exc

    if response.status_code != 200:
        raise HTTPException(
            502,
            f"{error_label} service error"
        )

    # A 200 reply with a non-JSON body would otherwise escape as an unhandled
    # error (HTTP 500); report it as an upstream failure like the other cases.
    try:
        payload = response.json()
    except ValueError as exc:
        raise HTTPException(
            502,
            f"{error_label} service returned invalid JSON"
        ) from exc

    if not isinstance(payload, dict):
        raise HTTPException(
            502,
            f"{error_label} service returned an unexpected payload"
        )

    # Callers slice and iterate the result, so never hand back None.
    representatives = payload.get("representatives")
    return representatives if isinstance(representatives, list) else []
def requesty_chat(prompt: str) -> str:
    """Send ``prompt`` to the Requesty chat-completions endpoint.

    Configuration is read from the environment on every call:
    ``REQUESTY_API_KEY`` (required), ``REQUESTY_API_URL`` (optional, defaults
    to ``https://router.requesty.ai/v1``), and the optional
    ``REQUESTY_HTTP_REFERER`` / ``REQUESTY_X_TITLE`` values, which are sent as
    the ``HTTP-Referer`` / ``X-Title`` headers when set.

    Args:
        prompt: Text sent as the single user message.

    Returns:
        The stripped ``content`` of the first choice's message, or ``""`` when
        the reply carries no textual content.

    Raises:
        HTTPException: 500 when the API key is not configured; 502 when the
            request fails, the service answers with a non-200 status, the body
            is not valid JSON, or the reply contains no choices.
    """
    api_key = os.getenv("REQUESTY_API_KEY")
    if not api_key:
        raise HTTPException(500, "Requesty API key not configured")

    base_url = os.getenv(
        "REQUESTY_API_URL",
        "https://router.requesty.ai/v1"
    ).rstrip("/")

    headers = {
        "Authorization": f"Bearer {api_key}",
    }
    referer = os.getenv("REQUESTY_HTTP_REFERER")
    title = os.getenv("REQUESTY_X_TITLE")
    if referer:
        headers["HTTP-Referer"] = referer
    if title:
        headers["X-Title"] = title

    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json={
                "model": "openai/gpt-4o",
                "temperature": 0.2,
                "max_tokens": 256,
                "messages": [
                    {
                        "role": "system",
                        "content": (
                            "You find correlations between insights and reviews. "
                            "Return 3-6 short numbered points, each under 20 words."
                        )
                    },
                    {"role": "user", "content": prompt}
                ]
            },
            timeout=60
        )
    except requests.RequestException as exc:
        raise HTTPException(502, f"Requesty service failed: {exc}") from exc

    if response.status_code != 200:
        detail = response.text.strip()
        if detail:
            raise HTTPException(
                502,
                f"Requesty service error: {detail}"
            )
        raise HTTPException(502, "Requesty service error")

    # A 200 reply with a non-JSON body is an upstream failure, not a crash.
    try:
        payload = response.json()
    except ValueError as exc:
        raise HTTPException(
            502,
            "Requesty service returned invalid JSON"
        ) from exc

    choices = payload.get("choices") if isinstance(payload, dict) else None
    if not choices or not isinstance(choices, list):
        raise HTTPException(502, "Requesty service returned no choices")

    # "message" and "content" can be present but null, in which case
    # dict.get's default is not used; guard each step explicitly.
    first_choice = choices[0]
    message = first_choice.get("message") if isinstance(first_choice, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    return content.strip() if isinstance(content, str) else ""
@app.post("/auth/signup")
def signup(data: SignupRequest, db: Session = Depends(get_db)):
    """Create a user account, rejecting emails that are already registered."""
    email_taken = (
        db.query(User)
        .filter(User.email == data.email)
        .first()
    )
    if email_taken:
        raise HTTPException(400, "Email already exists")

    # Only the hash of the password is stored on the user row.
    new_user = User(
        email=data.email,
        password_hash=hash_password(data.password),
        name=data.name,
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    return {"message": "User created"}
@app.post("/auth/login")
def login(data: LoginRequest, db: Session = Depends(get_db)):
    """Check the submitted credentials and return an access token."""
    account = (
        db.query(User)
        .filter(User.email == data.email)
        .first()
    )

    # Unknown email and wrong password produce the same 401 response.
    if not (account and verify_password(data.password, account.password_hash)):
        raise HTTPException(401, "Invalid credentials")

    claims = {
        "user_id": account.id,
        "name": account.name,
        "email": account.email,
    }
    return {"access_token": create_token(claims)}
def process_session_logs(user_id: int, session_id: str, db_session_factory):
    """Send one session's logs to the processing service and store the report.

    Opens its own database session via ``db_session_factory``, joins the
    session's log entries (oldest first) into one text blob, POSTs it to
    ``{API_BASE_URL}/processed-logs`` and saves the reply as a ``CtrReport``.
    Failures are printed rather than raised, and the session is always closed.
    """
    db = db_session_factory()
    try:
        log_query = db.query(Log).filter(
            Log.user_id == user_id,
            Log.session_id == session_id
        )
        ordered_entries = log_query.order_by(Log.timestamp.asc()).all()

        # One "<iso timestamp> : <log text>" line per entry.
        lines = [
            f"{entry.timestamp.isoformat()} : {entry.log}"
            for entry in ordered_entries
        ]
        transcript = "\n".join(lines)

        response = requests.post(
            f"{API_BASE_URL}/processed-logs",
            json={"logs": transcript},
            timeout=300  # processing can take a long time
        )

        if response.status_code == 200:
            payload = response.json()
            # Each report field falls back to "" when the reply omits it.
            report_fields = {
                key: payload.get(key, "")
                for key in ("report", "insights", "state_flow", "suggestions")
            }
            db.add(
                CtrReport(
                    user_id=user_id,
                    session_id=session_id,
                    **report_fields
                )
            )
            db.commit()
        else:
            print("Processing service error")
    except Exception as e:
        print("Background processing failed:", e)
    finally:
        db.close()
@app.post("/addLog")
def add_log(
    data: LogRequest,
    background_tasks: BackgroundTasks,
    user_id: int = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Store one log entry; schedule session processing when it contains "<END>"."""
    entry = Log(
        user_id=user_id,
        session_id=data.session_id,
        log=data.log,
        timestamp=data.timestamp
    )
    print(data.log)

    db.add(entry)
    db.commit()

    # An "<END>" marker in the log text queues background processing of the
    # whole session once the response has been sent.
    session_finished = "<END>" in data.log
    if session_finished:
        background_tasks.add_task(
            process_session_logs,
            user_id,
            data.session_id,
            SessionLocal  # factory, so the task opens its own DB session
        )

    return {"message": "Log stored"}
@app.get("/session-summary")
def get_session_summary(
    session_id: str,
    db: Session = Depends(get_db)
):
    """Return a session's logs (oldest first) plus its most recent report."""
    ordered_logs = (
        db.query(Log)
        .filter(Log.session_id == session_id)
        .order_by(Log.timestamp.asc())
        .all()
    )

    # Highest id first, so .first() yields the latest report for the session.
    latest_report = (
        db.query(CtrReport)
        .filter(CtrReport.session_id == session_id)
        .order_by(CtrReport.id.desc())
        .first()
    )

    if latest_report:
        report_fields = {
            "report": latest_report.report,
            "insights": latest_report.insights,
            "suggestions": latest_report.suggestions,
            "state_flow": latest_report.state_flow,
        }
    else:
        # No report yet: every report field is an empty string.
        report_fields = {
            "report": "",
            "insights": "",
            "suggestions": "",
            "state_flow": "",
        }

    log_items = [
        {"id": item.id, "log": item.log, "timestamp": item.timestamp}
        for item in ordered_logs
    ]

    return {
        "session_id": session_id,
        "logs_count": len(ordered_logs),
        "logs": log_items,
        **report_fields,
    }
@app.get("/sessionid_list")
def get_sessionid_list(db: Session = Depends(get_db)):
    """List the distinct session ids that have a report, in ascending order."""
    id_query = (
        db.query(CtrReport.session_id)
        .distinct()
        .order_by(CtrReport.session_id.asc())
    )

    # Each result row holds a single column: the session id.
    ids = [session_id for (session_id,) in id_query.all()]

    return {"count": len(ids), "session_ids": ids}
@app.get("/restaurants")
def get_restaurants(db: Session = Depends(get_db)):
    """Return every restaurant with its public fields and a total count."""
    listing = [
        {
            "id": place.id,
            "name": place.name,
            "cuisine": place.cuisine,
            "location": place.location,
            "price_range": place.price_range,
            "avg_rating": place.avg_rating,
            "description": place.description,
        }
        for place in db.query(Restaurant).all()
    ]

    return {"count": len(listing), "restaurants": listing}
@app.get("/restaurants/{restaurant_id}/reviews")
def get_restaurant_reviews(
    restaurant_id: int,
    db: Session = Depends(get_db)
):
    """Return a restaurant's summary and all of its reviews, newest first."""
    place = (
        db.query(Restaurant)
        .filter(Restaurant.id == restaurant_id)
        .first()
    )
    if not place:
        raise HTTPException(404, "Restaurant not found")

    newest_first = (
        db.query(Review)
        .filter(Review.restaurant_id == restaurant_id)
        .order_by(Review.created_at.desc())
        .all()
    )

    review_items = [
        {
            "id": item.id,
            "user_name": item.user_name,
            "rating": item.rating,
            "review": item.review,
            "created_at": item.created_at,
        }
        for item in newest_first
    ]

    summary = {
        "id": place.id,
        "name": place.name,
        "cuisine": place.cuisine,
        "location": place.location,
        "avg_rating": place.avg_rating,
    }

    return {
        "restaurant": summary,
        "total_reviews": len(review_items),
        "reviews": review_items,
    }
@app.get("/restaurants/{restaurant_id}/get-few-reviews")
def get_few_reviews(
    restaurant_id: int,
    eps: float = 0.55,
    min_samples: int = 2,
    db: Session = Depends(get_db)
):
    """Return a small set of representative reviews for one restaurant.

    The review texts are sent to the representatives service through
    ``get_representatives``; each returned text is mapped back to the first
    stored review with exactly that text. ``eps`` and ``min_samples`` are
    forwarded to the service unchanged.

    Responds with 404 when the restaurant does not exist and with 502 when
    the representatives service fails.
    """
    restaurant = (
        db.query(Restaurant)
        .filter(Restaurant.id == restaurant_id)
        .first()
    )
    if not restaurant:
        raise HTTPException(404, "Restaurant not found")

    reviews = (
        db.query(Review)
        .filter(Review.restaurant_id == restaurant_id)
        .all()
    )
    if not reviews:
        # Same keys as the normal response so clients see one shape.
        return {
            "restaurant_id": restaurant_id,
            "restaurant_name": restaurant.name,
            "total_reviews": 0,
            "selected_count": 0,
            "selected_reviews": []
        }

    # Shared helper: same endpoint, payload, timeout and 502 error details as
    # the request this function used to issue inline.
    representative_texts = get_representatives(
        [r.review for r in reviews],
        eps=eps,
        min_samples=min_samples,
        error_label="Representative review"
    )

    # Index reviews by text once; setdefault keeps the FIRST review for a
    # given text, matching a front-to-back scan of the list.
    first_by_text = {}
    for review in reviews:
        first_by_text.setdefault(review.review, review)

    selected_reviews = []
    for text in representative_texts:
        # Non-string items cannot equal a stored review text; skip them
        # rather than risk an unhashable dict lookup.
        matching_review = first_by_text.get(text) if isinstance(text, str) else None
        if matching_review is None:
            continue
        selected_reviews.append({
            "id": matching_review.id,
            "user_name": matching_review.user_name,
            "rating": matching_review.rating,
            "review": matching_review.review,
            "created_at": matching_review.created_at
        })

    return {
        "restaurant_id": restaurant.id,
        "restaurant_name": restaurant.name,
        "total_reviews": len(reviews),
        "selected_count": len(selected_reviews),
        "selected_reviews": selected_reviews
    }
@app.get("/correlated-review-insights")
def correlated_review_insights(
    restaurant_id: int,
    eps: float = 0.41,
    min_samples: int = 2,
    db: Session = Depends(get_db)
):
    """Correlate session-report insights with a restaurant's reviews.

    Representative reviews and representative insight points are selected via
    ``get_representatives`` (``eps`` / ``min_samples`` are forwarded), at most
    25 of each are placed in a prompt for ``requesty_chat``, and the model's
    answer is returned as up to 6 points of at most 200 characters each.

    Responds with 404 when the restaurant does not exist; 5xx errors raised
    by the helper calls propagate unchanged. When there is nothing to
    correlate the response carries an empty ``correlation_points`` list.
    """
    max_prompt_items = 25   # cap per list placed in the prompt
    max_points = 6          # cap on returned correlation points
    max_point_length = 200  # longer points are truncated with "..."

    restaurant = (
        db.query(Restaurant)
        .filter(Restaurant.id == restaurant_id)
        .first()
    )
    if not restaurant:
        raise HTTPException(404, "Restaurant not found")

    def build_response(points, reviews_considered=0, insights_considered=0):
        # Single place that defines the response shape, so the early exits
        # carry the same keys as the full result.
        return {
            "restaurant_id": restaurant.id,
            "restaurant_name": restaurant.name,
            "reviews_considered": reviews_considered,
            "insights_considered": insights_considered,
            "correlation_points": points
        }

    reviews = (
        db.query(Review)
        .filter(Review.restaurant_id == restaurant_id)
        .all()
    )
    if not reviews:
        # Checked before loading reports: no point fetching them otherwise.
        return build_response([])

    reports = (
        db.query(CtrReport)
        .order_by(CtrReport.id.desc())
        .all()
    )
    if not reports:
        return build_response([])

    review_texts = [r.review for r in reviews]

    all_points = []
    for report in reports:
        if not report.insights:
            continue
        all_points.extend(extract_points(report.insights))

    # dict.fromkeys removes duplicates while preserving first-seen order.
    unique_points = list(dict.fromkeys(all_points))
    if not unique_points:
        return build_response([])

    representative_reviews = get_representatives(
        review_texts,
        eps=eps,
        min_samples=min_samples,
        error_label="Representative review"
    )
    representative_insights = get_representatives(
        unique_points,
        eps=eps,
        min_samples=min_samples,
        error_label="Representative insight"
    )

    trimmed_reviews = representative_reviews[:max_prompt_items]
    trimmed_insights = representative_insights[:max_prompt_items]

    # With one side empty there is nothing to correlate; skip the model call
    # instead of asking it to relate a list to nothing.
    if not trimmed_reviews or not trimmed_insights:
        return build_response(
            [],
            reviews_considered=len(trimmed_reviews),
            insights_considered=len(trimmed_insights)
        )

    prompt = (
        "Insights:\n"
        + "\n".join(f"- {point}" for point in trimmed_insights)
        + "\n\nReviews:\n"
        + "\n".join(f"- {text}" for text in trimmed_reviews)
        + "\n\nCorrelate them in short numbered points."
    )
    content = requesty_chat(prompt)

    correlation_points = extract_points(content)
    if not correlation_points:
        # Fallback: treat every non-blank line as a point, dropping any
        # leading/trailing dashes and spaces.
        correlation_points = [
            line.strip("- ").strip()
            for line in content.splitlines()
            if line.strip()
        ]
    if not correlation_points and content:
        correlation_points = [content]

    cleaned_points = []
    for point in correlation_points:
        cleaned = point.strip()
        if not cleaned:
            # e.g. a line that consisted only of dashes
            continue
        if len(cleaned) > max_point_length:
            cleaned = cleaned[:max_point_length - 3].rstrip() + "..."
        cleaned_points.append(cleaned)
        if len(cleaned_points) == max_points:
            break

    return build_response(
        cleaned_points,
        reviews_considered=len(trimmed_reviews),
        insights_considered=len(trimmed_insights)
    )
@app.get("/few-insights")
def get_all_insights(
    eps: float = 0.41,
    min_samples: int = 2,
    db: Session = Depends(get_db)
):
    """Return representative insight points drawn from all stored reports.

    Insight points are extracted from every report (newest report first),
    de-duplicated in first-seen order and reduced to representatives by
    ``get_representatives``; ``eps`` and ``min_samples`` are forwarded to it.
    Responds with 502 when the representatives service fails.
    """
    reports = (
        db.query(CtrReport)
        .order_by(CtrReport.id.desc())
        .all()
    )
    if not reports:
        # Kept exactly as before (including the extra "insights" key) so
        # existing clients of the empty response are unaffected.
        return {
            "count": 0,
            "insights": [],
            "selected_insights": []
        }

    # Flatten the insight points of every report that has any.
    all_points = []
    for report in reports:
        if not report.insights:
            continue
        all_points.extend(extract_points(report.insights))

    # Remove duplicates while preserving order.
    unique_points = list(dict.fromkeys(all_points))

    # Shared helper: same endpoint, payload, timeout and 502 error details as
    # the request this function used to issue inline; it returns [] without
    # calling the service when there are no points.
    selected_insights = get_representatives(
        unique_points,
        eps=eps,
        min_samples=min_samples,
        error_label="Representative insight"
    )

    return {
        "count": len(selected_insights),
        "selected_insights": selected_insights
    }