ritvikv03
fix: switch to Qwen2.5-7B-Instruct with provider=auto - confirmed available on HF
e3d8c22 | """ | |
| SummaryEngine - Use-case agnostic AI summary generator with persistence. | |
| ======================================================================== | |
| Generates executive summaries using HuggingFace with a rule-based fallback. | |
| Results are persisted to SQLite so session restarts don't lose summaries. | |
| Usage (from any module):: | |
| from core.summary_engine import SummaryEngine | |
| engine = SummaryEngine(use_case_id="fendt-pestel") | |
| text, source = engine.generate( | |
| context_data={"total": 42, "critical": 3}, | |
| prompt_template="Summarize {total} signals, {critical} are CRITICAL.", | |
| fallback_fn=my_rule_based_fn, | |
| ) | |
| # source is "ai" or "rule_based" | |
| # Retrieve the most recently persisted summary without calling the API: | |
| cached = engine.get_latest() | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sqlite3 | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Callable, Dict, List, Optional, Tuple | |
| class SummaryEngine: | |
| """ | |
| AI summary generator with automatic fallback and SQLite persistence. | |
| Parameters | |
| ---------- | |
| use_case_id: | |
| Stable identifier for the caller (e.g. "fendt-pestel"). | |
| Used as a namespace key in the summaries table. | |
| db_path: | |
| Path to the SQLite database. Defaults to the shared summaries.db | |
| inside data/ so no extra file is created. | |
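| hf_token: | |
| Currently not stored by the constructor. The token is always read from the | |
| ``HUGGINGFACEHUB_API_TOKEN`` env var at call time. If that variable is | |
| unset or quota is exhausted, ``generate`` falls back to ``fallback_fn``. | |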
| """ | |
| def __init__( | |
| self, | |
| use_case_id: str, | |
| db_path: Optional[str] = None, | |
| hf_token: Optional[str] = None, | |
| ) -> None: | |
| self.use_case_id = use_case_id | |
| self._quota_hit = False | |
| if db_path is None: | |
| db_path = str( | |
| Path(__file__).parent.parent / "data" / "summaries.db" | |
| ) | |
| self.db_path = db_path | |
| self._ensure_summaries_table() | |
| # ------------------------------------------------------------------ | |
| # Public API | |
| # ------------------------------------------------------------------ | |
| def generate( | |
| self, | |
| context_data: Dict[str, Any], | |
| prompt_template: str, | |
| fallback_fn: Callable[[Dict[str, Any]], str], | |
| max_tokens: int = 300, | |
| persist: bool = True, | |
| ) -> Tuple[str, str]: | |
| """ | |
| Generate a summary using AI with automatic fallback. | |
| Parameters | |
| ---------- | |
| context_data: | |
| Arbitrary dict. Values are substituted into ``prompt_template`` | |
| via ``str.format_map`` and passed to ``fallback_fn``. | |
| prompt_template: | |
| String with ``{key}`` placeholders matching keys in | |
| ``context_data``. | |
| fallback_fn: | |
| ``callable(context_data) -> str`` used when HuggingFace is | |
| unavailable or quota is exhausted. | |
| max_tokens: | |
| Max output tokens for the HuggingFace call. | |
| persist: | |
| If ``True``, save the result to SQLite (default). | |
| Returns | |
| ------- | |
| (summary_text, source) | |
| ``source`` is ``"ai"`` or ``"rule_based"``. | |
| """ | |
| # Guard: empty / None data must never crash here | |
| if not context_data: | |
| return self._no_data_message(), "rule_based" | |
| ai_text = self._call_llm(prompt_template, context_data, max_tokens) | |
| if ai_text: | |
| result, source = ai_text, "ai" | |
| else: | |
| result, source = self._safe_fallback(fallback_fn, context_data), "rule_based" | |
| if persist: | |
| self._persist(result, source) | |
| return result, source | |
| def get_latest(self) -> Optional[str]: | |
| """Return the most recently persisted summary for this use case.""" | |
| try: | |
| with self._connection() as conn: | |
| row = conn.execute( | |
| "SELECT content FROM summaries " | |
| "WHERE use_case_id = ? " | |
| "ORDER BY created_at DESC, id DESC LIMIT 1", | |
| (self.use_case_id,), | |
| ).fetchone() | |
| return row[0] if row else None | |
| except Exception: | |
| return None | |
| # ------------------------------------------------------------------ | |
| # Internal helpers | |
| # ------------------------------------------------------------------ | |
| def _call_llm( | |
| self, | |
| prompt_template: str, | |
| context_data: Dict[str, Any], | |
| max_tokens: int, | |
| ) -> Optional[str]: | |
| """Try HuggingFace InferenceClient chat_completion. Returns text or None on failure.""" | |
| if self._quota_hit: | |
| return None | |
| hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN", "") | |
| if not hf_token: | |
| return None | |
| try: | |
| from huggingface_hub import InferenceClient | |
| prompt = prompt_template.format_map(context_data) | |
| client = InferenceClient(api_key=hf_token, provider="auto") | |
| response = client.chat_completion( | |
| model="Qwen/Qwen2.5-7B-Instruct", | |
| messages=[{"role": "user", "content": prompt}], | |
| max_tokens=max_tokens, | |
| temperature=0.3, | |
| ) | |
| result = response.choices[0].message.content.strip() | |
| return result or None | |
| except Exception as exc: | |
| err = str(exc) | |
| if "429" in err or "quota" in err.lower() or "rate limit" in err.lower(): | |
| self._quota_hit = True | |
| return None | |
| def _safe_fallback( | |
| self, | |
| fallback_fn: Callable[[Dict[str, Any]], str], | |
| context_data: Dict[str, Any], | |
| ) -> str: | |
| """Run the caller-supplied fallback, catching all exceptions.""" | |
| try: | |
| return fallback_fn(context_data) | |
| except Exception: | |
| return self._no_data_message() | |
| def _no_data_message(self) -> str: | |
| return ( | |
| f"No data available for '{self.use_case_id}'. " | |
| "Run the pipeline to generate intelligence." | |
| ) | |
| # ------------------------------------------------------------------ | |
| # Persistence | |
| # ------------------------------------------------------------------ | |
| def _ensure_summaries_table(self) -> None: | |
| try: | |
| with self._connection() as conn: | |
| conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS summaries ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| use_case_id TEXT NOT NULL, | |
| content TEXT NOT NULL, | |
| source TEXT NOT NULL, | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| conn.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_summaries_use_case | |
| ON summaries(use_case_id, created_at) | |
| """) | |
| except Exception: | |
| pass # DB may not exist yet; first pipeline run creates it | |
| def _persist(self, content: str, source: str) -> None: | |
| """Write the summary to SQLite. Failures are silent.""" | |
| try: | |
| with self._connection() as conn: | |
| conn.execute( | |
| "INSERT INTO summaries (use_case_id, content, source) VALUES (?, ?, ?)", | |
| (self.use_case_id, content, source), | |
| ) | |
| except Exception: | |
| pass | |
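| def _connection(self): | |
| """Return a WAL-mode SQLite connection as a context manager.""" | |
| import contextlib | |
| # The decorator turns the generator into a real context manager; without it, | |
| # ``with self._connection()`` raises and every DB helper fails silently. | |
| @contextlib.contextmanager | |
| def _ctx(): | |
| conn = sqlite3.connect(self.db_path, timeout=10) | |
| conn.execute("PRAGMA journal_mode=WAL") | |
| conn.row_factory = sqlite3.Row | |
| try: | |
| yield conn | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| return _ctx() | |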
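The persistence helpers above depend on `_connection()` behaving as a context manager that commits on success and always closes the connection. A minimal, self-contained sketch of that pattern follows. The names `open_db`, `roundtrip`, and the `notes` table are hypothetical and used only for illustration; they are not part of the module.

```python
import contextlib
import os
import sqlite3
import tempfile


@contextlib.contextmanager
def open_db(path: str):
    """Yield a WAL-mode connection; commit on success, always close."""
    conn = sqlite3.connect(path, timeout=10)
    conn.execute("PRAGMA journal_mode=WAL")
    try:
        yield conn
        conn.commit()  # reached only if the with-body raised nothing
    finally:
        conn.close()


def roundtrip(path: str) -> str:
    """Write one row in one connection, then read it back in another."""
    with open_db(path) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS notes (body TEXT NOT NULL)")
        conn.execute("INSERT INTO notes (body) VALUES (?)", ("hello",))
    with open_db(path) as conn:
        return conn.execute("SELECT body FROM notes").fetchone()[0]


if __name__ == "__main__":
    # Hypothetical throwaway database inside a temporary directory.
    with tempfile.TemporaryDirectory() as tmp:
        print(roundtrip(os.path.join(tmp, "demo.db")))
```

Because the commit sits after the `yield`, an exception inside the `with` body skips the commit and the connection is still closed in `finally`.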
| # -- Module-level helper ------------------------------------------------------ | |
| _BRIEF_SYSTEM = """\ | |
| You are the **Lead Strategy Consultant for the Fendt PESTEL-EL Sentinel**, \ | |
| advising the AGCO/Fendt board on macro-environmental disruption. Your briefs \ | |
| are the sole decision-support document the board will read before allocating \ | |
| resources. Every word must earn its place. | |
| PRODUCT CONSTRAINTS - NEVER VIOLATE: | |
| - FendtONE is a precision/digital platform, NOT a safety feature. | |
| - The Momentum Planter is NOT sold in the EME region. Do NOT suggest it for \ | |
| EU-based recommendations under any circumstances. | |
| STRICT FORMAT β use this exact Markdown structure: | |
| # Strategic Intelligence Brief | |
| **Classification:** INTERNAL - C-SUITE ONLY | |
| **Generated:** {date} | |
| **Signal Count:** {count} | |
| --- | |
| ## Purpose and Background | |
| One paragraph (3 sentences maximum) stating: | |
| - What this brief covers and why it was produced | |
| - The time horizon of the intelligence window | |
| - Why the board must read this now (the "so what?" for Fendt specifically) | |
| --- | |
| ## Key Findings | |
| For each of the top 3-5 signals, create a subsection: | |
| ### Finding N: [Signal Title] | |
| - **PESTEL Dimension:** [dimension] | |
| - **Disruption Score:** [score] | |
| - **Time Horizon:** [12M / 24M / 36M based on velocity] | |
| - **Source:** [Source Name](URL); use the Source URL field from the signal data | |
| **Evidence:** Quote or paraphrase the key intelligence from the signal content. \ | |
| Maximum 3 sentences. | |
| **Fendt Impact:** Explicitly state how this signal affects Fendt's business \ | |
| (revenue, product roadmap, regulatory exposure, or competitive position). \ | |
| Maximum 3 sentences. Do not describe industry-wide effects only. | |
| --- | |
| ## Strategic Implications | |
| 3-5 bullet points synthesising cross-cutting themes. Connect the dots between \ | |
| findings. Identify convergence patterns (e.g., regulatory + economic pressure). \ | |
| Each bullet maximum 3 sentences. | |
| --- | |
| ## Recommended Actions | |
| Numbered list of 5-7 concrete, actionable steps. Each must include: | |
| - **Timeline** (Immediate / Q2 2026 / Q3 2026 etc.) | |
| - **Owner** (Board, R&D, Policy, Finance, etc.) | |
| - **Action** (specific verb + object) | |
| Do NOT force "Sales Opp," "Competitive Angle," or "Marketing Action" sections \ | |
| if they are irrelevant to the signals. Omit or merge them as necessary. | |
| --- | |
| ## Value Proposition and Impact | |
| | Action | Investment Signal | Risk if Ignored | | |
| |--------|-------------------|-----------------| | |
| | (row per recommendation) | cost/effort hint | consequence | | |
| --- | |
| ## Conclusion | |
| Maximum 3 sentences reinforcing the strategic inflection point. End with a \ | |
| specific next step and deadline. | |
| --- | |
| *Generated by Fendt PESTEL-EL Sentinel - AI Strategic Writer* | |
| RULES: | |
| - Be decisive, not hedging. Use "must" not "could consider". | |
| - Cite specific signal titles and scores from the provided data. | |
| - Each section must be a maximum of 3 sentences. | |
| - Every market signal must explicitly state how it affects Fendt's business \ | |
| specifically, not just the industry at large. Include technical accuracy. | |
| - When citing a signal source, use the format [Source Name](URL) using the \ | |
| Source URL field provided in the signal data. | |
| - Keep the total brief under 800 words. | |
| - Use **bold** for key numbers, dates, and risk levels. | |
| - If the provided context does not contain enough specific evidence, state: \ | |
| "Current intelligence does not contain sufficient data on this topic." \ | |
| Do NOT hallucinate. | |
| """ | |
| _SOURCE_PRIORITY: dict[str, int] = { | |
| "regulatory": 0, | |
| "competitor_press": 1, | |
| "trade_news": 2, | |
| "financial": 3, | |
| "academic": 4, | |
| } | |
| def _get_source_type(source_url: str) -> str: | |
| try: | |
| from core.sources import PESTEL_SOURCES | |
| for src in PESTEL_SOURCES: | |
| if src["url"] in source_url or source_url in src["url"]: | |
| return src.get("source_type", "trade_news") | |
| except Exception: | |
| pass | |
| return "trade_news" | |
| def generate_brief_markdown(signals: List[Any]) -> str: | |
| """ | |
| Generate a full strategic intelligence brief in Markdown format. | |
| Parameters | |
| ---------- | |
| signals: | |
| List of Signal objects (or dicts with title/content/pestel_dimension/ | |
| disruption_score keys). Typically the top 10 by disruption score. | |
| Returns | |
| ------- | |
| Markdown string. Falls back to a rule-based brief if the LLM is unavailable. | |
| """ | |
| hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN", "") | |
| # Build signal digest regardless of LLM availability | |
| signal_data: List[dict] = [] | |
| for s in signals: | |
| if hasattr(s, "title"): | |
| title = s.title | |
| content = s.content | |
| dim = s.pestel_dimension.value if hasattr(s.pestel_dimension, "value") else str(s.pestel_dimension) | |
| score = getattr(s, "disruption_score", 0.0) | |
| velocity = getattr(s, "velocity_score", 0.0) | |
| impact = getattr(s, "impact_score", 0.0) | |
| novelty = getattr(s, "novelty_score", 0.0) | |
| entities = getattr(s, "entities", []) | |
| themes = getattr(s, "themes", []) | |
| source_url = getattr(s, "source_url", "") | |
| else: | |
| title = s.get("title", "Untitled") | |
| content = s.get("content", "") | |
| dim = s.get("pestel_dimension", "UNKNOWN") | |
| score = s.get("disruption_score", 0.0) | |
| velocity = s.get("velocity_score", 0.0) | |
| impact = s.get("impact_score", 0.0) | |
| novelty = s.get("novelty_score", 0.0) | |
| entities = s.get("entities", []) | |
| themes = s.get("themes", []) | |
| source_url = s.get("source_url", "") | |
| signal_data.append({ | |
| "title": title, "content": content, "dim": dim, | |
| "score": score, "velocity": velocity, "impact": impact, | |
| "novelty": novelty, "entities": entities, "themes": themes, | |
| "source_url": source_url, | |
| }) | |
| # Prioritise regulatory and competitor-press signals first in the digest | |
| signal_data.sort( | |
| key=lambda sd: _SOURCE_PRIORITY.get(_get_source_type(sd["source_url"]), 99) | |
| ) | |
| # Detect EU/EME context for regional constraint injection | |
| # Match whole tokens only: a substring test would flag e.g. "NEUTRAL" or "REUTERS" for "EU". | |
| import re | |
| _eu_keywords = {"EU", "EUROPE", "EME", "EUROPEAN"} | |
| eu_signals_present = any( | |
| _eu_keywords.intersection( | |
| re.findall(r"[A-Z]+", (" ".join(sd["entities"]) + " " + sd["content"]).upper()) | |
| ) | |
| for sd in signal_data | |
| ) | |
| # Build a rich digest for the LLM | |
| digest_lines: List[str] = [] | |
| for i, sd in enumerate(signal_data, 1): | |
| horizon = "12M - CRITICAL" if sd["velocity"] >= 0.7 else "24M - HIGH" if sd["velocity"] >= 0.4 else "36M - MONITOR" | |
| ents = ", ".join(sd["entities"][:5]) if sd["entities"] else "N/A" | |
| digest_lines.append( | |
| f"Signal {i}:\n" | |
| f" Title: {sd['title']}\n" | |
| f" Dimension: {sd['dim']}\n" | |
| f" Disruption Score: {sd['score']:.2f} (impact={sd['impact']:.2f}, novelty={sd['novelty']:.2f}, velocity={sd['velocity']:.2f})\n" | |
| f" Time Horizon: {horizon}\n" | |
| f" Key Entities: {ents}\n" | |
| f" Source URL: {sd['source_url']}\n" | |
| f" Content: {sd['content'][:300]}\n" | |
| ) | |
| digest = "\n".join(digest_lines) | |
| ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") | |
| count = len(signal_data) | |
| if not hf_token: | |
| return _rule_based_brief(signal_data, ts) | |
| try: | |
| from huggingface_hub import InferenceClient | |
| system = _BRIEF_SYSTEM.replace("{date}", ts).replace("{count}", str(count)) | |
| region_constraint = ( | |
| "\n\n\u26a0\ufe0f REGIONAL CONSTRAINT: This brief contains EU/EME signals. " | |
| "The Fendt Momentum Planter is NOT available in EU/EME markets. " | |
| "Do NOT recommend it for EU/EME deployment under any circumstances." | |
| if eu_signals_present else "" | |
| ) | |
| user_prompt = ( | |
| f"Intelligence Window: {ts}\n" | |
| f"Total Signals Analysed: {count}\n\n" | |
| f"SIGNAL DATA (ranked by source priority and disruption score):\n\n{digest}\n\n" | |
| "Write the full strategic intelligence brief now using the exact " | |
| "format specified in your system instructions. Focus on the top 3-5 " | |
| f"most disruptive signals. Be board-ready.{region_constraint}" | |
| ) | |
| client = InferenceClient( | |
| api_key=hf_token, | |
| provider=os.getenv("HF_PROVIDER", "auto"), | |
| ) | |
| response = client.chat_completion( | |
| model="Qwen/Qwen2.5-7B-Instruct", | |
| messages=[ | |
| {"role": "system", "content": system}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| max_tokens=4096, | |
| temperature=0.35, | |
| ) | |
| if not response or not getattr(response, "choices", None): | |
| return _rule_based_brief(signal_data, ts) | |
| text = response.choices[0].message.content.strip() | |
| if not text: | |
| return _rule_based_brief(signal_data, ts) | |
| if "## Conclusion" not in text: | |
| text += ( | |
| "\n\n> \u26a0\ufe0f **Notice:** This brief was truncated before the Conclusion " | |
| "section was generated. Re-run the brief or increase the output token budget." | |
| ) | |
| return text | |
| except Exception: | |
| return _rule_based_brief(signal_data, ts) | |
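The digest ordering above sorts on source priority alone, so ties keep their input order because Python's sort is stable. The sketch below shows one way to make the secondary ordering explicit by adding descending disruption score as a tie-breaker. This is an assumption about the intent suggested by the "ranked by source priority and disruption score" wording in the user prompt, not the module's current behaviour. The `source_type` key and the `order_signals` name are hypothetical; the original resolves the type from the URL via `_get_source_type`.

```python
from typing import Dict, List

# Same priorities as _SOURCE_PRIORITY in the module; lower sorts first.
SOURCE_PRIORITY: Dict[str, int] = {
    "regulatory": 0,
    "competitor_press": 1,
    "trade_news": 2,
    "financial": 3,
    "academic": 4,
}


def order_signals(signals: List[dict]) -> List[dict]:
    """Sort by source priority, then by disruption score (highest first)."""
    return sorted(
        signals,
        key=lambda s: (
            # Unknown source types sort last; a missing type defaults to trade_news.
            SOURCE_PRIORITY.get(s.get("source_type", "trade_news"), 99),
            -s.get("score", 0.0),
        ),
    )


if __name__ == "__main__":
    demo = [
        {"title": "a", "source_type": "academic", "score": 0.9},
        {"title": "b", "source_type": "regulatory", "score": 0.5},
        {"title": "c", "source_type": "regulatory", "score": 0.8},
        {"title": "d", "score": 0.7},
    ]
    print([s["title"] for s in order_signals(demo)])
```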
| def _rule_based_brief(signal_data: List[dict], ts: str) -> str: | |
| """Deterministic fallback brief when the LLM is unavailable.""" | |
| count = len(signal_data) | |
| # Dimension counts | |
| dims: dict[str, int] = {} | |
| for sd in signal_data: | |
| dims[sd["dim"]] = dims.get(sd["dim"], 0) + 1 | |
| dim_summary = ", ".join(f"**{k}**: {v}" for k, v in sorted(dims.items(), key=lambda x: x[1], reverse=True)) | |
| # Severity breakdown | |
| critical = [sd for sd in signal_data if sd["score"] >= 0.75] | |
| high = [sd for sd in signal_data if 0.50 <= sd["score"] < 0.75] | |
| # Top signals as findings | |
| findings: List[str] = [] | |
| for i, sd in enumerate(signal_data[:5], 1): | |
| horizon = "12M - CRITICAL" if sd["velocity"] >= 0.7 else "24M - HIGH" if sd["velocity"] >= 0.4 else "36M - MONITOR" | |
| findings.append( | |
| f"### Finding {i}: {sd['title']}\n" | |
| f"- **PESTEL Dimension:** {sd['dim']}\n" | |
| f"- **Disruption Score:** {sd['score']:.2f}\n" | |
| f"- **Time Horizon:** {horizon}\n\n" | |
| f"**Evidence:** {sd['content'][:250]}\n\n" | |
| f"**Industry Implication:** Signals in the {sd['dim']} dimension at this " | |
| f"disruption level ({sd['score']:.2f}) indicate structural market shifts " | |
| f"requiring strategic response within the {horizon.split(' - ')[0]} window.\n" | |
| ) | |
| findings_str = "\n---\n\n".join(findings) | |
| # Actions table | |
| table_rows: List[str] = [] | |
| for i, sd in enumerate(signal_data[:5], 1): | |
| risk = "Market exclusion" if sd["score"] >= 0.75 else "Competitive lag" if sd["score"] >= 0.50 else "Missed opportunity" | |
| table_rows.append(f"| Address {sd['dim']} signal #{i} | Resource reallocation | {risk} |") | |
| table_str = "\n".join(table_rows) | |
| return f"""# Strategic Intelligence Brief | |
| **Classification:** INTERNAL - C-SUITE ONLY | |
| **Generated:** {ts} | |
| **Signal Count:** {count} | |
| --- | |
| ## Purpose and Background | |
| This brief synthesises the **{count}** highest-disruption signals currently tracked across the macro-environmental landscape by the PESTEL-EL Sentinel. The intelligence window covers the **preceding 30 days** of automated signal ingestion from regulatory databases, patent filings, market reports, and industry news. Of the signals analysed, **{len(critical)}** are rated CRITICAL (score ≥ 0.75) and **{len(high)}** are HIGH (0.50-0.74), indicating a concentrated period of strategic disruption requiring board-level attention. | |
| --- | |
| ## Key Findings | |
| {findings_str} | |
| --- | |
| ## Strategic Implications | |
| - **Regulatory convergence:** {dims.get('LEGAL', 0) + dims.get('POLITICAL', 0)} of {count} signals originate from Legal/Political dimensions, suggesting an accelerating regulatory environment that will constrain product development timelines. | |
| - **Technology velocity:** {dims.get('TECHNOLOGICAL', 0)} signals in the Technological dimension indicate rapid innovation cycles; competitors who move first will capture market share. | |
| - **Economic headwinds:** {dims.get('ECONOMIC', 0)} signals highlight macroeconomic pressures on customer purchasing power and subsidy structures. | |
| - **Cross-dimensional risk:** The convergence of regulatory, economic, and technological signals creates a **strategic inflection point**; addressing any single dimension in isolation will be insufficient. | |
| - **Dimension distribution:** {dim_summary}. | |
| --- | |
| ## Recommended Actions | |
| 1. **Immediate - Board:** Convene emergency strategy review to address the top 3 CRITICAL signals. | |
| 2. **Immediate - R&D:** Establish cross-functional task force for the highest-velocity technological signal. | |
| 3. **Q2 2026 - Policy:** Engage EU regulatory affairs team on all Legal/Political signals exceeding score 0.60. | |
| 4. **Q2 2026 - Finance:** Model revenue impact scenarios for Economic dimension signals. | |
| 5. **Q3 2026 - Strategy:** Develop competitive response playbook for each Technological signal. | |
| 6. **Ongoing - Intelligence:** Increase monitoring frequency to daily for all CRITICAL-rated signals. | |
| --- | |
| ## Value Proposition and Impact | |
| | Action | Investment Signal | Risk if Ignored | | |
| |--------|-------------------|-----------------| | |
| {table_str} | |
| --- | |
| ## Conclusion | |
| The current intelligence landscape reveals a **concentrated period of strategic disruption** across {len(dims)} PESTEL dimensions. With **{len(critical)} CRITICAL** and **{len(high)} HIGH** signals active simultaneously, the cost of inaction far exceeds the cost of response. Failure to act within the next **90 days** risks permanent competitive disadvantage in the EU's largest agricultural markets. | |
| **Immediate Next Step:** Schedule C-suite portfolio review with CEO and CFO by end of current week. | |
| --- | |
| *Generated by Fendt PESTEL-EL Sentinel - rule-based fallback (LLM unavailable)* | |
| """ | |
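Both `generate_brief_markdown` and `_rule_based_brief` compute the time-horizon label with the same inline conditional on the velocity score. The sketch below factors that rule into one helper using the thresholds from the source (0.7 and 0.4). The function name `horizon_label` is hypothetical, and the plain hyphen separator is an assumption about the intended label text.

```python
def horizon_label(velocity: float) -> str:
    """Map a velocity score to a time-horizon label."""
    if velocity >= 0.7:
        return "12M - CRITICAL"
    if velocity >= 0.4:
        return "24M - HIGH"
    return "36M - MONITOR"


def horizon_window(velocity: float) -> str:
    """Return only the window part of the label, e.g. '12M'."""
    return horizon_label(velocity).split(" - ")[0]


if __name__ == "__main__":
    for v in (0.9, 0.5, 0.1):
        print(v, horizon_label(v), horizon_window(v))
```

Keeping the label and the split separator in one place avoids the two drifting apart, since the fallback brief extracts the window by splitting the label text.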