# WAPDA_ChatBot / ChatBOt.py
# RaheelShah's picture
# Upload 8 files
# 08bbb48 verified
import requests
import streamlit as st
from PIL import Image, ImageOps, ImageDraw
from bs4 import BeautifulSoup
from gtts import gTTS
from langchain.prompts import (
ChatPromptTemplate,
HumanMessagePromptTemplate,
MessagesPlaceholder,
SystemMessagePromptTemplate,
)
from langchain.schema.output_parser import StrOutputParser
from langchain_community.chat_message_histories import StreamlitChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_google_genai import ChatGoogleGenerativeAI
from streamlit_mic_recorder import speech_to_text
# Set the browser-tab title and icon for the app. This sits at module level,
# ahead of every page function, so it runs before any other st.* call below.
st.set_page_config(page_title="AI Voice Assistant", page_icon="🤖")
# Crop an image to an ellipse by attaching an alpha mask
def crop_circle(image, crop_size):
    """Fit ``image`` to ``crop_size`` and make everything outside an ellipse transparent.

    Args:
        image: Source PIL image.
        crop_size: ``(width, height)`` of the result; the ellipse is inscribed
            in this box, so it is a circle only when width equals height.

    Returns:
        The fitted image with the elliptical mask installed as its alpha
        channel.
    """
    width, height = crop_size[0], crop_size[1]

    # Centering (0.5, 0.2) keeps the crop horizontally centred and biased
    # towards the top of the picture.
    fitted = ImageOps.fit(image, crop_size, centering=(0.5, 0.2))

    # Single-channel mask: 0 everywhere, 255 inside the ellipse.
    alpha = Image.new("L", crop_size, 0)
    ImageDraw.Draw(alpha).ellipse((0, 0, width, height), fill=255)

    fitted.putalpha(alpha)
    return fitted
# Function for "About Team" section
def about_team():
    """Render the "About Team" page as a three-column grid of portraits.

    Every portrait is cropped with ``crop_circle`` to the pixel size of the
    reference image ``Tulaib.png``. If the reference image is missing, an
    error is shown and nothing else is rendered. If a member's image is
    missing, an error is shown in that member's column, and the name and
    discipline are still rendered.
    """
    st.title("About Team")
    st.write("Welcome to the About Team page!")

    # Load reference image for cropping size
    reference_image_path = "Tulaib.png"
    try:
        # Only the size is needed; the context manager releases the file
        # handle that Image.open would otherwise keep open.
        with Image.open(reference_image_path) as reference_image:
            crop_size = reference_image.size
    except FileNotFoundError:
        st.error(f"Reference image {reference_image_path} not found!")
        return

    team_members = [
        {"image": "WakeelAhmad.png", "name": "Wakeel Ahmed", "discipline": "Electrical Engineer"},
        {"image": "Raheel.png", "name": "Raheel Naveed", "discipline": "Mechatronics Engineer"},
        {"image": "Tulaib.png", "name": "Muhammad Tulaib", "discipline": "Civil Engineer"},
        {"image": "NaveedShahnawaz.png", "name": "Naveed Shahnawaz", "discipline": "Electrical Engineer"},
        {"image": "Sania.png", "name": "Sania", "discipline": "Biomedical Engineer"},
        {"image": "sanapic.png", "name": "Sana Tariq", "discipline": "Electronic Engineer"},
    ]

    columns_per_row = 3
    for start in range(0, len(team_members), columns_per_row):
        cols = st.columns(columns_per_row)
        for col, member in zip(cols, team_members[start:start + columns_per_row]):
            with col:
                try:
                    # crop_circle returns a new image, so the source file
                    # can be closed as soon as the crop has been produced.
                    with Image.open(member["image"]) as img:
                        circular_img = crop_circle(img, crop_size)
                except FileNotFoundError:
                    st.error(f"Image not found: {member['image']}")
                else:
                    st.image(circular_img, use_container_width=True)
                st.subheader(member["name"])
                st.write(f"{member['discipline']}")
# Function for chatbot
def chatbot():
    """Render the Urdu voice-assistant page.

    Flow: capture speech with ``speech_to_text`` (language ``"ur"``), send the
    transcript through a prompt | Gemini | string-parser chain whose history
    is a ``StreamlitChatMessageHistory`` stored under ``"langchain_messages"``,
    stream the reply into the chat window, then play the reply back as audio
    generated with gTTS.

    The Google API key is read from the ``GOOGLE_API_KEY`` environment
    variable. When it is not set, an error is shown and the page stops.
    """
    import io
    import os

    st.title("AI Voice Assistant 🎙️")
    st.subheader("Interact in Urdu with Real-Time Voice Input")

    # Credentials must not be committed to source. NOTE: the key that used to
    # be hard-coded here should be treated as leaked and rotated.
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        st.error("Google API key is not configured. Set the GOOGLE_API_KEY environment variable.")
        return

    prompt = ChatPromptTemplate(
        messages=[
            SystemMessagePromptTemplate.from_template(
                "You are a helpful AI assistant for WAPDA Services In Pakistan called WAPDA CHATBOT. Please always respond to user queries in Pure Urdu language. And answer to questions regarding the WAPDA to help users, but always keep the responses short, help users to give tips about electricity"),
            MessagesPlaceholder(variable_name="chat_history"),
            HumanMessagePromptTemplate.from_template("{question}"),
        ]
    )
    msgs = StreamlitChatMessageHistory(key="langchain_messages")
    model = ChatGoogleGenerativeAI(model="gemini-1.5-flash", google_api_key=api_key)
    chain = prompt | model | StrOutputParser()
    # Every session id resolves to the same history object.
    chain_with_history = RunnableWithMessageHistory(
        chain, lambda session_id: msgs, input_messages_key="question", history_messages_key="chat_history"
    )

    st.write("Press the button and start speaking in Urdu:")
    with st.spinner("Converting Speech To Text..."):
        text = speech_to_text(language="ur", use_container_width=True, just_once=True, key="STT")

    if not text:
        st.warning("Please press the button and start speaking.")
        return

    st.chat_message("human").write(text)
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""
        response = chain_with_history.stream({"question": text}, {"configurable": {"session_id": "any"}})
        for res in response:
            full_response += res or ""
            # Trailing "|" acts as a typing cursor while chunks arrive.
            message_placeholder.markdown(full_response + "|")
        message_placeholder.markdown(full_response)

    # gTTS rejects empty text, so skip playback when the model returned nothing.
    if full_response.strip():
        with st.spinner("Converting Text To Speech..."):
            # Keep the audio in memory: a fixed "output.mp3" on disk would be
            # shared (and overwritten) by every concurrent session.
            audio_buffer = io.BytesIO()
            gTTS(text=full_response, lang="ur").write_to_fp(audio_buffer)
        st.audio(audio_buffer.getvalue(), format="audio/mpeg")
# Build the bill-page URL for a 14-digit reference number
def determine_url(num):
    """Return the bill URL for ``num``, or ``None`` if it is not recognised.

    ``num`` must be a string of exactly 14 digits whose characters at index
    2-3 form one of the known codes; that code selects the bill page the
    URL points at.
    """
    bill_pages = {
        "14": "iescobill",
        "12": "gepcobill",
        "11": "lescobill",
        "15": "mepcobill",
        "26": "pescobill",
    }

    if len(num) != 14:
        return None
    if not num.isdigit():
        return None

    code = num[2:4]
    if code not in bill_pages:
        return None

    return f"https://bill.pitc.com.pk/{bill_pages[code]}/general?refno={num}"
import requests
from bs4 import BeautifulSoup
import re
import streamlit as st
# Maps one digit of the bill reference number to a company slug.
# fetch_bill_details looks up the character at index 3 of the reference
# number here and appends "bill" to the slug to form the URL path segment.
# NOTE(review): determine_url above selects the company from the two
# characters at index 2-3 using a different table -- confirm which scheme
# matches real reference numbers.
company_codes = {
"1": "lesco",
"2": "gepco",
"3": "fesco",
"4": "iesco",
"5": "mepco",
}
# Function to fetch and parse the HTML content
def fetch_and_parse(url, timeout=10):
    """Download ``url`` and parse the response body with ``html.parser``.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely, which would freeze
            the Streamlit page.

    Returns:
        A ``BeautifulSoup`` document, or ``None`` when the request fails
        (connection error, timeout or non-2xx status). The failure is
        reported to the user through ``st.error``.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Turn 4xx/5xx responses into exceptions
    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching the URL: {e}")
        return None
    return BeautifulSoup(response.text, "html.parser")
# Function to extract key-value pairs from the table
def extract_key_value_pairs(soup):
    """Pull the bill summary out of the parsed page as a dict.

    The first two ``<tr>`` rows of the matching table are read. Cells 0-2 of
    the first row are used as keys for cells 0-2 of the second row. In each
    row, cell 3 is used as a key for cell 4 of the same row.
    NOTE(review): this presumably reflects a layout where the last two cells
    of each row are a label/amount pair -- confirm against the live page.

    Args:
        soup: Parsed document, or ``None``.

    Returns:
        A dict with five entries, or ``None`` when ``soup`` is falsy, the
        table is missing, or the table has fewer than two rows or fewer than
        five cells in either row.
    """
    print("Extracting key-value pairs from the soup object...")
    if not soup:
        print("Soup object is None. Couldn't parse the HTML.")
        return None

    table = soup.find("table", style="text-align: center; width: 100%; border-collapse: collapse;")
    if not table:
        print("Table not found!")
        return None

    rows = table.find_all("tr")
    if len(rows) < 2:
        # Previously an IndexError; the caller already treats None as failure.
        print("Table has fewer than two rows!")
        return None

    headers = [cell.get_text(strip=True) for cell in rows[0].find_all("td")]
    values = [
        cell.get_text(strip=True).replace("\n", " ").strip()
        for cell in rows[1].find_all("td")
    ]
    if len(headers) < 5 or len(values) < 5:
        print("Table rows have fewer than five cells!")
        return None

    key_value_pairs = {
        headers[0]: values[0],
        headers[1]: values[1],
        headers[2]: values[2],
        # Within each row, cell 3 labels cell 4.
        headers[3]: headers[4],
        values[3]: values[4],
    }
    print(f"Extracted key-value pairs: {key_value_pairs}")
    return key_value_pairs
def refine_dictionary(key_value_pairs):
    """Return a copy of ``key_value_pairs`` with whitespace in the values normalised.

    Each value is stripped at both ends and every internal run of whitespace
    is collapsed to a single space. Keys are left untouched.
    """
    print("Refining the key-value pairs...")
    cleaned = {
        label: re.sub(r'\s+', ' ', text.strip())
        for label, text in key_value_pairs.items()
    }
    print(f"Refined key-value pairs: {cleaned}")
    return cleaned
# Turn the cleaned bill fields into a bulleted text summary
def generate_context_paragraph(refined_key_value_pairs):
    """Format the bill fields as an introductory line followed by five bullets.

    Args:
        refined_key_value_pairs: Mapping that must contain the keys
            ``BILL MONTH``, ``DUE DATE``, ``REFERENCE NO``,
            ``PAYABLE WITHIN DUE DATE`` and ``PAYABLE AFTER DUE DATE``.

    Returns:
        The summary text; every line, including the last, ends with a newline.

    Raises:
        KeyError: If any of the five keys is missing.
    """
    bill = refined_key_value_pairs
    lines = [
        "The user has provided the following billing information:",
        "- The billing month for the user is: {}.".format(bill['BILL MONTH']),
        "- The due date for the current bill is: {}.".format(bill['DUE DATE']),
        "- The reference number associated with this bill is: {}.".format(bill['REFERENCE NO']),
        "- The amount payable within the due date is: {}.".format(bill['PAYABLE WITHIN DUE DATE']),
        "- If payment is delayed, the amount payable after the due date will be: {}.".format(bill['PAYABLE AFTER DUE DATE']),
    ]
    return "\n".join(lines) + "\n"
# Function to fetch bill details based on the reference number
def fetch_bill_details(reference_number):
    """Fetch, parse and summarise the bill for ``reference_number``.

    The character at index 3 of the reference number selects the company via
    ``company_codes``; the bill page is then downloaded, the summary table is
    extracted and cleaned, and a text summary is generated.

    Args:
        reference_number: Reference number as typed by the user.

    Returns:
        ``(refined_key_value_pairs, context_paragraph)`` on success, or
        ``(None, None)`` on any failure. The result is always a 2-tuple
        because the caller unpacks exactly two values.
    """
    from urllib.parse import quote

    reference_number = reference_number.strip()
    if len(reference_number) < 4:
        st.error("Invalid reference number. It must be at least 4 digits long.")
        # Was a 3-tuple, which made the caller's two-name unpacking raise.
        return None, None

    company_code = company_codes.get(reference_number[3])
    if company_code is None:
        return None, None

    # The reference number is free-form user input, so encode it before it
    # goes into the query string.
    url = f"https://bill.pitc.com.pk/{company_code}bill/general?refno={quote(reference_number, safe='')}"
    soup = fetch_and_parse(url)
    if not soup:
        return None, None

    key_value_pairs = extract_key_value_pairs(soup)
    if not key_value_pairs:
        return None, None

    refined_key_value_pairs = refine_dictionary(key_value_pairs)

    # Extract the correct payable amount within the due date
    # NOTE(review): this handles the case where the "within" entry holds the
    # *label* of the "after" entry instead of an amount -- confirm whether
    # the page can still produce that shape.
    payable_within_due_date = refined_key_value_pairs.get('PAYABLE WITHIN DUE DATE', 'N/A')
    if payable_within_due_date == 'PAYABLE AFTER DUE DATE':
        refined_key_value_pairs['PAYABLE WITHIN DUE DATE'] = refined_key_value_pairs.get('PAYABLE AFTER DUE DATE', 'N/A')

    try:
        context_paragraph = generate_context_paragraph(refined_key_value_pairs)
    except KeyError as missing:
        # The page did not contain one of the expected bill fields.
        st.error(f"Bill page is missing the field {missing}.")
        return None, None
    return refined_key_value_pairs, context_paragraph
# Landing page shown when "Welcome" is selected
def _render_welcome():
    """Show the welcome heading and the introductory paragraph."""
    st.markdown(
        "<h1 style='font-size: 50px; font-weight: bold;'>Welcome to the WAPDA BillBOT</h1>",
        unsafe_allow_html=True
    )
    st.markdown(
        """
