MichaelChou0806's picture
Update app.py
e7bb2ea verified
raw
history blame
8.26 kB
import os
import shutil
import tempfile
import time

import gradio as gr
from fastapi import FastAPI, File, UploadFile
from openai import OpenAI
from pydub import AudioSegment
# ========================
# 🔐 Configuration
# ========================
# Shared password for the Gradio UI; override via the APP_PASSWORD env var.
PASSWORD = os.getenv("APP_PASSWORD", "defaultpass")
# Per-file upload cap before splitting — matches OpenAI's 25 MB audio limit.
MAX_SIZE = 25 * 1024 * 1024
# Single OpenAI client reused by transcription, summary, and Q&A calls.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# ========================
# ⚔️ Brute-force protection
# ========================
MAX_FAILED_IN_WINDOW = 10  # failures allowed inside the rolling window
WINDOW_SECONDS = 24 * 3600  # rolling failure-count window (24 h)
LOCK_DURATION_SECONDS = 24 * 3600  # hard lock once the window limit is hit
SHORT_BURST_LIMIT = 5  # rapid failures that trigger a short lock
SHORT_BURST_SECONDS = 60  # burst-detection window
# In-memory only — state resets on restart and is not shared across workers.
attempts = {}  # session_id -> list of failure timestamps (epoch seconds)
locked = {}  # session_id -> epoch second at which the lock expires
def _now(): return int(time.time())
def prune_old_attempts(sid):
    """Drop failure timestamps for *sid* that fell out of the rolling window.

    Removes the session's entry entirely once no timestamps remain.
    """
    history = attempts.get(sid)
    if history is None:
        return
    cutoff = _now() - WINDOW_SECONDS
    kept = [stamp for stamp in history if stamp >= cutoff]
    if kept:
        attempts[sid] = kept
    else:
        del attempts[sid]
def check_lock(sid):
    """Return ``(is_locked, message)`` for session *sid*.

    An expired lock is cleared together with the failure history. Otherwise
    the rolling window is pruned and a fresh 24 h lock is applied once the
    window's failure limit has been reached.
    """
    # Snapshot the clock once: the original called _now() three times, so the
    # expiry test, the remaining-time display, and the new lock's start could
    # each see a different second.
    now = _now()
    if sid in locked:
        if now < locked[sid]:
            remain = locked[sid] - now
            return True, f"🔒 已被鎖定,請 {remain // 60} 分鐘後再試。"
        # Lock expired — forget it and give the session a clean slate.
        locked.pop(sid, None)
        attempts.pop(sid, None)
    prune_old_attempts(sid)
    cnt = len(attempts.get(sid, []))
    if cnt >= MAX_FAILED_IN_WINDOW:
        locked[sid] = now + LOCK_DURATION_SECONDS
        # (was an f-string with no placeholders)
        return True, "🔒 嘗試過多,已鎖定 24 小時。"
    return False, ""
def record_failed_attempt(sid):
    """Log one failed password attempt for *sid*.

    Returns ``(total_failures_in_window, message)``; the message is non-empty
    when a rapid burst of failures triggers a temporary 5-minute lock.
    """
    stamp = _now()
    attempts.setdefault(sid, []).append(stamp)
    # Pruning cannot empty the list here — the entry just appended is fresh.
    prune_old_attempts(sid)
    history = attempts[sid]
    burst_cutoff = stamp - SHORT_BURST_SECONDS
    burst_count = sum(1 for t in history if t >= burst_cutoff)
    if burst_count >= SHORT_BURST_LIMIT:
        locked[sid] = stamp + 300
        return len(history), "⚠️ 多次快速嘗試,暫時鎖定5分鐘。"
    return len(history), ""
def clear_attempts(sid):
    """Forget all failure history and any lock for *sid* (on successful auth)."""
    for table in (attempts, locked):
        table.pop(sid, None)
