# QuotationChatbot_v5 / common_functions_v4.py
# jess
# chore: clean up buttons, and debug logical error (prompt editor, prompt keys)
# 69cbcdc
import time
import uuid
import numpy as np
import pandas as pd
from io import StringIO
# from pathlib import Path
from enum import Enum
import psycopg2
from psycopg2.extras import RealDictCursor
import gradio as gr
import datetime
from Project import *
import json
import os
from dotenv import load_dotenv
from langtrace_python_sdk import langtrace
from state import state
from prompt_configs import *
import asyncio
import re
import base64
# Markdown label returned as the session-display value by fetch_session when
# no session could be loaded.
no_active_session = "**Current Session**:None"
class SLEEP_TIME(Enum):
    """Enum class containing sleep time constants (values are in seconds)."""
    ONE_AND_HALF_SEC = 1.5  # 1.5 seconds
    # NOTE(review): the member is named TWO_SEC but its value is 1.8, not 2.
    # async_process_response sleeps for this value between "thought" updates;
    # confirm whether 1.8 is intentional before renaming or changing it.
    TWO_SEC = 1.8
    THREE_SEC = 3  # 3 seconds
# Import-time setup: read the environment (presumably populated from a .env
# file by load_dotenv) and initialise Langtrace with its API key.
load_dotenv()
api_key = os.getenv("LANGTRACE_API_KEY")
# Importing this module fails when the key is missing.
if api_key is None:
    raise ValueError("Environment variable 'LANGTRACE_API_KEY' is not set. Please set it in your .env file.")
langtrace.init(api_key=api_key)
# TODO: Page and Engage are now merged, so the project type must be looked up in order to load the correct project.
def get_db_connection():
    """Open and return a new psycopg2 connection configured from the environment.

    The connection parameters are read from the DB_NAME, DB_USER,
    DB_PASSWORD, DB_HOST and DB_PORT environment variables. The caller is
    responsible for closing the returned connection.
    """
    env_name_by_param = (
        ('dbname', 'DB_NAME'),
        ('user', 'DB_USER'),
        ('password', 'DB_PASSWORD'),
        ('host', 'DB_HOST'),
        ('port', 'DB_PORT'),
    )
    connect_kwargs = {
        param: os.getenv(env_name) for param, env_name in env_name_by_param
    }
    return psycopg2.connect(**connect_kwargs)
