ThesisProject / api /main.py
JeyBii's picture
Sync updated source code api/main.py including partner integration
38e8c68 verified
Raw
History Blame Contribute Delete
44.4 kB
import os
import sys
import json
import logging
import uvicorn
from typing import Any, Dict, Optional
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from starlette.concurrency import run_in_threadpool
# ── Logging ──────────────────────────────────────────────────────────
# Configure logging once at import time: INFO level, with a timestamp and the
# level name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Module-level logger used by the handlers in this file.
logger = logging.getLogger(__name__)
# Ensure Python can find project modules when running this file directly:
# the parent of this file's directory is put at the front of sys.path.
# This statement has to stay above the project imports that follow it.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from api.auth import (
hash_password,
verify_password,
create_access_token,
get_current_user,
require_current_user,
)
from db.database import (
init_db,
create_user,
get_user_by_email,
get_user_by_id,
get_user_by_username,
update_username,
update_user_password,
delete_user,
set_verification_code,
verify_email_code,
is_email_verified,
save_check_history,
get_user_history,
get_article_count,
)
from api.email_utils import generate_verification_code, send_verification_email, send_password_reset_email
from datetime import datetime, timedelta
# ── Lazy FactChecker singleton ───────────────────────────────────────
_fact_checker_instance = None


def get_fact_checker():
    """
    Return the shared FactChecker, constructing it the first time it is needed.

    The instance is cached in the module-level ``_fact_checker_instance``.
    When construction fails the error is logged and re-raised and nothing is
    cached, so the following call attempts the construction again.
    """
    global _fact_checker_instance
    if _fact_checker_instance is None:
        try:
            logger.info("Initializing FactChecker (models + DB) Ò€¦")
            # Imported here rather than at module import time, so the import
            # only happens when an instance is actually requested.
            from checker.fact_checker import FactChecker

            _fact_checker_instance = FactChecker()
            logger.info("FactChecker ready Γ’Ε“β€œ")
        except Exception as exc:
            logger.error("FactChecker failed to initialize: %s", exc, exc_info=True)
            raise
    return _fact_checker_instance
# ── Request / Response models ────────────────────────────────────────
# Body of POST /api/check: the article text plus an optional headline.
class CheckRequest(BaseModel):
    text: str
    headline: Optional[str] = None


# Body of POST /api/check-url: the URL to extract the article from.
class CheckURLRequest(BaseModel):
    url: str


# Body of POST /api/register.
class RegisterRequest(BaseModel):
    email: str
    username: str
    password: str


# Body of POST /api/login. `username` may also carry an email address; the
# login handler switches to an email lookup when the value contains "@".
class LoginRequest(BaseModel):
    username: str
    password: str


# Body of PUT /api/update-profile: the new username.
class UpdateProfileRequest(BaseModel):
    username: str


# Body of PUT /api/change-password.
class ChangePasswordRequest(BaseModel):
    current_password: str
    new_password: str


# Body of DELETE /api/delete-account: the password confirming the deletion.
class DeleteAccountRequest(BaseModel):
    password: str


# Body of POST /api/check-username.
class CheckUsernameRequest(BaseModel):
    username: str


# Body of POST /api/forgot-password. `identity` is an email or a username.
class ForgotPasswordRequest(BaseModel):
    identity: str


# Body of POST /api/reset-password: identity, emailed code and new password.
class ResetPasswordRequest(BaseModel):
    identity: str
    code: str
    new_password: str


# Body of POST /api/check-email.
class CheckEmailRequest(BaseModel):
    email: str


# Body of POST /api/verify-email: the code the user received.
class VerifyEmailRequest(BaseModel):
    code: str


# Empty model. No handler in this file takes it as a parameter
# (resend_verification reads only the authenticated user).
class ResendVerificationRequest(BaseModel):
    pass


# Body of POST /api/contact. `sender_name` is optional.
class ContactRequest(BaseModel):
    subject: str
    message: str
    sender_name: Optional[str] = None
# ── App ──────────────────────────────────────────────────────────────
# The ASGI application object; every route in this file is registered on it.
app = FastAPI(
    title="FactCheckThesis API",
    description="HTTP API wrapper around the FactChecker engine for mobile and web clients.",
    version="1.0.0",
)
# Allow all origins for now so the Flutter app and any dev tools can call this API.
# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive CORS policy — consider listing the real web origins before going
# to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
async def on_startup():
    """
    Initialize DB tables, backfill ``email_verified``, and log that the server is up.

    Both database steps are best-effort: a failure is logged as a warning and
    startup continues.
    """
    try:
        init_db()
        logger.info("Database tables initialized Γ’Ε“β€œ")
    except Exception as exc:
        logger.warning("Could not initialize DB tables: %s", exc)

    # Migration: mark pre-existing accounts (no verification code stored and
    # not yet flagged) as email verified.
    try:
        from db.database import get_connection

        conn = get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE users SET email_verified = TRUE "
                "WHERE verification_code IS NULL AND email_verified = FALSE"
            )
            affected = cursor.rowcount
            conn.commit()
        finally:
            # Release the connection even when the UPDATE or the commit fails;
            # previously a failure here left the connection open.
            conn.close()
        if affected > 0:
            logger.info("Migration: marked %d pre-existing account(s) as email_verified", affected)
    except Exception as exc:
        logger.warning("Migration (email_verified) skipped: %s", exc)

    port = os.environ.get("PORT", "10000")
    logger.info("ðŸő€ FastAPI server is UP on port %s", port)
