Spaces:
Running
Running
| import os | |
| import re | |
| import json | |
| import io | |
| import time | |
| import logging | |
| import requests | |
| import subprocess | |
| import pandas as pd | |
| from pathlib import Path | |
| from sqlalchemy import create_engine | |
| from ollama import Client | |
# --- LOGGING ---
# Module-wide logger: INFO level with timestamps so progress of the
# long-running ingestion batch is visible in the console/log file.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger("KNBS_Ingest")
| # --- 1. INFRASTRUCTURE --- | |
def _manage_ollama_server(ollama_host, ollama_port, ollama_bin, model):
    """Ensure an Ollama server is reachable, starting one if needed.

    Probes `ollama_host` first; if no server responds, spawns
    `ollama serve` bound to `ollama_port` with models stored under
    $SCRATCH, waits for it to come up, then pulls `model`.

    Args:
        ollama_host: Base URL of the Ollama HTTP endpoint.
        ollama_port: Port the spawned server should bind to.
        ollama_bin: Path to the ollama executable.
        model: Model tag to pull once the server is up.

    Returns:
        True when a server is reachable and the model pull succeeded,
        False on any startup/pull failure.
    """
    try:
        # Short timeout: a hung probe should not stall the whole pipeline.
        if requests.get(ollama_host, timeout=3).status_code == 200:
            logger.info(" Ollama connected.")
            return True
    except requests.RequestException:
        # Server not running (or unreachable) -- start our own below.
        # Narrowed from a bare `except:` so Ctrl-C still works.
        pass
    logger.info(f"🚀 Starting Ollama ({model})...")
    scratch_env = os.environ.get("SCRATCH", "/tmp")
    models_dir = Path(scratch_env) / "ollama_core/models"
    server_env = os.environ.copy()
    server_env["OLLAMA_HOST"] = f"127.0.0.1:{ollama_port}"
    server_env["OLLAMA_MODELS"] = str(models_dir)
    try:
        subprocess.Popen(
            [str(ollama_bin), "serve"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=server_env,
        )
        # Poll for readiness (up to ~30s) instead of a fixed sleep(5),
        # which was fragile on slow nodes.
        for _ in range(30):
            try:
                if requests.get(ollama_host, timeout=2).status_code == 200:
                    break
            except requests.RequestException:
                pass
            time.sleep(1)
        # If the server never came up, this pull fails and check=True raises.
        subprocess.run([str(ollama_bin), "pull", model], env=server_env, check=True)
        return True
    except Exception as e:
        logger.error(f" Server Error: {e}")
        return False
| # --- 2. MARKDOWN PARSING ENGINE --- | |
def extract_tables_from_markdown(md_content: str) -> list[pd.DataFrame]:
    """
    Scan markdown text for pipe-delimited tables (| col | col |)
    and convert each contiguous block into a pandas DataFrame.

    Blocks that fail to parse are logged and skipped; tables with a
    single column (usually layout artifacts) are discarded.

    Args:
        md_content: Full markdown document text.

    Returns:
        List of DataFrames, one per successfully parsed table block.
    """
    tables: list[pd.DataFrame] = []

    def _flush(buffer: list[str]) -> None:
        # Parse one accumulated table block; parse failures are non-fatal.
        if not buffer:
            return
        try:
            df = pd.read_csv(
                io.StringIO('\n'.join(buffer)),
                sep="|",
                skipinitialspace=True,
                engine='python'
            )
            # Leading/trailing pipes create all-NaN edge columns -- drop them.
            df = df.dropna(axis=1, how='all')
            # Drop the markdown divider row (e.g. ---|---|---).
            if not df.empty:
                df = df[~df.iloc[:, 0].astype(str).str.contains('---', regex=False)]
            if not df.empty and len(df.columns) > 1:
                tables.append(df)
        except Exception as e:
            logger.warning(f"Failed to parse a table block: {e}")

    buffer: list[str] = []
    for line in md_content.split('\n'):
        stripped = line.strip()
        # Table lines must start AND end with a pipe.
        if stripped.startswith('|') and stripped.endswith('|'):
            buffer.append(stripped)
        else:
            # Non-table line terminates the current block.
            _flush(buffer)
            buffer = []
    # BUGFIX: a table that runs to end-of-file was previously dropped
    # because the buffer was only flushed on a following non-table line.
    _flush(buffer)
    return tables
| # --- 3. LLM HEADER CLEANER (KNBS SPECIFIC) --- | |
def clean_knbs_headers(df: pd.DataFrame, filename: str, table_index: int, client: Client, model: str) -> pd.DataFrame:
    """Sanitize DataFrame headers via the LLM, fixing split/garbage names.

    PDF-to-Markdown conversion often splits one logical header across the
    header row and the first data rows; the model is shown a small data
    preview so it can reassemble them into snake_case SQL-safe names.

    Args:
        df: Table whose columns should be renamed (columns mutated in place).
        filename: Source file name, passed to the model as context.
        table_index: Position of this table within the source file.
        client: Connected Ollama client.
        model: Model tag to query.

    Returns:
        The same DataFrame with cleaned column names. On LLM failure or a
        column-count mismatch, original names are kept (snake_cased).
    """
    raw_headers = [str(c).strip() for c in df.columns]
    # Context: first 2 rows help the model detect headers split across rows.
    data_preview = df.head(2).astype(str).values.tolist()
    # BUGFIX: the prompt previously hard-coded a placeholder string here and
    # never used the `filename` parameter; interpolate it so the model sees
    # the real source file.
    prompt = f"""
You are a Data Engineer cleaning Kenya National Bureau of Statistics (KNBS) data.
Source File: "{filename}"
Table Index: {table_index}
Current Headers: {raw_headers}
Data Preview (First 2 Rows): {data_preview}
Task: Return a list of {len(raw_headers)} clean, snake_case SQL column names.
RULES:
1. INFER MEANING: If header is "Gross" and Row 1 is "Domestic Product", the column name is "gdp".
2. HANDLE YEARS: If headers are "2019", "2020", keep as "year_2019".
3. HANDLE GARBAGE: If header is "Unnamed: 1" look at Data Preview. If it contains items like "Agriculture", name it "sector".
4. KNBS reports often have a "Total" column. Ensure it is named "total".
Respond ONLY with a JSON list of strings.
"""
    try:
        res = client.chat(model=model, messages=[{'role': 'user', 'content': prompt}], format='json')
        new_headers = json.loads(res['message']['content'])
        # Some models wrap the list, e.g. {"headers": [...]} -- unwrap it.
        if isinstance(new_headers, dict):
            for val in new_headers.values():
                if isinstance(val, list):
                    new_headers = val
                    break
        # Only adopt LLM output when the column count matches exactly.
        if isinstance(new_headers, list) and len(new_headers) == len(df.columns):
            df.columns = new_headers
        else:
            # Fallback: keep originals but snake_case them.
            df.columns = [re.sub(r'[^a-zA-Z0-9]', '_', str(c).strip()).lower() for c in df.columns]
    except Exception as e:
        logger.warning(f"LLM Header clean failed (Table {table_index}): {e}")
    return df
| # --- 4. MAIN PIPELINE EXPORT --- | |
def ingest_knbs_data(input_dir: str, db_name: str, model: str = "qwen2.5:14b"):
    """Run the KNBS ingestion pipeline end to end.

    Recursively scans `input_dir` (absolute, or relative to $SCRATCH) for
    .md files, extracts pipe-delimited tables, cleans their headers with
    the LLM, coerces numeric-looking columns, and writes each table to a
    SQLite database. Per-file failures are logged and skipped.

    Args:
        input_dir: Directory containing markdown exports of KNBS reports.
        db_name: SQLite file path for the output database.
        model: Ollama model tag used for header cleaning.
    """
    def _coerce_numeric(value):
        # Strip thousands separators and percent signs, then convert.
        # Replacement for pd.to_numeric(..., errors='ignore'), which is
        # deprecated since pandas 2.2 and removed in 3.0: on parse failure
        # we return the cleaned string, exactly as errors='ignore' did.
        cleaned = str(value).replace(',', '').replace('%', '')
        try:
            return pd.to_numeric(cleaned)
        except (ValueError, TypeError):
            return cleaned

    # Paths
    SCRATCH = os.environ.get("SCRATCH", "/tmp")
    BASE_DIR = Path(SCRATCH)
    INPUT_PATH = Path(input_dir)
    if not INPUT_PATH.exists():
        # Fall back to treating input_dir as relative to $SCRATCH.
        INPUT_PATH = BASE_DIR / input_dir
    if not INPUT_PATH.exists():
        logger.error(f" Input directory not found: {INPUT_PATH}")
        return
    OLLAMA_BIN = BASE_DIR / "ollama_core/bin/ollama"
    CUSTOM_PORT = "25000"
    OLLAMA_HOST = f"http://127.0.0.1:{CUSTOM_PORT}"
    # Infrastructure
    if not _manage_ollama_server(OLLAMA_HOST, CUSTOM_PORT, OLLAMA_BIN, model):
        return
    engine = create_engine(f"sqlite:///{db_name}")
    client = Client(host=OLLAMA_HOST)
    # Process files (recursive scan; sorted for deterministic table names)
    files = sorted(INPUT_PATH.rglob("*.md"))
    logger.info(f"🚀 Found {len(files)} KNBS markdown files (Recursive Scan). Starting ingestion...")
    for f in files:
        logger.info(f"📄 Processing {f.name}...")
        try:
            with open(f, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            # A. Extract tables
            dfs = extract_tables_from_markdown(content)
            if not dfs:
                continue
            logger.info(f" found {len(dfs)} tables.")
            # B. Clean & load tables
            for i, df in enumerate(dfs):
                # Basic cleanup
                df = df.dropna(how='all', axis=1).dropna(how='all', axis=0)
                if df.empty or len(df) < 2:
                    continue  # Skip empty/tiny tables
                # LLM semantic cleaning
                df = clean_knbs_headers(df, f.name, i, client, model)
                # Coerce numeric-looking columns (heuristic on column name)
                for c in df.columns:
                    if any(x in str(c).lower() for x in ['rate', 'value', 'amount', 'total', 'year', 'price']):
                        df[c] = df[c].apply(_coerce_numeric)
                # Naming: {filename_slug}_tab{index}
                slug = re.sub(r'[^a-zA-Z0-9]', '_', f.stem).lower()[:40].lstrip('_')
                table_name = f"{slug}_tab{i}"
                df['source_file'] = f.name
                df.to_sql(table_name, engine, if_exists='replace', index=False)
                logger.info(f" -> Saved table: {table_name} ({len(df)} rows)")
        except Exception as e:
            logger.error(f" Failed {f.name}: {e}")
    logger.info(" KNBS Ingestion Complete.")