# Website Links Summarizer: a Streamlit app that lets a logged-in user save a
# web page (plus same-domain pages it links to), summarizes each page with
# OpenAI, and stores the results per user in MongoDB.
import hashlib
import hmac
import logging
import os
import secrets
import threading
import time
from urllib.parse import urldefrag, urljoin, urlparse

import requests
import streamlit as st
from bs4 import BeautifulSoup
from bson.objectid import ObjectId
from dotenv import load_dotenv
from openai import OpenAI
from pymongo import MongoClient

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Set up OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# MongoDB setup
MONGO_URI = os.getenv("MONGO_URI")
mongo_client = MongoClient(MONGO_URI)
db = mongo_client.website_summarizer

# Default time between background summary runs: 3 hours, in seconds.
DEFAULT_UPDATE_INTERVAL = 3 * 3600

# PBKDF2 work factor used for newly created password hashes. The value used
# for each account is stored with the account so it can be raised later.
_PBKDF2_ITERATIONS = 200_000


# User management functions
def _hash_password(password, salt_hex=None, iterations=_PBKDF2_ITERATIONS):
    """Return ``(salt_hex, hash_hex)`` for *password* using PBKDF2-HMAC-SHA256.

    A fresh random 16-byte salt is generated when *salt_hex* is not given.
    """
    salt = bytes.fromhex(salt_hex) if salt_hex else secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations
    )
    return salt.hex(), digest.hex()


def add_user(username, password):
    """Create a user with a salted password hash and the default interval.

    Returns:
        False if a user with this username already exists, True otherwise.
    """
    if db.users.find_one({"username": username}):
        return False
    salt_hex, hash_hex = _hash_password(password)
    db.users.insert_one({
        "username": username,
        "password_salt": salt_hex,
        "password_hash": hash_hex,
        "password_iterations": _PBKDF2_ITERATIONS,
        "update_interval": DEFAULT_UPDATE_INTERVAL,
    })
    return True


def check_user(username, password):
    """Return True when *password* is correct for *username*.

    Accounts that still carry a legacy plaintext ``password`` field are
    accepted and, on a successful login, rewritten to the hashed form.
    """
    user = db.users.find_one({"username": username})
    if user is None:
        return False

    stored_hash = user.get("password_hash")
    salt_hex = user.get("password_salt")
    if stored_hash and salt_hex:
        iterations = user.get("password_iterations", _PBKDF2_ITERATIONS)
        _, candidate = _hash_password(password, salt_hex, iterations)
        # Constant-time comparison; both operands are ASCII hex strings.
        return hmac.compare_digest(candidate, stored_hash)

    legacy_password = user.get("password")
    if isinstance(legacy_password, str) and hmac.compare_digest(
        legacy_password.encode("utf-8"), password.encode("utf-8")
    ):
        # Upgrade the legacy account so the plaintext is no longer stored.
        new_salt, new_hash = _hash_password(password)
        db.users.update_one(
            {"_id": user["_id"]},
            {
                "$set": {
                    "password_salt": new_salt,
                    "password_hash": new_hash,
                    "password_iterations": _PBKDF2_ITERATIONS,
                },
                "$unset": {"password": ""},
            },
        )
        return True
    return False


def get_user_settings(username):
    """Return the user's update interval in seconds (default if unset)."""
    user = db.users.find_one({"username": username})
    if not user:
        return DEFAULT_UPDATE_INTERVAL
    return user.get("update_interval", DEFAULT_UPDATE_INTERVAL)


def update_user_settings(username, update_interval):
    """Persist *update_interval* (seconds) for *username*."""
    db.users.update_one(
        {"username": username},
        {"$set": {"update_interval": update_interval}},
    )


def summarize_text(text):
    """Summarize *text* as bullet points via the OpenAI chat API.

    Only the first 4000 characters of *text* are sent.

    Returns:
        The summary string, or None when *text* is empty or the request fails.
    """
    if not text or not text.strip():
        # Nothing to summarize; skip the API call.
        return None
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that summarizes text."},
                {"role": "user", "content": f"Summarize the key points of this text in 5-10 bullet points: {text[:4000]}"},
            ],
        )
        return response.choices[0].message.content
    except Exception:
        # Best effort: callers treat None as "no summary", but keep a trace.
        logger.exception("Summarization request failed")
        return None


