"""
LangGraph workflow for the Polymer Datasheet Crawler Agent.
Workflow:
    ┌──────────┐
    │  router  │ ── decides search vs upload path
    └────┬─────┘
         │
    ┌────▼──────────┐     ┌──────────────────┐
    │  web_search   │     │  process_upload  │
    └────┬──────────┘     └───────┬──────────┘
         │                        │
         └──────────┬─────────────┘
              ┌─────▼─────┐
              │ llm_parse │ ── calls LLaMA 3.1 to extract properties
              └─────┬─────┘
              ┌─────▼─────┐
              │ store_db  │ ── persists to SQLite
              └─────┬─────┘
              ┌─────▼─────┐
              │ finalize  │ ── formats output
              └───────────┘
"""
from __future__ import annotations
import logging
from typing import Any, Literal
from langgraph.graph import END, StateGraph
from database import DatasheetDB
from llm_parser import parse_datasheet, parse_uploaded_text
from models import AgentState
from web_crawler import search_datasheets, _pick_best_source_url
# Module-level logger named after this module; used by the node functions.
logger = logging.getLogger(__name__)
# ── Shared DB instance ───────────────────────────────────────────────────────
# A single DatasheetDB is created when this module is imported and is shared by
# store_db_node, search_database and get_database_summary.
db = DatasheetDB()
# ── Node functions ───────────────────────────────────────────────────────────
def router_node(state: dict[str, Any]) -> dict[str, Any]:
    """Entry node: resolve which input mode the workflow runs in.

    Reads ``input_mode`` from the state (``"search"`` when the key is absent),
    logs it, and returns it as this node's state update.
    """
    mode = state.get("input_mode", "search")
    logger.info("Router: mode=%s", mode)
    return dict(input_mode=mode)
def web_search_node(state: dict[str, Any]) -> dict[str, Any]:
    """Run the datasheet web search for the requested material.

    The manufacturer, polymer family and grade are read from the state (each
    defaulting to an empty string) and passed to ``search_datasheets``.

    Returns:
        A state update with ``search_results``, ``raw_content``,
        ``source_url``, ``status`` (``"searched"`` when content came back,
        ``"no_results"`` otherwise) and a human-readable ``message``.
    """
    query = {
        "manufacturer": state.get("manufacturer", ""),
        "polymer_family": state.get("polymer_family", ""),
        "grade": state.get("grade", ""),
    }
    logger.info(
        "Web search: manufacturer=%s, polymer=%s, grade=%s",
        query["manufacturer"], query["polymer_family"], query["grade"],
    )

    hits, content = search_datasheets(**query)

    # Only ask the helper for a source URL when there is at least one result.
    best_url = _pick_best_source_url(hits) if hits else ""

    if content:
        outcome = "searched"
        summary = f"Found {len(hits)} sources with {len(content)} chars of content."
    else:
        outcome = "no_results"
        summary = "No relevant datasheets found in web search."

    return {
        "search_results": hits,
        "raw_content": content,
        "source_url": best_url,
        "status": outcome,
        "message": summary,
    }
def process_upload_node(state: dict[str, Any]) -> dict[str, Any]:
    """Turn user-uploaded datasheet text into raw content for parsing.

    Args:
        state: Workflow state; ``uploaded_text`` holds the text extracted from
            the uploaded file.

    Returns:
        ``{"status": "error", ...}`` when the text is missing, ``None`` or
        whitespace-only; otherwise a state update carrying the text as
        ``raw_content`` with status ``"uploaded"``.
    """
    # ``or ""`` (rather than a ``.get`` default) also covers a key that is
    # present but set to None, which would otherwise crash on ``.strip()``.
    uploaded_text = state.get("uploaded_text") or ""
    if not uploaded_text.strip():
        return {
            "status": "error",
            "message": "No text found in uploaded file.",
        }
    return {
        "raw_content": uploaded_text,
        "status": "uploaded",
        "message": f"Uploaded text: {len(uploaded_text)} chars ready for parsing.",
    }
