"""Async helpers for listing Dropbox folders and resolving audio download links.

The blocking Dropbox SDK calls are pushed onto the default executor and
throttled through a module-level semaphore. Results are kept in simple
in-memory, per-process caches.
"""

import asyncio
import logging
import random
from datetime import datetime, timedelta, timezone
from functools import partial

import dropbox
import requests
from dropbox.files import FileMetadata, FolderMetadata
from fastapi import HTTPException

from config import SanatanConfig
from db import SanatanDatabase
from modules.audio.model import AudioRequest, AudioType
from modules.dropbox.client import get_dbx

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Upper bound on the number of Dropbox calls in flight at once.
DROPBOX_SEMAPHORE = asyncio.Semaphore(5)


def _list_folder_sync(dbx, path):
    """Blocking wrapper around ``dbx.files_list_folder``; meant to run in a thread."""
    return dbx.files_list_folder(path)


async def list_folder_async(dbx, path):
    """List ``path`` on Dropbox without blocking the event loop.

    The call runs in the default executor while holding ``DROPBOX_SEMAPHORE``.

    Returns:
        Whatever ``dbx.files_list_folder(path)`` returns (first page only).
    """
    async with DROPBOX_SEMAPHORE:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(_list_folder_sync, dbx, path)
        )


def _list_folder_continue_sync(dbx, cursor):
    """Blocking wrapper around ``dbx.files_list_folder_continue``."""
    return dbx.files_list_folder_continue(cursor)


async def list_folder_continue_async(dbx, cursor):
    """Fetch the next page of a folder listing identified by ``cursor``.

    The call runs in the default executor while holding ``DROPBOX_SEMAPHORE``.
    """
    async with DROPBOX_SEMAPHORE:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, partial(_list_folder_continue_sync, dbx, cursor)
        )


async def list_dropbox_folder_hierarchy(dbx: dropbox.Dropbox, base_path: str = ""):
    """
    Recursively fetches the folder/file hierarchy from Dropbox starting at base_path.
    Includes direct temporary download links for files.

    This is best-effort: a file whose link cannot be generated is skipped with
    a warning, and a listing failure is logged and the hierarchy collected so
    far is returned.

    Args:
        dbx (dropbox.Dropbox): Authenticated Dropbox client.
        base_path (str): Path inside Dropbox ("" means root).

    Returns:
        dict: Nested dict keyed by subfolder name; files of a folder are
        collected under the ``"__files__"`` key as dicts with ``name``,
        ``path`` and ``download_url``.
    """
    hierarchy = {}

    try:
        logger.info("listing files in %s", base_path)
        result = await list_folder_async(dbx, base_path)

        while True:
            for entry in result.entries:
                if isinstance(entry, FolderMetadata):
                    # Recurse into subfolder
                    hierarchy[entry.name] = await list_dropbox_folder_hierarchy(
                        dbx, entry.path_lower
                    )
                elif isinstance(entry, FileMetadata):
                    try:
                        async with DROPBOX_SEMAPHORE:
                            link = await get_temp_link_async(dbx, entry.path_lower)
                            hierarchy.setdefault("__files__", []).append(
                                {
                                    "name": entry.name,
                                    "path": entry.path_lower,
                                    "download_url": link,
                                }
                            )
                    except Exception as link_err:
                        # Deliberate best-effort: one bad file must not abort
                        # the whole listing.
                        logger.warning(
                            f"Could not generate link for {entry.path_lower}: {link_err}"
                        )

            if result.has_more:
                result = await list_folder_continue_async(dbx, result.cursor)
            else:
                break

    except Exception as e:
        # Lazy %-style arguments. The previous call passed ``e`` as a stray
        # positional argument to an already-formatted message, which makes
        # the logging module fail while formatting the record.
        logger.error("Error listing folder %s: %s", base_path, e)

    return hierarchy


# cache = {(scripture_name, global_index, type): {"url": ..., "expiry": ...}}
audio_cache: dict[tuple[str, int, str], dict] = {}
CACHE_TTL = timedelta(hours=3, minutes=30)  # refresh before 4h expiry

