# AI chatbot Streamlit app: dataset-backed Q&A, generative fallback, file
# upload, voice input, video calling, translation, sentiment, PDF export.
# Standard library
import base64
import datetime  # Logging
import json  # Chat history
import textwrap  # Wrapping long chat lines in the PDF export
from io import BytesIO

# Third-party
import docx
import pandas as pd
import speech_recognition as sr  # Voice recognition
import streamlit as st
import torch
from deep_translator import GoogleTranslator  # Language translation
from gtts import gTTS
from PyPDF2 import PdfReader
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from sentence_transformers import SentenceTransformer, util
from streamlit_webrtc import webrtc_streamer, WebRtcMode, RTCConfiguration  # Video calling
from textblob import TextBlob  # Sentiment analysis
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# Load dataset
def load_dataset(csv_path="Chatbot.csv"):
    """Load the chatbot Q&A corpus from a CSV file.

    The CSV must contain at least the columns ``name`` (speaker, "User" or
    "ECO") and ``line`` (utterance text).

    Args:
        csv_path: Path to the corpus CSV. Defaults to "Chatbot.csv" so
            existing no-argument callers are unaffected (the constant was
            previously hard-coded inside the function).

    Returns:
        tuple[list[str], list[str]]: (user questions, ECO answers) in file
        order. NOTE(review): downstream code pairs these two lists by
        index, which assumes strict User/ECO alternation in the file —
        confirm against the data.
    """
    df = pd.read_csv(csv_path)
    questions = df[df["name"] == "User"]["line"].tolist()
    answers = df[df["name"] == "ECO"]["line"].tolist()
    return questions, answers
| questions, answers = load_dataset() | |
# Load models
def load_models(embedder_name="all-MiniLM-L6-v2",
                chatbot_name="facebook/blenderbot-400M-distill"):
    """Load the sentence-embedding model and the generative chatbot.

    Args:
        embedder_name: SentenceTransformer checkpoint used for similarity
            search. Default preserves the original hard-coded value.
        chatbot_name: Hugging Face seq2seq checkpoint used for free-form
            replies. Default preserves the original hard-coded value.

    Returns:
        (embedding_model, chatbot_model, chatbot_tokenizer)

    NOTE(review): Streamlit reruns the whole script on every interaction,
    so these models are reloaded each rerun; consider decorating with
    ``@st.cache_resource`` — confirm against the deployed Streamlit version.
    """
    embedding_model = SentenceTransformer(embedder_name)
    chatbot_model = AutoModelForSeq2SeqLM.from_pretrained(chatbot_name)
    chatbot_tokenizer = AutoTokenizer.from_pretrained(chatbot_name)
    return embedding_model, chatbot_model, chatbot_tokenizer
| embedding_model, chatbot_model, chatbot_tokenizer = load_models() | |
# Generate embeddings for dataset questions
def generate_question_embeddings(texts=None, model=None):
    """Encode corpus questions into a tensor of sentence embeddings.

    Generalized for reuse/testing: with no arguments it behaves exactly as
    before, encoding the module-level ``questions`` with the module-level
    ``embedding_model``.

    Args:
        texts: Optional list of strings to encode; defaults to ``questions``.
        model: Optional SentenceTransformer; defaults to ``embedding_model``.

    Returns:
        A tensor of embeddings, one row per input text.
    """
    if model is None:
        model = embedding_model
    if texts is None:
        texts = questions
    return model.encode(texts, convert_to_tensor=True)