def llm_parse_node(state: dict[str, Any]) -> dict[str, Any]:
    """Extract structured datasheet properties from the raw content.

    Delegates to ``parse_datasheet`` with the raw content plus the
    manufacturer, polymer family, grade and source URL found in the state.

    Returns:
        An error update when there is no (non-blank) raw content; a
        ``"parsed"`` update containing the dumped record when parsing yields a
        record; a ``"parse_failed"`` update listing the errors otherwise.
    """
    text = state.get("raw_content", "")
    if not text.strip():
        return {
            "status": "error",
            "message": "No content available for LLM parsing.",
            "parsing_errors": ["Empty raw content"],
        }

    logger.info("LLM parsing %d chars of raw content...", len(text))
    record, errors = parse_datasheet(
        raw_content=text,
        manufacturer=state.get("manufacturer", ""),
        polymer_family=state.get("polymer_family", ""),
        grade=state.get("grade", ""),
        source_url=state.get("source_url", ""),
    )

    # Failure path first; the success path falls through below.
    if not record:
        return {
            "parsing_errors": errors,
            "status": "parse_failed",
            "message": f"Failed to parse datasheet: {'; '.join(errors)}",
        }

    return {
        "parsed_datasheet": record.model_dump(),
        "parsing_errors": errors,
        "status": "parsed",
        "message": f"Successfully extracted datasheet for {record.trade_name or record.material_name}.",
    }
def store_db_node(state: dict[str, Any]) -> dict[str, Any]:
    """Persist the parsed datasheet through the shared ``db`` instance.

    Returns:
        An error update when the state has no (truthy) ``parsed_datasheet``;
        otherwise a ``"stored"`` update whose message reports the record id
        returned by ``db.upsert`` and the record count from ``db.count``.
    """
    payload = state.get("parsed_datasheet")
    if not payload:
        return {"status": "error", "message": "No parsed datasheet to store."}

    # Imported inside the function, after the guard, exactly as before.
    from models import DatasheetRecord

    record = DatasheetRecord(**payload)
    record_id = db.upsert(record)
    total = db.count()
    label = record.trade_name or record.material_name
    return {
        "status": "stored",
        "message": f"Stored datasheet '{label}' (ID: {record_id}). Database now has {total} records.",
    }
def finalize_node(state: dict[str, Any]) -> dict[str, Any]:
    """Final node: collapse the last status into the workflow's outcome.

    ``"stored"`` becomes ``"success"``; ``"error"``, ``"parse_failed"`` and
    ``"no_results"`` become ``"failed"``; any other status (``"unknown"`` when
    the key is absent) is passed through. The message is forwarded unchanged.
    """
    failure_states = ("error", "parse_failed", "no_results")

    outcome = state.get("status", "unknown")
    if outcome == "stored":
        outcome = "success"
    elif outcome in failure_states:
        outcome = "failed"

    return {"status": outcome, "message": state.get("message", "")}
# ── Conditional edges ────────────────────────────────────────────────────────
def route_by_mode(state: dict[str, Any]) -> Literal["web_search", "process_upload"]:
    """Pick the branch after the router: upload processing or web search.

    Only an ``input_mode`` equal to ``"upload"`` selects the upload branch;
    anything else (including a missing key) selects the web search.
    """
    wants_upload = state.get("input_mode") == "upload"
    return "process_upload" if wants_upload else "web_search"
def route_after_content(state: dict[str, Any]) -> Literal["llm_parse", "finalize"]:
    """Decide whether content acquisition produced something to parse.

    A status of ``"no_results"`` or ``"error"`` goes straight to ``finalize``;
    every other status (or none) continues to ``llm_parse``.
    """
    nothing_to_parse = state.get("status", "") in ("no_results", "error")
    return "finalize" if nothing_to_parse else "llm_parse"
def route_after_parse(state: dict[str, Any]) -> Literal["store_db", "finalize"]:
    """Send the workflow to storage only when a parsed datasheet exists.

    A truthy ``parsed_datasheet`` routes to ``store_db``; a missing or empty
    one routes to ``finalize``.
    """
    return "store_db" if state.get("parsed_datasheet") else "finalize"
# ── Build the graph ──────────────────────────────────────────────────────────
def _merge_node_output(node: Any) -> Any:
    """Wrap *node* so its partial update is merged into the incoming state."""

    def merged(state: dict[str, Any]) -> dict[str, Any]:
        update = node(state)
        # Later keys win, so the node's update overrides the carried-over state.
        return {**state, **(update or {})}

    return merged


