| import os |
| from typing import Optional, List |
| from datetime import datetime |
| from pathlib import Path |
|
|
| from fastapi import FastAPI, Depends, HTTPException, status, Request, Form |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.templating import Jinja2Templates |
| from fastapi.responses import RedirectResponse |
| from pydantic import BaseModel, EmailStr |
| from passlib.context import CryptContext |
|
|
| from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey, TIMESTAMP |
| from sqlalchemy.sql import func |
| from sqlalchemy.orm import sessionmaker, declarative_base, relationship, Session |
|
|
| |
| |
| |
# --- Database configuration -------------------------------------------------
#
# SECURITY: the fallback URL below embeds real database credentials in source
# control. It is retained only so deployments that rely on it keep working.
# Set DATABASE_URL in the environment, rotate this password, and then remove
# the fallback entirely.
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres.nujbaaxesubgawnazmza:gVgyOv6DL9vkzGmh@aws-1-ap-south-1.pooler.supabase.com:6543/postgres")

if "DATABASE_URL" not in os.environ:
    # Make the use of the embedded credentials visible instead of silent.
    import warnings

    warnings.warn(
        "DATABASE_URL is not set; falling back to the connection string "
        "hard-coded in source. Configure DATABASE_URL and rotate the "
        "embedded credentials.",
        RuntimeWarning,
    )


# Engine and session factory shared by the whole application.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_pre_ping=True,
    pool_size=5,
    max_overflow=10,
    pool_recycle=300,
    # Forwarded to the DB driver; presumably a timeout in seconds - confirm
    # against the driver in use.
    connect_args={"connect_timeout": 10},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
|
|
def get_db():
    """Yield a database session and close it once the caller is done.

    Used throughout this file as a FastAPI dependency (``Depends(get_db)``).
    """
    # The session's context manager closes it on exit, whether the consumer
    # finished normally or raised.
    with SessionLocal() as session:
        yield session
