import os
import time
import asyncio
import logging

import pandas as pd
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.driver_cache import DriverCacheManager
from fake_headers import Headers
from pyppeteer import launch

# from selenium_driverless import webdriver as webdriverless

# Initialize FastAPI
app = FastAPI(
    debug=True,
    title="NextAnalytics Server",
    docs_url="/swagger",
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Setup logging
logging.basicConfig(level=logging.INFO)


def setup_chromedriver():
    """Download ChromeDriver into a local cache and make it executable."""
    custom_wdm_cache = os.path.join(os.getcwd(), "custom_wdm_cache")
    os.environ["WDM_LOCAL"] = custom_wdm_cache
    cache_manager = DriverCacheManager(custom_wdm_cache)
    os.makedirs(custom_wdm_cache, exist_ok=True)  # chmod fails if the dir is missing
    os.chmod(custom_wdm_cache, 0o755)  # Ensure proper permissions on the cache dir
    path = ChromeDriverManager(cache_manager=cache_manager).install()
    os.chmod(path, 0o755)  # Ensure ChromeDriver is executable
    logging.info(f"ChromeDriver path: {path}")
    return path


# Setup headless Chrome options with a randomized user agent
header = Headers().generate()["User-Agent"]

browser_option = Options()
browser_option.add_argument("--headless")  # Run without a GUI
browser_option.add_argument("--no-sandbox")
browser_option.add_argument("--disable-dev-shm-usage")
browser_option.add_argument("--disable-blink-features=AutomationControlled")
browser_option.add_argument("--ignore-certificate-errors")
browser_option.add_argument(f"--user-agent={header}")
# Optional extras, disabled by default:
# browser_option.add_argument("--disable-gpu")
# browser_option.add_argument("--log-level=3")
# browser_option.add_argument("--disable-notifications")
# browser_option.add_argument("--disable-popup-blocking")
# proxy = None
# if proxy:
#     browser_option.add_argument(f"--proxy-server={proxy}")
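# The scraping functions below wait with fixed time.sleep() calls, which the
# inline comments already flag as fragile. A minimal sketch of the explicit-wait
# alternative, assuming the page eventually renders the selector (WebDriverWait
# and expected_conditions are standard Selenium; the helper name and timeout
# are illustrative, not part of the original code):
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


def wait_for_elements(drv, css_selector, timeout=10):
    """Block until at least one element matching css_selector is present."""
    return WebDriverWait(drv, timeout).until(
        EC.presence_of_all_elements_located((By.CSS_SELECTOR, css_selector))
    )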
# Setup WebDriver
driver_path = setup_chromedriver()
service = Service(executable_path=driver_path)
driver = webdriver.Chrome(service=service, options=browser_option)


def getSearchPostData(search_keyword, index, name="", forCompetitorAnalysis=False):
    """Scrape Reddit search results for a keyword and save them to CSV."""
    # Navigate to the search results page
    url = f"https://www.reddit.com/search/?q={search_keyword}"
    driver.get(url)
    time.sleep(3)  # Consider using WebDriverWait instead of sleep for better reliability
    logging.info("Navigated to search page.")

    posts_data = []
    list_length = 0  # posts count
    try:
        if forCompetitorAnalysis:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(5)

        # Three parallel element lists: title links, vote/comment counters, timestamps
        post_cards = driver.find_elements(By.CSS_SELECTOR, 'a[data-testid="post-title-text"]')
        post_cards_1 = driver.find_elements(By.CSS_SELECTOR, 'div[data-testid="search-counter-row"]')
        post_cards_2 = driver.find_elements(By.CSS_SELECTOR, 'faceplate-timeago')
        logging.info(f"Found {len(post_cards)} post cards.")

        idx = list_length
        for card in post_cards_1:
            try:
                votes_count = card.find_element(By.XPATH, './/faceplate-number').text
                comments_count = card.find_element(
                    By.XPATH,
                    './/span[contains(text(), "comment") or contains(text(), "comments")]/preceding-sibling::faceplate-number',
                ).text
                posts_data.append({
                    "index": idx,
                    "comment_count": comments_count,
                    "votes_count": votes_count,
                })
                idx += 1
            except Exception as e:
                logging.error(f"Error processing post_card_1: {e}")

        idx = list_length
        for card in post_cards:
            try:
                url = card.get_attribute("href")
                title = card.text
                posts_data[idx]["title"] = title
                posts_data[idx]["url"] = url
                idx += 1
            except Exception as e:
                logging.error(f"Error processing post_cards: {e}")

        idx = list_length
        for card in post_cards_2:
            try:
                time_element = card.find_element(By.XPATH, './time')
                post_time = time_element.get_attribute('datetime')
                posts_data[idx]["time"] = post_time
                idx += 1
            except Exception as e:
                logging.error(f"Error processing post_cards_2: {e}")
    except Exception as e:
        logging.error(f"Error in scrolling or extracting data: {e}")

    df = pd.DataFrame(posts_data)
    df.to_csv(f"posts_data_{index}.csv", index=False)
    logging.info(f"Data saved to posts_data_{index}.csv")
    return df
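# getSearchPostData above stitches its three element lists together by shared
# index, so a single missing counter row shifts every later row by one. A
# minimal alternative sketch that pairs elements positionally with zip()
# instead (the helper name and row keys are illustrative, not part of the
# original code; zip() truncates to the shortest list, dropping the
# mismatched post rather than misaligning the rest):
def zip_post_rows(title_links, counter_rows):
    """Pair each title anchor with its counter row by position."""
    return [
        {"title": a.text, "url": a.get_attribute("href"), "counters": c.text}
        for a, c in zip(title_links, counter_rows)
    ]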
def getPinterestSearchPostData(search_keyword, index, name="", forCompetitorAnalysis=False):
    """Scrape Pinterest search results for a keyword and save them to CSV."""
    # Navigate to the search results page
    url = f"https://www.pinterest.com/search/pins?q={search_keyword}"
    driver.get(url)
    driver.implicitly_wait(5)  # Consider using WebDriverWait instead for better reliability
    logging.info("Navigated to search page.")

    posts_data = []
    try:
        if forCompetitorAnalysis:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(5)

        post_cards = driver.find_elements(By.CSS_SELECTOR, "div[data-test-id='related-pins-title']")
        logging.info(f"Found {len(post_cards)} post cards.")

        for card in post_cards:
            try:
                title = card.find_element(By.XPATH, './/div').text
                posts_data.append({"title": title})
            except Exception as e:
                logging.error(f"Error processing post_cards: {e}")
    except Exception as e:
        logging.error(f"Error in scrolling or extracting data: {e}")

    df = pd.DataFrame(posts_data)
    df.to_csv(f"posts_data_{index}.csv", index=False)
    logging.info(f"Data saved to posts_data_{index}.csv")
    return df


def get_webpage_title(url: str) -> str:
    """Fetch a page with Selenium and return its <title>."""
    try:
        # Manual test hooks for the scrapers, left disabled so a title fetch
        # does not trigger a full scrape:
        # getSearchPostData(search_keyword="migraine", index=0)
        # getPinterestSearchPostData(search_keyword="watercolor art", index=0)
        driver.get(url)
        time.sleep(3)
        title = driver.title
        logging.info(f"Page title: {title}")
        return title
    except Exception as e:
        logging.error(f"Error fetching webpage title: {e}")
        return str(e)


@app.get("/")
async def home():
    return {"message": "Hello"}


async def pyppeteer_get_title(url: str) -> str:
    """Fetch a page with pyppeteer (headless Chromium) and return its title."""
    browser = await launch(
        options={
            'headless': True,
            'args': [
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-blink-features=AutomationControlled',
            ],
            # 'executablePath': '/usr/bin/google-chrome',
        }
    )
    page = await browser.newPage()
    await page.setUserAgent(
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
    )
    # Pretend to be a real browser by masking common automation fingerprints
    await page.evaluateOnNewDocument(
        """
        () => {
            Object.defineProperty(navigator, 'webdriver', { get: () => false });
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3] });
        }
        """
    )
    await page.goto(url, options={'waitUntil': 'domcontentloaded'})
    await asyncio.sleep(3)  # Give late-loading scripts a moment to settle

    # Get the page title
    title = await page.title()
    await browser.close()
    logging.info(f"Page title: {title}")
    return title


@app.get("/puppeteerTrial")
async def puppeteerTrial(url: str):
    title = await pyppeteer_get_title(url)
    return {"message": title}


@app.get("/get-title/")
async def fetch_title(url: str):
    """
    Fetch the title of a webpage by URL.
    Example: /get-title/?url=https://www.reddit.com
    """
    try:
        title = get_webpage_title(url)
        return {"url": url, "title": title}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
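# fetch_title above is declared async but calls the blocking Selenium driver
# directly, which stalls FastAPI's event loop for the whole page load. A
# minimal sketch of moving the blocking call onto a worker thread with
# asyncio.to_thread (standard library, Python 3.9+; the route path and
# handler name are illustrative, not part of the original API):
@app.get("/get-title-threaded/")
async def fetch_title_threaded(url: str):
    title = await asyncio.to_thread(get_webpage_title, url)
    return {"url": url, "title": title}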
# Experimental selenium_driverless variant, kept for reference (requires the
# commented selenium_driverless import at the top of the file):
# @app.get("/get-reddit/")
# async def getReddit(url: str):
#     """
#     Fetch the title of a webpage by URL.
#     Example: /get-reddit/?url=https://www.reddit.com
#     """
#     try:
#         options = webdriverless.ChromeOptions()
#         driver_agent = (
#             "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
#             "(KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36"
#         )
#         options.add_argument("--headless")  # Run without a GUI
#         options.add_argument("--no-sandbox")
#         options.add_argument("--disable-dev-shm-usage")
#         options.add_argument("--ignore-certificate-errors")
#         options.add_argument(f"--user-agent={driver_agent}")
#         async with webdriverless.Chrome(options=options) as driver:
#             await driver.get(url)
#             await asyncio.sleep(3)
#             title = await driver.title
#             current_url = await driver.current_url
#         return {"url": current_url, "title": title}
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=str(e))


# Run the app
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=7860)
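# Example local run and smoke test (the port matches the uvicorn call above;
# the module filename and query URL are illustrative):
#   python app.py
#   curl "http://127.0.0.1:7860/get-title/?url=https://www.reddit.com"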