| import os, shutil, base64, uuid, mimetypes, json, time |
| from pydub import AudioSegment |
| from openai import OpenAI |
| import gradio as gr |
| from fastapi import FastAPI, Request |
| from fastapi.responses import JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| |
# --- Runtime configuration ---------------------------------------------------
# Shared secret checked by both the web handler and the JSON API.
# NOTE(security): the hard-coded fallback is kept only for backward
# compatibility; real deployments should always set APP_PASSWORD.
_password_from_env = os.getenv("APP_PASSWORD")
PASSWORD = os.getenv("APP_PASSWORD", "chou")
# Size threshold in bytes; split_audio() leaves files at or below it untouched.
MAX_SIZE = 25 * 1024 * 1024
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


print("===== 🚀 啟動中 =====")
# Report whether the variable itself is set. PASSWORD is truthy even when the
# variable is missing (because of the fallback), so it cannot be used here.
print(f"APP_PASSWORD: {'✅ 已載入' if _password_from_env else '❌ 未載入'}")
if _password_from_env is None:
    print("⚠️ APP_PASSWORD 未設定,目前使用內建預設密碼,請勿用於正式環境")
|
|
| |
# MIME type (as found in a data-URL header) -> file extension to use when the
# decoded payload is written to disk.
MIME_EXT = {
    # MPEG-4 audio / AAC
    "audio/mp4": "m4a",
    "audio/m4a": "m4a",
    "audio/aac": "aac",
    # MP3 and WAV
    "audio/mpeg": "mp3",
    "audio/wav": "wav",
    "audio/x-wav": "wav",
    # Ogg / WebM / Opus
    "audio/ogg": "ogg",
    "audio/webm": "webm",
    "audio/opus": "opus",
    # Video container
    "video/mp4": "mp4",
}
|
|
| def _dataurl_to_file(data_url: str, orig_name: str | None = None) -> str: |
| """將 data URL 轉換為本地檔案""" |
| try: |
| header, b64 = data_url.split(",", 1) |
| except ValueError: |
| raise ValueError("Invalid data URL format") |
| mime = header.split(";")[0].split(":", 1)[-1].strip() |
| ext = MIME_EXT.get(mime) or (mimetypes.guess_extension(mime) or "m4a").lstrip(".") |
| fname = orig_name if (orig_name and "." in orig_name) else f"upload_{uuid.uuid4().hex}.{ext}" |
| with open(fname, "wb") as f: |
| f.write(base64.b64decode(b64)) |
| return fname |
|
|
def _extract_effective_path(file_obj) -> str:
    """Resolve an uploaded-file object to the path of an existing local file.

    Shapes are tried in this order:

    1. ``str``: a ``data:`` URL (decoded to a new file via
       ``_dataurl_to_file``) or the path of an existing file.
    2. ``dict``: a ``"data"`` entry holding a ``data:`` URL (with optional
       ``"orig_name"``), otherwise the first of the keys ``path`` / ``name`` /
       ``file`` / ``filepath`` whose value names an existing file.
    3. Any object: the first of the attributes ``name`` / ``path`` / ``file`` /
       ``filepath`` whose value names an existing file.
    4. ``str(file_obj)``, if that names an existing file.

    Surrounding whitespace and double quotes are stripped from candidate paths.

    Args:
        file_obj: The upload in any of the shapes listed above.

    Returns:
        Path of an existing file.

    Raises:
        FileNotFoundError: If ``file_obj`` is ``None`` or no candidate resolves
            to an existing file.
        ValueError: Propagated from ``_dataurl_to_file`` for a malformed data URL.
    """

    def _preview(value, limit=200):
        # Data URLs can be megabytes long; never dump them whole into logs or
        # into exception messages that are echoed back to the client.
        text = str(value)
        if len(text) <= limit:
            return text
        return f"{text[:limit]}... ({len(text)} chars)"

    def _existing_file(candidate):
        # Return the cleaned path when it names an existing file, else None.
        if not candidate or not isinstance(candidate, str):
            return None
        cleaned = candidate.strip().strip('"')
        return cleaned if os.path.isfile(cleaned) else None

    print(f"[DEBUG] 檔案物件類型: {type(file_obj)}")
    print(f"[DEBUG] 檔案物件內容: {_preview(file_obj)}")

    if file_obj is None:
        raise FileNotFoundError("File object is None")

    if isinstance(file_obj, str):
        s = file_obj.strip().strip('"')
        print(f"[DEBUG] 字串路徑: {_preview(s)}")
        if s.startswith("data:"):
            return _dataurl_to_file(s, None)
        if os.path.isfile(s):
            return s

    if isinstance(file_obj, dict):
        print(f"[DEBUG] 字典 keys: {list(file_obj.keys())}")

        data = file_obj.get("data")
        if isinstance(data, str) and data.startswith("data:"):
            return _dataurl_to_file(data, file_obj.get("orig_name"))

        for key in ("path", "name", "file", "filepath"):
            found = _existing_file(file_obj.get(key))
            if found is not None:
                print(f"[DEBUG] 找到有效路徑 (key={key}): {found}")
                return found

    for attr in ("name", "path", "file", "filepath"):
        found = _existing_file(getattr(file_obj, attr, None))
        if found is not None:
            print(f"[DEBUG] 找到有效路徑 (attr={attr}): {found}")
            return found

    # Last resort: the object's string form may itself be a path. This stays
    # best-effort, but only ordinary exceptions are swallowed so that
    # KeyboardInterrupt / SystemExit still propagate.
    try:
        path_str = str(file_obj).strip().strip('"')
        if os.path.isfile(path_str):
            print(f"[DEBUG] 直接轉換為路徑: {path_str}")
            return path_str
    except Exception:
        pass

    raise FileNotFoundError(
        f"Cannot parse uploaded file: {type(file_obj)} - {_preview(file_obj)}"
    )