def _summarize_soup(url, soup):
    """Build a result dict (title, url, summary, summarized) from parsed HTML.

    Removes ``<script>`` and ``<style>`` elements from *soup* in place.
    """
    title = "No title found"
    if soup.title and soup.title.string:
        # .string is None for empty or nested <title> tags, hence the guard.
        title = str(soup.title.string).strip() or title

    for tag in soup(["script", "style"]):
        tag.decompose()
    text = soup.get_text(separator=' ', strip=True)

    summary = summarize_text(text)
    if summary:
        return {"title": title, "url": url, "summary": summary, "summarized": True}
    return {"title": title, "url": url, "summary": "Summary could not be generated.", "summarized": False}


def _same_domain_links(soup, base_url, limit):
    """Return up to *limit* distinct http(s) links on the same host as *base_url*.

    Fragments are dropped and the base page itself is skipped so the same
    document is not returned twice.
    """
    base_netloc = urlparse(base_url).netloc
    seen = {urldefrag(base_url).url}
    found = []
    for anchor in soup.find_all('a', href=True):
        if len(found) >= limit:
            break
        full_url = urldefrag(urljoin(base_url, anchor['href'])).url
        parsed = urlparse(full_url)
        if parsed.scheme not in ("http", "https") or parsed.netloc != base_netloc:
            continue
        if full_url in seen:
            continue
        seen.add(full_url)
        found.append(full_url)
    return found


def scrape_and_summarize(url):
    """Fetch *url* and summarize its visible text.

    Never raises: any failure is reported as a result dict whose
    ``summarized`` flag is False and whose ``summary`` holds the error text.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        return _summarize_soup(url, soup)
    except Exception as e:
        return {"title": "Error", "url": url, "summary": f"Error processing the URL: {str(e)}", "summarized": False}


def scrape_multiple_urls(main_url, limit=10):
    """Summarize *main_url* and up to *limit* same-domain pages it links to.

    Returns:
        A non-empty list of result dicts. The first entry always describes
        *main_url* itself (an error entry if it could not be processed); the
        remaining entries describe the linked pages.
    """
    try:
        response = requests.get(main_url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Collect links before summarizing, since summarizing mutates the soup.
        urls_to_scrape = _same_domain_links(soup, main_url, limit)
        results = [_summarize_soup(main_url, soup)]
        results.extend(scrape_and_summarize(url) for url in urls_to_scrape)
        return results
    except Exception as e:
        return [{"title": "Error", "url": main_url, "summary": f"Error processing the main URL: {str(e)}", "summarized": False}]


def save_urls(username, main_url):
    """Scrape *main_url* and store it, with its linked pages, for *username*.

    Returns:
        The total number of pages processed (main page plus linked pages).
    """
    results = scrape_multiple_urls(main_url)
    # scrape_multiple_urls always puts the main page first.
    main_result, *sub_results = results
    db.user_urls.insert_one({
        "username": username,
        "url": main_result["url"],
        "title": main_result["title"],
        "summarized": main_result["summarized"],
        "summary": main_result["summary"],
        "sub_urls": sub_results,
    })
    return len(results)


def get_user_urls(username):
    """Return every saved URL document for *username*."""
    return list(db.user_urls.find({"username": username}))


def delete_url(url_id):
    """Delete the saved URL document with the given id."""
    db.user_urls.delete_one({"_id": ObjectId(url_id)})


def get_feed(username):
    """Return the user's saved URL documents that have a summary."""
    return list(db.user_urls.find({"username": username, "summarized": True}))


def summarize_url(url_doc):
    """Re-scrape one saved URL document and store the new result.

    Returns:
        True when a summary was generated, False otherwise.
    """
    result = scrape_and_summarize(url_doc['url'])
    db.user_urls.update_one(
        {"_id": url_doc['_id']},
        {"$set": {
            "title": result["title"],
            "summarized": result["summarized"],
            "summary": result["summary"],
        }},
    )
    return result["summarized"]


