MichaelChou0806 commited on
Commit
fdb606f
·
verified ·
1 Parent(s): e5b86b0

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +122 -148
app.py CHANGED
@@ -1,221 +1,195 @@
1
  import os
2
  import time
3
- import uuid
4
- from datetime import timedelta
5
  from pydub import AudioSegment
6
  from openai import OpenAI
7
  import gradio as gr
8
 
9
  # ========================
10
- # 🔐 環境變數設定
11
  # ========================
12
  PASSWORD = os.getenv("APP_PASSWORD", "defaultpass")
13
- MAX_SIZE = 25 * 1024 * 1024 # 25 MB 限制
14
  client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
 
 
15
 
16
  # ========================
17
- # ⚔️ 防暴力破解機制
18
  # ========================
19
- MAX_FAILED_IN_WINDOW = 10 # 24小時內最多10次
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  WINDOW_SECONDS = 24 * 3600
21
- LOCK_DURATION_SECONDS = 24 * 3600 # 鎖24小時
22
- SHORT_BURST_LIMIT = 5 # 一分鐘內最多5次
23
  SHORT_BURST_SECONDS = 60
24
- attempts = {} # {session_id: [timestamps]}
25
- locked = {} # {session_id: unlock_time}
26
 
 
 
27
 
28
- def _now():
29
- return int(time.time())
30
 
31
-
32
- def prune_old_attempts(session_id):
33
- if session_id not in attempts:
34
- return
35
  cutoff = _now() - WINDOW_SECONDS
36
- attempts[session_id] = [t for t in attempts[session_id] if t >= cutoff]
37
- if not attempts[session_id]:
38
- del attempts[session_id]
39
-
40
-
41
- def check_lock(session_id):
42
- if session_id in locked:
43
- unlock = locked[session_id]
44
- if _now() < unlock:
45
- remain = unlock - _now()
46
  return True, f"🔒 已被鎖定,請 {remain // 60} 分鐘後再試。"
47
  else:
48
- del locked[session_id]
49
- attempts.pop(session_id, None)
50
- prune_old_attempts(session_id)
51
- cnt = len(attempts.get(session_id, []))
52
  if cnt >= MAX_FAILED_IN_WINDOW:
53
- locked[session_id] = _now() + LOCK_DURATION_SECONDS
 
54
  return True, f"🔒 嘗試過多,已鎖定 24 小時。"
55
  return False, ""
56
 
57
-
58
- def record_failed_attempt(session_id):
59
  now = _now()
60
- attempts.setdefault(session_id, []).append(now)
61
- prune_old_attempts(session_id)
62
- cnt = len(attempts.get(session_id, []))
63
  recent_cutoff = now - SHORT_BURST_SECONDS
64
- recent = [t for t in attempts[session_id] if t >= recent_cutoff]
65
  if len(recent) >= SHORT_BURST_LIMIT:
66
- locked[session_id] = now + 300 # 鎖5分鐘
67
- return cnt, "⚠️ 多次快速嘗試,暫時鎖定5分鐘。"
68
- return cnt, ""
69
-
70
 
71
- def clear_attempts(session_id):
72
- attempts.pop(session_id, None)
73
- locked.pop(session_id, None)
74
 
75
  # ========================
76
- # 🔊 音訊分割
77
  # ========================
78
- def split_audio_if_needed(input_path):
79
- size = os.path.getsize(input_path)
80
  if size <= MAX_SIZE:
81
- return [input_path]
82
- audio = AudioSegment.from_file(input_path)
83
- num_chunks = int(size / MAX_SIZE) + 1
84
- chunk_length = len(audio) / num_chunks
85
- chunk_files = []
86
- for i in range(num_chunks):
87
- start = int(i * chunk_length)
88
- end = int((i + 1) * chunk_length)
89
  chunk = audio[start:end]
90
- chunk_filename = f"chunk_{i+1}.wav"
91
- chunk.export(chunk_filename, format="wav")
92
- chunk_files.append(chunk_filename)
93
- return chunk_files
94
-
95
- # ========================
96
- # 🎧 轉錄與摘要
97
- # ========================
98
- def transcribe_core(file, model_choice):
99
- chunks = split_audio_if_needed(file)
100
- transcripts = []
101
- for idx, f in enumerate(chunks, 1):
102
- with open(f, "rb") as audio_file:
103
- text = client.audio.transcriptions.create(
104
- model=model_choice,
105
- file=audio_file,
106
- response_format="text"
107
- )
108
- transcripts.append(text)
109
- full_text = "\n".join(transcripts)
110
- response = client.chat.completions.create(
111
  model="gpt-4o-mini",
112
- messages=[
113
- {"role": "system", "content": "你是一位精準且擅長摘要的助手。"},
114
- {"role": "user", "content": "請用繁體中文摘要以下內容:\n" + full_text}
115
- ],
116
  temperature=0.4,
117
  )
