# Source: Hugging Face Space by MichaelChou0806 — "Update app.py" (commit b1c58b3, ~12 kB)
import os, shutil, base64, uuid, mimetypes, json, time
from pydub import AudioSegment
from openai import OpenAI
import gradio as gr
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
# ====== Basic settings ======
# Shared-secret password gating both the web UI and the JSON API.
# NOTE(review): falls back to "chou" when APP_PASSWORD is unset — confirm this
# default is acceptable for production deployments.
PASSWORD = os.getenv("APP_PASSWORD", "chou")
# Per-upload size ceiling (25 MB); files above it are split by split_audio().
MAX_SIZE = 25 * 1024 * 1024
# OpenAI client; reads OPENAI_API_KEY from the environment.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
print("===== 🚀 啟動中 =====")
# PASSWORD always has the "chou" fallback above, so this check can never
# actually report ❌.
print(f"APP_PASSWORD: {'✅ 已載入' if PASSWORD else '❌ 未載入'}")
# ====== Utility functions ======
# Maps MIME types (as they appear in data-URL headers) to file extensions,
# used when the client does not supply a usable original filename.
MIME_EXT = {
    "audio/mp4": "m4a", "audio/m4a": "m4a", "audio/aac": "aac",
    "audio/mpeg": "mp3", "audio/wav": "wav", "audio/x-wav": "wav",
    "audio/ogg": "ogg", "audio/webm": "webm", "audio/opus": "opus",
    "video/mp4": "mp4",
}
def _dataurl_to_file(data_url: str, orig_name: str | None = None) -> str:
    """Decode a ``data:`` URL and write its payload to a local file.

    Args:
        data_url: Data URL of the form ``data:<mime>;base64,<payload>``.
        orig_name: Optional client-supplied filename. Only its basename is
            honored so a hostile name (e.g. ``"../../etc/x.m4a"``) cannot
            escape the working directory.

    Returns:
        Path of the newly written file (relative, in the current directory).

    Raises:
        ValueError: if the URL lacks a comma separator or the payload is not
            valid base64.
    """
    try:
        header, b64 = data_url.split(",", 1)
    except ValueError:
        raise ValueError("Invalid data URL format")
    mime = header.split(";")[0].split(":", 1)[-1].strip()
    ext = MIME_EXT.get(mime) or (mimetypes.guess_extension(mime) or "m4a").lstrip(".")
    # Strip directory components from the client-supplied name to block path
    # traversal; fall back to a unique generated name when unusable.
    safe_name = os.path.basename(orig_name) if orig_name else ""
    fname = safe_name if "." in safe_name else f"upload_{uuid.uuid4().hex}.{ext}"
    try:
        payload = base64.b64decode(b64)
    except Exception as exc:
        raise ValueError("Invalid base64 payload in data URL") from exc
    with open(fname, "wb") as f:
        f.write(payload)
    return fname
def _extract_effective_path(file_obj) -> str:
"""從各種格式中提取有效檔案路徑"""
print(f"[DEBUG] 檔案物件類型: {type(file_obj)}")
print(f"[DEBUG] 檔案物件內容: {file_obj}")
# 處理 None
if file_obj is None:
raise FileNotFoundError("File object is None")
# 如果是字串路徑
if isinstance(file_obj, str):
s = file_obj.strip().strip('"')
print(f"[DEBUG] 字串路徑: {s}")
if s.startswith("data:"):
return _dataurl_to_file(s, None)
if os.path.isfile(s):
return s
# 如果是字典
if isinstance(file_obj, dict):
print(f"[DEBUG] 字典 keys: {list(file_obj.keys())}")
# 嘗試 data URL
data = file_obj.get("data")
if isinstance(data, str) and data.startswith("data:"):
return _dataurl_to_file(data, file_obj.get("orig_name"))
# 嘗試 path
for key in ["path", "name", "file", "filepath"]:
p = file_obj.get(key)
if p and isinstance(p, str):
p = p.strip().strip('"')
if os.path.isfile(p):
print(f"[DEBUG] 找到有效路徑 (key={key}): {p}")
return p
# 如果是物件,嘗試獲取屬性
for attr in ["name", "path", "file", "filepath"]:
if hasattr(file_obj, attr):
p = getattr(file_obj, attr, None)
if p and isinstance(p, str):
p = p.strip().strip('"')
if os.path.isfile(p):
print(f"[DEBUG] 找到有效路徑 (attr={attr}): {p}")
return p
# 最後嘗試:直接當作路徑字串
try:
path_str = str(file_obj).strip().strip('"')
if os.path.isfile(path_str):
print(f"[DEBUG] 直接轉換為路徑: {path_str}")
return path_str
except:
pass
raise FileNotFoundError(f"Cannot parse uploaded file: {type(file_obj)} - {file_obj}")
def split_audio(path):
    """Split an audio file into chunks no larger than MAX_SIZE.

    Args:
        path: Local path to the source audio file.

    Returns:
        ``[path]`` unchanged when the file already fits under the limit;
        otherwise a list of paths to newly exported WAV chunks of roughly
        equal duration.
    """
    size = os.path.getsize(path)
    if size <= MAX_SIZE:
        return [path]
    audio = AudioSegment.from_file(path)
    n = size // MAX_SIZE + 1
    chunk_ms = len(audio) / n
    # Unique per-call prefix: the previous fixed names ("chunk_1.wav", ...)
    # collided when two requests were transcribed concurrently.
    prefix = f"chunk_{uuid.uuid4().hex}"
    parts = []
    for i in range(n):
        fn = f"{prefix}_{i + 1}.wav"
        audio[int(i * chunk_ms):int((i + 1) * chunk_ms)].export(fn, format="wav")
        parts.append(fn)
    return parts
