"""CyberScraper 2077 — Streamlit entry point.

Wires together the chat-driven web-scraper UI: Google OAuth for Sheets
export, JSON chat-history persistence (with BytesIO round-tripping),
model selection (OpenAI / Gemini / Ollama / Blablador), and
Playwright-based scraping.

NOTE(review): this file was recovered from a whitespace-mangled copy.
Several inline HTML/CSS string literals and the definition of ``main()``
were lost in transit; reconstructed spots are marked with NOTE(review)
comments below — verify them against version control.
"""

import warnings

# Suppress Pydantic V1 warning - upstream LangChain issue with Python 3.14
# See: https://github.com/langchain-ai/langchain/issues/33926
warnings.filterwarnings("ignore", message="Core Pydantic V1 functionality")

import streamlit as st
import streamlit.runtime.scriptrunner_utils.script_run_context as _ctx

# Silence Streamlit's "missing ScriptRunContext" warning when its internals
# are touched from worker threads: default suppress_warning to True while
# still delegating to the original implementation.
_original_get_script_run_ctx = _ctx.get_script_run_ctx
_ctx.get_script_run_ctx = lambda suppress_warning=True: _original_get_script_run_ctx(
    suppress_warning=suppress_warning
)

import json
import asyncio
import os

from dotenv import load_dotenv

# Load environment variables from .env file before importing modules that
# may read API keys at import time.
load_dotenv()

from app.streamlit_web_scraper_chat import StreamlitWebScraperChat
from app.ui_components import display_info_icons, extract_data_from_markdown, format_data
from app.utils import loading_animation
from src.web_extractor import extract_url, get_website_name
from datetime import datetime, timedelta
from src.ollama_models import OllamaModel
from src.utils.error_handler import ErrorMessages
import pandas as pd
import base64
from google_auth_oauthlib.flow import Flow
from io import BytesIO
from src.utils.google_sheets_utils import SCOPES, get_redirect_uri, display_google_sheets_button, initiate_google_auth
from src.scrapers.playwright_scraper import ScraperConfig
import time
import atexit
import logging

logger = logging.getLogger(__name__)


def handle_oauth_callback():
    """Complete the Google OAuth flow when redirected back with ``?code=``.

    On success, stores the serialized credentials in
    ``st.session_state['google_auth_token']`` and clears the query params
    so the code is not re-consumed on rerun.
    """
    if 'code' in st.query_params:
        try:
            flow = Flow.from_client_secrets_file(
                'client_secret.json',
                scopes=SCOPES,
                redirect_uri=get_redirect_uri()
            )
            flow.fetch_token(code=st.query_params['code'])
            st.session_state['google_auth_token'] = flow.credentials.to_json()
            st.success("Successfully authenticated with Google!")
            st.query_params.clear()
        except FileNotFoundError:
            st.error(ErrorMessages.OAUTH_FAILED)
            logger.error("client_secret.json not found")
        except Exception as e:
            st.error(f"{ErrorMessages.OAUTH_FAILED}\n\nDetails: {str(e)}")
            logger.error(f"OAuth error: {str(e)}")


def serialize_bytesio(obj):
    """``json.dump`` ``default=`` hook: encode BytesIO as base64.

    Lets Excel buffers stored in the chat history survive a JSON
    round-trip. Raises TypeError for any other non-serializable type,
    as the ``default=`` contract requires.
    """
    if isinstance(obj, BytesIO):
        return {
            "_type": "BytesIO",
            "data": base64.b64encode(obj.getvalue()).decode('utf-8')
        }
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")


def deserialize_bytesio(obj):
    """``json.load`` ``object_hook``: restore BytesIO objects written by serialize_bytesio."""
    if isinstance(obj, dict) and obj.get("_type") == "BytesIO":
        return BytesIO(base64.b64decode(obj["data"]))
    return obj


def save_chat_history(chat_history):
    """Persist the chat-history dict to chat_history.json in the CWD."""
    with open("chat_history.json", "w") as f:
        json.dump(chat_history, f, default=serialize_bytesio)


def load_chat_history():
    """Load chat_history.json, returning an empty dict on first run."""
    try:
        with open("chat_history.json", "r") as f:
            return json.load(f, object_hook=deserialize_bytesio)
    except FileNotFoundError:
        return {}


