| from ibm_watsonx_ai import APIClient, Credentials |
| from typing import Dict, Optional, List, Union, Any, Set |
| import pandas as pd |
| import marimo as mo |
| import json |
| import glob |
| import io |
| import os |
|
|
def get_cred_value(key, creds_var_name="baked_in_creds", default=""):
    """
    Look up one value in a named credentials dictionary.

    The dictionary is searched for in two places, in this order:
        1. This module's global namespace, under ``creds_var_name``.
        2. Every module currently registered in ``sys.modules`` that exposes
           an attribute called ``creds_var_name``.

    The first dictionary found that contains ``key`` wins.

    Args:
        key: The key to look up in the credentials dictionary.
        creds_var_name: The variable name of the credentials dictionary.
        default: The value to return if no dictionary containing ``key`` is found.

    Returns:
        The value stored under ``key`` in the first matching dictionary,
        otherwise ``default``.
    """
    import sys

    candidate = globals().get(creds_var_name)
    if isinstance(candidate, dict) and key in candidate:
        return candidate[key]

    # Iterate over a snapshot: attribute access on lazily-loaded modules can
    # import further modules, which would mutate sys.modules mid-iteration
    # and raise "dictionary changed size during iteration".
    for module_obj in list(sys.modules.values()):
        try:
            candidate = getattr(module_obj, creds_var_name, None)
        except Exception:
            # Some module-like objects raise arbitrary errors from
            # __getattr__; they cannot hold our credentials, so skip them.
            continue
        if isinstance(candidate, dict) and key in candidate:
            return candidate[key]

    return default
|
|
def get_key_by_value(dictionary, value):
    """
    Reverse lookup: find the key that maps to a given value.

    Args:
        dictionary: The mapping to search, in its iteration order.
        value: The value to look for (compared with ``==``).

    Returns:
        The first key whose value equals ``value``, or None if there is none.
    """
    matching_keys = (k for k, v in dictionary.items() if v == value)
    return next(matching_keys, None)
|
|
def markdown_spacing(number):
    """
    Return ``number`` non-breaking space characters.

    Ordinary spaces are collapsed when Markdown/HTML is rendered, so the
    padding is built from U+00A0 (NO-BREAK SPACE). The character is written
    as an escape sequence so it cannot be silently turned into a plain space
    by an editor or a copy/paste.

    Args:
        number: How many non-breaking spaces to produce. Zero or a negative
            number yields an empty string.

    Returns:
        A string of ``number`` non-breaking spaces.
    """
    return "\u00a0" * number
|
|
def wrap_with_spaces(text_to_wrap, prefix_spaces=2, suffix_spaces=2):
    """
    Pad a piece of text on both sides with spacing from ``markdown_spacing``.

    Args:
        text_to_wrap: The text to pad.
        prefix_spaces: Amount of spacing placed before the text; values of
            zero or less add nothing.
        suffix_spaces: Amount of spacing placed after the text; values of
            zero or less add nothing.

    Returns:
        The padded text as a string.
    """
    leading = ""
    if prefix_spaces > 0:
        leading = markdown_spacing(prefix_spaces)

    trailing = ""
    if suffix_spaces > 0:
        trailing = markdown_spacing(suffix_spaces)

    return f"{leading}{text_to_wrap}{trailing}"
