| import os |
| import re |
| import requests |
| from urllib.parse import urlparse, urljoin |
| from bs4 import BeautifulSoup |
| import html2text |
| from typing import Optional, Dict, Tuple, List |
| import time |
|
|
| from tavily import TavilyClient |
| from config import TAVILY_API_KEY |
|
|
| |
# Initialize the shared Tavily client at import time. When no API key is
# configured (or construction fails), tavily_client stays None and search
# features report themselves as unavailable instead of raising.
tavily_client = None
if not TAVILY_API_KEY:
    print("[WebUtils] Tavily API key not found - web search will be unavailable")
else:
    try:
        tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
    except Exception as e:
        print(f"[WebUtils] Failed to initialize Tavily client: {e}")
        tavily_client = None
    else:
        print("[WebUtils] Tavily client initialized successfully")
|
|
class WebContentExtractor:
    """Handles web content extraction and processing"""

    # Matches CSS ``background-image: url(...)`` declarations. Compiled once
    # here instead of rebuilding the same pattern on every _fix_image_urls call.
    _BG_URL_PATTERN = re.compile(
        r'background-image:\s*url\(["\']?([^"\']+)["\']?\)',
        re.IGNORECASE
    )

    @staticmethod
    def extract_website_content(url: str) -> str:
        """Extract HTML code and content from a website URL.

        Returns a formatted analysis string (title, description, image
        inventory and the cleaned original HTML) on success, or a
        human-readable "Error: ..." string on failure. Never raises.
        """
        try:
            # Default to HTTPS when the caller omitted the scheme.
            parsed_url = urlparse(url)
            if not parsed_url.scheme:
                url = "https://" + url
                parsed_url = urlparse(url)

            if not parsed_url.netloc:
                return "Error: Invalid URL provided"

            print(f"[WebExtract] Fetching content from: {url}")

            # Browser-like headers reduce the chance of tripping basic
            # anti-bot filters.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Sec-Fetch-User': '?1',
                'Cache-Control': 'max-age=0'
            }

            session = requests.Session()
            session.headers.update(headers)

            # On a 403, retry with an alternate User-Agent before giving up.
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    response = session.get(url, timeout=15, allow_redirects=True)
                    response.raise_for_status()
                    break
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code == 403 and attempt < max_retries - 1:
                        session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                        time.sleep(1)  # brief pause so the retry is not an immediate hammer
                        continue
                    else:
                        raise

            # Decode using the detected encoding; fall back to lossy UTF-8 if
            # encoding detection or decoding fails for any reason.
            try:
                response.encoding = response.apparent_encoding
                raw_html = response.text
            except Exception:
                raw_html = response.content.decode('utf-8', errors='ignore')

            soup = BeautifulSoup(raw_html, 'html.parser')

            title = soup.find('title')
            title_text = title.get_text().strip() if title else "No title found"

            meta_desc = soup.find('meta', attrs={'name': 'description'})
            description = meta_desc.get('content', '') if meta_desc else ""

            # Rewrite relative image/background URLs to absolute ones in place
            # so the extracted HTML renders outside the original site.
            WebContentExtractor._fix_image_urls(soup, url)

            content_info = WebContentExtractor._analyze_content(soup)

            modified_html = str(soup)
            cleaned_html = WebContentExtractor._clean_html(modified_html)

            website_content = WebContentExtractor._format_website_analysis(
                url, title_text, description, content_info, cleaned_html
            )

            return website_content.strip()

        except requests.exceptions.HTTPError as e:
            return WebContentExtractor._handle_http_error(e, url)
        except requests.exceptions.Timeout:
            return "Error: Request timed out. The website may be slow or unavailable."
        except requests.exceptions.ConnectionError:
            return "Error: Could not connect to the website. Please check your internet connection and the URL."
        except requests.exceptions.RequestException as e:
            return f"Error accessing website: {str(e)}"
        except Exception as e:
            return f"Error extracting website content: {str(e)}"

    @staticmethod
    def _fix_image_urls(soup: "BeautifulSoup", base_url: str):
        """Fix relative image URLs to absolute URLs (mutates *soup* in place)."""
        img_elements = soup.find_all('img')
        for img in img_elements:
            src = img.get('src', '')
            if src:
                img['src'] = WebContentExtractor._make_absolute_url(src, base_url)

            # Lazy-loaded images often carry the real URL in data-src; only
            # promote it when src was empty.
            data_src = img.get('data-src', '')
            if data_src and not src:
                img['src'] = WebContentExtractor._make_absolute_url(data_src, base_url)

        # Inline style="" attributes containing background-image URLs.
        elements_with_style = soup.find_all(attrs={'style': True})
        for element in elements_with_style:
            style_attr = element.get('style', '')
            for match in WebContentExtractor._BG_URL_PATTERN.findall(style_attr):
                if match:
                    absolute_bg = WebContentExtractor._make_absolute_url(match, base_url)
                    style_attr = style_attr.replace(match, absolute_bg)
            element['style'] = style_attr

        # <style> blocks containing background-image URLs.
        style_elements = soup.find_all('style')
        for style in style_elements:
            if style.string:
                style_content = style.string
                for match in WebContentExtractor._BG_URL_PATTERN.findall(style_content):
                    if match:
                        absolute_bg = WebContentExtractor._make_absolute_url(match, base_url)
                        style_content = style_content.replace(match, absolute_bg)
                style.string = style_content

    @staticmethod
    def _make_absolute_url(url: str, base_url: str) -> str:
        """Convert a relative URL to an absolute URL against *base_url*.

        Protocol-relative URLs (``//host/...``) are pinned to https; already
        absolute http(s) URLs are returned unchanged.
        """
        if url.startswith('//'):
            return 'https:' + url
        elif url.startswith('/'):
            return urljoin(base_url, url)
        elif not url.startswith(('http://', 'https://')):
            return urljoin(base_url, url)
        return url

    @staticmethod
    def _analyze_content(soup: "BeautifulSoup") -> Dict:
        """Analyze website content and structure.

        Returns a dict with 'content_sections', 'nav_links', 'images',
        'working_images' (first 10 probed via HEAD) and 'script_tags'.
        """
        content_sections = []
        nav_links = []
        images = []

        # Common containers for the page's primary content.
        main_selectors = [
            'main', 'article', '.content', '.main-content', '.post-content',
            '#content', '#main', '.entry-content', '.post-body'
        ]

        for selector in main_selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text().strip()
                # Skip trivially short fragments; 100 chars filters nav/footer noise.
                if len(text) > 100:
                    content_sections.append(text)

        # Collect navigation links from <nav> and <header> regions.
        nav_elements = soup.find_all(['nav', 'header'])
        for nav in nav_elements:
            links = nav.find_all('a')
            for link in links:
                link_text = link.get_text().strip()
                link_href = link.get('href', '')
                if link_text and link_href:
                    nav_links.append(f"{link_text}: {link_href}")

        # Inventory all images with their alt text.
        img_elements = soup.find_all('img')
        for img in img_elements:
            src = img.get('src', '')
            alt = img.get('alt', '')
            if src:
                images.append({'src': src, 'alt': alt})

        # Probe only the first 10 images to bound the number of HEAD requests.
        working_images = []
        for img in images[:10]:
            if WebContentExtractor._test_image_url(img['src']):
                working_images.append(img)

        print(f"[WebExtract] Found {len(images)} images, {len(working_images)} working")

        return {
            'content_sections': content_sections,
            'nav_links': nav_links,
            'images': images,
            'working_images': working_images,
            'script_tags': len(soup.find_all('script'))
        }

    @staticmethod
    def _test_image_url(img_url: str) -> bool:
        """Return True if a HEAD request to *img_url* answers with HTTP 200."""
        try:
            test_response = requests.head(img_url, timeout=5, allow_redirects=True)
            return test_response.status_code == 200
        except Exception:
            # Network errors, invalid URLs, timeouts: treat as a broken image.
            return False

    @staticmethod
    def _clean_html(html_content: str) -> str:
        """Clean and format HTML for better readability.

        Strips comments, collapses whitespace, and truncates to 15k chars so
        the result stays within prompt-friendly size.
        """
        cleaned = re.sub(r'<!--.*?-->', '', html_content, flags=re.DOTALL)
        cleaned = re.sub(r'\s+', ' ', cleaned)
        cleaned = re.sub(r'>\s+<', '><', cleaned)

        if len(cleaned) > 15000:
            cleaned = cleaned[:15000] + "\n<!-- ... HTML truncated for length ... -->"

        return cleaned

    @staticmethod
    def _format_website_analysis(url: str, title: str, description: str,
                                 content_info: Dict, html: str) -> str:
        """Format the comprehensive website analysis / redesign prompt text."""
        working_images = content_info['working_images']
        all_images = content_info['images']

        content = f"""
WEBSITE REDESIGN - ORIGINAL HTML CODE
=====================================

URL: {url}
Title: {title}
Description: {description}

PAGE ANALYSIS:
- Website type: {title.lower()} website
- Content sections: {len(content_info['content_sections'])}
- Navigation links: {len(content_info['nav_links'])}
- Total images: {len(all_images)}
- Working images: {len(working_images)}
- JavaScript complexity: {"High" if content_info['script_tags'] > 10 else "Low to Medium"}

WORKING IMAGES (use these URLs in your redesign):
{chr(10).join([f"• {img['alt'] or 'Image'} - {img['src']}" for img in working_images[:20]]) if working_images else "No working images found"}

ALL IMAGES (including potentially broken ones):
{chr(10).join([f"• {img['alt'] or 'Image'} - {img['src']}" for img in all_images[:20]]) if all_images else "No images found"}

ORIGINAL HTML CODE (use this as the base for redesign):
```html
{html}
```

REDESIGN INSTRUCTIONS:
Please redesign this website with a modern, responsive layout while:
1. Preserving all the original content and structure
2. Maintaining the same navigation and functionality
3. Using the original images and their URLs (listed above)
4. Creating a modern, clean design with improved typography and spacing
5. Making it fully responsive for mobile devices
6. Using modern CSS frameworks and best practices
7. Keeping the same semantic structure but with enhanced styling

IMPORTANT: All image URLs have been converted to absolute URLs and are ready to use.
Preserve these exact image URLs in your redesigned version.

The HTML code above contains the complete original website structure with all images properly linked.
Use it as your starting point and create a modernized version.
"""
        return content

    @staticmethod
    def _handle_http_error(error, url: str) -> str:
        """Handle HTTP errors with user-friendly messages."""
        # HTTPError.response may be absent or None (e.g. no response received);
        # fall back to 0 instead of raising AttributeError on None.
        response = getattr(error, 'response', None)
        status_code = response.status_code if response is not None else 0

        if status_code == 403:
            return f"Error: Website blocked access (403 Forbidden). This website may have anti-bot protection. Try a different website or provide a description instead."
        elif status_code == 404:
            return f"Error: Website not found (404). Please check the URL and try again."
        elif status_code >= 500:
            return f"Error: Website server error ({status_code}). Please try again later."
        else:
            return f"Error accessing website: HTTP {status_code} - {str(error)}"
