# mdb / app.py — Hugging Face Space application (update by Chris4K, commit 16ecf79)
import os
import secrets
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union
import chromadb
import json
from fastapi import FastAPI, HTTPException, Body, Query, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from jose import jwt, JWTError
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from chromadb.config import Settings
class TokenManager:
    """Handle JWT token generation and validation.

    NOTE(review): this class is defined twice in this module; the later
    definition shadows this one at import time. The duplicates should be
    merged into a single definition.
    """

    # Regenerated on every process start, so a restart invalidates all
    # previously issued tokens. Persist/configure the key if that matters.
    SECRET_KEY = secrets.token_hex(32)
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30

    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create a signed JWT access token.

        Args:
            data: Claims to embed (e.g. {"sub": username, "role": ...}).
            expires_delta: Optional custom lifetime; defaults to
                ACCESS_TOKEN_EXPIRE_MINUTES.

        Returns:
            The encoded JWT string.
        """
        to_encode = data.copy()
        # Fixed: the fallback lifetime was a hard-coded 15 minutes,
        # inconsistent with the declared ACCESS_TOKEN_EXPIRE_MINUTES constant.
        expire = datetime.utcnow() + (
            expires_delta
            or timedelta(minutes=TokenManager.ACCESS_TOKEN_EXPIRE_MINUTES)
        )
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, TokenManager.SECRET_KEY, algorithm=TokenManager.ALGORITHM)

    @staticmethod
    def decode_token(token: str) -> Dict:
        """Decode and validate a JWT token.

        Raises:
            HTTPException: 401 when the token is invalid or expired.
        """
        try:
            return jwt.decode(token, TokenManager.SECRET_KEY, algorithms=[TokenManager.ALGORITHM])
        except JWTError:
            raise HTTPException(status_code=401, detail="Invalid token")
import os
import re
import secrets
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union
import chromadb
import json
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import chromadb
import json
from fastapi import FastAPI, HTTPException, Body, Query, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from jose import jwt, JWTError
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from chromadb.config import Settings
import os
import re
import time
import secrets
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union, Callable
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import logging
class ContentExtractorBase:
    """Abstract strategy interface for pulling text out of parsed HTML.

    Concrete subclasses implement ``extract`` for a particular kind of
    site layout; the factory below selects among them by site type.
    """

    @staticmethod
    def extract(soup: BeautifulSoup) -> str:
        """Extract text content from *soup*.

        Args:
            soup (BeautifulSoup): Parsed HTML document.

        Returns:
            str: The extracted text.

        Raises:
            NotImplementedError: Always — subclasses must override.
        """
        raise NotImplementedError("Subclasses must implement extract method")
class DefaultContentExtractor(ContentExtractorBase):
    """General-purpose extraction strategy for arbitrary pages."""

    @staticmethod
    def extract(soup: BeautifulSoup) -> str:
        """Return normalized page text.

        Walks a priority list of likely content containers, strips
        boilerplate tags from each match, and returns the first match
        that yields a substantial amount of text. Falls back to the
        whole-document text when no container qualifies.
        """
        # Containers tried in order of decreasing specificity.
        content_selectors = [
            'main', 'article', 'div.content', 'div.main-content',
            'section.content', 'body'
        ]
        for selector in content_selectors:
            node = soup.select_one(selector)
            if not node:
                continue
            # Drop page chrome before reading the text.
            for junk in node(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                junk.decompose()
            raw = node.get_text(separator=' ', strip=True)
            # Collapse runs of whitespace into single spaces.
            cleaned = re.sub(r'\s+', ' ', raw).strip()
            # Require enough text to treat the hit as real content.
            if cleaned and len(cleaned) > 100:
                return cleaned
        # Nothing qualified: fall back to the full document text.
        return soup.get_text(separator=' ', strip=True)
class BlogContentExtractor(ContentExtractorBase):
    """Extraction strategy tuned for blog-style layouts."""

    @staticmethod
    def extract(soup: BeautifulSoup) -> str:
        """Return normalized article text from common blog containers.

        Tries blog-specific selectors first; when none yields a
        substantial amount of text, delegates to the default strategy.
        """
        # Selectors covering the most common blog/CMS themes.
        blog_selectors = [
            'article .entry-content',
            '.post-content',
            'div.blog-post',
            '.article-body'
        ]
        for selector in blog_selectors:
            node = soup.select_one(selector)
            if not node:
                continue
            # Strip boilerplate elements before reading text.
            for junk in node(['script', 'style', 'aside', 'footer', 'header']):
                junk.decompose()
            raw = node.get_text(separator=' ', strip=True)
            cleaned = re.sub(r'\s+', ' ', raw).strip()
            if cleaned and len(cleaned) > 100:
                return cleaned
        # No blog container matched; use the general-purpose strategy.
        return DefaultContentExtractor.extract(soup)
class ContentExtractorFactory:
    """Maps a site-type string onto a content-extraction strategy."""

    # Registry of known strategies, keyed by site type.
    EXTRACTORS = {
        'default': DefaultContentExtractor,
        'blog': BlogContentExtractor
    }

    @classmethod
    def get_extractor(cls, site_type: str = 'default') -> ContentExtractorBase:
        """Return the extractor registered for *site_type*.

        Unknown site types fall back to DefaultContentExtractor.
        (Note: returns the strategy class itself, not an instance —
        every strategy exposes a static ``extract``.)
        """
        return cls.EXTRACTORS.get(site_type, DefaultContentExtractor)
class RateLimiter:
    """Request throttle combining a hard cap, pacing, and backoff.

    Provides three cooperating controls:
    - a hard ceiling on total requests (``max_requests``)
    - a minimum spacing between consecutive requests (``request_interval``)
    - exponential backoff timing for retried requests (``max_retries``)
    """

    def __init__(
        self,
        max_requests: int = 50,
        request_interval: float = 1.0,
        max_retries: int = 3
    ):
        """Set up throttle configuration and tracking state.

        Args:
            max_requests: Hard cap on total recorded requests.
            request_interval: Minimum seconds between requests.
            max_retries: Retry attempts allowed for a failing request.
        """
        self.max_requests = max_requests
        self.request_interval = request_interval
        self.max_retries = max_retries
        # Mutable tracking state.
        self.request_count = 0
        self.last_request_time = 0

    def wait(self):
        """Sleep just long enough to honor the configured spacing."""
        elapsed = time.time() - self.last_request_time
        remaining = self.request_interval - elapsed
        if remaining > 0:
            time.sleep(remaining)

    def can_request(self) -> bool:
        """Return True while the total-request budget is not exhausted."""
        return self.request_count < self.max_requests

    def record_request(self):
        """Count a completed request and stamp its completion time."""
        self.request_count += 1
        self.last_request_time = time.time()

    def exponential_backoff(self, attempt: int) -> float:
        """Return the wait in seconds before retry *attempt*, capped at 60."""
        return min(2 ** attempt, 60)
class DomainCrawler:
    """
    Advanced domain crawler with:
    - Rate limiting
    - Configurable content extraction
    - Robust URL handling

    Fixed over the previous revision: each page is now downloaded ONCE
    per crawl step. Previously every page was fetched twice — once in
    extract_text and again in crawl for link discovery — and the second
    fetch bypassed rate-limit accounting, retries, and status checks.
    """

    # Sent with every request so servers treat us as a regular browser.
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    def __init__(
        self,
        base_url: str,
        max_pages: int = 50,
        depth: int = 3,
        site_type: str = 'default',
        rate_limit_config: Dict = None
    ):
        """
        Initialize domain crawler with advanced configurations.

        Args:
            base_url (str): Starting URL to crawl
            max_pages (int): Maximum number of pages to index
            depth (int): Maximum depth of links to follow
            site_type (str): Type of website for content extraction
            rate_limit_config (Dict): Custom rate limiting configuration
        """
        # URL and crawling configuration
        self.base_url = self.normalize_url(base_url)
        self.max_pages = max_pages
        self.depth = depth
        # Content extraction strategy (class with a static `extract`)
        self.content_extractor = ContentExtractorFactory.get_extractor(site_type)
        # Rate limiting — defaults tie the request budget to max_pages
        rate_config = rate_limit_config or {
            'max_requests': max_pages,
            'request_interval': 1.0,
            'max_retries': 3
        }
        self.rate_limiter = RateLimiter(**rate_config)
        # Tracking
        self.visited_urls = set()
        self.page_sources = []
        # Logging
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)

    @staticmethod
    def normalize_url(url: str) -> str:
        """Normalize URLs to prevent duplicate indexing.

        Drops the fragment, collapses "www." hosts (note: this also
        rewrites http://www. URLs to https://), strips a trailing slash
        and lower-cases the result.
        """
        try:
            parsed = urlparse(url)
            clean_url = parsed._replace(fragment='')
            normalized = clean_url.geturl()
            normalized = re.sub(r'^https?://www\.', 'https://', normalized)
            normalized = re.sub(r'/$', '', normalized)
            return normalized.lower()
        except Exception:
            return url.lower()

    def is_valid_url(self, url: str) -> bool:
        """Check that URL is same-domain, http(s), non-binary and unvisited."""
        try:
            normalized_url = self.normalize_url(url)
            parsed_base = urlparse(self.base_url)
            parsed_url = urlparse(normalized_url)
            return (
                parsed_base.netloc == parsed_url.netloc and
                parsed_url.scheme in ['http', 'https'] and
                not re.search(r'\.(pdf|jpg|jpeg|png|gif|mp4|mp3|zip|rar)$', parsed_url.path, re.IGNORECASE) and
                normalized_url not in self.visited_urls
            )
        except Exception:
            return False

    def _fetch(self, url):
        """Download *url* with rate limiting and retries.

        Returns:
            The parsed BeautifulSoup document, or None when the rate
            budget is exhausted or every retry failed.
        """
        if not self.rate_limiter.can_request():
            self.logger.warning(f"Rate limit exceeded. Stopping crawl at {url}")
            return None
        for attempt in range(self.rate_limiter.max_retries):
            try:
                # Honor the minimum inter-request spacing.
                self.rate_limiter.wait()
                response = requests.get(url, timeout=10, headers={
                    'User-Agent': self.USER_AGENT
                })
                response.raise_for_status()
                # Count only successful downloads against the budget.
                self.rate_limiter.record_request()
                return BeautifulSoup(response.text, 'html.parser')
            except requests.RequestException as e:
                self.logger.error(f"Request error for {url} (Attempt {attempt+1}): {e}")
                time.sleep(self.rate_limiter.exponential_backoff(attempt))
        self.logger.error(f"Failed to extract content from {url} after {self.rate_limiter.max_retries} attempts")
        return None

    def extract_text(self, url: str) -> str:
        """
        Extract text content with retry and rate limiting.

        Args:
            url (str): URL to extract content from

        Returns:
            str: Extracted text content ("" on failure)
        """
        soup = self._fetch(url)
        if soup is None:
            return ""
        return self.content_extractor.extract(soup)

    def crawl(self, url: str = None, current_depth: int = 0) -> List[Dict]:
        """
        Recursive web crawler with advanced controls.

        Args:
            url (str, optional): URL to crawl. Defaults to base_url.
            current_depth (int, optional): Current crawl depth. Defaults to 0.

        Returns:
            List[Dict]: Collected page sources
        """
        url = self.normalize_url(url or self.base_url)
        # Stopping conditions: depth, page budget, invalid/foreign URL.
        if (current_depth > self.depth or
            len(self.visited_urls) >= self.max_pages or
            not self.is_valid_url(url)):
            return self.page_sources
        # Prevent duplicate visits
        if url in self.visited_urls:
            return self.page_sources
        self.visited_urls.add(url)
        # Single download per page: text extraction AND link discovery
        # both work off this one soup.
        soup = self._fetch(url)
        if soup is None:
            return self.page_sources
        text = self.content_extractor.extract(soup)
        if text:
            self.page_sources.append({
                "text": text,
                "source": url,
                "indexed_at": datetime.utcnow().isoformat()
            })
        # Stop following links once the request budget is spent.
        if not self.rate_limiter.can_request():
            self.logger.info("Rate limit reached. Stopping crawl.")
            return self.page_sources
        try:
            for link in soup.find_all('a', href=True):
                absolute_link = urljoin(url, link['href'])
                normalized_link = self.normalize_url(absolute_link)
                if (self.is_valid_url(normalized_link) and
                    normalized_link not in self.visited_urls):
                    self.crawl(normalized_link, current_depth + 1)
        except Exception as e:
            self.logger.error(f"Crawling error for {url}: {e}")
        return self.page_sources
class DocumentIndexerConfig:
    """Configuration management with JSON-file persistence and defaults."""

    CONFIG_FILE = "server_config.json"
    DEFAULT_CONFIG = {
        "chunk_size": 500,
        "chunk_overlap": 50,
        "embedding_model": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        "top_k": 5,
        "persist_directory": "/home/user/.cache/chroma_db",
        "default_collection": "documents"
    }

    @classmethod
    def load_config(cls) -> Dict:
        """Load configuration with fallback to defaults.

        Always returns a NEW dict. (The previous revision returned
        DEFAULT_CONFIG itself when the file was missing, so a caller
        mutating its config silently corrupted the shared defaults.)
        A corrupt JSON file is treated the same as a missing one.
        """
        try:
            with open(cls.CONFIG_FILE, "r") as f:
                config = json.load(f)
            # Merge with defaults to ensure all keys exist
            return {**cls.DEFAULT_CONFIG, **config}
        except (FileNotFoundError, json.JSONDecodeError):
            return dict(cls.DEFAULT_CONFIG)

    @classmethod
    def save_config(cls, config_data: Dict):
        """Merge *config_data* into the stored configuration and persist it."""
        existing_config = cls.load_config()
        existing_config.update(config_data)
        with open(cls.CONFIG_FILE, "w") as f:
            json.dump(existing_config, f, indent=4)
class UserManager:
    """Role-based user store and authentication.

    SECURITY NOTE(review): credentials are hard-coded in plaintext for
    demo purposes only. Production use requires hashed passwords (e.g.
    bcrypt via passlib) sourced from configuration or a database.
    """

    USERS = {
        "admin": {
            "password": "admin123",
            "role": "admin",
            "collections": ["all"]
        },
        "customer": {
            "password": "customer123",
            "role": "user",
            "collections": ["customer_1"]
        }
    }

    @staticmethod
    def authenticate_user(username: str, password: str) -> Optional[Dict]:
        """Return the user record if credentials match, else None.

        Uses a constant-time comparison so response timing does not leak
        how much of the password matched. Inputs are encoded first
        because compare_digest rejects non-ASCII str arguments.
        """
        user = UserManager.USERS.get(username)
        if (user and isinstance(password, str) and
                secrets.compare_digest(user["password"].encode(), password.encode())):
            return user
        return None

    @staticmethod
    def get_user_collections(username: str) -> List[str]:
        """Return the collections *username* may access ([] if unknown)."""
        user = UserManager.USERS.get(username)
        return user.get('collections', []) if user else []
class DocumentIndexer:
    """Document indexing on ChromaDB with per-user collections."""

    def __init__(self, config: Dict = None, username: str = None):
        """
        Initialize with configuration and username for collection management.

        Args:
            config: Indexer configuration; defaults to the persisted config.
            username: Owner of this indexer; drives the default collection.
        """
        self.config = config or DocumentIndexerConfig.load_config()
        self.username = username
        # Admins (and anonymous/unspecified access) use the configured
        # default collection; regular users get an isolated per-user one.
        # (Previously username=None produced a literal "None_collection".)
        if username is None or username == "admin":
            self.default_collection = self.config["default_collection"]
        else:
            self.default_collection = f"{username}_collection"
        self.chroma_client = chromadb.Client(
            Settings(persist_directory=self.config["persist_directory"])
        )
        self.embedding_model = HuggingFaceEmbeddings(
            model_name=self.config["embedding_model"]
        )

    def split_document(self, text: str) -> List[str]:
        """Split raw text into overlapping chunks per the configuration."""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.get("chunk_size", 500),
            chunk_overlap=self.config.get("chunk_overlap", 50)
        )
        return splitter.split_text(text)

    def index_document(
        self,
        text: str,
        doc_type: str,
        collection_name: str,
        source: str = None
    ) -> Dict:
        """
        Chunk *text* and store it with metadata in *collection_name*.

        Args:
            text (str): Document text content
            doc_type (str): Type of document
            collection_name (str): Target collection
            source (str, optional): Source URL or path

        Returns:
            Dict: status summary including the generated doc_id.
        """
        collection = self.chroma_client.get_or_create_collection(name=collection_name)
        chunks = self.split_document(text)
        # NOTE(review): an id derived from the current collection size can
        # collide after deletions; a UUID would be safer.
        doc_id = f"{collection_name}_{len(collection.get()['ids'])}"
        # One timestamp for all chunks of this document.
        indexed_at = datetime.utcnow().isoformat()
        collection.add(
            documents=chunks,
            metadatas=[{
                "doc_id": doc_id or "",
                "source_type": doc_type or "",
                "source": source or "",
                "indexed_at": indexed_at,
                "chunk_number": i,
                "total_chunks": len(chunks) or 0
            } for i in range(len(chunks))],
            ids=[f"{doc_id}_{i}" for i in range(len(chunks))]
        )
        return {
            "status": "success",
            "doc_id": doc_id,
            "collection": collection_name,
            "chunks": len(chunks)
        }

    def index_source(self, source: str, doc_type: str, collection_name: str) -> Dict:
        """
        Index a source using the strategy matching *doc_type*.

        Supports:
        - "manual": source is the literal text
        - "pdf": source is a PDF path
        - "website": source is a single page URL
        - "domain": source is a base URL to crawl

        Raises:
            HTTPException: 400 for an unsupported doc_type.
        """
        # LangChain loaders return Document objects; join their
        # page_content so index_document always receives a plain string.
        # (The previous revision called a non-existent
        # PyPDFLoader.load_and_split_text and passed raw Document lists
        # into the text splitter, which would crash.)
        strategies = {
            "manual": lambda: self.index_document(source, doc_type, collection_name),
            "pdf": lambda: self.index_document(
                "\n".join(page.page_content for page in PyPDFLoader(source).load()),
                doc_type,
                collection_name,
                source
            ),
            "website": lambda: self.index_document(
                "\n".join(doc.page_content for doc in WebBaseLoader(source).load()),
                doc_type,
                collection_name,
                source
            ),
            "domain": lambda: self._index_domain(source, collection_name)
        }
        strategy = strategies.get(doc_type)
        if not strategy:
            raise HTTPException(status_code=400, detail="Unsupported document type")
        return strategy()

    def _index_domain(self, base_url: str, collection_name: str) -> Dict:
        """Crawl *base_url* and index every discovered page separately."""
        crawler = DomainCrawler(base_url)
        page_sources = crawler.crawl()
        # Handle case where no texts are found
        if not page_sources:
            return {
                "status": "warning",
                "message": "No text content found",
                "collection": collection_name,
                "chunks": 0
            }
        # Index each page individually so every page keeps its own source.
        results = []
        for page in page_sources:
            results.append(self.index_document(
                page["text"],
                "domain",
                collection_name,
                page["source"]
            ))
        return {
            "status": "success",
            "collection": collection_name,
            "total_pages_indexed": len(results),
            "details": results
        }

    def search_documents(
        self,
        query: str,
        top_k: int = None,
        collection_name: str = None
    ) -> Dict:
        """Query *collection_name* (default from config) for *query*."""
        collection_name = collection_name or self.config["default_collection"]
        collection = self.chroma_client.get_or_create_collection(name=collection_name)
        top_k = top_k or self.config.get("top_k", 5)
        results = collection.query(query_texts=[query], n_results=top_k)
        return results
class TokenManager:
    """Handle JWT token generation and validation.

    NOTE(review): this class is defined twice in this module; this later
    definition shadows the earlier one. The duplicates should be merged.
    """

    # Regenerated on every process start, so a restart invalidates all
    # previously issued tokens. Persist/configure the key if that matters.
    SECRET_KEY = secrets.token_hex(32)
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30

    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """Create a signed JWT access token.

        Args:
            data: Claims to embed (e.g. {"sub": username, "role": ...}).
            expires_delta: Optional custom lifetime; defaults to
                ACCESS_TOKEN_EXPIRE_MINUTES.

        Returns:
            The encoded JWT string.
        """
        to_encode = data.copy()
        # Fixed: the fallback lifetime was a hard-coded 15 minutes,
        # inconsistent with the declared ACCESS_TOKEN_EXPIRE_MINUTES constant.
        expire = datetime.utcnow() + (
            expires_delta
            or timedelta(minutes=TokenManager.ACCESS_TOKEN_EXPIRE_MINUTES)
        )
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, TokenManager.SECRET_KEY, algorithm=TokenManager.ALGORITHM)

    @staticmethod
    def decode_token(token: str) -> Dict:
        """Decode and validate a JWT token.

        Raises:
            HTTPException: 401 when the token is invalid or expired.
        """
        try:
            return jwt.decode(token, TokenManager.SECRET_KEY, algorithms=[TokenManager.ALGORITHM])
        except JWTError:
            raise HTTPException(status_code=401, detail="Invalid token")
def create_app():
    """Create and configure the FastAPI application.

    Wires CORS, bearer-token auth and all document-management endpoints.
    """
    app = FastAPI(
        title="Document Indexing API",
        openapi_url="/api/v1/openapi.json",
        docs_url="/documentation"
    )
    # CORS Middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # host hf
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    security = HTTPBearer()

    def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
        """Resolve the requesting username from the bearer token."""
        token = credentials.credentials
        payload = TokenManager.decode_token(token)
        username = payload.get("sub")
        if not username:
            raise HTTPException(status_code=401, detail="Invalid token")
        return username

    @app.post("/login")
    async def login(credentials: Dict[str, str] = Body(...)):
        """Issue a JWT for valid username/password credentials."""
        username = credentials.get("username")
        password = credentials.get("password")
        user = UserManager.authenticate_user(username, password)
        if not user:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        access_token = TokenManager.create_access_token(
            data={"sub": username, "role": user["role"]},
            expires_delta=timedelta(minutes=TokenManager.ACCESS_TOKEN_EXPIRE_MINUTES)
        )
        return {
            "token": access_token,
            "username": username,
            "role": user["role"]
        }

    @app.get("/admin/config")
    def get_config(user: str = Depends(get_current_user)):
        """Get server configuration (Admin only)."""
        if user != "admin":
            raise HTTPException(status_code=403, detail="Access denied")
        # Fixed: previously returned an undefined `server_config` name,
        # which raised NameError at request time.
        return DocumentIndexerConfig.load_config()

    @app.post("/admin/config")
    def update_config(new_config: Dict, user: str = Depends(get_current_user)):
        """Update server configuration (Admin only)."""
        if user != "admin":
            raise HTTPException(status_code=403, detail="Access denied")
        # Fixed: previously referenced undefined `server_config` and
        # `ConfigManager` names (NameError at request time).
        DocumentIndexerConfig.save_config(new_config)
        return {"status": "success", "updated_config": new_config}

    @app.post("/delete")
    def delete_document(
        doc_id: str,
        collection: Optional[str] = Query(None),
        user: str = Depends(get_current_user)
    ):
        """
        Delete a specific document from ChromaDB.

        - Admins can delete from any collection
        - Users can only delete from their own collection
        """
        indexer = DocumentIndexer(DocumentIndexerConfig.load_config(), user)
        collection_name = collection or indexer.default_collection
        # Non-admins may only touch collections prefixed with their name.
        if user != "admin":
            if not collection_name.startswith(f"{user}_"):
                raise HTTPException(status_code=403, detail="Access denied")
        collection = indexer.chroma_client.get_or_create_collection(name=collection_name)
        # Chunk ids are "<doc_id>_<n>", so a prefix match finds them all.
        chunks_to_delete = [
            chunk_id for chunk_id in collection.get()['ids']
            if chunk_id.startswith(doc_id)
        ]
        if not chunks_to_delete:
            raise HTTPException(status_code=404, detail="Document not found")
        collection.delete(ids=chunks_to_delete)
        return {
            "status": "success",
            "deleted_chunks": len(chunks_to_delete),
            "document_id": doc_id
        }

    @app.post("/reindex")
    def reindex_document(
        doc_id: str,
        doc_type: str = Body(...),
        source: str = Body(...),
        collection: Optional[str] = Query(None),
        user: str = Depends(get_current_user)
    ):
        """
        Reindex a specific document:
        1. Delete existing document chunks
        2. Reindex from the source
        """
        indexer = DocumentIndexer(DocumentIndexerConfig.load_config(), user)
        collection_name = collection or indexer.default_collection
        if user != "admin":
            if not collection_name.startswith(f"{user}_"):
                raise HTTPException(status_code=403, detail="Access denied")
        # Delete existing document (direct call, not an HTTP round-trip).
        delete_document(doc_id, collection_name, user)
        # Reindex the source
        return indexer.index_source(source, doc_type, collection_name)

    @app.post("/reindexAll")
    def reindex_all(user: str = Depends(get_current_user)):
        """
        Reindex the entire database:
        - Admin reindexes all collections
        - Users only reindex their own collection
        """
        if user == "admin":
            collections = [
                collection.name for collection in
                DocumentIndexer(DocumentIndexerConfig.load_config()).chroma_client.list_collections()
            ]
        else:
            collections = [f"{user}_collection"]
        results = {}
        for collection_name in collections:
            try:
                collection = DocumentIndexer(
                    DocumentIndexerConfig.load_config()
                ).chroma_client.get_or_create_collection(name=collection_name)
                # Unique sources recorded in the chunk metadata.
                sources = set(
                    metadata.get('source', '') for metadata in
                    collection.get()['metadatas'] if metadata.get('source')
                )
                collection_results = []
                for source in sources:
                    # Infer doc_type from the source's shape.
                    doc_type = (
                        "pdf" if source.endswith('.pdf') else
                        "website" if source.startswith(('http://', 'https://')) else
                        "manual"
                    )
                    indexer = DocumentIndexer(
                        DocumentIndexerConfig.load_config(),
                        user
                    )
                    collection_results.append(
                        indexer.index_source(source, doc_type, collection_name)
                    )
                results[collection_name] = collection_results
            except Exception as e:
                # Keep going: one bad collection must not abort the rest.
                results[collection_name] = {"error": str(e)}
        return results

    @app.post("/index")
    def index_content(
        doc_type: str = Body(...),
        source: str = Body(...),
        collection: Optional[str] = Query(None),
        user: str = Depends(get_current_user)
    ):
        """Index content in ChromaDB with user-specific collections."""
        indexer = DocumentIndexer(DocumentIndexerConfig.load_config(), user)
        collection_name = collection or indexer.default_collection
        return indexer.index_source(source, doc_type, collection_name)

    @app.get("/list")
    def list_documents(user: str = Depends(get_current_user)):
        """List all indexed documents.

        TODO: role-based filtering — currently every authenticated user
        sees the default collection's full contents.
        """
        indexer = DocumentIndexer(DocumentIndexerConfig.load_config())
        collection = indexer.chroma_client.get_or_create_collection(
            indexer.config["default_collection"]
        )
        return collection.get()

    @app.post("/search")
    def search_documents(
        query: str,
        top_k: Optional[int] = Query(None),
        collection: Optional[str] = Query(None),
        user: str = Depends(get_current_user)
    ):
        """Search indexed documents."""
        indexer = DocumentIndexer(DocumentIndexerConfig.load_config())
        return indexer.search_documents(query, top_k, collection)

    return app


app = create_app()