# ── Health / root ────────────────────────────────────────────────────
@app.get("/")
async def root() -> Dict[str, str]:
    """Answer the root path with a fixed status payload."""
    response: Dict[str, str] = {"status": "ok"}
    return response
@app.get("/health")
async def health() -> Dict[str, str]:
    """Health probe for deployments; touches neither the DB nor the FactChecker."""
    response: Dict[str, str] = {"status": "healthy"}
    return response
# ── Auth endpoints ───────────────────────────────────────────────────
@app.post("/api/register")
async def register(payload: RegisterRequest) -> Dict[str, Any]:
    """
    Create a new user account.

    The email is lower-cased and trimmed, and the username trimmed, once up
    front; every later check and call uses those normalised values. An
    existing account holding the same email or username is deleted and
    replaced when its ``email_verified`` flag is ``False``; otherwise the
    request is rejected. A verification code with a 10-minute expiry is
    stored and handed to ``send_verification_email``.

    Returns:
        The access token, the new user's id/email/username, and the value
        returned by ``send_verification_email``.

    Raises:
        HTTPException: 400 for a short password, a blank or malformed email,
            or a username outside 3-30 characters; 409 when the email or
            username belongs to a verified account or the insert fails.
    """
    # Normalise first: validating the raw values let whitespace-only
    # emails/usernames through the "required" check.
    email = payload.email.lower().strip()
    username = payload.username.strip()

    if len(payload.password) < 6:
        raise HTTPException(status_code=400, detail="Password must be at least 6 characters.")
    if not email or not username:
        raise HTTPException(status_code=400, detail="Email and username are required.")
    # Same rules as /api/check-email, /api/check-username and /api/update-profile.
    if "@" not in email:
        raise HTTPException(status_code=400, detail="Please enter a valid email address.")
    if len(username) < 3:
        raise HTTPException(status_code=400, detail="Username must be at least 3 characters.")
    if len(username) > 30:
        raise HTTPException(status_code=400, detail="Username must be at most 30 characters.")

    # Check email and username uniqueness first. Accounts that never
    # finished verification are removed so the address/name can be reused.
    existing_email = get_user_by_email(email)
    if existing_email is not None:
        if existing_email.get("email_verified") is False:
            delete_user(existing_email["id"])
        else:
            raise HTTPException(status_code=409, detail="Email is already used.")

    existing_username = get_user_by_username(username)
    if existing_username is not None:
        if existing_username.get("email_verified") is False:
            delete_user(existing_username["id"])
        else:
            raise HTTPException(status_code=409, detail="This username is already taken. Please choose another.")

    hashed = hash_password(payload.password)
    user = create_user(email, username, hashed)
    if user is None:
        raise HTTPException(status_code=409, detail="Registration failed due to a database constraint.")

    # Generate and send verification code
    code = generate_verification_code()
    expires_at = datetime.now() + timedelta(minutes=10)
    set_verification_code(user["id"], code, expires_at)
    email_sent = send_verification_email(
        to_email=email,
        code=code,
        username=username,
    )

    token = create_access_token(user["id"], user["email"])
    return {
        "token": token,
        "user": {
            "id": user["id"],
            "email": user["email"],
            "username": user["username"],
            "email_verified": False,
        },
        "verification_email_sent": email_sent,
    }
@app.post("/api/login")
async def login(payload: LoginRequest) -> Dict[str, Any]:
    """
    Authenticate a user with username (or email) + password.

    A value containing "@" is looked up as a lower-cased email; anything else
    is looked up as a username and must match the stored username exactly,
    including case.

    Returns:
        The access token and the user's id/email/username/email_verified, so
        the app can redirect unverified users.

    Raises:
        HTTPException: 401 for an unknown account or wrong password; 500 for
            any unexpected failure (details are logged, not returned).
    """
    try:
        identity = payload.username.strip()
        if "@" in identity:
            user = get_user_by_email(identity.lower())
        else:
            user = get_user_by_username(identity)
            # Enforce exact case match for usernames
            if user and user["username"] != identity:
                user = None

        if user is None or not verify_password(payload.password, user["password_hash"]):
            raise HTTPException(status_code=401, detail="Invalid credentials.")

        verified = is_email_verified(user["id"])
        token = create_access_token(user["id"], user["email"])
        return {
            "token": token,
            "user": {
                "id": user["id"],
                "email": user["email"],
                "username": user["username"],
                "email_verified": verified,
            },
        }
    except HTTPException:
        raise
    except Exception as exc:
        logger.error("Login error for %s: %s", payload.username, exc, exc_info=True)
        # Do not echo the exception text to the client: it can expose
        # internals. The full error is in the log line above.
        raise HTTPException(status_code=500, detail="Internal login error.") from exc