|
|
| |
| |
| |
class User(Base):
    """ORM model for a registered account, stored in the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, index=True, primary_key=True)

    # Profile data collected at signup; every field is mandatory.
    name = Column(String(100), nullable=False)
    age = Column(Integer, nullable=False)
    address = Column(Text, nullable=False)

    # Email is the login identifier, hence unique and indexed.
    email = Column(String(100), nullable=False, unique=True, index=True)
    mobile_number = Column(String(20), nullable=False)

    # Output of get_password_hash(); the plain password is never stored.
    password_hash = Column(String(255), nullable=False)

    # Filled in by the database at insert time.
    created_at = Column(TIMESTAMP, server_default=func.now())

    # Files uploaded by this user (other side: File.owner).
    files = relationship("File", back_populates="owner")
|
|
class File(Base):
    """ORM model for the metadata of one uploaded file (``files`` table)."""

    __tablename__ = "files"

    id = Column(Integer, index=True, primary_key=True)

    # Name of the stored file inside UPLOAD_DIR (not the client's raw name).
    filename = Column(String(255), nullable=False)

    # Lower-cased extension including the dot, as written by the upload routes.
    file_type = Column(String(50), nullable=False)

    # Owning user; the foreign key is declared with ON DELETE CASCADE.
    user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)

    # Filled in by the database at insert time.
    upload_timestamp = Column(TIMESTAMP, server_default=func.now())

    # Back-reference to the uploader (other side: User.files).
    owner = relationship("User", back_populates="files")
|
|
| |
| |
| |
# Request body accepted by POST /api/signup.
class UserCreate(BaseModel):
    name: str
    age: int
    address: str
    email: EmailStr  # validated as an email address by pydantic
    mobile_number: str
    password: str  # plain text; hashed by the signup handler before storage
|
|
# Email/password credential pair.
# NOTE(review): no route in the visible code uses this schema - the API login
# takes an OAuth2 form and the page login takes plain form fields.
class UserLogin(BaseModel):
    email: EmailStr
    password: str
|
|
# Public view of a user returned by the API; never includes the password hash.
class UserResponse(BaseModel):
    id: int
    name: str
    email: str

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the response directly from ORM objects.
    model_config = {"from_attributes": True}
|
|
# Response body of POST /api/login.
class Token(BaseModel):
    access_token: str
    token_type: str  # the login handler always returns "bearer"
|
|
# Container for the identity carried by a token.
# NOTE(review): not referenced anywhere in the visible code.
class TokenData(BaseModel):
    email: Optional[str] = None
|
|
| |
| |
| |
|
|
# The application instance; every route below registers itself on it.
app = FastAPI(title="Task 1 API")


@app.on_event("startup")
def on_startup():
    """Make sure the ORM tables exist, without ever blocking startup."""
    try:
        Base.metadata.create_all(bind=engine)
    except Exception as exc:
        # Deliberately best-effort: report the problem and let the app start.
        print(f"⚠️ Could not connect to database on startup: {exc}")
        print(" The app will start anyway; DB operations will fail until the DB is reachable.")
    else:
        print("✅ Database tables created / verified.")
|
|
| |
# Static assets and templates are resolved relative to this file, not the CWD.
BASE_DIR = Path(__file__).resolve().parent
app.mount(
    "/static",
    StaticFiles(directory=str(BASE_DIR.joinpath("static"))),
    name="static",
)
templates = Jinja2Templates(directory=str(BASE_DIR.joinpath("templates")))


# Password hashing context (bcrypt) and the bearer-token extractor used by the
# /api routes; the token endpoint is the API login route.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/login")
|
|
| |
def verify_password(plain_password, hashed_password):
    """Return True when ``plain_password`` matches the stored hash.

    bcrypt only considers the first 72 *bytes* of a secret, and some backend
    versions reject anything longer. The limit is in bytes, so the UTF-8
    encoding is truncated rather than the character string: 72 multi-byte
    characters can encode to far more than 72 bytes.
    """
    if isinstance(plain_password, str):
        plain_password = plain_password.encode("utf-8")
    return pwd_context.verify(plain_password[:72], hashed_password)
|
|
def get_password_hash(password):
    """Hash ``password`` with the module's password context.

    bcrypt only considers the first 72 *bytes* of a secret, and some backend
    versions reject anything longer. The limit is in bytes, so the UTF-8
    encoding is truncated rather than the character string: 72 multi-byte
    characters can encode to far more than 72 bytes.
    """
    if isinstance(password, str):
        password = password.encode("utf-8")
    return pwd_context.hash(password[:72])
|
|
def get_user_by_email(db: Session, email: str):
    """Return the first user whose email equals ``email``, or None."""
    matching_users = db.query(User).filter(User.email == email)
    return matching_users.first()
|
|
| |
| |
| |
@app.post("/api/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def signup(user: UserCreate, db: Session = Depends(get_db)):
    # Register a new account and return its public representation.
    # Responds 400 when the email is already taken.
    from sqlalchemy.exc import IntegrityError

    if get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    new_user = User(
        name=user.name,
        age=user.age,
        address=user.address,
        email=user.email,
        mobile_number=user.mobile_number,
        password_hash=get_password_hash(user.password),
    )
    db.add(new_user)

    try:
        db.commit()
    except IntegrityError:
        # Two concurrent signups can both pass the lookup above; the unique
        # constraint on users.email then rejects the slower one. Report it
        # like the ordinary duplicate case instead of a 500.
        db.rollback()
        raise HTTPException(status_code=400, detail="Email already registered") from None

    db.refresh(new_user)
    return new_user
|
|
@app.post("/api/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # Exchange email + password (sent as the OAuth2 form's username/password)
    # for a bearer token. Unknown email and wrong password produce the same
    # 401 response.
    account = get_user_by_email(db, email=form_data.username)
    if not account or not verify_password(form_data.password, account.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # SECURITY NOTE(review): this token is just a fixed prefix plus the email.
    # It is unsigned, so anyone who knows an email can forge it; replace with
    # a signed token before real use.
    return {
        "access_token": f"fake-jwt-token-for-{account.email}",
        "token_type": "bearer",
    }
|
|
@app.get("/api/users/me", response_model=UserResponse)
def read_users_me(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    # Return the profile of the user identified by the bearer token.
    # Responds 401 when the token is malformed or matches no user.
    token_prefix = "fake-jwt-token-for-"

    # Require the prefix and strip only the leading occurrence. A blanket
    # str.replace would accept tokens without the prefix and would corrupt
    # emails that happen to contain the prefix text.
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")

    user = get_user_by_email(db, email=token[len(token_prefix):])
    if user is None:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user
|
|
| |
| |
| |
|
|
# Imports needed only by the file-handling routes below.
import shutil

from fastapi import UploadFile, File as FastAPIFile
from fastapi.responses import FileResponse


# Uploaded files are stored here. The path is relative, so it resolves
# against the process's working directory; it is created at import time.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Extensions (lower-case, with leading dot) accepted by the upload routes.
ALLOWED_EXTENSIONS = {".pdf", ".png", ".jpeg", ".jpg"}
|
|
@app.post("/api/upload")
async def upload_file(
    file: UploadFile = FastAPIFile(...),
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    # Store one uploaded file for the authenticated user and record it in the
    # database. Responds 401 for a bad token and 400 for a disallowed type.
    token_prefix = "fake-jwt-token-for-"

    # Require the prefix and strip only its leading occurrence; str.replace
    # would accept prefix-less tokens and mangle emails containing the text.
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    # The client controls the filename: it may be missing or may carry
    # directory components (with either separator). Keep the final part only.
    original_name = os.path.basename((file.filename or "").replace("\\", "/"))

    file_ext = os.path.splitext(original_name)[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        # sorted() keeps the message stable; set iteration order is not.
        raise HTTPException(status_code=400, detail=f"Invalid file type. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}")

    # The user's name is also user-supplied, so neutralise path separators
    # there too before using it as part of the stored filename.
    owner_tag = user.name.replace(" ", "_").replace("/", "_").replace("\\", "_")
    custom_filename = f"{user.id}_{owner_tag}_{original_name}"
    file_path = os.path.join(UPLOAD_DIR, custom_filename)

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    db_file = File(
        filename=custom_filename,
        file_type=file_ext,
        user_id=user.id,
    )
    db.add(db_file)
    db.commit()
    db.refresh(db_file)

    return {"message": "File uploaded successfully", "file_id": db_file.id, "filename": custom_filename}
|
|
|
|
@app.get("/api/download/{file_id}")
async def download_file(
    file_id: int,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    # Send back one of the authenticated user's files.
    # Responds 401 for a bad token, 404 when the record does not exist (or
    # belongs to someone else) or when the stored file is gone from disk.
    token_prefix = "fake-jwt-token-for-"

    # Require the prefix and strip only its leading occurrence; str.replace
    # would accept prefix-less tokens and mangle emails containing the text.
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Filtering on user_id as well keeps users from reading each other's files.
    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = os.path.join(UPLOAD_DIR, file_record.filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File is missing on the server")

    return FileResponse(path=file_path, filename=file_record.filename)
|
|
|
|
| |
# Metadata for one stored file, as returned by GET /api/files.
class FileMetaResponse(BaseModel):
    id: int
    filename: str
    file_type: str
    upload_timestamp: Optional[datetime] = None

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the response directly from ORM objects.
    model_config = {"from_attributes": True}
|
|
|
|
@app.get("/api/files", response_model=List[FileMetaResponse])
async def list_files(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """List all files belonging to the authenticated user."""
    token_prefix = "fake-jwt-token-for-"

    # Require the prefix and strip only its leading occurrence; str.replace
    # would accept prefix-less tokens and mangle emails containing the text.
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    return db.query(File).filter(File.user_id == user.id).all()
|
|
|
|
@app.put("/api/files/{file_id}")
async def update_file(
    file_id: int,
    file: UploadFile = FastAPIFile(...),
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Replace an existing file with a new upload."""
    token_prefix = "fake-jwt-token-for-"

    # Require the prefix and strip only its leading occurrence; str.replace
    # would accept prefix-less tokens and mangle emails containing the text.
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    # The client controls the filename: it may be missing or may carry
    # directory components (with either separator). Keep the final part only.
    original_name = os.path.basename((file.filename or "").replace("\\", "/"))

    file_ext = os.path.splitext(original_name)[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        # sorted() keeps the message stable; set iteration order is not.
        raise HTTPException(status_code=400, detail=f"Invalid file type. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}")

    old_path = os.path.join(UPLOAD_DIR, file_record.filename)

    # The user's name is also user-supplied, so neutralise path separators.
    owner_tag = user.name.replace(" ", "_").replace("/", "_").replace("\\", "_")
    custom_filename = f"{user.id}_{owner_tag}_{original_name}"
    new_path = os.path.join(UPLOAD_DIR, custom_filename)

    # Write the replacement and commit the record BEFORE removing the old
    # file, so a failed write or commit cannot leave the record without data.
    with open(new_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    file_record.filename = custom_filename
    file_record.file_type = file_ext
    db.commit()
    db.refresh(file_record)

    # Remove the previous file unless the upload simply overwrote it.
    # samefile() also covers names that differ only by case on
    # case-insensitive filesystems.
    if os.path.exists(old_path) and not os.path.samefile(old_path, new_path):
        os.remove(old_path)

    return {"message": "File updated successfully", "file_id": file_record.id, "filename": custom_filename}
|
|
|
|
@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: int,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Delete a file record and its physical file."""
    token_prefix = "fake-jwt-token-for-"

    # Require the prefix and strip only its leading occurrence; str.replace
    # would accept prefix-less tokens and mangle emails containing the text.
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    # Attempt the removal directly: a file that is already gone is fine, and
    # this avoids the gap between an existence check and the delete.
    try:
        os.remove(os.path.join(UPLOAD_DIR, file_record.filename))
    except FileNotFoundError:
        pass

    db.delete(file_record)
    db.commit()

    return {"message": "File deleted successfully"}
|
|
|
|
| |
| |
| |
|
|
def _get_current_user_from_cookie(request: Request, db: Session):
    """Return the user identified by the ``access_token`` cookie, or None.

    None is returned when the cookie is absent or empty, when its value does
    not start with the expected token prefix, or when no user has the email
    it carries.
    """
    token_prefix = "fake-jwt-token-for-"
    token = request.cookies.get("access_token", "")

    # Require the prefix and strip only its leading occurrence; str.replace
    # would accept prefix-less values and mangle emails containing the text.
    # An empty cookie fails this check too.
    if not token.startswith(token_prefix):
        return None
    return get_user_by_email(db, email=token[len(token_prefix):])
|
|
|
|
@app.get("/")
def root_redirect():
    # The site root has no page of its own; send visitors to the login form.
    login_redirect = RedirectResponse(url="/login", status_code=302)
    return login_redirect
|
|
|
|
| |
@app.get("/signup")
def page_signup(request: Request):
    # Render the empty signup form.
    context = {"request": request}
    return templates.TemplateResponse("signup.html", context)
|
|
|
|
@app.post("/signup")
def page_signup_post(
    request: Request,
    name: str = Form(...),
    age: int = Form(...),
    address: str = Form(...),
    email: str = Form(...),
    mobile_number: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    # Handle the signup form: create the account and redirect to the login
    # page, or re-render the form with an error when the email is taken.
    from sqlalchemy.exc import IntegrityError

    duplicate_context = {
        "request": request,
        "message": "Email already registered",
        "msg_type": "error",
    }

    if get_user_by_email(db, email=email):
        return templates.TemplateResponse("signup.html", duplicate_context)

    new_user = User(
        name=name, age=age, address=address,
        email=email, mobile_number=mobile_number,
        password_hash=get_password_hash(password),
    )
    db.add(new_user)

    try:
        db.commit()
    except IntegrityError:
        # A concurrent signup with the same email can slip past the lookup
        # above; the unique constraint on users.email then rejects this
        # insert. Show the normal duplicate message instead of a 500.
        db.rollback()
        return templates.TemplateResponse("signup.html", duplicate_context)

    return RedirectResponse(url="/login", status_code=302)
|
|
|
|
| |
@app.get("/login")
def page_login(request: Request):
    # Render the empty login form.
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
|
|
|
|
@app.post("/login")
def page_login_post(
    request: Request,
    email: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    # Handle the login form: on success set the auth cookie and go to the
    # dashboard; otherwise re-render the form with a generic error.
    account = get_user_by_email(db, email=email)
    credentials_ok = bool(account) and verify_password(password, account.password_hash)

    if not credentials_ok:
        failure_context = {
            "request": request,
            "message": "Invalid email or password",
            "msg_type": "error",
        }
        return templates.TemplateResponse("login.html", failure_context)

    # SECURITY NOTE(review): the cookie value is just a fixed prefix plus the
    # email and is unsigned, so it can be forged by anyone who knows an email.
    redirect = RedirectResponse(url="/dashboard", status_code=302)
    redirect.set_cookie(
        key="access_token",
        value=f"fake-jwt-token-for-{account.email}",
        httponly=True,
    )
    return redirect
|
|
|
|
| |
@app.get("/dashboard")
def page_dashboard(request: Request, db: Session = Depends(get_db)):
    # Show the signed-in user's dashboard with their files; visitors without
    # a valid cookie are sent to the login page.
    current_user = _get_current_user_from_cookie(request, db)
    if not current_user:
        return RedirectResponse(url="/login", status_code=302)

    owned_files = db.query(File).filter(File.user_id == current_user.id).all()
    context = {
        "request": request,
        "user": current_user,
        "files": owned_files,
    }
    return templates.TemplateResponse("dashboard.html", context)
|
|
|
|
| |
@app.post("/upload")
async def page_upload(
    request: Request,
    file1: UploadFile = FastAPIFile(...),
    file2: Optional[UploadFile] = FastAPIFile(None),
    db: Session = Depends(get_db),
):
    # Handle the dashboard upload form (one required file, one optional).
    # Empty slots and files with a disallowed extension are skipped silently;
    # the user is always redirected back to the dashboard.
    user = _get_current_user_from_cookie(request, db)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # The user's name is user-supplied and becomes part of the stored
    # filename, so neutralise path separators in it.
    owner_tag = user.name.replace(" ", "_").replace("/", "_").replace("\\", "_")

    for upload in (file1, file2):
        # Covers an absent optional file and a missing or empty filename.
        if upload is None or not upload.filename:
            continue

        # The client controls the filename and may include directory
        # components (with either separator); keep the final part only.
        original_name = os.path.basename(upload.filename.replace("\\", "/"))
        file_ext = os.path.splitext(original_name)[1].lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            continue

        custom_filename = f"{user.id}_{owner_tag}_{original_name}"
        dest = os.path.join(UPLOAD_DIR, custom_filename)
        with open(dest, "wb") as buf:
            shutil.copyfileobj(upload.file, buf)

        db.add(File(filename=custom_filename, file_type=file_ext, user_id=user.id))

    # One commit covers every file accepted above.
    db.commit()
    return RedirectResponse(url="/dashboard", status_code=302)
|
|
|
|
| |
@app.get("/download/{file_id}")
async def page_download(file_id: int, request: Request, db: Session = Depends(get_db)):
    # Browser download of one of the signed-in user's files. Anything that
    # goes wrong ends in a redirect: to the login page when not signed in,
    # to the dashboard when the record or the stored file is missing.
    current_user = _get_current_user_from_cookie(request, db)
    if not current_user:
        return RedirectResponse(url="/login", status_code=302)

    record = (
        db.query(File)
        .filter(File.id == file_id, File.user_id == current_user.id)
        .first()
    )
    stored_path = os.path.join(UPLOAD_DIR, record.filename) if record else None

    if stored_path is None or not os.path.exists(stored_path):
        return RedirectResponse(url="/dashboard", status_code=302)

    return FileResponse(path=stored_path, filename=record.filename)
|
|
|
|
| |
@app.get("/delete/{file_id}")
def page_delete(file_id: int, request: Request, db: Session = Depends(get_db)):
    # Delete one of the signed-in user's files (record and stored file), then
    # return to the dashboard. Unknown or foreign ids are ignored.
    # NOTE(review): this destructive action is reachable via GET, so a plain
    # link or prefetch can trigger it; consider moving it to POST/DELETE.
    current_user = _get_current_user_from_cookie(request, db)
    if not current_user:
        return RedirectResponse(url="/login", status_code=302)

    record = (
        db.query(File)
        .filter(File.id == file_id, File.user_id == current_user.id)
        .first()
    )
    if not record:
        return RedirectResponse(url="/dashboard", status_code=302)

    stored_path = os.path.join(UPLOAD_DIR, record.filename)
    if os.path.exists(stored_path):
        os.remove(stored_path)
    db.delete(record)
    db.commit()

    return RedirectResponse(url="/dashboard", status_code=302)
|
|
|
|
| |
@app.get("/logout")
def page_logout():
    # Sign out by clearing the auth cookie, then return to the login page.
    redirect = RedirectResponse(url="/login", status_code=302)
    redirect.delete_cookie("access_token")
    return redirect
|
|