|
|
class WebSearchEngine:
    """Handles web search operations using Tavily"""

    @staticmethod
    def perform_web_search(query: str, max_results: int = 5,
                           include_domains: Optional[List[str]] = None,
                           exclude_domains: Optional[List[str]] = None) -> str:
        """Perform web search using Tavily with advanced parameters"""
        if not tavily_client:
            return "Web search is not available. Please set the TAVILY_API_KEY environment variable."

        try:
            print(f"[WebSearch] Searching for: {query}")

            # Clamp the requested result count into Tavily's accepted 1-20 range.
            params: Dict = {
                "search_depth": "advanced",
                "max_results": min(max(1, max_results), 20),
                "include_answer": True,
                "include_raw_content": False,
            }
            if include_domains:
                params["include_domains"] = include_domains
            if exclude_domains:
                params["exclude_domains"] = exclude_domains

            response = tavily_client.search(query, **params)

            # Assemble the direct answer (if any) followed by each hit.
            sections: List[str] = []
            answer = response.get('answer', '')
            if answer:
                sections.append(f"Direct Answer: {answer}\n")

            for item in response.get('results', []):
                sections.append(
                    f"Title: {item.get('title', 'No title')}\n"
                    f"URL: {item.get('url', 'No URL')}\n"
                    f"Relevance Score: {item.get('score', 0):.2f}\n"
                    f"Content: {item.get('content', 'No content')}\n"
                )

            if not sections:
                return "No search results found."

            print(f"[WebSearch] Found {len(sections)} results")
            return "Web Search Results:\n\n" + "\n---\n".join(sections)

        except Exception as e:
            error_msg = f"Search error: {str(e)}"
            print(f"[WebSearch] Error: {error_msg}")
            return error_msg

    @staticmethod
    def enhance_query_with_search(query: str, enable_search: bool) -> str:
        """Enhance the query with web search results if search is enabled"""
        # Pass the query through untouched when search is off or unavailable.
        if not enable_search or not tavily_client:
            return query

        print("[WebSearch] Enhancing query with web search")

        search_results = WebSearchEngine.perform_web_search(query, max_results=3)

        # Stitch the original query, the search digest, and the instruction
        # footer into one prompt.
        return "\n\n".join([
            f"Original Query: {query}",
            search_results,
            "Please use the search results above to help create the requested application with the most up-to-date information and best practices.",
        ])
