Sadeep Sachintha
feat: implement async database session management and CBSL currency exchange rate service with persistent caching
61207aa | import os | |
| import json | |
| import time | |
| import socket | |
| import logging | |
| import aiohttp | |
| import re | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Optional, Dict, Tuple | |
| from html.parser import HTMLParser | |
| from core.config import settings | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Currency codes accepted by FXService.is_valid_currency (LKR plus the
# foreign currencies requested from CBSL).
ALLOWED_CURRENCIES = {
    "LKR",
    "USD", "EUR", "GBP", "AUD", "JPY",
    "AED", "SAR", "QAR", "INR", "CNY",
}

# The persistent cache file sits one directory above the directory that
# contains this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
CACHE_FILE = os.path.join(os.path.dirname(_MODULE_DIR), "rate_cache.json")

# Lifetime of a cached rate, in seconds (one hour).
CACHE_TTL = 60 * 60
class CBSLTableParser(HTMLParser):
    """Light-weight HTML parser that collects the text of every table.

    After ``feed()``, ``self.tables`` holds one entry per ``<table>``; each
    entry is a list of rows and each row is a list of cell strings (stripped,
    with embedded newlines replaced by spaces).

    HTML permits ``</td>``, ``</th>`` and ``</tr>`` to be omitted, so an open
    cell or row is closed implicitly when the next cell, row, or the end of
    the table is reached. Nested tables are not tracked: a nested ``<table>``
    simply starts a new collection.
    """

    def __init__(self):
        super().__init__()
        self.tables = []          # completed tables
        self.current_table = []   # rows of the table being parsed
        self.current_row = []     # cells of the row being parsed
        self.current_cell = []    # text fragments of the cell being parsed
        self.in_table = False
        self.in_row = False
        self.in_cell = False

    def _close_cell(self):
        """Finish the open cell, if any, and append its text to the row."""
        if self.in_cell:
            self.in_cell = False
            text = "".join(self.current_cell).strip().replace('\n', ' ')
            self.current_row.append(text)

    def _close_row(self):
        """Finish the open row (and its open cell), if any."""
        self._close_cell()
        if self.in_row:
            self.in_row = False
            self.current_table.append(self.current_row)

    def handle_starttag(self, tag, attrs):
        if tag == 'table':
            # Start from a clean slate; row/cell state from an enclosing
            # table must not leak into the new one.
            self.in_table = True
            self.in_row = False
            self.in_cell = False
            self.current_table = []
        elif tag == 'tr' and self.in_table:
            # A new row implicitly ends the previous one (omitted </tr>).
            self._close_row()
            self.in_row = True
            self.current_row = []
        elif tag in ('td', 'th') and self.in_row:
            # A new cell implicitly ends the previous one (omitted </td>).
            self._close_cell()
            self.in_cell = True
            self.current_cell = []

    def handle_endtag(self, tag):
        if tag == 'table' and self.in_table:
            # Flush a trailing row whose end tags were omitted.
            self._close_row()
            self.in_table = False
            self.tables.append(self.current_table)
        elif tag == 'tr' and self.in_row:
            self._close_row()
        elif tag in ('td', 'th') and self.in_cell:
            self._close_cell()

    def handle_data(self, data):
        # Text outside a cell is ignored.
        if self.in_cell:
            self.current_cell.append(data)
class FXService:
    """Exchange-rate lookup backed by CBSL scraping and a JSON disk cache.

    Rates live in ``self.cache`` under ``"<BASE>_<TARGET>"`` keys; each value
    is ``{"rate": float, "timestamp": epoch_seconds}``. Entries younger than
    ``CACHE_TTL`` are served directly; older ones are only used as a last
    resort when scraping fails.
    """

    # Matches the per-currency column header, e.g. "1 USD -> LKR".
    _HEADER_CURRENCY_RE = re.compile(r'1\s+([A-Z]{3})\s+->')

    def __init__(self):
        """Create the service and warm the in-memory cache from disk."""
        self.cache: Dict[str, Dict] = {}
        self.load_cache()

    def is_valid_currency(self, currency: str) -> bool:
        """Return True if *currency* (case-insensitive) is supported."""
        return isinstance(currency, str) and currency.upper() in ALLOWED_CURRENCIES

    @staticmethod
    def _is_valid_entry(entry) -> bool:
        """Return True if *entry* has the shape the cache readers rely on."""
        if not isinstance(entry, dict):
            return False
        rate = entry.get("rate")
        stamp = entry.get("timestamp", 0)
        return (
            isinstance(rate, (int, float)) and not isinstance(rate, bool)
            and isinstance(stamp, (int, float)) and not isinstance(stamp, bool)
        )

    def load_cache(self):
        """Load the rates cache from the persistent JSON file on disk.

        A missing, unreadable, or wrongly shaped file leaves the cache empty;
        individual malformed entries are dropped so later lookups cannot fail
        on them.
        """
        self.cache = {}
        if not os.path.exists(CACHE_FILE):
            return
        try:
            with open(CACHE_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception as e:
            logger.warning(f"Could not load persistent cache file: {e}")
            return
        if not isinstance(loaded, dict):
            logger.warning(
                f"Could not load persistent cache file: expected a JSON object, got {type(loaded).__name__}"
            )
            return
        self.cache = {
            key: entry for key, entry in loaded.items() if self._is_valid_entry(entry)
        }
        logger.info(f"Loaded {len(self.cache)} FX rates from cache file: {CACHE_FILE}")

    def save_cache(self):
        """Save the rates cache to the persistent JSON file (best effort).

        The data is written to a temporary sibling file and then moved into
        place with ``os.replace`` so an interrupted write cannot leave a
        truncated cache file behind.
        """
        tmp_path = f"{CACHE_FILE}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.cache, f, indent=4)
            os.replace(tmp_path, CACHE_FILE)
            logger.debug(f"Saved cache containing {len(self.cache)} rates to {CACHE_FILE}")
        except Exception as e:
            logger.warning(f"Could not save persistent cache file: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # nothing to clean up, or cleanup is impossible

    def update_cache_entry(self, base: str, target: str, rate: float,
                           timestamp: Optional[float] = None):
        """Add or update a cache entry.

        Args:
            base: Base currency code (case-insensitive).
            target: Target currency code (case-insensitive).
            rate: Units of *target* per one unit of *base*.
            timestamp: Epoch seconds to record for the entry; defaults to the
                current time. Derived rates pass the timestamp of their
                inputs so they do not outlive them.
        """
        key = f"{base.upper()}_{target.upper()}"
        self.cache[key] = {
            "rate": rate,
            "timestamp": time.time() if timestamp is None else timestamp,
        }

    def get_cached_rate(self, base: str, target: str) -> Optional[float]:
        """Return the cached rate if present and younger than ``CACHE_TTL``."""
        key = f"{base.upper()}_{target.upper()}"
        entry = self.cache.get(key)
        if entry is None:
            return None
        age = time.time() - entry.get("timestamp", 0)
        if age < CACHE_TTL:
            logger.debug(f"Cache HIT for rate {key}: {entry['rate']} (Age: {int(age)}s)")
            return entry["rate"]
        logger.debug(f"Cache EXPIRED for rate {key} (Age: {int(age)}s)")
        return None

    def get_stale_rate(self, base: str, target: str) -> Optional[float]:
        """Return the cached rate regardless of age, or None if absent."""
        key = f"{base.upper()}_{target.upper()}"
        entry = self.cache.get(key)
        if entry is None:
            return None
        logger.warning(f"Using STALE fallback rate for {key}: {entry['rate']}")
        return entry["rate"]

    def _derive_cross_rate(self, base: str, target: str) -> Optional[float]:
        """Derive base->target from fresh cached <cur>->LKR rates and cache it.

        rate(base->target) = rate(base->LKR) / rate(target->LKR). Returns
        None when either input is missing, expired, or non-positive.
        """
        base_lkr = self.get_cached_rate(base, "LKR")
        target_lkr = self.get_cached_rate(target, "LKR")
        if base_lkr is None or target_lkr is None or not target_lkr > 0:
            return None
        derived = base_lkr / target_lkr
        # Inherit the older input timestamp so the derived entry expires no
        # later than the data it was computed from.
        oldest = min(
            self.cache[f"{base.upper()}_LKR"].get("timestamp", 0),
            self.cache[f"{target.upper()}_LKR"].get("timestamp", 0),
        )
        self.update_cache_entry(base, target, derived, timestamp=oldest)
        return derived

    @classmethod
    def _extract_rates(cls, tables) -> Dict[str, float]:
        """Pull ``{currency: rate_to_lkr}`` out of parsed CBSL result tables.

        Each usable table has a header row whose second cell looks like
        "1 USD -> LKR" and a data row whose second cell is the rate. Tables
        that do not match, unparseable numbers, and non-positive or
        non-finite rates are skipped.
        """
        rates: Dict[str, float] = {}
        for table in tables:
            rows = []
            for row in table:
                cells = [item.strip() for item in row if item.strip()]
                if cells:
                    rows.append(cells)
            if len(rows) < 2:
                continue
            header, data_row = rows[0], rows[1]
            # Layout tables may have a single column; skip instead of
            # raising so the remaining tables are still processed.
            if len(header) < 2 or len(data_row) < 2:
                continue
            match = cls._HEADER_CURRENCY_RE.search(header[1])
            if not match:
                continue
            try:
                rate_val = float(data_row[1].replace(",", ""))
            except ValueError:
                continue
            # A zero/negative/NaN/inf rate has no meaningful reciprocal.
            if not 0 < rate_val < float("inf"):
                continue
            rates[match.group(1).upper()] = rate_val
        return rates

    async def save_historical_rates(self, rates: Dict[str, float]):
        """Record ``{currency: rate_to_lkr}`` in the DB history table.

        A currency is skipped if a history row for it was written within the
        last 12 hours. Database errors are logged, not raised.
        """
        from db.session import async_session
        from db.models import ExchangeRateHistory
        from sqlalchemy import select, and_

        now = datetime.now(timezone.utc)
        # Timestamps are compared and stored as naive datetimes in UTC.
        recorded_at = now.replace(tzinfo=None)
        throttle_cutoff = (now - timedelta(hours=12)).replace(tzinfo=None)

        async with async_session() as session:
            try:
                for cur, rate in rates.items():
                    # 1. Throttling check: was this currency written in the last 12 hours?
                    stmt = select(ExchangeRateHistory.id).where(
                        and_(
                            ExchangeRateHistory.currency == cur,
                            ExchangeRateHistory.timestamp >= throttle_cutoff,
                        )
                    ).limit(1)
                    res = await session.execute(stmt)
                    if res.first():
                        logger.debug(f"History entry for {cur} is throttled (written in last 12 hours). Skipping database record.")
                        continue
                    # 2. Not throttled: stage the new history row.
                    session.add(
                        ExchangeRateHistory(
                            currency=cur,
                            rate_to_lkr=rate,
                            timestamp=recorded_at,
                        )
                    )
                    logger.info(f"Recorded database rate history: {cur} = {rate} LKR")
                await session.commit()
            except Exception as e:
                logger.error(f"Failed to save exchange rate history to database: {e}")

    async def fetch_cbsl_rates(self) -> bool:
        """Scrape LKR exchange rates from the Central Bank of Sri Lanka site.

        All currencies are requested in one POST per date. Starting today,
        the query date is moved back one day at a time (up to 7 days) until a
        response yields at least one rate, covering weekends and holidays.

        Returns:
            True if rates were scraped and cached, False otherwise.
        """
        url = "https://www.cbsl.gov.lk/cbsl_custom/exrates/exrates_results.php"
        # Form fields selecting every currency we want in a single request.
        data_params = [
            ("lookupPage", "lookup_daily_exchange_rates.php"),
            ("startRange", "2006-11-11"),
            ("rangeType", "dates"),
            ("chk_cur[]", "USD~US Dollar"),
            ("chk_cur[]", "EUR~Euro"),
            ("chk_cur[]", "GBP~Sterling Pound"),
            ("chk_cur[]", "AUD~Australian Dollar"),
            ("chk_cur[]", "JPY~Japanese Yen"),
            ("chk_cur[]", "AED~UAE Dirham"),
            ("chk_cur[]", "SAR~Saudi Arabian Riyal"),
            ("chk_cur[]", "INR~Indian Rupee"),
            ("chk_cur[]", "CNY~Chinese Yuan (Renminbi)"),
            ("chk_cur[]", "QAR~Qatar Riyal"),
            ("submit_button", "Submit"),
        ]
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Origin": "https://www.cbsl.gov.lk",
            "Referer": "https://www.cbsl.gov.lk/cbsl_custom/exrates/exrates.php",
        }
        # Force IPv4 TCPConnector to bypass HF Spaces DNS/routing bugs
        connector = aiohttp.TCPConnector(family=socket.AF_INET)
        # aiohttp expects a ClientTimeout object; a bare number is deprecated.
        request_timeout = aiohttp.ClientTimeout(total=10)
        try:
            async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
                # NOTE(review): this is the server's local date; CBSL
                # presumably publishes by Sri Lanka date -- confirm whether a
                # fixed UTC+05:30 date should be used instead.
                today = datetime.now()
                # Today plus up to 7 days back.
                for i in range(8):
                    query_date = today - timedelta(days=i)
                    date_str = query_date.strftime("%Y-%m-%d")
                    logger.info(f"Scraping CBSL rates for date: {date_str}...")
                    post_data = [*data_params, ("txtStart", date_str), ("txtEnd", date_str)]

                    try:
                        async with session.post(url, data=post_data, timeout=request_timeout) as response:
                            if response.status != 200:
                                logger.warning(f"CBSL scrape post failed with status: {response.status}")
                                continue
                            html_text = await response.text()
                    except Exception as e:
                        logger.error(f"Error making POST request to CBSL for {date_str}: {e}")
                        continue

                    try:
                        parser = CBSLTableParser()
                        parser.feed(html_text)
                        scraped_rates = self._extract_rates(parser.tables)
                    except Exception as e:
                        logger.error(f"Error parsing CBSL response for {date_str}: {e}")
                        continue

                    if not scraped_rates:
                        continue  # nothing published for this date; go back a day

                    for cur_code, rate_val in scraped_rates.items():
                        # Store <cur>->LKR and its reciprocal LKR-><cur>.
                        self.update_cache_entry(cur_code, "LKR", rate_val)
                        self.update_cache_entry("LKR", cur_code, 1.0 / rate_val)

                    logger.info(f"Successfully scraped CBSL rates for LKR on {date_str}.")
                    self.save_cache()
                    # History is best effort: a failure here must not make
                    # the loop fall through to older dates and overwrite the
                    # rates that were just cached.
                    try:
                        await self.save_historical_rates(scraped_rates)
                    except Exception as e:
                        logger.error(f"Failed to save exchange rate history to database: {e}")
                    return True

                logger.warning("Failed to find any active exchange rates from CBSL in the last 7 days.")
                return False
        except Exception as e:
            logger.error(f"Global exception in fetch_cbsl_rates: {e}")
            return False

    async def get_rate(self, base_currency: str, target_currency: str) -> Optional[float]:
        """Return the exchange rate from *base_currency* to *target_currency*.

        Resolution order: identity, fresh cache, cross rate derived via LKR
        from fresh cache, CBSL scrape, then stale cache (direct or derived).

        Returns:
            The rate, or None if the pair is unsupported or no rate is
            available from any tier.
        """
        if not isinstance(base_currency, str) or not isinstance(target_currency, str):
            logger.warning(f"Invalid currency pair requested: {base_currency}/{target_currency}")
            return None
        base = base_currency.upper()
        target = target_currency.upper()
        if not self.is_valid_currency(base) or not self.is_valid_currency(target):
            logger.warning(f"Invalid currency pair requested: {base}/{target}")
            return None

        # 1. Identity rate (e.g. USD -> USD)
        if base == target:
            return 1.0

        # 2. Fresh cache
        cached_rate = self.get_cached_rate(base, target)
        if cached_rate is not None:
            return cached_rate

        # Pairs without LKR can be bridged through the two <cur>->LKR rates.
        is_cross_pair = base != "LKR" and target != "LKR"

        # 3. Cross rate from fresh cached LKR rates
        if is_cross_pair:
            derived = self._derive_cross_rate(base, target)
            if derived is not None:
                logger.info(f"Derived cross rate {base}->{target} from cached LKR rates: {derived}")
                return derived

        # 4. Cache miss: scrape CBSL, which repopulates every <cur>/LKR pair.
        logger.info(f"Cache miss for {base}->{target}. Launching CBSL Web Scraper...")
        if await self.fetch_cbsl_rates():
            res = self.get_cached_rate(base, target)
            if res is not None:
                return res
            if is_cross_pair:
                derived = self._derive_cross_rate(base, target)
                if derived is not None:
                    self.save_cache()
                    logger.info(f"Derived cross rate {base}->{target} after CBSL scrape: {derived}")
                    return derived

        # 5. Total failure: fall back to an expired cached rate if one exists.
        logger.warning(f"All live FX sources failed for {base}->{target}. Attempting stale cache fallback...")
        stale_rate = self.get_stale_rate(base, target)
        if stale_rate is not None:
            return stale_rate

        if is_cross_pair:
            base_lkr_stale = self.get_stale_rate(base, "LKR")
            target_lkr_stale = self.get_stale_rate(target, "LKR")
            if base_lkr_stale is not None and target_lkr_stale is not None and target_lkr_stale > 0:
                derived = base_lkr_stale / target_lkr_stale
                # Deliberately not cached: re-stamping a stale-derived value
                # would make it look fresh on the next lookup.
                logger.warning(f"Derived stale cross rate {base}->{target} from stale LKR rates: {derived}")
                return derived

        # 6. Absolute failure: no rates available
        logger.critical(f"No rates available in memory, disk, or APIs for {base}->{target}!")
        return None
# Module-level shared instance. Constructing it runs FXService.__init__,
# which calls load_cache() and therefore reads the cache file at import time.
fx_service = FXService()