Chandima Prabhath
Optimize profile picture upload by resizing and compressing images before upload
fe33638
| from fastapi import FastAPI, HTTPException, Depends, status, APIRouter, UploadFile, File | |
| from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from datetime import datetime, timedelta, timezone | |
| from passlib.context import CryptContext | |
| from typing import Optional, List | |
| from pydantic import BaseModel, Field, validator | |
| import jwt | |
| import database | |
| import cloudinary | |
| import cloudinary.uploader | |
| import os | |
# JWT configuration.
# The signing key can be supplied through the SECRET_KEY environment variable
# so it does not have to live in source control. The literal fallback keeps
# existing deployments (and tokens they have already issued) working.
SECRET_KEY = os.getenv("SECRET_KEY") or "06J3LND9NFH"
ALGORITHM = "HS256"
# Default access-token lifetime in minutes (43200 minutes = 30 days).
ACCESS_TOKEN_EXPIRE_MINUTES = 43200

# Cloudinary credentials, read from the environment (None when unset).
CLOUD_NAME = os.getenv("CLOUD_NAME")
API_KEY = os.getenv("API_KEY")
API_SECRET = os.getenv("API_SECRET")
app = FastAPI()

# CORS is left disabled. To enable it, uncomment and adjust:
# app.add_middleware(
#     CORSMiddleware,
#     allow_origins=["http://localhost:5173", "http://localhost:*", "192.168.*.*:*"],
#     allow_credentials=True,
#     allow_methods=["*"],
#     allow_headers=["*"],
# )

# Hand the Cloudinary SDK its credentials (make sure they are set).
cloudinary.config(cloud_name=CLOUD_NAME, api_key=API_KEY, api_secret=API_SECRET)

# OAuth2 bearer scheme; the token URL is what the OpenAPI docs advertise.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")

# Password hashing context (bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` using ``pwd_context``."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
| # ---------------------# | |
| # Pydantic Models # | |
| # ---------------------# | |
| # User models | |
class UserCreate(BaseModel):
    """Payload for creating a user: credentials plus optional profile fields."""

    # Required fields.
    username: str
    email: str
    password: str  # plain text as received; hashed before it is stored

    # Optional profile details.
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    bio: Optional[str] = None
    profile_picture: Optional[str] = None
class UserUpdate(BaseModel):
    """Payload for updating a user; every field is optional and defaults to None."""

    username: Optional[str] = None
    email: Optional[str] = None
    password: Optional[str] = None  # plain text; hashed by the update handler
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    bio: Optional[str] = None
    profile_picture: Optional[str] = None
class UserOut(BaseModel):
    """Outward-facing shape of a user record; it carries no password field."""

    user_id: int
    username: str
    email: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    bio: Optional[str] = None
    profile_picture: Optional[str] = None
| # Auth token model | |
class Token(BaseModel):
    """Access token together with its token type."""

    access_token: str
    token_type: str
| # Role models | |
class RoleCreate(BaseModel):
    """Payload describing a role: its name and an optional description."""

    role_name: str
    description: Optional[str] = None
class RoleOut(BaseModel):
    """Role record including its identifier."""

    role_id: int
    role_name: str
    description: Optional[str] = None
| # Permission models | |
class PermissionCreate(BaseModel):
    """Payload describing a permission: its name and an optional description."""

    permission_name: str
    description: Optional[str] = None
class PermissionOut(BaseModel):
    """Permission record including its identifier."""

    permission_id: int
    permission_name: str
    description: Optional[str] = None
| # Location models | |
class LocationCreate(BaseModel):
    """Payload describing a location: postal details plus coordinates."""

    name: str
    address: str
    city: str
    state: Optional[str] = None
    country: Optional[str] = None
    # Coordinates are required floats.
    latitude: float
    longitude: float
class LocationOut(BaseModel):
    """Location record including its identifier."""

    location_id: int
    name: str
    address: str
    city: str
    state: Optional[str] = None
    country: Optional[str] = None
    latitude: float
    longitude: float
