# csv_util.py
"""Helpers for reading, merging and writing a CSV file with the CSV_COLUMNS schema."""

import logging
import os

import pandas as pd

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Column order used for every frame returned by read/merge in this module.
CSV_COLUMNS = ["DateTime", "Currency", "Impact", "Event", "Actual", "Forecast", "Previous", "Detail"]


def ensure_csv_header(csv_file: str) -> None:
    """Create *csv_file* with only the header row if it is missing or empty (0 bytes)."""
    if not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0:
        pd.DataFrame(columns=CSV_COLUMNS).to_csv(csv_file, index=False)


def read_existing_data(csv_file: str) -> pd.DataFrame:
    """Load *csv_file* as an all-string DataFrame restricted to CSV_COLUMNS.

    The file is created with a header row first if it does not exist or is
    empty. Columns missing from the file are added and filled with "".

    Returns:
        A DataFrame with exactly CSV_COLUMNS, in that order. If reading or
        parsing fails, the error is logged and an empty DataFrame with the
        same columns is returned (best-effort behaviour).
    """
    ensure_csv_header(csv_file)
    try:
        df = pd.read_csv(csv_file, dtype=str)
        for col in CSV_COLUMNS:
            if col not in df.columns:
                df[col] = ""
        return df[CSV_COLUMNS]
    except Exception:
        # Deliberately best-effort, but never silently: record what went wrong.
        logger.exception("Failed to read %s; continuing with an empty data set", csv_file)
        return pd.DataFrame(columns=CSV_COLUMNS)


def write_data_to_csv(df: pd.DataFrame, csv_file: str):
    """Write *df* to *csv_file* sorted ascending by "DateTime", without the index."""
    df = df.sort_values(by="DateTime", ascending=True)
    df.to_csv(csv_file, index=False)


def _unique_keys(df: pd.DataFrame) -> list:
    """Return one "DateTime_Currency_Event" key per row of *df*, in row order."""
    keys = (
        df["DateTime"].astype(str).str.strip() + "_" +
        df["Currency"].astype(str).str.strip() + "_" +
        df["Event"].astype(str).str.strip()
    )
    return keys.tolist()


def _clean_detail(value) -> str:
    """Return *value* as a stripped string, or "" when it is missing (None/NaN)."""
    return str(value).strip() if pd.notna(value) else ""


def merge_new_data(existing_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """Merge *new_df* into *existing_df*, identifying rows by DateTime/Currency/Event.

    Rules, applied to the rows of *new_df* in order:
      * If the key already exists, the existing row is kept; only an empty
        "Detail" is filled in from the new row's non-empty "Detail".
      * If the key is unseen, the row is appended. Later rows of *new_df*
        carrying the same key are not appended again; they can only fill an
        empty "Detail" on the row that was appended.

    Neither input frame is modified.

    Returns:
        *new_df* itself, unchanged, when *existing_df* is empty (original
        behaviour, kept for backward compatibility). Otherwise a new frame
        with a fresh 0..n-1 index and exactly CSV_COLUMNS: existing rows
        first, then appended rows.

    Raises:
        KeyError: if a key column is missing from either frame, or a
            CSV_COLUMNS column is missing from the merged result.
    """
    if existing_df.empty:
        return new_df

    # Work on a positionally indexed copy so `.at[pos, ...]` always addresses
    # exactly one cell, even if the existing data already holds duplicate keys.
    merged = existing_df.copy().reset_index(drop=True)

    rows_by_key = {}
    for pos, key in enumerate(_unique_keys(merged)):
        rows_by_key.setdefault(key, []).append(pos)

    # key -> row to append; dicts keep insertion order, so new_df order is kept.
    pending = {}
    for key, (_, new_row) in zip(_unique_keys(new_df), new_df.iterrows()):
        new_detail = _clean_detail(new_row.get("Detail"))
        if key in rows_by_key:
            if new_detail:
                for pos in rows_by_key[key]:
                    if not _clean_detail(merged.at[pos, "Detail"]):
                        merged.at[pos, "Detail"] = new_detail
        elif key in pending:
            # Duplicate inside new_df: keep the first row, only backfill Detail.
            if new_detail and not _clean_detail(pending[key].get("Detail")):
                pending[key]["Detail"] = new_detail
        else:
            pending[key] = new_row.copy()

    if pending:
        merged = pd.concat([merged, pd.DataFrame(list(pending.values()))])

    merged = merged.reset_index(drop=True)
    return merged[CSV_COLUMNS]