import json
import os
import pickle
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from config import Config

try:
    from upstash_redis import Redis
except ImportError:
    # The Redis client is optional; without it storage falls back to disk/memory.
    Redis = None

# Fixed UTC+9 offset (JST) used to decide which calendar day counts as "today".
_JST = timezone(timedelta(hours=9))

# Names of the metadata entries that share the {'value', 'time'} record layout.
_META_NAMES = ('cid', 'auth', 'channels')


class CacheManager:
    """Multi-tier cache for CID, auth, channel list, stream URLs and EPG data.

    The backing store is chosen once at construction time, in priority order
    Redis > local disk > memory only. Every entry is always mirrored in
    process memory; the backing store is used to survive restarts.

    TTLs and size limits are read from ``Config.CACHE_TTL``,
    ``Config.MAX_STREAM_CACHE`` and ``Config.MAX_EPG_CACHE``.
    """

    def __init__(self):
        self.storage_type = None
        self.cache_dir = None
        self.redis = None

        # Pick the backing store (priority: Redis > Disk > Memory).
        self._init_storage()

        # In-memory cache state.
        self.epg: Dict[str, Dict[str, Any]] = {}
        self.cid: Optional[str] = None
        self.cid_time: float = 0
        self.auth: Optional[Dict[str, Any]] = None
        self.auth_time: float = 0
        self.channels: Optional[list] = None
        self.channels_time: float = 0
        self.stream_info: Dict[str, Dict[str, Any]] = {}

        # Warm the in-memory state from whatever was persisted earlier.
        self._load_cache()

    # ==================== Storage setup ====================

    def _init_storage(self):
        """Select the backing store, trying Redis, then disk, then memory.

        Sets ``self.storage_type`` to ``'redis'``, ``'disk'`` or ``'memory'``
        and fills in ``self.redis`` / ``self.cache_dir`` accordingly.
        """
        # 1. Try Redis (needs the client library plus both env variables).
        redis_url = os.getenv('REDIS_URL', '')
        redis_token = os.getenv('REDIS_TOKEN', '')

        if Redis and redis_url and redis_token:
            try:
                self.redis = Redis(url=redis_url, token=redis_token)
                self.redis.ping()
                self.storage_type = 'redis'
                print("✅ 使用 Redis 持久化存储")
                return
            except Exception as e:
                print(f"⚠️ Redis 连接失败: {e}")
                self.redis = None

        # 2. Try the local disk.
        cache_dir = Path(os.getenv('CACHE_DIR', '/tmp/cache'))
        try:
            cache_dir.mkdir(parents=True, exist_ok=True)
            (cache_dir / 'epg').mkdir(exist_ok=True)

            # Probe for write permission before committing to disk storage.
            test_file = cache_dir / '.test'
            test_file.write_text('test')
            test_file.unlink()

            self.cache_dir = cache_dir
            self.storage_type = 'disk'
            print(f"✅ 使用本地磁盘缓存: {cache_dir}")
            print(f" 缓存将在容器运行期间保留")
            return
        except Exception as e:
            print(f"⚠️ 磁盘缓存不可用: {e}")

        # 3. Fall back to memory only.
        self.storage_type = 'memory'
        print("⚠️ 使用内存缓存(容器重启后丢失)")

    def _load_cache(self):
        """Load previously persisted entries from the selected backing store."""
        if self.storage_type == 'redis':
            self._load_from_redis()
        elif self.storage_type == 'disk':
            self._load_from_disk()

    def _load_from_redis(self):
        """Populate CID, auth and channels from their ``cache:<name>`` keys.

        No TTL check is done here; the getters re-validate the age on read.
        """
        try:
            for name in _META_NAMES:
                raw = self.redis.get(f'cache:{name}')
                if raw:
                    record = json.loads(raw)
                    # Read both fields first so a malformed record cannot
                    # leave a value without its timestamp.
                    value, stamp = record['value'], record['time']
                    setattr(self, name, value)
                    setattr(self, f'{name}_time', stamp)
            print("✅ Redis 缓存加载完成")
        except Exception as e:
            print(f"⚠️ Redis 缓存加载失败: {e}")

    def _load_from_disk(self):
        """Populate the EPG and metadata caches from files in ``cache_dir``.

        The two files are loaded independently so that a corrupt EPG pickle
        does not prevent the metadata from being restored.
        """
        # SECURITY NOTE: pickle executes arbitrary code on load. This is only
        # acceptable because the file is written by this process into its own
        # cache directory; never point CACHE_DIR at untrusted content.
        epg_file = self.cache_dir / 'epg_cache.pkl'
        try:
            if epg_file.exists():
                with open(epg_file, 'rb') as f:
                    loaded = pickle.load(f)
                if not isinstance(loaded, dict):
                    raise TypeError('epg_cache.pkl does not contain a dict')
                self.epg = loaded
                print(f"✅ 从磁盘加载 {len(self.epg)} 条 EPG 缓存")
        except Exception as e:
            print(f"⚠️ 磁盘缓存加载失败: {e}")

        meta_file = self.cache_dir / 'meta_cache.json'
        try:
            if meta_file.exists():
                with open(meta_file, 'r', encoding='utf-8') as f:
                    meta = json.load(f)
                for name in _META_NAMES:
                    if name in meta:
                        record = meta[name]
                        setattr(self, name, record.get('value'))
                        setattr(self, f'{name}_time', record.get('time', 0))
                print("✅ 从磁盘加载元数据缓存")
        except Exception as e:
            print(f"⚠️ 磁盘缓存加载失败: {e}")

    # ==================== Persistence helpers ====================

    @staticmethod
    def _write_atomic(path: Path, payload: bytes) -> None:
        """Write ``payload`` to ``path`` via a temp file and ``os.replace``.

        Readers therefore never observe a half-written file, even if the
        process dies mid-write.
        """
        tmp = path.with_name(path.name + '.tmp')
        try:
            tmp.write_bytes(payload)
            os.replace(tmp, path)
        except Exception:
            # Do not leave a partial temp file behind on failure.
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass
            raise

    def _epg_file(self, key: str) -> Optional[Path]:
        """Return the per-entry pickle path for ``key``.

        Returns None when the key is not a plain file name (for example it
        contains a path separator), so a crafted ``vid``/``date`` can never
        address a file outside ``cache_dir/epg``.
        """
        name = f"{key}.pkl"
        if Path(name).name != name:
            return None
        return self.cache_dir / 'epg' / name

    def _save_to_disk(self, force=False):
        """Write the whole EPG cache and the metadata to disk.

        Args:
            force: When true, print a confirmation line after a successful save.

        Returns:
            True if both files were written, False if disk storage is not in
            use or the write failed (the failure is reported, not raised).
        """
        if self.storage_type != 'disk' or not self.cache_dir:
            return False

        try:
            # EPG goes through pickle because the payload type is arbitrary.
            self._write_atomic(
                self.cache_dir / 'epg_cache.pkl',
                pickle.dumps(self.epg, protocol=pickle.HIGHEST_PROTOCOL),
            )

            # Metadata is stored as JSON so it stays human-readable.
            meta = {
                'cid': {'value': self.cid, 'time': self.cid_time},
                'auth': {'value': self.auth, 'time': self.auth_time},
                'channels': {'value': self.channels, 'time': self.channels_time}
            }
            self._write_atomic(
                self.cache_dir / 'meta_cache.json',
                json.dumps(meta).encode('utf-8'),
            )

            if force:
                print(f"💾 磁盘缓存已保存 ({len(self.epg)} 条 EPG)")
            return True
        except Exception as e:
            print(f"⚠️ 磁盘缓存保存失败: {e}")
            return False

    def _redis_fetch(self, name: str, ttl: float) -> Optional[Tuple[Any, float]]:
        """Return ``(value, time)`` for ``cache:<name>`` if present and fresh.

        Returns None when Redis is not the active store, the key is missing,
        the record is older than ``ttl`` seconds, or the lookup fails.
        """
        if self.storage_type != 'redis':
            return None
        try:
            raw = self.redis.get(f'cache:{name}')
            if raw:
                record = json.loads(raw)
                stamp = record['time']
                if time.time() - stamp < ttl:
                    return record['value'], stamp
        except Exception as e:
            print(f"⚠️ Redis 读取失败 ({name}): {e}")
        return None

    def _redis_delete(self, name: str) -> None:
        """Best-effort removal of ``cache:<name>`` from Redis."""
        try:
            self.redis.delete(f'cache:{name}')
        except Exception as e:
            print(f"⚠️ Redis 删除失败 ({name}): {e}")

    def _persist_meta(self, name: str, value: Any, stamp: float, ttl: int) -> None:
        """Persist one metadata entry to the active backing store.

        On Redis the record is stored under ``cache:<name>`` with ``ttl`` as
        the key expiry; on disk the whole cache is re-saved; in memory mode
        this is a no-op.
        """
        if self.storage_type == 'redis':
            try:
                record = {'value': value, 'time': stamp}
                self.redis.set(f'cache:{name}', json.dumps(record), ex=ttl)
            except Exception as e:
                print(f"⚠️ Redis 写入失败 ({name}): {e}")
        elif self.storage_type == 'disk':
            self._save_to_disk()

    # ==================== CID cache ====================

    def get_cid(self) -> Optional[str]:
        """Return the cached CID, or None if it is missing or expired."""
        if self.cid and (time.time() - self.cid_time < Config.CACHE_TTL['CID']):
            return self.cid

        if self.storage_type == 'redis':
            hit = self._redis_fetch('cid', Config.CACHE_TTL['CID'])
            if hit is not None:
                self.cid, self.cid_time = hit
                return self.cid

        return None

    def set_cid(self, cid: str):
        """Cache ``cid`` in memory and in the backing store."""
        self.cid = cid
        self.cid_time = time.time()
        self._persist_meta('cid', cid, self.cid_time, Config.CACHE_TTL['CID'])

    # ==================== Auth cache ====================

    def get_auth(self) -> Optional[Dict[str, Any]]:
        """Return the cached auth dict, or None if it is missing or expired."""
        if self.auth and (time.time() - self.auth_time < Config.CACHE_TTL['AUTH']):
            return self.auth

        if self.storage_type == 'redis':
            hit = self._redis_fetch('auth', Config.CACHE_TTL['AUTH'])
            if hit is not None:
                self.auth, self.auth_time = hit
                return self.auth

        return None

    def set_auth(self, auth: Dict[str, Any]):
        """Cache ``auth`` in memory and in the backing store."""
        self.auth = auth
        self.auth_time = time.time()
        self._persist_meta('auth', auth, self.auth_time, Config.CACHE_TTL['AUTH'])

    # ==================== Channels cache ====================

    def get_channels(self) -> Optional[list]:
        """Return the cached channel list, or None if missing or expired.

        An empty list is treated the same as "not cached".
        """
        if self.channels and (time.time() - self.channels_time < Config.CACHE_TTL['CHANNELS']):
            return self.channels

        if self.storage_type == 'redis':
            hit = self._redis_fetch('channels', Config.CACHE_TTL['CHANNELS'])
            if hit is not None:
                self.channels, self.channels_time = hit
                return self.channels

        return None

    def set_channels(self, channels: list):
        """Cache ``channels`` in memory and in the backing store."""
        self.channels = channels
        self.channels_time = time.time()
        self._persist_meta('channels', channels, self.channels_time,
                           Config.CACHE_TTL['CHANNELS'])

    # ==================== Stream cache (memory only) ====================

    def get_stream(self, key: str) -> Optional[str]:
        """Return the cached stream URL for ``key`` if it is still fresh."""
        cached = self.stream_info.get(key)
        if cached is not None and time.time() - cached['time'] < Config.CACHE_TTL['STREAM']:
            return cached['url']
        return None

    def set_stream(self, key: str, url: str):
        """Cache ``url`` under ``key``, evicting the oldest entry when full."""
        self.stream_info[key] = {'url': url, 'time': time.time()}

        if len(self.stream_info) > Config.MAX_STREAM_CACHE:
            oldest_key = min(self.stream_info, key=lambda k: self.stream_info[k]['time'])
            del self.stream_info[oldest_key]

    # ==================== EPG cache ====================

    def _epg_ttl(self, vid: str, date: str) -> float:
        """Return the TTL (seconds) that applies to the EPG entry ``vid``/``date``."""
        if vid == '_all_' and date == 'full':
            return Config.CACHE_TTL['EPG_FULL']
        if date == self._get_jst_date():
            return Config.CACHE_TTL['EPG_TODAY']
        return Config.CACHE_TTL['EPG_OTHER']

    def get_epg(self, vid: str, date: str) -> Optional[Any]:
        """Return cached EPG data for ``vid`` on ``date``, or None.

        Memory is checked first. With disk storage, per-entry files are used
        as a second level for every ``vid`` except ``'_all_'``; a fresh disk
        hit is promoted back into memory.
        """
        key = f"{vid}_{date}"

        # Level 1: in-memory cache.
        cached = self.epg.get(key)
        if cached is not None and time.time() - cached['time'] < self._epg_ttl(vid, date):
            return cached['data']

        # Level 2: the per-entry file on disk.
        if self.storage_type == 'disk' and vid != '_all_':
            epg_file = self._epg_file(key)
            if epg_file is not None:
                try:
                    if epg_file.exists():
                        # See the pickle security note in _load_from_disk.
                        with open(epg_file, 'rb') as f:
                            cached = pickle.load(f)
                        if time.time() - cached['time'] < self._epg_ttl(vid, date):
                            self.epg[key] = cached
                            return cached['data']
                except Exception as e:
                    print(f"⚠️ EPG 磁盘缓存读取失败 ({key}): {e}")

        return None

    def set_epg(self, vid: str, date: str, data: Any):
        """Cache EPG ``data`` for ``vid`` on ``date``.

        With disk storage each entry is also written to its own file, and the
        combined cache file is refreshed whenever the entry count is a
        multiple of 20. Expired entries are pruned once the cache grows past
        ``Config.MAX_EPG_CACHE``.
        """
        key = f"{vid}_{date}"
        self.epg[key] = {
            'data': data,
            'time': time.time(),
            'date': date,
            'vid': vid
        }

        if self.storage_type == 'redis':
            # NOTE(review): EPG entries are not written to Redis here; the
            # original code only carried a placeholder for that logic.
            pass
        elif self.storage_type == 'disk':
            # One file per entry avoids rewriting the big combined file on
            # every update.
            epg_file = self._epg_file(key)
            if epg_file is not None:
                try:
                    self._write_atomic(
                        epg_file,
                        pickle.dumps(self.epg[key], protocol=pickle.HIGHEST_PROTOCOL),
                    )
                except Exception as e:
                    print(f"⚠️ EPG 磁盘缓存写入失败 ({key}): {e}")

            # Refresh the combined cache file every 20 entries.
            if len(self.epg) % 20 == 0:
                self._save_to_disk()

        # Prune expired entries once the cache exceeds its size limit.
        if len(self.epg) > Config.MAX_EPG_CACHE:
            self._clean_old_epg()

    def _clean_old_epg(self):
        """Drop EPG entries whose date is older than ``EPG_MAX_DAYS`` days.

        Keys starting with ``'_all_'`` are never dropped. The cutoff is
        computed in JST so it agrees with how "today" is determined elsewhere
        in this class.
        """
        cutoff = datetime.now(_JST) - timedelta(days=Config.CACHE_TTL['EPG_MAX_DAYS'])
        cutoff_date = cutoff.strftime('%Y-%m-%d')

        to_delete = [
            k for k, v in self.epg.items()
            if v.get('date', '') < cutoff_date and not k.startswith('_all_')
        ]

        for k in to_delete:
            del self.epg[k]

            # Remove the matching per-entry file as well.
            if self.storage_type == 'disk':
                epg_file = self._epg_file(k)
                if epg_file is None:
                    continue
                try:
                    epg_file.unlink(missing_ok=True)
                except Exception as e:
                    print(f"⚠️ EPG 磁盘缓存删除失败 ({k}): {e}")

    def _get_jst_date(self) -> str:
        """Return the current date in JST formatted as ``YYYY-MM-DD``."""
        return datetime.now(_JST).strftime('%Y-%m-%d')

    # ==================== Cache management ====================

    def clear_cache(self, cache_type: str = 'all'):
        """Clear one cache category or everything.

        Args:
            cache_type: One of ``'cid'``, ``'auth'``, ``'channels'``,
                ``'streams'``, ``'epg'`` or ``'all'``. Any other value clears
                nothing.
        """
        for name in _META_NAMES:
            if cache_type in (name, 'all'):
                setattr(self, name, None)
                setattr(self, f'{name}_time', 0)
                if self.storage_type == 'redis':
                    self._redis_delete(name)

        if cache_type in ('streams', 'all'):
            self.stream_info.clear()

        if cache_type in ('epg', 'all'):
            self.epg.clear()

            # Remove the per-entry files and the combined cache file.
            if self.storage_type == 'disk' and self.cache_dir:
                try:
                    for epg_file in (self.cache_dir / 'epg').glob('*.pkl'):
                        epg_file.unlink()
                    (self.cache_dir / 'epg_cache.pkl').unlink(missing_ok=True)
                except Exception as e:
                    print(f"⚠️ EPG 磁盘缓存清理失败: {e}")

        # Persist the now-reduced state.
        if self.storage_type == 'disk':
            self._save_to_disk()

    def get_stats(self) -> dict:
        """Return a snapshot of what is currently cached.

        Ages and remaining TTLs are reported as strings such as ``'12s'``;
        they are None for entries that are not cached.
        """
        now = time.time()
        has_cid = bool(self.cid)
        has_auth = bool(self.auth)
        cid_age = int(now - self.cid_time)
        auth_age = int(now - self.auth_time)

        return {
            'storage_type': self.storage_type,
            'cid': {
                'cached': self.cid is not None,
                'value': self.cid if has_cid else None,
                'age': f"{cid_age}s" if has_cid else None,
                'ttl': f"{Config.CACHE_TTL['CID'] - cid_age}s" if has_cid else None,
                'storage': self.storage_type
            },
            'auth': {
                'cached': self.auth is not None,
                'token': self.auth['access_token'] if has_auth and 'access_token' in self.auth else None,
                'age': f"{auth_age}s" if has_auth else None,
                'ttl': f"{Config.CACHE_TTL['AUTH'] - auth_age}s" if has_auth else None,
                'storage': self.storage_type
            },
            'channels': self.channels is not None,
            'streams': len(self.stream_info),
            'epg': len(self.epg),
            'epg_detail': self.get_epg_cache_info() if self.epg else None
        }

    def get_epg_cache_info(self) -> dict:
        """Return per-channel and per-date statistics for the EPG cache.

        The ``'_all__full'`` entry is reported separately through the
        ``full_cache_*`` fields and excluded from the per-channel/date counts.
        """
        info = {
            'total_entries': len(self.epg),
            'by_channel': {},
            'by_date': {},
            'full_cache_available': False
        }

        # Report the full-EPG entry, if present.
        full_cache = self.epg.get('_all__full')
        if full_cache is not None:
            info['full_cache_available'] = True
            info['full_cache_time'] = datetime.fromtimestamp(full_cache['time']).strftime('%Y-%m-%d %H:%M:%S')
            info['full_cache_age'] = f"{int(time.time() - full_cache['time'])}s"

        # Aggregate the remaining entries by channel and by date.
        for key, value in self.epg.items():
            if key == '_all__full':
                continue

            vid = value.get('vid', 'unknown')
            date = value.get('date', 'unknown')

            # The payload type is arbitrary; count unsized payloads as 0
            # instead of raising TypeError.
            payload = value.get('data', [])
            program_count = len(payload) if hasattr(payload, '__len__') else 0

            channel = info['by_channel'].setdefault(vid, {'dates': [], 'program_count': 0})
            channel['dates'].append(date)
            channel['program_count'] += program_count

            day = info['by_date'].setdefault(date, {'channels': [], 'program_count': 0})
            day['channels'].append(vid)
            day['program_count'] += program_count

        info['summary'] = {
            'total_channels': len(info['by_channel']),
            'total_dates': len(info['by_date']),
            'total_programs': sum(ch['program_count'] for ch in info['by_channel'].values())
        }

        return info

    def __del__(self):
        """Save the cache to disk when the manager is garbage-collected."""
        try:
            # getattr guards against a partially constructed instance.
            if getattr(self, 'storage_type', None) == 'disk':
                if self._save_to_disk(force=True):
                    print("💾 缓存已保存到磁盘")
        except Exception:
            # A finalizer must never raise; during interpreter shutdown the
            # globals it depends on may already be torn down.
            pass


# Module-level singleton.
cache = CacheManager()