# Hugging Face Space app.py (upload metadata: MichaelChou0806 · "Update app.py" · commit dba472f, verified)
import os, shutil, base64, uuid, mimetypes, json, time
from pydub import AudioSegment
from openai import OpenAI
import gradio as gr
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
# ====== Basic configuration ======
# Shared-secret password required by both the web UI and the JSON API.
PASSWORD = os.getenv("APP_PASSWORD")
if not PASSWORD:
    raise ValueError("APP_PASSWORD environment variable is not set!")
# Upload-size ceiling (25 MB) used by split_audio below.
MAX_SIZE = 25 * 1024 * 1024
# OpenAI client; the API key is read from the environment.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
print("===== 🚀 啟動中 =====")
print(f"APP_PASSWORD: ✅ 已載入")
# ====== 工具函數 ======
MIME_EXT = {
"audio/mp4": "m4a", "audio/m4a": "m4a", "audio/aac": "aac",
"audio/mpeg": "mp3", "audio/wav": "wav", "audio/x-wav": "wav",
"audio/ogg": "ogg", "audio/webm": "webm", "audio/opus": "opus",
"video/mp4": "mp4",
}
def _dataurl_to_file(data_url: str, orig_name: str | None = None) -> str:
"""將 data URL 轉換為本地檔案"""
try:
header, b64 = data_url.split(",", 1)
except ValueError:
raise ValueError("Invalid data URL format")
mime = header.split(";")[0].split(":", 1)[-1].strip()
ext = MIME_EXT.get(mime) or (mimetypes.guess_extension(mime) or "m4a").lstrip(".")
fname = orig_name if (orig_name and "." in orig_name) else f"upload_{uuid.uuid4().hex}.{ext}"
with open(fname, "wb") as f:
f.write(base64.b64decode(b64))
return fname
def _extract_effective_path(file_obj) -> str:
"""從各種格式中提取有效檔案路徑"""
print(f"[DEBUG] 檔案物件類型: {type(file_obj)}")
print(f"[DEBUG] 檔案物件內容: {file_obj}")
if file_obj is None:
raise FileNotFoundError("File object is None")
# 字串路徑
if isinstance(file_obj, str):
s = file_obj.strip().strip('"')
print(f"[DEBUG] 字串路徑: {s}")
if s.startswith("data:"):
return _dataurl_to_file(s, None)
if os.path.isfile(s):
return s
# 字典格式
if isinstance(file_obj, dict):
print(f"[DEBUG] 字典 keys: {list(file_obj.keys())}")
# data URL
data = file_obj.get("data")
if isinstance(data, str) and data.startswith("data:"):
return _dataurl_to_file(data, file_obj.get("orig_name"))
# 路徑
for key in ["path", "name", "file", "filepath"]:
p = file_obj.get(key)
if p and isinstance(p, str):
p = p.strip().strip('"')
if os.path.isfile(p):
print(f"[DEBUG] 找到路徑 (key={key}): {p}")
return p
# 物件屬性
for attr in ["name", "path", "file", "filepath"]:
if hasattr(file_obj, attr):
p = getattr(file_obj, attr, None)
if p and isinstance(p, str):
p = p.strip().strip('"')
if os.path.isfile(p):
print(f"[DEBUG] 找到路徑 (attr={attr}): {p}")
return p
# 直接轉換
try:
path_str = str(file_obj).strip().strip('"')
if os.path.isfile(path_str):
print(f"[DEBUG] 直接路徑: {path_str}")
return path_str
except:
pass
raise FileNotFoundError(f"Cannot parse file: {type(file_obj)} - {file_obj}")
def split_audio(path):
    """Split an audio file into pieces no larger than the upload limit.

    Args:
        path: Path to the source audio file.

    Returns:
        [path] unchanged when the file already fits under MAX_SIZE;
        otherwise a list of freshly exported chunk files.

    NOTE(review): chunks are re-encoded as WAV (uncompressed), so for a
    highly compressed source a chunk may still exceed MAX_SIZE — confirm
    with real inputs.
    """
    size = os.path.getsize(path)
    if size <= MAX_SIZE:
        return [path]
    audio = AudioSegment.from_file(path)
    # Ceiling division: smallest chunk count that keeps each piece under
    # the limit (the old int(size / MAX_SIZE) + 1 produced one extra chunk
    # when size was an exact multiple of MAX_SIZE).
    n = -(-size // MAX_SIZE)
    chunk_ms = len(audio) / n
    # Unique names so concurrent requests cannot clobber each other's chunks.
    run_id = uuid.uuid4().hex
    parts = []
    for i in range(n):
        fn = f"chunk_{run_id}_{i + 1}.wav"
        audio[int(i * chunk_ms):int((i + 1) * chunk_ms)].export(fn, format="wav")
        parts.append(fn)
    return parts
def transcribe_core(path, model="whisper-1"):
    """Transcribe audio with Whisper, then post-process with GPT.

    Pipeline: (1) optionally rewrap .mp4 as .m4a, (2) split oversized
    audio into chunks, (3) transcribe each chunk, (4) convert the text to
    Traditional Chinese, (5) summarize.

    Args:
        path: Path to a local audio (or .mp4) file.
        model: OpenAI transcription model name.

    Returns:
        (traditional_text, summary) tuple of strings.
    """
    print(f"\n{'='*60}")
    print(f"[transcribe_core] 開始轉錄: {path}")
    print(f"{'='*60}")
    start_time = time.time()

    # .mp4 uploads: copy to an .m4a name first (best-effort — on failure
    # we fall back to the original file rather than aborting).
    if path.lower().endswith(".mp4"):
        fixed = path[:-4] + ".m4a"
        try:
            shutil.copy(path, fixed)
            path = fixed
        except OSError:
            pass

    # Split into <= MAX_SIZE chunks and transcribe each one.
    chunks = split_audio(path)
    raw = []
    try:
        for i, c in enumerate(chunks, 1):
            print(f"[transcribe_core] 轉錄片段 {i}/{len(chunks)}")
            with open(c, "rb") as af:
                txt = client.audio.transcriptions.create(
                    model=model, file=af, response_format="text"
                )
            raw.append(txt)
    finally:
        # Remove temporary chunk files (never the caller's original file),
        # which were previously leaked into the working directory.
        for c in chunks:
            if c != path:
                try:
                    os.remove(c)
                except OSError:
                    pass
    raw_txt = "\n".join(raw)

    # Simplified -> Traditional Chinese conversion (verbatim, no paraphrase).
    conv = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是嚴格的繁體中文轉換器"},
            {"role": "user", "content": f"將以下內容轉為台灣繁體,不意譯:\n{raw_txt}"}
        ],
        temperature=0.0
    )
    trad = conv.choices[0].message.content.strip()

    # AI summary in Traditional Chinese.
    summ = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是繁體摘要助手"},
            {"role": "user", "content": f"請用台灣繁體中文摘要;內容多則條列重點,內容短則一句話:\n{trad}"}
        ],
        temperature=0.2
    )
    summary = summ.choices[0].message.content.strip()

    total_time = time.time() - start_time
    print(f"[transcribe_core] ✅ 完成! 耗時: {total_time:.1f}秒\n")
    return trad, summary
