""" MarketAnalysisAgent for Patent Wake-Up Scenario Analyzes market opportunities for patented technologies: - Identifies relevant industry sectors - Assesses market size and growth potential - Analyzes competitive landscape - Recommends geographic focus (EU, Canada) """ from typing import Optional from loguru import logger from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser from ..base_agent import BaseAgent, Task from ...llm.langchain_ollama_client import LangChainOllamaClient from ...workflow.langgraph_state import PatentAnalysis, MarketAnalysis, MarketOpportunity class MarketAnalysisAgent(BaseAgent): """ Specialized agent for market opportunity analysis. Uses research tools and LLM reasoning to assess commercialization potential. """ def __init__(self, llm_client: LangChainOllamaClient, memory_agent=None): """ Initialize MarketAnalysisAgent. Args: llm_client: LangChain Ollama client memory_agent: Optional memory agent for context retrieval """ # Note: MarketAnalysisAgent uses LangChain directly self.name = "MarketAnalysisAgent" self.description = "Market opportunity analysis and assessment" self.llm_client = llm_client self.memory_agent = memory_agent # Use analysis complexity for market research self.llm = llm_client.get_llm('analysis') # mistral:latest # Create analysis chain self.analysis_chain = self._create_analysis_chain() logger.info("Initialized MarketAnalysisAgent") def _create_analysis_chain(self): """Create chain for market analysis""" prompt = ChatPromptTemplate.from_messages([ ("system", "You are an expert market analyst specializing in technology commercialization."), ("human", """ Analyze market opportunities for this technology: Title: {title} Technical Domains: {technical_domains} Key Innovations: {key_innovations} TRL: {trl_level} Potential Applications: {potential_applications} Provide comprehensive market analysis in JSON format: {{ "opportunities": [ {{ "sector": "Healthcare/Pharmaceuticals", "sector_description": "Brief description", "market_size_usd": 50000000000, "growth_rate_percent": 8.5, "technology_fit": "Excellent/Good/Fair", "market_gap": "Specific gap this fills", "competitive_advantage": "Key advantages", "geographic_focus": ["EU", "Canada", "US"], "time_to_market_months": 18, "risk_level": "Low/Medium/High", "priority_score": 0.9 }} ], "total_addressable_market_usd": 150000000000, "market_readiness": "Ready/Emerging/Early", "competitive_landscape": "Assessment of competition", "regulatory_considerations": ["FDA approval required", "GDPR compliance"], "recommended_focus": "Primary market focus", "strategic_positioning": "How to position technology", "go_to_market_strategy": "Recommended strategy", "confidence_score": 0.85, "research_depth": 5 }} Requirements: 1. Identify 3-5 relevant market opportunities 2. Prioritize EU and Canada markets (VISTA focus) 3. Assess realistic market sizes and growth 4. Consider regulatory environment 5. Evaluate time-to-market for each opportunity 6. Rank by priority_score (0-1) Return ONLY valid JSON. """) ]) parser = JsonOutputParser() return prompt | self.llm | parser async def analyze_market(self, patent_analysis: PatentAnalysis) -> MarketAnalysis: """ Analyze market opportunities based on patent analysis. Args: patent_analysis: Output from DocumentAnalysisAgent Returns: MarketAnalysis with identified opportunities """ logger.info(f"📊 Analyzing market for: {patent_analysis.title}") # Retrieve relevant market data from memory if available context = None if self.memory_agent: try: context = await self.memory_agent.retrieve_relevant_context( query=f"market analysis {' '.join(patent_analysis.technical_domains)}", context_type="semantic", top_k=3 ) if context: logger.debug(f"Retrieved {len(context)} market context documents") except Exception as e: logger.warning(f"Memory retrieval failed: {e}") # Optionally perform web searches for market data # (Would use web_search_tool here in production) # Execute market analysis logger.info("Generating market opportunity analysis...") result = await self.analysis_chain.ainvoke({ "title": patent_analysis.title, "technical_domains": ", ".join(patent_analysis.technical_domains), "key_innovations": ", ".join(patent_analysis.key_innovations), "trl_level": patent_analysis.trl_level, "potential_applications": ", ".join(patent_analysis.potential_applications) }) # Build MarketAnalysis object analysis = self._build_market_analysis(result) logger.success(f"✅ Market analysis complete: {len(analysis.opportunities)} opportunities identified") return analysis def _build_market_analysis(self, result: dict) -> MarketAnalysis: """ Build MarketAnalysis object from LLM output. Args: result: JSON output from analysis chain Returns: Complete MarketAnalysis object """ # Convert opportunities to MarketOpportunity objects # Set market_size_usd to None for demo (early stage) opportunities = [] for opp in result.get('opportunities', []): opp_dict = dict(opp) opp_dict['market_size_usd'] = None # Placeholder for demo opportunities.append(MarketOpportunity(**opp_dict)) # Sort by priority score opportunities.sort(key=lambda x: x.priority_score, reverse=True) # Extract top sectors top_sectors = [opp.sector for opp in opportunities[:3]] return MarketAnalysis( opportunities=opportunities, top_sectors=top_sectors, total_addressable_market_usd=None, # Set to None for demo (early stage) market_readiness=result.get('market_readiness', 'Emerging'), competitive_landscape=result.get('competitive_landscape', 'Competitive analysis unavailable'), regulatory_considerations=result.get('regulatory_considerations', []), recommended_focus=result.get('recommended_focus', top_sectors[0] if top_sectors else 'General Market'), strategic_positioning=result.get('strategic_positioning', 'Position as innovative solution'), go_to_market_strategy=result.get('go_to_market_strategy', 'Partner with established players'), confidence_score=result.get('confidence_score', 0.8), research_depth=result.get('research_depth', 3) ) async def process_task(self, task: Task) -> Task: """ Process task using agent interface. Args: task: Task with patent_analysis in metadata Returns: Task with MarketAnalysis result """ task.status = "in_progress" try: patent_analysis_dict = task.metadata.get('patent_analysis') if not patent_analysis_dict: raise ValueError("patent_analysis required in task metadata") # Convert dict to PatentAnalysis patent_analysis = PatentAnalysis(**patent_analysis_dict) analysis = await self.analyze_market(patent_analysis) task.result = analysis.model_dump() task.status = "completed" except Exception as e: logger.error(f"Market analysis failed: {e}") task.status = "failed" task.error = str(e) return task