Spaces:
Runtime error
Runtime error
| import os | |
| import random | |
| import time | |
| import pandas as pd | |
| from fastapi import FastAPI, HTTPException | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.service import Service | |
| from selenium.webdriver.common.action_chains import ActionChains | |
| from selenium.webdriver.chrome.options import Options | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| from webdriver_manager.core.driver_cache import DriverCacheManager | |
| from selenium.webdriver.common.by import By | |
| from fake_headers import Headers | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import logging | |
| from pyppeteer import launch | |
| import asyncio | |
| # from selenium_driverless import webdriver as webdriverless | |
| # Initialize FastAPI | |
| app = FastAPI( | |
| debug=True, | |
| title="NextAnalytics Server", | |
| consumes=["application/x-www-form-urlencoded", "multipart/form-data"], | |
| docs_url='/swagger' | |
| ) | |
| # Configure CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Setup ChromeDriver and Selenium | |
| # custom_wdm_cache = os.path.join(os.getcwd(), 'custom_wdm_cache') | |
| # os.environ['WDM_LOCAL'] = custom_wdm_cache | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| def setup_chromedriver(): | |
| # logging.info("Setting up ChromeDriver...") | |
| custom_wdm_cache = os.path.join(os.getcwd(), 'custom_wdm_cache') | |
| os.environ['WDM_LOCAL'] = custom_wdm_cache | |
| cache_manager = DriverCacheManager(custom_wdm_cache) | |
| os.chmod(custom_wdm_cache, 0o755) # Ensure proper permissions | |
| path = ChromeDriverManager(cache_manager=cache_manager).install() | |
| # path = GeckoDriverManager(cache_manager=cache_manager).install() | |
| os.chmod(path, 0o755) # Ensure ChromeDriver is executable | |
| logging.info(f"ChromeDriver path: {path}") | |
| return path | |
| # Setup headless Chrome options | |
| # Define a custom user agent | |
| # my_user_agent = "Mozilla/5.0 (X11; Linux x86_64; rv:92.0) Gecko/20100101 Firefox/92.0" | |
| header = Headers().generate()["User-Agent"] | |
| # my_user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36" | |
| # capabilities = webdriver.DesiredCapabilities.CHROME | |
| # proxy = None | |
| browser_option = Options() | |
| # browser_option..level = 'trace' | |
| browser_option.add_argument("--headless") # Running in headless mode (no GUI) | |
| browser_option.add_argument("--no-sandbox") | |
| browser_option.add_argument("--disable-dev-shm-usage") | |
| browser_option.add_argument("--disable-blink-features=AutomationControlled") | |
| browser_option.add_argument("--ignore-certificate-errors") | |
| # profile = webdriver.FirefoxProfile() | |
| # profile.set_preference("general.useragent.override", "Your User Agent String") | |
| # browser_option.profile = profile | |
| logging.info(f"browser_version: {browser_option.browser_version}") | |
| # browser_option.set_capability( | |
| # ) | |
| # name="", | |
| # value=capabilities) | |
| # browser_option.capabilities = { | |
| # "moz:firefoxOptions": { | |
| # "args": [ | |
| # "--headless", | |
| # "--no-sandbox", | |
| # "--disable-dev-shm-usage", | |
| # "--ignore-certificate-errors", | |
| # f"--user-agent={header}" | |
| # ] | |
| # } | |
| # } | |
| # browser_option.binary_location = '/usr/bin/firefox' | |
| # browser_option.binary_location = r'C:\Users\HP\.cache\selenium\firefox\win64\133.0\firefox.exe' | |
| # browser_option.add_argument("--disable-gpu") | |
| # browser_option.add_argument("--log-level=3") | |
| # browser_option.add_argument("--disable-notifications") | |
| # browser_option.add_argument("--disable-popup-blocking") | |
| # browser_option.add_argument(f"--user-agent={my_user_agent}") | |
| browser_option.add_argument("--user-agent={}".format(header)) | |
| # if proxy: | |
| # browser_option.add_argument(f"--proxy-server={proxy}") | |
| # Setup WebDriver | |
| driver_path = setup_chromedriver() | |
| service = Service(executable_path=driver_path) | |
| driver = webdriver.Chrome(service=service, options=browser_option) | |
| # actions = ActionChains(driver) | |
| def getSearchPostData(search_keyword, index, name="", forCompetitorAnalysis=False): | |
| # Navigate to the search results page | |
| url = f'https://www.reddit.com/search/?q={search_keyword}' | |
| driver.get(url) | |
| time.sleep(3) # Consider using WebDriverWait instead of sleep for better reliability | |
| logging.info("Navigated to search page.") | |
| posts_data = [] | |
| list_length = 0 # posts count | |
| try: | |
| if forCompetitorAnalysis: | |
| driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") | |
| time.sleep(5) | |
| post_cards = driver.find_elements(By.CSS_SELECTOR, 'a[data-testid="post-title-text"]') | |
| post_cards_1 = driver.find_elements(By.CSS_SELECTOR, 'div[data-testid="search-counter-row"]') | |
| post_cards_2 = driver.find_elements(By.CSS_SELECTOR, 'faceplate-timeago') | |
| logging.info(f"Found {len(post_cards)} post cards.") | |
| idx = list_length | |
| for card in post_cards_1: | |
| try: | |
| votes_count = card.find_element(By.XPATH, './/faceplate-number').text | |
| comments_count = card.find_element(By.XPATH, | |
| './/span[contains(text(), "comment") or contains(text(), "comments")]/preceding-sibling::faceplate-number' | |
| ).text | |
| posts_data.append({ | |
| "index": idx, | |
| "comment_count": comments_count, | |
| "votes_count": votes_count | |
| }) | |
| idx += 1 | |
| except Exception as e: | |
| logging.error(f"Error processing post_card_1: {e}") | |
| idx = list_length | |
| for card in post_cards: | |
| try: | |
| url = card.get_attribute("href") | |
| title = card.text | |
| posts_data[idx]["title"] = title | |
| posts_data[idx]["url"] = url | |
| idx += 1 | |
| except Exception as e: | |
| logging.error(f"Error processing post_cards: {e}") | |
| idx = list_length | |
| for card in post_cards_2: | |
| try: | |
| time_element = card.find_element(By.XPATH, './time') | |
| post_time = time_element.get_attribute('datetime') | |
| posts_data[idx]["time"] = post_time | |
| idx += 1 | |
| except Exception as e: | |
| logging.error(f"Error processing post_cards_2: {e}") | |
| except Exception as e: | |
| logging.error(f"Error in scrolling or extracting data: {e}") | |
| df = pd.DataFrame(posts_data) | |
| df.to_csv(f'posts_data_{index}.csv', index=False) | |
| logging.info(f"Data saved to posts_data_{index}.csv") | |
| return df | |
| def getPinterestSearchPostData(search_keyword, index, name="", forCompetitorAnalysis=False): | |
| # Navigate to the search results page | |
| url = f'https://www.pinterest.com/search/pins?q={search_keyword}' | |
| driver.get(url) | |
| driver.implicitly_wait(5) # Consider using WebDriverWait instead of sleep for better reliability | |
| logging.info("Navigated to search page.") | |
| # links = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='related-pins-title']") | |
| # print("links", links) | |
| posts_data = [] | |
| list_length = 0 # posts count | |
| try: | |
| if forCompetitorAnalysis: | |
| driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") | |
| time.sleep(5) | |
| post_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='related-pins-title']") | |
| # post_cards_1 = driver.find_elements(By.CSS_SELECTOR, 'div[data-testid="search-counter-row"]') | |
| # post_cards_2 = driver.find_elements(By.CSS_SELECTOR, 'faceplate-timeago') | |
| logging.info(f"Found {len(post_cards)} post cards.") | |
| # idx = list_length | |
| # for card in post_cards_1: | |
| # try: | |
| # votes_count = card.find_element(By.XPATH, './/faceplate-number').text | |
| # comments_count = card.find_element(By.XPATH, | |
| # './/span[contains(text(), "comment") or contains(text(), "comments")]/preceding-sibling::faceplate-number' | |
| # ).text | |
| # posts_data.append({ | |
| # "index": idx, | |
| # "comment_count": comments_count, | |
| # "votes_count": votes_count | |
| # }) | |
| # idx += 1 | |
| # except Exception as e: | |
| # logging.error(f"Error processing post_card_1: {e}") | |
| idx = 0 | |
| for card in post_cards: | |
| try: | |
| title = card.find_element(By.XPATH, | |
| './/div' | |
| ).text | |
| # posts_data[idx]["title"] = title | |
| print("title", title) | |
| # idx += 1 | |
| except Exception as e: | |
| logging.error(f"Error processing post_cards: {e}") | |
| # idx = list_length | |
| # for card in post_cards_2: | |
| # try: | |
| # time_element = card.find_element(By.XPATH, './time') | |
| # post_time = time_element.get_attribute('datetime') | |
| # posts_data[idx]["time"] = post_time | |
| # idx += 1 | |
| # except Exception as e: | |
| # logging.error(f"Error processing post_cards_2: {e}") | |
| except Exception as e: | |
| logging.error(f"Error in scrolling or extracting data: {e}") | |
| df = pd.DataFrame(posts_data) | |
| df.to_csv(f'posts_data_{index}.csv', index=False) | |
| logging.info(f"Data saved to posts_data_{index}.csv") | |
| return df | |
| def get_webpage_title(url: str) -> str: | |
| try: | |
| # getSearchPostData(search_keyword="migraine", index=0) | |
| getPinterestSearchPostData(search_keyword="watercolor art",index=0) | |
| driver.get(url) | |
| time.sleep(3) | |
| title = driver.title | |
| logging.info(f"Page title: {title}") | |
| return title | |
| except Exception as e: | |
| logging.error(f"Error fetching webpage title: {e}") | |
| return str(e) | |
| async def home(): | |
| return {"message": "Hello"} | |
| async def pupFcuntin(url)->str: | |
| browser = await launch( | |
| options={ | |
| 'headless': True, | |
| 'args': [ | |
| '--no-sandbox', | |
| '--disable-setuid-sandbox', | |
| '--disable-blink-features=AutomationControlled', | |
| ], | |
| # 'executablePath': 'usr/bin/google-chrome', | |
| # 'executablePath': r'C:\Program Files\Google\Chrome\Application\chrome.exe', | |
| } | |
| ) | |
| page = await browser.newPage() | |
| await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36') | |
| # Pretend to be a real browser | |
| await page.evaluateOnNewDocument( | |
| """ | |
| () => { | |
| Object.defineProperty(navigator, 'webdriver', { get: () => false }); | |
| Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); | |
| Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3] }); | |
| } | |
| """ | |
| ) | |
| await page.goto(url, options={'waitUntil': 'domcontentloaded'}) | |
| time.sleep(3) | |
| ## Get HTML | |
| html = await page.title() | |
| await browser.close() | |
| logging.info(f"Page title: {html}") | |
| return html | |
| async def puppeteerTrial(url: str): | |
| html =await pupFcuntin(url) | |
| return {"message": html} | |
| async def fetch_title(url: str): | |
| """ | |
| Fetch the title of a webpage by URL. | |
| Example: /get-title/?url=https://www.reddit.com | |
| """ | |
| try: | |
| title = get_webpage_title(url) | |
| return {"url": url, "title": title} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # @app.get("/get-reddit/") | |
| # async def getReddit(url: str): | |
| # """ | |
| # Fetch the title of a webpage by URL. | |
| # Example: /get-title/?url=https://www.reddit.com | |
| # """ | |
| # try: | |
| # options = webdriverless.ChromeOptions() | |
| # driver_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36" | |
| # options.add_argument("--headless") # Running in headless mode (no GUI) | |
| # options.add_argument("--no-sandbox") | |
| # options.add_argument("--disable-dev-shm-usage") | |
| # options.add_argument("--ignore-certificate-errors") | |
| # options.add_argument(f"--user-agent={driver_agent}") | |
| # title="Notitle" | |
| # async with webdriverless.Chrome(options=options) as driver: | |
| # await driver.get('https://www.reddit.com') | |
| # time.sleep(3) | |
| # title = await driver.title | |
| # url = await driver.current_url | |
| # source = await driver.page_source | |
| # print(title) | |
| # return {"url": url, "title": title} | |
| # return {"url": url, "title": title} | |
| # except Exception as e: | |
| # raise HTTPException(status_code=500, detail=str(e)) | |
| # Run the app | |
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # uvicorn.run(app, host="127.0.0.1", port=7860) | |
| # from selenium import webdriver | |
| # from flask import Flask, request | |
| # from selenium.webdriver.chrome.service import Service | |
| # from webdriver_manager.chrome import ChromeDriverManager | |
| # from selenium.webdriver.common.by import By | |
| # from selenium.webdriver.common.proxy import Proxy, ProxyType | |
| # app = Flask(__name__) | |
| # def download_selenium(): | |
| # prox = Proxy() | |
| # prox.proxy_type = ProxyType.MANUAL | |
| # prox.http_proxy = "ip_addr:port" | |
| # prox.socks_proxy = "ip_addr:port" | |
| # prox.ssl_proxy = "ip_addr:port" | |
| # chrome_options = webdriver.ChromeOptions() | |
| # driver_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36" | |
| # capabilities = webdriver.DesiredCapabilities.CHROME | |
| # prox.to_capabilities(capabilities) | |
| # chrome_options.add_argument("--headless=new") | |
| # # chrome_options.add_argument(f"--proxy-server={proxy}") | |
| # chrome_options.add_argument("--no-sandbox") | |
| # chrome_options.add_argument("--disable-dev-shm-usage") | |
| # # chrome_options.add_argument("--ignore-certificate-errors") | |
| # # chrome_options.add_argument("--disable-gpu") | |
| # # chrome_options.add_argument("--log-level=3") | |
| # # chrome_options.add_argument("--disable-notifications") | |
| # # chrome_options.add_argument("--disable-popup-blocking") | |
| # prefs = {"profile.managed_default_content_settings.images": 2} | |
| # # chrome_options.add_experimental_option("prefs", prefs) | |
| # chrome_options.add_argument(f"--user-agent={driver_agent}") | |
| # driver = webdriver.Chrome(service=Service(ChromeDriverManager().install(),desired_capabilities=capabilities), options=chrome_options) | |
| # driver.get("https://reddit.com") | |
| # title = driver.title | |
| # # language = driver.find_element(By.XPATH, "//div[@id='SIvCob']").text | |
| # data = {'Page Title': title} | |
| # return data | |
| # @app.route('/', methods = ['GET','POST']) | |
| # def home(): | |
| # if (request.method == 'GET'): | |
| # return download_selenium() | |
| # if __name__ == "__main__": | |
| # app.run(debug=True, port=3000) |