sool-max / main.py
samifalouti's picture
Update main.py
9757138 verified
import time
import asyncio
import os
import json
from datetime import datetime
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader, HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from supabase import create_client, Client
app = FastAPI()
# Environment Variables
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
SERVER_KEY = os.getenv("SERVER_KEY")
SERVER_ID = os.getenv("SERVER_ID")
TOKEN_KEY = os.getenv("TOKEN_KEY")
TOKEN_ID = os.getenv("TOKEN_ID")
CLOUD_KEY = os.getenv("CLOUD_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# Security: Support both X-API-Key and Bearer Token
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
bearer_scheme = HTTPBearer(auto_error=False)
async def verify_token(
api_key: str = Depends(api_key_header),
bearer: HTTPAuthorizationCredentials = Depends(bearer_scheme)
):
"""
Unified authentication: checks X-API-Key or Authorization: Bearer
"""
token = None
# 1. Check for X-API-Key (Standard Web/Postman)
if api_key:
token = api_key
# 2. Check for Bearer Token (Cursor / Continue / OpenAI standard)
elif bearer:
token = bearer.credentials
if not token:
print("Auth Error: No token provided in headers")
raise HTTPException(status_code=401, detail="API Key missing. Use X-API-Key or Bearer token.")
try:
# Query Supabase for the token
result = supabase.table("api_tokens").select("user_id").eq("api", token).execute()
if not result.data or len(result.data) == 0:
print(f"Auth Error: Token {token[:5]}*** not found in database")
raise HTTPException(status_code=403, detail="Invalid API Token.")
return result.data[0]
except HTTPException:
raise
except Exception as e:
print(f"Database error during auth: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error during authentication.")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class ChatRequest(BaseModel):
message: str
chat_lock = asyncio.Lock()
def get_driver():
chrome_options = Options()
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--blink-settings=imagesEnabled=false")
chrome_options.binary_location = "/usr/bin/chromium"
service = Service("/usr/bin/chromedriver")
return webdriver.Chrome(service=service, options=chrome_options)
# --- Original Endpoint ---
@app.post("/chat")
async def chat(request: ChatRequest, token_data: dict = Depends(verify_token)):
async with chat_lock:
driver = get_driver()
try:
driver.get(CLOUD_KEY)
driver.execute_script("""
window.localStorage.setItem(arguments[0], arguments[1]);
window.localStorage.setItem(arguments[2], arguments[3]);
""", SERVER_KEY, SERVER_ID, TOKEN_KEY, TOKEN_ID)
driver.refresh()
wait = WebDriverWait(driver, 20)
input_box = wait.until(EC.element_to_be_clickable((By.ID, "user-input")))
driver.execute_script("document.getElementById('selenium-hook').textContent = '';")
input_box.send_keys(request.message)
driver.execute_script("document.getElementById('send-btn').click();")
start_time = time.time()
while time.time() - start_time < 120:
try:
hook = driver.find_element(By.ID, "selenium-hook")
response = hook.get_attribute("textContent").strip()
if response:
try:
parsed = json.loads(response)
return {
"role": parsed.get("message", {}).get("role", "assistant"),
"content": parsed.get("message", {}).get("content"),
"finish_reason": parsed.get("finish_reason", "stop"),
"usage": parsed.get("usage")
}
except:
return {"role": "assistant", "content": response}
except:
pass
await asyncio.sleep(1)
raise HTTPException(status_code=504, detail="AI Timeout")
finally:
driver.quit()
# --- Cursor / Continue Compatibility Endpoint ---
@app.post("/v1/chat/completions")
async def cursor_handler(request: Request, token_data: dict = Depends(verify_token)):
try:
body = await request.json()
messages = body.get("messages", [])
# Extract the last message from the conversation history
last_user_message = ""
for m in reversed(messages):
if m.get("role") == "user":
last_user_message = m.get("content")
break
if not last_user_message:
return {"error": "No user message found"}
# Call the existing scraper logic
chat_req = ChatRequest(message=last_user_message)
response = await chat(chat_req, token_data)
content = response.get("content") or response.get("raw") or "No response from scraper."
# Return standard OpenAI JSON structure
return {
"id": f"chatcmpl-{int(time.time())}",
"object": "chat.completion",
"created": int(time.time()),
"model": body.get("model", "custom-scraper"),
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": content
},
"finish_reason": "stop"
}],
"usage": response.get("usage") if isinstance(response.get("usage"), dict) else {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
}
except Exception as e:
print(f"Bridge Error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))