| import os |
| import json |
| import uuid |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional, Any |
|
|
| import secrets |
| from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File, Form |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse, RedirectResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from jose import JWTError, jwt |
| from passlib.context import CryptContext |
| from pydantic import BaseModel, Field |
|
|
| |
| |
# --- Token signing -----------------------------------------------------------
# The signing key is read from the environment. When the variable is unset a
# random key is generated at import time, so tokens issued by one process are
# not accepted by another process or after a restart.
JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", secrets.token_hex(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7  # seven days, expressed in minutes

# --- On-disk storage layout --------------------------------------------------
DATA_DIR = "data"
USERS_DB_FILE = os.path.join(DATA_DIR, "users.json")
UPLOAD_DIR = os.path.join(DATA_DIR, "uploads")

# Make sure both storage directories exist before anything reads or writes them.
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(UPLOAD_DIR, exist_ok=True)

# --- Security helpers --------------------------------------------------------
# bcrypt password hashing, and the bearer-token scheme whose token URL is "token".
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
| |
# Response body of the login endpoint: the encoded token plus its type.
class Token(BaseModel):
    access_token: str
    token_type: str
|
|
# Identity extracted from a decoded access token; username is optional.
class TokenData(BaseModel):
    username: Optional[str] = None
|
|
# Request body describing one watched episode of a show.
class EpisodeUpdate(BaseModel):
    # Which show the episode belongs to.
    show_id: str
    show_title: str

    # Position of the episode inside the show.
    season_number: int
    episode_number: int
|
|
# Fields shared by every user model in this module.
class UserBase(BaseModel):
    username: str
|
|
# Signup payload: the base user fields plus the plaintext password.
class UserCreate(UserBase):
    password: str
|
|
# User record as persisted in the JSON user database.
class UserInDB(UserBase):
    hashed_password: str
    # Unset until a profile picture has been uploaded.
    profile_picture_url: Optional[str] = None
    # Flat list of watched-episode dicts; each instance gets its own empty list.
    watch_history: List[Dict[str, Any]] = Field(default_factory=list)
|
|
# User representation returned to clients; it carries no password hash.
class UserPublic(UserBase):
    profile_picture_url: Optional[str] = None
    # Nested view of the watch history; each instance gets its own empty dict.
    watch_history_detailed: Dict[str, Any] = Field(default_factory=dict)
|
|
|
|
| |
def load_users() -> Dict[str, Dict]:
    """Load the user database from ``USERS_DB_FILE``.

    Returns:
        The mapping stored in the JSON file. An empty dict is returned when
        the file does not exist, does not contain valid JSON, or contains a
        JSON value that is not an object.
    """
    try:
        # Open directly instead of checking os.path.exists() first, so a file
        # removed between the check and the open cannot raise.
        with open(USERS_DB_FILE, "r", encoding="utf-8") as f:
            users_db = json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError:
        # Best-effort behaviour kept from the original: an unreadable
        # database is treated as empty rather than failing the request.
        return {}

    # Callers use dict methods on the result; a top-level list/null/number
    # would otherwise surface as an AttributeError far from the cause.
    if not isinstance(users_db, dict):
        return {}
    return users_db
|
|
def save_users(users_db: Dict[str, Dict]) -> None:
    """Persist the user database to ``USERS_DB_FILE`` as indented JSON.

    The data is written to a sibling temporary file first and then moved
    over the real file with ``os.replace``. A failure while serialising or
    writing therefore leaves the previous database untouched, instead of
    leaving a truncated file behind.

    Args:
        users_db: Mapping to serialise; it must be JSON-serialisable.
    """
    tmp_path = f"{USERS_DB_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(users_db, f, indent=4)
    # A leftover temporary file from a failed attempt is simply overwritten
    # by the next save.
    os.replace(tmp_path, USERS_DB_FILE)
|
|
| |
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Delegates to ``pwd_context.verify`` and returns its result.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
def get_password_hash(password):
    """Hash a plaintext password.

    Delegates to ``pwd_context.hash`` and returns its result.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When ``None`` the token
            expires after 15 minutes. An explicit zero-length delta is
            honoured rather than being replaced by the default.

    Returns:
        The encoded token produced by ``jwt.encode`` with ``JWT_SECRET_KEY``
        and ``ALGORITHM``.
    """
    # Function-scope import: the file-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise silently fall back to the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    # Timezone-aware "now" replaces the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=ALGORITHM)
