Spaces:
Runtime error
Runtime error
| import asyncio | |
| import json | |
| import os | |
| import threading | |
| import time | |
| import traceback | |
| import google.generativeai as genai | |
| import pandas as pd | |
| import concurrent.futures | |
| from reddit.prompts import getCompetitorPrompt, getTop10CompetitorPrompt | |
| from reddit.reddit_utils import get_microseconds_list | |
| from reddit.scraping import getPostComments, getSearchPostData | |
| from reddit.reddit_gemini import getModelAndGenerationConfigCommon | |
| from reddit.load_env import api_key3,api_key4,api_key5,api_key6,api_key7,api_key8,api_key9,api_key10,api_key11,spare_api_key3,spare_api_key4,spare_api_key1,environment, scraper_ant_keys, reddit_clients | |
def getCompetitorNames(user_query):
    """Ask Gemini directly (no Reddit data) for competitors matching *user_query*.

    Args:
        user_query: the user's product/idea query.

    Returns:
        Parsed JSON dict: {"competitors": [...], "platforms": [...]}.

    Raises:
        Exception: when both the initial call and the single retry fail
            (API error or unparseable JSON).
    """
    prompt=f"""Extract a list of product names, alternatives, and competitors relevant to the query: {user_query}. Ensure that the results focus on tools, platforms, or services explicitly aligned with the domain and purpose of the query. Avoid including general or loosely related products unless they directly offer features tailored to the query's intent.
Additionally, provide the platform(s) (e.g., web, apps, integrations) on which each competitor operates and categorize their functionality. Include a frequency count indicating the number of times each entry is mentioned or relevant to the query. Also, aggregate the total frequency of each platform across all entries. Also give provide popularity score for each competitor out of 100.
give top 6 competitors details only with name of competitor as noun.
return in given json format only:
{{
"competitors":[{{"name":"","platform":[],"category":"","count":number,"popularity":number}}],
"platforms":[{{"platform":name,count:number}}]
}}"""
    generation_config = {
        "temperature": 1,
        "top_p": 0.95,
        "top_k": 40,
        "max_output_tokens": 8192,
        # Force Gemini to emit JSON so json.loads below has a chance.
        "response_mime_type": "application/json",
    }
    model = genai.GenerativeModel(
        model_name="gemini-1.5-pro-002" if environment=="PRODUCTION" else "gemini-2.0-flash-exp",
        generation_config=generation_config,
    )
    # One initial attempt plus one retry. The original used a bare `except:`,
    # which also swallowed KeyboardInterrupt/SystemExit; narrowed to Exception.
    for attempt in range(2):
        try:
            response = model.generate_content(prompt)
            data = response.text
            print("retry getCompetitorNames" if attempt else "getCompetitorNames", data)
            return json.loads(data)
        except Exception as e:
            if attempt == 1:
                traceback.print_exc()
                # BUGFIX: the message previously ended with "Retrying..." even
                # though this raise is the terminal failure, not a retry.
                raise Exception(f"Error occurred: {str(e)}. All retries failed.")
