"""
SAP-RPT-1-OSS Playground Utilities

Functions for handling dataset uploads, previews, training, and results export.
"""
|
|
| import pandas as pd |
| import numpy as np |
| from pathlib import Path |
| from typing import Tuple, Optional, Dict, Any |
| import tempfile |
| import os |
|
|
|
|
def load_dataset(file_path: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Load a dataset from a file (CSV, TSV, Parquet, JSON, or JSON Lines).

    The format is chosen from the file extension, compared case-insensitively.
    Failures are reported through the returned error message instead of being
    raised, so callers can show the message directly.

    Args:
        file_path: Path to the dataset file

    Returns:
        Tuple of (DataFrame, error_message). On success the error message is
        None; on failure the DataFrame is None and the message describes the
        problem.
    """
    try:
        file_ext = Path(file_path).suffix.lower()

        if file_ext == '.csv':
            df = pd.read_csv(file_path)
        elif file_ext == '.tsv':
            # Tab-separated files are plain delimited text with a tab separator.
            df = pd.read_csv(file_path, sep='\t')
        elif file_ext == '.parquet':
            df = pd.read_parquet(file_path)
        elif file_ext in ('.json', '.jsonl'):
            # '.jsonl' holds one JSON record per line.
            df = pd.read_json(file_path, lines=(file_ext == '.jsonl'))
        else:
            return None, f"Unsupported file format: {file_ext}"

        return df, None
    except Exception as e:
        # Deliberate catch-all: any reader failure (missing file, parse error,
        # missing optional engine) is turned into a message for the caller.
        return None, f"Error loading file: {str(e)}"
|
|
|
|
def detect_task_type(filename: str) -> str:
    """
    Guess the task type from keywords in a filename.

    The match is case-insensitive. 'classification' is checked first and is
    also the fallback when neither keyword appears.

    Args:
        filename: Name of the uploaded file

    Returns:
        'classification' or 'regression'
    """
    lowered = filename.lower()
    for task in ('classification', 'regression'):
        if task in lowered:
            return task
    return 'classification'
|
|
|
|
def get_dataset_info(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Summarize the shape, column types, and missing values of a dataset.

    Args:
        df: DataFrame to analyze

    Returns:
        Dictionary with the keys:
            'num_rows': number of rows
            'num_columns': number of columns
            'columns': list of column labels
            'numeric_columns': labels of columns with a numeric dtype
            'categorical_columns': labels of object/category columns
            'missing_values': mapping of column label to null count
            'dtypes': mapping of column label to dtype name
    """
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    categorical_columns = df.select_dtypes(include=['object', 'category']).columns
    missing_counts = df.isnull().sum()

    return {
        'num_rows': len(df),
        'num_columns': len(df.columns),
        'columns': list(df.columns),
        'numeric_columns': list(numeric_columns),
        'categorical_columns': list(categorical_columns),
        # Convert NumPy integer scalars to plain ints so the summary can be
        # serialized (e.g. with json.dumps) without a custom encoder.
        'missing_values': {col: int(count) for col, count in missing_counts.items()},
        'dtypes': df.dtypes.astype(str).to_dict(),
    }
|
|
|
|
def auto_select_target_column(df: pd.DataFrame, task_type: str) -> Optional[str]:
    """
    Auto-select a target column (defaults to the last column).

    For regression the last numeric column is preferred, so a trailing
    non-numeric column (an id or a name, say) is not picked as the target.
    Any numeric dtype qualifies, not only 64-bit ints and floats.

    Args:
        df: DataFrame
        task_type: 'classification' or 'regression'

    Returns:
        Column name, or None when the DataFrame has no columns
    """
    if len(df.columns) == 0:
        return None

    if task_type == 'regression':
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        if numeric_cols:
            # When the last column is numeric it is also the last entry here,
            # so this covers the "last column" default too.
            return numeric_cols[-1]

    return df.columns[-1]
|
|
|
|
def detect_task_type_from_column(df: pd.DataFrame, target_column: str) -> str:
    """
    Detect the task type from the target column's data.

    Rules, in order:
        * unknown column or non-numeric dtype -> 'classification'
        * numeric with at most 20 distinct non-null values, all of them
          whole numbers -> 'classification'
        * anything else -> 'regression'

    Args:
        df: DataFrame
        target_column: Name of target column

    Returns:
        'classification' or 'regression'
    """
    if target_column not in df.columns:
        return 'classification'

    target_series = df[target_column]

    if not pd.api.types.is_numeric_dtype(target_series):
        return 'classification'

    try:
        # Work on the distinct values so the whole column is covered, not
        # just a leading sample; at most 20 values reach the casts below.
        distinct = pd.Series(target_series.dropna().unique())
        if len(distinct) <= 20:
            if (distinct.astype(int) == distinct.astype(float)).all():
                return 'classification'
    except (TypeError, ValueError, OverflowError):
        # Values that cannot be cast to int (e.g. infinities) are not labels.
        return 'regression'

    return 'regression'
|
|
|
|
def prepare_train_test_split(
    df: pd.DataFrame,
    target_column: str,
    test_size: float = 0.2
) -> Tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
    """
    Prepare a deterministic train/test split.

    Rows with a missing target are dropped. The remaining rows are split in
    their existing order (no shuffling): the leading rows form the training
    set and the trailing rows form the test set. A non-empty dataset always
    gets at least one test row.

    Args:
        df: Full dataset
        target_column: Name of target column
        test_size: Proportion of test set (0.1 to 0.5 is typical; must lie
            within [0, 1])

    Returns:
        Tuple of (X_train, y_train, X_test, y_test)

    Raises:
        ValueError: If the target column is missing or test_size is outside
            [0, 1].
    """
    if target_column not in df.columns:
        raise ValueError(f"Target column '{target_column}' not found in dataset")
    if not 0 <= test_size <= 1:
        # Out-of-range values would give negative slice bounds below and
        # silently produce meaningless splits.
        raise ValueError(f"test_size must be between 0 and 1, got {test_size}")

    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Rows without a label cannot be used for training or evaluation.
    valid_mask = ~y.isnull()
    X = X[valid_mask]
    y = y[valid_mask]

    n_total = len(X)
    n_test = int(n_total * test_size)
    if n_test == 0 and n_total > 0:
        n_test = 1
    n_train = n_total - n_test

    X_train = X.iloc[:n_train]
    y_train = y.iloc[:n_train]
    X_test = X.iloc[n_train:]
    y_test = y.iloc[n_train:]

    return X_train, y_train, X_test, y_test
|
|
|
|
def preprocess_data(
    df: pd.DataFrame,
    handle_missing: str = 'mean',
    normalize: bool = False
) -> pd.DataFrame:
    """
    Preprocess the numeric columns of a dataset.

    The input is never modified; a processed copy is returned. Non-numeric
    columns are left untouched. An unrecognized handle_missing value leaves
    missing values as they are.

    Args:
        df: DataFrame to preprocess
        handle_missing: How to handle missing values in numeric columns
            ('mean', 'median', 'drop', 'zero')
        normalize: Whether to standardize numeric columns with
            scikit-learn's StandardScaler

    Returns:
        Preprocessed DataFrame
    """
    df_processed = df.copy()

    numeric_cols = df_processed.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) == 0:
        # Nothing to impute or scale; this also keeps StandardScaler from
        # being handed an empty feature matrix.
        return df_processed

    if handle_missing == 'mean':
        numeric = df_processed[numeric_cols]
        df_processed[numeric_cols] = numeric.fillna(numeric.mean())
    elif handle_missing == 'median':
        numeric = df_processed[numeric_cols]
        df_processed[numeric_cols] = numeric.fillna(numeric.median())
    elif handle_missing == 'zero':
        df_processed[numeric_cols] = df_processed[numeric_cols].fillna(0)
    elif handle_missing == 'drop':
        df_processed = df_processed.dropna(subset=list(numeric_cols))

    # Skip scaling when no rows remain (e.g. after 'drop'): the scaler
    # cannot be fitted on zero samples.
    if normalize and len(df_processed) > 0:
        from sklearn.preprocessing import StandardScaler
        scaler = StandardScaler()
        df_processed[numeric_cols] = scaler.fit_transform(df_processed[numeric_cols])

    return df_processed
