Spaces:
Sleeping
Sleeping
File size: 8,835 Bytes
f871fed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
import asyncio
import sqlite3
from typing import Annotated, Dict, List, Optional
from ai_prompter import Prompter
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from open_notebook.utils import clean_thinking_content
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict
from open_notebook.config import LANGGRAPH_CHECKPOINT_FILE
from open_notebook.domain.notebook import Source, SourceInsight
from open_notebook.graphs.utils import provision_langchain_model
from open_notebook.utils.context_builder import ContextBuilder
class SourceChatState(TypedDict):
    """State carried through the source-chat LangGraph workflow."""

    # Conversation history; add_messages merges each node's returned
    # message(s) into the running list instead of replacing it.
    messages: Annotated[list, add_messages]
    # ID of the source being discussed (required by the agent node).
    source_id: str
    # Resolved Source record; populated by the agent node.
    source: Optional[Source]
    # Insights attached to the source; populated by the agent node.
    insights: Optional[List[SourceInsight]]
    # Formatted context string that was given to the model.
    context: Optional[str]
    # Optional model ID that overrides the configured chat model.
    model_override: Optional[str]
    # IDs of sources/insights/notes actually referenced in the built context.
    context_indicators: Optional[Dict[str, List[str]]]
def call_model_with_source_context(
    state: SourceChatState, config: RunnableConfig
) -> dict:
    """
    Main function that builds source context and calls the model.

    This function:
    1. Uses ContextBuilder to build source-specific context (async work
       bridged into this synchronous graph node)
    2. Applies the source_chat Jinja2 prompt template
    3. Handles model provisioning with override support
       (``config.configurable.model_id`` wins over ``state.model_override``)
    4. Tracks context indicators for referenced insights/content

    Args:
        state: Graph state; ``source_id`` is required.
        config: LangGraph runnable config; may carry ``configurable.model_id``.

    Returns:
        Partial state update: cleaned AI message, resolved source/insights,
        the formatted context string, and the context indicators.

    Raises:
        ValueError: If ``source_id`` is missing from the state.
    """
    source_id = state.get("source_id")
    if not source_id:
        raise ValueError("source_id is required in state")

    def run_async(coro_factory):
        """Run an async callable to completion from this sync node.

        A fresh event loop is always used; when a loop is already running
        in this thread, the work is shifted onto a worker thread because a
        running loop cannot be re-entered.
        """

        def run_in_new_loop():
            new_loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(new_loop)
                return new_loop.run_until_complete(coro_factory())
            finally:
                new_loop.close()
                asyncio.set_event_loop(None)

        # BUGFIX: keep ONLY the probe inside the try. Previously the
        # executor call sat inside the same try-block, so a RuntimeError
        # raised by the actual work was mistaken for "no running loop" and
        # the work was silently executed a second time.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_is_running = False
        else:
            loop_is_running = True

        if loop_is_running:
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor() as executor:
                return executor.submit(run_in_new_loop).result()
        # No event loop running, safe to create a new one directly.
        return run_in_new_loop()

    # Build source context using ContextBuilder.
    context_data = run_async(
        lambda: ContextBuilder(
            source_id=source_id,
            include_insights=True,
            include_notes=False,  # Focus on source-specific content
            max_tokens=50000,  # Reasonable limit for source context
        ).build()
    )

    # Extract source and insights from context, recording indicator ids.
    source = None
    insights = []
    context_indicators: dict[str, list[str | None]] = {
        "sources": [],
        "insights": [],
        "notes": [],
    }
    if context_data.get("sources"):
        source_info = context_data["sources"][0]  # First source
        source = Source(**source_info) if isinstance(source_info, dict) else source_info
        context_indicators["sources"].append(source.id)
    if context_data.get("insights"):
        for insight_data in context_data["insights"]:
            insight = (
                SourceInsight(**insight_data)
                if isinstance(insight_data, dict)
                else insight_data
            )
            insights.append(insight)
            context_indicators["insights"].append(insight.id)

    # Format context for the prompt
    formatted_context = _format_source_context(context_data)

    # Build prompt data for the template
    prompt_data = {
        "source": source.model_dump() if source else None,
        "insights": [insight.model_dump() for insight in insights] if insights else [],
        "context": formatted_context,
        "context_indicators": context_indicators,
    }

    # Apply the source_chat prompt template
    system_prompt = Prompter(prompt_template="source_chat").render(data=prompt_data)
    payload = [SystemMessage(content=system_prompt)] + state.get("messages", [])

    # Provision the chat model through the same sync/async bridge.
    model = run_async(
        lambda: provision_langchain_model(
            str(payload),
            config.get("configurable", {}).get("model_id")
            or state.get("model_override"),
            "chat",
            max_tokens=8192,
        )
    )

    ai_message = model.invoke(payload)

    # Clean thinking content from AI response (e.g., <think>...</think> tags)
    content = (
        ai_message.content
        if isinstance(ai_message.content, str)
        else str(ai_message.content)
    )
    cleaned_content = clean_thinking_content(content)
    cleaned_message = ai_message.model_copy(update={"content": cleaned_content})

    # Update state with context information
    return {
        "messages": cleaned_message,
        "source": source,
        "insights": insights,
        "context": formatted_context,
        "context_indicators": context_indicators,
    }
