|
|
"""
LangGraph State Definitions for SPARKNET

Defines the shared workflow state schema (AgentState), the scenario and
status enums, and the Pydantic output models produced by SPARKNET workflows.
"""
|
|
|
|
|
from typing import TypedDict, Annotated, Sequence, Dict, Any, List, Optional |
|
|
from enum import Enum |
|
|
from datetime import datetime |
|
|
from pydantic import BaseModel, Field |
|
|
from langchain_core.messages import BaseMessage |
|
|
from langgraph.graph.message import add_messages |
|
|
|
|
|
|
|
|
class ScenarioType(str, Enum):
    """
    VISTA scenario types.

    Each scenario has a dedicated multi-agent workflow.

    Because the enum also subclasses ``str``, every member compares equal to
    its raw string value (e.g. ``ScenarioType.GENERAL == "general"``), and
    ``ScenarioType("general")`` looks a member up by value.
    """

    # Declaration order below is the enum's iteration order.
    PATENT_WAKEUP = "patent_wakeup"
    AGREEMENT_SAFETY = "agreement_safety"
    PARTNER_MATCHING = "partner_matching"
    # Default scenario used by create_initial_state when none is given.
    GENERAL = "general"
|
|
|
|
|
|
|
|
class TaskStatus(str, Enum):
    """
    Task execution status throughout workflow.

    Used for the overall workflow status (``AgentState["status"]`` and
    ``WorkflowOutput.status``) and for individual ``SubTask.status`` values.
    Being a ``str`` enum, members compare equal to their raw string values.
    """

    # Initial value: create_initial_state and SubTask both default to this.
    PENDING = "pending"
    PLANNING = "planning"
    EXECUTING = "executing"
    VALIDATING = "validating"
    REFINING = "refining"
    COMPLETED = "completed"
    FAILED = "failed"
|
|
|
|
|
|
|
|
class AgentState(TypedDict):
    """
    LangGraph state for SPARKNET workflows.

    This state is passed between all agents in the workflow.
    Uses Annotated with add_messages for automatic message history management.

    Every key is declared as required (plain ``TypedDict``, i.e. total=True);
    ``create_initial_state`` populates all of them, and ``state_to_output``
    converts a finished state into a ``WorkflowOutput``.
    """

    # Conversation history. The add_messages reducer in the Annotated metadata
    # tells LangGraph how to merge message updates returned by nodes.
    messages: Annotated[Sequence[BaseMessage], add_messages]

    # --- Task identity ------------------------------------------------------
    task_id: str
    task_description: str
    scenario: ScenarioType
    status: TaskStatus

    # --- Control flow -------------------------------------------------------
    # None until an agent is assigned (create_initial_state sets None).
    current_agent: Optional[str]
    # Starts at 0; reported as WorkflowOutput.iterations_used.
    iteration_count: int
    max_iterations: int

    # --- Planning -----------------------------------------------------------
    # None until planning runs. Presumably serialized SubTask dicts - TODO confirm.
    subtasks: Optional[List[Dict[str, Any]]]
    # None until planning runs. Presumably stages of subtask IDs - TODO confirm.
    execution_order: Optional[List[List[str]]]

    # --- Agent results ------------------------------------------------------
    # Keys are reported as WorkflowOutput.agents_involved by state_to_output.
    agent_outputs: Dict[str, Any]
    intermediate_results: List[Dict[str, Any]]

    # --- Validation ---------------------------------------------------------
    # Copied into WorkflowOutput.quality_score, which Pydantic constrains to
    # [0.0, 1.0]; out-of-range values fail in state_to_output.
    validation_score: Optional[float]
    validation_feedback: Optional[str]
    validation_issues: List[str]
    validation_suggestions: List[str]

    # --- Inputs and retrieved context ---------------------------------------
    retrieved_context: List[Dict[str, Any]]
    document_metadata: Dict[str, Any]
    input_data: Dict[str, Any]

    # --- Result -------------------------------------------------------------
    final_output: Optional[Any]
    success: bool
    error: Optional[str]

    # --- Timing -------------------------------------------------------------
    # Set with naive datetime.now() by create_initial_state.
    start_time: datetime
    # None while running; state_to_output falls back to datetime.now().
    end_time: Optional[datetime]
    # NOTE(review): state_to_output recomputes elapsed time from the
    # timestamps and does not read this key.
    execution_time_seconds: Optional[float]

    # --- Human-in-the-loop --------------------------------------------------
    requires_human_approval: bool
    human_feedback: Optional[str]