118
- summary = response.choices[0].message.content.strip()
119
- return full_text, summary
120
-
121
 
 
 
 
122
  def transcribe_with_password(session_id, password, file, model_choice):
123
  locked_flag, msg = check_lock(session_id)
124
  if locked_flag:
125
- return msg, "", "", ""
126
  if password != PASSWORD:
127
  cnt, msg2 = record_failed_attempt(session_id)
128
- if msg2:
129
- return msg2, "", "", ""
130
- return f"密碼錯誤(第 {cnt} 次)", "", "", ""
131
  if not file:
132
- return "請上傳音訊檔。", "", "", ""
133
  clear_attempts(session_id)
134
- full_text, summary = transcribe_core(file, model_choice)
135
- return "✅ 成功轉錄與摘要完成", full_text, summary, ""
136
 
137
- # ========================
138
- # 💬 進一步問 AI
139
- # ========================
140
- def ask_about_transcript(full_text, user_question):
141
  if not full_text.strip():
142
- return "⚠️ 尚未有轉錄內容。"
143
- if not user_question.strip():
144
- return "請輸入問題。"
145
- prompt = f"以下是轉錄內容:\n{full_text}\n\n使用者問:{user_question}\n請用繁體中文回答。"
146
- response = client.chat.completions.create(
147
- model="gpt-4o-mini",
148
- messages=[{"role": "user", "content": prompt}],
149
- temperature=0.6,
150
- )
151
- return response.choices[0].message.content.strip()
152
 
153
  # ========================
154
  # 🌐 Gradio 介面
155
  # ========================
156
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
157
- gr.Markdown("## 🎧 語音轉錄與摘要工具(含防暴力破解)")
158
-
159
- session_id_box = gr.Textbox(value="", visible=False)
160
- init_session_js = """
161
- () => {
162
- let sid = localStorage.getItem('my_session_id');
163
- if (!sid) {
164
- sid = crypto.randomUUID ? crypto.randomUUID() :
165
- (Date.now().toString(36) + Math.random().toString(36).slice(2));
166
- localStorage.setItem('my_session_id', sid);
167
- }
168
- return sid;
169
- }
170
- """
171
- session_id_box.load(_js=init_session_js)
172
 
173
  with gr.Row():
174
  password_input = gr.Textbox(label="輸入密碼", type="password")
175
- model_choice = gr.Dropdown(
176
- choices=["whisper-1", "gpt-4o-mini-transcribe"],
177
- value="whisper-1",
178
- label="選擇轉錄模型"
179
- )
180
 
181
  audio_input = gr.Audio(type="filepath", label="上傳音訊 (.m4a, .aac, .wav)")
182
  transcribe_btn = gr.Button("開始轉錄與摘要 🚀")
183
-
184
  status_box = gr.Textbox(label="狀態", interactive=False)
185
- with gr.Row():
186
- transcript_box = gr.Textbox(label="完整轉錄文字", lines=10)
187
- copy_transcript = gr.Button("📋 複製")
188
-
189
- with gr.Row():
190
- summary_box = gr.Textbox(label="摘要結果", lines=10)
191
- copy_summary = gr.Button("📋 複製")
192
 
193
  with gr.Accordion("💬 進一步問 AI", open=False):
194
- user_question = gr.Textbox(label="輸入你的問題", lines=2)
195
  ask_btn = gr.Button("詢問 AI 🤔")
196
  ai_reply = gr.Textbox(label="AI 回覆", lines=6)
197
  copy_reply = gr.Button("📋 複製")
198
 
199
- transcribe_btn.click(
200
- fn=transcribe_with_password,
201
- inputs=[session_id_box, password_input, audio_input, model_choice],
202
- outputs=[status_box, transcript_box, summary_box, gr.Textbox(visible=False)]
203
- )
204
 
205
- ask_btn.click(
206
- fn=ask_about_transcript,
207
- inputs=[transcript_box, user_question],
208
- outputs=[ai_reply]
209
  )
 
210
 
211
  copy_js = """
212
  async (text) => {
213
- try {
214
- await navigator.clipboard.writeText(text);
215
- alert(" 已複製到剪貼簿!");
216
- } catch (err) {
217
- alert("❌ 複製失敗:" + err);
218
- }
219
  }
220
  """
221
  copy_transcript.click(None, transcript_box, None, _js=copy_js)
 