def transcribe_core(path, model="whisper-1"):
    """Transcribe an audio file and post-process the text with GPT.

    Pipeline: (1) split the file into chunks under the upload limit and run
    each through the OpenAI transcription endpoint, (2) convert the joined
    text to Traditional Chinese (Taiwan) with gpt-4o-mini, (3) summarize it.

    Args:
        path: Local path to the audio file.
        model: Transcription model name (default "whisper-1").

    Returns:
        ``(traditional_text, summary)`` tuple of strings.
    """
    print(f"\n{'='*60}")
    print(f"[transcribe_core] 開始轉錄: {path}")
    print(f"{'='*60}")
    start_time = time.time()
    # .mp4 input is copied under an .m4a name before transcription —
    # presumably to satisfy the endpoint's accepted-extension check
    # (TODO confirm). Best-effort: keep the original path on copy failure.
    if path.lower().endswith(".mp4"):
        fixed = path[:-4] + ".m4a"
        try:
            shutil.copy(path, fixed)
            path = fixed
        except OSError:
            pass
    chunks = split_audio(path)
    raw = []
    try:
        for i, c in enumerate(chunks, 1):
            print(f"[transcribe_core] 轉錄片段 {i}/{len(chunks)}")
            with open(c, "rb") as af:
                txt = client.audio.transcriptions.create(
                    model=model, file=af, response_format="text"
                )
            raw.append(txt)
    finally:
        # Delete chunk files we created; split_audio may return the input
        # itself when no split was needed — never delete that one.
        for c in chunks:
            if c != path:
                try:
                    os.remove(c)
                except OSError:
                    pass
    raw_txt = "\n".join(raw)
    # Simplified -> Traditional Chinese conversion (literal, no paraphrase).
    conv = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是嚴格的繁體中文轉換器"},
            {"role": "user", "content": f"將以下內容轉為台灣繁體,不意譯:\n{raw_txt}"}
        ],
        temperature=0.0
    )
    trad = conv.choices[0].message.content.strip()
    # Summarize the converted transcript.
    summ = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是繁體摘要助手"},
            {"role": "user", "content": f"請用台灣繁體中文摘要;內容多則條列重點,內容短則一句話:\n{trad}"}
        ],
        temperature=0.2
    )
    summary = summ.choices[0].message.content.strip()
    total_time = time.time() - start_time
    print(f"[transcribe_core] ✅ 全部完成! 總耗時: {total_time:.1f}秒\n")
    return trad, summary
