import os import sys import json import logging import uvicorn from typing import Any, Dict, Optional from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from starlette.concurrency import run_in_threadpool # ── Logging ────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger(__name__) # Ensure Python can find project modules when running this file directly sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from api.auth import ( hash_password, verify_password, create_access_token, get_current_user, require_current_user, ) from db.database import ( init_db, create_user, get_user_by_email, get_user_by_id, get_user_by_username, update_username, update_user_password, delete_user, set_verification_code, verify_email_code, is_email_verified, save_check_history, get_user_history, get_article_count, ) from api.email_utils import generate_verification_code, send_verification_email, send_password_reset_email from datetime import datetime, timedelta # ── Lazy FactChecker singleton ─────────────────────────────────────── _fact_checker_instance = None def get_fact_checker(): """ Lazily instantiate the FactChecker and reuse it across requests. Retries on every call until successful. """ global _fact_checker_instance if _fact_checker_instance is not None: return _fact_checker_instance try: logger.info("Initializing FactChecker (models + DB) …") from checker.fact_checker import FactChecker _fact_checker_instance = FactChecker() logger.info("FactChecker ready ✓") return _fact_checker_instance except Exception as exc: logger.error("FactChecker failed to initialize: %s", exc, exc_info=True) raise # ── Request / Response models ──────────────────────────────────────── class CheckRequest(BaseModel): text: str headline: Optional[str] = None class CheckURLRequest(BaseModel): url: str class RegisterRequest(BaseModel): email: str username: str password: str class LoginRequest(BaseModel): username: str password: str class UpdateProfileRequest(BaseModel): username: str class ChangePasswordRequest(BaseModel): current_password: str new_password: str class DeleteAccountRequest(BaseModel): password: str class CheckUsernameRequest(BaseModel): username: str class ForgotPasswordRequest(BaseModel): identity: str class ResetPasswordRequest(BaseModel): identity: str code: str new_password: str class CheckEmailRequest(BaseModel): email: str class VerifyEmailRequest(BaseModel): code: str class ResendVerificationRequest(BaseModel): pass class ContactRequest(BaseModel): subject: str message: str sender_name: Optional[str] = None # ── App ────────────────────────────────────────────────────────────── app = FastAPI( title="FactCheckThesis API", description="HTTP API wrapper around the FactChecker engine for mobile and web clients.", version="1.0.0", ) # Allow all origins for now so the Flutter app and any dev tools can call this API. app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.on_event("startup") async def on_startup(): """Initialize DB tables and log that the server is up.""" try: init_db() logger.info("Database tables initialized ✓") except Exception as exc: logger.warning("Could not initialize DB tables: %s", exc) # One-time migration: mark pre-existing accounts as email verified try: from db.database import get_connection conn = get_connection() cursor = conn.cursor() cursor.execute( "UPDATE users SET email_verified = TRUE " "WHERE verification_code IS NULL AND email_verified = FALSE" ) affected = cursor.rowcount conn.commit() conn.close() if affected > 0: logger.info("Migration: marked %d pre-existing account(s) as email_verified", affected) except Exception as exc: logger.warning("Migration (email_verified) skipped: %s", exc) port = os.environ.get("PORT", "10000") logger.info("🚀 FastAPI server is UP on port %s", port) # ── Health / root ──────────────────────────────────────────────────── @app.get("/") async def root() -> Dict[str, str]: return {"status": "ok"} @app.get("/health") async def health() -> Dict[str, str]: """Lightweight health-check for deployment (always 200, even before models load).""" return {"status": "healthy"} # ── Auth endpoints ─────────────────────────────────────────────────── @app.post("/api/register") async def register(payload: RegisterRequest) -> Dict[str, Any]: """ Create a new user account. Generates a verification code and sends it to the user's email. Returns the user info + an access token on success. """ if len(payload.password) < 6: raise HTTPException(status_code=400, detail="Password must be at least 6 characters.") if not payload.email or not payload.username: raise HTTPException(status_code=400, detail="Email and username are required.") # Check email and username uniqueness first existing_email = get_user_by_email(payload.email.lower().strip()) if existing_email is not None: if existing_email.get("email_verified") is False: delete_user(existing_email["id"]) else: raise HTTPException(status_code=409, detail="Email is already used.") existing_username = get_user_by_username(payload.username.strip()) if existing_username is not None: if existing_username.get("email_verified") is False: delete_user(existing_username["id"]) else: raise HTTPException(status_code=409, detail="This username is already taken. Please choose another.") hashed = hash_password(payload.password) user = create_user(payload.email.lower().strip(), payload.username.strip(), hashed) if user is None: raise HTTPException(status_code=409, detail="Registration failed due to a database constraint.") # Generate and send verification code code = generate_verification_code() expires_at = datetime.now() + timedelta(minutes=10) set_verification_code(user["id"], code, expires_at) email_sent = send_verification_email( to_email=payload.email.lower().strip(), code=code, username=payload.username.strip(), ) token = create_access_token(user["id"], user["email"]) return { "token": token, "user": { "id": user["id"], "email": user["email"], "username": user["username"], "email_verified": False, }, "verification_email_sent": email_sent, } @app.post("/api/login") async def login(payload: LoginRequest) -> Dict[str, Any]: """ Authenticate a user with username (or email) + password. Returns user info + access token on success. Includes email_verified status so the app can redirect unverified users. """ try: identity = payload.username.strip() if "@" in identity: user = get_user_by_email(identity.lower()) else: user = get_user_by_username(identity) # Enforce exact case match for usernames if user and user["username"] != identity: user = None if user is None or not verify_password(payload.password, user["password_hash"]): raise HTTPException(status_code=401, detail="Invalid credentials.") verified = is_email_verified(user["id"]) token = create_access_token(user["id"], user["email"]) return { "token": token, "user": { "id": user["id"], "email": user["email"], "username": user["username"], "email_verified": verified, }, } except HTTPException: raise except Exception as exc: logger.error("Login error for %s: %s", payload.username, exc, exc_info=True) raise HTTPException(status_code=500, detail=f"Internal login error: {str(exc)}") @app.get("/api/me") async def me(current_user: dict = Depends(require_current_user)) -> Dict[str, Any]: """Return the profile of the currently authenticated user.""" user = get_user_by_id(int(current_user["sub"])) if user is None: raise HTTPException(status_code=404, detail="User not found.") # Convert datetime to string for JSON user["created_at"] = str(user["created_at"]) return {"user": user} # ── Email verification endpoints ───────────────────────────────────── @app.post("/api/verify-email") async def verify_email( payload: VerifyEmailRequest, current_user: dict = Depends(require_current_user), ) -> Dict[str, Any]: """Verify the user's email with a 6-digit code.""" user_id = int(current_user["sub"]) code = payload.code.strip() if not code or len(code) != 6: raise HTTPException(status_code=400, detail="Please enter a valid 6-digit code.") result = verify_email_code(user_id, code) if result == "ok": return {"success": True, "message": "Email verified successfully!"} elif result == "expired": raise HTTPException(status_code=410, detail="Verification code has expired. Please request a new one.") else: raise HTTPException(status_code=400, detail="Invalid verification code. Please try again.") @app.post("/api/resend-verification") async def resend_verification( current_user: dict = Depends(require_current_user), ) -> Dict[str, Any]: """Resend the verification code email.""" user_id = int(current_user["sub"]) user = get_user_by_id(user_id) if user is None: raise HTTPException(status_code=404, detail="User not found.") if user.get("email_verified"): return {"success": True, "message": "Email is already verified."} code = generate_verification_code() expires_at = datetime.now() + timedelta(minutes=10) set_verification_code(user_id, code, expires_at) email_sent = send_verification_email( to_email=user["email"], code=code, username=user["username"], ) if email_sent: return {"success": True, "message": "Verification code sent to your email."} else: raise HTTPException( status_code=500, detail="Could not send verification email. Please check that your email address is correct and try again.", ) # ── History endpoint ───────────────────────────────────────────────── @app.get("/api/history") async def history(current_user: dict = Depends(require_current_user)) -> Dict[str, Any]: """Return the fact-check history for the authenticated user.""" user_id = int(current_user["sub"]) rows = get_user_history(user_id) # Convert datetime objects to strings for row in rows: row["checked_at"] = str(row["checked_at"]) return {"history": rows} # ── Check username availability ─────────────────────────────────────── @app.post("/api/check-username") async def check_username(payload: CheckUsernameRequest) -> Dict[str, Any]: """Check if a username is available.""" username = payload.username.strip() if not username or len(username) < 3: return {"available": False, "reason": "Username must be at least 3 characters."} if len(username) > 30: return {"available": False, "reason": "Username must be at most 30 characters."} existing = get_user_by_username(username) if existing is not None: return {"available": False, "reason": "This username is already taken."} return {"available": True} # ── Check email availability ─────────────────────────────────────── @app.post("/api/check-email") async def check_email(payload: CheckEmailRequest) -> Dict[str, Any]: """Check if an email is available.""" email = payload.email.lower().strip() if not email or "@" not in email: return {"available": False, "reason": "Please enter a valid email address."} existing = get_user_by_email(email) if existing is not None: return {"available": False, "reason": "Email is already used."} return {"available": True} # ── Update profile (username) ───────────────────────────────────────── @app.put("/api/update-profile") async def update_profile( payload: UpdateProfileRequest, current_user: dict = Depends(require_current_user), ) -> Dict[str, Any]: """Update the current user's username.""" username = payload.username.strip() if not username or len(username) < 3: raise HTTPException(status_code=400, detail="Username must be at least 3 characters.") if len(username) > 30: raise HTTPException(status_code=400, detail="Username must be at most 30 characters.") user_id = int(current_user["sub"]) updated = update_username(user_id, username) if updated is None: raise HTTPException(status_code=409, detail="This username is already taken.") return { "success": True, "user": { "id": updated["id"], "email": updated["email"], "username": updated["username"], }, } # ── Change password ────────────────────────────────────────────────── @app.put("/api/change-password") async def change_password( payload: ChangePasswordRequest, current_user: dict = Depends(require_current_user), ) -> Dict[str, Any]: """Change the current user's password.""" user_id = int(current_user["sub"]) user = get_user_by_id(user_id) if user is None: raise HTTPException(status_code=404, detail="User not found.") # Verify current password full_user = get_user_by_email(user["email"]) if full_user is None or not verify_password(payload.current_password, full_user["password_hash"]): raise HTTPException(status_code=401, detail="Current password is incorrect.") # Validate new password strength new_pw = payload.new_password if len(new_pw) < 8: raise HTTPException(status_code=400, detail="New password must be at least 8 characters.") new_hash = hash_password(new_pw) success = update_user_password(user_id, new_hash) if not success: raise HTTPException(status_code=500, detail="Failed to update password.") return {"success": True, "message": "Password updated successfully."} # ── Forgot / Reset password ──────────────────────────────────────────────── def mask_email(email: str) -> str: parts = email.split('@') if len(parts) != 2: return email name, domain = parts if len(name) <= 2: masked_name = name[0] + "*" * len(name) else: masked_name = name[0] + "*" * (len(name) - 2) + name[-1] return f"{masked_name}@{domain}" @app.post("/api/forgot-password") async def forgot_password(payload: ForgotPasswordRequest) -> Dict[str, Any]: """Initiates the password reset flow.""" identity = payload.identity.strip() user = get_user_by_email(identity.lower()) if not user: user = get_user_by_username(identity) if user and user["username"] != identity: user = None if not user: raise HTTPException(status_code=400, detail="Account does not exist.") code = generate_verification_code() expires_at = datetime.utcnow() + timedelta(minutes=10) if not set_verification_code(user["id"], code, expires_at): raise HTTPException(status_code=500, detail="Failed to save reset code") # Fire-and-forget: email is sent in background thread via Resend/SMTP send_password_reset_email(user["email"], code, user["username"]) masked = mask_email(user["email"]) return {"message": "If that account exists, a reset code has been sent.", "email": masked} @app.post("/api/reset-password") async def reset_password(payload: ResetPasswordRequest) -> Dict[str, Any]: """Verifies the code and resets the password.""" identity = payload.identity.strip() user = get_user_by_email(identity.lower()) if not user: user = get_user_by_username(identity) if user and user["username"] != identity: user = None if not user: raise HTTPException(status_code=400, detail="Invalid request. Account not found.") status = verify_email_code(user["id"], payload.code.strip()) if status != 'ok': raise HTTPException(status_code=400, detail="Invalid or expired reset code.") if len(payload.new_password) < 8: raise HTTPException(status_code=400, detail="New password must be at least 8 characters.") from api.auth import hash_password hashed_pw = hash_password(payload.new_password) success = update_user_password(user["id"], hashed_pw) if not success: raise HTTPException(status_code=500, detail="Failed to reset password.") return {"message": "Password reset successfully."} # ── Delete account ─────────────────────────────────────────────────── @app.delete("/api/delete-account") async def delete_account( payload: DeleteAccountRequest, current_user: dict = Depends(require_current_user), ) -> Dict[str, Any]: """Permanently delete the current user's account and all data.""" user_id = int(current_user["sub"]) user = get_user_by_id(user_id) if user is None: raise HTTPException(status_code=404, detail="User not found.") # Verify password before deletion full_user = get_user_by_email(user["email"]) if full_user is None or not verify_password(payload.password, full_user["password_hash"]): raise HTTPException(status_code=401, detail="Incorrect password. Account was not deleted.") success = delete_user(user_id) if not success: raise HTTPException(status_code=500, detail="Failed to delete account.") return {"success": True, "message": "Account deleted successfully."} # ── Fact-check endpoint ────────────────────────────────────────────── class AnalyzeStructureRequest(BaseModel): text: str @app.post("/api/analyze-structure") async def api_analyze_structure(payload: AnalyzeStructureRequest) -> Dict[str, Any]: """ Exposes the structural (inverted pyramid) analysis algorithm to the frontend Live Writing Guide without running full ML/URL checks. """ if not payload.text or len(payload.text.strip()) < 30: return { "language": "unknown", "formalism_score": 0, "assessment": "Text too short for structure analysis.", "recommendations": [], } from checker.internal.structure_analyzer import analyze_structure try: res = analyze_structure(payload.text) return res except Exception as exc: logger.error("Error in analyze_structure: %s", exc) raise HTTPException( status_code=500, detail=f"Structure analysis failed: {exc}" ) @app.post("/api/check") async def check_article( payload: CheckRequest, current_user: Optional[dict] = Depends(get_current_user), ) -> Dict[str, Any]: """ Run the full fact-check pipeline on the provided text. If the user is authenticated (Bearer token), the result is saved to their history. Returns the same structured dict as checker.fact_checker.FactChecker.check. """ try: fc = get_fact_checker() except Exception as exc: raise HTTPException( status_code=503, detail=f"FactChecker is not available: {exc}", ) # ── Input quality check: reject gibberish / random text ── import re as _re raw_text = payload.text.strip() # Strip URLs before word-count check clean_input = _re.sub(r"https?://\S+", "", raw_text).strip() words = clean_input.split() if len(words) < 10: raise HTTPException( status_code=400, detail="Please provide at least 10 words for a meaningful analysis.", ) # Language detection — reject unrecognizable text try: from langdetect import detect, DetectorFactory DetectorFactory.seed = 0 detected_lang = detect(clean_input) # Accept common languages; langdetect returns 'en', 'tl', 'fil', etc. # If it detects something but with very low confidence, it still returns a code. # The key insight: truly random gibberish often gets detected as an unlikely # language with low confidence. We can also check vowel ratio. except Exception: # langdetect throws LangDetectException for gibberish it can't classify raise HTTPException( status_code=400, detail="The text appears to be gibberish or random characters. Please provide a real article or claim.", ) # Vowel ratio check — real text in most languages has ~30-50% vowels alpha_chars = [c.lower() for c in clean_input if c.isalpha()] if len(alpha_chars) > 20: vowel_count = sum(1 for c in alpha_chars if c in "aeiouAEIOU") vowel_ratio = vowel_count / len(alpha_chars) if vowel_ratio < 0.10: # Less than 10% vowels = likely gibberish raise HTTPException( status_code=400, detail="The text appears to be random characters. Please provide a real article or claim to verify.", ) try: result = await run_in_threadpool(fc.check, payload.text, headline=payload.headline) except Exception as e: import traceback error_info = traceback.format_exc() logger.error(f"Error in fc.check: {error_info}") raise HTTPException( status_code=500, detail=f"Internal Server Error during AI analysis: {error_info}" ) # Save to history if user is logged in if current_user is not None: try: user_id = int(current_user["sub"]) verdict = result.get("final_verdict", "UNKNOWN") save_check_history(user_id, payload.text, verdict, json.dumps(result)) except Exception as exc: logger.warning("Failed to save check history: %s", exc) return result # ── Fact-check URL endpoint ────────────────────────────────────────── @app.post("/api/check-url") async def check_url( payload: CheckURLRequest, current_user: Optional[dict] = Depends(get_current_user), ) -> Dict[str, Any]: """ Extract article text from a URL, then run the full fact-check pipeline. Uses trafilatura to scrape the main content from the given URL. If the user is authenticated, the result is saved to their history. """ import trafilatura import re url = payload.url.strip() if not url: raise HTTPException(status_code=400, detail="URL is required.") # ── Detect social media URLs that block scraping ───────────── social_media_patterns = { r"(facebook\.com|fb\.com|fb\.watch)": "Facebook", r"instagram\.com": "Instagram", r"(tiktok\.com|vm\.tiktok\.com)": "TikTok", r"(twitter\.com|x\.com)": "X (Twitter)", r"threads\.net": "Threads", } url_lower = url.lower() for pattern, platform in social_media_patterns.items(): if re.search(pattern, url_lower): raise HTTPException( status_code=422, detail=( f"{platform} posts cannot be automatically extracted because " f"{platform} blocks external access to its content. " f"Instead, please copy the text from the {platform} post " f"and use the 'Proofread Article' feature to fact-check it." ), ) # ── Fetch and extract article content ──────────────────────── try: downloaded = trafilatura.fetch_url(url) if downloaded is None: raise HTTPException( status_code=422, detail="Could not fetch the URL. The site may be blocking automated access. Please check the link or try copying the article text into the Proofread feature.", ) text = trafilatura.extract( downloaded, include_comments=False, include_tables=False, no_fallback=False, ) if not text or len(text.strip()) < 50: raise HTTPException( status_code=422, detail="Could not extract enough article text from this URL. The page may be paywalled, login-protected, or not a news article. Try copying the text into the Proofread feature instead.", ) # Try to get the title metadata = trafilatura.extract( downloaded, output_format="json", include_comments=False, include_tables=False, ) title = None if metadata: import json as json_mod try: meta_dict = json_mod.loads(metadata) title = meta_dict.get("title", None) except Exception: pass except HTTPException: raise except Exception as exc: logger.error("URL extraction failed for %s: %s", url, exc) raise HTTPException( status_code=422, detail=f"Failed to extract article content: {exc}", ) # Run fact-check on extracted text try: fc = get_fact_checker() except Exception as exc: raise HTTPException( status_code=503, detail=f"FactChecker is not available: {exc}", ) result = await run_in_threadpool(fc.check, text, headline=title) # Add source URL and title to result result["source_url"] = url result["extracted_title"] = title # Save to history if user is logged in if current_user is not None: try: user_id = int(current_user["sub"]) verdict = result.get("final_verdict", "UNKNOWN") history_text = f"[URL] {url}\n\n{text[:500]}" save_check_history(user_id, history_text, verdict, json.dumps(result)) except Exception as exc: logger.warning("Failed to save check history: %s", exc) return result # ── Lightweight structure analysis (for live writing guide) ────────── class StructureAnalysisRequest(BaseModel): text: str headline: Optional[str] = None @app.post("/api/analyze-structure") async def analyze_structure_endpoint( payload: StructureAnalysisRequest, ) -> Dict[str, Any]: """ Lightweight structure analysis — regex only, no ML, no DB. Designed for real-time calls while the user is typing. Returns 5W+1H coverage, attribution, transitions, formalism score, and recommendations. """ raw_text = payload.text.strip() if len(raw_text) < 20: return { "formalism_score": 0, "lead_5w1h": {}, "has_attribution": False, "attribution_count": 0, "has_transitions": False, "transition_count": 0, "has_formal_markers": False, "assessment": "", "signals_found": [], "recommendations": [], } # Only analyze the body text — headline is excluded to avoid # inflating 5W+1H scores (headlines naturally contain WHO/WHAT) from checker.internal.structure_analyzer import analyze_structure result = analyze_structure(raw_text) return result # ── Translation endpoint ───────────────────────────────────────────── class TranslateRequest(BaseModel): text: str target_lang: str # 'en', 'fil', 'ceb' @app.post("/api/translate") async def translate_text( payload: TranslateRequest, current_user: Optional[dict] = Depends(get_current_user), ) -> Dict[str, Any]: """ Translate article text to the specified target language. Supported target languages: 'en' (English), 'fil' (Filipino), 'ceb' (Cebuano). Uses Google Translate via deep-translator. """ text = payload.text.strip() target = payload.target_lang.strip().lower() if not text: raise HTTPException(status_code=400, detail="Text is required.") # Map our app's language codes to Google Translate codes lang_map = { "en": "en", "fil": "tl", # Google Translate uses 'tl' for Filipino/Tagalog "ceb": "ceb", # Google Translate supports Cebuano } target_code = lang_map.get(target) if target_code is None: raise HTTPException( status_code=400, detail=f"Unsupported target language '{target}'. Use 'en', 'fil', or 'ceb'.", ) try: from deep_translator import GoogleTranslator # Split text into chunks if > 5000 chars (Google Translate limit) max_chunk = 4500 if len(text) <= max_chunk: translated = GoogleTranslator(source="auto", target=target_code).translate(text) else: # Split by paragraphs and translate chunk by chunk paragraphs = text.split("\n") chunks = [] current_chunk = "" for para in paragraphs: if len(current_chunk) + len(para) + 1 > max_chunk: if current_chunk: chunks.append(current_chunk) current_chunk = para else: current_chunk = current_chunk + "\n" + para if current_chunk else para if current_chunk: chunks.append(current_chunk) translator = GoogleTranslator(source="auto", target=target_code) translated_chunks = [translator.translate(chunk) for chunk in chunks] translated = "\n".join(translated_chunks) # Detect source language detected_source = "unknown" try: from langdetect import detect, DetectorFactory DetectorFactory.seed = 0 detected_source = detect(text) except Exception: pass lang_names = {"en": "English", "fil": "Filipino", "ceb": "Cebuano"} return { "translated_text": translated, "target_language": target, "target_language_name": lang_names.get(target, target), "detected_source_language": detected_source, } except ImportError: raise HTTPException( status_code=503, detail="Translation service not available. deep-translator package is not installed.", ) except Exception as exc: logger.error("Translation failed: %s", exc, exc_info=True) raise HTTPException( status_code=500, detail=f"Translation failed: {exc}", ) # ── Manual scrape trigger ──────────────────────────────────────────── @app.post("/api/scrape-news") async def scrape_news() -> Dict[str, Any]: """ Manually trigger a news scrape to refresh the external articles database. Runs in the background and returns immediately. """ from checker.external.core import trigger_scrape_once trigger_scrape_once() total = get_article_count() return { "status": "scrape_started", "message": "Background news scraping triggered.", "current_article_count": total, } # ── Article count endpoint ─────────────────────────────────────────── @app.get("/api/article-count") async def article_count() -> Dict[str, Any]: """Return the total number of scraped articles in the external database.""" count = get_article_count() return {"article_count": count} # ── Contact Us endpoint ──────────────────────────────────────────── @app.post("/api/contact") async def contact_us( payload: ContactRequest, current_user: Optional[dict] = Depends(get_current_user), ) -> Dict[str, Any]: """ Send a contact message to the BantayPahayag team via email. The message is forwarded to bantaypahayag@gmail.com. """ subject = payload.subject.strip() message = payload.message.strip() sender_name = (payload.sender_name or "").strip() if not subject or len(subject) < 3: raise HTTPException(status_code=400, detail="Please provide a subject.") if not message or len(message) < 10: raise HTTPException(status_code=400, detail="Please write a message (at least 10 characters).") # Identify the sender user_email = "anonymous" user_name = sender_name or "Anonymous User" if current_user is not None: try: user = get_user_by_id(int(current_user["sub"])) if user: user_email = user.get("email", "unknown") user_name = sender_name or user.get("username", "User") except Exception: pass # Send via Resend API import requests CONTACT_EMAIL = "bantaypahayag@gmail.com" resend_api_key = os.environ.get("RESEND_API_KEY", "") if not resend_api_key: logger.warning("Resend API key not configured — contact message from %s lost", user_email) raise HTTPException(status_code=503, detail="Email service is not configured.") try: text_body = ( f"Contact Form Submission\n" f"{'=' * 40}\n\n" f"From: {user_name}\n" f"Email: {user_email}\n" f"Subject: {subject}\n\n" f"Message:\n{message}\n" ) html_body = f"""
BantayPahayag

