Spaces:
Running
Running
| """Data profiler β samples the actual database to give the AI business context. | |
| Profiles each table to discover: | |
| - Categorical columns and their distinct values (status, type, category, etc.) | |
| - Numeric column ranges (min, max, avg) | |
| - Date column ranges | |
| - Sample rows | |
| This info is injected into the AI prompts so it can make smart | |
| business decisions (e.g., filter by status='closed' for revenue). | |
| """ | |
| import time | |
| from typing import Any | |
| from sqlalchemy import text | |
| from db.connection import get_engine | |
| from db.schema import get_schema | |
| # ββ Cache βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _profile_cache: str | None = None | |
| _profile_ts: float = 0.0 | |
| _PROFILE_TTL: float = 600.0 # 10 minutes | |
| def get_data_profile(force_refresh: bool = False) -> str: | |
| """Return a formatted data profile string for prompt injection.""" | |
| global _profile_cache, _profile_ts | |
| if not force_refresh and _profile_cache and (time.time() - _profile_ts < _PROFILE_TTL): | |
| return _profile_cache | |
| schema = get_schema() | |
| profile_parts: list[str] = [] | |
| engine = get_engine() | |
| with engine.connect() as conn: | |
| for table, columns in schema.items(): | |
| table_profile = _profile_table(conn, table, columns) | |
| if table_profile: | |
| profile_parts.append(table_profile) | |
| # Auto-generate business rules | |
| rules = _generate_business_rules(schema) | |
| if rules: | |
| profile_parts.append(rules) | |
| _profile_cache = "\n".join(profile_parts) | |
| _profile_ts = time.time() | |
| return _profile_cache | |
| def _profile_table(conn, table: str, columns: list[dict]) -> str: | |
| """Profile a single table.""" | |
| lines: list[str] = [f"TABLE PROFILE: {table}"] | |
| # Row count | |
| try: | |
| count = conn.execute(text(f'SELECT count(*) FROM "{table}"')).scalar() | |
| lines.append(f" Total rows: {count}") | |
| except Exception: | |
| return "" | |
| if count == 0: | |
| lines.append(" (empty table)") | |
| return "\n".join(lines) | |
| # Profile each column | |
| for col in columns: | |
| cname = col["column_name"] | |
| dtype = col["data_type"] | |
| try: | |
| if _is_categorical(dtype, cname): | |
| profile = _profile_categorical(conn, table, cname, count) | |
| if profile: | |
| lines.append(profile) | |
| elif _is_numeric(dtype): | |
| profile = _profile_numeric(conn, table, cname) | |
| if profile: | |
| lines.append(profile) | |
| elif _is_date(dtype): | |
| profile = _profile_date(conn, table, cname) | |
| if profile: | |
| lines.append(profile) | |
| except Exception: | |
| continue | |
| lines.append("") | |
| return "\n".join(lines) | |
| def _is_categorical(dtype: str, cname: str) -> bool: | |
| """Check if a column is likely categorical (status, type, category, etc.).""" | |
| categorical_types = {"character varying", "text", "varchar", "char", "character"} | |
| categorical_keywords = { | |
| "status", "state", "type", "category", "kind", "class", | |
| "group", "level", "tier", "grade", "priority", "stage", | |
| "flag", "mode", "role", "region", "country", "city", | |
| "gender", "channel", "source", "segment", "department", | |
| } | |
| if dtype.lower() in categorical_types: | |
| # Check if the column name suggests it's categorical | |
| lower_name = cname.lower() | |
| if any(kw in lower_name for kw in categorical_keywords): | |
| return True | |
| # Also profile short text columns | |
| return True | |
| return False | |
| def _is_numeric(dtype: str) -> bool: | |
| numeric_types = { | |
| "integer", "bigint", "smallint", "numeric", "real", | |
| "double precision", "decimal", "float", "int", | |
| } | |
| return dtype.lower() in numeric_types | |
| def _is_date(dtype: str) -> bool: | |
| date_types = { | |
| "date", "timestamp", "timestamp without time zone", | |
| "timestamp with time zone", "timestamptz", | |
| } | |
| return dtype.lower() in date_types | |
| def _profile_categorical(conn, table: str, col: str, total_rows: int) -> str | None: | |
| """Get distinct values for categorical columns (up to 25 values).""" | |
| result = conn.execute(text( | |
| f'SELECT "{col}", count(*) as cnt FROM "{table}" ' | |
| f'WHERE "{col}" IS NOT NULL ' | |
| f'GROUP BY "{col}" ORDER BY cnt DESC LIMIT 25' | |
| )).fetchall() | |
| if not result: | |
| return None | |
| distinct_count = len(result) | |
| # Only profile if it's truly categorical (not too many unique values) | |
| if distinct_count > 20: | |
| # Check total distinct count | |
| total_distinct = conn.execute(text( | |
| f'SELECT count(DISTINCT "{col}") FROM "{table}" WHERE "{col}" IS NOT NULL' | |
| )).scalar() | |
| if total_distinct > 50: | |
| return f" {col}: {total_distinct} distinct values (high cardinality - not categorical)" | |
| values_str = ", ".join( | |
| f"'{r[0]}' ({r[1]} rows)" for r in result[:15] | |
| ) | |
| return f" {col}: DISTINCT VALUES = [{values_str}]" | |
| def _profile_numeric(conn, table: str, col: str) -> str | None: | |
| """Get min, max, avg for numeric columns.""" | |
| result = conn.execute(text( | |
| f'SELECT min("{col}"), max("{col}"), round(avg("{col}")::numeric, 2) ' | |
| f'FROM "{table}" WHERE "{col}" IS NOT NULL' | |
| )).fetchone() | |
| if not result or result[0] is None: | |
| return None | |
| return f" {col}: min={result[0]}, max={result[1]}, avg={result[2]}" | |
| def _profile_date(conn, table: str, col: str) -> str | None: | |
| """Get date range.""" | |
| result = conn.execute(text( | |
| f'SELECT min("{col}"), max("{col}") ' | |
| f'FROM "{table}" WHERE "{col}" IS NOT NULL' | |
| )).fetchone() | |
| if not result or result[0] is None: | |
| return None | |
| return f" {col}: from {result[0]} to {result[1]}" | |
| def _generate_business_rules(schema: dict[str, list[dict]]) -> str: | |
| """Auto-infer business rules from column patterns across all tables.""" | |
| rules: list[str] = [ | |
| "=" * 60, | |
| "BUSINESS INTELLIGENCE RULES β YOU MUST FOLLOW THESE", | |
| "=" * 60, | |
| ] | |
| # ββ Rule 0: Query type awareness | |
| rules.append("") | |
| rules.append("RULE 0 β KNOW YOUR QUERY TYPE:") | |
| rules.append(" PRODUCT ATTRIBUTE queries (category, name, weight, details):") | |
| rules.append(" β Use product/variant catalog tables directly.") | |
| rules.append(" β No status filter needed.") | |
| rules.append(" PRODUCT PRICE queries (most expensive, cheapest, price lookup):") | |
| rules.append(" β Use sales_order_line_pricing.selling_price_per_unit as source of truth.") | |
| rules.append(" β JOIN to product_master for product_name. GROUP BY to avoid duplicates.") | |
| rules.append(" TRANSACTIONAL queries (revenue, AOV, order counts, sales trends):") | |
| rules.append(" β Use sales tables. MUST filter by sales_order.status = 'closed'.") | |
| rules.append(" β Examples: 'total revenue', 'AOV', 'top customers by spending'") | |
| # ββ Rule 1: Avoiding duplicates | |
| rules.append("") | |
| rules.append("RULE 1 β AVOID DUPLICATE ROWS (CRITICAL):") | |
| rules.append(" When JOINing tables, products may have MULTIPLE variants (different karat, quality, etc.).") | |
| rules.append(" This causes duplicate product names in results.") | |
| rules.append(" ALWAYS use one of these to prevent duplicates:") | |
| rules.append(" - GROUP BY product_id (or product_name) with MAX/MIN/AVG on value columns") | |
| rules.append(" - SELECT DISTINCT when you only need unique values") | |
| rules.append(" - Use subqueries with aggregation before joining") | |
| rules.append(" NEVER return raw joins that produce repeated product names.") | |
| # ββ Rule 2: Product price lookup | |
| rules.append("") | |
| rules.append("RULE 2 β PRODUCT PRICE LOOKUP (SOURCE OF TRUTH):") | |
| rules.append(" The SOURCE OF TRUTH for product prices is the sales_order_line_pricing table.") | |
| rules.append(" It has 'selling_price_per_unit' which is the actual price per 1 unit of a product.") | |
| rules.append(" For 'most expensive products', 'cheapest products', 'product price':") | |
| rules.append(" β Query sales_order_line_pricing and JOIN to product tables for product_name") | |
| rules.append(" β Use selling_price_per_unit (NOT line_total_price, NOT selling_price from catalog)") | |
| rules.append(" β GROUP BY product_id, product_name and use MAX(selling_price_per_unit)") | |
| rules.append(" β Join path: sales_order_line_pricing.product_id = product_master.product_id") | |
| rules.append(" Do NOT use product_variant_summary.selling_price or variant_sku_table.selling_price") | |
| rules.append(" β those are catalog/list prices, not actual transaction prices.") | |
| rules.append(" For 'highest revenue products' or 'best selling products':") | |
| rules.append(" β Use SUM(line_total_price) grouped by product, filtered by status='closed'") | |
| # ββ Rule 3: Status filtering (only for transactional queries) | |
| rules.append("") | |
| rules.append("RULE 3 β STATUS FILTERING (TRANSACTIONAL ONLY):") | |
| rules.append(" The 'status' column on the sales_order table has values: closed, open, cancelled, processing.") | |
| rules.append(" For revenue, AOV, sales counts: WHERE status = 'closed'") | |
| rules.append(" For product catalog queries: NO status filter needed") | |
| rules.append(" IMPORTANT: The 'status' column is ONLY on the sales_order table.") | |
| rules.append(" Do NOT look for payment_status or status on pricing/line tables β it does not exist there.") | |
| # ββ Rule 4: Unit price vs total price | |
| rules.append("") | |
| rules.append("RULE 4 β UNIT PRICE vs TOTAL PRICE:") | |
| rules.append(" line_total_price = selling_price_per_unit Γ quantity (total for order line)") | |
| rules.append(" selling_price_per_unit = the actual price of 1 unit of the product") | |
| rules.append(" base_price_per_unit = cost price of 1 unit before margin") | |
| rules.append(" NEVER use line_total_price as a product's price β it includes quantity.") | |
| rules.append(" To get a product's price: use selling_price_per_unit or selling_price column") | |
| # ββ Rule 5: Common metrics formulas | |
| rules.append("") | |
| rules.append("RULE 5 β METRIC FORMULAS:") | |
| rules.append(" AOV = SUM(so.total_amount) / COUNT(DISTINCT so.so_id) WHERE so.status='closed'") | |
| rules.append(" Revenue = SUM(so.total_amount) WHERE so.status='closed'") | |
| rules.append(" Most Expensive Product = MAX(pvs.selling_price) GROUP BY product_id, product_name") | |
| rules.append(" Margin % = (selling_price - base_price) / selling_price Γ 100") | |
| rules.append(" Order Count = COUNT(DISTINCT so.so_id) WHERE so.status='closed'") | |
| # ββ Rule 6: Table relationships | |
| rules.append("") | |
| rules.append("RULE 6 β TABLE JOIN PATHS:") | |
| rules.append(" Sales chain: sales_order(so_id) β sales_order_line(so_id, sol_id) β sales_order_line_pricing(sol_id)") | |
| rules.append(" Product chain: product_master(product_id) β product_variant_summary(product_id) β variant_sku_table(variant_sku)") | |
| rules.append(" Sales β Product: sales_order_line.variant_sku = variant_sku_table.variant_sku") | |
| rules.append(" Sales β Customer: sales_order.customer_id = customer_master.customer_id") | |
| rules.append(" Sales β Payment: sales_order.so_id = sales_order_payments.so_id") | |
| return "\n".join(rules) | |