Spaces:
Runtime error
Runtime error
| import concurrent | |
| from reddit.reddit_search_scrapper import getFinalData | |
| from reddit.reddit_sentiment_analysis import SentimentAnalysis | |
| from reddit.reddit_utils import get_microseconds_list | |
| from reddit.scraping import getPostComments, getSearchPostData | |
| import time | |
| import asyncio | |
| import time | |
| import os | |
| import concurrent.futures | |
async def delete_files(file_names):
    """Remove each file in *file_names* if it exists, logging the outcome.

    Errors are reported but never propagated, so one failed deletion does
    not abort cleanup of the remaining files.
    """
    for path in file_names:
        try:
            if not os.path.exists(path):
                continue
            os.remove(path)
            print(f"Deleted file: {path}")
        except Exception as err:
            print(f"Error deleting file {path}: {err}")
async def run_with_timeout(task_func, *args, timeout=300):
    """Await ``task_func(*args)``, aborting after *timeout* seconds.

    Returns the task's result on success.

    Raises:
        asyncio.TimeoutError: when the task does not finish in time
            (a message is printed before re-raising).
    """
    pending = task_func(*args)
    try:
        return await asyncio.wait_for(pending, timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Task exceeded {timeout} seconds timeout.")
        raise
async def getRedditData_with_timeout(user_query, search_keywords, retries=1, timeout=300):
    """Run getRedditData with a per-attempt timeout, retrying on failure.

    Makes up to ``retries + 1`` attempts; the first successful result is
    returned. After the final failed attempt a generic Exception is raised,
    chained to the underlying error.

    NOTE(review): ``file_names`` is never populated, so the delete_files()
    cleanup currently receives an empty list and is a no-op — presumably a
    placeholder for tracking files created by getRedditData; confirm.
    """
    file_names = []
    attempt = 0
    while attempt <= retries:
        try:
            return await run_with_timeout(getRedditData, user_query, search_keywords, timeout=timeout)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            await delete_files(file_names)  # Delete created files
            if attempt == retries:
                raise Exception("Process failed after retries.") from e
        attempt += 1
async def getRedditData(user_query, search_keywords):
    """Scrape Reddit posts and comments for *search_keywords*, then run
    sentiment analysis over the first output file.

    Pipeline (each step is timed and logged; a failing step is logged and
    skipped rather than aborting the whole run):
      1. getSearchPostData fanned out over a 3-worker thread pool.
      2. getFinalData over the collected CSV files.
      3. getPostComments on the first CSV file.
      4. SentimentAnalysis over the first CSV file.

    Args:
        user_query: Free-text query forwarded to getFinalData.
        search_keywords: Keywords to scrape; one task per keyword. Assumes
            get_microseconds_list() yields at least len(search_keywords)
            unique ids — TODO confirm.

    Returns:
        dict with the primary output file name (or None), elapsed seconds for
        the scraping and sentiment phases, the first unique id as a string
        (or None), and the list of steps that succeeded.
    """
    unique_list = get_microseconds_list()
    successful_steps = []
    start_time = time.time()
    fileNames = []

    def log_step_time(step_name, start_time, success=True, error=None):
        # Uniform success/failure timing line for each pipeline step.
        elapsed = time.time() - start_time
        if success:
            print(f"{step_name} completed successfully in {elapsed:.2f} seconds.")
        else:
            print(f"{step_name} failed in {elapsed:.2f} seconds. Error: {error}")

    def _collect_results(pending, count):
        # Drain completed scraping futures: record each produced CSV file and
        # the successful step; returns the updated success counter.
        for future in concurrent.futures.as_completed(pending):
            result = future.result()
            if result:
                fileNames.append(f"posts_data_{result}.csv")
                successful_steps.append(('getSearchPostData', count))
                count += 1
        return count

    # Step 1: Get search post data (submitted in batches of 3, matching the
    # pool size; each batch is drained before the next is submitted).
    try:
        step_start = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = []
            count = 0
            for i in range(len(search_keywords)):
                futures.append(executor.submit(
                    getSearchPostData,
                    search_keyword=search_keywords[i],
                    index=unique_list[i],
                    position=i,
                ))
                if len(futures) == 3:
                    count = _collect_results(futures, count)
                    futures = []
            if futures:
                count = _collect_results(futures, count)
        log_step_time("getSearchPostData", step_start)
    except Exception as e:
        log_step_time("getSearchPostData", step_start, success=False, error=e)

    # Step 2: Get final data
    try:
        step_start = time.time()
        res = getFinalData(user_query=user_query, filesNames=fileNames)
        if res is True:
            # FIX: original appended the bare string ('getFinalData') — the
            # missing comma meant it was not a 1-tuple like every other entry.
            successful_steps.append(('getFinalData',))
        log_step_time("getFinalData", step_start)
    except Exception as e:
        log_step_time("getFinalData", step_start, success=False, error=e)

    # Step 3: Get post comments. If step 1 produced no files, fileNames[0]
    # raises IndexError, which this handler logs like any other failure.
    try:
        step_start = time.time()
        await getPostComments(file_name=fileNames[0])
        successful_steps.append(('getPostComments',))
        log_step_time("getPostComments", step_start)
    except Exception as e:
        log_step_time("getPostComments", step_start, success=False, error=e)

    reddit_time = time.time() - start_time
    start_time = time.time()

    # Step 4: Get sentiment of post comments
    try:
        step_start = time.time()
        sentiment_instance = SentimentAnalysis()
        sentiment_instance.generate_sentiment_and_emotion_from_data(fileName=fileNames[0])
        successful_steps.append(('getPostSentiment',))
        log_step_time("getPostSentiment", step_start)
    except Exception as e:
        log_step_time("getPostSentiment", step_start, success=False, error=e)

    sentiment_time = time.time() - start_time
    return {
        "fileName": fileNames[0] if fileNames else None,
        'reddit_data': reddit_time,
        'sentiment_data': sentiment_time,
        "fileUniqueId": str(unique_list[0]) if unique_list else None,
        "successful_steps": successful_steps,
    }