Charles Grandjean committed on
Commit
298e92e
·
1 Parent(s): 5d6e1b5

les putains de noms sont pas bons kevin!

Browse files
agent_states/actors_merger.py CHANGED
@@ -7,3 +7,4 @@ class ActorsMergerState(TypedDict):
7
  existing_actors: List[Dict[str, Any]]
8
  messages: List[Any]
9
  completed: bool
 
 
7
  existing_actors: List[Dict[str, Any]]
8
  messages: List[Any]
9
  completed: bool
10
+ modified_actor_names: List[str]
agents/actors_merger.py CHANGED
@@ -1,11 +1,10 @@
1
-
2
  import os
3
  import httpx
4
  import json
5
  import logging
6
  import asyncio
7
  from typing import TypedDict, List, Dict, Any
8
- from agent_states.actors_merger_state import ActorResolutionState
9
  from langgraph.graph import StateGraph, END
10
  from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
11
  from prompts.actors_merger import SYSTEM_PROMPT,ACTOR_MERGER_PROMPT
@@ -20,7 +19,7 @@ class ActorsMergerAgent:
20
  self.workflow = self._build_workflow()
21
 
22
  def _build_workflow(self):
23
- workflow = StateGraph(ActorResolutionState)
24
  workflow.add_node("reason", self._reason)
25
  workflow.add_node("run_tools", self._run_tools)
26
 
@@ -38,10 +37,10 @@ class ActorsMergerAgent:
38
 
39
  return workflow.compile()
40
 
41
- def _should_continue(self, state: ActorResolutionState) -> str:
42
  return "done" if state["completed"] else "continue"
43
 
44
- async def _reason(self, state: ActorResolutionState) -> ActorResolutionState:
45
  if not state["messages"]:
46
  state["messages"] = [
47
  SystemMessage(content=SYSTEM_PROMPT),
@@ -50,10 +49,10 @@ class ActorsMergerAgent:
50
  {ACTOR_MERGER_PROMPT}
51
 
52
  NEW_ACTORS:
53
- {json.dumps(new_extractions, indent=2)}
54
 
55
  EXISTING_ACTORS:
56
- {json.dumps(existing_actors, indent=2)}
57
  """
58
  ),
59
  ]
@@ -62,7 +61,7 @@ class ActorsMergerAgent:
62
  state["messages"].append(response)
63
  return state
64
 
65
- async def _run_tools(self, state: ActorResolutionState) -> ActorResolutionState:
66
  last_message = state["messages"][-1]
67
  tool_calls = getattr(last_message, "tool_calls", []) or []
68
 
@@ -73,6 +72,11 @@ class ActorsMergerAgent:
73
  if name == "add_actors":
74
  actors = args.get("actors", [])
75
  state["existing_actors"].extend(actors)
 
 
 
 
 
76
  result = {"ok": True, "added": len(actors)}
77
 
78
  elif name == "modify_actors":
@@ -86,6 +90,10 @@ class ActorsMergerAgent:
86
 
87
  for actor in state["existing_actors"]:
88
  if actor.get("name") == target_name:
 
 
 
 
89
  if mod.get("name"):
90
  actor["name"] = mod["name"]
91
 
@@ -159,8 +167,8 @@ class ActorsMergerAgent:
159
  logger.error(f"❌ Fetch existing actors failed: {e}")
160
  return []
161
 
162
- async def _update_actors(self, user_id: str, actors: List[Dict[str, Any]]) -> bool:
163
- """Update actors to Supabase"""
164
  try:
165
  base_url = os.getenv("SUPABASE_BASE_URL")
166
  if not base_url:
@@ -185,18 +193,19 @@ class ActorsMergerAgent:
185
  new_extractions: List[Dict[str, Any]],
186
  existing_actors: List[Dict[str, Any]]
187
  ) -> Dict[str, Any]:
188
- initial_state: ActorResolutionState = {
189
  "new_extractions": new_extractions,
190
  "existing_actors": existing_actors,
191
  "messages": [],
192
  "completed": False,
 
193
  }
194
 
195
  final_state = await self.workflow.ainvoke(initial_state)
196
 
197
  return {
198
  "existing_actors": final_state["existing_actors"],
199
- "completed": final_state["completed"],
200
  }
201
 
202
  async def process_actors_async(self, actors_md: str, user_id: str):
@@ -212,29 +221,46 @@ class ActorsMergerAgent:
212
  try:
213
  logger.info(f"πŸ”„ Starting actors processing for user {user_id}")
214
 
215
- # Step 1: Fetch existing actors from Supabase
 
 
 
 
 
216
  logger.info("πŸ“₯ Fetching existing actors from Supabase...")
217
  existing_actors = await self._fetch_existing_actors(user_id)
218
  logger.info(f"βœ… Fetched {len(existing_actors)} existing actors")
219
 
220
- # Step 2: Merge using LLM reasoning
221
  logger.info("πŸ€– Merging actors using LLM...")
222
  result = await self.resolve(
223
- new_extractions=actors_md,
224
  existing_actors=existing_actors
225
  )
226
  merged_actors = result["existing_actors"]
227
- logger.info(f"βœ… Merged into {len(merged_actors)} actors")
 
228
 
229
- # Step 3: Update actors to Supabase
230
- logger.info("πŸ“€ Updating actors to Supabase...")
231
- success = await self._update_actors(user_id, merged_actors)
232
- if success:
233
- logger.info("βœ… Actors processing completed successfully")
 
 
 
 
 
 
 
 
 
 
 
234
  else:
235
- logger.error("❌ Failed to update actors to Supabase")
236
 
237
  except Exception as e:
238
  logger.error(f"❌ Error in process_actors_async: {e}")
239
  import traceback
240
- logger.error(f"πŸ” Traceback: {traceback.format_exc()}")
 
 
1
  import os
2
  import httpx
3
  import json
4
  import logging
5
  import asyncio
6
  from typing import TypedDict, List, Dict, Any
7
+ from agent_states.actors_merger import ActorsMergerState
8
  from langgraph.graph import StateGraph, END
9
  from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
10
  from prompts.actors_merger import SYSTEM_PROMPT,ACTOR_MERGER_PROMPT
 
19
  self.workflow = self._build_workflow()
20
 
21
  def _build_workflow(self):
22
+ workflow = StateGraph(ActorsMergerState)
23
  workflow.add_node("reason", self._reason)
24
  workflow.add_node("run_tools", self._run_tools)
25
 
 
37
 
38
  return workflow.compile()
39
 
40
+ def _should_continue(self, state: ActorsMergerState) -> str:
41
  return "done" if state["completed"] else "continue"
42
 
43
+ async def _reason(self, state: ActorsMergerState) -> ActorsMergerState:
44
  if not state["messages"]:
45
  state["messages"] = [
46
  SystemMessage(content=SYSTEM_PROMPT),
 
49
  {ACTOR_MERGER_PROMPT}
50
 
51
  NEW_ACTORS:
52
+ {json.dumps(state["new_extractions"], indent=2)}
53
 
54
  EXISTING_ACTORS:
55
+ {json.dumps(state["existing_actors"], indent=2)}
56
  """
57
  ),
58
  ]
 
61
  state["messages"].append(response)
62
  return state
63
 
64
+ async def _run_tools(self, state: ActorsMergerState) -> ActorsMergerState:
65
  last_message = state["messages"][-1]
66
  tool_calls = getattr(last_message, "tool_calls", []) or []
67
 
 
72
  if name == "add_actors":
73
  actors = args.get("actors", [])
74
  state["existing_actors"].extend(actors)
75
+ # Track added actor names
76
+ for actor in actors:
77
+ actor_name = actor.get("name")
78
+ if actor_name and actor_name not in state["modified_actor_names"]:
79
+ state["modified_actor_names"].append(actor_name)
80
  result = {"ok": True, "added": len(actors)}
81
 
82
  elif name == "modify_actors":
 
90
 
91
  for actor in state["existing_actors"]:
92
  if actor.get("name") == target_name:
93
+ # Track modified actor name (using original name before change)
94
+ if target_name not in state["modified_actor_names"]:
95
+ state["modified_actor_names"].append(target_name)
96
+
97
  if mod.get("name"):
98
  actor["name"] = mod["name"]
99
 
 
167
  logger.error(f"❌ Fetch existing actors failed: {e}")
168
  return []
169
 
170
+ async def _update_changed_actors(self, user_id: str, actors: List[Dict[str, Any]]) -> bool:
171
+ """Update changed actors to Supabase (only modified or newly added actors)"""
172
  try:
173
  base_url = os.getenv("SUPABASE_BASE_URL")
174
  if not base_url:
 
193
  new_extractions: List[Dict[str, Any]],
194
  existing_actors: List[Dict[str, Any]]