|
|
|
|
|
|
|
|
class WorkflowOutput(BaseModel):
    """
    Structured output from SPARKNET workflows.

    Used for serialization and API responses; ``state_to_output`` builds an
    instance from a finished ``AgentState``.

    Field declaration order determines the JSON schema / dump key order and
    is kept stable. ``validation_score`` is a read-only alias for
    ``quality_score``.
    """

    # --- Identity and outcome ----------------------------------------------
    task_id: str = Field(..., description="Unique task identifier")
    scenario: ScenarioType = Field(..., description="Scenario type executed")
    status: TaskStatus = Field(..., description="Final task status")
    success: bool = Field(..., description="Whether task completed successfully")

    # --- Results ------------------------------------------------------------
    # Required, but typed Any, so an explicit None is accepted.
    output: Any = Field(..., description="Primary output/result")
    intermediate_results: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="Intermediate results from agents"
    )

    # --- Quality ------------------------------------------------------------
    # Validated to lie in [0.0, 1.0] when present.
    quality_score: Optional[float] = Field(
        None,
        ge=0.0,
        le=1.0,
        description="Quality score from validation (0.0-1.0)"
    )
    validation_feedback: Optional[str] = Field(
        None,
        description="Feedback from CriticAgent"
    )

    # --- Execution metadata -------------------------------------------------
    iterations_used: int = Field(..., description="Number of refinement iterations")
    execution_time_seconds: float = Field(..., description="Total execution time")
    agents_involved: List[str] = Field(
        default_factory=list,
        description="List of agents that participated"
    )

    # --- Planning and per-agent details -------------------------------------
    subtasks: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="Subtasks created during planning"
    )
    agent_outputs: Dict[str, Any] = Field(
        default_factory=dict,
        description="Outputs from individual agents"
    )

    message_count: int = Field(..., description="Number of messages exchanged")

    # --- Diagnostics --------------------------------------------------------
    error: Optional[str] = Field(None, description="Error message if failed")
    warnings: List[str] = Field(default_factory=list, description="Warnings during execution")

    # --- Timing -------------------------------------------------------------
    start_time: datetime = Field(..., description="Workflow start time")
    end_time: datetime = Field(..., description="Workflow end time")

    @property
    def validation_score(self) -> Optional[float]:
        """Alias for quality_score for backward compatibility."""
        return self.quality_score

    class Config:
        # Example payload for generated JSON schema / OpenAPI docs. It must
        # contain only JSON-serializable values: a literal ``[...]`` here is
        # a list holding Ellipsis, which breaks json.dumps of the schema.
        json_schema_extra = {
            "example": {
                "task_id": "task_12345",
                "scenario": "patent_wakeup",
                "status": "completed",
                "success": True,
                "output": {
                    "valorization_roadmap": "...",
                    "market_analysis": "...",
                    "stakeholder_matches": ["..."]
                },
                "quality_score": 0.92,
                "validation_feedback": "Excellent quality. All criteria met.",
                "iterations_used": 2,
                "execution_time_seconds": 45.3,
                "agents_involved": ["PlannerAgent", "DocumentAnalysisAgent", "MarketAnalysisAgent", "CriticAgent"],
                "message_count": 18,
                "start_time": "2025-11-04T10:00:00",
                "end_time": "2025-11-04T10:00:45"
            }
        }
|
|
|
|
|
|
|
|
class ValidationResult(BaseModel):
    """
    Structured validation result from CriticAgent.

    Compatible with existing CriticAgent implementation.

    Only ``overall_score`` is range-checked (``0.0 <= overall_score <= 1.0``);
    values inside ``dimension_scores`` carry no constraint in this model.
    """

    # Pass/fail verdict against the quality thresholds.
    valid: bool = Field(..., description="Whether output meets quality thresholds")
    # Aggregate score; Pydantic rejects values outside [0.0, 1.0].
    overall_score: float = Field(..., ge=0.0, le=1.0, description="Overall quality score")
    # Required mapping of dimension name -> score; the set of dimension
    # names is not fixed by this model.
    dimension_scores: Dict[str, float] = Field(
        ...,
        description="Scores for individual quality dimensions"
    )
    issues: List[str] = Field(
        default_factory=list,
        description="List of identified issues"
    )
    suggestions: List[str] = Field(
        default_factory=list,
        description="Improvement suggestions"
    )
    # Free-form extra data; schema not defined here.
    details: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional validation details"
    )
