TravelMap / app /main.py
Jack
fixed hosting page
30d3465
from __future__ import annotations
from datetime import date, datetime
from typing import Iterable
from urllib.parse import quote, urlsplit
from fastapi import Depends, FastAPI, Form, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session, joinedload, selectinload
from starlette.middleware.sessions import SessionMiddleware
from app.config import settings
from app.content import SITE_CONTENT, get_listing_card_meta, get_listing_content
from app.database import SessionLocal, get_db
from app.models import (
Availability,
Booking,
Listing,
Message,
MessageThread,
Review,
TaskDefinition,
User,
WishlistItem,
)
from app.seed import daterange, ensure_database_seeded, parse_date, reset_database
from app.tasks import evaluate_task, serialize_task
app = FastAPI(
title=settings.app_name,
description="A resettable vacation-rental marketplace with booking, messaging, and hosting flows.",
)
app.add_middleware(
SessionMiddleware,
secret_key=settings.session_secret,
same_site=settings.session_same_site,
https_only=settings.session_https_only,
)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")
@app.on_event("startup")
def startup() -> None:
ensure_database_seeded()
def get_current_user(request: Request, db: Session) -> User | None:
user_id = request.session.get("user_id")
if user_id is None:
return None
return db.get(User, user_id)
def build_context(request: Request, db: Session, **kwargs):
current_user = get_current_user(request, db)
return {
"request": request,
"current_user": current_user,
"site_content": SITE_CONTENT,
"notice": request.query_params.get("notice"),
**kwargs,
}
def redirect_with_notice(path: str, notice: str) -> RedirectResponse:
separator = "&" if "?" in path else "?"
return RedirectResponse(url=f"{path}{separator}notice={quote(notice)}", status_code=303)
def _safe_notice_target(request: Request) -> str:
referer = request.headers.get("referer", "")
if referer:
parsed = urlsplit(referer)
path = parsed.path or "/"
query = f"?{parsed.query}" if parsed.query else ""
if path.startswith("/"):
return f"{path}{query}"
if request.url.path and request.url.path.startswith("/"):
return request.url.path
return "/"
def _parse_optional_int(value: str) -> int | None:
stripped = value.strip()
if not stripped:
return None
return int(stripped)
def _parse_optional_float(value: str) -> float | None:
stripped = value.strip()
if not stripped:
return None
return float(stripped)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
if request.url.path.startswith("/api"):
return JSONResponse(status_code=422, content={"detail": exc.errors()})
return redirect_with_notice(_safe_notice_target(request), "Please check the values you entered and try again.")
def require_login(request: Request, db: Session, next_path: str) -> User | RedirectResponse:
user = get_current_user(request, db)
if user is None:
return RedirectResponse(url=f"/login?next={quote(next_path)}", status_code=303)
return user
def require_host(request: Request, db: Session, next_path: str) -> User | RedirectResponse:
user = require_login(request, db, next_path)
if isinstance(user, RedirectResponse):
return user
if not user.is_host:
return redirect_with_notice("/", "Switch to a host account to manage listings.")
return user
def normalize_redirect_target(target: str | None) -> str:
if not target or not target.startswith("/"):
return "/"
return target
def merge_ranges(ranges: list[dict[str, str]]) -> list[dict[str, str]]:
if not ranges:
return []
ordered = sorted(
[(parse_date(item["start"]), parse_date(item["end"])) for item in ranges],
key=lambda item: item[0],
)
merged: list[tuple[date, date]] = [ordered[0]]
for start_date, end_date in ordered[1:]:
previous_start, previous_end = merged[-1]
if start_date <= previous_end:
merged[-1] = (previous_start, max(previous_end, end_date))
else:
merged.append((start_date, end_date))
return [{"start": start.isoformat(), "end": end.isoformat()} for start, end in merged]
def is_date_blocked(current_date: date, blocked_ranges: list[dict[str, str]]) -> bool:
for blocked_range in blocked_ranges:
start_date = parse_date(blocked_range["start"])
end_date = parse_date(blocked_range["end"])
if start_date <= current_date < end_date:
return True
return False
def recompute_availability(db: Session, listing: Listing, start_date: date, end_date: date) -> None:
entries = db.scalars(
select(Availability).where(
Availability.listing_id == listing.id,
Availability.date >= start_date,
Availability.date < end_date,
)
).all()
entry_by_date = {entry.date: entry for entry in entries}
bookings = db.scalars(
select(Booking).where(
Booking.listing_id == listing.id,
Booking.status == "confirmed",
Booking.check_out > start_date,
Booking.check_in < end_date,
)
).all()
booked_dates = set()
for booking in bookings:
booked_dates.update(daterange(booking.check_in, booking.check_out))
for current_date in daterange(start_date, end_date):
entry = entry_by_date.get(current_date)
if entry is None:
entry = Availability(listing_id=listing.id, date=current_date)
db.add(entry)
entry.is_available = (
current_date not in booked_dates and not is_date_blocked(current_date, listing.blocked_ranges)
)
def get_listing(db: Session, slug: str) -> Listing | None:
return db.scalar(
select(Listing)
.where(Listing.slug == slug)
.options(
joinedload(Listing.host),
selectinload(Listing.images),
selectinload(Listing.reviews).joinedload(Review.user),
)
)
def serialize_state(db: Session) -> dict:
listings = db.scalars(select(Listing).options(joinedload(Listing.host)).order_by(Listing.id)).all()
bookings = db.scalars(
select(Booking).options(joinedload(Booking.listing), joinedload(Booking.guest)).order_by(Booking.id)
).all()
wishlists = db.scalars(
select(WishlistItem).options(joinedload(WishlistItem.user), joinedload(WishlistItem.listing))
).all()
threads = db.scalars(
select(MessageThread)
.options(joinedload(MessageThread.listing), joinedload(MessageThread.host), joinedload(MessageThread.guest))
.order_by(MessageThread.id)
).all()
return {
"summary": {
"users": db.scalar(select(func.count(User.id))) or 0,
"listings": len(listings),
"bookings": len(bookings),
"wishlist_items": len(wishlists),
"threads": len(threads),
},
"bookings": [
{
"confirmation_code": booking.confirmation_code,
"status": booking.status,
"guest": booking.guest.email,
"listing": booking.listing.slug,
"check_in": booking.check_in.isoformat(),
"check_out": booking.check_out.isoformat(),
}
for booking in bookings
],
"wishlists": [
{
"user": item.user.email,
"listing": item.listing.slug,
}
for item in wishlists
],
"threads": [
{
"id": thread.id,
"listing": thread.listing.slug,
"guest": thread.guest.email,
"host": thread.host.email,
"subject": thread.subject,
}
for thread in threads
],
}
@app.get("/", response_class=HTMLResponse)
def home(
request: Request,
q: str = "",
city: str = "",
guests: str = "",
max_price: str = "",
min_rating: str = "",
amenity: str = "",
sort: str = "recommended",
db: Session = Depends(get_db),
):
try:
guests_filter = _parse_optional_int(guests)
max_price_filter = _parse_optional_int(max_price)
min_rating_filter = _parse_optional_float(min_rating)
except ValueError:
return redirect_with_notice("/", "Please use numbers for guests, price, and rating filters.")
listings = db.scalars(
select(Listing).options(joinedload(Listing.host), selectinload(Listing.images)).order_by(Listing.id)
).all()
def matches(listing: Listing) -> bool:
haystack = " ".join(
[listing.title, listing.city, listing.country, listing.neighborhood, listing.description]
).lower()
return (
(not q or q.lower() in haystack)
and (not city or listing.city == city)
and (guests_filter is None or listing.max_guests >= guests_filter)
and (max_price_filter is None or listing.price_per_night <= max_price_filter)
and (min_rating_filter is None or listing.rating >= min_rating_filter)
and (not amenity or amenity in listing.amenities)
)
filtered = [listing for listing in listings if matches(listing)]
if sort == "price_low":
filtered.sort(key=lambda item: item.price_per_night)
elif sort == "price_high":
filtered.sort(key=lambda item: item.price_per_night, reverse=True)
elif sort == "rating":
filtered.sort(key=lambda item: (item.rating, item.review_count), reverse=True)
else:
filtered.sort(key=lambda item: (item.rating * item.review_count, item.review_count), reverse=True)
all_amenities = sorted({amenity_name for listing in listings for amenity_name in listing.amenities})
cities = sorted({listing.city for listing in listings})
editorial_listings = sorted(
listings,
key=lambda item: (item.rating, item.review_count),
reverse=True,
)[:3]
return templates.TemplateResponse(
request=request,
name="home.html",
context=build_context(
request,
db,
home_cards=[
{
"listing": listing,
"card_meta": get_listing_card_meta(listing),
}
for listing in filtered
],
results_count=len(filtered),
marketplace_listing_count=len(listings),
cities=cities,
amenities=all_amenities,
city_count=len(cities),
editorial_cards=[
{
"listing": listing,
"content": get_listing_content(listing),
}
for listing in editorial_listings
],
),
)
@app.get("/guide", response_class=HTMLResponse)
def guide(request: Request, db: Session = Depends(get_db)):
users = db.scalars(select(User).order_by(User.id)).all()
return templates.TemplateResponse(
request=request,
name="guide.html",
context=build_context(request, db, users=users),
)
@app.post("/reset")
def browser_reset():
reset_database()
return redirect_with_notice("/guide", "Environment reset to the seeded state.")
@app.get("/login", response_class=HTMLResponse)
def login_page(request: Request, next: str = "/", db: Session = Depends(get_db)):
users = db.scalars(select(User).order_by(User.id)).all()
return templates.TemplateResponse(
request=request,
name="login.html",
context=build_context(request, db, next_path=normalize_redirect_target(next), users=users),
)
@app.post("/login")
def login(
request: Request,
email: str = Form(...),
password: str = Form(...),
next: str = Form("/"),
db: Session = Depends(get_db),
):
user = db.scalar(select(User).where(User.email == email))
if user is None or user.password != password:
users = db.scalars(select(User).order_by(User.id)).all()
return templates.TemplateResponse(
request=request,
name="login.html",
context=build_context(
request,
db,
error="That email and password combination does not match the seeded demo data.",
next_path=normalize_redirect_target(next),
users=users,
),
status_code=400,
)
request.session["user_id"] = user.id
return redirect_with_notice(normalize_redirect_target(next), f"Signed in as {user.name}.")
@app.post("/logout")
def logout(request: Request):
request.session.clear()
return redirect_with_notice("/", "Signed out.")
@app.get("/switch/{user_id}")
def quick_switch(request: Request, user_id: int, next: str = "/", db: Session = Depends(get_db)):
user = db.get(User, user_id)
if user is None:
return redirect_with_notice("/", "That account does not exist.")
request.session["user_id"] = user.id
return redirect_with_notice(normalize_redirect_target(next), f"Switched to {user.name}.")
@app.get("/listings/{slug}", response_class=HTMLResponse, name="listing_detail")
def listing_detail(request: Request, slug: str, db: Session = Depends(get_db)):
listing = db.scalar(
select(Listing)
.where(Listing.slug == slug)
.options(
joinedload(Listing.host),
selectinload(Listing.images),
selectinload(Listing.reviews).joinedload(Review.user),
)
)
if listing is None:
return templates.TemplateResponse(
request=request,
name="home.html",
context=build_context(
request,
db,
home_cards=[],
results_count=0,
marketplace_listing_count=0,
cities=[],
amenities=[],
city_count=0,
editorial_cards=[],
),
status_code=404,
)
current_user = get_current_user(request, db)
listing_content = get_listing_content(listing)
wishlist_item = None
if current_user is not None:
wishlist_item = db.scalar(
select(WishlistItem).where(
WishlistItem.user_id == current_user.id,
WishlistItem.listing_id == listing.id,
)
)
next_unavailable = db.scalars(
select(Availability)
.where(
Availability.listing_id == listing.id,
Availability.is_available.is_(False),
Availability.date >= date.today(),
)
.order_by(Availability.date)
.limit(8)
).all()
first_availability = db.scalar(
select(Availability)
.where(Availability.listing_id == listing.id, Availability.is_available.is_(True))
.order_by(Availability.date)
)
nearby_listings = db.scalars(
select(Listing)
.where(Listing.id != listing.id)
.options(joinedload(Listing.host), selectinload(Listing.images))
.order_by(Listing.rating.desc(), Listing.review_count.desc())
.limit(3)
).all()
return templates.TemplateResponse(
request=request,
name="listing.html",
context=build_context(
request,
db,
listing=listing,
listing_content=listing_content,
gallery_images=listing_content["gallery_images"] or [
{"url": image.url, "alt_text": image.alt_text} for image in listing.images
],
next_unavailable=next_unavailable,
has_wishlisted=wishlist_item is not None,
first_available_date=first_availability.date.isoformat() if first_availability else "",
nearby_cards=[
{
"listing": other,
"card_meta": get_listing_card_meta(other),
}
for other in nearby_listings
],
),
)
@app.post("/listings/{listing_id}/book")
def book_listing(
request: Request,
listing_id: int,
check_in: str = Form(...),
check_out: str = Form(...),
guests: int = Form(...),
db: Session = Depends(get_db),
):
listing = db.get(Listing, listing_id)
if listing is None:
return redirect_with_notice("/", "Listing not found.")
current_user = require_login(request, db, f"/listings/{listing.slug}")
if isinstance(current_user, RedirectResponse):
return current_user
if current_user.id == listing.host_id:
return redirect_with_notice(f"/listings/{listing.slug}", "Hosts cannot book their own listing.")
check_in_date = parse_date(check_in)
check_out_date = parse_date(check_out)
if check_out_date <= check_in_date:
return redirect_with_notice(f"/listings/{listing.slug}", "Choose a check-out date after check-in.")
if guests > listing.max_guests:
return redirect_with_notice(f"/listings/{listing.slug}", "Guest count exceeds the listing capacity.")
availability_entries = db.scalars(
select(Availability).where(
Availability.listing_id == listing.id,
Availability.date >= check_in_date,
Availability.date < check_out_date,
)
).all()
nights = (check_out_date - check_in_date).days
if len(availability_entries) != nights or any(not entry.is_available for entry in availability_entries):
return redirect_with_notice(f"/listings/{listing.slug}", "Those dates are not fully available.")
next_booking_number = (db.scalar(select(func.max(Booking.id))) or 0) + 2001
total_price = nights * listing.price_per_night + listing.cleaning_fee + listing.service_fee
booking = Booking(
confirmation_code=f"BKG-{next_booking_number}",
listing_id=listing.id,
guest_id=current_user.id,
check_in=check_in_date,
check_out=check_out_date,
guests=guests,
total_price=round(total_price, 2),
status="confirmed",
created_at=datetime.utcnow(),
)
db.add(booking)
for availability_entry in availability_entries:
availability_entry.is_available = False
db.commit()
return redirect_with_notice("/trips", f"Booked {listing.title}.")
@app.post("/listings/{listing_id}/wishlist")
def toggle_wishlist(request: Request, listing_id: int, db: Session = Depends(get_db)):
listing = db.get(Listing, listing_id)
if listing is None:
return redirect_with_notice("/", "Listing not found.")
current_user = require_login(request, db, f"/listings/{listing.slug}")
if isinstance(current_user, RedirectResponse):
return current_user
existing = db.scalar(
select(WishlistItem).where(WishlistItem.user_id == current_user.id, WishlistItem.listing_id == listing_id)
)
if existing is None:
db.add(WishlistItem(user_id=current_user.id, listing_id=listing_id))
notice = f"Saved {listing.title}."
else:
db.delete(existing)
notice = f"Removed {listing.title} from the wishlist."
db.commit()
return redirect_with_notice(f"/listings/{listing.slug}", notice)
@app.post("/listings/{listing_id}/message")
def send_listing_message(
request: Request,
listing_id: int,
subject: str = Form("Question about the stay"),
body: str = Form(...),
db: Session = Depends(get_db),
):
listing = db.get(Listing, listing_id)
if listing is None:
return redirect_with_notice("/", "Listing not found.")
current_user = require_login(request, db, f"/listings/{listing.slug}")
if isinstance(current_user, RedirectResponse):
return current_user
thread = db.scalar(
select(MessageThread).where(
MessageThread.listing_id == listing_id,
MessageThread.guest_id == current_user.id,
MessageThread.host_id == listing.host_id,
)
)
if thread is None:
thread = MessageThread(
listing_id=listing_id,
guest_id=current_user.id,
host_id=listing.host_id,
subject=subject.strip() or "Question about the stay",
last_message_at=datetime.utcnow(),
)
db.add(thread)
db.flush()
thread.last_message_at = datetime.utcnow()
db.add(Message(thread_id=thread.id, sender_id=current_user.id, body=body.strip(), created_at=datetime.utcnow()))
db.commit()
return redirect_with_notice(f"/inbox?thread_id={thread.id}", "Message sent.")
@app.get("/trips", response_class=HTMLResponse)
def trips(request: Request, db: Session = Depends(get_db)):
current_user = require_login(request, db, "/trips")
if isinstance(current_user, RedirectResponse):
return current_user
bookings = db.scalars(
select(Booking)
.where(Booking.guest_id == current_user.id)
.options(joinedload(Booking.listing).joinedload(Listing.host), joinedload(Booking.listing).selectinload(Listing.images))
.order_by(Booking.check_in)
).all()
return templates.TemplateResponse(
request=request,
name="trips.html",
context=build_context(request, db, bookings=bookings),
)
@app.post("/bookings/{booking_id}/cancel")
def cancel_booking(request: Request, booking_id: int, db: Session = Depends(get_db)):
current_user = require_login(request, db, "/trips")
if isinstance(current_user, RedirectResponse):
return current_user
booking = db.scalar(
select(Booking).where(Booking.id == booking_id, Booking.guest_id == current_user.id).options(joinedload(Booking.listing))
)
if booking is None:
return redirect_with_notice("/trips", "Booking not found.")
if booking.status == "canceled":
return redirect_with_notice("/trips", "That booking is already canceled.")
booking.status = "canceled"
recompute_availability(db, booking.listing, booking.check_in, booking.check_out)
db.commit()
return redirect_with_notice("/trips", f"Canceled {booking.confirmation_code}.")
@app.get("/wishlists", response_class=HTMLResponse)
def wishlists(request: Request, db: Session = Depends(get_db)):
current_user = require_login(request, db, "/wishlists")
if isinstance(current_user, RedirectResponse):
return current_user
items = db.scalars(
select(WishlistItem)
.where(WishlistItem.user_id == current_user.id)
.options(joinedload(WishlistItem.listing).joinedload(Listing.host), joinedload(WishlistItem.listing).selectinload(Listing.images))
).all()
return templates.TemplateResponse(
request=request,
name="wishlists.html",
context=build_context(
request,
db,
wishlist_cards=[
{
"item": item,
"card_meta": get_listing_card_meta(item.listing),
}
for item in items
],
),
)
@app.get("/inbox", response_class=HTMLResponse)
def inbox(request: Request, thread_id: int | None = None, db: Session = Depends(get_db)):
current_user = require_login(request, db, "/inbox")
if isinstance(current_user, RedirectResponse):
return current_user
threads = db.scalars(
select(MessageThread)
.where(or_(MessageThread.guest_id == current_user.id, MessageThread.host_id == current_user.id))
.options(
joinedload(MessageThread.listing),
joinedload(MessageThread.guest),
joinedload(MessageThread.host),
selectinload(MessageThread.messages).joinedload(Message.sender),
)
.order_by(MessageThread.last_message_at.desc())
).all()
selected_thread = next((thread for thread in threads if thread.id == thread_id), None)
if selected_thread is None and threads:
selected_thread = threads[0]
return templates.TemplateResponse(
request=request,
name="inbox.html",
context=build_context(request, db, threads=threads, selected_thread=selected_thread),
)
@app.post("/threads/{thread_id}/reply")
def reply_to_thread(request: Request, thread_id: int, body: str = Form(...), db: Session = Depends(get_db)):
current_user = require_login(request, db, f"/inbox?thread_id={thread_id}")
if isinstance(current_user, RedirectResponse):
return current_user
thread = db.scalar(
select(MessageThread).where(
MessageThread.id == thread_id,
or_(MessageThread.guest_id == current_user.id, MessageThread.host_id == current_user.id),
)
)
if thread is None:
return redirect_with_notice("/inbox", "Conversation not found.")
thread.last_message_at = datetime.utcnow()
db.add(Message(thread_id=thread.id, sender_id=current_user.id, body=body.strip(), created_at=datetime.utcnow()))
db.commit()
return redirect_with_notice(f"/inbox?thread_id={thread.id}", "Reply sent.")
@app.get("/host", response_class=HTMLResponse)
def host_dashboard(request: Request, db: Session = Depends(get_db)):
current_user = require_host(request, db, "/host")
if isinstance(current_user, RedirectResponse):
return current_user
today = date.today()
listings = db.scalars(
select(Listing)
.where(Listing.host_id == current_user.id)
.options(selectinload(Listing.images))
.order_by(Listing.id)
).all()
reservations = db.scalars(
select(Booking)
.join(Listing, Booking.listing_id == Listing.id)
.where(Listing.host_id == current_user.id)
.options(joinedload(Booking.guest), joinedload(Booking.listing))
.order_by(Booking.check_in)
).all()
reservations_by_listing: dict[int, list[Booking]] = {}
for booking in reservations:
reservations_by_listing.setdefault(booking.listing_id, []).append(booking)
confirmed_reservation_count = 0
blocked_window_count = 0
upcoming_reservations = [
booking for booking in reservations if booking.status == "confirmed" and booking.check_out >= today
]
host_cards = []
for listing in listings:
listing_reservations = reservations_by_listing.get(listing.id, [])
confirmed_count = sum(1 for booking in listing_reservations if booking.status == "confirmed")
confirmed_reservation_count += confirmed_count
blocked_windows = sorted(listing.blocked_ranges, key=lambda blocked_window: blocked_window["start"])
blocked_window_count += len(blocked_windows)
upcoming_listing_reservations = [
booking for booking in listing_reservations if booking.status == "confirmed" and booking.check_out >= today
]
host_cards.append(
{
"listing": listing,
"blocked_windows": blocked_windows,
"confirmed_count": confirmed_count,
"upcoming_count": len(upcoming_listing_reservations),
"next_arrival": min(
(booking.check_in for booking in upcoming_listing_reservations),
default=None,
),
}
)
return templates.TemplateResponse(
request=request,
name="host_dashboard.html",
context=build_context(
request,
db,
listings=listings,
reservations=reservations,
host_cards=host_cards,
today=today,
blocked_window_count=blocked_window_count,
confirmed_reservation_count=confirmed_reservation_count,
upcoming_reservations=upcoming_reservations,
),
)
@app.post("/host/listings/{listing_id}/block")
def block_listing_dates(
request: Request,
listing_id: int,
start_date: str = Form(...),
end_date: str = Form(...),
db: Session = Depends(get_db),
):
current_user = require_host(request, db, "/host")
if isinstance(current_user, RedirectResponse):
return current_user
listing = db.scalar(select(Listing).where(Listing.id == listing_id, Listing.host_id == current_user.id))
if listing is None:
return redirect_with_notice("/host", "Listing not found.")
block_start = parse_date(start_date)
block_end = parse_date(end_date)
if block_end <= block_start:
return redirect_with_notice("/host", "Choose an end date after the start date.")
listing.blocked_ranges = merge_ranges(
[*listing.blocked_ranges, {"start": block_start.isoformat(), "end": block_end.isoformat()}]
)
recompute_availability(db, listing, block_start, block_end)
db.commit()
return redirect_with_notice("/host", f"Blocked dates on {listing.title}.")
@app.get("/api/tasks")
def list_tasks(db: Session = Depends(get_db)):
tasks = db.scalars(
select(TaskDefinition).options(joinedload(TaskDefinition.persona)).order_by(TaskDefinition.id)
).all()
return {"tasks": [serialize_task(task) for task in tasks]}
@app.get("/api/tasks/{task_id}")
def task_detail(task_id: int, db: Session = Depends(get_db)):
task = db.scalar(
select(TaskDefinition).where(TaskDefinition.id == task_id).options(joinedload(TaskDefinition.persona))
)
if task is None:
return JSONResponse({"error": "Task not found."}, status_code=404)
return serialize_task(task)
@app.get("/api/tasks/{task_id}/evaluate")
@app.post("/api/tasks/{task_id}/evaluate")
def task_evaluate(task_id: int, db: Session = Depends(get_db)):
task = db.scalar(
select(TaskDefinition).where(TaskDefinition.id == task_id).options(joinedload(TaskDefinition.persona))
)
if task is None:
return JSONResponse({"error": "Task not found."}, status_code=404)
return evaluate_task(db, task)
@app.get("/api/state")
def state(db: Session = Depends(get_db)):
return serialize_state(db)
@app.post("/api/reset")
def api_reset(token: str = ""):
if settings.reset_token and token != settings.reset_token:
return JSONResponse({"error": "Invalid reset token."}, status_code=403)
reset_database()
with SessionLocal() as fresh_db:
return {"status": "ok", "message": "Environment reset.", "state": serialize_state(fresh_db)}
@app.get("/healthz")
def healthz():
return {"status": "ok"}