Spaces:
Sleeping
Sleeping
| """ | |
| Token Cleaner for Videoinu Tokens | |
| Checks the HuggingFace dataset for expired or undecodable tokens and removes them | |
| Runs continuously every 60 minutes in background | |
| Provides a Gradio web interface for manual triggering and monitoring | |
| """ | |
| import os | |
| import time | |
| import json | |
| import logging | |
| import base64 | |
| import requests | |
| from huggingface_hub import HfApi | |
| from datetime import datetime | |
| import threading | |
| import gradio as gr | |
# Configure logging: INFO and above, with timestamp, logger name and level.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
class TokenCleaner:
    """Remove expired or undecodable JWT entries from a HuggingFace dataset file.

    The dataset is expected to hold a JSON array of entries; each entry is a
    dict whose ``token`` key carries a JWT. Entries may also carry ``email``
    and ``credits_remaining``, which are only used for logging.

    Tokens are decoded WITHOUT signature verification: only the ``exp`` claim
    is inspected to decide whether an entry is still worth keeping.
    """

    # Name of the JSON file inside the dataset repo that stores the entries.
    TOKENS_FILENAME = "videoinu_tokens.json"

    # Serializes cleaning cycles. The background loop and a manual trigger can
    # call clean_expired_tokens() at the same time; without this, two
    # overlapping fetch/upload cycles could race on the same dataset file.
    _clean_lock = threading.Lock()

    def __init__(self, hf_token, dataset_name="HIRO12121212/videoinutoken"):
        """Store credentials and create the Hub API client.

        Args:
            hf_token: HuggingFace access token used for download and upload.
            dataset_name: Dataset repo id (``owner/name``) holding the tokens.
        """
        self.hf_token = hf_token
        self.dataset_name = dataset_name
        self.hf_api = HfApi()
        logger.info(f"Token Cleaner initialized for dataset: {dataset_name}")

    def decode_jwt_token(self, token):
        """Decode the payload segment of a JWT without verifying its signature.

        Args:
            token: The JWT string (``header.payload.signature``).

        Returns:
            The payload as a dict, or ``None`` when the token is not a string,
            does not have three dot-separated parts, is not valid
            base64url/JSON, or its payload is not a JSON object.
        """
        if not isinstance(token, str):
            logger.warning("Invalid JWT format (token is not a string)")
            return None

        parts = token.split('.')
        if len(parts) != 3:
            logger.warning("Invalid JWT format (not 3 parts)")
            return None

        # JWTs strip base64 padding; restore it so the decoder accepts it.
        payload = parts[1]
        payload += '=' * (-len(payload) % 4)

        try:
            claims = json.loads(base64.urlsafe_b64decode(payload))
        except (ValueError, TypeError, RecursionError) as e:
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
            # ValueError subclasses.
            logger.error(f"Error decoding JWT token: {str(e)}")
            return None

        # A payload such as `[1, 2]` or `42` is valid JSON but not a claims
        # object; callers rely on dict methods, so reject it here.
        if not isinstance(claims, dict):
            logger.warning("Invalid JWT payload (not a JSON object)")
            return None
        return claims

    @staticmethod
    def _format_expiry(exp_timestamp):
        """Render an ``exp`` value as a local datetime string for log output.

        Falls back to the raw value when it cannot be converted (out of range
        for the platform, NaN, ...), so logging never aborts a cleaning cycle.
        """
        try:
            return str(datetime.fromtimestamp(exp_timestamp))
        except (OverflowError, OSError, ValueError, TypeError):
            return f"timestamp {exp_timestamp!r}"

    def is_token_expired(self, token_data):
        """Tell whether decoded JWT claims describe an expired token.

        Args:
            token_data: Decoded claims dict (as returned by decode_jwt_token).

        Returns:
            ``True`` when the claims are missing, carry no ``exp``, carry a
            non-numeric ``exp``, or ``exp`` is not in the future; ``False``
            otherwise.
        """
        if not isinstance(token_data, dict) or 'exp' not in token_data:
            return True

        exp_timestamp = token_data['exp']
        if isinstance(exp_timestamp, bool) or not isinstance(exp_timestamp, (int, float)):
            logger.warning(f"Token has a non-numeric exp claim: {exp_timestamp!r}")
            return True

        current_timestamp = int(time.time())
        # Same as `current >= exp` for ordinary numbers, but also treats NaN
        # (which json.loads accepts) as expired instead of valid forever.
        is_expired = not (current_timestamp < exp_timestamp)
        if is_expired:
            exp_date = self._format_expiry(exp_timestamp)
            logger.info(f"Token expired on {exp_date} (email: {token_data.get('email', 'unknown')})")
        return is_expired

    def fetch_all_tokens(self):
        """Download the token entries from the dataset.

        Returns:
            The list of entries, or an empty list when the request fails, the
            response is not HTTP 200, or the body is not a JSON array.
        """
        download_url = (
            f"https://huggingface.co/datasets/{self.dataset_name}"
            f"/resolve/main/{self.TOKENS_FILENAME}"
        )
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        try:
            response = requests.get(download_url, headers=headers, timeout=10)
            if response.status_code != 200:
                logger.error(f"Failed to fetch tokens: {response.status_code}")
                return []
            tokens_array = json.loads(response.text)
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Error fetching tokens: {str(e)}")
            return []

        # Anything other than a JSON array cannot be iterated as entries.
        if not isinstance(tokens_array, list):
            logger.error(
                f"Error fetching tokens: expected a JSON array, got {type(tokens_array).__name__}"
            )
            return []

        logger.info(f"✅ Fetched {len(tokens_array)} tokens from dataset")
        return tokens_array

    def update_dataset(self, tokens_array):
        """Replace the dataset's token file with ``tokens_array``.

        The entries are written to a temporary JSON file which is uploaded
        through the Hub API and then deleted, whether or not the upload
        succeeded.

        Args:
            tokens_array: List of entries to store.

        Returns:
            ``True`` on success, ``False`` if writing or uploading failed.
        """
        import tempfile  # local import: not part of the module-level imports

        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.json', delete=False, encoding='utf-8'
            ) as f:
                # Record the path first so the file is cleaned up even if
                # serialization fails half-way.
                temp_path = f.name
                json.dump(tokens_array, f, indent=2)
            self.hf_api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=self.TOKENS_FILENAME,
                repo_id=self.dataset_name,
                repo_type="dataset",
                token=self.hf_token,
            )
            logger.info(f"✅ Updated dataset with {len(tokens_array)} tokens")
            return True
        except Exception as e:
            # Boundary handler: any write/upload failure is reported as False.
            logger.error(f"Error updating dataset: {str(e)}")
            return False
        finally:
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass  # best effort: the file may already be gone

    def clean_expired_tokens(self):
        """Run one cleaning cycle and return its statistics.

        Cycles are serialized with a lock so that concurrent callers cannot
        interleave their fetch/upload steps.

        Returns:
            A dict with the keys ``total``, ``valid``, ``expired``,
            ``invalid`` (ints) and ``timestamp`` (``YYYY-MM-DD HH:MM:SS``).
        """
        with self._clean_lock:
            return self._run_clean_cycle()

    def _run_clean_cycle(self):
        """Fetch entries, drop expired/invalid ones, upload the rest if changed."""
        logger.info("=" * 80)
        logger.info("Starting token cleaning cycle...")
        logger.info("=" * 80)

        all_tokens = self.fetch_all_tokens()
        if not all_tokens:
            logger.warning("No tokens found in dataset")
            # Carry a timestamp here too: consumers format stats['timestamp'].
            return {
                "total": 0,
                "valid": 0,
                "expired": 0,
                "invalid": 0,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

        total_count = len(all_tokens)
        valid_tokens = []
        expired_count = 0
        invalid_count = 0
        logger.info(f"Checking {total_count} tokens...")

        for i, token_entry in enumerate(all_tokens):
            if not isinstance(token_entry, dict):
                logger.warning(f"[{i+1}/{total_count}] Invalid token entry (not an object)")
                invalid_count += 1
                continue

            token = token_entry.get('token', '')
            email = token_entry.get('email', 'unknown')
            credits = token_entry.get('credits_remaining', 0)

            decoded = self.decode_jwt_token(token)
            if decoded is None:
                logger.warning(f"[{i+1}/{total_count}] Invalid token format: {email}")
                invalid_count += 1
                continue

            if self.is_token_expired(decoded):
                logger.info(f"[{i+1}/{total_count}] 🗑️ Removing expired token: {email} ({credits} credits)")
                expired_count += 1
            else:
                exp_date = self._format_expiry(decoded.get('exp', 0))
                logger.info(f"[{i+1}/{total_count}] ✅ Valid token: {email} (expires: {exp_date}, {credits} credits)")
                valid_tokens.append(token_entry)

        # Only touch the dataset when something actually has to be removed.
        if expired_count > 0 or invalid_count > 0:
            logger.info(f"\n🧹 Cleaning dataset...")
            logger.info(f" Removing {expired_count} expired tokens")
            logger.info(f" Removing {invalid_count} invalid tokens")
            if self.update_dataset(valid_tokens):
                logger.info(f"✅ Dataset cleaned successfully!")
            else:
                logger.error(f"❌ Failed to update dataset")
        else:
            logger.info(f"\n✨ No expired tokens found - dataset is clean!")

        stats = {
            "total": total_count,
            "valid": len(valid_tokens),
            "expired": expired_count,
            "invalid": invalid_count,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        logger.info("=" * 80)
        logger.info("Cleaning Summary:")
        logger.info(f" Total tokens checked: {stats['total']}")
        logger.info(f" Valid tokens: {stats['valid']}")
        logger.info(f" Expired tokens removed: {stats['expired']}")
        logger.info(f" Invalid tokens removed: {stats['invalid']}")
        logger.info("=" * 80)
        return stats

    def run_continuous(self, interval_minutes=60):
        """Run cleaning cycles forever, sleeping between them.

        Args:
            interval_minutes: Minutes to wait after each successful cycle.

        Any exception raised by a cycle is logged and the loop retries after
        one minute; ``KeyboardInterrupt`` ends the loop.
        """
        logger.info(f"🚀 Token Cleaner started - checking every {interval_minutes} minutes")
        while True:
            try:
                self.clean_expired_tokens()
                logger.info(f"\n💤 Sleeping for {interval_minutes} minutes...")
                logger.info(f"Next check at: {datetime.fromtimestamp(time.time() + interval_minutes * 60)}")
                logger.info("")
                time.sleep(interval_minutes * 60)
            except KeyboardInterrupt:
                logger.info("\n🛑 Token Cleaner stopped by user")
                break
            except Exception as e:
                logger.error(f"Error in cleaning cycle: {str(e)}")
                logger.info(f"Retrying in 1 minute...")
                time.sleep(60)
# Global cleaner instance
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logger.error("HF_TOKEN environment variable not set!")
    # `exit` is a helper injected by the `site` module for interactive use and
    # is not guaranteed to exist (e.g. under `python -S`); raising SystemExit
    # terminates with the same status code unconditionally.
    raise SystemExit(1)

cleaner = TokenCleaner(
    hf_token=HF_TOKEN,
    dataset_name="HIRO12121212/videoinutoken",
)

# Start background thread. It is a daemon so it never blocks interpreter
# shutdown; it runs a cleaning cycle every 60 minutes.
threading.Thread(target=cleaner.run_continuous, args=(60,), daemon=True).start()
| # Gradio functions (no arguments needed) | |
def clean_now():
    """Run one cleaning pass immediately and return a Markdown summary.

    Returns:
        A Markdown-formatted string with the completion time and the total,
        valid, expired and invalid token counts reported by the cleaner.
    """
    stats = cleaner.clean_expired_tokens()
    # Not every stats dict is guaranteed to carry 'timestamp' (the cleaner's
    # "no tokens" result may omit it); fall back to the current time instead
    # of raising KeyError in the UI handler.
    finished_at = stats.get('timestamp') or datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    summary = (
        f"🧹 Cleaning completed at {finished_at}\n\n"
        f"Total tokens checked: **{stats['total']}**\n"
        f"Valid tokens remaining: **{stats['valid']}**\n"
        f"Expired tokens removed: **{stats['expired']}**\n"
        f"Invalid tokens removed: **{stats['invalid']}**"
    )
    return summary
def get_status():
    """Return the fixed status line shown in the dashboard's Status box."""
    message = (
        "Background cleaner is running (checks every 60 minutes). "
        "Use 'Clean Now' for manual cleaning."
    )
    return message
# Gradio interface
# Introductory Markdown shown at the top of the dashboard.
_DASHBOARD_INTRO = """
# 🧹 Videoinu Token Cleaner Dashboard
This Space automatically cleans expired tokens from the dataset every hour.
- Background cleaning is always running.
- Click **Clean Now** to trigger an immediate clean.
- Results appear here + full logs in Space logs.
"""

with gr.Blocks(title="Videoinu Token Cleaner") as demo:
    gr.Markdown(_DASHBOARD_INTRO)

    # Read-only status line and the area that receives the last summary.
    status = gr.Textbox(
        label="Status",
        value="Background cleaner active",
        interactive=False,
    )
    output = gr.Markdown(label="Last Cleaning Result")

    # Both action buttons share one row with equal width.
    with gr.Row():
        clean_button = gr.Button("Clean Now", variant="primary", scale=1)
        refresh_button = gr.Button("Refresh Status", scale=1)

    # "Clean Now" runs a cleaning pass and renders its summary; it is also
    # exposed through the API under the name "clean_now".
    clean_button.click(fn=clean_now, outputs=output, api_name="clean_now")
    # "Refresh Status" rewrites the status box.
    refresh_button.click(fn=get_status, outputs=status)
# Launch the app: listen on all interfaces, port 7860, and surface handler
# errors in the UI.
_LAUNCH_OPTIONS = {
    "server_name": "0.0.0.0",
    "server_port": 7860,
    "show_error": True,
}
demo.launch(**_LAUNCH_OPTIONS)