""" Data Ingestion Routes Endpoints para importar dados de fontes externas """ from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from sqlalchemy.orm import Session from typing import Optional, List from datetime import datetime import asyncio from app.api.deps import get_scoped_db from app.models import Entity, Document, Relationship from app.schemas import EntityResponse, DocumentResponse from app.services.ingestion import wikipedia_scraper, news_service from app.services.nlp import entity_extractor from app.services.geocoding import geocode router = APIRouter(prefix="/ingest", tags=["Data Ingestion"]) def parse_event_date(date_str): """Parse date string to datetime object""" if not date_str: return None try: # Try YYYY-MM-DD format return datetime.strptime(date_str[:10], "%Y-%m-%d") except: try: # Try YYYY format return datetime.strptime(date_str[:4], "%Y") except: return None # ========== Wikipedia ========== @router.get("/wikipedia/search") def search_wikipedia(q: str, limit: int = 10): """Busca artigos na Wikipedia""" results = wikipedia_scraper.search(q, limit) return results @router.post("/wikipedia/entity", response_model=EntityResponse) async def import_from_wikipedia( title: str, entity_type: str = "person", project_id: Optional[str] = None, auto_extract: bool = True, db: Session = Depends(get_scoped_db) ): """ Importa uma entidade da Wikipedia entity_type: person, organization, location project_id: ID do projeto para associar a entidade auto_extract: Se True, usa LLM para extrair entidades relacionadas """ # Check if entity already exists existing = db.query(Entity).filter( Entity.name == title, Entity.source == "wikipedia" ).first() if existing: return existing # Scrape based on type if entity_type == "person": data = wikipedia_scraper.scrape_person(title) elif entity_type == "organization": data = wikipedia_scraper.scrape_organization(title) elif entity_type == "location": data = wikipedia_scraper.scrape_location(title) else: data = wikipedia_scraper.scrape_person(title) # default if not data: raise HTTPException(status_code=404, detail="Article not found on Wikipedia") # Create main entity with project_id entity = Entity(**data) entity.project_id = project_id db.add(entity) db.commit() db.refresh(entity) # Auto-extract entities and relationships using LLM if auto_extract and data.get("description"): try: # Limit text to avoid token limits text_to_analyze = data["description"][:3000] result = await entity_extractor.extract(text_to_analyze) # Create extracted entities created_entities = {} for ext_entity in result.entities: # Skip if same as main entity if ext_entity.name.lower() == title.lower(): created_entities[ext_entity.name] = entity continue # Check if entity exists (by similar name) existing_ent = db.query(Entity).filter( Entity.name.ilike(f"%{ext_entity.name}%") ).first() if existing_ent: created_entities[ext_entity.name] = existing_ent else: # Get coordinates for location entities lat, lng = None, None if ext_entity.type == "location": coords = await geocode(ext_entity.name) if coords: lat, lng = coords # Parse event_date event_date = parse_event_date(getattr(ext_entity, 'event_date', None)) new_ent = Entity( name=ext_entity.name, type=ext_entity.type if ext_entity.type in ["person", "organization", "location", "event"] else "person", description=ext_entity.description or ext_entity.role, source="wikipedia_extraction", latitude=lat, longitude=lng, event_date=event_date, project_id=project_id, properties={"role": ext_entity.role, "aliases": 
ext_entity.aliases, "extracted_from": title} ) db.add(new_ent) db.commit() db.refresh(new_ent) created_entities[ext_entity.name] = new_ent # Create relationships for rel in result.relationships: source_ent = created_entities.get(rel.source) or db.query(Entity).filter(Entity.name.ilike(f"%{rel.source}%")).first() target_ent = created_entities.get(rel.target) or db.query(Entity).filter(Entity.name.ilike(f"%{rel.target}%")).first() if source_ent and target_ent and source_ent.id != target_ent.id: # Check if relationship exists existing_rel = db.query(Relationship).filter( Relationship.source_id == source_ent.id, Relationship.target_id == target_ent.id, Relationship.type == rel.relationship_type ).first() if not existing_rel: # Parse relationship event_date rel_event_date = parse_event_date(getattr(rel, 'event_date', None)) new_rel = Relationship( source_id=source_ent.id, target_id=target_ent.id, type=rel.relationship_type, event_date=rel_event_date, properties={"context": rel.context, "extracted_from": title} ) db.add(new_rel) db.commit() except Exception as e: print(f"NER extraction error: {e}") # Continue without extraction if it fails return entity # ========== News ========== @router.get("/news/feeds") def list_available_feeds(): """Lista os feeds de notícias disponíveis""" return list(news_service.RSS_FEEDS.keys()) @router.get("/news/fetch") def fetch_news(feed: Optional[str] = None): """ Busca notícias dos feeds RSS Se feed não for especificado, busca de todos """ if feed: if feed not in news_service.RSS_FEEDS: raise HTTPException(status_code=404, detail="Feed not found") url = news_service.RSS_FEEDS[feed] articles = news_service.fetch_feed(url) else: articles = news_service.fetch_all_feeds() return articles @router.get("/news/search") def search_news(q: str): """Busca notícias por palavra-chave via Google News""" return news_service.search_news(q) @router.post("/news/import") async def import_news( query: Optional[str] = None, feed: Optional[str] = None, auto_extract: bool = True, db: Session = Depends(get_scoped_db) ): """ Importa notícias como documentos no sistema auto_extract: Se True, usa LLM para extrair entidades de cada notícia """ if query: articles = news_service.search_news(query) elif feed: if feed not in news_service.RSS_FEEDS: raise HTTPException(status_code=404, detail="Feed not found") articles = news_service.fetch_feed(news_service.RSS_FEEDS[feed]) else: articles = news_service.fetch_all_feeds() imported = 0 extracted_entities = 0 for article in articles: # Check if document already exists (by URL) if article.get("url"): existing = db.query(Document).filter( Document.source_url == article["url"] ).first() if existing: continue doc_data = news_service.to_document(article) doc = Document(**doc_data) db.add(doc) db.commit() imported += 1 # Extract entities from article content if auto_extract: try: text_to_analyze = f"{article.get('title', '')} {article.get('description', '')}".strip() if len(text_to_analyze) >= 20: result = await entity_extractor.extract(text_to_analyze[:2000]) created_entities = {} for ext_entity in result.entities: # Check if entity exists existing_ent = db.query(Entity).filter( Entity.name.ilike(f"%{ext_entity.name}%") ).first() if existing_ent: created_entities[ext_entity.name] = existing_ent else: # Get coordinates for location entities lat, lng = None, None if ext_entity.type == "location": coords = await geocode(ext_entity.name) if coords: lat, lng = coords new_ent = Entity( name=ext_entity.name, type=ext_entity.type if ext_entity.type in 
["person", "organization", "location", "event"] else "person", description=ext_entity.description or ext_entity.role, source="news_extraction", latitude=lat, longitude=lng, properties={"role": ext_entity.role, "aliases": ext_entity.aliases, "from_article": article.get('title', '')} ) db.add(new_ent) db.commit() db.refresh(new_ent) created_entities[ext_entity.name] = new_ent extracted_entities += 1 # Create relationships for rel in result.relationships: source_ent = created_entities.get(rel.source) or db.query(Entity).filter(Entity.name.ilike(f"%{rel.source}%")).first() target_ent = created_entities.get(rel.target) or db.query(Entity).filter(Entity.name.ilike(f"%{rel.target}%")).first() if source_ent and target_ent and source_ent.id != target_ent.id: existing_rel = db.query(Relationship).filter( Relationship.source_id == source_ent.id, Relationship.target_id == target_ent.id, Relationship.type == rel.relationship_type ).first() if not existing_rel: new_rel = Relationship( source_id=source_ent.id, target_id=target_ent.id, type=rel.relationship_type, properties={"context": rel.context} ) db.add(new_rel) db.commit() except Exception as e: print(f"NER extraction error for article: {e}") # Continue without extraction return { "message": f"Imported {imported} articles", "total_found": len(articles), "extracted_entities": extracted_entities } # ========== Manual Import ========== @router.post("/bulk/entities") def bulk_import_entities( entities: List[dict], db: Session = Depends(get_scoped_db) ): """ Importa múltiplas entidades de uma vez Útil para importar de CSV/JSON """ imported = 0 for entity_data in entities: entity = Entity( type=entity_data.get("type", "unknown"), name=entity_data.get("name", "Unnamed"), description=entity_data.get("description"), properties=entity_data.get("properties", {}), latitude=entity_data.get("latitude"), longitude=entity_data.get("longitude"), source=entity_data.get("source", "manual") ) db.add(entity) imported += 1 db.commit() return {"message": f"Imported {imported} entities"}