# nextAnalytics/reddit/reddit_competitor_analysis.py
# honey234 — "fixed bugs and added retry logics" (commit 213206a)
import asyncio
import json
import os
import threading
import time
import traceback
import google.generativeai as genai
import pandas as pd
import concurrent.futures
from reddit.prompts import getCompetitorPrompt, getTop10CompetitorPrompt
from reddit.reddit_utils import get_microseconds_list
from reddit.scraping import getPostComments, getSearchPostData
from reddit.reddit_gemini import getModelAndGenerationConfigCommon
from reddit.load_env import api_key3,api_key4,api_key5,api_key6,api_key7,api_key8,api_key9,api_key10,api_key11,spare_api_key3,spare_api_key4,spare_api_key1,environment, scraper_ant_keys, reddit_clients
def getCompetitorNames(user_query, max_retries=1):
    """Ask Gemini for the top competitors relevant to ``user_query``.

    Builds a JSON-only prompt, calls ``generate_content`` and parses the
    response. Retries up to ``max_retries`` extra times on any failure
    (API error or malformed JSON).

    Args:
        user_query: Free-text product/market query to find competitors for.
        max_retries: Extra attempts after the first failure (default 1,
            matching the original single hard-coded retry).

    Returns:
        dict parsed from the model's JSON with keys ``competitors`` and
        ``platforms``.

    Raises:
        Exception: when every attempt fails; wraps the last error.
    """
    prompt=f"""Extract a list of product names, alternatives, and competitors relevant to the query: {user_query}. Ensure that the results focus on tools, platforms, or services explicitly aligned with the domain and purpose of the query. Avoid including general or loosely related products unless they directly offer features tailored to the query's intent.
Additionally, provide the platform(s) (e.g., web, apps, integrations) on which each competitor operates and categorize their functionality. Include a frequency count indicating the number of times each entry is mentioned or relevant to the query. Also, aggregate the total frequency of each platform across all entries. Also give provide popularity score for each competitor out of 100.
give top 6 competitors details only with name of competitor as noun.
return in given json format only:
{{
"competitors":[{{"name":"","platform":[],"category":"","count":number,"popularity":number}}],
"platforms":[{{"platform":name,count:number}}]
}}"""
    generation_config = {
        "temperature": 1,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
        "response_mime_type": "application/json",
    }
    # Production uses the stable pro model; other environments use the
    # experimental flash model (cheaper/faster).
    model = genai.GenerativeModel(
        model_name="gemini-1.5-pro-002" if environment == "PRODUCTION" else "gemini-2.0-flash-exp",
        generation_config=generation_config,
    )
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            response = model.generate_content(prompt)
            data = response.text
            # Label retries in the log like the original did.
            print(("retry " if attempt else "") + "getCompetitorNames", data)
            return json.loads(data)
        except Exception as e:
            last_error = e
            traceback.print_exc()
    raise Exception(f"Error occurred: {str(last_error)} after {max_retries + 1} attempts")
def getCompetitorNamesFromReddit(user_query, fileName, max_retries=1):
    """Extract competitor names for ``user_query`` from scraped Reddit CSV data.

    Attaches the CSV (via ``getModelAndGenerationConfigCommon``) to a chat
    session and asks the model for a JSON competitor list. Each attempt is
    bounded to 100 seconds by running the blocking call on a worker thread.

    Args:
        user_query: Free-text query the competitors should relate to.
        fileName: CSV file of scraped Reddit data to ground the answer in.
        max_retries: Extra attempts after the first failure.

    Returns:
        dict parsed from the model's JSON on success, or
        ``{"details": "Request failed after multiple attempts"}`` after all
        ``max_retries + 1`` attempts fail (timeout, API error, or bad JSON).
    """
    prompt = f"""Extract a list of product names, alternatives, and competitors relevant to the query: {user_query} from the given csv data. Ensure that the results focus on tools, platforms, or services explicitly aligned with the domain and purpose of the query and do not include very general competitors into the list which are not directly related to user query use case or intent.
Additionally, provide the platform(s) (e.g., web, apps, integrations) on which each competitor operates and categorize their functionality. Category should be string. Include a frequency count indicating the number of times each entry is mentioned or relevant to the query. Also, aggregate the total frequency of each platform across all entries. Also give provide popularity score for each competitor out of 100.
give top 6 competitors details only.
return in given json format only:
{{
"competitors":[{{"name":"","platform":[],"category":"","count":number,"popularity":number}}],
"platforms":[{{"platform":name,count:number}}]
}}"""
    data = getModelAndGenerationConfigCommon(fileName=fileName, modelName='gemini-exp-1206')
    model = data[0]

    def _ask_model():
        # Blocking chat round-trip; runs on a worker thread so the caller
        # can enforce the 100-second deadline.
        chat_session = model.start_chat(
            history=[
                {
                    "role": "user",
                    "parts": [
                        data[1],
                        prompt
                    ],
                }
            ]
        )
        return chat_session.send_message("give your last response of competitor names")

    retries = 0
    while retries <= max_retries:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(_ask_model)
            # Wait at most 100 seconds for the model's response.
            response = future.result(timeout=100)
            text = response.text
            print("getCompetitorNames", text)
            return json.loads(text)
        except concurrent.futures.TimeoutError:
            print("Error: The response did not arrive within 100 seconds")
        except Exception as e:
            print(f"Error occurred on attempt {retries + 1}: {str(e)}. Retrying...")
        finally:
            # Don't block waiting for a stuck worker; let it finish (or die
            # with the process) in the background.
            executor.shutdown(wait=False)
        retries += 1
        time.sleep(2)  # Small delay between retries
    # If all retries fail
    print("All retry attempts failed.")
    return {"details": "Request failed after multiple attempts"}
