|
|
import asyncio
import base64
import hmac
import json
import mimetypes
import os
import shutil
import tempfile
import time
import traceback
import uuid

import gradio as gr
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from openai import OpenAI
from pydub import AudioSegment
|
|
|
|
|
|
|
|
# Shared secret that the web form and the JSON API must both present.
# Submitted passwords are stripped before they are compared, so the configured
# value is stripped too; otherwise stray whitespace in the environment
# variable (e.g. a trailing newline) would make every login attempt fail.
PASSWORD = (os.getenv("APP_PASSWORD") or "").strip()
if not PASSWORD:
    raise ValueError("APP_PASSWORD environment variable is not set!")

# Size threshold (25 MiB) above which split_audio() cuts a file into chunks.
MAX_SIZE = 25 * 1024 * 1024

# OpenAI client shared by the transcription and chat-completion calls.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

print("===== 🚀 啟動中 =====")
print("APP_PASSWORD: ✅ 已載入")
|
|
|
|
|
|
|
|
MIME_EXT = { |
|
|
"audio/mp4": "m4a", "audio/m4a": "m4a", "audio/aac": "aac", |
|
|
"audio/mpeg": "mp3", "audio/wav": "wav", "audio/x-wav": "wav", |
|
|
"audio/ogg": "ogg", "audio/webm": "webm", "audio/opus": "opus", |
|
|
"video/mp4": "mp4", |
|
|
} |
|
|
|
|
|
def _dataurl_to_file(data_url: str, orig_name: str | None = None) -> str: |
|
|
"""將 data URL 轉換為本地檔案""" |
|
|
try: |
|
|
header, b64 = data_url.split(",", 1) |
|
|
except ValueError: |
|
|
raise ValueError("Invalid data URL format") |
|
|
mime = header.split(";")[0].split(":", 1)[-1].strip() |
|
|
ext = MIME_EXT.get(mime) or (mimetypes.guess_extension(mime) or "m4a").lstrip(".") |
|
|
fname = orig_name if (orig_name and "." in orig_name) else f"upload_{uuid.uuid4().hex}.{ext}" |
|
|
with open(fname, "wb") as f: |
|
|
f.write(base64.b64decode(b64)) |
|
|
return fname |
|
|
|
|
|
def _extract_effective_path(file_obj) -> str: |
|
|
"""從各種格式中提取有效檔案路徑""" |
|
|
print(f"[DEBUG] 檔案物件類型: {type(file_obj)}") |
|
|
print(f"[DEBUG] 檔案物件內容: {file_obj}") |
|
|
|
|
|
if file_obj is None: |
|
|
raise FileNotFoundError("File object is None") |
|
|
|
|
|
|
|
|
if isinstance(file_obj, str): |
|
|
s = file_obj.strip().strip('"') |
|
|
print(f"[DEBUG] 字串路徑: {s}") |
|
|
if s.startswith("data:"): |
|
|
return _dataurl_to_file(s, None) |
|
|
if os.path.isfile(s): |
|
|
return s |
|
|
|
|
|
|
|
|
if isinstance(file_obj, dict): |
|
|
print(f"[DEBUG] 字典 keys: {list(file_obj.keys())}") |
|
|
|
|
|
|
|
|
data = file_obj.get("data") |
|
|
if isinstance(data, str) and data.startswith("data:"): |
|
|
return _dataurl_to_file(data, file_obj.get("orig_name")) |
|
|
|
|
|
|
|
|
for key in ["path", "name", "file", "filepath"]: |
|
|
p = file_obj.get(key) |
|
|
if p and isinstance(p, str): |
|
|
p = p.strip().strip('"') |
|
|
if os.path.isfile(p): |
|
|
print(f"[DEBUG] 找到路徑 (key={key}): {p}") |
|
|
return p |
|
|
|
|
|
|
|
|
for attr in ["name", "path", "file", "filepath"]: |
|
|
if hasattr(file_obj, attr): |
|
|
p = getattr(file_obj, attr, None) |
|
|
if p and isinstance(p, str): |
|
|
p = p.strip().strip('"') |
|
|
if os.path.isfile(p): |
|
|
print(f"[DEBUG] 找到路徑 (attr={attr}): {p}") |
|
|
return p |
|
|
|
|
|
|
|
|
try: |
|
|
path_str = str(file_obj).strip().strip('"') |
|
|
if os.path.isfile(path_str): |
|
|
print(f"[DEBUG] 直接路徑: {path_str}") |
|
|
return path_str |
|
|
except: |
|
|
pass |
|
|
|
|
|
raise FileNotFoundError(f"Cannot parse file: {type(file_obj)} - {file_obj}") |
|
|
|
|
|
def split_audio(path, max_size=None):
    """Split an audio file into WAV chunks that each stay under a size limit.

    Args:
        path: Path of the audio file.
        max_size: Size limit in bytes. Defaults to the module-level
            ``MAX_SIZE`` when omitted.

    Returns:
        ``[path]`` unchanged when the file already fits the limit; otherwise
        a list of newly written chunk paths in playback order. The caller
        owns the chunk files and should delete them after use.
    """
    limit = MAX_SIZE if max_size is None else max_size
    if os.path.getsize(path) <= limit:
        return [path]

    audio = AudioSegment.from_file(path)

    # Chunks are exported as uncompressed WAV, so the count must come from the
    # decoded PCM size. Sizing from the (usually compressed) input file would
    # produce chunks far larger than the limit.
    headroom = 64 * 1024  # WAV header plus millisecond rounding of slice ends
    budget = max(1, limit - headroom)
    pcm_bytes = len(audio.raw_data)
    n = max(1, -(-pcm_bytes // budget))  # ceiling division
    chunk_ms = len(audio) / n

    # A per-call token keeps concurrent requests from overwriting each
    # other's chunk files.
    token = uuid.uuid4().hex
    out_dir = tempfile.gettempdir()

    parts = []
    for i in range(n):
        fn = os.path.join(out_dir, f"chunk_{token}_{i + 1}.wav")
        segment = audio[int(i * chunk_ms):int((i + 1) * chunk_ms)]
        segment.export(fn, format="wav")
        parts.append(fn)
    return parts
|
|
|
|
|
def transcribe_core(path, model="whisper-1"):
    """Transcribe an audio file, then convert and summarise the transcript.

    The audio is split into chunks when needed and each chunk is transcribed.
    The joined transcript is converted to Taiwanese Traditional Chinese by a
    chat model, and a summary is generated from the converted text.

    Args:
        path: Path of the audio file on disk. The file itself is not deleted.
        model: Transcription model name passed to the OpenAI API.

    Returns:
        A ``(transcript, summary)`` tuple of strings. Both are empty when the
        transcription produced no text.
    """
    print(f"\n{'='*60}")
    print(f"[transcribe_core] 開始轉錄: {path}")
    print(f"{'='*60}")

    start_time = time.time()
    temp_files = []  # files created here that must be removed afterwards

    try:
        # Present .mp4 uploads under an .m4a name. If the copy fails, the
        # original file is used as-is (best effort).
        if path.lower().endswith(".mp4"):
            fixed = path[:-4] + ".m4a"
            already_there = os.path.exists(fixed)
            try:
                shutil.copy(path, fixed)
            except OSError as exc:
                print(f"[transcribe_core] ⚠️ mp4 -> m4a copy failed, using original: {exc}")
            else:
                # Only clean up the copy when this call created it.
                if not already_there:
                    temp_files.append(fixed)
                path = fixed

        chunks = split_audio(path)
        temp_files.extend(c for c in chunks if c != path)

        raw = []
        for i, c in enumerate(chunks, 1):
            print(f"[transcribe_core] 轉錄片段 {i}/{len(chunks)}")
            with open(c, "rb") as af:
                txt = client.audio.transcriptions.create(
                    model=model, file=af, response_format="text"
                )
            raw.append(txt)

        raw_txt = "\n".join(raw)

        if raw_txt.strip():
            conv = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role":"system","content":"你是嚴格的繁體中文轉換器"},
                    {"role":"user","content":f"將以下內容轉為台灣繁體,不意譯:\n{raw_txt}"}
                ],
                temperature=0.0
            )
            # message.content may be None, so fall back to an empty string.
            trad = (conv.choices[0].message.content or "").strip()

            summ = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role":"system","content":"你是繁體摘要助手"},
                    {"role":"user","content":f"請用台灣繁體中文摘要;內容多則條列重點,內容短則一句話:\n{trad}"}
                ],
                temperature=0.2
            )
            summary = (summ.choices[0].message.content or "").strip()
        else:
            # Nothing was transcribed; prompting the chat model with empty
            # input would only produce invented text.
            trad, summary = "", ""

        total_time = time.time() - start_time
        print(f"[transcribe_core] ✅ 完成! 耗時: {total_time:.1f}秒\n")

        return trad, summary
    finally:
        for tmp in temp_files:
            try:
                os.remove(tmp)
            except OSError:
                # Cleanup is best effort and must not mask the real outcome.
                pass
