# plan_maker_v2.0 / utilities.py
# Ninad077's picture
# Upload 8 files
# 01d5640 verified
import streamlit as st
import configparser
import os
import pandas as pd
import requests
from google.cloud import bigquery
from google.oauth2 import service_account
import logging
from datetime import datetime
from html_templates import button_styles
# Module-level accumulators.
# NOTE(review): the functions visible in this file keep their rows in
# st.session_state (e.g. slab_data appends to st.session_state.global_data_rows),
# not in these lists — confirm whether another module still relies on them.
global_user_data = [] # For appending user_data
global_data_rows = [] # For appending data rows
# Function to initialize session state
def initialize_session_state():
    """Seed ``st.session_state`` with a default for every key this module reads.

    Keys that are already present are never overwritten, so the function is
    safe to call repeatedly (``generate_plan_id`` and ``insert_into_bq`` both
    call it again before touching session state).
    """
    # Keys whose default is not None. Each default comes from a factory so
    # that list/dict defaults are fresh objects rather than shared ones.
    default_factories = {
        'data': list,
        'alphabet_counter': lambda: 0,
        # generate_plan_id increments before formatting, so the first serial is 0
        'serial_counter': lambda: -1,
        'company_name': lambda: "Company name not available",
        'global_data_rows': list,
        'new_mapping': dict,
    }
    for key, factory in default_factories.items():
        if key not in st.session_state:
            st.session_state[key] = factory()

    # Every remaining key simply starts out as None.
    none_default_keys = (
        'file_data',
        'mapping_mode',  # None, 'default', 'custom'
        'button_clicked',
        'business_head',
        'company_id',
        'currency',
        'bundle_by',
        'plan_name',
        'plan_description',
        'selected_ordering_channel',
        'fulfilling_location',
        'application_id',
        'selected_fee_type',
        'selected_variable_type',
        'selected_chargeable_on',
        'selected_fee_nature',
        'user_input_3',
        'usage_3',
        'Capping_or_Minimum_Guarantee_3',
        'threshold',
        'expected_billing',
        'selected_payment_method',
        'selected_plan_validity',
        'final_billing',
        'fee_reversal',
        'reversal_per',
    )
    for key in none_default_keys:
        if key not in st.session_state:
            st.session_state[key] = None
# Function to save user data
def save_user_data(user_data):
    """Append ``user_data`` to the ``data`` list kept in session state.

    The list is created first when the ``data`` key does not exist yet.
    """
    state = st.session_state
    if 'data' not in state:
        state['data'] = []
    state['data'].append(user_data)
# Function to save Slab data
def save_slab_data(new_row):
    """Append a slab row to the ``data`` list kept in session state.

    The list is created on demand, mirroring ``save_user_data``, so the call
    does not fail when session state has not been initialised yet.
    """
    if 'data' not in st.session_state:
        st.session_state['data'] = []
    st.session_state.data.append(new_row)
# Function to generate PM_id
def generate_rule_id(user_data):
    """Build the underscore-joined rule ID for one record.

    The ID has nine parts: the company id (zero-padded on the left and cut to
    5 characters) followed by the first two characters of the product line,
    fulfilling location, application id, fee type, variable type,
    chargeable-on, fee nature and threshold option. A missing or falsy field
    contributes a zero placeholder ('00000' / '00').

    Args:
        user_data: Any object with a dict-style ``get(key, default)`` (a dict
            or a pandas row). Field names may be spelled with spaces
            ('Company ID') or with underscores ('Company_ID'); the records
            built elsewhere in this file use the underscore spelling.

    Returns:
        The rule ID string, e.g. ``'00123_TM_00_00_Va_00_00_00_00'``.
    """
    def _field(label):
        # Prefer the space-separated label (original behaviour) and fall back
        # to the underscore spelling used by the stored records.
        value = user_data.get(label, None)
        if value is None:
            value = user_data.get(label.replace(' ', '_'), '')
        return value

    def _two_chars(label):
        value = _field(label)
        # Values shorter than two characters are kept as they are (not padded).
        return str(value)[:2] if value else '00'

    company_id = _field('Company ID')
    company_part = str(company_id).zfill(5)[:5] if company_id else '00000'

    short_labels = (
        'Product lines',
        'Fulfilling Location',
        'Application ID',
        'Fee Type',
        'Variable Type',
        'Chargeable on',
        'Fee Nature',
        'Threshold option',
    )
    rule_id_parts = [company_part] + [_two_chars(label) for label in short_labels]
    return '_'.join(rule_id_parts)  # Join parts with an underscore