|
|
| |
def parse_repo_or_model_url(url: str) -> Tuple[str, Optional[Dict]]:
    """Parse a URL and detect if it's a GitHub repo, HF Space, or HF Model"""
    try:
        parsed = urlparse(url.strip())
        host = (parsed.netloc or "").lower()
        path = (parsed.path or "").strip("/")
        segments = path.split("/")
        on_hf = "huggingface.co" in host or host.endswith("hf.co")

        # Space URLs: huggingface.co/spaces/<username>/<project>
        if on_hf and path.startswith("spaces/") and len(segments) >= 3:
            return "hf_space", {"username": segments[1], "project": segments[2]}

        # Any other two-segment HF path (excluding datasets/org listings)
        # is treated as a model repo: huggingface.co/<owner>/<name>
        if on_hf and not path.startswith(("spaces/", "datasets/", "organizations/")) and len(segments) >= 2:
            return "hf_model", {"repo_id": "/".join(segments[:2])}

        # GitHub repos: github.com/<owner>/<repo>[/...]
        if "github.com" in host and len(segments) >= 2:
            return "github", {"owner": segments[0], "repo": segments[1]}

    except Exception:
        pass

    return "unknown", None
|
|
def check_hf_space_url(url: str) -> Tuple[bool, Optional[str], Optional[str]]:
    """Check if URL is a valid Hugging Face Spaces URL and extract username/project.

    Accepts huggingface.co or hf.co, with or without a scheme. The name
    character class includes '.' (HF repo names may contain dots) and a
    trailing slash is tolerated; both were previously rejected.

    Returns (True, username, project) on a match, else (False, None, None).
    """
    url_pattern = re.compile(
        r'^(https?://)?(huggingface\.co|hf\.co)/spaces/([\w.-]+)/([\w.-]+)/?$',
        re.IGNORECASE
    )

    match = url_pattern.match(url.strip())
    if match:
        username = match.group(3)
        project_name = match.group(4)
        return True, username, project_name
    return False, None, None
|
|
| |
# Shared module-level instances backing the convenience functions below.
# Both classes are stateless (staticmethods only), so single instances suffice.
web_extractor = WebContentExtractor()
web_search = WebSearchEngine()
|
|
def extract_website_content(url: str) -> str:
    """Module-level convenience wrapper; delegates to WebContentExtractor."""
    return WebContentExtractor.extract_website_content(url)
|
|
def perform_web_search(query: str, max_results: int = 5,
                       include_domains: Optional[List[str]] = None,
                       exclude_domains: Optional[List[str]] = None) -> str:
    """Module-level convenience wrapper for WebSearchEngine.perform_web_search.

    Now forwards the optional domain filters the engine already supports;
    the defaults keep the original two-argument call signature working.
    """
    return web_search.perform_web_search(query, max_results, include_domains, exclude_domains)
|
|
def enhance_query_with_search(query: str, enable_search: bool) -> str:
    """Module-level convenience wrapper; delegates to WebSearchEngine."""
    return WebSearchEngine.enhance_query_with_search(query, enable_search)