# app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl, Field
from crawl4ai import (
    AsyncWebCrawler,
    CrawlerRunConfig,
    CacheMode,
    BrowserConfig,
    RateLimiter,
    CrawlerMonitor,  # Keep this import
    DisplayMode      # Keep this import
)
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher, SemaphoreDispatcher # Import dispatchers
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import BM25ContentFilter, PruningContentFilter
from googlesearch import search as google_search_sync # Rename to avoid conflict
import uvicorn
import asyncio
import re
from typing import Optional, List, Dict, Tuple
from bs4 import BeautifulSoup
from datetime import datetime
import traceback # For detailed error logging
# nest_asyncio removed - no longer needed
app = FastAPI(
title="Search & Crawl API",
description="An API to perform Google Search and crawl results using Crawl4AI",
version="1.1.0"
)
# --- Pydantic Models ---
class CrawlRequest(BaseModel):
    url: HttpUrl
    cache_mode: str = "DISABLED"
    excluded_tags: list[str] = ["nav", "footer", "aside", "header", "script", "style"]
    remove_overlay_elements: bool = True
    ignore_links: bool = True
    subject: Optional[str] = None  # Optional subject for content filtering

class Article(BaseModel):
    title: str
    url: str
    description: Optional[str] = None
    image_url: Optional[str] = None
    timestamp: Optional[str] = None
    category: Optional[str] = None
    source_url: Optional[str] = None  # Added to track the original source

class CrawlResponse(BaseModel):
    url: str
    success: bool
    error: Optional[str] = None
    metadata: Dict = {}
    articles: List[Article] = []
    raw_markdown: Optional[str] = None
    stats: Dict = {}

class SearchCrawlRequest(BaseModel):
    query: str = Field(..., description="The query string for Google Search")
    num_results: int = Field(default=10, ge=1, le=30, description="Number of Google Search results to crawl")
    subject: Optional[str] = Field(default=None, description="Optional subject for BM25 content filtering during crawl")
    use_semaphore_dispatcher: bool = Field(default=False, description="Use SemaphoreDispatcher instead of MemoryAdaptiveDispatcher")
    max_concurrent_tasks: int = Field(default=10, ge=1, description="Max concurrent crawls (used by dispatcher)")
    cache_mode: str = Field(default="DISABLED", description="Crawl4AI cache mode (ENABLED, DISABLED, BYPASS)")
    base_delay_secs: Tuple[float, float] = Field(default=(1.0, 3.0), description="Base delay range (min, max) in seconds for rate limiter")
    max_delay_secs: float = Field(default=60.0, description="Max backoff delay in seconds for rate limiter")
    max_retries: int = Field(default=3, description="Max retries on rate limit errors for rate limiter")
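# Illustrative request payloads for the models above (values are placeholders,
# not from a real deployment):
#   CrawlRequest:        {"url": "https://example.com/news", "subject": "technology"}
#   SearchCrawlRequest:  {"query": "renewable energy news", "num_results": 5,
#                         "subject": "solar power", "cache_mode": "BYPASS"}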
# --- Helper Functions ---
def clean_url(url: str) -> str:
"""Clean and normalize URLs"""
url = url.replace('<', '').replace('>', '').strip()
if url.startswith('https://'):
try:
domain_part = url[8:].split('/')[0]
if domain_part:
cleaned_url = url.replace(f'https://{domain_part}/{domain_part}', f'https://{domain_part}')
cleaned_url = re.sub(rf'https://{re.escape(domain_part)}/https:/*', f'https://{domain_part}/', cleaned_url)
else:
cleaned_url = url
except IndexError:
cleaned_url = url
if not cleaned_url.startswith('https://'):
# Attempt reconstruction only if domain_part was found
if 'domain_part' in locals() and domain_part:
cleaned_url = f'https://{domain_part}'
else: # Fallback if domain extraction failed entirely
cleaned_url = url # Keep original if parsing was problematic
else:
cleaned_url = url
cleaned_url = cleaned_url.split(' ')[0].split(')')[0]
cleaned_url = cleaned_url.rstrip('/')
return cleaned_url
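# Illustrative behavior of clean_url (hypothetical inputs):
#   clean_url("https://example.com/example.com/story/")  -> "https://example.com/story"
#   clean_url("<https://example.com/a b>")                -> "https://example.com/a"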
def is_valid_title(title: str) -> bool:
"""Check if the title is valid"""
if not title: return False
invalid_patterns = ['**_access_time_', 'existing code', '...', 'navigation', 'menu', 'logo']
title_lower = title.lower()
if any(pattern in title_lower for pattern in invalid_patterns): return False
if title.count('-') > 4 or title.count('_') > 3 or '/' in title: return False
if len(title.strip()) < 5: return False
return True
def clean_description(description: str) -> Optional[str]:
"""Clean and normalize description text"""
if not description: return None
if '_access_time_' in description or description.strip().startswith("!"): return None
description = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', description)
description = re.sub(r'\bhttps?://\S+', '', description)
description = description.replace('*', '').replace('_', '').replace('`', '')
description = description.strip().strip('()[]{}<>')
description = ' '.join(description.split())
return description if len(description) > 15 else None
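# Illustrative behavior of clean_description (hypothetical input):
#   clean_description("Check out [our report](https://example.com/report) for *full* details on the launch.")
#   -> "Check out our report for full details on the launch."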
def extract_articles(markdown: str, source_url: str) -> List[Article]:
"""Extracts articles from markdown, assigning the source_url"""
articles = []
seen_urls = set()
article_pattern = re.compile(
r'(?:!\[[^\]]*\]\((?P<image_url>[^)]+)\)\s*)?'
r'\[(?P<title>[^\]]+)\]'
r'\((?P<url>[^)]+)\)'
r'(?:\s*(?P<description>[^\n\[]*))?'
, re.MULTILINE)
for match in article_pattern.finditer(markdown):
title = match.group('title').strip()
url = match.group('url').strip()
description = match.group('description').strip() if match.group('description') else None
image_url_extracted = match.group('image_url').strip() if match.group('image_url') else None
if not url or not title: continue
if not is_valid_title(title): continue
url = clean_url(url)
if not url.startswith(('http://', 'https://')) or url.lower().endswith(('.pdf', '.jpg', '.png', '.gif', '.jpeg', '.webp', '.svg', '.zip', '.docx')):
continue
if url in seen_urls: continue
seen_urls.add(url)
clean_desc = clean_description(description)
image_url = None
if image_url_extracted:
cleaned_img_url = clean_url(image_url_extracted)
if cleaned_img_url.lower().endswith(('.jpg', '.png', '.gif', '.jpeg', '.webp')):
image_url = cleaned_img_url
article = Article(
title=title,
url=url,
description=clean_desc,
image_url=image_url,
timestamp=None,
category=None,
source_url=source_url
)
articles.append(article)
return articles
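# Illustrative markdown line that extract_articles would pick up (hypothetical content):
#   ![thumb](https://example.com/img/thumb.jpg) [Example headline about solar power](https://example.com/solar-story) A short teaser sentence follows here.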
def extract_metadata(markdown: str) -> Dict:
"""Basic metadata extraction from markdown"""
metadata = {
"timestamp": datetime.now().isoformat(),
"categories": [],
}
category_pattern = r'^##\s+(.*)'
matches = re.findall(category_pattern, markdown, re.MULTILINE)
if matches:
cleaned_categories = []
for cat in matches:
cat_text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', cat) # Remove links
cat_text = cat_text.replace('*','').replace('_','').strip()
if cat_text and len(cat_text) > 2:
cleaned_categories.append(cat_text)
metadata["categories"] = cleaned_categories
return metadata
# --- FastAPI Endpoints ---
@app.get("/")
def read_root():
return {
"message": "Welcome to Search & Crawl API",
"docs_url": "/docs",
"redoc_url": "/redoc"
}
@app.post("/crawl", response_model=CrawlResponse, summary="Crawl a single URL")
async def crawl_url(request: CrawlRequest):
"""Crawls a single URL using Crawl4AI."""
try:
# Determine Cache Mode
try:
cache_mode_enum = CacheMode[request.cache_mode.upper()]
except KeyError:
raise HTTPException(status_code=400, detail=f"Invalid cache_mode. Use one of: {', '.join([m.name for m in CacheMode])}")
# Configure content filter based on subject
if request.subject:
content_filter = BM25ContentFilter(user_query=request.subject, bm25_threshold=1.2)
else:
content_filter = PruningContentFilter(threshold=0.48, threshold_type="fixed", min_word_threshold=50)
md_generator = DefaultMarkdownGenerator(
content_filter=content_filter,
options={"ignore_images": True, "ignore_links": request.ignore_links}
)
# Browser Config
browser_config = BrowserConfig(headless=True, verbose=False)
async with AsyncWebCrawler(config=browser_config) as crawler:
config = CrawlerRunConfig(
cache_mode=cache_mode_enum,
excluded_tags=request.excluded_tags,
remove_overlay_elements=request.remove_overlay_elements,
markdown_generator=md_generator,
exclude_external_links=True,
exclude_social_media_links=True,
exclude_external_images=True,
exclude_domains=["facebook.com", "twitter.com", "instagram.com", "youtube.com", "tiktok.com", "pinterest.com"]
)
result = await crawler.arun(url=str(request.url), config=config)
markdown = result.markdown_v2.raw_markdown if result.success and result.markdown_v2 else None
articles = extract_articles(markdown, str(request.url)) if markdown else []
metadata = extract_metadata(markdown) if markdown else {"timestamp": datetime.now().isoformat(), "categories": []}
metadata["subject"] = request.subject
metadata["total_articles"] = len(articles)
return CrawlResponse(
url=str(request.url),
success=result.success,
error=result.error_message if not result.success else None,
metadata=metadata,
articles=articles,
raw_markdown=markdown,
stats={
"total_links": len(result.links) if result.links else 0,
"processing_time": result.processing_time if hasattr(result, 'processing_time') else None,
"status_code": result.status_code if hasattr(result, 'status_code') else None,
"dispatch_info": result.dispatch_result.dict() if result.dispatch_result else None
}
)
except Exception as e:
print(f"Error during single crawl for {request.url}: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")
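# Illustrative call to the /crawl endpoint (hypothetical URL and subject):
#   curl -X POST http://localhost:7860/crawl \
#        -H "Content-Type: application/json" \
#        -d '{"url": "https://example.com/news", "subject": "electric vehicles"}'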
@app.post("/search-and-crawl", response_model=List[CrawlResponse], summary="Search Google and crawl results")
async def search_and_crawl(request: SearchCrawlRequest):
"""
Performs a Google Search for the given query, retrieves the top URLs,
and crawls each URL using Crawl4AI's multi-URL dispatcher.
"""
urls_to_crawl = []
try:
# --- 1. Perform Google Search (Synchronous, run in thread pool) ---
loop = asyncio.get_running_loop()
search_iterator = await loop.run_in_executor(
None,
lambda: google_search_sync(request.query, num_results=request.num_results, lang='en')
)
urls_to_crawl = [clean_url(url) for url in search_iterator if url]
if not urls_to_crawl:
return []
except Exception as e:
print(f"Error during Google Search for '{request.query}': {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Google Search failed: {str(e)}")
# --- 2. Configure Crawl4AI ---
try:
# Determine Cache Mode
try:
cache_mode_enum = CacheMode[request.cache_mode.upper()]
except KeyError:
raise HTTPException(status_code=400, detail=f"Invalid cache_mode. Use one of: {', '.join([m.name for m in CacheMode])}")
# Configure content filter
if request.subject:
content_filter = BM25ContentFilter(user_query=request.subject, bm25_threshold=1.2)
else:
content_filter = PruningContentFilter(threshold=0.48, threshold_type="fixed", min_word_threshold=50)
md_generator = DefaultMarkdownGenerator(
content_filter=content_filter,
options={"ignore_images": True, "ignore_links": True}
)
# General CrawlerRunConfig
run_config = CrawlerRunConfig(
cache_mode=cache_mode_enum,
stream=False,
excluded_tags=["nav", "footer", "aside", "header", "script", "style", "noscript", "figure"],
remove_overlay_elements=True,
markdown_generator=md_generator,
exclude_external_links=True,
exclude_social_media_links=True,
exclude_external_images=True,
exclude_domains=["facebook.com", "twitter.com", "instagram.com", "youtube.com", "tiktok.com", "pinterest.com", "linkedin.com"],
)
# Browser Config
browser_config = BrowserConfig(headless=True, verbose=False)
# Rate Limiter Config
rate_limiter = RateLimiter(
base_delay=request.base_delay_secs,
max_delay=request.max_delay_secs,
max_retries=request.max_retries,
rate_limit_codes=[429, 503]
)
# Optional Monitor (Corrected initialization)
monitor = CrawlerMonitor(display_mode=DisplayMode.AGGREGATED) # <--- CORRECTED LINE
# --- 3. Select and Configure Dispatcher ---
if request.use_semaphore_dispatcher:
dispatcher = SemaphoreDispatcher(
max_session_permit=request.max_concurrent_tasks,
rate_limiter=rate_limiter,
monitor=monitor # Pass the correctly initialized monitor
)
else:
dispatcher = MemoryAdaptiveDispatcher(
max_session_permit=request.max_concurrent_tasks,
memory_threshold_percent=90.0,
check_interval=1.0,
rate_limiter=rate_limiter,
monitor=monitor # Pass the correctly initialized monitor
)
# --- 4. Run Multi-URL Crawl ---
crawl_results = []
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls=urls_to_crawl,
config=run_config,
dispatcher=dispatcher
)
# --- 5. Process Results ---
for result in results:
if result.success and result.markdown_v2 and result.markdown_v2.raw_markdown:
markdown = result.markdown_v2.raw_markdown
articles = extract_articles(markdown, result.url)
metadata = extract_metadata(markdown)
metadata["subject"] = request.subject
metadata["total_articles"] = len(articles)
crawl_response = CrawlResponse(
url=result.url,
success=True,
error=None,
metadata=metadata,
articles=articles,
raw_markdown=markdown,
stats={
"total_links": len(result.links) if result.links else 0,
"processing_time": result.processing_time if hasattr(result, 'processing_time') else None,
"status_code": result.status_code if hasattr(result, 'status_code') else None,
"dispatch_info": result.dispatch_result.dict() if result.dispatch_result else None
}
)
else:
crawl_response = CrawlResponse(
url=result.url,
success=False,
error=result.error_message or "Crawling failed or produced no markdown",
metadata={"timestamp": datetime.now().isoformat()},
articles=[],
raw_markdown=None,
stats={
"status_code": result.status_code if hasattr(result, 'status_code') else None,
"dispatch_info": result.dispatch_result.dict() if result.dispatch_result else None
}
)
crawl_results.append(crawl_response)
return crawl_results
except Exception as e:
# Log the full traceback for internal debugging
print(f"Error during multi-crawl process for query '{request.query}': {traceback.format_exc()}")
# Raise HTTPException with a user-friendly message (without exposing internal details like specific arguments)
raise HTTPException(status_code=500, detail=f"Multi-crawl process failed: An internal error occurred during crawling setup or execution. Original error type: {type(e).__name__}")
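# Illustrative call to the /search-and-crawl endpoint (hypothetical query and subject):
#   curl -X POST http://localhost:7860/search-and-crawl \
#        -H "Content-Type: application/json" \
#        -d '{"query": "latest battery technology news", "num_results": 5, "subject": "solid state batteries"}'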
# --- Run Application ---
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)  # No --workers here; let the deployment (e.g. Docker) handle scaling.
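# Alternatively, assuming this file is saved as app.py, it can be served directly with:
#   uvicorn app:app --host 0.0.0.0 --port 7860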