# Data loading module: serves unified company data from Supabase plus cached JSON stats.
import pandas as pd
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from app.database import engine
from app.core.utils import clean_nans

# Load environment variables
load_dotenv()

# Project layout: this file lives one level below the project root.
BASE_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = BASE_DIR / "data"
STATS_PATH = DATA_DIR / "stats.json"

# CSV Paths (Backward compatibility for migration/rebuild scripts)
PATH_AHLYA_CSV = os.getenv("PATH_AHLYA_CSV", "Base-Ahlya.csv")
PATH_RNE_CSV = os.getenv("PATH_RNE_CSV", "Base-RNE.csv")
PATH_JORT_CSV = os.getenv("PATH_JORT_CSV", "Base-JORT.csv")
def normalize_company_name(name: str) -> str:
    """
    Normalize a company name for join/lookup keys.

    Folds accented characters to ASCII (NFKD decomposition), upper-cases,
    turns punctuation into spaces and collapses whitespace runs.
    Returns "" for empty or non-string input.
    """
    if not isinstance(name, str) or not name:
        return ""
    import re
    import unicodedata

    # Fold accents to their ASCII base characters (e.g. é -> e).
    folded = unicodedata.normalize('NFKD', name)
    folded = folded.encode('ASCII', 'ignore').decode('ASCII')
    folded = folded.strip().upper()
    # Punctuation acts as a separator, then whitespace runs collapse to one space.
    folded = re.sub(r'[^\w\s]', ' ', folded)
    return re.sub(r'\s+', ' ', folded).strip()
class DataLoader:
    """
    Singleton serving company data from Supabase and cached JSON stats.

    The heavy companies DataFrame is fetched lazily from the SQL view
    'companies_unified' and memoized in-process.
    """

    _instance = None   # singleton instance
    _cached_df = None  # memoized companies DataFrame
    stats_data = None  # dict loaded from stats.json ({} on failure)

    def __new__(cls):
        # Classic singleton: every construction returns the same instance.
        if cls._instance is None:
            cls._instance = super(DataLoader, cls).__new__(cls)
        return cls._instance

    def load(self):
        """
        Loads non-SQL data (stats) and initializes connection check.

        Never raises: on any failure stats_data falls back to an empty dict.
        """
        print("Initializing DataLoader (Supabase Mode)...")
        try:
            # Load Stats from JSON
            if not STATS_PATH.exists():
                print(f"Warning: Stats file not found at {STATS_PATH}")
                self.stats_data = {}
            else:
                with open(STATS_PATH, 'r', encoding='utf-8') as f:
                    self.stats_data = json.load(f)
            # Note: We don't preload companies_df here to save RAM.
            # It will be fetched on demand.
        except Exception as e:
            print(f"Error during DataLoader init: {e}")
            self.stats_data = {}

    async def fetch_companies_df(self, force_refresh=False):
        """
        Fetches the unified view from Supabase and returns a Pandas DataFrame.

        Results are memoized on the class; pass force_refresh=True to bypass
        the cache. Returns an empty DataFrame on any error.

        NOTE(review): pd.read_sql is a blocking call inside an async method;
        consider run_in_executor if this ever stalls the event loop.
        """
        if self._cached_df is not None and not force_refresh:
            return self._cached_df
        print("Fetching companies from Supabase view 'companies_unified' via SQL...")
        try:
            # Fetch directly using SQLAlchemy engine
            df = pd.read_sql("SELECT * FROM companies_unified", con=engine)
            if df.empty:
                print("Warning: No data returned from Supabase view.")
                return pd.DataFrame()
            # Round-trip through clean_nans to guarantee JSON-safe values for
            # the API (normalizes NaN/NaT that read_sql may leave behind).
            data = clean_nans(df.to_dict(orient='records'))
            df = pd.DataFrame(data)
            # Keep the 'id' column from the SQL view (stable IDs matching
            # enriched_companies.company_id); only synthesize one if absent.
            if 'id' not in df.columns:
                df['id'] = range(1, len(df) + 1)
            # Recalculate capital divergence if not already handled by SQL.
            threshold = float(os.getenv("CAPITAL_DIVERGENCE_THRESHOLD", 0.05))
            df['capital_divergence'] = False
            if 'jort_capital' in df.columns and 'rne_capital' in df.columns:
                df['jort_capital'] = pd.to_numeric(df['jort_capital'], errors='coerce')
                df['rne_capital'] = pd.to_numeric(df['rne_capital'], errors='coerce')
                # Only compare rows where both capitals exist and JORT > 0
                # (guards the division below against zero/NaN).
                mask = (df['jort_capital'].notna()) & (df['rne_capital'].notna()) & (df['jort_capital'] > 0)
                diff = abs(df.loc[mask, 'jort_capital'] - df.loc[mask, 'rne_capital']) / df.loc[mask, 'jort_capital']
                df.loc[mask, 'capital_divergence'] = diff > threshold
            self._cached_df = df
            return df
        except Exception as e:
            print(f"Error fetching data from Supabase: {e}")
            return pd.DataFrame()

    async def fetch_company_by_id(self, company_name_normalized: str):
        """
        Fetch a single company from Supabase by its normalized name.

        Returns a JSON-safe dict, or None if not found / on error.
        """
        try:
            # Fix: dropped the dead '%s'-style query string the original built
            # but never executed; the parameterized text() query below is the
            # one that runs (safe against SQL injection).
            from sqlalchemy import text
            with engine.connect() as conn:
                result = conn.execute(
                    text("SELECT * FROM companies_unified WHERE name_normalized = :name"),
                    {"name": company_name_normalized},
                )
                row = result.mappings().first()
                if row:
                    return clean_nans(dict(row))
                return None
        except Exception as e:
            print(f"Error fetching specific company: {e}")
            return None
# Module-level singleton instance used by the accessor functions below.
data_loader = DataLoader()

def load_data():
    """Startup initialization"""
    data_loader.load()
async def get_companies_df():
    """
    Main accessor for legacy code.

    Note: async because the underlying fetch hits Supabase.
    """
    df = await data_loader.fetch_companies_df()
    return df
def get_stats_data():
    """Return the stats dict loaded at startup by the DataLoader singleton."""
    return data_loader.stats_data