| import time
|
| import json
|
| import pickle
|
| import os
|
| from typing import Any, Optional, Dict
|
| from datetime import datetime, timedelta
|
| from pathlib import Path
|
| from config import Config
|
|
|
| try:
|
| from upstash_redis import Redis
|
| except ImportError:
|
| Redis = None
|
|
|
| class CacheManager:
|
| def __init__(self):
|
| self.storage_type = None
|
| self.cache_dir = None
|
| self.redis = None
|
|
|
|
|
| self._init_storage()
|
|
|
|
|
| self.epg: Dict[str, Dict[str, Any]] = {}
|
| self.cid: Optional[str] = None
|
| self.cid_time: float = 0
|
| self.auth: Optional[Dict[str, Any]] = None
|
| self.auth_time: float = 0
|
| self.channels: Optional[list] = None
|
| self.channels_time: float = 0
|
| self.stream_info: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
| self._load_cache()
|
|
|
| def _init_storage(self):
|
| """初始化存储(Redis > Disk > Memory)"""
|
|
|
|
|
| redis_url = os.getenv('REDIS_URL', '')
|
| redis_token = os.getenv('REDIS_TOKEN', '')
|
|
|
| if Redis and redis_url and redis_token:
|
| try:
|
| self.redis = Redis(url=redis_url, token=redis_token)
|
| self.redis.ping()
|
| self.storage_type = 'redis'
|
| print("✅ 使用 Redis 持久化存储")
|
| return
|
| except Exception as e:
|
| print(f"⚠️ Redis 连接失败: {e}")
|
| self.redis = None
|
|
|
|
|
| cache_dir = Path(os.getenv('CACHE_DIR', '/tmp/cache'))
|
|
|
| try:
|
| cache_dir.mkdir(parents=True, exist_ok=True)
|
| (cache_dir / 'epg').mkdir(exist_ok=True)
|
|
|
|
|
| test_file = cache_dir / '.test'
|
| test_file.write_text('test')
|
| test_file.unlink()
|
|
|
| self.cache_dir = cache_dir
|
| self.storage_type = 'disk'
|
| print(f"✅ 使用本地磁盘缓存: {cache_dir}")
|
| print(f" 缓存将在容器运行期间保留")
|
| return
|
|
|
| except Exception as e:
|
| print(f"⚠️ 磁盘缓存不可用: {e}")
|
|
|
|
|
| self.storage_type = 'memory'
|
| print("⚠️ 使用内存缓存(容器重启后丢失)")
|
|
|
| def _load_cache(self):
|
| """启动时加载缓存"""
|
| if self.storage_type == 'redis':
|
| self._load_from_redis()
|
| elif self.storage_type == 'disk':
|
| self._load_from_disk()
|
|
|
| def _load_from_redis(self):
|
| """从 Redis 加载缓存"""
|
| try:
|
|
|
| cid_data = self.redis.get('cache:cid')
|
| if cid_data:
|
| data = json.loads(cid_data)
|
| self.cid = data['value']
|
| self.cid_time = data['time']
|
|
|
|
|
| auth_data = self.redis.get('cache:auth')
|
| if auth_data:
|
| data = json.loads(auth_data)
|
| self.auth = data['value']
|
| self.auth_time = data['time']
|
|
|
|
|
| channels_data = self.redis.get('cache:channels')
|
| if channels_data:
|
| data = json.loads(channels_data)
|
| self.channels = data['value']
|
| self.channels_time = data['time']
|
|
|
| print("✅ Redis 缓存加载完成")
|
| except Exception as e:
|
| print(f"⚠️ Redis 缓存加载失败: {e}")
|
|
|
| def _load_from_disk(self):
|
| """从磁盘加载缓存"""
|
| try:
|
|
|
| epg_file = self.cache_dir / 'epg_cache.pkl'
|
| if epg_file.exists():
|
| with open(epg_file, 'rb') as f:
|
| self.epg = pickle.load(f)
|
| print(f"✅ 从磁盘加载 {len(self.epg)} 条 EPG 缓存")
|
|
|
|
|
| meta_file = self.cache_dir / 'meta_cache.json'
|
| if meta_file.exists():
|
| with open(meta_file, 'r') as f:
|
| meta = json.load(f)
|
| if 'cid' in meta:
|
| self.cid = meta['cid'].get('value')
|
| self.cid_time = meta['cid'].get('time', 0)
|
| if 'auth' in meta:
|
| self.auth = meta['auth'].get('value')
|
| self.auth_time = meta['auth'].get('time', 0)
|
| if 'channels' in meta:
|
| self.channels = meta['channels'].get('value')
|
| self.channels_time = meta['channels'].get('time', 0)
|
| print("✅ 从磁盘加载元数据缓存")
|
|
|
| except Exception as e:
|
| print(f"⚠️ 磁盘缓存加载失败: {e}")
|
|
|
| def _save_to_disk(self, force=False):
|
| """保存缓存到磁盘"""
|
| if self.storage_type != 'disk' or not self.cache_dir:
|
| return
|
|
|
| try:
|
|
|
| epg_file = self.cache_dir / 'epg_cache.pkl'
|
| with open(epg_file, 'wb') as f:
|
| pickle.dump(self.epg, f, protocol=pickle.HIGHEST_PROTOCOL)
|
|
|
|
|
| meta_file = self.cache_dir / 'meta_cache.json'
|
| meta = {
|
| 'cid': {'value': self.cid, 'time': self.cid_time},
|
| 'auth': {'value': self.auth, 'time': self.auth_time},
|
| 'channels': {'value': self.channels, 'time': self.channels_time}
|
| }
|
| with open(meta_file, 'w') as f:
|
| json.dump(meta, f)
|
|
|
| if force:
|
| print(f"💾 磁盘缓存已保存 ({len(self.epg)} 条 EPG)")
|
| except Exception as e:
|
| print(f"⚠️ 磁盘缓存保存失败: {e}")
|
|
|
|
|
|
|
| def get_cid(self) -> Optional[str]:
|
| if self.cid and (time.time() - self.cid_time < Config.CACHE_TTL['CID']):
|
| return self.cid
|
|
|
| if self.storage_type == 'redis':
|
| try:
|
| cid_data = self.redis.get('cache:cid')
|
| if cid_data:
|
| data = json.loads(cid_data)
|
| if time.time() - data['time'] < Config.CACHE_TTL['CID']:
|
| self.cid = data['value']
|
| self.cid_time = data['time']
|
| return self.cid
|
| except:
|
| pass
|
|
|
| return None
|
|
|
| def set_cid(self, cid: str):
|
| self.cid = cid
|
| self.cid_time = time.time()
|
|
|
| if self.storage_type == 'redis':
|
| try:
|
| data = {'value': cid, 'time': self.cid_time}
|
| self.redis.set('cache:cid', json.dumps(data), ex=Config.CACHE_TTL['CID'])
|
| except:
|
| pass
|
| elif self.storage_type == 'disk':
|
| self._save_to_disk()
|
|
|
|
|
|
|
| def get_auth(self) -> Optional[Dict[str, Any]]:
|
| if self.auth and (time.time() - self.auth_time < Config.CACHE_TTL['AUTH']):
|
| return self.auth
|
|
|
| if self.storage_type == 'redis':
|
| try:
|
| auth_data = self.redis.get('cache:auth')
|
| if auth_data:
|
| data = json.loads(auth_data)
|
| if time.time() - data['time'] < Config.CACHE_TTL['AUTH']:
|
| self.auth = data['value']
|
| self.auth_time = data['time']
|
| return self.auth
|
| except:
|
| pass
|
|
|
| return None
|
|
|
| def set_auth(self, auth: Dict[str, Any]):
|
| self.auth = auth
|
| self.auth_time = time.time()
|
|
|
| if self.storage_type == 'redis':
|
| try:
|
| data = {'value': auth, 'time': self.auth_time}
|
| self.redis.set('cache:auth', json.dumps(data), ex=Config.CACHE_TTL['AUTH'])
|
| except:
|
| pass
|
| elif self.storage_type == 'disk':
|
| self._save_to_disk()
|
|
|
|
|
|
|
| def get_channels(self) -> Optional[list]:
|
| if self.channels and (time.time() - self.channels_time < Config.CACHE_TTL['CHANNELS']):
|
| return self.channels
|
|
|
| if self.storage_type == 'redis':
|
| try:
|
| channels_data = self.redis.get('cache:channels')
|
| if channels_data:
|
| data = json.loads(channels_data)
|
| if time.time() - data['time'] < Config.CACHE_TTL['CHANNELS']:
|
| self.channels = data['value']
|
| self.channels_time = data['time']
|
| return self.channels
|
| except:
|
| pass
|
|
|
| return None
|
|
|
| def set_channels(self, channels: list):
|
| self.channels = channels
|
| self.channels_time = time.time()
|
|
|
| if self.storage_type == 'redis':
|
| try:
|
| data = {'value': channels, 'time': self.channels_time}
|
| self.redis.set('cache:channels', json.dumps(data), ex=Config.CACHE_TTL['CHANNELS'])
|
| except:
|
| pass
|
| elif self.storage_type == 'disk':
|
| self._save_to_disk()
|
|
|
|
|
|
|
| def get_stream(self, key: str) -> Optional[str]:
|
| if key in self.stream_info:
|
| cached = self.stream_info[key]
|
| if time.time() - cached['time'] < Config.CACHE_TTL['STREAM']:
|
| return cached['url']
|
| return None
|
|
|
| def set_stream(self, key: str, url: str):
|
| self.stream_info[key] = {'url': url, 'time': time.time()}
|
|
|
| if len(self.stream_info) > Config.MAX_STREAM_CACHE:
|
| oldest_key = min(self.stream_info.keys(),
|
| key=lambda k: self.stream_info[k]['time'])
|
| del self.stream_info[oldest_key]
|
|
|
|
|
|
|
| def get_epg(self, vid: str, date: str) -> Optional[any]:
|
| key = f"{vid}_{date}"
|
|
|
|
|
| if key in self.epg:
|
| cached = self.epg[key]
|
|
|
| if vid == '_all_' and date == 'full':
|
| if time.time() - cached['time'] < Config.CACHE_TTL['EPG_FULL']:
|
| return cached['data']
|
| else:
|
| today = self._get_jst_date()
|
| ttl = Config.CACHE_TTL['EPG_TODAY'] if date == today else Config.CACHE_TTL['EPG_OTHER']
|
|
|
| if time.time() - cached['time'] < ttl:
|
| return cached['data']
|
|
|
|
|
| if self.storage_type == 'disk' and vid != '_all_':
|
| try:
|
| epg_file = self.cache_dir / 'epg' / f"{key}.pkl"
|
| if epg_file.exists():
|
| with open(epg_file, 'rb') as f:
|
| cached = pickle.load(f)
|
|
|
| today = self._get_jst_date()
|
| ttl = Config.CACHE_TTL['EPG_TODAY'] if date == today else Config.CACHE_TTL['EPG_OTHER']
|
|
|
| if time.time() - cached['time'] < ttl:
|
|
|
| self.epg[key] = cached
|
| return cached['data']
|
| except:
|
| pass
|
|
|
| return None
|
|
|
| def set_epg(self, vid: str, date: str, data: any):
|
| key = f"{vid}_{date}"
|
| self.epg[key] = {
|
| 'data': data,
|
| 'time': time.time(),
|
| 'date': date,
|
| 'vid': vid
|
| }
|
|
|
|
|
| if self.storage_type == 'redis':
|
|
|
| pass
|
|
|
|
|
| elif self.storage_type == 'disk':
|
|
|
| try:
|
| epg_file = self.cache_dir / 'epg' / f"{key}.pkl"
|
| with open(epg_file, 'wb') as f:
|
| pickle.dump(self.epg[key], f, protocol=pickle.HIGHEST_PROTOCOL)
|
| except:
|
| pass
|
|
|
|
|
| if len(self.epg) % 20 == 0:
|
| self._save_to_disk()
|
|
|
|
|
| if len(self.epg) > Config.MAX_EPG_CACHE:
|
| self._clean_old_epg()
|
|
|
| def _clean_old_epg(self):
|
| """清理过期 EPG"""
|
| cutoff_date = (datetime.now() - timedelta(days=Config.CACHE_TTL['EPG_MAX_DAYS'])).strftime('%Y-%m-%d')
|
| to_delete = [k for k, v in self.epg.items()
|
| if v.get('date', '') < cutoff_date and not k.startswith('_all_')]
|
|
|
| for k in to_delete:
|
| del self.epg[k]
|
|
|
|
|
| if self.storage_type == 'disk':
|
| try:
|
| epg_file = self.cache_dir / 'epg' / f"{k}.pkl"
|
| if epg_file.exists():
|
| epg_file.unlink()
|
| except:
|
| pass
|
|
|
| def _get_jst_date(self) -> str:
|
| from datetime import timezone
|
| jst = timezone(timedelta(hours=9))
|
| now = datetime.now(jst)
|
| return now.strftime('%Y-%m-%d')
|
|
|
|
|
|
|
| def clear_cache(self, cache_type: str = 'all'):
|
| if cache_type in ['cid', 'all']:
|
| self.cid = None
|
| self.cid_time = 0
|
| if self.storage_type == 'redis':
|
| try:
|
| self.redis.delete('cache:cid')
|
| except:
|
| pass
|
|
|
| if cache_type in ['auth', 'all']:
|
| self.auth = None
|
| self.auth_time = 0
|
| if self.storage_type == 'redis':
|
| try:
|
| self.redis.delete('cache:auth')
|
| except:
|
| pass
|
|
|
| if cache_type in ['channels', 'all']:
|
| self.channels = None
|
| self.channels_time = 0
|
| if self.storage_type == 'redis':
|
| try:
|
| self.redis.delete('cache:channels')
|
| except:
|
| pass
|
|
|
| if cache_type in ['streams', 'all']:
|
| self.stream_info.clear()
|
|
|
| if cache_type in ['epg', 'all']:
|
| self.epg.clear()
|
|
|
|
|
| if self.storage_type == 'disk' and self.cache_dir:
|
| try:
|
| for epg_file in (self.cache_dir / 'epg').glob('*.pkl'):
|
| epg_file.unlink()
|
| (self.cache_dir / 'epg_cache.pkl').unlink(missing_ok=True)
|
| except:
|
| pass
|
|
|
|
|
| if self.storage_type == 'disk':
|
| self._save_to_disk()
|
|
|
| def get_stats(self) -> dict:
|
| return {
|
| 'storage_type': self.storage_type,
|
| 'cid': {
|
| 'cached': self.cid is not None,
|
| 'value': self.cid if self.cid else None,
|
| 'age': f"{int(time.time() - self.cid_time)}s" if self.cid else None,
|
| 'ttl': f"{Config.CACHE_TTL['CID'] - int(time.time() - self.cid_time)}s" if self.cid else None,
|
| 'storage': self.storage_type
|
| },
|
| 'auth': {
|
| 'cached': self.auth is not None,
|
| 'token': self.auth['access_token'] if self.auth and 'access_token' in self.auth else None,
|
| 'age': f"{int(time.time() - self.auth_time)}s" if self.auth else None,
|
| 'ttl': f"{Config.CACHE_TTL['AUTH'] - int(time.time() - self.auth_time)}s" if self.auth else None,
|
| 'storage': self.storage_type
|
| },
|
| 'channels': self.channels is not None,
|
| 'streams': len(self.stream_info),
|
| 'epg': len(self.epg),
|
| 'epg_detail': self.get_epg_cache_info() if self.epg else None
|
| }
|
|
|
| def get_epg_cache_info(self) -> dict:
|
| """获取 EPG 缓存详细信息"""
|
| info = {
|
| 'total_entries': len(self.epg),
|
| 'by_channel': {},
|
| 'by_date': {},
|
| 'full_cache_available': False
|
| }
|
|
|
|
|
| if '_all__full' in self.epg:
|
| info['full_cache_available'] = True
|
| full_cache = self.epg['_all__full']
|
| info['full_cache_time'] = datetime.fromtimestamp(full_cache['time']).strftime('%Y-%m-%d %H:%M:%S')
|
| info['full_cache_age'] = f"{int(time.time() - full_cache['time'])}s"
|
|
|
|
|
| for key, value in self.epg.items():
|
| if key == '_all__full':
|
| continue
|
|
|
| vid = value.get('vid', 'unknown')
|
| date = value.get('date', 'unknown')
|
|
|
| if vid not in info['by_channel']:
|
| info['by_channel'][vid] = {'dates': [], 'program_count': 0}
|
| info['by_channel'][vid]['dates'].append(date)
|
| info['by_channel'][vid]['program_count'] += len(value.get('data', []))
|
|
|
| if date not in info['by_date']:
|
| info['by_date'][date] = {'channels': [], 'program_count': 0}
|
| info['by_date'][date]['channels'].append(vid)
|
| info['by_date'][date]['program_count'] += len(value.get('data', []))
|
|
|
| info['summary'] = {
|
| 'total_channels': len(info['by_channel']),
|
| 'total_dates': len(info['by_date']),
|
| 'total_programs': sum(ch['program_count'] for ch in info['by_channel'].values())
|
| }
|
|
|
| return info
|
|
|
| def __del__(self):
|
| """程序退出时保存缓存"""
|
| if self.storage_type == 'disk':
|
| self._save_to_disk(force=True)
|
| print("💾 缓存已保存到磁盘")
|
|
|
|
|
|
|
| cache = CacheManager() |