Spaces:
Sleeping
Sleeping
| """ | |
| Auto-Ontology — Automotive Market Intelligence | |
| HuggingFace Space: Gradio Blocks with pipeline narrative + Strands Agent chatbot. | |
| """ | |
| import os | |
| from pathlib import Path | |
| import gradio as gr | |
| import pandas as pd | |
# ---------------------------------------------------------------------------
# Load dataset (parquet files from the HF dataset repo)
# ---------------------------------------------------------------------------
# Local copy of the parquet tables shipped alongside this Space (preferred).
DATA_DIR = Path(__file__).parent / "data"
# HF dataset repo used as a remote fallback when a table is missing locally.
HF_DATASET = "cp500/auto-ontology"
def _load_parquet(subdir: str, table: str) -> pd.DataFrame | None:
    """Read one parquet table, preferring the local copy over the HF hub.

    Returns None when the table is neither on disk nor reachable remotely.
    """
    candidate = DATA_DIR / subdir / f"{table}.parquet"
    if not candidate.exists():
        remote = f"hf://datasets/{HF_DATASET}/data/{subdir}/{table}.parquet"
        try:
            return pd.read_parquet(remote)
        except Exception:
            # Hub unreachable or table absent — callers treat None as "missing".
            return None
    return pd.read_parquet(candidate)
# Lazy-load dataframes — one entry per "subdir/table" key; a None entry
# records a failed load so we don't retry the hub on every call.
_cache: dict[str, pd.DataFrame | None] = {}


def get_df(subdir: str, table: str) -> pd.DataFrame:
    """Return the cached dataframe for subdir/table, loading it on first use.

    Raises ValueError when the table could not be loaded locally or from the hub.
    """
    key = f"{subdir}/{table}"
    try:
        frame = _cache[key]
    except KeyError:
        frame = _cache[key] = _load_parquet(subdir, table)
    if frame is None:
        raise ValueError(f"Table {key} not available")
    return frame
| # --------------------------------------------------------------------------- | |
| # Strands Agent tools — query the parquet dataset | |
| # --------------------------------------------------------------------------- | |
| from strands import Agent, tool | |
| from strands.models.openai import OpenAIModel | |
def search_products(make: str = "", model: str = "", year: int = 0) -> str:
    """Search the product index for vehicles by make, model, and/or year.
    Returns matching vehicles with their IDs, make, model, year, and body class."""
    products = get_df("hypergraph", "product_index")
    keep = pd.Series(True, index=products.index)
    # Apply each text filter only when the caller supplied a value;
    # matching is case-insensitive substring, NaN never matches.
    for column, needle in (("make", make), ("model", model)):
        if needle:
            keep &= products[column].str.contains(needle, case=False, na=False)
    if year:
        keep &= products["model_year"] == year
    hits = products[keep].head(20)
    if hits.empty:
        return "No products found matching the criteria."
    return hits.to_markdown(index=False)
def browse_signals(domain: str = "", sentiment: str = "", keyword: str = "") -> str:
    """Browse market signals. Filter by L1 domain code (P/T/C/F/S/R/M/ST),
    sentiment (bullish/bearish/neutral/mixed), or keyword in signal name.
    Returns up to 15 matching signals."""
    signal_index = get_df("hypergraph", "signal_index")
    nodes = get_df("hypergraph", "nodes")
    # Attach human-readable names by joining the Signal nodes onto the index.
    signal_names = nodes.loc[nodes["node_type"] == "Signal", ["id", "name"]]
    merged = signal_index.merge(
        signal_names, left_on="signal_id", right_on="id", how="left"
    )
    keep = pd.Series(True, index=merged.index)
    if domain:
        keep &= merged["domain"].str.upper() == domain.upper()
    if sentiment:
        keep &= merged["sentiment"].str.lower() == sentiment.lower()
    if keyword:
        keep &= merged["name"].str.contains(keyword, case=False, na=False)
    columns = ["signal_id", "name", "domain", "subdomain",
               "sentiment", "impact", "timestamp"]
    matches = merged.loc[keep, columns].head(15)
    if matches.empty:
        return "No signals found matching the criteria."
    return matches.to_markdown(index=False)
