SPARKNET / src /agents /scenario1 /market_analysis_agent.py
MHamdan's picture
Initial commit: SPARKNET framework
a9dc537
"""
MarketAnalysisAgent for Patent Wake-Up Scenario
Analyzes market opportunities for patented technologies:
- Identifies relevant industry sectors
- Assesses market size and growth potential
- Analyzes competitive landscape
- Recommends geographic focus (EU, Canada)
"""
from typing import Optional
from loguru import logger
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from ..base_agent import BaseAgent, Task
from ...llm.langchain_ollama_client import LangChainOllamaClient
from ...workflow.langgraph_state import PatentAnalysis, MarketAnalysis, MarketOpportunity
class MarketAnalysisAgent(BaseAgent):
"""
Specialized agent for market opportunity analysis.
Uses research tools and LLM reasoning to assess commercialization potential.
"""
def __init__(self, llm_client: LangChainOllamaClient, memory_agent=None):
"""
Initialize MarketAnalysisAgent.
Args:
llm_client: LangChain Ollama client
memory_agent: Optional memory agent for context retrieval
"""
# Note: MarketAnalysisAgent uses LangChain directly
self.name = "MarketAnalysisAgent"
self.description = "Market opportunity analysis and assessment"
self.llm_client = llm_client
self.memory_agent = memory_agent
# Use analysis complexity for market research
self.llm = llm_client.get_llm('analysis') # mistral:latest
# Create analysis chain
self.analysis_chain = self._create_analysis_chain()
logger.info("Initialized MarketAnalysisAgent")
def _create_analysis_chain(self):
"""Create chain for market analysis"""
prompt = ChatPromptTemplate.from_messages([
("system", "You are an expert market analyst specializing in technology commercialization."),
("human", """
Analyze market opportunities for this technology:
Title: {title}
Technical Domains: {technical_domains}
Key Innovations: {key_innovations}
TRL: {trl_level}
Potential Applications: {potential_applications}
Provide comprehensive market analysis in JSON format:
{{
"opportunities": [
{{
"sector": "Healthcare/Pharmaceuticals",
"sector_description": "Brief description",
"market_size_usd": 50000000000,
"growth_rate_percent": 8.5,
"technology_fit": "Excellent/Good/Fair",
"market_gap": "Specific gap this fills",
"competitive_advantage": "Key advantages",
"geographic_focus": ["EU", "Canada", "US"],
"time_to_market_months": 18,
"risk_level": "Low/Medium/High",
"priority_score": 0.9
}}
],
"total_addressable_market_usd": 150000000000,
"market_readiness": "Ready/Emerging/Early",
"competitive_landscape": "Assessment of competition",
"regulatory_considerations": ["FDA approval required", "GDPR compliance"],
"recommended_focus": "Primary market focus",
"strategic_positioning": "How to position technology",
"go_to_market_strategy": "Recommended strategy",
"confidence_score": 0.85,
"research_depth": 5
}}
Requirements:
1. Identify 3-5 relevant market opportunities
2. Prioritize EU and Canada markets (VISTA focus)
3. Assess realistic market sizes and growth
4. Consider regulatory environment
5. Evaluate time-to-market for each opportunity
6. Rank by priority_score (0-1)
Return ONLY valid JSON.
""")
])
parser = JsonOutputParser()
return prompt | self.llm | parser
async def analyze_market(self, patent_analysis: PatentAnalysis) -> MarketAnalysis:
"""
Analyze market opportunities based on patent analysis.
Args:
patent_analysis: Output from DocumentAnalysisAgent
Returns:
MarketAnalysis with identified opportunities
"""
logger.info(f"📊 Analyzing market for: {patent_analysis.title}")
# Retrieve relevant market data from memory if available
context = None
if self.memory_agent:
try:
context = await self.memory_agent.retrieve_relevant_context(
query=f"market analysis {' '.join(patent_analysis.technical_domains)}",
context_type="semantic",
top_k=3
)
if context:
logger.debug(f"Retrieved {len(context)} market context documents")
except Exception as e:
logger.warning(f"Memory retrieval failed: {e}")
# Optionally perform web searches for market data
# (Would use web_search_tool here in production)
# Execute market analysis
logger.info("Generating market opportunity analysis...")
result = await self.analysis_chain.ainvoke({
"title": patent_analysis.title,
"technical_domains": ", ".join(patent_analysis.technical_domains),
"key_innovations": ", ".join(patent_analysis.key_innovations),
"trl_level": patent_analysis.trl_level,
"potential_applications": ", ".join(patent_analysis.potential_applications)
})
# Build MarketAnalysis object
analysis = self._build_market_analysis(result)
logger.success(f"✅ Market analysis complete: {len(analysis.opportunities)} opportunities identified")
return analysis
def _build_market_analysis(self, result: dict) -> MarketAnalysis:
"""
Build MarketAnalysis object from LLM output.
Args:
result: JSON output from analysis chain
Returns:
Complete MarketAnalysis object
"""
# Convert opportunities to MarketOpportunity objects
# Set market_size_usd to None for demo (early stage)
opportunities = []
for opp in result.get('opportunities', []):
opp_dict = dict(opp)
opp_dict['market_size_usd'] = None # Placeholder for demo
opportunities.append(MarketOpportunity(**opp_dict))
# Sort by priority score
opportunities.sort(key=lambda x: x.priority_score, reverse=True)
# Extract top sectors
top_sectors = [opp.sector for opp in opportunities[:3]]
return MarketAnalysis(
opportunities=opportunities,
top_sectors=top_sectors,
total_addressable_market_usd=None, # Set to None for demo (early stage)
market_readiness=result.get('market_readiness', 'Emerging'),
competitive_landscape=result.get('competitive_landscape', 'Competitive analysis unavailable'),
regulatory_considerations=result.get('regulatory_considerations', []),
recommended_focus=result.get('recommended_focus', top_sectors[0] if top_sectors else 'General Market'),
strategic_positioning=result.get('strategic_positioning', 'Position as innovative solution'),
go_to_market_strategy=result.get('go_to_market_strategy', 'Partner with established players'),
confidence_score=result.get('confidence_score', 0.8),
research_depth=result.get('research_depth', 3)
)
async def process_task(self, task: Task) -> Task:
"""
Process task using agent interface.
Args:
task: Task with patent_analysis in metadata
Returns:
Task with MarketAnalysis result
"""
task.status = "in_progress"
try:
patent_analysis_dict = task.metadata.get('patent_analysis')
if not patent_analysis_dict:
raise ValueError("patent_analysis required in task metadata")
# Convert dict to PatentAnalysis
patent_analysis = PatentAnalysis(**patent_analysis_dict)
analysis = await self.analyze_market(patent_analysis)
task.result = analysis.model_dump()
task.status = "completed"
except Exception as e:
logger.error(f"Market analysis failed: {e}")
task.status = "failed"
task.error = str(e)
return task