def safe_process_message(web_scraper_chat, message, conversation_history=None):
    """Run one chat turn through the scraper and render the result.

    Handles the response shapes ``process_message`` may return:
    ``(csv_string, DataFrame)``, ``(BytesIO excel, DataFrame)``, a bare
    ``DataFrame``, or plain text. Errors are surfaced via ``st.error``
    and returned as text so they still land in the chat transcript.
    """
    if message is None or message.strip() == "":
        return "I'm sorry, but I didn't receive any input. Could you please try again?"
    try:
        progress_placeholder = st.empty()
        progress_placeholder.text("Initializing scraper...")
        start_time = time.time()
        response = web_scraper_chat.process_message(message, conversation_history)
        end_time = time.time()
        progress_placeholder.text(f"Scraping completed in {end_time - start_time:.2f} seconds.")

        # Check for error messages in response
        if isinstance(response, str) and ("Error:" in response or "Failed to" in response or "is missing" in response):
            st.error(response)

        if isinstance(response, tuple):
            if len(response) == 2 and isinstance(response[1], pd.DataFrame):
                # (csv_string, DataFrame): show table + CSV download.
                csv_string, df = response
                st.dataframe(df)
                csv_buffer = BytesIO()
                df.to_csv(csv_buffer, index=False)
                csv_buffer.seek(0)
                st.download_button(
                    label="Download CSV",
                    data=csv_buffer,
                    file_name="data.csv",
                    mime="text/csv"
                )
                return csv_string
            elif len(response) == 2 and isinstance(response[0], BytesIO):
                # (excel_buffer, DataFrame): show table + XLSX download.
                excel_buffer, df = response
                st.dataframe(df)
                excel_buffer.seek(0)
                st.download_button(
                    label="Download Excel",
                    data=excel_buffer,
                    file_name="data.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                )
                return ("Excel data displayed and available for download.", excel_buffer)
        elif isinstance(response, pd.DataFrame):
            st.dataframe(response)
            csv_buffer = BytesIO()
            response.to_csv(csv_buffer, index=False)
            csv_buffer.seek(0)
            st.download_button(
                label="Download CSV",
                data=csv_buffer,
                file_name="data.csv",
                mime="text/csv"
            )
            return "DataFrame displayed and available for download as CSV."
        return response
    except ValueError as e:
        # Handle API key errors specifically
        error_msg = str(e)
        if "API Key" in error_msg or "missing" in error_msg.lower():
            st.error(error_msg)
        else:
            st.error(f"{ErrorMessages.SCRAPING_FAILED}\n\nDetails: {error_msg}")
        logger.error(f"ValueError during processing: {error_msg}")
        return error_msg
    except Exception as e:
        st.error(f"{ErrorMessages.GENERIC_ERROR}\n\nDetails: {str(e)}")
        logger.error(f"Unexpected error during processing: {str(e)}")
        return f"{ErrorMessages.GENERIC_ERROR}\n\nDetails: {str(e)}"


def get_date_group(date_str):
    """Bucket a ``YYYY-MM-DD`` string for sidebar grouping.

    Returns "Today", "Yesterday", the weekday name for the last week,
    or a full "Month DD, YYYY" date otherwise.
    """
    date = datetime.strptime(date_str, "%Y-%m-%d")
    today = datetime.now().date()
    if date.date() == today:
        return "Today"
    elif date.date() == today - timedelta(days=1):
        return "Yesterday"
    elif date.date() > today - timedelta(days=7):
        return date.strftime("%A")
    else:
        return date.strftime("%B %d, %Y")


def get_last_url_from_chat(messages):
    """Return the most recent URL a user typed in this chat, or None."""
    for message in reversed(messages):
        if message['role'] == 'user':
            url = extract_url(message['content'])
            if url:
                return url
    return None


def initialize_web_scraper_chat(url=None):
    """Build a StreamlitWebScraperChat for the currently selected model.

    If ``url`` is given, it is scraped immediately and the current chat is
    renamed after the target site. Returns None on failure (the error is
    already shown to the user).
    """
    # Fix: the original if/else assigned the identical value on both
    # branches, so the "ollama:" prefix check was dead code — the prefix
    # is interpreted downstream by StreamlitWebScraperChat.
    model = st.session_state.selected_model
    scraper_config = ScraperConfig(
        use_current_browser=st.session_state.use_current_browser,
        headless=not st.session_state.use_current_browser,
        max_retries=3,
        delay_after_load=5,
        debug=True,
        wait_for='domcontentloaded'
    )
    try:
        web_scraper_chat = StreamlitWebScraperChat(model_name=model, scraper_config=scraper_config)
        if url:
            web_scraper_chat.process_message(url)
            website_name = get_website_name(url)
            st.session_state.chat_history[st.session_state.current_chat_id]["name"] = website_name
        return web_scraper_chat
    except ValueError as e:
        # Handle API key errors
        st.error(str(e))
        return None
    except Exception as e:
        st.error(f"{ErrorMessages.GENERIC_ERROR}\n\nDetails: {str(e)}")
        logger.error(f"Error initializing web scraper: {str(e)}")
        return None


async def list_ollama_models():
    """Return installed Ollama model names, or [] if the daemon is unreachable."""
    try:
        return await OllamaModel.list_models()
    except Exception as e:
        logger.warning(f"Error fetching Ollama models: {str(e)}")
        # Don't show error to user, just return empty list
        # The warning in the sidebar will guide users
        return []


def load_css():
    """Inject app/styles.css into the page.

    NOTE(review): the original markdown argument arrived as an empty
    f-string (the <style> markup was stripped when this file was mangled
    and the file handle was never read). Reconstructed as the standard
    Streamlit CSS-injection pattern — confirm against version control.
    """
    with open("app/styles.css", "r") as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)


