# zeta/src/api/routes.py
# Commit 25dca6a: PDF lookup now auto-refreshes every 5 minutes instead of
# being cached once at startup.
"""
FastAPI routes for Zeta Researcher.
This module defines the API endpoints for chat, search, and document management.
"""
import base64
import json
import secrets
import shutil
import uuid
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Literal

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, HTMLResponse, Response, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field

from src.rag.pipeline import RAGPipeline
from src.llm.factory import get_available_models
from src.retrieval.retriever import Retriever
from src.embedding.vector_store import VectorStore
from src.embedding.embedder import Embedder
from src.config.settings import list_embedding_models, get_settings, EMBEDDING_MODELS
# SearchMode is no longer used - we use search_sources list instead
from src.search.web.factory import get_available_web_providers
from src.search.scientific.factory import get_available_scientific_providers
from src.rag.conversation import ConversationManager
from src.utils.logging import get_logger
logger = get_logger(__name__)
# Get paths
# BASE_DIR walks up from src/api/routes.py -> src/api -> src -> repo root.
BASE_DIR = Path(__file__).resolve().parent.parent.parent
WEB_DIR = BASE_DIR / "web"  # static assets + Jinja templates
PDF_DIR = BASE_DIR / "data" / "pdfs"  # indexed source PDFs (may contain subdirs)

# Get shares directory from settings (uses /tmp/shares on HF Spaces)
_settings = get_settings()
if _settings.hf_space:
    SHARES_DIR = Path(_settings.shares_dir)
else:
    SHARES_DIR = BASE_DIR / "data" / "shares"

# Ensure shares directory exists
try:
    SHARES_DIR.mkdir(parents=True, exist_ok=True)
except PermissionError:
    # On read-only filesystems, use /tmp fallback
    SHARES_DIR = Path("/tmp/shares")
    SHARES_DIR.mkdir(parents=True, exist_ok=True)

# Saved chats directory (local version only)
SAVED_DIR = Path(_settings.saved_dir) if hasattr(_settings, 'saved_dir') else BASE_DIR / "data" / "saved"
if not _settings.hf_space:
    SAVED_DIR.mkdir(parents=True, exist_ok=True)

# Topics index file
# The leading underscore keeps it out of the "*.json" saved-chat globs below,
# which explicitly skip names starting with "_".
TOPICS_FILE = SAVED_DIR / "_topics.json"
def load_topics() -> list:
    """Read the topics list from the index file, returning [] on any failure."""
    if not TOPICS_FILE.exists():
        return []
    try:
        index = json.loads(TOPICS_FILE.read_text())
        return index.get("topics", [])
    except Exception:
        # Corrupt or unreadable index — behave as if no topics exist.
        return []
def save_topics(topics: list) -> None:
    """Persist the full topics list to the index file as indented JSON."""
    payload = json.dumps({"topics": topics}, indent=2)
    TOPICS_FILE.write_text(payload)
# Publishing directories (local version only)
SITE_DIR = BASE_DIR / "data" / "site"
RESEARCH_DIR = SITE_DIR / "research"
if not _settings.hf_space:
    # Skipped on HF Spaces — presumably the data dir is not writable there
    # (same reasoning as the SHARES_DIR fallback above); confirm.
    RESEARCH_DIR.mkdir(parents=True, exist_ok=True)

