Spaces:
Paused
Paused
import asyncio
import codecs
import html
import json
import secrets
import time
from collections import defaultdict, deque
from datetime import datetime

import aiohttp
from aiohttp import web
class StreamManager:
    """Registry of live streams, their access tokens and their reader tasks.

    Three dictionaries are kept in step, all keyed by stream ID:
    ``streams`` (reader instances), ``tokens`` (access tokens) and
    ``tasks`` (the asyncio task running each reader's ``start_reading``).
    """

    # A stream is dropped once nobody has read it for this many seconds.
    INACTIVITY_TIMEOUT = 3600
    # Pause between two sweeps of the inactive-stream cleanup loop, in seconds.
    CLEANUP_INTERVAL = 300

    def __init__(self):
        self.streams = {}  # stream_id -> EphemeralStreamReader
        self.tokens = {}   # stream_id -> access token (str)
        self.tasks = {}    # stream_id -> asyncio.Task running start_reading()

    async def create_stream(self, retention_seconds):
        """Create a stream, start its reader task and return its credentials.

        Args:
            retention_seconds: How long stored chunks are kept; passed
                through to ``EphemeralStreamReader``.

        Returns:
            A ``(stream_id, access_token)`` tuple of URL-safe random strings.
        """
        stream_id = secrets.token_urlsafe(8)
        access_token = secrets.token_urlsafe(16)

        stream = EphemeralStreamReader(
            path=stream_id,
            retention_seconds=retention_seconds,
            show_output=True,
        )
        self.streams[stream_id] = stream
        self.tokens[stream_id] = access_token
        # The reader runs in the background for the lifetime of the stream.
        self.tasks[stream_id] = asyncio.create_task(stream.start_reading())
        return stream_id, access_token

    def get_stream(self, stream_id, token):
        """Return the stream for ``stream_id`` if ``token`` matches, else ``None``.

        A missing stream, a missing stored token, or a ``token`` that is not
        a string (for example ``None`` when the query parameter is absent)
        all yield ``None``.
        """
        stream = self.streams.get(stream_id)
        expected = self.tokens.get(stream_id)
        if stream is None or expected is None or not isinstance(token, str):
            return None
        # Compare as bytes in constant time: compare_digest rejects non-ASCII
        # str arguments, and a plain == would leak timing information.
        supplied = token.encode("utf-8", "replace")
        if secrets.compare_digest(expected.encode("utf-8"), supplied):
            return stream
        return None

    def _purge_inactive(self, now):
        """Remove every stream idle longer than ``INACTIVITY_TIMEOUT``.

        Cancels the reader task of each removed stream and returns the list
        of removed stream IDs.
        """
        stale_ids = [
            stream_id
            for stream_id, stream in self.streams.items()
            if now - stream.last_access > self.INACTIVITY_TIMEOUT
        ]
        for stream_id in stale_ids:
            task = self.tasks.pop(stream_id, None)
            if task is not None:
                task.cancel()
            del self.streams[stream_id]
            self.tokens.pop(stream_id, None)
        return stale_ids

    async def cleanup_inactive_streams(self):
        """Sweep inactive streams forever, sleeping ``CLEANUP_INTERVAL`` between sweeps."""
        while True:
            self._purge_inactive(time.time())
            await asyncio.sleep(self.CLEANUP_INTERVAL)
