# Using Gemini via LangChain.
import os
import time
import requests
import cloudscraper
import streamlit as st
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from selenium import webdriver
from urllib.parse import urljoin, urlparse
from langchain_core.prompts import ChatPromptTemplate
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from langchain_google_genai import ChatGoogleGenerativeAI  # CHANGED: use Gemini integration
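
# Assumed third-party dependencies (install with pip): requests, cloudscraper,
# streamlit, beautifulsoup4, python-dotenv, selenium, webdriver-manager,
# langchain-core, langchain-google-genai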

# Load API Key
load_dotenv()

# CHANGED: read Gemini key instead of Groq
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
if not GEMINI_API_KEY:
    st.error(
        "Error: Gemini API Key is missing. Please set 'GEMINI_API_KEY' or 'GOOGLE_API_KEY' as an environment variable."
    )
    st.stop()  # Halt the app rather than continuing without credentials

# CHANGED: initialize Gemini chat model
chat = ChatGoogleGenerativeAI(
    temperature=0, google_api_key=GEMINI_API_KEY, model="gemini-2.0-flash"
)

# Model Token Limits
MODEL_TOKEN_LIMIT = 1_048_576
CHUNK_SIZE = 32_000
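# NOTE: CHUNK_SIZE is measured in characters (see split_into_chunks below),
# not tokens, so each chunk stays well inside the model's context window.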

# Initialize Cloudscraper
scraper = cloudscraper.create_scraper()

# Headers to mimic real browser requests
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}
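
# cloudscraper handles Cloudflare's anti-bot JavaScript challenges; the
# browser-like headers above further reduce the chance of being blocked.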


# **Extract Links**
def get_valid_links(base_url):
    """Extracts all internal links, including footer and JavaScript-rendered links."""
    try:
        response = scraper.get(base_url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        domain = urlparse(base_url).netloc
        links = set()
        for link in soup.find_all("a", href=True):
            full_url = urljoin(base_url, link.get("href"))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)

        # If few links are found, fall back to Selenium
        if len(links) < 5 or not check_footer_links(soup):
            selenium_links = get_links_with_selenium(base_url)
            links.update(selenium_links)
        return links
    except requests.exceptions.RequestException as e:
        print(f"Error fetching links: {e}")
        return set()


def check_footer_links(soup):
    """Checks whether the page footer contains any links."""
    footer = soup.find("footer")
    return bool(footer and footer.find_all("a", href=True))


def get_links_with_selenium(url):
    """Extracts JavaScript-rendered links using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()), options=options
        )
        try:
            driver.get(url)
            time.sleep(5)  # Allow JavaScript to load
            soup = BeautifulSoup(driver.page_source, "html.parser")
        finally:
            driver.quit()  # Always release the browser, even if loading fails

        links = set()
        domain = urlparse(url).netloc
        for link in soup.find_all("a", href=True):
            full_url = urljoin(url, link.get("href"))
            if domain in urlparse(full_url).netloc:
                links.add(full_url)
        return links
    except Exception as e:
        print(f"Selenium error: {e}")
        return set()


# **Scrape Pages**
def scrape_page(url):
    """Scrapes a webpage, using Selenium if necessary."""
    try:
        response = scraper.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Very little static text usually means the page is JavaScript-rendered
        if len(soup.get_text(strip=True)) < 500:
            return scrape_with_selenium(url)
        return extract_text(soup)
    except requests.exceptions.RequestException:
        return scrape_with_selenium(url)


def scrape_with_selenium(url):
    """Scrapes JavaScript-heavy pages using Selenium."""
    try:
        options = Options()
        options.add_argument("--headless")
        options.add_argument("--disable-gpu")
        options.add_argument("--no-sandbox")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()), options=options
        )
        try:
            driver.get(url)
            time.sleep(5)  # Allow JavaScript to load
            soup = BeautifulSoup(driver.page_source, "html.parser")
        finally:
            driver.quit()  # Always release the browser, even if loading fails
        return extract_text(soup)
    except Exception as e:
        return f"Selenium scraping error: {e}"


def extract_text(soup):
    """Extracts all meaningful text from HTML content, including dynamic elements."""
    # Extract all text from the HTML, not just specific tags
    all_text = soup.get_text(separator="\n", strip=True)
    # Remove duplicate lines while preserving their original order
    unique_lines = dict.fromkeys(all_text.split("\n"))
    cleaned_text = "\n".join(
        line for line in unique_lines if len(line) > 3
    )  # Exclude tiny fragments
    return cleaned_text


# **Chunking for Large AI Requests**
def split_into_chunks(text, chunk_size):
    """Splits long content into manageable chunks for AI processing."""
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0
    for word in words:
        if current_length + len(word) + 1 > chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_length = 0
        current_chunk.append(word)
        current_length += len(word) + 1
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks
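
# Example: split_into_chunks("a b c", chunk_size=4) -> ["a b", "c"]
# (lengths are character counts, each word contributing len(word) + 1)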


# **AI-Powered Company Breakdown**
def generate_detailed_company_info(company_data):
    """Generates an in-depth company breakdown with AI."""
    system_prompt = """
    You are a business research AI. Provide **detailed** insights strictly from the extracted company data.
    - Do **not** infer missing details.
    - If data is missing, label it as **"Data Not Available"**.
    """
    # Plain template string: {company_data} is filled in by ChatPromptTemplate
    # at invoke time, once per request
    user_prompt_template = """
    Based on the extracted content, **generate a structured company analysis**:

    ## **Company Overview**
    - Full company name, industry, and key differentiators.
    - Headquarters location & founding year (if available).

    ## **Mission & Vision**
    - Clearly state the company's mission and vision.
    - If missing, state **"Data Not Available"**.

    ## **Products & Services**
    - List major products/services and their benefits.

    ## **Target Audience**
    - Define customer demographics or industries served.

    ## **Business Model & Revenue Streams**
    - Describe the revenue model (e.g., SaaS, B2B, freemium).

    ## **Competitive Edge & Market Position**
    - Highlight unique features, patents, and innovations.

    ## **Clients & Industry Impact**
    - Notable clients, case studies, or market influence.

    **Extracted Data:**
    {company_data}
    """
    prompt = ChatPromptTemplate.from_messages(
        [("system", system_prompt), ("human", user_prompt_template)]
    )
    chain = prompt | chat

    if len(company_data) > CHUNK_SIZE:
        st.warning("Large content detected! Splitting into multiple AI requests.")
        chunks = split_into_chunks(company_data, CHUNK_SIZE)
        responses = []
        for i, chunk in enumerate(chunks):
            st.write(f"Processing AI Response {i + 1}/{len(chunks)}...")
            response = chain.invoke({"company_data": chunk})
            responses.append(response.content)
        return "\n\n".join(responses)
    else:
        response = chain.invoke({"company_data": company_data})
        return response.content


# **Streamlit UI**
def main():
    st.title("AI-Powered Company Website Scraper")
    base_url = st.text_input("Enter Website URL", "")

    if st.button("Scrape"):
        if base_url:
            st.write(f"Scraping: {base_url}... Please wait.")
            valid_links = get_valid_links(base_url)
            if valid_links:
                scraped_content = {link: scrape_page(link) for link in valid_links}
                full_content = "\n".join(scraped_content.values())
                detailed_info = generate_detailed_company_info(full_content)
                st.write(detailed_info)
                # Make the scraped content collapsible
                with st.expander(
                    "**View Scraped Content** (Click to Expand)", expanded=False
                ):
                    for url, content in scraped_content.items():
                        st.write(f"### {url}")
                        st.write(content)
            else:
                st.error("No internal links could be extracted from that URL.")


if __name__ == "__main__":
    main()
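

# To launch the UI (assuming this file is saved as app.py):
#   streamlit run app.py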