def getCompetitorNamesFromReddit(user_query, fileName, max_retries=1):
    """Extract competitor names for *user_query* from the scraped Reddit CSV.

    Uploads the CSV (via getModelAndGenerationConfigCommon) together with an
    extraction prompt, sending the request from a worker thread so that a
    hung API call can be abandoned after 100 seconds.

    Args:
        user_query: the user's product/idea query.
        fileName: path of the Reddit CSV to analyse.
        max_retries: extra attempts after the first failure/timeout.

    Returns:
        Parsed JSON dict {"competitors": [...], "platforms": [...]}, or
        {"details": ...} when every attempt fails.
    """
    prompt = f"""Extract a list of product names, alternatives, and competitors relevant to the query: {user_query} from the given csv data. Ensure that the results focus on tools, platforms, or services explicitly aligned with the domain and purpose of the query and do not include very general competitors into the list which are not directly related to user query use case or intent.
Additionally, provide the platform(s) (e.g., web, apps, integrations) on which each competitor operates and categorize their functionality. Category should be string. Include a frequency count indicating the number of times each entry is mentioned or relevant to the query. Also, aggregate the total frequency of each platform across all entries. Also give provide popularity score for each competitor out of 100.
give top 6 competitors details only.
return in given json format only:
{{
"competitors":[{{"name":"","platform":[],"category":"","count":number,"popularity":number}}],
"platforms":[{{"platform":name,count:number}}]
}}"""
    data = getModelAndGenerationConfigCommon(fileName=fileName,modelName='gemini-exp-1206')
    model = data[0]
    response = None

    def send_message_with_timeout():
        # Runs in a worker thread; publishes its result through `response`.
        nonlocal response
        chat_session = model.start_chat(
            history=[
                {
                    "role": "user",
                    "parts": [
                        data[1],
                        prompt
                    ],
                }
            ]
        )
        response = chat_session.send_message("give your last response of competitor names")

    retries = 0
    try:
        while retries <= max_retries:
            try:
                message_thread = threading.Thread(target=send_message_with_timeout)
                message_thread.start()
                # Wait for the worker to complete, with a timeout of 100 seconds
                # (the original comment claimed 50 s; the code waits 100 s).
                message_thread.join(timeout=100)
                if message_thread.is_alive():
                    # Timed out. Python threads cannot be killed safely, so the
                    # stale worker is simply abandoned and a fresh one started.
                    print("Error: The response did not arrive within 100 seconds")
                else:
                    data = response.text
                    print("getCompetitorNames", data)
                    return json.loads(data)
            except Exception as e:
                # Covers a failed worker (`response` still None -> AttributeError)
                # and malformed JSON from the model.
                print(f"Error occurred on attempt {retries + 1}: {str(e)}. Retrying...")
            # BUGFIX: the timeout branch previously incremented `retries` twice
            # (once inside the is_alive branch and once here), halving the
            # number of timeout retries actually performed.
            retries += 1
            time.sleep(2)  # Small delay between retries
        # All retries exhausted.
        print("All retry attempts failed.")
        return {"details": "Request failed after multiple attempts"}
    except Exception as e:
        traceback.print_exc()
        raise Exception(f"Error occurred on attempt {retries + 1}: {str(e)}. Retrying...")
def getTop10Competitors(user_query, reddit_data, gemini_data):
    """Merge the Gemini- and Reddit-derived competitor lists into one ranking.

    Builds the merge prompt, asks Gemini for a JSON response, and returns the
    parsed dict. Raises Exception (with the original traceback printed) when
    the API call or JSON parsing fails.
    """
    prompt = getTop10CompetitorPrompt(user_query=user_query, reddit_data=reddit_data, gemini_data=gemini_data)
    model_name = "gemini-1.5-pro-002" if environment == "PRODUCTION" else "gemini-2.0-flash-exp"
    model = genai.GenerativeModel(model_name)
    config = genai.GenerationConfig(response_mime_type="application/json")
    try:
        reply = model.generate_content(prompt, generation_config=config)
        raw = reply.text
        print("getTop10Competitors:", raw)
        return json.loads(raw)
    except Exception as err:
        traceback.print_exc()
        raise Exception(f"Error while fetching getTop10Competitors: {str(err)}")
