Ashok Kumar Bhati commited on
Commit
7694e9d
·
1 Parent(s): a540238

generate content at delivery time

Browse files
src/controllers/_bot_controller.py CHANGED
@@ -45,7 +45,7 @@ class BotController:
45
  logger.info(f"file id is {file_id}")
46
  async with SlackBotService() as slack_bot_service:
47
  uploadedFile = await slack_bot_service.save_file(
48
- event["team_id"], file
49
  )
50
  async with FileService() as file_service:
51
  file_service.process_file(uploadedFile, user.id)
 
45
  logger.info(f"file id is {file_id}")
46
  async with SlackBotService() as slack_bot_service:
47
  uploadedFile = await slack_bot_service.save_file(
48
+ team, file
49
  )
50
  async with FileService() as file_service:
51
  file_service.process_file(uploadedFile, user.id)
src/controllers/_file_controller.py CHANGED
@@ -14,11 +14,13 @@ class FileController:
14
  logger.info(f"Parsing file: {file.filename}")
15
  try:
16
  async with self.file_service as service:
17
- result = await service.parse_document(file)
 
 
18
  return result
19
  except ValueError as e:
20
  logger.warning(f"Validation error: {str(e)}")
21
  raise HTTPException(status_code=400, detail=str(e))
22
  except Exception as e:
23
  logger.error(f"Error parsing document: {str(e)}")
24
- raise HTTPException(status_code=500, detail=str(e))
 
14
  logger.info(f"Parsing file: {file.filename}")
15
  try:
16
  async with self.file_service as service:
17
+ file_path = await service.save_uploaded_file(file)
18
+ file_name = file.filename
19
+ result = await service.parse_document(file_name, file_path)
20
  return result
21
  except ValueError as e:
22
  logger.warning(f"Validation error: {str(e)}")
23
  raise HTTPException(status_code=400, detail=str(e))
24
  except Exception as e:
25
  logger.error(f"Error parsing document: {str(e)}")
26
+ raise HTTPException(status_code=500, detail=str(e))
src/models/_user.py CHANGED
@@ -21,7 +21,7 @@ class User(Base):
21
  content_delivery_frequency = Column(
22
  Enum(ContentDeliveryFrequency),
23
  nullable=False,
24
- default=ContentDeliveryFrequency.WEEKLY,
25
  )
26
  slack_id = Column(String, nullable=True)
27
  slack_team_id = Column(
 
21
  content_delivery_frequency = Column(
22
  Enum(ContentDeliveryFrequency),
23
  nullable=False,
24
+ default=ContentDeliveryFrequency.DAILY,
25
  )
26
  slack_id = Column(String, nullable=True)
27
  slack_team_id = Column(
src/repositories/_personalized_content_repository.py CHANGED
@@ -45,3 +45,13 @@ class PersonalizedContentRepository(BaseRepository):
45
  if content:
46
  content.delivered_at = datetime.now()
47
  await self.update(content)
 
 
 
 
 
 
 
 
 
 
 
45
  if content:
46
  content.delivered_at = datetime.now()
47
  await self.update(content)
48
+
49
+ async def get_users_last_message(self, user_id: int) -> PersonalizedContent:
50
+ """Get the last message sent to the user"""
51
+ query = (
52
+ select(PersonalizedContent)
53
+ .where(PersonalizedContent.user_id == user_id)
54
+ .order_by(PersonalizedContent.delivered_at.desc())
55
+ )
56
+ result = await self.execute(query)
57
+ return result.scalar_one_or_none()
src/services/_content_delivery_service.py CHANGED
@@ -1,4 +1,4 @@
1
- from datetime import datetime
2
  from collections import defaultdict
3
  from typing import List
4
 
@@ -19,7 +19,9 @@ class ContentDeliveryService:
19
  async def __aexit__(self, exc_type, exc_val, exc_tb):
20
  pass
21
 
22
- async def search_and_generate_content(self, user_id: str = None) -> List[str]:
 
 
23
 
24
  query = """
25
  - Focus on the key concepts like B.E.A.M., B.L.I.P., C.O.R.E., and Situational Leadership.
@@ -56,6 +58,14 @@ class ContentDeliveryService:
56
  "role": "system",
57
  "content": "Here is an example of a personalized content for a participant in the Resilient Leadership program:- '🍁 *Gratitude Practice* With Thanksgiving on the horizon, let's cultivate gratitude. Start each meeting this week by sharing one thing you're grateful for.'",
58
  },
 
 
 
 
 
 
 
 
59
  ]
60
 
61
  async with OpenAIClient() as openai_client:
@@ -91,31 +101,53 @@ class ContentDeliveryService:
91
  async def send_messages(self):
92
  try:
93
 
94
- results = await self.content_repository.get_due_content_for_delivery()
 
 
 
95
 
96
- if not results:
 
 
 
 
 
 
 
97
  logger.info("No content due for delivery")
98
  return
99
 
100
  # Group results by team for efficient Slack client usage
101
  team_grouped_data = defaultdict(list)
102
- for user, content, team in results:
103
- team_grouped_data[team].append((user, content))
104
 
105
  # Process each team's users
106
- for team, user_contents in team_grouped_data.items():
107
  try:
108
  # Initialize Slack client with team token
109
  async with SlackClient(token=team.token) as slack:
110
- for user, content in user_contents:
111
  try:
 
 
 
 
 
 
 
112
 
113
- await slack.send_message_to_user(
114
- user_id=user.slack_id,
115
- message=content.content
116
  )
117
 
118
- await self.content_repository.mark_as_delivered(content.id)
 
 
 
 
 
 
119
 
120
  user.last_message_sent_at = datetime.now()
121
  user.next_content_delivery_date = self.calculate_next_delivery_date(
 
1
+ from datetime import datetime, timedelta
2
  from collections import defaultdict
3
  from typing import List
4
 
 
19
  async def __aexit__(self, exc_type, exc_val, exc_tb):
20
  pass
21
 
22
+ async def search_and_generate_content(
23
+ self, user_id: str = None, last_message: str = None
24
+ ) -> List[str]:
25
 
26
  query = """
27
  - Focus on the key concepts like B.E.A.M., B.L.I.P., C.O.R.E., and Situational Leadership.
 
58
  "role": "system",
59
  "content": "Here is an example of a personalized content for a participant in the Resilient Leadership program:- '🍁 *Gratitude Practice* With Thanksgiving on the horizon, let's cultivate gratitude. Start each meeting this week by sharing one thing you're grateful for.'",
60
  },
61
+ {
62
+ "role": "system",
63
+ "content": (
64
+ "This was the last message sent to the user: " + last_message
65
+ if last_message
66
+ else "No previous message sent to the user"
67
+ ),
68
+ },
69
  ]
