spagestic committed on
Commit
493a57c
·
1 Parent(s): 0c930e9

Robust Planner Discovery Workflow

Browse files
ui/agent/graph/nodes/__init__.py CHANGED
@@ -1,10 +1,12 @@
1
  # ui/agent/graph/nodes/__init__.py
2
  from .consolidator import consolidator_node
 
3
  from .planner import fan_out_research, planner_node
4
  from .researcher import researcher_node
5
 
6
  __all__ = [
7
  "consolidator_node",
 
8
  "fan_out_research",
9
  "planner_node",
10
  "researcher_node",
 
1
  # ui/agent/graph/nodes/__init__.py
2
  from .consolidator import consolidator_node
3
+ from .discovery import discovery_node
4
  from .planner import fan_out_research, planner_node
5
  from .researcher import researcher_node
6
 
7
  __all__ = [
8
  "consolidator_node",
9
+ "discovery_node",
10
  "fan_out_research",
11
  "planner_node",
12
  "researcher_node",
ui/agent/graph/nodes/discovery.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ui/agent/graph/nodes/discovery.py
2
+ from __future__ import annotations
3
+
4
+ import json
5
+ from typing import Any
6
+
7
+ from langchain_core.runnables import RunnableConfig
8
+ from langgraph.config import get_stream_writer
9
+
10
+ from apis.exa import search_immigration
11
+ from ..state import AgentState, CandidateCountry
12
+ from .helpers import (
13
+ candidates_from_search_text,
14
+ discovery_queries,
15
+ heuristic_candidate_countries,
16
+ merge_candidates,
17
+ profile_summary_from_text,
18
+ user_text,
19
+ )
20
+
21
+
22
+ def _format_discovery_summary(
23
+ candidates: list[CandidateCountry],
24
+ search_notes: list[str],
25
+ ) -> str:
26
+ lines = [
27
+ "Initial destination shortlist for parallel research:",
28
+ "",
29
+ ]
30
+ for item in candidates:
31
+ lines.append(
32
+ f"- {item['name']} ({item['iso2']}): {item['pathway_hint']}"
33
+ )
34
+ if search_notes:
35
+ lines.extend(["", "Discovery search notes:"])
36
+ lines.extend(f"- {note}" for note in search_notes[:4])
37
+ lines.append(
38
+ "\nThese are starting candidates only — each country will be researched "
39
+ "in depth before the final recommendation."
40
+ )
41
+ return "\n".join(lines)
42
+
43
+
44
+ def discovery_node(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
45
+ del config # discovery uses direct API calls, not the LLM
46
+ writer = get_stream_writer()
47
+ profile_text = user_text(state["user_content"])
48
+ profile_summary = profile_summary_from_text(profile_text)
49
+ heuristic = heuristic_candidate_countries(profile_text)
50
+
51
+ search_notes: list[str] = []
52
+ search_blob = ""
53
+ search_log: dict[str, Any] = {"queries": [], "results": []}
54
+
55
+ for query in discovery_queries(profile_text):
56
+ search_log["queries"].append(query)
57
+ try:
58
+ result = search_immigration(query=query, num_results=6)
59
+ search_log["results"].append(result)
60
+ titles = [
61
+ str(item.get("title") or "")
62
+ for item in result.get("results", [])
63
+ if isinstance(item, dict)
64
+ ]
65
+ if titles:
66
+ search_notes.append(f"{query} → {titles[0]}")
67
+ search_blob += "\n".join(
68
+ f"{item.get('title', '')} {item.get('url', '')}"
69
+ for item in result.get("results", [])
70
+ if isinstance(item, dict)
71
+ )
72
+ except Exception as exc:
73
+ search_notes.append(f"{query} → search unavailable ({exc})")
74
+
75
+ from_search = candidates_from_search_text(search_blob)
76
+ candidates = merge_candidates(from_search, heuristic)
77
+ if not candidates:
78
+ candidates = heuristic
79
+
80
+ summary = _format_discovery_summary(candidates, search_notes)
81
+ writer(
82
+ {
83
+ "type": "discovery",
84
+ "summary": summary,
85
+ "candidates": candidates,
86
+ "log": {
87
+ "tool": "destination_discovery",
88
+ "arguments": {"queries": search_log["queries"]},
89
+ "result": truncate_discovery_log(search_log),
90
+ },
91
+ }
92
+ )
93
+
94
+ return {
95
+ "profile_summary": profile_summary,
96
+ "candidate_countries": candidates,
97
+ "discovery_summary": summary,
98
+ }
99
+
100
+
101
+ def truncate_discovery_log(search_log: dict[str, Any]) -> Any:
102
+ serialized = json.dumps(search_log, default=str)
103
+ if len(serialized) <= 1500:
104
+ return search_log
105
+ return serialized[:1500] + "\n… (truncated)"
ui/agent/graph/nodes/helpers.py CHANGED
@@ -2,14 +2,61 @@
2
  from __future__ import annotations
3
 
4
  import json
 
5
  import uuid
6
  from typing import Any
7
 
8
  from langchain_core.messages import AIMessage
9
 
 
10
  from ...messages import parse_text_tool_calls
11
  from ...tools import _parse_arguments, truncate
12
- from .config import RESEARCH_TOOL_NAMES
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
  def extract_json(text: str) -> dict[str, Any] | None:
15
  decoder = json.JSONDecoder()
@@ -25,24 +72,6 @@ def extract_json(text: str) -> dict[str, Any] | None:
25
  return None
26
 
27
 
28
- def fallback_plan(user_text: str) -> dict[str, Any]:
29
- return {
30
- "thinking": "",
31
- "countries": [],
32
- "labels": [],
33
- "profile_summary": truncate(user_text, 600),
34
- "todos": [
35
- {
36
- "title": "Research migration options",
37
- "description": (
38
- "Research realistic migration options for this profile: "
39
- f"{truncate(user_text, 1200)}"
40
- ),
41
- }
42
- ],
43
- }
44
-
45
-
46
  def user_text(user_content: str | list[dict[str, Any]]) -> str:
47
  if isinstance(user_content, str):
48
  return user_content
@@ -54,6 +83,226 @@ def user_text(user_content: str | list[dict[str, Any]]) -> str:
54
  return "\n".join(part for part in parts if part)
55
 
56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  def research_tool_calls(
58
  response: AIMessage,
59
  ) -> list[tuple[str, dict[str, Any], str]]:
@@ -82,3 +331,19 @@ def research_tool_calls(
82
  )
83
  )
84
  return normalized
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  from __future__ import annotations
3
 
4
  import json
5
+ import re
6
  import uuid
7
  from typing import Any
8
 
9
  from langchain_core.messages import AIMessage
10
 
11
+ from apis.rest_countries import lookup_country
12
  from ...messages import parse_text_tool_calls
13
  from ...tools import _parse_arguments, truncate
14
+ from ..state import CandidateCountry, TodoItem
15
+ from .config import MAX_TODOS, RESEARCH_TOOL_NAMES
16
+
17
+ # Conservative starting shortlist for skilled IT applicants (used when LLM/search fail).
18
+ DEFAULT_SKILLED_IT_COUNTRIES: list[CandidateCountry] = [
19
+ {
20
+ "iso2": "CA",
21
+ "name": "Canada",
22
+ "pathway_hint": "Express Entry / Provincial Nominee skilled worker route",
23
+ "label": "Skilled worker - 12-18 mo",
24
+ },
25
+ {
26
+ "iso2": "DE",
27
+ "name": "Germany",
28
+ "pathway_hint": "EU Blue Card / skilled worker residence route",
29
+ "label": "EU Blue Card - 6-12 mo",
30
+ },
31
+ {
32
+ "iso2": "AU",
33
+ "name": "Australia",
34
+ "pathway_hint": "Skilled Independent / State nomination route",
35
+ "label": "Skilled migration - 12-18 mo",
36
+ },
37
+ {
38
+ "iso2": "IE",
39
+ "name": "Ireland",
40
+ "pathway_hint": "Critical Skills Employment Permit route",
41
+ "label": "Critical Skills - 6-12 mo",
42
+ },
43
+ ]
44
+
45
+ _COUNTRY_NAME_TO_ISO2 = {
46
+ "canada": "CA",
47
+ "germany": "DE",
48
+ "australia": "AU",
49
+ "ireland": "IE",
50
+ "new zealand": "NZ",
51
+ "united kingdom": "GB",
52
+ "uk": "GB",
53
+ "portugal": "PT",
54
+ "netherlands": "NL",
55
+ "singapore": "SG",
56
+ "united states": "US",
57
+ "usa": "US",
58
+ }
59
+
60
 
61
  def extract_json(text: str) -> dict[str, Any] | None:
62
  decoder = json.JSONDecoder()
 
72
  return None
73
 
74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  def user_text(user_content: str | list[dict[str, Any]]) -> str:
76
  if isinstance(user_content, str):
77
  return user_content
 
83
  return "\n".join(part for part in parts if part)
84
 
85
 
86
+ def profile_summary_from_text(profile_text: str) -> str:
87
+ """Build a short profile summary without calling the LLM."""
88
+ lines = [line.strip() for line in profile_text.splitlines() if line.strip()]
89
+ if not lines:
90
+ return truncate(profile_text, 600)
91
+ headline = lines[0]
92
+ bullets = [line.lstrip("- ").strip() for line in lines[1:] if line.strip().startswith("-")]
93
+ if bullets:
94
+ return truncate(f"{headline}. Key constraints: {'; '.join(bullets[:6])}.", 600)
95
+ return truncate(profile_text, 600)
96
+
97
+
98
+ def heuristic_candidate_countries(profile_text: str) -> list[CandidateCountry]:
99
+ """Deterministic shortlist when discovery/planner cannot produce one."""
100
+ text = profile_text.lower()
101
+ candidates = list(DEFAULT_SKILLED_IT_COUNTRIES)
102
+
103
+ if any(word in text for word in ("software", "it", "engineer", "developer", "tech")):
104
+ return candidates[:MAX_TODOS]
105
+
106
+ if any(word in text for word in ("study", "student", "university")):
107
+ return [
108
+ {
109
+ "iso2": "DE",
110
+ "name": "Germany",
111
+ "pathway_hint": "Student visa / post-study residence route",
112
+ "label": "Study route - 12-24 mo",
113
+ },
114
+ {
115
+ "iso2": "CA",
116
+ "name": "Canada",
117
+ "pathway_hint": "Study permit / PGWP pathway",
118
+ "label": "Study route - 12-24 mo",
119
+ },
120
+ {
121
+ "iso2": "IE",
122
+ "name": "Ireland",
123
+ "pathway_hint": "Study / graduate route",
124
+ "label": "Study route - 12-24 mo",
125
+ },
126
+ {
127
+ "iso2": "AU",
128
+ "name": "Australia",
129
+ "pathway_hint": "Student visa / skilled graduate route",
130
+ "label": "Study route - 12-24 mo",
131
+ },
132
+ ][:MAX_TODOS]
133
+
134
+ return candidates[:MAX_TODOS]
135
+
136
+
137
+ def _candidate_from_iso2(iso2: str) -> CandidateCountry | None:
138
+ info = lookup_country(iso2)
139
+ if not info:
140
+ return None
141
+ default = next(
142
+ (item for item in DEFAULT_SKILLED_IT_COUNTRIES if item["iso2"] == iso2.upper()),
143
+ None,
144
+ )
145
+ if default:
146
+ return default
147
+ return {
148
+ "iso2": info["cca2"],
149
+ "name": str(info["name"]),
150
+ "pathway_hint": "Skilled worker / residence pathway",
151
+ "label": "Skilled route - 12-18 mo",
152
+ }
153
+
154
+
155
+ def candidates_from_search_text(text: str) -> list[CandidateCountry]:
156
+ """Extract mentioned countries from search result text."""
157
+ lowered = text.lower()
158
+ found: list[CandidateCountry] = []
159
+ seen: set[str] = set()
160
+ for name, iso2 in _COUNTRY_NAME_TO_ISO2.items():
161
+ if name in lowered and iso2 not in seen:
162
+ candidate = _candidate_from_iso2(iso2)
163
+ if candidate:
164
+ found.append(candidate)
165
+ seen.add(iso2)
166
+ return found
167
+
168
+
169
+ def merge_candidates(
170
+ primary: list[CandidateCountry],
171
+ secondary: list[CandidateCountry],
172
+ ) -> list[CandidateCountry]:
173
+ merged: list[CandidateCountry] = []
174
+ seen: set[str] = set()
175
+ for item in [*primary, *secondary]:
176
+ iso2 = item["iso2"].upper()
177
+ if iso2 in seen:
178
+ continue
179
+ merged.append(item)
180
+ seen.add(iso2)
181
+ if len(merged) >= MAX_TODOS:
182
+ break
183
+ return merged
184
+
185
+
186
+ def country_todo(
187
+ candidate: CandidateCountry,
188
+ profile_summary: str,
189
+ *,
190
+ todo_id: int,
191
+ ) -> TodoItem:
192
+ return {
193
+ "id": todo_id,
194
+ "title": f"{candidate['name']} — {candidate['pathway_hint']}",
195
+ "description": (
196
+ f"Research the best realistic skilled migration pathway to {candidate['name']} "
197
+ f"for this applicant. Focus on {candidate['pathway_hint']}. Cover eligibility, "
198
+ f"required documents, approximate costs, realistic timeline within 12-18 months, "
199
+ f"path to permanent residence, and risks. Use official government or immigration "
200
+ f"authority sources.\n\nApplicant profile: {profile_summary}"
201
+ ),
202
+ }
203
+
204
+
205
+ def plan_from_candidates(
206
+ candidates: list[CandidateCountry],
207
+ profile_text: str,
208
+ *,
209
+ thinking: str = "",
210
+ ) -> dict[str, Any]:
211
+ summary = profile_summary_from_text(profile_text)
212
+ todos = [
213
+ country_todo(candidate, summary, todo_id=index + 1)
214
+ for index, candidate in enumerate(candidates[:MAX_TODOS])
215
+ ]
216
+ return {
217
+ "thinking": thinking,
218
+ "countries": [item["iso2"] for item in candidates[:MAX_TODOS]],
219
+ "labels": [item["label"] for item in candidates[:MAX_TODOS]],
220
+ "profile_summary": summary,
221
+ "todos": todos,
222
+ }
223
+
224
+
225
+ def fallback_plan(
226
+ profile_text: str,
227
+ candidates: list[CandidateCountry] | None = None,
228
+ ) -> dict[str, Any]:
229
+ shortlist = candidates or heuristic_candidate_countries(profile_text)
230
+ return plan_from_candidates(
231
+ shortlist,
232
+ profile_text,
233
+ thinking=(
234
+ "Using a conservative starting shortlist of skilled-worker destinations. "
235
+ "Each country will be researched in parallel."
236
+ ),
237
+ )
238
+
239
+
240
+ def _is_generic_todo(todo: dict[str, Any]) -> bool:
241
+ title = str(todo.get("title") or "").lower()
242
+ description = str(todo.get("description") or "").lower()
243
+ generic_titles = {"research migration options", "research task"}
244
+ if title in generic_titles:
245
+ return True
246
+ if "research realistic migration options for this profile" in description:
247
+ return True
248
+ return len(description) > 800 and description.count("\n") >= 4
249
+
250
+
251
+ def normalize_plan(
252
+ plan: dict[str, Any] | None,
253
+ profile_text: str,
254
+ candidates: list[CandidateCountry],
255
+ ) -> dict[str, Any]:
256
+ """Ensure the plan has 3-4 useful country-specific todos."""
257
+ summary = str((plan or {}).get("profile_summary") or "").strip() or profile_summary_from_text(
258
+ profile_text
259
+ )
260
+ shortlist = candidates[:MAX_TODOS] or heuristic_candidate_countries(profile_text)
261
+
262
+ if plan is None:
263
+ return fallback_plan(profile_text, shortlist)
264
+
265
+ raw_todos = plan.get("todos") or []
266
+ todos: list[TodoItem] = []
267
+ for index, raw in enumerate(raw_todos[:MAX_TODOS]):
268
+ if not isinstance(raw, dict):
269
+ continue
270
+ if _is_generic_todo(raw):
271
+ continue
272
+ title = str(raw.get("title") or "").strip()
273
+ description = str(raw.get("description") or title).strip()
274
+ if not title:
275
+ continue
276
+ todos.append({"id": len(todos) + 1, "title": title, "description": description})
277
+
278
+ if len(todos) < 3:
279
+ todos = [
280
+ country_todo(candidate, summary, todo_id=index + 1)
281
+ for index, candidate in enumerate(shortlist[:MAX_TODOS])
282
+ ]
283
+
284
+ countries = [str(code) for code in plan.get("countries") or [] if code]
285
+ labels = [str(label) for label in plan.get("labels") or [] if label]
286
+ if len(countries) != len(todos):
287
+ countries = [item["iso2"] for item in shortlist[: len(todos)]]
288
+ labels = [item["label"] for item in shortlist[: len(todos)]]
289
+
290
+ thinking = str(plan.get("thinking") or "").strip()
291
+ if not thinking and len(todos) >= 3:
292
+ thinking = (
293
+ f"Split research into {len(todos)} parallel country tasks based on the "
294
+ f"applicant profile and discovery shortlist."
295
+ )
296
+
297
+ return {
298
+ "thinking": thinking,
299
+ "countries": countries,
300
+ "labels": labels,
301
+ "profile_summary": summary,
302
+ "todos": todos,
303
+ }
304
+
305
+
306
  def research_tool_calls(
307
  response: AIMessage,
308
  ) -> list[tuple[str, dict[str, Any], str]]:
 
331
  )
332
  )