|
|
def split_audio(path, max_size=None):
    """Split an audio file into WAV chunks that each stay below a size limit.

    A file that is already within the limit is returned untouched as a
    one-element list. Otherwise the audio is decoded with pydub and cut into
    equal-duration slices that are exported as WAV under names unique to this
    call, so concurrent calls cannot overwrite each other's chunks.

    Args:
        path: Path of the audio file to split.
        max_size: Size limit in bytes; defaults to the module-level MAX_SIZE.

    Returns:
        A list of file paths: ``[path]`` when no split is needed, otherwise the
        generated chunk files in playback order.
    """
    limit = MAX_SIZE if max_size is None else max_size
    size = os.path.getsize(path)
    if size <= limit:
        return [path]

    audio = AudioSegment.from_file(path)

    # Chunks are written as uncompressed WAV, so their size is driven by the
    # decoded PCM byte count rather than by the (possibly compressed) source
    # size. Sizing by the source alone yields oversized chunks for mp3/m4a.
    wav_header_allowance = 1024
    usable = max(limit - wav_header_allowance, 1)
    pcm_bytes = len(audio.raw_data)
    by_pcm = -(-pcm_bytes // usable)  # ceiling division
    by_source = size // limit + 1     # the original lower bound
    n = max(by_pcm, by_source)

    total_ms = len(audio)  # slicing below is done in milliseconds
    batch = uuid.uuid4().hex
    parts = []
    for i in range(n):
        # Integer boundaries guarantee the last slice ends exactly at total_ms.
        start = i * total_ms // n
        end = (i + 1) * total_ms // n
        fn = f"chunk_{batch}_{i+1}.wav"
        audio[start:end].export(fn, format="wav")
        parts.append(fn)
    return parts
|
|
def transcribe_core(path, model="whisper-1"):
    """Transcribe an audio file, convert it to Traditional Chinese and summarise it.

    Steps:
      1. A ``.mp4`` input is copied to a temporary ``.m4a`` file (best effort).
      2. The file is split with ``split_audio`` and each piece is sent to the
         transcription endpoint; the pieces are joined with newlines.
      3. The transcript is converted to Taiwan Traditional Chinese by a chat model.
      4. The converted text is summarised by a chat model.

    Temporary files created here (the ``.m4a`` copy and any chunk files) are
    removed before returning; the caller's ``path`` itself is never deleted.

    Args:
        path: Path of the audio file to transcribe.
        model: Name of the transcription model.

    Returns:
        ``(traditional_text, summary)``. Both are empty strings when the
        transcription produced no text.
    """
    print(f"\n{'='*60}")
    print(f"[transcribe_core] 開始轉錄: {path}")
    print(f"{'='*60}")

    start_time = time.time()
    temp_files = []

    try:
        if path.lower().endswith(".mp4"):
            # Use a unique name so an existing sibling .m4a is never overwritten.
            fixed = f"{path[:-4]}_{uuid.uuid4().hex}.m4a"
            temp_files.append(fixed)
            try:
                shutil.copy(path, fixed)
            except OSError as err:
                # Best effort: fall back to the original file.
                print(f"[transcribe_core] ⚠️ 無法複製為 .m4a,改用原始檔案: {err}")
            else:
                path = fixed

        chunks = split_audio(path)
        temp_files.extend(c for c in chunks if c != path)

        raw = []
        for i, c in enumerate(chunks, 1):
            print(f"[transcribe_core] 轉錄片段 {i}/{len(chunks)}")
            with open(c, "rb") as af:
                txt = client.audio.transcriptions.create(
                    model=model, file=af, response_format="text"
                )
            raw.append(txt)

        raw_txt = "\n".join(raw)

        if not raw_txt.strip():
            # Nothing was recognised; asking the chat model to convert or
            # summarise an empty prompt would only produce made-up text.
            print("[transcribe_core] ⚠️ 轉錄結果為空,略過轉換與摘要")
            return "", ""

        conv = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "你是嚴格的繁體中文轉換器"},
                {"role": "user", "content": f"將以下內容轉為台灣繁體,不意譯:\n{raw_txt}"},
            ],
            temperature=0.0,
        )
        # message.content may be None, hence the `or ""`.
        trad = (conv.choices[0].message.content or "").strip()

        summ = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "你是繁體摘要助手"},
                {"role": "user", "content": f"請用台灣繁體中文摘要;內容多則條列重點,內容短則一句話:\n{trad}"},
            ],
            temperature=0.2,
        )
        summary = (summ.choices[0].message.content or "").strip()

        total_time = time.time() - start_time
        print(f"[transcribe_core] ✅ 全部完成! 總耗時: {total_time:.1f}秒\n")

        return trad, summary
    finally:
        # Remove only the files created by this call; ignore ones already gone.
        for tmp in temp_files:
            try:
                os.remove(tmp)
            except OSError:
                pass