1
  import os
2
  import time
3
+ import smtplib
4
+ from email.mime.text import MIMEText
5
  from pydub import AudioSegment
6
  from openai import OpenAI
7
  import gradio as gr
8
 
9
  # ========================
10
+ # 🔐 設定區
11
  # ========================
12
  PASSWORD = os.getenv("APP_PASSWORD", "defaultpass")
13
+ MAX_SIZE = 25 * 1024 * 1024
14
  client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
15
+ ALERT_EMAIL = os.getenv("ALERT_EMAIL")
16
+ ALERT_PASS = os.getenv("ALERT_PASS")
17
 
18
  # ========================
19
+ # 📧 寄信通知
20
  # ========================
21
+ def send_alert_email(session_id, reason):
22
+ if not ALERT_EMAIL or not ALERT_PASS:
23
+ return
24
+ msg = MIMEText(f"Session {session_id} 已被鎖定。\n原因:{reason}\n時間:{time.ctime()}", "plain", "utf-8")
25
+ msg["Subject"] = "⚠️ 語音轉錄系統警報"
26
+ msg["From"] = ALERT_EMAIL
27
+ msg["To"] = ALERT_EMAIL
28
+ try:
29
+ with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
30
+ server.login(ALERT_EMAIL, ALERT_PASS)
31
+ server.sendmail(ALERT_EMAIL, ALERT_EMAIL, msg.as_string())
32
+ print(f"✅ 已寄出警報郵件至 {ALERT_EMAIL}")
33
+ except Exception as e:
34
+ print(f"❌ 寄信失敗:{e}")
35
+
36
+ # ========================
37
+ # ⚔️ 防暴力破解
38
+ # ========================
39
+ MAX_FAILED_IN_WINDOW = 10
40
  WINDOW_SECONDS = 24 * 3600
41
+ LOCK_DURATION_SECONDS = 24 * 3600
42
+ SHORT_BURST_LIMIT = 5
43
  SHORT_BURST_SECONDS = 60
 
 
44
 
45
+ attempts = {}
46
+ locked = {}
47
 
48
+ def _now(): return int(time.time())
 
49
 
50
+ def prune_old_attempts(sid):
 
 
 
51
  cutoff = _now() - WINDOW_SECONDS
52
+ if sid in attempts:
53
+ attempts[sid] = [t for t in attempts[sid] if t >= cutoff]
54
+ if not attempts[sid]:
55
+ del attempts[sid]
56
+
57
+ def check_lock(sid):
58
+ if sid in locked:
59
+ if _now() < locked[sid]:
60
+ remain = locked[sid] - _now()
 
61
  return True, f"🔒 已被鎖定,請 {remain // 60} 分鐘後再試。"
62
  else:
63
+ locked.pop(sid, None)
64
+ attempts.pop(sid, None)
65
+ prune_old_attempts(sid)
66
+ cnt = len(attempts.get(sid, []))
67
  if cnt >= MAX_FAILED_IN_WINDOW:
68
+ locked[sid] = _now() + LOCK_DURATION_SECONDS
69
+ send_alert_email(sid, f"密碼錯誤 {cnt} 次,鎖定24小時")
70
  return True, f"🔒 嘗試過多,已鎖定 24 小時。"
71
  return False, ""
72
 
73
+ def record_failed_attempt(sid):
 
74
  now = _now()
75
+ attempts.setdefault(sid, []).append(now)
76
+ prune_old_attempts(sid)
 
77
  recent_cutoff = now - SHORT_BURST_SECONDS
78
+ recent = [t for t in attempts[sid] if t >= recent_cutoff]
79
  if len(recent) >= SHORT_BURST_LIMIT:
80
+ locked[sid] = now + 300
81
+ send_alert_email(sid, "短時間內多次錯誤,鎖定5分鐘")
82
+ return len(attempts[sid]), "⚠️ 多次快速嘗試,暫時鎖定5分鐘。"
83
+ return len(attempts[sid]), ""
84
 
85
+ def clear_attempts(sid):
86
+ attempts.pop(sid, None)
87
+ locked.pop(sid, None)
88
 
89
  # ========================
90
+ # 🎧 音訊轉錄
91
  # ========================
92
+ def split_audio_if_needed(path):
93
+ size = os.path.getsize(path)
94
  if size <= MAX_SIZE:
95
+ return [path]
96
+ audio = AudioSegment.from_file(path)
97
+ num = int(size / MAX_SIZE) + 1
98
+ chunk_ms = len(audio) / num
99
+ files = []
100
+ for i in range(num):
101
+ start, end = int(i * chunk_ms), int((i + 1) * chunk_ms)
 