195
  ) -> Dict[str, Any]:
196
+ initial_state: ActorsMergerState = {
197
  "new_extractions": new_extractions,
198
  "existing_actors": existing_actors,
199
  "messages": [],
200
  "completed": False,
201
+ "modified_actor_names": [],
202
  }
203
 
204
  final_state = await self.workflow.ainvoke(initial_state)
205
 
206
  return {
207
  "existing_actors": final_state["existing_actors"],
208
+ "modified_actor_names": final_state["modified_actor_names"],
209
  }
210
 
211
  async def process_actors_async(self, actors_md: str, user_id: str):
 
221
  try:
222
  logger.info(f"πŸ”„ Starting actors processing for user {user_id}")
223
 
224
+ # Step 1: Parse markdown to actor list
225
+ logger.info("πŸ“ Parsing actors from markdown...")
226
+ new_extractions = self._parse_actors(actors_md)
227
+ logger.info(f"βœ… Parsed {len(new_extractions)} new actors")
228
+
229
+ # Step 2: Fetch existing actors from Supabase
230
  logger.info("πŸ“₯ Fetching existing actors from Supabase...")
231
  existing_actors = await self._fetch_existing_actors(user_id)
232
  logger.info(f"βœ… Fetched {len(existing_actors)} existing actors")
233
 
234
+ # Step 3: Merge using LLM reasoning
235
  logger.info("πŸ€– Merging actors using LLM...")
236
  result = await self.resolve(
237
+ new_extractions=new_extractions,
238
  existing_actors=existing_actors
239
  )
240
  merged_actors = result["existing_actors"]
241
+ modified_actor_names = result["modified_actor_names"]
242
+ logger.info(f"βœ… Merged into {len(merged_actors)} actors, {len(modified_actor_names)} modified")
243
 
244
+ # Step 4: Filter only modified actors
245
+ if modified_actor_names:
246
+ logger.info(f"πŸ” Filtering {len(modified_actor_names)} modified actors...")
247
+ changed_actors = [
248
+ actor for actor in merged_actors
249
+ if actor.get("name") in modified_actor_names
250
+ ]
251
+ logger.info(f"βœ… Found {len(changed_actors)} changed actors")
252
+
253
+ # Step 5: Update only changed actors to Supabase
254
+ logger.info("πŸ“€ Updating changed actors to Supabase...")
255
+ success = await self._update_changed_actors(user_id, changed_actors)
256
+ if success:
257
+ logger.info("βœ… Actors processing completed successfully")
258
+ else:
259
+ logger.error("❌ Failed to update actors to Supabase")
260
  else:
261
+ logger.info("βœ… No actors were modified, skipping update")
262
 
263
  except Exception as e:
264
  logger.error(f"❌ Error in process_actors_async: {e}")
265
  import traceback
266
+ logger.error(f"πŸ” Traceback: {traceback.format_exc()}")
agents/pdf_analyzer.py CHANGED
@@ -187,12 +187,22 @@ class PDFAnalyzerAgent:
187
  logger.info("=" * 80)
188
  logger.info("βœ… Actors extracted")
189
 
190
- # asyncio.create_task(
191
- # self.actors_merger.process_actors_async(
192
- # actors_md=state["actors"],
193
- # user_id=state["user_id"]
194
- # )
195
- # )
 
 
 
 
 
 
 
 
 
 
196
  return state
197
 
198
  async def _extract_key_details(self, state: PDFAnalyzerState) -> PDFAnalyzerState:
 
187
  logger.info("=" * 80)
188
  logger.info("βœ… Actors extracted")
189
 
190
+ # Launch actors merger asynchronously in the background
191
+ if self.actors_merger and state.get("user_id"):
192
+ logger.info("πŸš€ Launching actors merger in background...")
193
+ asyncio.create_task(
194
+ self.actors_merger.process_actors_async(
195
+ actors_md=state["actors"],
196
+ user_id=state["user_id"]
197
+ )
198
+ )
199
+ logger.info("βœ… Actors merger task created (running in background)")
200
+ else:
201
+ if not self.actors_merger:
202
+ logger.warning("⚠️ Actors merger not initialized, skipping")
203
+ if not state.get("user_id"):
204
+ logger.warning("⚠️ user_id not provided, skipping actors merger")
205
+
206
  return state
207
 
208
  async def _extract_key_details(self, state: PDFAnalyzerState) -> PDFAnalyzerState: