mbudisic committed on
Commit 96121d7 · 1 Parent(s): 583756a

feat: Implement lazy graph initialization for LangGraph compatibility

- Introduced a lazy initialization pattern for the LangGraph to optimize module imports and prevent unnecessary compilation.
- Added a factory function `graph` to handle graph and datastore initialization on demand.
- Updated `initialize` function to manage datastore and graph builder setup.
- Enhanced documentation in DEVELOPER.md to explain the new lazy initialization feature and its benefits.
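
As a rough illustration of the pattern these bullets describe, the sketch below caches an expensively built object in a module-level variable and builds it only on the first call. The names `build_graph`, `get_graph`, and `build_calls` are hypothetical stand-ins for illustration, not identifiers from this repository.

```python
from typing import Optional

build_calls = 0  # counts how often the expensive step actually runs
_cached_graph: Optional[dict] = None  # module-level cache, empty at import time


def build_graph() -> dict:
    """Hypothetical stand-in for an expensive graph compilation step."""
    global build_calls
    build_calls += 1
    return {"nodes": ["research", "search_help", "search_rag", "write_answer"]}


def get_graph() -> dict:
    """Build the graph on first use, then return the cached object."""
    global _cached_graph
    if _cached_graph is None:
        _cached_graph = build_graph()
    return _cached_graph


# Defining (importing) the module has not built anything yet.
calls_after_import = build_calls

first = get_graph()   # triggers the build
second = get_graph()  # served from the cache
```

Under these assumptions the build cost is paid once, at the first call, rather than whenever the module is imported.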

Files changed (2)
  1. docs/DEVELOPER.md +55 -0
  2. pstuts_rag/pstuts_rag/nodes.py +80 -42
docs/DEVELOPER.md CHANGED
@@ -158,6 +158,61 @@ ipdb # Available for interactive debugging
  - `evaluate_rag.ipynb` for systematic evaluation
  - Fine-tuning experiments in `Fine_Tuning_Embedding_for_PSTuts.ipynb`

+ ## 🌊 Lazy Graph Initialization
+
+ The project uses a **lazy initialization pattern** for the LangGraph to avoid expensive compilation during module imports while maintaining compatibility with LangGraph Studio.
+
+ ### 🔧 Implementation Pattern
+
+ ```python
+ # In pstuts_rag/nodes.py
+ _compiled_graph = None
+
+ def graph(config: RunnableConfig = None):
+     """Graph factory function for LangGraph Studio compatibility.
+
+     This function provides lazy initialization of the graph and datastore,
+     allowing the module to be imported without triggering compilation.
+     LangGraph Studio requires this function to take exactly one RunnableConfig argument.
+
+     Args:
+         config: RunnableConfig (required by LangGraph Studio, but can be None)
+
+     Returns:
+         Compiled LangGraph instance
+     """
+     global _compiled_graph
+     if _compiled_graph is None:
+         _compiled_graph = graph_builder.compile()
+         # Initialize datastore when graph is first accessed
+         asyncio.run(datastore.from_json_globs(Configuration().transcript_glob))
+     return _compiled_graph
+
+ def get_graph():
+     """Convenience function to get the compiled graph without config argument."""
+     return graph()
+ ```
+
+ ### 🎯 Benefits
+
+ - **Fast imports**: Module loading doesn't trigger graph compilation 🚀
+ - **LangGraph Studio compatibility**: Maintains expected `graph` variable for discovery 🛠️
+ - **On-demand initialization**: Graph and datastore only initialize when actually used ⚡
+ - **Memory efficiency**: Resources allocated only when needed 💾
+
+ ### 📄 Studio Configuration
+
+ The `langgraph.json` file correctly references the factory function:
+ ```json
+ {
+   "graphs": {
+     "enhanced_video_archive": "./pstuts_rag/pstuts_rag/nodes.py:graph"
+   }
+ }
+ ```
+
+ When LangGraph Studio accesses the `graph` function, it automatically triggers lazy initialization and provides the compiled graph instance. The factory function pattern ensures compatibility while maintaining performance benefits.
+
  ## 🏗️ Architecture Notes

  - **Embedding models**: Uses custom fine-tuned `snowflake-arctic-embed-s-ft-pstuts` by default
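
The `graph` function documented above is synchronous and calls `asyncio.run`, while the `nodes.py` change in this commit declares `graph` as `async def`. The commit does not say why. One likely reason is that `asyncio.run` raises `RuntimeError` when it is called from a thread whose event loop is already running, which is the situation whenever the factory is invoked from async code. The sketch below reproduces that behavior with hypothetical stand-ins (`load_data`, `sync_factory`, `caller_inside_loop`); it is not code from the repository.

```python
import asyncio


async def load_data() -> str:
    """Hypothetical stand-in for an async datastore load."""
    await asyncio.sleep(0)
    return "loaded"


def sync_factory() -> str:
    """Synchronous factory that drives the coroutine with asyncio.run."""
    coro = load_data()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def caller_inside_loop() -> str:
    """Invoke the synchronous factory while an event loop is running."""
    try:
        return sync_factory()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


# Works: no event loop is running in this thread yet.
outside = sync_factory()

# Fails inside the factory: asyncio.run refuses to nest in a running loop.
inside = asyncio.run(caller_inside_loop())
```

An `async def` factory avoids the nested call by awaiting its work, or scheduling it, on the loop that is already running.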
pstuts_rag/pstuts_rag/nodes.py CHANGED
@@ -1,7 +1,7 @@
  # nodes.py
  from enum import Enum
- from typing import Annotated, Any, Callable, Dict, Literal
-
+ from typing import Annotated, Any, Callable, Dict, Literal, Tuple
+ import functools
  import asyncio
  import logging
  import operator
@@ -36,10 +36,6 @@ class TutorialState(MessagesState):
      loop_count: int


- datastore = DatastoreManager()
- datastore.add_completion_callback(lambda: logging.warning("Loading complete."))
-
-
  def research(state: TutorialState, config: RunnableConfig):
      """Generate a research query based on conversation history and current query.

@@ -130,7 +126,9 @@ async def search_help(
      return {"messages": [url_summary], "url_references": results["results"]}


- async def search_rag(state: TutorialState, config: RunnableConfig):
+ async def search_rag(
+     state: TutorialState, config: RunnableConfig, datastore: DatastoreManager
+ ):
      """Search tutorial transcripts using RAG (Retrieval-Augmented Generation).

      Args:
@@ -151,18 +149,6 @@ async def search_rag(state: TutorialState, config: RunnableConfig):
      }


- def join(state: TutorialState, config: RunnableConfig):
-     """Join/merge results from multiple search sources.
-
-     Args:
-         state: Current TutorialState with search results
-         config: RunnableConfig for accessing configuration parameters
-
-     Returns:
-         None: Currently a placeholder function
-     """
-     pass
-

  def write_answer(state: TutorialState, config: RunnableConfig):
      """Write a preliminary answer (placeholder function).
@@ -346,32 +332,84 @@ def write_answer(state: TutorialState, config: RunnableConfig):
      return {"messages": [final_answer]}


- graph_builder = StateGraph(TutorialState)
+ # Lazy initialization: compiled graph is cached
+ _compiled_graph = None
+ _datastore = None
+

- # graph_builder.add_node(route_is_relevant)
- # graph_builder.add_node(route_is_complete, defer=True)
- graph_builder.add_node(research)
- graph_builder.add_node(search_help)
- graph_builder.add_node(search_rag)
- graph_builder.add_node(join)
- graph_builder.add_node(write_answer)
+ def initialize(
+     datastore: DatastoreManager | None = None,
+ ) -> Tuple[DatastoreManager, StateGraph]:
+     if datastore is None:
+         datastore = DatastoreManager(
+             config=Configuration()
+         ).add_completion_callback(lambda: "Datastore loading completed.")

- # graph_builder.add_conditional_edges(
- # START,
- # route_is_relevant,
- # {"yes": research.__name__, "no": write_answer.__name__},
- # )
- graph_builder.add_node(route_is_relevant)
- graph_builder.add_node(route_is_complete, defer=True)
+     graph_builder = StateGraph(TutorialState)

- graph_builder.add_edge(START, route_is_relevant.__name__)
- graph_builder.add_edge(research.__name__, search_help.__name__)
- graph_builder.add_edge(research.__name__, search_rag.__name__)
- graph_builder.add_edge(search_help.__name__, route_is_complete.__name__)
- graph_builder.add_edge(search_rag.__name__, route_is_complete.__name__)
+     # graph_builder.add_node(route_is_relevant)
+     # graph_builder.add_node(route_is_complete, defer=True)
+     graph_builder.add_node(research)
+     graph_builder.add_node(search_help)
+     graph_builder.add_node(
+         "search_rag", functools.partial(search_rag, datastore=datastore)
+     )
+     graph_builder.add_node(write_answer)

- graph_builder.add_edge(write_answer.__name__, END)
+     # graph_builder.add_conditional_edges(
+     # START,
+     # route_is_relevant,
+     # {"yes": research.__name__, "no": write_answer.__name__},
+     # )
+     graph_builder.add_node(route_is_relevant)
+     graph_builder.add_node(route_is_complete, defer=True)

+     graph_builder.add_edge(START, route_is_relevant.__name__)
+     graph_builder.add_edge(research.__name__, search_help.__name__)
+     graph_builder.add_edge(research.__name__, search_rag.__name__)
+     graph_builder.add_edge(search_help.__name__, route_is_complete.__name__)
+     graph_builder.add_edge(search_rag.__name__, route_is_complete.__name__)
+
+     graph_builder.add_edge(write_answer.__name__, END)
+
+     return datastore, graph_builder
+
+
+ async def graph(config: RunnableConfig = None):
+     """Graph factory function for LangGraph Studio compatibility.
+
+     This function provides lazy initialization of the graph and datastore,
+     allowing the module to be imported without triggering compilation.
+     LangGraph Studio requires this function to take exactly one
+     RunnableConfig argument.
+
+     Args:
+         config: RunnableConfig (required by LangGraph Studio, but can be None)
+
+     Returns:
+         Compiled LangGraph instance
+     """
+     global _compiled_graph
+     global _datastore
+
+     # Initialize datastore using asyncio.to_thread to avoid blocking
+     initialize_datastore: bool = _datastore is None
+     if initialize_datastore:
+         _datastore = await asyncio.to_thread(
+             lambda: DatastoreManager(
+                 config=Configuration()
+             ).add_completion_callback(lambda: "Datastore loading completed.")
+         )
+
+     # Initialize and compile graph synchronously (blocking as intended)
+     if _compiled_graph is None:
+         _datastore, graph_builder = initialize(_datastore)
+         _compiled_graph = graph_builder.compile()
+
+     # Start datastore population as background task (non-blocking)
+     if initialize_datastore:
+         asyncio.create_task(
+             _datastore.from_json_globs(Configuration().transcript_glob)
+         )

- graph = graph_builder.compile()
- asyncio.run(datastore.from_json_globs(Configuration().transcript_glob))
+     return _compiled_graph
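
In the new `graph` coroutine, the `_datastore is None` check and the assignment are separated by an `await`, so two concurrent first calls could each construct a datastore and each schedule a load. The Python documentation also advises keeping a reference to tasks created with `asyncio.create_task`, because the event loop holds only weak references to them. The following is a minimal sketch of one way to address both points, assuming an `asyncio.Lock` and a module-level task set are acceptable in this module. `_build`, `_populate`, and `get_resource` are hypothetical stand-ins for the datastore constructor, `from_json_globs`, and the factory; none of them come from the repository.

```python
import asyncio
from typing import Optional

_resource: Optional[dict] = None     # cached object, built on first use
_lock: Optional[asyncio.Lock] = None  # created lazily inside the running loop
_background_tasks: set = set()        # strong references keep tasks alive
build_count = 0


def _build() -> dict:
    """Hypothetical blocking constructor (stand-in for the datastore)."""
    global build_count
    build_count += 1
    return {"ready": False}


async def _populate(resource: dict) -> None:
    """Hypothetical async load (stand-in for populating the datastore)."""
    await asyncio.sleep(0)
    resource["ready"] = True


async def get_resource() -> dict:
    """Return the cached resource, building it at most once."""
    global _resource, _lock
    if _lock is None:  # no await between check and assignment, so this is safe
        _lock = asyncio.Lock()
    async with _lock:
        if _resource is None:
            # Run the blocking constructor off the event loop thread.
            _resource = await asyncio.to_thread(_build)
            # Populate in the background and keep a reference to the task.
            task = asyncio.create_task(_populate(_resource))
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
    return _resource


async def main() -> list:
    # Five concurrent "first" calls share a single build.
    results = await asyncio.gather(*(get_resource() for _ in range(5)))
    # Wait for any background population that is still pending.
    await asyncio.gather(*_background_tasks)
    return results


results = asyncio.run(main())
```

The lock serializes only the first-use path; later calls acquire it briefly and return the cached object.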