honey234 committed
Commit 213206a · 1 Parent(s): e56eed8

fixed bugs and added retry logic

apis/reddit_apis.py CHANGED
@@ -9,7 +9,7 @@ from models.pain_point_model import PainPointAnalysisModel
 from models.reddit_models import RedditPostDataModel
 from models.session_model import InputInfoModel
 from reddit.reddit_competitor_analysis import getCompetitorAnalysisData
-from reddit.reddit_functions import getRedditData
+from reddit.reddit_functions import getRedditData_with_timeout
 from reddit.reddit_gemini import getKeywords
 from reddit.reddit_pain_point_analysis import pain_point_analysis
 from reddit.reddit_utils import reddit_services_names
@@ -84,7 +84,8 @@ async def getRedditPostsData(request: RedditPostDataModel):
         if not search_keywords:
             raise HTTPException(status_code=400, detail="Search keywords must not be empty")
         print("user_query",user_query,"search_keywords",search_keywords)
-        result = await getRedditData(user_query=user_query, search_keywords=search_keywords)
+        result = await getRedditData_with_timeout(user_query=user_query, search_keywords=search_keywords)
+        print('getRedditPostsData: ', result)
         return result
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(f"Failed to run getRedditPostsData : {e}"))
@@ -163,7 +164,7 @@ async def analyzeData(inputData:InputInfoModel,user_session:dict):
     try:
         keywords = getKeywords(user_query=inputData.query)
 
-        reddit_data_result = await getRedditData(user_query=keywords['query'], search_keywords=keywords['top_3_combinations'])
+        reddit_data_result = await getRedditData_with_timeout(user_query=keywords['query'], search_keywords=keywords['top_3_combinations'])
         update_user_session(user_session=user_session,session_info=session_info_result,process_info=process_info)
 
         services_result,session_info_result = await getServices(
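
Since the route now awaits the retrying wrapper instead of calling getRedditData directly, it helps to see the full handler shape in one place. Below is a minimal sketch assuming FastAPI; the route path, request model fields, and the stub pipeline are placeholders, not repository code. One subtlety worth showing: a bare `except Exception` like the one in the diff also catches the deliberate 400 HTTPException and re-wraps it as a 500, so the sketch passes it through first.

# Minimal sketch of the endpoint pattern above (illustrative names, not repo code).
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class RedditPostDataRequest(BaseModel):  # hypothetical stand-in for RedditPostDataModel
    user_query: str
    search_keywords: list[str]

async def getRedditData_with_timeout(user_query, search_keywords):  # stub for the real import
    return {"fileName": None, "successful_steps": []}

@app.post("/getRedditPostsData")
async def getRedditPostsData(request: RedditPostDataRequest):
    try:
        if not request.search_keywords:
            raise HTTPException(status_code=400, detail="Search keywords must not be empty")
        result = await getRedditData_with_timeout(
            user_query=request.user_query, search_keywords=request.search_keywords
        )
        print('getRedditPostsData: ', result)
        return result
    except HTTPException:
        raise  # without this, the 400 above would be re-wrapped as a 500
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to run getRedditPostsData : {e}")
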
reddit/prompts.py CHANGED
@@ -112,7 +112,7 @@ def featureAnalysisPrompt():
 
 def getPainPointAnalysisPrompt(user_query):
     return f"""
-    Analyze the file_with_sentiment.csv data of Reddit posts for the user query = "{user_query}" to perform **pain point analysis**. Use the categories derived internally for the analysis, but do not return them. Focus only on the detailed pain point analysis results.
+    Analyze the given csv data of Reddit posts for the user query = "{user_query}" to perform **pain point analysis**. Use the categories derived internally for the analysis, but do not return them. Focus only on the detailed pain point analysis results.
 
     Return the response in the **JSON format** provided below, and include data for all categories identified during your internal process. Ensure your response adheres strictly to this structure and **do not include any intermediate data or steps**.
 
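
The rewording makes the prompt filename-agnostic: the analysis no longer assumes an upload named file_with_sentiment.csv, so the same template works for whichever CSV is attached to the model session. A trivial sketch of the property this buys (template body trimmed, query string made up):

# Trimmed copy of the template above; only the first line matters for this check.
def getPainPointAnalysisPrompt(user_query):
    return f"""
    Analyze the given csv data of Reddit posts for the user query = "{user_query}" to perform **pain point analysis**.
    """

prompt = getPainPointAnalysisPrompt("budgeting apps")
assert "file_with_sentiment.csv" not in prompt  # no longer bound to one upload name
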
reddit/reddit_competitor_analysis.py CHANGED
@@ -135,6 +135,7 @@ async def getPostDataofCompetitor(fileName, user_query):
         unique_list = get_microseconds_list(length=len(df))
         actual_list = []
         count=0
+        competitor_names = []
         # Use ThreadPoolExecutor to run tasks concurrently
         with concurrent.futures.ThreadPoolExecutor(max_workers=len(scraper_ant_keys)) as executor:
             futures = []
@@ -152,7 +153,8 @@ async def getPostDataofCompetitor(fileName, user_query):
                         result = future.result()
                         if result is not None:
                             actual_list.append(result)
-                            count+=1
+                            competitor_names.append(df.iloc[count]['name'])
+                            count+=1
                     futures = []
 
             if futures:
@@ -160,7 +162,8 @@ async def getPostDataofCompetitor(fileName, user_query):
                     result = future.result()
                     if result is not None:
                         actual_list.append(result)
-                        count+=1
+                        competitor_names.append(df.iloc[count]['name'])
+                        count+=1
 
         print("Fetched data for competitors")
         fileNames = [f"posts_data_{actual_list[i]}.csv" for i in range(len(actual_list))]
@@ -186,7 +189,7 @@ async def getPostDataofCompetitor(fileName, user_query):
         )
 
         # # Proceed with preprocessing
-        result = preprocessingCompetitorsData(user_query=user_query, fileNames=fileNames)
+        result = preprocessingCompetitorsData(user_query=user_query, fileNames=fileNames, competitor_names=competitor_names)
         return result
     except Exception as e:
         traceback.print_exc()
@@ -195,20 +198,21 @@ async def getPostDataofCompetitor(fileName, user_query):
         return {'details': 'No data found'}
 
 
-def preprocessingCompetitorsData(user_query,fileNames):
+def preprocessingCompetitorsData(user_query,fileNames,competitor_names):
     c=0
     competitors_json_data = []
     try:
         for i in range(len(fileNames)):
             if c==6:break
             print(f"Processing file {fileNames[i]}")
+            print('competitor Name ', competitor_names[i])
             json_data = getCompetitorAnalysisReport(user_query=user_query,fileName=fileNames[i],count=c)
             c+=1
             # if json_data does not contain "details" field, then only save the json
             if "details" not in json_data.keys():
                 print("Competitor Analysis Report",f"competitor_analysis_report_{fileNames[i]}.json")
                 competitors_json_data.append(json_data)
-
+                print('competitor Analysis success for ', competitor_names[i])
 
         for file_path in fileNames:
             # Check if the file exists before attempting to delete
@@ -222,56 +226,52 @@ def preprocessingCompetitorsData(user_query,fileNames):
         traceback.print_exc()
     return competitors_json_data
 def getCompetitorAnalysisReport(user_query,fileName,count=0):
-    try:
-        prompt = getCompetitorPrompt(user_query=user_query)
-        api_key_map = {
-            0: api_key5,
-            1: api_key6,
-            2: api_key7,
-            3: api_key8,
-            4: api_key9,
-            5: api_key10
-        }
-
-        selected_api_key = api_key_map.get(count, api_key8) # Default to api_key8 if count > 5
-        genai.configure(api_key=selected_api_key)
-        data = getModelAndGenerationConfigCommon(fileName=fileName,modelName='gemini-2.0-flash-exp')
-        model = data[0]
-        chat_session = model.start_chat(
-            history=[
-                {
-                    "role": "user",
-                    "parts": [
-                        data[1],
-                        prompt
-                    ],
-                }
-            ]
-        )
-
-        try:
-            response = chat_session.send_message("give your last response of competitor analysis")
-            data = response.text
-            json_data = json.loads(data)
-            print("competitor analysis done for ",user_query)
-            return json_data
-        except:
-            try:
-                # retry
-                response = chat_session.send_message("give your last response of competitor analysis")
-                data = response.text
-                json_data = json.loads(data)
-                print("retry competitor analysis done for ",user_query)
-                return json_data
-            except Exception as e:
-                print(f"competitor analysis error {api_key_map[count]}",str(e))
-                traceback.print_exc()
-                return {"details": str(e)}
-    except Exception as e:
-        print(f"competitor analysis error {api_key_map[count]}",str(e))
-        traceback.print_exc()
-        return {"details": str(e)}
+    prompt = getCompetitorPrompt(user_query=user_query)
+    api_key_map = {
+        0: api_key5,
+        1: api_key6,
+        2: api_key7,
+        3: api_key8,
+        4: api_key9,
+        5: api_key10
+    }
+
+    selected_api_key = api_key_map.get(count, api_key8) # Default to api_key8 if count > 5
+    genai.configure(api_key=selected_api_key)
+    data = getModelAndGenerationConfigCommon(fileName=fileName,modelName='gemini-2.0-flash-exp')
+    model = data[0]
+    chat_session = model.start_chat(
+        history=[
+            {
+                "role": "user",
+                "parts": [
+                    data[1],
+                    prompt
+                ],
+            }
+        ]
+    )
+
+    try:
+        response = chat_session.send_message("give your last response of competitor analysis")
+        data = response.text
+        json_data = json.loads(data)
+        print("competitor analysis done for ",user_query)
+        return json_data
+    except:
+        try:
+            # retry
+            response = chat_session.send_message("give your last response of competitor analysis")
+            data = response.text
+            json_data = json.loads(data)
+            print("retry competitor analysis done for ",user_query)
+            return json_data
+        except Exception as e:
+            print(f"competitor analysis error {api_key_map[count]}",str(e))
+            traceback.print_exc()
+            return {"details": str(e)}
+
 async def getCompetitorAnalysisData(user_query,fileName):
     start_time = time.time()
 
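
Two patterns carry this file: per-report API-key rotation via api_key_map.get(count, default), and a single in-place retry of the model call that falls back to a {"details": ...} payload so the caller (preprocessingCompetitorsData) can skip bad reports. A self-contained sketch of both, with made-up key names and a fake transport standing in for chat_session.send_message:

import json

# Made-up key names; in the repo these are api_key5 ... api_key10.
api_key_map = {0: "KEY_5", 1: "KEY_6", 2: "KEY_7", 3: "KEY_8", 4: "KEY_9", 5: "KEY_10"}

def fake_send_message(api_key: str) -> str:
    """Hypothetical transport; the real code reads chat_session.send_message(...).text."""
    return '{"competitors": []}'

def get_report(count: int = 0) -> dict:
    selected_api_key = api_key_map.get(count, "KEY_8")  # default key when count > 5
    for attempt in range(2):  # one try plus one retry, as in the diff
        try:
            return json.loads(fake_send_message(selected_api_key))
        except Exception as e:
            if attempt == 1:
                return {"details": str(e)}  # caller drops any report carrying "details"

print(get_report(count=7))  # a count past the map falls back to the default key
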
reddit/reddit_functions.py CHANGED
@@ -5,82 +5,119 @@ from reddit.reddit_sentiment_analysis import SentimentAnalysis
 from reddit.reddit_utils import get_microseconds_list
 from reddit.scraping import getPostComments, getSearchPostData
 import time
+import asyncio
+import os
+import concurrent.futures
+
+async def delete_files(file_names):
+    """Helper function to delete created files."""
+    for file_name in file_names:
+        try:
+            if os.path.exists(file_name):
+                os.remove(file_name)
+                print(f"Deleted file: {file_name}")
+        except Exception as e:
+            print(f"Error deleting file {file_name}: {e}")
+
+async def run_with_timeout(task_func, *args, timeout=300):
+    """Runs a task with a timeout."""
+    try:
+        return await asyncio.wait_for(task_func(*args), timeout=timeout)
+    except asyncio.TimeoutError:
+        print(f"Task exceeded {timeout} seconds timeout.")
+        raise
+
+async def getRedditData_with_timeout(user_query, search_keywords, retries=1, timeout=300):
+    """Retries the getRedditData process with a timeout."""
+    file_names = []
+    for attempt in range(retries + 1):
+        try:
+            result = await run_with_timeout(getRedditData, user_query, search_keywords, timeout=timeout)
+            return result
+        except Exception as e:
+            print(f"Attempt {attempt + 1} failed with error: {e}")
+            await delete_files(file_names)  # Delete created files
+            if attempt == retries:
+                raise Exception("Process failed after retries.") from e
 
 async def getRedditData(user_query, search_keywords):
     unique_list = get_microseconds_list()
     successful_steps = []
-
-    # Record the start time
     start_time = time.time()
-    fileNames=[]
+    fileNames = []
+
+    def log_step_time(step_name, start_time, success=True, error=None):
+        elapsed = time.time() - start_time
+        if success:
+            print(f"{step_name} completed successfully in {elapsed:.2f} seconds.")
+        else:
+            print(f"{step_name} failed in {elapsed:.2f} seconds. Error: {error}")
+
     # Step 1: Get search post data
     try:
-        # Use ThreadPoolExecutor to run tasks concurrently
+        step_start = time.time()
         with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
             futures = []
-            count =0
-            # Submit tasks in batches of 3
+            count = 0
             for i in range(len(search_keywords)):
-                print(f'Running task {i}')
-
                 future = executor.submit(getSearchPostData, search_keyword=search_keywords[i], index=unique_list[i], position=i)
                 futures.append(future)
 
                 if len(futures) == 3:
                     for future in concurrent.futures.as_completed(futures):
                         result = future.result()
-                        if result is not None:
+                        if result:
                             fileNames.append(f"posts_data_{result}.csv")
-                            successful_steps.append(('getSearchPostData', count)) # Mark this step as successful
-                            count+=1
+                            successful_steps.append(('getSearchPostData', count))
+                            count += 1
                     futures = []
 
             if futures:
                 for future in concurrent.futures.as_completed(futures):
                     result = future.result()
-                    if result is not None:
+                    if result:
                         fileNames.append(f"posts_data_{result}.csv")
-                        successful_steps.append(('getSearchPostData', count)) # Mark this step as successful
-                        count+=1
+                        successful_steps.append(('getSearchPostData', count))
+                        count += 1
+        log_step_time("getSearchPostData", step_start)
     except Exception as e:
-        print(f"Failed at getSearchPostData: {e}")
+        log_step_time("getSearchPostData", step_start, success=False, error=e)
 
-    # Step 3: Get final data
+    # Step 2: Get final data
     try:
-        print("fileNames", fileNames)
-        res=getFinalData(user_query=user_query, filesNames=fileNames)
+        step_start = time.time()
+        res = getFinalData(user_query=user_query, filesNames=fileNames)
         if res is True:
-            successful_steps.append(('getFinalData')) # Mark this step as successful
+            successful_steps.append(('getFinalData'))
+        log_step_time("getFinalData", step_start)
     except Exception as e:
-        print(f"Failed at getFinalData: {e}")
+        log_step_time("getFinalData", step_start, success=False, error=e)
 
-    # Step 4: Get post comments
+    # Step 3: Get post comments
     try:
+        step_start = time.time()
         await getPostComments(file_name=fileNames[0])
-        successful_steps.append(('getPostComments',)) # Mark this step as successful
+        successful_steps.append(('getPostComments',))
+        log_step_time("getPostComments", step_start)
     except Exception as e:
-        print(f"Failed at getPostComments: {e}")
-
-    # Record the time just after getting post comments
-    time_after_comments = time.time()
-    elapsed_time_after_comments = time_after_comments - start_time
-
-    # Start timer for sentiment file
+        log_step_time("getPostComments", step_start, success=False, error=e)
+    reddit_time = time.time() - start_time
     start_time = time.time()
-    # Step 5: Get sentiment of post comments
+    # Step 4: Get sentiment of post comments
     try:
+        step_start = time.time()
         sentiment_instance = SentimentAnalysis()
         sentiment_instance.generate_sentiment_and_emotion_from_data(fileName=fileNames[0])
-        successful_steps.append(('getPostSentiment',)) # Mark this step as successful
+        successful_steps.append(('getPostSentiment',))
+        log_step_time("getPostSentiment", step_start)
     except Exception as e:
-        print(f"Failed at getPostSentiment: {e}")
-    time_after_sentiment = time.time()
-
-    # Optionally, return the successful steps for logging or further processing
+        log_step_time("getPostSentiment", step_start, success=False, error=e)
+    sentiment_time = time.time() - start_time
     return {
-        "fileName":fileNames[0],
-        "fileUniqueId": str(unique_list[0]),
+        "fileName": fileNames[0] if fileNames else None,
+        'reddit_data': reddit_time,
+        'sentiment_data': sentiment_time,
+        "fileUniqueId": str(unique_list[0]) if unique_list else None,
         "successful_steps": successful_steps,
     }
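
The core addition is the retry-with-timeout wrapper: each attempt runs getRedditData under asyncio.wait_for, a failure or timeout triggers file cleanup and one more attempt, and the final failure re-raises with context. One caveat visible in the diff: file_names inside the wrapper is never populated, so the delete_files call is currently a no-op. Below is a runnable sketch of the shape, with a toy coroutine standing in for the scraping pipeline (names other than run_with_timeout are illustrative):

import asyncio

async def flaky_pipeline(user_query, search_keywords):
    """Toy stand-in for getRedditData: pretend scraping plus sentiment work."""
    await asyncio.sleep(0.1)
    return {"fileName": f"posts_data_{user_query}.csv", "successful_steps": search_keywords}

async def run_with_timeout(task_func, *args, timeout=300):
    try:
        return await asyncio.wait_for(task_func(*args), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Task exceeded {timeout} seconds timeout.")
        raise

async def retry_with_timeout(user_query, search_keywords, retries=1, timeout=300):
    for attempt in range(retries + 1):
        try:
            return await run_with_timeout(flaky_pipeline, user_query, search_keywords, timeout=timeout)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            if attempt == retries:
                raise Exception("Process failed after retries.") from e

print(asyncio.run(retry_with_timeout("note apps", ["note taking", "todo"])))
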
reddit/reddit_gemini.py CHANGED
@@ -7,7 +7,7 @@ from reddit.prompts import getKeywordsPrompt
 
 def getKeywords(user_query: str):
     prompt = getKeywordsPrompt(user_query)
-    model = genai.GenerativeModel("gemini-exp-1114")
+    model = genai.GenerativeModel("gemini-2.0-flash-exp")
 
     generation_config = genai.GenerationConfig(response_mime_type="application/json")
     try:
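
Only the model id changes here, but the call contract matters downstream: analyzeData above reads keywords['query'] and keywords['top_3_combinations'] out of this function's JSON, so the application/json response type must be preserved. A sketch of the call, assuming the google-generativeai SDK; the API key and prompt literal are placeholders:

import google.generativeai as genai

genai.configure(api_key="YOUR_API_KEY")  # hypothetical key
model = genai.GenerativeModel("gemini-2.0-flash-exp")
generation_config = genai.GenerationConfig(response_mime_type="application/json")

response = model.generate_content(
    "Return JSON with keys 'query' and 'top_3_combinations' for: note taking apps",
    generation_config=generation_config,
)
print(response.text)  # JSON string, parsed downstream with json.loads
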
reddit/scraping.py CHANGED
@@ -302,7 +302,6 @@ async def getPostComments(file_name, is_for_competitor_analysis=False, index=0):
         if comments_json is not None:
             for i in range(len(data)):
                 if comments_json[i] is not None:
-                    print('Comment', comments_json[i]['index'], i)
                     data.at[comments_json[i]['index'], 'comments'] = {'comments':comments_json[i]['comments']}
                     data.at[comments_json[i]['index'], 'descriptions'] = comments_json[i]['description']
                 else: