import json
import os
import pickle
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Optional

# upstash_redis is optional: when it is not installed, Redis is set to None
# and the Redis backend is skipped.
try:
    from upstash_redis import Redis
except ImportError:
    Redis = None

from config import Config
|
|
class CacheManager:
    """In-process cache for CID, auth data, channel list, stream URLs and EPG.

    One persistence backend is chosen at construction time, in this order:

    * ``'redis'``  - used when the ``upstash_redis`` client is importable,
      ``REDIS_URL`` / ``REDIS_TOKEN`` are set and a ``ping()`` succeeds.
      Only cid/auth/channels are written to Redis; EPG and stream entries
      stay in memory.
    * ``'disk'``   - used when ``CACHE_DIR`` (default ``/tmp/cache``) is
      writable.  cid/auth/channels go to ``meta_cache.json``, the whole EPG
      dict to ``epg_cache.pkl`` and each EPG entry to ``epg/<key>.pkl``.
    * ``'memory'`` - fallback, nothing is persisted.

    Entry lifetimes come from ``Config.CACHE_TTL`` and size limits from
    ``Config.MAX_STREAM_CACHE`` / ``Config.MAX_EPG_CACHE``.
    """

    # Timestamped single-value entries.  Each name has a ``<name>`` and
    # ``<name>_time`` attribute, a ``cache:<name>`` Redis key and a
    # ``Config.CACHE_TTL['<NAME>']`` lifetime.
    _META_NAMES = ('cid', 'auth', 'channels')

    # Key under which set_epg('_all_', 'full', ...) stores its entry.
    _FULL_EPG_KEY = '_all__full'

    # Fixed UTC+9 offset used for every EPG date computation.
    _JST = timezone(timedelta(hours=9))

    def __init__(self):
        # Every attribute is defined before any storage work so the loaders
        # and __del__ always see a fully formed object.
        self.storage_type = None
        self.cache_dir = None
        self.redis = None

        self.epg: Dict[str, Dict[str, Any]] = {}
        self.cid: Optional[str] = None
        self.cid_time: float = 0
        self.auth: Optional[Dict[str, Any]] = None
        self.auth_time: float = 0
        self.channels: Optional[list] = None
        self.channels_time: float = 0
        self.stream_info: Dict[str, Dict[str, Any]] = {}

        self._init_storage()
        self._load_cache()

    # ------------------------------------------------------------------
    # Backend selection and loading
    # ------------------------------------------------------------------

    def _init_storage(self):
        """Select the storage backend (Redis > disk > memory)."""
        redis_url = os.getenv('REDIS_URL', '')
        redis_token = os.getenv('REDIS_TOKEN', '')

        if Redis and redis_url and redis_token:
            try:
                self.redis = Redis(url=redis_url, token=redis_token)
                self.redis.ping()
                self.storage_type = 'redis'
                print("✅ 使用 Redis 持久化存储")
                return
            except Exception as e:
                print(f"⚠️ Redis 连接失败: {e}")
                self.redis = None

        cache_dir = Path(os.getenv('CACHE_DIR', '/tmp/cache'))

        try:
            cache_dir.mkdir(parents=True, exist_ok=True)
            (cache_dir / 'epg').mkdir(exist_ok=True)

            # Probe write access by creating and removing a scratch file.
            test_file = cache_dir / '.test'
            test_file.write_text('test')
            test_file.unlink()

            self.cache_dir = cache_dir
            self.storage_type = 'disk'
            print(f"✅ 使用本地磁盘缓存: {cache_dir}")
            print(" 缓存将在容器运行期间保留")
            return
        except Exception as e:
            print(f"⚠️ 磁盘缓存不可用: {e}")

        self.storage_type = 'memory'
        print("⚠️ 使用内存缓存(容器重启后丢失)")

    def _load_cache(self):
        """Load persisted entries from the selected backend at start-up."""
        if self.storage_type == 'redis':
            self._load_from_redis()
        elif self.storage_type == 'disk':
            self._load_from_disk()

    def _load_from_redis(self):
        """Populate cid/auth/channels from their ``cache:<name>`` Redis keys."""
        try:
            for name in self._META_NAMES:
                raw = self.redis.get(f'cache:{name}')
                if raw:
                    entry = json.loads(raw)
                    # Read both fields first so an entry missing either one
                    # is rejected as a whole.
                    value, stamp = entry['value'], entry['time']
                    setattr(self, name, value)
                    setattr(self, f'{name}_time', stamp)
            print("✅ Redis 缓存加载完成")
        except Exception as e:
            print(f"⚠️ Redis 缓存加载失败: {e}")

    def _load_from_disk(self):
        """Populate the EPG dict and cid/auth/channels from ``cache_dir``.

        The two files are loaded independently, so a damaged EPG pickle does
        not prevent the metadata from being restored.
        """
        try:
            epg_file = self.cache_dir / 'epg_cache.pkl'
            if epg_file.exists():
                # NOTE(review): pickle.load can execute arbitrary code; this
                # is only safe if nothing untrusted can write to cache_dir
                # (the default lives under /tmp).
                with open(epg_file, 'rb') as f:
                    loaded = pickle.load(f)
                if not isinstance(loaded, dict):
                    raise TypeError(f"unexpected EPG cache type: {type(loaded).__name__}")
                self.epg = loaded
                print(f"✅ 从磁盘加载 {len(self.epg)} 条 EPG 缓存")
        except Exception as e:
            print(f"⚠️ 磁盘缓存加载失败: {e}")

        try:
            meta_file = self.cache_dir / 'meta_cache.json'
            if meta_file.exists():
                with open(meta_file, 'r', encoding='utf-8') as f:
                    meta = json.load(f)
                for name in self._META_NAMES:
                    if name in meta:
                        setattr(self, name, meta[name].get('value'))
                        setattr(self, f'{name}_time', meta[name].get('time', 0))
                print("✅ 从磁盘加载元数据缓存")
        except Exception as e:
            print(f"⚠️ 磁盘缓存加载失败: {e}")

    # ------------------------------------------------------------------
    # Disk persistence helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _atomic_write(path: Path, mode: str, write) -> None:
        """Write via ``<path>.tmp`` and rename, so *path* is never truncated.

        *write* is called with the open temporary file object.
        """
        tmp = path.with_name(path.name + '.tmp')
        try:
            with open(tmp, mode) as f:
                write(f)
            os.replace(tmp, path)
        finally:
            # After a successful replace the temp file is gone and this is a
            # no-op; after a failure it removes the partial file.
            tmp.unlink(missing_ok=True)

    def _epg_file(self, key: str) -> Optional[Path]:
        """Return the per-entry EPG file for *key*.

        Returns None when there is no cache directory or when *key* is not a
        plain file name (contains a path separator), so a key can never
        address a file outside ``cache_dir / 'epg'``.
        """
        if not self.cache_dir or Path(key).name != key:
            return None
        return self.cache_dir / 'epg' / f"{key}.pkl"

    def _save_to_disk(self, force=False):
        """Write the EPG dict and cid/auth/channels to ``cache_dir``.

        Does nothing unless the disk backend is active.  ``force`` only
        controls whether a confirmation line is printed.  Failures are
        reported and swallowed.
        """
        if self.storage_type != 'disk' or not self.cache_dir:
            return

        try:
            self._atomic_write(
                self.cache_dir / 'epg_cache.pkl',
                'wb',
                lambda f: pickle.dump(self.epg, f, protocol=pickle.HIGHEST_PROTOCOL),
            )

            meta = {
                name: {
                    'value': getattr(self, name),
                    'time': getattr(self, f'{name}_time'),
                }
                for name in self._META_NAMES
            }
            self._atomic_write(
                self.cache_dir / 'meta_cache.json',
                'w',
                lambda f: json.dump(meta, f),
            )

            if force:
                print(f"💾 磁盘缓存已保存 ({len(self.epg)} 条 EPG)")
        except Exception as e:
            print(f"⚠️ 磁盘缓存保存失败: {e}")

    # ------------------------------------------------------------------
    # cid / auth / channels
    # ------------------------------------------------------------------

    def _get_meta(self, name: str) -> Any:
        """Return the fresh value of entry *name*, or None.

        The in-memory copy is used when it is truthy and younger than its
        TTL.  Otherwise, with the Redis backend, ``cache:<name>`` is read and
        adopted if it is still fresh.
        """
        ttl = Config.CACHE_TTL[name.upper()]
        value = getattr(self, name)
        if value and time.time() - getattr(self, f'{name}_time') < ttl:
            return value

        if self.storage_type == 'redis':
            try:
                raw = self.redis.get(f'cache:{name}')
                if raw:
                    entry = json.loads(raw)
                    if time.time() - entry['time'] < ttl:
                        setattr(self, name, entry['value'])
                        setattr(self, f'{name}_time', entry['time'])
                        return entry['value']
            except Exception as e:
                # Best effort: a failed lookup is reported as a cache miss.
                print(f"⚠️ Redis 读取失败 (cache:{name}): {e}")

        return None

    def _set_meta(self, name: str, value: Any) -> None:
        """Store *value* for entry *name* in memory and in the active backend."""
        stamp = time.time()
        setattr(self, name, value)
        setattr(self, f'{name}_time', stamp)

        if self.storage_type == 'redis':
            try:
                payload = json.dumps({'value': value, 'time': stamp})
                self.redis.set(f'cache:{name}', payload, ex=Config.CACHE_TTL[name.upper()])
            except Exception as e:
                # Best effort: the in-memory copy is still valid.
                print(f"⚠️ Redis 写入失败 (cache:{name}): {e}")
        elif self.storage_type == 'disk':
            self._save_to_disk()

    def get_cid(self) -> Optional[str]:
        """Return the cached CID, or None when missing or expired."""
        return self._get_meta('cid')

    def set_cid(self, cid: str):
        """Cache *cid* with the current timestamp."""
        self._set_meta('cid', cid)

    def get_auth(self) -> Optional[Dict[str, Any]]:
        """Return the cached auth data, or None when missing or expired."""
        return self._get_meta('auth')

    def set_auth(self, auth: Dict[str, Any]):
        """Cache *auth* with the current timestamp."""
        self._set_meta('auth', auth)

    def get_channels(self) -> Optional[list]:
        """Return the cached channel list, or None when missing or expired."""
        return self._get_meta('channels')

    def set_channels(self, channels: list):
        """Cache *channels* with the current timestamp."""
        self._set_meta('channels', channels)

    # ------------------------------------------------------------------
    # Stream URLs (memory only)
    # ------------------------------------------------------------------

    def get_stream(self, key: str) -> Optional[str]:
        """Return the cached URL for *key*, or None when missing or expired."""
        cached = self.stream_info.get(key)
        if cached is not None and time.time() - cached['time'] < Config.CACHE_TTL['STREAM']:
            return cached['url']
        return None

    def set_stream(self, key: str, url: str):
        """Cache *url* under *key*; drop the oldest entry when over the limit."""
        self.stream_info[key] = {'url': url, 'time': time.time()}

        if len(self.stream_info) > Config.MAX_STREAM_CACHE:
            oldest_key = min(self.stream_info, key=lambda k: self.stream_info[k]['time'])
            del self.stream_info[oldest_key]

    # ------------------------------------------------------------------
    # EPG
    # ------------------------------------------------------------------

    def _epg_ttl(self, vid: str, date: str):
        """Return the TTL that applies to the EPG entry (*vid*, *date*)."""
        if vid == '_all_' and date == 'full':
            return Config.CACHE_TTL['EPG_FULL']
        if date == self._get_jst_date():
            return Config.CACHE_TTL['EPG_TODAY']
        return Config.CACHE_TTL['EPG_OTHER']

    def get_epg(self, vid: str, date: str) -> Optional[Any]:
        """Return cached EPG data for (*vid*, *date*), or None.

        Memory is checked first.  With the disk backend, and for any *vid*
        other than ``'_all_'``, the per-entry file is tried next and a fresh
        hit is copied back into memory.
        """
        key = f"{vid}_{date}"

        cached = self.epg.get(key)
        if cached is not None and time.time() - cached['time'] < self._epg_ttl(vid, date):
            return cached['data']

        if self.storage_type == 'disk' and vid != '_all_':
            epg_file = self._epg_file(key)
            try:
                if epg_file is not None and epg_file.exists():
                    # NOTE(review): pickle.load can execute arbitrary code;
                    # see the note in _load_from_disk.
                    with open(epg_file, 'rb') as f:
                        cached = pickle.load(f)

                    if time.time() - cached['time'] < self._epg_ttl(vid, date):
                        self.epg[key] = cached
                        return cached['data']
            except Exception as e:
                # Best effort: an unreadable file is reported as a cache miss.
                print(f"⚠️ EPG 磁盘缓存读取失败 ({key}): {e}")

        return None

    def set_epg(self, vid: str, date: str, data: Any):
        """Cache *data* for (*vid*, *date*) and trim the cache when too large.

        Nothing is written to Redis for EPG entries.  With the disk backend
        the entry is also written to its own file.
        """
        key = f"{vid}_{date}"
        entry = {
            'data': data,
            'time': time.time(),
            'date': date,
            'vid': vid,
        }
        self.epg[key] = entry

        if self.storage_type == 'disk':
            epg_file = self._epg_file(key)
            if epg_file is not None:
                try:
                    self._atomic_write(
                        epg_file,
                        'wb',
                        lambda f: pickle.dump(entry, f, protocol=pickle.HIGHEST_PROTOCOL),
                    )
                except Exception as e:
                    # Best effort: the in-memory copy is still valid.
                    print(f"⚠️ EPG 磁盘缓存写入失败 ({key}): {e}")

        if len(self.epg) > Config.MAX_EPG_CACHE:
            self._clean_old_epg()

    def _clean_old_epg(self):
        """Drop EPG entries dated before the ``EPG_MAX_DAYS`` cutoff.

        Keys starting with ``'_all_'`` are never removed.  The cutoff is
        computed in JST so it uses the same calendar as ``_get_jst_date``,
        which decides entry freshness.
        """
        cutoff_date = (
            datetime.now(self._JST) - timedelta(days=Config.CACHE_TTL['EPG_MAX_DAYS'])
        ).strftime('%Y-%m-%d')
        expired = [
            key for key, entry in self.epg.items()
            if entry.get('date', '') < cutoff_date and not key.startswith('_all_')
        ]

        for key in expired:
            del self.epg[key]

            if self.storage_type == 'disk':
                epg_file = self._epg_file(key)
                try:
                    if epg_file is not None:
                        epg_file.unlink(missing_ok=True)
                except Exception as e:
                    print(f"⚠️ EPG 磁盘缓存删除失败 ({key}): {e}")

    def _get_jst_date(self) -> str:
        """Return today's date in JST (UTC+9) as ``YYYY-MM-DD``."""
        return datetime.now(self._JST).strftime('%Y-%m-%d')

    # ------------------------------------------------------------------
    # Maintenance and introspection
    # ------------------------------------------------------------------

    def clear_cache(self, cache_type: str = 'all'):
        """Clear one cache (``'cid'``, ``'auth'``, ``'channels'``,
        ``'streams'``, ``'epg'``) or everything (``'all'``).

        With the disk backend the on-disk files are rewritten afterwards.
        """
        for name in self._META_NAMES:
            if cache_type in (name, 'all'):
                setattr(self, name, None)
                setattr(self, f'{name}_time', 0)
                if self.storage_type == 'redis':
                    try:
                        self.redis.delete(f'cache:{name}')
                    except Exception as e:
                        print(f"⚠️ Redis 删除失败 (cache:{name}): {e}")

        if cache_type in ('streams', 'all'):
            self.stream_info.clear()

        if cache_type in ('epg', 'all'):
            self.epg.clear()

            if self.storage_type == 'disk' and self.cache_dir:
                try:
                    for epg_file in (self.cache_dir / 'epg').glob('*.pkl'):
                        epg_file.unlink(missing_ok=True)
                    (self.cache_dir / 'epg_cache.pkl').unlink(missing_ok=True)
                except Exception as e:
                    print(f"⚠️ EPG 磁盘缓存清理失败: {e}")

        if self.storage_type == 'disk':
            self._save_to_disk()

    @staticmethod
    def _age_and_ttl(value: Any, stamp: float, ttl_key: str, now: float):
        """Return ``(age, remaining)`` strings for a cached value, or
        ``(None, None)`` when *value* is falsy."""
        if not value:
            return None, None
        age = int(now - stamp)
        return f"{age}s", f"{Config.CACHE_TTL[ttl_key] - age}s"

    def get_stats(self) -> dict:
        """Return a snapshot describing what is currently cached."""
        now = time.time()
        cid_age, cid_ttl = self._age_and_ttl(self.cid, self.cid_time, 'CID', now)
        auth_age, auth_ttl = self._age_and_ttl(self.auth, self.auth_time, 'AUTH', now)

        return {
            'storage_type': self.storage_type,
            'cid': {
                'cached': self.cid is not None,
                'value': self.cid if self.cid else None,
                'age': cid_age,
                'ttl': cid_ttl,
                'storage': self.storage_type,
            },
            'auth': {
                'cached': self.auth is not None,
                # NOTE(review): this exposes the raw access token to whoever
                # reads the stats - confirm that is intended.
                'token': self.auth['access_token'] if self.auth and 'access_token' in self.auth else None,
                'age': auth_age,
                'ttl': auth_ttl,
                'storage': self.storage_type,
            },
            'channels': self.channels is not None,
            'streams': len(self.stream_info),
            'epg': len(self.epg),
            'epg_detail': self.get_epg_cache_info() if self.epg else None,
        }

    @staticmethod
    def _program_count(data: Any) -> int:
        """Return ``len(data)``, or 0 for values that have no length."""
        try:
            return len(data)
        except TypeError:
            return 0

    def get_epg_cache_info(self) -> dict:
        """Return per-channel and per-date statistics for the EPG cache."""
        info = {
            'total_entries': len(self.epg),
            'by_channel': {},
            'by_date': {},
            'full_cache_available': False,
        }

        full_cache = self.epg.get(self._FULL_EPG_KEY)
        if full_cache is not None:
            info['full_cache_available'] = True
            info['full_cache_time'] = datetime.fromtimestamp(full_cache['time']).strftime('%Y-%m-%d %H:%M:%S')
            info['full_cache_age'] = f"{int(time.time() - full_cache['time'])}s"

        for key, entry in self.epg.items():
            if key == self._FULL_EPG_KEY:
                continue

            vid = entry.get('vid', 'unknown')
            date = entry.get('date', 'unknown')
            programs = self._program_count(entry.get('data', []))

            channel = info['by_channel'].setdefault(vid, {'dates': [], 'program_count': 0})
            channel['dates'].append(date)
            channel['program_count'] += programs

            day = info['by_date'].setdefault(date, {'channels': [], 'program_count': 0})
            day['channels'].append(vid)
            day['program_count'] += programs

        info['summary'] = {
            'total_channels': len(info['by_channel']),
            'total_dates': len(info['by_date']),
            'total_programs': sum(ch['program_count'] for ch in info['by_channel'].values()),
        }

        return info

    def __del__(self):
        """Persist the cache to disk when the object is finalized."""
        try:
            # getattr: __init__ may have failed before storage_type was set.
            if getattr(self, 'storage_type', None) == 'disk':
                self._save_to_disk(force=True)
                print("💾 缓存已保存到磁盘")
        except Exception:
            # A finalizer must not raise; during interpreter shutdown the
            # globals it relies on may already be torn down.
            pass
|
|
|
|
| |
# Shared module-level instance. Constructing it selects a storage backend and
# loads any persisted cache, so both happen as a side effect of importing
# this module.
cache = CacheManager()