Turnstile / db_results.py
llzai's picture
Upload 6 files
38a5f07 verified
import aiosqlite
import json
import logging
from typing import Dict, Any, Optional, Union
DB_PATH = "results.db"
# PRAGMA настройки для оптимизации БД
PRAGMA_SETTINGS = [
"PRAGMA journal_mode=WAL",
"PRAGMA synchronous=NORMAL",
"PRAGMA cache_size=10000",
"PRAGMA temp_store=MEMORY",
"PRAGMA busy_timeout=30000"
]
async def _apply_pragma_settings(db):
"""Применить PRAGMA настройки к подключению БД"""
for pragma in PRAGMA_SETTINGS:
await db.execute(pragma)
async def init_db():
"""Initialize database with results table in WAL mode"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
await db.execute("""
CREATE TABLE IF NOT EXISTS results (
task_id TEXT PRIMARY KEY,
type TEXT NOT NULL,
data TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await db.commit()
logging.getLogger("TurnstileAPIServer").info(f"Database initialized in WAL mode: {DB_PATH}")
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Database initialization error: {e}")
raise
async def save_result(task_id: str, task_type: str, data: Union[Dict[str, Any], str]) -> None:
"""Save result to database"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
data_json = json.dumps(data) if isinstance(data, dict) else data
await db.execute(
"REPLACE INTO results (task_id, type, data) VALUES (?, ?, ?)",
(task_id, task_type, data_json)
)
await db.commit()
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Error saving result {task_id}: {e}")
raise
async def load_result(task_id: str) -> Optional[Union[Dict[str, Any], str]]:
"""Load result from database"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
async with db.execute("SELECT data FROM results WHERE task_id = ?", (task_id,)) as cursor:
row = await cursor.fetchone()
if row:
try:
return json.loads(row[0])
except json.JSONDecodeError:
return row[0]
return None
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Error loading result {task_id}: {e}")
return None
async def load_all_results() -> Dict[str, Any]:
"""Load all results from database"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
results = {}
async with db.execute("SELECT task_id, data FROM results") as cursor:
async for row in cursor:
try:
results[row[0]] = json.loads(row[1])
except json.JSONDecodeError:
results[row[0]] = row[1]
return results
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Error loading all results: {e}")
return {}
async def delete_result(task_id: str) -> None:
"""Delete result from database"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
await db.execute("DELETE FROM results WHERE task_id = ?", (task_id,))
await db.commit()
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Error deleting result {task_id}: {e}")
async def get_pending_count() -> int:
"""Get count of pending tasks"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
async with db.execute("SELECT COUNT(*) FROM results WHERE data LIKE '%CAPTCHA_NOT_READY%'") as cursor:
row = await cursor.fetchone()
return row[0] if row else 0
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Error getting pending count: {e}")
return 0
async def cleanup_old_results(days_old: int = 1) -> int:
"""Clean up results older than specified days"""
try:
async with aiosqlite.connect(DB_PATH) as db:
await _apply_pragma_settings(db)
async with db.execute(
"DELETE FROM results WHERE created_at < datetime('now', '-{} days')".format(days_old)
) as cursor:
deleted_count = cursor.rowcount
await db.commit()
logging.getLogger("TurnstileAPIServer").info(f"Cleaned up {deleted_count} old results")
return deleted_count
except Exception as e:
logging.getLogger("TurnstileAPIServer").error(f"Error cleaning up old results: {e}")
return 0