| import json |
| import logging |
| import time |
| from typing import Optional, List, Dict, Union |
|
|
| import requests |
|
|
| from app.decorators.timeit import timeit |
| from app.models.transcriber_model import TranscriptSegment, TranscriptResult |
| from app.transcriber.base import Transcriber |
| from app.utils.logger import get_logger |
| from events import transcription_finished |
|
|
| __version__ = "0.0.3" |
|
|
| API_BASE_URL = "https://member.bilibili.com/x/bcut/rubick-interface" |
|
|
| |
| API_REQ_UPLOAD = API_BASE_URL + "/resource/create" |
|
|
| |
| API_COMMIT_UPLOAD = API_BASE_URL + "/resource/create/complete" |
|
|
| |
| API_CREATE_TASK = API_BASE_URL + "/task" |
|
|
| |
| API_QUERY_RESULT = API_BASE_URL + "/task/result" |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| def _bilibili_cookie() -> Optional[str]: |
| """读取「下载配置」里保存的 B 站 Cookie(没有则返回 None)。""" |
| try: |
| from app.services.cookie_manager import CookieConfigManager |
| return CookieConfigManager().get("bilibili") |
| except Exception: |
| return None |
|
|
|
|
| def _with_cookie_hint(msg: str) -> str: |
| """bcut 是 B 站的接口,未带 B 站 Cookie 时容易被风控拒绝(如「第三方服务异常」)。 |
| |
| 未配置 Cookie 时在报错后面追加可行动的提示;已配置则原样返回。 |
| """ |
| if "下载配置" in msg: |
| return msg |
| if _bilibili_cookie(): |
| return msg |
| return ( |
| f"{msg}。bcut(必剪)转写走的是 B 站接口,未配置 B 站 Cookie 时容易被风控拒绝:" |
| "请在「设置 → 下载配置」中填写 B 站 Cookie 后重试," |
| "或在「设置 → 音频转写配置」中切换为本地转写引擎(fast-whisper / mlx-whisper)。" |
| ) |
|
|
|
|
| class BcutTranscriber(Transcriber): |
| """必剪 语音识别接口""" |
| headers = { |
| 'User-Agent': 'Bilibili/1.0.0 (https://www.bilibili.com)', |
| 'Content-Type': 'application/json' |
| } |
|
|
| def __init__(self): |
| self.session = requests.Session() |
| |
| cookie = _bilibili_cookie() |
| if cookie: |
| self.headers = {**self.headers, 'Cookie': cookie} |
| self.task_id = None |
| self.__etags = [] |
|
|
| self.__in_boss_key: Optional[str] = None |
| self.__resource_id: Optional[str] = None |
| self.__upload_id: Optional[str] = None |
| self.__upload_urls: List[str] = [] |
| self.__per_size: Optional[int] = None |
| self.__clips: Optional[int] = None |
|
|
| self.__etags: List[str] = [] |
| self.__download_url: Optional[str] = None |
| self.task_id: Optional[str] = None |
| |
| def _load_file(self, file_path: str) -> bytes: |
| """读取文件内容""" |
| with open(file_path, 'rb') as f: |
| return f.read() |
|
|
| def _upload(self, file_path: str) -> None: |
| """申请上传""" |
| file_binary = self._load_file(file_path) |
| if not file_binary: |
| raise ValueError("无法读取文件数据") |
| |
| payload = json.dumps({ |
| "type": 2, |
| "name": "audio.mp3", |
| "size": len(file_binary), |
| "ResourceFileType": "mp3", |
| "model_id": "8", |
| }) |
|
|
| resp = self.session.post( |
| API_REQ_UPLOAD, |
| data=payload, |
| headers=self.headers |
| ) |
| resp.raise_for_status() |
| resp = resp.json() |
| resp_data = resp["data"] |
|
|
| self.__in_boss_key = resp_data["in_boss_key"] |
| self.__resource_id = resp_data["resource_id"] |
| self.__upload_id = resp_data["upload_id"] |
| self.__upload_urls = resp_data["upload_urls"] |
| self.__per_size = resp_data["per_size"] |
| self.__clips = len(resp_data["upload_urls"]) |
|
|
| logger.info( |
| f"申请上传成功, 总计大小{resp_data['size'] // 1024}KB, {self.__clips}分片, 分片大小{resp_data['per_size'] // 1024}KB: {self.__in_boss_key}" |
| ) |
| self.__upload_part(file_binary) |
| self.__commit_upload() |
|
|
| def __upload_part(self, file_binary: bytes) -> None: |
| """上传音频数据""" |
| for clip in range(self.__clips): |
| start_range = clip * self.__per_size |
| end_range = min((clip + 1) * self.__per_size, len(file_binary)) |
| logger.info(f"开始上传分片{clip}: {start_range}-{end_range}") |
| resp = self.session.put( |
| self.__upload_urls[clip], |
| data=file_binary[start_range:end_range], |
| headers={'Content-Type': 'application/octet-stream'} |
| ) |
| resp.raise_for_status() |
| etag = resp.headers.get("Etag", "").strip('"') |
| self.__etags.append(etag) |
| logger.info(f"分片{clip}上传成功: {etag}") |
|
|
| def __commit_upload(self) -> None: |
| """提交上传数据""" |
| data = json.dumps({ |
| "InBossKey": self.__in_boss_key, |
| "ResourceId": self.__resource_id, |
| "Etags": ",".join(self.__etags), |
| "UploadId": self.__upload_id, |
| "model_id": "8", |
| }) |
| resp = self.session.post( |
| API_COMMIT_UPLOAD, |
| data=data, |
| headers=self.headers |
| ) |
| resp.raise_for_status() |
| resp = resp.json() |
| print('Bili',resp) |
| if resp.get("code") != 0: |
| error_msg = f"上传提交失败: {resp.get('message', '未知错误')}" |
| logger.error(error_msg) |
| raise Exception(error_msg) |
| |
| self.__download_url = resp["data"]["download_url"] |
| logger.info(f"提交成功,下载链接: {self.__download_url}") |
|
|
| def _create_task(self) -> str: |
| """开始创建转换任务""" |
| resp = self.session.post( |
| API_CREATE_TASK, json={"resource": self.__download_url, "model_id": "8"}, headers=self.headers |
| ) |
| resp.raise_for_status() |
| resp = resp.json() |
| if resp.get("code") != 0: |
| error_msg = f"创建任务失败: {resp.get('message', '未知错误')}" |
| logger.error(error_msg) |
| raise Exception(error_msg) |
| |
| self.task_id = resp["data"]["task_id"] |
| logger.info(f"任务已创建: {self.task_id}") |
| return self.task_id |
|
|
| def _query_result(self) -> dict: |
| """查询转换结果""" |
| resp = self.session.get( |
| API_QUERY_RESULT, |
| params={"model_id": 7, "task_id": self.task_id}, |
| headers=self.headers |
| ) |
| resp.raise_for_status() |
| resp = resp.json() |
| if resp.get("code") != 0: |
| error_msg = f"查询结果失败: {resp.get('message', '未知错误')}" |
| logger.error(error_msg) |
| raise Exception(error_msg) |
| |
| return resp["data"] |
|
|
| @timeit |
| def transcript(self, file_path: str) -> TranscriptResult: |
| """执行识别过程,符合 Transcriber 接口""" |
| try: |
| logger.info(f"开始处理文件: {file_path}") |
| |
| |
| logger.info("正在上传文件...") |
| self._upload(file_path) |
| |
| |
| logger.info("提交转录任务...") |
| self._create_task() |
| |
| |
| logger.info("等待转录结果...") |
| task_resp = None |
| max_retries = 500 |
| for i in range(max_retries): |
| task_resp = self._query_result() |
| |
| if task_resp["state"] == 4: |
| break |
| elif task_resp["state"] == 3: |
| error_msg = f"B站ASR任务失败,状态码: {task_resp['state']}" |
| logger.error(error_msg) |
| raise Exception(error_msg) |
| |
| |
| if i % 10 == 0: |
| logger.info(f"转录进行中... {i}/{max_retries}") |
| |
| time.sleep(1) |
| |
| if not task_resp or task_resp["state"] != 4: |
| error_msg = f"B站ASR任务未能完成,状态: {task_resp.get('state') if task_resp else 'Unknown'}" |
| logger.error(error_msg) |
| raise Exception(error_msg) |
| |
| |
| logger.info("转录成功,处理结果...") |
| result_json = json.loads(task_resp["result"]) |
| |
| |
| segments = [] |
| full_text = "" |
| |
| for u in result_json.get("utterances", []): |
| text = u.get("transcript", "").strip() |
| |
| start_time = float(u.get("start_time", 0)) / 1000.0 |
| end_time = float(u.get("end_time", 0)) / 1000.0 |
| |
| full_text += text + " " |
| segments.append(TranscriptSegment( |
| start=start_time, |
| end=end_time, |
| text=text |
| )) |
| |
| |
| result = TranscriptResult( |
| language=result_json.get("language", "zh"), |
| full_text=full_text.strip(), |
| segments=segments, |
| raw=result_json |
| ) |
| |
| |
| |
| |
| return result |
| |
| except Exception as e: |
| logger.error(f"B站ASR处理失败: {str(e)}") |
| |
| raise Exception(_with_cookie_hint(str(e))) from e |
|
|
| def on_finish(self, video_path: str, result: TranscriptResult) -> None: |
| """转录完成的回调""" |
| logger.info(f"B站ASR转写完成: {video_path}") |
| transcription_finished.send({ |
| "file_path": video_path, |
| }) |