Spaces:
Sleeping
Sleeping
| import re | |
| import random | |
| from app_config import SYSTEM_PROMPT, SLOT_ID_PATTERN, INVOICE_NUM_PATTERN,ROOT_DIR | |
| from utils.schemas import ChatBotInput, SlotScheduleInput, SlotUpdateInput | |
| from PriceEstimation.classify import predict_class | |
| from PriceEstimation.price_predict import waste_tyre_price | |
| from langchain.agents import tool | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| import os | |
| import json | |
| env_path = Path('.') / '.env' | |
| load_dotenv(dotenv_path=env_path) | |
| import session_manager | |
| session_state = session_manager.get_session_state() | |
| def response_generator(prompt: str) -> str: | |
| """this function can be used for general quetion answers which are related to tyrex and tyre recycling | |
| Args: | |
| prompt (string): user query | |
| Returns: | |
| string: answer of the query | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "question-answer-tool" | |
| prompt = session_state.last_query | |
| print("my_Prompt is", prompt) | |
| print() | |
| print() | |
| retriever = session_state.retriever | |
| docs = retriever.invoke(prompt) | |
| my_context = [doc.page_content for doc in docs] | |
| my_context = '\n\n'.join(my_context) | |
| print("***************************** CONTEXT *****************************") | |
| print(my_context) | |
| system_message = SystemMessage(content=SYSTEM_PROMPT.format(context=my_context, previous_message_summary=session_state.rag_memory.moving_summary_buffer)) | |
| chat_messages = (system_message + session_state.rag_memory.chat_memory.messages + HumanMessage(content=prompt)).messages | |
| print("messages", chat_messages) | |
| print() | |
| print() | |
| response = session_state.llm.invoke(chat_messages) | |
| print(response) | |
| return response.content | |
| except Exception as error: | |
| print(error) | |
| return "Oops! something went wrong, please try again." | |
| def schedule_slot(address, tyre_counts) -> str: | |
| """this function can be used for scheduling or booking slot | |
| Args: | |
| address (string): Address to pickup the tyres | |
| tyre_counts (int): Number of the tyres to pickup | |
| Returns: | |
| string: final responce which user needs | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "slot-scheduling-tool" | |
| if "slot_booking_agent" in session_state.agent_history: | |
| session_state.agent_history['slot_booking_agent'].append({"role": "user", "content": session_state.last_query}) | |
| else: | |
| session_state.agent_history['slot_booking_agent'] = [{"role": "user", "content": session_state.last_query}] | |
| session_state.next_agent = "slot_booking_agent" | |
| try: | |
| tyre_counts = int(tyre_counts) | |
| except Exception as error: | |
| tyre_counts = None | |
| print(error) | |
| words = ['provide', 'give me ', 'address', 'need', 'want', 'needs', 'wants'] | |
| is_valid_address = True | |
| for word in words: | |
| if word in address.lower(): | |
| is_valid_address = False | |
| break | |
| if (not address or not is_valid_address) and not tyre_counts: | |
| response = "We will need your address and number of tyres to pick up to book a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
| session_state.agent_history['slot_booking_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| elif not address or not is_valid_address: | |
| response = "We will need your address to book a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
| session_state.agent_history['slot_booking_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| elif not tyre_counts: | |
| response = "We will need number of tyres to pick up to book a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
| session_state.agent_history['slot_booking_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| day = random.randint(1, 10) | |
| pickup_time = datetime.now() + timedelta(days=day) | |
| df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
| df.loc[len(df)] = [f"SLOT-{len(df)}", address, tyre_counts, pickup_time.strftime('%d-%m-%Y'), "Booked"] | |
| df.to_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH'))), index=False) | |
| response = f"Your slot has been booked with slot id ***{df.iloc[-1]['slot_id']}*** our pickup agent will come at ***{pickup_time.strftime('%d-%m-%Y')}*** at your address ***{address}*** to pickup ***{tyre_counts}*** tyres." | |
| session_state.agent_history['slot_booking_agent'] = [] | |
| session_state.next_agent = "general_agent" | |
| return response | |
| except Exception as error: | |
| print(error) | |
| return error | |
| def cancle_slot(inp = None) -> str: | |
| """this function can be used cancle booked slot. | |
| Returns: | |
| string: final responce after slot canclelation. | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "slot-canclelation-tool" | |
| if "slot_canclelation_agent" in session_state.agent_history: | |
| session_state.agent_history['slot_canclelation_agent'].append({"role": "user", "content": session_state.last_query}) | |
| else: | |
| session_state.agent_history['slot_canclelation_agent'] = [{"role": "user", "content": session_state.last_query}] | |
| session_state.next_agent = "slot_canclelation_agent" | |
| slot_id = None | |
| for message in reversed(session_state.agent_history['slot_canclelation_agent']): | |
| if message['role'] == 'user': | |
| match = re.search(SLOT_ID_PATTERN, message['content']) | |
| if match: | |
| slot_id = match.group(0) | |
| break | |
| if slot_id is None: | |
| response = "We will need a valid Slot id to Cancle a slot." | |
| session_state.agent_history['slot_canclelation_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
| if len(df[df['slot_id'] == slot_id]) > 0: | |
| df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"] = 'Cancled' | |
| df.to_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH'))), index=False) | |
| response = f"Your slot with slot id ***{df.iloc[-1]['slot_id']}*** has been cancleled successfully." | |
| session_state.agent_history['slot_canclelation_agent'] = [] | |
| session_state.next_agent = "general_agent" | |
| return response | |
| else: | |
| response = f"We couldn't find any slot with slot id ***{slot_id}***. Please enter valid slot id." | |
| session_state.agent_history['slot_canclelation_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| except Exception as error: | |
| print(error) | |
| return error | |
| def update_slot(slot_id,address,tyre_counts) -> str: | |
| """this function can be used update booked slot details like address or tyre count. | |
| Args: | |
| slot_id (string): Slot id to cancle a slot. Slot id must be in formate 'SLOT-*', | |
| address (string): Address to pickup the tyres | |
| tyre_counts (int): Number of the tyres to pickup | |
| Returns: | |
| string: final responce after slot updation. | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "slot-update-tool" | |
| if "slot_update_agent" in session_state.agent_history: | |
| session_state.agent_history['slot_update_agent'].append({"role": "user", "content": session_state.last_query}) | |
| else: | |
| session_state.agent_history['slot_update_agent'] = [{"role": "user", "content": session_state.last_query}] | |
| session_state.next_agent = "slot_update_agent" | |
| slot_id = None | |
| for message in reversed(session_state.agent_history['slot_update_agent']): | |
| if message['role'] == 'user': | |
| match = re.search(SLOT_ID_PATTERN, message['content']) | |
| if match: | |
| slot_id = match.group(0) | |
| break | |
| df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
| if slot_id is None or len(df[df['slot_id']==slot_id])==0: | |
| response = "We will need a valid Slot id to update a slot." | |
| session_state.agent_history['slot_update_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| day = random.randint(1, 10) | |
| pickup_time = datetime.now() + timedelta(days=day) | |
| if df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"] != 'Booked': | |
| response = "You can not update this slot because this slot is already " + str(df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"])+ "." | |
| session_state.agent_history['slot_update_agent'] = [] | |
| session_state.next_agent = "general_agent" | |
| return response | |
| try: | |
| tyre_counts = int(tyre_counts) | |
| except Exception as error: | |
| tyre_counts = None | |
| print(error) | |
| words = ['provide', 'give me ', 'address', 'need', 'want', 'needs', 'wants'] | |
| is_valid_address = True | |
| for word in words: | |
| if word in address.lower(): | |
| is_valid_address = False | |
| break | |
| if (not address or not is_valid_address) and not tyre_counts: | |
| response = "We will need your address or number of tyres to pick up for update a slot. also you can type **CANCLE** to cancle ongoing process of slot scheduling." | |
| session_state.agent_history['slot_update_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| if len(df[df['slot_id'] == slot_id]) > 0 : | |
| if address and is_valid_address: | |
| # update address | |
| df.loc[df[df['slot_id']==slot_id]['address'].index[0], "address"] = address | |
| if tyre_counts: | |
| # update tyre count | |
| df.loc[df[df['slot_id']==slot_id]['num_of_tyres'].index[0], "num_of_tyres"] = tyre_counts | |
| df.loc[df[df['slot_id']==slot_id]['pickup_date'].index[0], "pickup_date"] = pickup_time.strftime('%d-%m-%Y') | |
| df.loc[df[df['slot_id']==slot_id]['status'].index[0], "status"] = 'Booked' | |
| df.to_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH'))), index=False) | |
| response = f"We have updated your slot with ***{slot_id}***. Now your address is ***{df[df['slot_id']==slot_id]['address'].iloc[0]}***. Number of tyres is ***{df[df['slot_id']==slot_id]['num_of_tyres'].iloc[0]}*** and new pickup time is ***{pickup_time.strftime('%d-%m-%Y')}***" | |
| session_state.agent_history['slot_update_agent'] = [] | |
| session_state.next_agent = "general_agent" | |
| return response | |
| else: | |
| response = f"We couldn't find any slot with slot id ***{slot_id}***. Please enter valid slot id." | |
| session_state.agent_history['slot_update_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| except Exception as error: | |
| print(error) | |
| return error | |
| def get_slot_details(inp = None) -> str: | |
| """this function can be used to get details of slots. | |
| Returns: | |
| pandas.core.frame.DataFrame: Dataframe which contains user slot details. | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "slot-fetching-tool" | |
| df = pd.read_csv(os.path.join(ROOT_DIR,str(os.getenv('SLOT_DETAILS_PATH')))) | |
| num_of_tyres = df[df['status']=='Booked']['num_of_tyres'].sum() | |
| return {"df": df, "num_of_tyres": num_of_tyres} | |
| except Exception as error: | |
| print(error) | |
| return error | |
| def get_invoice(inp = None) -> str: | |
| """this function can be used to get Invoice. | |
| Returns: | |
| string: final invoice. | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "get-invoice-tool" | |
| if "get_invoice_agent" in session_state.agent_history: | |
| session_state.agent_history['get_invoice_agent'].append({"role": "user", "content": session_state.last_query}) | |
| else: | |
| session_state.agent_history['get_invoice_agent'] = [{"role": "user", "content": session_state.last_query}] | |
| session_state.next_agent = "get_invoice_agent" | |
| invoice_num = None | |
| print(session_state.agent_history['get_invoice_agent']) | |
| for message in reversed(session_state.agent_history['get_invoice_agent']): | |
| if message['role'] == 'user': | |
| print() | |
| print(message['content']) | |
| match = re.search(INVOICE_NUM_PATTERN, message['content']) | |
| print(match) | |
| if match: | |
| invoice_num = match.group(0) | |
| break | |
| if invoice_num is None: | |
| response = "We will need a valid Invoice Number to get Invoice." | |
| session_state.agent_history['get_invoice_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| with open(os.path.join(ROOT_DIR,str(os.getenv('INVOICE_DETAILS_PATH'))),"r") as fp: | |
| invoice_data = json.load(fp=fp) | |
| if invoice_num in invoice_data: | |
| data = invoice_data[invoice_num] | |
| # Extract the necessary information from the JSON data | |
| date = data["Date"] | |
| time = data["Time"] | |
| company_name = data["Company Name"] | |
| items = data["items"] | |
| # Create the HTML string | |
| part_1 = f""" | |
| <html> | |
| <head> | |
| <style> | |
| .invoice-container {{ | |
| width: 100%; | |
| padding: 20px; | |
| }} | |
| .invoice-header {{ | |
| text-align: center; | |
| margin-bottom: 20px; | |
| }} | |
| .invoice-header h1 {{ | |
| margin: 0; | |
| }} | |
| .invoice-details, .invoice-footer {{ | |
| margin-bottom: 20px; | |
| }} | |
| .invoice-items {{ | |
| display: flex; | |
| flex-direction: column; | |
| }} | |
| .invoice-item {{ | |
| display: grid; | |
| grid-template-columns: 1fr 1fr 1fr 1fr; | |
| align-content: center; | |
| padding: 10px; | |
| border-bottom: 1px solid #ddd; | |
| }} | |
| .item-header {{ | |
| font-weight: bold; | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="invoice-container"> | |
| <div class="invoice-header"> | |
| <h1>INVOICE</h1> | |
| <p><strong>Date:</strong> {date}</p> | |
| <p><strong>Time:</strong> {time}</p> | |
| <p><strong>Company Name:</strong> {company_name}</p> | |
| </div> | |
| <div class="invoice-details"> | |
| <div class="invoice-items"> | |
| <div class="invoice-item item-header"> | |
| <div>Category</div> | |
| <div>Quality</div> | |
| <div>Quantity</div> | |
| <div>Price per Unit</div> | |
| </div> | |
| """ | |
| part_2 = "" | |
| for item in items: | |
| category = item["Category"] | |
| quality = item["Qulity"] | |
| quantity = item["Quntity"] | |
| price_per_unit = item["price_per_unit"] | |
| part_2 += ( | |
| f"<div class='invoice-item'>" | |
| f"<div>{category}</div>" | |
| f"<div>{quality}</div>" | |
| f"<div>{quantity}</div>" | |
| f"<div>${price_per_unit}</div>" | |
| f"</div>" | |
| ) | |
| part_3 = """ | |
| </div> | |
| </div> | |
| <div class="invoice-footer"> | |
| <p><strong>Thank you for recycling tyres with us!</strong></p> | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| # Display the HTML invoice using Streamlit | |
| session_state.agent_history['get_invoice_agent'] = [] | |
| session_state.next_agent = "general_agent" | |
| return f"""{part_1} | |
| {part_2} | |
| {part_3}""" | |
| else: | |
| response = f"We couldn't find any invoice with invoice number ***{invoice_num}***. Please enter valid invoice number." | |
| session_state.agent_history['get_invoice_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| except Exception as error: | |
| print(error) | |
| return error | |
| def price_estimation(inp = None) -> str: | |
| """this function can be used to estimate price of the tyre. | |
| Returns: | |
| string: response which contains estimated price of tyre. | |
| """ | |
| try: | |
| session_state = session_manager.get_session_state() | |
| session_state.last_tool = "price-estimation-tool" | |
| if "price_estimation_agent" in session_state.agent_history: | |
| session_state.agent_history['price_estimation_agent'].append({"role": "user", "content": session_state.last_query}) | |
| else: | |
| session_state.agent_history['price_estimation_agent'] = [{"role": "user", "content": session_state.last_query}] | |
| session_state.next_agent = "price_estimation_agent" | |
| is_valid_file = Path("Tyre.png").is_file() | |
| category = None | |
| for message in reversed(session_state.agent_history['price_estimation_agent']): | |
| if message['role'] == 'user': | |
| for word in ['bike', 'car', 'truck']: | |
| if word in message['content'].lower(): | |
| category = word | |
| break | |
| if not is_valid_file and not category: | |
| response = "Please provide valid image of tyre and category of tyre. Category of tyre must be from car, bike or truck. Let me know once you uploaded a image of tyre." | |
| session_state.agent_history['price_estimation_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| elif not category: | |
| response = "Please provide valid category of tyre. Category of tyre must be from car, bike or truck." | |
| session_state.agent_history['price_estimation_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| elif not is_valid_file: | |
| response = "Plesase provide a image of tyre and let me know once you upload an image." | |
| session_state.agent_history['price_estimation_agent'].append({"role": "assistant", "content": response}) | |
| return response | |
| tyre_quality = predict_class(os.path.join(ROOT_DIR,str(os.getenv('QULITY_PREDICT_MODEL_PATH'))), 'Tyre.png') | |
| print(tyre_quality) | |
| tyre_price = waste_tyre_price(tyre_quality, category.lower()) | |
| print(tyre_price) | |
| session_state.agent_history['price_estimation_agent'] = [] | |
| session_state.next_agent = "general_agent" | |
| return f"Your tyre quality is {tyre_quality} and estimated price of your tyre is {tyre_price}" | |
| except Exception as error: | |
| print(error) | |
| return error | |
| def cancle_ongoing_process(inp = None) -> str: | |
| """This function can be used to cancle any ongoing process. | |
| Returns: | |
| str: response after canclelation. | |
| """ | |
| session_state = session_manager.get_session_state() | |
| cancle_response = "" | |
| if session_state.next_agent != "general_agent": | |
| session_state.next_agent = "general_agent" | |
| cancle_response = "Your ongoing process got cancleled." | |
| else: | |
| cancle_response = "You don't have any ongoing process to cancle." | |
| session_state.agent_history = {} | |
| return cancle_response | |