def get_competitors(product_id: str) -> str:
    """Get vehicles that compete with a given product.
    Takes a product_id like 'prd_tesla_model_y_2024' and returns competing vehicles."""
    edges = get_df("hypergraph", "edges")
    products = get_df("hypergraph", "product_index")
    # COMPETES_WITH edges may point either way, so collect both directions.
    rivalry = edges.loc[edges["role"] == "COMPETES_WITH"]
    outgoing = rivalry.loc[rivalry["source_id"] == product_id, "target_id"]
    incoming = rivalry.loc[rivalry["target_id"] == product_id, "source_id"]
    rival_ids = pd.concat([outgoing, incoming]).unique()
    if not len(rival_ids):
        return f"No competitors found for {product_id}."
    rivals = products[products["product_id"].isin(rival_ids)]
    return f"Competitors of {product_id}:\n\n{rivals.to_markdown(index=False)}"
def graph_stats() -> str:
    """Get summary statistics of the auto-ontology hypergraph —
    node counts by type, edge counts by role, signal domain distribution, etc."""
    nodes = get_df("hypergraph", "nodes")
    edges = get_df("hypergraph", "edges")
    signal_index = get_df("hypergraph", "signal_index")

    # Human-readable labels for the single-letter L1 domain codes.
    domain_names = {
        "P": "Product", "C": "Competitive", "T": "Technology", "M": "Market",
        "F": "Financial", "S": "Supply Chain", "R": "Regulatory", "ST": "Strategic",
    }

    def ranked(counts: pd.Series):
        # Descending by count; stable sort preserves value_counts tie order.
        return sorted(counts.to_dict().items(), key=lambda kv: -kv[1])

    report = [
        "## Hypergraph Statistics\n",
        f"**Total nodes:** {len(nodes):,}",
        f"**Total edges:** {len(edges):,}\n",
        "### Node Types",
    ]
    report += [f"- {t}: {c:,}" for t, c in ranked(nodes["node_type"].value_counts())]
    report.append("\n### Edge Roles")
    report += [f"- {r}: {c:,}" for r, c in ranked(edges["role"].value_counts())]
    report.append("\n### Signal Domains (L1)")
    report += [
        f"- {d} ({domain_names.get(d, d)}): {c:,}"
        for d, c in ranked(signal_index["domain"].value_counts())
    ]
    report.append("\n### Signal Sentiment")
    report += [f"- {s}: {c:,}" for s, c in ranked(signal_index["sentiment"].value_counts())]
    return "\n".join(report)
# ---------------------------------------------------------------------------
# Build Strands Agent
# ---------------------------------------------------------------------------
# System prompt for the analyst agent. Trailing backslashes are string-level
# line continuations so the rendered prompt has no mid-sentence newlines.
SYSTEM_PROMPT = """\
You are an automotive market intelligence analyst with access to the Auto-Ontology \
hypergraph — 176K nodes and 537K edges connecting 94,671 market signals to 1,261 vehicles.
The data was extracted from Common Crawl and resolved against the NHTSA vPIC registry.
Use your tools to search products, browse signals, find competitors, and get graph stats. \
When answering, cite specific data from the tools. Be concise and analytical.
Signal domains: P (Product), T (Technology), C (Competitive), F (Financial), \
S (Supply Chain), R (Regulatory), M (Market), ST (Strategic).
Sentiments: bullish, bearish, neutral, mixed.
"""
| def _build_agent(): | |
| """Build the Strands agent with HF Inference API.""" | |
| hf_token = os.environ.get("HF_TOKEN", "") | |
| model = OpenAIModel( | |
| client_args={ | |
| "base_url": "https://router.huggingface.co/v1/", | |
| "api_key": hf_token, | |
| }, | |
| model_id="Qwen/Qwen2.5-72B-Instruct", | |
| ) | |
| return Agent( | |
| model=model, | |
| tools=[search_products, browse_signals, get_competitors, graph_stats], | |
| system_prompt=SYSTEM_PROMPT, | |
| ) | |
# Process-wide agent instance, created lazily on the first chat request.
_agent = None