|
|
|
|
def export_results(
    X_test: pd.DataFrame,
    y_test: pd.Series,
    predictions: np.ndarray,
    task_type: str,
    filename_prefix: str = "results"
) -> str:
    """
    Write test features, true targets, and predictions to a CSV file.

    The file is written to the system temporary directory as
    "<filename_prefix>_results.csv", without the index. Predictions are
    stored under 'predicted_class' for classification and under
    'predicted_value' for any other task type.

    Args:
        X_test: Test features
        y_test: True target values
        predictions: Model predictions
        task_type: 'classification' or 'regression'
        filename_prefix: Prefix for output filename

    Returns:
        Path to exported CSV file
    """
    prediction_column = (
        'predicted_class' if task_type == 'classification' else 'predicted_value'
    )

    table = X_test.copy()
    table['true_value'] = y_test.values
    table[prediction_column] = predictions

    destination = os.path.join(
        tempfile.gettempdir(), f"{filename_prefix}_results.csv"
    )
    table.to_csv(destination, index=False)
    return destination
|
|
|
|
def check_embedding_server() -> Tuple[bool, str]:
    """
    Check if the embedding server is running.

    Sends a "ping" over a ZeroMQ REQ socket to tcp://localhost:5555 and waits
    up to one second for any reply. The socket and context are released on
    every path, including errors.

    Returns:
        Tuple of (is_running, message)
    """
    try:
        import zmq
    except ImportError:
        return False, "pyzmq package not installed. Install with: pip install pyzmq"

    context = None
    socket = None
    try:
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        # LINGER=0 discards the unsent ping on close so term() cannot block.
        socket.setsockopt(zmq.LINGER, 0)
        # pyzmq sockets have no settimeout(); timeouts are socket options
        # expressed in milliseconds.
        socket.setsockopt(zmq.SNDTIMEO, 1000)
        socket.setsockopt(zmq.RCVTIMEO, 1000)
        socket.connect("tcp://localhost:5555")
        socket.send_string("ping")

        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)
        if poller.poll(1000):
            # Any reply proves the server is up; its content is not needed,
            # so read raw bytes rather than risk a decode error.
            socket.recv()
            return True, "Embedding server is running and responding"
        return False, "Embedding server not responding on port 5555"
    except zmq.ZMQError as e:
        if "Connection refused" in str(e) or "No such file or directory" in str(e):
            return False, "Embedding server is not running"
        return False, f"Connection error: {str(e)}"
    except Exception as e:
        return False, f"Error checking embedding server: {str(e)}"
    finally:
        if socket is not None:
            socket.close()
        if context is not None:
            context.term()