# Function to generate Plan ID
def generate_plan_id(user_data):
    """Return the next plan ID and advance the counters in session state.

    The ID has the form ``<company>_<letter>_<serial>``: the first five
    characters of the company id (left-padded with zeros), a letter a-z
    derived from ``alphabet_counter``, and ``serial_counter`` as six digits.
    Each call increments the serial; once it would exceed 999999 it wraps to
    0 and the letter advances.

    Args:
        user_data: Mapping with the company id under 'Company ID' or
            'Company_ID' (the spelling used by the records built in this file).

    Returns:
        The plan ID string, e.g. ``'00123_a_000000'``.
    """
    initialize_session_state()  # Ensure the counters exist before reading them
    alphabet_counter = st.session_state.alphabet_counter
    serial_counter = st.session_state.serial_counter

    raw_company_id = user_data.get('Company ID', None)
    if raw_company_id is None:
        raw_company_id = user_data.get('Company_ID', None)
    # Coerce to text so numeric ids work, and actually use the stripped value.
    company_id = '' if raw_company_id is None else str(raw_company_id).strip()

    # Advance the serial; roll over to the next letter after 999999
    serial_counter += 1
    if serial_counter > 999999:
        serial_counter = 0
        alphabet_counter += 1

    # Save updated counters back to session state
    st.session_state['alphabet_counter'] = alphabet_counter
    st.session_state['serial_counter'] = serial_counter

    current_alphabet = chr(alphabet_counter % 26 + ord('a'))  # 'a' to 'z', cycling
    serial_number = str(serial_counter).zfill(6)
    company_id_part = company_id[:5].zfill(5)

    return f"{company_id_part}_{current_alphabet}_{serial_number}"
# Function to display saved records as a table and save to CSV
def display_saved_records():
    """Write the saved records, tagged with a rule ID, to ``data/records.csv``.

    The records come from ``st.session_state.data``. When there are none,
    nothing is written. Rendering the table in the app is currently disabled
    (the ``st.write`` call below is commented out).
    """
    records = pd.DataFrame(st.session_state.data)
    if records.empty:
        return

    # One rule ID per row, derived from that row's fields
    records['rule_id'] = records.apply(generate_rule_id, axis=1)

    # st.write(df.style.set_table_attributes('style="font-size: 14px; line-height: 18px; width: auto; height: auto;"'))

    # Persist next to the app, creating the folder on first use
    output_dir = "data"
    os.makedirs(output_dir, exist_ok=True)
    records.to_csv(os.path.join(output_dir, "records.csv"), index=False)
