# token_cleaner.py — Videoinu token cleaner (HuggingFace Space: tokencleaner)
# Last update by HIRO12121212, commit 4fcd409 (verified)
"""
Token Cleaner for Videoinu Tokens
Checks HuggingFace dataset for expired tokens and removes them
Runs continuously every 60 minutes in background
Provides a Gradio web interface for manual triggering and monitoring
"""
import os
import time
import json
import logging
import base64
import requests
from huggingface_hub import HfApi
from datetime import datetime
import threading
import gradio as gr
# Root-logger configuration: timestamped, name- and level-tagged records.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
class TokenCleaner:
    """Removes expired/invalid Videoinu JWTs from a HuggingFace dataset.

    The token list lives in ``videoinu_tokens.json`` inside the dataset repo.
    Each entry appears to be a dict with ``token``, ``email`` and
    ``credits_remaining`` keys (schema inferred from usage below — confirm
    against the producer of the dataset).
    """

    def __init__(self, hf_token, dataset_name="HIRO12121212/videoinutoken"):
        """
        Args:
            hf_token: HuggingFace access token used for both download and upload.
            dataset_name: Repo id of the dataset that stores the token file.
        """
        self.hf_token = hf_token
        self.dataset_name = dataset_name
        self.hf_api = HfApi()
        logger.info(f"Token Cleaner initialized for dataset: {dataset_name}")

    def decode_jwt_token(self, token):
        """Decode a JWT's payload segment WITHOUT verifying the signature.

        Args:
            token: Raw JWT string (``header.payload.signature``).

        Returns:
            dict: the decoded payload claims, or None when the token is
            malformed (wrong part count, bad base64, bad JSON).
        """
        try:
            parts = token.split('.')
            if len(parts) != 3:
                logger.warning("Invalid JWT format (not 3 parts)")
                return None
            payload = parts[1]
            # JWTs strip the base64url '=' padding; restore it before decoding.
            padding = 4 - (len(payload) % 4)
            if padding != 4:
                payload += '=' * padding
            decoded_bytes = base64.urlsafe_b64decode(payload)
            decoded_json = json.loads(decoded_bytes)
            return decoded_json
        except Exception as e:
            logger.error(f"Error decoding JWT token: {str(e)}")
            return None

    def is_token_expired(self, token_data):
        """Return True when the payload is missing, lacks 'exp', or 'exp' has passed.

        Args:
            token_data: Decoded JWT payload dict (or None).
        """
        if not token_data or 'exp' not in token_data:
            # Undecodable or claim-less tokens are treated as expired.
            return True
        exp_timestamp = token_data['exp']
        current_timestamp = int(time.time())
        is_expired = current_timestamp >= exp_timestamp
        if is_expired:
            # NOTE(review): fromtimestamp() uses local time — presumably 'exp'
            # is a UTC epoch per JWT convention, so the logged date may be
            # offset from UTC. Comparison above is epoch-vs-epoch and correct.
            exp_date = datetime.fromtimestamp(exp_timestamp)
            logger.info(f"Token expired on {exp_date} (email: {token_data.get('email', 'unknown')})")
        return is_expired

    def fetch_all_tokens(self):
        """Download the current token list from the dataset repo.

        Returns:
            list: token entries, or [] on any network/HTTP/parse failure
            (best-effort: a failed fetch just skips this cleaning cycle).
        """
        try:
            download_url = f"https://huggingface.co/datasets/{self.dataset_name}/resolve/main/videoinu_tokens.json"
            headers = {"Authorization": f"Bearer {self.hf_token}"}
            response = requests.get(download_url, headers=headers, timeout=10)
            if response.status_code == 200:
                tokens_array = json.loads(response.text)
                logger.info(f"✅ Fetched {len(tokens_array)} tokens from dataset")
                return tokens_array
            else:
                logger.error(f"Failed to fetch tokens: {response.status_code}")
                return []
        except Exception as e:
            logger.error(f"Error fetching tokens: {str(e)}")
            return []

    def update_dataset(self, tokens_array):
        """Upload tokens_array as videoinu_tokens.json, overwriting the file.

        Args:
            tokens_array: Full replacement list of token entries.

        Returns:
            bool: True on success, False on any failure (logged, not raised).
        """
        try:
            import tempfile
            # Serialize to a temp file because upload_file wants a path.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(tokens_array, f, indent=2)
                temp_path = f.name
            try:
                self.hf_api.upload_file(
                    path_or_fileobj=temp_path,
                    path_in_repo="videoinu_tokens.json",
                    repo_id=self.dataset_name,
                    repo_type="dataset",
                    token=self.hf_token
                )
            finally:
                # Fix: previously the temp file leaked when upload_file raised.
                os.unlink(temp_path)
            logger.info(f"✅ Updated dataset with {len(tokens_array)} tokens")
            return True
        except Exception as e:
            logger.error(f"Error updating dataset: {str(e)}")
            return False

    def clean_expired_tokens(self):
        """Run one full cleaning pass: fetch, filter, and (if needed) re-upload.

        Returns:
            dict: summary stats with keys total/valid/expired/invalid/timestamp.
        """
        logger.info("=" * 80)
        logger.info("Starting token cleaning cycle...")
        logger.info("=" * 80)
        all_tokens = self.fetch_all_tokens()
        if not all_tokens:
            logger.warning("No tokens found in dataset")
            # Fix: include 'timestamp' so callers that format the summary
            # (e.g. the Gradio handler) don't hit a KeyError on this path.
            return {
                "total": 0, "valid": 0, "expired": 0, "invalid": 0,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        total_count = len(all_tokens)
        valid_tokens = []
        expired_count = 0
        invalid_count = 0
        logger.info(f"Checking {total_count} tokens...")
        for i, token_entry in enumerate(all_tokens):
            token = token_entry.get('token', '')
            email = token_entry.get('email', 'unknown')
            credits = token_entry.get('credits_remaining', 0)
            decoded = self.decode_jwt_token(token)
            if decoded is None:
                logger.warning(f"[{i+1}/{total_count}] Invalid token format: {email}")
                invalid_count += 1
                continue
            if self.is_token_expired(decoded):
                logger.info(f"[{i+1}/{total_count}] 🗑️ Removing expired token: {email} ({credits} credits)")
                expired_count += 1
            else:
                exp_timestamp = decoded.get('exp', 0)
                exp_date = datetime.fromtimestamp(exp_timestamp)
                logger.info(f"[{i+1}/{total_count}] ✅ Valid token: {email} (expires: {exp_date}, {credits} credits)")
                valid_tokens.append(token_entry)
        # Only re-upload when something was actually removed.
        if expired_count > 0 or invalid_count > 0:
            logger.info(f"\n🧹 Cleaning dataset...")
            logger.info(f"  Removing {expired_count} expired tokens")
            logger.info(f"  Removing {invalid_count} invalid tokens")
            if self.update_dataset(valid_tokens):
                logger.info(f"✅ Dataset cleaned successfully!")
            else:
                logger.error(f"❌ Failed to update dataset")
        else:
            logger.info(f"\n✨ No expired tokens found - dataset is clean!")
        stats = {
            "total": total_count,
            "valid": len(valid_tokens),
            "expired": expired_count,
            "invalid": invalid_count,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        logger.info("=" * 80)
        logger.info("Cleaning Summary:")
        logger.info(f"  Total tokens checked: {stats['total']}")
        logger.info(f"  Valid tokens: {stats['valid']}")
        logger.info(f"  Expired tokens removed: {stats['expired']}")
        logger.info(f"  Invalid tokens removed: {stats['invalid']}")
        logger.info("=" * 80)
        return stats

    def run_continuous(self, interval_minutes=60):
        """Loop forever: clean, then sleep ``interval_minutes`` between cycles.

        Unexpected errors are logged and retried after 1 minute so the
        background thread never dies; KeyboardInterrupt exits the loop.
        """
        logger.info(f"🚀 Token Cleaner started - checking every {interval_minutes} minutes")
        while True:
            try:
                self.clean_expired_tokens()
                logger.info(f"\n💤 Sleeping for {interval_minutes} minutes...")
                logger.info(f"Next check at: {datetime.fromtimestamp(time.time() + interval_minutes * 60)}")
                logger.info("")
                time.sleep(interval_minutes * 60)
            except KeyboardInterrupt:
                logger.info("\n🛑 Token Cleaner stopped by user")
                break
            except Exception as e:
                logger.error(f"Error in cleaning cycle: {str(e)}")
                logger.info(f"Retrying in 1 minute...")
                time.sleep(60)
# --- Module-level setup -------------------------------------------------
# The Space cannot operate without a HuggingFace token in the environment.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logger.error("HF_TOKEN environment variable not set!")
    # Fix: raise SystemExit directly instead of calling exit(), which is a
    # site-module convenience helper and is not guaranteed to be available
    # in every runtime environment.
    raise SystemExit(1)

# Global cleaner instance shared by the background thread and the UI handlers.
cleaner = TokenCleaner(
    hf_token=HF_TOKEN,
    dataset_name="HIRO12121212/videoinutoken"
)

# Start the hourly cleaning loop in a daemon thread so it exits with the app.
threading.Thread(target=cleaner.run_continuous, args=(60,), daemon=True).start()
# Gradio callbacks (no arguments needed)
def clean_now():
    """Trigger one cleaning cycle immediately and return a markdown summary."""
    result = cleaner.clean_expired_tokens()
    report_lines = [
        f"🧹 Cleaning completed at {result['timestamp']}\n",
        f"Total tokens checked: **{result['total']}**",
        f"Valid tokens remaining: **{result['valid']}**",
        f"Expired tokens removed: **{result['expired']}**",
        f"Invalid tokens removed: **{result['invalid']}**",
    ]
    return "\n".join(report_lines)
def get_status():
    """Return a static status line describing the background cleaner."""
    message = "Background cleaner is running (checks every 60 minutes). Use 'Clean Now' for manual cleaning."
    return message
# Gradio interface: a small dashboard with a manual trigger and a status line.
with gr.Blocks(title="Videoinu Token Cleaner") as demo:
    gr.Markdown("""
    # 🧹 Videoinu Token Cleaner Dashboard
    This Space automatically cleans expired tokens from the dataset every hour.
    - Background cleaning is always running.
    - Click **Clean Now** to trigger an immediate clean.
    - Results appear here + full logs in Space logs.
    """)
    status_box = gr.Textbox(label="Status", value="Background cleaner active", interactive=False)
    result_panel = gr.Markdown(label="Last Cleaning Result")
    with gr.Row():
        trigger_clean = gr.Button("Clean Now", variant="primary", scale=1)
        trigger_refresh = gr.Button("Refresh Status", scale=1)
    # Wire the buttons: manual clean writes to the result panel (also exposed
    # via the API as "clean_now"); refresh rewrites the status textbox.
    trigger_clean.click(fn=clean_now, outputs=result_panel, api_name="clean_now")
    trigger_refresh.click(fn=get_status, outputs=status_box)

# Launch the app, binding all interfaces on the standard Spaces port.
demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)