Spaces:
Sleeping
Sleeping
| import os, json, traceback, io | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import RedirectResponse, PlainTextResponse | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # ========= 画面から見られる簡易ログ ========= | |
| _APP_LOG = io.StringIO() | |
| def _log(msg: str): | |
| print(msg, flush=True) | |
| _APP_LOG.write(msg + "\n") | |
| # ========= 例外は必ず画面にも表示 ========= | |
| async def _exception_printer(request: Request, exc: Exception): | |
| tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) | |
| _log("=== Unhandled Exception ===\n" + tb) | |
| return PlainTextResponse(f"Internal error\n\n{tb}", status_code=500) | |
| # ========= 重い依存は遅延インポート ========= | |
| def _lazy_imports_for_main(): | |
| from modules.utils import ensure_dirs | |
| ensure_dirs() | |
| from modules.rag_indexer import index_files_and_urls | |
| from modules.workflow import run_full_workflow | |
| from modules.emailer import build_tracking_url | |
| return index_files_and_urls, run_full_workflow, build_tracking_url | |
| # ========= Gradio コールバック(型アノテ外し版) ========= | |
| def ui_company_score_and_proposal(company_name, | |
| company_website, | |
| lead_email, | |
| urls_text, | |
| files=None, | |
| custom_objective="", | |
| temperature=0.4): | |
| try: | |
| index_files_and_urls, run_full_workflow, build_tracking_url = _lazy_imports_for_main() | |
| _log("[init] lazy imports loaded") | |
| except Exception: | |
| tb = traceback.format_exc() | |
| _log("[error] init failed\n" + tb) | |
| return "### ❌ 初期化エラー\n```\n" + tb + "\n```" | |
| urls = [u.strip() for u in (urls_text or "").splitlines() if u.strip()] | |
| # gr.File or str のどちらでも拾えるように安全化 | |
| try: | |
| iterable = files or [] | |
| # Gradio の multiple=False のときに単体で来た場合に備える | |
| if not isinstance(iterable, (list, tuple)): | |
| iterable = [iterable] | |
| file_paths = [getattr(f, "name", str(f)) for f in iterable if f] | |
| except Exception: | |
| file_paths = [] | |
| # 1) インデックス | |
| try: | |
| index_report = index_files_and_urls(file_paths=file_paths, urls=urls) | |
| _log(f"[index] {index_report}") | |
| except Exception: | |
| tb = traceback.format_exc() | |
| _log("[error] index failed\n" + tb) | |
| index_report = "Index error:\n```\n" + tb + "\n```" | |
| # 2) ワークフロー | |
| try: | |
| result = run_full_workflow( | |
| company_name=company_name, | |
| company_website=company_website, | |
| lead_email=lead_email, | |
| objective=custom_objective, | |
| temperature=float(temperature or 0.4), | |
| ) | |
| _log("[workflow] finished") | |
| except Exception: | |
| tb = traceback.format_exc() | |
| _log("[error] workflow failed\n" + tb) | |
| return "### ❌ ワークフロー実行エラー\n```\n" + tb + "\n```" | |
| # 3) 表示 | |
| outputs = [] | |
| try: | |
| outputs.append("### ✅ 企業スコア\n" + json.dumps(result.get("score", {}), ensure_ascii=False, indent=2)) | |
| ctxs = result.get("top_contexts", []) | |
| outputs.append("### 🧠 抽出コンテキスト(上位)\n" + "\n\n".join([f"- {c[:300]}..." for c in ctxs])) | |
| outputs.append("### ✍️ 提案ドラフト\n" + result.get("proposal_markdown", "")) | |
| ex = result.get("exports", {}) | |
| outputs.append("### 📎 エクスポート\n" + "\n".join([ | |
| f"- DOCX: {ex.get('docx_path','')}", | |
| f"- PPTX: {ex.get('pptx_path','')}" | |
| ])) | |
| if lead_email: | |
| email = result.get("email", {}) | |
| outputs.append("### ✉️ メール準備\n" + f"To: {lead_email}\nSubject: {email.get('subject','')}\n\n{email.get('body','')}") | |
| try: | |
| outputs.append(f"(本文内の計測リンク例)\n{build_tracking_url('preview-only', {'company':company_name})}") | |
| except Exception: | |
| pass | |
| outputs.append("### 🤖 次アクション提案\n" + result.get("next_actions", "")) | |
| outputs.append("### 🧩 インデックス更新ログ\n" + index_report) | |
| except Exception: | |
| tb = traceback.format_exc() | |
| _log("[error] render failed\n" + tb) | |
| outputs.append("### ⚠️ 出力整形エラー\n```\n" + tb + "\n```") | |
| return "\n\n".join(outputs) | |
| def get_health_text(): | |
| return "OK" | |
| def get_runtime_log(): | |
| try: | |
| return "```\n" + _APP_LOG.getvalue() + "\n```" | |
| except Exception: | |
| return "(ログを取得できませんでした)" | |
| # ========= Gradio UI(ルート / に表示) ========= | |
| with gr.Blocks(title="営業自動化 Agent Studio") as demo: | |
| gr.Markdown("## 営業自動化 Agent Studio(Space内で完結)") | |
| with gr.Tab("アプリ"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| company_name = gr.Textbox(label="企業名") | |
| company_website = gr.Textbox(label="企業サイトURL") | |
| lead_email = gr.Textbox(label="送信先メール(任意)") | |
| custom_objective = gr.Textbox(label="提案の目的/狙い(任意)", placeholder="例)SaaS導入の無料PoC打診") | |
| urls_text = gr.Textbox(label="RAG用URL(複数は改行)", lines=4) | |
| files = gr.File(label="RAG用ファイル(複数可)", file_count="multiple") | |
| temperature = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="生成温度") | |
| run_btn = gr.Button("ワークフロー実行", variant="primary") | |
| ping_btn = gr.Button("Ping(疎通)") | |
| with gr.Column(): | |
| out = gr.Markdown() | |
| ping_out = gr.Markdown() | |
| run_btn.click( | |
| fn=ui_company_score_and_proposal, | |
| inputs=[company_name, company_website, lead_email, urls_text, files, custom_objective, temperature], | |
| outputs=[out] | |
| ) | |
| ping_btn.click(lambda: "pong 🎯", None, ping_out) | |
| with gr.Tab("ステータス / ログ"): | |
| health_btn = gr.Button("ヘルスチェック") | |
| health_out = gr.Markdown() | |
| log_btn = gr.Button("アプリログを表示") | |
| log_out = gr.Markdown(value="(ここに実行ログが表示されます)") | |
| health_btn.click(fn=get_health_text, inputs=None, outputs=health_out) | |
| log_btn.click(fn=get_runtime_log, inputs=None, outputs=log_out) | |
| # ========= FastAPI(/ はUI、/ui は / へ転送、/t と /health はAPI) ========= | |
| app = FastAPI(title="Agent Studio - Docker (root UI)") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], | |
| ) | |
| app.add_exception_handler(Exception, _exception_printer) | |
| def health(): | |
| return {"ok": True} | |
| def old_ui(): | |
| return RedirectResponse(url="/") # 旧リンク互換 | |
| def track_click(token: str, request: Request): | |
| try: | |
| from modules.utils import verify_tracking_token, log_event | |
| payload = verify_tracking_token(token) | |
| if payload is None: | |
| raise HTTPException(status_code=400, detail="invalid token") | |
| ip = request.client.host if request.client else "unknown" | |
| ua = request.headers.get("User-Agent", "") if request.headers else "" | |
| log_event("click", payload, {"ip": ip, "ua": ua}) | |
| redirect_to = payload.get("redirect") or os.getenv("PUBLIC_BASE_URL", "/") | |
| return RedirectResponse(url=redirect_to) | |
| except Exception as e: | |
| tb = "".join(traceback.format_exception(type(e), e, e.__traceback__)) | |
| _log("[error] tracking failed\n" + tb) | |
| return PlainTextResponse(f"tracking error\n\n{tb}", status_code=500) | |
| # ルート(/)で Gradio を表示(Space 内で完結) | |
| app = gr.mount_gradio_app(app, demo, path="/") | |