# Custom CSS styles for Streamlit download buttons (selector: .stDownloadButton button).
# NOTE(review): this assignment rebinds `button_styles`, which is also imported
# from html_templates at the top of this file — confirm which definition is
# meant to win.
button_styles = """
<style>
.stDownloadButton button {
color: #ffffff; /* Text color */
font-size: 20px;
background-image: linear-gradient(0deg, #a689f6 3%, #413bb9 61%);
border: none;
padding: 10px 20px;
cursor: pointer;
border-radius: 15px;
display: inline-block;
text-decoration: none;
text-align: center;
transition: background-color 0.3s ease;
}
.stDownloadButton button:hover {
background-color: #00ff00; /* Hover background color */
color: #ff0000; /* Hover text color */
}
</style>
"""
# Define file path (replace with your actual file path)
# NOTE(review): this is an absolute path on one developer's machine, while
# display_saved_records writes to the relative "data/records.csv" — the two
# only coincide when the app is started from that project directory; confirm
# before relying on `file_path` elsewhere.
folder_path = "/Users/ninadmandavkar/Desktop/Fynd/plan_maker_2/data/"
file_path = os.path.join(folder_path, "records.csv")
# Function to read file data
def read_file(file_path):
    """Return the entire contents of ``file_path`` as bytes."""
    with open(file_path, 'rb') as handle:
        return handle.read()
# Download button with custom styling
# st.download_button(
# label="Download CSV",
# data=read_file(file_path),
# file_name="records.csv",
# mime="text/csv",
# )
# Function to load theme configuration
def load_theme_config():
    """Parse ``config.toml`` with ``configparser`` and return its ``theme`` section.

    ``ConfigParser.read`` silently skips a file it cannot open, so a missing
    file or a missing ``[theme]`` section surfaces as ``KeyError`` from the
    section lookup.
    """
    parser = configparser.ConfigParser()
    parser.read('config.toml')
    return parser['theme']
# Function to handle custom button click
def custom_button_click():
    """Record in session state that the 'custom' button was the one clicked."""
    st.session_state['button_clicked'] = 'custom'
# Function to handle default button click
def default_button_click():
    """Record in session state that the 'default' button was the one clicked."""
    st.session_state['button_clicked'] = 'default'
def uploader(uploaded_file):
    """Load an uploaded mapping file and store it, plus a nested lookup, in session state.

    The file must be a ``.csv`` or ``.xlsx`` whose first row is a header and
    which has exactly four columns; they are renamed to ``ordering_channel``,
    ``fee_type``, ``variable_type`` and ``chargeable_on``. On success the
    function sets:

    * ``st.session_state['file_data']``   - the renamed DataFrame
    * ``st.session_state['new_mapping']`` - ``{channel: {fee_type: {variable_type: [chargeable_on, ...]}}}``
    * ``st.session_state['mapping_mode']`` - ``'custom'``

    Any failure is reported through ``st.error`` and leaves session state untouched.

    Args:
        uploaded_file: File-like object with a ``name`` attribute that pandas can read.
    """
    try:
        # Match the extension case-insensitively so "DATA.CSV" is accepted too
        filename = uploaded_file.name.lower()
        if filename.endswith('.csv'):
            data = pd.read_csv(uploaded_file, header=0)
        elif filename.endswith('.xlsx'):
            data = pd.read_excel(uploaded_file, header=0)
        else:
            # Previously this fell through with `data` unbound and produced a
            # misleading "referenced before assignment" message.
            st.error("Error reading the file: unsupported file type, please upload a .csv or .xlsx file.")
            return

        # Rename the headers (raises, and is reported below, unless there are exactly four columns)
        new_headers = ['ordering_channel', 'fee_type', 'variable_type', 'chargeable_on']
        data.columns = new_headers

        # Save the data to session state
        st.session_state['file_data'] = data

        # Build the mapping, keeping each chargeable_on value once per branch
        new_mapping = {}
        for _, row in data.iterrows():
            chargeable_options = (
                new_mapping
                .setdefault(row['ordering_channel'], {})
                .setdefault(row['fee_type'], {})
                .setdefault(row['variable_type'], [])
            )
            if row['chargeable_on'] not in chargeable_options:
                chargeable_options.append(row['chargeable_on'])

        st.session_state['new_mapping'] = new_mapping
        st.session_state['mapping_mode'] = 'custom'
    except Exception as e:
        # Deliberately broad: this is the UI boundary, and any parsing problem
        # should be shown to the user rather than crash the app.
        st.error(f"Error reading the file: {e}")