|
|
|
|
def is_sap_rpt_oss_installed() -> bool:
    """
    Check if the sap-rpt-oss package is installed and importable.

    Returns:
        True when `import sap_rpt_oss` succeeds, False when it raises
        ImportError.
    """
    try:
        # Importing a name from the package cannot succeed once the package
        # import itself has failed, so a single attempt is enough.
        import sap_rpt_oss  # noqa: F401
    except ImportError:
        return False
    return True
|
|
|
|
def start_embedding_server(gpu_idx: Optional[int] = None) -> Tuple[bool, str]:
    """
    Start the embedding server automatically.

    Looks for a `start_embedding_server` callable in several candidate
    sap_rpt_oss modules, runs it in a daemon thread, waits three seconds, and
    then probes the server. An exception raised by the server thread is
    captured and reported in the returned message instead of being discarded.

    Args:
        gpu_idx: GPU index to use (None for CPU)

    Returns:
        Tuple of (success, message)
    """
    import importlib
    import threading
    import time

    if not is_sap_rpt_oss_installed():
        return False, "sap-rpt-oss package not found. Install with: pip install git+https://github.com/SAP-samples/sap-rpt-1-oss"

    is_running, _ = check_embedding_server()
    if is_running:
        return True, "Embedding server is already running"

    try:
        # The entry point's location is not fixed, so probe the known
        # candidates in order and take the first one that exposes it.
        start_func = None
        import_paths = [
            "sap_rpt_oss.scripts.start_embedding_server",
            "sap_rpt_oss.start_embedding_server",
            "sap_rpt_oss.data.tokenizer",
        ]

        for import_path in import_paths:
            try:
                module = importlib.import_module(import_path)
            except (ImportError, AttributeError):
                continue
            if hasattr(module, 'start_embedding_server'):
                start_func = getattr(module, 'start_embedding_server')
                break

        if start_func is None:
            return False, "Embedding server will start automatically when the model makes predictions. No manual start needed."

        # Filled in by the worker thread if the server function raises.
        startup_errors = []

        def run_server():
            try:
                start_func(
                    sentence_embedding_model_name="sentence-transformers/all-MiniLM-L6-v2",
                    gpu_idx=gpu_idx
                )
            except Exception as exc:
                startup_errors.append(exc)

        # Daemon thread: the server must not keep the process alive on exit.
        server_thread = threading.Thread(target=run_server, daemon=True)
        server_thread.start()

        # Give the server a moment to bind its socket before probing it.
        time.sleep(3)

        is_running, _ = check_embedding_server()
        if is_running:
            return True, "Embedding server started successfully in background"
        if startup_errors:
            return False, f"Embedding server failed to start: {startup_errors[0]}"
        return False, "Server thread started. The embedding server will be available when the model needs it (starts on-demand during predictions)."

    except Exception as e:
        return False, f"Manual start not available: {str(e)}. The embedding server will start automatically when the model makes predictions."
|
|
|
|
def ensure_embedding_server_running() -> Tuple[bool, str]:
    """
    Make sure the embedding server is up, attempting a CPU start if it is not.

    Note: SAP-RPT-OSS may start the server automatically when needed, so a
    failed start here is reported in the message rather than raised.

    Returns:
        Tuple of (is_running, message)
    """
    already_up, status = check_embedding_server()
    if already_up:
        return True, status

    started, detail = start_embedding_server(None)
    if not started:
        return False, f"Server not currently running. {detail}"
    return True, f"Auto-started: {detail}"
|
|
|
|