|
|
| |
def transcribe_web(password, audio_file):
    """Handle a transcription request coming from the Gradio page.

    Args:
        password: Password typed by the user; surrounding whitespace is ignored.
        audio_file: The uploaded file in whatever form the UI component
            supplies; it is resolved by ``_extract_effective_path``.

    Returns:
        Always a 3-tuple ``(status, transcription, summary)`` matching the three
        output components. On any failure ``transcription`` and ``summary`` are
        empty strings and ``status`` carries the message.
    """
    import hmac
    import traceback

    print(f"\n{'='*60}")
    print(f"🌐 [WEB] 收到網頁請求")
    print(f"密碼: {'已提供' if password else '未提供'}")
    print(f"檔案: {audio_file}")
    print(f"{'='*60}")

    if not password:
        print("[WEB] ❌ 密碼為空")
        return "❌ Please enter password", "", ""

    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    # Neither the attempt nor the real secret is written to the log.
    supplied = password.strip().encode("utf-8")
    if not hmac.compare_digest(supplied, PASSWORD.encode("utf-8")):
        print("[WEB] ❌ 密碼錯誤")
        return "❌ Incorrect password", "", ""

    if not audio_file:
        print("[WEB] ❌ 未上傳檔案")
        return "⚠️ Please upload an audio file", "", ""

    try:
        print(f"[WEB] 開始處理檔案...")
        path = _extract_effective_path(audio_file)
        print(f"[WEB] ✅ 檔案路徑: {path}")

        print(f"[WEB] 開始轉錄...")
        text, summary = transcribe_core(path)

        char_count = len(text)
        status = f"✅ Completed! ({char_count} characters)"

        print(f"[WEB] ✅ 轉錄成功\n")
        return status, text, summary

    except Exception as e:
        # UI boundary: log the full traceback, show the user a short message.
        error_msg = traceback.format_exc()
        print(f"❌ [WEB] 發生錯誤:\n{error_msg}\n")
        return f"❌ Error: {str(e)}", "", ""
|
|
| |
# FastAPI application that hosts the JSON API routes.
fastapi_app = FastAPI()