def render_selectboxes(session_state, new_mapping):
    """Render the cascading Product line -> Fee Type -> Variable Type -> Chargeable on selectors.

    Each selection is mirrored into ``session_state`` under
    ``fulfilling_location``, ``application_id``, ``selected_fee_type``,
    ``selected_variable_type`` and ``selected_chargeable_on``.

    Args:
        session_state: Mapping that receives the selections.
        new_mapping: Nested lookup ``{channel: {fee_type: {variable_type: [chargeable_on, ...]}}}``.
    """
    selected_ordering_channel = st.sidebar.selectbox(
        "Product lines",
        [""] + list(new_mapping.keys()),
        help="Select a Product line",
        key='ordering_channel'
    )

    if not selected_ordering_channel:
        st.write("")
        return

    # Use a text form only for the substring test below; the original selection
    # is kept for dictionary lookups so non-string mapping keys still match.
    channel_label = str(selected_ordering_channel)

    # Determine fulfilling_location and application_id based on the product line
    if any(channel in channel_label for channel in ["TMS", "GMC", "Catalog Cloud"]):
        session_state['fulfilling_location'] = None
        session_state['application_id'] = None
    else:
        abc = ["Store", "Warehouse"]
        session_state['fulfilling_location'] = st.sidebar.selectbox(
            "Fulfilling Location", [""] + abc, help="Select the fulfilling location"
        )
        session_state['application_id'] = st.sidebar.text_input(
            "Application ID", key="application_id_input", help="Enter the application ID"
        )

    # 2nd layer: Select fee type based on the selected ordering channel
    if 'selected_fee_type' not in session_state:
        session_state['selected_fee_type'] = None

    channel_details = new_mapping.get(selected_ordering_channel, {})
    fee_types_for_channel = list(channel_details.keys())
    if not fee_types_for_channel:
        st.write("No fee types available for the selected ordering channel.")
        return

    session_state['selected_fee_type'] = st.selectbox(
        "Fee Type",
        [""] + fee_types_for_channel,
        help="Fee type is a revenue head for grouping the revenue",
        key='fee_type'
    )
    selected_fee_type = session_state['selected_fee_type']

    # 3rd layer: Select variable type based on the selected fee type
    variables = {}  # always bound, so the 4th layer can never hit an undefined name
    if selected_fee_type:
        variables = channel_details.get(selected_fee_type, {})
    variable_type_options = list(variables.keys())
    session_state['selected_variable_type'] = st.selectbox(
        "Variable Type",
        [""] + variable_type_options,
        help="Variable type is an attribute on which revenue calculation will be done",
        key='variable_type'
    )
    selected_variable_type = session_state['selected_variable_type']

    # 4th layer: Select chargeable on based on the selected variable type
    chargeable_on_options = []
    if selected_variable_type:
        chargeable_on_options = list(variables.get(selected_variable_type, []))
    session_state['selected_chargeable_on'] = st.selectbox(
        "Chargeable on",
        [""] + chargeable_on_options,
        help="Chargeable on is a service status on which the above fee will be applicable",
        key='chargeable_on'
    )