# Application instance; all routes below attach to this.
app = FastAPI(
    title="Zeta Researcher",
    description="Local PDF Research Assistant with LLM Integration",
    version="0.2.0",
)
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Middleware to enforce HTTP Basic Auth on all routes except public paths.

    Public paths (health check, share links, PDFs, static assets) bypass auth
    so shared conversations render without credentials. Auth is disabled
    entirely when no site password is configured.
    """
    # Skip auth for health check, shared links, PDFs (for share source links), and static assets
    public_prefixes = ("/health", "/share/", "/pdfs/", "/static/css/", "/static/js/")
    # NOTE(review): the `== "/health"` clause is redundant — the "/health"
    # prefix already matches it (and any other "/health*" path).
    if any(request.url.path.startswith(p) for p in public_prefixes) or request.url.path == "/health":
        response = await call_next(request)
        # Add CORS headers for PDFs so PDF.js viewer can fetch them
        if request.url.path.startswith("/pdfs/"):
            response.headers["Access-Control-Allow-Origin"] = "*"
        return response
    # Skip auth if no password is configured
    if not _settings.site_password:
        return await call_next(request)
    # Check for Authorization header
    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Basic "):
        try:
            credentials_b64 = auth_header[6:]  # drop the "Basic " prefix
            credentials_decoded = base64.b64decode(credentials_b64).decode("utf8")
            # Split only on the first ':' so passwords containing ':' survive.
            username, password = credentials_decoded.split(":", 1)
            # compare_digest gives constant-time comparison on both fields.
            if secrets.compare_digest(username, _settings.site_username) and \
               secrets.compare_digest(password, _settings.site_password):
                return await call_next(request)
        except Exception:
            # Malformed base64 / missing ':' — fall through to the 401 below.
            pass
    # Return 401 with WWW-Authenticate header so browsers show a login prompt
    return Response(
        content="Authentication required",
        status_code=401,
        headers={"WWW-Authenticate": "Basic realm='Zeta Researcher'"}
    )
# Mount static files
app.mount("/static", StaticFiles(directory=WEB_DIR / "static"), name="static")
# PDF serving is handled by the /pdfs/{filename:path} route below
# to support files in subdirectories (e.g. data/pdfs/zeta/)

# Setup templates
templates = Jinja2Templates(directory=WEB_DIR / "templates")

# Initialize components (lazy loading)
# Module-level singletons created on first use by the get_* accessors below;
# /api/embedding-models/switch resets them to pick up a new collection.
_rag_pipeline = None
_retriever = None
_vector_store = None
_conversation_manager = None
def get_rag_pipeline() -> RAGPipeline:
    """Return the shared RAGPipeline, constructing it on first access."""
    global _rag_pipeline
    if _rag_pipeline is not None:
        return _rag_pipeline
    _rag_pipeline = RAGPipeline()
    return _rag_pipeline
def get_retriever() -> Retriever:
    """Return the shared Retriever, constructing it on first access."""
    global _retriever
    if _retriever is not None:
        return _retriever
    _retriever = Retriever()
    return _retriever
def get_vector_store() -> VectorStore:
    """Return the shared VectorStore, constructing it on first access."""
    global _vector_store
    if _vector_store is not None:
        return _vector_store
    _vector_store = VectorStore()
    return _vector_store
def get_conversation_manager() -> ConversationManager:
    """Return the shared ConversationManager, constructing it on first access."""
    global _conversation_manager
    if _conversation_manager is not None:
        return _conversation_manager
    _conversation_manager = ConversationManager()
    return _conversation_manager
# Request/Response Models
class ChatRequest(BaseModel):
    """Chat request model.

    Carries the user query plus every retrieval/generation knob accepted by
    /api/chat (both streaming and non-streaming paths).
    """
    query: str = Field(..., description="User question", min_length=1)
    top_k: Optional[int] = Field(None, ge=1, le=100, description="Number of chunks to retrieve")
    use_diversity: bool = Field(True, description="Apply diversity filtering")
    stream: bool = Field(False, description="Enable streaming response")
    # NOTE(review): plain "claude" is accepted by the Literal but missing from
    # the description — presumably a legacy alias; confirm before removing.
    model: Optional[Literal["claude-sonnet", "claude-opus", "grok", "claude"]] = Field(
        None, description="LLM model to use (claude-sonnet, claude-opus, or grok)"
    )
    filter_documents: Optional[List[str]] = Field(None, description="List of filenames to limit search to")
    search_sources: Optional[List[Literal["local", "web", "scientific"]]] = Field(
        None, description="List of sources to search: local (PDFs), web, scientific. Multiple can be selected."
    )
    conversation_id: Optional[str] = Field(None, description="Conversation ID for multi-turn context")
    use_bm25: bool = Field(True, description="Enable BM25 hybrid search")
    use_reranking: bool = Field(True, description="Enable Cohere reranking")
    research_depth: Optional[Literal["concise", "standard", "detailed", "super_detailed"]] = Field(
        "standard", description="Research depth level: concise (brief), standard (balanced), detailed (comprehensive), super_detailed (exhaustive)"
    )
    enable_grok_search: Optional[bool] = Field(None, description="Enable Grok's real-time X.com and web search (Grok only)")
    enable_extended_thinking: Optional[bool] = Field(None, description="Enable Claude extended thinking for deep chain-of-thought reasoning (Claude only)")
class ChatResponse(BaseModel):
    """Chat response model returned by the non-streaming /api/chat path."""
    answer: str  # generated answer text
    sources: List[dict]  # citation dicts forwarded from the pipeline
    query: str  # echo of the original question
    num_chunks_retrieved: int
    context_tokens: int  # tokens spent on retrieved context
    response_tokens: int  # tokens in the generated answer
    duration_ms: float  # total pipeline wall time
    model: str  # model that produced the answer
    search_mode: str = "local"
class SearchRequest(BaseModel):
    """Search request model for the LLM-free /api/search endpoint."""
    query: str = Field(..., description="Search query", min_length=1)
    top_k: Optional[int] = Field(10, ge=1, le=100, description="Number of results")
    filter_filename: Optional[str] = Field(None, description="Filter by filename")
class SearchResult(BaseModel):
    """Single search result (one retrieved chunk)."""
    chunk_id: str
    text: str  # chunk text, truncated to 500 chars with "..." by /api/search
    filename: str  # source document the chunk came from
    score: float  # relevance score, rounded to 4 decimals
    token_count: int
class SearchResponse(BaseModel):
    """Search response model: ranked results plus the echoed query."""
    results: List[SearchResult]
    query: str
    total_results: int  # equals len(results)
class ModelInfo(BaseModel):
    """Model information entry as reported by the pipeline stats."""
    id: str  # short identifier used in requests (e.g. "claude-sonnet")
    name: str  # human-readable display name
    model: str  # underlying provider model string
    available: bool  # whether the provider is configured/usable
class StatsResponse(BaseModel):
    """System statistics response for /api/stats."""
    total_chunks: int  # chunks in the active vector collection
    collection_name: str
    embedding_model: str
    llm_model: str
    available_models: List[ModelInfo]
    default_llm: str  # default model from settings
# PDF lookup cache with TTL — rebuilt automatically when stale
import time as _time

_pdf_lookup: dict = {}
# Start at -inf so the very first call always rebuilds. time.monotonic() has an
# unspecified epoch (often system uptime), so an initial value of 0.0 could
# make the empty initial cache look "fresh" for up to TTL seconds after boot.
_pdf_lookup_built_at: float = float("-inf")
_PDF_LOOKUP_TTL: float = 300.0  # Rebuild every 5 minutes


def _get_pdf_lookup() -> dict:
    """Return cached {basename: relative-path} PDF lookup, rebuilding if stale.

    Scans PDF_DIR recursively; when the same basename exists in several
    subdirectories, the first one encountered wins (matching the original
    first-occurrence behavior).
    """
    global _pdf_lookup, _pdf_lookup_built_at
    if _time.monotonic() - _pdf_lookup_built_at > _PDF_LOOKUP_TTL:
        lookup: dict = {}
        if PDF_DIR.exists():
            for pdf_file in PDF_DIR.rglob("*.pdf"):
                # setdefault preserves the first occurrence of a duplicate name.
                lookup.setdefault(pdf_file.name, str(pdf_file.relative_to(PDF_DIR)))
        _pdf_lookup = lookup
        _pdf_lookup_built_at = _time.monotonic()
    return _pdf_lookup
@app.get("/pdfs/{filename:path}")
async def serve_pdf(filename: str):
    """Serve PDF files, searching subdirectories if needed.

    Tries the literal (possibly subfolder) path first, then falls back to the
    auto-refreshing basename lookup table for bare filenames.

    Raises:
        HTTPException 403 if the resolved path escapes PDF_DIR.
        HTTPException 404 if no matching PDF is found.
    """
    base = PDF_DIR.resolve()
    # Try direct path first (handles both root and subfolder paths like zeta/file.pdf)
    pdf_path = (PDF_DIR / filename).resolve()
    # Security: ensure the resolved path stays within PDF_DIR. A plain
    # str.startswith() check is bypassable via sibling directories
    # (e.g. "/data/pdfs_evil" passes startswith("/data/pdfs")), so compare
    # path components via relative_to instead.
    try:
        pdf_path.relative_to(base)
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied")
    if pdf_path.is_file() and pdf_path.suffix.lower() == ".pdf":
        return FileResponse(pdf_path, media_type="application/pdf",
                            headers={"Access-Control-Allow-Origin": "*"})
    # If filename has no subdirectory, search the lookup table (auto-refreshed every 5 min)
    if "/" not in filename:
        lookup = _get_pdf_lookup()
        if filename in lookup:
            # Lookup values come from rglob within PDF_DIR, so they are safe.
            pdf_path = (PDF_DIR / lookup[filename]).resolve()
            if pdf_path.is_file():
                return FileResponse(pdf_path, media_type="application/pdf",
                                    headers={"Access-Control-Allow-Origin": "*"})
    raise HTTPException(status_code=404, detail="PDF not found")
# Routes
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render the main chat interface."""
    context = {
        "request": request,
        "site_title": _settings.site_title,
    }
    return templates.TemplateResponse("index.html", context)
@app.get("/api")
async def api_info():
    """Return basic API metadata and the endpoint map."""
    endpoints = {
        "chat": "/api/chat",
        "search": "/api/search",
        "stats": "/api/stats",
        "health": "/health",
    }
    return {
        "message": "Zeta Researcher API",
        "version": "0.2.0",
        "status": "RAG pipeline ready",
        "endpoints": endpoints,
    }
@app.get("/health")
async def health():
    """Liveness probe; always reports healthy."""
    payload = {"status": "healthy"}
    return payload
