import pandas as pd
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from app.database import engine
from app.core.utils import clean_nans
# Load environment variables
load_dotenv()
BASE_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = BASE_DIR / "data"
STATS_PATH = DATA_DIR / "stats.json"
# CSV Paths (Backward compatibility for migration/rebuild scripts)
PATH_AHLYA_CSV = os.getenv("PATH_AHLYA_CSV", "Base-Ahlya.csv")
PATH_RNE_CSV = os.getenv("PATH_RNE_CSV", "Base-RNE.csv")
PATH_JORT_CSV = os.getenv("PATH_JORT_CSV", "Base-JORT.csv")
def normalize_company_name(name: str) -> str:
    """Normalize a company name for join/matching purposes.

    Strips accents (NFKD decomposition, ASCII-only), uppercases,
    replaces punctuation with spaces and collapses whitespace runs,
    so that spelling variants of the same company compare equal.

    Returns "" for empty or non-string input.
    """
    import unicodedata
    import re
    if not isinstance(name, str) or not name:
        return ""
    # Drop accents: decompose, then keep only the ASCII base characters.
    ascii_form = (
        unicodedata.normalize('NFKD', name)
        .encode('ASCII', 'ignore')
        .decode('ASCII')
    )
    upper = ascii_form.strip().upper()
    # Punctuation -> spaces, then collapse repeated whitespace to one space.
    no_punct = re.sub(r'[^\w\s]', ' ', upper)
    return re.sub(r'\s+', ' ', no_punct).strip()
class DataLoader:
    """Singleton serving company data from Supabase and stats from JSON.

    ``load()`` only reads the lightweight stats.json at startup; the heavy
    companies DataFrame is fetched lazily (and cached) on first access via
    ``fetch_companies_df``.
    """

    _instance = None   # the single shared instance
    _cached_df = None  # cached companies DataFrame (per process)
    stats_data = None  # dict loaded from stats.json (None until load())

    def __new__(cls):
        # Classic singleton: every DataLoader() call returns the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def load(self):
        """
        Loads non-SQL data (stats) and initializes connection check.

        Never raises: on any failure, stats_data is set to {} and the
        error is reported so app startup can continue.
        """
        print("Initializing DataLoader (Supabase Mode)...")
        try:
            # Load Stats from JSON
            if not STATS_PATH.exists():
                print(f"Warning: Stats file not found at {STATS_PATH}")
                self.stats_data = {}
            else:
                with open(STATS_PATH, 'r', encoding='utf-8') as f:
                    self.stats_data = json.load(f)
            # Note: companies_df is intentionally NOT preloaded here, to
            # save RAM. It is fetched on demand by fetch_companies_df().
        except Exception as e:
            print(f"Error during DataLoader init: {e}")
            self.stats_data = {}

    async def fetch_companies_df(self, force_refresh=False):
        """
        Fetches the unified view from Supabase and returns a Pandas DataFrame.
        This replaces the heavy CSV merging logic.

        Args:
            force_refresh: bypass the in-process cache and re-query the view.

        Returns:
            DataFrame of companies (empty DataFrame on error or no data).
        """
        if self._cached_df is not None and not force_refresh:
            return self._cached_df
        print("Fetching companies from Supabase view 'companies_unified' via SQL...")
        try:
            # NOTE(review): pd.read_sql is a blocking call inside an async
            # method; if this ever runs on a busy event loop, consider
            # offloading to an executor.
            query = "SELECT * FROM companies_unified"
            df = pd.read_sql(query, con=engine)
            if df.empty:
                print("Warning: No data returned from Supabase view.")
                return pd.DataFrame()
            # Round-trip through records + clean_nans to guarantee the
            # DataFrame is JSON-safe for the API layer.
            data = clean_nans(df.to_dict(orient='records'))
            df = pd.DataFrame(data)
            # Keep the 'id' from the SQL view 'companies_unified' (a.id)
            # instead of overwriting it with a virtual range: this keeps
            # IDs stable and matching enriched_companies.company_id.
            if 'id' not in df.columns:
                df['id'] = range(1, len(df) + 1)
            # Ensure id is always string for consistent matching
            df['id'] = df['id'].astype(str)
            # Recalculate capital divergence if not already handled by SQL.
            # getenv default must be a string; float() parses it either way.
            threshold = float(os.getenv("CAPITAL_DIVERGENCE_THRESHOLD", "0.05"))
            df['capital_divergence'] = False
            if 'jort_capital' in df.columns and 'rne_capital' in df.columns:
                df['jort_capital'] = pd.to_numeric(df['jort_capital'], errors='coerce')
                df['rne_capital'] = pd.to_numeric(df['rne_capital'], errors='coerce')
                # Only rows where both capitals are known and JORT > 0
                # (avoids NaN propagation and division by zero).
                mask = (df['jort_capital'].notna()) & (df['rne_capital'].notna()) & (df['jort_capital'] > 0)
                diff = abs(df.loc[mask, 'jort_capital'] - df.loc[mask, 'rne_capital']) / df.loc[mask, 'jort_capital']
                df.loc[mask, 'capital_divergence'] = diff > threshold
            self._cached_df = df
            return df
        except Exception as e:
            print(f"Error fetching data from Supabase: {e}")
            return pd.DataFrame()

    async def fetch_company_by_id(self, company_name_normalized: str):
        """
        Fetch a single company from Supabase by its normalized name.

        Returns a JSON-safe dict, or None if not found or on error.
        """
        try:
            # Parameterized query via SQLAlchemy text() — never interpolate
            # the name into the SQL string.
            from sqlalchemy import text
            with engine.connect() as conn:
                result = conn.execute(
                    text("SELECT * FROM companies_unified WHERE name_normalized = :name"),
                    {"name": company_name_normalized},
                )
                row = result.mappings().first()
                if row:
                    return clean_nans(dict(row))
                return None
        except Exception as e:
            print(f"Error fetching specific company: {e}")
            return None
# Process-wide singleton consumed by the accessor functions below.
data_loader = DataLoader()


def load_data():
    """Startup hook: initialize the shared DataLoader instance."""
    data_loader.load()


async def get_companies_df():
    """
    Main accessor for legacy code.

    Async because the data now comes from a Supabase call.
    """
    return await data_loader.fetch_companies_df()


def get_stats_data():
    """Return the stats dict loaded at startup (None before load_data)."""
    return data_loader.stats_data