AUDIO_LIST_CACHE_TTL = timedelta(hours=24)
audio_list_cache = {}  # {(scripture_name): {"entries": [...], "expiry": datetime}}


def _get_temp_link_sync(dbx, file_path):
    """Blocking call returning a temporary download link; runs in a thread."""
    return dbx.files_get_temporary_link(file_path).link


async def get_temp_link_async(dbx, file_path, retries=3):
    """Return a temporary download link for ``file_path``, retrying on network errors.

    Only ``requests.exceptions.RequestException`` is retried, with exponential
    backoff plus jitter between attempts. Any other exception propagates
    immediately.

    Args:
        dbx: Dropbox client exposing ``files_get_temporary_link``.
        file_path: Dropbox path of the file.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            the function never falls through and returns ``None``.

    Raises:
        requests.exceptions.RequestException: If the final attempt fails.
    """
    loop = asyncio.get_running_loop()
    attempts = max(1, retries)

    for attempt in range(attempts):
        try:
            return await loop.run_in_executor(
                None, partial(_get_temp_link_sync, dbx, file_path)
            )
        except requests.exceptions.RequestException:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(0.5 * (2 ** attempt) + random.random())


async def get_audio_urls(req: AudioRequest):
    """Resolve temporary download URLs for every audio file of one verse.

    Looks in ``/<scripture_name>/audio`` for files named
    ``<global_index>-<type>.<ext>``. The folder listing and the generated
    links are cached in memory.

    Args:
        req: Request carrying ``scripture_name`` and ``global_index``.

    Returns:
        dict: Maps the file type (the filename part between the
        ``<global_index>-`` prefix and the extension) to its URL, or to
        ``None`` when the link could not be generated.

    Raises:
        HTTPException: 404 when the audio folder cannot be listed or when no
            file matches the requested index.
    """
    base_path = f"/{req.scripture_name}/audio"
    prefix = f"{req.global_index}-"
    urls = {}
    now = datetime.now(timezone.utc)

    # --- 1. Reuse the cached folder listing when it is still fresh ---
    cache_entry = audio_list_cache.get(req.scripture_name)
    if cache_entry and cache_entry["expiry"] > now:
        entries = cache_entry["entries"]
    else:
        # Fetch fresh listing from Dropbox
        try:
            result = await list_folder_async(get_dbx(), base_path)
            # Copy so later pages are not appended to the SDK result's own list.
            entries = list(result.entries)
            while result.has_more:
                result = await list_folder_continue_async(get_dbx(), result.cursor)
                entries.extend(result.entries)

            audio_list_cache[req.scripture_name] = {
                "entries": entries,
                "expiry": now + AUDIO_LIST_CACHE_TTL,
            }
        except dropbox.exceptions.ApiError:
            raise HTTPException(status_code=404, detail="Audio directory not found")

    # --- 2. Filter matching files ---
    matching_files = [
        entry
        for entry in entries
        if isinstance(entry, FileMetadata) and entry.name.startswith(prefix)
    ]

    if not matching_files:
        raise HTTPException(status_code=404, detail="No audio files found")

    # --- 3. Generate or reuse cached URLs ---
    for entry in matching_files:
        filename = entry.name
        file_type = filename[len(prefix):].rsplit(".", 1)[0]
        cache_key = (req.scripture_name, req.global_index, file_type)

        cached = audio_cache.get(cache_key)
        if cached and cached["expiry"] > now:
            urls[file_type] = cached["url"]
            continue

        file_path = f"{base_path}/{filename}"
        try:
            async with DROPBOX_SEMAPHORE:
                temp_link = await get_temp_link_async(get_dbx(), file_path)
            urls[file_type] = temp_link
            audio_cache[cache_key] = {
                "url": temp_link,
                "expiry": now + CACHE_TTL,
            }
        except dropbox.exceptions.ApiError as e:
            logger.error(f"Dropbox API error for {file_path}: {e}")
            urls[file_type] = None
        except requests.exceptions.RequestException as e:
            logger.warning(f"Dropbox connection error for {file_path}: {e}")
            urls[file_type] = None

    return urls