def getTop10Competitors(user_query, reddit_data, gemini_data):
    """Merge the Reddit-derived and Gemini-derived competitor lists.

    Builds the combined prompt, requests a JSON-only response from the
    model and returns the parsed dict. Re-raises any failure wrapped in a
    descriptive Exception.
    """
    combined_prompt = getTop10CompetitorPrompt(
        user_query=user_query, reddit_data=reddit_data, gemini_data=gemini_data
    )
    model_name = "gemini-1.5-pro-002" if environment == "PRODUCTION" else "gemini-2.0-flash-exp"
    llm = genai.GenerativeModel(model_name)
    json_config = genai.GenerationConfig(response_mime_type="application/json")
    try:
        reply = llm.generate_content(combined_prompt, generation_config=json_config)  # Adjust if the library supports async
        payload = reply.text
        print("getTop10Competitors:", payload)
        return json.loads(payload)
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"Error while fetching getTop10Competitors: {str(e)}")
async def getPostDataofCompetitor(fileName, user_query):
    """Scrape Reddit posts for every competitor row, then fetch comments.

    Args:
        fileName: despite the name, this is a pandas DataFrame of
            competitors with at least 'name' and 'category' columns.
        user_query: the original user query, forwarded to preprocessing.

    Returns:
        The result of ``preprocessingCompetitorsData`` on success, or a
        ``{'details': ...}`` dict on failure / when no posts were found.

    Bug fixed vs. the original: results were gathered with
    ``as_completed`` (completion order) while names were taken by a running
    counter in submission order, so a result could be paired with the wrong
    competitor name. We now map each future back to its own row's name.
    """
    df = fileName
    unique_list = get_microseconds_list(length=len(df))
    actual_list = []
    competitor_names = []

    def _drain(pending):
        # Collect finished futures, keeping each result paired with the
        # competitor name of the row that produced it.
        for future in concurrent.futures.as_completed(pending):
            result = future.result()
            if result is not None:
                actual_list.append(result)
                competitor_names.append(pending[future])

    # Use ThreadPoolExecutor to run scrape tasks concurrently, batching by
    # the number of available scraper keys.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(scraper_ant_keys)) as executor:
        pending = {}
        for i in range(len(df)):
            print(f'Running task {i}')
            future = executor.submit(getSearchPostData, f"{df.iloc[i]['name']} {df.iloc[i]['category']}",
                                     unique_list[i], df.iloc[i]['name'], forCompetitorAnalysis=True, position=i)
            pending[future] = df.iloc[i]['name']
            if len(pending) == len(scraper_ant_keys):
                _drain(pending)
                pending = {}
        if pending:
            _drain(pending)
    print("Fetched data for competitors")
    fileNames = [f"posts_data_{token}.csv" for token in actual_list]
    if len(fileNames) == 0:
        return {'details': 'No data found'}
    try:
        # Semaphore to limit concurrent comment-fetch tasks.
        semaphore = asyncio.Semaphore(3 if len(fileNames) > 3 else 1)

        # Async wrapper that honors the semaphore and round-robins reddit clients.
        async def fetch_comments_with_limit(file_name, index):
            async with semaphore:
                await getPostComments(file_name=file_name, is_for_competitor_analysis=True,
                                      index=index % len(reddit_clients))

        # Chunk the fileNames list into batches of 3 and process each batch.
        batches = [fileNames[i:i + 3] for i in range(0, len(fileNames), 3)]
        for batch_index, batch in enumerate(batches):
            print(f"Processing batch {batch_index + 1} with files: {batch}")
            await asyncio.gather(
                *[fetch_comments_with_limit(file_name=batch[i], index=i + 1) for i in range(len(batch))]
            )
        # Proceed with preprocessing; fileNames[i] and competitor_names[i]
        # now refer to the same competitor.
        result = preprocessingCompetitorsData(user_query=user_query, fileNames=fileNames,
                                              competitor_names=competitor_names)
        return result
    except Exception as e:
        traceback.print_exc()
        return {'details': str(e)}