|
|
|
|
|
|
|
|
def transcribe_web(password, audio_file):
    """Handle a transcription request coming from the web form.

    Args:
        password: Password typed by the user; may be empty or ``None``.
        audio_file: Uploaded file in whatever shape the UI delivers it; it is
            resolved to a path by ``_extract_effective_path``.

    Returns:
        A 4-tuple ``(file_name, status, transcription, summary)``. On any
        failure only ``status`` is non-empty and carries the message.
    """
    print(f"\n{'='*60}")
    print("🌐 [WEB] 收到請求")
    print(f"密碼: {'已提供' if password else '未提供'}")
    print(f"檔案: {audio_file}")
    print(f"{'='*60}")

    if not password:
        print("[WEB] ❌ 密碼為空")
        return "", "❌ Please enter password", "", ""

    # Constant-time comparison so response timing does not reveal how much of
    # the password matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    supplied = password.strip().encode("utf-8")
    if not hmac.compare_digest(supplied, PASSWORD.encode("utf-8")):
        print("[WEB] ❌ 密碼錯誤")
        return "", "❌ Incorrect password", "", ""

    if not audio_file:
        print("[WEB] ❌ 未上傳檔案")
        return "", "⚠️ Please upload audio file", "", ""

    try:
        print("[WEB] 處理檔案...")
        path = _extract_effective_path(audio_file)
        print(f"[WEB] ✅ 檔案: {path}")

        file_name = os.path.basename(path)

        print("[WEB] 開始轉錄...")
        text, summary = transcribe_core(path)

        char_count = len(text)
        status = f"✅ Completed! ({char_count} chars)"

        print("[WEB] ✅ 成功\n")
        return file_name, status, text, summary

    except Exception as e:
        # UI boundary: log the full traceback and show a short message.
        error_msg = traceback.format_exc()
        print(f"❌ [WEB] 錯誤:\n{error_msg}\n")
        return "", f"❌ Error: {str(e)}", "", ""
|
|
|
|
|
|
|
|
# FastAPI application that hosts the JSON API; the Gradio UI is mounted on it
# further down.
fastapi_app = FastAPI()

# Cross-origin access is open to any origin so external clients can call the
# API. Credentials stay disabled: the API authenticates with a password in the
# JSON body, and a wildcard origin must not be combined with
# allow_credentials=True.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
@fastapi_app.post("/api/transcribe")
async def api_transcribe(request: Request):
    """JSON API endpoint for external callers such as mobile clients.

    Expects a JSON object with ``password``, ``file_data`` (a base64 ``data:``
    URL) and an optional ``file_name``.

    Returns:
        * 200 with ``status``, ``transcription`` and ``summary`` on success;
        * 400 when the body is not a JSON object or ``file_data`` is not a
          ``data:`` URL;
        * 401 when the password is missing or wrong;
        * 500 with the error text on any other failure.
    """
    path = None
    try:
        try:
            body = await request.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            body = None
        if not isinstance(body, dict):
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Invalid JSON body"}
            )

        print(f"\n{'='*60}")
        print("📱 [API] 收到請求")
        print(f"{'='*60}")

        # Constant-time comparison on bytes. A non-string password is treated
        # as wrong rather than crashing into a 500.
        password = body.get("password", "")
        if not isinstance(password, str) or not hmac.compare_digest(
            password.strip().encode("utf-8"), PASSWORD.encode("utf-8")
        ):
            print("[API] ❌ 密碼錯誤")
            return JSONResponse(
                status_code=401,
                content={"status": "error", "error": "Password incorrect"}
            )

        file_data = body.get("file_data", "")
        file_name = body.get("file_name", "recording.m4a")

        if not isinstance(file_data, str) or not file_data.startswith("data:"):
            print("[API] ❌ 檔案格式錯誤")
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Invalid file format"}
            )

        file_dict = {"data": file_data, "orig_name": file_name}
        path = _extract_effective_path(file_dict)
        print(f"[API] ✅ 檔案: {path}")

        # transcribe_core blocks on disk and network I/O, so it runs in a
        # worker thread to keep the event loop free for other requests.
        text, summary = await asyncio.to_thread(transcribe_core, path)

        result = {
            "status": "success",
            "transcription": text,
            "summary": summary
        }

        print("[API] ✅ 成功\n")
        return JSONResponse(content=result)

    except Exception as e:
        # API boundary: log the traceback and report the message to the caller.
        error_trace = traceback.format_exc()
        print(f"❌ [API] 錯誤:\n{error_trace}\n")
        return JSONResponse(
            status_code=500,
            content={"status": "error", "error": str(e)}
        )
    finally:
        # The upload was decoded to a file for this request only, so remove it.
        if path is not None:
            try:
                os.remove(path)
            except OSError:
                pass
