import os
import tempfile

import pandas as pd
import streamlit as st
import urllib3
from bs4 import BeautifulSoup
from groq import Groq

# Initialize the Groq client. The API key is read from the environment;
# if 'GroqApi' is unset this is None and the first API call will fail.
client = Groq(api_key=os.environ.get('GroqApi'))

# Persist scraped results across Streamlit reruns (each widget interaction
# re-executes this script from the top).
if "scraped_data" not in st.session_state:
    st.session_state.scraped_data = []


def scrape_web_data(url, scrape_option):
    """Scrape text lines or hyperlinks from the page at *url*.

    Args:
        url: Address of the page to fetch.
        scrape_option: 'data' to collect non-empty visible text lines,
            'links' to collect anchor ``href`` values.

    Returns:
        A list of single-key dicts (``{'Data': ...}`` or ``{'Links': ...}``).
        Returns an empty list on a non-200 response, an unknown option,
        or any exception (the error is shown in the UI via ``st.write``).
    """
    try:
        http = urllib3.PoolManager()
        # Timeout so a hung server cannot freeze the Streamlit app.
        response = http.request('GET', url, timeout=10.0)

        if response.status != 200:
            # Bug fix: the original fell through here and implicitly
            # returned None instead of the documented list.
            st.write(f"Error: {response.status}")
            return []

        soup = BeautifulSoup(response.data, 'html.parser')
        if scrape_option == 'data':
            all_text = soup.get_text()
            return [{'Data': line.strip()}
                    for line in all_text.split('\n') if line.strip()]
        elif scrape_option == 'links':
            links = soup.find_all('a')
            return [{'Links': link.get('href')}
                    for link in links if link.get('href')]
    except Exception as e:
        st.write(f"An error occurred: {e}")
    return []


def process_query_with_groq(query, data):
    """Answer *query* with the Groq LLM, using the scraped *data* as context.

    Args:
        query: Free-form user question.
        data: List of single-key dicts produced by ``scrape_web_data``.

    Returns:
        The model's answer string, or a human-readable error message when
        no data is available or the API call fails (never raises).
    """
    if not data:
        return "No data available to process. Please scrape data first."
    try:
        # Flatten all scraped values into one newline-separated context block.
        combined_text = "\n".join(
            str(value) for record in data for value in record.values()
        )
        prompt = f"Context: {combined_text}\n\nUser Query: {query}\nAnswer:"
        response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-8b-8192",
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Error processing query with Groq: {e}"


# ---------------------------- Streamlit UI ----------------------------
st.title("Web Scraping and Query Tool")

# Step 1: Scraping
st.subheader("Step 1: Scrape Data")
website_url = st.text_input("Enter the URL to scrape:")
scrape_option = st.selectbox("Select what to scrape:", ['data', 'links'])

if st.button("Scrape Data"):
    scraped_data = scrape_web_data(website_url, scrape_option)
    if scraped_data:
        st.session_state.scraped_data = scraped_data
        st.success(f"Scraping completed. {len(scraped_data)} items found.")
        # Build the CSV entirely in memory. The original wrote a
        # NamedTemporaryFile(delete=False) that was never closed or
        # deleted, and re-opened it with open() without closing — a
        # file-handle and disk leak. to_csv() with no path returns the
        # CSV text directly.
        csv_bytes = pd.DataFrame(scraped_data).to_csv(index=False).encode("utf-8")
        st.download_button(
            label="Download Scraped Data as CSV",
            data=csv_bytes,
            file_name="scraped_data.csv",
            mime="text/csv",
        )
    else:
        st.warning("No data found. Please check the URL or scrape option.")

# Step 2: Querying
st.subheader("Step 2: Ask a Query")
user_query = st.text_input("Enter your query:")

if st.button("Get Answer"):
    if user_query.strip() == "":
        st.warning("Please enter a valid query.")
    else:
        answer = process_query_with_groq(user_query, st.session_state.scraped_data)
        st.write("**Answer:**")
        st.write(answer)