# clean_database.py
"""Cleanup utilities for the scraped SQLite database.

Provides table-dropping helpers (blacklist- and list-based) plus
per-table fixers that trim rows, rename columns, and inject static
dates. `clean_database_pipeline` is the main entry point.
"""

import logging

import pandas as pd
from sqlalchemy import create_engine, text

# Set up a logger that can be configured by the importer.
logger = logging.getLogger("DBCleaner")


def drop_blacklisted_tables(engine):
    """Drops tables matching the blacklist patterns.

    A table is dropped when ANY pattern appears as a substring of its name.
    """
    drop_patterns = [
        "bop_annual",
        "commercial_banks_average_lending_rates",
        "depository_corporation",
        "exchange_rates_end_period",
        "exchange_rates_period_average",
        "forex_bureau_rates_sheet",
        "lr_return_template",
        "nsfr_return_template",
    ]
    with engine.connect() as conn:
        all_tables = [
            row[0]
            for row in conn.execute(
                text("SELECT name FROM sqlite_master WHERE type='table'")
            ).fetchall()
        ]
        # Substring match against every blacklist pattern.
        tables_to_drop = [t for t in all_tables if any(p in t for p in drop_patterns)]

        if not tables_to_drop:
            logger.info("No tables found matching blacklist patterns.")
            return

        logger.info(f"🗑️ Dropping {len(tables_to_drop)} tables...")
        for t in tables_to_drop:
            # Identifiers cannot be bound parameters; double-quote the name instead.
            conn.execute(text(f'DROP TABLE "{t}"'))
            logger.info(f" - Dropped: {t}")
        conn.commit()


def clean_table(engine, table_name, drop_top_rows=0, rename_map=None,
                rename_by_index=None, static_date=None):
    """Generic cleaner for specific table fixes.

    Args:
        engine: SQLAlchemy engine bound to the target SQLite database.
        table_name: Table to clean; skipped with a warning if absent.
        drop_top_rows: Number of leading rows to discard (header junk).
        rename_map: Mapping of old column name -> new column name.
        rename_by_index: Mapping of column position -> new column name
            (useful when columns are auto-named like 'col_1', 'col_2').
        static_date: If given, a 'date' column is inserted (or overwritten)
            with this constant value.

    The cleaned frame is written back with if_exists='replace'.
    Errors are logged, never raised, so one bad table does not stop a run.
    """
    try:
        # Check if table exists first — use a bound parameter rather than
        # string interpolation to avoid SQL-injection / quoting problems.
        with engine.connect() as conn:
            exists = conn.execute(
                text("SELECT name FROM sqlite_master WHERE type='table' AND name=:name"),
                {"name": table_name},
            ).scalar()
            if not exists:
                logger.warning(f" Table '{table_name}' not found. Skipping.")
                return

        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        if df.empty:
            return

        # Drop columns that are completely empty.
        df = df.dropna(axis=1, how='all')

        # Drop top rows if requested.
        if drop_top_rows > 0:
            df = df.iloc[drop_top_rows:].reset_index(drop=True)

        # Rename by index (useful for 'col_1', 'col_2'); out-of-range
        # indices are silently ignored.
        if rename_by_index:
            curr_cols = list(df.columns)
            new_cols = curr_cols.copy()
            for idx, new_name in rename_by_index.items():
                if idx < len(curr_cols):
                    new_cols[idx] = new_name
            df.columns = new_cols

        # Rename by map.
        if rename_map:
            df.rename(columns=rename_map, inplace=True)

        # Inject static date if missing; overwrite if the column exists.
        if static_date:
            if 'date' not in df.columns:
                df.insert(0, 'date', static_date)
            else:
                df['date'] = static_date

        # Save back to DB (replace mode).
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        logger.info(f" Fixed '{table_name}': {len(df)} rows")

    except Exception as e:
        logger.error(f" Error cleaning '{table_name}': {e}")


def run_specific_fixes(engine):
    """Orchestrates the specific cleaning rules."""
    logger.info("🔧 Running specific table fixes...")

    # 1. Historical Rates
    clean_table(engine, "download_all_historical_rates",
                rename_by_index={2: "mean_rate", 3: "buy_rate", 4: "sell_rate"})

    # 2. Foreign Trade Summary
    clean_table(engine, "foreign_trade_summary", drop_top_rows=1)

    # 3. Forex Bureau Rates
    clean_table(engine, "forex_bureau_rates",
                rename_map={"bureau_name": "currency"})

    # 4. Indicative Rates (Indicative Sheet)
    clean_table(engine, "indicative_rates_sheet_indicative",
                static_date="2017-11-16",
                rename_by_index={0: "currency", 1: "mean_rate",
                                 2: "buy_rate", 3: "sell_rate"})

    # 5. Indicative Rates (Press Sheet)
    clean_table(engine, "indicative_rates_sheet_press",
                static_date="2017-11-16",
                rename_by_index={
                    0: "bank_name",
                    1: "usd_buy", 2: "usd_sell", 3: "usd_margin",
                    4: "gbp_buy", 5: "gbp_sell", 6: "gbp_margin",
                })

    # 6. Selected Domestic Exports
    clean_table(engine, "value_of_selected_domestic_exports", drop_top_rows=2)

    # 7. Imports by Commodity
    clean_table(engine, "value_of_direct_imports_by_commodities", drop_top_rows=1)


