| import os, shutil, base64, uuid, mimetypes |
| from pydub import AudioSegment |
| from openai import OpenAI |
| import gradio as gr |
| from fastapi import Request |
| from fastapi.responses import JSONResponse |
|
|
| |
# Shared secret checked by both the web UI and the JSON API. When the
# APP_PASSWORD environment variable is unset the built-in default is used.
PASSWORD = os.getenv("APP_PASSWORD", "chou")
# Size threshold in bytes (25 MiB); split_audio() splits files above this.
MAX_SIZE = 25 * 1024 * 1024
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


print("===== 🚀 啟動中 =====")
# Test the environment variable itself: PASSWORD always has a value when the
# variable is unset (because of the default above), so checking PASSWORD
# would report "loaded" even when nothing was configured.
print(f"APP_PASSWORD: {'✅ 已載入' if os.getenv('APP_PASSWORD') else '❌ 未載入'}")
|
|
| |
| MIME_EXT = { |
| "audio/mp4": "m4a", "audio/m4a": "m4a", "audio/aac": "aac", |
| "audio/mpeg": "mp3", "audio/wav": "wav", "audio/x-wav": "wav", |
| "audio/ogg": "ogg", "audio/webm": "webm", "audio/opus": "opus", |
| "video/mp4": "mp4", |
| } |
|
|
| def _dataurl_to_file(data_url: str, orig_name: str | None = None) -> str: |
| try: |
| header, b64 = data_url.split(",", 1) |
| except ValueError: |
| raise ValueError("data URL format error") |
| mime = header.split(";")[0].split(":", 1)[-1].strip() |
| ext = MIME_EXT.get(mime) or (mimetypes.guess_extension(mime) or "m4a").lstrip(".") |
| fname = orig_name if (orig_name and "." in orig_name) else f"upload_{uuid.uuid4().hex}.{ext}" |
| with open(fname, "wb") as f: |
| f.write(base64.b64decode(b64)) |
| return fname |
|
|
def _extract_effective_path(file_obj) -> str:
    """Resolve an uploaded-file value to a path that exists on disk.

    Accepted shapes, tried in this order:

    * ``str`` - either a ``data:`` URL (decoded to a new file) or a path.
    * ``dict`` - a ``"data"`` key holding a ``data:`` URL (with optional
      ``"orig_name"``), or a ``"path"`` key holding a path.
    * any object exposing a string ``name`` or ``path`` attribute.

    Surrounding whitespace and double quotes are stripped from path strings.

    Raises:
        FileNotFoundError: If no shape yields an existing file.
    """

    def _unquote(raw: str) -> str:
        # Trim whitespace first, then any wrapping double quotes.
        return raw.strip().strip('"')

    if isinstance(file_obj, str):
        candidate = _unquote(file_obj)
        if candidate.startswith("data:"):
            return _dataurl_to_file(candidate, None)
        if os.path.isfile(candidate):
            return candidate

    if isinstance(file_obj, dict):
        payload = file_obj.get("data")
        if isinstance(payload, str) and payload.startswith("data:"):
            return _dataurl_to_file(payload, file_obj.get("orig_name"))
        candidate = _unquote(str(file_obj.get("path") or ""))
        if candidate and os.path.isfile(candidate):
            return candidate

    # Fall back to file-like objects that carry their location as an attribute.
    for attr_name in ("name", "path"):
        value = getattr(file_obj, attr_name, None)
        if not isinstance(value, str):
            continue
        candidate = _unquote(value)
        if os.path.isfile(candidate):
            return candidate

    raise FileNotFoundError("Cannot parse uploaded file")
|
|
| |
def split_audio(path):
    """Split an audio file into WAV chunks that each fit within MAX_SIZE.

    Files whose size on disk is at most ``MAX_SIZE`` are returned unchanged.
    Larger files are decoded and cut into consecutive WAV segments written
    to a new temporary directory.

    Args:
        path: Path of the audio file to inspect.

    Returns:
        A list of file paths: ``[path]`` when no split is needed, otherwise
        the chunk files in playback order.
    """
    import tempfile

    if os.path.getsize(path) <= MAX_SIZE:
        return [path]

    audio = AudioSegment.from_file(path)
    total_ms = len(audio)
    if total_ms == 0:
        # Nothing to slice; hand the file back untouched.
        return [path]

    # Chunks are exported as uncompressed WAV, so their size is governed by
    # the PCM byte rate, not by the (possibly compressed) source file size.
    # Sizing from the source size would produce chunks far above MAX_SIZE.
    bytes_per_ms = audio.frame_rate * audio.frame_width / 1000
    budget = MAX_SIZE - 4096  # headroom for the WAV header
    chunk_ms = max(1, int(budget / bytes_per_ms))

    # A unique directory keeps concurrent requests from overwriting each
    # other's chunk files.
    out_dir = tempfile.mkdtemp(prefix="chunks_")
    parts = []
    for index, start in enumerate(range(0, total_ms, chunk_ms), start=1):
        fn = os.path.join(out_dir, f"chunk_{index}.wav")
        audio[start:start + chunk_ms].export(fn, format="wav")
        parts.append(fn)
    return parts