| |
|
|
def load_file_dataframe(file, file_extension, sheet_selector=None, excel_data=None, header_row=0):
    """
    Load a dataframe from an uploaded file with a customizable header row.

    Parameters:
    -----------
    file : marimo.ui.file object
        The file upload component; only its ``contents()`` method is used.
    file_extension : str
        The extension of the uploaded file (.xlsx, .xls, .csv, .json).
        Matching is case-insensitive.
    sheet_selector : marimo.ui.dropdown, optional
        Dropdown component whose ``value`` names the Excel sheet to read.
        Excel files are only read when a sheet is selected.
    excel_data : BytesIO, optional
        BytesIO object containing Excel data. When omitted, a buffer is
        built from the uploaded file's bytes.
    header_row : int, optional
        Row index to use as column headers (0-based). Default is 0 (first row).
        Use None to have pandas generate default column names.
        Applies to Excel and CSV files only.

    Returns:
    --------
    tuple
        (pandas.DataFrame, list) - The loaded dataframe and list of column names.
        An empty dataframe and an empty list are returned when the file has no
        contents, the extension is not supported, or JSON parsing fails.
    """
    dataframe = pd.DataFrame([])

    # Read the upload once instead of calling contents() in every branch.
    raw_bytes = file.contents()
    if not raw_bytes:
        return dataframe, []

    extension = (file_extension or "").lower()

    if extension in ('.xlsx', '.xls'):
        if sheet_selector is not None and sheet_selector.value:
            # Fall back to the uploaded bytes so a missing buffer does not
            # crash on ``None.seek``.
            if excel_data is None:
                excel_data = io.BytesIO(raw_bytes)
            excel_data.seek(0)
            dataframe = pd.read_excel(
                excel_data,
                sheet_name=sheet_selector.value,
                header=header_row,
                engine="openpyxl" if extension == '.xlsx' else "xlrd"
            )
    elif extension == '.csv':
        # utf-8-sig also accepts plain UTF-8 and drops a leading BOM.
        csv_data = io.StringIO(raw_bytes.decode('utf-8-sig'))
        dataframe = pd.read_csv(csv_data, header=header_row)
    elif extension == '.json':
        # Best effort by design: a malformed JSON upload is reported and an
        # empty dataframe is returned rather than raising.
        try:
            # json.loads rejects text that starts with a BOM, hence utf-8-sig.
            json_data = json.loads(raw_bytes.decode('utf-8-sig'))

            if isinstance(json_data, list):
                dataframe = pd.DataFrame(json_data)
            elif isinstance(json_data, dict):
                if any(isinstance(v, (dict, list)) for v in json_data.values()):
                    # Nested content: flatten it into dotted column names.
                    dataframe = pd.json_normalize(json_data)
                else:
                    # Flat object: treat it as a single row.
                    dataframe = pd.DataFrame([json_data])
        except Exception as e:
            print(f"Error parsing JSON: {e}")

    return dataframe, list(dataframe.columns)
|
|
|
|
def create_parameter_table(input_list, column_name="Active Options", label="Select the Parameters to set to Active",
                           selection_type="multi-cell", text_justify="center"):
    """
    Build a single-column marimo table from which parameters can be picked.

    Args:
        input_list: Parameter names shown as the rows of the table.
        column_name: Header of the only column (default: "Active Options").
        label: Table label; it is rendered in bold
            (default: "Select the Parameters to set to Active").
        selection_type: "single-cell" or "multi-cell" (default: "multi-cell").
        text_justify: "left", "center" or "right" (default: "center").

    Returns:
        The configured ``mo.ui.table``.

    Raises:
        ValueError: If ``selection_type`` or ``text_justify`` is not one of
            the accepted values.
    """
    import marimo as mo

    allowed_selection_types = ("single-cell", "multi-cell")
    allowed_justifications = ("left", "center", "right")

    # Reject unsupported options up front, before the widget is built.
    if selection_type not in allowed_selection_types:
        raise ValueError("selection_type must be either 'single-cell' or 'multi-cell'")
    if text_justify not in allowed_justifications:
        raise ValueError("text_justify must be one of: 'left', 'center', 'right'")

    return mo.ui.table(
        label=f"**{label}**",
        data={column_name: input_list},
        selection=selection_type,
        text_justify_columns={column_name: text_justify}
    )