70
 
71
  async with OpenAIClient() as openai_client:
 
101
  async def send_messages(self):
102
  try:
103
 
104
+ start_of_day = datetime.now().replace(
105
+ hour=0, minute=0, second=0, microsecond=0
106
+ )
107
+ end_of_day = start_of_day + timedelta(days=1)
108
 
109
+ users = await self.user_repository.get_all(
110
+ filter_by={
111
+ "next_content_delivery_date__gte": start_of_day,
112
+ "next_content_delivery_date__lt": end_of_day,
113
+ }
114
+ )
115
+
116
+ if not users:
117
  logger.info("No content due for delivery")
118
  return
119
 
120
  # Group results by team for efficient Slack client usage
121
  team_grouped_data = defaultdict(list)
122
+ for user in users:
123
+ team_grouped_data[user.slack_team_id].append(user)
124
 
125
  # Process each team's users
126
+ for team in team_grouped_data.items():
127
  try:
128
  # Initialize Slack client with team token
129
  async with SlackClient(token=team.token) as slack:
130
+ for user in team:
131
  try:
132
+ last_messages = await self.content_repository.get_users_last_message(
133
+ user.id
134
+ )
135
+ content = await self.search_and_generate_content(
136
+ str(user.id),
137
+ last_messages[0].content if last_messages else None,
138
+ )
139
 
140
+ await slack.send_message(
141
+ user_id=user.slack_id, message=content[0]
 
142
  )
143
 
144
+ await self.content_repository.create(
145
+ {
146
+ "user_id": user.id,
147
+ "content": content[0],
148
+ "delivered_at": datetime.now(),
149
+ }
150
+ )
151
 
152
  user.last_message_sent_at = datetime.now()
