anicove2 / api /utils /helpers.py
mwask's picture
Upload 124 files
19ecc0d verified
Raw
History Blame Contribute Delete
13.6 kB
"""
Helper utility functions for the application.
Includes Turnstile verification, AniList API interactions, and watchlist enrichment.
"""
import requests
import logging
import time
import asyncio
import inspect
import aiohttp
from threading import Lock
from typing import Dict, Any
logger = logging.getLogger(__name__)
# Global storage for sync progress
sync_progress_storage = {}
sync_progress_lock = Lock()
# === Turnstile Verification ===
def verify_turnstile(token, secret, remoteip=None):
"""Verify Turnstile token with Cloudflare - Vercel compatible version"""
if not token:
return False
# For Vercel deployment, we need to handle the verification more carefully
data = {
"secret": secret,
"response": token
}
# Only add remoteip if it's a valid IP address (avoid Vercel proxy issues)
if remoteip and remoteip not in ['127.0.0.1', 'localhost'] and not remoteip.startswith('::'):
data["remoteip"] = remoteip
try:
# Use longer timeout for serverless functions
resp = requests.post(
"https://challenges.cloudflare.com/turnstile/v0/siteverify",
data=data,
timeout=10,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': 'YumeZone/1.0'
}
)
if resp.status_code != 200:
logger.error(f"Turnstile API returned status {resp.status_code}: {resp.text}")
return False
result = resp.json()
success = result.get("success", False)
# Log detailed error info for debugging
if not success:
error_codes = result.get("error-codes", [])
logger.warning(f"Turnstile verification failed. Token: {token[:20]}..., Error codes: {error_codes}")
return False
return success
except requests.exceptions.Timeout:
logger.error("Turnstile verification timeout")
# Fail secure rather than open
return False
except requests.exceptions.RequestException as e:
logger.error(f"Turnstile verification request error: {e}")
return False
except Exception as e:
logger.error(f"Turnstile verification unexpected error: {e}")
return False
# === AniList API Functions ===
def get_anilist_user_info(access_token):
"""Get user information from AniList GraphQL API."""
query = '''
query {
Viewer {
id
name
avatar {
large
medium
}
bannerImage
about
statistics {
anime {
count
meanScore
minutesWatched
}
}
}
}
'''
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
try:
response = requests.post('https://graphql.anilist.co',
json={'query': query},
headers=headers)
if response.status_code == 200:
data = response.json()
if 'errors' in data:
logger.error(f"AniList GraphQL errors: {data['errors']}")
return None
return data['data']['Viewer']
else:
logger.error(f"AniList API error: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error getting AniList user info: {e}")
return None
async def fetch_anilist_next_episode(anilist_id: int = None, mal_id: int = None, search_title: str = None) -> dict:
"""Fetch the next episode schedule from AniList GraphQL API using anilistId, malId, or search title."""
if not anilist_id and not mal_id and not search_title:
return {}
query_id = '''
query ($id: Int, $idMal: Int) {
Media(id: $id, idMal: $idMal, type: ANIME) {
id
status
nextAiringEpisode {
airingAt
timeUntilAiring
episode
}
}
}
'''
query_search = '''
query ($search: String) {
Media(search: $search, type: ANIME, status: RELEASING) {
nextAiringEpisode {
airingAt
timeUntilAiring
episode
}
}
}
'''
# Try exact ID queries first
exact_queries = []
if anilist_id and int(anilist_id) > 0:
exact_queries.append((query_id, {"id": int(anilist_id)}))
if mal_id and int(mal_id) > 0:
exact_queries.append((query_id, {"idMal": int(mal_id)}))
try:
async with aiohttp.ClientSession() as session:
# 1. Run exact queries first
for query, variables in exact_queries:
async with session.post('https://graphql.anilist.co', json={'query': query, 'variables': variables}) as resp:
if resp.status == 200:
data = await resp.json()
media = data.get('data', {}).get('Media')
if media:
# If media is found, this is the exact anime season!
# If nextAiringEpisode is present, return it.
next_ep = media.get('nextAiringEpisode')
if next_ep and next_ep.get('airingAt'):
return {
"airingTimestamp": next_ep.get('airingAt'),
"timeUntilAiring": next_ep.get('timeUntilAiring'),
"episode": next_ep.get('episode')
}
# Otherwise, the exact match exists but is NOT releasing (FINISHED, NOT_YET_RELEASED, etc.)
# Stop immediately — do not fall back to loose title searching which will match other seasons!
return {}
# 2. If no exact IDs are available, try search fallback (filters by RELEASING automatically)
if not exact_queries and search_title:
async with session.post('https://graphql.anilist.co', json={'query': query_search, 'variables': {"search": search_title}}) as resp:
if resp.status == 200:
data = await resp.json()
media = data.get('data', {}).get('Media')
if media:
next_ep = media.get('nextAiringEpisode')
if next_ep and next_ep.get('airingAt'):
return {
"airingTimestamp": next_ep.get('airingAt'),
"timeUntilAiring": next_ep.get('timeUntilAiring'),
"episode": next_ep.get('episode')
}
return {}
except Exception as e:
logger.error(f"Error fetching next episode from AniList: {e}")
return {}
# === Sync Progress Management ===
def store_sync_progress(user_id: str, progress_data: dict):
"""Store sync progress for a user"""
with sync_progress_lock:
sync_progress_storage[user_id] = {
**progress_data,
'timestamp': time.time()
}
def get_sync_progress(user_id: str) -> dict:
"""Get sync progress for a user"""
with sync_progress_lock:
return sync_progress_storage.get(user_id, {})
def clear_sync_progress(user_id: str):
"""Clear sync progress for a user"""
with sync_progress_lock:
sync_progress_storage.pop(user_id, None)
# === Sync Wrapper Function ===
def sync_anilist_watchlist_blocking(user_id: str, access_token: str, progress_callback=None) -> Dict[str, Any]:
"""
Run the user's sync function in a safe way whether it's async or sync.
Returns the dict result, or {'error': ...} on failure.
"""
try:
# import the async function and BatchConfig from ani_to_yume
from .ani_to_yume import sync_anilist_watchlist_to_local as async_sync_watchlist
from .ani_to_yume import BatchConfig
# Use optimized config for better performance and fewer failures
config = BatchConfig(
batch_size=1000,
delay_between_batches=0.05,
max_retries=1,
skip_failed_matches=True,
max_search_candidates=4,
max_anime_check=3
)
# If the imported name is an async function, call it with progress callback
if inspect.iscoroutinefunction(async_sync_watchlist):
coro = async_sync_watchlist(user_id, access_token, progress_callback, config)
else:
# check if function accepts progress_callback and config
sig = inspect.signature(async_sync_watchlist)
params = sig.parameters
if 'progress_callback' in params and 'config' in params:
coro_or_result = async_sync_watchlist(user_id, access_token, progress_callback, config)
elif 'progress_callback' in params:
coro_or_result = async_sync_watchlist(user_id, access_token, progress_callback)
else:
coro_or_result = async_sync_watchlist(user_id, access_token)
# if the result is a coroutine, treat it as such
if asyncio.iscoroutine(coro_or_result):
coro = coro_or_result
else:
# synchronous function returned a result
return coro_or_result or {}
# At this point `coro` should be an awaitable
try:
return asyncio.run(coro) or {}
except RuntimeError as e:
# asyncio.run may fail if we're already inside an event loop (e.g., some WSGI/ASGI contexts).
logger.debug("asyncio.run failed, falling back to manual loop: %s", e)
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro) or {}
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
except Exception:
pass
loop.close()
except Exception as e:
logger.exception("Blocking watchlist sync failed")
return {"error": str(e)}
# === Watchlist Enrichment ===
from ..providers import UnifiedScraper
HA = UnifiedScraper()
async def enrich_watchlist_item(item: dict) -> dict:
"""
Enriches a watchlist item with poster_url, episodes, total_episodes, rating.
Always fetches fresh data from the scraper.
"""
try:
anime_id = item.get('anime_id')
if not anime_id:
return item
try:
resp = await HA.anime_about(anime_id)
except Exception as e:
logger.debug(f"Scraper.anime_about failed for {anime_id}: {e}")
resp = None
poster_url = ''
episodes = {'sub': 0, 'dub': 0}
total_episodes = 0
rating = None
status = 'Unknown'
if isinstance(resp, dict):
candidate = resp.get('anime', {}).get('info', {})
if isinstance(candidate, dict):
# Poster
poster_url = (
candidate.get('poster')
or candidate.get('image')
or candidate.get('thumbnail')
or candidate.get('poster_url')
or ''
)
# Stats → episodes & rating
stats = candidate.get('stats') or {}
eps_obj = stats.get('episodes') or candidate.get('episodes')
if isinstance(eps_obj, dict):
episodes['sub'] = int(eps_obj.get('sub') or 0)
episodes['dub'] = int(eps_obj.get('dub') or 0)
elif isinstance(eps_obj, (int, float, str)):
try:
episodes['sub'] = int(eps_obj)
except Exception:
pass
rating = candidate.get('rating') or stats.get('rating')
# Extract status
more_info = resp.get('anime', {}).get('moreInfo', {})
status = more_info.get('status') or 'Unknown'
# Total episodes
if episodes.get('sub'):
total_episodes = episodes['sub']
elif episodes.get('dub'):
total_episodes = episodes['dub']
payload = {
'poster_url': poster_url,
'episodes': episodes,
'total_episodes': total_episodes,
'anime_status': status # anime's airing status, NOT the user's watchlist status
}
if rating:
payload['rating'] = rating
item.update(payload)
return item
except Exception as e:
logger.debug(f"enrich_watchlist_item error for {item.get('anime_id')}: {e}")
return item