# WebScraper / app.py
# (Hugging Face upload metadata: Engineer786, "Upload 2 files", commit 0a3a0c3 verified)
import streamlit as st
import os
from bs4 import BeautifulSoup
import urllib3
import pandas as pd
import tempfile
from groq import Groq
# Module-level Groq client shared by the whole app; the API key is read
# from the 'GroqApi' environment variable (None when unset).
client = Groq(api_key=os.environ.get('GroqApi'))

# Streamlit reruns this script on every interaction, so scraped results
# are kept in session_state to survive reruns.
if "scraped_data" not in st.session_state:
    st.session_state.scraped_data = []
def scrape_web_data(url, scrape_option):
    """Scrape visible text or hyperlinks from the page at *url*.

    Parameters
    ----------
    url : str
        Address to fetch with a plain HTTP GET.
    scrape_option : str
        'data'  -> one record per non-blank line of the page's visible text;
        'links' -> one record per anchor tag that carries an href.

    Returns
    -------
    list[dict]
        [{'Data': ...}] or [{'Links': ...}] records; an empty list on any
        failure (non-200 status, network error, or an unknown option).
    """
    try:
        # Create a PoolManager with urllib3 to handle SSL and pooling.
        http = urllib3.PoolManager()
        # Bound the request so a hung or slow server cannot freeze the UI
        # indefinitely (the original call had no timeout at all).
        response = http.request(
            'GET', url,
            timeout=urllib3.Timeout(connect=10.0, read=30.0),
        )
        # Guard clause: anything but 200 is reported and yields no data.
        if response.status != 200:
            st.write(f"Error: {response.status}")
            return []
        soup = BeautifulSoup(response.data, 'html.parser')
        if scrape_option == 'data':
            # Split the page's visible text into non-blank stripped lines.
            all_text = soup.get_text()
            return [{'Data': line.strip()} for line in all_text.split('\n') if line.strip()]
        if scrape_option == 'links':
            # Keep only anchors that actually have an href attribute.
            links = soup.find_all('a')
            return [{'Links': link.get('href')} for link in links if link.get('href')]
        # Unknown scrape_option: fall through to the empty result.
    except Exception as e:
        # Surface the failure in the Streamlit UI instead of crashing.
        st.write(f"An error occurred: {e}")
    return []
def process_query_with_groq(query, data):
    """Answer *query* with the Groq LLM, using the scraped records as context.

    *data* is the list of single-key dicts produced by scrape_web_data; a
    falsy value short-circuits with a user-facing message. Any API failure
    is returned as an error string rather than raised.
    """
    if not data:
        return "No data available to process. Please scrape data first."
    try:
        # Flatten every value of every scraped record into one text block.
        fragments = []
        for record in data:
            for value in record.values():
                fragments.append(str(value))
        combined_text = "\n".join(fragments)
        # Prepend the scraped context so the model answers from it.
        prompt = f"Context: {combined_text}\n\nUser Query: {query}\nAnswer:"
        completion = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-8b-8192",
        )
        return completion.choices[0].message.content
    except Exception as e:
        return f"Error processing query with Groq: {e}"
# ---------------- Streamlit UI ----------------
st.title("Web Scraping and Query Tool")

# Step 1: Scraping
st.subheader("Step 1: Scrape Data")
website_url = st.text_input("Enter the URL to scrape:")
scrape_option = st.selectbox("Select what to scrape:", ['data', 'links'])

if st.button("Scrape Data"):
    scraped_data = scrape_web_data(website_url, scrape_option)
    if scraped_data:
        # Persist across Streamlit reruns so Step 2 can query the data.
        st.session_state.scraped_data = scraped_data
        st.success(f"Scraping completed. {len(scraped_data)} items found.")
        # Build the CSV entirely in memory: the original wrote to a
        # NamedTemporaryFile(delete=False) that was never removed and
        # re-opened it without closing the handle — a file/handle leak
        # on every click. to_csv() with no path returns the CSV text.
        csv_bytes = pd.DataFrame(scraped_data).to_csv(index=False).encode("utf-8")
        st.download_button(
            label="Download Scraped Data as CSV",
            data=csv_bytes,
            file_name="scraped_data.csv",
            mime="text/csv",
        )
    else:
        st.warning("No data found. Please check the URL or scrape option.")

# Step 2: Querying
st.subheader("Step 2: Ask a Query")
user_query = st.text_input("Enter your query:")
if st.button("Get Answer"):
    if user_query.strip() == "":
        st.warning("Please enter a valid query.")
    else:
        answer = process_query_with_groq(user_query, st.session_state.scraped_data)
        st.write("**Answer:**")
        st.write(answer)