import asyncio
import concurrent.futures
import os
import time

from reddit.reddit_search_scrapper import getFinalData
from reddit.reddit_sentiment_analysis import SentimentAnalysis
from reddit.reddit_utils import get_microseconds_list
from reddit.scraping import getPostComments, getSearchPostData


async def delete_files(file_names):
    """Best-effort removal of the given files; errors are logged, never raised."""
    for file_name in file_names:
        try:
            if os.path.exists(file_name):
                os.remove(file_name)
                print(f"Deleted file: {file_name}")
        except Exception as e:
            print(f"Error deleting file {file_name}: {e}")


async def run_with_timeout(task_func, *args, timeout=300):
    """Await ``task_func(*args)``, re-raising asyncio.TimeoutError after ``timeout`` seconds."""
    try:
        return await asyncio.wait_for(task_func(*args), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Task exceeded {timeout} seconds timeout.")
        raise


async def getRedditData_with_timeout(user_query, search_keywords, retries=1, timeout=300):
    """Run :func:`getRedditData` under a timeout, retrying on failure.

    Makes up to ``retries + 1`` attempts.  Returns getRedditData's result
    dict on the first success; after the final failed attempt raises
    ``Exception("Process failed after retries.")`` chained to the cause.
    """
    # NOTE(review): file_names is never populated here, so the cleanup call
    # below is currently a no-op — getRedditData would have to expose the
    # files it created for this to delete anything. Preserved as-is.
    file_names = []
    for attempt in range(retries + 1):
        try:
            return await run_with_timeout(
                getRedditData, user_query, search_keywords, timeout=timeout
            )
        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            await delete_files(file_names)  # delete any created files before retrying
            if attempt == retries:
                raise Exception("Process failed after retries.") from e


async def getRedditData(user_query, search_keywords):
    """Scrape Reddit for ``search_keywords``, then run sentiment analysis.

    Four sequential steps (post search, data merge, comment fetch,
    sentiment/emotion analysis); each step is timed, and a failing step is
    logged but does not abort the remaining steps.

    Returns a dict with the primary CSV file name (or None if scraping
    produced no files), elapsed seconds for the scraping and sentiment
    phases, the first unique id as a string, and the list of steps that
    succeeded.
    """
    unique_list = get_microseconds_list()
    successful_steps = []
    start_time = time.time()
    fileNames = []

    def log_step_time(step_name, step_start, success=True, error=None):
        # Param renamed from start_time to avoid shadowing the outer phase timer.
        elapsed = time.time() - step_start
        if success:
            print(f"{step_name} completed successfully in {elapsed:.2f} seconds.")
        else:
            # Fixed: the original message contained a literal newline inside
            # the f-string (a SyntaxError); emit the error on one line.
            print(f"{step_name} failed in {elapsed:.2f} seconds. Error: {error}")

    def _collect_batch(futures, count):
        # Drain one batch of getSearchPostData futures, recording each
        # produced CSV; count advances only for futures that returned a result.
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result:
                fileNames.append(f"posts_data_{result}.csv")
                successful_steps.append(('getSearchPostData', count))
                count += 1
        return count

    # Step 1: scrape the post listings for each keyword, three at a time.
    try:
        step_start = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = []
            count = 0
            for i, keyword in enumerate(search_keywords):
                futures.append(
                    executor.submit(
                        getSearchPostData,
                        search_keyword=keyword,
                        index=unique_list[i],
                        position=i,
                    )
                )
                if len(futures) == 3:
                    count = _collect_batch(futures, count)
                    futures = []
            if futures:
                count = _collect_batch(futures, count)
        log_step_time("getSearchPostData", step_start)
    except Exception as e:
        log_step_time("getSearchPostData", step_start, success=False, error=e)

    # Step 2: merge/filter the scraped CSVs against the user query.
    try:
        step_start = time.time()
        res = getFinalData(user_query=user_query, filesNames=fileNames)
        if res is True:
            # Fixed: ('getFinalData') was a plain string — parentheses without
            # a comma do not make a tuple; record a 1-tuple like the other steps.
            successful_steps.append(('getFinalData',))
        log_step_time("getFinalData", step_start)
    except Exception as e:
        log_step_time("getFinalData", step_start, success=False, error=e)

    # Step 3: fetch comments for the first CSV. If step 1 produced no files,
    # the IndexError on fileNames[0] is caught and logged here.
    try:
        step_start = time.time()
        await getPostComments(file_name=fileNames[0])
        successful_steps.append(('getPostComments',))
        log_step_time("getPostComments", step_start)
    except Exception as e:
        log_step_time("getPostComments", step_start, success=False, error=e)

    reddit_time = time.time() - start_time
    start_time = time.time()

    # Step 4: sentiment/emotion analysis over the collected comments.
    try:
        step_start = time.time()
        sentiment_instance = SentimentAnalysis()
        sentiment_instance.generate_sentiment_and_emotion_from_data(fileName=fileNames[0])
        successful_steps.append(('getPostSentiment',))
        log_step_time("getPostSentiment", step_start)
    except Exception as e:
        log_step_time("getPostSentiment", step_start, success=False, error=e)

    sentiment_time = time.time() - start_time

    return {
        "fileName": fileNames[0] if fileNames else None,
        'reddit_data': reddit_time,
        'sentiment_data': sentiment_time,
        "fileUniqueId": str(unique_list[0]) if unique_list else None,
        "successful_steps": successful_steps,
    }