|
|
| |
def transcribe_core(path, model="whisper-1"):
    """Transcribe an audio file, convert it to Traditional Chinese, summarize.

    Steps:
      1. A ``.mp4`` input is copied to a sibling ``.m4a`` file (best effort)
         and that copy is used for the remaining steps.
      2. The file is split with ``split_audio`` and each piece is sent to
         the transcription endpoint; the texts are joined with newlines.
      3. The joined text is converted to Taiwan Traditional Chinese and then
         summarized, each with one chat-completion request.

    Args:
        path: Path of the audio file to transcribe.
        model: Transcription model name.

    Returns:
        ``(traditional_text, summary)``. Both are empty strings when the
        transcription produced no text.
    """
    temp_files = []  # files created here that must not outlive the call

    if path.lower().endswith(".mp4"):
        fixed = path[:-4] + ".m4a"
        already_there = os.path.exists(fixed)
        try:
            shutil.copy(path, fixed)
        except OSError as err:
            # Best effort only: fall back to the original file.
            print(f"mp4 -> m4a copy failed, using original file: {err}")
        else:
            path = fixed
            if not already_there:
                # Only delete the copy later if this call created it.
                temp_files.append(fixed)

    try:
        chunks = split_audio(path)
        # split_audio returns [path] when no split was needed; anything else
        # is a temporary chunk file.
        temp_files.extend(c for c in chunks if c != path)
        raw = []
        for c in chunks:
            with open(c, "rb") as af:
                txt = client.audio.transcriptions.create(
                    model=model, file=af, response_format="text"
                )
            raw.append(txt)
        raw_txt = "\n".join(raw)
    finally:
        for tmp in temp_files:
            try:
                os.remove(tmp)
            except OSError:
                # Cleanup is best effort; a leftover temp file is harmless.
                pass

    if not raw_txt.strip():
        # Nothing was recognized; asking the chat model to convert or
        # summarize an empty prompt would only yield made-up text.
        return "", ""

    conv = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role":"system","content":"你是嚴格的繁體中文轉換器"},
            {"role":"user","content":f"將以下內容轉為台灣繁體,不意譯:\n{raw_txt}"}
        ],
        temperature=0.0
    )
    # message.content may be None, so guard before stripping.
    trad = (conv.choices[0].message.content or "").strip()

    summ = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role":"system","content":"你是繁體摘要助手"},
            {"role":"user","content":f"請用台灣繁體中文摘要;內容多則條列重點,內容短則一句話:\n{trad}"}
        ],
        temperature=0.2
    )
    return trad, (summ.choices[0].message.content or "").strip()
|
|
| |
def transcribe_ui(password, file):
    """Handle a transcription request coming from the Gradio form.

    Args:
        password: Text typed into the password box (may be empty or None).
        file: Value of the upload component; anything accepted by
            ``_extract_effective_path``.

    Returns:
        ``(status_message, transcription, summary)``. The last two are empty
        strings whenever the request is rejected or fails.
    """
    import hmac

    # Log only whether a password was sent, never any part of its content.
    print(f"\n🎯 Web UI Request | Password provided: {bool(password)}")
    # Compare as bytes: compare_digest rejects non-ASCII str, and it runs in
    # constant time so response timing does not leak how much matched.
    if not password or not hmac.compare_digest(
        password.strip().encode("utf-8"), PASSWORD.encode("utf-8")
    ):
        return "❌ Password incorrect", "", ""
    if not file:
        return "⚠️ No file uploaded", "", ""
    try:
        path = _extract_effective_path(file)
        text, summary = transcribe_core(path)
        return "✅ Transcription completed", text, summary
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        print(f"❌ Error: {e}")
        return f"❌ Error: {e}", "", ""
|
|
| |
# Build the Gradio page: password and upload inputs in one column, results in
# a second column, and a collapsed accordion documenting the JSON API.
with gr.Blocks(theme=gr.themes.Soft(), title="LINE Audio Transcription") as demo:
    gr.Markdown("# 🎧 LINE Audio Transcription & Summary")

    with gr.Row():
        # Input column: password, audio upload, and the submit button.
        with gr.Column(scale=1):
            pw_ui = gr.Textbox(label="Password", type="password", placeholder="Enter password")
            file_ui = gr.File(label="Upload Audio File", file_types=["audio"])
            btn_ui = gr.Button("Start Transcription 🚀", variant="primary", size="lg")

        # Output column (scale=2 against the input column's scale=1).
        with gr.Column(scale=2):
            status_ui = gr.Textbox(label="Status", interactive=False)
            transcript_ui = gr.Textbox(label="Transcription Result", lines=10)
            summary_ui = gr.Textbox(label="AI Summary", lines=6)

    # Clicking the button calls transcribe_ui(password, file) and writes its
    # three return values to the status, transcript and summary boxes.
    btn_ui.click(transcribe_ui, [pw_ui, file_ui], [status_ui, transcript_ui, summary_ui])

    # Reference text for API callers; closed by default (open=False).
    with gr.Accordion("📱 API Documentation (iPhone Shortcut)", open=False):
        gr.Markdown("""
### API Endpoint
**POST** `/api/transcribe`

### Request Format (JSON)
```json
{
"password": "your_password",
"file": {
"data": "data:audio/m4a;base64,UklGR...",
"orig_name": "recording.m4a"
}
}
```

### Response Format
```json
{
"status": "success",
"transcription": "轉錄內容...",
"summary": "摘要內容..."
}
```

💡 **Tip**: Use this endpoint in iPhone Shortcuts for automated transcription
""")
|
|
| |
# NOTE(review): confirm that the installed Gradio version exposes
# `demo.fastapi_app` at import time; verify against the Gradio docs.
@demo.fastapi_app.post("/api/transcribe")
async def api_transcribe(request: Request):
    """JSON API endpoint intended for the iPhone Shortcut.

    Expects a JSON object with ``password`` (string) and ``file`` (any value
    accepted by ``_extract_effective_path``).

    Responses:
        200: ``{"status": "success", "transcription": ..., "summary": ...}``
        400: body is not a JSON object, ``file`` is missing, or the upload
            cannot be decoded / located.
        401: password missing, not a string, or incorrect.
        500: any other failure; the message is returned under ``error``.
    """
    import asyncio
    import hmac
    import traceback

    try:
        body = await request.json()
    except ValueError:
        # Malformed JSON is a client error, not a server failure.
        return JSONResponse(status_code=400, content={"error": "Invalid JSON body"})
    if not isinstance(body, dict):
        return JSONResponse(status_code=400, content={"error": "Invalid JSON body"})

    try:
        print(f"\n🎯 API Request | Keys: {list(body.keys())}")

        password = body.get("password", "")
        # A non-string password (null, number, ...) is simply a wrong one.
        # Bytes + compare_digest: constant-time and safe for non-ASCII text.
        if not isinstance(password, str) or not hmac.compare_digest(
            password.strip().encode("utf-8"), PASSWORD.encode("utf-8")
        ):
            return JSONResponse(status_code=401, content={"error": "Password incorrect"})

        file_obj = body.get("file")
        if not file_obj:
            return JSONResponse(status_code=400, content={"error": "No file provided"})

        # NOTE(review): _extract_effective_path also accepts plain paths on
        # the server; consider limiting API callers to data URLs.
        try:
            # Decoding and writing the upload is blocking work; keep it off
            # the event loop.
            path = await asyncio.to_thread(_extract_effective_path, file_obj)
        except (ValueError, FileNotFoundError) as e:
            return JSONResponse(status_code=400, content={"error": str(e)})

        # transcribe_core makes several blocking network calls; running it in
        # a worker thread keeps the rest of the app responsive meanwhile.
        text, summary = await asyncio.to_thread(transcribe_core, path)

        return JSONResponse(content={
            "status": "success",
            "transcription": text,
            "summary": summary
        })
    except Exception as e:
        # API boundary: log the full traceback and reply with JSON.
        print(f"❌ API Error:\n{traceback.format_exc()}")
        return JSONResponse(status_code=500, content={"error": str(e)})
|
|
| |
if __name__ == "__main__":
    # Script entry point: serve the app on port 7860, bound to all interfaces.
    demo.launch(
        server_port=7860,
        server_name="0.0.0.0",
    )