Spaces:
Paused
Paused
| import gradio as gr | |
| import google.generativeai as genai | |
| import os | |
| import json | |
| import pandas as pd | |
| import tempfile | |
| from pdf2image import convert_from_path | |
| from pptx import Presentation | |
| from pptx.util import Inches, Pt | |
| from pptx.dml.color import RGBColor | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from dotenv import load_dotenv | |
# --- Settings & constants ---
load_dotenv()  # load environment overrides from a local .env file, if present
PROF_SAVE_FILE = "saved_professors.json"  # local cache file for saved professors
COMP_SAVE_FILE = "saved_companies.json"   # local cache file for saved companies
HF_TOKEN = os.getenv("HF_TOKEN")                  # optional: enables Hugging Face dataset sync
DATASET_REPO_ID = os.getenv("DATASET_REPO_ID")    # optional: target HF dataset repo for sync
| # ========================================== | |
| # 🧠 核心服務層 (The Logic / Chef) | |
| # ========================================== | |
class UnifiedService:
    """Facade over the Gemini API for every feature of the app.

    Bundles one key-resolution policy with four capabilities:
    PDF -> PPTX layout reconstruction, image text removal, grounded
    (Google Search) lookups for professors/companies, and contextual chat.
    The object can be constructed without a key; feature methods then raise.
    """

    def __init__(self, api_key_input=None):
        # Resolve the key once; skip configuration when none is available so
        # construction never fails — _check_key() raises at call time instead.
        self.api_key = self._get_api_key(api_key_input)
        if self.api_key:
            genai.configure(api_key=self.api_key)
        # Model that supports the Google Search grounding tool.
        self.model_name = "gemini-2.0-flash-exp"

    def _get_api_key(self, user_key):
        """Return the trimmed user key, else the GEMINI_API_KEY env var, else None."""
        if user_key and user_key.strip():
            return user_key.strip()
        return os.getenv("GEMINI_API_KEY") or None

    def _check_key(self):
        """Raise ValueError when no API key was resolved at construction time."""
        if not self.api_key:
            raise ValueError("請先輸入 API Key 或設定系統環境變數")

    # --- 1. PDF -> PPTX ---
    def analyze_pdf_to_pptx(self, pdf_file, progress):
        """Convert *pdf_file* into a 16:9 PPTX, reconstructing per-page text layout.

        Each page is rasterized, sent to Gemini for text-block detection, and
        every detected block becomes a positioned text box on a blank slide.
        A failed page is logged and skipped rather than aborting the document.
        Returns ``(pptx_path, status_message)``.
        """
        self._check_key()
        model = genai.GenerativeModel(self.model_name)
        prs = Presentation()
        prs.slide_width = Inches(16)
        prs.slide_height = Inches(9)
        progress(0.1, desc="轉檔中...")
        images = convert_from_path(pdf_file)
        for i, img in enumerate(images):
            progress(0.1 + (0.8 * (i / len(images))), desc=f"分析第 {i+1} 頁...")
            slide = prs.slides.add_slide(prs.slide_layouts[6])  # layout 6 = blank
            prompt = "Detect all text blocks. Return JSON: [{'text':..., 'box_2d':[ymin,xmin,ymax,xmax] (0-1000), 'font_size':int, 'is_bold':bool, 'color':hex}]"
            try:
                response = model.generate_content(
                    [prompt, img],
                    generation_config={"response_mime_type": "application/json"},
                )
                blocks = json.loads(response.text)
                for b in blocks:
                    # box_2d is [ymin, xmin, ymax, xmax] normalized to 0-1000;
                    # map it onto the 16x9-inch slide.
                    box = b.get("box_2d", [0, 0, 0, 0])
                    left = Inches((box[1] / 1000) * 16)
                    top = Inches((box[0] / 1000) * 9)
                    width = Inches(((box[3] - box[1]) / 1000) * 16)
                    height = Inches(((box[2] - box[0]) / 1000) * 9)
                    tx = slide.shapes.add_textbox(left, top, width, height)
                    p = tx.text_frame.paragraphs[0]
                    p.text = b.get("text", "")
                    p.font.size = Pt(b.get("font_size", 12))
                    p.font.bold = b.get("is_bold", False)
                    try:
                        p.font.color.rgb = RGBColor.from_string(
                            b.get("color", "#000000").replace("#", ""))
                    except Exception:
                        pass  # malformed color string -> keep the default color
            except Exception as e:
                print(f"Page {i} err: {e}")
        # NamedTemporaryFile avoids the filename race of tempfile.mktemp;
        # delete=False because the path is handed to Gradio for download.
        with tempfile.NamedTemporaryFile(suffix=".pptx", delete=False) as tmp:
            out = tmp.name
        prs.save(out)
        return out, "✅ 轉換完成"

    # --- 2. Image text removal ---
    def remove_text(self, image):
        """Ask Gemini to inpaint away all text in *image*; fall back to the input.

        NOTE(review): the V1 Python SDK does not reliably return an image part;
        if ``resp.parts[0].image`` is absent the original image is returned.
        Using the REST API may be required for real image output — confirm.
        """
        self._check_key()
        model = genai.GenerativeModel(self.model_name)
        prompt = "Remove all text from this image, fill background naturally. Return image only."
        resp = model.generate_content([prompt, image])
        try:
            return resp.parts[0].image
        except Exception:
            return image  # best-effort fallback: hand back the original

    # --- 3. Shared search logic (professors / companies) ---
    def _search_with_google(self, query, prompt_template):
        """Two-step grounded search: tool-assisted lookup, then JSON extraction.

        Step 1 runs *prompt_template* (with ``{query}`` substituted) on a model
        with the Google Search tool enabled. Step 2 re-parses the free-text
        answer into a JSON array using a tool-free model (tools and forced
        JSON output cannot be combined). Returns [] when extraction fails.
        """
        self._check_key()
        tools = [{"google_search": {}}]
        model = genai.GenerativeModel(self.model_name, tools=tools)
        resp1 = model.generate_content(prompt_template.format(query=query))
        model_extract = genai.GenerativeModel(self.model_name)  # no tools here
        extract_prompt = f"Extract structured data from this text into JSON array: {resp1.text}"
        resp2 = model_extract.generate_content(
            extract_prompt,
            generation_config={"response_mime_type": "application/json"},
        )
        try:
            return json.loads(resp2.text)
        except Exception:
            return []  # unparseable model output -> empty result set

    def search_professors(self, query):
        """Search for professors in Taiwan matching *query*; returns a list of dicts."""
        p = "Find 10 prominent professors in Taiwan for '{query}'. Return name, university, department."
        return self._search_with_google(query, p)

    def search_companies(self, query):
        """Search for Taiwanese companies matching *query*; returns a list of dicts."""
        p = "Find 5-10 Taiwanese companies for '{query}'. Return name, industry."
        return self._search_with_google(query, p)

    def get_details(self, data, role):
        """Run a grounded investigation of *data* while acting as *role*.

        Returns ``{"text": markdown_report, "sources": [{"title", "uri"}, ...]}``
        with duplicate URIs removed (last occurrence wins).
        """
        self._check_key()
        tools = [{"google_search": {}}]
        model = genai.GenerativeModel(self.model_name, tools=tools)
        prompt = f"Act as {role}. Investigate: {json.dumps(data)}. Report in Traditional Chinese Markdown."
        resp = model.generate_content(prompt)
        sources = []
        # grounding_metadata (and its chunk list) may be missing or None
        # depending on whether the model actually used the search tool.
        meta = getattr(resp.candidates[0], 'grounding_metadata', None)
        chunks = getattr(meta, 'grounding_chunks', None) or []
        for c in chunks:
            if c.web:
                sources.append({"title": c.web.title, "uri": c.web.uri})
        # De-duplicate by URI via dict construction.
        unique_sources = list({v['uri']: v for v in sources}.values())
        return {"text": resp.text, "sources": unique_sources}

    def chat(self, hist, msg, context, role):
        """Continue a Gradio tuple-style chat and return the model's reply text.

        *hist* is a list of ``(user_message, bot_reply)`` pairs. The previous
        implementation emitted one mislabeled message per pair and dropped the
        bot reply entirely; each pair now expands into a user turn followed by
        a model turn so Gemini sees the real conversation.
        """
        self._check_key()
        model = genai.GenerativeModel(self.model_name)
        history = []
        for user_msg, bot_msg in hist:
            if user_msg:
                history.append({"role": "user", "parts": [user_msg]})
            if bot_msg:
                history.append({"role": "model", "parts": [bot_msg]})
        chat = model.start_chat(history=history)
        full_msg = f"Context: {context}\nInstruction: {role}\nUser: {msg}"
        resp = chat.send_message(full_msg)
        return resp.text
