import io
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd
import requests
import streamlit as st
from pydantic import BaseModel, ValidationError, validator

from config import DATA_URLS, SAMPLE_BASE_WEIGHTS, SAMPLE_MATERIALS, SAMPLE_SHIFTS


class ProductionRecord(BaseModel):
    """Schema for a single production log entry (pydantic v1 API)."""

    date: datetime
    weight_kg: float
    material_type: str
    shift: Optional[str] = None  # missing/NaN shift is normalized to None upstream

    @validator('weight_kg')
    def weight_must_be_positive(cls, v):
        # A negative reading means the scale/sensor glitched, not real output.
        if v < 0:
            raise ValueError('Negative weight detected: possible sensor malfunction')
        return v


@st.cache_data
def load_preset_data(year: str) -> Optional[pd.DataFrame]:
    """Load the tab-separated production data for *year* from its configured URL.

    Falls back to deterministic synthetic data when the year has no URL in
    ``DATA_URLS`` or when the download, parse, or validation fails, so the
    dashboard stays usable offline.

    Args:
        year: Year key, e.g. ``"2024"``.

    Returns:
        A validated DataFrame with parsed ``date`` and derived ``day_name``
        columns (never ``None`` in practice — both paths return a frame).
    """
    try:
        # Guard clause: unknown years go straight to sample data.
        if year not in DATA_URLS:
            return generate_sample_data(year)
        response = requests.get(DATA_URLS[year], timeout=10)
        response.raise_for_status()
        df = pd.read_csv(io.StringIO(response.text), sep='\t')
        df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
        df['day_name'] = df['date'].dt.day_name()
        return validate_dataframe(df)
    except Exception:
        # Deliberately broad: any network/parse/validation failure degrades
        # to sample data instead of crashing the app.
        st.warning(f"Could not load remote {year} data. Loading sample data instead.")
        return generate_sample_data(year)


def generate_sample_data(year: str) -> pd.DataFrame:
    """Create deterministic synthetic production data for *year*.

    Emits one record per weekday x material x shift. Weights are drawn from
    a normal distribution around each material's base weight and floored at
    30% of base, so no synthetic record can trip the negative-weight check.

    Args:
        year: Four-digit year string used for the date range and RNG seed.

    Returns:
        DataFrame with ``date``, ``weight_kg``, ``material_type``, ``shift``
        and derived ``day_name`` columns.
    """
    # Fixed per-year seeds keep the demo data stable across Streamlit reruns.
    np.random.seed(42 if year == "2024" else 84)
    dates = pd.date_range(start=f"01/01/{year}", end=f"12/31/{year}", freq='D')
    weekdays = dates[dates.weekday < 5]  # Monday=0 .. Friday=4

    data = []
    for date in weekdays:
        for material in SAMPLE_MATERIALS:
            for shift in SAMPLE_SHIFTS:
                base_weight = SAMPLE_BASE_WEIGHTS[material]
                weight = base_weight + np.random.normal(0, base_weight * 0.2)
                weight = max(weight, base_weight * 0.3)  # floor: never negative
                data.append({
                    'date': date.strftime('%m/%d/%Y'),
                    'weight_kg': round(weight, 1),
                    'material_type': material,
                    'shift': shift,
                })

    df = pd.DataFrame(data)
    df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
    df['day_name'] = df['date'].dt.day_name()
    return df


@st.cache_data
def load_uploaded_data(file) -> pd.DataFrame:
    """Parse a user-uploaded TSV file into a validated DataFrame.

    Args:
        file: File-like object (e.g. from ``st.file_uploader``) containing
            tab-separated data with ``date`` in ``%m/%d/%Y`` format.

    Returns:
        The validated DataFrame with anomalous rows filtered out.
    """
    df = pd.read_csv(file, sep='\t')
    df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
    df['day_name'] = df['date'].dt.day_name()
    return validate_dataframe(df)


def validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Validate each row against ``ProductionRecord`` and drop failures.

    Invalid rows are reported in the Streamlit UI (first five shown in an
    expander) and removed from the returned frame.

    Args:
        df: Input frame with at least ``date``, ``weight_kg`` and
            ``material_type`` columns; ``shift`` is optional.

    Returns:
        ``df`` restricted to rows that pass validation.
    """
    invalid_rows = []
    for idx, row in df.iterrows():
        try:
            record_dict = row.to_dict()
            # Normalize absent/NaN shift to None so pydantic accepts it.
            if 'shift' not in record_dict or pd.isna(record_dict['shift']):
                record_dict['shift'] = None
            ProductionRecord(**record_dict)
        except ValidationError as e:
            invalid_rows.append((idx, str(e)))

    if invalid_rows:
        st.warning(f"Found {len(invalid_rows)} anomalous records, automatically filtered")
        with st.expander("View details"):
            for idx, error in invalid_rows[:5]:
                st.error(f"Row {idx}: {error}")
        # Set membership makes the filter O(n) instead of rescanning the
        # invalid list for every index (previously O(n^2)).
        bad_indices = {i for i, _ in invalid_rows}
        return df.loc[[i for i in df.index if i not in bad_indices]]
    return df


def data_health_check(df: pd.DataFrame) -> dict:
    """Summarize completeness, time span, recency, and size of *df*.

    Args:
        df: Production DataFrame with a parsed ``date`` column.

    Returns:
        Dict of display-ready metric strings keyed by metric name.
    """
    # Guard: max()/min() on an empty frame yield NaT, and NaT.strftime raises.
    if df.empty:
        return {
            "Completeness": "0.0%",
            "Time Span": "0 days",
            "Last Update": "N/A",
            "Total Records": "0",
        }
    completeness = (1 - df.isnull().sum().sum() / df.size) * 100
    time_span = (df['date'].max() - df['date'].min()).days
    last_update = df['date'].max().strftime('%Y-%m-%d')
    return {
        "Completeness": f"{completeness:.1f}%",
        "Time Span": f"{time_span} days",
        "Last Update": last_update,
        "Total Records": f"{len(df):,}",
    }