Scrap-Dji / app.py
joel
Fix: Add missing await for reload() coroutine
4447088
#!/usr/bin/env python3
"""
Scrap-Dji - Application Hugging Face Spaces
Combine FastAPI (endpoints pour frontend) + Gradio (interface web)
"""
import os
import json
import asyncio
from datetime import datetime
from typing import List, Optional, Dict, Any
from pathlib import Path
# FastAPI
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# Gradio
import gradio as gr
# Scraper
from scraper.main import ScrapDjiScraper
from utils.logger import setup_logger
logger = setup_logger(__name__)
# ============================================================================
# CONFIGURATION
# ============================================================================
DATA_DIR = Path("/data") if os.path.exists("/data") else Path("./data")
DATA_DIR.mkdir(exist_ok=True)
DOCUMENTS_FILE = DATA_DIR / "documents.json"
SOURCES_FILE = Path("sources.json")
# ============================================================================
# FASTAPI APP - Endpoints pour Frontend
# ============================================================================
app = FastAPI(
title="Scrap-Dji API",
description="API de recherche et scraping de contenus africains",
version="2.0.0"
)
# CORS pour permettre les requêtes depuis n'importe quel frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================================
# MODELS
# ============================================================================
class SearchRequest(BaseModel):
query: str
pays: Optional[str] = None
langue: Optional[str] = None
limit: int = 10
fuzzy: bool = True
class SearchResponse(BaseModel):
total: int
results: List[Dict[str, Any]]
query: str
execution_time_ms: float
class StatsResponse(BaseModel):
total_documents: int
pays: Dict[str, int]
langues: Dict[str, int]
sources: Dict[str, int]
derniere_mise_a_jour: Optional[str]
# ============================================================================
# SEARCH ENGINE - Recherche locale optimisée
# ============================================================================
# ============================================================================
# SEARCH ENGINE - Recherche MongoDB Native
# ============================================================================
from db.mongo_connector import db as mongo_db
class MongoSearchEngine:
"""Moteur de recherche connecté directement à MongoDB"""
def __init__(self):
self.collection = mongo_db["documents"]
async def reload(self):
"""Pas nécessaire avec MongoDB (temps réel)"""
pass
async def search(
self,
query: str,
pays: Optional[str] = None,
langue: Optional[str] = None,
limit: int = 20,
fuzzy: bool = True
) -> List[Dict[str, Any]]:
"""Recherche dans MongoDB avec Regex (Simple & Efficace sans Atlas Search)"""
filter_query = {}
# Filtres exacts
if pays and pays != "Tous":
filter_query["pays"] = pays
if langue and langue != "Toutes":
filter_query["langue"] = langue
# Recherche texte (Regex insensible à la case)
if query:
regex_pattern = {"$regex": query, "$options": "i"}
filter_query["$or"] = [
{"titre": regex_pattern},
{"texte": regex_pattern},
{"tags": regex_pattern}
]
try:
cursor = self.collection.find(filter_query).limit(limit).sort("date", -1)
results = await cursor.to_list(length=limit)
# Conversion ObjectId -> str
for doc in results:
if '_id' in doc:
doc['_id'] = str(doc['_id'])
# Ajout d'un score fictif pour compatibilité frontend
doc['_score'] = 1.0
return results
except Exception as e:
logger.error(f"Erreur recherche MongoDB: {e}")
return []
async def get_stats(self) -> Dict[str, Any]:
"""Retourne les statistiques agrégées depuis MongoDB"""
try:
total = await self.collection.count_documents({})
pipeline_pays = [{"$group": {"_id": "$pays", "count": {"$sum": 1}}}]
pays_data = await self.collection.aggregate(pipeline_pays).to_list(length=100)
pays_count = {d["_id"]: d["count"] for d in pays_data if d["_id"]}
pipeline_langue = [{"$group": {"_id": "$langue", "count": {"$sum": 1}}}]
langue_data = await self.collection.aggregate(pipeline_langue).to_list(length=100)
langues_count = {d["_id"]: d["count"] for d in langue_data if d["_id"]}
# Pour les sources, on fait une estimation ou on extrait le domaine
# Ici simplifié : on compte juste les total
sources_count = {"MongoDB": total}
return {
'total_documents': total,
'pays': pays_count,
'langues': langues_count,
'sources': sources_count,
'derniere_mise_a_jour': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Erreur stats MongoDB: {e}")
return {
'total_documents': 0, 'pays': {}, 'langues': {}, 'sources': {}, 'derniere_mise_a_jour': None
}
# Instance globale
search_engine = MongoSearchEngine()
# ============================================================================
# API ENDPOINTS
# ============================================================================
@app.get("/api/health")
async def health():
"""Health check"""
try:
count = await search_engine.get_stats()
status = "healthy"
except:
status = "db_error"
count = {"total_documents": 0}
return {
"status": status,
"documents_loaded": count["total_documents"],
"timestamp": datetime.now().isoformat()
}
@app.post("/api/search", response_model=SearchResponse)
async def api_search(request: SearchRequest):
"""Endpoint de recherche principal (MongoDB)"""
start_time = datetime.now()
results = await search_engine.search(
query=request.query,
pays=request.pays,
langue=request.langue,
limit=request.limit,
fuzzy=request.fuzzy
)
execution_time = (datetime.now() - start_time).total_seconds() * 1000
return SearchResponse(
total=len(results),
results=results,
query=request.query,
execution_time_ms=round(execution_time, 2)
)
@app.get("/api/search", response_model=SearchResponse)
async def api_search_get(
q: str = Query(..., description="Texte à rechercher"),
pays: Optional[str] = Query(None, description="Filtrer par pays"),
langue: Optional[str] = Query(None, description="Filtrer par langue"),
limit: int = Query(10, ge=1, le=100, description="Nombre de résultats"),
fuzzy: bool = Query(True, description="Recherche permissive")
):
"""Endpoint de recherche (GET)"""
request = SearchRequest(
query=q,
pays=pays,
langue=langue,
limit=limit,
fuzzy=fuzzy
)
return await api_search(request)
@app.get("/api/stats", response_model=StatsResponse)
async def api_stats():
"""Retourne les statistiques de la base MongoDB"""
stats = await search_engine.get_stats()
return StatsResponse(**stats)
@app.get("/api/documents")
async def api_documents(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100)
):
"""Retourne la liste des documents (paginée)"""
cursor = search_engine.collection.find({}).skip(skip).limit(limit)
documents = await cursor.to_list(length=limit)
for doc in documents:
if '_id' in doc: doc['_id'] = str(doc['_id'])
total = await search_engine.collection.count_documents({})
return {
"total": total,
"skip": skip,
"limit": limit,
"documents": documents
}
@app.get("/api/documents/{doc_id}")
async def api_document_by_id(doc_id: str):
"""Retourne un document par son ID (champ 'id' ou '_id')"""
doc = await search_engine.collection.find_one({"id": doc_id})
if not doc:
# Essai avec ObjectId
try:
from bson import ObjectId
doc = await search_engine.collection.find_one({"_id": ObjectId(doc_id)})
except: pass
if not doc:
raise HTTPException(status_code=404, detail="Document non trouvé")
if '_id' in doc: doc['_id'] = str(doc['_id'])
return doc
@app.post("/api/reload")
async def api_reload():
"""Endpoint dummy pour compatibilité"""
return {"status": "success", "message": "MongoDB is real-time"}
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
async def gradio_search(query: str, pays: str, langue: str, fuzzy: bool):
"""Fonction de recherche pour Gradio (Async wrapper)"""
if not query:
return "⚠️ Veuillez entrer une requête de recherche"
results = await search_engine.search(
query=query,
pays=pays if pays != "Tous" else None,
langue=langue if langue != "Toutes" else None,
limit=20,
fuzzy=fuzzy
)
if not results:
return f"❌ Aucun résultat pour '{query}'"
# Formatage des résultats
output = f"## 🔍 Résultats pour '{query}' ({len(results)} trouvés)\n\n"
for i, doc in enumerate(results, 1):
titre = doc.get('titre', 'Sans titre')
texte = doc.get('texte', '')[:200] + "..."
pays_doc = doc.get('pays', 'Inconnu')
source = doc.get('source_url', '#')
score = doc.get('_score', 0)
date = doc.get('date', '').split('T')[0]
output += f"### {i}. {titre}\n"
output += f"**Pays:** {pays_doc} | **Date:** {date}\n\n"
output += f"{texte}\n\n"
output += f"[🔗 Lire la source]({source})\n\n"
output += "---\n\n"
return output
async def gradio_stats():
"""Affiche les statistiques pour Gradio"""
# Fix: await pour la fonction async
stats = await search_engine.get_stats()
output = "# 📊 Statistiques de la Base de Données\n\n"
output += f"**Total de documents:** {stats['total_documents']}\n\n"
output += "## 🌍 Répartition par Pays\n\n"
if stats['pays']:
for pays, count in sorted(stats['pays'].items(), key=lambda x: x[1], reverse=True):
output += f"- **{pays}:** {count} documents\n"
else:
output += "_Aucune donnée_\n"
output += "\n## 🗣️ Répartition par Langue\n\n"
if stats['langues']:
for langue, count in sorted(stats['langues'].items(), key=lambda x: x[1], reverse=True):
output += f"- **{langue}:** {count} documents\n"
else:
output += "_Aucune donnée_\n"
output += "\n## 📰 Répartition par Source\n\n"
if stats['sources']:
for source, count in sorted(stats['sources'].items(), key=lambda x: x[1], reverse=True)[:10]:
output += f"- **{source}:** {count} documents\n"
else:
output += "_Aucune donnée_\n"
return output
async def gradio_scrape(progress=gr.Progress()):
"""Lance le scraping pour Gradio"""
progress(0, desc="Initialisation du scraping...")
try:
scraper = ScrapDjiScraper(str(SOURCES_FILE))
progress(0.3, desc="Scraping en cours...")
await scraper.run()
progress(0.8, desc="Rechargement des documents...")
await search_engine.reload()
progress(1.0, desc="Terminé!")
stats = await search_engine.get_stats()
return f"✅ Scraping terminé!\n\n**{stats['total_documents']} documents** dans la base"
except Exception as e:
logger.error(f"Erreur scraping: {e}")
return f"❌ Erreur lors du scraping: {str(e)}"
# Interface Gradio
with gr.Blocks(title="Scrap-Dji - Base de Connaissance Panafricaine", theme=gr.themes.Soft()) as gradio_app:
gr.Markdown("""
# 🌍 Scrap-Dji - Base de Connaissance Panafricaine
Système de scraping et de recherche de contenus africains (Togo, Bénin, Afrique)
""")
with gr.Tabs():
# ONGLET RECHERCHE
with gr.Tab("🔍 Recherche"):
gr.Markdown("### Recherchez dans la base de données")
with gr.Row():
search_query = gr.Textbox(
label="Requête de recherche",
placeholder="Ex: économie togo, politique bénin...",
scale=3
)
search_btn = gr.Button("🔍 Rechercher", variant="primary", scale=1)
with gr.Row():
search_pays = gr.Dropdown(
choices=["Tous", "Togo", "Bénin", "Afrique"],
value="Tous",
label="Pays"
)
search_langue = gr.Dropdown(
choices=["Toutes", "fr", "en"],
value="Toutes",
label="Langue"
)
search_fuzzy = gr.Checkbox(
value=True,
label="Recherche permissive (tolérance aux fautes)"
)
search_output = gr.Markdown()
search_btn.click(
fn=gradio_search,
inputs=[search_query, search_pays, search_langue, search_fuzzy],
outputs=search_output
)
# ONGLET SCRAPING
with gr.Tab("🚀 Scraping"):
gr.Markdown("### Lancer le scraping des sources")
scrape_btn = gr.Button("🚀 Lancer le Scraping", variant="primary", size="lg")
scrape_output = gr.Markdown()
scrape_btn.click(
fn=gradio_scrape,
outputs=scrape_output
)
# ONGLET STATISTIQUES
with gr.Tab("📊 Statistiques"):
gr.Markdown("### Statistiques de la base de données")
stats_btn = gr.Button("📊 Actualiser les Statistiques", variant="primary")
stats_output = gr.Markdown()
stats_btn.click(
fn=gradio_stats,
outputs=stats_output
)
# ONGLET API
with gr.Tab("🔌 API"):
gr.Markdown("""
### Endpoints API disponibles
L'API REST est accessible pour intégration dans votre frontend:
#### 🔍 Recherche
```
POST /api/search
GET /api/search?q=query&pays=Togo&limit=20
```
#### 📊 Statistiques
```
GET /api/stats
```
#### 📄 Documents
```
GET /api/documents?skip=0&limit=10
GET /api/documents/{id}
```
#### 🔄 Rechargement
```
POST /api/reload
```
#### ❤️ Health Check
```
GET /api/health
```
---
**Documentation interactive:** [/docs](/docs)
**Exemple de requête:**
```bash
curl -X POST "https://YOUR_SPACE.hf.space/api/search" \\
-H "Content-Type: application/json" \\
-d '{"query": "économie togo", "limit": 10, "fuzzy": true}'
```
""")
# ============================================================================
# MONTAGE GRADIO DANS FASTAPI
# ============================================================================
# Monter l'interface Gradio dans FastAPI
app = gr.mount_gradio_app(app, gradio_app, path="/")
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
import uvicorn
# Lancement de l'application
uvicorn.run(
app,
host="0.0.0.0",
port=7860, # Port par défaut pour Hugging Face Spaces
log_level="info"
)