Delphi-be / app /main.py
shubhendu-ghosh's picture
Create app/main.py
f7cca2f verified
from fastapi import FastAPI, Depends, HTTPException, status, Query, Path
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from .schemas import *
from .crud import *
from .auth import decode_access_token
from jose import JWTError
from typing import Dict
from fastapi.middleware.cors import CORSMiddleware
# Application instance; every route in this module is registered on it.
app = FastAPI()

# CORS is fully open: any origin, any method and any header are accepted,
# and credentialed cross-origin requests are allowed.
# NOTE(review): a wildcard origin together with allow_credentials=True is
# very permissive — confirm this is intended before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
    allow_credentials=True,
)
@app.get("/")
def root():
    """Return a fixed message indicating that the API is up."""
    banner = {"message": "API is running 🚀"}
    return banner
# Bearer-token scheme injected into get_current_user via Depends();
# tokenUrl names the "/token" login route declared in this module.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# dependency: get current user from JWT token
def get_current_user(token: str = Depends(oauth2_scheme)) -> Dict:
    """Resolve the request's bearer token to a stored user record.

    The token is decoded with ``decode_access_token``, its ``"sub"`` claim is
    taken as the username, and that username is looked up with
    ``get_user_by_username``.

    Returns:
        The user record exactly as returned by ``get_user_by_username``.
        Per the original author's note this is the raw DB dict and contains
        the password hash, so handlers must not send it back to the client.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding raises ``JWTError``, when the ``"sub"`` claim is missing,
            or when the user lookup returns a falsy value.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = decode_access_token(token)
        subject: str = claims.get("sub")
    except JWTError:
        raise unauthorized

    # A token without a subject cannot be tied to any account.
    if subject is None:
        raise unauthorized

    account = get_user_by_username(subject)
    if account:
        # Raw dict from DB (contains password hash) — avoid returning to client.
        return account
    raise unauthorized
# Signup (create user)
@app.post("/signup", status_code=201)
def signup(payload: UserSignup):
    """Register a new account from the submitted username and password.

    Returns:
        A confirmation message plus the ``username`` field of the record
        returned by ``create_user`` (sent with HTTP 201).

    Raises:
        HTTPException: 400 when ``create_user`` returns a falsy value, which
            this handler reports as the username already existing.
    """
    new_user = create_user(payload.username, payload.password)
    if new_user:
        return {
            "message": "User created successfully",
            "username": new_user["username"],
        }
    raise HTTPException(status_code=400, detail="Username already exists")
@app.post("/token", response_model=Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form-encoded username/password credentials for a token.

    Args:
        form_data: OAuth2 password-flow form carrying ``username`` and
            ``password``.

    Returns:
        Whatever ``authenticate_user`` returns on success; FastAPI serialises
        it through the ``Token`` response model.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value.
            The response carries ``WWW-Authenticate: Bearer`` so it is a
            well-formed 401 and matches the challenge sent by
            ``get_current_user``.
    """
    auth = authenticate_user(form_data.username, form_data.password)
    if not auth:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            # HTTP requires a 401 to name the auth scheme the client must use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return auth
# Create post (protected)
@app.post("/post")
def add_post(post: PostCreate, user=Depends(get_current_user)):
    """Create a post authored by the authenticated user.

    The author's username, the post content and the post type's enum value
    are handed to ``create_post``.

    Returns:
        ``{"post_id": ...}`` holding the value returned by ``create_post``.
    """
    author = user["username"]
    return {"post_id": create_post(author, post.content, post.type.value)}
# Scroll posts (public)
@app.get("/posts")
def read_posts(
    limit: int = Query(20, ge=1),
    skip: int = Query(0, ge=0),
):
    """Return one page of posts; no authentication is required.

    Args:
        limit: Maximum number of posts to return (default 20, must be >= 1).
        skip: Number of posts to skip before the page starts (default 0,
            must be >= 0).

    Out-of-range values are rejected by FastAPI with a 422 validation error
    before ``get_posts`` is called, so nonsensical paging values never reach
    the data layer.

    Returns:
        Whatever ``get_posts(limit, skip)`` returns.
    """
    # NOTE(review): no upper bound is enforced on ``limit``; consider a cap
    # if large pages are a concern for this unauthenticated endpoint.
    return get_posts(limit, skip)
# Like post (protected)
@app.post("/like/{post_id}")
def like(post_id: str, user=Depends(get_current_user)):
    """Record a like on ``post_id`` by the authenticated user.

    The result of ``like_post`` is not inspected; the response is always the
    same confirmation message.
    """
    liker = user["username"]
    like_post(post_id, liker)
    confirmation = {"message": "Liked"}
    return confirmation
# Comment on post (protected)
@app.post("/comment")
def comment(comment: CommentCreate, user=Depends(get_current_user)):
    """Attach a comment from the authenticated user to a post.

    The target post id and the comment text come from the request body; the
    author is the username of the authenticated user. The result of
    ``comment_post`` is not inspected.
    """
    author = user["username"]
    target_post, text = comment.post_id, comment.content
    comment_post(target_post, author, text)
    return {"message": "Comment added"}
# Update post (protected, owner-only)
@app.put("/post/{post_id}")
def edit_post(post_id: str, payload: PostCreate, user=Depends(get_current_user)):
    """Replace the content of a post on behalf of the authenticated user.

    Only ``payload.content`` is forwarded to ``update_post``; the other
    fields of the payload are not used here.

    Raises:
        HTTPException: 403 when ``update_post`` returns a falsy value.
    """
    editor = user["username"]
    if update_post(post_id, editor, payload.content):
        return {"message": "Post updated"}
    raise HTTPException(status_code=403, detail="Not allowed to edit this post")