def get_agent():
    """Return the shared Strands agent, building it on first request."""
    global _agent
    if _agent is not None:
        return _agent
    _agent = _build_agent()
    return _agent
# ---------------------------------------------------------------------------
# Chat handler
# ---------------------------------------------------------------------------
def chat_fn(message: str, history: list[dict]) -> str:
    """Handle a chat message using the Strands agent.

    Any failure (missing token, model error, tool error) is rendered as a
    user-visible message rather than crashing the Gradio handler.
    """
    try:
        return str(get_agent()(message))
    except Exception as exc:
        return f"Error: {exc}\n\nMake sure the HF_TOKEN secret is configured in Space settings."
# ---------------------------------------------------------------------------
# Pipeline narrative HTML
# ---------------------------------------------------------------------------
PIPELINE_HTML_PATH = Path(__file__).parent / "pipeline.html"


def load_pipeline_html() -> str:
    """Return the pipeline narrative wrapped in a base64 data-URI iframe.

    The iframe isolates the page's scripts/styles from the Gradio app.
    Reading and re-encoding as explicit UTF-8 avoids mojibake on hosts whose
    locale default encoding is not UTF-8. Falls back to a short notice when
    pipeline.html is missing.
    """
    if not PIPELINE_HTML_PATH.exists():
        return "<p>Pipeline narrative not found. Check pipeline.html.</p>"
    import base64

    html = PIPELINE_HTML_PATH.read_text(encoding="utf-8")
    encoded = base64.b64encode(html.encode("utf-8")).decode("ascii")
    return (
        f'<iframe src="data:text/html;base64,{encoded}" width="100%" height="900" '
        'style="border:none; border-radius:12px;"></iframe>'
    )
# ---------------------------------------------------------------------------
# Gradio App
# ---------------------------------------------------------------------------
# Markdown header shown above the tabs; backslashes are string-level line
# continuations so the paragraph renders without hard line breaks.
DESCRIPTION = """\
# Auto-Ontology — Automotive Market Intelligence
Explore a hypergraph of **94,671 market signals** connected to **1,261 vehicles** \
from the NHTSA vPIC registry. Built from Common Crawl data using an AWS pipeline \
with NuExtract structured extraction and vPIC entity resolution.
"""
with gr.Blocks(
    title="Auto-Ontology",
    theme=gr.themes.Base(
        primary_hue="indigo",
        secondary_hue="emerald",
        neutral_hue="slate",
    ),
) as demo:
    gr.Markdown(DESCRIPTION)
    # Two tabs: a static pipeline narrative and the agent chat.
    with gr.Tabs():
        with gr.Tab("The Pipeline"):
            # Rendered once at app build time (iframe with the narrative HTML).
            gr.HTML(load_pipeline_html())
        with gr.Tab("Ask the Ontology"):
            gr.Markdown(
                "Chat with a **Strands Agent** that can search products, "
                "browse market signals, find competitors, and query graph statistics."
            )
            # type="messages" passes history as a list of role/content dicts,
            # matching chat_fn's signature.
            gr.ChatInterface(
                fn=chat_fn,
                type="messages",
                examples=[
                    "What are the graph statistics?",
                    "Search for Tesla vehicles in the dataset",
                    "Show me bearish signals in the technology domain",
                    "What competes with the Tesla Model Y 2024?",
                    "Find signals about battery technology",
                ],
            )
if __name__ == "__main__":
    demo.launch()