"""Quotation intake API.

Exposes a public form endpoint that stores submissions in MongoDB, an admin
login that issues JWTs, token-protected list/delete endpoints, and a WebSocket
channel that pushes a notification to connected admins on every submission.
"""

import os
from datetime import datetime, timedelta

import httpx
from bson import ObjectId
from dotenv import load_dotenv
from fastapi import (
    Depends,
    FastAPI,
    Form,
    HTTPException,
    Request,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pymongo import MongoClient
from starlette.websockets import WebSocketState

load_dotenv()


class ConnectionManager:
    """Track accepted WebSocket connections and fan messages out to them."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; a no-op if it is not tracked."""
        # broadcast() may already have pruned this socket after a failed
        # send, in which case an unconditional list.remove() would raise
        # ValueError.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every tracked connection.

        Connections whose send fails are dropped from the tracked list.
        """
        # Iterate over a snapshot: removing from the list being iterated
        # skips the element that follows each removal, and the list can also
        # change while this coroutine is suspended in ``await``.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                # Best effort: any send failure is treated as a dead peer.
                self.disconnect(connection)


manager = ConnectionManager()

# Constants
SECRET_KEY = os.environ.get("SECRET_KEY")
ALGORITHM = os.environ.get("ALGORITHM")
ACCESS_TOKEN_EXPIRE_MINUTES = 60
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL")  # Add to .env

# Admin credentials
# NOTE(review): lookups use the literal key "admin"; the ADMIN_USERNAME value
# is stored but never compared against the submitted username - confirm that
# this is intended.
fake_admin_db = {
    "admin": {
        "username": os.environ.get("ADMIN_USERNAME"),
        "hashed_password": os.environ.get("ADMIN_PASSWORD"),
    }
}

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

client = MongoClient(os.environ.get("MONGODB_URI"))
db = client["thehexatechdb"]
collection = db["quotationsdb"]

app = FastAPI()

# CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is acceptable for production.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Auth Utilities
def verify_password(plain, hashed):
    """Return whether ``plain`` matches the stored hash ``hashed``."""
    return pwd_context.verify(plain, hashed)


def get_password_hash(password):
    """Return a hash of ``password`` produced by the module's CryptContext."""
    return pwd_context.hash(password)


def authenticate_user(username: str, password: str):
    """Check credentials against ``fake_admin_db``.

    Returns ``{"username": username}`` on success and ``False`` otherwise.
    """
    user = fake_admin_db.get(username)
    if not user or not verify_password(password, user["hashed_password"]):
        return False
    return {"username": username}


def create_access_token(data: dict, expires_delta=None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Optional ``timedelta`` token lifetime. Defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes when omitted.
    """
    to_encode = data.copy()
    # The parameter used to be accepted but silently ignored.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": datetime.utcnow() + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def _decode_token(token):
    """Return the decoded JWT payload, or raise HTTP 401 if decoding fails."""
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token") from None


def get_current_user(token: str = Depends(OAuth2PasswordRequestForm)):
    """Dependency that resolves the current user from a JWT.

    Returns ``{"username": <sub claim>}``; raises HTTP 401 when the token
    cannot be decoded or carries no ``sub`` claim.
    """
    # NOTE(review): the dependency is the login *form*, and the JWT is read
    # from its ``password`` field. No route in this file uses this function;
    # a bearer-token scheme looks like what was intended - confirm before
    # wiring it into a route.
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token.password, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        return {"username": username}
    except JWTError:
        raise credentials_exception


async def send_discord_notification(data: dict):
    """Post a summary of a submission to the configured Discord webhook.

    Args:
        data: Mapping with ``name``, ``email`` and ``message`` keys.

    Raises:
        RuntimeError: If ``DISCORD_WEBHOOK_URL`` is not configured.
        httpx.HTTPError: If the request fails or returns an error status.
    """
    if not DISCORD_WEBHOOK_URL:
        raise RuntimeError("DISCORD_WEBHOOK_URL is not set")
    content = (
        f"📩 **New Quotation Received**\n"
        f"👤 **Name:** {data['name']}\n"
        f"📧 **Email:** {data['email']}\n"
        f"💬 **Message:** {data['message']}\n"
        f"🕒 **Time:** {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"
    )
    # Named http_client so it does not shadow the module-level Mongo client.
    async with httpx.AsyncClient() as http_client:
        response = await http_client.post(DISCORD_WEBHOOK_URL, json={"content": content})
        # Surface 4xx/5xx replies so the caller can log the failure.
        response.raise_for_status()


# Routes
@app.post("/api/submit")
async def submit_query(name: str = Form(...), email: str = Form(...), message: str = Form(...)):
    """Store a submitted form, notify Discord and connected admins.

    Returns a JSON response with the new document id and ``notify: True``.
    """
    query = {"name": name, "email": email, "message": message, "created_at": datetime.utcnow()}
    result = collection.insert_one(query)
    query["_id"] = str(result.inserted_id)
    total_count = collection.count_documents({})

    # The Discord notification is best effort: a failure is logged and must
    # not fail the submission itself.
    try:
        await send_discord_notification(query)
    except Exception as e:
        print("[Discord Notification Failed]", e)

    await manager.broadcast({
        "event": "new_quote",
        "data": {
            "name": name,
            "email": email,
            "message": message,
            "total_count": total_count,
        },
    })
    return JSONResponse(content={"id": query["_id"], "notify": True})


@app.post("/api/login")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Validate the submitted credentials and return a bearer token.

    Raises HTTP 401 when ``authenticate_user`` rejects the credentials.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = create_access_token({"sub": user["username"], "role": "admin"})
    return {"access_token": token, "token_type": "bearer"}


@app.get("/api/queries")
async def get_queries(token: str):
    """Return every stored submission, with ``_id`` converted to a string.

    Raises HTTP 401 when ``token`` cannot be decoded.
    """
    _decode_token(token)
    queries = list(collection.find())
    for q in queries:
        q["_id"] = str(q["_id"])
    return queries


@app.delete("/api/queries/{query_id}")
async def delete_query(query_id: str, token: str):
    """Delete one submission by id and report whether a document was removed.

    Raises HTTP 401 when ``token`` cannot be decoded and HTTP 400 when
    ``query_id`` is not a valid ObjectId.
    """
    _decode_token(token)
    # A malformed id would otherwise make ObjectId() raise and surface as a
    # 500 instead of a client error.
    if not ObjectId.is_valid(query_id):
        raise HTTPException(status_code=400, detail="Invalid query id")
    result = collection.delete_one({"_id": ObjectId(query_id)})
    return {"deleted": result.deleted_count == 1}


@app.websocket("/ws/notifications")
async def websocket_endpoint(websocket: WebSocket):
    """Admin notification channel, authenticated by a ``token`` query parameter.

    The socket is closed with code 1008 unless the token decodes and carries
    a ``sub`` claim and ``role == "admin"``. Once connected, incoming text is
    read and discarded until the peer disconnects.
    """
    token = websocket.query_params.get("token")
    # Reject a missing token up front rather than handing None to jwt.decode,
    # where only JWTError is handled.
    if not token:
        await websocket.close(code=1008)
        return
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        await websocket.close(code=1008)
        return

    username = payload.get("sub")
    if not username or payload.get("role") != "admin":
        await websocket.close(code=1008)
        return

    await manager.connect(websocket)
    client_host = websocket.client.host if websocket.client else "unknown"
    print(f"[WS CONNECT] {username} (admin) connected at {datetime.utcnow()} from {client_host}")

    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        print(f"[WS DISCONNECT] {username} disconnected at {datetime.utcnow()}")
    finally:
        # Always unregister, including when the loop ends with an exception
        # other than WebSocketDisconnect.
        manager.disconnect(websocket)