def clean_database_pipeline(db_name):
    """Main entry point for external calls.

    Args:
        db_name: Path to the SQLite database file.
    """
    connection_str = f"sqlite:///{db_name}"
    engine = create_engine(connection_str)

    logger.info(f" Starting cleanup on {db_name}...")
    drop_blacklisted_tables(engine)
    run_specific_fixes(engine)
    logger.info(" Cleanup Complete.")


def drop_tables(engine):
    """Drops the specific list of tables requested.

    Uses DROP TABLE IF EXISTS, so missing tables are not an error.
    """
    tables_to_drop = [
        'forex_bureau_rates',
        'forex_bureaus_rates_sheet_chief_dealers',
        'forex_bureaus_rates_sheet_director',
        'forex_bureaus_rates_sheet_directors',
        'forex_bureaus_rates_sheet_fbx',
        'forex_bureaus_rates_sheet_fbx1',
        'forex_bureaus_rates_sheet_fbx2',
        'forex_bureaus_rates_sheet_fxb1',
        'forex_bureaus_rates_sheet_fxb2',
        'forex_bureaus_rates_sheet_fxb22',
        'forex_bureaus_rates_sheet_market_intelligence',
        'forex_bureaus_rates_sheet_sheet1',
        'forex_bureaus_rates_sheet_sheet2',
        'forex_bureaus_rates_sheet_sheet3',
        'forex_bureaus_rates_sheet_sheet4',
        'issues_of_treasury_bills',
        'issues_of_treasury_bonds',
    ]

    print("🗑️ Dropping Tables...")
    with engine.connect() as conn:
        for t in tables_to_drop:
            try:
                conn.execute(text(f'DROP TABLE IF EXISTS "{t}"'))
                print(f" - Dropped: {t}")
            except Exception as e:
                print(f" Could not drop {t}: {e}")
        conn.commit()


def fix_foreign_trade(engine):
    """Renames first column to 'year'."""
    table_name = "foreign_trade_summary"
    try:
        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        if 'kenyan_shillings_million_year' in df.columns:
            df.rename(columns={'kenyan_shillings_million_year': 'year'}, inplace=True)
            df.to_sql(table_name, engine, if_exists='replace', index=False)
            print(f" Fixed '{table_name}': Renamed 'year' column.")
        else:
            print(f" '{table_name}': Target column not found.")
    except Exception as e:
        print(f" Error fixing {table_name}: {e}")


def fix_indicative_rates_shift(engine):
    """Applies the 'Shift Right + Fixed Date' logic.

    Inserts 2017-11-16 at position 0, shifting existing data to the right.
    """
    targets = [
        "indicative_rates_sheet_indicative",
        "indicative_rates_sheet_press",
    ]
    fixed_date = "2017-11-16"

    for table in targets:
        try:
            df = pd.read_sql(f'SELECT * FROM "{table}"', engine)
            if df.empty:
                continue

            # Insert new date column at index 0 — this effectively
            # "shifts" the old col 0 to col 1.
            df.insert(0, 'fixed_date', fixed_date)

            # Assign expected headers for the shifted data, per table type.
            if "press" in table:
                # Press sheet: Bank, USD_Buy, USD_Sell, USD_Margin, GBP_Buy...
                expected_headers = ["date", "bank_name", "usd_buy", "usd_sell",
                                    "usd_margin", "gbp_buy", "gbp_sell",
                                    "gbp_margin", "euro_buy", "euro_sell",
                                    "euro_margin"]
            else:
                # Indicative sheet: Currency, Mean, Buy, Sell.
                expected_headers = ["date", "currency", "mean_rate",
                                    "buy_rate", "sell_rate"]

            # Map headers safely: pad with generic names if the frame is
            # wider than expected, truncate if it is narrower.
            final_cols = expected_headers + [
                f"col_{i}" for i in range(len(df.columns) - len(expected_headers))
            ]
            df.columns = final_cols[:len(df.columns)]

            df.to_sql(table, engine, if_exists='replace', index=False)
            print(f" Fixed '{table}': Applied Date Shift & Header Rename.")
        except Exception as e:
            print(f" Error fixing {table}: {e}")


def fix_cbk_indicative_swap(engine):
    """Swaps 'date' and 'currency' column names.

    pandas rename applies the mapping simultaneously, so a two-way swap
    via one rename_map is safe.
    """
    table_name = "cbk_indicative_rates"
    try:
        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        rename_map = {}
        if 'date' in df.columns:
            rename_map['date'] = 'currency'
        if 'currency' in df.columns:
            rename_map['currency'] = 'date'
        if rename_map:
            df.rename(columns=rename_map, inplace=True)
            df.to_sql(table_name, engine, if_exists='replace', index=False)
            print(f" Fixed '{table_name}': Swapped 'date' <-> 'currency'.")
    except Exception as e:
        print(f" Error fixing {table_name}: {e}")