| | """ |
| | LangGraph State Definitions for SPARKNET |
| | Defines state schema, enums, and output models for workflows |
| | """ |
| |
|
| | from typing import TypedDict, Annotated, Sequence, Dict, Any, List, Optional |
| | from enum import Enum |
| | from datetime import datetime |
| | from pydantic import BaseModel, Field |
| | from langchain_core.messages import BaseMessage |
| | from langgraph.graph.message import add_messages |
| |
|
| |
|
| | class ScenarioType(str, Enum): |
| | """ |
| | VISTA scenario types. |
| | Each scenario has a dedicated multi-agent workflow. |
| | """ |
| | PATENT_WAKEUP = "patent_wakeup" |
| | AGREEMENT_SAFETY = "agreement_safety" |
| | PARTNER_MATCHING = "partner_matching" |
| | GENERAL = "general" |
| |
|
| |
|
| | class TaskStatus(str, Enum): |
| | """ |
| | Task execution status throughout workflow. |
| | """ |
| | PENDING = "pending" |
| | PLANNING = "planning" |
| | EXECUTING = "executing" |
| | VALIDATING = "validating" |
| | REFINING = "refining" |
| | COMPLETED = "completed" |
| | FAILED = "failed" |
| |
|
| |
|
| | class AgentState(TypedDict): |
| | """ |
| | LangGraph state for SPARKNET workflows. |
| | |
| | This state is passed between all agents in the workflow. |
| | Uses Annotated with add_messages for automatic message history management. |
| | """ |
| |
|
| | |
| | messages: Annotated[Sequence[BaseMessage], add_messages] |
| |
|
| | |
| | task_id: str |
| | task_description: str |
| | scenario: ScenarioType |
| | status: TaskStatus |
| |
|
| | |
| | current_agent: Optional[str] |
| | iteration_count: int |
| | max_iterations: int |
| |
|
| | |
| | subtasks: Optional[List[Dict[str, Any]]] |
| | execution_order: Optional[List[List[str]]] |
| |
|
| | |
| | agent_outputs: Dict[str, Any] |
| | intermediate_results: List[Dict[str, Any]] |
| |
|
| | |
| | validation_score: Optional[float] |
| | validation_feedback: Optional[str] |
| | validation_issues: List[str] |
| | validation_suggestions: List[str] |
| |
|
| | |
| | retrieved_context: List[Dict[str, Any]] |
| | document_metadata: Dict[str, Any] |
| | input_data: Dict[str, Any] |
| |
|
| | |
| | final_output: Optional[Any] |
| | success: bool |
| | error: Optional[str] |
| |
|
| | |
| | start_time: datetime |
| | end_time: Optional[datetime] |
| | execution_time_seconds: Optional[float] |
| |
|
| | |
| | requires_human_approval: bool |
| | human_feedback: Optional[str] |
| |
|
| |
|
| | class WorkflowOutput(BaseModel): |
| | """ |
| | Structured output from SPARKNET workflows. |
| | Used for serialization and API responses. |
| | """ |
| |
|
| | task_id: str = Field(..., description="Unique task identifier") |
| | scenario: ScenarioType = Field(..., description="Scenario type executed") |
| | status: TaskStatus = Field(..., description="Final task status") |
| | success: bool = Field(..., description="Whether task completed successfully") |
| |
|
| | |
| | output: Any = Field(..., description="Primary output/result") |
| | intermediate_results: List[Dict[str, Any]] = Field( |
| | default_factory=list, |
| | description="Intermediate results from agents" |
| | ) |
| |
|
| | |
| | quality_score: Optional[float] = Field( |
| | None, |
| | ge=0.0, |
| | le=1.0, |
| | description="Quality score from validation (0.0-1.0)" |
| | ) |
| | validation_feedback: Optional[str] = Field( |
| | None, |
| | description="Feedback from CriticAgent" |
| | ) |
| |
|
| | |
| | iterations_used: int = Field(..., description="Number of refinement iterations") |
| | execution_time_seconds: float = Field(..., description="Total execution time") |
| | agents_involved: List[str] = Field( |
| | default_factory=list, |
| | description="List of agents that participated" |
| | ) |
| |
|
| | |
| | subtasks: List[Dict[str, Any]] = Field( |
| | default_factory=list, |
| | description="Subtasks created during planning" |
| | ) |
| | agent_outputs: Dict[str, Any] = Field( |
| | default_factory=dict, |
| | description="Outputs from individual agents" |
| | ) |
| |
|
| | |
| | @property |
| | def validation_score(self) -> Optional[float]: |
| | """Alias for quality_score for backward compatibility.""" |
| | return self.quality_score |
| |
|
| | |
| | message_count: int = Field(..., description="Number of messages exchanged") |
| |
|
| | |
| | error: Optional[str] = Field(None, description="Error message if failed") |
| | warnings: List[str] = Field(default_factory=list, description="Warnings during execution") |
| |
|
| | |
| | start_time: datetime = Field(..., description="Workflow start time") |
| | end_time: datetime = Field(..., description="Workflow end time") |
| |
|
| | class Config: |
| | json_schema_extra = { |
| | "example": { |
| | "task_id": "task_12345", |
| | "scenario": "patent_wakeup", |
| | "status": "completed", |
| | "success": True, |
| | "output": { |
| | "valorization_roadmap": "...", |
| | "market_analysis": "...", |
| | "stakeholder_matches": [...] |
| | }, |
| | "quality_score": 0.92, |
| | "validation_feedback": "Excellent quality. All criteria met.", |
| | "iterations_used": 2, |
| | "execution_time_seconds": 45.3, |
| | "agents_involved": ["PlannerAgent", "DocumentAnalysisAgent", "MarketAnalysisAgent", "CriticAgent"], |
| | "message_count": 18, |
| | "start_time": "2025-11-04T10:00:00", |
| | "end_time": "2025-11-04T10:00:45" |
| | } |
| | } |
| |
|
| |
|
| | class ValidationResult(BaseModel): |
| | """ |
| | Structured validation result from CriticAgent. |
| | Compatible with existing CriticAgent implementation. |
| | """ |
| |
|
| | valid: bool = Field(..., description="Whether output meets quality thresholds") |
| | overall_score: float = Field(..., ge=0.0, le=1.0, description="Overall quality score") |
| | dimension_scores: Dict[str, float] = Field( |
| | ..., |
| | description="Scores for individual quality dimensions" |
| | ) |
| | issues: List[str] = Field( |
| | default_factory=list, |
| | description="List of identified issues" |
| | ) |
| | suggestions: List[str] = Field( |
| | default_factory=list, |
| | description="Improvement suggestions" |
| | ) |
| | details: Dict[str, Any] = Field( |
| | default_factory=dict, |
| | description="Additional validation details" |
| | ) |
| |
|
| |
|
| | class SubTask(BaseModel): |
| | """ |
| | Individual subtask from PlannerAgent. |
| | Compatible with existing PlannerAgent implementation. |
| | """ |
| |
|
| | id: str = Field(..., description="Unique subtask ID") |
| | description: str = Field(..., description="What needs to be done") |
| | agent_type: str = Field(..., description="Which agent should handle this") |
| | dependencies: List[str] = Field( |
| | default_factory=list, |
| | description="IDs of subtasks this depends on" |
| | ) |
| | estimated_duration: float = Field( |
| | default=0.0, |
| | description="Estimated duration in seconds" |
| | ) |
| | priority: int = Field(default=0, description="Priority level") |
| | parameters: Dict[str, Any] = Field( |
| | default_factory=dict, |
| | description="Agent-specific parameters" |
| | ) |
| | status: TaskStatus = Field( |
| | default=TaskStatus.PENDING, |
| | description="Current status" |
| | ) |
| |
|
| |
|
| | |
| |
|
| | def create_initial_state( |
| | task_id: str, |
| | task_description: str, |
| | scenario: ScenarioType = ScenarioType.GENERAL, |
| | max_iterations: int = 3, |
| | input_data: Optional[Dict[str, Any]] = None, |
| | ) -> AgentState: |
| | """ |
| | Create initial AgentState for a new workflow. |
| | |
| | Args: |
| | task_id: Unique task identifier |
| | task_description: Natural language task description |
| | scenario: VISTA scenario type |
| | max_iterations: Maximum refinement iterations |
| | input_data: Optional input data for workflow (e.g., patent_path) |
| | |
| | Returns: |
| | Initialized AgentState |
| | """ |
| | return AgentState( |
| | messages=[], |
| | task_id=task_id, |
| | task_description=task_description, |
| | scenario=scenario, |
| | status=TaskStatus.PENDING, |
| | current_agent=None, |
| | iteration_count=0, |
| | max_iterations=max_iterations, |
| | subtasks=None, |
| | execution_order=None, |
| | agent_outputs={}, |
| | intermediate_results=[], |
| | validation_score=None, |
| | validation_feedback=None, |
| | validation_issues=[], |
| | validation_suggestions=[], |
| | retrieved_context=[], |
| | document_metadata={}, |
| | input_data=input_data or {}, |
| | final_output=None, |
| | success=False, |
| | error=None, |
| | start_time=datetime.now(), |
| | end_time=None, |
| | execution_time_seconds=None, |
| | requires_human_approval=False, |
| | human_feedback=None, |
| | ) |
| |
|
| |
|
| | def state_to_output(state: AgentState) -> WorkflowOutput: |
| | """ |
| | Convert AgentState to WorkflowOutput for serialization. |
| | |
| | Args: |
| | state: Current workflow state |
| | |
| | Returns: |
| | WorkflowOutput model |
| | """ |
| | end_time = state.get("end_time") or datetime.now() |
| | execution_time = (end_time - state["start_time"]).total_seconds() |
| |
|
| | |
| | subtasks = state.get("subtasks") |
| | if subtasks is None: |
| | subtasks = [] |
| |
|
| | agent_outputs = state.get("agent_outputs") |
| | if agent_outputs is None: |
| | agent_outputs = {} |
| |
|
| | return WorkflowOutput( |
| | task_id=state["task_id"], |
| | scenario=state["scenario"], |
| | status=state["status"], |
| | success=state["success"], |
| | output=state.get("final_output"), |
| | intermediate_results=state.get("intermediate_results") or [], |
| | quality_score=state.get("validation_score"), |
| | validation_feedback=state.get("validation_feedback"), |
| | iterations_used=state.get("iteration_count", 0), |
| | execution_time_seconds=execution_time, |
| | agents_involved=list(agent_outputs.keys()), |
| | subtasks=subtasks, |
| | agent_outputs=agent_outputs, |
| | message_count=len(state.get("messages") or []), |
| | error=state.get("error"), |
| | warnings=[], |
| | start_time=state["start_time"], |
| | end_time=end_time, |
| | ) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class Claim(BaseModel): |
| | """Individual patent claim""" |
| | claim_number: int = Field(..., description="Claim number") |
| | claim_type: str = Field(..., description="independent or dependent") |
| | claim_text: str = Field(..., description="Full claim text") |
| | depends_on: Optional[int] = Field(None, description="Parent claim number if dependent") |
| |
|
| |
|
| | class PatentAnalysis(BaseModel): |
| | """Complete patent analysis output from DocumentAnalysisAgent""" |
| | patent_id: str = Field(..., description="Patent identifier") |
| | title: str = Field(..., description="Patent title") |
| | abstract: str = Field(..., description="Patent abstract") |
| |
|
| | |
| | independent_claims: List[Claim] = Field(default_factory=list, description="Independent claims") |
| | dependent_claims: List[Claim] = Field(default_factory=list, description="Dependent claims") |
| | total_claims: int = Field(..., description="Total number of claims") |
| |
|
| | |
| | ipc_classification: List[str] = Field(default_factory=list, description="IPC codes") |
| | technical_domains: List[str] = Field(default_factory=list, description="Technology domains") |
| | key_innovations: List[str] = Field(default_factory=list, description="Key innovations") |
| | novelty_assessment: str = Field(..., description="Assessment of novelty") |
| |
|
| | |
| | trl_level: int = Field(..., ge=1, le=9, description="Technology Readiness Level") |
| | trl_justification: str = Field(..., description="Reasoning for TRL assessment") |
| | commercialization_potential: str = Field(..., description="High, Medium, or Low") |
| | potential_applications: List[str] = Field(default_factory=list, description="Application areas") |
| |
|
| | |
| | inventors: List[str] = Field(default_factory=list, description="Inventor names") |
| | assignees: List[str] = Field(default_factory=list, description="Assignee organizations") |
| | filing_date: Optional[str] = Field(None, description="Filing date") |
| | publication_date: Optional[str] = Field(None, description="Publication date") |
| |
|
| | |
| | confidence_score: float = Field(..., ge=0.0, le=1.0, description="Analysis confidence") |
| | extraction_completeness: float = Field(..., ge=0.0, le=1.0, description="Extraction completeness") |
| |
|
| |
|
| | class MarketOpportunity(BaseModel): |
| | """Individual market opportunity""" |
| | sector: str = Field(..., description="Industry sector name") |
| | sector_description: str = Field(..., description="Sector description") |
| | market_size_usd: Optional[float] = Field(None, description="Market size in USD") |
| | growth_rate_percent: Optional[float] = Field(None, description="Annual growth rate") |
| | technology_fit: str = Field(..., description="Excellent, Good, or Fair") |
| | market_gap: str = Field(..., description="Specific gap this technology fills") |
| | competitive_advantage: str = Field(..., description="Key competitive advantages") |
| | geographic_focus: List[str] = Field(default_factory=list, description="Target regions") |
| | time_to_market_months: int = Field(..., description="Estimated time to market") |
| | risk_level: str = Field(..., description="Low, Medium, or High") |
| | priority_score: float = Field(..., ge=0.0, le=1.0, description="Priority ranking") |
| |
|
| |
|
| | class MarketAnalysis(BaseModel): |
| | """Complete market analysis output from MarketAnalysisAgent""" |
| | opportunities: List[MarketOpportunity] = Field(default_factory=list, description="Market opportunities") |
| | top_sectors: List[str] = Field(default_factory=list, description="Top 3 sectors by priority") |
| |
|
| | |
| | total_addressable_market_usd: Optional[float] = Field(None, description="Total addressable market") |
| | market_readiness: str = Field(..., description="Ready, Emerging, or Early") |
| | competitive_landscape: str = Field(..., description="Competitive landscape assessment") |
| | regulatory_considerations: List[str] = Field(default_factory=list, description="Regulatory issues") |
| |
|
| | |
| | recommended_focus: str = Field(..., description="Recommended market focus") |
| | strategic_positioning: str = Field(..., description="Strategic positioning advice") |
| | go_to_market_strategy: str = Field(..., description="Go-to-market strategy") |
| |
|
| | |
| | confidence_score: float = Field(..., ge=0.0, le=1.0, description="Analysis confidence") |
| | research_depth: int = Field(..., description="Number of sources consulted") |
| |
|
| |
|
| | class StakeholderMatch(BaseModel): |
| | """Match between patent and potential partner""" |
| | stakeholder_name: str = Field(..., description="Stakeholder name") |
| | stakeholder_type: str = Field(..., description="Investor, Company, University, etc.") |
| |
|
| | |
| | location: str = Field(..., description="Geographic location") |
| | contact_info: Optional[Dict] = Field(None, description="Contact details") |
| |
|
| | |
| | overall_fit_score: float = Field(..., ge=0.0, le=1.0, description="Overall match score") |
| | technical_fit: float = Field(..., ge=0.0, le=1.0, description="Technical capability match") |
| | market_fit: float = Field(..., ge=0.0, le=1.0, description="Market sector alignment") |
| | geographic_fit: float = Field(..., ge=0.0, le=1.0, description="Geographic compatibility") |
| | strategic_fit: float = Field(..., ge=0.0, le=1.0, description="Strategic alignment") |
| |
|
| | |
| | match_rationale: str = Field(..., description="Why this is a good match") |
| | collaboration_opportunities: List[str] = Field(default_factory=list, description="Potential collaborations") |
| | potential_value: str = Field(..., description="High, Medium, or Low") |
| |
|
| | |
| | recommended_approach: str = Field(..., description="How to approach this stakeholder") |
| | talking_points: List[str] = Field(default_factory=list, description="Key talking points") |
| |
|
| |
|
| | class ValorizationBrief(BaseModel): |
| | """Complete valorization package from OutreachAgent""" |
| | patent_id: str = Field(..., description="Patent identifier") |
| |
|
| | |
| | content: str = Field(..., description="Full markdown content") |
| | pdf_path: str = Field(..., description="Path to generated PDF") |
| |
|
| | |
| | executive_summary: str = Field(..., description="Executive summary") |
| | technology_overview: str = Field(..., description="Technology overview section") |
| | market_analysis_summary: str = Field(..., description="Market analysis summary") |
| | partner_recommendations: str = Field(..., description="Partner recommendations") |
| |
|
| | |
| | top_opportunities: List[str] = Field(default_factory=list, description="Top market opportunities") |
| | recommended_partners: List[str] = Field(default_factory=list, description="Top 5 partners") |
| | key_takeaways: List[str] = Field(default_factory=list, description="Key takeaways") |
| |
|
| | |
| | generated_date: str = Field(..., description="Generation date") |
| | version: str = Field(default="1.0", description="Document version") |
| |
|