# app/services/data_loader.py
import pandas as pd
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from app.database import engine
from app.core.utils import clean_nans
# Load environment variables from a local .env file, if present.
load_dotenv()
# BASE_DIR resolves to the app/ package directory (parent of services/).
BASE_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = BASE_DIR / "data"
# Precomputed stats JSON consumed by DataLoader.load() / get_stats_data().
STATS_PATH = DATA_DIR / "stats.json"
# CSV Paths (Backward compatibility for migration/rebuild scripts)
PATH_AHLYA_CSV = os.getenv("PATH_AHLYA_CSV", "Base-Ahlya.csv")
PATH_RNE_CSV = os.getenv("PATH_RNE_CSV", "Base-RNE.csv")
PATH_JORT_CSV = os.getenv("PATH_JORT_CSV", "Base-JORT.csv")
def normalize_company_name(name: str) -> str:
    """
    Normalize a company name so it can be used as a join key.

    Steps: strip accents (NFKD decomposition, keep ASCII only),
    upper-case, replace punctuation with spaces, and collapse
    whitespace runs. Non-string or empty input yields "".
    """
    import re
    import unicodedata

    if not name or not isinstance(name, str):
        return ""
    # Drop diacritics: decompose combined characters, then keep ASCII bytes.
    ascii_only = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
    upper = ascii_only.upper().strip()
    # Turn punctuation into spaces, then squeeze consecutive whitespace.
    spaced = re.sub(r'[^\w\s]', ' ', upper)
    collapsed = re.sub(r'\s+', ' ', spaced)
    return collapsed.strip()
class DataLoader:
    """
    Singleton facade over the Supabase-backed company data.

    Stats are loaded once from a local JSON file (see load()); the unified
    companies DataFrame is fetched lazily from SQL and cached in memory.
    """
    _instance = None   # singleton instance (created by __new__)
    _cached_df = None  # cached result of fetch_companies_df()
    stats_data = None  # dict loaded from stats.json (populated by load())

    def __new__(cls):
        # Classic singleton: only one instance is ever created.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def load(self):
        """
        Loads non-SQL data (stats) and initializes connection check.

        Never raises: on any failure stats_data falls back to {} so the
        rest of the app can keep serving.
        """
        print("Initializing DataLoader (Supabase Mode)...")
        try:
            # Load Stats from JSON
            if not STATS_PATH.exists():
                print(f"Warning: Stats file not found at {STATS_PATH}")
                self.stats_data = {}
            else:
                with open(STATS_PATH, 'r', encoding='utf-8') as f:
                    self.stats_data = json.load(f)
            # Note: We don't preload companies_df here to save RAM.
            # It will be fetched on demand.
        except Exception as e:
            print(f"Error during DataLoader init: {e}")
            self.stats_data = {}

    async def fetch_companies_df(self, force_refresh=False):
        """
        Fetches the unified view from Supabase and returns a Pandas DataFrame.
        This replaces the heavy CSV merging logic.

        Results are cached; pass force_refresh=True to re-query the view.
        Returns an empty DataFrame on error or when the view has no rows.
        """
        if self._cached_df is not None and not force_refresh:
            return self._cached_df
        print("Fetching companies from Supabase view 'companies_unified' via SQL...")
        try:
            # Fetch directly using the SQLAlchemy engine.
            df = pd.read_sql("SELECT * FROM companies_unified", con=engine)
            if df.empty:
                print("Warning: No data returned from Supabase view.")
                return pd.DataFrame()
            # Round-trip through records so clean_nans guarantees JSON-safe
            # values for the API layer (NaN -> None, etc.).
            df = pd.DataFrame(clean_nans(df.to_dict(orient='records')))
            # Keep the 'id' from the SQL view (a.id) so IDs stay stable and
            # match enriched_companies.company_id; only synthesize a range
            # if the view did not provide one.
            if 'id' not in df.columns:
                df['id'] = range(1, len(df) + 1)
            # Recalculate capital divergence if not already handled by SQL.
            # Flags rows whose JORT vs RNE capital differ by more than the
            # configured relative threshold (default 5%).
            threshold = float(os.getenv("CAPITAL_DIVERGENCE_THRESHOLD", 0.05))
            df['capital_divergence'] = False
            if 'jort_capital' in df.columns and 'rne_capital' in df.columns:
                df['jort_capital'] = pd.to_numeric(df['jort_capital'], errors='coerce')
                df['rne_capital'] = pd.to_numeric(df['rne_capital'], errors='coerce')
                # Only compare rows where both values exist and JORT capital
                # is positive (avoids division by zero).
                mask = (df['jort_capital'].notna()) & (df['rne_capital'].notna()) & (df['jort_capital'] > 0)
                diff = abs(df.loc[mask, 'jort_capital'] - df.loc[mask, 'rne_capital']) / df.loc[mask, 'jort_capital']
                df.loc[mask, 'capital_divergence'] = diff > threshold
            self._cached_df = df
            return df
        except Exception as e:
            print(f"Error fetching data from Supabase: {e}")
            return pd.DataFrame()

    async def fetch_company_by_id(self, company_name_normalized: str):
        """
        Fetch a single company from Supabase by its normalized name.

        Returns a JSON-safe dict, or None when not found or on error.
        """
        try:
            # Parameterized query via SQLAlchemy text() — never interpolate
            # the name into the SQL string.
            from sqlalchemy import text
            with engine.connect() as conn:
                result = conn.execute(
                    text("SELECT * FROM companies_unified WHERE name_normalized = :name"),
                    {"name": company_name_normalized},
                )
                row = result.mappings().first()
                if row:
                    return clean_nans(dict(row))
            return None
        except Exception as e:
            print(f"Error fetching specific company: {e}")
            return None
# Module-level singleton shared by the accessor helpers below.
data_loader = DataLoader()
def load_data():
    """Startup initialization: load stats.json into the module singleton."""
    data_loader.load()
async def get_companies_df():
    """
    Main accessor for legacy code.

    Note: Now async because the data comes from a Supabase SQL call.
    Returns the (cached) unified companies DataFrame; may be empty on error.
    """
    return await data_loader.fetch_companies_df()
def get_stats_data():
    """Return the stats dict loaded at startup (None before load_data() runs)."""
    return data_loader.stats_data