@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """
    Send a question and get an answer based on the document collection.
    The RAG pipeline will:
    1. Retrieve relevant document chunks
    2. Build context from the chunks
    3. Generate an answer using Claude
    Returns the answer with source citations.
    """
    try:
        pipeline = get_rag_pipeline()
        if request.stream:
            # Return streaming response with proper SSE headers.
            # NOTE(review): the streaming path returns here, before any
            # conversation bookkeeping — streamed turns are never recorded.
            # Confirm this is intended.
            return StreamingResponse(
                stream_response(pipeline, request),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    # Disable proxy (nginx) buffering so SSE chunks flush immediately.
                    "X-Accel-Buffering": "no",
                },
            )
        # Handle conversation context
        conversation_manager = get_conversation_manager()
        conversation = None
        conversation_id = request.conversation_id
        # Create or retrieve conversation
        if conversation_id:
            conversation = conversation_manager.get_conversation(conversation_id)
            if not conversation:
                # Unknown/expired ID — start fresh instead of failing the request.
                logger.warning(f"Conversation {conversation_id} not found, creating new one")
                conversation = conversation_manager.create_conversation()
                conversation_id = conversation.id
        else:
            # Create new conversation for this query
            conversation = conversation_manager.create_conversation()
            conversation_id = conversation.id
        # Debug logging for Grok search
        logger.debug(f"Received enable_grok_search from request: {request.enable_grok_search}")
        logger.debug(f"Selected model: {request.model}")
        # Non-streaming response.
        # NOTE(review): the conversation turn is recorded below, but the
        # history is not passed into query_async here — confirm the pipeline
        # obtains multi-turn context some other way.
        response = await pipeline.query_async(
            query=request.query,
            top_k=request.top_k,
            use_diversity=request.use_diversity,
            llm=request.model,
            filter_filenames=request.filter_documents,
            search_sources=request.search_sources,
            use_bm25=request.use_bm25,
            use_reranking=request.use_reranking,
            research_depth=request.research_depth,
            enable_grok_search=request.enable_grok_search,
            enable_extended_thinking=request.enable_extended_thinking,
        )
        # Add turn to conversation so follow-up requests can reference it
        conversation_manager.add_turn(
            conversation_id=conversation_id,
            query=request.query,
            answer=response.answer,
            sources=response.sources,
            num_chunks=response.num_chunks_retrieved,
            context_tokens=response.context_tokens,
            response_tokens=response.response_tokens,
            model=response.model,
        )
        return ChatResponse(
            answer=response.answer,
            sources=response.sources,
            query=response.query,
            num_chunks_retrieved=response.num_chunks_retrieved,
            context_tokens=response.context_tokens,
            response_tokens=response.response_tokens,
            duration_ms=response.total_duration_ms,
            model=response.model,
            search_mode=response.search_mode,
        )
    except Exception as e:
        logger.error(f"Chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
async def stream_response(pipeline: RAGPipeline, request: ChatRequest):
    """Generate Server-Sent Events for streaming response.

    Yields one ``data: <json>\\n\\n`` frame per pipeline chunk. On failure an
    error frame is yielded instead of raising, so the HTTP stream terminates
    cleanly and the client can surface the message.
    """
    try:
        # Debug logging for Grok search (streaming mode)
        logger.debug(f"Stream: Received enable_grok_search from request: {request.enable_grok_search}")
        logger.debug(f"Stream: Selected model: {request.model}")
        async for chunk in pipeline.query_stream(
            query=request.query,
            top_k=request.top_k,
            use_diversity=request.use_diversity,
            llm=request.model,
            filter_filenames=request.filter_documents,
            search_sources=request.search_sources,
            use_bm25=request.use_bm25,
            use_reranking=request.use_reranking,
            research_depth=request.research_depth,
            enable_grok_search=request.enable_grok_search,
            enable_extended_thinking=request.enable_extended_thinking,
        ):
            # SSE framing: a "data:" line followed by a blank line per event.
            yield f"data: {json.dumps(chunk)}\n\n"
    except Exception as e:
        logger.error(f"Streaming error: {e}")
        yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
@app.post("/api/search", response_model=SearchResponse)
async def search(request: SearchRequest):
    """
    Search the document collection without generating an LLM response.
    Returns matching document chunks ranked by relevance.
    """
    try:
        chunks = get_retriever().retrieve(
            query=request.query,
            top_k=request.top_k,
            filter_filename=request.filter_filename,
        )
        results = []
        for chunk in chunks:
            # Truncate long chunk text for the preview payload.
            preview = chunk.text
            if len(preview) > 500:
                preview = preview[:500] + "..."
            results.append(SearchResult(
                chunk_id=chunk.chunk_id,
                text=preview,
                filename=chunk.filename,
                score=round(chunk.score, 4),
                token_count=chunk.token_count,
            ))
        return SearchResponse(
            results=results,
            query=request.query,
            total_results=len(results),
        )
    except Exception as e:
        logger.error(f"Search error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/stats", response_model=StatsResponse)
async def stats():
    """Report collection, embedding, and LLM statistics for the UI."""
    try:
        settings = get_settings()
        pipeline_stats = get_rag_pipeline().get_stats()
        retriever_stats = pipeline_stats.get("retriever", {})
        vector_stats = retriever_stats.get("vector_store", {})
        model_infos = [ModelInfo(**m) for m in pipeline_stats.get("available_models", [])]
        return StatsResponse(
            total_chunks=vector_stats.get("total_chunks", 0),
            collection_name=vector_stats.get("name", "unknown"),
            embedding_model=retriever_stats.get("embedding_model", "unknown"),
            llm_model=pipeline_stats.get("llm_model", "unknown"),
            available_models=model_infos,
            default_llm=settings.default_llm,
        )
    except Exception as e:
        logger.error(f"Stats error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/models")
async def list_models():
    """Expose the configured LLM models."""
    try:
        return {"models": get_available_models()}
    except Exception as e:
        logger.error(f"Models error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/documents")
async def list_documents():
    """List all indexed documents with chunk counts.

    Reads only chunk metadata from the vector store and tallies chunks per
    source filename.
    """
    try:
        store = get_vector_store()
        collection = store.get_collection()
        # Only metadata is needed to count chunks per document
        results = collection.get(include=["metadatas"])
        # Count chunks per filename — stdlib Counter replaces the manual
        # dict.get(...) + 1 accumulation loop.
        doc_chunks = Counter(
            meta.get("filename", "unknown") for meta in results["metadatas"]
        )
        # Build document list sorted by name
        documents = [
            {"filename": name, "chunks": count}
            for name, count in sorted(doc_chunks.items())
        ]
        return {
            "total_documents": len(documents),
            "total_chunks": sum(d["chunks"] for d in documents),
            "documents": documents,
        }
    except Exception as e:
        logger.error(f"Documents error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/embedding-models")
async def list_embedding_models_endpoint():
    """List all available embedding models with their collection statistics."""
    try:
        store = get_vector_store()
        active_model = get_settings().embedding_model
        # One entry per known collection, annotated with static model config.
        models = []
        for coll_info in store.list_all_collections():
            mid = coll_info["embedding_model"]
            cfg = EMBEDDING_MODELS.get(mid, {})
            models.append({
                "id": mid,
                "name": cfg.get("name", mid.split("/")[-1]),
                "description": cfg.get("description", ""),
                "dimensions": cfg.get("dimensions", 0),
                "total_chunks": coll_info["total_chunks"],
                "collection_name": coll_info["collection_name"],
                "is_active": mid == active_model,
                "has_data": coll_info["total_chunks"] > 0,
            })
        return {
            "current_model": active_model,
            "models": models,
        }
    except Exception as e:
        logger.error(f"Embedding models error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
class SwitchEmbeddingRequest(BaseModel):
    """Request to switch embedding model."""
    # Must be a key of EMBEDDING_MODELS; validated in the endpoint.
    model_id: str = Field(..., description="Embedding model ID to switch to")
@app.post("/api/embedding-models/switch")
async def switch_embedding_model(request: SwitchEmbeddingRequest):
    """
    Switch to a different embedding model collection.
    Note: This only switches the active collection for queries.
    To create embeddings with a new model, use the re-embed script.

    Raises:
        HTTPException 400 for an unknown model or an empty collection.
        HTTPException 500 for unexpected failures.
    """
    global _rag_pipeline, _retriever, _vector_store
    try:
        # Validate model exists
        if request.model_id not in EMBEDDING_MODELS:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown embedding model: {request.model_id}. "
                f"Available: {list(EMBEDDING_MODELS.keys())}"
            )
        # Check if collection has data before committing to the switch
        temp_store = VectorStore(embedding_model=request.model_id)
        stats = temp_store.get_collection_stats()
        if stats.get("total_chunks", 0) == 0:
            raise HTTPException(
                status_code=400,
                detail=f"No embeddings found for model {request.model_id}. "
                f"Run the re-embed script first: python scripts/reembed.py --model {request.model_id}"
            )
        # Reset all cached instances to use new model; the get_* accessors
        # will lazily rebuild the pipeline/retriever on next use.
        _rag_pipeline = None
        _retriever = None
        _vector_store = VectorStore(embedding_model=request.model_id)
        # Update the settings (runtime only - doesn't persist to .env)
        settings = get_settings()
        settings.embedding_model = request.model_id
        return {
            "success": True,
            "message": f"Switched to embedding model: {request.model_id}",
            "collection_name": stats.get("name"),
            "total_chunks": stats.get("total_chunks"),
        }
    except HTTPException:
        # Re-raise our own 400s untouched so they aren't wrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Switch embedding model error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/search-providers")
async def list_search_providers():
    """List available search providers and their status.

    Combines dynamic provider availability (web/scientific factories) with a
    static catalog of the UI's search modes.
    """
    try:
        settings = get_settings()
        return {
            "web": get_available_web_providers(),
            "scientific": get_available_scientific_providers(),
            "default_mode": settings.default_search_mode,
            # Static descriptions used by the frontend mode picker.
            "search_modes": [
                {"id": "local", "name": "Local PDFs", "description": "Search only your uploaded documents"},
                {"id": "web", "name": "Web Search", "description": "Search the web using DuckDuckGo or Tavily"},
                {"id": "scientific", "name": "Scientific Papers", "description": "Search arXiv, Semantic Scholar, and PubMed"},
                {"id": "hybrid", "name": "Hybrid", "description": "Search all sources and combine results"},
            ],
        }
    except Exception as e:
        logger.error(f"Search providers error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Chat Sharing Endpoints
# =============================================================================
class ChatMessage(BaseModel):
    """Single chat message."""
    role: Literal["user", "assistant"]
    content: str
    # Citation dicts — presumably the same shape as ChatResponse.sources; confirm.
    sources: Optional[List[dict]] = None
    metadata: Optional[dict] = None
class ShareChatRequest(BaseModel):
    """Request to share a chat."""
    messages: List[ChatMessage]
    title: Optional[str] = None  # defaults to "Shared Chat" server-side
class ShareChatResponse(BaseModel):
    """Response with share URL."""
    share_id: str  # 8-char ID derived from a UUID4
    share_url: str  # absolute URL to the /share/{id} viewer page
    created_at: str  # ISO-8601 timestamp
class SaveChatRequest(BaseModel):
    """Request to save a chat locally."""
    messages: List[ChatMessage]
    title: str  # may be empty; the endpoint derives a fallback title
    # Pydantic copies field defaults per-instance, so the mutable [] is safe here.
    tags: Optional[List[str]] = []
class SaveChatResponse(BaseModel):
    """Response after saving a chat."""
    save_id: str  # 8-char hex ID; also the JSON filename stem
    created_at: str  # ISO-8601 UTC timestamp
class TopicModel(BaseModel):
    """A topic/folder for organizing saved chats."""
    id: str  # 8-char hex token
    name: str
    color: str  # CSS hex color for the UI badge
class SavedChatMetadata(BaseModel):
    """Metadata for a saved chat (for list view)."""
    id: str
    title: str
    message_count: int
    created_at: str  # ISO-8601 timestamp
    updated_at: str  # falls back to created_at for never-updated chats
    tags: List[str]
    is_favorite: bool
    first_user_message: str  # HTML-stripped preview of the opening question
    # Supersedes the legacy single-value "topic_id" field on disk.
    topic_ids: List[str] = []
class UpdateChatRequest(BaseModel):
    """Request to update saved chat metadata.

    All fields are optional; only non-None fields are applied (PATCH-style).
    """
    title: Optional[str] = None
    tags: Optional[List[str]] = None
    is_favorite: Optional[bool] = None
    topic_ids: Optional[List[str]] = None
@app.post("/api/share", response_model=ShareChatResponse)
async def share_chat(request: ShareChatRequest, req: Request):
    """
    Save a chat conversation and return a shareable link.
    The chat is stored server-side and can be accessed via the share URL.
    """
    try:
        # Generate unique share ID (first 8 chars of a UUID4)
        share_id = str(uuid.uuid4())[:8]
        # Build share data.
        # Fix: use timezone-aware UTC — datetime.utcnow() is naive and
        # deprecated, and the save/update endpoints already use
        # datetime.now(timezone.utc); this keeps timestamps consistent.
        share_data = {
            "id": share_id,
            "title": request.title or "Shared Chat",
            "messages": [msg.model_dump() for msg in request.messages],
            "created_at": datetime.now(timezone.utc).isoformat(),
            "message_count": len(request.messages),
        }
        # Save to file
        share_file = SHARES_DIR / f"{share_id}.json"
        with open(share_file, "w") as f:
            json.dump(share_data, f, indent=2)
        # Build share URL (use HTTPS when behind a reverse proxy like HF Spaces)
        base_url = str(req.base_url).rstrip("/")
        if req.headers.get("x-forwarded-proto") == "https":
            base_url = base_url.replace("http://", "https://")
        share_url = f"{base_url}/share/{share_id}"
        logger.info(f"Created shared chat: {share_id} with {len(request.messages)} messages")
        return ShareChatResponse(
            share_id=share_id,
            share_url=share_url,
            created_at=share_data["created_at"],
        )
    except Exception as e:
        logger.error(f"Share chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/share/{share_id}")
async def get_shared_chat(share_id: str):
    """
    Retrieve a shared chat by ID.
    Returns the chat messages and metadata.
    """
    try:
        path = SHARES_DIR / f"{share_id}.json"
        if not path.exists():
            raise HTTPException(status_code=404, detail="Shared chat not found")
        return json.loads(path.read_text())
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Get shared chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/share/{share_id}", response_class=HTMLResponse)
async def view_shared_chat(share_id: str, request: Request):
    """
    Render the shared chat view page.
    """
    try:
        path = SHARES_DIR / f"{share_id}.json"
        if not path.exists():
            raise HTTPException(status_code=404, detail="Shared chat not found")
        share_data = json.loads(path.read_text())
        context = {
            "request": request,
            "share_id": share_id,
            "share_data": share_data,
        }
        return templates.TemplateResponse("shared.html", context)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"View shared chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Topic Endpoints
# =============================================================================
class CreateTopicRequest(BaseModel):
    """Request to create a new topic/folder."""
    name: str  # whitespace is stripped server-side
    color: Optional[str] = "#7c6af7"  # default matches the UI accent color
@app.get("/api/topics", response_model=List[TopicModel])
async def list_topics():
    """Return every topic from the topics index."""
    topics = load_topics()
    return topics
@app.post("/api/topics", response_model=TopicModel)
async def create_topic(request: CreateTopicRequest):
    """Create a new topic/folder for organizing saved chats."""
    topic = {
        "id": secrets.token_hex(4),
        "name": request.name.strip(),
        "color": request.color or "#7c6af7",
    }
    existing = load_topics()
    existing.append(topic)
    save_topics(existing)
    return topic
@app.delete("/api/topics/{topic_id}")
async def delete_topic(topic_id: str):
    """Delete a topic and unassign all conversations that belonged to it."""
    # Rewrite the index without the deleted topic.
    topics = [t for t in load_topics() if t["id"] != topic_id]
    save_topics(topics)
    # Remove this topic from all saved conversations
    if SAVED_DIR.exists():
        for f in SAVED_DIR.glob("*.json"):
            # Skip internal index files such as _topics.json.
            if f.name.startswith("_"):
                continue
            try:
                data = json.loads(f.read_text())
                # Support both the current list field ("topic_ids") and the
                # legacy single-value field ("topic_id").
                ids = data.get("topic_ids", [data.get("topic_id")] if data.get("topic_id") else [])
                if topic_id in ids:
                    data["topic_ids"] = [t for t in ids if t != topic_id]
                    data.pop("topic_id", None)  # migrate away from the legacy field
                    f.write_text(json.dumps(data, indent=2))
            except Exception:
                # Best-effort cleanup: one corrupt chat file must not block deletion.
                pass
    return {"status": "deleted"}
# =============================================================================
# Saved Chats Endpoints (local version only)
# =============================================================================
@app.post("/api/saved", response_model=SaveChatResponse)
async def save_chat_local(request: SaveChatRequest):
    """
    Save a chat conversation locally with title and tags.
    Returns save_id for future reference.
    """
    try:
        # Generate unique ID (8 characters)
        save_id = uuid.uuid4().hex[:8]
        # Auto-generate title if not provided (request.title may be "")
        title = request.title
        if not title and request.messages:
            # Use first user message as fallback
            first_user_msg = next((msg for msg in request.messages if msg.role == "user"), None)
            if first_user_msg:
                # Truncate long questions to a 50-char preview with ellipsis.
                title = first_user_msg.content[:50].strip()
                if len(first_user_msg.content) > 50:
                    title += "..."
            else:
                # NOTE(review): local time here vs UTC in created_at below — confirm.
                title = f"Saved Conversation {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        # Create save data
        created_at = datetime.now(timezone.utc).isoformat()
        save_data = {
            "id": save_id,
            "title": title,
            "messages": [msg.model_dump() for msg in request.messages],
            "created_at": created_at,
            "updated_at": created_at,
            "message_count": len(request.messages),
            "tags": request.tags or [],
            "is_favorite": False
        }
        # Save to file
        save_file = SAVED_DIR / f"{save_id}.json"
        with open(save_file, "w") as f:
            json.dump(save_data, f, indent=2)
        logger.info(f"Saved chat {save_id}: {title}")
        return SaveChatResponse(
            save_id=save_id,
            created_at=created_at
        )
    except Exception as e:
        logger.error(f"Save chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/saved", response_model=List[SavedChatMetadata])
async def list_saved_chats(
    limit: int = 50,
    offset: int = 0,
    sort: str = "date_desc",
    search: Optional[str] = None,
    tags: Optional[str] = None
):
    """
    List all saved chats with pagination and filtering.
    Returns metadata list (not full messages).
    Args:
        limit: Maximum number of results to return
        offset: Number of results to skip
        sort: Sort order (date_desc, date_asc, title_asc, title_desc)
        search: Search query for title and content
        tags: Comma-separated tags to filter by
    """
    import re
    # Hoisted out of the per-file loop: previously `import re` and the raw
    # pattern were re-evaluated for every chat file; compile it once here.
    tag_pattern = re.compile(r'<[^>]+>')
    try:
        all_chats = []
        # Load all saved chat files (exclude internal index files like _topics.json)
        if SAVED_DIR.exists():
            for save_file in SAVED_DIR.glob("*.json"):
                if save_file.name.startswith("_"):
                    continue
                try:
                    with open(save_file, "r") as f:
                        chat_data = json.load(f)
                    # Extract first user message for preview
                    first_user_msg = ""
                    for msg in chat_data.get("messages", []):
                        if msg.get("role") == "user":
                            # Strip HTML tags; store full text (list items truncate via CSS)
                            first_user_msg = tag_pattern.sub('', msg.get("content", "")).strip()
                            break
                    # Create metadata (supports the legacy "topic_id" field)
                    metadata = SavedChatMetadata(
                        id=chat_data.get("id", save_file.stem),
                        title=chat_data.get("title", "Untitled"),
                        message_count=chat_data.get("message_count", 0),
                        created_at=chat_data.get("created_at", ""),
                        updated_at=chat_data.get("updated_at", chat_data.get("created_at", "")),
                        tags=chat_data.get("tags", []),
                        is_favorite=chat_data.get("is_favorite", False),
                        first_user_message=first_user_msg,
                        topic_ids=chat_data.get("topic_ids") if chat_data.get("topic_ids") is not None
                        else ([chat_data["topic_id"]] if chat_data.get("topic_id") else [])
                    )
                    all_chats.append(metadata)
                except Exception as e:
                    # One unreadable file must not break the whole listing.
                    logger.warning(f"Error loading saved chat {save_file}: {e}")
                    continue
        # Apply search filter (case-insensitive over title, preview, and tags)
        if search:
            search_lower = search.lower()
            all_chats = [
                chat for chat in all_chats
                if search_lower in chat.title.lower()
                or search_lower in chat.first_user_message.lower()
                or any(search_lower in tag.lower() for tag in chat.tags)
            ]
        # Apply tags filter (a chat matches if ANY of its tags is requested)
        if tags:
            filter_tags = [t.strip().lower() for t in tags.split(",")]
            all_chats = [
                chat for chat in all_chats
                if any(tag.lower() in filter_tags for tag in chat.tags)
            ]
        # Sort (created_at is an ISO-8601 string, compared lexicographically)
        if sort == "date_desc":
            all_chats.sort(key=lambda x: x.created_at, reverse=True)
        elif sort == "date_asc":
            all_chats.sort(key=lambda x: x.created_at)
        elif sort == "title_asc":
            all_chats.sort(key=lambda x: x.title.lower())
        elif sort == "title_desc":
            all_chats.sort(key=lambda x: x.title.lower(), reverse=True)
        # Apply pagination
        total = len(all_chats)
        all_chats = all_chats[offset:offset + limit]
        logger.info(f"Listed {len(all_chats)} saved chats (total: {total})")
        return all_chats
    except Exception as e:
        logger.error(f"List saved chats error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/saved/{save_id}")
async def get_saved_chat(save_id: str):
    """
    Retrieve a specific saved chat by ID.
    Returns full chat data with all messages.
    """
    try:
        path = SAVED_DIR / f"{save_id}.json"
        if not path.exists():
            raise HTTPException(status_code=404, detail="Saved chat not found")
        return json.loads(path.read_text())
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Get saved chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.put("/api/saved/{save_id}")
async def update_saved_chat(save_id: str, request: UpdateChatRequest):
    """
    Update saved chat metadata (title, tags, favorite status, topics).
    Does not modify messages. Only fields present in the request are applied.
    """
    try:
        save_file = SAVED_DIR / f"{save_id}.json"
        if not save_file.exists():
            raise HTTPException(status_code=404, detail="Saved chat not found")
        # Load existing data
        with open(save_file, "r") as f:
            chat_data = json.load(f)
        # Update fields if provided
        if request.title is not None:
            chat_data["title"] = request.title
        if request.tags is not None:
            chat_data["tags"] = request.tags
        if request.is_favorite is not None:
            # Bug fix: this previously ignored the submitted value and blindly
            # toggled the stored flag, so sending the chat's current state
            # flipped it. Honor the explicit boolean (PUT = set, not toggle).
            chat_data["is_favorite"] = request.is_favorite
        if request.topic_ids is not None:
            chat_data["topic_ids"] = request.topic_ids
            chat_data.pop("topic_id", None)  # remove legacy field if present
        # Update timestamp
        chat_data["updated_at"] = datetime.now(timezone.utc).isoformat()
        # Save back to file
        with open(save_file, "w") as f:
            json.dump(chat_data, f, indent=2)
        logger.info(f"Updated saved chat {save_id}")
        return {"message": "Chat updated successfully", "save_id": save_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Update saved chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/saved/{save_id}")
async def delete_saved_chat(save_id: str):
    """
    Delete a saved chat.
    """
    try:
        target = SAVED_DIR / f"{save_id}.json"
        if not target.exists():
            raise HTTPException(status_code=404, detail="Saved chat not found")
        # Remove the backing JSON file.
        target.unlink()
        logger.info(f"Deleted saved chat {save_id}")
        return {"message": "Chat deleted successfully", "save_id": save_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Delete saved chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/saved/{save_id}", response_class=HTMLResponse)
async def view_saved_chat(save_id: str, request: Request):
    """
    Render the HTML view page for a saved chat.
    """
    try:
        save_file = SAVED_DIR / f"{save_id}.json"
        if not save_file.exists():
            raise HTTPException(status_code=404, detail="Saved chat not found")

        with open(save_file, "r") as f:
            saved_data = json.load(f)

        # Template context for the saved-chat page
        context = {
            "request": request,
            "save_id": save_id,
            "saved_data": saved_data,
        }
        return templates.TemplateResponse("saved.html", context)

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"View saved chat error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Web Publishing Endpoints (local version only)
# =============================================================================
class PublishRequest(BaseModel):
    """Request to publish a research article to the web."""
    messages: List[ChatMessage]  # full conversation to render as the article
    title: Optional[str] = None  # falls back to first user message (truncated to 100 chars)
    description: Optional[str] = None  # falls back to first user message (truncated to 200 chars)
    tags: Optional[List[str]] = []  # free-form tags stored in the article index
    topic: Optional[str] = None  # topic label stored in the article index
    images: Optional[List[str]] = []  # relative URLs, e.g. /images/research/draft/file.png
    exchange_count: Optional[int] = None  # Original question count before editing; counted from user messages if omitted
class PublishResponse(BaseModel):
    """Response with published article URL."""
    slug: str  # unique URL-friendly identifier for the article
    url: str  # full public URL ({site_url}/research/{slug})
    deploy_state: str  # Netlify deploy state reported by deploy_site
    created_at: str  # ISO-format creation timestamp
class ImageUploadRequest(BaseModel):
    """Request to upload an image for a research article."""
    image_data: str  # Base64-encoded image; decoded and validated server-side
    filename: str  # original filename; used to derive the stored filename
    slug: Optional[str] = "draft"  # Article slug (use 'draft' for pre-publish uploads)
class ImageUploadResponse(BaseModel):
    """Response with uploaded image details."""
    image_id: str  # server-generated UUID for this upload
    url: str  # Relative URL: /images/research/{slug}/{filename}
    filename: str  # final stored filename on disk
    size: int  # size of the compressed image in bytes
def _load_article_index() -> list:
    """Load the published-article index from index.json (empty list if absent)."""
    index_file = RESEARCH_DIR / "index.json"
    if not index_file.exists():
        return []
    with open(index_file, "r") as f:
        return json.load(f)
def _save_article_index(articles: list):
    """Persist the article index to index.json as pretty-printed JSON."""
    with open(RESEARCH_DIR / "index.json", "w") as f:
        json.dump(articles, f, indent=2)
@app.get("/api/publish/status")
async def publish_status():
    """Check if web publishing is configured and available."""
    # Publishing needs Netlify credentials and is disabled on HF Spaces
    has_credentials = bool(_settings.netlify_auth_token and _settings.netlify_site_id)
    configured = has_credentials and not _settings.hf_space
    return {
        "available": configured,
        "site_url": _settings.publish_site_url if configured else None,
    }
@app.get("/api/publish/articles")
async def list_published_articles():
    """List all published research articles, newest first."""
    articles = _load_article_index()
    # Sort descending by creation timestamp (ISO strings sort chronologically)
    articles.sort(key=lambda a: a.get("created_at", ""), reverse=True)
    return {"articles": articles}
@app.delete("/api/publish/articles/{slug}")
async def unpublish_article(slug: str):
    """
    Unpublish an article: delete its HTML page and images, drop it from
    the index, regenerate the research index page, and redeploy the site.

    Args:
        slug: Article slug (URL-friendly identifier)

    Returns:
        Status and deployment result
    """
    from src.publish.renderer import render_research_index
    from src.publish.netlify import deploy_site

    try:
        # Publishing requires Netlify credentials
        if not _settings.netlify_auth_token or not _settings.netlify_site_id:
            raise HTTPException(
                status_code=400,
                detail="Publishing not configured. Add NETLIFY_AUTH_TOKEN and NETLIFY_SITE_ID to .env"
            )

        # Remove the rendered article page; 404 if it was never published
        article_path = RESEARCH_DIR / f"{slug}.html"
        if not article_path.exists():
            raise HTTPException(status_code=404, detail=f"Article '{slug}' not found")
        article_path.unlink()
        logger.info(f"Removed article file: {article_path}")

        # Drop the article's image directory, if it has one
        images_dir = SITE_DIR / "images" / "research" / slug
        if images_dir.exists():
            shutil.rmtree(images_dir)
            logger.info(f"Removed images directory: {images_dir}")

        # Rewrite index.json without this article
        remaining = [a for a in _load_article_index() if a.get("slug") != slug]
        _save_article_index(remaining)
        logger.info(f"Updated article index, {len(remaining)} articles remaining")

        # Regenerate the research index page
        index_html = render_research_index(
            articles=remaining,
            site_url=_settings.publish_site_url,
        )
        with open(RESEARCH_DIR / "index.html", "w") as f:
            f.write(index_html)

        # Push the whole site to Netlify
        deploy_result = deploy_site(
            site_id=_settings.netlify_site_id,
            auth_token=_settings.netlify_auth_token,
            site_dir=SITE_DIR,
        )

        logger.info(f"Article '{slug}' unpublished successfully")
        return {
            "status": "unpublished",
            "slug": slug,
            "deploy": {
                "id": deploy_result.get("id"),
                "url": deploy_result.get("url"),
                "state": deploy_result.get("state")
            }
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unpublish error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to unpublish article: {str(e)}")
class UpdateArticleTopicRequest(BaseModel):
    """Request body for updating a published article's topic."""
    topic: Optional[str] = None  # new topic; None or empty stores ""
@app.patch("/api/publish/articles/{slug}")
async def update_article_topic(slug: str, request: UpdateArticleTopicRequest):
    """
    Update the topic of a published article and redeploy.

    Args:
        slug: Article slug
        request: Body with new topic value

    Raises:
        HTTPException 400: publishing not configured.
        HTTPException 404: no article with the given slug.
        HTTPException 500: index write or deployment failed.
    """
    from src.publish.renderer import render_research_index
    from src.publish.netlify import deploy_site

    if not _settings.netlify_auth_token or not _settings.netlify_site_id:
        raise HTTPException(
            status_code=400,
            detail="Publishing not configured. Add NETLIFY_AUTH_TOKEN and NETLIFY_SITE_ID to .env"
        )

    # Wrap file and deploy work so failures come back as a logged 500,
    # matching the error-handling style of the other publishing endpoints
    # (previously any exception here surfaced as an unlogged raw 500).
    try:
        articles = _load_article_index()
        article = next((a for a in articles if a.get("slug") == slug), None)
        if article is None:
            raise HTTPException(status_code=404, detail=f"Article '{slug}' not found.")

        # None/empty topic clears the field
        article["topic"] = request.topic or ""
        _save_article_index(articles)

        # Regenerate research index page
        index_html = render_research_index(
            articles=articles,
            site_url=_settings.publish_site_url,
        )
        with open(RESEARCH_DIR / "index.html", "w") as f:
            f.write(index_html)

        # Deploy entire site to Netlify
        deploy_result = deploy_site(
            site_id=_settings.netlify_site_id,
            auth_token=_settings.netlify_auth_token,
            site_dir=SITE_DIR,
        )

        return {
            "status": "updated",
            "slug": slug,
            "topic": article["topic"],
            "deploy_state": deploy_result.get("state", "unknown"),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Update article topic error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/publish/image/upload", response_model=ImageUploadResponse)
async def upload_article_image(request: ImageUploadRequest):
    """
    Upload an image for a research article.

    Validates image format and size, resizes if too large, and saves to
    data/site/images/research/{slug}/ directory.

    Raises:
        HTTPException 403: running on HF Spaces (upload is local-only).
        HTTPException 400: image failed validation.
        HTTPException 500: unexpected processing error.
    """
    from src.publish.image_processor import (
        base64_to_bytes,
        validate_image,
        compress_image,
        generate_image_filename,
    )

    # Check configuration (only available locally)
    if _settings.hf_space:
        raise HTTPException(
            status_code=403,
            detail="Image upload is only available on the local version."
        )

    try:
        # Decode base64
        image_bytes = base64_to_bytes(request.image_data)

        # Validate image
        is_valid, error_msg = validate_image(image_bytes)
        if not is_valid:
            raise HTTPException(status_code=400, detail=error_msg)

        # Compress and resize
        compressed_bytes = compress_image(image_bytes, max_width=1920, quality=85)

        # Generate filename
        slug = request.slug or "draft"
        filename = generate_image_filename(request.filename, slug)

        # Create directory
        image_dir = SITE_DIR / "images" / "research" / slug
        image_dir.mkdir(parents=True, exist_ok=True)

        # Save image
        image_path = image_dir / filename
        with open(image_path, "wb") as f:
            f.write(compressed_bytes)

        # Build the public URL including the generated filename
        # (the filename segment was previously missing, producing an
        # unusable URL for the saved image)
        url = f"/images/research/{slug}/{filename}"
        image_id = str(uuid.uuid4())

        logger.info(
            f"Uploaded image: {filename} ({len(compressed_bytes)} bytes) -> {url}"
        )

        return ImageUploadResponse(
            image_id=image_id,
            url=url,
            filename=filename,
            size=len(compressed_bytes),
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Image upload error: {e}", exc_info=True)
        raise HTTPException(
            status_code=500, detail=f"Failed to upload image: {str(e)}"
        )
@app.post("/api/publish", response_model=PublishResponse)
async def publish_article(request: PublishRequest):
    """
    Publish a research conversation as a static web article.

    Generates self-contained HTML, deploys the full site to Netlify,
    and returns the published URL.

    Raises:
        HTTPException 503: Netlify credentials are missing.
        HTTPException 403: running on HF Spaces (publishing is local-only).
        HTTPException 400: empty message list.
        HTTPException 500: rendering or deployment failed.
    """
    from src.publish.renderer import render_article, render_research_index, slugify
    from src.publish.netlify import deploy_site

    # Check configuration
    if not _settings.netlify_auth_token or not _settings.netlify_site_id:
        raise HTTPException(
            status_code=503,
            detail="Publishing not configured. Set NETLIFY_AUTH_TOKEN and NETLIFY_SITE_ID."
        )
    if _settings.hf_space:
        raise HTTPException(
            status_code=403,
            detail="Publishing is only available on the local version."
        )
    if not request.messages:
        raise HTTPException(status_code=400, detail="No messages to publish.")

    try:
        # Generate title/description from first user message if not provided
        first_user = next((m for m in request.messages if m.role == "user"), None)
        title = request.title or (first_user.content[:100] if first_user else "Research Article")
        description = request.description or (first_user.content[:200] if first_user else "")

        # Generate a unique slug, appending -2, -3, ... on collision
        base_slug = slugify(title)
        if not base_slug:
            base_slug = "article"
        articles = _load_article_index()
        existing_slugs = {a["slug"] for a in articles}
        slug = base_slug
        counter = 2
        while slug in existing_slugs:
            slug = f"{base_slug}-{counter}"
            counter += 1

        # Timezone-aware UTC timestamp; datetime.utcnow() is naive and
        # deprecated, and the rest of this module uses now(timezone.utc)
        created_at = datetime.now(timezone.utc).isoformat()

        # Handle images if provided (shutil is imported at module level)
        if request.images:
            # Validate images exist
            for image_url in request.images:
                # Extract path from URL: /images/research/draft/file.png
                if image_url.startswith('/'):
                    image_path = SITE_DIR / image_url.lstrip('/')
                    if not image_path.exists():
                        logger.warning(f"Image not found: {image_url}")

            # Move images from 'draft' to final slug directory
            draft_dir = SITE_DIR / "images" / "research" / "draft"
            final_dir = SITE_DIR / "images" / "research" / slug

            if draft_dir.exists() and any(draft_dir.iterdir()):
                final_dir.mkdir(parents=True, exist_ok=True)

                # Update draft image URLs embedded in message content
                for msg in request.messages:
                    if hasattr(msg, 'content') and msg.content:
                        msg.content = msg.content.replace(
                            '/images/research/draft/',
                            f'/images/research/{slug}/'
                        )

                # Move actual files
                for img_file in draft_dir.glob('*'):
                    if img_file.is_file():
                        dest_file = final_dir / img_file.name
                        shutil.move(str(img_file), str(dest_file))
                        logger.info(f"Moved image: {img_file.name} -> {dest_file}")

        # Render article HTML
        messages_data = [msg.model_dump() for msg in request.messages]

        # Use provided exchange_count or count user messages
        exchange_count = request.exchange_count
        if exchange_count is None:
            exchange_count = len([m for m in request.messages if m.role == "user"])

        article_html = render_article(
            slug=slug,
            title=title,
            description=description,
            messages=messages_data,
            created_at=created_at,
            site_url=_settings.publish_site_url,
            exchange_count=exchange_count,
        )

        # Save article HTML
        article_file = RESEARCH_DIR / f"{slug}.html"
        with open(article_file, "w") as f:
            f.write(article_html)

        # Rewrite image URLs from the draft directory to the final slug
        final_images = []
        if request.images:
            for img_url in request.images:
                updated_url = img_url.replace('/images/research/draft/', f'/images/research/{slug}/')
                final_images.append(updated_url)

        # Update article index with correct exchange count
        articles.append({
            "slug": slug,
            "title": title,
            "description": description,
            "created_at": created_at,
            "exchange_count": exchange_count,
            "tags": request.tags or [],
            "topic": request.topic or "",
            "images": final_images,
        })
        _save_article_index(articles)

        # Regenerate research index page
        index_html = render_research_index(
            articles=articles,
            site_url=_settings.publish_site_url,
        )
        with open(RESEARCH_DIR / "index.html", "w") as f:
            f.write(index_html)

        # Deploy entire site to Netlify
        deploy_result = deploy_site(
            site_id=_settings.netlify_site_id,
            auth_token=_settings.netlify_auth_token,
            site_dir=SITE_DIR,
        )

        article_url = f"{_settings.publish_site_url}/research/{slug}"
        logger.info(f"Published article: {slug} -> {article_url}")

        return PublishResponse(
            slug=slug,
            url=article_url,
            deploy_state=deploy_result.get("state", "unknown"),
            created_at=created_at,
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Publish error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to publish: {str(e)}")
# =============================================================================
# X.com Publishing Endpoints
# =============================================================================
class FormatArticleRequest(BaseModel):
    """Request to format messages as X article."""
    messages: List[ChatMessage]  # conversation messages to format
    title: Optional[str] = None  # falls back to first user message (truncated to 100 chars)
    include_sources: bool = True  # forwarded to format_as_x_article
class XPostRequest(BaseModel):
    """Request to post to X via API."""
    format: Literal["article"]  # only the article format is accepted
    messages: List[ChatMessage]  # conversation messages to format and post
@app.post("/api/x/format-article")
async def format_x_article(request: FormatArticleRequest):
    """
    Format conversation messages as an X article.

    Auto-generates a title from the first user message when none is given.
    """
    try:
        from src.publish.x_formatter import format_as_x_article

        # Auto-generate title from first query if not provided
        title = request.title
        if not title and request.messages:
            first_query = next((m.content for m in request.messages if m.role == 'user'), '')
            title = first_query[:100]

        article_data = format_as_x_article(
            # model_dump() is the Pydantic v2 API (dict() is deprecated);
            # consistent with model_dump() usage elsewhere in this module
            messages=[msg.model_dump() for msg in request.messages],
            title=title,
            include_sources=request.include_sources
        )

        return article_data
    except Exception as e:
        logger.error(f"Article formatting error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to format article: {str(e)}")
@app.get("/api/x/config")
async def x_config_status():
    """Check if X API is configured."""
    try:
        settings = get_settings()
        # All four OAuth 1.0a credentials must be present
        credentials = (
            settings.x_api_key,
            settings.x_api_secret,
            settings.x_access_token,
            settings.x_access_token_secret,
        )
        return {"configured": all(credentials)}
    except Exception as e:
        logger.error(f"X config check error: {e}", exc_info=True)
        return {"configured": False}
@app.post("/api/x/post")
async def post_to_x(request: XPostRequest):
    """
    Post an article to X via the API, if credentials are configured.

    Returns {"success": True, "url", "id"} on success; unexpected errors
    are logged and returned as {"success": False, "error"} rather than
    raised, so the client can surface the failure message.
    """
    try:
        from src.publish.x_formatter import format_as_x_article

        # Check if X API is configured
        settings = get_settings()
        if not all([settings.x_api_key, settings.x_api_secret,
                    settings.x_access_token, settings.x_access_token_secret]):
            raise HTTPException(
                status_code=400,
                detail="X API not configured. Please add X API credentials to .env file."
            )

        # Import X API client (requires the optional tweepy dependency)
        try:
            from src.publish.x_api import XApiClient
        except ImportError:
            raise HTTPException(
                status_code=500,
                detail="X API client not available. Install tweepy: pip install tweepy"
            )

        client = XApiClient()

        # Format and post article; model_dump() is the Pydantic v2 API
        # (dict() is deprecated), consistent with the rest of this module
        article = format_as_x_article(
            messages=[msg.model_dump() for msg in request.messages]
        )
        result = await client.post_article(article)

        return {
            "success": True,
            "url": result["url"],
            "id": result["id"]
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"X posting failed: {e}", exc_info=True)
        return {
            "success": False,
            "error": str(e)
        }
# =============================================================================
# Conversation Management Endpoints
# =============================================================================
@app.post("/api/conversations")
async def create_conversation(title: Optional[str] = None):
    """Create a new conversation and return its id, title, and creation time."""
    try:
        manager = get_conversation_manager()
        conv = manager.create_conversation(title=title)
        return {
            "id": conv.id,
            "title": conv.title,
            "created_at": conv.created_at.isoformat(),
        }
    except Exception as e:
        logger.error(f"Create conversation error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/conversations")
async def list_conversations(limit: Optional[int] = None):
    """List conversations, most recently updated first (optionally limited)."""
    try:
        manager = get_conversation_manager()
        items = manager.list_conversations(limit=limit, sort_by="updated_at")
        return {"conversations": items}
    except Exception as e:
        logger.error(f"List conversations error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Get a specific conversation with all its turns; 404 if unknown."""
    try:
        manager = get_conversation_manager()
        conversation = manager.get_conversation(conversation_id)
        if not conversation:
            raise HTTPException(status_code=404, detail="Conversation not found")
        return conversation.to_dict()
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Get conversation error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation; 404 if the ID is unknown."""
    try:
        # delete_conversation returns False when nothing was removed
        if not get_conversation_manager().delete_conversation(conversation_id):
            raise HTTPException(status_code=404, detail="Conversation not found")
        return {"status": "deleted", "id": conversation_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Delete conversation error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/conversations/{conversation_id}")
async def update_conversation(conversation_id: str, title: str):
    """Update a conversation's title; 404 if the ID is unknown."""
    try:
        manager = get_conversation_manager()
        # update_title returns False when the conversation does not exist
        if not manager.update_title(conversation_id, title):
            raise HTTPException(status_code=404, detail="Conversation not found")
        return {"status": "updated", "id": conversation_id, "title": title}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Update conversation error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))