async def getPostDataofCompetitor(fileName, user_query):
    """Scrape Reddit posts and comments for each competitor, then analyse them.

    Args:
        fileName: despite the name, a pandas DataFrame with (at least)
            'name' and 'category' columns — one row per competitor.
        user_query: the original user query, forwarded to preprocessing.

    Returns:
        The list produced by preprocessingCompetitorsData(...) on success,
        or a {'details': ...} dict when no data was scraped or an exception
        occurred while fetching comments / preprocessing.
    """
    df = fileName  # alias: the parameter actually carries a DataFrame
    # One unique microsecond-based id per row; ids name the output CSV files.
    unique_list = get_microseconds_list(length=len(df))
    actual_list = []
    count=0
    competitor_names = []
    # Scrape search results concurrently, one worker per available scraper key.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(scraper_ant_keys)) as executor:
        futures = []
        # Submit in batches of len(scraper_ant_keys), draining each batch
        # before submitting the next one.
        for i in range(len(df)):
            print(f'Running task {i}')
            future = executor.submit(getSearchPostData, f"{df.iloc[i]['name']} {df.iloc[i]['category']}",
            unique_list[i], df.iloc[i]['name'], forCompetitorAnalysis=True, position=i)
            futures.append(future)
            if len(futures) == len(scraper_ant_keys):
                for future in concurrent.futures.as_completed(futures):
                    result = future.result()
                    if result is not None:
                        actual_list.append(result)
                        # NOTE(review): `count` advances in COMPLETION order, which
                        # can differ from submission order — competitor_names[k]
                        # may not correspond to actual_list[k]. Verify against
                        # how getSearchPostData uses `position`.
                        competitor_names.append(df.iloc[count]['name'])
                        count+=1
                futures = []
        # Drain any remaining futures from a final, partially-filled batch.
        if futures:
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result is not None:
                    actual_list.append(result)
                    competitor_names.append(df.iloc[count]['name'])
                    count+=1
    print("Fetched data for competitors")
    # Each successful scrape left a posts_data_<id>.csv file on disk.
    fileNames = [f"posts_data_{actual_list[i]}.csv" for i in range(len(actual_list))]
    if len(fileNames) != 0:
        try:
            # Limit concurrent comment fetches: 3 at a time when there are more
            # than 3 files, otherwise strictly sequential.
            semaphore = asyncio.Semaphore(3 if len(fileNames)>3 else 1)
            # Fetch comments for one file, round-robining over reddit_clients.
            async def fetch_comments_with_limit(file_name, index):
                async with semaphore:
                    await getPostComments(file_name=file_name, is_for_competitor_analysis=True, index=index%len(reddit_clients))
            # Chunk the fileNames list into batches of 3.
            batches = [fileNames[i:i + 3] for i in range(0, len(fileNames), 3)]
            for batch_index, batch in enumerate(batches):
                print(f"Processing batch {batch_index + 1} with files: {batch}")
                await asyncio.gather(
                    *[fetch_comments_with_limit(file_name=batch[i], index=i + 1) for i in range(len(batch))]
                )
            # Run the per-competitor Gemini analysis over the scraped CSVs.
            result = preprocessingCompetitorsData(user_query=user_query, fileNames=fileNames,competitor_names=competitor_names)
            return result
        except Exception as e:
            traceback.print_exc()
            return {'details': str(e)}
    else:
        return {'details': 'No data found'}
def preprocessingCompetitorsData(user_query, fileNames, competitor_names):
    """Run the Gemini competitor-analysis report on up to 6 scraped CSV files.

    Each file is analysed with getCompetitorAnalysisReport; only reports
    without a "details" (error) key are collected. Afterwards every file in
    *fileNames* is deleted from disk. On any exception the partial list of
    reports gathered so far is returned.
    """
    reports = []
    processed = 0
    try:
        for idx in range(len(fileNames)):
            # Hard cap: analyse at most six competitors (one API key each).
            if processed == 6:
                break
            csv_file = fileNames[idx]
            print(f"Processing file {csv_file}")
            print('competitor NAme ', competitor_names[idx])
            report = getCompetitorAnalysisReport(user_query=user_query, fileName=csv_file, count=processed)
            processed += 1
            # Only keep reports that did not come back as an error payload.
            if "details" not in report.keys():
                print("Competitor Analysis Report", f"competitor_analysis_report_{csv_file}.json")
                reports.append(report)
                print('competitor Analysis success for ', competitor_names[idx])
        # Clean up every scraped CSV, whether or not its analysis succeeded.
        for path in fileNames:
            if os.path.exists(path):
                os.remove(path)
                print("File deleted successfully")
            else:
                print("File does not exist")
        return reports
    except Exception:
        traceback.print_exc()
        return reports