def get_latest_components():
    """Fetch the components of the newest component version of every base project.

    For each base_project_name only the rows whose component_version equals
    that project's maximum version are selected, ordered by project name.

    Returns:
        The fetched rows (dict-like, via RealDictCursor) on success, or a dict
        ``{'status': 'error', 'message': <text>}`` when anything fails.
    """
    conn = None
    try:
        conn = get_db_connection()
        # The cursor context manager closes the cursor even if execute() raises.
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT base_project_name,module,submodule,unit_type,quantity,mandays_per_unit
                FROM base_project_component pc
                WHERE (pc.base_project_name, pc.component_version) IN (
                    SELECT base_project_name, MAX(component_version)
                    FROM base_project_component
                    GROUP BY base_project_name
                )
                ORDER BY pc.base_project_name;
            """)
            return cur.fetchall()
    except Exception as e:
        return {
            'status': 'error',
            'message': str(e)
        }
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def get_section_name_and_rubric_list():
    """Fetch the latest rubric for the base project 'Page' and its section names.

    Rows are ordered by section name and then by MVP priority
    ('high', 'med', 'low', anything else last).

    Returns:
        On success a 2-tuple ``(section_name_list, rubric_list)`` where
        ``rubric_list`` is a list of plain dicts (one per rubric row) and
        ``section_name_list`` is a list of the distinct section names in
        first-seen order.
        On failure a dict ``{'status': 'error', 'message': <text>}``; callers
        must check for this before unpacking the result.
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT section_name, criteria, initial_question,explanation, mvp, quantifiable_value
                FROM base_project_rubric
                WHERE LOWER(base_project_name) = LOWER('Page')
                AND rubric_version = (
                    SELECT MAX(rubric_version)
                    FROM base_project_rubric
                    WHERE LOWER(base_project_name) = LOWER('Page')
                )
                ORDER BY section_name,
                CASE mvp
                    WHEN 'high' THEN 1
                    WHEN 'med' THEN 2
                    WHEN 'low' THEN 3
                    ELSE 4
                END;
            """)
            rubric = cur.fetchall()
        # Convert rows to plain dictionaries for JSON serialization.
        rubric_list = [dict(row) for row in rubric]
        # A real list (not a dict_keys view) of unique names, order preserved.
        section_name_list = list(
            dict.fromkeys(row['section_name'] for row in rubric_list)
        )
        print(f"in get_section_name_and_rubric_list: {section_name_list}")
        return section_name_list, rubric_list
    except Exception as e:
        return {
            'status': 'error',
            'message': str(e)
        }
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def update_session_project_requirements(session_id, project_detail):
    """Update project requirements for a session in the database.

    The requirements are stored JSON-encoded in ``sessions.project_requirement``.
    Nothing happens when ``session_id`` is falsy.

    Args:
        session_id: ID of the session to update
        project_detail: Project requirements to store (must be JSON-serializable)

    Returns:
        None

    Raises:
        Exception if the serialization or the database update fails (the
        error is printed and re-raised).
    """
    if not session_id:
        return
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Update project_requirement in sessions table
            cur.execute("""
                UPDATE sessions
                SET project_requirement = %s
                WHERE session_id = %s
            """, (json.dumps(project_detail), session_id))
        conn.commit()
    except Exception as e:
        print(f"Error updating session: {str(e)}")
        raise
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def sanitize_text(text):
    """Return ``text`` with every single-quote character (') removed.

    No other character is touched. Extend this function if further
    problematic characters need to be stripped.
    """
    # Splitting on the quote and re-joining drops every occurrence of it.
    pieces = text.split("'")
    return ''.join(pieces)
async def run_question_agent(quotation_project):
    """Pick a question generator via the questioning agent and run it.

    The "questioning_agent" prompt is executed with the current project
    detail; its JSON response is parsed and stored on
    ``quotation_project.config``. The first entry of
    ``config[0]["selected_functions"]`` names the prompt that is executed to
    produce the next question.

    Args:
        quotation_project: project object providing ``async_execute_prompt``,
            ``get_project_detail`` and ``_parse_json_response``.

    Returns:
        A markdown string with the configuration type followed by the
        generated question, or None when the configuration could not be
        analysed, no function was selected, or the generator failed.
    """
    configuration_output = await quotation_project.async_execute_prompt(
        "questioning_agent",
        {"project_detail": quotation_project.get_project_detail()},
    )
    # Pre-bind both names so a failure in the block below cannot leave them
    # undefined (previously this raised UnboundLocalError further down).
    config = None
    selected_functions = []
    try:
        config = quotation_project._parse_json_response(configuration_output)
        quotation_project.config = config
        # NOTE(review): the executed prompt is "questioning_agent" but the log
        # entry uses the "component_agent" prompt config - confirm intended.
        log_prompt(PROMPTS['component_agent'].step,
                   PROMPTS['component_agent'].description,
                   PROMPTS["component_agent"].prompt,
                   configuration_output)
        selected_functions = config[0]["selected_functions"]
        print(f"Selected {len(selected_functions)} component to generate")
    except Exception as e:
        print(f"Error in analyzing configuration: {e}")
    if not selected_functions:
        # TO DO: there has to be a way to handle this
        print("No question generator selected.")
        return None
    # Execute only the first selected function
    function_name = selected_functions[0]
    try:
        next_question = await quotation_project.async_execute_prompt(function_name)
        log_prompt(PROMPTS[function_name].step,
                   PROMPTS[function_name].description,
                   PROMPTS[function_name].prompt,
                   next_question)
        return f"## Project Configuration: {config[0]['configuration_type']}\n\n{next_question}"
    except Exception as e:
        print(f"Error executing {function_name}: {e}")
        return None
def save_prompt_to_db(new_prompt: str) -> None:
    """
    Saves a prompt to the database after base64 encoding.

    The prompt is converted to text, stripped of surrounding whitespace,
    UTF-8 encoded, base64 encoded and inserted as a new ``config`` row.

    Args:
        new_prompt: The prompt text to save

    Raises:
        Exception: If database connection or insertion fails (the error is
            printed and re-raised)
    """
    conn = None
    try:
        # Encode the prompt in base64
        prompt_text = f"{new_prompt}".strip()
        encoded_prompt = base64.b64encode(prompt_text.encode('utf-8')).decode('utf-8')
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute("INSERT INTO config (config_body) VALUES (%s)", (encoded_prompt,))
        conn.commit()
    except Exception as e:
        print(f"Error saving prompt to database: {str(e)}")
        raise
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def get_latest_prompt_from_db() -> "str | dict":
    """
    Retrieves the latest prompt from the database and decodes it from base64.

    The row with the highest ``id`` in ``config`` is used. The decoded text is
    returned as-is; it is NOT parsed as JSON here.

    Returns:
        str: The decoded prompt configuration text, or
        dict: an empty dict when the ``config`` table has no rows

    Raises:
        Exception: If database connection or retrieval fails (the error is
            printed and re-raised)
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Get the latest config entry by ID
            cur.execute("SELECT id, config_body FROM config ORDER BY id DESC LIMIT 1")
            result = cur.fetchone()
        if not result:
            print("No prompt configurations found in database")
            return {}
        config_id, encoded_prompt = result
        print(f"get_latest_prompt_from_db: config_id: {config_id}")
        # Decode from base64
        return base64.b64decode(encoded_prompt).decode('utf-8')
    except Exception as e:
        print(f"Error retrieving prompt from database: {str(e)}")
        raise
    finally:
        # Always release the connection; previously it leaked on any error.
        if conn is not None:
            conn.close()
def process_response(answer, history):
    """Record the user's answer and produce the next follow-up question.

    The sanitized answer is appended to the project details (and persisted
    when a session is active). The question generator is chosen from the
    number of turns already in ``history``: one turn -> client follow-up,
    two turns -> project input gathering, otherwise -> project input review.

    Returns:
        ``(history, next_question)``; on any error
        ``(history, "Error in generating follow up questions")``.
    """
    try:
        # Anything that is not a list is treated as an empty conversation.
        if not isinstance(history, list):
            history = []

        project = state.quotation_project
        project.add_project_detail(sanitize_text(str(answer)))

        # Persist the updated details when a session is active.
        if project.session_id:
            update_session_project_requirements(
                project.session_id,
                project.project_detail
            )

        turns_so_far = len(history)
        if turns_so_far == 1:
            # After first client information question
            next_question = project.generate_client_follow_up()
        elif turns_so_far == 2:
            # After client follow-up
            next_question = project.gather_project_input()
        else:
            # Subsequent project requirements questions
            next_question = project.review_project_input()

        # Only well-formed (str, str) pairs are appended to the history.
        both_text = isinstance(answer, str) and isinstance(next_question, str)
        if both_text:
            history.append((answer, next_question))
        return history, next_question
    except Exception as e:
        print(f"Error in process_response: {str(e)}")
        return history, "Error in generating follow up questions"
def log_chat_history(chat_history, filename="chat_logs.txt"):
    """
    Logs chat history to a text file in a clean, readable format.

    The entry is appended to ``filename``. A header is always written; each
    message contributes its number, upper-cased role, optional non-empty
    metadata and content, followed by a separator line.

    Args:
        chat_history (list): List of dictionaries containing chat messages
            (keys 'role' and 'content', optionally 'metadata'). ``None`` or
            any value that is not a list/tuple is treated as an empty history
            so that logging never blocks the caller on a missing history.
        filename (str): Name of the log file (default: chat_logs.txt)
    """
    if not isinstance(chat_history, (list, tuple)):
        chat_history = []

    # Build the whole entry first and write it in one batch.
    lines = ["Chat History\n", "=" * 50 + "\n\n"]
    for number, message in enumerate(chat_history, 1):
        # Message number and role
        lines.append(f"Message #{number}\n")
        lines.append(f"Role: {message['role'].upper()}\n\n")
        # Write metadata if it exists and is not empty
        metadata = message.get('metadata')
        if metadata:
            lines.append("Metadata:\n")
            lines.extend(f" • {key}: {value}\n" for key, value in metadata.items())
            lines.append("\n")
        # Write content
        lines.append("Content:\n")
        lines.append(f"{message['content']}\n")
        # Add separator between messages
        lines.append("\n" + "-" * 50 + "\n\n")

    with open(filename, 'a', encoding='utf-8') as f:
        f.writelines(lines)
async def clean_sample_answers(text):
    """Turn generated questions into an answer template for the user.

    Every '*', '#' and '-' character is removed from the text, then each
    "Sample:" / "Sample Answer(s):" fragment (optionally parenthesised or
    quoted) is replaced by an "Answer:" placeholder on its own line.

    Args:
        text (str): Input text containing sample answers

    Returns:
        str: The cleaned text. Falsy input is returned unchanged, and the
        original text is returned when cleaning fails.
    """
    try:
        if not text:
            return text
        print("Starting to clean sample answers...")
        # Drop markdown markers ('*', '#') and hyphens in a single pass.
        stripped = text.translate(str.maketrans('', '', '*#-'))
        sample_pattern = re.compile(
            r'(?i)\s*(?:\(|\*)?\s*Sample(?:\s*Answers?)?\s*:\s*(?:"[^"]*"|[^\)\*\n]*)\s*(?:\)|\*)?',
            re.MULTILINE,
        )
        cleaned_text = sample_pattern.sub('\nAnswer:\n', stripped)
        print("Sample answers cleaned successfully.")
        return cleaned_text
    except Exception as e:
        print(f"Error cleaning sample answers: {e}")
        return text
# Example usage:
# chat_history = [{'role': 'user', ...}, {'role': 'assistant', ...}]
# log_chat_history(chat_history)
#
#TODO: Ensure it directs towards correct question
async def async_process_response(answer, history):
    """Process user responses and generate appropriate follow-up questions.

    Async generator that streams progress to the chat UI. Every yielded item
    is a 2-tuple:

    * ``(response, "")`` while the question is being generated, where
      ``response`` is a pending gr.ChatMessage whose content accumulates the
      "thoughts" of the selected prompt config;
    * ``([response, gr.ChatMessage(content=next_question)], user_input_template)``
      as the final item, where ``user_input_template`` is the question text
      passed through clean_sample_answers;
    * ``(history, "Error in generating follow up questions")`` when anything
      inside the try block fails.

    The generator function is chosen from the number of project details
    stored so far (after adding the current answer).
    """
    print(f"[DEBUG] Entering async_process_response")
    # NOTE(review): this call runs before the try block and before `history`
    # is normalised to a list, so an exception raised here propagates to the
    # caller instead of producing the error yield below.
    log_chat_history(history)
    try:
        # Convert history to list if it's not already
        if not isinstance(history, list):
            history = []
        sanitized_answer = sanitize_text(str(answer))
        # Empty answers are not recorded as project details.
        if sanitized_answer is not None and len(sanitized_answer) > 0:
            state.quotation_project.add_project_detail(sanitized_answer)
        start_time = time.time()
        project_detail_len = len(state.quotation_project.project_detail)
        # Determine which function and prompt config to use based on project detail length
        if project_detail_len == 1:
            function_to_run = state.quotation_project.generate_client_follow_up
            prompt_config = PROMPTS["generate_client_follow_up"]
        elif project_detail_len == 2:
            # Wrapped in a lambda so it can be called without arguments like the others.
            function_to_run = lambda: run_question_agent(state.quotation_project)
            prompt_config = PROMPTS["questioning_agent"]
        elif project_detail_len in [3, 4]:
            function_to_run = state.quotation_project.generate_further_follow_up_questions
            prompt_config = PROMPTS["generate_further_follow_up_questions"]
        elif project_detail_len == 5:
            function_to_run = state.quotation_project.generate_general_questions
            prompt_config = PROMPTS["generate_general_questions"]
        else:
            function_to_run = state.quotation_project.generate_further_follow_up_questions
            prompt_config = PROMPTS["generate_further_follow_up_questions"]
        # Create response object with prompt config description as title
        response = gr.ChatMessage(
            content="",
            metadata={
                "title": f"_{prompt_config.description}_",
                "id": 0,
                "status": "pending"
            }
        )
        yield response, ""
        # Start generating the question in the background; create_task
        # requires function_to_run() to return a coroutine.
        next_question_task = asyncio.create_task(function_to_run())
        # Persist the updated project detail list while the task runs.
        if state.quotation_project.session_id:
            update_session_project_requirements(
                state.quotation_project.session_id,
                state.quotation_project.project_detail
            )
        accumulated_thoughts = ""
        thought_index = 0
        # Until the task finishes, append one thought per sleep interval,
        # cycling through the prompt config's thoughts.
        while not next_question_task.done():
            thought = prompt_config.thoughts[thought_index % len(prompt_config.thoughts)]
            thought_index += 1
            await asyncio.sleep(SLEEP_TIME.TWO_SEC.value)
            accumulated_thoughts += f"- {thought}\n\n"
            response.content = accumulated_thoughts.strip()
            yield response, ""
        # Awaiting a finished task returns its result or re-raises its exception.
        next_question = await next_question_task
        print(f"[DEBUG] Next question: {next_question}")
        user_input_template = await clean_sample_answers(next_question)
        response.metadata["status"] = "done"
        response.metadata["duration"] = time.time() - start_time
        yield response, ""
        response_list = [
            response,
            gr.ChatMessage(content=next_question)
        ]
        yield response_list, user_input_template
    except Exception as e:
        print(f"[DEBUG] Error in async_process_response: {str(e)}")
        print(f"[DEBUG] Error type: {type(e)}")
        yield history, "Error in generating follow up questions"
#TODO: Create calculate mandays for general and mvp
def calculate_mandays_and_costs(generated_results=None):
    """Sum the mandays of all generated results and derive cost and duration.

    Each result is expected to look like
    ``{'function_name': name, 'result': {name: [record, ...]}}``; only records
    listed under the result's own function name are counted, and records with
    a non-numeric 'mandays' value are reported and skipped.

    Returns:
        ``(total_mandays, total_cost, estimated_months)`` where the cost is
        1500 per manday and a month is 30 mandays, or ``(None, None, None)``
        when an unexpected error occurs.
    """
    try:
        total_mandays = 0
        for result in generated_results or ():
            if 'result' not in result:
                continue
            result_content = result['result']
            # Handle nested dictionary structure
            if not isinstance(result_content, dict):
                continue
            # Only function-specific data is considered
            function_name = result.get('function_name', '')
            if function_name not in result_content:
                continue
            for record in result_content[function_name]:
                if 'mandays' not in record:
                    continue
                try:
                    total_mandays += float(record['mandays'])
                except (ValueError, TypeError):
                    print(f"Invalid mandays value in record: {record['mandays']}")
                    continue
        # Calculate costs based on total mandays
        return (total_mandays, 1500 * total_mandays, total_mandays/30)
    except Exception as e:
        print(f"Error calculating mandays and costs: {str(e)}")
        return (None, None, None)
def calculate_mvp_mandays_and_costs(generated_mvp_results):
    """Sum the mandays of all MVP results and derive cost and duration.

    Each result is expected to look like
    ``{'result': {section_name: [record, ...], ...}}``. Every list-valued
    section is scanned; a record without 'mandays' counts as 0. Results whose
    'result' is not a dict, records that are not dicts, and records with a
    non-numeric 'mandays' value are skipped instead of discarding the whole
    total, matching how calculate_mandays_and_costs treats bad records.

    Args:
        generated_mvp_results: iterable of result dicts, or None/empty.

    Returns:
        ``(total_mvp_mandays, total_mvp_cost, estimated_mvp_months)`` where the
        cost is 1500 per manday and a month is 30 mandays, or ``(0, 0, 0)``
        when an unexpected error occurs.
    """
    try:
        total_mvp_mandays = 0
        for result in generated_mvp_results or ():
            if 'result' not in result:
                continue
            result_content = result['result']
            if not isinstance(result_content, dict):
                continue
            for section_data in result_content.values():
                if not isinstance(section_data, list):
                    continue
                for record in section_data:
                    if not isinstance(record, dict):
                        continue
                    try:
                        total_mvp_mandays += float(record.get('mandays', 0))
                    except (ValueError, TypeError):
                        # One bad value must not zero out the whole estimate.
                        print(f"Invalid MVP mandays value in record: {record.get('mandays')}")
        total_mvp_cost = 1500 * total_mvp_mandays
        estimated_mvp_months = total_mvp_mandays / 30
        return (total_mvp_mandays, total_mvp_cost, estimated_mvp_months)
    except Exception as e:
        print(f"Error calculating MVP mandays and costs: {str(e)}")
        return 0, 0, 0
def create_new_session():
    """Create a new session in the database and return the session_id.

    Inserts a ``sessions`` row stamped with the current time and a matching
    ``session_base_project`` row for the base project 'Page', committing both
    together.

    Returns:
        The new session_id, or None when the session could not be created
        (the error is printed).
    """
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            # Insert new session with start time
            cur.execute("""
                INSERT INTO sessions (start_time)
                VALUES (CURRENT_TIMESTAMP)
                RETURNING session_id
            """)
            session_id = cur.fetchone()[0]
            # Insert session_base_project record for "Page"
            cur.execute("""
                INSERT INTO session_base_project (session_id, base_project_name)
                VALUES (%s, 'Page')
            """, (session_id,))
        conn.commit()
        return session_id
    except Exception as e:
        print(f"Error creating new session: {str(e)}")
        return None
    finally:
        # Always release the connection; previously it leaked on any error.
        # Closing without a commit discards the partial inserts.
        if conn is not None:
            conn.close()
def start_chat():
    """Start a fresh chat: create a session, reset the project, return the opener.

    Returns:
        A 2-tuple ``(initial_prompt, session_display)``: the prompt text of
        the "client_initial_question" prompt config and a
        "Current Session: <id>" label.
    """
    new_session_id = create_new_session()

    # Reset the shared project and bind it to the new session.
    project = state.quotation_project
    project.reset_project()
    project.session_id = new_session_id

    opening_question = PROMPTS["client_initial_question"].prompt

    # The project state is still computed here as before; its result is not
    # part of the returned values.
    get_project_state()

    return opening_question, f"Current Session: {new_session_id}"
def get_project_state():
    """Summarise the current quotation project for display.

    Returns:
        A 2-tuple ``(status, requirements_table)``: a four-line status text
        (session id and whether rubric, components and requirements are
        loaded) and a text listing every stored requirement, which is empty
        when there are none.
    """
    project = state.quotation_project
    status = "\n".join((
        f"Session ID: {project.session_id}",
        f"Rubric Loaded: {bool(project.rubric)}",
        f"Components Loaded: {bool(project.component_list)}",
        f"Requirements Loaded: {bool(project.project_detail)}",
    ))

    details = project.project_detail
    if not details:
        return status, ""

    print(f"\n\nrequirements : {type(details)}")
    # One separator-prefixed, numbered entry per requirement.
    entries = [
        f"\n_____________\n#Requirement {number}\n {requirement}"
        for number, requirement in enumerate(list(details), 1)
    ]
    return status, "".join(entries)
def _parse_stored_requirements(raw_requirements):
    """Convert a stored project_requirement value into an iterable of requirements."""
    if not isinstance(raw_requirements, str):
        return raw_requirements
    try:
        return json.loads(raw_requirements)
    except json.JSONDecodeError:
        # Not JSON: treat each line of the text as one requirement.
        return raw_requirements.split('\n')


def fetch_session(session_id):
    """Fetch session details from database and initialize project state.

    Loads the session row, restores its project requirements into
    ``state.quotation_project``, then loads the rubric, the prompt config and
    the component list.

    Args:
        session_id: ID of the session to load.

    Returns:
        A 4-tuple ``(status, requirements_table, message, session_display)``.
        On success the first two items come from get_project_state(); on any
        failure they are empty strings, ``message`` describes the problem and
        ``session_display`` is ``no_active_session``.
    """
    try:
        # 1. Fetch session details; the connection is closed even on error.
        conn = get_db_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT project_requirement, start_time
                    FROM sessions
                    WHERE session_id = %s
                """, (session_id,))
                session = cur.fetchone()
        finally:
            conn.close()
        print(session)
        if not session:
            return "", "", f"Session {session_id} not found", no_active_session

        # 2. Update quotation_project with session data
        project = state.quotation_project
        project.session_id = session_id
        if session['project_requirement']:
            try:
                requirements = _parse_stored_requirements(session['project_requirement'])
                # Clear existing details and set new ones
                project.project_detail = []
                for requirement in requirements:
                    project.add_project_detail(requirement.strip())
            except Exception as e:
                return "", "", f"Error processing project requirements in session {session_id}: {str(e)}", no_active_session

        # 3. Load the rubric. The helper reports failure by returning an error
        # dict, which must not be unpacked as (section names, rubric).
        rubric_result = get_section_name_and_rubric_list()
        if isinstance(rubric_result, dict):
            return "", "", f"Error fetching session: {rubric_result.get('message')}", no_active_session
        section_name_list, rubric_list = rubric_result
        project.set_rubric(rubric_list)
        project.set_rubric_section_names(section_name_list)

        print("in fetch_session: loading config from db")
        load_msg = project.load_config_from_db()

        # 4. Fetch and set components; an error dict here is a failure too.
        component_list = get_latest_components()
        if isinstance(component_list, dict):
            return "", "", f"Error fetching session: {component_list.get('message')}", no_active_session
        project.set_component_list(component_list)

        return (
            *get_project_state(),
            f"Successfully loaded session {session_id} with all data, {load_msg}",
            f"Current Session: {session_id}",
        )
    except Exception as e:
        return "", "", f"Error fetching session: {str(e)}", no_active_session
def log_prompt_execution(step_name, sub_step_name, prompt_text):
    """Record one prompt execution in the ``prompts`` table.

    The row is stamped with the current time, the active session id and a
    freshly generated UUID used as prompt_id. No duplicate check is made.

    Returns:
        The generated prompt_id, or None when the insert fails (the error is
        printed).
    """
    timestamp = datetime.datetime.now()
    active_session_id = state.quotation_project.session_id
    new_prompt_id = str(uuid.uuid4())

    insert_sql = """
        INSERT INTO prompts (session_id, prompt_id, step_name, sub_step_name, prompt, created_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    row = (active_session_id, new_prompt_id, step_name, sub_step_name, prompt_text, timestamp)

    connection = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor()
        cursor.execute(insert_sql, row)
        connection.commit()
        return new_prompt_id
    except Exception as e:
        print(f"Error logging prompt execution: {str(e)}")
        return None
    finally:
        # Release the connection whether or not the insert succeeded.
        if connection:
            connection.close()
def log_prompt_execution_output(prompt_id, output):
    """Save prompt execution output to the database.

    Inserts a row into ``outputs`` with a freshly generated UUID as
    output_id and the current time. No duplicate check is made.

    Args:
        prompt_id: ID of the prompt execution this output belongs to.
        output: The output value to store.

    Returns:
        The generated output_id, or None when the insert fails (the error is
        printed).
    """
    created_at = datetime.datetime.now()
    output_id = str(uuid.uuid4())
    # Bound before the try so the finally clause is safe even when the
    # connection itself cannot be established.
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO outputs (output_id, prompt_id, output, created_at)
                VALUES (%s, %s, %s, %s)
            """, (output_id, prompt_id, output, created_at))
        conn.commit()
        return output_id
    except Exception as e:
        print(f"Error logging prompt execution output: {str(e)}")
        return None
    finally:
        if conn is not None:
            conn.close()
def update_prompt_execution_output(output_id, output):
    """Update prompt execution output in the database based on output_id.

    The stored output and its created_at timestamp are overwritten. NumPy
    integer and floating scalars are converted to plain Python ``int`` /
    ``float`` before being sent to the database.

    Args:
        output_id: ID of the ``outputs`` row to update.
        output: The new output value.

    Returns:
        None. Errors are printed, not raised.
    """
    created_at = datetime.datetime.now()
    # Bound before the try so the finally clause is safe even when the
    # connection itself cannot be established.
    conn = None
    try:
        conn = get_db_connection()
        # Convert NumPy scalars to standard Python types. np.floating covers
        # every NumPy float width, not only those that subclass float.
        if isinstance(output, np.integer):
            output = int(output)
        elif isinstance(output, np.floating):
            output = float(output)
        with conn.cursor() as cur:
            # Update the existing output for the given output_id
            cur.execute("""
                UPDATE outputs
                SET output = %s, created_at = %s
                WHERE output_id = %s
            """, (output, created_at, output_id))
        conn.commit()
        print(f"Successfully updated output for {output_id}")
    except Exception as e:
        print(f"Error updating prompt execution output: {str(e)}")
        return None
    finally:
        if conn is not None:
            conn.close()
def log_prompt(prompt_name: str, prompt_description: str, prompt: str, output: Any) -> "str | None":
    """Log prompt execution and its output.

    First records the prompt execution, then records ``output`` against the
    resulting prompt_id.

    Args:
        prompt_name: Stored as the step name of the prompt execution.
        prompt_description: Stored as the sub-step name.
        prompt: The prompt text.
        output: The output produced by the prompt.

    Returns:
        The output_id returned by log_prompt_execution_output (which is None
        when that insert failed), or None when logging raised an exception.
    """
    try:
        prompt_id = log_prompt_execution(prompt_name, prompt_description, prompt)
        output_id = log_prompt_execution_output(prompt_id, output)
        print(f"Succesfully logged {prompt_name}: as {output_id}")
        return output_id
    except Exception as e:
        print(f"Error logging {prompt_name} generation: {str(e)}")
        return None