Spaces:
Runtime error
Runtime error
| # csv_util.py | |
| import os | |
| import pandas as pd | |
| import logging | |
# Configure logging at import time: INFO level, timestamped records that
# include the level and the emitting logger's name.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Column set and order used by the header/read/merge helpers in this module.
# Kept as a list (not a tuple) because it is used directly as a DataFrame
# column selector.
CSV_COLUMNS = [
    "DateTime",
    "Currency",
    "Impact",
    "Event",
    "Actual",
    "Forecast",
    "Previous",
    "Detail",
]
def ensure_csv_header(csv_file):
    """Make sure *csv_file* exists and starts with the CSV_COLUMNS header.

    A file that already exists with a non-zero size is left untouched.
    A missing or zero-byte file is (re)written as a header-only CSV.
    """
    already_populated = os.path.exists(csv_file) and os.path.getsize(csv_file) != 0
    if already_populated:
        return

    header_only = pd.DataFrame(columns=CSV_COLUMNS)
    header_only.to_csv(csv_file, index=False)
def read_existing_data(csv_file):
    """Load *csv_file* as an all-string DataFrame with exactly CSV_COLUMNS.

    The file is created with a header row first if it is missing or empty.
    Columns absent from the file are added and filled with empty strings,
    and the result is restricted to CSV_COLUMNS in that order.

    Returns:
        The loaded DataFrame, or an empty DataFrame with CSV_COLUMNS when
        the file cannot be read or parsed.
    """
    ensure_csv_header(csv_file)
    try:
        df = pd.read_csv(csv_file, dtype=str)
        for col in CSV_COLUMNS:
            if col not in df.columns:
                df[col] = ""
        return df[CSV_COLUMNS]
    except Exception:
        # Best-effort fallback is kept, but the failure is no longer silent:
        # the caller gets an empty frame, so without a log entry a corrupt
        # file would be indistinguishable from a genuinely empty one.
        logger.exception(
            "Failed to read %s; falling back to an empty DataFrame", csv_file
        )
        return pd.DataFrame(columns=CSV_COLUMNS)
def write_data_to_csv(df: pd.DataFrame, csv_file: str):
    """Write *df* to *csv_file*, ordered by ascending ``DateTime``.

    The input frame is not modified; a sorted copy is written without the
    index column.

    NOTE(review): if ``DateTime`` holds strings the order is lexicographic;
    confirm the stored format sorts chronologically.
    """
    ordered = df.sort_values("DateTime")
    ordered.to_csv(csv_file, index=False)
def _stripped_detail(df):
    """Return the ``Detail`` column as stripped strings ("" for missing values)."""
    if "Detail" not in df.columns:
        return pd.Series([""] * len(df), index=df.index, dtype=object)
    return pd.Series(
        [str(value).strip() if pd.notna(value) else "" for value in df["Detail"]],
        index=df.index,
        dtype=object,
    )


def _fill_blank_details(df, offered):
    """Fill blank ``Detail`` cells of *df* in place from the key-indexed *offered*."""
    is_blank = (_stripped_detail(df) == "").to_numpy()
    fillable = is_blank & df.index.isin(offered.index)
    if fillable.any():
        # Positional boolean mask: safe even when df's index has duplicates.
        df.loc[fillable, "Detail"] = offered.loc[df.index[fillable]].to_numpy()


def merge_new_data(existing_df, new_df):
    """Merge newly collected rows into the existing data.

    Rows are identified by a key built from the whitespace-stripped
    ``DateTime``, ``Currency`` and ``Event`` values.

    * A new row whose key already exists is not appended; it only supplies
      its ``Detail`` when the existing row's ``Detail`` is blank.
    * A new row with an unseen key is appended once. Later rows in *new_df*
      with the same key are dropped, but may still supply a ``Detail`` if
      the first occurrence had none.

    Neither input frame is modified.

    Args:
        existing_df: Previously stored rows.
        new_df: Freshly collected rows.

    Returns:
        *new_df* itself, unchanged, when *existing_df* is empty. Otherwise a
        new DataFrame restricted to CSV_COLUMNS with a fresh 0..n-1 index:
        existing rows first, then the new rows in their original order.
    """
    if existing_df.empty:
        return new_df
    if new_df.empty:
        # Nothing to merge; also avoids a KeyError on a column-less frame.
        return existing_df.reset_index(drop=True)[CSV_COLUMNS]

    def add_unique_key(df):
        df = df.copy()
        df["unique_key"] = (
            df["DateTime"].astype(str).str.strip() + "_" +
            df["Currency"].astype(str).str.strip() + "_" +
            df["Event"].astype(str).str.strip()
        )
        return df.set_index("unique_key")

    existing_df = add_unique_key(existing_df)
    new_df = add_unique_key(new_df)

    # First non-blank Detail offered by new_df for each key.
    new_detail = _stripped_detail(new_df)
    offered = new_detail[(new_detail != "").to_numpy()]
    offered = offered[~offered.index.duplicated(keep="first")]

    # Vectorised back-fill instead of per-row `.at` access: `.at` returns a
    # Series when the key is duplicated in existing_df, which previously made
    # the blank check raise "truth value of a Series is ambiguous".
    _fill_blank_details(existing_df, offered)

    # Keep only the first occurrence of each unseen key so the merged data
    # never gains duplicate keys from a single batch.
    is_new = ~new_df.index.isin(existing_df.index) & ~new_df.index.duplicated(keep="first")
    new_rows_df = new_df[is_new].copy()
    _fill_blank_details(new_rows_df, offered)

    if not new_rows_df.empty:
        existing_df = pd.concat([existing_df, new_rows_df])

    return existing_df.reset_index(drop=True)[CSV_COLUMNS]