sofhiaazzhr committed on
Commit 78c598c · 1 Parent(s): 6b3637d

[KM-624] Implement Compute Functions Tools

.gitignore CHANGED
@@ -48,4 +48,6 @@ software/

  tests/
  .claude/
- migratego/
+ migratego/
+ docs/specs/tabular_parquet_contract.md
+ docs/specs/tabular_parquet.md
ARCHITECTURE.md CHANGED
@@ -1,7 +1,7 @@
  # Architecture - Data Eyond Agentic Service

- **Last updated**: 2026-05-07
- **Status**: Design phase - folder skeleton in place, implementation in progress
+ **Last updated**: 2026-05-20
+ **Status**: Phase 2 catalog path shipped; document ingestion has moved to a separate Go service. The long-term split is **Python = agent/ML layer, Go = data plane**; this document covers the Python side only.

  ---

@@ -21,7 +21,7 @@ A catalog-driven AI service for data analysis. Users upload documents and regist

  The architecture has two paths:

- - **Unstructured** (PDF, DOCX, TXT) - dense similarity over prose chunks (the right primitive for free-form text).
+ - **Unstructured** (PDF, DOCX, TXT) - dense similarity over prose chunks (the right primitive for free-form text). **Ingestion is handled by a separate Go service**; this Python service reads embeddings from PGVector at query time.
  - **Structured** (databases, XLSX, CSV, Parquet) - a per-user **data catalog** describes what tables/columns exist; an LLM produces a structured **JSON intermediate representation (IR)** of the user's intent; a deterministic compiler turns the IR into SQL or pandas operations.

  The LLM produces *intent*, not query syntax. Deterministic code does the rest.
@@ -120,6 +120,7 @@ Compiler and executors are pure code. No LLM in the hot path of query constructi
  ### Ingestion (when user uploads a file or connects a DB)

  ```
+ Structured sources (DB connect / XLSX / CSV / Parquet upload) - Python:
  source upload / DB connect
  ↓
  introspect schema (DB: information_schema; tabular: file headers + sample rows)
@@ -129,7 +130,7 @@ validate (Pydantic)
  write to catalog store (Postgres jsonb in `data_catalog`, keyed by user_id)
  ```

- For unstructured files: chunk + embed → PGVector.
+ **Unstructured ingestion (PDF / DOCX / TXT) is handled by a separate Go service**, which writes chunks + embeddings into the `documents` collection in PGVector. The Python service does not own this path - it reads only.

  ### Query (per user message)

@@ -143,7 +144,7 @@ Load chat history
  IntentRouter LLM → needs_search? source_hint?
  ↓
  ├── chat → ChatbotAgent → SSE stream
- ├── unstructured → DocumentRetriever → answerer
+ ├── unstructured → DocumentRetriever (raw SQL: pgvector `<=>` cosine or `<+>` manhattan) → answerer
  └── structured →
  CatalogReader (load full Cs ∪ Ct for user)
  ↓
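The "LLM produces intent, deterministic code does the rest" split in ARCHITECTURE.md can be illustrated with a minimal sketch. The real `QueryIR` schema and compiler are not part of this diff, so `SimpleIR`, `Filter`, `compile_sql`, the `catalog` shape, and the `$n` placeholder style are all hypothetical stand-ins chosen for illustration only.

```python
from __future__ import annotations

from dataclasses import dataclass, field

# Hypothetical whitelists; the real IR may allow a different set.
ALLOWED_AGGS = {"sum", "avg", "count", "min", "max"}
ALLOWED_OPS = {"=", "!=", "<", "<=", ">", ">="}


@dataclass
class Filter:
    column: str
    op: str
    value: object


@dataclass
class SimpleIR:
    """Simplified stand-in for the planner's IR (assumption, not the real QueryIR)."""

    table: str
    group_by: list[str] = field(default_factory=list)
    aggregates: list[tuple[str, str]] = field(default_factory=list)  # (func, column)
    filters: list[Filter] = field(default_factory=list)


def compile_sql(ir: SimpleIR, catalog: dict[str, set[str]]) -> tuple[str, list[object]]:
    """Compile an IR into parameterized SQL; identifiers must exist in the catalog."""
    if ir.table not in catalog:
        raise ValueError(f"unknown table: {ir.table}")
    used = ir.group_by + [c for _, c in ir.aggregates] + [f.column for f in ir.filters]
    unknown = [c for c in used if c not in catalog[ir.table]]
    if unknown:
        raise ValueError(f"unknown columns: {unknown}")

    select = [f'"{c}"' for c in ir.group_by]
    for func, col in ir.aggregates:
        if func not in ALLOWED_AGGS:
            raise ValueError(f"unsupported aggregate: {func}")
        select.append(f'{func.upper()}("{col}") AS "{col}_{func}"')
    if not select:
        raise ValueError("IR selects nothing")

    sql = f'SELECT {", ".join(select)} FROM "{ir.table}"'
    params: list[object] = []
    clauses = []
    for f in ir.filters:
        if f.op not in ALLOWED_OPS:
            raise ValueError(f"unsupported operator: {f.op}")
        params.append(f.value)  # values travel as bind parameters, never inline
        clauses.append(f'"{f.column}" {f.op} ${len(params)}')
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    if ir.group_by:
        sql += " GROUP BY " + ", ".join(f'"{c}"' for c in ir.group_by)
    return sql, params
```

In this sketch the model only picks names and operators from closed sets, so every identifier in the emitted SQL has been checked against the catalog and every literal is a bind parameter.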
PHASE1_TO_PHASE2_REPORT.md CHANGED
@@ -52,7 +52,7 @@ Everything else - IR validation, SQL/pandas compilation, execution - is dete
  | `src/agents/chatbot.py` (Phase 1) → deleted, then `src/agents/answer_agent.py` (`AnswerAgent`) → renamed | `src/agents/chatbot.py::ChatbotAgent` | Final answer formation; streams via `astream` |
  | `src/knowledge/parquet_service.py` | `src/storage/parquet.py` | Parquet upload/download helper |
  | `src/pipeline/document_pipeline/document_pipeline.py` (folder) | `src/pipeline/document_pipeline.py` (flat) | Single module |
- | `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results |
+ | `src/rag/retrievers/document.py` | `src/retrieval/document.py` | `DocumentRetriever` migrated; tabular file types filtered out of results. **Post-report update (mentor commit 61c746f, 2026-05-20):** rewritten to raw SQL (pgvector `<=>` cosine, `<+>` manhattan only) to dodge asyncpg type-mapping issues with the Go-ingested schema. MMR / euclidean / inner_product dropped. |
  | `src/rag/router.py` | `src/retrieval/router.py` | `RetrievalRouter`, Redis-cached, unstructured-only; dead `db: AsyncSession` + `source_hint` params removed |
  | `src/rag/base.py` (`RetrievalResult`, `BaseRetriever`) | `src/retrieval/base.py` | Same dataclass + ABC |

@@ -177,7 +177,7 @@ POST /api/v1/chat/stream
  │
  ├── source_hint == "unstructured"
  │ → RetrievalRouter.retrieve() [retrieval/router.py, Redis-cached]
- │ → DocumentRetriever (PGVector MMR/cosine/etc.)
+ │ → DocumentRetriever (raw SQL: pgvector `<=>` cosine or `<+>` manhattan)
  │ → ChatbotAgent.astream(chunks=...)
  │
  └── source_hint == "structured"
@@ -237,7 +237,7 @@ POST /api/v1/chat/stream
  | Top-level chat orchestration | Inline in `api/v1/chat.py` | `agents/chat_handler.py::ChatHandler` | Extracted to a reusable module |
  | Final answer formation | `agents/chatbot.py` (Phase 1 LangChain) | `agents/chatbot.py::ChatbotAgent` (was `AnswerAgent` in `answer_agent.py` mid-cycle) | Rewritten + renamed |
  | Schema retrieval (DB / tabular) | `rag/retrievers/schema.py` + PGVector chunks | **Removed**. Replaced by catalog (`catalog/store.py` jsonb) loaded verbatim into planner prompt | Whole concept replaced |
- | Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered |
+ | Doc retrieval (PDF / DOCX / TXT) | `rag/retrievers/document.py`, `rag/router.py` | `retrieval/document.py`, `retrieval/router.py` | Moved; Redis cache restored; tabular files filtered. **Post-report update:** rewritten to raw SQL (cosine / manhattan only); collection renamed `document_embeddings` → `documents` to match the Go ingestion service. |
  | Query writing | `query/query_executor.py` + `models/sql_query.py` (LLM writes SQL) | `query/planner/service.py` (LLM writes IR) + `query/compiler/sql.py` (deterministic) | LLM emits intent, not SQL |
  | DB execution | `query/executors/db_executor.py` | `query/executor/db.py::DbExecutor` | Folder renamed (`executors` → `executor`); sqlglot guard + RO txn + 30 s timeout kept |
  | Tabular execution | `query/executors/tabular.py` | `query/executor/tabular.py::TabularExecutor` | Parquet-only; pandas compiler split out |
@@ -258,3 +258,16 @@ POST /api/v1/chat/stream
  **Bottom line.** Every Phase 1 file under `src/rag/`, `src/tools/`, `src/query/executors/`, `src/query/query_executor.py`, `src/query/base.py`, `src/api/v1/knowledge.py`, and `src/config/agents/` is gone. Phase 1 introspection helpers under `src/pipeline/db_pipeline/` and `src/database_client/` are still imported by Phase 2 - they were not rewritten, just wrapped. The three LLM call sites are now explicit and the SQL-writing one no longer exists; the planner emits a Pydantic-validated `QueryIR` instead.

  The one filename gotcha to remember: the **intent router** still lives at `src/agents/orchestration.py` as class `OrchestratorAgent` (Phase 1 name kept for import-site compatibility, Phase 2 body). The matching prompt and tests use the `intent_router` name, but the source module does not.
+
+ ---
+
+ ## 5. Addendum - post-report changes (2026-05-20, mentor commit `61c746f`)
+
+ This report was originally written as a snapshot at Phase 2 completion. The Phase 2 architecture itself hasn't changed, but a few implementation details have shifted as the Go migration progresses. Captured here so the report stays trustworthy:
+
+ - **Doc ingestion is now a Go service.** PDF/DOCX/TXT chunking + embedding + writes into PGVector are no longer done by Python. The Python service reads only.
+ - **PGVector collection renamed:** `document_embeddings` → `documents` (to match the Go service's writes). Touched files: `db/postgres/vector_store.py`, `retrieval/document.py`.
+ - **`DocumentRetriever` rewritten to raw SQL.** Uses pgvector operators directly (`<=>` cosine, `<+>` manhattan). The LangChain ORM path couldn't cope with the schema written by the Go service (asyncpg type-mapping issues - id String vs UUID, jsonb_path_match binding quirks). MMR / euclidean / inner_product were dropped as part of the rewrite.
+ - **Intent router defaults flipped.** Ambiguous topical/knowledge questions now prefer `unstructured` (was `structured`). Indonesian few-shot examples added to the prompt.
+ - **Cache management endpoints added:** `DELETE /api/v1/chat/cache`, `DELETE /api/v1/chat/cache/room/{id}`, `DELETE /api/v1/retrieval/cache/{user_id}`. Redis chat cache now stores `{response, sources}` (was just `response`) so cached replies repopulate `message_sources`.
+ - **Direction.** The long-term split is **Python = agent/ML layer, Go = data plane**. More pieces are expected to follow doc ingestion out of Python.
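The addendum says `DocumentRetriever` now issues raw SQL with pgvector's `<=>` (cosine distance) and `<+>` (L1 / manhattan distance) operators. A rough sketch of how a method name might map onto a query string follows. The retriever's real SQL is not shown in this diff, so the function name, the table name `document_chunks`, the column names, and the `$n` placeholders are assumptions.

```python
from __future__ import annotations

# The two distance methods the addendum says survived the rewrite.
DISTANCE_OPERATORS = {"cosine": "<=>", "manhattan": "<+>"}


def build_similarity_sql(method: str = "cosine") -> str:
    """Return a parameterized top-k similarity query for one distance method."""
    try:
        operator = DISTANCE_OPERATORS[method]
    except KeyError:
        raise ValueError(
            f"unsupported method '{method}'; supported: {sorted(DISTANCE_OPERATORS)}"
        ) from None
    # Table and column names are hypothetical placeholders, not the Go schema.
    # The operator comes from the whitelist above, never from user input.
    return (
        "SELECT id, content, metadata, "
        f"embedding {operator} $1::vector AS distance "
        "FROM document_chunks "
        "WHERE user_id = $2 "
        "ORDER BY distance "
        "LIMIT $3"
    )
```

Both operators return a distance, so ascending order puts the closest chunks first. A dropped method such as `mmr` fails with a `ValueError` rather than falling back to another metric.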
REPO_CONTEXT.md CHANGED
@@ -156,7 +156,7 @@ makes any LLM calls.)
  | `db/postgres/connection.py` | two async engines: `engine` (app) and `_pgvector_engine` (PGVector) |
  | `db/postgres/init_db.py` | startup: creates `vector` extension, all tables, HNSW + GIN indexes |
  | `db/postgres/models.py` | SQLAlchemy app tables (users, rooms, chat messages, …) |
- | `db/postgres/vector_store.py` | shared PGVector instance (collection `document_embeddings`) |
+ | `db/postgres/vector_store.py` | shared PGVector instance (collection `documents`, written by the Go ingestion service) |
  | `db/redis/connection.py` | async Redis client |
  | `storage/az_blob/az_blob.py` | Azure Blob async wrapper (uploads + Parquet) |
  | `middlewares/{cors,logging,rate_limit}.py` | CORS allow-all (POC), structlog JSON, slowapi |
@@ -318,7 +318,7 @@ Single-table only in v1. `having`, `offset`, boolean filter trees, `distinct`, j
  | QueryService | ✅ | plan → validate → retry-on-fail (max 3) → dispatch → execute → `QueryResult` |
  | `ChatbotAgent` + prompt + guardrails | ✅ | Renamed from `AnswerAgent` in Cleanup PR. Guardrails appended to `chatbot_system.md` |
  | `ChatHandler` (top-level chat orchestrator) | ✅ | SSE events: `intent` / `chunk` / `done` / `error` |
- | `DocumentRetriever` + `RetrievalRouter` (Redis-cached) | ✅ | Migrated from `src/rag/` (now deleted); MMR/cosine/euclidean/manhattan/inner_product |
+ | `DocumentRetriever` + `RetrievalRouter` (Redis-cached) | ✅ | Migrated from `src/rag/` (now deleted). Mentor commit `61c746f` rewrote it to raw SQL (pgvector `<=>` cosine, `<+>` manhattan) to dodge asyncpg type-mapping issues with the Go-ingested schema. Methods reduced to `cosine` and `manhattan`. Collection: `documents`. |
  | `/api/v1/chat/stream` | ✅ | Rewired to `ChatHandler`; Redis cache + fast intent + history + message persistence remain in chat.py |
  | `/api/v1/db-clients/{id}/ingest` | ✅ | Calls only `on_db_registered`; Phase 1 dual-write removed |
  | `/api/v1/document/{upload,process,delete}` | ✅ | `/process` triggers `on_tabular_uploaded` for CSV/XLSX |
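The QueryService row summarizes its loop as plan → validate → retry-on-fail (max 3). One way such a loop could look is sketched below. The real service is not in this diff, so `plan_with_retry`, the callable signatures, reading "max 3" as three attempts in total, and feeding validation errors back to the planner are all assumptions.

```python
from __future__ import annotations

from typing import Callable

MAX_ATTEMPTS = 3  # assumed reading of "retry-on-fail (max 3)"


def plan_with_retry(
    plan: Callable[[str, list[str]], dict],
    validate: Callable[[dict], list[str]],
    question: str,
    max_attempts: int = MAX_ATTEMPTS,
) -> dict:
    """Ask the planner for an IR until validation passes or attempts run out."""
    errors: list[str] = []
    for _ in range(max_attempts):
        ir = plan(question, errors)  # previous validation errors go back to the planner
        problems = validate(ir)
        if not problems:
            return ir
        errors = problems
    raise ValueError(f"planner failed after {max_attempts} attempts: {errors}")
```

Keeping the loop bounded means a planner that never produces a valid IR surfaces as an explicit error instead of an unbounded series of LLM calls.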
src/tools/__init__.py ADDED
@@ -0,0 +1,7 @@
+ """Analytics & utility tools (KM-608).
+
+ Each tool is a deterministic computation (no LLM, no SQL generation) invoked by
+ the Planner/TaskRunner. The compute layer (calculation logic) lives in per-family
+ submodules (e.g. `analytics`); the wrapper layer (ToolSpec + ToolOutput +
+ registry) is added once the Planner seam is settled.
+ """
src/tools/analytics/__init__.py ADDED
@@ -0,0 +1 @@
+ """Analytics tool family (KM-608)."""
src/tools/analytics/aggregation.py ADDED
@@ -0,0 +1,91 @@
+ """analyze_aggregate - group-by aggregation (KM-608).
+
+ An analytical "family" tool: in ONE call it groups rows by one or more keys
+ and computes aggregates (sum, mean, count, min, max, median, nunique) per
+ group. This is the deterministic compute twin of the Planner's
+ `query_structured` step - the wrapper layer later maps a QueryIR onto this.
+
+ STATUS: compute layer only - the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+ # Aggregation functions the tool understands. Whitelisted on purpose so an
+ # unknown function fails loudly instead of silently doing the wrong thing.
+ SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median", "nunique")
+
+
+ class UnsupportedAggregationError(ValueError):
+     """Requested aggregation is not in SUPPORTED_AGGS (maps to UNSUPPORTED_AGG)."""
+
+
+ def _clean(value: object) -> object:
+     """Convert numpy scalars to plain Python so the output is JSON-clean."""
+     if hasattr(value, "item"):
+         return value.item()
+     return value
+
+
+ def analyze_aggregate(
+     df: pd.DataFrame,
+     aggregations: dict[str, list[str]],
+     group_by: list[str] | None = None,
+ ) -> list[dict[str, object]]:
+     """Group-by aggregation over one or many keys.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         aggregations: which columns to aggregate and how, e.g.
+             {"revenue": ["sum", "mean"], "order_id": ["count"]}.
+         group_by: grouping keys. If None/empty, the whole table is aggregated
+             into a single row.
+
+     Returns:
+         list[dict]: one row per group. Each row holds the group keys plus a
+             column per aggregate, named "<column>_<func>" (e.g. "revenue_sum").
+
+     Raises:
+         ColumnNotFoundError: if any group_by or aggregated column is absent.
+         UnsupportedAggregationError: if a requested function is not supported.
+     """
+     group_by = group_by or []
+
+     # Validate columns first (fail-fast on caller mistakes).
+     referenced = list(group_by) + list(aggregations.keys())
+     missing = [c for c in referenced if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+
+     # Validate aggregation functions.
+     for col, funcs in aggregations.items():
+         bad = [f for f in funcs if f not in SUPPORTED_AGGS]
+         if bad:
+             raise UnsupportedAggregationError(
+                 f"unsupported aggregation(s) {bad} for column '{col}'; "
+                 f"supported: {list(SUPPORTED_AGGS)}"
+             )
+
+     # Build named aggregations: {"revenue_sum": ("revenue", "sum"), ...}
+     named = {
+         f"{col}_{func}": (col, func)
+         for col, funcs in aggregations.items()
+         for func in funcs
+     }
+
+     # No grouping → aggregate the entire table into a single row.
+     if not group_by:
+         row = {name: _clean(df[col].agg(func)) for name, (col, func) in named.items()}
+         return [row]
+
+     # dropna=False keeps groups whose key is null so completeness is honest.
+     grouped = df.groupby(group_by, dropna=False).agg(**named).reset_index()
+     return [{k: _clean(v) for k, v in record.items()} for record in grouped.to_dict("records")]
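To make the output shape of `analyze_aggregate` concrete without a pandas install, here is a dependency-free sketch of the same two ideas: a whitelist of aggregation names and the `"<column>_<func>"` naming rule. It is an illustration only, not the committed implementation. `aggregate_rows` and `AGG_FUNCS` are hypothetical names, and the sketch skips pandas' null handling and group-key sorting.

```python
from __future__ import annotations

import statistics
from collections import defaultdict

# Whitelist mirroring SUPPORTED_AGGS, mapped to stdlib callables (illustrative).
AGG_FUNCS = {
    "sum": sum,
    "mean": statistics.fmean,
    "count": len,
    "min": min,
    "max": max,
    "median": statistics.median,
    "nunique": lambda values: len(set(values)),
}


def aggregate_rows(
    rows: list[dict[str, object]],
    aggregations: dict[str, list[str]],
    group_by: list[str] | None = None,
) -> list[dict[str, object]]:
    """Group plain dict rows and emit one "<column>_<func>" field per aggregate."""
    group_by = group_by or []
    for col, funcs in aggregations.items():
        bad = [f for f in funcs if f not in AGG_FUNCS]
        if bad:
            raise ValueError(f"unsupported aggregation(s) {bad} for column '{col}'")

    # An empty group_by yields the single key (), i.e. one row for the whole table.
    groups: dict[tuple, list[dict[str, object]]] = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in group_by)].append(row)

    result = []
    for key, members in groups.items():
        out: dict[str, object] = dict(zip(group_by, key))
        for col, funcs in aggregations.items():
            values = [m[col] for m in members]
            for func in funcs:
                out[f"{col}_{func}"] = AGG_FUNCS[func](values)
        result.append(out)
    return result
```

Rejecting unknown function names before any work is done matches the module's stated intent that an unknown function should fail loudly.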
src/tools/analytics/comparison.py ADDED
@@ -0,0 +1,108 @@
+ """analyze_comparison - compare a metric across two groups (KM-608).
+
+ An analytical "family" tool: in ONE call it aggregates a value for two groups
+ of a dimension (e.g. region "A" vs "B", channel "online" vs "store") and
+ reports the gap between them - absolute difference, percent difference, and
+ direction. group_a is treated as the baseline. Answers questions like
+ "how does revenue in region A compare to region B?".
+
+ STATUS: compute layer only - the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+ # How to aggregate the value within each group before comparing.
+ SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")
+
+
+ class UnsupportedAggregationError(ValueError):
+     """The requested aggregation is not supported (maps to error_code UNSUPPORTED_AGG)."""
+
+
+ class GroupNotFoundError(ValueError):
+     """A requested group value does not occur in the dimension column (maps to GROUP_NOT_FOUND)."""
+
+
+
+
+ def analyze_comparison(
+     df: pd.DataFrame,
+     dimension: str,
+     value_column: str,
+     group_a: object,
+     group_b: object,
+     agg: str = "sum",
+ ) -> dict[str, object]:
+     """Compare one aggregated metric between two groups of a dimension.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         dimension: the categorical column whose values define the two groups.
+         value_column: numeric column to aggregate for each group.
+         group_a: baseline group value (the "from").
+         group_b: comparison group value (the "to").
+         agg: how to aggregate within each group - one of SUPPORTED_AGGS.
+
+     Returns:
+         dict with:
+             dimension, value_column, agg - echo of the chosen settings
+             group_a, value_a - baseline group + its aggregate
+             group_b, value_b - comparison group + its aggregate
+             diff_abs - value_b - value_a
+             diff_pct - diff_abs / value_a, or None if value_a == 0
+             comparison - "higher" | "lower" | "equal" (b relative to a)
+
+     Raises:
+         ColumnNotFoundError: if dimension or value_column is absent.
+         UnsupportedAggregationError: if agg is not supported.
+         GroupNotFoundError: if group_a or group_b has no rows.
+     """
+     missing = [c for c in (dimension, value_column) if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+     if agg not in SUPPORTED_AGGS:
+         raise UnsupportedAggregationError(
+             f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
+         )
+
+     rows_a = df.loc[df[dimension] == group_a, value_column]
+     rows_b = df.loc[df[dimension] == group_b, value_column]
+     empty = [g for g, rows in ((group_a, rows_a), (group_b, rows_b)) if rows.empty]
+     if empty:
+         raise GroupNotFoundError(
+             f"no rows for group(s) {empty} in column '{dimension}'"
+         )
+
+     value_a = float(rows_a.agg(agg))
+     value_b = float(rows_b.agg(agg))
+     diff_abs = value_b - value_a
+     diff_pct = (diff_abs / value_a) if value_a != 0 else None
+
+     if diff_abs > 0:
+         comparison = "higher"
+     elif diff_abs < 0:
+         comparison = "lower"
+     else:
+         comparison = "equal"
+
+     return {
+         "dimension": dimension,
+         "value_column": value_column,
+         "agg": agg,
+         "group_a": group_a,
+         "value_a": value_a,
+         "group_b": group_b,
+         "value_b": value_b,
+         "diff_abs": diff_abs,
+         "diff_pct": diff_pct,
+         "comparison": comparison,
+     }
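The gap arithmetic at the end of `analyze_comparison` can be checked in isolation once the two group aggregates are known. The sketch below copies only that arithmetic into a standalone helper; `compare_values` is a hypothetical name used for illustration.

```python
from __future__ import annotations


def compare_values(value_a: float, value_b: float) -> dict[str, object]:
    """Gap between a baseline (a) and a comparison value (b)."""
    diff_abs = value_b - value_a
    # Percent difference is undefined for a zero baseline, so report None.
    diff_pct = (diff_abs / value_a) if value_a != 0 else None
    if diff_abs > 0:
        comparison = "higher"
    elif diff_abs < 0:
        comparison = "lower"
    else:
        comparison = "equal"
    return {"diff_abs": diff_abs, "diff_pct": diff_pct, "comparison": comparison}
```

With a baseline of 200.0 and a comparison value of 250.0 this gives `diff_abs` 50.0, `diff_pct` 0.25 and `"higher"`. One property worth knowing: because the divisor is `value_a` rather than its absolute value, a negative baseline flips the sign of `diff_pct`. Going from -100.0 to -50.0 reports `"higher"` with `diff_pct` of -0.5, so callers may want to read direction from `comparison` rather than from the sign of `diff_pct`.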
src/tools/analytics/decomposition.py ADDED
@@ -0,0 +1,110 @@
+ """analyze_contribution - share-of-total per category (KM-608).
+
+ An analytical "family" tool: in ONE call it breaks a total down into each
+ category's contribution (absolute value, share of total, and running
+ cumulative share), sorted largest-first. This is a single snapshot - "who
+ makes up the total right now" - as opposed to analyze_comparison (two groups)
+ or analyze_trend (movement over time). Answers questions like "which region
+ drives most of revenue?" and supports Pareto (80/20) reasoning.
+
+ STATUS: compute layer only - the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+ # Share-of-total is most meaningful for additive aggregates (sum/count), but
+ # mean/min/max are allowed; "total" is then the sum of the group aggregates.
+ SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")
+
+
+ class UnsupportedAggregationError(ValueError):
+     """The requested aggregation is not supported (maps to error_code UNSUPPORTED_AGG)."""
+
+
+ def _clean(value: object) -> object:
+     """Convert numpy scalars to plain Python so the output is JSON-clean."""
+     if hasattr(value, "item"):
+         return value.item()
+     return value
+
+
+ def analyze_contribution(
+     df: pd.DataFrame,
+     dimension: str,
+     value_column: str,
+     agg: str = "sum",
+     top_n: int | None = None,
+ ) -> dict[str, object]:
+     """Contribution (share of total) of each category, largest first.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         dimension: categorical column to break the total down by.
+         value_column: numeric column to aggregate per category.
+         agg: how to aggregate within each category - one of SUPPORTED_AGGS.
+         top_n: if set, keep the top N categories and lump the remainder into a
+             single "Others" row.
+
+     Returns:
+         dict with:
+             dimension, value_column, agg - echo of the chosen settings
+             total - sum of all category aggregates
+             items - [{"category", "value", "share",
+                 "cumulative_share"}] largest first
+
+     Raises:
+         ColumnNotFoundError: if dimension or value_column is absent.
+         UnsupportedAggregationError: if agg is not supported.
+     """
+     missing = [c for c in (dimension, value_column) if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+     if agg not in SUPPORTED_AGGS:
+         raise UnsupportedAggregationError(
+             f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
+         )
+
+     grouped = df.groupby(dimension, dropna=False)[value_column].agg(agg)
+     grouped = grouped.sort_values(ascending=False)
+     pairs = list(grouped.items())
+
+     # Optionally collapse the long tail into a single "Others" bucket.
+     if top_n is not None and len(pairs) > top_n:
+         head = pairs[:top_n]
+         others_value = sum(val for _, val in pairs[top_n:])
+         head.append(("Others", others_value))
+         pairs = head
+
+     total = sum(val for _, val in pairs)
+
+     items = []
+     cumulative = 0.0
+     for cat, val in pairs:
+         share = (val / total) if total else None
+         if share is not None:
+             cumulative += share
+         items.append(
+             {
+                 "category": _clean(cat),
+                 "value": _clean(val),
+                 "share": share,
+                 "cumulative_share": cumulative if total else None,
+             }
+         )
+
+     return {
+         "dimension": dimension,
+         "value_column": value_column,
+         "agg": agg,
+         "total": _clean(total),
+         "items": items,
+     }
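The share and cumulative-share bookkeeping in `analyze_contribution`, including the `top_n` "Others" bucket, is easy to follow on a small worked example. The sketch below repeats that bookkeeping on a plain dict of already-aggregated category values. `contribution` is a hypothetical helper for illustration; it follows the same steps as the committed function but takes no DataFrame.

```python
from __future__ import annotations


def contribution(
    values: dict[str, float], top_n: int | None = None
) -> list[dict[str, object]]:
    """Share and running cumulative share per category, largest first."""
    pairs = sorted(values.items(), key=lambda kv: kv[1], reverse=True)

    # Collapse everything past the top N into one "Others" bucket.
    if top_n is not None and len(pairs) > top_n:
        others_value = sum(val for _, val in pairs[top_n:])
        pairs = pairs[:top_n] + [("Others", others_value)]

    total = sum(val for _, val in pairs)
    items = []
    cumulative = 0.0
    for cat, val in pairs:
        share = (val / total) if total else None  # undefined when the total is 0
        if share is not None:
            cumulative += share
        items.append(
            {
                "category": cat,
                "value": val,
                "share": share,
                "cumulative_share": cumulative if total else None,
            }
        )
    return items
```

For values `{"A": 8, "B": 4, "C": 2, "D": 2}` with `top_n=2`, the tail C and D collapse into an "Others" row worth 4, the shares are 0.5, 0.25 and 0.25, and the cumulative shares run 0.5, 0.75, 1.0. Note that "Others" is appended after the head, so it can be as large as or larger than the categories above it.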
src/tools/analytics/descriptive.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """analyze_descriptive β€” single/multi-column EDA (KM-608).
2
+
3
+ An analytical "family" tool: in ONE call it computes a column's center,
4
+ spread, shape, and completeness (mean, median, mode, std, variance,
5
+ quartiles, min/max, skew, null_rate).
6
+
7
+ STATUS: compute layer only β€” the function takes an already-materialized
8
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
9
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
10
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
11
+ this function easy to unit-test in isolation and stable when wrapped.
12
+ """
13
+
14
+ from __future__ import annotations
15
+
16
+ import pandas as pd
17
+
18
+ # Default metrics used when the caller does not narrow them via `metrics`.
19
+ DEFAULT_METRICS = (
20
+ "count",
21
+ "mean",
22
+ "median",
23
+ "mode",
24
+ "std",
25
+ "var",
26
+ "q1",
27
+ "q3",
28
+ "min",
29
+ "max",
30
+ "skew",
31
+ "null_count",
32
+ "null_rate",
33
+ )
34
+
35
+
36
+ class ColumnNotFoundError(ValueError):
37
+ """A requested column is absent from the DataFrame (maps to error_code COLUMN_NOT_FOUND)."""
38
+
39
+
+ def _describe_one(series: pd.Series, metrics: tuple[str, ...]) -> dict[str, object]:
+     """Compute descriptive metrics for a single column.
+
+     Numeric metrics are computed over non-null values. `null_rate` and `count`
+     are computed over all rows (nulls included) so they reflect completeness
+     as-is. Undefined cases (e.g. std of a single value) return None, so the
+     function degrades gracefully instead of raising.
+     """
+     total = len(series)
+     non_null = series.dropna()
+     is_numeric = pd.api.types.is_numeric_dtype(series)
+
+     out: dict[str, object] = {}
+     for m in metrics:
+         if m == "count":
+             out["count"] = int(total)
+         elif m == "null_count":
+             out["null_count"] = int(series.isna().sum())
+         elif m == "null_rate":
+             out["null_rate"] = float(series.isna().mean()) if total else 0.0
+         elif m == "mode":
+             modes = non_null.mode()
+             top = modes.iloc[0] if not modes.empty else None
+             # Unwrap numpy scalars so the value is a plain Python object.
+             out["mode"] = top.item() if hasattr(top, "item") else top
+         elif not is_numeric:
+             out[m] = None
+         elif m == "mean":
+             out["mean"] = float(non_null.mean()) if not non_null.empty else None
+         elif m == "median":
+             out["median"] = float(non_null.median()) if not non_null.empty else None
+         elif m == "std":
+             out["std"] = float(non_null.std()) if non_null.shape[0] > 1 else None
+         elif m == "var":
+             out["var"] = float(non_null.var()) if non_null.shape[0] > 1 else None
+         elif m == "q1":
+             out["q1"] = float(non_null.quantile(0.25)) if not non_null.empty else None
+         elif m == "q3":
+             out["q3"] = float(non_null.quantile(0.75)) if not non_null.empty else None
+         elif m == "min":
+             out["min"] = float(non_null.min()) if not non_null.empty else None
+         elif m == "max":
+             out["max"] = float(non_null.max()) if not non_null.empty else None
+         elif m == "skew":
+             out["skew"] = float(non_null.skew()) if non_null.shape[0] > 2 else None
+     return out
+
+
+ def analyze_descriptive(
+     df: pd.DataFrame,
+     column_ids: list[str],
+     metrics: list[str] | None = None,
+ ) -> dict[str, dict[str, object]]:
+     """Descriptive EDA for one or many columns.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         column_ids: columns to analyze.
+         metrics: subset of metrics; defaults to all of DEFAULT_METRICS.
+
+     Returns:
+         dict: { column_id: { metric: value, ... }, ... }
+
+     Raises:
+         ColumnNotFoundError: if any column_id is absent from df.
+     """
+     chosen = tuple(metrics) if metrics else DEFAULT_METRICS
+
+     missing = [c for c in column_ids if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+
+     return {col: _describe_one(df[col], chosen) for col in column_ids}
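The null-handling rule in `_describe_one` (completeness metrics over all rows, statistics over non-null values, `None` for undefined cases) can be illustrated with a small standard-library sketch. This is not the module's code: the `describe` helper and the use of `None` to mark nulls are assumptions made for illustration, and `statistics.stdev` is used because it is a sample standard deviation, like the pandas default.

```python
import statistics


def describe(values):
    """Hypothetical helper: `values` is a list where None marks a null."""
    total = len(values)
    non_null = [v for v in values if v is not None]
    nulls = total - len(non_null)
    return {
        # Completeness metrics look at every row, nulls included.
        "count": total,
        "null_count": nulls,
        "null_rate": nulls / total if total else 0.0,
        # Statistics only look at non-null values.
        "mean": statistics.fmean(non_null) if non_null else None,
        "median": statistics.median(non_null) if non_null else None,
        # Sample std is undefined for fewer than two values: return None.
        "std": statistics.stdev(non_null) if len(non_null) > 1 else None,
    }


summary = describe([10, 20, None, 30])
single = describe([5])
```

Here `summary` counts four rows but averages only three of them, and `single["std"]` is `None` rather than an exception.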
src/tools/analytics/quality.py ADDED
@@ -0,0 +1,108 @@
+ """analyze_profile: per-column data-quality profile (KM-608).
+
+ An analytical "family" tool: in ONE call it profiles each column's health:
+ dtype, inferred type, completeness (null count/rate), cardinality (distinct
+ count/rate, constant flag), and, for numeric columns, min/max/mean plus an
+ IQR-based outlier count; for non-numeric columns the most frequent value.
+ Answers "is this data clean enough to analyze?" and surfaces issues (lots of
+ nulls, a constant column, outliers) before deeper analysis.
+
+ STATUS: compute layer only; the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+
+ def _clean(value: object) -> object:
+     """Convert numpy scalars to plain Python so the output is JSON-clean."""
+     if hasattr(value, "item"):
+         return value.item()
+     return value
+
+
+ def _profile_one(series: pd.Series) -> dict[str, object]:
+     """Build the quality profile for a single column."""
+     total = len(series)
+     non_null = series.dropna()
+     nn = len(non_null)
+     distinct = int(series.nunique(dropna=True))
+
+     is_bool = pd.api.types.is_bool_dtype(series)
+     is_datetime = pd.api.types.is_datetime64_any_dtype(series)
+     # bool is technically numeric in pandas; treat it as its own type.
+     is_numeric = pd.api.types.is_numeric_dtype(series) and not is_bool
+
+     if is_bool:
+         inferred = "boolean"
+     elif is_datetime:
+         inferred = "datetime"
+     elif is_numeric:
+         inferred = "numeric"
+     else:
+         inferred = "categorical"
+
+     out: dict[str, object] = {
+         "dtype": str(series.dtype),
+         "inferred_type": inferred,
+         "count": int(total),
+         "null_count": int(series.isna().sum()),
+         "null_rate": float(series.isna().mean()) if total else 0.0,
+         "distinct_count": distinct,
+         "distinct_rate": (distinct / nn) if nn else 0.0,  # over non-null values
+         "is_constant": distinct <= 1,
+     }
+
+     if is_numeric and nn > 0:
+         out["min"] = _clean(non_null.min())
+         out["max"] = _clean(non_null.max())
+         out["mean"] = _clean(non_null.mean())
+         # IQR rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are outliers.
+         # Needs enough points for stable quartiles.
+         if nn >= 4:
+             q1 = non_null.quantile(0.25)
+             q3 = non_null.quantile(0.75)
+             iqr = q3 - q1
+             lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
+             out["outlier_count"] = int(((non_null < lower) | (non_null > upper)).sum())
+         else:
+             out["outlier_count"] = None
+     elif not is_numeric and nn > 0:
+         counts = non_null.value_counts()
+         out["top_value"] = _clean(counts.index[0])
+         out["top_freq"] = int(counts.iloc[0])
+
+     return out
+
+
+ def analyze_profile(
+     df: pd.DataFrame,
+     column_ids: list[str] | None = None,
+ ) -> dict[str, dict[str, object]]:
+     """Per-column data-quality profile.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         column_ids: columns to profile. If None, every column is profiled.
+
+     Returns:
+         dict: { column_id: { profile fields, ... }, ... }
+
+     Raises:
+         ColumnNotFoundError: if any column_id is absent from df.
+     """
+     cols = list(column_ids) if column_ids is not None else list(df.columns)
+
+     missing = [c for c in cols if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+
+     return {col: _profile_one(df[col]) for col in cols}
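The IQR outlier rule used in `_profile_one` can be worked through by hand with a standard-library sketch. The `iqr_outlier_count` helper is hypothetical, and `statistics.quantiles(..., method="inclusive")` is assumed here to stand in for the linearly interpolated quartiles that `Series.quantile` computes by default.

```python
import statistics


def iqr_outlier_count(values):
    """Hypothetical helper: count values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]."""
    if len(values) < 4:
        # Mirrors the diff: too few points for stable quartiles.
        return None
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return sum(1 for v in values if v < lower or v > upper)


# Q1 = 11.25, Q3 = 12.75, IQR = 1.5, so the fences are [9.0, 15.0].
outliers = iqr_outlier_count([10, 12, 11, 13, 12, 100])
too_few = iqr_outlier_count([1, 2, 3])
```

Only `100` falls outside the fences in the first call; the second call returns `None` because fewer than four values are available.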
src/tools/analytics/relationship.py ADDED
@@ -0,0 +1,108 @@
+ """analyze_correlation: correlation among numeric columns (KM-608).
+
+ An analytical "family" tool: in ONE call it measures how strongly numeric
+ columns move together. Returns the full correlation matrix plus a list of
+ column pairs ranked by strength. Answers questions like "does price relate to
+ units sold?".
+
+ STATUS: compute layer only; the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import math
+
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+ # Correlation methods supported by pandas .corr().
+ SUPPORTED_METHODS = ("pearson", "spearman", "kendall")
+
+
+ class InvalidMethodError(ValueError):
+     """The requested method is not supported (maps to error_code INVALID_METHOD)."""
+
+
+ class NonNumericColumnError(ValueError):
+     """A requested column is not numeric (maps to error_code NON_NUMERIC_COLUMN)."""
+
+
+ class NotEnoughColumnsError(ValueError):
+     """Correlation needs at least two numeric columns (maps to NOT_ENOUGH_COLUMNS)."""
+
+
+ def _clean(value: object) -> float | None:
+     """Cast to plain float; NaN (e.g. a zero-variance column) -> None."""
+     if value is None:
+         return None
+     f = float(value)  # type: ignore[arg-type]
+     return None if math.isnan(f) else f
+
+
+ def analyze_correlation(
+     df: pd.DataFrame,
+     column_ids: list[str] | None = None,
+     method: str = "pearson",
+ ) -> dict[str, object]:
+     """Pairwise correlation across numeric columns.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         column_ids: numeric columns to correlate. If None, every numeric
+             column in df is used.
+         method: "pearson" (linear), "spearman" (rank), or "kendall".
+
+     Returns:
+         dict with:
+             method:  echo of the chosen method
+             columns: the numeric columns actually correlated
+             matrix:  { col: { col: corr|None } } full square matrix
+             pairs:   [{"a", "b", "corr"}] unique pairs, strongest |corr| first
+
+     Raises:
+         InvalidMethodError: if method is unknown.
+         ColumnNotFoundError: if an explicit column is absent.
+         NonNumericColumnError: if an explicit column is not numeric.
+         NotEnoughColumnsError: if fewer than two numeric columns remain.
+     """
+     if method not in SUPPORTED_METHODS:
+         raise InvalidMethodError(
+             f"unknown method '{method}'; supported: {list(SUPPORTED_METHODS)}"
+         )
+
+     if column_ids is None:
+         cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
+     else:
+         missing = [c for c in column_ids if c not in df.columns]
+         if missing:
+             raise ColumnNotFoundError(f"columns not found: {missing}")
+         non_numeric = [
+             c for c in column_ids if not pd.api.types.is_numeric_dtype(df[c])
+         ]
+         if non_numeric:
+             raise NonNumericColumnError(f"columns are not numeric: {non_numeric}")
+         cols = list(column_ids)
+
+     if len(cols) < 2:
+         raise NotEnoughColumnsError(
+             f"need >= 2 numeric columns, got {len(cols)}: {cols}"
+         )
+
+     corr = df[cols].corr(method=method)
+     matrix = {a: {b: _clean(corr.loc[a, b]) for b in cols} for a in cols}
+
+     pairs = []
+     for i in range(len(cols)):
+         for j in range(i + 1, len(cols)):
+             val = _clean(corr.iloc[i, j])
+             if val is not None:
+                 pairs.append({"a": cols[i], "b": cols[j], "corr": val})
+     pairs.sort(key=lambda p: abs(p["corr"]), reverse=True)
+
+     return {"method": method, "columns": cols, "matrix": matrix, "pairs": pairs}
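The pair-ranking step (unique pairs, undefined correlations dropped, strongest `|corr|` first) is easy to check on toy data. The sketch below is a standard-library illustration, not the module's code: the `pearson` helper and the sample columns are assumptions, and a zero-variance column returns `None` here where pandas would produce NaN.

```python
import itertools
import math


def pearson(xs, ys):
    """Hypothetical helper: Pearson r, or None when either side has zero variance."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    if sx == 0 or sy == 0:
        return None
    return cov / (sx * sy)


data = {
    "price": [1, 2, 3, 4, 5],
    "units": [10, 8, 6, 4, 2],   # perfectly anti-correlated with price
    "rating": [3, 1, 4, 1, 5],
    "flat": [7, 7, 7, 7, 7],     # zero variance: correlation undefined
}

pairs = []
for a, b in itertools.combinations(data, 2):
    r = pearson(data[a], data[b])
    if r is not None:  # undefined pairs are dropped, as in the diff
        pairs.append({"a": a, "b": b, "corr": r})

# Sort by magnitude so a strong negative relationship ranks first.
pairs.sort(key=lambda p: abs(p["corr"]), reverse=True)
```

Sorting on the absolute value is what puts the `price`/`units` pair at the top even though its correlation is negative.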
src/tools/analytics/segmentation.py ADDED
@@ -0,0 +1,124 @@
+ """analyze_segment: bucket rows into segments (KM-608).
+
+ An analytical "family" tool: in ONE call it bins a numeric column into
+ segments and reports how rows distribute across them (count, and optionally an
+ aggregate of another column per segment). Two binning modes: explicit cut
+ "edges" (e.g. age 0-18-35-60) or equal-frequency "quantile" buckets (quartiles,
+ deciles). Answers questions like "split customers into age brackets" or "bucket
+ orders into value tiers".
+
+ STATUS: compute layer only; the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import math
+
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+ # Binning strategies.
+ SUPPORTED_METHODS = ("edges", "quantile")
+
+ # How to aggregate the value column within each segment.
+ SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")
+
+
+ class InvalidMethodError(ValueError):
+     """The requested binning method is not supported (maps to INVALID_METHOD)."""
+
+
+ class NonNumericColumnError(ValueError):
+     """The column to segment on is not numeric (maps to NON_NUMERIC_COLUMN)."""
+
+
+ class UnsupportedAggregationError(ValueError):
+     """The requested aggregation is not supported (maps to UNSUPPORTED_AGG)."""
+
+
+ def _clean(value: object) -> object:
+     """Convert numpy scalars to plain Python; NaN -> None for JSON-clean output."""
+     if value is None:
+         return None
+     if hasattr(value, "item"):
+         value = value.item()
+     if isinstance(value, float) and math.isnan(value):
+         return None
+     return value
+
+
+ def analyze_segment(
+     df: pd.DataFrame,
+     column: str,
+     bins: list[float] | int,
+     method: str = "edges",
+     labels: list[str] | None = None,
+     value_column: str | None = None,
+     agg: str = "sum",
+ ) -> dict[str, object]:
+     """Segment rows by binning a numeric column.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         column: numeric column to bin on.
+         bins: for method "edges", the list of cut boundaries (e.g.
+             [0, 18, 35, 60]); for method "quantile", the number of equal-
+             frequency buckets (e.g. 4 for quartiles).
+         method: "edges" (explicit boundaries) or "quantile" (equal frequency).
+         labels: optional segment names; for "edges" there must be
+             len(bins) - 1 of them.
+         value_column: if given, also aggregate this column per segment.
+         agg: how to aggregate value_column; one of SUPPORTED_AGGS.
+
+     Returns:
+         dict with:
+             column, method: echo of the chosen settings
+             agg:            present only when value_column is given
+             segments:       [{"segment", "count", ("value")}], in bin order
+
+     Raises:
+         ColumnNotFoundError: if column or value_column is absent.
+         NonNumericColumnError: if column is not numeric.
+         InvalidMethodError: if method is unknown.
+         UnsupportedAggregationError: if agg is not supported.
+     """
+     referenced = [column] + ([value_column] if value_column is not None else [])
+     missing = [c for c in referenced if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+     if not pd.api.types.is_numeric_dtype(df[column]):
+         raise NonNumericColumnError(f"column '{column}' is not numeric")
+     if method not in SUPPORTED_METHODS:
+         raise InvalidMethodError(
+             f"unknown method '{method}'; supported: {list(SUPPORTED_METHODS)}"
+         )
+     if value_column is not None and agg not in SUPPORTED_AGGS:
+         raise UnsupportedAggregationError(
+             f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
+         )
+
+     if method == "edges":
+         cats = pd.cut(df[column], bins=bins, labels=labels, include_lowest=True)
+     else:  # quantile
+         cats = pd.qcut(df[column], q=bins, labels=labels, duplicates="drop")
+
+     grouped = df.groupby(cats, observed=False)
+     counts = grouped.size()
+     # Aggregate once up front instead of re-aggregating for every segment.
+     values = grouped[value_column].agg(agg) if value_column is not None else None
+
+     segments = []
+     for seg in counts.index:
+         row: dict[str, object] = {"segment": str(seg), "count": int(counts[seg])}
+         if values is not None:
+             row["value"] = _clean(values.get(seg))
+         segments.append(row)
+
+     out: dict[str, object] = {"column": column, "method": method, "segments": segments}
+     if value_column is not None:
+         out["agg"] = agg
+     return out
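The "edges" mode relies on `pd.cut` with its default right-closed intervals plus `include_lowest=True`, so edges `[0, 18, 35, 60]` give the bins `[0, 18]`, `(18, 35]`, `(35, 60]`, and values outside that range fall into no segment. A standard-library sketch of that boundary behaviour, using a hypothetical `segment_counts` helper rather than the module's code:

```python
import bisect


def segment_counts(values, edges):
    """Hypothetical helper: count values per right-closed bin defined by `edges`."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        if v is None or v < edges[0] or v > edges[-1]:
            continue  # nulls and out-of-range values belong to no segment
        # bisect_left puts a value equal to an edge into the bin it closes;
        # max(..., 0) keeps the lowest edge inside the first bin.
        idx = max(bisect.bisect_left(edges, v) - 1, 0)
        counts[idx] += 1
    return counts


ages = [0, 18, 19, 35, 36, 60, 75, None]
counts = segment_counts(ages, [0, 18, 35, 60])
```

With these ages each bracket receives two rows; `75` and the null are left out, which is why segment counts may sum to less than the row count.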
src/tools/analytics/temporal.py ADDED
@@ -0,0 +1,159 @@
+ """analyze_trend: time-series trend over a period (KM-608).
+
+ An analytical "family" tool: in ONE call it buckets rows into time periods
+ (day/week/month/quarter/year), aggregates a value per period, and summarizes
+ the movement (first vs last, absolute & percent change, direction, linear
+ slope). Answers questions like "how did revenue trend month over month?".
+
+ STATUS: compute layer only; the function takes an already-materialized
+ DataFrame. The wrapper layer (fetching data from the catalog via source_id,
+ the ToolOutput envelope, ToolSpec registration) is added once the Planner
+ seam (KM-418) is settled. Keeping compute separate from data-fetching makes
+ this function easy to unit-test in isolation and stable when wrapped.
+ """
+
+ from __future__ import annotations
+
+ import numpy as np
+ import pandas as pd
+
+ from src.tools.analytics.descriptive import ColumnNotFoundError
+
+ # Friendly period name -> pandas resample rule. Using the non-deprecated
+ # pandas 2.2 codes (ME/QE/YE) avoids FutureWarnings.
+ FREQ_MAP = {
+     "day": "D",
+     "week": "W",
+     "month": "ME",
+     "quarter": "QE",
+     "year": "YE",
+ }
+
+ # How to aggregate the value within each period.
+ SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")
+
+
+ class InvalidFrequencyError(ValueError):
+     """The requested period is not in FREQ_MAP (maps to error_code INVALID_FREQUENCY)."""
+
+
+ class UnsupportedAggregationError(ValueError):
+     """The requested aggregation is not supported (maps to error_code UNSUPPORTED_AGG)."""
+
+
+ def _clean(value: object) -> object:
+     """Convert numpy scalars to plain Python; NaN -> None for JSON-clean output."""
+     if value is None:
+         return None
+     if hasattr(value, "item"):
+         value = value.item()
+     if isinstance(value, float) and np.isnan(value):
+         return None
+     return value
+
+
+ def _period_label(ts: pd.Timestamp, freq: str) -> str:
+     """Human-readable period label keyed off the friendly frequency name."""
+     if freq == "month":
+         return str(ts.strftime("%Y-%m"))
+     if freq == "quarter":
+         return f"{ts.year}-Q{ts.quarter}"
+     if freq == "year":
+         return str(ts.strftime("%Y"))
+     return str(ts.strftime("%Y-%m-%d"))  # day / week
+
+
+ def analyze_trend(
+     df: pd.DataFrame,
+     date_column: str,
+     value_column: str,
+     freq: str = "month",
+     agg: str = "sum",
+ ) -> dict[str, object]:
+     """Time-series trend of one value over evenly-spaced periods.
+
+     Args:
+         df: already-materialized data (in the real system the wrapper fetches
+             this from a source_id).
+         date_column: column holding dates/timestamps.
+         value_column: numeric column to aggregate per period.
+         freq: period granularity; one of FREQ_MAP keys (default "month").
+         agg: how to aggregate within a period; one of SUPPORTED_AGGS.
+
+     Returns:
+         dict with:
+             freq, agg:   echo of the chosen settings
+             points:      [{"period": str, "value": number|None}, ...]
+             first, last: value of the first/last period with a non-null aggregate
+             change_abs:  last - first
+             change_pct:  (last - first) / first, or None if first == 0
+             direction:   "up" | "down" | "flat"
+             slope:       linear slope across periods, or None if < 2 points
+
+     Raises:
+         ColumnNotFoundError: if date_column or value_column is absent.
+         InvalidFrequencyError: if freq is not a known period.
+         UnsupportedAggregationError: if agg is not supported.
+     """
+     missing = [c for c in (date_column, value_column) if c not in df.columns]
+     if missing:
+         raise ColumnNotFoundError(f"columns not found: {missing}")
+     if freq not in FREQ_MAP:
+         raise InvalidFrequencyError(
+             f"unknown frequency '{freq}'; supported: {list(FREQ_MAP)}"
+         )
+     if agg not in SUPPORTED_AGGS:
+         raise UnsupportedAggregationError(
+             f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
+         )
+
+     # Build a clean datetime-indexed series, then resample into periods.
+     s = df[[date_column, value_column]].copy()
+     s[date_column] = pd.to_datetime(s[date_column])
+     s = s.dropna(subset=[date_column]).set_index(date_column).sort_index()
+     resampled = s[value_column].resample(FREQ_MAP[freq]).agg(agg)
+
+     points = [
+         {"period": _period_label(ts, freq), "value": _clean(val)}
+         for ts, val in resampled.items()
+     ]
+
+     # Summary stats skip periods whose aggregate is NaN. Note that "sum" and
+     # "count" yield 0 rather than NaN for an empty period, so with those
+     # aggregations empty periods stay in the summary as zeros.
+     non_null = resampled.dropna()
+     first: float | None
+     last: float | None
+     change_abs: float | None
+     change_pct: float | None
+     slope: float | None
+     if non_null.empty:
+         first = last = change_abs = change_pct = slope = None
+         direction = "flat"
+     else:
+         first = float(non_null.iloc[0])
+         last = float(non_null.iloc[-1])
+         change_abs = last - first
+         change_pct = (change_abs / first) if first != 0 else None
+         if change_abs > 0:
+             direction = "up"
+         elif change_abs < 0:
+             direction = "down"
+         else:
+             direction = "flat"
+         if non_null.shape[0] > 1:
+             x = np.arange(non_null.shape[0])
+             slope = float(np.polyfit(x, non_null.to_numpy(dtype=float), 1)[0])
+         else:
+             slope = None
+
+     return {
+         "freq": freq,
+         "agg": agg,
+         "points": points,
+         "first": first,
+         "last": last,
+         "change_abs": change_abs,
+         "change_pct": change_pct,
+         "direction": direction,
+         "slope": slope,
+     }
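The summary half of `analyze_trend` (first vs last, absolute and percent change, direction, slope over period index 0, 1, 2, ...) can be checked on a short series without pandas. The sketch below is an illustration under assumptions: `summarize` is a hypothetical helper that takes the already-aggregated, non-empty per-period values, and the slope uses the closed-form least-squares formula that a degree-1 `np.polyfit` fit should agree with.

```python
def summarize(values):
    """Hypothetical helper: trend summary for a non-empty list of period values."""
    first, last = values[0], values[-1]
    change_abs = last - first
    # Percent change is undefined when the starting value is zero.
    change_pct = change_abs / first if first != 0 else None
    direction = "up" if change_abs > 0 else "down" if change_abs < 0 else "flat"

    n = len(values)
    if n > 1:
        # Least-squares slope of value against period index 0..n-1.
        x_mean = (n - 1) / 2
        y_mean = sum(values) / n
        num = sum((i - x_mean) * (y - y_mean) for i, y in enumerate(values))
        den = sum((i - x_mean) ** 2 for i in range(n))
        slope = num / den
    else:
        slope = None  # a single period has no slope

    return {
        "first": first,
        "last": last,
        "change_abs": change_abs,
        "change_pct": change_pct,
        "direction": direction,
        "slope": slope,
    }


trend = summarize([100.0, 120.0, 90.0, 150.0])
```

`direction` depends only on the endpoints, while `slope` uses every period, so a noisy series can report "up" with a small or even negative slope; consumers may want to read both fields together.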