# CS5130_finalProject / data_processor.py
# Author: Khang Nguyen
# Initial commit (aa893a9)
import pandas as pd
def load_data(file):
    """
    Load a CSV or Excel file into a pandas DataFrame.

    Accepts either:
    - a file path (string or os.PathLike)
    - a Gradio UploadedFile object (exposes the temp-file path via ``.name``)

    Returns:
        tuple: (DataFrame, None) on success, or (None, error_message) on
        failure, so the Gradio UI can display the error instead of crashing.
    """
    try:
        # If "file" is a Gradio upload, it has a .name attribute.
        path = file.name if hasattr(file, "name") else file
        # Tolerate pathlib.Path inputs; endswith() requires a str.
        path = str(path)
        # Compare extensions case-insensitively so ".CSV" / ".Xlsx" also load.
        lower = path.lower()
        if lower.endswith(".csv"):
            df = pd.read_csv(path)
        elif lower.endswith((".xlsx", ".xls")):
            df = pd.read_excel(path)
        else:
            raise ValueError("Only .csv, .xlsx, or .xls files are supported.")
        # Best effort: parse any column whose name mentions "date".
        # For the Tesla data, "Date" will be parsed correctly.
        for col in df.columns:
            # str(col) guards against non-string column labels.
            if "date" in str(col).lower():
                try:
                    df[col] = pd.to_datetime(df[col])
                except (ValueError, TypeError):
                    # If parsing fails, just keep the column as is.
                    pass
        return df, None
    except Exception as e:
        # Return None and an error message so Gradio can display it.
        return None, f"Error loading data: {e}"
def get_basic_info(df):
    """
    Summarize the dataset at a glance.

    Returns a dict with:
    - n_rows / n_cols: the DataFrame's dimensions
    - columns: list of column names
    - dtypes: mapping of column name -> dtype rendered as a string
    """
    n_rows, n_cols = df.shape
    return {
        "n_rows": n_rows,
        "n_cols": n_cols,
        "columns": df.columns.tolist(),
        # Stringify dtypes so the result is plain-Python and display-friendly.
        "dtypes": {name: str(dtype) for name, dtype in df.dtypes.items()},
    }
def detect_column_types(df):
    """
    Classify columns into numeric, categorical, and date groups.

    Used downstream for summary statistics, filters, and visualizations.

    Returns:
        dict with keys "numeric", "categorical", "date", each mapping to a
        list of column names.
    """
    numeric_cols = df.select_dtypes(include=["number"]).columns.tolist()
    # "datetime" matches naive datetime columns and "datetimetz" matches
    # tz-aware columns in ANY timezone. The previous dtype-string selector
    # ("datetime64[ns, UTC]") missed tz-aware columns in other timezones,
    # silently dropping them into the categorical bucket.
    date_cols = df.select_dtypes(include=["datetime", "datetimetz"]).columns.tolist()
    # Everything else is treated as categorical for this project.
    known = set(numeric_cols) | set(date_cols)
    categorical_cols = [col for col in df.columns if col not in known]
    return {
        "numeric": numeric_cols,
        "categorical": categorical_cols,
        "date": date_cols,
    }
def numeric_summary(df, numeric_cols):
    """
    Build per-column summary statistics for the given numeric columns.

    Each row of the returned DataFrame describes one column and carries the
    usual describe() fields: count, mean, std, min, 25%, 50%, 75%, max.
    Returns an empty DataFrame when no numeric columns are supplied.
    """
    if not numeric_cols:
        return pd.DataFrame()
    # Transpose describe() so each row corresponds to one column, then
    # surface the column names as a regular "column" field.
    stats = df[numeric_cols].describe().T.reset_index()
    return stats.rename(columns={"index": "column"})