# ====== Gradio UI handler ======
def transcribe_web(password, audio_file):
    """Handle one transcription request from the web UI.

    Returns a 4-tuple matching the Gradio outputs:
    (file_name, status, transcription, summary). Failures are reported
    through the status string; this function never raises.
    """
    print(f"\n{'='*60}")
    print(f"🌐 [WEB] 收到請求")
    print(f"密碼: {'已提供' if password else '未提供'}")
    print(f"檔案: {audio_file}")
    print(f"{'='*60}")

    # Guard clauses: password present, password correct, file present.
    if not password:
        print("[WEB] ❌ 密碼為空")
        return "", "❌ Please enter password", "", ""
    if password.strip() != PASSWORD:
        print(f"[WEB] ❌ 密碼錯誤")
        return "", "❌ Incorrect password", "", ""
    if not audio_file:
        print("[WEB] ❌ 未上傳檔案")
        return "", "⚠️ Please upload audio file", "", ""

    try:
        # Resolve the upload to a local path, then transcribe it.
        print(f"[WEB] 處理檔案...")
        local_path = _extract_effective_path(audio_file)
        print(f"[WEB] ✅ 檔案: {local_path}")
        display_name = os.path.basename(local_path)

        print(f"[WEB] 開始轉錄...")
        text, summary = transcribe_core(local_path)

        status = f"✅ Completed! ({len(text)} chars)"
        print(f"[WEB] ✅ 成功\n")
        return display_name, status, text, summary
    except Exception as exc:
        import traceback
        print(f"❌ [WEB] 錯誤:\n{traceback.format_exc()}\n")
        return "", f"❌ Error: {str(exc)}", "", ""