def summarize_multiple_urls(username, limit=10):
    """Summarize up to *limit* of the user's not-yet-summarized URLs.

    Returns:
        ``(summarized_count, error_count, results)`` where *results* holds
        the result dicts of the URLs that were summarized successfully.
    """
    unsummarized_urls = list(
        db.user_urls.find({"username": username, "summarized": False}).limit(limit)
    )
    summarized_count = 0
    error_count = 0
    results = []
    for url_doc in unsummarized_urls:
        result = scrape_and_summarize(url_doc['url'])
        result['url'] = url_doc['url']
        db.user_urls.update_one(
            {"_id": url_doc['_id']},
            {"$set": {
                "title": result["title"],
                "summarized": result["summarized"],
                "summary": result["summary"],
                "url": url_doc['url'],
            }},
        )
        if result["summarized"]:
            summarized_count += 1
            results.append(result)
        else:
            error_count += 1
    return summarized_count, error_count, results


def update_summaries(username, interval, stop_event=None):
    """Background loop: every *interval* seconds, summarize pending URLs.

    Intended to run in a worker thread, so it reports through logging and
    makes no Streamlit calls.

    Args:
        username: Owner of the URLs to process.
        interval: Seconds to wait between runs.
        stop_event: Optional ``threading.Event``; setting it ends the loop.
            Without one the loop runs until the process exits.
    """
    if stop_event is None:
        stop_event = threading.Event()
    # Event.wait returns True as soon as the event is set, False on timeout.
    while not stop_event.wait(interval):
        try:
            summarized_count, error_count, _ = summarize_multiple_urls(username)
        except Exception:
            # Keep the worker alive across transient DB/network failures.
            logger.exception("Periodic summary update failed for %s", username)
            continue
        logger.info(
            "Updated %d summaries for %s. %d errors occurred.",
            summarized_count, username, error_count,
        )


def _stop_update_thread():
    """Signal this session's background worker (if any) to stop and forget it."""
    stop_event = st.session_state.get("update_stop_event")
    if stop_event is not None:
        stop_event.set()
    for key in ("update_stop_event", "update_thread"):
        if key in st.session_state:
            del st.session_state[key]


def _start_update_thread(username, interval):
    """(Re)start this session's background worker for *username*."""
    _stop_update_thread()
    stop_event = threading.Event()
    worker = threading.Thread(
        target=update_summaries,
        args=(username, interval, stop_event),
        daemon=True,
    )
    st.session_state.update_stop_event = stop_event
    st.session_state.update_thread = worker
    worker.start()


def _render_sub_urls(sub_urls):
    """Render the related-URL list (and any summaries) of a saved document."""
    st.write("**Related URLs:**")
    for sub_url in sub_urls:
        st.markdown(f"- [{sub_url['url']}]({sub_url['url']})")
        if sub_url.get('summarized', False):
            st.markdown(f" Summary: {sub_url.get('summary', 'No summary available.')}")


st.set_page_config(layout="wide", page_title="Website Links Summarizer")

# Session state
if 'logged_in' not in st.session_state:
    st.session_state.logged_in = False
    st.session_state.username = None

# Initialize session state variables
if 'update_interval' not in st.session_state:
    st.session_state.update_interval = DEFAULT_UPDATE_INTERVAL

# Login/Signup sidebar
with st.sidebar:
    st.title("User Authentication")
    if not st.session_state.logged_in:
        choice = st.radio("Login/Signup", ["Login", "Sign Up"])
        username = st.text_input("Username")
        password = st.text_input("Password", type="password")
        if choice == "Sign Up":
            if st.button("Sign Up"):
                if username and password:
                    if add_user(username, password):
                        st.success("Account created successfully!")
                    else:
                        st.error("Username already exists.")
                else:
                    st.error("Please enter both username and password.")
        else:
            if st.button("Login"):
                if check_user(username, password):
                    st.session_state.logged_in = True
                    st.session_state.username = username
                    st.session_state.update_interval = get_user_settings(username)
                    st.success("Logged in successfully!")
                    st.rerun()
                else:
                    st.error("Invalid username or password.")
    else:
        st.write(f"Logged in as {st.session_state.username}")
        if st.button("Logout"):
            # Stop the worker so it does not keep running for this user.
            _stop_update_thread()
            st.session_state.logged_in = False
            st.session_state.username = None
            st.rerun()

