import streamlit as st import configparser import os import pandas as pd import requests from google.cloud import bigquery from google.oauth2 import service_account import logging from datetime import datetime from html_templates import button_styles global_user_data = [] # For appending user_data global_data_rows = [] # For appending data rows # Function to initialize session state def initialize_session_state(): if 'data' not in st.session_state: st.session_state.data = [] if 'alphabet_counter' not in st.session_state: st.session_state.alphabet_counter = 0 if 'serial_counter' not in st.session_state: st.session_state.serial_counter = -1 if 'company_name' not in st.session_state: st.session_state['company_name'] = "Company name not available" if 'global_data_rows' not in st.session_state: st.session_state.global_data_rows = [] if 'new_mapping' not in st.session_state: st.session_state['new_mapping'] = {} if 'file_data' not in st.session_state: st.session_state['file_data'] = None if 'mapping_mode' not in st.session_state: st.session_state['mapping_mode'] = None # None, 'default', 'custom' if 'button_clicked' not in st.session_state: st.session_state['button_clicked'] = None if 'business_head' not in st.session_state: st.session_state['business_head'] = None if 'company_id' not in st.session_state: st.session_state['company_id'] = None if 'company_name' not in st.session_state: st.session_state['company_name'] = None if 'currency' not in st.session_state: st.session_state['currency'] = None if 'bundle_by' not in st.session_state: st.session_state['bundle_by'] = None if 'plan_name' not in st.session_state: st.session_state['plan_name'] = None if 'plan_description' not in st.session_state: st.session_state['plan_description'] = None if 'selected_ordering_channel' not in st.session_state: st.session_state['selected_ordering_channel'] = None if 'fulfilling_location' not in st.session_state: st.session_state['fulfilling_location'] = None if 'application_id' not in st.session_state: st.session_state['application_id'] = None if 'selected_fee_type' not in st.session_state: st.session_state['selected_fee_type'] = None if 'selected_variable_type' not in st.session_state: st.session_state['selected_variable_type'] = None if 'selected_chargeable_on' not in st.session_state: st.session_state['selected_chargeable_on'] = None if 'selected_fee_nature' not in st.session_state: st.session_state['selected_fee_nature'] = None if 'user_input_3' not in st.session_state: st.session_state['user_input_3'] = None if 'usage_3' not in st.session_state: st.session_state['usage_3'] = None if 'Capping_or_Minimum_Guarantee_3' not in st.session_state: st.session_state['Capping_or_Minimum_Guarantee_3'] = None if 'threshold' not in st.session_state: st.session_state['threshold'] = None if 'expected_billing' not in st.session_state: st.session_state['expected_billing'] = None if 'selected_payment_method' not in st.session_state: st.session_state['selected_payment_method'] = None if 'selected_plan_validity' not in st.session_state: st.session_state['selected_plan_validity'] = None if 'final_billing' not in st.session_state: st.session_state['final_billing'] = None if 'fee_reversal' not in st.session_state: st.session_state['fee_reversal'] = None if 'reversal_per' not in st.session_state: st.session_state['reversal_per'] = None if 'data' not in st.session_state: st.session_state['data'] = [] # Function to save user data def save_user_data(user_data): if 'data' not in st.session_state: st.session_state['data'] = [] st.session_state.data.append(user_data) # Function to save Slab data def save_slab_data(new_row): st.session_state.data.append(new_row) # Function to generate PM_id def generate_rule_id(user_data): company_id = user_data.get('Company ID', '') ordering_channels = user_data.get('Product lines', '') fulfilling_location = user_data.get('Fulfilling Location', '') application_id = user_data.get('Application ID', '') selected_fee_type = user_data.get('Fee Type', '') selected_variable_type = user_data.get('Variable Type', '') selected_chargeable_on = user_data.get('Chargeable on', '') selected_fee_nature = user_data.get('Fee Nature', '') threshold = user_data.get('Threshold option', '') # Ensure all fields are strings and handle None or empty values company_id = str(company_id).zfill(5)[:5] if company_id else '00000' ordering_channels = str(ordering_channels)[:2] if ordering_channels else '00' fulfilling_location = str(fulfilling_location)[:2] if fulfilling_location else '00' application_id = str(application_id)[:2] if application_id else '00' selected_fee_type = str(selected_fee_type)[:2] if selected_fee_type else '00' selected_variable_type = str(selected_variable_type)[:2] if selected_variable_type else '00' selected_chargeable_on = str(selected_chargeable_on)[:2] if selected_chargeable_on else '00' selected_fee_nature = str(selected_fee_nature)[:2] if selected_fee_nature else '00' threshold = str(threshold)[:2] if threshold else '00' # Extract first 2 characters from each field and ensure proper length rule_id_parts = [ company_id, # Ensure 5 characters ordering_channels, # Ensure 2 characters fulfilling_location, # Ensure 2 characters application_id, # Ensure 2 characters selected_fee_type, # Ensure 2 characters selected_variable_type, # Ensure 2 characters selected_chargeable_on, # Ensure 2 characters selected_fee_nature, # Ensure 2 characters threshold # Ensure 2 characters ] rule_id = '_'.join(rule_id_parts) # Join parts with an underscore return rule_id # Function to generate Plan ID def generate_plan_id(user_data): initialize_session_state() # Ensure session state is initialized alphabet_counter = st.session_state.alphabet_counter serial_counter = st.session_state.serial_counter company_id = user_data.get('Company ID', '') # Get company_id from user_data if company_id is None or not company_id.strip(): company_id = '' # Increment serial counter and reset if it reaches 999999 serial_counter += 1 if serial_counter > 999999: serial_counter = 0 alphabet_counter += 1 # Save updated counters back to session state st.session_state['alphabet_counter'] = alphabet_counter st.session_state['serial_counter'] = serial_counter # Calculate alphabet ('a' to 'z') current_alphabet = chr(alphabet_counter % 26 + ord('a')) # Format serial counter as six-digit number serial_number = str(serial_counter).zfill(6) # Get the first 5 characters of company_id and pad it with zeros company_id_part = company_id[:5].zfill(5) # Generate the plan_id plan_id = f"{company_id_part}_{current_alphabet}_{serial_number}" return plan_id # Function to display saved records as a table and save to CSV def display_saved_records(): df = pd.DataFrame(st.session_state.data) if not df.empty: # Apply generate_rule_id function to each row to generate rule_id df['rule_id'] = df.apply(lambda row: generate_rule_id(row), axis=1) # Display DataFrame with increased height # st.write(df.style.set_table_attributes('style="font-size: 14px; line-height: 18px; width: auto; height: auto;"')) # Save records to CSV folder_path = "data" # Folder name os.makedirs(folder_path, exist_ok=True) # Create folder if it doesn't exist file_path = os.path.join(folder_path, "records.csv") # Save DataFrame to CSV without index df.to_csv(file_path, index=False) # Custom CSS styles button_styles = """ """ # Define file path (replace with your actual file path) folder_path = "/Users/ninadmandavkar/Desktop/Fynd/plan_maker_2/data/" file_path = os.path.join(folder_path, "records.csv") # Function to read file data def read_file(file_path): with open(file_path, 'rb') as f: file_data = f.read() return file_data # Download button with custom styling # st.download_button( # label="Download CSV", # data=read_file(file_path), # file_name="records.csv", # mime="text/csv", # ) # Function to load theme configuration def load_theme_config(): config = configparser.ConfigParser() config.read('config.toml') # Extract theme settings theme_settings = config['theme'] return theme_settings # Function to handle custom button click def custom_button_click(): st.session_state.button_clicked = 'custom' # Function to handle default button click def default_button_click(): st.session_state.button_clicked = 'default' def uploader(uploaded_file): try: # Read the file, treating the first row as the header if uploaded_file.name.endswith('.csv'): data = pd.read_csv(uploaded_file, header=0) elif uploaded_file.name.endswith('.xlsx'): data = pd.read_excel(uploaded_file, header=0) # Rename the headers new_headers = ['ordering_channel', 'fee_type', 'variable_type', 'chargeable_on'] data.columns = new_headers # Save the data to session state st.session_state['file_data'] = data # Initialize the mapping new_mapping = {} # Build the mapping for _, row in data.iterrows(): ordering_channel = row['ordering_channel'] fee_type = row['fee_type'] variable_type = row['variable_type'] chargeable_on = row['chargeable_on'] if ordering_channel not in new_mapping: new_mapping[ordering_channel] = {} if fee_type not in new_mapping[ordering_channel]: new_mapping[ordering_channel][fee_type] = {} if variable_type not in new_mapping[ordering_channel][fee_type]: new_mapping[ordering_channel][fee_type][variable_type] = [] if chargeable_on not in new_mapping[ordering_channel][fee_type][variable_type]: new_mapping[ordering_channel][fee_type][variable_type].append(chargeable_on) st.session_state['new_mapping'] = new_mapping st.session_state['mapping_mode'] = 'custom' except Exception as e: st.error(f"Error reading the file: {e}") def render_selectboxes(session_state, new_mapping): selected_ordering_channel = st.sidebar.selectbox( "Product lines", [""] + list(new_mapping.keys()), help="Select a Product line", key='ordering_channel' ) if selected_ordering_channel: selected_ordering_channel = str(selected_ordering_channel) or int(selected_ordering_channel) # Determine fulfilling_location and application_id based on selected_ordering_channel if any(channel in selected_ordering_channel for channel in ["TMS", "GMC", "Catalog Cloud"]): session_state['fulfilling_location'] = None session_state['application_id'] = None else: abc = ["Store", "Warehouse"] session_state['fulfilling_location'] = st.sidebar.selectbox( "Fulfilling Location", [""] + abc, help="Select the fulfilling location" ) session_state['application_id'] = st.sidebar.text_input( "Application ID", key="application_id_input", help="Enter the application ID" ) # Use the session state variables in your app fulfilling_location = session_state['fulfilling_location'] application_id = session_state['application_id'] # 2nd layer: Select fee type based on the selected ordering channel if 'selected_fee_type' not in session_state: session_state['selected_fee_type'] = None fee_types_for_channel = new_mapping.get(selected_ordering_channel, {}).keys() fee_types_for_channel = list(fee_types_for_channel) if fee_types_for_channel: # Define selected_fee_type based on user input session_state['selected_fee_type'] = st.selectbox( "Fee Type", [""] + fee_types_for_channel, help="Fee type is a revenue head for grouping the revenue", key='fee_type' ) # Access selected_fee_type for any session state operations selected_fee_type = session_state['selected_fee_type'] # 3rd layer: Select variable type based on the selected fee type variable_type_options = [] if selected_fee_type: details = new_mapping.get(selected_ordering_channel, {}) variables = details.get(selected_fee_type, {}) variable_type_options = list(variables.keys()) session_state['selected_variable_type'] = st.selectbox( "Variable Type", [""] + variable_type_options, help="Variable type is an attribute on which revenue calculation will be done", key='variable_type' ) selected_variable_type = session_state['selected_variable_type'] # 4th layer: Select chargeable on based on the selected variable type chargeable_on_options = [] if selected_variable_type: chargeable_on_options = variables.get(selected_variable_type, []) session_state['selected_chargeable_on'] = st.selectbox( "Chargeable on", [""] + chargeable_on_options, help="Chargeable on is a service status on which the above fee will be applicable", key='chargeable_on' ) selected_chargeable_on = session_state['selected_chargeable_on'] else: st.write("No fee types available for the selected ordering channel.") else: st.write("") def slab_data(slab, max_value, user_input, Usage, Capping_or_Minimum_Guarantee, threshold): # Convert values to int if not None max_value = int(max_value.strip()) if max_value else None user_input_11 = int(user_input.strip()) if user_input else None try: user_input_22 = float(user_input_11) user_input = "{:.2f}".format(user_input_22) except (TypeError, ValueError): user_input = 0 Usage = int(Usage.strip()) if Usage else None Capping_or_Minimum_Guarantee = int(Capping_or_Minimum_Guarantee.strip()) if Capping_or_Minimum_Guarantee else None # Create a new row dictionary new_row = { "Slab": slab, "Max_value": max_value, "Commercial_value": user_input, "Usage": Usage, "Capping/Min_Guarantee_value": Capping_or_Minimum_Guarantee, "Threshold": threshold } # Append new_row to global list of rows in session state st.session_state.global_data_rows.append(new_row) def slab_entry(): # Initialize session state if not already initialized if 'global_data_rows' not in st.session_state: st.session_state.global_data_rows = [] # Inputs for adding data col1, col2 = st.columns([2, 2]) with col1: slab = st.selectbox("Slab:", [1, 2, 3], key="slab_select") # Selectbox for slab numbers user_input = st.text_input("Commercial Value:", key="user_input") Capping_or_Minimum_Guarantee = st.text_input("Capping/Min_Guarantee:", key="Capping_or_Minimum_Guarantee_input") with col2: max_value = st.text_input("Max value:", key="max_value_input") Usage = st.text_input("Usage:", key="Usage_input") threshold = st.text_input("Threshold:", key="threshold_input") # Button to add data if st.button("Add Data", key="add_data_button"): if not (slab and user_input and Capping_or_Minimum_Guarantee): st.warning("Please fill in all required fields.") else: slab_data(slab, max_value, user_input, Usage, Capping_or_Minimum_Guarantee, threshold) st.success(f"Data for Slab '{slab}' added successfully!") # Display all entries in a single table if 'global_data_rows' in st.session_state and st.session_state.global_data_rows: st.subheader("All Slabs Data Summary") for i, row in enumerate(st.session_state.global_data_rows): st.write(f"Entry {i + 1}:") st.table([row]) # Run the slab_entry function when the script is executed if __name__ == "__main__": slab_entry() def store_and_fetch_company_info(company_id): if company_id.strip(): try: company_id_int = int(company_id.strip()) st.session_state['company_id'] = company_id.strip() # Always store the company id in session state except ValueError: st.error("Invalid Company ID. Please enter a valid integer.") company_id_int = None # Construct the API URL and fetch data only if company_id_int is valid if company_id_int is not None: try: api_url = f"https://api.boltic.io/service/platform/bolt/share/v1/data?api_token=eyJpZCI6IjU4OTdmZjZlLTMwMTYtNDZjMC1hMzdlLTJmNjBkMjhjYjc5YyIsInRlbmFudCI6ImdvZnluZGNvbSJ9&batch_size=10000&display_meta=true&cmpid={company_id_int}" response = requests.get(api_url, verify=False ) if response.status_code == 200: data = response.json() if "results" in data and len(data["results"]) > 0: st.session_state['company_name'] = data["results"][0].get("company_name", "Company name not found") else: st.session_state['company_name'] = "Company name not found" else: st.session_state['company_name'] = f"Error: Unable to fetch data (Status code: {response.status_code})" except Exception as e: st.session_state['company_name'] = f"An error occurred: {e}" else: st.session_state['company_name'] = "Company ID is invalid or missing." else: st.session_state['company_name'] = "Company ID cannot be empty." def get_feature_user_data(business_head, company_id, company_name, currency, bundle_by, plan_name, plan_description, selected_ordering_channel, fulfilling_location, application_id, selected_fee_type, selected_variable_type, selected_chargeable_on, selected_fee_nature, user_input_3, usage_3, Capping_or_Minimum_Guarantee_3, threshold, expected_billing, selected_payment_method, selected_plan_validity, final_billing): return { "Business_Head": business_head, "Company_ID": company_id.strip() if company_id else None, "Company_Name": company_name, "Currency": currency, "Bundle_by": bundle_by, "Plan_Name": plan_name, "Plan_Description": plan_description, "Product_lines": selected_ordering_channel, "Fulfilling_Location": fulfilling_location, "Application_ID": application_id, "Fee_Type": selected_fee_type, "Variable_Type": selected_variable_type, "Chargeable_on": selected_chargeable_on, "Fee_Nature": selected_fee_nature, "Commercial_value": user_input_3, "Fee_reversal": 0, "Reversal_%": 0, "Usage_limit": usage_3, "Threshold_value": Capping_or_Minimum_Guarantee_3, "Threshold_option": threshold, "Expected_Billing": expected_billing, "Payment_Method": selected_payment_method, "Plan_Validity": selected_plan_validity, "Net_total_billing": int(final_billing) } def insert_into_bq(user_data, project_id, dataset_id, table_id, service_account_info): initialize_session_state() try: # Use from_service_account_info instead of from_service_account_file credentials = service_account.Credentials.from_service_account_info(service_account_info) # Initialize a BigQuery client client = bigquery.Client(credentials=credentials, project=project_id) logging.info("BigQuery client initialized successfully.") # Specify the table reference table_ref = f"{project_id}.{dataset_id}.{table_id}" table = client.get_table(table_ref) logging.info(f"Table reference obtained: {table_ref}") # Insert form data into the table rows_to_insert = [user_data] errors = client.insert_rows_json(table, rows_to_insert) if errors: logging.error(f"Errors occurred while inserting rows: {errors}") st.error(f"Errors occurred while inserting rows: {errors}") return False else: logging.info("Insert successful") st.success("Insert successful") return True except Exception as e: logging.exception("An error occurred during the BigQuery insert operation.") st.error(f"An error occurred: {e}") return False project_id = "fynd-db" dataset_id = "finance_dwh" table_id = "plan_v2" # Access session state variables safely # Retrieve variables from session state with default values business_head = st.session_state.get('business_head', '') company_id = st.session_state.get('company_id', '') company_name = st.session_state.get('company_name', '') currency = st.session_state.get('currency', '') bundle_by = st.session_state.get('bundle_by', '') plan_name = st.session_state.get('plan_name', '') plan_description = st.session_state.get('plan_description', '') selected_ordering_channel = st.session_state.get('selected_ordering_channel', '') fulfilling_location = st.session_state.get('fulfilling_location', '') application_id = st.session_state.get('application_id', '') selected_fee_type = st.session_state.get('selected_fee_type', '') selected_variable_type = st.session_state.get('selected_variable_type', '') selected_chargeable_on = st.session_state.get('selected_chargeable_on', '') selected_fee_nature = st.session_state.get('selected_fee_nature', '') user_input_3 = st.session_state.get('user_input_3', 0) # Assuming numeric default usage_3 = st.session_state.get('usage_3', 0) # Assuming numeric default Capping_or_Minimum_Guarantee_3 = st.session_state.get('Capping_or_Minimum_Guarantee_3', 0) # Assuming numeric default threshold = st.session_state.get('threshold', '') expected_billing = st.session_state.get('expected_billing', 0) # Assuming numeric default selected_payment_method = st.session_state.get('selected_payment_method', '') selected_plan_validity = st.session_state.get('selected_plan_validity', '') rule_id = st.session_state.get('rule_id', '') plan_id = st.session_state.get('plan_id', '') created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') final_billing = st.session_state.get('final_billing', 0) # Handle the case where final_billing is None if final_billing is None: final_billing = 0 # or another default value else: final_billing = int(final_billing) user_data = { "Business_Head": business_head, "Company_ID": company_id, "Company_Name": company_name, "Currency": currency, "Bundle_by": bundle_by, "Plan_Name": plan_name, "Plan_Description": plan_description, "Product_lines": selected_ordering_channel, "Fulfilling_Location": fulfilling_location, "Application_ID": application_id, "Fee_Type": selected_fee_type, "Variable_Type": selected_variable_type, "Chargeable_on": selected_chargeable_on, "Fee_Nature": selected_fee_nature, "Commercial_value": user_input_3, "Fee_reversal": 0, "Reversal_%": 0, "Usage_limit": usage_3, "Threshold_value": Capping_or_Minimum_Guarantee_3, "Threshold_option": threshold, "Expected_Billing": expected_billing, "Payment_Method": selected_payment_method, "Plan_Validity": selected_plan_validity, "rule_id": rule_id, "plan_id": plan_id, "created_at": created_at, "Net_total_billing":int(final_billing) } save_user_data(user_data) # Save user data after submission