New Contact Message

From: {user_name}
Email: {user_email}
Subject: {subject}

{message}


Sent via BantayPahayag Contact Form (Resend API)

""" payload = { "from": "BantayPahayag ", "to": [CONTACT_EMAIL], "subject": f"[BantayPahayag Contact] {subject}", "html": html_body, "text": text_body } if user_email and user_email != "anonymous": payload["reply_to"] = user_email headers = { "Authorization": f"Bearer {resend_api_key}", "Content-Type": "application/json" } resp = requests.post("https://api.resend.com/emails", json=payload, headers=headers, timeout=10) if resp.status_code >= 400: logger.error("Resend API error: %s", resp.text) raise HTTPException(status_code=500, detail="Failed to send email. Please try again later.") logger.info("Contact email sent from %s (%s): %s", user_name, user_email, subject) except HTTPException: raise except Exception as exc: logger.error("Failed to send contact email: %s", exc) raise HTTPException(status_code=500, detail="Failed to send email. Please try again later.") return {"success": True, "message": "Your message has been sent. We'll get back to you soon!"} if __name__ == "__main__": port = int(os.environ.get("PORT", 10000)) logger.info("Starting uvicorn on 0.0.0.0:%s …", port) uvicorn.run("api.main:app", host="0.0.0.0", port=port)