# modules/workflow.py import os from typing import Dict, Any, List # ========================= # retriever のロード(相対 → 絶対 → フォールバック実装) # ========================= def _fallback_retrieve_contexts(query: str, top_k: int = 5) -> List[str]: """ フォールバックの簡易 RAG: - modules.utils.data_dir()/chunks.jsonl を読み込み - キーワード一致スコアで上位を返す(埋め込みなし) """ import json from pathlib import Path try: # 遅延 import(循環回避 & 起動高速化) from modules.utils import data_dir, ensure_dirs ensure_dirs() p = data_dir() / "chunks.jsonl" if not p.exists(): return [] rows = [] with open(p, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) if isinstance(obj, dict) and obj.get("text"): rows.append(obj) except Exception: continue if not rows: return [] # ごく簡易なキーワード一致スコア q = (query or "").lower() def score(t: str) -> int: t = (t or "").lower() s = 0 for w in q.split(): if w and w in t: s += 1 return s scored = sorted(rows, key=lambda r: score(str(r.get("text",""))), reverse=True) top = scored[:max(1, int(top_k))] out = [] for r in top: txt = str(r.get("text","")).strip() src = r.get("source") out.append(f"{txt}\n[source] {src}" if src else txt) return out except Exception: return [] # まず相対 import を試す try: from .rag_retriever import retrieve_contexts # type: ignore except Exception: # 絶対 import try: from modules.rag_retriever import retrieve_contexts # type: ignore except Exception: # 最後の手段: フォールバックを使う retrieve_contexts = _fallback_retrieve_contexts # type: ignore # exporters / utils は相対を優先 try: from .exporters import export_docx, export_pptx from .utils import ensure_dirs, export_dir, make_tracking_token except Exception: from modules.exporters import export_docx, export_pptx # type: ignore from modules.utils import ensure_dirs, export_dir, make_tracking_token # type: ignore def _score_company(company_name: str, website: str, contexts: List[str]) -> Dict[str, Any]: """ デモ用の簡易スコア(実運用では独自ロジック/モデルと差し替え可) """ score = 50 if website: score += 10 if contexts: score += min(40, len(contexts) * 5) return { "company": company_name, "website": website, "score": min(score, 95), "factors": { "has_website": bool(website), "ctx_hits": len(contexts), }, } def _draft_proposal_md(company_name: str, objective: str, contexts: List[str]) -> str: bullets = "\n".join([f"- {c[:200]}..." for c in contexts[:5]]) obj = objective or "商談化のための初回提案" return f"""# 提案ドラフト({company_name} 向け) ## 目的 {obj} ## 背景インサイト(上位抜粋) {bullets if bullets else '- (まだインデックスが空です)'} ## 提案骨子 1. 現状の課題仮説を確認 2. PoC スコープのすり合わせ 3. 成果指標(KPI)とスケジュール 4. 次のアクション """ def _draft_email(company_name: str) -> Dict[str, str]: subject = f"{company_name}様向けご提案(ドラフト)" body = ( f"{company_name} ご担当者様\n\n" "お世話になっております。簡易の提案ドラフトを共有いたします。\n" "ご確認の上、ご意見いただけますと幸いです。\n\n" "何卒よろしくお願いいたします。" ) return {"subject": subject, "body": body} def run_full_workflow( company_name: str, company_website: str, lead_email: str, objective: str, temperature: float = 0.4, ) -> Dict[str, Any]: """ app.py から呼ばれるメイン処理。 - RAG で上位コンテキスト取得(retriever が無い場合はフォールバック) - 簡易スコアリング - 提案ドラフト生成 - DOCX/PPTX にエクスポート - メール文面と次アクションの素案 """ ensure_dirs() query = f"{company_name} {objective or ''}".strip() top_contexts = retrieve_contexts(query, top_k=5) score = _score_company(company_name, company_website, top_contexts) proposal_md = _draft_proposal_md(company_name, objective, top_contexts) # エクスポート先パス out_dir = export_dir() docx_path = os.path.join(out_dir, f"{company_name}_proposal.docx") pptx_path = os.path.join(out_dir, f"{company_name}_proposal.pptx") # 出力 export_docx(proposal_md, docx_path) export_pptx(proposal_md, pptx_path) email = _draft_email(company_name) # 次アクション(ダミー) next_actions = ( "- 先方の課題確認ミーティング候補日程を提案する\n" "- PoC スコープの要件一覧を作成して共有\n" "- 競合比較の要点を 3 点に要約\n" ) # 計測リンクの例(app.py の /t/{token} で受ける想定) tracking_token = make_tracking_token({"company": company_name, "redirect": "/"}) return { "score": score, "top_contexts": top_contexts, "proposal_markdown": proposal_md, "exports": {"docx_path": docx_path, "pptx_path": pptx_path}, "email": email, "next_actions": next_actions, "tracking_token": tracking_token, }