class EphemeralStreamReader:
    """Read one HTTP stream and keep its recent chunks in memory.

    Chunks are stored in a deque, oldest first, as dicts with the keys
    ``timestamp``, ``data`` and ``formatted_time``. Entries older than
    ``retention_seconds`` are discarded by ``cleanup_old_data``.
    """

    def __init__(self, piping_server_url="https://ppng.io", path="test123", retention_seconds=3600, show_output=False):
        """Configure the reader; no network activity happens here.

        Args:
            piping_server_url: Base URL of the server to read from.
            path: Path segment appended to the base URL.
            retention_seconds: Requested retention, clamped to 0..3600.
            show_output: When true, progress and errors are printed.
        """
        self.url = f"{piping_server_url}/{path}"
        self.path = path
        self.reconnect_delay = 1
        self.show_output = show_output
        self.retention_seconds = min(3600, max(0, retention_seconds))  # Limit between 0 and 3600 seconds
        self.stored_data = deque()
        self.last_cleanup = time.time()
        self.last_access = time.time()
        # Stateful decoder so a multi-byte character split across two reads
        # is still decoded as text instead of tripping the bytes fallback.
        self._decoder = codecs.getincrementaldecoder("utf-8")()

    def _log(self, message):
        """Print ``message`` only when ``show_output`` is enabled."""
        if self.show_output:
            print(message)

    def cleanup_old_data(self):
        """Drop entries from the front of the deque that exceed the retention time."""
        current_time = time.time()
        while self.stored_data and (current_time - self.stored_data[0]['timestamp']) > self.retention_seconds:
            self.stored_data.popleft()

    def get_stored_data(self):
        """Return the retained entries as a list, oldest first.

        Also purges expired entries and refreshes ``last_access``.
        """
        self.cleanup_old_data()
        self.last_access = time.time()
        return list(self.stored_data)

    def store_chunk(self, data, timestamp):
        """Append one entry; purge expired entries at most once every 60 seconds."""
        self.stored_data.append({
            'timestamp': timestamp,
            'data': data,
            'formatted_time': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S'),
        })
        if time.time() - self.last_cleanup > 60:
            self.cleanup_old_data()
            self.last_cleanup = time.time()

    def _decode_chunk(self, chunk):
        """Decode ``chunk`` as UTF-8, carrying incomplete characters over.

        Returns the decoded text, which may be empty when the chunk only
        holds the start of a multi-byte character. If the bytes are not
        valid UTF-8, returns ``str()`` of the undecoded bytes (the
        ``b'...'`` representation) and resets the decoder.
        """
        carried = self._decoder.getstate()[0]
        try:
            return self._decoder.decode(chunk)
        except UnicodeDecodeError:
            self._decoder.reset()
            # Include bytes carried over from the previous read so nothing is lost.
            return str(carried + chunk)

    def _flush_decoder(self):
        """Reset the decoder and return the repr of any dangling bytes, or ``""``."""
        leftover = self._decoder.getstate()[0]
        self._decoder.reset()
        return str(leftover) if leftover else ""

    async def _consume(self, response):
        """Read ``response`` in 1024-byte chunks until it ends, storing each one."""
        self._decoder.reset()
        try:
            while True:
                chunk = await response.content.read(1024)
                if not chunk:
                    break
                current_time = time.time()
                text = self._decode_chunk(chunk)
                if text:
                    self.store_chunk(text, current_time)
        finally:
            # Bytes still buffered when the stream ends are an incomplete
            # character; keep them visible rather than dropping them.
            tail = self._flush_decoder()
            if tail:
                self.store_chunk(tail, time.time())

    async def start_reading(self):
        """Connect, read until the response ends, then reconnect — forever.

        Errors are reported through ``_log`` and never propagate; the loop
        only stops when the surrounding task is cancelled.
        """
        while True:
            try:
                async with aiohttp.ClientSession() as session:
                    self._log(f"Connecting to {self.url}...")
                    async with session.get(self.url) as response:
                        if response.status == 200:
                            self._log("Connected! Reading stream...")
                            await self._consume(response)
                        else:
                            self._log(f"Unexpected HTTP status {response.status}, retrying...")
            except aiohttp.ClientError as e:
                self._log(f"Connection error: {e}")
            except Exception as e:
                self._log(f"Error: {e}")
            # Always wait a bit before reconnecting
            await asyncio.sleep(self.reconnect_delay)