def _format_source_context(context_data: Dict) -> str:
"""
Format the context data into a readable string for the prompt.
Args:
context_data: Context data from ContextBuilder
Returns:
Formatted context string
"""
context_parts = []
# Add source information
if context_data.get("sources"):
context_parts.append("## SOURCE CONTENT")
for source in context_data["sources"]:
if isinstance(source, dict):
context_parts.append(f"**Source ID:** {source.get('id', 'Unknown')}")
context_parts.append(f"**Title:** {source.get('title', 'No title')}")
if source.get("full_text"):
# Truncate full text if too long
full_text = source["full_text"]
if len(full_text) > 5000:
full_text = full_text[:5000] + "...\n[Content truncated]"
context_parts.append(f"**Content:**\n{full_text}")
context_parts.append("") # Empty line for separation
# Add insights
if context_data.get("insights"):
context_parts.append("## SOURCE INSIGHTS")
for insight in context_data["insights"]:
if isinstance(insight, dict):
context_parts.append(f"**Insight ID:** {insight.get('id', 'Unknown')}")
context_parts.append(
f"**Type:** {insight.get('insight_type', 'Unknown')}"
)
context_parts.append(
f"**Content:** {insight.get('content', 'No content')}"
)
context_parts.append("") # Empty line for separation
# Add metadata
if context_data.get("metadata"):
metadata = context_data["metadata"]
context_parts.append("## CONTEXT METADATA")
context_parts.append(f"- Source count: {metadata.get('source_count', 0)}")
context_parts.append(f"- Insight count: {metadata.get('insight_count', 0)}")
context_parts.append(f"- Total tokens: {context_data.get('total_tokens', 0)}")
context_parts.append("")
return "\n".join(context_parts)
# Create SQLite checkpointer
# check_same_thread=False: the connection is shared across whatever threads
# run graph nodes (the agent node itself uses a ThreadPoolExecutor above).
conn = sqlite3.connect(
    LANGGRAPH_CHECKPOINT_FILE,
    check_same_thread=False,
)
memory = SqliteSaver(conn)
# Create the StateGraph: a single agent node wired START -> agent -> END.
source_chat_state = StateGraph(SourceChatState)
source_chat_state.add_node("source_chat_agent", call_model_with_source_context)
source_chat_state.add_edge(START, "source_chat_agent")
source_chat_state.add_edge("source_chat_agent", END)
# Compiled, checkpointed graph — the module's public entry point.
source_chat_graph = source_chat_state.compile(checkpointer=memory)
|