Admin-Desk2 / app /api /calendar.py
Fred808's picture
Upload 86 files
b70ff07 verified
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_
from typing import List, Dict, Any
from datetime import datetime, timedelta
from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Event, User
from ..db.schemas import EventCreate, EventUpdate, EventInDB, RecurringEventCreate
router = APIRouter()
@router.post("/events", response_model=EventInDB)
async def create_event(
event: EventCreate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> EventInDB:
"""Create a new calendar event"""
db_event = Event(
user_id=current_user.id,
title=event.title,
description=event.description,
start_time=event.start_time,
end_time=event.end_time,
attendees=event.attendees,
is_all_day=event.is_all_day,
reminder_minutes=event.reminder_minutes,
status="scheduled",
attendee_responses={}
)
db.add(db_event)
await db.commit()
await db.refresh(db_event)
return db_event
@router.get("/events", response_model=List[EventInDB])
async def get_events(
start_date: datetime = Query(default=None),
end_date: datetime = Query(default=None),
include_attendee_events: bool = True,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> List[EventInDB]:
"""Get user's events within a date range"""
if not start_date:
start_date = datetime.now()
if not end_date:
end_date = start_date + timedelta(days=30)
query = select(Event).where(
Event.start_time >= start_date,
Event.end_time <= end_date
)
if include_attendee_events:
query = query.where(or_(
Event.user_id == current_user.id,
Event.attendees.contains([str(current_user.id)])
))
else:
query = query.where(Event.user_id == current_user.id)
query = query.order_by(Event.start_time)
result = await db.execute(query)
return result.scalars().all()
@router.put("/events/{event_id}", response_model=EventInDB)
async def update_event(
event_id: int,
event_update: EventUpdate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> EventInDB:
"""Update an event"""
stmt = select(Event).where(
Event.id == event_id,
Event.user_id == current_user.id
)
result = await db.execute(stmt)
event = result.scalar_one_or_none()
if not event:
raise HTTPException(
status_code=404,
detail="Event not found or you don't have permission to update it"
)
# Update event fields
update_data = event_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(event, field, value)
event.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(event)
return event
@router.delete("/events/{event_id}")
async def delete_event(
event_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, bool]:
"""Delete an event"""
stmt = select(Event).where(
Event.id == event_id,
Event.user_id == current_user.id
)
result = await db.execute(stmt)
event = result.scalar_one_or_none()
if not event:
raise HTTPException(
status_code=404,
detail="Event not found or you don't have permission to delete it"
)
await db.delete(event)
await db.commit()
return {"success": True}
@router.post("/events/{event_id}/respond")
async def respond_to_event(
event_id: int,
response: str,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, bool]:
"""Respond to an event invitation"""
if response not in ["accepted", "declined", "maybe"]:
raise HTTPException(
status_code=400,
detail="Invalid response. Must be one of: accepted, declined, maybe"
)
stmt = select(Event).where(
Event.id == event_id,
Event.attendees.contains([str(current_user.id)])
)
result = await db.execute(stmt)
event = result.scalar_one_or_none()
if not event:
raise HTTPException(
status_code=404,
detail="Event not found or you are not invited to this event"
)
# Update the response in the attendee_responses dictionary
event.attendee_responses[str(current_user.id)] = response
event.updated_at = datetime.utcnow()
await db.commit()
return {"success": True}