# Main app
if st.session_state.logged_in:
    tab1, tab2, tab3 = st.tabs(["Dashboard", "Feed", "Settings"])

    with tab1:
        st.header("URL Dashboard")
        col1, col2 = st.columns([3, 1])

        with col1:
            # Add new URL
            new_url = st.text_input("Enter a new URL to save:").strip()
            if st.button("Add URL"):
                if new_url:
                    with st.spinner("Adding URL and generating summaries..."):
                        num_added = save_urls(st.session_state.username, new_url)
                    # No rerun here: the saved-URL list below is loaded after
                    # this point, and a rerun would erase this confirmation.
                    st.success(f"Added main URL and found {num_added - 1} related URLs successfully!")
                else:
                    st.warning("Please enter a valid URL.")

        with col2:
            # Summarize 10 Links button
            if st.button("Summarize 10 Links"):
                with st.spinner("Summarizing up to 10 unsummarized URLs..."):
                    summarized_count, error_count, results = summarize_multiple_urls(st.session_state.username)
                st.success(f"Summarized {summarized_count} URL(s)!")
                if error_count > 0:
                    st.warning(f"Failed to summarize {error_count} URL(s).")

                # Display the summarized links. No rerun afterwards: it would
                # discard everything rendered in this branch.
                if results:
                    st.subheader("Recently Summarized URLs")
                    for result in results:
                        with st.expander(result["title"] or result["url"]):
                            st.markdown(f"**URL:** [{result['url']}]({result['url']})")
                            st.write("**Summary:**")
                            st.markdown(result["summary"])

        # Display saved URLs
        st.subheader("Your Saved URLs")
        user_urls = get_user_urls(st.session_state.username)
        for url in user_urls:
            with st.expander(url.get("title") or url["url"]):
                st.markdown(f"**URL:** [{url['url']}]({url['url']})")
                st.write(f"**Status:** {'Summarized' if url.get('summarized', False) else 'Not summarized'}")
                st.write("**Summary:**")
                summary = url.get('summary', 'No summary available.')
                st.markdown(summary)

                # Display sub-urls if available
                if url.get('sub_urls'):
                    _render_sub_urls(url['sub_urls'])

                delete_col, action_col = st.columns([3, 1])
                with delete_col:
                    if st.button("Delete", key=f"delete_{url['_id']}"):
                        delete_url(url['_id'])
                        st.rerun()
                with action_col:
                    if not url.get('summarized', False):
                        if st.button("Summarize", key=f"summarize_{url['_id']}"):
                            with st.spinner("Summarizing..."):
                                if summarize_url(url):
                                    st.success("Summary generated!")
                                else:
                                    st.error("Failed to generate summary.")
                            # Rerun so the status/summary shown above, which
                            # was rendered before this click, is refreshed.
                            st.rerun()

    with tab2:
        st.header("Your Feed")
        feed_items = get_feed(st.session_state.username)
        for item in feed_items:
            with st.expander(item.get("title") or item["url"]):
                st.markdown(f"**URL:** [{item['url']}]({item['url']})")
                summary = item.get('summary', 'No summary available.')
                st.markdown(summary)

                # Display related URLs if available
                if item.get('sub_urls'):
                    _render_sub_urls(item['sub_urls'])

        if not feed_items:
            st.info("Your feed is empty. Add some URLs in the Dashboard to see their summaries here!")

    with tab3:
        st.header("Settings")
        update_interval_hours = st.number_input(
            "Update interval (hours)",
            min_value=1,
            # Clamp so a stored interval under one hour cannot violate min_value.
            value=max(1, int(st.session_state.update_interval // 3600)),
            step=1,
        )
        if st.button("Save Settings"):
            new_interval = update_interval_hours * 3600  # Convert hours to seconds
            st.session_state.update_interval = new_interval
            update_user_settings(st.session_state.username, new_interval)
            # Restart the worker so the new interval takes effect.
            _start_update_thread(st.session_state.username, new_interval)
            st.success("Settings saved!")

    # Make sure this session has exactly one background worker running.
    if 'update_thread' not in st.session_state:
        _start_update_thread(
            st.session_state.username,
            st.session_state.update_interval,
        )
else:
    st.warning("Please log in to use the Website Links Summarizer.")