|
|
|
|
|
|
|
|
# Custom stylesheet handed to gr.Blocks(css=...). It contains container
# padding rules, a max-width 768px media query with phone-specific overrides,
# and a rule keeping textarea text selectable.
# The /* ... */ comments inside the string are part of the runtime value and
# are intentionally left as written.
mobile_css = """
/* 基本響應式 */
.gradio-container {
    max-width: 100% !important;
    padding: 0.5rem !important;
}

/* 手機優化 */
@media (max-width: 768px) {
    .gradio-container {
        padding: 0.25rem !important;
    }

    /* 標題縮小 */
    h1 {
        font-size: 1.5rem !important;
    }

    h3 {
        font-size: 1.1rem !important;
    }

    /* 按鈕加大點擊區域 */
    button {
        min-height: 44px !important;
        font-size: 1rem !important;
    }

    /* 輸入框 */
    input, textarea {
        font-size: 16px !important; /* 防止手機自動縮放 */
    }

    /* 行布局改為列布局 */
    .row {
        flex-direction: column !important;
    }

    .column {
        width: 100% !important;
        max-width: 100% !important;
    }
}

/* 確保文字可選取和複製 */
textarea {
    user-select: text !important;
    -webkit-user-select: text !important;
}
"""
|
|
|
|
|
|
|
|
# Gradio UI definition: inputs (password, audio upload), a submit button,
# three read-outs (status, transcription, summary) and an API usage note.
with gr.Blocks(
    title="Audio Transcription",
    theme=gr.themes.Soft(),
    css=mobile_css
) as demo:

    gr.Markdown("# 🎧 Audio Transcription")
    gr.Markdown("AI-Powered Speech-to-Text")

    # Password the user must supply; checked by transcribe_web.
    password_input = gr.Textbox(
        label="Password",
        type="password",
        placeholder="Enter password"
    )

    # Single-file upload button for the audio to transcribe.
    audio_input = gr.UploadButton(
        label="📁 Choose Audio File",
        file_types=["audio/*", ".mp3", ".m4a", ".wav", ".ogg", ".webm", ".mp4"],
        file_count="single"
    )

    # Read-only box that receives the first value returned by transcribe_web
    # (the file name), i.e. it is filled in after a transcription run.
    file_display = gr.Textbox(
        label="Selected File",
        interactive=False,
        placeholder="No file selected"
    )

    submit_btn = gr.Button(
        "🚀 Start Transcription",
        variant="primary",
        size="lg"
    )

    # Read-only status / error message line.
    status_output = gr.Textbox(
        label="Status",
        interactive=False
    )

    transcription_output = gr.Textbox(
        label="Transcription",
        lines=10,
        max_lines=20
    )

    summary_output = gr.Textbox(
        label="Summary",
        lines=5,
        max_lines=10
    )

    # Static documentation of the JSON API exposed at /api/transcribe.
    gr.Markdown("---")
    gr.Markdown("""
    ### 📱 API Integration

    **Endpoint:** `POST /api/transcribe`

    **Request:**
    ```json
    {
      "password": "your_password",
      "file_data": "data:audio/m4a;base64,...",
      "file_name": "recording.m4a"
    }
    ```

    **Response:**
    ```json
    {
      "status": "success",
      "transcription": "...",
      "summary": "..."
    }
    ```
    """)

    # Clicking the button calls transcribe_web(password, file); its four
    # return values fill the file, status, transcription and summary boxes in
    # that order.
    submit_btn.click(
        fn=transcribe_web,
        inputs=[password_input, audio_input],
        outputs=[file_display, status_output, transcription_output, summary_output]
    )
|
|
|
|
|
|
|
|
# ASGI application: the Gradio UI mounted at "/" on top of the FastAPI app, so
# one server process serves both the web page and /api/transcribe.
app = gr.mount_gradio_app(fastapi_app, demo, path="/")


if __name__ == "__main__":
    # Direct execution: print a start-up banner, then serve with uvicorn.
    host, port = "0.0.0.0", 7860
    rule = "=" * 60

    print(f"\n{rule}")
    print("🚀 服務啟動")
    print(f"🌐 網頁: http://{host}:{port}")
    print(f"📱 API: http://{host}:{port}/api/transcribe")
    print(f"{rule}\n")

    import uvicorn

    uvicorn.run(app, host=host, port=port)