def getCompetitorAnalysisReport(user_query,fileName,count=0):
    """Generate a competitor-analysis JSON report from one competitor's CSV.

    Args:
        user_query: the user's product/idea query (used to build the prompt).
        fileName: CSV file uploaded via getModelAndGenerationConfigCommon.
        count: position of this competitor in the batch; selects which of six
            API keys to use (defaults to api_key8 when count > 5).

    Returns:
        Parsed JSON report dict, or {"details": <error>} when both attempts fail.
    """
    prompt = getCompetitorPrompt(user_query=user_query)
    # Spread the per-competitor requests across six API keys by position.
    api_key_map = {
        0: api_key5,
        1: api_key6,
        2: api_key7,
        3: api_key8,
        4: api_key9,
        5: api_key10
    }
    selected_api_key = api_key_map.get(count, api_key8)  # Default to api_key8 if count > 5
    genai.configure(api_key=selected_api_key)
    data = getModelAndGenerationConfigCommon(fileName=fileName,modelName='gemini-2.0-flash-exp')
    model = data[0]
    chat_session = model.start_chat(
        history=[
            {
                "role": "user",
                "parts": [
                    data[1],
                    prompt
                ],
            }
        ]
    )
    # One initial attempt plus one retry; the original bare `except:` is
    # narrowed to Exception so interrupts are not swallowed.
    for attempt in range(2):
        try:
            response = chat_session.send_message("give your last response of competitor analysis")
            json_data = json.loads(response.text)
            print(("retry " if attempt else "") + "competitor analysis done for ", user_query)
            return json_data
        except Exception as e:
            if attempt == 1:
                # BUGFIX: the original logged `api_key_map[count]`, which raised
                # KeyError for count > 5 even though the key selection above
                # handled that case with .get(); use the same fallback here.
                print(f"competitor analysis error {api_key_map.get(count, api_key8)}", str(e))
                traceback.print_exc()
                return {"details": str(e)}
async def getCompetitorAnalysisData(user_query, fileName):
    """End-to-end competitor-analysis pipeline for *user_query*.

    Gathers competitor names from the Reddit CSV and from Gemini directly,
    merges them into a top list, scrapes post/comment data per competitor,
    and returns the combined result plus elapsed time. Each Gemini step is
    retried once on a spare API key if the primary key fails.
    """
    started = time.time()

    def _call_with_fallback(primary_key, spare_key, label, call):
        # Configure the primary key, run the step, and on any failure retry
        # exactly once after switching to the spare key.
        genai.configure(api_key=primary_key)
        try:
            return call()
        except Exception as err:
            print(f"Error in {label}: {err}. Retrying with spare API key...")
            genai.configure(api_key=spare_key)
            return call()

    json_data_reddit = _call_with_fallback(
        api_key3, spare_api_key3,
        "getCompetitorNamesFromReddit with primary API key 3",
        lambda: getCompetitorNamesFromReddit(user_query=user_query, fileName=fileName),
    )
    json_data_gemini = _call_with_fallback(
        api_key4, spare_api_key4,
        "getCompetitorNames with primary API key 4",
        lambda: getCompetitorNames(user_query=user_query),
    )
    data = _call_with_fallback(
        api_key11, spare_api_key1,
        "getTop10Competitors with primary API key 11",
        lambda: getTop10Competitors(gemini_data=json_data_gemini, reddit_data=json_data_reddit, user_query=user_query),
    )
    df = pd.DataFrame(data["list"])
    competitors_data = await getPostDataofCompetitor(user_query=user_query, fileName=df)
    finished = time.time()
    return {
        "competitors_data": competitors_data,
        "all_competitor_data": data["list"],
        'e_time': finished - started
    }