102
  chunk = audio[start:end]
103
+ fn = f"chunk_{i+1}.wav"
104
+ chunk.export(fn, format="wav")
105
+ files.append(fn)
106
+ return files
107
+
108
+ def transcribe_core(path, model):
109
+ chunks = split_audio_if_needed(path)
110
+ txts = []
111
+ for f in chunks:
112
+ with open(f, "rb") as af:
113
+ res = client.audio.transcriptions.create(model=model, file=af, response_format="text")
114
+ txts.append(res)
115
+ full = "\n".join(txts)
116
+ res = client.chat.completions.create(
 
 
 
 
 
 
 
117
  model="gpt-4o-mini",
118
+ messages=[{"role":"user","content":f"請用繁體中文摘要以下內容:\n{full}"}],
 
 
 
119
  temperature=0.4,
120
  )
121
+ summ = res.choices[0].message.content.strip()
122
+ return full, summ
 
123
 
124
+ # ========================
125
+ # 💬 主流程
126
+ # ========================
127
  def transcribe_with_password(session_id, password, file, model_choice):
128
  locked_flag, msg = check_lock(session_id)
129
  if locked_flag:
130
+ return msg, "", ""
131
  if password != PASSWORD:
132
  cnt, msg2 = record_failed_attempt(session_id)
133
+ return msg2 or f"密碼錯誤(第 {cnt} 次)", "", ""
 
 
134
  if not file:
135
+ return "請上傳音訊檔。", "", ""
136
  clear_attempts(session_id)
137
+ full, summ = transcribe_core(file, model_choice)
138
+ return "✅ 轉錄完成", full, summ
139
 
140
+ def ask_about_transcript(full_text, q):
 
 
 
141
  if not full_text.strip():
142
+ return "⚠️ 尚未有轉錄內容"
143
+ if not q.strip():
144
+ return "請輸入問題"
145
+ prompt = f"以下是轉錄內容:\n{full_text}\n\n問題:{q}\n請用繁體中文回答。"
146
+ res = client.chat.completions.create(
147
+ model="gpt-4o-mini", messages=[{"role":"user","content":prompt}], temperature=0.6)
148
+ return res.choices[0].message.content.strip()
 
 
 
149
 
150
  # ========================
151
  # 🌐 Gradio 介面
152
  # ========================
153
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
154
+ gr.Markdown("## 🎧 語音轉錄與摘要工具(防暴力破解+郵件警報)")
155
+
156
+ session_state = gr.State(value=None)
 
 
 
 
 
 
 
 
 
 
 
 
157
 
158
  with gr.Row():
159
  password_input = gr.Textbox(label="輸入密碼", type="password")
160
+ model_choice = gr.Dropdown(["whisper-1", "gpt-4o-mini-transcribe"], value="whisper-1", label="選擇模型")
 
 
 
 
161
 
162
  audio_input = gr.Audio(type="filepath", label="上傳音訊 (.m4a, .aac, .wav)")
163
  transcribe_btn = gr.Button("開始轉錄與摘要 🚀")
 
164
  status_box = gr.Textbox(label="狀態", interactive=False)
165
+ transcript_box = gr.Textbox(label="完整轉錄文字", lines=10)
166
+ copy_transcript = gr.Button("📋 複製")
167
+ summary_box = gr.Textbox(label="摘要結果", lines=10)
168
+ copy_summary = gr.Button("📋 複製")
 
 
 
169
 
170
  with gr.Accordion("💬 進一步問 AI", open=False):
171
+ user_q = gr.Textbox(label="輸入問題", lines=2)
172
  ask_btn = gr.Button("詢問 AI 🤔")
173
  ai_reply = gr.Textbox(label="AI 回覆", lines=6)
174
  copy_reply = gr.Button("📋 複製")
175
 
176
+ def init_session():
177
+ import uuid
178
+ return str(uuid.uuid4())
179
+ demo.load(init_session, None, session_state)
 
180
 
181
+ transcribe_btn.click(
182
+ transcribe_with_password,
183
+ [session_state, password_input, audio_input, model_choice],
184
+ [status_box, transcript_box, summary_box],
185
  )
186
+ ask_btn.click(ask_about_transcript, [transcript_box, user_q], [ai_reply])
187
 
188
  copy_js = """
189
  async (text) => {
190
+ try { await navigator.clipboard.writeText(text);
191
+ alert("✅ 已複製到剪貼簿!"); }
192
+ catch(e){ alert(" 複製失敗:" + e); }
 
 
 
193
  }
194
  """
195
  copy_transcript.click(None, transcript_box, None, _js=copy_js)