@app.get("/api/me")
async def me(current_user: dict = Depends(require_current_user)) -> Dict[str, Any]:
    """Look up the authenticated user by the token's ``sub`` claim and return the record (404 if missing)."""
    user_id = int(current_user["sub"])
    profile = get_user_by_id(user_id)
    if profile is None:
        raise HTTPException(status_code=404, detail="User not found.")
    # created_at is stringified so the response can be encoded as JSON.
    profile["created_at"] = str(profile["created_at"])
    return {"user": profile}
# ── Email verification endpoints ─────────────────────────────────────
@app.post("/api/verify-email")
async def verify_email(
    payload: VerifyEmailRequest,
    current_user: dict = Depends(require_current_user),
) -> Dict[str, Any]:
    """
    Check the submitted code against the one stored for the authenticated user.

    Responds 400 when the trimmed code is not exactly 6 characters or does
    not match, 410 when ``verify_email_code`` reports it as expired.
    """
    user_id = int(current_user["sub"])
    submitted = payload.code.strip()
    if len(submitted) != 6:
        raise HTTPException(status_code=400, detail="Please enter a valid 6-digit code.")

    outcome = verify_email_code(user_id, submitted)
    if outcome == "expired":
        raise HTTPException(status_code=410, detail="Verification code has expired. Please request a new one.")
    if outcome != "ok":
        raise HTTPException(status_code=400, detail="Invalid verification code. Please try again.")
    return {"success": True, "message": "Email verified successfully!"}
@app.post("/api/resend-verification")
async def resend_verification(
    current_user: dict = Depends(require_current_user),
) -> Dict[str, Any]:
    """
    Issue a fresh verification code (10-minute expiry) and email it.

    Returns success immediately when the account is already verified;
    responds 404 for an unknown user and 500 when the email helper reports
    that nothing was sent.
    """
    user_id = int(current_user["sub"])
    account = get_user_by_id(user_id)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found.")
    if account.get("email_verified"):
        return {"success": True, "message": "Email is already verified."}

    new_code = generate_verification_code()
    set_verification_code(user_id, new_code, datetime.now() + timedelta(minutes=10))
    delivered = send_verification_email(
        to_email=account["email"],
        code=new_code,
        username=account["username"],
    )
    if not delivered:
        raise HTTPException(
            status_code=500,
            detail="Could not send verification email. Please check that your email address is correct and try again.",
        )
    return {"success": True, "message": "Verification code sent to your email."}
# ── History endpoint ─────────────────────────────────────────────────
@app.get("/api/history")
async def history(current_user: dict = Depends(require_current_user)) -> Dict[str, Any]:
    """Return the stored fact-check history rows of the authenticated user."""
    entries = get_user_history(int(current_user["sub"]))
    for entry in entries:
        # checked_at is stringified so the rows can be encoded as JSON.
        entry["checked_at"] = str(entry["checked_at"])
    return {"history": entries}
# ── Check username availability ──────────────────────────────────────
@app.post("/api/check-username")
async def check_username(payload: CheckUsernameRequest) -> Dict[str, Any]:
    """
    Report whether the trimmed username is 3-30 characters long and unused.

    Always answers with ``available``; a ``reason`` is added when it is False.
    """
    candidate = payload.username.strip()
    length = len(candidate)
    if length < 3:
        return {"available": False, "reason": "Username must be at least 3 characters."}
    if length > 30:
        return {"available": False, "reason": "Username must be at most 30 characters."}
    if get_user_by_username(candidate) is not None:
        return {"available": False, "reason": "This username is already taken."}
    return {"available": True}
# ── Check email availability ───────────────────────────────────────
@app.post("/api/check-email")
async def check_email(payload: CheckEmailRequest) -> Dict[str, Any]:
    """
    Report whether the lower-cased, trimmed email contains "@" and is unused.

    Always answers with ``available``; a ``reason`` is added when it is False.
    """
    candidate = payload.email.lower().strip()
    if "@" not in candidate:
        return {"available": False, "reason": "Please enter a valid email address."}
    if get_user_by_email(candidate) is not None:
        return {"available": False, "reason": "Email is already used."}
    return {"available": True}
# ── Update profile (username) ────────────────────────────────────────
@app.put("/api/update-profile")
async def update_profile(
    payload: UpdateProfileRequest,
    current_user: dict = Depends(require_current_user),
) -> Dict[str, Any]:
    """
    Rename the authenticated user.

    Responds 400 when the trimmed username is outside 3-30 characters and
    409 when ``update_username`` returns ``None``.
    """
    new_name = payload.username.strip()
    name_length = len(new_name)
    if name_length < 3:
        raise HTTPException(status_code=400, detail="Username must be at least 3 characters.")
    if name_length > 30:
        raise HTTPException(status_code=400, detail="Username must be at most 30 characters.")

    record = update_username(int(current_user["sub"]), new_name)
    if record is None:
        raise HTTPException(status_code=409, detail="This username is already taken.")

    public_fields = {key: record[key] for key in ("id", "email", "username")}
    return {"success": True, "user": public_fields}
