# gt / cache_manager.py
# harii66's picture
# Update cache_manager.py
# 7d934de verified
import time
import json
import pickle
import os
from typing import Any, Optional, Dict
from datetime import datetime, timedelta
from pathlib import Path
from config import Config
try:
from upstash_redis import Redis
except ImportError:
Redis = None
class CacheManager:
    """Tiered cache for CID, auth, channel list, stream URLs and EPG data.

    A storage backend is selected once at startup with priority
    Redis > local disk > in-memory only.  All getters degrade gracefully:
    an expired or missing entry returns ``None`` so callers can re-fetch
    from the origin service.

    NOTE(review): EPG entries are persisted with ``pickle``; only load cache
    directories this process itself wrote — pickle is unsafe on untrusted data.
    """

    def __init__(self):
        # Selected backend: 'redis' | 'disk' | 'memory'
        self.storage_type: Optional[str] = None
        self.cache_dir: Optional[Path] = None
        self.redis = None
        # Pick a storage backend (priority: Redis > Disk > Memory)
        self._init_storage()
        # In-memory caches — always present regardless of backend.
        self.epg: Dict[str, Dict[str, Any]] = {}
        self.cid: Optional[str] = None
        self.cid_time: float = 0
        self.auth: Optional[Dict[str, Any]] = None
        self.auth_time: float = 0
        self.channels: Optional[list] = None
        self.channels_time: float = 0
        self.stream_info: Dict[str, Dict[str, Any]] = {}
        # Warm the in-memory caches from the persistent backend, if any.
        self._load_cache()

    def _init_storage(self):
        """Select the storage backend (Redis > disk > memory)."""
        # 1. Try Redis (requires the upstash_redis package plus credentials).
        redis_url = os.getenv('REDIS_URL', '')
        redis_token = os.getenv('REDIS_TOKEN', '')
        if Redis and redis_url and redis_token:
            try:
                self.redis = Redis(url=redis_url, token=redis_token)
                self.redis.ping()  # fail fast on bad credentials / network
                self.storage_type = 'redis'
                print("✅ 使用 Redis 持久化存储")
                return
            except Exception as e:
                print(f"⚠️ Redis 连接失败: {e}")
                self.redis = None
        # 2. Try the local disk.
        cache_dir = Path(os.getenv('CACHE_DIR', '/tmp/cache'))
        try:
            cache_dir.mkdir(parents=True, exist_ok=True)
            (cache_dir / 'epg').mkdir(exist_ok=True)
            # Verify we actually have write permission before committing.
            test_file = cache_dir / '.test'
            test_file.write_text('test')
            test_file.unlink()
            self.cache_dir = cache_dir
            self.storage_type = 'disk'
            print(f"✅ 使用本地磁盘缓存: {cache_dir}")
            print(f" 缓存将在容器运行期间保留")
            return
        except Exception as e:
            print(f"⚠️ 磁盘缓存不可用: {e}")
        # 3. Fall back to memory only (lost when the container restarts).
        self.storage_type = 'memory'
        print("⚠️ 使用内存缓存(容器重启后丢失)")

    def _load_cache(self):
        """Load persisted cache state into memory at startup."""
        if self.storage_type == 'redis':
            self._load_from_redis()
        elif self.storage_type == 'disk':
            self._load_from_disk()

    def _load_from_redis(self):
        """Populate cid/auth/channels from Redis, ignoring any failure."""
        try:
            # Entries are stored as JSON objects {'value': ..., 'time': ...}.
            cid_data = self.redis.get('cache:cid')
            if cid_data:
                data = json.loads(cid_data)
                self.cid = data['value']
                self.cid_time = data['time']
            auth_data = self.redis.get('cache:auth')
            if auth_data:
                data = json.loads(auth_data)
                self.auth = data['value']
                self.auth_time = data['time']
            channels_data = self.redis.get('cache:channels')
            if channels_data:
                data = json.loads(channels_data)
                self.channels = data['value']
                self.channels_time = data['time']
            print("✅ Redis 缓存加载完成")
        except Exception as e:
            print(f"⚠️ Redis 缓存加载失败: {e}")

    def _load_from_disk(self):
        """Populate EPG and metadata caches from disk, ignoring any failure."""
        try:
            # EPG bulk cache — pickle for speed.
            epg_file = self.cache_dir / 'epg_cache.pkl'
            if epg_file.exists():
                with open(epg_file, 'rb') as f:
                    self.epg = pickle.load(f)
                print(f"✅ 从磁盘加载 {len(self.epg)} 条 EPG 缓存")
            # Metadata cache — JSON for readability.
            meta_file = self.cache_dir / 'meta_cache.json'
            if meta_file.exists():
                with open(meta_file, 'r') as f:
                    meta = json.load(f)
                if 'cid' in meta:
                    self.cid = meta['cid'].get('value')
                    self.cid_time = meta['cid'].get('time', 0)
                if 'auth' in meta:
                    self.auth = meta['auth'].get('value')
                    self.auth_time = meta['auth'].get('time', 0)
                if 'channels' in meta:
                    self.channels = meta['channels'].get('value')
                    self.channels_time = meta['channels'].get('time', 0)
                print("✅ 从磁盘加载元数据缓存")
        except Exception as e:
            print(f"⚠️ 磁盘缓存加载失败: {e}")

    def _save_to_disk(self, force=False):
        """Flush in-memory caches to disk.

        Args:
            force: when True, also log a confirmation line (used on shutdown).
        """
        if self.storage_type != 'disk' or not self.cache_dir:
            return
        try:
            # EPG bulk cache (pickle — fast for large nested structures).
            epg_file = self.cache_dir / 'epg_cache.pkl'
            with open(epg_file, 'wb') as f:
                pickle.dump(self.epg, f, protocol=pickle.HIGHEST_PROTOCOL)
            # Metadata (JSON — human readable).
            meta_file = self.cache_dir / 'meta_cache.json'
            meta = {
                'cid': {'value': self.cid, 'time': self.cid_time},
                'auth': {'value': self.auth, 'time': self.auth_time},
                'channels': {'value': self.channels, 'time': self.channels_time}
            }
            with open(meta_file, 'w') as f:
                json.dump(meta, f)
            if force:
                print(f"💾 磁盘缓存已保存 ({len(self.epg)} 条 EPG)")
        except Exception as e:
            print(f"⚠️ 磁盘缓存保存失败: {e}")

    # ==================== CID cache ====================
    def get_cid(self) -> Optional[str]:
        """Return the cached CID if still within TTL, else None."""
        if self.cid and (time.time() - self.cid_time < Config.CACHE_TTL['CID']):
            return self.cid
        if self.storage_type == 'redis':
            # Another replica may have refreshed the value — check Redis.
            try:
                cid_data = self.redis.get('cache:cid')
                if cid_data:
                    data = json.loads(cid_data)
                    if time.time() - data['time'] < Config.CACHE_TTL['CID']:
                        self.cid = data['value']
                        self.cid_time = data['time']
                        return self.cid
            except Exception:
                pass  # best effort — fall through to a cache miss
        return None

    def set_cid(self, cid: str):
        """Store a fresh CID in memory and in the persistent backend."""
        self.cid = cid
        self.cid_time = time.time()
        if self.storage_type == 'redis':
            try:
                data = {'value': cid, 'time': self.cid_time}
                self.redis.set('cache:cid', json.dumps(data), ex=Config.CACHE_TTL['CID'])
            except Exception:
                pass  # memory copy is still valid
        elif self.storage_type == 'disk':
            self._save_to_disk()

    # ==================== Auth cache ====================
    def get_auth(self) -> Optional[Dict[str, Any]]:
        """Return the cached auth payload if still within TTL, else None."""
        if self.auth and (time.time() - self.auth_time < Config.CACHE_TTL['AUTH']):
            return self.auth
        if self.storage_type == 'redis':
            try:
                auth_data = self.redis.get('cache:auth')
                if auth_data:
                    data = json.loads(auth_data)
                    if time.time() - data['time'] < Config.CACHE_TTL['AUTH']:
                        self.auth = data['value']
                        self.auth_time = data['time']
                        return self.auth
            except Exception:
                pass
        return None

    def set_auth(self, auth: Dict[str, Any]):
        """Store a fresh auth payload in memory and in the persistent backend."""
        self.auth = auth
        self.auth_time = time.time()
        if self.storage_type == 'redis':
            try:
                data = {'value': auth, 'time': self.auth_time}
                self.redis.set('cache:auth', json.dumps(data), ex=Config.CACHE_TTL['AUTH'])
            except Exception:
                pass
        elif self.storage_type == 'disk':
            self._save_to_disk()

    # ==================== Channels cache ====================
    def get_channels(self) -> Optional[list]:
        """Return the cached channel list if still within TTL, else None."""
        if self.channels and (time.time() - self.channels_time < Config.CACHE_TTL['CHANNELS']):
            return self.channels
        if self.storage_type == 'redis':
            try:
                channels_data = self.redis.get('cache:channels')
                if channels_data:
                    data = json.loads(channels_data)
                    if time.time() - data['time'] < Config.CACHE_TTL['CHANNELS']:
                        self.channels = data['value']
                        self.channels_time = data['time']
                        return self.channels
            except Exception:
                pass
        return None

    def set_channels(self, channels: list):
        """Store a fresh channel list in memory and in the persistent backend."""
        self.channels = channels
        self.channels_time = time.time()
        if self.storage_type == 'redis':
            try:
                data = {'value': channels, 'time': self.channels_time}
                self.redis.set('cache:channels', json.dumps(data), ex=Config.CACHE_TTL['CHANNELS'])
            except Exception:
                pass
        elif self.storage_type == 'disk':
            self._save_to_disk()

    # ==================== Stream cache (memory only) ====================
    def get_stream(self, key: str) -> Optional[str]:
        """Return a cached stream URL, or None if absent or expired."""
        cached = self.stream_info.get(key)
        if cached:
            if time.time() - cached['time'] < Config.CACHE_TTL['STREAM']:
                return cached['url']
            # Evict the expired entry so stale URLs don't linger until the
            # size-based eviction in set_stream kicks in.
            del self.stream_info[key]
        return None

    def set_stream(self, key: str, url: str):
        """Cache a stream URL, evicting the oldest entry when over capacity."""
        self.stream_info[key] = {'url': url, 'time': time.time()}
        if len(self.stream_info) > Config.MAX_STREAM_CACHE:
            oldest_key = min(self.stream_info.keys(),
                             key=lambda k: self.stream_info[k]['time'])
            del self.stream_info[oldest_key]

    # ==================== EPG cache ====================
    def get_epg(self, vid: str, date: str) -> Optional[Any]:
        """Return cached EPG data for (vid, date), or None on miss/expiry.

        The special pair vid='_all_', date='full' addresses the bulk cache
        and uses its own TTL; per-channel entries use a shorter TTL for
        today's date than for other dates.
        """
        key = f"{vid}_{date}"
        # Check the in-memory cache first.
        if key in self.epg:
            cached = self.epg[key]
            if vid == '_all_' and date == 'full':
                if time.time() - cached['time'] < Config.CACHE_TTL['EPG_FULL']:
                    return cached['data']
            else:
                today = self._get_jst_date()
                ttl = Config.CACHE_TTL['EPG_TODAY'] if date == today else Config.CACHE_TTL['EPG_OTHER']
                if time.time() - cached['time'] < ttl:
                    return cached['data']
        # Disk backend: per-entry pickle files written by set_epg.
        if self.storage_type == 'disk' and vid != '_all_':
            try:
                epg_file = self.cache_dir / 'epg' / f"{key}.pkl"
                if epg_file.exists():
                    with open(epg_file, 'rb') as f:
                        cached = pickle.load(f)
                    today = self._get_jst_date()
                    ttl = Config.CACHE_TTL['EPG_TODAY'] if date == today else Config.CACHE_TTL['EPG_OTHER']
                    if time.time() - cached['time'] < ttl:
                        # Promote the disk entry into memory.
                        self.epg[key] = cached
                        return cached['data']
            except Exception:
                pass  # corrupt/unreadable file — treat as a miss
        return None

    def set_epg(self, vid: str, date: str, data: Any):
        """Cache EPG data for (vid, date) in memory and, if enabled, on disk."""
        key = f"{vid}_{date}"
        self.epg[key] = {
            'data': data,
            'time': time.time(),
            'date': date,
            'vid': vid
        }
        if self.storage_type == 'redis':
            # Redis path intentionally left as a no-op.
            pass
        elif self.storage_type == 'disk':
            # One pickle file per entry avoids rewriting one large file
            # (write amplification) on every update.
            try:
                epg_file = self.cache_dir / 'epg' / f"{key}.pkl"
                with open(epg_file, 'wb') as f:
                    pickle.dump(self.epg[key], f, protocol=pickle.HIGHEST_PROTOCOL)
            except Exception:
                pass
        # epg_cache.pkl is no longer rewritten in full here; per-entry .pkl
        # files suffice, and _save_to_disk(force=True) persists on shutdown.
        # Trim old entries when the in-memory cache grows too large.
        if len(self.epg) > Config.MAX_EPG_CACHE:
            self._clean_old_epg()

    def _clean_old_epg(self):
        """Drop EPG entries older than the retention window (memory + disk)."""
        cutoff_date = (datetime.now() - timedelta(days=Config.CACHE_TTL['EPG_MAX_DAYS'])).strftime('%Y-%m-%d')
        to_delete = [k for k, v in self.epg.items()
                     if v.get('date', '') < cutoff_date and not k.startswith('_all_')]
        for k in to_delete:
            del self.epg[k]
            if self.storage_type == 'disk':
                try:
                    epg_file = self.cache_dir / 'epg' / f"{k}.pkl"
                    if epg_file.exists():
                        epg_file.unlink()
                except Exception:
                    pass

    def _get_jst_date(self) -> str:
        """Return today's date in JST (UTC+9) as 'YYYY-MM-DD'."""
        from datetime import timezone
        jst = timezone(timedelta(hours=9))
        now = datetime.now(jst)
        return now.strftime('%Y-%m-%d')

    # ==================== Cache management ====================
    def clear_cache(self, cache_type: str = 'all'):
        """Invalidate one cache category ('cid'/'auth'/'channels'/'streams'/'epg') or 'all'."""
        if cache_type in ['cid', 'all']:
            self.cid = None
            self.cid_time = 0
            if self.storage_type == 'redis':
                try:
                    self.redis.delete('cache:cid')
                except Exception:
                    pass
        if cache_type in ['auth', 'all']:
            self.auth = None
            self.auth_time = 0
            if self.storage_type == 'redis':
                try:
                    self.redis.delete('cache:auth')
                except Exception:
                    pass
        if cache_type in ['channels', 'all']:
            self.channels = None
            self.channels_time = 0
            if self.storage_type == 'redis':
                try:
                    self.redis.delete('cache:channels')
                except Exception:
                    pass
        if cache_type in ['streams', 'all']:
            self.stream_info.clear()
        if cache_type in ['epg', 'all']:
            self.epg.clear()
            # Remove the on-disk EPG files as well.
            if self.storage_type == 'disk' and self.cache_dir:
                try:
                    for epg_file in (self.cache_dir / 'epg').glob('*.pkl'):
                        epg_file.unlink()
                    (self.cache_dir / 'epg_cache.pkl').unlink(missing_ok=True)
                except Exception:
                    pass
        # Persist the (now emptier) metadata.
        if self.storage_type == 'disk':
            self._save_to_disk()

    def get_stats(self) -> dict:
        """Return a diagnostic snapshot of cache state for status endpoints.

        NOTE(review): this exposes the raw CID and auth access token — make
        sure the endpoint serving it is not publicly reachable.
        """
        return {
            'storage_type': self.storage_type,
            'cid': {
                'cached': self.cid is not None,
                'value': self.cid if self.cid else None,
                'age': f"{int(time.time() - self.cid_time)}s" if self.cid else None,
                'ttl': f"{Config.CACHE_TTL['CID'] - int(time.time() - self.cid_time)}s" if self.cid else None,
                'storage': self.storage_type
            },
            'auth': {
                'cached': self.auth is not None,
                'token': self.auth['access_token'] if self.auth and 'access_token' in self.auth else None,
                'age': f"{int(time.time() - self.auth_time)}s" if self.auth else None,
                'ttl': f"{Config.CACHE_TTL['AUTH'] - int(time.time() - self.auth_time)}s" if self.auth else None,
                'storage': self.storage_type
            },
            'channels': self.channels is not None,
            'streams': len(self.stream_info),
            'epg': len(self.epg),
            'epg_detail': self.get_epg_cache_info() if self.epg else None
        }

    def get_epg_cache_info(self) -> dict:
        """Summarize the EPG cache: totals, per-channel and per-date breakdown."""
        info = {
            'total_entries': len(self.epg),
            'by_channel': {},
            'by_date': {},
            'full_cache_available': False
        }
        # The bulk cache entry is reported separately, not in the breakdowns.
        if '_all__full' in self.epg:
            info['full_cache_available'] = True
            full_cache = self.epg['_all__full']
            info['full_cache_time'] = datetime.fromtimestamp(full_cache['time']).strftime('%Y-%m-%d %H:%M:%S')
            info['full_cache_age'] = f"{int(time.time() - full_cache['time'])}s"
        # Aggregate per-channel and per-date statistics.
        for key, value in self.epg.items():
            if key == '_all__full':
                continue
            vid = value.get('vid', 'unknown')
            date = value.get('date', 'unknown')
            if vid not in info['by_channel']:
                info['by_channel'][vid] = {'dates': [], 'program_count': 0}
            info['by_channel'][vid]['dates'].append(date)
            info['by_channel'][vid]['program_count'] += len(value.get('data', []))
            if date not in info['by_date']:
                info['by_date'][date] = {'channels': [], 'program_count': 0}
            info['by_date'][date]['channels'].append(vid)
            info['by_date'][date]['program_count'] += len(value.get('data', []))
        info['summary'] = {
            'total_channels': len(info['by_channel']),
            'total_dates': len(info['by_date']),
            'total_programs': sum(ch['program_count'] for ch in info['by_channel'].values())
        }
        return info

    def __del__(self):
        """Best-effort flush to disk on object teardown."""
        # Guarded: during interpreter shutdown attributes or imported modules
        # may already be gone, and __del__ must never raise.
        try:
            if self.storage_type == 'disk':
                self._save_to_disk(force=True)
                print("💾 缓存已保存到磁盘")
        except Exception:
            pass
# Module-level singleton shared by every importer of this module.
cache = CacheManager()