async def cleanup_audio_url_cache(interval_seconds: int = 600):
    """Periodically remove expired entries from audio_cache.

    Runs forever; intended to be scheduled as a background task.

    Args:
        interval_seconds: Pause between two sweeps.
    """
    while True:
        now = datetime.now(timezone.utc)
        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        expired_keys = [key for key, val in audio_cache.items() if val["expiry"] <= now]
        for key in expired_keys:
            del audio_cache[key]

        # Debug log
        if expired_keys:
            logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")

        await asyncio.sleep(interval_seconds)


# Simple in-memory cache
_audio_indices_cache: dict[tuple[str, str], dict] = {}
CACHE_TTL_2 = timedelta(minutes=10)


def _parse_audio_entry(entry):
    """Split an audio file entry into ``(global_index, remainder)``.

    Returns ``None`` for anything that is not a ``FileMetadata`` whose name
    looks like ``<int>-<remainder>``.
    """
    if not isinstance(entry, FileMetadata) or "-" not in entry.name:
        return None
    index_text, remainder = entry.name.split("-", 1)
    try:
        return int(index_text), remainder
    except ValueError:
        return None


async def get_global_indices_with_audio(scripture_name: str, audio_type: AudioType):
    """
    Returns a sorted list of global indices for a given scripture that have audio
    of the specified type. Supports AudioType.any, AudioType.none, and specific types.
    Uses in-memory caching for repeated calls.

    Args:
        scripture_name: Scripture whose ``/<scripture_name>/audio`` folder is listed.
        audio_type: ``AudioType.any`` (indices with at least one audio file),
            ``AudioType.none`` (indices from 1 to the verse count that have no
            audio file), or a specific type matched as a prefix of the
            lower-cased file type.

    Returns:
        list[int]: Sorted indices. A fresh list is returned on every call so
        callers cannot mutate the cached value.

    Raises:
        HTTPException: 404 when the audio folder cannot be listed.
    """
    now = datetime.now(timezone.utc)
    cache_key = (scripture_name, audio_type.value)

    # Check cache
    cached = _audio_indices_cache.get(cache_key)
    if cached and cached["expiry"] > now:
        return list(cached["indices"])

    # Step 1: list all files in Dropbox folder
    base_path = f"/{scripture_name}/audio"
    entries = []
    try:
        result = await list_folder_async(get_dbx(), base_path)
        entries.extend(result.entries)
        while result.has_more:
            result = await list_folder_continue_async(get_dbx(), result.cursor)
            entries.extend(result.entries)
    except dropbox.exceptions.ApiError:
        raise HTTPException(status_code=404, detail="Audio directory not found")

    # Step 2: parse every usable filename once
    parsed = [
        pair for pair in map(_parse_audio_entry, entries) if pair is not None
    ]
    all_indices_with_audio = {global_index for global_index, _ in parsed}

    # Step 3: filter based on audio_type
    if audio_type == AudioType.none:
        db = SanatanDatabase()
        config = SanatanConfig()
        total_verses = db.count(
            collection_name=config.get_collection_name(scripture_name=scripture_name)
        )
        indices = set(range(1, total_verses + 1)) - all_indices_with_audio
    elif audio_type == AudioType.any:
        indices = all_indices_with_audio
    else:
        indices = {
            global_index
            for global_index, remainder in parsed
            if remainder.rsplit(".", 1)[0].strip().lower().startswith(audio_type.value)
        }

    # Cache the result (sorted once, shared between the cache and the return value copy)
    sorted_indices = sorted(indices)
    _audio_indices_cache[cache_key] = {
        "indices": sorted_indices,
        "expiry": now + CACHE_TTL_2,
    }

    return list(sorted_indices)


if __name__ == "__main__":
    # Create Dropbox client with your access token
    # data = list_dropbox_folder_hierarchy(dbx, "")
    # data = asyncio.run(
    #     get_audio_urls(AudioRequest(scripture_name="divya_prabandham", global_index=0))
    # )
    data = asyncio.run(
        get_global_indices_with_audio(
            scripture_name="divya_prabandham", audio_type=AudioType.upanyasam
        )
    )
    # logger.info(json.dumps(data, indent=2))
    logger.info(len(data))