# ── Change password ──────────────────────────────────────────────────
@app.put("/api/change-password")
async def change_password(
    payload: ChangePasswordRequest,
    current_user: dict = Depends(require_current_user),
) -> Dict[str, Any]:
    """
    Replace the authenticated user's password.

    Responds 404 for an unknown user, 401 when the current password does not
    verify, 400 when the new password is shorter than 8 characters, and 500
    when ``update_user_password`` reports failure.
    """
    user_id = int(current_user["sub"])
    account = get_user_by_id(user_id)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found.")

    # The record is fetched again by email because this handler reads
    # password_hash from the email lookup's result.
    credentials = get_user_by_email(account["email"])
    password_ok = credentials is not None and verify_password(
        payload.current_password, credentials["password_hash"]
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Current password is incorrect.")

    if len(payload.new_password) < 8:
        raise HTTPException(status_code=400, detail="New password must be at least 8 characters.")

    if not update_user_password(user_id, hash_password(payload.new_password)):
        raise HTTPException(status_code=500, detail="Failed to update password.")
    return {"success": True, "message": "Password updated successfully."}
# ── Forgot / Reset password ────────────────────────────────────────────────
def mask_email(email: str) -> str:
    """
    Mask the local part of *email* for display, leaving the domain readable.

    Local parts of up to two characters keep their first character followed
    by one ``*`` per original character; longer ones keep the first and last
    character with ``*`` in between. A value that does not contain exactly
    one ``@``, or whose local part is empty, is returned unchanged.
    """
    parts = email.split("@")
    if len(parts) != 2:
        return email
    name, domain = parts
    if not name:
        # Nothing to mask; name[0] below would raise IndexError.
        return email
    if len(name) <= 2:
        masked_name = name[0] + "*" * len(name)
    else:
        masked_name = name[0] + "*" * (len(name) - 2) + name[-1]
    return f"{masked_name}@{domain}"
@app.post("/api/forgot-password")
async def forgot_password(payload: ForgotPasswordRequest) -> Dict[str, Any]:
    """
    Start the password reset flow for an account named by email or username.

    The identity is tried as a lower-cased email first, then as a username
    that must match the stored one exactly. A reset code with a 10-minute
    expiry is stored and handed to ``send_password_reset_email``.

    Returns:
        A confirmation message and the account's masked email address.

    Raises:
        HTTPException: 400 when no account matches; 500 when the code cannot
            be stored.
    """
    identity = payload.identity.strip()
    user = get_user_by_email(identity.lower())
    if not user:
        user = get_user_by_username(identity)
        if user and user["username"] != identity:
            user = None
    if not user:
        # NOTE(review): this 400 tells callers whether an account exists,
        # which contradicts the "If that account exists" wording of the
        # success message below — confirm which behaviour is intended.
        raise HTTPException(status_code=400, detail="Account does not exist.")

    code = generate_verification_code()
    # Use the same clock as register() and resend_verification(): all three
    # store expiries through set_verification_code and are checked by the
    # same verify_email_code, so they must not mix local time and UTC.
    expires_at = datetime.now() + timedelta(minutes=10)
    if not set_verification_code(user["id"], code, expires_at):
        raise HTTPException(status_code=500, detail="Failed to save reset code")

    # Per the original author's note the helper sends in the background
    # (fire-and-forget); its return value is not inspected here.
    send_password_reset_email(user["email"], code, user["username"])
    masked = mask_email(user["email"])
    return {"message": "If that account exists, a reset code has been sent.", "email": masked}
@app.post("/api/reset-password")
async def reset_password(payload: ResetPasswordRequest) -> Dict[str, Any]:
    """
    Verify the emailed code and set a new password.

    The identity is resolved the same way as in the forgot-password step
    (lower-cased email first, then exact-case username).

    Raises:
        HTTPException: 400 for an unknown account, a new password shorter
            than 8 characters, or a code that ``verify_email_code`` does not
            accept; 500 when the password cannot be stored.
    """
    identity = payload.identity.strip()
    user = get_user_by_email(identity.lower())
    if not user:
        user = get_user_by_username(identity)
        if user and user["username"] != identity:
            user = None
    if not user:
        raise HTTPException(status_code=400, detail="Invalid request. Account not found.")

    # Validate the new password BEFORE touching the code: checking the code
    # first risks using up a valid code on a request that is then rejected
    # for a too-short password.
    if len(payload.new_password) < 8:
        raise HTTPException(status_code=400, detail="New password must be at least 8 characters.")

    status = verify_email_code(user["id"], payload.code.strip())
    if status != 'ok':
        raise HTTPException(status_code=400, detail="Invalid or expired reset code.")

    hashed_pw = hash_password(payload.new_password)
    success = update_user_password(user["id"], hashed_pw)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to reset password.")
    return {"message": "Password reset successfully."}
# ── Delete account ───────────────────────────────────────────────────
@app.delete("/api/delete-account")
async def delete_account(
    payload: DeleteAccountRequest,
    current_user: dict = Depends(require_current_user),
) -> Dict[str, Any]:
    """
    Delete the authenticated user's account after re-checking the password.

    Responds 404 for an unknown user, 401 when the password does not verify,
    and 500 when ``delete_user`` reports failure.
    """
    user_id = int(current_user["sub"])
    account = get_user_by_id(user_id)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found.")

    # The record is fetched again by email because this handler reads
    # password_hash from the email lookup's result.
    credentials = get_user_by_email(account["email"])
    password_ok = credentials is not None and verify_password(
        payload.password, credentials["password_hash"]
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Incorrect password. Account was not deleted.")

    if not delete_user(user_id):
        raise HTTPException(status_code=500, detail="Failed to delete account.")
    return {"success": True, "message": "Account deleted successfully."}
# ── Fact-check endpoint ──────────────────────────────────────────────
class AnalyzeStructureRequest(BaseModel):
    text: str


@app.post("/api/analyze-structure")
async def api_analyze_structure(payload: AnalyzeStructureRequest) -> Dict[str, Any]:
    """
    Run the structural (inverted pyramid) analysis for the frontend Live
    Writing Guide, without the full ML/URL checks.

    Text shorter than 30 characters after trimming gets a fixed placeholder
    result; analyzer failures are logged and answered with HTTP 500.
    """
    # NOTE(review): a second handler is registered for this same method and
    # path further down in this file. Routes are normally matched in
    # registration order, so this one presumably serves the requests —
    # confirm and remove whichever is obsolete.
    if len(payload.text.strip()) < 30:
        placeholder: Dict[str, Any] = {
            "language": "unknown",
            "formalism_score": 0,
            "assessment": "Text too short for structure analysis.",
            "recommendations": [],
        }
        return placeholder

    from checker.internal.structure_analyzer import analyze_structure

    try:
        return analyze_structure(payload.text)
    except Exception as exc:
        logger.error("Error in analyze_structure: %s", exc)
        raise HTTPException(
            status_code=500,
            detail=f"Structure analysis failed: {exc}"
        )
def _validate_check_input(text: str) -> None:
    """Raise HTTP 400 when *text* is too short or looks like random characters."""
    import re as _re

    # Strip URLs before the word-count check so links do not count as words.
    clean_input = _re.sub(r"https?://\S+", "", text.strip()).strip()
    if len(clean_input.split()) < 10:
        raise HTTPException(
            status_code=400,
            detail="Please provide at least 10 words for a meaningful analysis.",
        )

    # Language detection: text the detector cannot classify is rejected.
    # A missing langdetect install is NOT evidence of gibberish, so the check
    # is skipped in that case instead of rejecting every request.
    try:
        from langdetect import detect, DetectorFactory
    except ImportError:
        logger.warning("langdetect is not installed; skipping language detection.")
    else:
        DetectorFactory.seed = 0
        try:
            detect(clean_input)
        except Exception:
            raise HTTPException(
                status_code=400,
                detail="The text appears to be gibberish or random characters. Please provide a real article or claim.",
            ) from None

    # Vowel ratio check: with more than 20 letters, under 10% vowels is
    # treated as random characters.
    alpha_chars = [c.lower() for c in clean_input if c.isalpha()]
    if len(alpha_chars) > 20:
        vowel_count = sum(1 for c in alpha_chars if c in "aeiou")
        if vowel_count / len(alpha_chars) < 0.10:
            raise HTTPException(
                status_code=400,
                detail="The text appears to be random characters. Please provide a real article or claim to verify.",
            )


@app.post("/api/check")
async def check_article(
    payload: CheckRequest,
    current_user: Optional[dict] = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Run the full fact-check pipeline on the provided text.

    The input is validated first (word count, language detection, vowel
    ratio) so bad requests are rejected before the FactChecker is
    initialised. If the user is authenticated (Bearer token), the result is
    saved to their history on a best-effort basis.

    Returns:
        The dict produced by ``FactChecker.check``.

    Raises:
        HTTPException: 400 for rejected input, 503 when the FactChecker
            cannot be obtained, 500 when the analysis itself fails.
    """
    _validate_check_input(payload.text)

    try:
        fc = get_fact_checker()
    except Exception as exc:
        raise HTTPException(
            status_code=503,
            detail=f"FactChecker is not available: {exc}",
        )

    try:
        result = await run_in_threadpool(fc.check, payload.text, headline=payload.headline)
    except Exception:
        # The traceback goes to the log only; returning it to the client
        # would expose file paths and internals.
        logger.error("Error in fc.check", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="Internal Server Error during AI analysis.",
        )

    # Save to history if user is logged in
    if current_user is not None:
        try:
            user_id = int(current_user["sub"])
            verdict = result.get("final_verdict", "UNKNOWN")
            save_check_history(user_id, payload.text, verdict, json.dumps(result))
        except Exception as exc:
            logger.warning("Failed to save check history: %s", exc)
    return result
# ── Fact-check URL endpoint ──────────────────────────────────────────
def _social_media_platform(url: str) -> Optional[str]:
    """Return the social platform whose domain (or a subdomain of it) hosts *url*, else None."""
    from urllib.parse import urlparse

    domains = {
        "facebook.com": "Facebook",
        "fb.com": "Facebook",
        "fb.watch": "Facebook",
        "instagram.com": "Instagram",
        "tiktok.com": "TikTok",
        "twitter.com": "X (Twitter)",
        "x.com": "X (Twitter)",
        "threads.net": "Threads",
    }
    candidate = url.strip().lower()
    if "://" not in candidate:
        # Without a scheme urlparse would treat the host as a path.
        candidate = "//" + candidate
    try:
        host = (urlparse(candidate).hostname or "").rstrip(".")
    except ValueError:
        return None
    for domain, platform in domains.items():
        if host == domain or host.endswith("." + domain):
            return platform
    return None


@app.post("/api/check-url")
async def check_url(
    payload: CheckURLRequest,
    current_user: Optional[dict] = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Extract article text from a URL, then run the full fact-check pipeline.

    Uses trafilatura to fetch the page and extract its main content and
    title. The blocking fetch/extract calls run in the threadpool so the
    event loop stays responsive. If the user is authenticated, the result is
    saved to their history on a best-effort basis.

    Returns:
        The FactChecker result with ``source_url`` and ``extracted_title`` added.

    Raises:
        HTTPException: 400 for an empty URL; 422 for social media links,
            unreachable pages, or pages with too little text; 503 when the
            FactChecker cannot be obtained.
    """
    import trafilatura

    url = payload.url.strip()
    if not url:
        raise HTTPException(status_code=400, detail="URL is required.")
    # NOTE(review): the URL is fetched server-side as given; consider
    # restricting schemes and internal addresses.

    # Detect social media URLs that block scraping. Matching is done on the
    # hostname: a substring search over the whole URL also hit unrelated
    # sites such as vox.com or linux.com (they contain "x.com").
    platform = _social_media_platform(url)
    if platform is not None:
        raise HTTPException(
            status_code=422,
            detail=(
                f"{platform} posts cannot be automatically extracted because "
                f"{platform} blocks external access to its content. "
                f"Instead, please copy the text from the {platform} post "
                f"and use the 'Proofread Article' feature to fact-check it."
            ),
        )

    # Fetch and extract article content
    try:
        downloaded = await run_in_threadpool(trafilatura.fetch_url, url)
        if downloaded is None:
            raise HTTPException(
                status_code=422,
                detail="Could not fetch the URL. The site may be blocking automated access. Please check the link or try copying the article text into the Proofread feature.",
            )
        text = await run_in_threadpool(
            trafilatura.extract,
            downloaded,
            include_comments=False,
            include_tables=False,
            no_fallback=False,
        )
        if not text or len(text.strip()) < 50:
            raise HTTPException(
                status_code=422,
                detail="Could not extract enough article text from this URL. The page may be paywalled, login-protected, or not a news article. Try copying the text into the Proofread feature instead.",
            )

        # Try to get the title from the JSON-formatted extraction.
        metadata = await run_in_threadpool(
            trafilatura.extract,
            downloaded,
            output_format="json",
            include_comments=False,
            include_tables=False,
        )
        title = None
        if metadata:
            try:
                title = json.loads(metadata).get("title", None)
            except Exception as exc:
                # The title is optional; continue without it.
                logger.debug("Could not read title from extraction metadata: %s", exc)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error("URL extraction failed for %s: %s", url, exc)
        raise HTTPException(
            status_code=422,
            detail=f"Failed to extract article content: {exc}",
        )

    # Run fact-check on extracted text
    try:
        fc = get_fact_checker()
    except Exception as exc:
        raise HTTPException(
            status_code=503,
            detail=f"FactChecker is not available: {exc}",
        )
    result = await run_in_threadpool(fc.check, text, headline=title)

    # Add source URL and title to result
    result["source_url"] = url
    result["extracted_title"] = title

    # Save to history if user is logged in
    if current_user is not None:
        try:
            user_id = int(current_user["sub"])
            verdict = result.get("final_verdict", "UNKNOWN")
            # Only the first 500 characters of the article are stored.
            history_text = f"[URL] {url}\n\n{text[:500]}"
            save_check_history(user_id, history_text, verdict, json.dumps(result))
        except Exception as exc:
            logger.warning("Failed to save check history: %s", exc)
    return result
# ── Lightweight structure analysis (for live writing guide) ──────────
class StructureAnalysisRequest(BaseModel):
    text: str
    headline: Optional[str] = None


@app.post("/api/analyze-structure")
async def analyze_structure_endpoint(
    payload: StructureAnalysisRequest,
) -> Dict[str, Any]:
    """
    Lightweight structure analysis intended for real-time calls while the
    user is typing.

    Text shorter than 20 characters after trimming gets an all-empty result;
    otherwise the trimmed body text is passed to ``analyze_structure``. The
    headline field is accepted but not analyzed.
    """
    # NOTE(review): another handler for this same method and path is
    # registered earlier in this file. Routes are normally matched in
    # registration order, so this handler is presumably never reached —
    # confirm and remove whichever is obsolete.
    body_text = payload.text.strip()
    if len(body_text) < 20:
        empty_result: Dict[str, Any] = {
            "formalism_score": 0,
            "lead_5w1h": {},
            "has_attribution": False,
            "attribution_count": 0,
            "has_transitions": False,
            "transition_count": 0,
            "has_formal_markers": False,
            "assessment": "",
            "signals_found": [],
            "recommendations": [],
        }
        return empty_result

    # Only the body text is analyzed; the headline is left out to avoid
    # inflating 5W+1H scores (headlines naturally contain WHO/WHAT).
    from checker.internal.structure_analyzer import analyze_structure

    return analyze_structure(body_text)
# ── Translation endpoint ─────────────────────────────────────────────
class TranslateRequest(BaseModel):
    text: str
    target_lang: str  # 'en', 'fil', 'ceb'


def _split_for_translation(text: str, max_chunk: int) -> list:
    """Split *text* on newlines into chunks of at most *max_chunk* characters."""
    chunks = []
    current = ""
    for para in text.split("\n"):
        # A single paragraph longer than the limit is cut into fixed-size
        # pieces; previously it was emitted whole and exceeded the limit.
        while len(para) > max_chunk:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(para[:max_chunk])
            para = para[max_chunk:]
        if len(current) + len(para) + 1 > max_chunk:
            if current:
                chunks.append(current)
            current = para
        else:
            current = current + "\n" + para if current else para
    if current:
        chunks.append(current)
    return chunks


def _translate_chunks(translator, chunks) -> str:
    """Translate each chunk in order and join the results with newlines."""
    # A chunk whose translation comes back as None contributes an empty
    # string instead of breaking the join.
    return "\n".join((translator.translate(chunk) or "") for chunk in chunks)


@app.post("/api/translate")
async def translate_text(
    payload: TranslateRequest,
    current_user: Optional[dict] = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Translate article text to the specified target language.

    Supported target languages: 'en' (English), 'fil' (Filipino), 'ceb'
    (Cebuano). Uses Google Translate via deep-translator; text longer than
    4500 characters is translated chunk by chunk. The network calls run in
    the threadpool so the event loop is not blocked.

    Returns:
        The translated text, the target language code and name, and the
        source language reported by langdetect ("unknown" if detection fails).

    Raises:
        HTTPException: 400 for empty text or an unsupported language, 503
            when deep-translator is not installed, 500 for any other failure.
    """
    text = payload.text.strip()
    target = payload.target_lang.strip().lower()
    if not text:
        raise HTTPException(status_code=400, detail="Text is required.")

    # Map our app's language codes to Google Translate codes
    lang_map = {
        "en": "en",
        "fil": "tl",   # Google Translate uses 'tl' for Filipino/Tagalog
        "ceb": "ceb",  # Google Translate supports Cebuano
    }
    target_code = lang_map.get(target)
    if target_code is None:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported target language '{target}'. Use 'en', 'fil', or 'ceb'.",
        )

    try:
        from deep_translator import GoogleTranslator

        # Stay below the 5000-character Google Translate limit per request.
        max_chunk = 4500
        chunks = [text] if len(text) <= max_chunk else _split_for_translation(text, max_chunk)
        translator = GoogleTranslator(source="auto", target=target_code)
        translated = await run_in_threadpool(_translate_chunks, translator, chunks)

        # Detect source language (best-effort: any failure leaves "unknown").
        detected_source = "unknown"
        try:
            from langdetect import detect, DetectorFactory
            DetectorFactory.seed = 0
            detected_source = detect(text)
        except Exception as exc:
            logger.debug("Source language detection failed: %s", exc)

        lang_names = {"en": "English", "fil": "Filipino", "ceb": "Cebuano"}
        return {
            "translated_text": translated,
            "target_language": target,
            "target_language_name": lang_names.get(target, target),
            "detected_source_language": detected_source,
        }
    except ImportError:
        raise HTTPException(
            status_code=503,
            detail="Translation service not available. deep-translator package is not installed.",
        )
    except Exception as exc:
        logger.error("Translation failed: %s", exc, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Translation failed: {exc}",
        )
# ── Manual scrape trigger ────────────────────────────────────────────
@app.post("/api/scrape-news")
async def scrape_news() -> Dict[str, Any]:
    """
    Kick off a news scrape via ``trigger_scrape_once`` and report the article
    count read right after the trigger call.
    """
    # NOTE(review): this endpoint takes no auth dependency, so any caller can
    # trigger a scrape — confirm that is intended.
    from checker.external.core import trigger_scrape_once

    trigger_scrape_once()
    return {
        "status": "scrape_started",
        "message": "Background news scraping triggered.",
        "current_article_count": get_article_count(),
    }
# ── Article count endpoint ───────────────────────────────────────────
@app.get("/api/article-count")
async def article_count() -> Dict[str, Any]:
    """Report the value of ``get_article_count()`` under the ``article_count`` key."""
    return {"article_count": get_article_count()}
# ── Contact Us endpoint ────────────────────────────────────────────
@app.post("/api/contact")
async def contact_us(
    payload: ContactRequest,
    current_user: Optional[dict] = Depends(get_current_user),
) -> Dict[str, Any]:
    """
    Send a contact message to the BantayPahayag team via email.

    The message is forwarded to bantaypahayag@gmail.com through the Resend
    HTTP API. When the caller is authenticated, their account email is used
    as the reply-to address. All user-supplied values are HTML-escaped before
    being placed in the HTML body.

    Raises:
        HTTPException: 400 for a subject under 3 or a message under 10
            characters, 503 when RESEND_API_KEY is not set, 500 when sending
            fails.
    """
    import html
    import requests

    subject = payload.subject.strip()
    message = payload.message.strip()
    sender_name = (payload.sender_name or "").strip()
    if not subject or len(subject) < 3:
        raise HTTPException(status_code=400, detail="Please provide a subject.")
    if not message or len(message) < 10:
        raise HTTPException(status_code=400, detail="Please write a message (at least 10 characters).")

    # Identify the sender
    user_email = "anonymous"
    user_name = sender_name or "Anonymous User"
    if current_user is not None:
        try:
            user = get_user_by_id(int(current_user["sub"]))
            if user:
                user_email = user.get("email", "unknown")
                user_name = sender_name or user.get("username", "User")
        except Exception as exc:
            # Best-effort: fall back to the anonymous sender details.
            logger.warning("Could not resolve contact sender: %s", exc)

    # Send via Resend API
    CONTACT_EMAIL = "bantaypahayag@gmail.com"
    resend_api_key = os.environ.get("RESEND_API_KEY", "")
    if not resend_api_key:
        logger.warning("Resend API key not configured β€” contact message from %s lost", user_email)
        raise HTTPException(status_code=503, detail="Email service is not configured.")

    try:
        text_body = (
            f"Contact Form Submission\n"
            f"{'=' * 40}\n\n"
            f"From: {user_name}\n"
            f"Email: {user_email}\n"
            f"Subject: {subject}\n\n"
            f"Message:\n{message}\n"
        )
        # Escape everything that came from the request before it goes into
        # markup, so a message cannot inject HTML into the team's inbox.
        safe_name = html.escape(user_name)
        safe_email = html.escape(user_email)
        safe_subject = html.escape(subject)
        safe_message = html.escape(message)
        html_body = f"""
<div style="font-family: 'Segoe UI', Arial, sans-serif; max-width: 520px; margin: 0 auto;
padding: 28px 24px; background: #ffffff; border-radius: 12px;
border: 1px solid #e2e8f0;">
<div style="text-align: center; margin-bottom: 20px;">
<div style="display: inline-block; background: linear-gradient(135deg, #1E3A8A, #3B82F6);
color: white; font-size: 18px; font-weight: 800; padding: 10px 20px;
border-radius: 10px;">BantayPahayag</div>
</div>
<h2 style="color: #0F172A; font-size: 18px; margin: 0 0 16px 0;">New Contact Message</h2>
<table style="width: 100%; border-collapse: collapse; margin-bottom: 16px;">
<tr><td style="color: #64748B; padding: 6px 0; font-size: 13px; width: 70px;">From:</td>
<td style="color: #0F172A; padding: 6px 0; font-size: 13px; font-weight: 600;">{safe_name}</td></tr>
<tr><td style="color: #64748B; padding: 6px 0; font-size: 13px;">Email:</td>
<td style="color: #0F172A; padding: 6px 0; font-size: 13px;">{safe_email}</td></tr>
<tr><td style="color: #64748B; padding: 6px 0; font-size: 13px;">Subject:</td>
<td style="color: #0F172A; padding: 6px 0; font-size: 13px; font-weight: 600;">{safe_subject}</td></tr>
</table>
<div style="background: #F8FAFC; border: 1px solid #E2E8F0; border-radius: 8px;
padding: 16px; margin-bottom: 16px;">
<p style="color: #334155; font-size: 13px; line-height: 1.7; margin: 0;
white-space: pre-wrap;">{safe_message}</p>
</div>
<hr style="border: none; border-top: 1px solid #e2e8f0; margin: 16px 0;" />
<p style="color: #CBD5E1; font-size: 11px; text-align: center; margin: 0;">Sent via BantayPahayag Contact Form (Resend API)</p>
</div>
"""
        # Named separately from the request model so `payload` keeps
        # referring to the incoming ContactRequest.
        email_payload = {
            "from": "BantayPahayag <onboarding@resend.dev>",
            "to": [CONTACT_EMAIL],
            "subject": f"[BantayPahayag Contact] {subject}",
            "html": html_body,
            "text": text_body
        }
        if user_email and user_email != "anonymous":
            email_payload["reply_to"] = user_email
        headers = {
            "Authorization": f"Bearer {resend_api_key}",
            "Content-Type": "application/json"
        }
        # requests is blocking; run it in the threadpool so other requests
        # are not stalled for up to the 10-second timeout.
        resp = await run_in_threadpool(
            requests.post,
            "https://api.resend.com/emails",
            json=email_payload,
            headers=headers,
            timeout=10,
        )
        if resp.status_code >= 400:
            logger.error("Resend API error: %s", resp.text)
            raise HTTPException(status_code=500, detail="Failed to send email. Please try again later.")
        logger.info("Contact email sent from %s (%s): %s", user_name, user_email, subject)
    except HTTPException:
        raise
    except Exception as exc:
        logger.error("Failed to send contact email: %s", exc)
        raise HTTPException(status_code=500, detail="Failed to send email. Please try again later.")
    return {"success": True, "message": "Your message has been sent. We'll get back to you soon!"}
if __name__ == "__main__":
    # Direct execution: read the port from the PORT environment variable
    # (10000 when unset) and serve the app on all interfaces.
    raw_port = os.environ.get("PORT", 10000)
    port = int(raw_port)
    logger.info("Starting uvicorn on 0.0.0.0:%s Ò€¦", port)
    uvicorn.run("api.main:app", host="0.0.0.0", port=port)