|
|
|
|
|
|
|
|
class SubTask(BaseModel):
    """
    Individual subtask from PlannerAgent.

    Compatible with existing PlannerAgent implementation.

    ``id``, ``description`` and ``agent_type`` are required; all other
    fields have defaults, and a new subtask starts as ``TaskStatus.PENDING``.
    """

    id: str = Field(..., description="Unique subtask ID")
    description: str = Field(..., description="What needs to be done")
    # Free-form string; not validated against a list of known agents.
    agent_type: str = Field(..., description="Which agent should handle this")
    # Other SubTask.id values; existence of those IDs is not checked here.
    dependencies: List[str] = Field(
        default_factory=list,
        description="IDs of subtasks this depends on"
    )
    estimated_duration: float = Field(
        default=0.0,
        description="Estimated duration in seconds"
    )
    # NOTE(review): whether higher or lower means more urgent is not defined here.
    priority: int = Field(default=0, description="Priority level")
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Agent-specific parameters"
    )
    status: TaskStatus = Field(
        default=TaskStatus.PENDING,
        description="Current status"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_initial_state(
    task_id: str,
    task_description: str,
    scenario: ScenarioType = ScenarioType.GENERAL,
    max_iterations: int = 3,
    input_data: Optional[Dict[str, Any]] = None,
) -> AgentState:
    """
    Create initial AgentState for a new workflow.

    Args:
        task_id: Unique task identifier
        task_description: Natural language task description
        scenario: VISTA scenario type. A raw string value such as
            ``"patent_wakeup"`` is also accepted and normalized to the
            corresponding ScenarioType member.
        max_iterations: Maximum refinement iterations
        input_data: Optional input data for workflow (e.g., patent_path).
            A shallow copy is stored, so top-level changes made to the
            state's dict during the workflow do not leak back into the
            caller's dict (nested objects are still shared).

    Returns:
        Initialized AgentState with every key populated: empty containers,
        None for not-yet-known values, status PENDING, iteration_count 0,
        success False and start_time set to the current (naive) local time.

    Raises:
        ValueError: If ``scenario`` is not a valid ScenarioType value.
    """
    return AgentState(
        messages=[],
        task_id=task_id,
        task_description=task_description,
        # Validate early: an invalid string would otherwise only fail much
        # later, when state_to_output builds the WorkflowOutput model.
        scenario=ScenarioType(scenario),
        status=TaskStatus.PENDING,
        current_agent=None,
        iteration_count=0,
        max_iterations=max_iterations,
        subtasks=None,
        execution_order=None,
        agent_outputs={},
        intermediate_results=[],
        validation_score=None,
        validation_feedback=None,
        validation_issues=[],
        validation_suggestions=[],
        retrieved_context=[],
        document_metadata={},
        # Copy rather than alias the caller's dict.
        input_data=dict(input_data) if input_data else {},
        final_output=None,
        success=False,
        error=None,
        start_time=datetime.now(),
        end_time=None,
        execution_time_seconds=None,
        requires_human_approval=False,
        human_feedback=None,
    )
|
|
|
|
|
|
|
|
def state_to_output(state: AgentState) -> WorkflowOutput:
    """
    Build a serializable WorkflowOutput from a workflow's AgentState.

    The elapsed time is always recomputed from ``start_time`` and the end
    timestamp; when ``end_time`` is unset (or falsy) the current time is used
    instead. Absent or None collections fall back to empty ones.

    Args:
        state: Workflow state. ``start_time``, ``task_id``, ``scenario``,
            ``status`` and ``success`` must be present (read with ``[]``).

    Returns:
        A WorkflowOutput populated from the state.
    """
    finished_at = state.get("end_time") or datetime.now()
    elapsed = (finished_at - state["start_time"]).total_seconds()

    # Only an explicit None (or a missing key) is replaced for these two;
    # other values are passed through unchanged.
    raw_subtasks = state.get("subtasks")
    planned = [] if raw_subtasks is None else raw_subtasks
    raw_outputs = state.get("agent_outputs")
    per_agent = {} if raw_outputs is None else raw_outputs

    fields = dict(
        task_id=state["task_id"],
        scenario=state["scenario"],
        status=state["status"],
        success=state["success"],
        output=state.get("final_output"),
        intermediate_results=state.get("intermediate_results") or [],
        quality_score=state.get("validation_score"),
        validation_feedback=state.get("validation_feedback"),
        iterations_used=state.get("iteration_count", 0),
        execution_time_seconds=elapsed,
        # Every agent that recorded an output counts as "involved".
        agents_involved=[*per_agent.keys()],
        subtasks=planned,
        agent_outputs=per_agent,
        message_count=len(state.get("messages") or []),
        error=state.get("error"),
        warnings=[],
        start_time=state["start_time"],
        end_time=finished_at,
    )
    return WorkflowOutput(**fields)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Claim(BaseModel):
    """
    Individual patent claim.

    No cross-field validation is performed: ``claim_type`` is a free-form
    string, and ``depends_on`` is not checked against it.
    """

    claim_number: int = Field(..., description="Claim number")
    # Expected "independent" or "dependent" per the description; not enforced.
    claim_type: str = Field(..., description="independent or dependent")
    claim_text: str = Field(..., description="Full claim text")
    # None by default (e.g. for independent claims).
    depends_on: Optional[int] = Field(None, description="Parent claim number if dependent")
|
|
|
|
|
|
|
|
class PatentAnalysis(BaseModel):
    """
    Complete patent analysis output from DocumentAnalysisAgent.

    Range-checked fields: ``trl_level`` (1-9), ``confidence_score`` and
    ``extraction_completeness`` (0.0-1.0). Other categorical-looking string
    fields are not restricted to specific values.
    """

    # --- Identification -----------------------------------------------------
    patent_id: str = Field(..., description="Patent identifier")
    title: str = Field(..., description="Patent title")
    abstract: str = Field(..., description="Patent abstract")

    # --- Claims -------------------------------------------------------------
    independent_claims: List[Claim] = Field(default_factory=list, description="Independent claims")
    dependent_claims: List[Claim] = Field(default_factory=list, description="Dependent claims")
    # Supplied independently; not checked against the two claim lists above.
    total_claims: int = Field(..., description="Total number of claims")

    # --- Technical classification ------------------------------------------
    ipc_classification: List[str] = Field(default_factory=list, description="IPC codes")
    technical_domains: List[str] = Field(default_factory=list, description="Technology domains")
    key_innovations: List[str] = Field(default_factory=list, description="Key innovations")
    novelty_assessment: str = Field(..., description="Assessment of novelty")

    # --- Readiness and commercialization ------------------------------------
    # Technology Readiness Level, validated to the 1..9 scale.
    trl_level: int = Field(..., ge=1, le=9, description="Technology Readiness Level")
    trl_justification: str = Field(..., description="Reasoning for TRL assessment")
    # Expected "High", "Medium" or "Low" per the description; not enforced.
    commercialization_potential: str = Field(..., description="High, Medium, or Low")
    potential_applications: List[str] = Field(default_factory=list, description="Application areas")

    # --- Bibliographic data -------------------------------------------------
    inventors: List[str] = Field(default_factory=list, description="Inventor names")
    assignees: List[str] = Field(default_factory=list, description="Assignee organizations")
    # Dates are kept as strings; no date format is enforced - TODO confirm expected format.
    filing_date: Optional[str] = Field(None, description="Filing date")
    publication_date: Optional[str] = Field(None, description="Publication date")

    # --- Analysis quality ---------------------------------------------------
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Analysis confidence")
    extraction_completeness: float = Field(..., ge=0.0, le=1.0, description="Extraction completeness")
|
|
|
|
|
|
|
|
class MarketOpportunity(BaseModel):
    """
    Individual market opportunity.

    Only ``priority_score`` is range-checked (0.0-1.0); categorical strings
    such as ``technology_fit`` and ``risk_level`` are not restricted.
    """

    sector: str = Field(..., description="Industry sector name")
    sector_description: str = Field(..., description="Sector description")
    # Optional; no sign or magnitude constraint.
    market_size_usd: Optional[float] = Field(None, description="Market size in USD")
    # Presumably a percentage value (12.5 meaning 12.5%) - confirm with producer.
    growth_rate_percent: Optional[float] = Field(None, description="Annual growth rate")
    # Expected "Excellent", "Good" or "Fair" per the description; not enforced.
    technology_fit: str = Field(..., description="Excellent, Good, or Fair")
    market_gap: str = Field(..., description="Specific gap this technology fills")
    competitive_advantage: str = Field(..., description="Key competitive advantages")
    geographic_focus: List[str] = Field(default_factory=list, description="Target regions")
    # Months; no lower bound is enforced.
    time_to_market_months: int = Field(..., description="Estimated time to market")
    # Expected "Low", "Medium" or "High" per the description; not enforced.
    risk_level: str = Field(..., description="Low, Medium, or High")
    priority_score: float = Field(..., ge=0.0, le=1.0, description="Priority ranking")
|
|
|
|
|
|
|
|
class MarketAnalysis(BaseModel):
    """
    Complete market analysis output from MarketAnalysisAgent.

    Aggregates individual ``MarketOpportunity`` entries together with
    strategy text. Only ``confidence_score`` is range-checked (0.0-1.0).
    """

    # --- Opportunities ------------------------------------------------------
    opportunities: List[MarketOpportunity] = Field(default_factory=list, description="Market opportunities")
    # Description says "top 3"; list length is not enforced.
    top_sectors: List[str] = Field(default_factory=list, description="Top 3 sectors by priority")

    # --- Market context -----------------------------------------------------
    total_addressable_market_usd: Optional[float] = Field(None, description="Total addressable market")
    # Expected "Ready", "Emerging" or "Early" per the description; not enforced.
    market_readiness: str = Field(..., description="Ready, Emerging, or Early")
    competitive_landscape: str = Field(..., description="Competitive landscape assessment")
    regulatory_considerations: List[str] = Field(default_factory=list, description="Regulatory issues")

    # --- Recommendations ----------------------------------------------------
    recommended_focus: str = Field(..., description="Recommended market focus")
    strategic_positioning: str = Field(..., description="Strategic positioning advice")
    go_to_market_strategy: str = Field(..., description="Go-to-market strategy")

    # --- Analysis quality ---------------------------------------------------
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Analysis confidence")
    # Count of sources consulted; no lower bound is enforced.
    research_depth: int = Field(..., description="Number of sources consulted")
|
|
|
|
|
|
|
|
class StakeholderMatch(BaseModel):
    """
    Match between patent and potential partner.

    All five fit scores are validated to lie in [0.0, 1.0]. The model does
    not derive ``overall_fit_score`` from the four component scores; it is
    supplied independently.
    """

    # --- Identity -----------------------------------------------------------
    stakeholder_name: str = Field(..., description="Stakeholder name")
    # Free-form category string (e.g. Investor, Company, University).
    stakeholder_type: str = Field(..., description="Investor, Company, University, etc.")

    # --- Location and contact -----------------------------------------------
    location: str = Field(..., description="Geographic location")
    # Untyped dict; key schema is not defined here - TODO confirm expected keys.
    contact_info: Optional[Dict] = Field(None, description="Contact details")

    # --- Fit scores ---------------------------------------------------------
    overall_fit_score: float = Field(..., ge=0.0, le=1.0, description="Overall match score")
    technical_fit: float = Field(..., ge=0.0, le=1.0, description="Technical capability match")
    market_fit: float = Field(..., ge=0.0, le=1.0, description="Market sector alignment")
    geographic_fit: float = Field(..., ge=0.0, le=1.0, description="Geographic compatibility")
    strategic_fit: float = Field(..., ge=0.0, le=1.0, description="Strategic alignment")

    # --- Rationale ----------------------------------------------------------
    match_rationale: str = Field(..., description="Why this is a good match")
    collaboration_opportunities: List[str] = Field(default_factory=list, description="Potential collaborations")
    # Expected "High", "Medium" or "Low" per the description; not enforced.
    potential_value: str = Field(..., description="High, Medium, or Low")

    # --- Outreach guidance --------------------------------------------------
    recommended_approach: str = Field(..., description="How to approach this stakeholder")
    talking_points: List[str] = Field(default_factory=list, description="Key talking points")
|
|
|
|
|
|
|
|
class ValorizationBrief(BaseModel):
    """
    Complete valorization package from OutreachAgent.

    Holds the full markdown brief, the path of its rendered PDF, its
    individual sections and summary lists. ``version`` defaults to "1.0".
    """

    patent_id: str = Field(..., description="Patent identifier")

    # --- Document artifacts -------------------------------------------------
    content: str = Field(..., description="Full markdown content")
    # Stored as a plain string; file existence is not checked by this model.
    pdf_path: str = Field(..., description="Path to generated PDF")

    # --- Sections -----------------------------------------------------------
    executive_summary: str = Field(..., description="Executive summary")
    technology_overview: str = Field(..., description="Technology overview section")
    market_analysis_summary: str = Field(..., description="Market analysis summary")
    partner_recommendations: str = Field(..., description="Partner recommendations")

    # --- Highlights ---------------------------------------------------------
    top_opportunities: List[str] = Field(default_factory=list, description="Top market opportunities")
    # Description says "top 5"; list length is not enforced.
    recommended_partners: List[str] = Field(default_factory=list, description="Top 5 partners")
    key_takeaways: List[str] = Field(default_factory=list, description="Key takeaways")

    # --- Metadata -----------------------------------------------------------
    # Kept as a string; no date format is enforced - TODO confirm expected format.
    generated_date: str = Field(..., description="Generation date")
    version: str = Field(default="1.0", description="Document version")
|
|
|