def slab_data(slab, max_value, user_input, Usage, Capping_or_Minimum_Guarantee, threshold):
    """Normalise one slab's inputs and append them to ``st.session_state.global_data_rows``.

    Args:
        slab: Slab identifier, stored unchanged.
        max_value: Text holding an integer, or empty/None.
        user_input: Commercial value as text; integers and decimals are both
            accepted and stored as a string with two decimals (e.g. ``"2.50"``).
            Empty/None is stored as ``0``.
        Usage: Text holding an integer, or empty/None.
        Capping_or_Minimum_Guarantee: Text holding an integer, or empty/None.
        threshold: Stored unchanged.

    Raises:
        ValueError: If a non-empty field does not contain a valid number.
    """
    def _optional_int(raw):
        # Empty, None or whitespace-only input means "not provided"
        if not raw:
            return None
        text = str(raw).strip()
        return int(text) if text else None

    max_value = _optional_int(max_value)

    # Parse the commercial value as a float directly: the previous int() step
    # rejected decimals before the float formatting could ever run.
    commercial_text = str(user_input).strip() if user_input else ''
    if commercial_text:
        user_input = "{:.2f}".format(float(commercial_text))
    else:
        user_input = 0

    Usage = _optional_int(Usage)
    Capping_or_Minimum_Guarantee = _optional_int(Capping_or_Minimum_Guarantee)

    # Create a new row dictionary
    new_row = {
        "Slab": slab,
        "Max_value": max_value,
        "Commercial_value": user_input,
        "Usage": Usage,
        "Capping/Min_Guarantee_value": Capping_or_Minimum_Guarantee,
        "Threshold": threshold
    }
    # Append new_row to the list of rows kept in session state
    st.session_state.global_data_rows.append(new_row)
def slab_entry():
    """Render the slab entry form and list every slab row added so far.

    Slab, Commercial Value and Capping/Min_Guarantee are required; pressing
    "Add Data" with any of them empty shows a warning, otherwise the values
    are handed to ``slab_data``. Each stored row is then rendered as its own
    one-row table.
    """
    # Make sure the row store exists before anything reads it
    if 'global_data_rows' not in st.session_state:
        st.session_state.global_data_rows = []

    # Two equal-width columns of inputs
    left_col, right_col = st.columns([2, 2])
    with left_col:
        slab = st.selectbox("Slab:", [1, 2, 3], key="slab_select")  # Selectbox for slab numbers
        user_input = st.text_input("Commercial Value:", key="user_input")
        Capping_or_Minimum_Guarantee = st.text_input("Capping/Min_Guarantee:", key="Capping_or_Minimum_Guarantee_input")
    with right_col:
        max_value = st.text_input("Max value:", key="max_value_input")
        Usage = st.text_input("Usage:", key="Usage_input")
        threshold = st.text_input("Threshold:", key="threshold_input")

    # Button to add data
    if st.button("Add Data", key="add_data_button"):
        if slab and user_input and Capping_or_Minimum_Guarantee:
            slab_data(slab, max_value, user_input, Usage, Capping_or_Minimum_Guarantee, threshold)
            st.success(f"Data for Slab '{slab}' added successfully!")
        else:
            st.warning("Please fill in all required fields.")

    # Summary of everything entered so far
    saved_rows = st.session_state.get('global_data_rows')
    if saved_rows:
        st.subheader("All Slabs Data Summary")
        for entry_number, row in enumerate(saved_rows, start=1):
            st.write(f"Entry {entry_number}:")
            st.table([row])
# Script entry point: render the slab entry form only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    slab_entry()
def store_and_fetch_company_info(company_id):
    """Validate ``company_id``, store it, and look up the company name via the Boltic API.

    Results are written to session state:

    * ``company_id``   - the stripped id, only when it parses as an integer
    * ``company_name`` - the fetched name, or a human-readable message that
      explains why no name is available

    Args:
        company_id: Company id as entered by the user (text or number); None
            and blank values are treated as empty.
    """
    company_id_text = '' if company_id is None else str(company_id).strip()
    if not company_id_text:
        st.session_state['company_name'] = "Company ID cannot be empty."
        return

    try:
        company_id_int = int(company_id_text)
    except ValueError:
        st.error("Invalid Company ID. Please enter a valid integer.")
        st.session_state['company_name'] = "Company ID is invalid or missing."
        return

    st.session_state['company_id'] = company_id_text  # Always store the company id in session state

    # NOTE(review): this API token is committed in source; it should be moved
    # to configuration/secrets and rotated.
    api_token = "eyJpZCI6IjU4OTdmZjZlLTMwMTYtNDZjMC1hMzdlLTJmNjBkMjhjYjc5YyIsInRlbmFudCI6ImdvZnluZGNvbSJ9"
    try:
        # TLS verification stays on (the earlier verify=False disabled it) and
        # a timeout keeps a stalled request from hanging the app.
        response = requests.get(
            "https://api.boltic.io/service/platform/bolt/share/v1/data",
            params={
                "api_token": api_token,
                "batch_size": 10000,
                "display_meta": "true",
                "cmpid": company_id_int,
            },
            timeout=10,
        )
        if response.status_code == 200:
            data = response.json()
            if "results" in data and len(data["results"]) > 0:
                st.session_state['company_name'] = data["results"][0].get("company_name", "Company name not found")
            else:
                st.session_state['company_name'] = "Company name not found"
        else:
            st.session_state['company_name'] = f"Error: Unable to fetch data (Status code: {response.status_code})"
    except Exception as e:
        # Deliberate best-effort: any network or parsing failure is surfaced
        # as the company name text instead of crashing the page.
        st.session_state['company_name'] = f"An error occurred: {e}"
def get_feature_user_data(business_head, company_id, company_name, currency, bundle_by, plan_name, plan_description,
                          selected_ordering_channel, fulfilling_location, application_id, selected_fee_type,
                          selected_variable_type, selected_chargeable_on, selected_fee_nature, user_input_3,
                          usage_3, Capping_or_Minimum_Guarantee_3, threshold, expected_billing, selected_payment_method,
                          selected_plan_validity, final_billing):
    """Assemble one plan record as a dict keyed by the stored column names.

    Most arguments are copied through unchanged. The exceptions:

    * ``company_id`` is converted to text and stripped (``None`` when falsy),
      so numeric ids are accepted as well as strings.
    * ``Fee_reversal`` and ``Reversal_%`` are always ``0``.
    * ``final_billing`` is converted with ``int``; ``None`` becomes ``0``,
      matching how the module-level code in this file treats a missing value.

    Returns:
        dict with 24 entries, ``Business_Head`` through ``Net_total_billing``.

    Raises:
        ValueError: If ``final_billing`` is a string that ``int`` cannot parse.
    """
    net_total_billing = 0 if final_billing is None else int(final_billing)
    return {
        "Business_Head": business_head,
        "Company_ID": str(company_id).strip() if company_id else None,
        "Company_Name": company_name,
        "Currency": currency,
        "Bundle_by": bundle_by,
        "Plan_Name": plan_name,
        "Plan_Description": plan_description,
        "Product_lines": selected_ordering_channel,
        "Fulfilling_Location": fulfilling_location,
        "Application_ID": application_id,
        "Fee_Type": selected_fee_type,
        "Variable_Type": selected_variable_type,
        "Chargeable_on": selected_chargeable_on,
        "Fee_Nature": selected_fee_nature,
        "Commercial_value": user_input_3,
        "Fee_reversal": 0,
        "Reversal_%": 0,
        "Usage_limit": usage_3,
        "Threshold_value": Capping_or_Minimum_Guarantee_3,
        "Threshold_option": threshold,
        "Expected_Billing": expected_billing,
        "Payment_Method": selected_payment_method,
        "Plan_Validity": selected_plan_validity,
        "Net_total_billing": net_total_billing
    }