@st.cache_data
def get_image_base64(image_path: str) -> str:
    """Get base64 encoded image with caching to avoid re-encoding on every render."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode()


def check_service_status() -> dict:
    """Check the status of all services and return a dict with their status.

    Each entry carries a display ``name``, whether it is ``configured``,
    and the ``env_var`` (or file) a user must set to configure it. Tor is
    probed dynamically via its default SOCKS port.
    """
    status = {
        "openai": {
            "name": "OpenAI",
            "configured": bool(os.getenv("OPENAI_API_KEY")),
            "env_var": "OPENAI_API_KEY"
        },
        "gemini": {
            "name": "Gemini",
            "configured": bool(os.getenv("GOOGLE_API_KEY")),
            "env_var": "GOOGLE_API_KEY"
        },
        "blablador": {
            "name": "Blablador",
            "configured": bool(os.getenv("BLABLADOR_API_KEY")),
            "env_var": "BLABLADOR_API_KEY"
        },
        "tor": {
            "name": "Tor",
            "configured": False,  # Will be checked dynamically
            "env_var": None
        },
        "google_sheets": {
            "name": "Google Sheets",
            "configured": os.path.exists("client_secret.json"),
            "env_var": "client_secret.json"
        }
    }

    # Check Tor status by checking if port 9050 is open
    import socket

    def is_tor_running():
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            result = sock.connect_ex(('127.0.0.1', 9050))
            sock.close()
            return result == 0
        except Exception:
            return False

    status["tor"]["configured"] = is_tor_running()
    return status


def display_service_status():
    """Display service status with checkmarks/crosses in the sidebar."""
    status = check_service_status()

    # Inject CSS styles
    # NOTE(review): the original inline <style> block was stripped when this
    # file was mangled; restore it from version control. The call is kept so
    # the render structure is unchanged.
    st.markdown("""
        """, unsafe_allow_html=True)

    st.markdown("### Setup Status")

    for key, info in status.items():
        if info["configured"]:
            # NOTE(review): the icon markup arrived as an empty f-string;
            # reconstructed with plain check/cross spans — confirm the
            # original class names against version control.
            icon_html = '<span class="status-ok">&#10003;</span>'
            env_html = ""
        else:
            icon_html = '<span class="status-missing">&#10007;</span>'
            if info["env_var"]:
                env_html = f'({info["env_var"]})'
            else:
                env_html = '(Tor not running)'
        html = f"""
            <div class="status-row">{icon_html} {info["name"]} {env_html}</div>
        """
        st.markdown(html, unsafe_allow_html=True)

    # Footer help text — wrapper markup reconstructed (original tags lost);
    # the visible strings are verbatim from the recovered source.
    st.markdown(
        """
        <div class="setup-help">
            <b>Setup Help:</b><br>
            See README for configuration instructions.<br>
            <i>CyberScraper 2077 can make mistakes sometimes. Report any issues to the developers.</i>
        </div>
        """,
        unsafe_allow_html=True
    )


if __name__ == "__main__":
    # NOTE(review): main() is not defined anywhere in the recovered source —
    # its definition appears to have been lost in the mangle. Restore it from
    # version control; the guard is preserved as found.
    main()