Spaces:
Paused
Paused
| from fastapi import APIRouter, Depends, HTTPException, Query | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, or_ | |
| from typing import List, Dict, Any | |
| from datetime import datetime, timedelta | |
| from ..core.dependencies import get_current_active_user | |
| from ..db.database import get_db | |
| from ..db.models import Event, User | |
| from ..db.schemas import EventCreate, EventUpdate, EventInDB, RecurringEventCreate | |
| router = APIRouter() | |
| async def create_event( | |
| event: EventCreate, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> EventInDB: | |
| """Create a new calendar event""" | |
| db_event = Event( | |
| user_id=current_user.id, | |
| title=event.title, | |
| description=event.description, | |
| start_time=event.start_time, | |
| end_time=event.end_time, | |
| attendees=event.attendees, | |
| is_all_day=event.is_all_day, | |
| reminder_minutes=event.reminder_minutes, | |
| status="scheduled", | |
| attendee_responses={} | |
| ) | |
| db.add(db_event) | |
| await db.commit() | |
| await db.refresh(db_event) | |
| return db_event | |
| async def get_events( | |
| start_date: datetime = Query(default=None), | |
| end_date: datetime = Query(default=None), | |
| include_attendee_events: bool = True, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> List[EventInDB]: | |
| """Get user's events within a date range""" | |
| if not start_date: | |
| start_date = datetime.now() | |
| if not end_date: | |
| end_date = start_date + timedelta(days=30) | |
| query = select(Event).where( | |
| Event.start_time >= start_date, | |
| Event.end_time <= end_date | |
| ) | |
| if include_attendee_events: | |
| query = query.where(or_( | |
| Event.user_id == current_user.id, | |
| Event.attendees.contains([str(current_user.id)]) | |
| )) | |
| else: | |
| query = query.where(Event.user_id == current_user.id) | |
| query = query.order_by(Event.start_time) | |
| result = await db.execute(query) | |
| return result.scalars().all() | |
| async def update_event( | |
| event_id: int, | |
| event_update: EventUpdate, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> EventInDB: | |
| """Update an event""" | |
| stmt = select(Event).where( | |
| Event.id == event_id, | |
| Event.user_id == current_user.id | |
| ) | |
| result = await db.execute(stmt) | |
| event = result.scalar_one_or_none() | |
| if not event: | |
| raise HTTPException( | |
| status_code=404, | |
| detail="Event not found or you don't have permission to update it" | |
| ) | |
| # Update event fields | |
| update_data = event_update.dict(exclude_unset=True) | |
| for field, value in update_data.items(): | |
| setattr(event, field, value) | |
| event.updated_at = datetime.utcnow() | |
| await db.commit() | |
| await db.refresh(event) | |
| return event | |
| async def delete_event( | |
| event_id: int, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, bool]: | |
| """Delete an event""" | |
| stmt = select(Event).where( | |
| Event.id == event_id, | |
| Event.user_id == current_user.id | |
| ) | |
| result = await db.execute(stmt) | |
| event = result.scalar_one_or_none() | |
| if not event: | |
| raise HTTPException( | |
| status_code=404, | |
| detail="Event not found or you don't have permission to delete it" | |
| ) | |
| await db.delete(event) | |
| await db.commit() | |
| return {"success": True} | |
| async def respond_to_event( | |
| event_id: int, | |
| response: str, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> Dict[str, bool]: | |
| """Respond to an event invitation""" | |
| if response not in ["accepted", "declined", "maybe"]: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid response. Must be one of: accepted, declined, maybe" | |
| ) | |
| stmt = select(Event).where( | |
| Event.id == event_id, | |
| Event.attendees.contains([str(current_user.id)]) | |
| ) | |
| result = await db.execute(stmt) | |
| event = result.scalar_one_or_none() | |
| if not event: | |
| raise HTTPException( | |
| status_code=404, | |
| detail="Event not found or you are not invited to this event" | |
| ) | |
| # Update the response in the attendee_responses dictionary | |
| event.attendee_responses[str(current_user.id)] = response | |
| event.updated_at = datetime.utcnow() | |
| await db.commit() | |
| return {"success": True} |