Spaces:
Sleeping
Sleeping
import hashlib
import hmac
import logging
import os
import threading
import time
from urllib.parse import urljoin, urlparse

import requests
import streamlit as st
from bs4 import BeautifulSoup
from bson.objectid import ObjectId
from dotenv import load_dotenv
from openai import OpenAI
from pymongo import MongoClient
# Load environment variables (OPENAI_API_KEY, MONGO_URI) from .env file
load_dotenv()


# Streamlit re-executes this script on every user interaction. Caching the
# clients as resources reuses one client (and its connection pool) instead of
# constructing a new one on each rerun. The spinner is disabled so the cached
# calls do not try to render anything ahead of st.set_page_config().
@st.cache_resource(show_spinner=False)
def _get_openai_client(api_key):
    """Return the shared OpenAI client for *api_key*."""
    return OpenAI(api_key=api_key)


@st.cache_resource(show_spinner=False)
def _get_mongo_client(uri):
    """Return the shared MongoClient for the connection string *uri*."""
    return MongoClient(uri)


# Set up OpenAI client
client = _get_openai_client(os.getenv("OPENAI_API_KEY"))

# MongoDB setup
MONGO_URI = os.getenv("MONGO_URI")
mongo_client = _get_mongo_client(MONGO_URI)
db = mongo_client.website_summarizer
# User management functions

# Parameters for salted PBKDF2-HMAC password hashing.
_PBKDF2_ALGORITHM = "sha256"
_PBKDF2_ITERATIONS = 600_000
_SALT_BYTES = 16


def _hash_password(password, salt, iterations=_PBKDF2_ITERATIONS):
    """Return the hex-encoded PBKDF2-HMAC digest of *password* for *salt*."""
    digest = hashlib.pbkdf2_hmac(
        _PBKDF2_ALGORITHM, password.encode("utf-8"), salt, iterations
    )
    return digest.hex()


def _password_fields(password):
    """Build the document fields that store a freshly salted password hash."""
    salt = os.urandom(_SALT_BYTES)
    return {
        "password_hash": _hash_password(password, salt),
        "password_salt": salt.hex(),
        "password_iterations": _PBKDF2_ITERATIONS,
    }


def add_user(username, password):
    """Create a new user account.

    The password is never stored in clear text; only a salted PBKDF2 hash,
    its salt and the iteration count are written to ``db.users``.

    Args:
        username: Desired account name.
        password: Clear-text password chosen by the user.

    Returns:
        True if the account was created, False if the username is taken.
    """
    if db.users.find_one({"username": username}):
        return False
    user_doc = {
        "username": username,
        "update_interval": 3 * 3600,  # Default to 3 hours in seconds
    }
    user_doc.update(_password_fields(password))
    db.users.insert_one(user_doc)
    return True


def check_user(username, password):
    """Return True when *password* is correct for *username*.

    Accounts created before hashing was introduced still carry a clear-text
    ``password`` field. Those are accepted once and immediately rewritten
    to the hashed form so the clear-text value is removed from the database.
    """
    if not isinstance(password, str):
        return False
    user = db.users.find_one({"username": username})
    if user is None:
        return False

    stored_hash = user.get("password_hash")
    if stored_hash is not None:
        candidate = _hash_password(
            password,
            bytes.fromhex(user["password_salt"]),
            user.get("password_iterations", _PBKDF2_ITERATIONS),
        )
        # Constant-time comparison avoids leaking how many characters match.
        return hmac.compare_digest(candidate, stored_hash)

    # Legacy record: clear-text password stored by the earlier version.
    legacy_password = user.get("password")
    if not isinstance(legacy_password, str):
        return False
    if not hmac.compare_digest(
        legacy_password.encode("utf-8"), password.encode("utf-8")
    ):
        return False
    db.users.update_one(
        {"_id": user["_id"]},
        {"$set": _password_fields(password), "$unset": {"password": ""}},
    )
    return True
def get_user_settings(username):
    """Return the user's update interval in seconds.

    Falls back to 3 hours when the user does not exist or has no
    ``update_interval`` stored.
    """
    default_interval = 3 * 3600
    record = db.users.find_one({"username": username})
    if not record:
        return default_interval
    return record.get("update_interval", default_interval)
def update_user_settings(username, update_interval):
    """Store *update_interval* (seconds) on the user document for *username*."""
    selector = {"username": username}
    change = {"$set": {"update_interval": update_interval}}
    db.users.update_one(selector, change)
def summarize_text(text):
    """Summarize *text* into 5-10 bullet points with the OpenAI chat API.

    Only the first 4000 characters of *text* are sent to the model.

    Args:
        text: Plain text to summarize.

    Returns:
        The model's summary as a string, or None when there is nothing to
        summarize or the API request fails.
    """
    # Nothing to summarize: skip the (billable) API round trip.
    if not text or not text.strip():
        return None
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that summarizes text."},
                {"role": "user", "content": f"Summarize the key points of this text in 5-10 bullet points: {text[:4000]}"}
            ]
        )
        return response.choices[0].message.content
    except Exception:
        # Callers treat None as "summary unavailable"; record the cause
        # instead of discarding it so failures can be diagnosed.
        logging.getLogger(__name__).exception("OpenAI summarization request failed")
        return None
def scrape_and_summarize(url):
    """Fetch *url*, extract its visible text and summarize it.

    Args:
        url: Address of the page to download.

    Returns:
        A dict with the keys ``title``, ``url``, ``summary`` and
        ``summarized``. ``summarized`` is True only when a summary was
        produced. This function does not raise: any failure is reported
        through a result whose title is "Error".
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # soup.title.string is None for an empty <title> or one containing
        # nested markup; get_text() copes with both, so the title is always
        # a real string.
        title = soup.title.get_text(strip=True) if soup.title else ""
        if not title:
            title = "No title found"

        # Drop non-visible content before extracting the text.
        for tag in soup(["script", "style"]):
            tag.decompose()
        text = soup.get_text(separator=' ', strip=True)

        summary = summarize_text(text)
        if summary:
            return {"title": title, "url": url, "summary": summary, "summarized": True}
        return {"title": title, "url": url, "summary": "Summary could not be generated.", "summarized": False}
    except Exception as e:
        return {"title": "Error", "url": url, "summary": f"Error processing the URL: {str(e)}", "summarized": False}
def scrape_multiple_urls(main_url, limit=10):
    """Summarize *main_url* and same-domain pages it links to.

    The main URL is always the first entry of the result and counts toward
    *limit*. Linked pages are restricted to http(s) URLs on the same host,
    have their fragment removed and are de-duplicated, so in-page anchors
    and repeated navigation links do not cause the same page to be
    summarized more than once.

    Args:
        main_url: Page to start from.
        limit: Maximum number of URLs to summarize, main URL included.

    Returns:
        A list of result dicts as produced by ``scrape_and_summarize``.
        If the main page cannot be fetched, a single-element list holding
        an error result for *main_url*.
    """
    try:
        response = requests.get(main_url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        main_netloc = urlparse(main_url).netloc
        urls_to_scrape = [main_url]
        seen = {urlparse(main_url)._replace(fragment="").geturl()}

        for link in soup.find_all('a', href=True):
            if len(urls_to_scrape) >= limit:
                break
            try:
                target = urlparse(urljoin(main_url, link['href']))
            except ValueError:
                # One malformed href must not abort the whole crawl.
                continue
            # Only include http(s) URLs from the same domain
            if target.scheme not in ("http", "https") or target.netloc != main_netloc:
                continue
            full_url = target._replace(fragment="").geturl()
            if full_url in seen:
                continue
            seen.add(full_url)
            urls_to_scrape.append(full_url)

        return [scrape_and_summarize(url) for url in urls_to_scrape]
    except Exception as e:
        return [{"title": "Error", "url": main_url, "summary": f"Error processing the main URL: {str(e)}", "summarized": False}]
def save_urls(username, main_url):
    """Scrape *main_url* plus related pages and store them for *username*.

    The stored document describes the main URL itself; every other scraped
    page is kept under ``sub_urls``. If the scrape results contain no entry
    for *main_url* (including an empty result list), the main page is
    summarized directly rather than promoting an arbitrary linked page.

    Args:
        username: Owner of the saved URL.
        main_url: URL entered by the user.

    Returns:
        The number of pages stored: the main URL plus its related URLs.
    """
    results = scrape_multiple_urls(main_url)

    main_result = next((r for r in results if r["url"] == main_url), None)
    if main_result is None:
        main_result = scrape_and_summarize(main_url)
        sub_urls = results
    else:
        sub_urls = [r for r in results if r is not main_result]

    db.user_urls.insert_one({
        "username": username,
        "url": main_result["url"],
        "title": main_result["title"],
        "summarized": main_result["summarized"],
        "summary": main_result["summary"],
        "sub_urls": sub_urls  # Store other URLs as sub_urls
    })
    return len(sub_urls) + 1
def get_user_urls(username):
    """Return all saved-URL documents owned by *username* as a list."""
    owned_by_user = {"username": username}
    cursor = db.user_urls.find(owned_by_user)
    return list(cursor)
def delete_url(url_id):
    """Delete the saved-URL document whose ``_id`` is *url_id*."""
    document_id = ObjectId(url_id)
    db.user_urls.delete_one({"_id": document_id})
def get_feed(username):
    """Return the user's saved-URL documents that are marked as summarized."""
    summarized_only = {"username": username, "summarized": True}
    cursor = db.user_urls.find(summarized_only)
    return list(cursor)