| question_embeddings = generate_question_embeddings() | |
# Initialize translator
# Best-effort translation of user input into English (auto-detected source).
translator = GoogleTranslator(source="auto", target="en")
# Video Call Configuration
# Public Google STUN server so WebRTC peers can negotiate a connection.
RTC_CONFIG = RTCConfiguration({"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]})
# Initialize video call session state
# Session state survives Streamlit reruns; default to "no active call".
if "video_call_active" not in st.session_state:
    st.session_state.video_call_active = False
# Streamlit UI
# NOTE(review): the odd "π…" characters in the UI strings throughout this
# file look like mojibake'd emoji from an encoding mishap — confirm and
# restore the intended characters; left byte-identical here.
st.title("π€ AI Chatbot with File Upload & Video Calling π")
# πΉ **Video Call Feature**
st.subheader("πΉ Video Call")
# Two independent buttons toggle the flag; the streamer renders while active.
if st.button("π Start Video Call"):
    st.session_state.video_call_active = True
if st.button("β End Video Call"):
    st.session_state.video_call_active = False
if st.session_state.video_call_active:
    webrtc_streamer(key="video-chat", mode=WebRtcMode.SENDRECV, rtc_configuration=RTC_CONFIG)
# π **File Upload Feature**
def _extract_uploaded_text(uploaded):
    """Return the text content of an uploaded txt/pdf/docx file.

    Returns ``None`` for an unrecognized extension, and ``""`` when the
    file type is recognized but yields no text (e.g. an image-only PDF).
    """
    extension = uploaded.name.split(".")[-1].lower()
    if extension == "txt":
        return uploaded.getvalue().decode("utf-8")
    if extension == "pdf":
        reader = PdfReader(uploaded)
        return "\n".join(page.extract_text() for page in reader.pages if page.extract_text())
    if extension == "docx":
        doc = docx.Document(uploaded)
        return "\n".join(para.text for para in doc.paragraphs)
    return None

uploaded_file = st.file_uploader("π Upload a document for Q&A", type=["txt", "pdf", "docx"])
if uploaded_file:
    extracted_text = _extract_uploaded_text(uploaded_file)
    if extracted_text:
        st.subheader("π Extracted File Content:")
        st.text_area("File Content", extracted_text, height=200)
    elif extracted_text is None:
        # Shouldn't normally happen — the uploader already filters types.
        st.warning("Unsupported file format.")
    else:
        # Fix: an empty extraction (e.g. scanned PDF) was previously
        # reported as "Unsupported file format."; distinguish the cases.
        st.warning("No readable text found in the uploaded file.")
# π‘ **Suggested Questions**
st.subheader("π‘ Suggested Questions:")
suggested_questions = ["What is AI?", "Tell me a joke!", "How does machine learning work?"]
cols = st.columns(len(suggested_questions))
user_input = None
# One clickable shortcut per column; clicking one becomes the chat message.
for col, question in zip(cols, suggested_questions):
    if col.button(question):
        user_input = question
# π€ **Voice Input**
st.subheader("π€ Speak instead of typing!")
if st.button("ποΈ Use Voice Input"):
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        st.write("π€ Listening... Speak now!")
        audio = recognizer.listen(source)
    try:
        user_input = recognizer.recognize_google(audio)
    except sr.UnknownValueError:
        # Fix: error strings were previously assigned to user_input and
        # then translated/embedded/answered as if the user had said them.
        # Show a warning and fall through to the text input instead.
        st.warning("Sorry, I couldn't understand that.")
    except sr.RequestError:
        st.warning("Speech recognition service error.")
# βοΈ **Text Input**
# Only offer the text box when neither a suggested question nor voice
# input produced a message this rerun.
if user_input is None:
    user_input = st.chat_input("Type your message here...")
# ποΈ **Clear Chat Button**
if st.button("ποΈ Clear Chat"):
    # Reset the conversation and rerun so the UI redraws from scratch.
    st.session_state.messages = []
    st.rerun()
# π **Chat Processing**
if "messages" not in st.session_state:
    st.session_state.messages = []
if user_input:
    # Best-effort normalization to English. Fix: the translator is a
    # network call and a failure would previously crash the whole app;
    # fall back to the raw input instead.
    try:
        translated_text = translator.translate(user_input)
        if translated_text and translated_text != user_input:
            user_input = translated_text
    except Exception:
        pass  # deliberate best-effort: keep the original text

    # Retrieve the closest corpus question; below the 0.7 similarity
    # threshold, fall back to the generative model.
    input_embedding = embedding_model.encode(user_input, convert_to_tensor=True)
    similarities = util.pytorch_cos_sim(input_embedding, question_embeddings)[0].cpu()
    best_match_idx = torch.argmax(similarities).item()
    best_match_score = similarities[best_match_idx].item()
    # Guard best_match_idx < len(answers): the questions/answers lists are
    # paired by index and may have unequal lengths in a malformed corpus.
    if best_match_score > 0.7 and best_match_idx < len(answers):
        response = answers[best_match_idx]
    else:
        inputs = chatbot_tokenizer(user_input, return_tensors="pt")
        outputs = chatbot_model.generate(**inputs)
        response = chatbot_tokenizer.decode(outputs[0], skip_special_tokens=True)

    # TextBlob polarity is in [-1, 1]: >0 positive, <0 negative, 0 neutral.
    sentiment = TextBlob(user_input).sentiment.polarity
    sentiment_result = "π Positive" if sentiment > 0 else "π Negative" if sentiment < 0 else "π Neutral"

    st.session_state.messages.append({"role": "user", "content": user_input})
    st.session_state.messages.append({"role": "assistant", "content": response})

    # Fix: gTTS is a network call; a failure should not hide the text
    # response, so synthesize best-effort and only render audio on success.
    audio_file = None
    try:
        tts = gTTS(text=response, lang="en")
        audio_file = BytesIO()
        tts.write_to_fp(audio_file)
        audio_file.seek(0)
    except Exception:
        audio_file = None

    with st.chat_message("assistant"):
        st.write(f"{response}\n\n**Sentiment Analysis:** {sentiment_result}")
        if audio_file is not None:
            st.audio(audio_file, format="audio/mp3")
# π₯ **Download Chat as PDF**
def _render_chat_pdf(messages):
    """Render the chat transcript to a PDF, returned as a ready-to-read BytesIO.

    Each message becomes "User: "/"Bot: "-prefixed lines; pages break when
    the cursor reaches the bottom margin.
    """
    buffer = BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    width, height = letter
    y_position = height - 40
    c.setFont("Helvetica-Bold", 14)
    c.drawString(30, y_position, "Chat History")
    y_position -= 20
    c.setFont("Helvetica", 10)
    for message in messages:
        role = "User: " if message["role"] == "user" else "Bot: "
        text = role + message["content"]
        for raw_line in text.split("\n"):
            # Fix: drawString does not wrap, so long lines previously ran
            # off the right edge of the page; wrap at ~100 chars, which
            # fits the letter width at 10pt. `or [""]` preserves blank lines.
            for line in textwrap.wrap(raw_line, width=100) or [""]:
                if y_position < 40:
                    c.showPage()
                    c.setFont("Helvetica", 10)
                    y_position = height - 40
                c.drawString(30, y_position, line)
                y_position -= 15
    c.save()
    buffer.seek(0)
    return buffer

st.download_button("π₯ Download Chat as PDF", _render_chat_pdf(st.session_state.messages),
                   "chat_history.pdf", "application/pdf")