# PPT.404 / app.py
# Source: Hugging Face Space by DeepLearning101 (commit a4e4968, verified).
import gradio as gr
import google.generativeai as genai
import os
import json
import pandas as pd
import tempfile
from pdf2image import convert_from_path
from pptx import Presentation
from pptx.util import Inches, Pt
from pptx.dml.color import RGBColor
from huggingface_hub import HfApi, hf_hub_download
from dotenv import load_dotenv
# --- Configuration & constants ---
load_dotenv()  # pull GEMINI_API_KEY / HF_TOKEN / DATASET_REPO_ID from a local .env if present
PROF_SAVE_FILE = "saved_professors.json"  # local cache of saved professor records
COMP_SAVE_FILE = "saved_companies.json"  # local cache of saved company records
HF_TOKEN = os.getenv("HF_TOKEN")  # optional: enables Hugging Face dataset sync
DATASET_REPO_ID = os.getenv("DATASET_REPO_ID")  # optional: target dataset repo for sync
# ==========================================
# 🧠 核心服務層 (The Logic / Chef)
# ==========================================
class UnifiedService:
    """Unified Gemini backend for the app.

    Bundles every model-facing feature: PDF -> PPTX layout reconstruction,
    image text removal, Google-Search-grounded lookups (professors /
    companies), detail reports with source citations, and follow-up chat.
    An API key may be supplied per call (UI textbox) or via the
    GEMINI_API_KEY environment variable.
    """

    def __init__(self, api_key_input=None):
        self.api_key = self._get_api_key(api_key_input)
        if self.api_key:
            genai.configure(api_key=self.api_key)
        # Model that supports the Google Search grounding tool.
        self.model_name = "gemini-2.0-flash-exp"

    def _get_api_key(self, user_key):
        """Resolve the key: explicit user input wins, then the env var.

        Returns None when neither is available, so the object can still be
        constructed; feature methods fail later via _check_key().
        """
        if user_key and user_key.strip():
            return user_key.strip()
        system_key = os.getenv("GEMINI_API_KEY")
        if system_key:
            return system_key
        return None

    def _check_key(self):
        """Raise ValueError when no API key was resolved."""
        if not self.api_key:
            raise ValueError("請先輸入 API Key 或設定系統環境變數")

    # --- 1. PDF -> PPTX ---
    def analyze_pdf_to_pptx(self, pdf_file, progress):
        """Rebuild *pdf_file* as a 16:9 PPTX deck, one slide per page.

        Each page is rasterized, sent to Gemini for text-block detection
        (boxes on a normalized 0-1000 grid), and the blocks are placed as
        text boxes on a blank slide. Returns (output_path, status_message).
        """
        self._check_key()
        model = genai.GenerativeModel(self.model_name)
        prs = Presentation()
        prs.slide_width = Inches(16)
        prs.slide_height = Inches(9)
        progress(0.1, desc="轉檔中...")
        images = convert_from_path(pdf_file)
        # Loop-invariant prompt, hoisted out of the per-page loop.
        prompt = "Detect all text blocks. Return JSON: [{'text':..., 'box_2d':[ymin,xmin,ymax,xmax] (0-1000), 'font_size':int, 'is_bold':bool, 'color':hex}]"
        for i, img in enumerate(images):
            progress(0.1 + (0.8 * (i / len(images))), desc=f"分析第 {i+1} 頁...")
            slide = prs.slides.add_slide(prs.slide_layouts[6])  # blank layout
            try:
                response = model.generate_content(
                    [prompt, img],
                    generation_config={"response_mime_type": "application/json"},
                )
                blocks = json.loads(response.text)
                for b in blocks:
                    box = b.get("box_2d", [0, 0, 0, 0])
                    # Scale [ymin, xmin, ymax, xmax] (0-1000) onto 16in x 9in.
                    left = Inches((box[1] / 1000) * 16)
                    top = Inches((box[0] / 1000) * 9)
                    width = Inches(((box[3] - box[1]) / 1000) * 16)
                    height = Inches(((box[2] - box[0]) / 1000) * 9)
                    tx = slide.shapes.add_textbox(left, top, width, height)
                    p = tx.text_frame.paragraphs[0]
                    p.text = b.get("text", "")
                    p.font.size = Pt(b.get("font_size", 12))
                    p.font.bold = b.get("is_bold", False)
                    try:
                        p.font.color.rgb = RGBColor.from_string(b.get("color", "#000000").replace("#", ""))
                    except (ValueError, TypeError, AttributeError):
                        pass  # malformed color value — keep the default color
            except Exception as e:
                # Best effort per page: one bad page must not abort the deck.
                print(f"Page {i} err: {e}")
        # tempfile.mktemp() is deprecated and race-prone; create the file safely.
        with tempfile.NamedTemporaryFile(suffix=".pptx", delete=False) as tmp:
            out = tmp.name
        prs.save(out)
        return out, "✅ 轉換完成"

    # --- 2. Image text removal ---
    def remove_text(self, image):
        """Ask Gemini to erase all text from *image*.

        NOTE(review): returning an image part directly depends on the SDK
        version; if the response has no image part, the original image is
        returned unchanged as a fallback.
        """
        self._check_key()
        model = genai.GenerativeModel(self.model_name)
        prompt = "Remove all text from this image, fill background naturally. Return image only."
        resp = model.generate_content([prompt, image])
        try:
            return resp.parts[0].image
        except (AttributeError, IndexError):
            return image  # fallback: no image part in the response

    # --- 3. Shared search logic (professors / companies) ---
    def _search_with_google(self, query, prompt_template):
        """Two-step grounded search.

        Step 1 runs the query with the Google Search tool; step 2 re-reads
        the answer with a tool-free model to extract a JSON array. Returns
        [] when extraction fails.
        """
        self._check_key()
        tools = [{"google_search": {}}]
        model = genai.GenerativeModel(self.model_name, tools=tools)
        # Step 1: grounded search.
        resp1 = model.generate_content(prompt_template.format(query=query))
        # Step 2: extraction uses a separate model with no tools attached.
        model_extract = genai.GenerativeModel(self.model_name)
        extract_prompt = f"Extract structured data from this text into JSON array: {resp1.text}"
        resp2 = model_extract.generate_content(
            extract_prompt,
            generation_config={"response_mime_type": "application/json"},
        )
        try:
            return json.loads(resp2.text)
        except (json.JSONDecodeError, ValueError):
            return []

    def search_professors(self, query):
        """Grounded search for Taiwanese professors in the given field."""
        p = "Find 10 prominent professors in Taiwan for '{query}'. Return name, university, department."
        return self._search_with_google(query, p)

    def search_companies(self, query):
        """Grounded search for Taiwanese companies in the given industry."""
        p = "Find 5-10 Taiwanese companies for '{query}'. Return name, industry."
        return self._search_with_google(query, p)

    def get_details(self, data, role):
        """Produce a grounded Markdown report on *data*, acting as *role*.

        Returns {"text": report, "sources": [{"title", "uri"}, ...]} with
        duplicate URIs removed (last occurrence wins).
        """
        self._check_key()
        tools = [{"google_search": {}}]
        model = genai.GenerativeModel(self.model_name, tools=tools)
        prompt = f"Act as {role}. Investigate: {json.dumps(data)}. Report in Traditional Chinese Markdown."
        resp = model.generate_content(prompt)
        # Collect grounding citations (V1 SDK metadata layout); either the
        # metadata or its chunk list may be absent, so guard both.
        sources = []
        meta = getattr(resp.candidates[0], 'grounding_metadata', None)
        chunks = getattr(meta, 'grounding_chunks', None) or []
        for c in chunks:
            if c.web:
                sources.append({"title": c.web.title, "uri": c.web.uri})
        # De-duplicate by URI.
        unique_sources = list({v['uri']: v for v in sources}.values())
        return {"text": resp.text, "sources": unique_sources}

    def chat(self, hist, msg, context, role):
        """Continue a conversation about a saved report.

        *hist* is Gradio tuple history [(user, bot), ...]; *context* is the
        previously generated detail text. BUGFIX: the old code emitted a
        single history entry per (user, bot) pair, silently dropping the
        model replies; both turns are now replayed.
        """
        self._check_key()
        model = genai.GenerativeModel(self.model_name)
        history = []
        for user_turn, model_turn in hist:
            if user_turn:
                history.append({"role": "user", "parts": [user_turn]})
            if model_turn:
                history.append({"role": "model", "parts": [model_turn]})
        chat = model.start_chat(history=history)
        full_msg = f"Context: {context}\nInstruction: {role}\nUser: {msg}"
        resp = chat.send_message(full_msg)
        return resp.text
