Greg-House commited on
Commit
146624d
·
verified ·
1 Parent(s): 76f3cde

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +202 -0
app.py ADDED
@@ -0,0 +1,202 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import aiohttp
3
+ import time
4
+ import ssl
5
+ from datetime import datetime
6
+ from collections import deque
7
+ import json
8
+ from pathlib import Path
9
+
10
+ class EphemeralStreamReader:
11
+ def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_hours=1, show_output=False):
12
+ self.url = f"{piping_server_url}/{path}"
13
+ self.path = path
14
+ self.reconnect_delay = 1
15
+ self.show_output = show_output
16
+ self.retention_hours = retention_hours
17
+ self.stored_data = deque()
18
+ self.last_cleanup = time.time()
19
+
20
+ def cleanup_old_data(self):
21
+ current_time = time.time()
22
+ retention_seconds = self.retention_hours * 3600
23
+ while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > retention_seconds:
24
+ self.stored_data.popleft()
25
+
26
+ def get_stored_data(self):
27
+ self.cleanup_old_data()
28
+ return list(self.stored_data)
29
+
30
+ def store_chunk(self, data, timestamp):
31
+ self.stored_data.append({
32
+ 'timestamp': timestamp,
33
+ 'data': data,
34
+ 'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
35
+ })
36
+
37
+ if time.time() - self.last_cleanup > 60:
38
+ self.cleanup_old_data()
39
+ self.last_cleanup = time.time()
40
+
41
+ async def start_reading(self):
42
+ while True:
43
+ try:
44
+ async with aiohttp.ClientSession() as session:
45
+ if self.show_output:
46
+ print(f"Connecting to {self.url}...")
47
+ async with session.get(self.url) as response:
48
+ if response.status == 200:
49
+ if self.show_output:
50
+ print("Connected! Reading stream...")
51
+
52
+ while True:
53
+ chunk = await response.content.read(1024)
54
+ if not chunk:
55
+ break
56
+
57
+ current_time = time.time()
58
+ try:
59
+ text = chunk.decode('utf-8')
60
+ self.store_chunk(text, current_time)
61
+
62
+ if self.show_output:
63
+ timestamp = datetime.fromtimestamp(current_time).strftime('%Y-%m-%d %H:%M:%S')
64
+ print(f"[{timestamp}] Received: {text}", end='', flush=True)
65
+ except UnicodeDecodeError:
66
+ self.store_chunk(str(chunk), current_time)
67
+ if self.show_output:
68
+ print(f"Received raw bytes: {chunk}")
69
+
70
+ except aiohttp.ClientError as e:
71
+ if self.show_output:
72
+ print(f"Connection error: {e}")
73
+ except Exception as e:
74
+ if self.show_output:
75
+ print(f"Error: {e}")
76
+
77
+ if self.show_output:
78
+ print(f"Connection closed. Reconnecting in {self.reconnect_delay} seconds...")
79
+ await asyncio.sleep(self.reconnect_delay)
80
+
81
+ async def start_http_server(self, host='localhost', port=8000, ssl_context=None):
82
+ from aiohttp import web
83
+
84
+ async def handle_get(request):
85
+ data = self.get_stored_data()
86
+ html = f"""
87
+ <!DOCTYPE html>
88
+ <html>
89
+ <head>
90
+ <title>Ephemeral Stream Data</title>
91
+ <style>
92
+ body {{
93
+ font-family: Arial, sans-serif;
94
+ margin: 20px;
95
+ }}
96
+ .data-container {{
97
+ margin: 20px 0;
98
+ }}
99
+ .timestamp {{
100
+ color: #666;
101
+ font-size: 0.9em;
102
+ }}
103
+ .data-item {{
104
+ margin: 10px 0;
105
+ padding: 10px;
106
+ background: #f5f5f5;
107
+ border-radius: 4px;
108
+ }}
109
+ .refresh-btn {{
110
+ padding: 10px 20px;
111
+ background: #4CAF50;
112
+ color: white;
113
+ border: none;
114
+ border-radius: 4px;
115
+ cursor: pointer;
116
+ }}
117
+ .info {{
118
+ margin-bottom: 20px;
119
+ color: #666;
120
+ }}
121
+ </style>
122
+ <script>
123
+ function refreshData() {{
124
+ window.location.reload();
125
+ }}
126
+ </script>
127
+ </head>
128
+ <body>
129
+ <h1>Ephemeral Stream Data</h1>
130
+ <div class="info">
131
+ <p>Stream Path: {self.path}</p>
132
+ <p>Retention Time: {self.retention_hours} hours</p>
133
+ <p>To write to this stream: <code>seq inf | curl -T- {self.url}</code></p>
134
+ </div>
135
+ <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
136
+ <div class="data-container">
137
+ """
138
+
139
+ for item in reversed(data):
140
+ html += f"""
141
+ <div class="data-item">
142
+ <div class="timestamp">{item['formatted_time']}</div>
143
+ <pre>{item['data']}</pre>
144
+ </div>
145
+ """
146
+
147
+ html += """
148
+ </div>
149
+ </body>
150
+ </html>
151
+ """
152
+ return web.Response(text=html, content_type='text/html')
153
+
154
+ app = web.Application()
155
+ app.router.add_get('/', handle_get)
156
+
157
+ runner = web.AppRunner(app)
158
+ await runner.setup()
159
+ site = web.TCPSite(runner, host, port, ssl_context=ssl_context)
160
+ await site.start()
161
+
162
+ protocol = "https" if ssl_context else "http"
163
+ print(f"\nWeb interface available at {protocol}://{host}:{port}")
164
+ print(f"To write to this stream: seq inf | curl -T- {self.url}")
165
+
166
+ def create_ssl_context():
167
+ """Create SSL context from certificate and key files"""
168
+ cert_path = Path("./localhost.pem")
169
+ key_path = Path("./localhost-key.pem")
170
+
171
+ if not cert_path.exists() or not key_path.exists():
172
+ print("Warning: SSL certificate or key not found. Running in HTTP mode.")
173
+ return None
174
+
175
+ ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
176
+ ssl_context.load_cert_chain(str(cert_path), str(key_path))
177
+ return ssl_context
178
+
179
+ async def main():
180
+ ssl_context = create_ssl_context()
181
+ reader = EphemeralStreamReader(retention_hours=1, show_output=False)
182
+
183
+ await asyncio.gather(
184
+ reader.start_reading(),
185
+ reader.start_http_server(host='localhost', port=8000, ssl_context=ssl_context)
186
+ )
187
+
188
+ if __name__ == "__main__":
189
+ import argparse
190
+
191
+ parser = argparse.ArgumentParser(description='Ephemeral Stream Reader with HTTPS support')
192
+ parser.add_argument('--host', default='localhost', help='Host to bind to')
193
+ parser.add_argument('--port', type=int, default=8000, help='Port to bind to')
194
+ parser.add_argument('--path', default='test123', help='Path for the stream')
195
+
196
+ args = parser.parse_args()
197
+
198
+ print("Starting Ephemeral Stream Reader with HTTPS support...")
199
+ try:
200
+ asyncio.run(main())
201
+ except KeyboardInterrupt:
202
+ print("\nStopping server...")