def preprocessingCompetitorsData(user_query, fileNames, competitor_names):
    """Run the per-competitor analysis over each scraped CSV file.

    Processes at most 6 files (the counter also selects the API key inside
    ``getCompetitorAnalysisReport``), keeps only reports without a
    ``"details"`` error field, deletes every input CSV afterwards, and
    returns whatever reports were collected — even if an error interrupts
    processing partway through.
    """
    reports = []
    processed = 0
    try:
        for idx, csv_file in enumerate(fileNames):
            if processed == 6:
                break
            print(f"Processing file {csv_file}")
            print('competitor NAme ', competitor_names[idx])
            report = getCompetitorAnalysisReport(user_query=user_query, fileName=csv_file, count=processed)
            processed += 1
            # Only keep the report when it is not an error payload.
            if "details" not in report.keys():
                print("Competitor Analysis Report", f"competitor_analysis_report_{csv_file}.json")
                reports.append(report)
                print('competitor Analysis success for ', competitor_names[idx])
        # Clean up every scraped CSV, whether or not its report succeeded.
        for path in fileNames:
            if os.path.exists(path):
                os.remove(path)
                print("File deleted successfully")
            else:
                print("File does not exist")
        return reports
    except Exception:
        traceback.print_exc()
        return reports
def getCompetitorAnalysisReport(user_query, fileName, count=0, max_retries=1):
    """Generate the competitor-analysis JSON report for one scraped CSV.

    Selects an API key based on ``count`` (round-robin across competitors to
    spread quota), attaches the CSV to a chat session and asks for the
    analysis, retrying once on failure.

    Args:
        user_query: The original user query driving the analysis prompt.
        fileName: CSV of scraped posts/comments for one competitor.
        count: Index of this competitor; picks the API key (0-5).
        max_retries: Extra attempts after the first failure (default 1).

    Returns:
        Parsed JSON report dict, or ``{"details": <error>}`` on failure.

    Bugs fixed vs. the original: bare ``except`` with a copy-pasted retry
    body, and ``api_key_map[count]`` in the error log raised ``KeyError``
    whenever ``count > 5`` (the success path used ``.get`` with a default).
    """
    prompt = getCompetitorPrompt(user_query=user_query)
    api_key_map = {
        0: api_key5,
        1: api_key6,
        2: api_key7,
        3: api_key8,
        4: api_key9,
        5: api_key10
    }
    selected_api_key = api_key_map.get(count, api_key8)  # Default to api_key8 if count > 5
    genai.configure(api_key=selected_api_key)
    data = getModelAndGenerationConfigCommon(fileName=fileName, modelName='gemini-2.0-flash-exp')
    model = data[0]
    chat_session = model.start_chat(
        history=[
            {
                "role": "user",
                "parts": [
                    data[1],
                    prompt
                ],
            }
        ]
    )
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            response = chat_session.send_message("give your last response of competitor analysis")
            json_data = json.loads(response.text)
            # Label retries in the log like the original did.
            print(("retry " if attempt else "") + "competitor analysis done for ", user_query)
            return json_data
        except Exception as e:
            last_error = e
            if attempt == max_retries:
                # Log with the key actually used (not api_key_map[count],
                # which KeyErrors for count > 5).
                print(f"competitor analysis error {selected_api_key}", str(e))
                traceback.print_exc()
    return {"details": str(last_error)}
async def getCompetitorAnalysisData(user_query, fileName):
    """Full competitor-analysis pipeline for one user query.

    1. Extract competitor names from the scraped Reddit CSV.
    2. Extract competitor names directly from Gemini's knowledge.
    3. Merge both into a final competitor list.
    4. Scrape posts/comments per competitor and build per-competitor reports.

    Each LLM step runs under a primary API key with a one-shot fallback to a
    spare key (the repeated try/reconfigure/retry pattern is factored into
    ``_with_key_fallback``).

    Returns:
        dict with ``competitors_data`` (per-competitor reports),
        ``all_competitor_data`` (the merged list) and ``e_time`` (seconds).
    """
    def _with_key_fallback(primary_key, spare_key, label, call):
        # Run `call()` under the primary key; on any failure reconfigure
        # with the spare key and try exactly once more.
        genai.configure(api_key=primary_key)
        try:
            return call()
        except Exception as e:
            print(f"Error in {label}: {e}. Retrying with spare API key...")
            genai.configure(api_key=spare_key)
            return call()

    start_time = time.time()
    json_data_reddit = _with_key_fallback(
        api_key3, spare_api_key3, "getCompetitorNamesFromReddit with primary API key 3",
        lambda: getCompetitorNamesFromReddit(user_query=user_query, fileName=fileName))
    json_data_gemini = _with_key_fallback(
        api_key4, spare_api_key4, "getCompetitorNames with primary API key 4",
        lambda: getCompetitorNames(user_query=user_query))
    data = _with_key_fallback(
        api_key11, spare_api_key1, "getTop10Competitors with primary API key 11",
        lambda: getTop10Competitors(gemini_data=json_data_gemini, reddit_data=json_data_reddit,
                                    user_query=user_query))
    df = pd.DataFrame(data["list"])
    competitors_data = await getPostDataofCompetitor(user_query=user_query, fileName=df)
    end_time = time.time()
    return {
        "competitors_data": competitors_data,
        "all_competitor_data": data["list"],
        'e_time': end_time - start_time
    }