"""Streamlit entry point for the CyberScraper 2077 chat UI.

Wires together the sidebar (model selection, service status, chat history),
the chat transcript, and the prompt handler that forwards user input to
``StreamlitWebScraperChat``.
"""
import warnings

# Suppress Pydantic V1 warning - upstream LangChain issue with Python 3.14
# See: https://github.com/langchain-ai/langchain/issues/33926
warnings.filterwarnings("ignore", message="Core Pydantic V1 functionality")

import streamlit as st
import streamlit.runtime.scriptrunner_utils.script_run_context as _ctx

# Wrap Streamlit's context getter so that it is called with
# suppress_warning=True unless a caller says otherwise.
_original_get_script_run_ctx = _ctx.get_script_run_ctx
_ctx.get_script_run_ctx = lambda suppress_warning=True: _original_get_script_run_ctx(suppress_warning=suppress_warning)

import json
import asyncio
import os
import socket
from html import escape
from dotenv import load_dotenv

# Load environment variables from .env file
# (done before the project imports below, so it must stay above them)
load_dotenv()

from app.streamlit_web_scraper_chat import StreamlitWebScraperChat
from app.ui_components import display_info_icons, extract_data_from_markdown, format_data
from app.utils import loading_animation
from src.web_extractor import extract_url, get_website_name
from datetime import datetime, timedelta
from src.ollama_models import OllamaModel
from src.utils.error_handler import ErrorMessages
import pandas as pd
import base64
from google_auth_oauthlib.flow import Flow
from io import BytesIO
from src.utils.google_sheets_utils import SCOPES, get_redirect_uri, display_google_sheets_button, initiate_google_auth
from src.scrapers.playwright_scraper import ScraperConfig
import time
import atexit
import logging

logger = logging.getLogger(__name__)

CHAT_HISTORY_FILE = "chat_history.json"
CSV_MIME = "text/csv"
XLSX_MIME = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
TOR_HOST = "127.0.0.1"
TOR_PORT = 9050


def handle_oauth_callback():
    """Complete the Google OAuth flow when the page URL carries a ``code``.

    On success the credentials are stored as JSON in
    ``st.session_state['google_auth_token']`` and the query parameters are
    cleared. Failures are reported with ``st.error`` and logged; nothing is
    raised to the caller.
    """
    if 'code' not in st.query_params:
        return
    try:
        flow = Flow.from_client_secrets_file(
            'client_secret.json',
            scopes=SCOPES,
            redirect_uri=get_redirect_uri()
        )
        flow.fetch_token(code=st.query_params['code'])
        st.session_state['google_auth_token'] = flow.credentials.to_json()
        st.success("Successfully authenticated with Google!")
        st.query_params.clear()
    except FileNotFoundError:
        st.error(ErrorMessages.OAUTH_FAILED)
        logger.error("client_secret.json not found")
    except Exception as e:
        st.error(f"{ErrorMessages.OAUTH_FAILED}\n\nDetails: {str(e)}")
        logger.error("OAuth error: %s", e)


def serialize_bytesio(obj):
    """``json.dump`` ``default`` hook: encode a ``BytesIO`` as a tagged dict.

    Raises:
        TypeError: for any object that is not a ``BytesIO``.
    """
    if isinstance(obj, BytesIO):
        return {
            "_type": "BytesIO",
            "data": base64.b64encode(obj.getvalue()).decode('utf-8')
        }
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")


def deserialize_bytesio(obj):
    """``json.load`` ``object_hook``: rebuild a ``BytesIO`` from its tagged dict.

    Any other object is returned unchanged.
    """
    if isinstance(obj, dict) and obj.get("_type") == "BytesIO":
        return BytesIO(base64.b64decode(obj["data"]))
    return obj


def save_chat_history(chat_history):
    """Write ``chat_history`` to ``chat_history.json``.

    ``BytesIO`` values are serialized through ``serialize_bytesio``.
    """
    with open(CHAT_HISTORY_FILE, "w", encoding="utf-8") as f:
        json.dump(chat_history, f, default=serialize_bytesio)