<p style='font-size: 18px;'>
The WAPDA BillBOT is your AI-powered assistant for managing electricity billing information with ease.
Whether you need to fetch bill details, interact using voice commands in Urdu, or learn more about our team,
this application is here to provide you with a seamless experience. Explore the features from the menu on the left,
and let us assist you in simplifying your billing tasks.
</p>
""",
        unsafe_allow_html=True
    )


# Bill lookup form shown when "WAPDA Bill Checker" is selected
def _render_bill_checker():
    """Ask for a reference number and display the fetched bill summary."""
    st.title("WAPDA Bill Checker")
    reference_number = st.text_input("Enter your bill reference number:")
    if not st.button("Fetch Bill Details"):
        return
    refined_data, context = fetch_bill_details(reference_number)
    if refined_data:
        st.success("Bill details fetched successfully!")
        st.write(context)
    else:
        st.error("Failed to fetch bill details.")


# Main function
def main():
    """Draw the sidebar and render the page chosen in its radio menu."""
    st.sidebar.image("WAPDA_logo.png", width=100)
    st.sidebar.markdown("Made with ❤️ by Flash Team")

    # Menu label -> function that renders the page; insertion order is the
    # order of the radio options.
    pages = {
        "Welcome": _render_welcome,
        "About Team": about_team,
        "WAPDA Bill Checker": _render_bill_checker,
        "Chat Bot": chatbot,
    }
    app_selection = st.sidebar.radio("Go to", list(pages))

    render_page = pages.get(app_selection)
    if render_page is not None:
        render_page()


if __name__ == "__main__":
    main()