153
  user.next_content_delivery_date = self.calculate_next_delivery_date(
src/services/_file_service.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  from pathlib import Path
2
  from typing import List, Tuple, Dict
3
  from fastapi import UploadFile
@@ -32,9 +34,13 @@ class FileService:
32
  Returns:
33
  Path: Path where file was saved
34
  """
 
35
  file_path = self.upload_dir / file.filename
36
  with open(file_path, "wb") as buffer:
37
- shutil.copyfileobj(file.file, buffer)
 
 
 
38
  return file_path
39
 
40
  async def filter_ppt_content(self, parsed_content: Tuple[Tuple[int, str], ...]) -> Tuple[Tuple[int, str], ...]:
@@ -70,24 +76,8 @@ class FileService:
70
  return tuple(filtered_content)
71
 
72
  async def parse_document(
73
- self, file: UploadFile, user_id: int = None, slack_file_id: str = None
74
  ) -> Dict:
75
- file_path = await self.save_uploaded_file(file)
76
- file_name = file.filename
77
-
78
- if user_id is not None:
79
- # Create file record in database only if user_id exists
80
- mime_type = file.content_type or f"application/{file_name.split('.')[-1]}"
81
- db_file = await self.file_repository.create_file(
82
- file_name=file_name,
83
- user_id=user_id,
84
- mime_type=mime_type,
85
- slack_file_id=slack_file_id
86
- )
87
- identifier = db_file.id
88
- else:
89
- # Skip database operations if user_id is None
90
- identifier = file_name
91
 
92
  async with DocumentParser() as document_parser:
93
  parsed_content = await document_parser.parse_file_content(str(file_path))
@@ -96,30 +86,35 @@ class FileService:
96
  parsed_content = await self.filter_ppt_content(parsed_content)
97
 
98
  async with Helper() as helper:
99
- documents = helper.create_documents(parsed_content, identifier, user_id)
100
 
101
  # Store documents in vector store and get record IDs
102
  async with VectorEmbedding() as vector_embedding:
103
  record_ids = await vector_embedding.store_documents(documents)
104
 
105
- if user_id is not None:
106
- if record_ids:
107
- await self.vector_repository.create_records(db_file.id, record_ids)
108
- await self.file_repository.mark_as_processed(db_file.id)
109
- return_key = "file_id"
110
- else:
111
- return_key = "file_name"
112
-
113
- return {
114
- "message": "Document's embeddings created successfully",
115
- return_key: identifier
116
- }
117
 
118
  async def process_file(
119
  self, file: UploadFile, user_id: int = None, slack_file_id: str = None
120
  ) -> Dict:
121
  try:
122
- await self.parse_document(file, user_id, slack_file_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
 
124
  async with UserService() as user_service:
125
  await user_service.send_message(
 
1
+ import asyncio
2
+ from concurrent.futures import ThreadPoolExecutor
3
  from pathlib import Path
4
  from typing import List, Tuple, Dict
5
  from fastapi import UploadFile
 
34
  Returns:
35
  Path: Path where file was saved
36
  """
37
+ loop = asyncio.get_event_loop()
38
  file_path = self.upload_dir / file.filename
39
  with open(file_path, "wb") as buffer:
40
+ with ThreadPoolExecutor() as pool:
41
+ await loop.run_in_executor(
42
+ pool, lambda: shutil.copyfileobj(file.file, buffer)
43
+ )
44
  return file_path
45
 
46
  async def filter_ppt_content(self, parsed_content: Tuple[Tuple[int, str], ...]) -> Tuple[Tuple[int, str], ...]:
 
76
  return tuple(filtered_content)
77
 
78
  async def parse_document(
79
+ self, file_name: str, file_path: str, user_id: int = None
80
  ) -> Dict:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  async with DocumentParser() as document_parser:
83
  parsed_content = await document_parser.parse_file_content(str(file_path))
 
86
  parsed_content = await self.filter_ppt_content(parsed_content)
87
 
88
  async with Helper() as helper:
89
+ documents = helper.create_documents(parsed_content, file_name, user_id)
90
 
91
  # Store documents in vector store and get record IDs
92
  async with VectorEmbedding() as vector_embedding:
93
  record_ids = await vector_embedding.store_documents(documents)
94
 
95
+ return record_ids
 
 
 
 
 
 
 
 
 
 
 
96
 
97
  async def process_file(
98
  self, file: UploadFile, user_id: int = None, slack_file_id: str = None
99
  ) -> Dict:
100
  try:
101
+ file_path = await self.save_uploaded_file(file)
102
+ file_name = file.filename
103
+
104
+ if user_id is not None:
105
+ # Create file record in database only if user_id exists
106
+ mime_type = (
107
+ file.content_type or f"application/{file_name.split('.')[-1]}"
108
+ )
109
+ db_file = await self.file_repository.create_file(
110
+ file_name=file_name,
111
+ user_id=user_id,
112
+ mime_type=mime_type,
113
+ slack_file_id=slack_file_id,
114
+ )
115
+ record_ids = await self.parse_document(file_name, file_path, user_id)
116
+ await self.vector_repository.create_records(db_file.id, record_ids)
117
+ await self.file_repository.mark_as_processed(db_file.id)
118
 
119
  async with UserService() as user_service:
120
  await user_service.send_message(
src/services/_slack_bot_service.py CHANGED
@@ -64,13 +64,7 @@ class SlackBotService:
64
  response = await slack_client.get_users_list()
65
  return response
66
 
67
- async def save_file(self, team_id: str, file: dict):
68
- logger.info(f"team_id is {team_id}")
69
- team = await self.get_team_by_slack_team_id(team_id)
70
-
71
- if not team:
72
- logger.error(f"Team not found for slack_team_id: {team_id}")
73
- return {"message": "Team not found"}
74
 
75
  file_url = file.get("url_private")
76
  file_name = file.get("name", "downloaded_file")
 
64
  response = await slack_client.get_users_list()
65
  return response
66
 
67
+ async def save_file(self, team: any, file: dict):
 
 
 
 
 
 
68
 
69
  file_url = file.get("url_private")
70
  file_name = file.get("name", "downloaded_file")
src/services/_user_service.py CHANGED
@@ -25,6 +25,10 @@ class UserService:
25
  return await self.user_repository.get_user_by_email(email=email)
26
 
27
  async def update_user_info(self, user_id, info):
 
 
 
 
28
  return await self.user_repository.patch(user_id, info)
29
 
30
  async def get_users_list(self):
@@ -53,7 +57,7 @@ class UserService:
53
  def calculate_next_delivery_date(self, last_sent_at: Optional[datetime], frequency: ContentDeliveryFrequency) -> datetime:
54
  """Calculate next delivery date based on last message sent date"""
55
  base_date = last_sent_at if last_sent_at else datetime.now()
56
-
57
  if frequency == ContentDeliveryFrequency.DAILY:
58
  return base_date + timedelta(days=1)
59
  elif frequency == ContentDeliveryFrequency.WEEKLY:
@@ -62,7 +66,7 @@ class UserService:
62
  return base_date + timedelta(weeks=2)
63
  elif frequency == ContentDeliveryFrequency.MONTHLY:
64
  return base_date + timedelta(days=30)
65
-
66
  return base_date + timedelta(weeks=1) # Default to weekly
67
 
68
  async def update_user_frequency(self, user_id: int, frequency: ContentDeliveryFrequency) -> Optional[User]:
@@ -85,7 +89,6 @@ class UserService:
85
 
86
  return await self.user_repository.patch(user_id, update_data)
87
 
88
-
89
  async def send_message(self, user_id: int, message: str):
90
  """Send message to user"""
91
  user = await self.user_repository.get(user_id)
 
25
  return await self.user_repository.get_user_by_email(email=email)
26
 
27
  async def update_user_info(self, user_id, info):
28
+ next_delivery_date = self.calculate_next_delivery_date(
29
+ datetime.now(), ContentDeliveryFrequency.DAILY
30
+ )
31
+ info["next_content_delivery_date"] = next_delivery_date
32
  return await self.user_repository.patch(user_id, info)
33
 
34
  async def get_users_list(self):
 
57
  def calculate_next_delivery_date(self, last_sent_at: Optional[datetime], frequency: ContentDeliveryFrequency) -> datetime:
58
  """Calculate next delivery date based on last message sent date"""
59
  base_date = last_sent_at if last_sent_at else datetime.now()
60
+
61
  if frequency == ContentDeliveryFrequency.DAILY:
62
  return base_date + timedelta(days=1)
63
  elif frequency == ContentDeliveryFrequency.WEEKLY:
 
66
  return base_date + timedelta(weeks=2)
67
  elif frequency == ContentDeliveryFrequency.MONTHLY:
68
  return base_date + timedelta(days=30)
69
+
70
  return base_date + timedelta(weeks=1) # Default to weekly
71
 
72
  async def update_user_frequency(self, user_id: int, frequency: ContentDeliveryFrequency) -> Optional[User]:
 
89
 
90
  return await self.user_repository.patch(user_id, update_data)
91
 
 
92
  async def send_message(self, user_id: int, message: str):
93
  """Send message to user"""
94
  user = await self.user_repository.get(user_id)
src/utils/_helpers.py CHANGED
@@ -15,7 +15,7 @@ class Helper:
15
  async def create_documents(
16
  page_texts: List[Dict],
17
  file_name: str,
18
- user_id: Optional[str] = None,
19
  ) -> List[Document]:
20
  """
21
  Create Document objects from parsed page texts using either file_id or file_name.
@@ -34,7 +34,7 @@ class Helper:
34
  metadata = {
35
  "file_name": file_name,
36
  "page_number": page_num,
37
- "user_id": user_id,
38
  }
39
  loop = asyncio.get_event_loop()
40
  with ThreadPoolExecutor() as pool:
 
15
  async def create_documents(
16
  page_texts: List[Dict],
17
  file_name: str,
18
+ user_id: int = None,
19
  ) -> List[Document]:
20
  """
21
  Create Document objects from parsed page texts using either file_id or file_name.
 
34
  metadata = {
35
  "file_name": file_name,
36
  "page_number": page_num,
37
+ "user_id": str(user_id) if user_id else "public",
38
  }
39
  loop = asyncio.get_event_loop()
40
  with ThreadPoolExecutor() as pool: