Spaces:
Running
Running
File size: 9,531 Bytes
7011b92 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# clean_database.py
import pandas as pd
from sqlalchemy import create_engine, text
import logging
# Set up a logger that can be configured by the importer
# (no handlers are attached here; the importing application decides where output goes).
logger = logging.getLogger("DBCleaner")
def drop_blacklisted_tables(engine):
    """Drop every table whose name contains one of the blacklist patterns."""
    blacklist = [
        "bop_annual",
        "commercial_banks_average_lending_rates",
        "depository_corporation",
        "exchange_rates_end_period",
        "exchange_rates_period_average",
        "forex_bureau_rates_sheet",
        "lr_return_template",
        "nsfr_return_template",
    ]
    with engine.connect() as conn:
        # Enumerate every table, then keep those matching any pattern.
        rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'")).fetchall()
        doomed = [name for (name,) in rows if any(pat in name for pat in blacklist)]
        if not doomed:
            logger.info("No tables found matching blacklist patterns.")
            return
        logger.info(f"🗑️ Dropping {len(doomed)} tables...")
        for name in doomed:
            conn.execute(text(f'DROP TABLE "{name}"'))
            logger.info(f" - Dropped: {name}")
        conn.commit()
def clean_table(engine, table_name, drop_top_rows=0, rename_map=None, rename_by_index=None, static_date=None):
    """
    Generic cleaner for specific table fixes.

    Reads the table into a DataFrame, applies the requested fixes, and
    writes the result back with if_exists='replace'.

    Args:
        engine: SQLAlchemy engine bound to the target SQLite database.
        table_name: Name of the table to clean; skipped with a warning
            if it does not exist.
        drop_top_rows: Number of leading rows to discard (stray header rows).
        rename_map: Optional {old_name: new_name} column rename mapping.
        rename_by_index: Optional {position: new_name} mapping to rename
            columns by integer position (useful for 'col_1', 'col_2').
        static_date: If given, force a 'date' column holding this value.
    """
    try:
        # Check if table exists first. Use a bound parameter instead of
        # f-string interpolation so a quote in table_name cannot break
        # (or inject into) the SQL.
        with engine.connect() as conn:
            exists = conn.execute(
                text("SELECT name FROM sqlite_master WHERE type='table' AND name=:tbl"),
                {"tbl": table_name},
            ).scalar()
        if not exists:
            logger.warning(f" Table '{table_name}' not found. Skipping.")
            return
        df = pd.read_sql(f'SELECT * FROM "{table_name}"', engine)
        if df.empty:
            return
        # Drop columns that are completely empty
        df = df.dropna(axis=1, how='all')
        # Drop top rows if requested
        if drop_top_rows > 0:
            df = df.iloc[drop_top_rows:].reset_index(drop=True)
        # Rename by index; positions beyond the current width are ignored
        # rather than raising.
        if rename_by_index:
            curr_cols = list(df.columns)
            new_cols = curr_cols.copy()
            for idx, new_name in rename_by_index.items():
                if idx < len(curr_cols):
                    new_cols[idx] = new_name
            df.columns = new_cols
        # Rename by map
        if rename_map:
            df.rename(columns=rename_map, inplace=True)
        # Inject static date: add the column at position 0 if missing,
        # overwrite all values otherwise
        if static_date:
            if 'date' not in df.columns:
                df.insert(0, 'date', static_date)
            else:
                df['date'] = static_date
        # Save back to DB (replace mode)
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        logger.info(f" Fixed '{table_name}': {len(df)} rows")
    except Exception as e:
        logger.error(f" Error cleaning '{table_name}': {e}")
def run_specific_fixes(engine):
    """Orchestrates the specific cleaning rules."""
    logger.info("🔧 Running specific table fixes...")
    # Table-driven dispatch: (table_name, clean_table keyword options),
    # applied in order.
    fixes = [
        # 1. Historical Rates
        ("download_all_historical_rates",
         {"rename_by_index": {2: "mean_rate", 3: "buy_rate", 4: "sell_rate"}}),
        # 2. Foreign Trade Summary
        ("foreign_trade_summary", {"drop_top_rows": 1}),
        # 3. Forex Bureau Rates
        ("forex_bureau_rates", {"rename_map": {"bureau_name": "currency"}}),
        # 4. Indicative Rates (Indicative Sheet)
        ("indicative_rates_sheet_indicative",
         {"static_date": "2017-11-16",
          "rename_by_index": {0: "currency", 1: "mean_rate", 2: "buy_rate", 3: "sell_rate"}}),
        # 5. Indicative Rates (Press Sheet)
        ("indicative_rates_sheet_press",
         {"static_date": "2017-11-16",
          "rename_by_index": {
              0: "bank_name",
              1: "usd_buy", 2: "usd_sell", 3: "usd_margin",
              4: "gbp_buy", 5: "gbp_sell", 6: "gbp_margin",
          }}),
        # 6. Selected Domestic Exports
        ("value_of_selected_domestic_exports", {"drop_top_rows": 2}),
        # 7. Imports by Commodity
        ("value_of_direct_imports_by_commodities", {"drop_top_rows": 1}),
    ]
    for table_name, options in fixes:
        clean_table(engine, table_name, **options)