# Delete post (protected, owner-only)
@app.delete("/post/{post_id}")
def remove_post(post_id: str, user=Depends(get_current_user)):
    """Delete a post on behalf of the authenticated user.

    Raises:
        HTTPException: 403 when ``delete_post`` returns a falsy value.
    """
    requester = user["username"]
    if delete_post(post_id, requester):
        return {"message": "Post deleted"}
    raise HTTPException(status_code=403, detail="Not allowed to delete this post")
@app.get("/my-posts")
def read_my_posts(current_user: dict = Depends(get_current_user)):
    """Return the authenticated user's own posts.

    The result of ``get_user_posts`` for the caller's username is returned
    unchanged.
    """
    owner = current_user["username"]
    return get_user_posts(owner)
@app.patch("/user/me")
def update_me(payload: UserUpdate, user=Depends(get_current_user)):
    """Update the authenticated user's username and/or password.

    Returns:
        A confirmation message plus the username now in effect: the one
        supplied in the payload if it is truthy, otherwise the current one.

    Raises:
        HTTPException: 400 carrying the message of any ``ValueError`` raised
            by ``update_user_profile``; 404 when that call returns a falsy
            value.
    """
    try:
        updated = update_user_profile(
            current_username=user["username"],
            new_username=payload.username,
            new_password=payload.password,
        )
    except ValueError as exc:
        # The data layer signals rejected input with ValueError.
        raise HTTPException(status_code=400, detail=str(exc))

    if updated:
        effective_name = payload.username or user["username"]
        return {"message": "Profile updated", "username": effective_name}
    raise HTTPException(status_code=404, detail="User not found")
@app.delete("/user/me")
def delete_me(user=Depends(get_current_user)):
    """Delete the authenticated user's account via ``delete_account_and_posts``.

    The helper's result is not inspected; the response is always the same
    confirmation message.
    """
    account_name = user["username"]
    delete_account_and_posts(account_name)
    return {"message": "Account and posts deleted"}
# ---------- POSTS ----------
@app.post("/posts/{post_id}/report")
def report(
    post_id: str = Path(...),
    payload: ReportCreate = None,
    user=Depends(get_current_user),
):
    """File a report against ``post_id`` on behalf of the authenticated user.

    The request body is optional: when it is absent (or falsy) the report is
    filed with a reason of ``None``. The result of ``report_post`` is not
    inspected.
    """
    reporter = user["username"]
    reason = None
    if payload:
        reason = payload.reason
    report_post(post_id, reporter, reason)
    return {"message": "Post reported"}
@app.get("/posts/{post_id}")
def read_single_post(post_id: str):
    """Fetch a single post by id; no authentication is required.

    Raises:
        HTTPException: 404 when ``get_post_by_id`` returns a falsy value.
    """
    found = get_post_by_id(post_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Post not found")
@app.delete("/posts/{post_id}/like")
def unlike(post_id: str, user=Depends(get_current_user)):
    """Remove the authenticated user's like from ``post_id``.

    The result of ``unlike_post`` is not inspected; the response is always
    the same confirmation message.
    """
    member = user["username"]
    unlike_post(post_id, member)
    return {"message": "Unliked"}
# ---------- COMMENTS ----------
@app.delete("/comments/{post_id}/{comment_id}")
def remove_comment(post_id: str, comment_id: str, user=Depends(get_current_user)):
    """Delete a comment from a post on behalf of the authenticated user.

    Raises:
        HTTPException: 403 when ``delete_comment`` returns a falsy value; the
            handler does not distinguish "not allowed" from "not found".
    """
    requester = user["username"]
    if delete_comment(post_id, comment_id, requester):
        return {"message": "Comment deleted"}
    raise HTTPException(status_code=403, detail="Not allowed or not found")
# ---------- DISCOVERY ----------
@app.get("/posts-search")
def search(
    query: str = Query(..., min_length=1),
    type: Optional[PostType] = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1),
):
    """Search posts by text, optionally restricted to one post type.

    Args:
        query: Search text; required and at least one character long.
        type: Optional post-type filter. Its enum value is passed on, or
            ``None`` when no filter is given.
        skip: Number of matches to skip (default 0, must be >= 0).
        limit: Maximum number of matches to return (default 20, must be >= 1).

    Paging values are now validated the same way ``query`` already is:
    FastAPI rejects out-of-range input with a 422 before ``search_posts`` is
    called.

    Returns:
        Whatever ``search_posts`` returns.
    """
    type_filter = type.value if type else None
    return search_posts(query, type_filter, limit, skip)
@app.get("/posts-trending")
def trending(
    limit: int = Query(20, ge=1),
    hours: int = Query(24, ge=1),
):
    """Return trending posts for a recent time window.

    Args:
        limit: Maximum number of posts to return (default 20, must be >= 1).
        hours: Size of the look-back window passed to ``trending_posts``
            (default 24, must be >= 1).

    Zero or negative values are rejected by FastAPI with a 422 validation
    error before ``trending_posts`` is called.

    Returns:
        Whatever ``trending_posts(hours=..., limit=...)`` returns.
    """
    return trending_posts(hours=hours, limit=limit)