# ==========================================
# 💾 資料存取層 (Persistence)
# ==========================================
def load_data(filename):
    """Load a JSON list from *filename*.

    If Hub sync is configured (HF_TOKEN + DATASET_REPO_ID), first try to
    refresh the local copy from the dataset repo. Returns [] when the file
    is missing or unreadable — persistence is strictly best-effort.
    """
    if HF_TOKEN and DATASET_REPO_ID:
        try:
            hf_hub_download(repo_id=DATASET_REPO_ID, filename=filename,
                            repo_type="dataset", token=HF_TOKEN, local_dir=".")
        except Exception as e:
            # Offline or file absent on the Hub is fine; fall back to local.
            print(f"[load_data] Hub sync skipped: {e}")
    if os.path.exists(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            print(f"[load_data] cannot read {filename}: {e}")
    return []
def save_data(data, filename):
    """Write *data* as pretty-printed UTF-8 JSON to *filename*, then
    best-effort upload it to the configured Hugging Face dataset repo."""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    if HF_TOKEN and DATASET_REPO_ID:
        try:
            api = HfApi(token=HF_TOKEN)
            api.upload_file(path_or_fileobj=filename, path_in_repo=filename,
                            repo_id=DATASET_REPO_ID, repo_type="dataset",
                            commit_message="Sync")
        except Exception as e:
            # Upload failure must not break the UI flow; the local copy exists.
            print(f"[save_data] Hub upload failed: {e}")
# ==========================================
# 🖥️ 介面邏輯 (UI Helpers)
# ==========================================
def format_df(data_list, cols):
    """Render a list of entity dicts as a 3-column DataFrame for the UI.

    Columns: status-badged name, university-or-industry, comma-joined tags.
    An empty input yields an empty frame with the requested column names.
    """
    if not data_list:
        return pd.DataFrame(columns=cols)
    badges = {'match': '✅', 'good': '✅', 'risk': '⚠️'}
    rows = [
        [
            f"{badges.get(entry.get('status'), '')} {entry.get('name')}",
            entry.get('university') or entry.get('industry'),
            ", ".join(entry.get('tags', [])),
        ]
        for entry in data_list
    ]
    return pd.DataFrame(rows, columns=cols)
# ==========================================
# 🚀 主程式 (Gradio)
# ==========================================
def main_app():
    """Build and launch the Gradio UI.

    Three tabs: a document toolbox (PDF->PPTX, image text removal), a
    professor-search tab, and a company-search tab, all backed by
    UnifiedService. Blocks until the server exits.
    """
    # Load previously saved records (local JSON, optionally synced from the Hub).
    prof_data = load_data(PROF_SAVE_FILE)
    comp_data = load_data(COMP_SAVE_FILE)
    with gr.Blocks(title="Prof.404 x PPT.404 Ultimate", theme=gr.themes.Soft()) as demo:
        # Global API key shared by every tab; a fresh UnifiedService is
        # built per event so the latest textbox value is always used.
        with gr.Accordion("🔑 系統設定 (API Key)", open=False):
            api_key = gr.Textbox(label="Google Gemini API Key", type="password", placeholder="若未填寫則使用系統預設")
        gr.Markdown(
            """
            <div align="center">
            <h1>🚀 Prof.404 Ultimate: 產學導航 & 文件工具站</h1>
            <h3>整合文件視覺處理 (PPT/Img) 與 產學資源導航 (Prof/Com) 的全方位平台</h3>
            </div>
            """
        )
        with gr.Tabs():
            # --- Tab 1: document toolbox ---
            with gr.Tab("🛠️ 文件工具箱 (PPT.404)"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### 📄 PDF 轉 PPTX (含排版還原)")
                        pdf_file = gr.File(label="上傳 PDF")
                        pdf_btn = gr.Button("開始轉換", variant="primary")
                        ppt_out = gr.File(label="下載 PPTX")
                        pdf_msg = gr.Textbox(label="狀態", interactive=False)
                        pdf_btn.click(
                            lambda f, k: UnifiedService(k).analyze_pdf_to_pptx(f, gr.Progress()),
                            inputs=[pdf_file, api_key], outputs=[ppt_out, pdf_msg]
                        )
                    with gr.Column():
                        gr.Markdown("### 🎨 圖片智慧去字")
                        img_in = gr.Image(type="pil", label="原圖")
                        img_btn = gr.Button("一鍵去除", variant="primary")
                        img_out = gr.Image(label="結果")
                        img_btn.click(
                            lambda i, k: UnifiedService(k).remove_text(i),
                            inputs=[img_in, api_key], outputs=[img_out]
                        )
            # --- Tab 2: professor search ---
            with gr.Tab("🎓 找教授 (Prof.404)"):
                p_state = gr.State(prof_data)
                p_current = gr.State(None)  # the currently selected professor
                with gr.Row():
                    p_query = gr.Textbox(label="搜尋領域", scale=4)
                    p_btn = gr.Button("搜尋", scale=1)
                with gr.Row():
                    p_table = gr.Dataframe(headers=["姓名", "大學", "標籤"], interactive=False, scale=1)
                    # Detail panel stays hidden until a table row is selected.
                    with gr.Column(scale=1, visible=False) as p_detail_col:
                        p_md = gr.Markdown()
                        p_chat = gr.Chatbot(height=300)
                        p_msg = gr.Textbox(label="詢問關於此教授")
                # Logic wrappers
                def search_p(q, k, saved):
                    # Run a grounded search and refresh the results table.
                    svc = UnifiedService(k)
                    res = svc.search_professors(q)
                    return res, format_df(res, ["姓名","大學","標籤"])
                def select_p(evt: gr.SelectData, res, k, saved):
                    # Row click: fetch a detail report, persist it, show the panel.
                    svc = UnifiedService(k)
                    item = res[evt.index[0]]
                    # Fetch the detailed grounded report.
                    det = svc.get_details(item, "Academic Consultant")
                    item['details'] = det['text']
                    # Simplified save logic (demo only; de-duplication recommended).
                    saved.append(item)
                    save_data(saved, PROF_SAVE_FILE)
                    display_text = det['text'] + "\n\n📚 來源:\n" + "\n".join([f"- {s['title']}" for s in det['sources']])
                    return gr.update(visible=True), display_text, [], item, saved
                def chat_p(hist, msg, item, k):
                    # Follow-up chat grounded in the saved detail text.
                    svc = UnifiedService(k)
                    reply = svc.chat(hist, msg, item.get('details'), "Academic Consultant")
                    hist.append((msg, reply))
                    return hist, ""
                p_btn.click(search_p, [p_query, api_key, p_state], [p_state, p_table])
                p_table.select(select_p, [p_state, api_key, p_state], [p_detail_col, p_md, p_chat, p_current, p_state])
                p_msg.submit(chat_p, [p_chat, p_msg, p_current, api_key], [p_chat, p_msg])
            # --- Tab 3: company search ---
            with gr.Tab("🏢 找公司 (Com.404)"):
                c_state = gr.State(comp_data)
                c_current = gr.State(None)  # the currently selected company
                with gr.Row():
                    c_query = gr.Textbox(label="搜尋產業/公司", scale=4)
                    c_btn = gr.Button("搜尋", scale=1)
                with gr.Row():
                    c_table = gr.Dataframe(headers=["公司", "產業", "標籤"], interactive=False, scale=1)
                    with gr.Column(scale=1, visible=False) as c_detail_col:
                        c_md = gr.Markdown()
                        c_chat = gr.Chatbot(height=300)
                        c_msg = gr.Textbox(label="詢問關於此公司")
                # Logic wrappers (same structure as the professor tab)
                def search_c(q, k, saved):
                    svc = UnifiedService(k)
                    res = svc.search_companies(q)
                    return res, format_df(res, ["公司","產業","標籤"])
                def select_c(evt: gr.SelectData, res, k, saved):
                    svc = UnifiedService(k)
                    item = res[evt.index[0]]
                    det = svc.get_details(item, "Business Analyst")
                    item['details'] = det['text']
                    saved.append(item)
                    save_data(saved, COMP_SAVE_FILE)
                    display_text = det['text'] + "\n\n📚 來源:\n" + "\n".join([f"- {s['title']}" for s in det['sources']])
                    return gr.update(visible=True), display_text, [], item, saved
                def chat_c(hist, msg, item, k):
                    svc = UnifiedService(k)
                    reply = svc.chat(hist, msg, item.get('details'), "Business Analyst")
                    hist.append((msg, reply))
                    return hist, ""
                c_btn.click(search_c, [c_query, api_key, c_state], [c_state, c_table])
                c_table.select(select_c, [c_state, api_key, c_state], [c_detail_col, c_md, c_chat, c_current, c_state])
                c_msg.submit(chat_c, [c_chat, c_msg, c_current, api_key], [c_chat, c_msg])
    demo.queue().launch()
# Script entry point.
if __name__ == "__main__":
    main_app()