def load_chat_history():
    """Read the chat history from ``chat_history.json``.

    Returns:
        The decoded history, or an empty dict when the file is missing or
        cannot be decoded.
    """
    try:
        with open(CHAT_HISTORY_FILE, "r", encoding="utf-8") as f:
            return json.load(f, object_hook=deserialize_bytesio)
    except FileNotFoundError:
        return {}
    except ValueError as e:
        # json.JSONDecodeError and base64's binascii.Error are both
        # ValueError subclasses. A truncated or corrupt file must not stop
        # the app from starting.
        logger.warning("Ignoring unreadable %s: %s", CHAT_HISTORY_FILE, e)
        return {}


def _dataframe_to_csv_buffer(df):
    """Return a rewound ``BytesIO`` holding ``df`` as CSV without the index."""
    buffer = BytesIO()
    df.to_csv(buffer, index=False)
    buffer.seek(0)
    return buffer


def safe_process_message(web_scraper_chat, message, conversation_history=None):
    """Run ``web_scraper_chat.process_message`` and render its result.

    Args:
        web_scraper_chat: object exposing ``process_message(message, history)``.
        message: the user's prompt; ``None`` or blank input is rejected.
        conversation_history: forwarded unchanged to ``process_message``.

    Returns:
        A value to store as the assistant reply: the first tuple element for
        ``(x, DataFrame)`` results, a ``(text, BytesIO)`` tuple for Excel
        results, a fixed string for a bare DataFrame, the raw response
        otherwise, or an error string when processing raised.
    """
    if message is None or message.strip() == "":
        return "I'm sorry, but I didn't receive any input. Could you please try again?"

    try:
        progress_placeholder = st.empty()
        progress_placeholder.text("Initializing scraper...")

        start_time = time.time()
        response = web_scraper_chat.process_message(message, conversation_history)
        end_time = time.time()

        progress_placeholder.text(f"Scraping completed in {end_time - start_time:.2f} seconds.")

        # Check for error messages in response
        if isinstance(response, str) and ("Error:" in response or "Failed to" in response or "is missing" in response):
            st.error(response)

        if isinstance(response, tuple):
            # NOTE(review): the DataFrame check runs first, so the BytesIO
            # branch is only reached when the second element is NOT a
            # DataFrame. The order is kept because the stored value feeds the
            # chat history; confirm against process_message's return shapes.
            if len(response) == 2 and isinstance(response[1], pd.DataFrame):
                csv_string, df = response
                st.dataframe(df)
                st.download_button(
                    label="Download CSV",
                    data=_dataframe_to_csv_buffer(df),
                    file_name="data.csv",
                    mime=CSV_MIME
                )
                return csv_string
            elif len(response) == 2 and isinstance(response[0], BytesIO):
                excel_buffer, df = response
                st.dataframe(df)
                excel_buffer.seek(0)
                st.download_button(
                    label="Download Excel",
                    data=excel_buffer,
                    file_name="data.xlsx",
                    mime=XLSX_MIME
                )
                return ("Excel data displayed and available for download.", excel_buffer)
        elif isinstance(response, pd.DataFrame):
            st.dataframe(response)
            st.download_button(
                label="Download CSV",
                data=_dataframe_to_csv_buffer(response),
                file_name="data.csv",
                mime=CSV_MIME
            )
            return "DataFrame displayed and available for download as CSV."

        return response
    except ValueError as e:
        # Handle API key errors specifically
        error_msg = str(e)
        if "API Key" in error_msg or "missing" in error_msg.lower():
            st.error(error_msg)
        else:
            st.error(f"{ErrorMessages.SCRAPING_FAILED}\n\nDetails: {error_msg}")
        logger.error("ValueError during processing: %s", error_msg)
        return error_msg
    except Exception as e:
        st.error(f"{ErrorMessages.GENERIC_ERROR}\n\nDetails: {str(e)}")
        logger.error("Unexpected error during processing: %s", e)
        return f"{ErrorMessages.GENERIC_ERROR}\n\nDetails: {str(e)}"