# ========================
# 🎧 音訊轉錄
# ========================
def split_audio_if_needed(path):
    """Return a list of audio file paths, splitting *path* if it is too large.

    Files at or under MAX_SIZE are returned unchanged as ``[path]``. Larger
    files are cut into roughly equal time slices exported as temporary .wav
    files; callers are responsible for deleting those temp files.

    NOTE(review): .wav export is uncompressed, so a slice of a highly
    compressed source could itself still exceed MAX_SIZE — confirm against
    typical inputs.
    """
    size = os.path.getsize(path)
    if size <= MAX_SIZE:
        return [path]
    audio = AudioSegment.from_file(path)
    # Ceiling division on ints: the old `int(size / MAX_SIZE) + 1` produced a
    # spurious extra chunk when size was an exact multiple of MAX_SIZE, and
    # relied on float division for very large sizes.
    num = -(-size // MAX_SIZE)
    chunk_ms = len(audio) / num
    files = []
    for i in range(num):
        start, end = int(i * chunk_ms), int((i + 1) * chunk_ms)
        # Unique temp names: the old fixed "chunk_N.wav" in the CWD collided
        # when the Gradio UI and the FastAPI endpoint ran concurrently.
        fd, fn = tempfile.mkstemp(prefix=f"chunk_{i + 1}_", suffix=".wav")
        os.close(fd)
        audio[start:end].export(fn, format="wav")
        files.append(fn)
    return files
def transcribe_core(path, model):
    """Transcribe an audio file and produce a Traditional-Chinese summary.

    Args:
        path: filesystem path of the uploaded audio file.
        model: transcription model name (e.g. ``"whisper-1"``).

    Returns:
        ``(full_transcript, summary)`` as two strings.

    Fix: the original leaked every temp file it created — split chunks and
    the mp4→m4a copy were never deleted. All files created here are now
    removed in a ``finally`` block; the caller's original upload is untouched.
    """
    cleanup = []  # temp files created here that must be deleted afterwards
    # ✅ iPhone LINE voice notes arrive as audio-only .mp4 — copy under an
    # .m4a name so the transcription endpoint accepts it (no re-encoding).
    if path and path.lower().endswith(".mp4"):
        fixed_path = path[:-4] + ".m4a"
        try:
            shutil.copy(path, fixed_path)
            cleanup.append(fixed_path)
            path = fixed_path
            print("🔧 已自動修正 mp4 → m4a")
        except Exception as e:
            print(f"⚠️ mp4→m4a 複製失敗:{e},改用原檔嘗試")
    original = path
    try:
        chunks = split_audio_if_needed(path)
        # Only chunks we generated are ours to delete, never the source file.
        cleanup.extend(f for f in chunks if f != original)
        txts = []
        for f in chunks:
            with open(f, "rb") as af:
                # response_format="text" makes the API return a plain string.
                res = client.audio.transcriptions.create(
                    model=model,
                    file=af,
                    response_format="text",
                )
            txts.append(res)
        full = "\n".join(txts)
        res = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"請用繁體中文摘要以下內容:\n{full}"}],
            temperature=0.4,
        )
        summ = res.choices[0].message.content.strip()
        return full, summ
    finally:
        # Best-effort cleanup of everything this call created.
        for f in cleanup:
            try:
                os.remove(f)
            except OSError:
                pass
# ========================
# 💬 主流程(Gradio)
# ========================
def _normalize_upload_path(file_input):
    """Coerce a Gradio upload value to a filesystem path string (or None).

    Accepts the shapes Gradio may hand over: a plain path string, a list of
    uploads (first entry wins), an object with a ``.name`` attribute, or a
    dict carrying ``"name"``/``"path"`` keys. Falsy input yields ``None``.
    """
    if not file_input:
        return None
    if isinstance(file_input, str):
        return file_input
    if isinstance(file_input, list):
        # Non-empty here (falsy values were rejected above); unwrap the head.
        return _normalize_upload_path(file_input[0])
    candidate = getattr(file_input, "name", None)
    if not candidate and isinstance(file_input, dict):
        candidate = file_input.get("name") or file_input.get("path")
    return candidate
def transcribe_with_password(session_id, password, file_input, model_choice):
    """Gradio handler: gate transcription behind the shared password.

    Returns ``(status_message, full_transcript, summary)``; the last two are
    empty strings whenever the request is rejected.
    """
    # Tolerate IME artifacts: trim, then drop spaces and zero-width spaces.
    password = password.strip().replace(" ", "").replace("\u200b", "")
    is_locked, lock_msg = check_lock(session_id)
    if is_locked:
        return lock_msg, "", ""
    if password != PASSWORD:
        fail_count, warn = record_failed_attempt(session_id)
        return warn or f"密碼錯誤(第 {fail_count} 次)", "", ""
    path = _normalize_upload_path(file_input)
    if not path or not os.path.exists(path):
        return "找不到上傳檔案,請重新選擇。", "", ""
    clear_attempts(session_id)
    transcript, summary = transcribe_core(path, model_choice)
    return "✅ 轉錄完成", transcript, summary