# ====== Gradio UI handlers ======
def transcribe_web(password, audio_file):
    """Handle one web-UI transcription request.

    Always returns a (status, transcription, summary) triple so that all
    three Gradio output widgets are populated on every code path.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"🌐 [WEB] 收到網頁請求")
    print(f"密碼: {'已提供' if password else '未提供'}")
    print(f"檔案: {audio_file}")
    print(f"{banner}")
    # Guard clauses: password present, password correct, file present.
    if not password:
        print("[WEB] ❌ 密碼為空")
        return "❌ Please enter password", "", ""
    if password.strip() != PASSWORD:
        print(f"[WEB] ❌ 密碼錯誤: '{password}' != '{PASSWORD}'")
        return "❌ Incorrect password", "", ""
    if not audio_file:
        print("[WEB] ❌ 未上傳檔案")
        return "⚠️ Please upload an audio file", "", ""
    try:
        print(f"[WEB] 開始處理檔案...")
        resolved = _extract_effective_path(audio_file)
        print(f"[WEB] ✅ 檔案路徑: {resolved}")
        print(f"[WEB] 開始轉錄...")
        transcript, digest = transcribe_core(resolved)
        print(f"[WEB] ✅ 轉錄成功\n")
        return f"✅ Completed! ({len(transcript)} characters)", transcript, digest
    except Exception as exc:
        import traceback
        print(f"❌ [WEB] 發生錯誤:\n{traceback.format_exc()}\n")
        return f"❌ Error: {str(exc)}", "", ""
# ====== FastAPI application ======
fastapi_app = FastAPI()
# Wide-open CORS so the API is callable from any origin (e.g. a phone
# client). NOTE(review): allow_origins=["*"] combined with
# allow_credentials=True is very permissive — confirm this is acceptable
# for the deployment.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@fastapi_app.post("/api/transcribe")
async def api_transcribe(request: Request):
    """JSON API endpoint for external clients (e.g. mobile shortcuts).

    Expects a JSON body: {"password": str, "file_data": "data:<mime>;base64,...",
    "file_name": str (optional, defaults to "recording.m4a")}.

    Returns:
        200 with {"status","transcription","summary"} on success,
        401 on a wrong password, 400 on malformed file data, 500 otherwise.
    """
    try:
        body = await request.json()
        print(f"\n{'='*60}")
        print(f"📱 [API] 收到 API 請求")
        print(f"{'='*60}")
        # A non-string password (e.g. a JSON number) previously raised
        # AttributeError on .strip() and surfaced as a 500; treat it as 401.
        password = body.get("password", "")
        if not isinstance(password, str) or password.strip() != PASSWORD:
            print(f"[API] ❌ 密碼錯誤")
            return JSONResponse(
                status_code=401,
                content={"status": "error", "error": "Password incorrect"}
            )
        # Same hardening for file_data: anything that is not a data-URL
        # string is a 400, not a 500.
        file_data = body.get("file_data", "")
        file_name = body.get("file_name", "recording.m4a")
        if not isinstance(file_data, str) or not file_data.startswith("data:"):
            print(f"[API] ❌ 檔案格式錯誤")
            return JSONResponse(
                status_code=400,
                content={"status": "error", "error": "Invalid file data format"}
            )
        # Materialize the data URL to a local file, then run the pipeline.
        file_dict = {"data": file_data, "orig_name": file_name}
        path = _extract_effective_path(file_dict)
        print(f"[API] ✅ 檔案解析成功: {path}")
        text, summary = transcribe_core(path)
        result = {
            "status": "success",
            "transcription": text,
            "summary": summary
        }
        print(f"[API] ✅ 轉錄成功\n")
        return JSONResponse(content=result)
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        print(f"❌ [API] 發生錯誤:\n{error_trace}\n")
        return JSONResponse(
            status_code=500,
            content={"status": "error", "error": str(e)}
        )
# ====== Gradio interface ======
with gr.Blocks(title="Audio Transcription", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎧 Audio Transcription Service
    ### AI-Powered Speech-to-Text with Summarization
    """)
    with gr.Row():
        # Left column: password + file upload + submit button.
        with gr.Column(scale=1):
            gr.Markdown("### 📤 Upload")
            password_input = gr.Textbox(
                label="Password",
                type="password",
                placeholder="Enter password"
            )
            audio_input = gr.File(
                label="Audio File",
                file_types=["audio", ".mp3", ".m4a", ".wav", ".ogg", ".webm", ".mp4"],
                file_count="single"
            )
            submit_btn = gr.Button(
                "🚀 Start Transcription",
                variant="primary",
                size="lg"
            )
            gr.Markdown("""
            **Supported formats:**
            MP3, M4A, WAV, OGG, WEBM, MP4
            **Processing:**
            Automatic chunking for large files
            """)
        # Right column: status line, transcription text, and summary.
        with gr.Column(scale=2):
            gr.Markdown("### 📊 Results")
            status_output = gr.Textbox(
                label="Status",
                interactive=False,
                lines=1
            )
            transcription_output = gr.Textbox(
                label="Transcription",
                lines=12,
                show_copy_button=True
            )
            summary_output = gr.Textbox(
                label="Summary",
                lines=6,
                show_copy_button=True
            )
    gr.Markdown("---")
    # In-page API usage documentation for external callers.
    gr.Markdown("""
    ## 📱 API Integration
    **Endpoint:** `POST /api/transcribe`
    **Request:**
    ```json
    {
      "password": "your_password",
      "file_data": "data:audio/m4a;base64,...",
      "file_name": "recording.m4a"
    }
    ```
    **Response:**
    ```json
    {
      "status": "success",
      "transcription": "...",
      "summary": "..."
    }
    ```
    """)
    # Event binding — wires the button to transcribe_web and also exposes
    # it as a named Gradio API endpoint ("transcribe").
    submit_btn.click(
        fn=transcribe_web,
        inputs=[password_input, audio_input],
        outputs=[status_output, transcription_output, summary_output],
        api_name="transcribe"
    )
# ====== Mount Gradio onto FastAPI ======
# Gradio serves the UI at "/" while the custom /api/transcribe route above
# remains available on the same FastAPI app.
app = gr.mount_gradio_app(fastapi_app, demo, path="/")
# ====== Entry point ======
if __name__ == "__main__":
    # Startup banner, then hand control to uvicorn.
    divider = "=" * 60
    print("\n" + divider)
    print("🚀 服務啟動")
    print("🌐 網頁: http://0.0.0.0:7860")
    print("📱 API: http://0.0.0.0:7860/api/transcribe")
    print(divider + "\n")
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)