# ====== FastAPI application ======
fastapi_app = FastAPI()
# Open CORS so external clients (e.g. the mobile caller of /api/transcribe)
# can reach the JSON API from any origin.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# disallowed by the CORS spec; browsers may ignore the wildcard — confirm
# whether credentialed requests are actually needed here.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@fastapi_app.post("/api/transcribe")
async def api_transcribe(request: Request):
    """JSON API endpoint for external clients (e.g. mobile shortcuts).

    Expects a JSON body: {"password": str, "file_data": "data:...;base64,...",
    "file_name": str}. Responds with {"status", "transcription", "summary"}
    on success, or {"status": "error", "error": ...} with 400/401/500.
    """
    try:
        body = await request.json()
        print(f"\n{'='*60}")
        print(f"📱 [API] 收到請求")
        print(f"{'='*60}")

        # Password check. isinstance guards against non-string JSON values
        # (e.g. a number), which previously raised AttributeError on
        # .strip() and surfaced as an opaque 500 instead of a clean 401.
        password = body.get("password", "")
        if not isinstance(password, str) or password.strip() != PASSWORD:
            print(f"[API] ❌ 密碼錯誤")
            return JSONResponse(
                status_code=401,
                content={"status": "error", "error": "Password incorrect"}
            )

        # File payload check — same non-string hardening as above (400, not 500).
        file_data = body.get("file_data", "")
        file_name = body.get("file_name", "recording.m4a")
        if not isinstance(file_data, str) or not file_data.startswith("data:"):
            print(f"[API] ❌ 檔案格式錯誤")
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Invalid file format"}
            )

        # Reuse the same file-resolution path as the web UI.
        file_dict = {"data": file_data, "orig_name": file_name}
        path = _extract_effective_path(file_dict)
        print(f"[API] ✅ 檔案: {path}")

        # NOTE(review): transcribe_core performs blocking network I/O inside
        # an async handler, stalling the event loop for the whole job —
        # consider a sync (def) endpoint or run_in_executor.
        text, summary = transcribe_core(path)

        result = {
            "status": "success",
            "transcription": text,
            "summary": summary
        }
        print(f"[API] ✅ 成功\n")
        return JSONResponse(content=result)
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"❌ [API] 錯誤:\n{error_trace}\n")
        return JSONResponse(
            status_code=500,
            content={"status": "error", "error": str(e)}
        )
# ====== Mobile-friendly CSS ======
# Injected into the gr.Blocks below via css=mobile_css. The CSS comments
# are part of the runtime string and are left untouched.
mobile_css = """
/* 基本響應式 */
.gradio-container {
max-width: 100% !important;
padding: 0.5rem !important;
}
/* 手機優化 */
@media (max-width: 768px) {
.gradio-container {
padding: 0.25rem !important;
}
/* 標題縮小 */
h1 {
font-size: 1.5rem !important;
}
h3 {
font-size: 1.1rem !important;
}
/* 按鈕加大點擊區域 */
button {
min-height: 44px !important;
font-size: 1rem !important;
}
/* 輸入框 */
input, textarea {
font-size: 16px !important; /* 防止手機自動縮放 */
}
/* 行布局改為列布局 */
.row {
flex-direction: column !important;
}
.column {
width: 100% !important;
max-width: 100% !important;
}
}
/* 確保文字可選取和複製 */
textarea {
user-select: text !important;
-webkit-user-select: text !important;
}
"""
# ====== Gradio interface ======
with gr.Blocks(
    title="Audio Transcription",
    theme=gr.themes.Soft(),
    css=mobile_css
) as demo:
    gr.Markdown("# 🎧 Audio Transcription")
    gr.Markdown("AI-Powered Speech-to-Text")
    # Password input
    password_input = gr.Textbox(
        label="Password",
        type="password",
        placeholder="Enter password"
    )
    # File upload — intentionally the most basic UploadButton
    audio_input = gr.UploadButton(
        label="📁 Choose Audio File",
        file_types=["audio/*", ".mp3", ".m4a", ".wav", ".ogg", ".webm", ".mp4"],
        file_count="single"
    )
    # Shows the name of the selected file
    file_display = gr.Textbox(
        label="Selected File",
        interactive=False,
        placeholder="No file selected"
    )
    # Submit button
    submit_btn = gr.Button(
        "🚀 Start Transcription",
        variant="primary",
        size="lg"
    )
    # Status display
    status_output = gr.Textbox(
        label="Status",
        interactive=False
    )
    # Transcription result
    transcription_output = gr.Textbox(
        label="Transcription",
        lines=10,
        max_lines=20
    )
    # Summary
    summary_output = gr.Textbox(
        label="Summary",
        lines=5,
        max_lines=10
    )
    gr.Markdown("---")
    gr.Markdown("""
### 📱 API Integration
**Endpoint:** `POST /api/transcribe`
**Request:**
```json
{
"password": "your_password",
"file_data": "data:audio/m4a;base64,...",
"file_name": "recording.m4a"
}
```
**Response:**
```json
{
"status": "success",
"transcription": "...",
"summary": "..."
}
```
""")
    # Event binding — processing happens only on submit-button click
    submit_btn.click(
        fn=transcribe_web,
        inputs=[password_input, audio_input],
        outputs=[file_display, status_output, transcription_output, summary_output]
    )
# ====== Mount Gradio onto FastAPI ======
# Gradio UI is served at "/", the JSON API at /api/transcribe.
app = gr.mount_gradio_app(fastapi_app, demo, path="/")
# ====== Entry point ======
if __name__ == "__main__":
    import uvicorn

    # Startup banner (same output as before, separator hoisted once).
    separator = "=" * 60
    print("\n" + separator)
    print("🚀 服務啟動")
    print("🌐 網頁: http://0.0.0.0:7860")
    print("📱 API: http://0.0.0.0:7860/api/transcribe")
    print(separator + "\n")
    uvicorn.run(app, host="0.0.0.0", port=7860)