Spaces:
Sleeping
Sleeping
| import requests | |
| from src.config import API_BASE_URL | |
class NewsLensClient:
    """HTTP client for the NewsLens API.

    Wraps the remote ``/analyze`` and ``/ingest`` endpoints, converting any
    transport-level failure into a ``RuntimeError`` with the cause chained.
    """

    def __init__(self, base_url: str = API_BASE_URL):
        self.base_url = base_url

    def _post(self, endpoint: str, payload: dict, timeout: int) -> dict:
        """POST *payload* to ``{base_url}/{endpoint}`` and return the JSON body.

        Raises:
            RuntimeError: on any connection error, timeout, or non-2xx
                status; the original ``RequestException`` is chained so the
                network-level cause is preserved in tracebacks.
        """
        url = f"{self.base_url}/{endpoint}"
        try:
            response = requests.post(url, json=payload, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            # `from e` keeps the underlying HTTP error visible to callers.
            raise RuntimeError(f"API request failed: {e}") from e

    def analyze(self, topic: str, top_k: int = 10) -> dict:
        """Request an analysis of *topic*, returning the API's JSON response."""
        return self._post("analyze", {"topic": topic, "top_k": top_k}, timeout=30)

    def ingest(self, topic: str, page_size: int = 10) -> dict:
        """Trigger ingestion of articles for *topic*; returns the API's JSON response."""
        # Ingestion does remote fetching server-side, so it gets a longer timeout.
        return self._post("ingest", {"topic": topic, "page_size": page_size}, timeout=45)
class DirectPipelineClient:
    """In-process alternative to the HTTP client: runs the pipeline directly.

    Exposes the same ``analyze`` / ``ingest`` surface as the API client, but
    calls the local ``NewsAnalysisPipeline`` instead of a remote service.
    """

    def __init__(self):
        # Lazy import: importing this module must not pull in the heavy
        # pipeline machinery unless a direct client is actually constructed.
        from src.analysis.rag_pipeline import NewsAnalysisPipeline

        self.pipeline = NewsAnalysisPipeline()

    def analyze(self, topic: str, top_k: int = 10) -> dict:
        """Run the analysis for *topic* on the local pipeline and return its result."""
        return self.pipeline.analyze(topic, top_k)

    def ingest(self, topic: str, page_size: int = 10) -> dict:
        """Fetch up to *page_size* articles for *topic* and store them locally.

        Returns a summary dict mirroring the remote ``/ingest`` response shape.
        """
        # Lazy import for the same reason as in __init__.
        from src.ingestion.newsapi_client import fetch_news

        fetched = fetch_news(topic=topic, page_size=page_size)
        self.pipeline.vector_store.store_articles(fetched)

        article_count = len(fetched)
        return {
            "topic": topic,
            "articles_fetched": article_count,
            "articles_stored": article_count,
            "status": "success",
        }