def build_graph() -> StateGraph:
    """Construct and compile the LangGraph workflow.

    Topology: ``router`` branches to ``web_search`` or ``process_upload``;
    either one continues to ``llm_parse`` or short-circuits to ``finalize``;
    ``llm_parse`` continues to ``store_db`` or ``finalize``; ``store_db`` goes
    to ``finalize``, which ends the run.

    Returns:
        The result of ``workflow.compile()``, i.e. the invocable graph.
    """
    # The schema is a bare ``dict``, which has no annotated keys. LangGraph
    # derives per-key channels from the schema's annotations, so with ``dict``
    # the whole state lives in one root channel and a node's return value
    # replaces it (see the LangGraph state docs; confirm for the pinned
    # version). Every node here returns only a partial update, so without
    # merging, ``router`` would drop ``manufacturer``/``uploaded_text``/... for
    # the nodes that follow. Wrapping each node keeps the full state flowing.
    # NOTE(review): ``AgentState`` is imported by this module but unused; if it
    # is a TypedDict declaring every key, using it as the schema is the
    # alternative fix — verify before switching.
    workflow = StateGraph(dict)

    # Add nodes
    nodes = {
        "router": router_node,
        "web_search": web_search_node,
        "process_upload": process_upload_node,
        "llm_parse": llm_parse_node,
        "store_db": store_db_node,
        "finalize": finalize_node,
    }
    for node_name, node_fn in nodes.items():
        workflow.add_node(node_name, _merge_node_output(node_fn))

    # Set entry point
    workflow.set_entry_point("router")

    # Router -> search or upload
    workflow.add_conditional_edges(
        "router",
        route_by_mode,
        {
            "web_search": "web_search",
            "process_upload": "process_upload",
        },
    )

    # After content acquisition -> parse or finalize
    workflow.add_conditional_edges(
        "web_search",
        route_after_content,
        {"llm_parse": "llm_parse", "finalize": "finalize"},
    )
    workflow.add_conditional_edges(
        "process_upload",
        route_after_content,
        {"llm_parse": "llm_parse", "finalize": "finalize"},
    )

    # After parsing -> store or finalize
    workflow.add_conditional_edges(
        "llm_parse",
        route_after_parse,
        {"store_db": "store_db", "finalize": "finalize"},
    )

    # store_db -> finalize -> END
    workflow.add_edge("store_db", "finalize")
    workflow.add_edge("finalize", END)

    return workflow.compile()
# ── Convenience runners ──────────────────────────────────────────────────────
def run_search(
    manufacturer: str,
    polymer_family: str,
    grade: str = "",
) -> dict[str, Any]:
    """Build the workflow and invoke it in search mode.

    Args:
        manufacturer: Manufacturer name placed in the initial state.
        polymer_family: Polymer family placed in the initial state.
        grade: Optional grade; defaults to an empty string.

    Returns:
        Whatever ``graph.invoke`` returns for the search request.
    """
    workflow = build_graph()
    request = dict(
        input_mode="search",
        manufacturer=manufacturer,
        polymer_family=polymer_family,
        grade=grade,
    )
    return workflow.invoke(request)
def run_upload(uploaded_text: str) -> dict[str, Any]:
    """Build the workflow and invoke it in upload mode.

    Args:
        uploaded_text: Text of the uploaded datasheet, placed in the initial
            state under ``uploaded_text``.

    Returns:
        Whatever ``graph.invoke`` returns for the upload request.
    """
    workflow = build_graph()
    request = dict(input_mode="upload", uploaded_text=uploaded_text)
    return workflow.invoke(request)
def search_database(
    query: str = "",
    manufacturer: str = "",
    polymer_family: str = "",
) -> Any:
    """Query the shared database; all three filters are forwarded to ``db.search``."""
    filters = {
        "query": query,
        "manufacturer": manufacturer,
        "polymer_family": polymer_family,
    }
    return db.search(**filters)
def get_database_summary() -> Any:
    """Return the summary produced by ``db.get_summary_dataframe()``."""
    summary = db.get_summary_dataframe()
    return summary
|