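"""Clean the raw news, weather, and supply-chain datasets.

Reads raw files from artifacts/data/raw, normalizes columns and
timestamps, and writes cleaned CSVs to artifacts/data/processed.
"""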
import logging
import sys
from pathlib import Path

import pandas as pd

# Make the project root importable so utils.logger resolves
sys.path.append(str(Path(__file__).resolve().parents[1]))
from utils.logger import *  # noqa: E402,F403

logger = logging.getLogger(__name__)
def clean_news_events(df):
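    """Normalize raw news events: keep the expected columns, drop
    duplicate articles, lowercase text fields, and parse timestamps."""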
    required_columns = ['title', 'publishedAt', 'description', 'source', 'url']
    # Keep only the expected columns; copy so later assignments don't hit a view
    df = df[[col for col in required_columns if col in df.columns]].copy()
    df = df.drop_duplicates(subset=['title', 'publishedAt'])
    df['title'] = df['title'].str.strip().str.lower()
    if 'description' in df.columns:
        df['description'] = df['description'].str.strip().str.lower()
    df['publishedAt'] = pd.to_datetime(df['publishedAt'], errors='coerce')
    df = df.dropna(subset=['title', 'publishedAt'])
    logger.info(f"Cleaned news events: {df.shape}")
    return df
def clean_weather_alerts(df):
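    """Normalize raw weather alerts: keep location and weather columns,
    tidy casing, and convert Unix-epoch timestamps to datetimes."""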
    keep_cols = ['city', 'country', 'lat', 'lon', 'weather_main', 'timestamp']
    df = df[[col for col in keep_cols if col in df.columns]].copy()
    df['city'] = df['city'].str.strip().str.title()
    df['country'] = df['country'].str.strip().str.upper()
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s', errors='coerce')
    df = df.dropna(subset=['city', 'timestamp'])
    logger.info(f"Cleaned weather alerts: {df.shape}")
    return df
def clean_supply_chain_disruptions(df):
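    """Clean the DataCo supply-chain CSV: dedupe rows, parse order and
    shipping dates, and normalize the risk and status fields."""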
    df = df.drop_duplicates()
    # Parse the two date columns when present, coercing bad values to NaT
    for col in ['order date (DateOrders)', 'shipping date (DateOrders)']:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    if 'Late_delivery_risk' in df.columns:
        df['Late_delivery_risk'] = df['Late_delivery_risk'].fillna(0).astype(int)
    if 'Order Status' in df.columns:
        df['Order Status'] = df['Order Status'].str.strip().str.title()
    logger.info(f"Cleaned supply chain CSV: {df.shape}")
    return df
if __name__ == "__main__":
    raw_dir = Path(__file__).resolve().parents[2] / "artifacts" / "data" / "raw"
    processed_dir = Path(__file__).resolve().parents[2] / "artifacts" / "data" / "processed"
    processed_dir.mkdir(parents=True, exist_ok=True)

    # Clean and save each dataset inside its own try block so one missing
    # source file does not leave an undefined variable at the save step
    try:
        news_df = pd.read_json(raw_dir / "news_events.json")
        cleaned_news = clean_news_events(news_df)
        cleaned_news.to_csv(processed_dir / "news_events_clean.csv", index=False)
        logger.info(f"News events cleaned successfully: shape {cleaned_news.shape}")
    except Exception as e:
        logger.error(f"Error cleaning news: {e}")

    try:
        weather_df = pd.read_json(raw_dir / "weather_alerts.json")
        cleaned_weather = clean_weather_alerts(weather_df)
        cleaned_weather.to_csv(processed_dir / "weather_alerts_clean.csv", index=False)
        logger.info(f"Weather alerts cleaned successfully: shape {cleaned_weather.shape}")
    except Exception as e:
        logger.error(f"Error cleaning weather: {e}")

    try:
        # Fall back to Latin-1 when the CSV is not valid UTF-8
        try:
            sc_df = pd.read_csv(raw_dir / "DataCoSupplyChainDataset.csv", encoding="utf-8")
        except UnicodeDecodeError:
            sc_df = pd.read_csv(raw_dir / "DataCoSupplyChainDataset.csv", encoding="ISO-8859-1")
        cleaned_sc = clean_supply_chain_disruptions(sc_df)
        cleaned_sc.to_csv(processed_dir / "supply_chain_disruptions_clean.csv", index=False)
        logger.info(f"Supply chain CSV cleaned successfully: shape {cleaned_sc.shape}")
    except Exception as e:
        logger.error(f"Error cleaning supply chain CSV: {e}")