File size: 4,984 Bytes
c3837c5
b70f413
c3837c5
 
 
 
bf4f857
213206a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3837c5
bf4f857
c3837c5
 
bf4f857
213206a
 
 
 
 
 
 
 
 
c3837c5
b70f413
213206a
b70f413
 
213206a
b70f413
 
 
 
 
 
 
213206a
e56eed8
213206a
 
b70f413
 
 
 
 
213206a
e56eed8
213206a
 
 
b70f413
213206a
c3837c5
213206a
c3837c5
213206a
 
e1ce9ca
213206a
 
c3837c5
213206a
c3837c5
213206a
c3837c5
213206a
bf4f857
213206a
 
c3837c5
213206a
 
bf4f857
213206a
c3837c5
213206a
c3837c5
 
213206a
 
c3837c5
213206a
 
c3837c5
213206a
 
 
 
bf4f857
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

import concurrent
from reddit.reddit_search_scrapper import getFinalData
from reddit.reddit_sentiment_analysis import SentimentAnalysis
from reddit.reddit_utils import get_microseconds_list
from reddit.scraping import getPostComments, getSearchPostData
import time
import asyncio
import time
import os
import concurrent.futures

async def delete_files(file_names):
    """Best-effort removal of the given files.

    Each path in *file_names* is removed if it exists. Failures are
    printed and swallowed so one bad path does not stop the rest.
    """
    for path in file_names:
        try:
            if not os.path.exists(path):
                continue
            os.remove(path)
            print(f"Deleted file: {path}")
        except Exception as err:
            print(f"Error deleting file {path}: {err}")

async def run_with_timeout(task_func, *args, timeout=300):
    """Await ``task_func(*args)``, aborting after *timeout* seconds.

    Re-raises :class:`asyncio.TimeoutError` (after logging) so callers
    can distinguish a timeout from other failures.
    """
    pending = task_func(*args)
    try:
        result = await asyncio.wait_for(pending, timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Task exceeded {timeout} seconds timeout.")
        raise
    return result

async def getRedditData_with_timeout(user_query, search_keywords, retries=1, timeout=300):
    """Run getRedditData with a per-attempt timeout and a simple retry loop.

    Args:
        user_query: Query string forwarded to getRedditData.
        search_keywords: Keyword list forwarded to getRedditData.
        retries: Number of additional attempts after the first failure.
        timeout: Per-attempt timeout in seconds (enforced by run_with_timeout).

    Returns:
        The result dict from the first successful getRedditData attempt.

    Raises:
        Exception: When every attempt fails; chained to the last error.
    """
    last_error = None
    for attempt in range(retries + 1):
        try:
            return await run_with_timeout(getRedditData, user_query, search_keywords, timeout=timeout)
        except Exception as e:
            last_error = e
            print(f"Attempt {attempt + 1} failed with error: {e}")
            # NOTE(review): the original called delete_files(file_names) here, but
            # file_names was never populated, so the cleanup was a guaranteed no-op.
            # Removed the dead call; wire up real file tracking if partial CSVs
            # must be deleted between attempts.
    raise Exception("Process failed after retries.") from last_error

async def getRedditData(user_query, search_keywords):
    """Scrape Reddit for *search_keywords*, merge results, fetch comments, score sentiment.

    Pipeline (each step is timed; failures are logged and swallowed so later
    steps still get a chance to run):
      1. getSearchPostData per keyword, fanned out on a 3-worker thread pool.
      2. getFinalData merges the per-keyword CSVs against *user_query*.
      3. getPostComments enriches the first (primary) CSV with comments.
      4. SentimentAnalysis adds sentiment/emotion data for the same CSV.

    Returns:
        dict with the primary file name and unique id, per-phase wall-clock
        timings, and the list of step tags that completed successfully.
    """
    unique_list = get_microseconds_list()
    successful_steps = []
    start_time = time.time()
    fileNames = []

    def log_step_time(step_name, step_start, success=True, error=None):
        # Renamed parameter (was start_time) to stop shadowing the outer timer.
        elapsed = time.time() - step_start
        if success:
            print(f"{step_name} completed successfully in {elapsed:.2f} seconds.")
        else:
            print(f"{step_name} failed in {elapsed:.2f} seconds. Error: {error}")

    def _drain(batch, count):
        # Collect finished scrape futures, recording the CSV each produced.
        # Returns the updated success counter.
        for future in concurrent.futures.as_completed(batch):
            result = future.result()
            if result:
                fileNames.append(f"posts_data_{result}.csv")
                successful_steps.append(('getSearchPostData', count))
                count += 1
        return count

    # Step 1: Get search post data (submitted and drained in batches of 3).
    try:
        step_start = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = []
            count = 0
            # assumes unique_list has at least len(search_keywords) entries -- TODO confirm
            for i, keyword in enumerate(search_keywords):
                futures.append(executor.submit(getSearchPostData, search_keyword=keyword, index=unique_list[i], position=i))
                if len(futures) == 3:
                    count = _drain(futures, count)
                    futures = []
            if futures:
                count = _drain(futures, count)
        log_step_time("getSearchPostData", step_start)
    except Exception as e:
        log_step_time("getSearchPostData", step_start, success=False, error=e)

    # Step 2: Get final data
    try:
        step_start = time.time()
        res = getFinalData(user_query=user_query, filesNames=fileNames)
        if res is True:
            # Fixed: original appended the bare string ('getFinalData') -- the
            # parentheses did not make a tuple, unlike every other step tag.
            successful_steps.append(('getFinalData',))
        log_step_time("getFinalData", step_start)
    except Exception as e:
        log_step_time("getFinalData", step_start, success=False, error=e)

    # Step 3: Get post comments
    try:
        step_start = time.time()
        # An IndexError on empty fileNames is caught and logged like any failure.
        await getPostComments(file_name=fileNames[0])
        successful_steps.append(('getPostComments',))
        log_step_time("getPostComments", step_start)
    except Exception as e:
        log_step_time("getPostComments", step_start, success=False, error=e)
    reddit_time = time.time() - start_time

    # Step 4: Get sentiment of post comments
    sentiment_start = time.time()
    try:
        step_start = time.time()
        sentiment_instance = SentimentAnalysis()
        sentiment_instance.generate_sentiment_and_emotion_from_data(fileName=fileNames[0])
        successful_steps.append(('getPostSentiment',))
        log_step_time("getPostSentiment", step_start)
    except Exception as e:
        log_step_time("getPostSentiment", step_start, success=False, error=e)
    sentiment_time = time.time() - sentiment_start

    return {
        "fileName": fileNames[0] if fileNames else None,
        'reddit_data': reddit_time,
        'sentiment_data': sentiment_time,
        "fileUniqueId": str(unique_list[0]) if unique_list else None,
        "successful_steps": successful_steps,
    }