def ask_about_transcript(full_text, q):
    """Answer a follow-up question about the transcript via gpt-4o-mini.

    Returns the model's reply, or a short notice when the transcript or the
    question is empty.
    """
    if not full_text.strip():
        return "⚠️ 尚未有轉錄內容"
    if not q.strip():
        return "請輸入問題"
    prompt = f"以下是轉錄內容:\n{full_text}\n\n問題:{q}\n請用繁體中文回答。"
    reply = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.6,
    )
    return reply.choices[0].message.content.strip()
# ========================
# 🌐 FastAPI for捷徑 / API
# ========================
api = FastAPI()


@api.post("/api/transcribe")
async def api_transcribe(file: UploadFile = File(...)):
    """Upload endpoint for the iPhone Shortcut: returns transcript + summary.

    SECURITY fix: the original wrote the upload to ``file.filename`` in the
    working directory — a client-controlled path (traversal, collisions
    between concurrent requests). The payload now goes to a private temp
    file, keeping the original extension so the .mp4 fix-up in
    ``transcribe_core`` still triggers, and is removed even on failure.
    """
    suffix = os.path.splitext(file.filename or "")[1] or ".bin"
    fd, temp_path = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(await file.read())
        text, summary = transcribe_core(temp_path, "whisper-1")
    finally:
        try:
            os.remove(temp_path)
        except OSError:
            pass
    return {"text": text, "summary": summary}
# ========================
# 🌐 Gradio介面
# ========================
# ========================
# 🌐 Gradio UI
# ========================
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("## 🎧 語音轉錄與摘要工具(私人API勿轉傳|支援 iPhone LINE .mp4)")
    # Per-browser session id; filled in on page load, used as rate-limit key.
    session_state = gr.State(value=None)

    with gr.Row():
        password_input = gr.Textbox(
            label="輸入密碼",
            placeholder="請輸入英文與數字(請切換成英文輸入法)",
            type="password",
            max_lines=1,
        )
        model_choice = gr.Dropdown(
            ["whisper-1", "gpt-4o-mini-transcribe"],
            value="whisper-1",
            label="選擇模型",
        )

    file_input = gr.File(
        label="上傳音訊 / LINE 語音檔(支援 .m4a, .aac, .wav, .mp4)",
        file_count="single",
        file_types=["audio", ".mp4", ".m4a", ".aac", ".wav"],
    )
    transcribe_btn = gr.Button("開始轉錄與摘要 🚀")

    status_box = gr.Textbox(label="狀態", interactive=False)
    transcript_box = gr.Textbox(label="完整轉錄文字", lines=10)
    copy_transcript = gr.Button("📋 複製轉錄文字")
    summary_box = gr.Textbox(label="摘要結果", lines=10)
    copy_summary = gr.Button("📋 複製摘要結果")

    with gr.Accordion("💬 進一步問 AI", open=False):
        user_q = gr.Textbox(label="輸入問題", lines=2)
        ask_btn = gr.Button("詢問 AI 🤔")
        ai_reply = gr.Textbox(label="AI 回覆", lines=6)
        copy_reply = gr.Button("📋 複製 AI 回覆")

    def init_session():
        """Mint a fresh per-browser session id (rate-limit key)."""
        import uuid
        return str(uuid.uuid4())

    demo.load(init_session, None, session_state)

    transcribe_btn.click(
        transcribe_with_password,
        [session_state, password_input, file_input, model_choice],
        [status_box, transcript_box, summary_box],
    )
    ask_btn.click(ask_about_transcript, [transcript_box, user_q], [ai_reply])

    # Client-side clipboard copy: fn=None means JS-only, no server round-trip.
    copy_js = """
    async (text) => {
        try {
            await navigator.clipboard.writeText(text);
            alert("✅ 已複製到剪貼簿!");
        } catch (e) {
            alert("❌ 複製失敗:" + e);
        }
    }
    """
    copy_transcript.click(fn=None, inputs=transcript_box, outputs=None, js=copy_js)
    copy_summary.click(fn=None, inputs=summary_box, outputs=None, js=copy_js)
    copy_reply.click(fn=None, inputs=ai_reply, outputs=None, js=copy_js)
# ✅ Run FastAPI (Shortcuts API, port 7861) in a daemon thread while the
# Gradio UI occupies the main thread on port 7860.
import threading
import uvicorn


def run_api():
    """Serve the FastAPI app for the iPhone Shortcut on a secondary port."""
    uvicorn.run(api, host="0.0.0.0", port=7861)


threading.Thread(target=run_api, daemon=True).start()
demo.launch(server_name="0.0.0.0", server_port=7860)