|
|
| |
async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserInDB:
    """Resolve the bearer token of the request to a stored user.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user missing from the user database.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only decoding failures are translated here; anything else propagates.
    try:
        claims = jwt.decode(token, JWT_SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)

    # The user database is read on every call.
    stored_user = load_users().get(token_data.username)
    if stored_user is None:
        raise unauthorized
    return UserInDB(**stored_user)
|
|
|
|
| |
# The ASGI application object that all routes below are registered on.
app = FastAPI(title="Media Auth API")

# CORS policy: every origin, method and header is accepted, with credentials
# enabled.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
def structure_watch_history(history_list: List[Dict]) -> Dict:
    """Group a flat list of watched episodes by show and season.

    The result maps each show id to ``{"show_id", "title", "seasons"}``;
    ``seasons`` maps each season number to ``{"season_number", "episodes"}``,
    and ``episodes`` maps each watched episode number to ``True``. The title
    of a show is taken from the first entry seen for it, falling back to
    "Unknown Show" when that entry has no ``show_title``. Missing keys in an
    entry are treated as ``None``.
    """
    shows: Dict = {}
    for entry in history_list:
        show_id = entry.get("show_id")
        season_number = entry.get("season_number")

        # setdefault only inserts the fresh record on first sight of the key.
        show = shows.setdefault(
            show_id,
            {
                "show_id": show_id,
                "title": entry.get("show_title", "Unknown Show"),
                "seasons": {},
            },
        )
        season = show["seasons"].setdefault(
            season_number,
            {"season_number": season_number, "episodes": {}},
        )
        season["episodes"][entry.get("episode_number")] = True
    return shows
|
|
|
|
| |
|
|
@app.post("/token", response_model=Token, tags=["Authentication"])
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    Standard OAuth2 login. Takes username and password from a form.
    """
    # Unknown usernames and wrong passwords produce the same 401 response.
    login_failed = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

    stored_user = load_users().get(form_data.username)
    if not stored_user:
        raise login_failed
    if not verify_password(form_data.password, stored_user["hashed_password"]):
        raise login_failed

    # The token subject is the username as stored in the database record.
    token = create_access_token(
        data={"sub": stored_user["username"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": token, "token_type": "bearer"}
|
|
|
|
@app.post("/signup", status_code=status.HTTP_201_CREATED, tags=["Authentication"])
async def signup_user(user: UserCreate):
    """
    Creates a new user account.
    """
    registered = load_users()

    # Usernames are the database keys, so a duplicate is rejected with 400.
    if user.username in registered:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )

    # Only the hash of the password is stored.
    record = UserInDB(
        username=user.username,
        hashed_password=get_password_hash(user.password),
    )
    registered[user.username] = record.dict()
    save_users(registered)
    return {"message": "User created successfully. Please login."}
|
|
|
|
@app.get("/users/me", response_model=UserPublic, tags=["User"])
async def read_users_me(current_user: UserInDB = Depends(get_current_user)):
    """
    Fetch the profile of the currently authenticated user.
    """
    # The stored flat history is reshaped into the nested per-show view
    # before being returned; the password hash is not part of the response.
    return UserPublic(
        username=current_user.username,
        profile_picture_url=current_user.profile_picture_url,
        watch_history_detailed=structure_watch_history(current_user.watch_history),
    )
|
|
|
|
# File extensions accepted for profile pictures. Uploads are served back from
# the public "/uploads" static mount, so types a browser could execute
# (e.g. .html, .svg) are deliberately left out.
_ALLOWED_IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp"}

# Number of bytes copied per read while saving an upload.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


@app.post("/users/me/profile-picture", response_model=UserPublic, tags=["User"])
async def upload_profile_picture(
    file: UploadFile = File(...),
    current_user: UserInDB = Depends(get_current_user)
):
    """
    Upload or update the user's profile picture.
    """
    # The filename is client-supplied and may be absent; only its extension
    # is used, and only if it is one of the allowed image types.
    file_extension = os.path.splitext(file.filename or "")[1].lower()
    if file_extension not in _ALLOWED_IMAGE_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unsupported profile picture file type",
        )

    # The stored name is a fresh UUID, so the client never controls the path.
    unique_filename = f"{uuid.uuid4()}{file_extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_filename)

    # Copy in chunks so the whole upload is never held in memory at once.
    with open(file_path, "wb") as buffer:
        while True:
            chunk = await file.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            buffer.write(chunk)

    # Persist the public URL of the new picture on the user's record.
    profile_picture_url = f"/uploads/{unique_filename}"
    users_db = load_users()
    users_db[current_user.username]["profile_picture_url"] = profile_picture_url
    save_users(users_db)

    # Reflect the change on the in-memory user before building the response.
    current_user.profile_picture_url = profile_picture_url
    detailed_history = structure_watch_history(current_user.watch_history)
    return UserPublic(
        username=current_user.username,
        profile_picture_url=current_user.profile_picture_url,
        watch_history_detailed=detailed_history
    )
|
|
|
|
@app.post("/users/me/watch-history", status_code=status.HTTP_200_OK, tags=["User"])
async def update_watch_history(
    episode_data: EpisodeUpdate,
    current_user: UserInDB = Depends(get_current_user)
):
    """
    Adds a new episode to the user's watch history.
    """
    def _episode_key(show_id, season_number, episode_number):
        # Identity of an episode. Values are compared by their string form,
        # so a stored "1" and a submitted 1 still count as the same episode;
        # a tuple avoids any ambiguity from joining the parts with "_".
        return (str(show_id), str(season_number), str(episode_number))

    users_db = load_users()
    user_data = users_db[current_user.username]

    # Stored records without a "watch_history" key are accepted by UserInDB
    # (the field has a default), so create the list here instead of failing.
    watch_history = user_data.setdefault("watch_history", [])

    new_key = _episode_key(
        episode_data.show_id,
        episode_data.season_number,
        episode_data.episode_number,
    )
    is_already_watched = any(
        _episode_key(
            item.get("show_id"),
            item.get("season_number"),
            item.get("episode_number"),
        ) == new_key
        for item in watch_history
    )
    if is_already_watched:
        # Nothing is written when the episode is already recorded.
        return {"message": "Episode already in watch history."}

    watch_history.append(episode_data.dict())
    save_users(users_db)
    return {"message": "Watch history updated."}
|
|
|
|
| |
| |
# Serve uploaded profile pictures from the upload directory.
app.mount("/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads")


# Registered BEFORE the catch-all static mount below: routes are matched in
# registration order and a mount at "/" matches every path, so declaring this
# handler after the mount would leave it unreachable.
@app.get("/", include_in_schema=False)
def root():
    """Redirect the bare root URL to the login page."""
    return RedirectResponse(url="/login.html")


# Catch-all static mount for the front-end files. Keep this last so that it
# only handles requests no earlier route or mount has claimed.
app.mount("/", StaticFiles(directory="static", html=True), name="static")