def get_date_group(date_str):
    """Map a ``YYYY-MM-DD`` string to a sidebar heading.

    Returns "Today", "Yesterday", the weekday name for dates less than seven
    days old, a long date ("January 05, 2025") otherwise, or "Unknown date"
    when ``date_str`` cannot be parsed.
    """
    try:
        day = datetime.strptime(date_str, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # A hand-edited or legacy history entry must not break the sidebar.
        return "Unknown date"

    today = datetime.now().date()
    if day == today:
        return "Today"
    if day == today - timedelta(days=1):
        return "Yesterday"
    if day > today - timedelta(days=7):
        return day.strftime("%A")
    return day.strftime("%B %d, %Y")


def get_last_url_from_chat(messages):
    """Return the URL found in the most recent user message that has one.

    Returns ``None`` when no user message yields a URL.
    """
    for message in reversed(messages):
        if message['role'] == 'user':
            url = extract_url(message['content'])
            if url:
                return url
    return None


def initialize_web_scraper_chat(url=None):
    """Create a ``StreamlitWebScraperChat`` for the currently selected model.

    Args:
        url: optional URL to process immediately; when given, the current
            chat is renamed to the website name derived from it.

    Returns:
        The chat object, or ``None`` when construction or the initial
        request failed (the error is shown with ``st.error``).
    """
    # The selected model name is passed through unchanged, including any
    # "ollama:" prefix.
    model = st.session_state.selected_model

    scraper_config = ScraperConfig(
        use_current_browser=st.session_state.use_current_browser,
        headless=not st.session_state.use_current_browser,
        max_retries=3,
        delay_after_load=5,
        debug=True,
        wait_for='domcontentloaded'
    )

    try:
        web_scraper_chat = StreamlitWebScraperChat(model_name=model, scraper_config=scraper_config)
        if url:
            web_scraper_chat.process_message(url)
            website_name = get_website_name(url)
            st.session_state.chat_history[st.session_state.current_chat_id]["name"] = website_name
        return web_scraper_chat
    except ValueError as e:
        # Handle API key errors
        st.error(str(e))
        return None
    except Exception as e:
        st.error(f"{ErrorMessages.GENERIC_ERROR}\n\nDetails: {str(e)}")
        logger.error("Error initializing web scraper: %s", e)
        return None


async def list_ollama_models():
    """Return the available Ollama models, or ``[]`` when the lookup fails."""
    try:
        return await OllamaModel.list_models()
    except Exception as e:
        logger.warning("Error fetching Ollama models: %s", e)
        # Don't show error to user, just return empty list
        # The warning in the sidebar will guide users
        return []


def load_css():
    """Inject the contents of ``app/styles.css`` into the page."""
    with open("app/styles.css", "r", encoding="utf-8") as f:
        css = f.read()
    st.markdown(f"<style>{css}</style>", unsafe_allow_html=True)


@st.cache_data
def get_image_base64(image_path: str) -> str:
    """Get base64 encoded image with caching to avoid re-encoding on every render."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode()


def _is_tor_running(host=TOR_HOST, port=TOR_PORT):
    """Return True when a TCP connection to ``host:port`` succeeds within 1s."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            return sock.connect_ex((host, port)) == 0
    except OSError:
        return False


def check_service_status() -> dict:
    """Check the status of all services and return a dict with their status.

    Each entry has ``name``, ``configured`` and ``env_var`` keys. API-backed
    services count as configured when their environment variable is set,
    Tor when port 9050 on localhost accepts a connection, and Google Sheets
    when ``client_secret.json`` exists.
    """
    return {
        "openai": {
            "name": "OpenAI",
            "configured": bool(os.getenv("OPENAI_API_KEY")),
            "env_var": "OPENAI_API_KEY"
        },
        "gemini": {
            "name": "Gemini",
            "configured": bool(os.getenv("GOOGLE_API_KEY")),
            "env_var": "GOOGLE_API_KEY"
        },
        "blablador": {
            "name": "Blablador",
            "configured": bool(os.getenv("BLABLADOR_API_KEY")),
            "env_var": "BLABLADOR_API_KEY"
        },
        "tor": {
            "name": "Tor",
            # Checked dynamically by probing the local port.
            "configured": _is_tor_running(),
            "env_var": None
        },
        "google_sheets": {
            "name": "Google Sheets",
            "configured": os.path.exists("client_secret.json"),
            "env_var": "client_secret.json"
        }
    }


def display_service_status():
    """Display service status with checkmarks/crosses in the sidebar."""
    status = check_service_status()

    # Inject CSS styles
    # NOTE(review): this style block is effectively empty in this revision;
    # confirm whether CSS rules are expected here.
    st.markdown(""" """, unsafe_allow_html=True)

    st.markdown("### Setup Status")

    for info in status.values():
        if info["configured"]:
            icon_html = "✅"
            env_html = ""
        else:
            icon_html = "❌"
            if info["env_var"]:
                env_html = f'({info["env_var"]})'
            else:
                # Only the Tor entry has no env_var.
                env_html = '(Tor not running)'
        row_html = f'\n{icon_html} {info["name"]} {env_html}\n'
        st.markdown(row_html, unsafe_allow_html=True)

    # Show setup help if any service is missing
    if any(not info["configured"] for info in status.values()):
        st.markdown("---")
        st.markdown(
            "\n\nSetup Help:\nSee README for configuration instructions.\n\n",
            unsafe_allow_html=True
        )


def render_message(role, content, avatar_path):
    """Return the HTML for one chat bubble with an inline base64 avatar.

    ``content`` is HTML-escaped because callers render the result with
    ``unsafe_allow_html=True`` and user prompts are passed in verbatim.
    """
    message_class = "user-message" if role == "user" else "assistant-message"
    avatar_base64 = get_image_base64(avatar_path)
    safe_content = escape(str(content))
    # NOTE(review): "avatar" and "message-content" are assumed class names;
    # confirm them against app/styles.css.
    return f"""
<div class="{message_class}">
    <img class="avatar" src="data:image/png;base64,{avatar_base64}" alt="{role} avatar">
    <div class="message-content">{safe_content}</div>
</div>
"""


def display_message_with_sheets_upload(message, message_index):
    """Render an assistant message, as a table with downloads when possible.

    When ``extract_data_from_markdown`` finds data in the content, it is
    shown as a DataFrame with a CSV or Excel download button and a Google
    Sheets upload button; otherwise the content is rendered as markdown.

    Args:
        message: chat message dict; only ``message["content"]`` is read.
        message_index: position in the transcript, used for widget keys.
    """
    content = message["content"]

    if not isinstance(content, (str, bytes, BytesIO)):
        st.markdown(str(content))
        return

    data = extract_data_from_markdown(content)
    if data is None:
        st.markdown(content)
        return

    try:
        is_excel = isinstance(data, BytesIO) or (isinstance(content, str) and 'excel' in content.lower())
        df = format_data(data, 'excel' if is_excel else 'csv')

        if df is None:
            st.warning("Failed to display data as a table. Showing raw content:")
            st.code(content)
            return

        st.dataframe(df)

        if not is_excel:
            st.download_button(
                label="📥 Download as CSV",
                data=_dataframe_to_csv_buffer(df),
                file_name="data.csv",
                mime=CSV_MIME,
                key=f"csv_download_{message_index}"
            )
        else:
            excel_buffer = BytesIO()
            with pd.ExcelWriter(excel_buffer, engine='xlsxwriter') as writer:
                df.to_excel(writer, index=False, sheet_name='Sheet1')
            excel_buffer.seek(0)
            st.download_button(
                label="📥 Download as Excel",
                data=excel_buffer,
                file_name="data.xlsx",
                mime=XLSX_MIME,
                key=f"excel_download_{message_index}"
            )

        display_google_sheets_button(df, f"sheets_upload_{message_index}")
    except Exception as e:
        st.error(f"Error processing data: {str(e)}")
        st.code(content)


def cleanup():
    """Clean up resources on exit."""
    try:
        if 'web_scraper_chat' in st.session_state and st.session_state.web_scraper_chat:
            del st.session_state.web_scraper_chat
    except Exception:
        # Best effort: session state may be unavailable at interpreter exit.
        logger.debug("Cleanup at exit skipped", exc_info=True)


atexit.register(cleanup)


def _create_new_chat(name=None):
    """Add an empty chat dated today, make it current, and persist the history.

    Args:
        name: optional display name; omitted from the record when ``None``.

    Returns:
        The new chat id (the current timestamp as a string).
    """
    chat_id = str(datetime.now().timestamp())
    chat = {
        "messages": [],
        "date": datetime.now().strftime("%Y-%m-%d")
    }
    if name is not None:
        chat["name"] = name
    st.session_state.chat_history[chat_id] = chat
    st.session_state.current_chat_id = chat_id
    save_chat_history(st.session_state.chat_history)
    return chat_id


def _ensure_current_chat():
    """Point ``current_chat_id`` at an existing chat, creating one if needed."""
    history = st.session_state.chat_history
    if st.session_state.get('current_chat_id') in history:
        return
    if history:
        st.session_state.current_chat_id = next(iter(history))
    else:
        _create_new_chat()


def _render_sidebar():
    """Draw the sidebar: model picker, service status, and chat history list."""
    with st.sidebar:
        st.title("CyberScraper-2077")

        # Model selection
        st.subheader("Select Model")
        default_models = ["alias-fast", "alias-large", "gpt-4o-mini", "gemini-1.5-flash"]
        ollama_models = st.session_state.get('ollama_models', [])
        all_models = default_models + [f"ollama:{model}" for model in ollama_models]
        current_model = st.session_state.selected_model
        default_index = all_models.index(current_model) if current_model in all_models else 0
        selected_model = st.selectbox("Choose a model", all_models, index=default_index)

        if selected_model != st.session_state.selected_model:
            st.session_state.selected_model = selected_model
            # Force a fresh scraper for the newly selected model.
            st.session_state.web_scraper_chat = None
            st.rerun()

        # Display service status with checkmarks/crosses
        display_service_status()

        st.markdown("---")

        st.session_state.use_current_browser = st.checkbox(
            "Use Current Browser (No Docker)",
            value=False,
            help="Works Natively, Doesn't Work with Docker. if a website is blocking your browser, you can use this option to use the current browser instead of opening a new one."
        )

        if st.button("Refresh Ollama Models"):
            with st.spinner("Fetching Ollama models..."):
                st.session_state.ollama_models = asyncio.run(list_ollama_models())
            st.success(f"Found {len(st.session_state.ollama_models)} Ollama models")
            st.rerun()

        if st.button("+ 🗨️ New Chat", key="new_chat", use_container_width=True):
            st.session_state.web_scraper_chat = None
            _create_new_chat(name="🗨️ New Chat")
            st.rerun()

        # Group chats under their date heading, keeping history order.
        grouped_chats = {}
        for chat_id, chat_data in st.session_state.chat_history.items():
            date_group = get_date_group(chat_data['date'])
            grouped_chats.setdefault(date_group, []).append((chat_id, chat_data))

        for date_group, chats in grouped_chats.items():
            st.markdown(f"\n{date_group}\n", unsafe_allow_html=True)

            for chat_id, chat_data in chats:
                button_label = chat_data.get('name', "🗨️ Unnamed Chat")
                col1, col2 = st.columns([0.78, 0.22])

                with col1:
                    if st.button(button_label, key=f"history_{chat_id}", use_container_width=True):
                        st.session_state.current_chat_id = chat_id
                        last_url = get_last_url_from_chat(chat_data['messages'])
                        if last_url and not st.session_state.web_scraper_chat:
                            st.session_state.web_scraper_chat = initialize_web_scraper_chat(last_url)
                        st.rerun()

                with col2:
                    if st.button("🗑️", key=f"delete_{chat_id}"):
                        del st.session_state.chat_history[chat_id]
                        save_chat_history(st.session_state.chat_history)
                        if st.session_state.current_chat_id == chat_id:
                            if st.session_state.chat_history:
                                st.session_state.current_chat_id = next(iter(st.session_state.chat_history))
                            else:
                                st.session_state.current_chat_id = None
                            st.session_state.web_scraper_chat = None
                        st.rerun()


def _render_chat_messages(user_avatar_path, ai_avatar_path):
    """Draw every message of the current chat inside one container."""
    chat_container = st.container()

    with chat_container:
        st.markdown('\n', unsafe_allow_html=True)
        messages = st.session_state.chat_history[st.session_state.current_chat_id]["messages"]
        for index, message in enumerate(messages):
            if message["role"] == "user":
                st.markdown(render_message("user", message["content"], user_avatar_path), unsafe_allow_html=True)
            else:
                with st.container():
                    # The bubble carries only the avatar; the body is drawn
                    # by the table-aware renderer below it.
                    st.markdown(render_message("assistant", "", ai_avatar_path), unsafe_allow_html=True)
                    display_message_with_sheets_upload(message, index)
        st.markdown('\n', unsafe_allow_html=True)


def _handle_prompt(prompt):
    """Record the user's prompt, run the scraper, store the reply, and rerun."""
    chat = st.session_state.chat_history[st.session_state.current_chat_id]
    chat["messages"].append({"role": "user", "content": prompt})

    if not st.session_state.web_scraper_chat:
        st.session_state.web_scraper_chat = initialize_web_scraper_chat()

    url = extract_url(prompt)
    if url:
        chat["name"] = get_website_name(url)

    with st.chat_message("assistant"):
        try:
            # Get current chat messages for conversation context
            chat_messages = chat["messages"]
            full_response = loading_animation(
                safe_process_message,
                st.session_state.web_scraper_chat,
                prompt,
                chat_messages
            )
            if isinstance(full_response, str) and not full_response.startswith("Error:"):
                st.success("Scraping completed successfully!")

            if full_response is not None:
                is_excel_result = (
                    isinstance(full_response, tuple)
                    and len(full_response) == 2
                    and isinstance(full_response[1], BytesIO)
                )
                # For Excel results only the text half of the tuple is stored.
                reply = full_response[0] if is_excel_result else full_response
                chat["messages"].append({"role": "assistant", "content": reply})
                save_chat_history(st.session_state.chat_history)
        except Exception as e:
            st.error(f"An unexpected error occurred: {str(e)}")

    save_chat_history(st.session_state.chat_history)
    st.rerun()


def main():
    """Configure the page, restore session state, and render the whole app."""
    st.set_page_config(
        page_title="CyberScraper 2077",
        page_icon="app/icons/radiation.png",
        layout="wide"
    )

    load_css()
    handle_oauth_callback()

    # avatar paths
    user_avatar_path = "app/icons/man.png"
    ai_avatar_path = "app/icons/skull.png"

    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = load_chat_history()
    _ensure_current_chat()

    if 'selected_model' not in st.session_state:
        st.session_state.selected_model = "alias-fast"
    if 'web_scraper_chat' not in st.session_state:
        st.session_state.web_scraper_chat = None

    _render_sidebar()

    st.markdown(
        "\n\nCyberScraper 2077\n\n",
        unsafe_allow_html=True
    )

    display_info_icons()

    # The current chat may have been deleted in the sidebar.
    _ensure_current_chat()

    _render_chat_messages(user_avatar_path, ai_avatar_path)

    prompt = st.chat_input("Enter the URL to scrape or ask a question regarding the data", key="user_input")

    if prompt:
        _handle_prompt(prompt)

    st.markdown(
        "\n\nCyberScraper 2077 can make mistakes sometimes. Report any issues to the developers.\n\n",
        unsafe_allow_html=True
    )


if __name__ == "__main__":
    main()