# Open CORS policy so external clients can call the API from any origin.
# Credentials are disabled: a wildcard origin combined with
# allow_credentials=True is not a valid CORS configuration, and the API
# authenticates through the password in the JSON body, not through cookies.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
@fastapi_app.post("/api/transcribe")
async def api_transcribe(request: Request):
    """JSON API endpoint for external clients (e.g. mobile shortcuts).

    Expected JSON body::

        {"password": "...", "file_data": "data:<mime>;base64,...", "file_name": "..."}

    Responses:
        200: ``{"status": "success", "transcription": ..., "summary": ...}``
        400: body is not a JSON object, or ``file_data`` is not a ``data:`` URL
        401: password missing or wrong
        500: any other failure (``error`` carries the exception message)
    """
    import asyncio
    import hmac
    import traceback

    def _error(status_code, message):
        # Uniform error envelope used by every failure path.
        return JSONResponse(
            status_code=status_code,
            content={"status": "error", "error": message},
        )

    try:
        try:
            body = await request.json()
        except ValueError:  # JSONDecodeError / UnicodeDecodeError
            return _error(400, "Invalid JSON body")
        if not isinstance(body, dict):
            return _error(400, "Invalid JSON body")

        print(f"\n{'='*60}")
        print(f"📱 [API] 收到 API 請求")
        print(f"{'='*60}")

        # Non-string passwords are rejected outright; strings are compared in
        # constant time on their UTF-8 bytes.
        password = body.get("password", "")
        if not isinstance(password, str) or not hmac.compare_digest(
            password.strip().encode("utf-8"), PASSWORD.encode("utf-8")
        ):
            print(f"[API] ❌ 密碼錯誤")
            return _error(401, "Password incorrect")

        file_data = body.get("file_data", "")
        file_name = body.get("file_name", "recording.m4a")

        if not isinstance(file_data, str) or not file_data.startswith("data:"):
            print(f"[API] ❌ 檔案格式錯誤")
            return _error(400, "Invalid file data format")

        # file_name is untrusted: drop any directory part and prefix a random
        # token so it can neither escape the working directory nor overwrite
        # an existing file.
        if not isinstance(file_name, str):
            file_name = "recording.m4a"
        base_name = os.path.basename(file_name.replace("\\", "/")) or "recording.m4a"
        safe_name = f"{uuid.uuid4().hex}_{base_name}"

        file_dict = {"data": file_data, "orig_name": safe_name}
        # Decoding and transcription are blocking; run them in a worker thread
        # so the event loop keeps serving other requests.
        path = await asyncio.to_thread(_extract_effective_path, file_dict)
        print(f"[API] ✅ 檔案解析成功: {path}")

        try:
            text, summary = await asyncio.to_thread(transcribe_core, path)
        finally:
            # The file was created from the request payload above; remove it.
            try:
                os.remove(path)
            except OSError:
                pass

        result = {
            "status": "success",
            "transcription": text,
            "summary": summary,
        }

        print(f"[API] ✅ 轉錄成功\n")
        return JSONResponse(content=result)

    except Exception as e:
        # API boundary: log the traceback and return a JSON error.
        error_trace = traceback.format_exc()
        print(f"❌ [API] 發生錯誤:\n{error_trace}\n")
        return _error(500, str(e))
|
|
| |
# Static Markdown shown on the page, kept out of the layout code for readability.
_HEADER_MD = """
# 🎧 Audio Transcription Service
### AI-Powered Speech-to-Text with Summarization
"""

_UPLOAD_HELP_MD = """
**Supported formats:**
MP3, M4A, WAV, OGG, WEBM, MP4

**Processing:**
Automatic chunking for large files
"""

_API_DOCS_MD = """
## 📱 API Integration

**Endpoint:** `POST /api/transcribe`

**Request:**
```json
{
"password": "your_password",
"file_data": "data:audio/m4a;base64,...",
"file_name": "recording.m4a"
}
```

**Response:**
```json
{
"status": "success",
"transcription": "...",
"summary": "..."
}
```
"""

# Gradio page: upload controls in the left column, results in the right one,
# API usage notes underneath.
with gr.Blocks(title="Audio Transcription", theme=gr.themes.Soft()) as demo:

    gr.Markdown(_HEADER_MD)

    with gr.Row():
        # Left column: credentials, file picker and the submit button.
        with gr.Column(scale=1):
            gr.Markdown("### 📤 Upload")

            password_input = gr.Textbox(
                label="Password", type="password", placeholder="Enter password"
            )

            audio_input = gr.File(
                label="Audio File",
                file_types=["audio", ".mp3", ".m4a", ".wav", ".ogg", ".webm", ".mp4"],
                file_count="single",
            )

            submit_btn = gr.Button("🚀 Start Transcription", variant="primary", size="lg")

            gr.Markdown(_UPLOAD_HELP_MD)

        # Right column: status line, transcript and summary.
        with gr.Column(scale=2):
            gr.Markdown("### 📊 Results")

            status_output = gr.Textbox(label="Status", interactive=False, lines=1)

            transcription_output = gr.Textbox(
                label="Transcription", lines=12, show_copy_button=True
            )

            summary_output = gr.Textbox(
                label="Summary", lines=6, show_copy_button=True
            )

    gr.Markdown("---")

    gr.Markdown(_API_DOCS_MD)

    # transcribe_web returns (status, transcription, summary), matching the
    # three output components in order.
    submit_btn.click(
        fn=transcribe_web,
        inputs=[password_input, audio_input],
        outputs=[status_output, transcription_output, summary_output],
        api_name="transcribe",
    )
|
|
| |
# Mount the Gradio page at the root path of the FastAPI application; `app` is
# the ASGI application handed to the server below.
app = gr.mount_gradio_app(fastapi_app, demo, path="/")


if __name__ == "__main__":
    # Startup banner: blank line, rule, three info lines, rule, blank line.
    _rule = "=" * 60
    _banner = [
        "",
        _rule,
        "🚀 服務啟動",
        "🌐 網頁: http://0.0.0.0:7860",
        "📱 API: http://0.0.0.0:7860/api/transcribe",
        _rule,
        "",
    ]
    print("\n".join(_banner))

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)