def categorical_summary(df, categorical_cols, max_unique_to_show=20):
    """
    Summarize categorical columns.

    For each column, reports:
    - unique_values: number of distinct values (NaN counted as a value)
    - mode: the most frequent value (None for an empty column)
    - mode_freq: how many times the mode occurs
    - top_values: up to ``max_unique_to_show`` value counts, stringified
      so they can be shown in a table if needed

    Returns an empty DataFrame when there are no categorical columns.
    """
    rows = []
    for col in categorical_cols:
        series = df[col].astype("object")
        # value_counts is sorted by descending frequency, so its first entry
        # IS the mode; computing it once replaces the two separate mode()
        # calls plus value_counts() that the original made per column.
        value_counts = series.value_counts(dropna=False)
        if len(value_counts) > 0:
            mode_value = value_counts.index[0]
            mode_freq = int(value_counts.iloc[0])
        else:
            mode_value = None
            mode_freq = 0
        rows.append(
            {
                "column": col,
                "unique_values": int(series.nunique(dropna=False)),
                "mode": mode_value,
                "mode_freq": mode_freq,
                # Keep the top value counts as a JSON-like string for display.
                "top_values": str(value_counts.head(max_unique_to_show).to_dict()),
            }
        )
    if not rows:
        return pd.DataFrame()
    return pd.DataFrame(rows)
def missing_values_report(df):
    """
    Report missing data per column.

    Returns a DataFrame with one row per column:
    - column: the column name
    - missing_count: number of NaN/None entries
    - missing_pct: missing_count as a percentage of all rows, rounded to
      two decimals (0.0 when the DataFrame has no rows)
    """
    counts = df.isna().sum()
    if counts.empty:
        # No columns at all -> empty report, matching the no-rows shape.
        return pd.DataFrame()
    total = len(df)
    return pd.DataFrame(
        {
            "column": counts.index,
            "missing_count": [int(c) for c in counts],
            "missing_pct": [
                round((c / total) * 100, 2) if total > 0 else 0.0 for c in counts
            ],
        }
    )
def correlation_matrix(df, numeric_cols):
    """
    Pairwise correlations between the given numeric columns.

    A correlation needs at least two series, so an empty DataFrame is
    returned when fewer than two numeric columns are supplied.
    """
    if len(numeric_cols) >= 2:
        return df[numeric_cols].corr()
    return pd.DataFrame()
def build_filter_metadata(df, col_types):
    """
    Build the metadata the Gradio UI needs to render filter widgets.

    - numeric columns:     {"min": float, "max": float}
    - categorical columns: sorted list of unique values as strings
    - date columns:        {"min": earliest, "max": latest}

    Numeric and date columns that are entirely NaN are omitted because
    they have no usable range.
    """
    meta = {"numeric": {}, "categorical": {}, "date": {}}

    # Numeric ranges
    for col in col_types["numeric"]:
        values = df[col].dropna()
        if not values.empty:
            meta["numeric"][col] = {
                "min": float(values.min()),
                "max": float(values.max()),
            }

    # Categorical unique values — stringified so numpy scalars sort and
    # serialize cleanly.
    for col in col_types["categorical"]:
        meta["categorical"][col] = sorted(
            str(v) for v in df[col].dropna().unique()
        )

    # Date min/max
    for col in col_types["date"]:
        values = df[col].dropna()
        if not values.empty:
            meta["date"][col] = {"min": values.min(), "max": values.max()}

    return meta
def apply_filters(df, numeric_filters=None, categorical_filters=None, date_filters=None):
    """
    Return a copy of ``df`` narrowed by the given filters (all optional).

    numeric_filters:     {column: [min_val, max_val]} — keep rows in range.
    categorical_filters: {column: [allowed, values]} — keep matching rows;
                         values are compared as strings, and an empty list
                         disables that column's filter.
    date_filters:        {column: [start, end]} — bounds may be strings;
                         they are coerced with pd.to_datetime.

    Unknown columns and malformed bounds are skipped silently so a bad UI
    input never crashes the app. A filter dict that is None is ignored.
    """
    result = df.copy()

    # Numeric ranges
    for col, bounds in (numeric_filters or {}).items():
        if col not in result.columns:
            continue
        try:
            lo, hi = bounds
            result = result[(result[col] >= lo) & (result[col] <= hi)]
        except Exception:
            # Malformed bounds: ignore this column's filter rather than fail.
            continue

    # Categorical selections (multi-select)
    for col, allowed in (categorical_filters or {}).items():
        # An empty selection means "no filter" for this column.
        if col in result.columns and allowed:
            result = result[result[col].astype(str).isin(allowed)]

    # Date range filters
    for col, bounds in (date_filters or {}).items():
        if col not in result.columns:
            continue
        try:
            # Coerce both bounds up front so a bad value skips the whole
            # filter without partially applying it.
            start, end = (pd.to_datetime(b) for b in bounds)
            result = result[(result[col] >= start) & (result[col] <= end)]
        except Exception:
            continue

    return result