class WebServer:
    """HTTP handlers and HTML page builders for the stream web interface."""

    def __init__(self, stream_manager):
        """Keep a reference to the manager used to create and look up streams."""
        self.stream_manager = stream_manager

    async def handle_create_stream(self, request):
        """Create a stream from the posted form and return its info page.

        Reads ``retention_minutes`` from the form data (default ``'60'``),
        converts it to seconds and clamps it to 0..3600. Any failure is
        answered with a 400 response carrying the error text.
        """
        try:
            data = await request.post()
            retention_minutes = float(data.get('retention_minutes', '60'))
            retention_seconds = min(3600, max(0, retention_minutes * 60))
            stream_id, token = await self.stream_manager.create_stream(retention_seconds)
            page = self.create_stream_page(stream_id, token)
            return web.Response(text=page, content_type='text/html')
        except Exception as e:
            return web.Response(text=f"Error creating stream: {str(e)}", status=400)

    async def handle_view_stream(self, request):
        """Render the viewer page, or answer 403 when the ID/token pair is rejected."""
        stream_id = request.match_info.get('stream_id')
        token = request.query.get('token')
        stream = self.stream_manager.get_stream(stream_id, token)
        if not stream:
            return web.Response(text="Invalid stream ID or token", status=403)
        page = self.create_viewer_page(stream, token)
        return web.Response(text=page, content_type='text/html')

    async def handle_home(self, request):
        """Serve the static stream-creation form."""
        return web.Response(text=self.create_home_page(), content_type='text/html')

    def create_home_page(self):
        """Return the static HTML for the stream-creation form."""
        return """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Create Ephemeral Stream</title>
            <style>
                body {
                    font-family: Arial, sans-serif;
                    margin: 20px;
                    max-width: 800px;
                    margin: 0 auto;
                    padding: 20px;
                }
                .form-container {
                    margin: 20px 0;
                    padding: 20px;
                    border: 1px solid #ddd;
                    border-radius: 4px;
                }
                .input-group {
                    margin: 10px 0;
                }
                input, button {
                    padding: 8px;
                    margin: 5px 0;
                }
                button {
                    background: #4CAF50;
                    color: white;
                    border: none;
                    border-radius: 4px;
                    cursor: pointer;
                }
                .info {
                    margin: 20px 0;
                    padding: 15px;
                    background: #f5f5f5;
                    border-radius: 4px;
                }
            </style>
        </head>
        <body>
            <h1>Create New Ephemeral Stream</h1>
            <div class="form-container">
                <form method="POST" action="/create">
                    <div class="input-group">
                        <label for="retention_minutes">Retention Time (minutes, max 60):</label><br>
                        <input type="number" id="retention_minutes" name="retention_minutes"
                               min="0" max="60" value="60" step="0.5" required>
                    </div>
                    <button type="submit">Create Stream</button>
                </form>
            </div>
            <div class="info">
                <h3>About Ephemeral Streams</h3>
                <p>Create your private stream with custom retention time. Each stream:</p>
                <ul>
                    <li>Has a unique ID and access token</li>
                    <li>Can only be viewed with the correct token</li>
                    <li>Automatically deletes data older than the specified retention time</li>
                    <li>Supports retention times from 0 to 60 minutes</li>
                </ul>
            </div>
        </body>
        </html>
        """

    def create_stream_page(self, stream_id, token):
        """Return the HTML confirmation page shown after a stream is created.

        Every interpolated value is HTML-escaped before it is placed in the
        markup.
        """
        view_url = html.escape(f"/view/{stream_id}?token={token}")
        write_url = html.escape(f"https://ppng.io/{stream_id}")
        safe_stream_id = html.escape(stream_id)
        safe_token = html.escape(token)
        return f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Stream Created</title>
            <style>
                body {{
                    font-family: Arial, sans-serif;
                    margin: 20px;
                    max-width: 800px;
                    margin: 0 auto;
                    padding: 20px;
                }}
                .info-box {{
                    background: #f5f5f5;
                    padding: 20px;
                    border-radius: 4px;
                    margin: 20px 0;
                }}
                .code-box {{
                    background: #2b2b2b;
                    color: #ffffff;
                    padding: 15px;
                    border-radius: 4px;
                    font-family: monospace;
                    white-space: pre-wrap;
                    word-break: break-all;
                }}
                .button {{
                    display: inline-block;
                    padding: 10px 20px;
                    background: #4CAF50;
                    color: white;
                    text-decoration: none;
                    border-radius: 4px;
                    margin: 10px 0;
                }}
            </style>
        </head>
        <body>
            <h1>Stream Created Successfully</h1>
            <div class="info-box">
                <h3>Your Stream Information</h3>
                <p><strong>Stream ID:</strong> {safe_stream_id}</p>
                <p><strong>Access Token:</strong> {safe_token}</p>
                <p><strong>View URL:</strong> <a href="{view_url}">{view_url}</a></p>
                <h3>How to Write to Your Stream</h3>
                <p>Use curl to write to your stream:</p>
                <div class="code-box">
