# Numidium — app/api/routes/ingest.py
"""
Data Ingestion Routes
Endpoints para importar dados de fontes externas
"""
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
from typing import Optional, List
from app.core.database import get_db
from app.models import Entity, Document
from app.schemas import EntityResponse, DocumentResponse
from app.services.ingestion import wikipedia_scraper, news_service
# Router for all external-data ingestion endpoints, mounted under /ingest.
router = APIRouter(prefix="/ingest", tags=["Data Ingestion"])
# ========== Wikipedia ==========
@router.get("/wikipedia/search")
def search_wikipedia(q: str, limit: int = 10):
    """Search Wikipedia for articles matching *q*, returning up to *limit* hits."""
    return wikipedia_scraper.search(q, limit)
@router.post("/wikipedia/entity", response_model=EntityResponse)
def import_from_wikipedia(
    title: str,
    entity_type: str = "person",
    db: Session = Depends(get_db)
):
    """
    Import a single entity from Wikipedia.

    Args:
        title: Wikipedia article title to scrape.
        entity_type: One of "person", "organization", "location".
            Any other value falls back to the person scraper (kept for
            backward compatibility with existing callers).
        db: Database session (injected).

    Returns:
        The already-stored entity when one with the same name and
        source "wikipedia" exists; otherwise the newly created one.

    Raises:
        HTTPException: 404 if the article is not found on Wikipedia.
    """
    # Idempotency: re-importing the same article returns the stored row
    # instead of creating a duplicate.
    existing = db.query(Entity).filter(
        Entity.name == title,
        Entity.source == "wikipedia"
    ).first()
    if existing:
        return existing

    # Dispatch table instead of an if/elif chain; unknown types
    # deliberately fall back to the person scraper (original behavior).
    scrapers = {
        "person": wikipedia_scraper.scrape_person,
        "organization": wikipedia_scraper.scrape_organization,
        "location": wikipedia_scraper.scrape_location,
    }
    scrape = scrapers.get(entity_type, wikipedia_scraper.scrape_person)
    data = scrape(title)
    if not data:
        raise HTTPException(status_code=404, detail="Article not found on Wikipedia")

    # Persist and return the freshly scraped entity.
    entity = Entity(**data)
    db.add(entity)
    db.commit()
    db.refresh(entity)
    return entity
# ========== News ==========
@router.get("/news/feeds")
def list_available_feeds():
    """Return the names of every configured RSS news feed."""
    return [feed_name for feed_name in news_service.RSS_FEEDS]
@router.get("/news/fetch")
def fetch_news(feed: Optional[str] = None):
    """
    Fetch articles from the RSS feeds.

    When *feed* is omitted, every configured feed is fetched;
    otherwise only the named feed is used (404 if unknown).
    """
    # Guard clause: no feed named -> pull everything.
    if not feed:
        return news_service.fetch_all_feeds()
    try:
        feed_url = news_service.RSS_FEEDS[feed]
    except KeyError:
        raise HTTPException(status_code=404, detail="Feed not found")
    return news_service.fetch_feed(feed_url)
@router.get("/news/search")
def search_news(q: str):
    """Search news by keyword through Google News."""
    results = news_service.search_news(q)
    return results
@router.post("/news/import")
def import_news(
    query: Optional[str] = None,
    feed: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    Import news articles as Document rows.

    Source selection (first match wins):
        1. query  -> keyword search via Google News
        2. feed   -> the named RSS feed (404 if unknown)
        3. neither -> all configured feeds

    Articles whose URL already exists as a ``Document.source_url`` are
    skipped. Articles without a URL cannot be deduplicated and are
    always imported.

    Returns:
        Summary dict with the number imported and the total found.
    """
    if query:
        articles = news_service.search_news(query)
    elif feed:
        if feed not in news_service.RSS_FEEDS:
            raise HTTPException(status_code=404, detail="Feed not found")
        articles = news_service.fetch_feed(news_service.RSS_FEEDS[feed])
    else:
        articles = news_service.fetch_all_feeds()

    # Deduplicate with one IN(...) query instead of one SELECT per article.
    urls = [a["url"] for a in articles if a.get("url")]
    known_urls = set()
    if urls:
        known_urls = {
            row[0]
            for row in db.query(Document.source_url)
            .filter(Document.source_url.in_(urls))
            .all()
        }

    imported = 0
    for article in articles:
        url = article.get("url")
        if url and url in known_urls:
            continue
        doc = Document(**news_service.to_document(article))
        db.add(doc)
        imported += 1
        if url:
            # Also skip duplicate URLs appearing later in the same batch.
            known_urls.add(url)
    db.commit()
    return {"message": f"Imported {imported} articles", "total_found": len(articles)}
# ========== Manual Import ==========
@router.post("/bulk/entities")
def bulk_import_entities(
    entities: List[dict],
    db: Session = Depends(get_db)
):
    """
    Bulk-import entities from a list of plain dicts.

    Handy for CSV/JSON uploads. Missing fields receive defaults:
    type="unknown", name="Unnamed", source="manual", properties={}.
    """
    rows = [
        Entity(
            type=item.get("type", "unknown"),
            name=item.get("name", "Unnamed"),
            description=item.get("description"),
            properties=item.get("properties", {}),
            latitude=item.get("latitude"),
            longitude=item.get("longitude"),
            source=item.get("source", "manual"),
        )
        for item in entities
    ]
    db.add_all(rows)
    db.commit()
    return {"message": f"Imported {len(rows)} entities"}