| # Event models | |
class EventCreate(BaseModel):
    """Payload for creating an event, including optional recurrence settings."""

    host_id: int
    location_id: int
    title: str
    start_time: datetime
    end_time: datetime
    description: Optional[str] = None
    category: Optional[str] = None
    max_participants: Optional[int] = None
    event_picture: Optional[str] = None

    # Recurrence settings.
    is_recurring: bool = False
    recurrence_type: Optional[str] = None  # daily, weekly, monthly, yearly, custom
    recurrence_interval: Optional[int] = None
    recurrence_end_date: Optional[datetime] = None
    custom_recurrence_pattern: Optional[str] = None

    # NOTE(review): no validator decorator is visible on this method in this
    # view of the file, so as written nothing invokes it automatically.
    # Confirm how (and for which fields) it is registered.
    def ensure_utc(cls, value):
        """Coerce ``value`` to a timezone-aware UTC datetime.

        ``None`` passes through unchanged. Strings are parsed with
        ``datetime.fromisoformat`` after a "Z" suffix is rewritten to
        "+00:00". Naive datetimes are treated as already being in UTC.
        """
        if value is None:
            return None
        moment = value
        if isinstance(moment, str):
            moment = datetime.fromisoformat(moment.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(timezone.utc)

    class Config:
        # Datetimes are rendered in UTC as "YYYY-MM-DD HH:MM:SS".
        json_encoders = {
            datetime: lambda dt: dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        }
        json_schema_extra = {
            "example": {
                "host_id": 1,
                "location_id": 1,
                "title": "Test Party",
                "start_time": "2025-03-22 06:29:49",
                "end_time": "2025-03-22 06:29:49",
                "description": "This is a test party",
                "category": "party",
                "max_participants": 100,
                "event_picture": "https://example.com/image.jpg",
                "is_recurring": False,
                "recurrence_type": "",
                "recurrence_interval": 0,
                "recurrence_end_date": "2025-03-22 06:29:49",
                "custom_recurrence_pattern": "",
            },
        }
class EventUpdate(BaseModel):
    """Payload for updating an event; every field is optional and defaults to None."""

    host_id: Optional[int] = None
    location_id: Optional[int] = None
    title: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    description: Optional[str] = None
    category: Optional[str] = None
    max_participants: Optional[int] = None
    event_picture: Optional[str] = None
    is_recurring: Optional[bool] = None
    recurrence_type: Optional[str] = None
    recurrence_interval: Optional[int] = None
    recurrence_end_date: Optional[datetime] = None
    custom_recurrence_pattern: Optional[str] = None

    class Config:
        # ISO-8601 output; datetimes without tzinfo are labelled as UTC first.
        json_encoders = {
            datetime: lambda dt: (dt.replace(tzinfo=timezone.utc) if not dt.tzinfo else dt).isoformat(),
        }
class EventOut(BaseModel):
    """Event record including its identifier and recurrence settings."""

    event_id: int
    host_id: int
    location_id: int
    title: str
    start_time: datetime
    end_time: datetime
    description: Optional[str] = None
    category: Optional[str] = None
    max_participants: Optional[int] = None
    event_picture: Optional[str] = None
    is_recurring: bool = False
    recurrence_type: Optional[str] = None
    recurrence_interval: Optional[int] = None
    recurrence_end_date: Optional[datetime] = None
    custom_recurrence_pattern: Optional[str] = None

    class Config:
        # ISO-8601 output; datetimes without tzinfo are labelled as UTC first.
        json_encoders = {
            datetime: lambda dt: (dt.replace(tzinfo=timezone.utc) if not dt.tzinfo else dt).isoformat(),
        }
| # ---------------------# | |
| # Auth Functions # | |
| # ---------------------# | |
def authenticate_user(username: str, password: str):
    """Return the stored user when the credentials match, otherwise ``None``.

    The user is looked up by username and ``password`` is verified against
    the record's ``password_hash``.
    """
    candidate = database.get_user_by_username(username)
    if not candidate:
        return None
    if not verify_password(password, candidate["password_hash"]):
        return None
    return candidate
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, not mutated.
        expires_delta: Token lifetime. When omitted (or falsy), the
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` default is used.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = data.copy()
    # datetime.now(timezone.utc) yields the same aware UTC timestamp as the
    # deprecated datetime.utcnow().replace(tzinfo=timezone.utc).
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
| # Updated get_current_user now uses OAuth2PasswordBearer for token extraction. | |
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to the stored user it identifies.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM`` and its ``sub``
    claim is used as the username for the database lookup.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that does not exist. Every 401 carries a
            ``WWW-Authenticate: Bearer`` header so clients know which scheme
            to retry with.
    """
    bearer_challenge = {"WWW-Authenticate": "Bearer"}
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers=bearer_challenge,
        ) from exc

    username = payload.get("sub")
    if username is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers=bearer_challenge,
        )

    user = database.get_user_by_username(username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers=bearer_challenge,
        )
    return user
def admin_required(current_user: dict = Depends(get_current_user)):
    """Return ``current_user`` if one of their roles is named "admin".

    Raises:
        HTTPException: 403 when the user holds no "admin" role.
    """
    for role in database.get_roles_by_user(current_user["user_id"]):
        if role["role_name"] == "admin":
            return current_user
    raise HTTPException(status_code=403, detail="Admin privileges required")
| # ---------------------# | |
| # Startup # | |
| # ---------------------# | |
def startup():
    """Create the tables and make sure a default admin user and role exist.

    The admin user, the "admin" role and the link between them are each
    created only when missing.

    NOTE(review): no registration of this function with the app is visible
    in this view of the file. Confirm it is wired up as a startup hook.
    """
    import os

    # Create all tables.
    database.create_tables()

    admin_username = "admin"
    admin_email = "admin@example.com"
    # The initial password can be supplied via the ADMIN_PASSWORD environment
    # variable. The literal fallback preserves the previous default and
    # should be changed after first login.
    admin_password = os.getenv("ADMIN_PASSWORD") or "admin123"

    # Create the default admin user if it does not exist.
    admin_user = database.get_user_by_username(admin_username)
    if not admin_user:
        admin_user_id = database.create_user(
            admin_username, admin_email, hash_password(admin_password)
        )
        admin_user = database.get_user_by_id(admin_user_id)

    # Create the admin role if it does not exist.
    admin_role = database.get_role_by_name("admin")
    if not admin_role:
        admin_role_id = database.create_role("admin", "Administrator with full privileges")
        admin_role = database.get_role_by_id(admin_role_id)

    # Assign the admin role to the admin user if not already assigned.
    roles = database.get_roles_by_user(admin_user["user_id"])
    if not any(role["role_name"] == "admin" for role in roles):
        database.add_user_role(admin_user["user_id"], admin_role["role_id"])
| # ---------------------# | |
| # Routers # | |
| # ---------------------# | |
# User router: endpoints under /api/v1/user.
user_router = APIRouter(
    prefix="/api/v1/user",
    tags=["User"],
)
async def all_users(page: int = 1, per_page: int = 100, current_user: dict = Depends(get_current_user)):
    """Return one page of users; the caller must be authenticated."""
    return database.get_all_users(page=page, per_page=per_page)
def create_new_user(user: UserCreate):
    """Create a user from ``user`` and return the stored record.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    already_registered = database.get_user_by_username(user.username)
    if already_registered:
        raise HTTPException(status_code=400, detail="Username already registered")

    # Only the hash of the password is handed to the database layer.
    hashed = hash_password(user.password)
    created_id = database.create_user(
        user.username,
        user.email,
        hashed,
        user.first_name,
        user.last_name,
        user.bio,
        user.profile_picture,
    )
    return database.get_user_by_id(created_id)
def read_user(user_id: int, current_user: dict = Depends(get_current_user)):
    """Return the user with ``user_id``; the caller must be authenticated.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    found = database.get_user_by_id(user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
def update_existing_user(user_id: int, user_update: UserUpdate, current_user: dict = Depends(get_current_user)):
    """Apply ``user_update`` to the user with ``user_id`` and return the result.

    Empty username/email/password values are passed on as ``None``. A
    supplied password is hashed before it reaches the database layer.

    NOTE(review): nothing visible here limits callers to editing their own
    account. Confirm authorization is enforced elsewhere.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    if not database.get_user_by_id(user_id):
        raise HTTPException(status_code=404, detail="User not found")

    changes = {
        "username": user_update.username or None,
        "email": user_update.email or None,
        "password_hash": hash_password(user_update.password) if user_update.password else None,
        "first_name": user_update.first_name,
        "last_name": user_update.last_name,
        "bio": user_update.bio,
        "profile_picture": user_update.profile_picture,
    }
    database.update_user(user_id, **changes)
    return database.get_user_by_id(user_id)
def delete_existing_user(user_id: int, current_user: dict = Depends(get_current_user)):
    """Delete the user with ``user_id``; the caller must be authenticated.

    NOTE(review): nothing visible here limits which accounts a caller may
    delete. Confirm authorization is enforced elsewhere.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    target = database.get_user_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    database.delete_user(user_id)
    return {"detail": "User deleted"}
async def upload_profile_picture(file: UploadFile = File(...), current_user: dict = Depends(get_current_user)):
    """Upload ``file`` to Cloudinary and store its URL as the caller's picture.

    The upload requests a transformation that limits the image to 512x512,
    uses "auto:eco" quality and a "jpg" fetch format.

    Returns:
        The caller's user record after the profile picture was updated.

    Raises:
        HTTPException: 400 when the upload fails or Cloudinary returns no
            ``secure_url``.
    """
    from fastapi.concurrency import run_in_threadpool

    transformation = [{
        "width": 512,
        "height": 512,
        "crop": "limit",
        "quality": "auto:eco",
        "fetch_format": "jpg",
    }]
    try:
        # The Cloudinary SDK call is blocking, so run it in the threadpool
        # rather than stalling the event loop inside this coroutine.
        result = await run_in_threadpool(
            cloudinary.uploader.upload,
            file.file,
            transformation=transformation,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Image upload failed: {str(e)}") from e

    image_url = result.get("secure_url")
    if not image_url:
        raise HTTPException(status_code=400, detail="Failed to obtain image URL from Cloudinary")

    # Update only the profile picture; every other field is passed as None.
    user_id = current_user["user_id"]
    database.update_user(
        user_id,
        username=None,
        email=None,
        password_hash=None,
        first_name=None,
        last_name=None,
        bio=None,
        profile_picture=image_url,
    )
    return database.get_user_by_id(user_id)
# Authentication router: endpoints under /api/v1/auth.
auth_router = APIRouter(
    prefix="/api/v1/auth",
    tags=["Authentication"],
)
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form credentials for a bearer access token.

    The token's ``sub`` claim is the authenticated user's username.

    Raises:
        HTTPException: 400 when the username or password is wrong.
    """
    account = authenticate_user(form_data.username, form_data.password)
    if not account:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    token = create_access_token(data={"sub": account["username"]})
    return {"access_token": token, "token_type": "bearer"}
def read_current_user_endpoint(current_user: dict = Depends(get_current_user)):
    """Return the record of the authenticated caller as resolved from the token."""
    # NOTE(review): the user dict includes "password_hash" (see
    # authenticate_user). Confirm a response model filters it out.
    return current_user
# Admin router for role and permission management, under /api/v1/admin.
admin_router = APIRouter(
    prefix="/api/v1/admin",
    tags=["Admin"],
)
def create_role(role: RoleCreate):
    """Create a role from ``role`` and return the stored record."""
    created_id = database.create_role(role.role_name, role.description)
    return database.get_role_by_id(created_id)
def get_all_roles(page: int = 1, per_page: int = 100):
    """Return one page of roles."""
    return database.get_all_roles(page=page, per_page=per_page)
def get_role(role_id: int):
    """Return the role with ``role_id``.

    Raises:
        HTTPException: 404 when no such role exists.
    """
    found = database.get_role_by_id(role_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Role not found")
def update_role(role_id: int, role: RoleCreate):
    """Overwrite the name and description of a role and return the result.

    Raises:
        HTTPException: 404 when no role with ``role_id`` exists, matching
            the lookup endpoint instead of returning ``None``.
    """
    if not database.get_role_by_id(role_id):
        raise HTTPException(status_code=404, detail="Role not found")
    database.update_role(role_id, role.role_name, role.description)
    return database.get_role_by_id(role_id)
def delete_role(role_id: int):
    """Delete the role with ``role_id`` and return a confirmation message."""
    confirmation = {"detail": "Role deleted"}
    database.delete_role(role_id)
    return confirmation
def create_permission(permission: PermissionCreate):
    """Create a permission from ``permission`` and return the stored record."""
    created_id = database.create_permission(permission.permission_name, permission.description)
    return database.get_permission_by_id(created_id)
def get_all_permissions(page: int = 1, per_page: int = 100):
    """Return one page of permissions."""
    return database.get_all_permissions(page=page, per_page=per_page)
def get_permission(permission_id: int):
    """Return the permission with ``permission_id``.

    Raises:
        HTTPException: 404 when no such permission exists.
    """
    found = database.get_permission_by_id(permission_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Permission not found")
def update_permission(permission_id: int, permission: PermissionCreate):
    """Overwrite the name and description of a permission and return the result.

    Raises:
        HTTPException: 404 when no permission with ``permission_id`` exists,
            matching the lookup endpoint instead of returning ``None``.
    """
    if not database.get_permission_by_id(permission_id):
        raise HTTPException(status_code=404, detail="Permission not found")
    database.update_permission(permission_id, permission.permission_name, permission.description)
    return database.get_permission_by_id(permission_id)
def delete_permission(permission_id: int):
    """Delete the permission with ``permission_id`` and return a confirmation message."""
    confirmation = {"detail": "Permission deleted"}
    database.delete_permission(permission_id)
    return confirmation
# Locations router: endpoints under /api/v1/locations.
location_router = APIRouter(
    prefix="/api/v1/locations",
    tags=["Locations"],
)
def create_location(location: LocationCreate, current_user: dict = Depends(get_current_user)):
    """Create a location from ``location`` and return the stored record."""
    created_id = database.create_location(
        location.name,
        location.address,
        location.city,
        location.state,
        location.country,
        location.latitude,
        location.longitude,
    )
    return database.get_location_by_id(created_id)
def get_all_locations(page: int = 1, per_page: int = 100, current_user: dict = Depends(get_current_user)):
    """Return one page of locations; the caller must be authenticated."""
    return database.get_all_locations(page=page, per_page=per_page)
def get_location(location_id: int, current_user: dict = Depends(get_current_user)):
    """Return the location with ``location_id``.

    Raises:
        HTTPException: 404 when no such location exists.
    """
    found = database.get_location_by_id(location_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Location not found")
def update_location(location_id: int, location: LocationCreate, current_user: dict = Depends(get_current_user)):
    """Overwrite every field of a location and return the result.

    Raises:
        HTTPException: 404 when no location with ``location_id`` exists,
            matching the lookup endpoint instead of returning ``None``.
    """
    if not database.get_location_by_id(location_id):
        raise HTTPException(status_code=404, detail="Location not found")
    database.update_location(
        location_id,
        location.name,
        location.address,
        location.city,
        location.state,
        location.country,
        location.latitude,
        location.longitude,
    )
    return database.get_location_by_id(location_id)
def delete_location(location_id: int, current_user: dict = Depends(get_current_user)):
    """Delete the location with ``location_id`` and return a confirmation message."""
    confirmation = {"detail": "Location deleted"}
    database.delete_location(location_id)
    return confirmation
# Events router: endpoints under /api/v1/events.
event_router = APIRouter(
    prefix="/api/v1/events",
    tags=["Events"],
)
def create_event(event: EventCreate, current_user: dict = Depends(get_current_user)):
    """Create an event from ``event`` and return the stored record.

    Recurrence fields are normalised before storage: blank or
    whitespace-only strings, a zero interval and a missing end date all
    become ``None``.
    """
    # Strip text fields; an empty result collapses to None.
    recurrence_type = (event.recurrence_type or "").strip() or None
    custom_recurrence_pattern = (event.custom_recurrence_pattern or "").strip() or None
    # A zero or missing interval is stored as None.
    recurrence_interval = event.recurrence_interval or None
    recurrence_end_date = event.recurrence_end_date or None

    created_id = database.create_event(
        event.host_id,
        event.location_id,
        event.title,
        event.start_time,
        event.end_time,
        event.description,
        event.category,
        event.max_participants,
        event.event_picture,
        event.is_recurring,
        recurrence_type,
        recurrence_interval,
        recurrence_end_date,
        custom_recurrence_pattern,
    )
    return database.get_event_by_id(created_id)
def get_all_events(page: int = 1, per_page: int = 100, current_user: dict = Depends(get_current_user)):
    """Return one page of events; the caller must be authenticated."""
    return database.get_all_events(page=page, per_page=per_page)
def get_event(event_id: int, current_user: dict = Depends(get_current_user)):
    """Return the event with ``event_id``.

    Raises:
        HTTPException: 404 when no such event exists.
    """
    found = database.get_event_by_id(event_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Event not found")
def update_event(event_id: int, event: EventUpdate, current_user: dict = Depends(get_current_user)):
    """Apply ``event`` to the event with ``event_id`` and return the result.

    Every field of the payload is forwarded to the database layer as a
    keyword argument, including those left as ``None``.

    Raises:
        HTTPException: 404 when no event with ``event_id`` exists, matching
            the lookup endpoint instead of returning ``None``.
    """
    if not database.get_event_by_id(event_id):
        raise HTTPException(status_code=404, detail="Event not found")
    database.update_event(
        event_id,
        host_id=event.host_id,
        location_id=event.location_id,
        title=event.title,
        start_time=event.start_time,
        end_time=event.end_time,
        description=event.description,
        category=event.category,
        max_participants=event.max_participants,
        event_picture=event.event_picture,
        is_recurring=event.is_recurring,
        recurrence_type=event.recurrence_type,
        recurrence_interval=event.recurrence_interval,
        recurrence_end_date=event.recurrence_end_date,
        custom_recurrence_pattern=event.custom_recurrence_pattern,
    )
    return database.get_event_by_id(event_id)
def delete_event(event_id: int, current_user: dict = Depends(get_current_user)):
    """Delete the event with ``event_id`` and return a confirmation message."""
    confirmation = {"detail": "Event deleted"}
    database.delete_event(event_id)
    return confirmation
def join_event(event_id: int, current_user: dict = Depends(get_current_user)):
    """Add the authenticated caller as a participant of the event.

    Raises:
        HTTPException: 400 when the database layer reports 0 for the insert.
    """
    outcome = database.add_event_participant(event_id, current_user["user_id"])
    if outcome != 0:
        return {"detail": "Joined event successfully"}
    raise HTTPException(status_code=400, detail="Could not join event")
def leave_event(event_id: int, current_user: dict = Depends(get_current_user)):
    """Remove the authenticated caller from the event's participants.

    Raises:
        HTTPException: 400 when the database layer reports 0 for the removal.
    """
    outcome = database.remove_event_participant(event_id, current_user["user_id"])
    if outcome != 0:
        return {"detail": "Left event successfully"}
    raise HTTPException(status_code=400, detail="Could not leave event")
def get_all_event_categories(current_user: dict = Depends(get_current_user)):
    """Return every event category; the caller must be authenticated."""
    return database.get_all_event_categories()
def get_events_by_category(category: str, current_user: dict = Depends(get_current_user)):
    """Return the events in ``category``; the caller must be authenticated."""
    return database.get_events_by_category(category)
# Register every router on the main app, in declaration order.
for _router in (user_router, auth_router, admin_router, location_router, event_router):
    app.include_router(_router)