def summarize_url(url_doc):
    """Re-scrape one saved URL and write the outcome back to its document.

    Args:
        url_doc: Saved-URL document; its ``url`` and ``_id`` are used.

    Returns:
        The ``summarized`` flag of the fresh scrape result.
    """
    outcome = scrape_and_summarize(url_doc['url'])
    updated_fields = {
        "title": outcome["title"],
        "summarized": outcome["summarized"],
        "summary": outcome["summary"]
    }
    db.user_urls.update_one({"_id": url_doc['_id']}, {"$set": updated_fields})
    return outcome["summarized"]
def summarize_multiple_urls(username, limit=10):
    """Summarize up to *limit* of the user's not-yet-summarized URLs.

    Every processed document is updated with the scrape outcome, whether
    or not a summary was produced.

    Returns:
        A tuple ``(summarized_count, error_count, results)`` where
        *results* holds only the successful scrape results.
    """
    pending_query = {"username": username, "summarized": False}
    pending_docs = list(db.user_urls.find(pending_query).limit(limit))

    successes = []
    failures = 0
    for doc in pending_docs:
        outcome = scrape_and_summarize(doc['url'])
        outcome['url'] = doc['url']  # Add the URL to the result

        updated_fields = {
            "title": outcome["title"],
            "summarized": outcome["summarized"],
            "summary": outcome["summary"],
            "url": doc['url']
        }
        db.user_urls.update_one({"_id": doc['_id']}, {"$set": updated_fields})

        if not outcome["summarized"]:
            failures += 1
            continue
        successes.append(outcome)

    return len(successes), failures, successes
# Periodically refresh summaries in a background thread
def update_summaries(username, interval):
    """Summarize the user's pending URLs every *interval* seconds, forever.

    Intended as the target of a daemon thread. Progress is reported through
    the ``logging`` module rather than ``st.spinner``/``st.success``: this
    function runs outside Streamlit's script-run context, where Streamlit
    UI commands are not supported.

    Args:
        username: User whose unsummarized URLs are processed.
        interval: Seconds to sleep between runs. It is read once per loop
            iteration from this argument, so later changes to the user's
            setting do not affect an already running thread.
    """
    log = logging.getLogger(__name__)
    while True:
        time.sleep(interval)
        try:
            summarized_count, error_count, _ = summarize_multiple_urls(username)
        except Exception:
            # A transient database/network failure must not end the updater.
            log.exception("Periodic summary update failed for user %s", username)
            continue
        log.info(
            "Updated %d summaries for user %s. %d errors occurred.",
            summarized_count, username, error_count,
        )
st.set_page_config(page_title="Website Links Summarizer", layout="wide")

# Session state: authentication flags for a fresh browser session
if 'logged_in' not in st.session_state:
    st.session_state['logged_in'] = False
    st.session_state['username'] = None

# Session state: summary refresh interval, defaulting to 3 hours in seconds
if 'update_interval' not in st.session_state:
    st.session_state['update_interval'] = 3 * 3600
# Login/Signup sidebar
with st.sidebar:
    st.title("User Authentication")

    if st.session_state.logged_in:
        # Authenticated: show who is logged in and offer logout.
        st.write(f"Logged in as {st.session_state.username}")
        if st.button("Logout"):
            st.session_state.logged_in = False
            st.session_state.username = None
            st.rerun()
    else:
        choice = st.radio("Login/Signup", ["Login", "Sign Up"])
        username = st.text_input("Username")
        password = st.text_input("Password", type="password")

        if choice == "Sign Up":
            if st.button("Sign Up"):
                if not (username and password):
                    st.error("Please enter both username and password.")
                elif add_user(username, password):
                    st.success("Account created successfully!")
                else:
                    st.error("Username already exists.")
        elif st.button("Login"):
            if check_user(username, password):
                st.session_state.logged_in = True
                st.session_state.username = username
                # Load the interval this user saved previously.
                st.session_state.update_interval = get_user_settings(username)
                st.success("Logged in successfully!")
                st.rerun()
            else:
                st.error("Invalid username or password.")
