| import gradio as gr | |
| import asyncio, aiohttp | |
| import threading, logging | |
| from datetime import datetime | |
# Route INFO-and-above records through the root logger's default handler,
# then grab a module-scoped logger for everything in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Endpoints probed by the monitor, in display order.  Every entry has the
# form https://<space-name>.hf.space, so only the space names are spelled out.
URLS = [
    f"https://{space_name}.hf.space"
    for space_name in (
        "asfag654-g",
        "asfag654-libretranslate",
        "asfag654-kk",
        "asfag654-t",
        "asfag654-b",
        "imluochen-sleepy",
        "7d820-ss",
        "7d820-kk",
    )
]
class Monitor:
    """Periodically probe a set of URLs and keep a bounded result history.

    ``start()`` launches a daemon thread that runs one round of concurrent
    HTTP GET requests, then waits ``check_interval`` seconds before the next
    round.  Each result is stored per URL as a ``(timestamp, success)`` tuple,
    where ``timestamp`` is a local-time string formatted ``%Y-%m-%d %H:%M:%S``.
    """

    def __init__(self, urls, max_history=100):
        """Create a monitor for ``urls``.

        Args:
            urls: iterable of URL strings to probe.
            max_history: maximum number of results retained per URL; older
                entries are discarded first.
        """
        self.urls = urls
        self.status = {url: [] for url in urls}
        self.check_interval = 3600  # seconds between probe rounds
        self.max_history = max_history
        self._stop_event = threading.Event()
        # The polling thread writes ``status`` while UI callbacks read it.
        self._lock = threading.Lock()
        self._thread = None

    def record(self, url, success):
        """Append one probe result for ``url`` and trim the history.

        A URL that was not passed to the constructor is added on first use
        instead of raising ``KeyError``.
        """
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with self._lock:
            logs = self.status.setdefault(url, [])
            logs.append((now, success))
            excess = len(logs) - self.max_history
            if excess > 0:
                del logs[:excess]

    def _snapshot(self):
        """Return ``[(url, copy_of_logs), ...]`` taken under the lock."""
        with self._lock:
            return [(url, list(logs)) for url, logs in self.status.items()]

    def get_current(self):
        """Return ``[url, "<timestamp> <marker>"]`` for each URL's latest result.

        URLs that have no recorded result yet are omitted.
        """
        # NOTE(review): the two marker strings look mis-encoded (they differ
        # only by a trailing space); they are kept exactly as found -- confirm
        # the intended success/failure symbols.
        rows = []
        for url, logs in self._snapshot():
            if not logs:
                continue
            stamp, ok = logs[-1]
            rows.append([url, f"{stamp} β " if ok else f"{stamp} β"])
        return rows

    def get_stats(self, selected=None):
        """Return ``[url, total, ok, pct]`` rows, most successes first.

        Args:
            selected: optional substring; when truthy, only URLs containing
                it are included.

        ``pct`` is the success ratio formatted with one decimal (e.g.
        ``"50.0%"``) or ``"N/A"`` when nothing has been recorded for the URL.
        """
        table = []
        for url, logs in self._snapshot():
            if selected and selected not in url:
                continue
            total = len(logs)
            ok = sum(1 for _, success in logs if success)
            pct = f"{ok / total:.1%}" if total else "N/A"
            table.append([url, total, ok, pct])
        # Stable sort: rows with equal success counts keep insertion order.
        return sorted(table, key=lambda row: -row[2])

    async def check_url(self, url):
        """Probe ``url`` once with a 10 second total timeout and record it.

        Any response that arrives counts as success.
        """
        # NOTE(review): the HTTP status code is not inspected, so 4xx/5xx
        # replies are recorded as success -- confirm this is intended.
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url):
                    pass
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so task cancellation
            # and interpreter shutdown are not swallowed as failed probes.
            logger.warning("check failed for %s: %r", url, exc)
            self.record(url, False)
        else:
            self.record(url, True)

    async def run_check(self):
        """Probe every configured URL concurrently."""
        await asyncio.gather(*(self.check_url(url) for url in self.urls))

    def start(self):
        """Start the background polling thread; no-op if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()

        def loop():
            while not self._stop_event.is_set():
                try:
                    asyncio.run(self.run_check())
                except Exception:
                    # Keep polling even if one round fails unexpectedly.
                    logger.exception("monitor round failed")
                # Interruptible sleep: returns early once stop() is called.
                self._stop_event.wait(self.check_interval)

        self._thread = threading.Thread(target=loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the polling thread to exit after its current round or wait."""
        self._stop_event.set()
# Single shared monitor instance; its polling thread is started at import
# time so results are already accumulating when the UI callbacks run.
monitor = Monitor(urls=URLS)
monitor.start()
def refresh_all():
    """Log the refresh and return ``(latest_rows, stats_rows)`` for the UI.

    ``latest_rows`` comes from ``monitor.get_current()`` and ``stats_rows``
    from ``monitor.get_stats()`` with no filter applied.
    """
    logger.info("ε·ζ°ηΆζ")
    latest_rows = monitor.get_current()
    stats_rows = monitor.get_stats()
    return latest_rows, stats_rows
def get_filtered_stats(sel):
    """Return the stats rows restricted to URLs matching the filter ``sel``.

    ``sel`` is handed to ``monitor.get_stats`` unchanged; a falsy value
    (for example ``None``) yields the rows for every URL.
    """
    filtered_rows = monitor.get_stats(selected=sel)
    return filtered_rows
# NOTE(review): the non-ASCII UI strings in this block look mis-encoded; they
# are reproduced exactly as found -- confirm against the original file.


def _refresh_keep_filter(selected):
    """Refresh both tables while honouring the dropdown's current filter.

    Without this, every refresh replaced the filtered stats table with the
    unfiltered one while the dropdown still displayed the chosen filter.
    """
    current_rows, _ = refresh_all()
    return current_rows, get_filtered_stats(selected)


with gr.Blocks(title="ζε‘ηΆζηζ§") as demo:
    gr.Markdown("## β ηΆζηζ§\nζ―ε°ζΆθͺε¨ζ£ζ΅ζε‘ηΆζγ")
    status_table = gr.Dataframe(headers=["URL", "ζθΏηΆζ"], interactive=False)
    stats_table = gr.Dataframe(headers=["URL", "ζ£ζ΅ζ°", "ζε", "ζεη"], interactive=False)
    with gr.Row():
        # Filter choices are each URL's first host label (text before the first dot).
        sel = gr.Dropdown(choices=[u.split('//')[1].split('.')[0] for u in URLS], label="ηι")
        btn = gr.Button("π ε·ζ°ηΆζ")

    tables = [status_table, stats_table]
    sel.change(get_filtered_stats, inputs=sel, outputs=stats_table)
    btn.click(_refresh_keep_filter, inputs=sel, outputs=tables)
    demo.load(_refresh_keep_filter, inputs=sel, outputs=tables)

    if hasattr(gr, "Timer"):
        # A <script> placed in gr.HTML is inserted as inert markup and never
        # runs, so the 30 second auto-refresh is driven by a Timer instead.
        gr.Timer(30).tick(_refresh_keep_filter, inputs=sel, outputs=tables)
    else:
        # Gradio builds without gr.Timer: keep the original markup unchanged.
        gr.HTML("""
<script>
setInterval(() => {
const btn = [...document.querySelectorAll('button')].find(b => b.innerText.includes('ε·ζ°'));
if (btn) btn.click();
}, 30000);
</script>
""")

demo.launch()