seq inf | curl -T- {write_url}
                </div>
                <h3>Important Notes</h3>
                <ul>
                    <li>Keep your access token secret</li>
                    <li>Bookmark the view URL for easy access</li>
                    <li>Data older than your specified retention time will be automatically deleted</li>
                </ul>
            </div>
            <a href="{view_url}" class="button">View Your Stream</a>
        </body>
        </html>
        """

    def create_viewer_page(self, stream, token):
        """Return the HTML viewer page listing the stream's entries, newest first.

        The stored chunk text comes from whoever writes to the stream, so it
        is HTML-escaped before being embedded; otherwise a writer could
        inject markup or script into the viewer's browser.

        Args:
            stream: Object providing ``path``, ``url``, ``retention_seconds``
                and ``get_stored_data()``.
            token: Accepted for interface compatibility; not used in the page.
        """
        header = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Stream Viewer</title>
            <style>
                body {{
                    font-family: Arial, sans-serif;
                    margin: 20px;
                }}
                .data-container {{
                    margin: 20px 0;
                }}
                .timestamp {{
                    color: #666;
                    font-size: 0.9em;
                }}
                .data-item {{
                    margin: 10px 0;
                    padding: 10px;
                    background: #f5f5f5;
                    border-radius: 4px;
                }}
                .refresh-btn {{
                    padding: 10px 20px;
                    background: #4CAF50;
                    color: white;
                    border: none;
                    border-radius: 4px;
                    cursor: pointer;
                }}
                .info {{
                    margin-bottom: 20px;
                    color: #666;
                }}
                .code-box {{
                    background: #2b2b2b;
                    color: #ffffff;
                    padding: 15px;
                    border-radius: 4px;
                    font-family: monospace;
                    margin: 10px 0;
                    white-space: pre-wrap;
                    word-break: break-all;
                }}
            </style>
            <script>
                function refreshData() {{
                    window.location.reload();
                }}
                // Auto refresh every 5 seconds
                setInterval(refreshData, 5000);
            </script>
        </head>
        <body>
            <h1>Stream Viewer</h1>
            <div class="info">
                <p>Stream ID: {html.escape(stream.path)}</p>
                <p>Retention Time: {stream.retention_seconds / 60:.1f} minutes</p>
                <div class="code-box">To write to this stream: seq inf | curl -T- {html.escape(stream.url)}</div>
            </div>
            <button class="refresh-btn" onclick="refreshData()">Refresh Data</button>
            <div class="data-container">
        """
        # Newest entries first; get_stored_data() returns them oldest first.
        entries = [
            f"""
                <div class="data-item">
                    <div class="timestamp">{html.escape(item['formatted_time'])}</div>
                    <pre>{html.escape(item['data'])}</pre>
                </div>
            """
            for item in reversed(stream.get_stored_data())
        ]
        footer = """
            </div>
        </body>
        </html>
        """
        return header + "".join(entries) + footer
async def main():
    """Start the web interface on 0.0.0.0:7860 and run until cancelled.

    Registers the three routes, starts the periodic inactive-stream cleanup
    task, then blocks forever. On the way out (normally via cancellation)
    the cleanup task is cancelled and the aiohttp runner is shut down so the
    listening socket is released.
    """
    stream_manager = StreamManager()
    web_server = WebServer(stream_manager)

    app = web.Application()
    app.router.add_get('/', web_server.handle_home)
    app.router.add_post('/create', web_server.handle_create_stream)
    app.router.add_get('/view/{stream_id}', web_server.handle_view_stream)

    runner = web.AppRunner(app)
    await runner.setup()
    cleanup_task = None
    try:
        site = web.TCPSite(runner, '0.0.0.0', 7860)
        await site.start()
        print("\nWeb interface available at http://0.0.0.0:7860")
        # Start the cleanup task
        cleanup_task = asyncio.create_task(stream_manager.cleanup_inactive_streams())
        await asyncio.Event().wait()  # Keep the server running indefinitely
    finally:
        if cleanup_task is not None:
            cleanup_task.cancel()
        # Release the runner even if site.start() failed (e.g. port in use).
        await runner.cleanup()
def _run_server():
    """Announce startup, run ``main()`` and report a Ctrl-C shutdown."""
    print("Starting Ephemeral Stream Server...")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit quietly.
        print("\nStopping server...")


# Only start the server when executed as a script, not when imported.
if __name__ == "__main__":
    _run_server()