import json import logging import time from typing import Optional, List, Dict, Union import requests from app.decorators.timeit import timeit from app.models.transcriber_model import TranscriptSegment, TranscriptResult from app.transcriber.base import Transcriber from app.utils.logger import get_logger from events import transcription_finished __version__ = "0.0.3" API_BASE_URL = "https://member.bilibili.com/x/bcut/rubick-interface" # 申请上传 API_REQ_UPLOAD = API_BASE_URL + "/resource/create" # 提交上传 API_COMMIT_UPLOAD = API_BASE_URL + "/resource/create/complete" # 创建任务 API_CREATE_TASK = API_BASE_URL + "/task" # 查询结果 API_QUERY_RESULT = API_BASE_URL + "/task/result" logger = get_logger(__name__) def _bilibili_cookie() -> Optional[str]: """读取「下载配置」里保存的 B 站 Cookie(没有则返回 None)。""" try: from app.services.cookie_manager import CookieConfigManager return CookieConfigManager().get("bilibili") except Exception: return None def _with_cookie_hint(msg: str) -> str: """bcut 是 B 站的接口,未带 B 站 Cookie 时容易被风控拒绝(如「第三方服务异常」)。 未配置 Cookie 时在报错后面追加可行动的提示;已配置则原样返回。 """ if "下载配置" in msg: # 已带提示,保持幂等 return msg if _bilibili_cookie(): return msg return ( f"{msg}。bcut(必剪)转写走的是 B 站接口,未配置 B 站 Cookie 时容易被风控拒绝:" "请在「设置 → 下载配置」中填写 B 站 Cookie 后重试," "或在「设置 → 音频转写配置」中切换为本地转写引擎(fast-whisper / mlx-whisper)。" ) class BcutTranscriber(Transcriber): """必剪 语音识别接口""" headers = { 'User-Agent': 'Bilibili/1.0.0 (https://www.bilibili.com)', 'Content-Type': 'application/json' } def __init__(self): self.session = requests.Session() # 带上「下载配置」里的 B 站 Cookie(如有),降低被 B 站风控拒绝的概率 cookie = _bilibili_cookie() if cookie: self.headers = {**self.headers, 'Cookie': cookie} self.task_id = None self.__etags = [] self.__in_boss_key: Optional[str] = None self.__resource_id: Optional[str] = None self.__upload_id: Optional[str] = None self.__upload_urls: List[str] = [] self.__per_size: Optional[int] = None self.__clips: Optional[int] = None self.__etags: List[str] = [] self.__download_url: Optional[str] = None self.task_id: Optional[str] = None def _load_file(self, file_path: str) -> bytes: """读取文件内容""" with open(file_path, 'rb') as f: return f.read() def _upload(self, file_path: str) -> None: """申请上传""" file_binary = self._load_file(file_path) if not file_binary: raise ValueError("无法读取文件数据") payload = json.dumps({ "type": 2, "name": "audio.mp3", "size": len(file_binary), "ResourceFileType": "mp3", "model_id": "8", }) resp = self.session.post( API_REQ_UPLOAD, data=payload, headers=self.headers ) resp.raise_for_status() resp = resp.json() resp_data = resp["data"] self.__in_boss_key = resp_data["in_boss_key"] self.__resource_id = resp_data["resource_id"] self.__upload_id = resp_data["upload_id"] self.__upload_urls = resp_data["upload_urls"] self.__per_size = resp_data["per_size"] self.__clips = len(resp_data["upload_urls"]) logger.info( f"申请上传成功, 总计大小{resp_data['size'] // 1024}KB, {self.__clips}分片, 分片大小{resp_data['per_size'] // 1024}KB: {self.__in_boss_key}" ) self.__upload_part(file_binary) self.__commit_upload() def __upload_part(self, file_binary: bytes) -> None: """上传音频数据""" for clip in range(self.__clips): start_range = clip * self.__per_size end_range = min((clip + 1) * self.__per_size, len(file_binary)) logger.info(f"开始上传分片{clip}: {start_range}-{end_range}") resp = self.session.put( self.__upload_urls[clip], data=file_binary[start_range:end_range], headers={'Content-Type': 'application/octet-stream'} ) resp.raise_for_status() etag = resp.headers.get("Etag", "").strip('"') self.__etags.append(etag) logger.info(f"分片{clip}上传成功: {etag}") def __commit_upload(self) -> None: """提交上传数据""" data = json.dumps({ "InBossKey": self.__in_boss_key, "ResourceId": self.__resource_id, "Etags": ",".join(self.__etags), "UploadId": self.__upload_id, "model_id": "8", }) resp = self.session.post( API_COMMIT_UPLOAD, data=data, headers=self.headers ) resp.raise_for_status() resp = resp.json() print('Bili',resp) if resp.get("code") != 0: error_msg = f"上传提交失败: {resp.get('message', '未知错误')}" logger.error(error_msg) raise Exception(error_msg) self.__download_url = resp["data"]["download_url"] logger.info(f"提交成功,下载链接: {self.__download_url}") def _create_task(self) -> str: """开始创建转换任务""" resp = self.session.post( API_CREATE_TASK, json={"resource": self.__download_url, "model_id": "8"}, headers=self.headers ) resp.raise_for_status() resp = resp.json() if resp.get("code") != 0: error_msg = f"创建任务失败: {resp.get('message', '未知错误')}" logger.error(error_msg) raise Exception(error_msg) self.task_id = resp["data"]["task_id"] logger.info(f"任务已创建: {self.task_id}") return self.task_id def _query_result(self) -> dict: """查询转换结果""" resp = self.session.get( API_QUERY_RESULT, params={"model_id": 7, "task_id": self.task_id}, headers=self.headers ) resp.raise_for_status() resp = resp.json() if resp.get("code") != 0: error_msg = f"查询结果失败: {resp.get('message', '未知错误')}" logger.error(error_msg) raise Exception(error_msg) return resp["data"] @timeit def transcript(self, file_path: str) -> TranscriptResult: """执行识别过程,符合 Transcriber 接口""" try: logger.info(f"开始处理文件: {file_path}") # 上传文件 logger.info("正在上传文件...") self._upload(file_path) # 创建任务 logger.info("提交转录任务...") self._create_task() # 轮询检查任务状态 logger.info("等待转录结果...") task_resp = None max_retries = 500 for i in range(max_retries): task_resp = self._query_result() if task_resp["state"] == 4: # 完成状态 break elif task_resp["state"] == 3: # 失败状态 error_msg = f"B站ASR任务失败,状态码: {task_resp['state']}" logger.error(error_msg) raise Exception(error_msg) # 每隔一段时间打印进度 if i % 10 == 0: logger.info(f"转录进行中... {i}/{max_retries}") time.sleep(1) if not task_resp or task_resp["state"] != 4: error_msg = f"B站ASR任务未能完成,状态: {task_resp.get('state') if task_resp else 'Unknown'}" logger.error(error_msg) raise Exception(error_msg) # 解析结果 logger.info("转录成功,处理结果...") result_json = json.loads(task_resp["result"]) # 提取分段数据 segments = [] full_text = "" for u in result_json.get("utterances", []): text = u.get("transcript", "").strip() # B站ASR返回的时间戳是毫秒,需要转换为秒 start_time = float(u.get("start_time", 0)) / 1000.0 end_time = float(u.get("end_time", 0)) / 1000.0 full_text += text + " " segments.append(TranscriptSegment( start=start_time, end=end_time, text=text )) # 创建结果对象 result = TranscriptResult( language=result_json.get("language", "zh"), full_text=full_text.strip(), segments=segments, raw=result_json ) # 触发完成事件 # self.on_finish(file_path, result) return result except Exception as e: logger.error(f"B站ASR处理失败: {str(e)}") # 未配置 B 站 Cookie 时附加「去下载配置填 Cookie / 换本地转写引擎」的提示 raise Exception(_with_cookie_hint(str(e))) from e def on_finish(self, video_path: str, result: TranscriptResult) -> None: """转录完成的回调""" logger.info(f"B站ASR转写完成: {video_path}") transcription_finished.send({ "file_path": video_path, })