# mshauri-fedha / src / load / clean_db.py
# (teofizzy — prototype stage, commit 7011b92)
# clean_database.py
import pandas as pd
from sqlalchemy import create_engine, text
import logging
# Set up a logger that can be configured by the importer
logger = logging.getLogger("DBCleaner")
def drop_blacklisted_tables(engine):
    """Drop every table whose name contains one of the blacklist patterns.

    Patterns are matched as substrings against the table names reported by
    SQLite's ``sqlite_master`` catalog.
    """
    drop_patterns = [
        "bop_annual",
        "commercial_banks_average_lending_rates",
        "depository_corporation",
        "exchange_rates_end_period",
        "exchange_rates_period_average",
        "forex_bureau_rates_sheet",
        "lr_return_template",
        "nsfr_return_template"
    ]
    with engine.connect() as conn:
        rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'")).fetchall()
        existing = [row[0] for row in rows]
        # Substring match: any table containing a blacklisted pattern is doomed.
        doomed = [name for name in existing
                  if any(pattern in name for pattern in drop_patterns)]
        if not doomed:
            logger.info("No tables found matching blacklist patterns.")
            return
        logger.info(f"🗑️ Dropping {len(doomed)} tables...")
        for name in doomed:
            conn.execute(text(f'DROP TABLE "{name}"'))
            logger.info(f" - Dropped: {name}")
        conn.commit()
def clean_table(engine, table_name, drop_top_rows=0, rename_map=None, rename_by_index=None, static_date=None):
    """
    Generic cleaner for specific table fixes.

    Loads the table into pandas, applies the requested transformations and
    writes the result back with if_exists='replace'. Any failure is logged
    and swallowed so one bad table does not abort the whole pipeline.

    Args:
        engine: SQLAlchemy engine bound to the target SQLite database.
        table_name: Name of the table to clean.
        drop_top_rows: Number of leading rows to discard (header junk).
        rename_map: Optional {old_name: new_name} column rename mapping.
        rename_by_index: Optional {position: new_name} rename by column index
            (useful for auto-generated names like 'col_1', 'col_2').
        static_date: If given, force a 'date' column to this value, inserting
            the column at position 0 when it is missing.
    """
    try:
        # Check if table exists first. Bind the name as a parameter instead
        # of interpolating it into the SQL — robust to quotes in the name.
        with engine.connect() as conn:
            exists = conn.execute(
                text("SELECT name FROM sqlite_master WHERE type='table' AND name=:tbl"),
                {"tbl": table_name},
            ).scalar()
            if not exists:
                logger.warning(f" Table '{table_name}' not found. Skipping.")
                return
        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        if df.empty:
            return
        # Drop columns that are completely empty
        df = df.dropna(axis=1, how='all')
        # Drop top rows if requested
        if drop_top_rows > 0:
            df = df.iloc[drop_top_rows:].reset_index(drop=True)
        # Rename by index (positions beyond the column count are ignored)
        if rename_by_index:
            curr_cols = list(df.columns)
            new_cols = curr_cols.copy()
            for idx, new_name in rename_by_index.items():
                if idx < len(curr_cols):
                    new_cols[idx] = new_name
            df.columns = new_cols
        # Rename by map
        if rename_map:
            df.rename(columns=rename_map, inplace=True)
        # Inject static date if missing, or overwrite an existing one
        if static_date:
            if 'date' not in df.columns:
                df.insert(0, 'date', static_date)
            else:
                df['date'] = static_date
        # Save back to DB (replace mode)
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        logger.info(f" Fixed '{table_name}': {len(df)} rows")
    except Exception as e:
        logger.error(f" Error cleaning '{table_name}': {e}")
def run_specific_fixes(engine):
    """Orchestrates the specific cleaning rules."""
    logger.info("🔧 Running specific table fixes...")
    # Each entry: (table_name, keyword arguments for clean_table).
    fixes = [
        # 1. Historical Rates
        ("download_all_historical_rates",
         {"rename_by_index": {2: "mean_rate", 3: "buy_rate", 4: "sell_rate"}}),
        # 2. Foreign Trade Summary
        ("foreign_trade_summary", {"drop_top_rows": 1}),
        # 3. Forex Bureau Rates
        ("forex_bureau_rates", {"rename_map": {"bureau_name": "currency"}}),
        # 4. Indicative Rates (Indicative Sheet)
        ("indicative_rates_sheet_indicative",
         {"static_date": "2017-11-16",
          "rename_by_index": {0: "currency", 1: "mean_rate",
                              2: "buy_rate", 3: "sell_rate"}}),
        # 5. Indicative Rates (Press Sheet)
        ("indicative_rates_sheet_press",
         {"static_date": "2017-11-16",
          "rename_by_index": {0: "bank_name",
                              1: "usd_buy", 2: "usd_sell", 3: "usd_margin",
                              4: "gbp_buy", 5: "gbp_sell", 6: "gbp_margin"}}),
        # 6. Selected Domestic Exports
        ("value_of_selected_domestic_exports", {"drop_top_rows": 2}),
        # 7. Imports by Commodity
        ("value_of_direct_imports_by_commodities", {"drop_top_rows": 1}),
    ]
    for table_name, kwargs in fixes:
        clean_table(engine, table_name, **kwargs)