333
  return normalized
334
+
335
+
336
+ def discovery_queries(profile_text: str) -> list[str]:
337
+ text = re.sub(r"\s+", " ", profile_text).strip()
338
+ occupation = "software engineer" if re.search(r"software|it|developer", text, re.I) else "skilled worker"
339
+ origin = "India" if re.search(r"\bindia\b", text, re.I) else "applicant country"
340
+ return [
341
+ (
342
+ f"best skilled worker immigration pathways {occupation} {origin} "
343
+ "official government permanent residence"
344
+ ),
345
+ (
346
+ f"countries skilled worker visa path to permanent residence "
347
+ f"{occupation} Indian citizen official immigration"
348
+ ),
349
+ ]
ui/agent/graph/nodes/planner.py CHANGED
@@ -7,14 +7,27 @@ from langchain_core.runnables import RunnableConfig
7
  from langgraph.config import get_stream_writer
8
  from langgraph.types import Send
9
 
10
- from ...tools import truncate
11
  from ..llm import build_llm
12
- from ..state import AgentState, TodoItem
13
  from .config import MAX_TODOS, PLANNER_MAX_TOKENS, PLANNER_TEMPERATURE
14
- from .helpers import extract_json, fallback_plan, user_text
 
 
 
 
 
15
  from .prompts import PLANNER_SYSTEM_PROMPT
