import os
import re
import math
import html
import string as _str_module
import uuid
import hashlib
import asyncio
import logging
import time
from datetime import datetime, timezone
# FIX: load .env file automatically if present
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # python-dotenv not installed — env vars must be set externally
import httpx
from fastapi import (
FastAPI, Depends, HTTPException, Header,
Request, BackgroundTasks, WebSocket, WebSocketDisconnect
)
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
# FIX: use field_validator for pydantic v2 compat; removed deprecated @validator
try:
from pydantic import BaseModel, field_validator as _fv
_USE_FIELD_VALIDATOR = True
except ImportError:
from pydantic import BaseModel, validator as _fv
_USE_FIELD_VALIDATOR = False
from passlib.context import CryptContext
from github import Github
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.pagesizes import letter
from auth import create_access_token, verify_token
from tasks import run_scan
import db as DB
# ── Inline replacements for removed modules ────────────────────────────────
# config.py → DEV_MODE removed; all features available to authenticated users
# access.py → enforce_feature / has_feature / get_ai_depth / within_limit removed
# plans.py → get_plan_limits removed; limits are now hard-coded here
_PLAN_LIMITS = {
"free": {
"daily_scans": 5,
"daily_repos": 2,
"history_limit": 10,
"ai_depth": "basic",
"repo_scan": True,
"pdf_download": False,
"json_export": False,
"advanced_ai": False,
"scheduled_scans": False,
"api_access": False,
},
"pro_trial": {
"daily_scans": -1, # unlimited — -1 means no limit
"daily_repos": -1,
"history_limit": 500,
"ai_depth": "full",
"repo_scan": True,
"pdf_download": True,
"json_export": True,
"advanced_ai": True,
"scheduled_scans": True,
"api_access": True,
},
"pro": {
"daily_scans": -1,
"daily_repos": -1,
"history_limit": 500,
"ai_depth": "full",
"repo_scan": True,
"pdf_download": True,
"json_export": True,
"advanced_ai": True,
"scheduled_scans": True,
"api_access": True,
},
"enterprise": {
"daily_scans": -1,
"daily_repos": -1,
"history_limit": 9999,
"ai_depth": "full",
"repo_scan": True,
"pdf_download": True,
"json_export": True,
"advanced_ai": True,
"scheduled_scans": True,
"api_access": True,
},
}
def get_plan_limits(plan: str) -> dict:
"""Return limits dict for a plan name. Falls back to free limits."""
return _PLAN_LIMITS.get((plan or "free").lower(), _PLAN_LIMITS["free"]).copy()
def is_pro_or_trial(user: dict) -> bool:
"""Return True if the user has active Pro access (paid or trial)."""
plan = (user.get("plan") or "free").lower()
return plan in ("pro", "pro_trial", "enterprise") and user.get("is_pro", False)
def within_limit(user: dict, usage_count: int) -> bool:
"""
Return True if the user has not exceeded their daily scan limit.
-1 = unlimited (pro/pro_trial/enterprise). Free = 5/day.
"""
plan = (user.get("plan") or "free").lower()
limit = get_plan_limits(plan)["daily_scans"]
if limit == -1:
return True
return usage_count <= limit
def get_ai_depth(user: dict) -> str:
"""Return the AI analysis depth for this user's plan."""
return get_plan_limits((user.get("plan") or "free"))["ai_depth"]
def enforce_feature(user: dict, feature: str) -> None:
"""
Raise 403 if the user's plan does not include the feature.
Free & trial users have access to repo_scan — they're limited by daily scan count.
"""
plan = (user.get("plan") or "free").lower()
# repo_scan is available to everyone — enforce via usage limits instead
if feature == "repo_scan":
return
# pro_trial has full pro access
if plan in ("pro", "pro_trial", "enterprise"):
return
limits = get_plan_limits(plan)
if not limits.get(feature, False):
raise HTTPException(
403,
detail={"success": False, "error": f"'{feature}' requires a Pro plan. Start your free 30-day trial or upgrade."}
)
def has_feature(user: dict, feature: str) -> bool:
"""Return True if the user's plan includes the feature."""
return bool(get_plan_limits(user.get("plan", "free")).get(feature, False))
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
logger = logging.getLogger("safeaiscan")
app = FastAPI(title="SafeAIScan Enterprise", version="2.1.0")
app.add_middleware(CORSMiddleware, allow_origins=["http://localhost:5500","http://localhost:3000","https://rathious-safeaiscan.hf.space"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
# ══════════════════════════════════════════════════════════════
# SECURITY: Rate limiting, security headers, XSS/injection guard
# Added without touching any existing route logic
# ══════════════════════════════════════════════════════════════
from collections import defaultdict
# ── Rate limit store (in-memory, per IP per bucket) ──────────
_rl_store: dict = defaultdict(lambda: {"count": 0, "window_start": 0.0})
_rl_lock = asyncio.Lock()
_RL_LIMITS = {
"auth": 5, # /auth/* — 5 attempts per 60s (brute-force protection)
"scan": 30, # /api/analyze, /api/scan-repo — 30 per 60s
"payment": 10, # /payment/* — 10 per 60s
"default": 60, # everything else
}
def _rl_bucket(path: str) -> str:
if path.startswith("/auth/"): return "auth"
if "/analyze" in path or "/scan" in path: return "scan"
if path.startswith("/payment/"): return "payment"
return "default"
def _get_real_ip(request: Request) -> str:
"""Respect Cloudflare / HF proxy headers for real IP."""
for h in ("cf-connecting-ip", "x-real-ip", "x-forwarded-for"):
v = request.headers.get(h, "")
if v:
return v.split(",")[0].strip()
return (request.client.host if request.client else "unknown")
# ── Injection / XSS pattern detector ─────────────────────────
_INJECT_RES = [
re.compile(r"