File size: 6,670 Bytes
6d6b8af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""

This module provides real-time data integration and processing capabilities.

"""

import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

import aiohttp

class RealTimeDataIntegrator:
    """Polls multiple HTTP sources on background asyncio tasks.

    Each source is registered with a config dict (recognized keys: ``url``,
    ``interval`` — poll period in seconds, ``buffer_size`` — max retained
    entries) and streamed on its own task; results are timestamped and kept
    in a bounded per-source buffer (newest entry last).
    """

    # Module-style logger; replaces the original ad-hoc print() diagnostics.
    _log = logging.getLogger(__name__)

    def __init__(self) -> None:
        # source_id -> {"config", "status", "last_update", "error_count"}
        self.data_sources: Dict[str, Dict[str, Any]] = {}
        # source_id -> asyncio.Task running _stream_data
        self.active_streams: Dict[str, "asyncio.Task"] = {}
        # source_id -> list of {"timestamp", "data"} entries, newest last
        self.data_buffer: Dict[str, List[Dict[str, Any]]] = {}
        # Shared aiohttp session, created lazily by initialize().
        self.session: "Optional[aiohttp.ClientSession]" = None

    async def initialize(self) -> None:
        """Create the shared aiohttp session if one does not exist yet."""
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def add_data_source(self, source_id: str, config: Dict[str, Any]) -> bool:
        """Register a new data source for real-time integration.

        Args:
            source_id: Unique identifier for the data source.
            config: Source configuration; recognized keys are "url",
                "interval" (poll period, seconds) and "buffer_size".

        Returns:
            True once the source is registered (registration is a plain
            dict assignment and cannot fail, so no exception guard is
            needed here).
        """
        self.data_sources[source_id] = {
            "config": config,
            "status": "configured",
            "last_update": None,
            "error_count": 0,
        }
        return True

    async def start_stream(self, source_id: str) -> bool:
        """Start polling a registered source on a background task.

        Args:
            source_id: ID of a source previously added via add_data_source.

        Returns:
            True if the streaming task was started; False if the source is
            unknown or the task could not be created.
        """
        if source_id not in self.data_sources:
            return False

        try:
            # Fresh buffer for this stream run.
            self.data_buffer[source_id] = []
            self.active_streams[source_id] = asyncio.create_task(
                self._stream_data(source_id)
            )
            return True
        except Exception:
            self._log.exception("Error starting stream for %s", source_id)
            return False

    async def stop_stream(self, source_id: str) -> bool:
        """Stop streaming from a specific source.

        Cancels the task AND awaits it so cancellation actually completes
        before returning (the original fired the cancel and forgot it).

        Args:
            source_id: ID of the data source to stop.

        Returns:
            True if an active stream was stopped, False otherwise.
        """
        task = self.active_streams.pop(source_id, None)
        if task is None:
            return False

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected outcome of the cancel
        except Exception:
            self._log.exception("Error stopping stream for %s", source_id)
        return True

    async def get_latest_data(self, source_id: str) -> Optional[Dict[str, Any]]:
        """Return the newest buffered entry for a source, or None.

        Args:
            source_id: ID of the data source to query.

        Returns:
            The most recent {"timestamp", "data"} entry if the source has
            buffered data, otherwise None.
        """
        if source_id in self.data_buffer and self.data_buffer[source_id]:
            return self.data_buffer[source_id][-1]
        return None

    async def _stream_data(self, source_id: str) -> None:
        """Poll the source URL forever, recording results, until cancelled."""
        config = self.data_sources[source_id]["config"]
        url = config.get("url")
        interval = config.get("interval", 1.0)  # seconds between polls

        while True:
            try:
                if not self.session:
                    await self.initialize()

                async with self.session.get(url) as response:
                    if response.status == 200:
                        self._process_data(source_id, await response.json())
                    else:
                        # Non-200 responses are counted, not raised.
                        self.data_sources[source_id]["error_count"] += 1
            except asyncio.CancelledError:
                # Let stop_stream()/shutdown() cancellation propagate.
                raise
            except Exception:
                self._log.exception("Stream error for %s", source_id)
                self.data_sources[source_id]["error_count"] += 1

            await asyncio.sleep(interval)

    def _process_data(self, source_id: str, data: Dict[str, Any]) -> None:
        """Timestamp an incoming payload and append it to the source buffer."""
        # NOTE(review): timestamps are naive local time; confirm whether
        # callers expect UTC before switching to an aware datetime.
        entry = {
            "timestamp": datetime.now().isoformat(),
            "data": data,
        }

        buffer = self.data_buffer[source_id]
        buffer.append(entry)

        # Trim to the configured maximum (default 1000 entries), keeping
        # the most recent ones.
        max_buffer = self.data_sources[source_id]["config"].get("buffer_size", 1000)
        if len(buffer) > max_buffer:
            self.data_buffer[source_id] = buffer[-max_buffer:]

        self.data_sources[source_id]["last_update"] = entry["timestamp"]
        self.data_sources[source_id]["status"] = "active"

    def get_source_status(self, source_id: str) -> Dict[str, Any]:
        """Return status information for a data source.

        Args:
            source_id: ID of the data source to query.

        Returns:
            A dict with "status", "last_update", "error_count" and
            "buffer_size", or {"status": "not_found"} for unknown IDs.
        """
        if source_id not in self.data_sources:
            return {"status": "not_found"}

        source = self.data_sources[source_id]
        return {
            "status": source["status"],
            "last_update": source["last_update"],
            "error_count": source["error_count"],
            "buffer_size": len(self.data_buffer.get(source_id, [])),
        }

    async def clear_data(self, source_id: str) -> bool:
        """Clear buffered data for a specific source.

        Args:
            source_id: ID of the data source to clear.

        Returns:
            True if the source had a buffer to clear, False otherwise.
        """
        if source_id in self.data_buffer:
            self.data_buffer[source_id] = []
            return True
        return False

    async def shutdown(self) -> None:
        """Stop all streams, close the HTTP session, and drop all buffers."""
        # Snapshot the keys: stop_stream mutates active_streams.
        for source_id in list(self.active_streams.keys()):
            await self.stop_stream(source_id)

        if self.session:
            await self.session.close()
            self.session = None

        self.data_buffer = {}