16
 
17
 
 
 
 
 
 
 
 
 
 
18
  def planner_node(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
19
  writer = get_stream_writer()
20
  llm = build_llm(
@@ -23,23 +36,45 @@ def planner_node(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
23
  temperature=PLANNER_TEMPERATURE,
24
  )
25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  messages: list[Any] = [
27
  {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
28
  *state.get("history_messages", []),
29
- {"role": "user", "content": state["user_content"]},
 
 
 
 
 
 
 
30
  ]
31
 
32
- plan: dict[str, Any] | None = None
33
  for _ in range(2):
34
  response = llm.invoke(messages)
35
- plan = extract_json(str(response.content or ""))
36
- if plan and plan.get("todos"):
37
  break
38
- plan = None
39
 
40
- profile_text = user_text(state["user_content"])
41
- if plan is None:
42
- plan = fallback_plan(profile_text)
43
 
44
  thinking = str(plan.get("thinking") or "").strip()
45
  if thinking:
@@ -52,9 +87,10 @@ def planner_node(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
52
  title = str(raw.get("title") or f"Research task {index + 1}").strip()
53
  description = str(raw.get("description") or title).strip()
54
  todos.append({"id": index + 1, "title": title, "description": description})
 
55
  if not todos:
56
- fallback = fallback_plan(profile_text)["todos"][0]
57
- todos = [{"id": 1, "title": fallback["title"], "description": fallback["description"]}]
58
 
59
  writer({"type": "plan", "todos": todos})
60
 
@@ -68,10 +104,10 @@ def planner_node(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
68
  }
69
  )
70
 
71
- profile_summary = str(plan.get("profile_summary") or "").strip() or truncate(
72
- profile_text, 600
73
- )
74
- return {"todos": todos, "profile_summary": profile_summary}
75
 
76
 
77
  def fan_out_research(state: AgentState) -> list[Send]:
 
7
  from langgraph.config import get_stream_writer
8
  from langgraph.types import Send
9
 
 
10
  from ..llm import build_llm
11
+ from ..state import AgentState, CandidateCountry, TodoItem
12
  from .config import MAX_TODOS, PLANNER_MAX_TOKENS, PLANNER_TEMPERATURE
13
+ from .helpers import (
14
+ extract_json,
15
+ heuristic_candidate_countries,
16
+ normalize_plan,
17
+ user_text,
18
+ )
19
  from .prompts import PLANNER_SYSTEM_PROMPT
20
 
21
 
22
+ def _format_candidate_shortlist(candidates: list[CandidateCountry]) -> str:
23
+ lines = ["Discovery shortlist (preferred starting countries):"]
24
+ for item in candidates:
25
+ lines.append(
26
+ f"- {item['iso2']} {item['name']}: {item['pathway_hint']} ({item['label']})"
27
+ )
28
+ return "\n".join(lines)
29
+
30
+
31
  def planner_node(state: AgentState, config: RunnableConfig) -> dict[str, Any]:
32
  writer = get_stream_writer()
33
  llm = build_llm(
 
36
  temperature=PLANNER_TEMPERATURE,
37
  )
38
 
39
+ profile_text = user_text(state["user_content"])
40
+ candidates = state.get("candidate_countries") or heuristic_candidate_countries(
41
+ profile_text
42
+ )
43
+ discovery_summary = str(state.get("discovery_summary") or "").strip()
44
+ profile_summary = str(state.get("profile_summary") or "").strip()
45
+
46
+ planner_context = "\n\n".join(
47
+ part
48
+ for part in [
49
+ _format_candidate_shortlist(candidates),
50
+ f"Discovery notes:\n{discovery_summary}" if discovery_summary else "",
51
+ f"Profile summary:\n{profile_summary}" if profile_summary else "",
52
+ ]
53
+ if part
54
+ )
55
+
56
  messages: list[Any] = [
57
  {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
58
  *state.get("history_messages", []),
59
+ {
60
+ "role": "user",
61
+ "content": (
62
+ f"{planner_context}\n\n"
63
+ f"Original user request:\n{profile_text}\n\n"
64
+ "Produce the JSON research plan now."
65
+ ),
66
+ },
67
  ]
68
 
69
+ raw_plan: dict[str, Any] | None = None
70
  for _ in range(2):
71
  response = llm.invoke(messages)
72
+ raw_plan = extract_json(str(response.content or ""))
73
+ if raw_plan and raw_plan.get("todos"):
74
  break
75
+ raw_plan = None
76
 
77
+ plan = normalize_plan(raw_plan, profile_text, candidates)
 
 
78
 
79
  thinking = str(plan.get("thinking") or "").strip()
80
  if thinking:
 
87
  title = str(raw.get("title") or f"Research task {index + 1}").strip()
88
  description = str(raw.get("description") or title).strip()
89
  todos.append({"id": index + 1, "title": title, "description": description})
90
+
91
  if not todos:
92
+ plan = normalize_plan(None, profile_text, candidates)
93
+ todos = plan["todos"]
94
 
95
  writer({"type": "plan", "todos": todos})
96
 
 
104
  }
105
  )
106
 
107
+ return {
108
+ "todos": todos,
109
+ "profile_summary": str(plan.get("profile_summary") or profile_summary),
110
+ }
111
 
112
 
113
  def fan_out_research(state: AgentState) -> list[Send]:
ui/agent/graph/nodes/prompts.py CHANGED
@@ -4,23 +4,32 @@ from __future__ import annotations
4
  PLANNER_SYSTEM_PROMPT = """
5
  You are the planning supervisor of Borderless, an immigration research agency.
6
 
 
 
 
 
7
  Read the user's profile and goals, then produce a focused research plan that a
8
- team of parallel research analysts will execute. Identify 3-5 plausible
9
- destination countries (prefer realistic fit over popular destinations) and
10
- break the research into 3-5 self-contained to-dos. Each to-do must be
11
- researchable independently β€” typically one to-do per recommended country
12
- covering its best visa pathway, eligibility, documents, costs, timelines, and
13
- risks. You may add one cross-cutting to-do (e.g. comparing costs or document
14
- preparation) when useful.
 
 
 
 
 
15
 
16
  Respond with ONLY a JSON object, no other text:
17
  {
18
  "thinking": "brief reasoning about the user's profile and country choices",
19
- "countries": ["ISO-2 codes of recommended countries, e.g. CA", "DE"],
20
- "labels": ["short marker label per country, e.g. Skilled worker - 6-12 mo"],
21
  "profile_summary": "2-3 sentence summary of the user's profile and constraints",
22
  "todos": [
23
- {"title": "short title", "description": "specific research instructions for the analyst"}
24
  ]
25
  }
26
  """.strip()
 
4
  PLANNER_SYSTEM_PROMPT = """
5
  You are the planning supervisor of Borderless, an immigration research agency.
6
 
7
+ A discovery step has already produced a candidate country shortlist. Use
8
+ `candidate_countries` as your preferred starting point unless the profile clearly
9
+ rules a country out.
10
+
11
  Read the user's profile and goals, then produce a focused research plan that a
12
+ team of parallel research analysts will execute. Always produce 3-4
13
+ country-specific to-dos unless the profile is too sparse to justify that many.
14
+ Each to-do must be researchable independently — one to-do per country covering
15
+ its best visa pathway, eligibility, documents, costs, timelines, and risks.
16
+
17
+ Each todo title MUST include the country name and pathway direction, for example:
18
+ - "Canada — Express Entry / PNP skilled worker route"
19
+ - "Germany — EU Blue Card / skilled worker residence route"
20
+ - "Australia — Skilled Independent / State nomination route"
21
+ - "Ireland — Critical Skills Employment Permit route"
22
+
23
+ Do NOT create one broad todo that repeats the full user prompt.
24
 
25
  Respond with ONLY a JSON object, no other text:
26
  {
27
  "thinking": "brief reasoning about the user's profile and country choices",
28
+ "countries": ["ISO-2 codes matching the todos, e.g. CA", "DE"],
29
+ "labels": ["short marker label per country, e.g. Skilled worker - 12-18 mo"],
30
  "profile_summary": "2-3 sentence summary of the user's profile and constraints",
31
  "todos": [
32
+ {"title": "Country — pathway title", "description": "specific research instructions for the analyst"}
33
  ]
34
  }
35
  """.strip()
ui/agent/graph/respond.py CHANGED
@@ -51,6 +51,20 @@ class _UiState:
51
  )
52
  return True
53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  if kind == "plan":
55
  todos = event.get("todos") or []
56
  self.ui_messages.append(
 
51
  )
52
  return True
53
 
54
+ if kind == "discovery":
55
+ self.ui_messages.append(
56
+ ChatMessage(
57
+ role="assistant",
58
+ content=str(event.get("summary") or ""),
59
+ metadata={
60
+ "title": "Destination discovery",
61
+ "status": "done",
62
+ "log": event.get("log") or {},
63
+ },
64
+ )
65
+ )
66
+ return True
67
+
68
  if kind == "plan":
69
  todos = event.get("todos") or []
70
  self.ui_messages.append(
ui/agent/graph/state.py CHANGED
@@ -17,12 +17,21 @@ class Finding(TypedDict):
17
  summary: str
18
 
19
 
 
 
 
 
 
 
 
20
  class AgentState(TypedDict, total=False):
21
  """Top-level workflow state."""
22
 
23
  user_content: str | list[dict[str, Any]]
24
  history_messages: list[dict[str, Any]]
25
  profile_summary: str
 
 
26
  todos: list[TodoItem]
27
  findings: Annotated[list[Finding], operator.add]
28
  final_answer: str
 
17
  summary: str
18
 
19
 
20
+ class CandidateCountry(TypedDict):
21
+ iso2: str
22
+ name: str
23
+ pathway_hint: str
24
+ label: str
25
+
26
+
27
  class AgentState(TypedDict, total=False):
28
  """Top-level workflow state."""
29
 
30
  user_content: str | list[dict[str, Any]]
31
  history_messages: list[dict[str, Any]]
32
  profile_summary: str
33
+ candidate_countries: list[CandidateCountry]
34
+ discovery_summary: str
35
  todos: list[TodoItem]
36
  findings: Annotated[list[Finding], operator.add]
37
  final_answer: str
ui/agent/graph/workflow.py CHANGED
@@ -5,18 +5,26 @@ from functools import lru_cache
5
 
6
  from langgraph.graph import END, START, StateGraph
7
 
8
- from .nodes import consolidator_node, fan_out_research, planner_node, researcher_node
 
 
 
 
 
 
9
  from .state import AgentState
10
 
11
 
12
  @lru_cache(maxsize=1)
13
  def build_workflow():
14
  builder = StateGraph(AgentState)
 
15
  builder.add_node("planner", planner_node)
16
  builder.add_node("researcher", researcher_node)
17
  builder.add_node("consolidator", consolidator_node)
18
 
19
- builder.add_edge(START, "planner")
 
20
  builder.add_conditional_edges("planner", fan_out_research, ["researcher"])
21
  builder.add_edge("researcher", "consolidator")
22
  builder.add_edge("consolidator", END)
 
5
 
6
  from langgraph.graph import END, START, StateGraph
7
 
8
+ from .nodes import (
9
+ consolidator_node,
10
+ discovery_node,
11
+ fan_out_research,
12
+ planner_node,
13
+ researcher_node,
14
+ )
15
  from .state import AgentState
16
 
17
 
18
  @lru_cache(maxsize=1)
19
  def build_workflow():
20
  builder = StateGraph(AgentState)
21
+ builder.add_node("discovery", discovery_node)
22
  builder.add_node("planner", planner_node)
23
  builder.add_node("researcher", researcher_node)
24
  builder.add_node("consolidator", consolidator_node)
25
 
26
+ builder.add_edge(START, "discovery")
27
+ builder.add_edge("discovery", "planner")
28
  builder.add_conditional_edges("planner", fan_out_research, ["researcher"])
29
  builder.add_edge("researcher", "consolidator")
30
  builder.add_edge("consolidator", END)