AGCO / core /summary_engine.py
ritvikv03
fix: switch to Qwen2.5-7B-Instruct with provider=auto β€” confirmed available on HF
e3d8c22
"""
SummaryEngine β€” Use-case agnostic AI summary generator with persistence.
========================================================================
Generates executive summaries using HuggingFace with a rule-based fallback.
Results are persisted to SQLite so session restarts don't lose summaries.
Usage (from any module)::
from core.summary_engine import SummaryEngine
engine = SummaryEngine(use_case_id="fendt-pestel")
text, source = engine.generate(
context_data={"total": 42, "critical": 3},
prompt_template="Summarize {total} signals, {critical} are CRITICAL.",
fallback_fn=my_rule_based_fn,
)
# source is "ai" or "rule_based"
# Retrieve the most recently persisted summary without calling the API:
cached = engine.get_latest()
"""
from __future__ import annotations
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
class SummaryEngine:
"""
AI summary generator with automatic fallback and SQLite persistence.
Parameters
----------
use_case_id:
Stable identifier for the caller (e.g. "fendt-pestel").
Used as a namespace key in the summaries table.
db_path:
Path to the SQLite database. Defaults to the shared summaries.db
inside data/ so no extra file is created.
hf_token:
Read from ``HUGGINGFACEHUB_API_TOKEN`` env var automatically.
If unset or quota is exhausted, falls back to ``fallback_fn``.
"""
def __init__(
self,
use_case_id: str,
db_path: Optional[str] = None,
hf_token: Optional[str] = None,
) -> None:
self.use_case_id = use_case_id
self._quota_hit = False
if db_path is None:
db_path = str(
Path(__file__).parent.parent / "data" / "summaries.db"
)
self.db_path = db_path
self._ensure_summaries_table()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def generate(
self,
context_data: Dict[str, Any],
prompt_template: str,
fallback_fn: Callable[[Dict[str, Any]], str],
max_tokens: int = 300,
persist: bool = True,
) -> Tuple[str, str]:
"""
Generate a summary using AI with automatic fallback.
Parameters
----------
context_data:
Arbitrary dict. Values are substituted into ``prompt_template``
via ``str.format_map`` and passed to ``fallback_fn``.
prompt_template:
String with ``{key}`` placeholders matching keys in
``context_data``.
fallback_fn:
``callable(context_data) -> str`` used when HuggingFace is
unavailable or quota is exhausted.
max_tokens:
Max output tokens for the HuggingFace call.
persist:
If ``True``, save the result to SQLite (default).
Returns
-------
(summary_text, source)
``source`` is ``"ai"`` or ``"rule_based"``.
"""
# Guard: empty / None data must never crash here
if not context_data:
return self._no_data_message(), "rule_based"
ai_text = self._call_llm(prompt_template, context_data, max_tokens)
if ai_text:
result, source = ai_text, "ai"
else:
result, source = self._safe_fallback(fallback_fn, context_data), "rule_based"
if persist:
self._persist(result, source)
return result, source
def get_latest(self) -> Optional[str]:
"""Return the most recently persisted summary for this use case."""
try:
with self._connection() as conn:
row = conn.execute(
"SELECT content FROM summaries "
"WHERE use_case_id = ? "
"ORDER BY created_at DESC LIMIT 1",
(self.use_case_id,),
).fetchone()
return row[0] if row else None
except Exception:
return None
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _call_llm(
self,
prompt_template: str,
context_data: Dict[str, Any],
max_tokens: int,
) -> Optional[str]:
"""Try HuggingFace InferenceClient chat_completion. Returns text or None on failure."""
if self._quota_hit:
return None
hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
if not hf_token:
return None
try:
from huggingface_hub import InferenceClient
prompt = prompt_template.format_map(context_data)
client = InferenceClient(api_key=hf_token, provider="auto")
response = client.chat_completion(
model="Qwen/Qwen2.5-7B-Instruct",
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=0.3,
)
result = response.choices[0].message.content.strip()
return result or None
except Exception as exc:
err = str(exc)
if "429" in err or "quota" in err.lower() or "rate" in err.lower():
self._quota_hit = True
return None
def _safe_fallback(
self,
fallback_fn: Callable[[Dict[str, Any]], str],
context_data: Dict[str, Any],
) -> str:
"""Run the caller-supplied fallback, catching all exceptions."""
try:
return fallback_fn(context_data)
except Exception:
return self._no_data_message()
def _no_data_message(self) -> str:
return (
f"No data available for '{self.use_case_id}'. "
"Run the pipeline to generate intelligence."
)
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def _ensure_summaries_table(self) -> None:
try:
with self._connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS summaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
use_case_id TEXT NOT NULL,
content TEXT NOT NULL,
source TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_summaries_use_case
ON summaries(use_case_id, created_at)
""")
except Exception:
pass # DB may not exist yet; first pipeline run creates it
def _persist(self, content: str, source: str) -> None:
"""Write the summary to SQLite. Failures are silent."""
try:
with self._connection() as conn:
conn.execute(
"INSERT INTO summaries (use_case_id, content, source) VALUES (?, ?, ?)",
(self.use_case_id, content, source),
)
except Exception:
pass
def _connection(self):
"""Return a WAL-mode SQLite connection as a context manager."""
import contextlib
@contextlib.contextmanager
def _ctx():
conn = sqlite3.connect(self.db_path, timeout=10)
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
return _ctx()
# ── Module-level helper ───────────────────────────────────────────────────────
_BRIEF_SYSTEM = """\
You are the **Lead Strategy Consultant for the Fendt PESTEL-EL Sentinel**, \
advising the AGCO/Fendt board on macro-environmental disruption. Your briefs \
are the sole decision-support document the board will read before allocating \
resources. Every word must earn its place.
PRODUCT CONSTRAINTS β€” NEVER VIOLATE:
- FendtONE is a precision/digital platform, NOT a safety feature.
- The Momentum Planter is NOT sold in the EME region. Do NOT suggest it for \
EU-based recommendations under any circumstances.
STRICT FORMAT β€” use this exact Markdown structure:
# Strategic Intelligence Brief
**Classification:** INTERNAL β€” C-SUITE ONLY
**Generated:** {date}
**Signal Count:** {count}
---
## Purpose and Background
One paragraph (3 sentences maximum) stating:
- What this brief covers and why it was produced
- The time horizon of the intelligence window
- Why the board must read this now (the "so what?" for Fendt specifically)
---
## Key Findings
For each of the top 3–5 signals, create a subsection:
### Finding N: [Signal Title]
- **PESTEL Dimension:** [dimension]
- **Disruption Score:** [score]
- **Time Horizon:** [12M / 24M / 36M based on velocity]
- **Source:** [Source Name](URL) β€” use the Source URL field from the signal data
**Evidence:** Quote or paraphrase the key intelligence from the signal content. \
Maximum 3 sentences.
**Fendt Impact:** Explicitly state how this signal affects Fendt's business \
(revenue, product roadmap, regulatory exposure, or competitive position). \
Maximum 3 sentences. Do not describe industry-wide effects only.
---
## Strategic Implications
3–5 bullet points synthesising cross-cutting themes. Connect the dots between \
findings. Identify convergence patterns (e.g., regulatory + economic pressure). \
Each bullet maximum 3 sentences.
---
## Recommended Actions
Numbered list of 5–7 concrete, actionable steps. Each must include:
- **Timeline** (Immediate / Q2 2026 / Q3 2026 etc.)
- **Owner** (Board, R&D, Policy, Finance, etc.)
- **Action** (specific verb + object)
Do NOT force "Sales Opp," "Competitive Angle," or "Marketing Action" sections \
if they are irrelevant to the signals. Omit or merge them as necessary.
---
## Value Proposition and Impact
| Action | Investment Signal | Risk if Ignored |
|--------|-------------------|-----------------|
| (row per recommendation) | cost/effort hint | consequence |
---
## Conclusion
Maximum 3 sentences reinforcing the strategic inflection point. End with a \
specific next step and deadline.
---
*Generated by Fendt PESTEL-EL Sentinel β€” AI Strategic Writer*
RULES:
- Be decisive, not hedging. Use "must" not "could consider".
- Cite specific signal titles and scores from the provided data.
- Each section must be a maximum of 3 sentences.
- Every market signal must explicitly state how it affects Fendt's business \
specifically, not just the industry at large. Include technical accuracy.
- When citing a signal source, use the format [Source Name](URL) using the \
Source URL field provided in the signal data.
- Keep the total brief under 800 words.
- Use **bold** for key numbers, dates, and risk levels.
- If the provided context does not contain enough specific evidence, state: \
"Current intelligence does not contain sufficient data on this topic." \
Do NOT hallucinate.
"""
_SOURCE_PRIORITY: dict[str, int] = {
"regulatory": 0,
"competitor_press": 1,
"trade_news": 2,
"financial": 3,
"academic": 4,
}
def _get_source_type(source_url: str) -> str:
try:
from core.sources import PESTEL_SOURCES
for src in PESTEL_SOURCES:
if src["url"] in source_url or source_url in src["url"]:
return src.get("source_type", "trade_news")
except Exception:
pass
return "trade_news"
def generate_brief_markdown(signals: List[Any]) -> str:
"""
Generate a full strategic intelligence brief in Markdown format.
Parameters
----------
signals:
List of Signal objects (or dicts with title/content/pestel_dimension/
disruption_score keys). Typically the top 10 by disruption score.
Returns
-------
Markdown string. Falls back to a rule-based brief if the LLM is unavailable.
"""
hf_token = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
# Build signal digest regardless of LLM availability
signal_data: List[dict] = []
for s in signals:
if hasattr(s, "title"):
title = s.title
content = s.content
dim = s.pestel_dimension.value if hasattr(s.pestel_dimension, "value") else str(s.pestel_dimension)
score = getattr(s, "disruption_score", 0.0)
velocity = getattr(s, "velocity_score", 0.0)
impact = getattr(s, "impact_score", 0.0)
novelty = getattr(s, "novelty_score", 0.0)
entities = getattr(s, "entities", [])
themes = getattr(s, "themes", [])
source_url = getattr(s, "source_url", "")
else:
title = s.get("title", "Untitled")
content = s.get("content", "")
dim = s.get("pestel_dimension", "UNKNOWN")
score = s.get("disruption_score", 0.0)
velocity = s.get("velocity_score", 0.0)
impact = s.get("impact_score", 0.0)
novelty = s.get("novelty_score", 0.0)
entities = s.get("entities", [])
themes = s.get("themes", [])
source_url = s.get("source_url", "")
signal_data.append({
"title": title, "content": content, "dim": dim,
"score": score, "velocity": velocity, "impact": impact,
"novelty": novelty, "entities": entities, "themes": themes,
"source_url": source_url,
})
# Prioritise regulatory and competitor-press signals first in the digest
signal_data.sort(
key=lambda sd: _SOURCE_PRIORITY.get(_get_source_type(sd["source_url"]), 99)
)
# Detect EU/EME context for regional constraint injection
_eu_keywords = {"EU", "EUROPE", "EME", "EUROPEAN UNION", "EUROPEAN"}
eu_signals_present = any(
any(kw in tok for tok in (
" ".join(sd["entities"]).upper().split() + sd["content"].upper().split()
) for kw in _eu_keywords)
for sd in signal_data
)
# Build a rich digest for the LLM
digest_lines: List[str] = []
for i, sd in enumerate(signal_data, 1):
horizon = "12M β€” CRITICAL" if sd["velocity"] >= 0.7 else "24M β€” HIGH" if sd["velocity"] >= 0.4 else "36M β€” MONITOR"
ents = ", ".join(sd["entities"][:5]) if sd["entities"] else "N/A"
digest_lines.append(
f"Signal {i}:\n"
f" Title: {sd['title']}\n"
f" Dimension: {sd['dim']}\n"
f" Disruption Score: {sd['score']:.2f} (impact={sd['impact']:.2f}, novelty={sd['novelty']:.2f}, velocity={sd['velocity']:.2f})\n"
f" Time Horizon: {horizon}\n"
f" Key Entities: {ents}\n"
f" Source URL: {sd['source_url']}\n"
f" Content: {sd['content'][:300]}\n"
)
digest = "\n".join(digest_lines)
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
count = len(signal_data)
if not hf_token:
return _rule_based_brief(signal_data, ts)
try:
from huggingface_hub import InferenceClient
system = _BRIEF_SYSTEM.replace("{date}", ts).replace("{count}", str(count))
region_constraint = (
"\n\n\u26a0\ufe0f REGIONAL CONSTRAINT: This brief contains EU/EME signals. "
"The Fendt Momentum Planter is NOT available in EU/EME markets. "
"Do NOT recommend it for EU/EME deployment under any circumstances."
if eu_signals_present else ""
)
user_prompt = (
f"Intelligence Window: {ts}\n"
f"Total Signals Analysed: {count}\n\n"
f"SIGNAL DATA (ranked by source priority and disruption score):\n\n{digest}\n\n"
"Write the full strategic intelligence brief now using the exact "
"format specified in your system instructions. Focus on the top 3–5 "
f"most disruptive signals. Be board-ready.{region_constraint}"
)
client = InferenceClient(
api_key=hf_token,
provider=os.getenv("HF_PROVIDER", "auto"),
)
response = client.chat_completion(
model="Qwen/Qwen2.5-7B-Instruct",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user_prompt},
],
max_tokens=4096,
temperature=0.35,
)
if not response or not getattr(response, "choices", None):
return _rule_based_brief(signal_data, ts)
text = response.choices[0].message.content.strip()
if not text:
return _rule_based_brief(signal_data, ts)
if "## Conclusion" not in text:
text += (
"\n\n> ⚠️ **Notice:** This brief was truncated before the Conclusion "
"section was generated. Re-run the brief or increase the output token budget."
)
return text
except Exception:
return _rule_based_brief(signal_data, ts)
def _rule_based_brief(signal_data: List[dict], ts: str) -> str:
"""Deterministic fallback brief when the LLM is unavailable."""
count = len(signal_data)
# Dimension counts
dims: dict[str, int] = {}
for sd in signal_data:
dims[sd["dim"]] = dims.get(sd["dim"], 0) + 1
dim_summary = ", ".join(f"**{k}**: {v}" for k, v in sorted(dims.items(), key=lambda x: x[1], reverse=True))
# Severity breakdown
critical = [sd for sd in signal_data if sd["score"] >= 0.75]
high = [sd for sd in signal_data if 0.50 <= sd["score"] < 0.75]
# Top signals as findings
findings: List[str] = []
for i, sd in enumerate(signal_data[:5], 1):
horizon = "12M β€” CRITICAL" if sd["velocity"] >= 0.7 else "24M β€” HIGH" if sd["velocity"] >= 0.4 else "36M β€” MONITOR"
findings.append(
f"### Finding {i}: {sd['title']}\n"
f"- **PESTEL Dimension:** {sd['dim']}\n"
f"- **Disruption Score:** {sd['score']:.2f}\n"
f"- **Time Horizon:** {horizon}\n\n"
f"**Evidence:** {sd['content'][:250]}\n\n"
f"**Industry Implication:** Signals in the {sd['dim']} dimension at this "
f"disruption level ({sd['score']:.2f}) indicate structural market shifts "
f"requiring strategic response within the {horizon.split(' β€” ')[0]} window.\n"
)
findings_str = "\n---\n\n".join(findings)
# Actions table
table_rows: List[str] = []
for i, sd in enumerate(signal_data[:5], 1):
risk = "Market exclusion" if sd["score"] >= 0.75 else "Competitive lag" if sd["score"] >= 0.50 else "Missed opportunity"
table_rows.append(f"| Address {sd['dim']} signal #{i} | Resource reallocation | {risk} |")
table_str = "\n".join(table_rows)
return f"""# Strategic Intelligence Brief
**Classification:** INTERNAL β€” C-SUITE ONLY
**Generated:** {ts}
**Signal Count:** {count}
---
## Purpose and Background
This brief synthesises the **{count}** highest-disruption signals currently tracked across the macro-environmental landscape by the PESTEL-EL Sentinel. The intelligence window covers the **preceding 30 days** of automated signal ingestion from regulatory databases, patent filings, market reports, and industry news. Of the signals analysed, **{len(critical)}** are rated CRITICAL (score β‰₯ 0.75) and **{len(high)}** are HIGH (0.50–0.74), indicating a concentrated period of strategic disruption requiring board-level attention.
---
## Key Findings
{findings_str}
---
## Strategic Implications
- **Regulatory convergence:** {dims.get('LEGAL', 0) + dims.get('POLITICAL', 0)} of {count} signals originate from Legal/Political dimensions, suggesting an accelerating regulatory environment that will constrain product development timelines.
- **Technology velocity:** {dims.get('TECHNOLOGICAL', 0)} signals in the Technological dimension indicate rapid innovation cycles β€” competitors who move first will capture market share.
- **Economic headwinds:** {dims.get('ECONOMIC', 0)} signals highlight macroeconomic pressures on customer purchasing power and subsidy structures.
- **Cross-dimensional risk:** The convergence of regulatory, economic, and technological signals creates a **strategic inflection point** β€” addressing any single dimension in isolation will be insufficient.
- **Dimension distribution:** {dim_summary}.
---
## Recommended Actions
1. **Immediate β€” Board:** Convene emergency strategy review to address the top 3 CRITICAL signals.
2. **Immediate β€” R&D:** Establish cross-functional task force for the highest-velocity technological signal.
3. **Q2 2026 β€” Policy:** Engage EU regulatory affairs team on all Legal/Political signals exceeding score 0.60.
4. **Q2 2026 β€” Finance:** Model revenue impact scenarios for Economic dimension signals.
5. **Q3 2026 β€” Strategy:** Develop competitive response playbook for each Technological signal.
6. **Ongoing β€” Intelligence:** Increase monitoring frequency to daily for all CRITICAL-rated signals.
---
## Value Proposition and Impact
| Action | Investment Signal | Risk if Ignored |
|--------|-------------------|-----------------|
{table_str}
---
## Conclusion
The current intelligence landscape reveals a **concentrated period of strategic disruption** across {len(dims)} PESTEL dimensions. With **{len(critical)} CRITICAL** and **{len(high)} HIGH** signals active simultaneously, the cost of inaction far exceeds the cost of response. Failure to act within the next **90 days** risks permanent competitive disadvantage in the EU's largest agricultural markets.
**Immediate Next Step:** Schedule C-suite portfolio review with CEO and CFO by end of current week.
---
*Generated by Fendt PESTEL-EL Sentinel β€” rule-based fallback (LLM unavailable)*
"""