import gradio as gr
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import torch
import nltk
import numpy as np
import tflearn
import tensorflow as tf
import random
import json
import pickle
from nltk.tokenize import word_tokenize
from nltk.stem.lancaster import LancasterStemmer
import requests
import csv
import time
import re
from bs4 import BeautifulSoup
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import chromedriver_autoinstaller
import os

# Ensure necessary NLTK resources are downloaded
nltk.download('punkt')

# Initialize the stemmer
stemmer = LancasterStemmer()

# Load intents.json
try:
    with open("intents.json") as file:
        data = json.load(file)
except FileNotFoundError:
    raise FileNotFoundError("Error: 'intents.json' file not found. Ensure it exists in the current directory.")

# Load preprocessed data from pickle
try:
    with open("data.pickle", "rb") as f:
        words, labels, training, output = pickle.load(f)
except FileNotFoundError:
    raise FileNotFoundError("Error: 'data.pickle' file not found. Ensure it exists and matches the model.")

# Build the model structure
net = tflearn.input_data(shape=[None, len(training[0])])
net = tflearn.fully_connected(net, 8)
net = tflearn.fully_connected(net, 8)
net = tflearn.fully_connected(net, len(output[0]), activation="softmax")
net = tflearn.regression(net)

# Load the trained model
model = tflearn.DNN(net)
try:
    model.load("MentalHealthChatBotmodel.tflearn")
except FileNotFoundError:
    raise FileNotFoundError("Error: Trained model file 'MentalHealthChatBotmodel.tflearn' not found.")

# Function to process user input into a bag-of-words format
def bag_of_words(s, words):
    bag = [0 for _ in range(len(words))]
    s_words = word_tokenize(s)
    # Stem every token; the vocabulary in `words` is already stemmed, so
    # filtering on the raw token (as before) would drop valid matches
    s_words = [stemmer.stem(word.lower()) for word in s_words]
    for se in s_words:
        for i, w in enumerate(words):
            if w == se:
                bag[i] = 1
    return np.array(bag)
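
# Usage sketch (hypothetical 3-word vocabulary -- the real one comes from data.pickle):
#   bag_of_words("hi there", ["hi", "help", "sad"])
#   -> array([1, 0, 0])   # only "hi" appears in the vocabulary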

# Chat function
def chat(message, history):
    history = history or []
    message = message.lower()
    try:
        # Predict the tag
        results = model.predict([bag_of_words(message, words)])
        results_index = np.argmax(results)
        tag = labels[results_index]

        # Match tag with intent and choose a random response
        for tg in data["intents"]:
            if tg['tag'] == tag:
                responses = tg['responses']
                response = random.choice(responses)
                break
        else:
            response = "I'm sorry, I didn't understand that. Could you please rephrase?"
    except Exception as e:
        response = f"An error occurred: {str(e)}"
    history.append((message, response))
    return history, history

# Google Places API configuration. The endpoints are the standard Text Search
# and Place Details URLs; the query/location/radius values are placeholders
# inferred from the output filename -- adjust them for your own use case.
url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
places_details_url = "https://maps.googleapis.com/maps/api/place/details/json"
api_key = os.environ.get("GOOGLE_MAPS_API_KEY", "")  # set this in your environment
query = "wellness professionals"
location = "21.3069,-157.8583"  # assumed: Honolulu, HI as "lat,lng"
radius = 50000  # search radius in meters

# Function to send a request to Google Places API and fetch places data
def get_places_data(query, location, radius, api_key, next_page_token=None):
    params = {
        "query": query,
        "location": location,
        "radius": radius,
        "key": api_key
    }
    if next_page_token:
        params["pagetoken"] = next_page_token
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        return None
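
# Usage sketch (requires a valid GOOGLE_MAPS_API_KEY in the environment):
#   page = get_places_data(query, location, radius, api_key)
#   if page:
#       print(len(page.get("results", [])), "places on the first page")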

# Function to fetch detailed information for a specific place using its place_id
def get_place_details(place_id, api_key):
    details_url = places_details_url
    params = {
        "place_id": place_id,
        "key": api_key
    }
    response = requests.get(details_url, params=params)
    if response.status_code == 200:
        details_data = response.json().get("result", {})
        return {
            "opening_hours": details_data.get("opening_hours", {}).get("weekday_text", "Not available"),
            "reviews": details_data.get("reviews", "Not available"),
            "phone_number": details_data.get("formatted_phone_number", "Not available"),
            "website": details_data.get("website", "Not available")
        }
    else:
        return {}

# Scrape website URL from Google Maps results (using Selenium)
def scrape_website_from_google_maps(place_name):
    chromedriver_autoinstaller.install()  # ensure a matching chromedriver is on PATH
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=chrome_options)
    search_url = f"https://www.google.com/maps/search/{place_name.replace(' ', '+')}"
    driver.get(search_url)
    time.sleep(5)  # crude wait for the results page to render
    try:
        # Selenium 4 API; find_element_by_xpath was removed
        website_element = driver.find_element(By.XPATH, '//a[contains(@aria-label, "Visit") and contains(@aria-label, "website")]')
        website_url = website_element.get_attribute('href')
    except Exception:
        website_url = "Not available"
    driver.quit()
    return website_url

# Scraping the website to extract phone number or email
def scrape_website_for_contact_info(website):
    phone_number = "Not available"
    email = "Not available"
    try:
        response = requests.get(website, timeout=5)
        soup = BeautifulSoup(response.content, 'html.parser')
        text = soup.get_text()
        # Require at least 8 characters of phone-like content; the previous
        # pattern could match an empty string at position 0
        phone_match = re.search(r'\+?\(?\d[\d\-\s().]{6,}\d', text)
        if phone_match:
            phone_number = phone_match.group()
        email_match = re.search(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', text)
        if email_match:
            email = email_match.group()
    except Exception as e:
        print(f"Error scraping website {website}: {e}")
    return phone_number, email
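
# Usage sketch (hypothetical URL):
#   phone, email = scrape_website_for_contact_info("https://example.com/contact")
#   print(phone, email)   # both fall back to "Not available" when nothing matches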

# Function to fetch all places data including pagination
def get_all_places(query, location, radius, api_key):
    all_results = []
    next_page_token = None
    while True:
        data = get_places_data(query, location, radius, api_key, next_page_token)
        if data:
            results = data.get('results', [])
            if not results:
                break
            for place in results:
                place_id = place.get("place_id")
                name = place.get("name")
                address = place.get("formatted_address")
                rating = place.get("rating", "Not available")
                business_status = place.get("business_status", "Not available")
                user_ratings_total = place.get("user_ratings_total", "Not available")
                website = place.get("website", "Not available")
                types = ", ".join(place.get("types", []))
                # Use a local name so we don't clobber the `location` parameter
                geo = place.get("geometry", {}).get("location", {})
                latitude = geo.get("lat", "Not available")
                longitude = geo.get("lng", "Not available")
                details = get_place_details(place_id, api_key)
                phone_number = details.get("phone_number", "Not available")
                if phone_number == "Not available" and website != "Not available":
                    phone_number, email = scrape_website_for_contact_info(website)
                else:
                    email = "Not available"
                if website == "Not available":
                    website = scrape_website_from_google_maps(name)
                all_results.append([name, address, phone_number, rating, business_status,
                                    user_ratings_total, website, types, latitude, longitude,
                                    details.get("opening_hours", "Not available"),
                                    details.get("reviews", "Not available"), email])
            next_page_token = data.get('next_page_token')
            if not next_page_token:
                break
            time.sleep(2)  # a next_page_token needs a moment before it becomes valid
        else:
            break
    return all_results
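
# End-to-end sketch (network-heavy; uses the placeholder query/location/radius above):
#   rows = get_all_places(query, location, radius, api_key)
#   save_to_csv(rows, "wellness_professionals_hawaii.csv")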

# Function to save results to CSV file
def save_to_csv(data, filename):
    with open(filename, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(["Name", "Address", "Phone", "Rating", "Business Status", "User Ratings Total", "Website", "Types", "Latitude", "Longitude", "Opening Hours", "Reviews", "Email"])
        writer.writerows(data)
    print(f"Data saved to {filename}")

# Main function for running the scraper standalone (not invoked by the Gradio app)
def main():
    google_places_data = get_all_places(query, location, radius, api_key)
    if google_places_data:
        save_to_csv(google_places_data, "wellness_professionals_hawaii.csv")
    else:
        print("No data found.")

# Gradio UI setup
with gr.Blocks() as demo:
    # Load the pre-trained emotion model and tokenizer. Distinct names keep
    # them from shadowing the tflearn chatbot `model` defined above.
    def load_model():
        tokenizer = AutoTokenizer.from_pretrained("j-hartmann/emotion-english-distilroberta-base")
        model = AutoModelForSequenceClassification.from_pretrained("j-hartmann/emotion-english-distilroberta-base")
        return tokenizer, model
    emotion_tokenizer, emotion_model = load_model()

    # Display header
    gr.Markdown("# Emotion Detection and Well-Being Suggestions")

    # User input for text (emotion detection)
    user_input = gr.Textbox(lines=1, label="How are you feeling today?")
    emotion_output = gr.Textbox(label="Emotion Detected")

    # Build the classification pipeline once rather than on every keystroke
    emotion_pipe = pipeline("text-classification", model=emotion_model, tokenizer=emotion_tokenizer)

    # Model prediction
    def predict_emotion(text):
        result = emotion_pipe(text)
        emotion = result[0]['label']
        return emotion

    user_input.change(predict_emotion, inputs=user_input, outputs=emotion_output)

    # Chatbot functionality (wired to the top-level chat function)
    chatbot = gr.Chatbot(label="Chat")
    message_input = gr.Textbox(lines=1, label="Message")
    history_state = gr.State([])
    message_input.submit(chat, inputs=[message_input, history_state], outputs=[chatbot, history_state])

    # Button to fetch wellness professionals data
    fetch_button = gr.Button("Fetch Wellness Professionals Data")
    data_output = gr.File(label="Download Data")

    def fetch_data():
        all_results = get_all_places(query, location, radius, api_key)
        if all_results:
            df = pd.DataFrame(all_results, columns=["Name", "Address", "Phone", "Rating", "Business Status", "User Ratings Total", "Website", "Types", "Latitude", "Longitude", "Opening Hours", "Reviews", "Email"])
            # gr.File expects a file path, so write the CSV to disk and return its path
            csv_path = "wellness_professionals_hawaii.csv"
            df.to_csv(csv_path, index=False)
            return csv_path
        else:
            return None
    fetch_button.click(fetch_data, inputs=None, outputs=data_output)

# Launch Gradio interface
demo.launch()