import json
import os
import re
import unicodedata
from pathlib import Path

import pandas as pd
from dotenv import load_dotenv

from app.database import engine
from app.core.utils import clean_nans

# Load environment variables
load_dotenv()

BASE_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = BASE_DIR / "data"
STATS_PATH = DATA_DIR / "stats.json"

# CSV Paths (backward compatibility for migration/rebuild scripts)
PATH_AHLYA_CSV = os.getenv("PATH_AHLYA_CSV", "Base-Ahlya.csv")
PATH_RNE_CSV = os.getenv("PATH_RNE_CSV", "Base-RNE.csv")
PATH_JORT_CSV = os.getenv("PATH_JORT_CSV", "Base-JORT.csv")


def normalize_company_name(name: str) -> str:
    """Normalize a company name for join/matching purposes.

    Steps: strip accents (NFKD + ASCII fold), uppercase, replace
    punctuation with spaces, collapse whitespace runs.

    Returns "" for None / non-string input.
    """
    if not name or not isinstance(name, str):
        return ""
    # Strip accents: decompose, then drop non-ASCII combining marks.
    # NOTE: this also drops any genuinely non-Latin characters.
    name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
    name = name.upper().strip()
    # Replace punctuation with spaces, then collapse multiple spaces.
    name = re.sub(r'[^\w\s]', ' ', name)
    name = re.sub(r'\s+', ' ', name)
    return name.strip()


class DataLoader:
    """Singleton data-access layer backed by a Supabase SQL view.

    Holds a lazily-fetched, cached DataFrame of the ``companies_unified``
    view plus precomputed stats loaded from a local JSON file.
    """

    _instance = None      # singleton instance
    _cached_df = None     # cached companies DataFrame (lazy)
    stats_data = None     # dict loaded from stats.json (or {} on failure)

    def __new__(cls):
        # Classic singleton: all callers share one instance.
        if cls._instance is None:
            cls._instance = super(DataLoader, cls).__new__(cls)
        return cls._instance

    def load(self):
        """Load non-SQL data (stats JSON) at startup.

        Companies are intentionally NOT preloaded here to save RAM;
        they are fetched on demand via ``fetch_companies_df``.
        Never raises: falls back to an empty stats dict on any error.
        """
        print("Initializing DataLoader (Supabase Mode)...")
        try:
            if not STATS_PATH.exists():
                print(f"Warning: Stats file not found at {STATS_PATH}")
                self.stats_data = {}
            else:
                with open(STATS_PATH, 'r', encoding='utf-8') as f:
                    self.stats_data = json.load(f)
        except Exception as e:
            print(f"Error during DataLoader init: {e}")
            self.stats_data = {}

    async def fetch_companies_df(self, force_refresh=False):
        """Fetch the unified companies view from Supabase as a DataFrame.

        Replaces the legacy heavy CSV-merging logic. The result is cached
        on the singleton; pass ``force_refresh=True`` to bypass the cache.

        Returns an empty DataFrame on error or when the view is empty.
        """
        if self._cached_df is not None and not force_refresh:
            return self._cached_df

        print("Fetching companies from Supabase view 'companies_unified' via SQL...")
        try:
            query = "SELECT * FROM companies_unified"
            df = pd.read_sql(query, con=engine)

            if df.empty:
                print("Warning: No data returned from Supabase view.")
                return pd.DataFrame()

            # read_sql handles most conversions, but clean_nans ensures the
            # records are JSON-safe for the API layer.
            data = clean_nans(df.to_dict(orient='records'))
            df = pd.DataFrame(data)

            # Keep the 'id' from the SQL view (a.id) rather than overwriting
            # it with a virtual range — this keeps IDs stable and matching
            # enriched_companies.company_id. Only synthesize ids if missing.
            if 'id' not in df.columns:
                df['id'] = range(1, len(df) + 1)
            # Ensure id is always a string for consistent matching.
            df['id'] = df['id'].astype(str)

            # Recompute capital divergence client-side (could be pushed into
            # the SQL view later; kept here for safety for now).
            threshold = float(os.getenv("CAPITAL_DIVERGENCE_THRESHOLD", "0.05"))
            df['capital_divergence'] = False
            if 'jort_capital' in df.columns and 'rne_capital' in df.columns:
                df['jort_capital'] = pd.to_numeric(df['jort_capital'], errors='coerce')
                df['rne_capital'] = pd.to_numeric(df['rne_capital'], errors='coerce')
                # Only rows where both capitals are known and JORT > 0
                # (avoids division by zero / NaN propagation).
                mask = (df['jort_capital'].notna()) & (df['rne_capital'].notna()) & (df['jort_capital'] > 0)
                diff = abs(df.loc[mask, 'jort_capital'] - df.loc[mask, 'rne_capital']) / df.loc[mask, 'jort_capital']
                df.loc[mask, 'capital_divergence'] = diff > threshold

            self._cached_df = df
            return df
        except Exception as e:
            print(f"Error fetching data from Supabase: {e}")
            return pd.DataFrame()

    async def fetch_company_by_id(self, company_name_normalized: str):
        """Fetch a single company row by its normalized name.

        Returns a JSON-safe dict, or None when not found or on error.
        """
        try:
            # Local import: sqlalchemy is only needed for this parameterized
            # query; the engine itself comes from app.database.
            from sqlalchemy import text
            with engine.connect() as conn:
                result = conn.execute(
                    text("SELECT * FROM companies_unified WHERE name_normalized = :name"),
                    {"name": company_name_normalized},
                )
                row = result.mappings().first()
                if row:
                    return clean_nans(dict(row))
                return None
        except Exception as e:
            print(f"Error fetching specific company: {e}")
            return None


# Shared singleton instance used by the accessors below.
data_loader = DataLoader()


def load_data():
    """Startup initialization (loads stats JSON)."""
    data_loader.load()


async def get_companies_df():
    """Main accessor for legacy code. Async because of the Supabase call."""
    return await data_loader.fetch_companies_df()


def get_stats_data():
    """Return the stats dict loaded at startup (may be {} or None pre-load)."""
    return data_loader.stats_data