# Main app
if st.session_state.logged_in:
    tab1, tab2, tab3 = st.tabs(["Dashboard", "Feed", "Settings"])

    with tab1:
        st.header("URL Dashboard")
        col1, col2 = st.columns([3, 1])

        with col1:
            # Add new URL
            new_url = st.text_input("Enter a new URL to save:")
            if st.button("Add URL"):
                if new_url:
                    with st.spinner("Adding URL and generating summaries..."):
                        num_added = save_urls(st.session_state.username, new_url)
                    # No st.rerun() here: the saved-URL list below is read
                    # from the database later in this same run, and a rerun
                    # would discard this message before it is shown.
                    st.success(f"Added main URL and found {num_added - 1} related URLs successfully!")
                else:
                    st.warning("Please enter a valid URL.")

        with col2:
            # Summarize 10 Links button
            if st.button("Summarize 10 Links"):
                with st.spinner("Summarizing up to 10 unsummarized URLs..."):
                    summarized_count, error_count, results = summarize_multiple_urls(st.session_state.username)
                st.success(f"Summarized {summarized_count} URL(s)!")
                if error_count > 0:
                    st.warning(f"Failed to summarize {error_count} URL(s).")
                # Display the summarized links. They stay visible because no
                # rerun is triggered; the list below already reflects the
                # updates since it is queried after this point.
                if results:
                    st.subheader("Recently Summarized URLs")
                    for result in results:
                        with st.expander(result["title"] or result["url"]):
                            st.markdown(f"**URL:** [{result['url']}]({result['url']})")
                            st.write("**Summary:**")
                            st.markdown(result["summary"])

        # Display saved URLs
        st.subheader("Your Saved URLs")
        user_urls = get_user_urls(st.session_state.username)
        for url in user_urls:
            # Fall back to the URL when the title is missing or empty/None.
            with st.expander(url.get("title") or url["url"]):
                st.markdown(f"**URL:** [{url['url']}]({url['url']})")
                st.write(f"**Status:** {'Summarized' if url.get('summarized', False) else 'Not summarized'}")
                st.write("**Summary:**")
                summary = url.get('summary', 'No summary available.')
                st.markdown(summary)

                # Display sub-urls if available
                if url.get('sub_urls'):
                    st.write("**Related URLs:**")
                    for sub_url in url['sub_urls']:
                        st.markdown(f"- [{sub_url['url']}]({sub_url['url']})")
                        if sub_url.get('summarized', False):
                            st.markdown(f" Summary: {sub_url.get('summary', 'No summary available.')}")

                col1, col2 = st.columns([3, 1])
                with col1:
                    if st.button("Delete", key=f"delete_{url['_id']}"):
                        delete_url(url['_id'])
                        st.rerun()
                with col2:
                    if not url.get('summarized', False):
                        if st.button("Summarize", key=f"summarize_{url['_id']}"):
                            with st.spinner("Summarizing..."):
                                if summarize_url(url):
                                    st.success("Summary generated!")
                                else:
                                    st.error("Failed to generate summary.")
                            # The status/summary above were rendered before
                            # the update, so a rerun is needed to refresh them.
                            st.rerun()

    with tab2:
        st.header("Your Feed")
        feed_items = get_feed(st.session_state.username)
        for item in feed_items:
            with st.expander(item.get("title") or item["url"]):
                st.markdown(f"**URL:** [{item['url']}]({item['url']})")
                summary = item.get('summary', 'No summary available.')
                st.markdown(summary)

                # Display related URLs if available
                if item.get('sub_urls'):
                    st.write("**Related URLs:**")
                    for sub_url in item['sub_urls']:
                        st.markdown(f"- [{sub_url['url']}]({sub_url['url']})")
                        if sub_url.get('summarized', False):
                            st.markdown(f" Summary: {sub_url.get('summary', 'No summary available.')}")
        if not feed_items:
            st.info("Your feed is empty. Add some URLs in the Dashboard to see their summaries here!")

    with tab3:
        st.header("Settings")
        # Clamp to the widget's minimum: a stored interval below one hour
        # would otherwise give a default value smaller than min_value.
        current_hours = max(1, int(st.session_state.update_interval) // 3600)
        update_interval_hours = st.number_input("Update interval (hours)", min_value=1, value=current_hours, step=1)
        if st.button("Save Settings"):
            new_interval = update_interval_hours * 3600  # Convert hours to seconds
            st.session_state.update_interval = new_interval
            update_user_settings(st.session_state.username, new_interval)
            st.success("Settings saved!")

    # Start the background updater once per browser session. The thread
    # receives the username and interval in effect at this moment.
    if 'update_thread' not in st.session_state:
        st.session_state.update_thread = threading.Thread(
            target=update_summaries,
            args=(st.session_state.username, st.session_state.update_interval),
            daemon=True
        )
        st.session_state.update_thread.start()
else:
    st.warning("Please log in to use the Website Links Summarizer.")
# End of script: non-blocking join on the background updater, if one exists.
# With timeout=0 the call returns immediately whether or not the thread ended.
_update_thread = st.session_state.get('update_thread')
if _update_thread:
    _update_thread.join(timeout=0)