File size: 8,404 Bytes
7011b92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import os
import re
import json
import io
import time
import logging
import requests
import subprocess
import pandas as pd
from pathlib import Path
from sqlalchemy import create_engine
from ollama import Client

# --- LOGGING ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger("KNBS_Ingest")

# --- 1. INFRASTRUCTURE ---

def _manage_ollama_server(ollama_host, ollama_port, ollama_bin, model):
    """Ensures Ollama is running (reuses existing logic)."""
    try:
        if requests.get(ollama_host).status_code == 200:
            logger.info(" Ollama connected.")
            return True
    except: pass

    logger.info(f"๐Ÿš€ Starting Ollama ({model})...")
    scratch_env = os.environ.get("SCRATCH", "/tmp")
    models_dir = Path(scratch_env) / "ollama_core/models"
    
    server_env = os.environ.copy()
    server_env["OLLAMA_HOST"] = f"127.0.0.1:{ollama_port}"
    server_env["OLLAMA_MODELS"] = str(models_dir)
    
    try:
        subprocess.Popen([str(ollama_bin), "serve"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=server_env)
        time.sleep(5)
        subprocess.run([str(ollama_bin), "pull", model], env=server_env, check=True)
        return True
    except Exception as e:
        logger.error(f" Server Error: {e}")
        return False

# --- 2. MARKDOWN PARSING ENGINE ---

def extract_tables_from_markdown(md_content: str) -> list[pd.DataFrame]:
    """
    Scans markdown text for pipe-delimited tables (| col | col |) 
    and converts them to Pandas DataFrames.
    """
    tables = []
    lines = md_content.split('\n')
    buffer = []
    inside_table = False
    
    for line in lines:
        stripped = line.strip()
        # Detect table lines (must start and end with |)
        if stripped.startswith('|') and stripped.endswith('|'):
            inside_table = True
            buffer.append(stripped)
        else:
            if inside_table:
                # Table block ended, process buffer
                if buffer:
                    try:
                        table_str = '\n'.join(buffer)
                        # Read using pandas, handling markdown separators
                        df = pd.read_csv(
                            io.StringIO(table_str), 
                            sep="|", 
                            skipinitialspace=True, 
                            engine='python'
                        )
                        
                        # CLEANUP PANDAS ARTIFACTS
                        # 1. Drop empty columns (pandas creates empty cols for leading/trailing pipes)
                        df = df.dropna(axis=1, how='all')
                        
                        # 2. Filter out the markdown divider row (e.g. ---|---|---)
                        if not df.empty:
                            df = df[~df.iloc[:,0].astype(str).str.contains('---', regex=False)]
                            
                        if not df.empty and len(df.columns) > 1:
                            tables.append(df)
                            
                    except Exception as e:
                        logger.warning(f"Failed to parse a table block: {e}")
                
                buffer = []
                inside_table = False
                
    return tables

# --- 3. LLM HEADER CLEANER (KNBS SPECIFIC) ---

def clean_knbs_headers(df: pd.DataFrame, filename: str, table_index: int, client: Client, model: str) -> pd.DataFrame:
    """
    Uses LLM to sanitize headers, handling split headers common in PDF-to-Markdown.
    """
    raw_headers = [str(c).strip() for c in df.columns]
    
    # Context: Provide first 2 rows to help identify if headers are split across rows
    data_preview = df.head(2).astype(str).values.tolist()
    
    prompt = f"""
    You are a Data Engineer cleaning Kenya National Bureau of Statistics (KNBS) data.
    
    Source File: "{filename}"
    Table Index: {table_index}
    
    Current Headers: {raw_headers}
    Data Preview (First 2 Rows): {data_preview}
    
    Task: Return a list of {len(raw_headers)} clean, snake_case SQL column names.
    
    RULES:
    1. INFER MEANING: If header is "Gross" and Row 1 is "Domestic Product", the column name is "gdp".
    2. HANDLE YEARS: If headers are "2019", "2020", keep as "year_2019".
    3. HANDLE GARBAGE: If header is "Unnamed: 1" look at Data Preview. If it contains items like "Agriculture", name it "sector".
    4. KNBS reports often have a "Total" column. Ensure it is named "total".
    
    Respond ONLY with a JSON list of strings.
    """
    
    try:
        res = client.chat(model=model, messages=[{'role': 'user', 'content': prompt}], format='json')
        new_headers = json.loads(res['message']['content'])
        
        # Handle dictionary wrapper if LLM returns {"headers": [...]}
        if isinstance(new_headers, dict):
            for val in new_headers.values():
                if isinstance(val, list): 
                    new_headers = val
                    break
        
        # Validation: Length must match
        if isinstance(new_headers, list) and len(new_headers) == len(df.columns):
            df.columns = new_headers
        else:
            # Fallback: keep originals but snake_case them
            df.columns = [re.sub(r'[^a-zA-Z0-9]', '_', str(c).strip()).lower() for c in df.columns]
            
    except Exception as e:
        logger.warning(f"LLM Header clean failed (Table {table_index}): {e}")
        
    return df

# --- 4. MAIN PIPELINE EXPORT ---

def ingest_knbs_data(input_dir: str, db_name: str, model: str = "qwen2.5:14b"):
    """
    Main entry point to run the KNBS ingestion pipeline.
    Recursively scans input_dir for all .md files.
    """
    # Paths
    SCRATCH = os.environ.get("SCRATCH", "/tmp")
    BASE_DIR = Path(SCRATCH)
    
    INPUT_PATH = Path(input_dir)
    if not INPUT_PATH.exists():
        INPUT_PATH = BASE_DIR / input_dir
    
    if not INPUT_PATH.exists():
        logger.error(f" Input directory not found: {INPUT_PATH}")
        return

    OLLAMA_BIN = BASE_DIR / "ollama_core/bin/ollama"
    CUSTOM_PORT = "25000"
    OLLAMA_HOST = f"http://127.0.0.1:{CUSTOM_PORT}"
    
    # Infrastructure
    if not _manage_ollama_server(OLLAMA_HOST, CUSTOM_PORT, OLLAMA_BIN, model): return
    
    engine = create_engine(f"sqlite:///{db_name}")
    client = Client(host=OLLAMA_HOST)
    
    # Process Files (RECURSIVE SEARCH using rglob)
    files = sorted(list(INPUT_PATH.rglob("*.md")))
    logger.info(f"๐Ÿš€ Found {len(files)} KNBS markdown files (Recursive Scan). Starting ingestion...")
    
    for f in files:
        logger.info(f"๐Ÿ“„ Processing {f.name}...")
        try:
            with open(f, 'r', encoding='utf-8', errors='ignore') as file:
                content = file.read()
            
            # A. Extract Tables
            dfs = extract_tables_from_markdown(content)
            
            if not dfs:
                continue
                
            logger.info(f"   found {len(dfs)} tables.")
            
            # B. Clean & Load Tables
            for i, df in enumerate(dfs):
                # Basic cleanup
                df = df.dropna(how='all', axis=1).dropna(how='all', axis=0)
                if df.empty or len(df) < 2: continue # Skip empty/tiny tables
                
                # LLM Semantic Cleaning
                df = clean_knbs_headers(df, f.name, i, client, model)
                
                # Sanitize numeric data
                for c in df.columns:
                    if any(x in str(c).lower() for x in ['rate', 'value', 'amount', 'total', 'year', 'price']):
                        df[c] = df[c].apply(lambda x: pd.to_numeric(str(x).replace(',', '').replace('%',''), errors='ignore'))

                # Naming: knbs_{filename_slug}_tab{index}
                slug = re.sub(r'[^a-zA-Z0-9]', '_', f.stem).lower()[:40].lstrip('_')
                table_name = f"{slug}_tab{i}"
                
                df['source_file'] = f.name
                
                df.to_sql(table_name, engine, if_exists='replace', index=False)
                logger.info(f"     -> Saved table: {table_name} ({len(df)} rows)")
                
        except Exception as e:
            logger.error(f"   Failed {f.name}: {e}")

    logger.info(" KNBS Ingestion Complete.")