| | import pandas as pd |
| | import numpy as np |
| | import io |
| | import requests |
| | import streamlit as st |
| | from datetime import datetime |
| | from typing import Optional |
| | from pydantic import BaseModel, validator, ValidationError |
| |
|
| | from config import DATA_URLS, SAMPLE_MATERIALS, SAMPLE_SHIFTS, SAMPLE_BASE_WEIGHTS |
| |
|
class ProductionRecord(BaseModel):
    """Schema for one production log entry.

    A record is a dated weight measurement for a material, optionally
    tagged with the shift that produced it. Construction validates the
    weight; a negative value is rejected as a likely sensor fault.
    """

    date: datetime
    weight_kg: float
    material_type: str
    shift: Optional[str] = None

    @validator('weight_kg')
    def _reject_negative_weight(cls, value):
        # Negative readings cannot be real production output.
        if value < 0:
            raise ValueError('Negative weight detected: possible sensor malfunction')
        return value
| |
|
@st.cache_data
def load_preset_data(year: str) -> Optional[pd.DataFrame]:
    """Load the production dataset for *year*, preferring the remote preset.

    Years with a configured URL are fetched as tab-separated text, parsed,
    and validated; any failure along the way (network, HTTP status, parse)
    triggers a warning and a fall back to synthetic sample data. Years
    without a configured URL go straight to sample data.
    """
    try:
        url = DATA_URLS.get(year)
        if url is None:
            # No remote preset for this year — synthesize data instead.
            return generate_sample_data(year)
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        frame = pd.read_csv(io.StringIO(response.text), sep='\t')
        frame['date'] = pd.to_datetime(frame['date'], format='%m/%d/%Y')
        frame['day_name'] = frame['date'].dt.day_name()
        return validate_dataframe(frame)
    except Exception:
        # Best-effort fallback: never let a flaky remote break the app.
        st.warning(f"Could not load remote {year} data. Loading sample data instead.")
        return generate_sample_data(year)
| |
|
def generate_sample_data(year: str) -> pd.DataFrame:
    """Build a deterministic synthetic production dataset for *year*.

    Emits one record per weekday x material x shift. Each weight is drawn
    from a normal distribution around the material's base weight and
    floored at 30% of base so draws can never go negative or implausibly
    small.

    Args:
        year: Four-digit year string, e.g. "2024".

    Returns:
        DataFrame with columns 'date' (datetime64, midnight timestamps),
        'weight_kg' (rounded to 1 decimal), 'material_type', 'shift', and
        the derived 'day_name'.
    """
    # Fixed per-year seed keeps the sample data reproducible across runs.
    np.random.seed(42 if year == "2024" else 84)
    dates = pd.date_range(start=f"01/01/{year}", end=f"12/31/{year}", freq='D')
    weekdays = dates[dates.weekday < 5]  # Monday=0 .. Friday=4 only

    records = []
    for date in weekdays:
        for material in SAMPLE_MATERIALS:
            # Loop-invariant lookup hoisted out of the shift loop; the RNG
            # call order (per material, per shift) is unchanged.
            base_weight = SAMPLE_BASE_WEIGHTS[material]
            for shift in SAMPLE_SHIFTS:
                weight = base_weight + np.random.normal(0, base_weight * 0.2)
                weight = max(weight, base_weight * 0.3)

                records.append({
                    # Fix: store the Timestamp directly instead of the old
                    # strftime -> to_datetime round trip over every row;
                    # date_range values are already midnight timestamps.
                    'date': date,
                    'weight_kg': round(weight, 1),
                    'material_type': material,
                    'shift': shift,
                })

    df = pd.DataFrame(records)
    df['day_name'] = df['date'].dt.day_name()
    return df
| |
|
@st.cache_data
def load_uploaded_data(file) -> pd.DataFrame:
    """Parse a user-uploaded tab-separated file into a validated DataFrame.

    Expects a 'date' column in MM/DD/YYYY format; a 'day_name' column is
    derived before validation.
    """
    frame = pd.read_csv(file, sep='\t')
    frame['date'] = pd.to_datetime(frame['date'], format='%m/%d/%Y')
    frame['day_name'] = frame['date'].dt.day_name()
    return validate_dataframe(frame)
| |
|
def validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Validate each row against ProductionRecord and drop invalid rows.

    Rows that fail validation (e.g. negative weights) are reported through
    a Streamlit warning — only the first 5 are shown in detail — and
    removed from the returned frame.

    Args:
        df: Frame with at least 'date', 'weight_kg' and 'material_type'
            columns; 'shift' is optional and may contain NaN.

    Returns:
        The input frame with invalid rows filtered out (original row
        order preserved), or the frame unchanged if all rows validate.
    """
    invalid_rows = []

    for idx, row in df.iterrows():
        record_dict = row.to_dict()
        # A missing or NaN shift is legitimate: the model treats it as optional.
        if 'shift' not in record_dict or pd.isna(record_dict['shift']):
            record_dict['shift'] = None
        try:
            ProductionRecord(**record_dict)
        except ValidationError as e:
            invalid_rows.append((idx, str(e)))

    if invalid_rows:
        st.warning(f"Found {len(invalid_rows)} anomalous records, automatically filtered")
        with st.expander("View details"):
            for idx, error in invalid_rows[:5]:
                st.error(f"Row {idx}: {error}")

        # Fix: set membership + Index.isin gives an O(n) filter; the old
        # nested list comprehensions rebuilt the invalid list per index
        # (accidental O(n*m)).
        invalid_index = {idx for idx, _ in invalid_rows}
        return df.loc[~df.index.isin(invalid_index)]

    return df
| |
|
def data_health_check(df: pd.DataFrame) -> dict:
    """Summarize dataset quality metrics as display-ready strings.

    Args:
        df: Production frame with a datetime64 'date' column.

    Returns:
        Dict with 'Completeness' (percent of non-null cells), 'Time Span'
        (days between first and last date), 'Last Update' (latest date,
        ISO-formatted), and 'Total Records' (thousands-separated count).
    """
    if df.empty:
        # Fix: an empty frame made df.size == 0 (ZeroDivisionError) and
        # df['date'].max() NaT, which has no usable strftime.
        return {
            "Completeness": "N/A",
            "Time Span": "0 days",
            "Last Update": "N/A",
            "Total Records": "0",
        }

    completeness = (1 - df.isnull().sum().sum() / df.size) * 100
    time_span = (df['date'].max() - df['date'].min()).days
    last_update = df['date'].max().strftime('%Y-%m-%d')

    return {
        "Completeness": f"{completeness:.1f}%",
        "Time Span": f"{time_span} days",
        "Last Update": last_update,
        "Total Records": f"{len(df):,}",
    }