nextAnalytics / services /priority_queue.py
honey234's picture
updated backend
d1fa8b0
# import heapq
# import asyncio
# # Priority mapping for user types
# PRIORITY_MAP = {
# "paid": 1,
# "logged_in": 2,
# "not_logged_in": 3,
# }
# # Task class to manage requests
# class Task:
# def __init__(self, user, user_status, topic, category, tag):
# self.user = user
# self.priority = PRIORITY_MAP[user_status]
# self.user_status = user_status
# self.topic = topic
# self.category = category
# self.tag = tag
# def __lt__(self, other):
# return self.priority < other.priority
# def __repr__(self):
# return f"Task(user={self.user}, priority={self.priority}, tag={self.tag}, topic={self.topic})"
# # Queues for scraping and analysis
# scraping_queue = []
# analysis_queue = []
# # Dictionary to cache scraping results
# scraping_results = {}
# # Function to simulate scraping
# async def perform_scraping(task):
# print(f"Scraping data for {task}...")
# await asyncio.sleep(3) # Simulate scraping delay
# filename = f"{task.topic}_scraped_data.json"
# scraping_results[task.topic] = filename # Cache the result
# print(f"Scraping complete for {task}. Saved as {filename}.")
# return filename
# # Function to simulate analysis
# async def perform_analysis(task, filename):
# print(f"Analyzing {task} with data from {filename}...")
# await asyncio.sleep(5) # Simulate analysis delay
# print(f"Analysis complete for {task}.")
# # Background worker for scraping
# async def scraping_worker():
# while True:
# if scraping_queue:
# task = heapq.heappop(scraping_queue)
# if task.topic not in scraping_results: # Perform scraping only if not already done
# filename = await perform_scraping(task)
# heapq.heappush(analysis_queue, (task, filename))
# else:
# filename = scraping_results[task.topic]
# heapq.heappush(analysis_queue, (task, filename))
# else:
# await asyncio.sleep(1)
# # Background worker for analysis
# async def analysis_worker():
# while True:
# if analysis_queue:
# task, filename = heapq.heappop(analysis_queue)
# await perform_analysis(task, filename)
# else:
# await asyncio.sleep(1)
# # Function to add a task to the scraping queue
# async def add_task(user, user_status, topic, category, tags):
# for tag in tags:
# task = Task(user, user_status, topic, category, tag)
# heapq.heappush(scraping_queue, task)
# print(f"Task added to scraping queue: {task}")
# # Main function for testing
# async def initPriorityQueue():
# # Start workers
# asyncio.create_task(scraping_worker())
# asyncio.create_task(analysis_worker())
# # # Add tasks
# # await add_task("User1", "not_logged_in", "AI in education", "tools", ["pain_point"])
# # await asyncio.sleep(1) # Simulate delay
# # await add_task("User2", "paid", "E-commerce trends", "Amazon", ["competitor"])
# # await asyncio.sleep(1) # Simulate delay
# # await add_task("User3", "logged_in", "Healthcare AI", "devices", ["pain_point", "competitor"])
# # # Keep the program running
# # while True:
# # await asyncio.sleep(1)