|
|
def get_cell_values(parameter_options):
    """
    Extract active parameter values from a mo.ui.table.

    Parameter names are read from the first column of the table's ``data``
    (a DataFrame-like object or a dict of columns). Each name maps to True
    when it appears among the selected cells in the table's ``value``.

    Args:
        parameter_options: A mo.ui.table with cell selection enabled. Only its
            ``data`` and ``value`` attributes are used; either may be missing.

    Returns:
        Dictionary mapping parameter names to boolean values (True/False),
        in the order the names appear in the table. Empty or non-string
        names are ignored.
    """
    candidates = []
    table_data = getattr(parameter_options, 'data', None)

    if hasattr(table_data, 'shape') and hasattr(table_data, 'iloc'):
        # DataFrame-like: the first column holds the names. Checking the
        # number of dimensions keeps a 1-D object from raising IndexError.
        if len(table_data.shape) > 1 and table_data.shape[1] > 0:
            candidates = table_data.iloc[:, 0].tolist()
    elif isinstance(table_data, dict) and table_data:
        candidates = table_data[next(iter(table_data))]

    # A dict (not a set) keeps the table's row order, so the result is
    # identical from run to run.
    result = {
        param: False
        for param in candidates
        if param and isinstance(param, str)
    }

    selected_cells = getattr(parameter_options, 'value', None)
    if selected_cells is None:
        return result

    for cell in selected_cells:
        # A selected cell may be an object with .value, a dict with a
        # 'value' key, or the bare string itself.
        if hasattr(cell, 'value'):
            selected = cell.value
        elif isinstance(cell, dict):
            selected = cell.get('value')
        else:
            selected = cell

        # Names are always strings; this also keeps unhashable values from
        # raising TypeError in the membership test.
        if isinstance(selected, str) and selected in result:
            result[selected] = True

    return result
|
|
def convert_table_to_json_docs(df, selected_columns=None):
    """
    Convert a pandas DataFrame or dictionary to a list of JSON documents.

    Columns are included according to ``selected_columns``. Output keys are
    standardized: lowercased, spaces replaced by underscores, and every
    character that is not a word character removed. Non-string column names
    are only converted with ``str()`` and lowercased. Two columns that
    standardize to the same key overwrite each other; the later one wins.

    Args:
        df: The DataFrame to process, a single dictionary (treated as one
            document), or anything ``pd.DataFrame`` can be built from.
        selected_columns: Columns to include. Either a list of column names
            (matched exactly first, then case-insensitively) or a dict mapping
            column name to a truthy/falsy include flag. When empty, None, or
            not a list/dict, every column is included.

    Returns:
        list: A list of dictionaries, each representing a row as a JSON
        document. Missing values (NaN/NaT/None) in DataFrame rows become None.
        An empty list is returned for None, empty or unconvertible input, or
        when none of the selected columns exist.
    """
    import pandas as pd
    import re

    def standardize_key(key):
        """Convert a column name to lowercase with underscores instead of spaces and no special characters"""
        if not isinstance(key, str):
            return str(key).lower()
        return re.sub(r'[^\w]', '', key.lower().replace(' ', '_'))

    def clean_value(value):
        """Map scalar missing values to None; leave everything else untouched."""
        # pd.isna on a list/array returns an array whose truth value is
        # ambiguous, so only scalars are tested.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    # Resolve the {column: include?} form first so it behaves the same for
    # dictionary and DataFrame input.
    if isinstance(selected_columns, dict):
        selected_columns = [col for col, include in selected_columns.items() if include]

    # A plain dictionary is a single document.
    if isinstance(df, dict):
        if selected_columns:
            return [{standardize_key(k): df.get(k, None) for k in selected_columns}]
        return [{standardize_key(k): v for k, v in df.items()}]

    if df is None:
        return []

    if not isinstance(df, pd.DataFrame):
        try:
            df = pd.DataFrame(df)
        except Exception:
            # Not tabular data; nothing to convert.
            return []

    if df.empty:
        return []

    if not selected_columns or not isinstance(selected_columns, list):
        selected_columns = list(df.columns)

    # Resolve each requested column to a real column, falling back to a
    # case-insensitive match on string column names.
    columns_lower = {col.lower(): col for col in df.columns if isinstance(col, str)}
    available_columns = []
    for col in selected_columns:
        if col in df.columns:
            available_columns.append(col)
        elif isinstance(col, str) and col.lower() in columns_lower:
            available_columns.append(columns_lower[col.lower()])

    if not available_columns:
        return []

    # itertuples keeps each column's own type and yields plain Python
    # scalars. iterrows would coerce a whole row to one dtype (turning ints
    # into floats) and return NumPy scalars that json.dumps rejects.
    subset = df[available_columns]
    keys = [standardize_key(col) for col in subset.columns]
    return [
        {key: clean_value(value) for key, value in zip(keys, row)}
        for row in subset.itertuples(index=False, name=None)
    ]