def clean_database_pipeline(db_name):
    """Main entry point for external calls.

    Builds a SQLite engine for *db_name*, drops blacklisted tables, then
    applies the table-specific fixes.
    """
    engine = create_engine(f"sqlite:///{db_name}")
    logger.info(f" Starting cleanup on {db_name}...")
    drop_blacklisted_tables(engine)
    run_specific_fixes(engine)
    logger.info(" Cleanup Complete.")
def drop_tables(engine):
    """Drops the specific list of tables requested.

    Uses DROP TABLE IF EXISTS, so already-absent tables are skipped silently
    by SQLite; any other failure is logged and the loop continues.
    """
    tables_to_drop = [
        'forex_bureau_rates',
        'forex_bureaus_rates_sheet_chief_dealers',
        'forex_bureaus_rates_sheet_director',
        'forex_bureaus_rates_sheet_directors',
        'forex_bureaus_rates_sheet_fbx',
        'forex_bureaus_rates_sheet_fbx1',
        'forex_bureaus_rates_sheet_fbx2',
        'forex_bureaus_rates_sheet_fxb1',
        'forex_bureaus_rates_sheet_fxb2',
        'forex_bureaus_rates_sheet_fxb22',
        'forex_bureaus_rates_sheet_market_intelligence',
        'forex_bureaus_rates_sheet_sheet1',
        'forex_bureaus_rates_sheet_sheet2',
        'forex_bureaus_rates_sheet_sheet3',
        'forex_bureaus_rates_sheet_sheet4',
        'issues_of_treasury_bills',
        'issues_of_treasury_bonds'
    ]
    # Report through the module logger (not print) for consistency with the
    # rest of this module.
    logger.info("🗑️ Dropping Tables...")
    with engine.connect() as conn:
        for t in tables_to_drop:
            try:
                conn.execute(text(f'DROP TABLE IF EXISTS "{t}"'))
                logger.info(f" - Dropped: {t}")
            except Exception as e:
                logger.error(f" Could not drop {t}: {e}")
        conn.commit()
def fix_foreign_trade(engine):
    """Renames first column to 'year'."""
    table_name = "foreign_trade_summary"
    source_col = 'kenyan_shillings_million_year'
    try:
        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        # Guard: nothing to do when the verbose source column is absent.
        if source_col not in df.columns:
            print(f" '{table_name}': Target column not found.")
            return
        df = df.rename(columns={source_col: 'year'})
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print(f" Fixed '{table_name}': Renamed 'year' column.")
    except Exception as e:
        print(f" Error fixing {table_name}: {e}")
def fix_indicative_rates_shift(engine):
    """
    Applies the 'Shift Right + Fixed Date' logic.
    Inserts 2017-11-16 at position 0, shifting existing data to the right,
    then assigns canonical headers per table type (padding with generic
    'col_N' names when the frame is wider than expected, truncating when
    it is narrower).
    """
    fixed_date = "2017-11-16"
    # Canonical headers once the date column has been prepended.
    press_headers = ["date", "bank_name",
                     "usd_buy", "usd_sell", "usd_margin",
                     "gbp_buy", "gbp_sell", "gbp_margin",
                     "euro_buy", "euro_sell", "euro_margin"]
    indicative_headers = ["date", "currency", "mean_rate", "buy_rate", "sell_rate"]
    for table in ("indicative_rates_sheet_indicative", "indicative_rates_sheet_press"):
        try:
            df = pd.read_sql(f'SELECT * FROM "{table}"', engine)
            if df.empty:
                continue
            # Prepend the fixed date; every original column shifts one slot right.
            df.insert(0, 'fixed_date', fixed_date)
            headers = press_headers if "press" in table else indicative_headers
            total = len(df.columns)
            # Pad / truncate the canonical header list to the actual width.
            padded = headers + [f"col_{i}" for i in range(total - len(headers))]
            df.columns = padded[:total]
            df.to_sql(table, engine, if_exists='replace', index=False)
            print(f" Fixed '{table}': Applied Date Shift & Header Rename.")
        except Exception as e:
            print(f" Error fixing {table}: {e}")
def fix_cbk_indicative_swap(engine):
    """Swaps 'date' and 'currency' column names."""
    table_name = "cbk_indicative_rates"
    try:
        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        # Build the swap map only from columns that actually exist.
        swap = {old: new
                for old, new in (('date', 'currency'), ('currency', 'date'))
                if old in df.columns}
        if not swap:
            # Neither column present: nothing to do (original behavior).
            return
        df = df.rename(columns=swap)
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print(f" Fixed '{table_name}': Swapped 'date' <-> 'currency'.")
    except Exception as e:
        print(f" Error fixing {table_name}: {e}")