File size: 3,780 Bytes
7ad2215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import pandas as pd
import numpy as np
import io
import requests
import streamlit as st
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, validator, ValidationError

from config import DATA_URLS, SAMPLE_MATERIALS, SAMPLE_SHIFTS, SAMPLE_BASE_WEIGHTS

class ProductionRecord(BaseModel):
    """Schema for one production log entry.

    Fields mirror the columns of the tab-separated data files; ``shift``
    is optional because some records omit it.
    """

    date: datetime
    weight_kg: float
    material_type: str
    shift: Optional[str] = None

    @validator('weight_kg')
    def weight_must_be_positive(cls, v):
        """Reject negative weights — they indicate a faulty sensor reading."""
        if v >= 0:
            return v
        raise ValueError('Negative weight detected: possible sensor malfunction')

@st.cache_data
def load_preset_data(year: str) -> Optional[pd.DataFrame]:
    """Load production data for *year* from its configured remote URL.

    Falls back to deterministic synthetic data when the year has no URL in
    ``DATA_URLS`` or when downloading/parsing the remote TSV fails.

    Args:
        year: Year key, e.g. "2024", used to look up ``DATA_URLS``.

    Returns:
        A validated DataFrame with parsed ``date`` and derived ``day_name``
        columns, or sample data on any failure.
    """
    try:
        if year not in DATA_URLS:
            return generate_sample_data(year)
        response = requests.get(DATA_URLS[year], timeout=10)
        response.raise_for_status()
        df = pd.read_csv(io.StringIO(response.text), sep='\t')
        df['date'] = pd.to_datetime(df['date'], format='%m/%d/%Y')
        df['day_name'] = df['date'].dt.day_name()
        return validate_dataframe(df)
    except Exception as e:
        # Deliberate best-effort fallback; surface the reason instead of
        # silently discarding the bound exception as the old code did.
        st.warning(f"Could not load remote {year} data ({e}). Loading sample data instead.")
        return generate_sample_data(year)

def generate_sample_data(year: str) -> pd.DataFrame:
    """Synthesize a deterministic year of weekday production records.

    One record is produced per weekday × material × shift, with Gaussian
    noise around each material's base weight.

    Args:
        year: Four-digit year string used for the date range and RNG seed.

    Returns:
        DataFrame with columns date, weight_kg, material_type, shift,
        day_name — the same shape as remotely loaded data.
    """
    # Fixed seed per year keeps the sample data stable across reruns.
    np.random.seed(42 if year == "2024" else 84)
    dates = pd.date_range(start=f"01/01/{year}", end=f"12/31/{year}", freq='D')
    weekdays = dates[dates.weekday < 5]

    data = []
    for date in weekdays:
        for material in SAMPLE_MATERIALS:
            for shift in SAMPLE_SHIFTS:
                base_weight = SAMPLE_BASE_WEIGHTS[material]
                # Noise sigma is 20% of base; floor at 30% of base so a bad
                # draw never yields an implausibly small (or negative) weight.
                weight = base_weight + np.random.normal(0, base_weight * 0.2)
                weight = max(weight, base_weight * 0.3)

                data.append({
                    # Store the Timestamp directly — the old code formatted it
                    # with strftime and immediately re-parsed it, a pure waste.
                    'date': date,
                    'weight_kg': round(weight, 1),
                    'material_type': material,
                    'shift': shift,
                })

    df = pd.DataFrame(data)
    df['day_name'] = df['date'].dt.day_name()
    return df

@st.cache_data
def load_uploaded_data(file) -> pd.DataFrame:
    """Parse a user-uploaded tab-separated file into a validated DataFrame.

    Expects a 'date' column formatted MM/DD/YYYY; a 'day_name' column is
    derived from it before row validation.
    """
    frame = pd.read_csv(file, sep='\t')
    parsed_dates = pd.to_datetime(frame['date'], format='%m/%d/%Y')
    frame = frame.assign(date=parsed_dates, day_name=parsed_dates.dt.day_name())
    return validate_dataframe(frame)

def validate_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Validate every row against ``ProductionRecord`` and drop failures.

    Rows that fail pydantic validation (e.g. negative weights) are removed;
    a Streamlit warning with up to 5 error details is shown when any are found.

    Args:
        df: Frame whose rows map onto ``ProductionRecord`` fields.

    Returns:
        The input frame, or a copy with the invalid rows filtered out.
    """
    invalid_rows = []

    for idx, row in df.iterrows():
        try:
            record_dict = row.to_dict()
            # 'shift' is optional: normalize missing/NaN to None for pydantic.
            if 'shift' not in record_dict or pd.isna(record_dict['shift']):
                record_dict['shift'] = None
            ProductionRecord(**record_dict)
        except ValidationError as e:
            invalid_rows.append((idx, str(e)))

    if not invalid_rows:
        return df

    st.warning(f"Found {len(invalid_rows)} anomalous records, automatically filtered")
    with st.expander("View details"):
        for idx, error in invalid_rows[:5]:
            st.error(f"Row {idx}: {error}")

    # Single O(n) drop by label. The old code tested each index against a
    # list comprehension rebuilt per iteration — O(n^2) on large uploads.
    return df.drop(index=[idx for idx, _ in invalid_rows])

def data_health_check(df: pd.DataFrame) -> dict:
    """Summarize basic data-quality metrics for a production DataFrame.

    Args:
        df: Frame with a datetime 'date' column (may be empty).

    Returns:
        Dict of display-ready strings: non-null cell percentage, covered
        time span in days, most recent date, and formatted record count.
        An empty frame returns "N/A" placeholders ("0" records) instead of
        dividing by zero / formatting NaT as the old code did.
    """
    if df.empty:
        return {
            "Completeness": "N/A",
            "Time Span": "N/A",
            "Last Update": "N/A",
            "Total Records": "0",
        }

    # Share of non-null cells across the whole frame.
    completeness = (1 - df.isnull().sum().sum() / df.size) * 100
    time_span = (df['date'].max() - df['date'].min()).days
    last_update = df['date'].max().strftime('%Y-%m-%d')

    return {
        "Completeness": f"{completeness:.1f}%",
        "Time Span": f"{time_span} days",
        "Last Update": last_update,
        "Total Records": f"{len(df):,}",
    }