Spaces:
Runtime error
Runtime error
Update components/real_time_data.py
Browse files- components/real_time_data.py +19 -5
components/real_time_data.py
CHANGED
|
@@ -1,26 +1,40 @@
|
|
| 1 |
import aiohttp
|
|
|
|
| 2 |
from typing import List, Dict, Any
|
| 3 |
|
| 4 |
class RealTimeDataIntegrator:
|
| 5 |
"""Integrates real-time data for up-to-date responses"""
|
|
|
|
| 6 |
def __init__(self):
|
| 7 |
self.session = aiohttp.ClientSession()
|
| 8 |
|
| 9 |
async def fetch_and_integrate(self, urls: List[str]) -> Dict[str, Any]:
|
| 10 |
"""Fetch data from multiple sources and integrate it"""
|
| 11 |
tasks = [self.fetch_data(url) for url in urls]
|
| 12 |
-
results = await asyncio.gather(*tasks)
|
| 13 |
integrated_data = self.integrate_data(results)
|
| 14 |
return integrated_data
|
| 15 |
|
| 16 |
async def fetch_data(self, url: str) -> Dict[str, Any]:
|
| 17 |
"""Fetch data from an external API"""
|
| 18 |
-
|
| 19 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 20 |
|
| 21 |
def integrate_data(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
| 22 |
"""Integrate data from multiple sources"""
|
| 23 |
integrated_data = {}
|
| 24 |
for item in data:
|
| 25 |
-
|
| 26 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
import aiohttp
|
| 2 |
+
import asyncio
|
| 3 |
from typing import List, Dict, Any
|
| 4 |
|
| 5 |
class RealTimeDataIntegrator:
|
| 6 |
"""Integrates real-time data for up-to-date responses"""
|
| 7 |
+
|
| 8 |
def __init__(self):
|
| 9 |
self.session = aiohttp.ClientSession()
|
| 10 |
|
| 11 |
async def fetch_and_integrate(self, urls: List[str]) -> Dict[str, Any]:
|
| 12 |
"""Fetch data from multiple sources and integrate it"""
|
| 13 |
tasks = [self.fetch_data(url) for url in urls]
|
| 14 |
+
results = await asyncio.gather(*tasks, return_exceptions=True)
|
| 15 |
integrated_data = self.integrate_data(results)
|
| 16 |
return integrated_data
|
| 17 |
|
| 18 |
async def fetch_data(self, url: str) -> Dict[str, Any]:
|
| 19 |
"""Fetch data from an external API"""
|
| 20 |
+
try:
|
| 21 |
+
async with self.session.get(url) as response:
|
| 22 |
+
response.raise_for_status() # Raise an error for bad status codes
|
| 23 |
+
return await response.json()
|
| 24 |
+
except Exception as e:
|
| 25 |
+
return {"error": str(e), "url": url}
|
| 26 |
|
| 27 |
def integrate_data(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
|
| 28 |
"""Integrate data from multiple sources"""
|
| 29 |
integrated_data = {}
|
| 30 |
for item in data:
|
| 31 |
+
if isinstance(item, dict):
|
| 32 |
+
integrated_data.update(item)
|
| 33 |
+
else:
|
| 34 |
+
# Handle the case where fetch_data returned an error
|
| 35 |
+
integrated_data.setdefault("errors", []).append(item)
|
| 36 |
+
return integrated_data
|
| 37 |
+
|
| 38 |
+
async def close_session(self):
|
| 39 |
+
"""Close the aiohttp session"""
|
| 40 |
+
await self.session.close()
|