""" Enhanced Data processing utilities for UAP Data Analysis Tool Centralizes filtering, transformation, and data handling logic with intelligent caching and optimization """ import pandas as pd import numpy as np import streamlit as st from dateutil import parser from pandas.api.types import ( is_categorical_dtype, is_datetime64_any_dtype, is_numeric_dtype, is_object_dtype, ) from concurrent.futures import ThreadPoolExecutor, as_completed import concurrent.futures import json import logging import hashlib import time from typing import Dict, Any, List, Tuple, Optional, Union from functools import wraps, lru_cache # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def performance_monitor(func): """Decorator to monitor function performance""" @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() logger.info(f"{func.__name__} took {end_time - start_time:.2f} seconds") return result return wrapper class DataProcessor: """Enhanced centralized data processing functionality with intelligent filtering and caching""" # Class-level cache for filter states _filter_cache = {} _data_profile_cache = {} @staticmethod def _get_dataframe_hash(df: pd.DataFrame) -> str: """Generate a robust hash for a dataframe to use as cache key. Handles unhashable cell types (lists/dicts/sets) gracefully. """ try: # Fast path: hash pandas object bytes hashed = pd.util.hash_pandas_object(df, index=True).values.tobytes() return hashlib.md5(hashed).hexdigest()[:16] except Exception: # Fallback: serialize to JSON with safe default handler try: serialized = df.to_json(orient='split', default_handler=str) except Exception: # Last resort: stringify cells serialized = df.astype(str).to_csv(index=True) return hashlib.md5(serialized.encode('utf-8')).hexdigest()[:16] @staticmethod @performance_monitor def profile_data(df: pd.DataFrame) -> Dict[str, Any]: """Profile dataframe to optimize filtering UI""" df_hash = DataProcessor._get_dataframe_hash(df) if df_hash in DataProcessor._data_profile_cache: return DataProcessor._data_profile_cache[df_hash] profile = { 'shape': df.shape, 'columns': list(df.columns), 'dtypes': df.dtypes.to_dict(), 'null_counts': df.isnull().sum().to_dict(), 'memory_usage': df.memory_usage(deep=True).sum(), 'categorical_columns': [], 'numeric_columns': [], 'datetime_columns': [], 'text_columns': [], 'high_cardinality_columns': [] } for col in df.columns: # Safe nunique for unhashable values try: unique_count = df[col].nunique() except Exception: unique_count = None total_count = len(df) if is_categorical_dtype(df[col]) or ( unique_count is not None and unique_count < 120 and not is_datetime64_any_dtype(df[col]) and not is_numeric_dtype(df[col]) ): profile['categorical_columns'].append({ 'name': col, 'unique_count': unique_count if unique_count is not None else -1, 'top_values': (df[col].value_counts().head(10).to_dict() if unique_count is not None else df[col].astype(str).value_counts().head(10).to_dict()) }) elif is_numeric_dtype(df[col]): profile['numeric_columns'].append({ 'name': col, 'min': float(df[col].min()), 'max': float(df[col].max()), 'mean': float(df[col].mean()), 'std': float(df[col].std()) if df[col].std() is not None else 0 }) elif is_datetime64_any_dtype(df[col]): # Already a datetime dtype try: mn = df[col].dropna().min() mx = df[col].dropna().max() profile['datetime_columns'].append({ 'name': col, 'min_date': str(mn) if pd.notna(mn) else '', 'max_date': str(mx) if pd.notna(mx) else '', }) except Exception: profile['text_columns'].append(col) elif is_object_dtype(df[col]): # Treat as text by default; only classify as datetime if a strong sample converts cleanly try: sample = df[col].dropna().astype(str).head(50) # Require a strong signal (>70%) of parsable strings to consider datetime parsed = pd.to_datetime(sample, errors='coerce') if parsed.notna().mean() > 0.7: converted = pd.to_datetime(df[col], errors='coerce') mn = converted.dropna().min() mx = converted.dropna().max() profile['datetime_columns'].append({ 'name': col, 'min_date': str(mn) if pd.notna(mn) else '', 'max_date': str(mx) if pd.notna(mx) else '', }) else: profile['text_columns'].append(col) except Exception: profile['text_columns'].append(col) # Flag high cardinality columns that might need special handling if unique_count is not None and unique_count > total_count * 0.8: profile['high_cardinality_columns'].append(col) DataProcessor._data_profile_cache[df_hash] = profile return profile @staticmethod def filter_dataframe_enhanced(df: pd.DataFrame, for_map: bool = False, enable_quick_filters: bool = False, enable_advanced_filters: bool = True) -> pd.DataFrame: """Enhanced filtering interface that builds on top of helper methods. Shows a compact data profile, optional quick filters, and an advanced filter builder. """ from utils.session_manager import SessionStateManager SessionStateManager.initialize() profile = DataProcessor.profile_data(df) df_ = df.copy() # Top-level summary try: c1, c2, c3, c4 = st.columns(4) with c1: st.metric("Rows", f"{len(df_):,}") with c2: st.metric("Columns", len(df_.columns)) with c3: st.metric("Memory", f"{profile['memory_usage'] / 1024**2:.1f} MB") with c4: st.metric("Nulls", sum(profile['null_counts'].values())) except Exception: pass # Quick filters if enable_quick_filters: with st.expander("Quick Filters", expanded=False): df_ = DataProcessor._render_quick_filters(df_, profile) # Advanced filters if enable_advanced_filters: with st.expander("Advanced Filters", expanded=False): categories = { "📊 Categorical": [c['name'] for c in profile['categorical_columns']], "🔢 Numeric": [c['name'] for c in profile['numeric_columns']], "📅 DateTime": [c['name'] for c in profile['datetime_columns']], "📝 Text": profile['text_columns'] } options = [] for k, cols in categories.items(): options.extend([f"{k}: {col}" for col in cols]) try: selected = st.multiselect("Select columns to filter", options) except: try: selected = st.multiselect("Select columns to filter on", options) except: pass active = {} for item in selected: if ": " in item: k, col = item.split(": ", 1) active[col] = k if active: df_ = DataProcessor._apply_intelligent_filters(df_, active, profile) # Results summary try: st.write(f"{len(df_)} rows ({(len(df_) / max(len(df),1)) * 100:.2f}%)") except Exception: pass if for_map: df_ = DataProcessor._prepare_for_mapping(df_) return df_ @staticmethod @st.cache_data(show_spinner="Applying intelligent filters...") def filter_dataframe(df: pd.DataFrame, for_map: bool = True, enable_quick_filters: bool = False) -> pd.DataFrame: """ Adds a UI on top of a dataframe to let viewers filter columns Args: df (pd.DataFrame): Original dataframe for_map (bool): If True, convert datetime columns to string at the end for mapping layers Returns: pd.DataFrame: Filtered dataframe """ from utils.visualization import UAP_Visualizer df_ = df.copy() # Get columns to filter to_filter_columns = st.multiselect("Filter dataframe on", df_.columns) date_column = None filtered_columns = [] for column in to_filter_columns: left, right = st.columns((1, 20)) # Handle categorical columns if is_categorical_dtype(df_[column]) or (df_[column].nunique() < 120 and not is_datetime64_any_dtype(df_[column]) and not is_numeric_dtype(df_[column])): user_cat_input = right.multiselect( f"Values for {column}", df_[column].value_counts().index.tolist(), default=list(df_[column].value_counts().index) ) df_ = df_[df_[column].isin(user_cat_input)] filtered_columns.append(column) with st.status(f"Category Distribution: {column}", expanded=False) as stat: st.pyplot(UAP_Visualizer.plot_treemap(df_, column)) # Handle numeric columns elif is_numeric_dtype(df_[column]): _min = float(df_[column].min()) _max = float(df_[column].max()) step = (_max - _min) / 100 user_num_input = right.slider( f"Values for {column}", min_value=_min, max_value=_max, value=(_min, _max), step=step, ) df_ = df_[df_[column].between(*user_num_input)] filtered_columns.append(column) with st.status(f"Numerical Distribution: {column}", expanded=False) as stat_: bins = int(round(len(df_[column].unique())-1)/2) if len(df_[column].unique()) > 2 else 10 st.pyplot(UAP_Visualizer.plot_hist(df_, column, bins=bins)) # Handle date columns elif is_object_dtype(df_[column]): # Only handle as date if strong evidence; otherwise treat as text df_ = DataProcessor._handle_date_column(df_, column, right, filtered_columns) if pd.api.types.is_datetime64_any_dtype(df_[column]): date_column = column # Handle text columns else: user_text_input = right.text_input( f"Substring or regex in {column}", ) if user_text_input: df_ = df_[df_[column].astype(str).str.contains(user_text_input, regex=True, na=False)] # Display filter results st.write(f"{len(df_)} rows ({len(df_) / len(df) * 100:.2f}%)") # Optional: convert datetime columns to string for mapping libraries if for_map: for col in df_.columns: if is_datetime64_any_dtype(df_[col]): try: df_[col] = df_[col].dt.strftime('%Y-%m-%d %H:%M:%S') except Exception: # If conversion fails, leave column as-is pass return df_ @staticmethod def _handle_date_column(df_, column, right_col, filtered_columns): """Handle date column filtering and visualization""" from utils.visualization import UAP_Visualizer # Try to convert to datetime try: df_[column] = pd.to_datetime(df_[column], infer_datetime_format=True, errors='coerce') except Exception: try: df_[column] = df_[column].apply(parser.parse) except Exception: pass if is_datetime64_any_dtype(df_[column]): df_[column] = df_[column].dt.tz_localize(None) valid = df_[column].dropna() if valid.empty: return df_ try: min_date = valid.min().date() max_date = valid.max().date() except (OverflowError, ValueError, OSError): return df_ user_date_input = right_col.date_input( f"Values for {column}", value=(min_date, max_date), min_value=min_date, max_value=max_date, ) if len(user_date_input) == 2: user_date_input = tuple(map(pd.to_datetime, user_date_input)) start_date, end_date = user_date_input # Create date distribution visualization DataProcessor._visualize_date_distribution(df_, column, start_date, end_date) df_ = df_.loc[df_[column].between(start_date, end_date)] return df_ @staticmethod def _visualize_date_distribution(df_, column, start_date, end_date): """Create date distribution visualizations""" from utils.visualization import UAP_Visualizer # Determine the most appropriate time unit for plot time_units = { 'year': df_[column].dt.year, 'month': df_[column].dt.to_period('M'), 'day': df_[column].dt.date } unique_counts = {unit: col.nunique() for unit, col in time_units.items()} closest_to_36 = min(unique_counts, key=lambda k: abs(unique_counts[k] - 36)) # Group by the most appropriate time unit and count occurrences grouped = df_.groupby(time_units[closest_to_36]).size().reset_index(name='count') grouped.columns = [column, 'count'] # Create a complete date range if closest_to_36 == 'year': date_range = pd.date_range(start=f"{start_date.year}-01-01", end=f"{end_date.year}-12-31", freq='YS') elif closest_to_36 == 'month': date_range = pd.date_range(start=start_date.replace(day=1), end=end_date + pd.offsets.MonthEnd(0), freq='MS') else: # day date_range = pd.date_range(start=start_date, end=end_date, freq='D') # Create a DataFrame with the complete date range complete_range = pd.DataFrame({column: date_range}) # Convert the date column to the appropriate format if closest_to_36 == 'year': complete_range[column] = complete_range[column].dt.year elif closest_to_36 == 'month': complete_range[column] = complete_range[column].dt.to_period('M') # Merge the complete range with the grouped data final_data = pd.merge(complete_range, grouped, on=column, how='left').fillna(0) with st.status(f"Date Distributions: {column}", expanded=False) as stat: try: st.pyplot(UAP_Visualizer.plot_bar(final_data, column, 'count')) except Exception as e: st.error(f"Error plotting bar chart: {e}") @staticmethod @st.cache_data def load_data(file_path: str, key: str = 'df') -> pd.DataFrame: """Load data from HDF5 file with caching""" try: return pd.read_hdf(file_path, key=key) except Exception as e: logger.error(f"Error loading data from {file_path}: {e}") raise @staticmethod def parse_responses_parallel(responses: Dict[str, str], max_workers: int = 4) -> Dict[str, Any]: """Parse responses in parallel for better performance""" def parse_single_response(key: str, value: str) -> Tuple[str, Any]: """Parse a single response with proper error handling""" try: return key, json.loads(value) except json.JSONDecodeError as e: logger.warning(f"JSON decode error for key {key}: {e}") try: # Try with single quotes replaced return key, json.loads(value.replace("'", '"')) except json.JSONDecodeError: logger.error(f"Failed to parse response for key {key}") return key, None results = {} failed_count = 0 with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all tasks futures = {executor.submit(parse_single_response, k, v): k for k, v in responses.items()} # Process completed tasks for future in as_completed(futures): key = futures[future] try: k, parsed_value = future.result() if parsed_value is not None: results[k] = parsed_value else: failed_count += 1 except Exception as e: logger.error(f"Unexpected error parsing key {key}: {e}") failed_count += 1 logger.info(f"Successfully parsed {len(results)} responses, {failed_count} failed") return results @staticmethod def find_lat_lon_columns(df: pd.DataFrame) -> Tuple[Optional[str], Optional[str]]: """Find latitude and longitude columns in dataframe""" lat_columns = df.columns[df.columns.str.lower().str.contains('lat')] lon_columns = df.columns[df.columns.str.lower().str.contains('lon|lng')] if len(lat_columns) > 0 and len(lon_columns) > 0: return lat_columns[0], lon_columns[0] else: return None, None @staticmethod def merge_clusters(df, column, distance_threshold: int = 3): """Merge similar clusters based on Levenshtein distance""" from Levenshtein import distance cluster_terms_ = df.__dict__.get('cluster_terms', []) cluster_labels_ = df.__dict__.get('cluster_labels', []) if not cluster_terms_ or not cluster_labels_: logger.warning("No cluster information found") return [] merge_map = {} # Iterate over term pairs and decide on merging based on the distance for idx, term1 in enumerate(cluster_terms_): for jdx, term2 in enumerate(cluster_terms_): if idx < jdx and distance(term1, term2) <= distance_threshold: # Find labels corresponding to jdx and map them to idx labels_to_merge = [label for label, term_index in enumerate(cluster_labels_) if term_index == jdx] for label in labels_to_merge: merge_map[label] = idx # Update the analyzer with the merged numeric labels updated_cluster_labels_ = [merge_map.get(label, label) for label in cluster_labels_] df.__dict__['cluster_labels'] = updated_cluster_labels_ # Update string labels to reflect merged labels updated_string_labels = [cluster_terms_[label] for label in updated_cluster_labels_] df.__dict__['string_labels'] = updated_string_labels return updated_string_labels @staticmethod def _render_quick_filters(df: pd.DataFrame, profile: Dict[str, Any]) -> pd.DataFrame: """Render quick filter presets for common filtering scenarios""" quick_filter_options = [] # Add quick filters based on data characteristics if profile['categorical_columns']: quick_filter_options.extend([ "🏆 Top Categories Only", "🔍 Remove Rare Categories", "📊 Balanced Sample" ]) if profile['numeric_columns']: quick_filter_options.extend([ "📈 Remove Outliers", "🎯 Focus on Normal Range" ]) if profile['datetime_columns']: quick_filter_options.extend([ "📅 Recent Data (Last Year)", "🕐 Recent Data (Last Month)" ]) if quick_filter_options: selected_quick_filter = st.selectbox( "Apply Quick Filter", options=["None"] + quick_filter_options, help="Pre-configured filters for common analysis scenarios" ) if selected_quick_filter != "None": return DataProcessor._apply_quick_filter(df, selected_quick_filter, profile) return df @staticmethod def _apply_quick_filter(df: pd.DataFrame, filter_type: str, profile: Dict[str, Any]) -> pd.DataFrame: """Apply predefined quick filters""" df_filtered = df.copy() if filter_type == "🏆 Top Categories Only": # Keep only top 5 categories for each categorical column for col_info in profile['categorical_columns']: col = col_info['name'] top_categories = list(col_info['top_values'].keys())[:5] df_filtered = df_filtered[df_filtered[col].isin(top_categories)] elif filter_type == "🔍 Remove Rare Categories": # Remove categories that appear less than 1% of the time min_count = len(df) * 0.01 for col_info in profile['categorical_columns']: col = col_info['name'] value_counts = df[col].value_counts() frequent_values = value_counts[value_counts >= min_count].index df_filtered = df_filtered[df_filtered[col].isin(frequent_values)] elif filter_type == "📈 Remove Outliers": # Remove statistical outliers using IQR method for col_info in profile['numeric_columns']: col = col_info['name'] Q1 = df[col].quantile(0.25) Q3 = df[col].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR df_filtered = df_filtered[(df_filtered[col] >= lower_bound) & (df_filtered[col] <= upper_bound)] return df_filtered @staticmethod def _apply_intelligent_filters(df: pd.DataFrame, active_filters: Dict[str, str], profile: Dict[str, Any]) -> pd.DataFrame: """Apply filters with intelligent optimization and caching""" df_filtered = df.copy() for column, category in active_filters.items(): st.write(f"### {category.replace('📊 ', '').replace('🔢 ', '').replace('📅 ', '').replace('📝 ', '')} Filter: {column}") if "Categorical" in category: df_filtered = DataProcessor._apply_categorical_filter(df_filtered, column, profile) elif "Numeric" in category: df_filtered = DataProcessor._apply_numeric_filter(df_filtered, column, profile) elif "DateTime" in category: df_filtered = DataProcessor._apply_datetime_filter(df_filtered, column, profile) elif "Text" in category: df_filtered = DataProcessor._apply_text_filter(df_filtered, column) return df_filtered @staticmethod def _apply_categorical_filter(df: pd.DataFrame, column: str, profile: Dict[str, Any]) -> pd.DataFrame: """Apply enhanced categorical filtering with visualization""" from utils.visualization import UAP_Visualizer col_info = next((col for col in profile['categorical_columns'] if col['name'] == column), None) if not col_info: return df col1, col2 = st.columns([1, 2]) with col1: # Smart selection options selection_mode = st.radio( f"Selection mode for {column}", ["Select specific values", "Select top N", "Exclude values"], key=f"selection_mode_{column}" ) if selection_mode == "Select specific values": available_values = df[column].value_counts().index.tolist() selected_values = st.multiselect( f"Values for {column}", options=available_values, default=available_values[:min(5, len(available_values))], key=f"multiselect_{column}" ) df_filtered = df[df[column].isin(selected_values)] elif selection_mode == "Select top N": top_n = st.slider(f"Top N categories for {column}", 1, min(20, col_info['unique_count']), 5, key=f"topn_{column}") top_values = df[column].value_counts().head(top_n).index.tolist() df_filtered = df[df[column].isin(top_values)] else: # Exclude values exclude_values = st.multiselect( f"Exclude values from {column}", options=df[column].value_counts().index.tolist(), key=f"exclude_{column}" ) df_filtered = df[~df[column].isin(exclude_values)] with col2: if len(df_filtered) > 0: with st.container(): st.pyplot(UAP_Visualizer.plot_treemap(df_filtered, column, top_n=15)) return df_filtered @staticmethod def _apply_numeric_filter(df: pd.DataFrame, column: str, profile: Dict[str, Any]) -> pd.DataFrame: """Apply enhanced numeric filtering with statistics""" from utils.visualization import UAP_Visualizer col_info = next((col for col in profile['numeric_columns'] if col['name'] == column), None) if not col_info: return df # Binary boolean columns (values ⊆ {0, 1}) — Range/Percentile/StdDev are # meaningless here, so offer a 0/1 value picker instead of a slider. _nonnull = df[column].dropna() _uniq = set(_nonnull.unique()) if _uniq and _uniq.issubset({0, 1}): bcol1, bcol2 = st.columns([1, 2]) with bcol1: _opts = sorted(_uniq) picked = st.multiselect( f"Values for {column}", _opts, default=_opts, format_func=lambda v: f"{int(v)} — {'true' if int(v) == 1 else 'false'}", key=f"binary_{column}", ) df_filtered = df[df[column].isin(picked)] with bcol2: if len(df_filtered) > 0: _vc = df_filtered[column].value_counts().sort_index() _vc.index = _vc.index.map( lambda v: f"{int(v)} ({'true' if int(v) == 1 else 'false'})" ) st.bar_chart(_vc) return df_filtered col1, col2 = st.columns([1, 2]) with col1: filter_mode = st.radio( f"Filter mode for {column}", ["Range", "Percentile", "Standard Deviation"], key=f"numeric_mode_{column}" ) if filter_mode == "Range": min_val, max_val = st.slider( f"Range for {column}", min_value=col_info['min'], max_value=col_info['max'], value=(col_info['min'], col_info['max']), key=f"range_{column}" ) df_filtered = df[df[column].between(min_val, max_val)] elif filter_mode == "Percentile": lower_pct, upper_pct = st.slider( f"Percentile range for {column}", 0, 100, (10, 90), key=f"percentile_{column}" ) lower_val = df[column].quantile(lower_pct / 100) upper_val = df[column].quantile(upper_pct / 100) df_filtered = df[df[column].between(lower_val, upper_val)] else: # Standard Deviation std_range = st.slider( f"Standard deviations from mean for {column}", 0.5, 3.0, 2.0, step=0.5, key=f"std_{column}" ) mean_val = col_info['mean'] std_val = col_info['std'] lower_bound = mean_val - (std_range * std_val) upper_bound = mean_val + (std_range * std_val) df_filtered = df[df[column].between(lower_bound, upper_bound)] with col2: if len(df_filtered) > 0: bins = min(50, max(10, len(df_filtered[column].unique()))) st.pyplot(UAP_Visualizer.plot_hist(df_filtered, column, bins=bins)) return df_filtered @staticmethod def _apply_datetime_filter(df: pd.DataFrame, column: str, profile: Dict[str, Any]) -> pd.DataFrame: """Apply enhanced datetime filtering with multiple modes and preview.""" # Ensure datetime dtype try: df_local = df.copy() df_local[column] = pd.to_datetime(df_local[column], errors='coerce') except Exception: st.warning(f"Could not parse {column} as datetime") return df if not is_datetime64_any_dtype(df_local[column]): st.info(f"{column} is not datetime-like; skipping date filter") return df # Normalize timezone and precision try: df_local[column] = df_local[column].dt.tz_localize(None) except Exception: pass df_local[column] = df_local[column].dt.floor('ms') valid = df_local[column].dropna() if valid.empty: st.info(f"No valid datetime values in {column}") return df try: min_date = valid.min().date() max_date = valid.max().date() except (OverflowError, ValueError, OSError): st.info(f"{column} contains dates outside the supported range; skipping date filter") return df col1, col2 = st.columns([1, 2]) with col1: mode = st.radio( f"Date filter for {column}", ["Date Range", "Relative Period", "Specific Years"], key=f"dt_mode_{column}" ) if mode == "Date Range": start_date, end_date = st.date_input( f"Range for {column}", value=(min_date, max_date), min_value=min_date, max_value=max_date, key=f"dt_range_{column}" ) if isinstance(start_date, tuple): start_date, end_date = start_date mask = df_local[column].dt.date.between(start_date, end_date) df_filtered = df_local[mask] elif mode == "Relative Period": choice = st.selectbox( f"Relative period for {column}", ["Last 7 days", "Last 30 days", "Last 90 days", "Last year"], key=f"dt_rel_{column}" ) today = pd.Timestamp.now().normalize() if choice == "Last 7 days": cutoff = today - pd.Timedelta(days=7) elif choice == "Last 30 days": cutoff = today - pd.Timedelta(days=30) elif choice == "Last 90 days": cutoff = today - pd.Timedelta(days=90) else: cutoff = today - pd.Timedelta(days=365) mask = df_local[column] >= cutoff df_filtered = df_local[mask] else: # Specific Years years = sorted(valid.dt.year.unique()) selected_years = st.multiselect( f"Years for {column}", years, default=years[-3:] if len(years) >= 3 else years, key=f"dt_years_{column}" ) mask = df_local[column].dt.year.isin(selected_years) df_filtered = df_local[mask] with col2: # Preview distribution bar using helper try: start = df_filtered[column].min() if not df_filtered.empty else valid.min() end = df_filtered[column].max() if not df_filtered.empty else valid.max() if pd.notna(start) and pd.notna(end): DataProcessor._visualize_date_distribution(df_local, column, start, end) except Exception: pass return df_filtered @staticmethod def _apply_text_filter(df: pd.DataFrame, column: str) -> pd.DataFrame: """Apply flexible text filtering: contains/starts/ends/regex/length.""" col1, col2 = st.columns([1, 2]) with col1: mode = st.radio( f"Text filter for {column}", ["Contains", "Starts with", "Ends with", "Regex", "Length"], key=f"txt_mode_{column}" ) if mode == "Length": lengths = df[column].astype(str).str.len() min_len = int(lengths.min() if len(lengths) else 0) max_len = int(lengths.max() if len(lengths) else 0) lo, hi = st.slider( f"Length range for {column}", min_len, max_len, (min_len, max_len), key=f"txt_len_{column}" ) mask = lengths.between(lo, hi) return df[mask] else: query = st.text_input(f"Search in {column}", key=f"txt_q_{column}") if not query: return df series = df[column].astype(str) try: if mode == "Contains": mask = series.str.contains(query, case=False, na=False) elif mode == "Starts with": mask = series.str.startswith(query, na=False) elif mode == "Ends with": mask = series.str.endswith(query, na=False) else: # Regex mask = series.str.contains(query, regex=True, na=False) except Exception: st.warning("Invalid pattern; no filter applied") return df return df[mask] @staticmethod def _prepare_for_mapping(df: pd.DataFrame) -> pd.DataFrame: """Prepare dataframe for mapping libraries by converting datetime columns to strings""" df_map = df.copy() for col in df_map.columns: if is_datetime64_any_dtype(df_map[col]): try: df_map[col] = df_map[col].dt.strftime('%Y-%m-%d %H:%M:%S') except Exception: # If conversion fails, leave column as-is pass return df_map