def insert_into_bq(user_data, project_id, dataset_id, table_id, service_account_info):
    """Insert one record into the BigQuery table ``project_id.dataset_id.table_id``.

    Credentials are built from the in-memory ``service_account_info`` mapping
    rather than from a key file. Progress and failures are logged and also
    shown in the Streamlit UI.

    Returns:
        True when the insert call reports no errors; False when it reports row
        errors or when any exception is raised along the way.
    """
    initialize_session_state()
    try:
        creds = service_account.Credentials.from_service_account_info(service_account_info)

        bq_client = bigquery.Client(credentials=creds, project=project_id)
        logging.info("BigQuery client initialized successfully.")

        table_ref = f"{project_id}.{dataset_id}.{table_id}"
        target_table = bq_client.get_table(table_ref)
        logging.info(f"Table reference obtained: {table_ref}")

        # insert_rows_json returns a list of per-row errors; empty means success
        errors = bq_client.insert_rows_json(target_table, [user_data])
        if errors:
            logging.error(f"Errors occurred while inserting rows: {errors}")
            st.error(f"Errors occurred while inserting rows: {errors}")
            return False

        logging.info("Insert successful")
        st.success("Insert successful")
        return True
    except Exception as e:
        logging.exception("An error occurred during the BigQuery insert operation.")
        st.error(f"An error occurred: {e}")
        return False
# BigQuery destination for plan records
project_id = "fynd-db"
dataset_id = "finance_dwh"
table_id = "plan_v2"

# NOTE(review): everything below runs at module level, so a record is built
# from the current session state and saved every time this module's code is
# executed — confirm that this is intended.

# Snapshot of the form fields held in session state (text fields default to '')
business_head = st.session_state.get('business_head', '')
company_id = st.session_state.get('company_id', '')
company_name = st.session_state.get('company_name', '')
currency = st.session_state.get('currency', '')
bundle_by = st.session_state.get('bundle_by', '')
plan_name = st.session_state.get('plan_name', '')
plan_description = st.session_state.get('plan_description', '')
selected_ordering_channel = st.session_state.get('selected_ordering_channel', '')
fulfilling_location = st.session_state.get('fulfilling_location', '')
application_id = st.session_state.get('application_id', '')
selected_fee_type = st.session_state.get('selected_fee_type', '')
selected_variable_type = st.session_state.get('selected_variable_type', '')
selected_chargeable_on = st.session_state.get('selected_chargeable_on', '')
selected_fee_nature = st.session_state.get('selected_fee_nature', '')
user_input_3 = st.session_state.get('user_input_3', 0)  # numeric default
usage_3 = st.session_state.get('usage_3', 0)  # numeric default
Capping_or_Minimum_Guarantee_3 = st.session_state.get('Capping_or_Minimum_Guarantee_3', 0)  # numeric default
threshold = st.session_state.get('threshold', '')
expected_billing = st.session_state.get('expected_billing', 0)  # numeric default
selected_payment_method = st.session_state.get('selected_payment_method', '')
selected_plan_validity = st.session_state.get('selected_plan_validity', '')
rule_id = st.session_state.get('rule_id', '')
plan_id = st.session_state.get('plan_id', '')
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# A stored None means "no billing yet" and is recorded as 0; anything else is cast to int
final_billing = st.session_state.get('final_billing', 0)
final_billing = 0 if final_billing is None else int(final_billing)

user_data = {
    "Business_Head": business_head,
    "Company_ID": company_id,
    "Company_Name": company_name,
    "Currency": currency,
    "Bundle_by": bundle_by,
    "Plan_Name": plan_name,
    "Plan_Description": plan_description,
    "Product_lines": selected_ordering_channel,
    "Fulfilling_Location": fulfilling_location,
    "Application_ID": application_id,
    "Fee_Type": selected_fee_type,
    "Variable_Type": selected_variable_type,
    "Chargeable_on": selected_chargeable_on,
    "Fee_Nature": selected_fee_nature,
    "Commercial_value": user_input_3,
    "Fee_reversal": 0,
    "Reversal_%": 0,
    "Usage_limit": usage_3,
    "Threshold_value": Capping_or_Minimum_Guarantee_3,
    "Threshold_option": threshold,
    "Expected_Billing": expected_billing,
    "Payment_Method": selected_payment_method,
    "Plan_Validity": selected_plan_validity,
    "rule_id": rule_id,
    "plan_id": plan_id,
    "created_at": created_at,
    "Net_total_billing": final_billing
}

save_user_data(user_data)  # Save user data after submission