| # ========================================== | |
| # 💾 資料存取層 (Persistence) | |
| # ========================================== | |
def load_data(filename):
    """Load a JSON list from *filename*, first refreshing it from the HF repo.

    Both the remote download and the local read are best-effort: any failure
    falls through to the next source, and an empty list is the final fallback.
    """
    if HF_TOKEN and DATASET_REPO_ID:
        try:
            hf_hub_download(repo_id=DATASET_REPO_ID, filename=filename,
                            repo_type="dataset", token=HF_TOKEN, local_dir=".")
        except Exception:
            pass  # remote copy unavailable -> use whatever is on disk
    if os.path.exists(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            pass  # corrupt or unreadable cache -> start fresh
    return []
def save_data(data, filename):
    """Write *data* as pretty-printed JSON to *filename*, then sync to HF.

    The local write is authoritative and may raise; the Hugging Face upload
    is best-effort and silently skipped on any failure.
    """
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    if HF_TOKEN and DATASET_REPO_ID:
        try:
            api = HfApi(token=HF_TOKEN)
            api.upload_file(path_or_fileobj=filename, path_in_repo=filename,
                            repo_id=DATASET_REPO_ID, repo_type="dataset",
                            commit_message="Sync")
        except Exception:
            pass  # a failed remote sync must not lose the local save
| # ========================================== | |
| # 🖥️ 介面邏輯 (UI Helpers) | |
| # ========================================== | |
def format_df(data_list, cols):
    """Render a list of saved record dicts as a DataFrame with headers *cols*.

    Each row is [status-icon + name, university-or-industry, joined tags].
    An empty input yields an empty frame that still carries the headers.
    """
    if not data_list:
        return pd.DataFrame(columns=cols)
    status_icons = {'match': '✅', 'good': '✅', 'risk': '⚠️'}
    rows = []
    for entry in data_list:
        badge = status_icons.get(entry.get('status'), '')
        rows.append([
            f"{badge} {entry.get('name')}",
            entry.get('university') or entry.get('industry'),
            ", ".join(entry.get('tags', [])),
        ])
    return pd.DataFrame(rows, columns=cols)
| # ========================================== | |
| # 🚀 主程式 (Gradio) | |
| # ========================================== | |
def main_app():
    """Build and launch the Gradio UI: document toolbox, professor search, company search."""
    # Initialization: seed both search tabs with previously saved records.
    prof_data = load_data(PROF_SAVE_FILE)
    comp_data = load_data(COMP_SAVE_FILE)
    with gr.Blocks(title="Prof.404 x PPT.404 Ultimate", theme=gr.themes.Soft()) as demo:
        # Global API key (empty -> UnifiedService falls back to the env var).
        with gr.Accordion("🔑 系統設定 (API Key)", open=False):
            api_key = gr.Textbox(label="Google Gemini API Key", type="password", placeholder="若未填寫則使用系統預設")
        gr.Markdown(
            """
            <div align="center">
            <h1>🚀 Prof.404 Ultimate: 產學導航 & 文件工具站</h1>
            <h3>整合文件視覺處理 (PPT/Img) 與 產學資源導航 (Prof/Com) 的全方位平台</h3>
            </div>
            """
        )
        with gr.Tabs():
            # --- Tab 1: document toolbox ---
            with gr.Tab("🛠️ 文件工具箱 (PPT.404)"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### 📄 PDF 轉 PPTX (含排版還原)")
                        pdf_file = gr.File(label="上傳 PDF")
                        pdf_btn = gr.Button("開始轉換", variant="primary")
                        ppt_out = gr.File(label="下載 PPTX")
                        pdf_msg = gr.Textbox(label="狀態", interactive=False)
                        # A fresh UnifiedService per click picks up the current key value.
                        pdf_btn.click(
                            lambda f, k: UnifiedService(k).analyze_pdf_to_pptx(f, gr.Progress()),
                            inputs=[pdf_file, api_key], outputs=[ppt_out, pdf_msg]
                        )
                    with gr.Column():
                        gr.Markdown("### 🎨 圖片智慧去字")
                        img_in = gr.Image(type="pil", label="原圖")
                        img_btn = gr.Button("一鍵去除", variant="primary")
                        img_out = gr.Image(label="結果")
                        img_btn.click(
                            lambda i, k: UnifiedService(k).remove_text(i),
                            inputs=[img_in, api_key], outputs=[img_out]
                        )
            # --- Tab 2: professor search ---
            with gr.Tab("🎓 找教授 (Prof.404)"):
                p_state = gr.State(prof_data)
                p_current = gr.State(None)  # currently selected professor
                with gr.Row():
                    p_query = gr.Textbox(label="搜尋領域", scale=4)
                    p_btn = gr.Button("搜尋", scale=1)
                with gr.Row():
                    p_table = gr.Dataframe(headers=["姓名", "大學", "標籤"], interactive=False, scale=1)
                    with gr.Column(scale=1, visible=False) as p_detail_col:
                        p_md = gr.Markdown()
                        p_chat = gr.Chatbot(height=300)
                        p_msg = gr.Textbox(label="詢問關於此教授")
                # Logic wrappers
                def search_p(q, k, saved):
                    # NOTE(review): this replaces p_state (the saved list loaded
                    # at startup) with fresh search results — confirm intended.
                    svc = UnifiedService(k)
                    res = svc.search_professors(q)
                    return res, format_df(res, ["姓名","大學","標籤"])
                def select_p(evt: gr.SelectData, res, k, saved):
                    svc = UnifiedService(k)
                    item = res[evt.index[0]]
                    # Fetch the detailed grounded report for the clicked row.
                    det = svc.get_details(item, "Academic Consultant")
                    item['details'] = det['text']
                    # Simplified save logic (demo only; de-duplication recommended in practice).
                    saved.append(item)
                    save_data(saved, PROF_SAVE_FILE)
                    display_text = det['text'] + "\n\n📚 來源:\n" + "\n".join([f"- {s['title']}" for s in det['sources']])
                    return gr.update(visible=True), display_text, [], item, saved
                def chat_p(hist, msg, item, k):
                    svc = UnifiedService(k)
                    reply = svc.chat(hist, msg, item.get('details'), "Academic Consultant")
                    hist.append((msg, reply))
                    return hist, ""
                p_btn.click(search_p, [p_query, api_key, p_state], [p_state, p_table])
                p_table.select(select_p, [p_state, api_key, p_state], [p_detail_col, p_md, p_chat, p_current, p_state])
                p_msg.submit(chat_p, [p_chat, p_msg, p_current, api_key], [p_chat, p_msg])
            # --- Tab 3: company search ---
            with gr.Tab("🏢 找公司 (Com.404)"):
                c_state = gr.State(comp_data)
                c_current = gr.State(None)  # currently selected company
                with gr.Row():
                    c_query = gr.Textbox(label="搜尋產業/公司", scale=4)
                    c_btn = gr.Button("搜尋", scale=1)
                with gr.Row():
                    c_table = gr.Dataframe(headers=["公司", "產業", "標籤"], interactive=False, scale=1)
                    with gr.Column(scale=1, visible=False) as c_detail_col:
                        c_md = gr.Markdown()
                        c_chat = gr.Chatbot(height=300)
                        c_msg = gr.Textbox(label="詢問關於此公司")
                # Logic wrappers (same structure as the professor tab)
                def search_c(q, k, saved):
                    svc = UnifiedService(k)
                    res = svc.search_companies(q)
                    return res, format_df(res, ["公司","產業","標籤"])
                def select_c(evt: gr.SelectData, res, k, saved):
                    svc = UnifiedService(k)
                    item = res[evt.index[0]]
                    det = svc.get_details(item, "Business Analyst")
                    item['details'] = det['text']
                    saved.append(item)
                    save_data(saved, COMP_SAVE_FILE)
                    display_text = det['text'] + "\n\n📚 來源:\n" + "\n".join([f"- {s['title']}" for s in det['sources']])
                    return gr.update(visible=True), display_text, [], item, saved
                def chat_c(hist, msg, item, k):
                    svc = UnifiedService(k)
                    reply = svc.chat(hist, msg, item.get('details'), "Business Analyst")
                    hist.append((msg, reply))
                    return hist, ""
                c_btn.click(search_c, [c_query, api_key, c_state], [c_state, c_table])
                c_table.select(select_c, [c_state, api_key, c_state], [c_detail_col, c_md, c_chat, c_current, c_state])
                c_msg.submit(chat_c, [c_chat, c_msg, c_current, api_key], [c_chat, c_msg])
    demo.queue().launch()
| if __name__ == "__main__": | |
| main_app() |