|
|
def filter_models_by_function(resources, function_type="prompt_chat"):
    """
    Collect the IDs of the models that advertise a given function.

    A model qualifies when its "functions" entry is a list containing at
    least one dict whose "id" equals ``function_type``, and the model has a
    "model_id" entry.

    Args:
        resources (list): Model resource objects. Anything that is empty or
            not a list yields an empty result.
        function_type (str, optional): Function id to look for.
            Defaults to "prompt_chat".

    Returns:
        list: The "model_id" of every qualifying model, in input order.
    """
    if not resources or not isinstance(resources, list):
        return []

    def offers_function(entry):
        # Only a list-valued "functions" entry is inspected; non-dict items
        # inside that list are ignored.
        if "functions" not in entry:
            return False
        declared = entry["functions"]
        if not isinstance(declared, list):
            return False
        return any(
            isinstance(item, dict) and item.get("id") == function_type
            for item in declared
        )

    return [
        entry["model_id"]
        for entry in resources
        if offers_function(entry) and "model_id" in entry
    ]
|
|
|
|
def get_model_selection_table(client=None, model_type="all", filter_functionality=None, selection_mode="single-cell"):
    """
    Build a marimo table from which a foundation model can be selected.

    Args:
        client: The client object for API calls; its ``foundation_models``
            attribute is queried for model specs. If None, a table holding
            only the default model is returned.
        model_type (str): Type of models to list. "chat" and "embedding"
            select the matching spec call; any other value lists all models.
        filter_functionality (str, optional): Keep only models that advertise
            this function id (passed to ``filter_models_by_function``).
        selection_mode (str): Selection mode handed to ``mo.ui.table``.
            Defaults to "single-cell".

    Returns:
        NOTE: the return shape depends on ``client`` (kept as-is for
        backward compatibility):
        - ``client is None``: the ``mo.ui.table`` alone.
        - otherwise: a tuple ``(model_selector, resources, model_id_list)``
          with the table, the raw resource list from the specs, and the
          model IDs shown in the table.
    """
    default_models = ['mistralai/mistral-large']

    if client is None:
        # No API access: offer only the default model.
        return mo.ui.table(
            default_models,
            selection="single",
            label="Select a model to use.",
            page_size=30,
        )

    if model_type == "chat":
        model_specs = client.foundation_models.get_chat_model_specs()
    elif model_type == "embedding":
        model_specs = client.foundation_models.get_embeddings_model_specs()
    else:
        model_specs = client.foundation_models.get_model_specs()

    # The spec call may hand back nothing; treat that like an empty
    # catalogue instead of failing on ``None.get``.
    resources = (model_specs or {}).get("resources") or []

    if filter_functionality and resources:
        model_id_list = filter_models_by_function(resources, filter_functionality)
    else:
        # Skip entries without a model_id, as the filtered path does, rather
        # than raising KeyError on one malformed resource.
        model_id_list = [
            resource["model_id"]
            for resource in resources
            if isinstance(resource, dict) and "model_id" in resource
        ]

    # Never show an empty table: fall back to the default model.
    if not model_id_list:
        model_id_list = default_models

    model_selector = mo.ui.table(
        model_id_list,
        selection=selection_mode,
        label="Select a model to use.",
        page_size=30,
        # Preselect the first entry; cell modes address it by
        # (row id, column name), row modes by row index.
        initial_selection=[("0", "value")] if selection_mode == "single-cell" else [0]
    )

    return model_selector, resources, model_id_list
|
|
def _enforce_model_selection(model_selection, model_id_list):
    """
    Make sure a model selection element carries a value.

    When ``model_selection.value`` is empty, the element's private ``_value``
    attribute is set to the first entry of ``model_id_list``. The resulting
    value is printed and returned.

    Args:
        model_selection: Selection element exposing ``value`` (and ``_value``).
        model_id_list: Model IDs to fall back on; only the first one is used.

    Returns:
        ``model_selection.value`` after the fallback has been applied. It is
        left unchanged when ``model_id_list`` is empty.
    """
    # Only fall back when there is something to fall back to; indexing an
    # empty list would raise IndexError.
    if not model_selection.value and model_id_list:
        model_selection._value = model_id_list[0]
    print(model_selection.value)
    return model_selection.value
