Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import json | |
| import logging | |
| import uvicorn | |
| from typing import Any, Dict, Optional | |
| from fastapi import FastAPI, HTTPException, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from starlette.concurrency import run_in_threadpool | |
| # Γ’ββ¬Γ’ββ¬ Logging Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(message)s", | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Ensure Python can find project modules when running this file directly | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from api.auth import ( | |
| hash_password, | |
| verify_password, | |
| create_access_token, | |
| get_current_user, | |
| require_current_user, | |
| ) | |
| from db.database import ( | |
| init_db, | |
| create_user, | |
| get_user_by_email, | |
| get_user_by_id, | |
| get_user_by_username, | |
| update_username, | |
| update_user_password, | |
| delete_user, | |
| set_verification_code, | |
| verify_email_code, | |
| is_email_verified, | |
| save_check_history, | |
| get_user_history, | |
| get_article_count, | |
| ) | |
| from api.email_utils import generate_verification_code, send_verification_email, send_password_reset_email | |
| from datetime import datetime, timedelta | |
| # Γ’ββ¬Γ’ββ¬ Lazy FactChecker singleton Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| _fact_checker_instance = None | |
| def get_fact_checker(): | |
| """ | |
| Lazily instantiate the FactChecker and reuse it across requests. | |
| Retries on every call until successful. | |
| """ | |
| global _fact_checker_instance | |
| if _fact_checker_instance is not None: | |
| return _fact_checker_instance | |
| try: | |
| logger.info("Initializing FactChecker (models + DB) Γ’β¬Β¦") | |
| from checker.fact_checker import FactChecker | |
| _fact_checker_instance = FactChecker() | |
| logger.info("FactChecker ready Γ’Εβ") | |
| return _fact_checker_instance | |
| except Exception as exc: | |
| logger.error("FactChecker failed to initialize: %s", exc, exc_info=True) | |
| raise | |
| # Γ’ββ¬Γ’ββ¬ Request / Response models Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| class CheckRequest(BaseModel): | |
| text: str | |
| headline: Optional[str] = None | |
| class CheckURLRequest(BaseModel): | |
| url: str | |
| class RegisterRequest(BaseModel): | |
| email: str | |
| username: str | |
| password: str | |
| class LoginRequest(BaseModel): | |
| username: str | |
| password: str | |
| class UpdateProfileRequest(BaseModel): | |
| username: str | |
| class ChangePasswordRequest(BaseModel): | |
| current_password: str | |
| new_password: str | |
| class DeleteAccountRequest(BaseModel): | |
| password: str | |
| class CheckUsernameRequest(BaseModel): | |
| username: str | |
| class ForgotPasswordRequest(BaseModel): | |
| identity: str | |
| class ResetPasswordRequest(BaseModel): | |
| identity: str | |
| code: str | |
| new_password: str | |
| class CheckEmailRequest(BaseModel): | |
| email: str | |
| class VerifyEmailRequest(BaseModel): | |
| code: str | |
| class ResendVerificationRequest(BaseModel): | |
| pass | |
| class ContactRequest(BaseModel): | |
| subject: str | |
| message: str | |
| sender_name: Optional[str] = None | |
| # Γ’ββ¬Γ’ββ¬ App Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| app = FastAPI( | |
| title="FactCheckThesis API", | |
| description="HTTP API wrapper around the FactChecker engine for mobile and web clients.", | |
| version="1.0.0", | |
| ) | |
| # Allow all origins for now so the Flutter app and any dev tools can call this API. | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| async def on_startup(): | |
| """Initialize DB tables and log that the server is up.""" | |
| try: | |
| init_db() | |
| logger.info("Database tables initialized Γ’Εβ") | |
| except Exception as exc: | |
| logger.warning("Could not initialize DB tables: %s", exc) | |
| # One-time migration: mark pre-existing accounts as email verified | |
| try: | |
| from db.database import get_connection | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "UPDATE users SET email_verified = TRUE " | |
| "WHERE verification_code IS NULL AND email_verified = FALSE" | |
| ) | |
| affected = cursor.rowcount | |
| conn.commit() | |
| conn.close() | |
| if affected > 0: | |
| logger.info("Migration: marked %d pre-existing account(s) as email_verified", affected) | |
| except Exception as exc: | |
| logger.warning("Migration (email_verified) skipped: %s", exc) | |
| port = os.environ.get("PORT", "10000") | |
| logger.info("Γ°ΕΈΕ‘β¬ FastAPI server is UP on port %s", port) | |
| # Γ’ββ¬Γ’ββ¬ Health / root Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def root() -> Dict[str, str]: | |
| return {"status": "ok"} | |
| async def health() -> Dict[str, str]: | |
| """Lightweight health-check for deployment (always 200, even before models load).""" | |
| return {"status": "healthy"} | |
| # Γ’ββ¬Γ’ββ¬ Auth endpoints Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def register(payload: RegisterRequest) -> Dict[str, Any]: | |
| """ | |
| Create a new user account. | |
| Generates a verification code and sends it to the user's email. | |
| Returns the user info + an access token on success. | |
| """ | |
| if len(payload.password) < 6: | |
| raise HTTPException(status_code=400, detail="Password must be at least 6 characters.") | |
| if not payload.email or not payload.username: | |
| raise HTTPException(status_code=400, detail="Email and username are required.") | |
| # Check email and username uniqueness first | |
| existing_email = get_user_by_email(payload.email.lower().strip()) | |
| if existing_email is not None: | |
| if existing_email.get("email_verified") is False: | |
| delete_user(existing_email["id"]) | |
| else: | |
| raise HTTPException(status_code=409, detail="Email is already used.") | |
| existing_username = get_user_by_username(payload.username.strip()) | |
| if existing_username is not None: | |
| if existing_username.get("email_verified") is False: | |
| delete_user(existing_username["id"]) | |
| else: | |
| raise HTTPException(status_code=409, detail="This username is already taken. Please choose another.") | |
| hashed = hash_password(payload.password) | |
| user = create_user(payload.email.lower().strip(), payload.username.strip(), hashed) | |
| if user is None: | |
| raise HTTPException(status_code=409, detail="Registration failed due to a database constraint.") | |
| # Generate and send verification code | |
| code = generate_verification_code() | |
| expires_at = datetime.now() + timedelta(minutes=10) | |
| set_verification_code(user["id"], code, expires_at) | |
| email_sent = send_verification_email( | |
| to_email=payload.email.lower().strip(), | |
| code=code, | |
| username=payload.username.strip(), | |
| ) | |
| token = create_access_token(user["id"], user["email"]) | |
| return { | |
| "token": token, | |
| "user": { | |
| "id": user["id"], | |
| "email": user["email"], | |
| "username": user["username"], | |
| "email_verified": False, | |
| }, | |
| "verification_email_sent": email_sent, | |
| } | |
| async def login(payload: LoginRequest) -> Dict[str, Any]: | |
| """ | |
| Authenticate a user with username (or email) + password. | |
| Returns user info + access token on success. | |
| Includes email_verified status so the app can redirect unverified users. | |
| """ | |
| try: | |
| identity = payload.username.strip() | |
| if "@" in identity: | |
| user = get_user_by_email(identity.lower()) | |
| else: | |
| user = get_user_by_username(identity) | |
| # Enforce exact case match for usernames | |
| if user and user["username"] != identity: | |
| user = None | |
| if user is None or not verify_password(payload.password, user["password_hash"]): | |
| raise HTTPException(status_code=401, detail="Invalid credentials.") | |
| verified = is_email_verified(user["id"]) | |
| token = create_access_token(user["id"], user["email"]) | |
| return { | |
| "token": token, | |
| "user": { | |
| "id": user["id"], | |
| "email": user["email"], | |
| "username": user["username"], | |
| "email_verified": verified, | |
| }, | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as exc: | |
| logger.error("Login error for %s: %s", payload.username, exc, exc_info=True) | |
| raise HTTPException(status_code=500, detail=f"Internal login error: {str(exc)}") | |
| async def me(current_user: dict = Depends(require_current_user)) -> Dict[str, Any]: | |
| """Return the profile of the currently authenticated user.""" | |
| user = get_user_by_id(int(current_user["sub"])) | |
| if user is None: | |
| raise HTTPException(status_code=404, detail="User not found.") | |
| # Convert datetime to string for JSON | |
| user["created_at"] = str(user["created_at"]) | |
| return {"user": user} | |
| # Γ’ββ¬Γ’ββ¬ Email verification endpoints Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def verify_email( | |
| payload: VerifyEmailRequest, | |
| current_user: dict = Depends(require_current_user), | |
| ) -> Dict[str, Any]: | |
| """Verify the user's email with a 6-digit code.""" | |
| user_id = int(current_user["sub"]) | |
| code = payload.code.strip() | |
| if not code or len(code) != 6: | |
| raise HTTPException(status_code=400, detail="Please enter a valid 6-digit code.") | |
| result = verify_email_code(user_id, code) | |
| if result == "ok": | |
| return {"success": True, "message": "Email verified successfully!"} | |
| elif result == "expired": | |
| raise HTTPException(status_code=410, detail="Verification code has expired. Please request a new one.") | |
| else: | |
| raise HTTPException(status_code=400, detail="Invalid verification code. Please try again.") | |
| async def resend_verification( | |
| current_user: dict = Depends(require_current_user), | |
| ) -> Dict[str, Any]: | |
| """Resend the verification code email.""" | |
| user_id = int(current_user["sub"]) | |
| user = get_user_by_id(user_id) | |
| if user is None: | |
| raise HTTPException(status_code=404, detail="User not found.") | |
| if user.get("email_verified"): | |
| return {"success": True, "message": "Email is already verified."} | |
| code = generate_verification_code() | |
| expires_at = datetime.now() + timedelta(minutes=10) | |
| set_verification_code(user_id, code, expires_at) | |
| email_sent = send_verification_email( | |
| to_email=user["email"], | |
| code=code, | |
| username=user["username"], | |
| ) | |
| if email_sent: | |
| return {"success": True, "message": "Verification code sent to your email."} | |
| else: | |
| raise HTTPException( | |
| status_code=500, | |
| detail="Could not send verification email. Please check that your email address is correct and try again.", | |
| ) | |
| # Γ’ββ¬Γ’ββ¬ History endpoint Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def history(current_user: dict = Depends(require_current_user)) -> Dict[str, Any]: | |
| """Return the fact-check history for the authenticated user.""" | |
| user_id = int(current_user["sub"]) | |
| rows = get_user_history(user_id) | |
| # Convert datetime objects to strings | |
| for row in rows: | |
| row["checked_at"] = str(row["checked_at"]) | |
| return {"history": rows} | |
| # Γ’ββ¬Γ’ββ¬ Check username availability Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def check_username(payload: CheckUsernameRequest) -> Dict[str, Any]: | |
| """Check if a username is available.""" | |
| username = payload.username.strip() | |
| if not username or len(username) < 3: | |
| return {"available": False, "reason": "Username must be at least 3 characters."} | |
| if len(username) > 30: | |
| return {"available": False, "reason": "Username must be at most 30 characters."} | |
| existing = get_user_by_username(username) | |
| if existing is not None: | |
| return {"available": False, "reason": "This username is already taken."} | |
| return {"available": True} | |
| # ββ Check email availability βββββββββββββββββββββββββββββββββββββββ | |
| async def check_email(payload: CheckEmailRequest) -> Dict[str, Any]: | |
| """Check if an email is available.""" | |
| email = payload.email.lower().strip() | |
| if not email or "@" not in email: | |
| return {"available": False, "reason": "Please enter a valid email address."} | |
| existing = get_user_by_email(email) | |
| if existing is not None: | |
| return {"available": False, "reason": "Email is already used."} | |
| return {"available": True} | |
| # Γ’ββ¬Γ’ββ¬ Update profile (username) Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def update_profile( | |
| payload: UpdateProfileRequest, | |
| current_user: dict = Depends(require_current_user), | |
| ) -> Dict[str, Any]: | |
| """Update the current user's username.""" | |
| username = payload.username.strip() | |
| if not username or len(username) < 3: | |
| raise HTTPException(status_code=400, detail="Username must be at least 3 characters.") | |
| if len(username) > 30: | |
| raise HTTPException(status_code=400, detail="Username must be at most 30 characters.") | |
| user_id = int(current_user["sub"]) | |
| updated = update_username(user_id, username) | |
| if updated is None: | |
| raise HTTPException(status_code=409, detail="This username is already taken.") | |
| return { | |
| "success": True, | |
| "user": { | |
| "id": updated["id"], | |
| "email": updated["email"], | |
| "username": updated["username"], | |
| }, | |
| } | |
| # Γ’ββ¬Γ’ββ¬ Change password Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def change_password( | |
| payload: ChangePasswordRequest, | |
| current_user: dict = Depends(require_current_user), | |
| ) -> Dict[str, Any]: | |
| """Change the current user's password.""" | |
| user_id = int(current_user["sub"]) | |
| user = get_user_by_id(user_id) | |
| if user is None: | |
| raise HTTPException(status_code=404, detail="User not found.") | |
| # Verify current password | |
| full_user = get_user_by_email(user["email"]) | |
| if full_user is None or not verify_password(payload.current_password, full_user["password_hash"]): | |
| raise HTTPException(status_code=401, detail="Current password is incorrect.") | |
| # Validate new password strength | |
| new_pw = payload.new_password | |
| if len(new_pw) < 8: | |
| raise HTTPException(status_code=400, detail="New password must be at least 8 characters.") | |
| new_hash = hash_password(new_pw) | |
| success = update_user_password(user_id, new_hash) | |
| if not success: | |
| raise HTTPException(status_code=500, detail="Failed to update password.") | |
| return {"success": True, "message": "Password updated successfully."} | |
| # ββ Forgot / Reset password ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def mask_email(email: str) -> str: | |
| parts = email.split('@') | |
| if len(parts) != 2: return email | |
| name, domain = parts | |
| if len(name) <= 2: | |
| masked_name = name[0] + "*" * len(name) | |
| else: | |
| masked_name = name[0] + "*" * (len(name) - 2) + name[-1] | |
| return f"{masked_name}@{domain}" | |
| async def forgot_password(payload: ForgotPasswordRequest) -> Dict[str, Any]: | |
| """Initiates the password reset flow.""" | |
| identity = payload.identity.strip() | |
| user = get_user_by_email(identity.lower()) | |
| if not user: | |
| user = get_user_by_username(identity) | |
| if user and user["username"] != identity: | |
| user = None | |
| if not user: | |
| raise HTTPException(status_code=400, detail="Account does not exist.") | |
| code = generate_verification_code() | |
| expires_at = datetime.utcnow() + timedelta(minutes=10) | |
| if not set_verification_code(user["id"], code, expires_at): | |
| raise HTTPException(status_code=500, detail="Failed to save reset code") | |
| # Fire-and-forget: email is sent in background thread via Resend/SMTP | |
| send_password_reset_email(user["email"], code, user["username"]) | |
| masked = mask_email(user["email"]) | |
| return {"message": "If that account exists, a reset code has been sent.", "email": masked} | |
| async def reset_password(payload: ResetPasswordRequest) -> Dict[str, Any]: | |
| """Verifies the code and resets the password.""" | |
| identity = payload.identity.strip() | |
| user = get_user_by_email(identity.lower()) | |
| if not user: | |
| user = get_user_by_username(identity) | |
| if user and user["username"] != identity: | |
| user = None | |
| if not user: | |
| raise HTTPException(status_code=400, detail="Invalid request. Account not found.") | |
| status = verify_email_code(user["id"], payload.code.strip()) | |
| if status != 'ok': | |
| raise HTTPException(status_code=400, detail="Invalid or expired reset code.") | |
| if len(payload.new_password) < 8: | |
| raise HTTPException(status_code=400, detail="New password must be at least 8 characters.") | |
| from api.auth import hash_password | |
| hashed_pw = hash_password(payload.new_password) | |
| success = update_user_password(user["id"], hashed_pw) | |
| if not success: | |
| raise HTTPException(status_code=500, detail="Failed to reset password.") | |
| return {"message": "Password reset successfully."} | |
| # Γ’ββ¬Γ’ββ¬ Delete account Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def delete_account( | |
| payload: DeleteAccountRequest, | |
| current_user: dict = Depends(require_current_user), | |
| ) -> Dict[str, Any]: | |
| """Permanently delete the current user's account and all data.""" | |
| user_id = int(current_user["sub"]) | |
| user = get_user_by_id(user_id) | |
| if user is None: | |
| raise HTTPException(status_code=404, detail="User not found.") | |
| # Verify password before deletion | |
| full_user = get_user_by_email(user["email"]) | |
| if full_user is None or not verify_password(payload.password, full_user["password_hash"]): | |
| raise HTTPException(status_code=401, detail="Incorrect password. Account was not deleted.") | |
| success = delete_user(user_id) | |
| if not success: | |
| raise HTTPException(status_code=500, detail="Failed to delete account.") | |
| return {"success": True, "message": "Account deleted successfully."} | |
| # Γ’ββ¬Γ’ββ¬ Fact-check endpoint Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| class AnalyzeStructureRequest(BaseModel): | |
| text: str | |
| async def api_analyze_structure(payload: AnalyzeStructureRequest) -> Dict[str, Any]: | |
| """ | |
| Exposes the structural (inverted pyramid) analysis algorithm | |
| to the frontend Live Writing Guide without running full ML/URL checks. | |
| """ | |
| if not payload.text or len(payload.text.strip()) < 30: | |
| return { | |
| "language": "unknown", | |
| "formalism_score": 0, | |
| "assessment": "Text too short for structure analysis.", | |
| "recommendations": [], | |
| } | |
| from checker.internal.structure_analyzer import analyze_structure | |
| try: | |
| res = analyze_structure(payload.text) | |
| return res | |
| except Exception as exc: | |
| logger.error("Error in analyze_structure: %s", exc) | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Structure analysis failed: {exc}" | |
| ) | |
| async def check_article( | |
| payload: CheckRequest, | |
| current_user: Optional[dict] = Depends(get_current_user), | |
| ) -> Dict[str, Any]: | |
| """ | |
| Run the full fact-check pipeline on the provided text. | |
| If the user is authenticated (Bearer token), the result is saved to their history. | |
| Returns the same structured dict as checker.fact_checker.FactChecker.check. | |
| """ | |
| try: | |
| fc = get_fact_checker() | |
| except Exception as exc: | |
| raise HTTPException( | |
| status_code=503, | |
| detail=f"FactChecker is not available: {exc}", | |
| ) | |
| # Γ’ββ¬Γ’ββ¬ Input quality check: reject gibberish / random text Γ’ββ¬Γ’ββ¬ | |
| import re as _re | |
| raw_text = payload.text.strip() | |
| # Strip URLs before word-count check | |
| clean_input = _re.sub(r"https?://\S+", "", raw_text).strip() | |
| words = clean_input.split() | |
| if len(words) < 10: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Please provide at least 10 words for a meaningful analysis.", | |
| ) | |
| # Language detection Γ’β¬β reject unrecognizable text | |
| try: | |
| from langdetect import detect, DetectorFactory | |
| DetectorFactory.seed = 0 | |
| detected_lang = detect(clean_input) | |
| # Accept common languages; langdetect returns 'en', 'tl', 'fil', etc. | |
| # If it detects something but with very low confidence, it still returns a code. | |
| # The key insight: truly random gibberish often gets detected as an unlikely | |
| # language with low confidence. We can also check vowel ratio. | |
| except Exception: | |
| # langdetect throws LangDetectException for gibberish it can't classify | |
| raise HTTPException( | |
| status_code=400, | |
| detail="The text appears to be gibberish or random characters. Please provide a real article or claim.", | |
| ) | |
| # Vowel ratio check Γ’β¬β real text in most languages has ~30-50% vowels | |
| alpha_chars = [c.lower() for c in clean_input if c.isalpha()] | |
| if len(alpha_chars) > 20: | |
| vowel_count = sum(1 for c in alpha_chars if c in "aeiouAEIOU") | |
| vowel_ratio = vowel_count / len(alpha_chars) | |
| if vowel_ratio < 0.10: # Less than 10% vowels = likely gibberish | |
| raise HTTPException( | |
| status_code=400, | |
| detail="The text appears to be random characters. Please provide a real article or claim to verify.", | |
| ) | |
| try: | |
| result = await run_in_threadpool(fc.check, payload.text, headline=payload.headline) | |
| except Exception as e: | |
| import traceback | |
| error_info = traceback.format_exc() | |
| logger.error(f"Error in fc.check: {error_info}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Internal Server Error during AI analysis: {error_info}" | |
| ) | |
| # Save to history if user is logged in | |
| if current_user is not None: | |
| try: | |
| user_id = int(current_user["sub"]) | |
| verdict = result.get("final_verdict", "UNKNOWN") | |
| save_check_history(user_id, payload.text, verdict, json.dumps(result)) | |
| except Exception as exc: | |
| logger.warning("Failed to save check history: %s", exc) | |
| return result | |
| # Γ’ββ¬Γ’ββ¬ Fact-check URL endpoint Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| async def check_url( | |
| payload: CheckURLRequest, | |
| current_user: Optional[dict] = Depends(get_current_user), | |
| ) -> Dict[str, Any]: | |
| """ | |
| Extract article text from a URL, then run the full fact-check pipeline. | |
| Uses trafilatura to scrape the main content from the given URL. | |
| If the user is authenticated, the result is saved to their history. | |
| """ | |
| import trafilatura | |
| import re | |
| url = payload.url.strip() | |
| if not url: | |
| raise HTTPException(status_code=400, detail="URL is required.") | |
| # Γ’ββ¬Γ’ββ¬ Detect social media URLs that block scraping Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| social_media_patterns = { | |
| r"(facebook\.com|fb\.com|fb\.watch)": "Facebook", | |
| r"instagram\.com": "Instagram", | |
| r"(tiktok\.com|vm\.tiktok\.com)": "TikTok", | |
| r"(twitter\.com|x\.com)": "X (Twitter)", | |
| r"threads\.net": "Threads", | |
| } | |
| url_lower = url.lower() | |
| for pattern, platform in social_media_patterns.items(): | |
| if re.search(pattern, url_lower): | |
| raise HTTPException( | |
| status_code=422, | |
| detail=( | |
| f"{platform} posts cannot be automatically extracted because " | |
| f"{platform} blocks external access to its content. " | |
| f"Instead, please copy the text from the {platform} post " | |
| f"and use the 'Proofread Article' feature to fact-check it." | |
| ), | |
| ) | |
| # Γ’ββ¬Γ’ββ¬ Fetch and extract article content Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| try: | |
| downloaded = trafilatura.fetch_url(url) | |
| if downloaded is None: | |
| raise HTTPException( | |
| status_code=422, | |
| detail="Could not fetch the URL. The site may be blocking automated access. Please check the link or try copying the article text into the Proofread feature.", | |
| ) | |
| text = trafilatura.extract( | |
| downloaded, | |
| include_comments=False, | |
| include_tables=False, | |
| no_fallback=False, | |
| ) | |
| if not text or len(text.strip()) < 50: | |
| raise HTTPException( | |
| status_code=422, | |
| detail="Could not extract enough article text from this URL. The page may be paywalled, login-protected, or not a news article. Try copying the text into the Proofread feature instead.", | |
| ) | |
| # Try to get the title | |
| metadata = trafilatura.extract( | |
| downloaded, | |
| output_format="json", | |
| include_comments=False, | |
| include_tables=False, | |
| ) | |
| title = None | |
| if metadata: | |
| import json as json_mod | |
| try: | |
| meta_dict = json_mod.loads(metadata) | |
| title = meta_dict.get("title", None) | |
| except Exception: | |
| pass | |
| except HTTPException: | |
| raise | |
| except Exception as exc: | |
| logger.error("URL extraction failed for %s: %s", url, exc) | |
| raise HTTPException( | |
| status_code=422, | |
| detail=f"Failed to extract article content: {exc}", | |
| ) | |
| # Run fact-check on extracted text | |
| try: | |
| fc = get_fact_checker() | |
| except Exception as exc: | |
| raise HTTPException( | |
| status_code=503, | |
| detail=f"FactChecker is not available: {exc}", | |
| ) | |
| result = await run_in_threadpool(fc.check, text, headline=title) | |
| # Add source URL and title to result | |
| result["source_url"] = url | |
| result["extracted_title"] = title | |
| # Save to history if user is logged in | |
| if current_user is not None: | |
| try: | |
| user_id = int(current_user["sub"]) | |
| verdict = result.get("final_verdict", "UNKNOWN") | |
| history_text = f"[URL] {url}\n\n{text[:500]}" | |
| save_check_history(user_id, history_text, verdict, json.dumps(result)) | |
| except Exception as exc: | |
| logger.warning("Failed to save check history: %s", exc) | |
| return result | |
| # Γ’ββ¬Γ’ββ¬ Lightweight structure analysis (for live writing guide) Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| class StructureAnalysisRequest(BaseModel): | |
| text: str | |
| headline: Optional[str] = None | |
| async def analyze_structure_endpoint( | |
| payload: StructureAnalysisRequest, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Lightweight structure analysis Γ’β¬β regex only, no ML, no DB. | |
| Designed for real-time calls while the user is typing. | |
| Returns 5W+1H coverage, attribution, transitions, formalism score, and recommendations. | |
| """ | |
| raw_text = payload.text.strip() | |
| if len(raw_text) < 20: | |
| return { | |
| "formalism_score": 0, | |
| "lead_5w1h": {}, | |
| "has_attribution": False, | |
| "attribution_count": 0, | |
| "has_transitions": False, | |
| "transition_count": 0, | |
| "has_formal_markers": False, | |
| "assessment": "", | |
| "signals_found": [], | |
| "recommendations": [], | |
| } | |
| # Only analyze the body text β headline is excluded to avoid | |
| # inflating 5W+1H scores (headlines naturally contain WHO/WHAT) | |
| from checker.internal.structure_analyzer import analyze_structure | |
| result = analyze_structure(raw_text) | |
| return result | |
| # Γ’ββ¬Γ’ββ¬ Translation endpoint Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬Γ’ββ¬ | |
| class TranslateRequest(BaseModel): | |
| text: str | |
| target_lang: str # 'en', 'fil', 'ceb' | |
| async def translate_text( | |
| payload: TranslateRequest, | |
| current_user: Optional[dict] = Depends(get_current_user), | |
| ) -> Dict[str, Any]: | |
| """ | |
| Translate article text to the specified target language. | |
| Supported target languages: 'en' (English), 'fil' (Filipino), 'ceb' (Cebuano). | |
| Uses Google Translate via deep-translator. | |
| """ | |
| text = payload.text.strip() | |
| target = payload.target_lang.strip().lower() | |
| if not text: | |
| raise HTTPException(status_code=400, detail="Text is required.") | |
| # Map our app's language codes to Google Translate codes | |
| lang_map = { | |
| "en": "en", | |
| "fil": "tl", # Google Translate uses 'tl' for Filipino/Tagalog | |
| "ceb": "ceb", # Google Translate supports Cebuano | |
| } | |
| target_code = lang_map.get(target) | |
| if target_code is None: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Unsupported target language '{target}'. Use 'en', 'fil', or 'ceb'.", | |
| ) | |
| try: | |
| from deep_translator import GoogleTranslator | |
| # Split text into chunks if > 5000 chars (Google Translate limit) | |
| max_chunk = 4500 | |
| if len(text) <= max_chunk: | |
| translated = GoogleTranslator(source="auto", target=target_code).translate(text) | |
| else: | |
| # Split by paragraphs and translate chunk by chunk | |
| paragraphs = text.split("\n") | |
| chunks = [] | |
| current_chunk = "" | |
| for para in paragraphs: | |
| if len(current_chunk) + len(para) + 1 > max_chunk: | |
| if current_chunk: | |
| chunks.append(current_chunk) | |
| current_chunk = para | |
| else: | |
| current_chunk = current_chunk + "\n" + para if current_chunk else para | |
| if current_chunk: | |
| chunks.append(current_chunk) | |
| translator = GoogleTranslator(source="auto", target=target_code) | |
| translated_chunks = [translator.translate(chunk) for chunk in chunks] | |
| translated = "\n".join(translated_chunks) | |
| # Detect source language | |
| detected_source = "unknown" | |
| try: | |
| from langdetect import detect, DetectorFactory | |
| DetectorFactory.seed = 0 | |
| detected_source = detect(text) | |
| except Exception: | |
| pass | |
| lang_names = {"en": "English", "fil": "Filipino", "ceb": "Cebuano"} | |
| return { | |
| "translated_text": translated, | |
| "target_language": target, | |
| "target_language_name": lang_names.get(target, target), | |
| "detected_source_language": detected_source, | |
| } | |
| except ImportError: | |
| raise HTTPException( | |
| status_code=503, | |
| detail="Translation service not available. deep-translator package is not installed.", | |
| ) | |
| except Exception as exc: | |
| logger.error("Translation failed: %s", exc, exc_info=True) | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Translation failed: {exc}", | |
| ) | |
| # ββ Manual scrape trigger ββββββββββββββββββββββββββββββββββββββββββββ | |
| async def scrape_news() -> Dict[str, Any]: | |
| """ | |
| Manually trigger a news scrape to refresh the external articles database. | |
| Runs in the background and returns immediately. | |
| """ | |
| from checker.external.core import trigger_scrape_once | |
| trigger_scrape_once() | |
| total = get_article_count() | |
| return { | |
| "status": "scrape_started", | |
| "message": "Background news scraping triggered.", | |
| "current_article_count": total, | |
| } | |
| # ββ Article count endpoint βββββββββββββββββββββββββββββββββββββββββββ | |
| async def article_count() -> Dict[str, Any]: | |
| """Return the total number of scraped articles in the external database.""" | |
| count = get_article_count() | |
| return {"article_count": count} | |
| # ββ Contact Us endpoint ββββββββββββββββββββββββββββββββββββββββββββ | |
| async def contact_us( | |
| payload: ContactRequest, | |
| current_user: Optional[dict] = Depends(get_current_user), | |
| ) -> Dict[str, Any]: | |
| """ | |
| Send a contact message to the BantayPahayag team via email. | |
| The message is forwarded to bantaypahayag@gmail.com. | |
| """ | |
| subject = payload.subject.strip() | |
| message = payload.message.strip() | |
| sender_name = (payload.sender_name or "").strip() | |
| if not subject or len(subject) < 3: | |
| raise HTTPException(status_code=400, detail="Please provide a subject.") | |
| if not message or len(message) < 10: | |
| raise HTTPException(status_code=400, detail="Please write a message (at least 10 characters).") | |
| # Identify the sender | |
| user_email = "anonymous" | |
| user_name = sender_name or "Anonymous User" | |
| if current_user is not None: | |
| try: | |
| user = get_user_by_id(int(current_user["sub"])) | |
| if user: | |
| user_email = user.get("email", "unknown") | |
| user_name = sender_name or user.get("username", "User") | |
| except Exception: | |
| pass | |
| # Send via Resend API | |
| import requests | |
| CONTACT_EMAIL = "bantaypahayag@gmail.com" | |
| resend_api_key = os.environ.get("RESEND_API_KEY", "") | |
| if not resend_api_key: | |
| logger.warning("Resend API key not configured β contact message from %s lost", user_email) | |
| raise HTTPException(status_code=503, detail="Email service is not configured.") | |
| try: | |
| text_body = ( | |
| f"Contact Form Submission\n" | |
| f"{'=' * 40}\n\n" | |
| f"From: {user_name}\n" | |
| f"Email: {user_email}\n" | |
| f"Subject: {subject}\n\n" | |
| f"Message:\n{message}\n" | |
| ) | |
| html_body = f""" | |
| <div style="font-family: 'Segoe UI', Arial, sans-serif; max-width: 520px; margin: 0 auto; | |
| padding: 28px 24px; background: #ffffff; border-radius: 12px; | |
| border: 1px solid #e2e8f0;"> | |
| <div style="text-align: center; margin-bottom: 20px;"> | |
| <div style="display: inline-block; background: linear-gradient(135deg, #1E3A8A, #3B82F6); | |
| color: white; font-size: 18px; font-weight: 800; padding: 10px 20px; | |
| border-radius: 10px;">BantayPahayag</div> | |
| </div> | |
| <h2 style="color: #0F172A; font-size: 18px; margin: 0 0 16px 0;">New Contact Message</h2> | |
| <table style="width: 100%; border-collapse: collapse; margin-bottom: 16px;"> | |
| <tr><td style="color: #64748B; padding: 6px 0; font-size: 13px; width: 70px;">From:</td> | |
| <td style="color: #0F172A; padding: 6px 0; font-size: 13px; font-weight: 600;">{user_name}</td></tr> | |
| <tr><td style="color: #64748B; padding: 6px 0; font-size: 13px;">Email:</td> | |
| <td style="color: #0F172A; padding: 6px 0; font-size: 13px;">{user_email}</td></tr> | |
| <tr><td style="color: #64748B; padding: 6px 0; font-size: 13px;">Subject:</td> | |
| <td style="color: #0F172A; padding: 6px 0; font-size: 13px; font-weight: 600;">{subject}</td></tr> | |
| </table> | |
| <div style="background: #F8FAFC; border: 1px solid #E2E8F0; border-radius: 8px; | |
| padding: 16px; margin-bottom: 16px;"> | |
| <p style="color: #334155; font-size: 13px; line-height: 1.7; margin: 0; | |
| white-space: pre-wrap;">{message}</p> | |
| </div> | |
| <hr style="border: none; border-top: 1px solid #e2e8f0; margin: 16px 0;" /> | |
| <p style="color: #CBD5E1; font-size: 11px; text-align: center; margin: 0;">Sent via BantayPahayag Contact Form (Resend API)</p> | |
| </div> | |
| """ | |
| payload = { | |
| "from": "BantayPahayag <onboarding@resend.dev>", | |
| "to": [CONTACT_EMAIL], | |
| "subject": f"[BantayPahayag Contact] {subject}", | |
| "html": html_body, | |
| "text": text_body | |
| } | |
| if user_email and user_email != "anonymous": | |
| payload["reply_to"] = user_email | |
| headers = { | |
| "Authorization": f"Bearer {resend_api_key}", | |
| "Content-Type": "application/json" | |
| } | |
| resp = requests.post("https://api.resend.com/emails", json=payload, headers=headers, timeout=10) | |
| if resp.status_code >= 400: | |
| logger.error("Resend API error: %s", resp.text) | |
| raise HTTPException(status_code=500, detail="Failed to send email. Please try again later.") | |
| logger.info("Contact email sent from %s (%s): %s", user_name, user_email, subject) | |
| except HTTPException: | |
| raise | |
| except Exception as exc: | |
| logger.error("Failed to send contact email: %s", exc) | |
| raise HTTPException(status_code=500, detail="Failed to send email. Please try again later.") | |
| return {"success": True, "message": "Your message has been sent. We'll get back to you soon!"} | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", 10000)) | |
| logger.info("Starting uvicorn on 0.0.0.0:%s Γ’β¬Β¦", port) | |
| uvicorn.run("api.main:app", host="0.0.0.0", port=port) | |