def clean_database_pipeline(db_name):
    """Main entry point for external calls."""
    # Build a SQLite engine for the given database file, then run the
    # blacklist drop followed by the per-table fixes.
    engine = create_engine(f"sqlite:///{db_name}")
    logger.info(f" Starting cleanup on {db_name}...")
    drop_blacklisted_tables(engine)
    run_specific_fixes(engine)
    logger.info(" Cleanup Complete.")
def drop_tables(engine):
    """Drops the specific list of tables requested."""
    doomed = (
        'forex_bureau_rates',
        'forex_bureaus_rates_sheet_chief_dealers',
        'forex_bureaus_rates_sheet_director',
        'forex_bureaus_rates_sheet_directors',
        'forex_bureaus_rates_sheet_fbx',
        'forex_bureaus_rates_sheet_fbx1',
        'forex_bureaus_rates_sheet_fbx2',
        'forex_bureaus_rates_sheet_fxb1',
        'forex_bureaus_rates_sheet_fxb2',
        'forex_bureaus_rates_sheet_fxb22',
        'forex_bureaus_rates_sheet_market_intelligence',
        'forex_bureaus_rates_sheet_sheet1',
        'forex_bureaus_rates_sheet_sheet2',
        'forex_bureaus_rates_sheet_sheet3',
        'forex_bureaus_rates_sheet_sheet4',
        'issues_of_treasury_bills',
        'issues_of_treasury_bonds',
    )
    print("🗑️ Dropping Tables...")
    with engine.connect() as conn:
        for table in doomed:
            # IF EXISTS keeps missing tables from raising; the except is a
            # belt-and-braces guard for any other driver error.
            try:
                conn.execute(text(f'DROP TABLE IF EXISTS "{table}"'))
                print(f" - Dropped: {table}")
            except Exception as err:
                print(f" Could not drop {table}: {err}")
        conn.commit()
def fix_foreign_trade(engine):
    """Renames first column to 'year'."""
    table = "foreign_trade_summary"
    old_col, new_col = 'kenyan_shillings_million_year', 'year'
    try:
        df = pd.read_sql(f'SELECT * FROM "{table}"', engine)
        # Guard clause: nothing to do if the mangled header is absent.
        if old_col not in df.columns:
            print(f" '{table}': Target column not found.")
            return
        df = df.rename(columns={old_col: new_col})
        df.to_sql(table, engine, if_exists='replace', index=False)
        print(f" Fixed '{table}': Renamed 'year' column.")
    except Exception as err:
        print(f" Error fixing {table}: {err}")
def fix_indicative_rates_shift(engine):
    """
    Applies the 'Shift Right + Fixed Date' logic.

    Inserts the constant date 2017-11-16 as a new first column (shifting
    the existing data one position to the right), then overwrites the
    headers with the canonical names for each sheet type.
    """
    targets = [
        "indicative_rates_sheet_indicative",
        "indicative_rates_sheet_press",
    ]
    fixed_date = "2017-11-16"
    for table in targets:
        try:
            df = pd.read_sql(f'SELECT * FROM "{table}"', engine)
            if df.empty:
                continue
            # Insert the new date column at index 0; this effectively
            # "shifts" the old column 0 to column 1.
            df.insert(0, 'fixed_date', fixed_date)
            # Canonical headers per sheet type (first slot is the new date).
            if "press" in table:
                # Press sheet: Bank, USD buy/sell/margin, GBP..., EUR...
                expected_headers = ["date", "bank_name", "usd_buy", "usd_sell", "usd_margin", "gbp_buy", "gbp_sell", "gbp_margin", "euro_buy", "euro_sell", "euro_margin"]
            else:
                # Indicative sheet: Currency, Mean, Buy, Sell
                expected_headers = ["date", "currency", "mean_rate", "buy_rate", "sell_rate"]
            # Map headers safely: pad with generic col_N names if the frame
            # is wider than expected, truncate if it is narrower.
            # (The previous version also built an unused 'new_columns' list
            # here; that dead code has been removed.)
            final_cols = expected_headers + [f"col_{i}" for i in range(len(df.columns) - len(expected_headers))]
            df.columns = final_cols[:len(df.columns)]
            df.to_sql(table, engine, if_exists='replace', index=False)
            print(f" Fixed '{table}': Applied Date Shift & Header Rename.")
        except Exception as e:
            print(f" Error fixing {table}: {e}")
def fix_cbk_indicative_swap(engine):
    """Swaps 'date' and 'currency' column names."""
    table = "cbk_indicative_rates"
    try:
        df = pd.read_sql(f'SELECT * FROM "{table}"', engine)
        # pandas applies the mapping atomically, so a two-way map swaps
        # the labels cleanly even when both columns are present.
        swap = {old: new for old, new in (("date", "currency"), ("currency", "date")) if old in df.columns}
        if swap:
            df = df.rename(columns=swap)
            df.to_sql(table, engine, if_exists='replace', index=False)
            print(f" Fixed '{table}': Swapped 'date' <-> 'currency'.")
    except Exception as err:
        print(f" Error fixing {table}: {err}")