| |
def update_max_tokens_limit(model_selection, resources, model_id_list):
    """
    Look up the maximum number of output tokens for the selected model.

    Args:
        model_selection: Selection element whose ``value`` identifies the
            model. Accepted forms: a model ID string, or a list whose first
            item is a row index into ``model_id_list``, a model ID, or a cell
            record (a dict with a 'value' key or an object with ``.value``).
        resources: List of model resource dicts; the matching one supplies
            ``model_limits["max_output_tokens"]``.
        model_id_list: Model IDs in table order, used to resolve row indices.

    Returns:
        The model's ``max_output_tokens``, or 4096 when there is no selection,
        no resources, no matching model, no limit on the matching model, or
        any error occurs (errors are printed, not raised).
    """
    default_max_tokens = 4096

    def cell_value(item):
        # Cell selections arrive as records wrapping the displayed value;
        # unwrap them so the model ID itself is compared.
        if isinstance(item, dict) and 'value' in item:
            return item['value']
        if not isinstance(item, (str, int)) and hasattr(item, 'value'):
            return item.value
        return item

    try:
        # getattr with a default covers both "no value attribute" and
        # "value is None" without touching a missing attribute.
        selected_value = getattr(model_selection, 'value', None)
        if selected_value is None:
            print("No model selection or selection has no value")
            return default_max_tokens

        if not resources or not isinstance(resources, list):
            print("Resources is empty or not a list")
            return default_max_tokens

        print(f"Raw selection value: {selected_value}")

        if isinstance(selected_value, list) and len(selected_value) > 0:
            first = cell_value(selected_value[0])
            if isinstance(first, int) and 0 <= first < len(model_id_list):
                # Row selections are reported as indices into the table.
                selected_model_id = model_id_list[first]
            else:
                selected_model_id = str(first)
        else:
            selected_model_id = str(cell_value(selected_value))

        print(f"Selected model ID: {selected_model_id}")

        for model in resources:
            if model.get("model_id") == selected_model_id:
                limits = model.get("model_limits")
                if isinstance(limits, dict) and "max_output_tokens" in limits:
                    return limits["max_output_tokens"]
                # Matching model without a declared limit: use the default.
                break

    except Exception as e:
        # Best effort: the caller always gets a usable limit.
        print(f"Error: {e}")

    return default_max_tokens
|
|
|
|
def load_templates(
    folder_path: str,
    file_extensions: Optional[List[str]] = None,
    strip_whitespace: bool = True
) -> Dict[str, str]:
    """
    Load template files from a specified folder into a dictionary.

    Only files directly inside ``folder_path`` are read (no recursion). Files
    are visited extension by extension, in sorted order within each
    extension; if two files share a base name, the one read last wins.
    Files that cannot be read are reported with ``print`` and skipped.

    Args:
        folder_path: Path to the folder containing template files
        file_extensions: List of file extensions to include, with or without
            the leading dot (default: ['.txt', '.md']). A single string is
            treated as one extension.
        strip_whitespace: Whether to strip leading/trailing whitespace from templates (default: True)

    Returns:
        Dictionary with filename (without extension) as key and file content
        as value. It always contains the entry ``"empty"`` (a single space)
        unless a template file named "empty" replaces it.
    """
    if file_extensions is None:
        file_extensions = ['.txt', '.md']
    elif isinstance(file_extensions, str):
        # A bare string would otherwise be iterated character by character.
        file_extensions = [file_extensions]

    # Accept extensions given with or without the leading dot.
    file_extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in file_extensions]

    templates = {"empty": " "}

    # Escape the folder so characters such as "[" or "*" in its name are
    # matched literally instead of being read as glob wildcards.
    escaped_folder = glob.escape(os.fspath(folder_path))

    for ext in file_extensions:
        pattern = os.path.join(escaped_folder, f'*{ext}')

        # Sorted so the result does not depend on directory listing order.
        for file_path in sorted(glob.glob(pattern)):
            if not os.path.isfile(file_path):
                # A directory whose name happens to end in the extension.
                continue

            template_name = os.path.splitext(os.path.basename(file_path))[0]

            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
            except (OSError, UnicodeError) as e:
                print(f"Error loading template from {file_path}: {str(e)}")
                continue

            templates[template_name] = content.strip() if strip_whitespace else content

    return templates
|
|