bookmyservice-ums / scripts /cleanup_tokens.py
MukeshKapoor25's picture
remember me
9b51d59
"""
Cleanup script for expired refresh tokens
Run this periodically (e.g., daily cron job) to remove expired tokens
"""
import asyncio
import os
import sys
from datetime import datetime, timezone
# Add parent directory to path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.models.refresh_token_model import RefreshTokenModel
import logging
# Root logging setup for the script: INFO and above, each record prefixed
# with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
async def cleanup_expired_tokens():
    """Remove expired refresh tokens from the database.

    The deletion itself is delegated to
    ``RefreshTokenModel.cleanup_expired_tokens()``; this wrapper only adds
    logging around it.

    Returns:
        The value returned by ``RefreshTokenModel.cleanup_expired_tokens()``,
        which is logged as the number of deleted tokens.

    Raises:
        Exception: Any error raised by the model call is logged together with
            its traceback and then re-raised unchanged.
    """
    logger.info("Starting cleanup of expired refresh tokens...")
    try:
        # Keep the guarded region to the single call that can actually fail.
        deleted_count = await RefreshTokenModel.cleanup_expired_tokens()
    except Exception as e:
        # logger.exception == logger.error(..., exc_info=True)
        logger.exception("❌ Error during cleanup: %s", e)
        raise

    logger.info("✅ Successfully cleaned up %s expired tokens", deleted_count)
    return deleted_count
async def get_token_statistics():
    """Log and return document counts for the ``refresh_tokens`` collection.

    Each figure comes from its own ``count_documents`` query, so the numbers
    are not an atomic snapshot of the collection. All time-based filters do,
    however, share a single "now" so that the "active" and "expired" counts
    are measured against the same instant.

    Returns:
        dict: Counts keyed by ``"total"``, ``"active"``, ``"revoked"``,
        ``"expired"``, ``"used"`` and ``"remember_me"``.

    Raises:
        Exception: Any error raised while querying is logged with its
            traceback and re-raised unchanged.
    """
    # Imported at call time rather than at module import.
    from app.core.nosql_client import db

    try:
        collection = db["refresh_tokens"]

        # Naive UTC timestamp (the same value the deprecated
        # datetime.utcnow() produced), captured once for every filter.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        # (result key, log label, MongoDB filter) in reporting order.
        metrics = (
            ("total", "Total tokens", {}),
            # Active = not revoked and not yet expired.
            ("active", "Active tokens",
             {"revoked": False, "expires_at": {"$gt": now}}),
            ("revoked", "Revoked tokens", {"revoked": True}),
            ("expired", "Expired tokens", {"expires_at": {"$lt": now}}),
            ("used", "Used tokens", {"used": True}),
            ("remember_me", "Remember me tokens", {"remember_me": True}),
        )

        stats = {}
        for key, _label, query in metrics:
            stats[key] = await collection.count_documents(query)

        logger.info("\n📊 Token Statistics:")
        for key, label, _query in metrics:
            logger.info(" %s: %s", label, stats[key])

        return stats
    except Exception as e:
        # logger.exception == logger.error(..., exc_info=True)
        logger.exception("❌ Error getting statistics: %s", e)
        raise
async def check_suspicious_activity(rotation_threshold=100):
    """Scan Redis ``token_family:*`` entries for excessive token rotation.

    Every matching key is fetched and parsed as JSON; a family is reported
    when its ``rotation_count`` field is strictly greater than
    ``rotation_threshold``. Entries that cannot be parsed are logged and
    skipped so that one corrupt value does not hide the other families.

    Args:
        rotation_threshold: Rotation count above which a family is flagged.
            Defaults to 100.
            NOTE(review): presumably the family keys expire after about 30
            days, which would make this a per-window count. Confirm against
            the code that writes these keys.

    Returns:
        list[dict]: One dict per flagged family with the keys ``family_id``,
        ``customer_id``, ``rotation_count`` and ``created_at``. An empty list
        is also returned when the check itself fails; the error is logged and
        deliberately not re-raised (best-effort check).
    """
    # Imported at call time rather than at module import.
    from app.core.cache_client import get_redis
    import json

    try:
        redis = await get_redis()

        suspicious_families = []
        cursor = 0
        while True:
            # SCAN is incremental: keep going until the server hands back
            # cursor 0 again.
            cursor, keys = await redis.scan(cursor, match="token_family:*", count=100)
            for key in keys:
                data = await redis.get(key)
                if not data:
                    # Key expired or was deleted between SCAN and GET.
                    continue

                # The client may return bytes keys when it is not configured
                # to decode responses; normalise before string handling.
                key_text = key.decode("utf-8", "replace") if isinstance(key, bytes) else key

                try:
                    family_data = json.loads(data)
                except ValueError:
                    # Covers JSONDecodeError and UnicodeDecodeError.
                    logger.warning("Skipping %s: stored value is not valid JSON", key_text)
                    continue
                if not isinstance(family_data, dict):
                    logger.warning("Skipping %s: stored value is not a JSON object", key_text)
                    continue

                rotation_count = family_data.get("rotation_count", 0)
                if rotation_count > rotation_threshold:
                    suspicious_families.append({
                        "family_id": key_text.split(":")[-1],
                        "customer_id": family_data.get("customer_id"),
                        "rotation_count": rotation_count,
                        "created_at": family_data.get("created_at"),
                    })

            if cursor == 0:
                break

        if suspicious_families:
            logger.warning(
                "\n⚠️ Found %s suspicious token families:", len(suspicious_families)
            )
            for family in suspicious_families:
                logger.warning(
                    " - Family %s: %s rotations",
                    family["family_id"],
                    family["rotation_count"],
                )
                logger.warning(" Customer: %s", family["customer_id"])
        else:
            logger.info("\n✅ No suspicious token activity detected")

        return suspicious_families
    except Exception as e:
        # Best-effort: report the failure but let the caller carry on.
        logger.exception("❌ Error checking suspicious activity: %s", e)
        return []
async def main():
    """Run the full cleanup workflow and log a summary.

    Steps, in order: log token statistics, delete expired tokens, log the
    statistics again, then check for suspicious rotation activity. Errors
    raised by the statistics or cleanup steps propagate to the caller.
    """
    banner = "=" * 60

    logger.info(banner)
    logger.info("Refresh Token Cleanup Script")
    # Naive UTC timestamp: same output format as the deprecated
    # datetime.utcnow().isoformat().
    run_time = datetime.now(timezone.utc).replace(tzinfo=None)
    logger.info("Run time: %s", run_time.isoformat())
    logger.info(banner)

    # Get statistics before cleanup
    logger.info("\n📈 Statistics before cleanup:")
    await get_token_statistics()

    # Perform cleanup
    logger.info("\n🧹 Performing cleanup...")
    deleted_count = await cleanup_expired_tokens()

    # Get statistics after cleanup
    logger.info("\n📈 Statistics after cleanup:")
    await get_token_statistics()

    # Check for suspicious activity
    logger.info("\n🔍 Checking for suspicious activity...")
    suspicious = await check_suspicious_activity()

    # Summary
    logger.info("\n" + banner)
    logger.info("Cleanup Summary:")
    logger.info(" Deleted tokens: %s", deleted_count)
    logger.info(" Suspicious families: %s", len(suspicious))
    logger.info(banner)
# Script entry point: when executed directly (not imported), run the async
# workflow in main() to completion via asyncio.run().
if __name__ == "__main__":
    asyncio.run(main())