PrimoGreedy-Agent / src /agents /news_intelligence_agent.py
CiscsoPonce's picture
feat: LangGraph best-practices refactor (Sprint 8 Epics 2-3-4)
d5ba3a3
import os
import random
from typing import Dict, Any, List, Optional
from datetime import datetime
from ..workflows.state import AgentState
from ..tools.finnhub_tool import get_company_news
from ..tools.firecrawl_tool import scrape_url
from ..config.model_factory import ModelFactory
from ..config import config
from ..prompts import (
get_news_analysis_template,
get_news_output_parser,
get_news_significance_assessment_template,
get_article_summarization_template,
format_news_data,
format_significant_news_data,
clean_input_string
)
def format_date_for_display(date_str: str) -> str:
"""Format date string from YYYYMMDDHHMMSS to YYYY-MM-DD HH:MM:SS for better readability."""
try:
if len(date_str) == 14 and date_str.isdigit():
dt = datetime.strptime(date_str, '%Y%m%d%H%M%S')
return dt.strftime('%Y-%m-%d %H:%M:%S')
return date_str # Return original if parsing fails
except ValueError:
return date_str # Return original if parsing fails
def sample_random_news(news_items: List[Dict], max_count: int = 5) -> List[Dict]:
"""Sample random news items, filtering out promotional content."""
if not news_items:
return []
# Filter out promotional content
filtered = [
item for item in news_items
if not item.get('summary', '').startswith("Looking for stock market analysis")
]
return random.sample(filtered, min(max_count, len(filtered)))
async def assess_significance(
headline: str,
summary: str,
date: str,
symbol: str,
company_info: Optional[Dict[str, Any]] = None
) -> float:
"""Assess news significance using AI with company context."""
try:
api_key = config.get_api_key('openai')
if not api_key:
return 0.0
os.environ['OPENAI_API_KEY'] = api_key
llm = ModelFactory.get_assess_significance_model()
prompt_template = get_news_significance_assessment_template()
# Extract company information for context
if company_info:
company_name = company_info.get('name', company_info.get('longName', symbol))
industry = company_info.get('industry', company_info.get('sector', 'Unknown Industry'))
market_cap = company_info.get('marketCap', company_info.get('market_cap', 'N/A'))
business_description = company_info.get('longBusinessSummary', company_info.get('description', 'No description available'))
# Format market cap if it's a number
if isinstance(market_cap, (int, float)) and market_cap > 0:
market_cap = f"${market_cap:,.0f}"
elif market_cap == 'N/A' or not market_cap:
market_cap = "N/A"
else:
company_name = symbol
industry = "Unknown Industry"
market_cap = "N/A"
business_description = "No company information available"
response = await (prompt_template | llm).ainvoke({
'headline': headline,
'summary': summary,
'date': format_date_for_display(date),
'symbol': symbol,
'company_name': company_name,
'industry': industry,
'market_cap': market_cap,
'business_description': business_description
})
significance_text = str(response.content).strip()
return max(0.0, min(1.0, float(significance_text)))
except Exception as e:
print(f"Significance assessment failed: {e}")
return 0.0
async def scrape_article_content(news_item: Dict) -> Optional[str]:
"""Scrape full article content using Firecrawl."""
try:
url = news_item.get('url')
if not url:
return None
result = await scrape_url(url)
if result.success and result.data:
return result.data.get('content', '')
return None
except Exception as e:
print(f"Article scraping failed: {e}")
return None
async def create_enhanced_summary(title: str, original_summary: str, full_content: str, symbol: str) -> Optional[str]:
"""Create enhanced summary from scraped content."""
try:
api_key = config.get_api_key('openai')
if not api_key:
return None
os.environ['OPENAI_API_KEY'] = api_key
llm = ModelFactory.get_enhanced_summary_model()
prompt_template = get_article_summarization_template()
response = await (prompt_template | llm).ainvoke({
'title': title,
'source': 'News',
'original_summary': original_summary,
'full_content': full_content[:8000],
'symbol': symbol
})
return str(response.content).strip()
except Exception as e:
print(f"Enhanced summary failed: {e}")
return None
async def analyze_news(
symbol: str,
analysis_date: Optional[str] = None,
technical_data: Optional[Dict[str, Any]] = None,
company_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Complete news analysis with PrimoGPT workflow:
1. Get news -> 2. Sample random articles -> 3. Assess significance
4. Scrape significant articles -> 5. Generate NLP features
"""
try:
symbol = symbol.upper()
# Extract company information from data_collection_results
company_info = None
if company_data:
company_info = company_data.get('company_info') or company_data.get('company_profile')
# 1. Get news data for analysis
news_result = await get_company_news(symbol, analysis_date)
# Extract news data
news_data = []
if news_result and news_result.success and news_result.data:
if isinstance(news_result.data, dict):
news_data = news_result.data.get('news', [])
elif isinstance(news_result.data, list):
news_data = news_result.data
if not news_data:
return {
'symbol': symbol,
'success': False,
'error': 'No news data available'
}
# 2. Sample random news items per config
sampled_news = sample_random_news(news_data, max_count=config.news_sample_count)
# 3. Process each news item for significance with company context
processed_news = []
significant_news = []
for news_item in sampled_news:
# Assess significance with company context
significance = await assess_significance(
news_item.get('headline', ''),
news_item.get('summary', ''),
news_item.get('date', ''),
symbol,
company_info
)
news_item['significance_score'] = significance
# 4. If significant (threshold per config), scrape and enhance
if significance >= config.news_significance_threshold:
full_content = await scrape_article_content(news_item)
if full_content:
enhanced_summary = await create_enhanced_summary(
news_item.get('headline', ''),
news_item.get('summary', ''),
full_content,
symbol
)
if enhanced_summary:
news_item['enhanced_summary'] = enhanced_summary
significant_news.append(news_item)
processed_news.append(news_item)
# 5. Generate NLP features
nlp_features = await extract_nlp_features(
symbol,
processed_news,
significant_news,
technical_data
)
if not nlp_features:
return {
'symbol': symbol,
'success': False,
'error': 'NLP feature extraction failed'
}
return {
'symbol': symbol,
'nlp_features': nlp_features,
'news_count': len(processed_news),
'significant_count': len(significant_news),
'success': True
}
except Exception as e:
print(f"Error in news analysis for {symbol}: {e}")
return {
'symbol': symbol,
'success': False,
'error': str(e)
}
async def extract_nlp_features(
symbol: str,
all_news: List[Dict],
significant_news: List[Dict],
technical_data: Optional[Dict[str, Any]] = None
) -> Optional[Dict[str, int]]:
"""Extract 7 NLP features using proper separation of regular vs significant news."""
try:
api_key = config.get_api_key('openai')
if not api_key:
print(f"No OpenAI API key available for {symbol}")
return None
os.environ['OPENAI_API_KEY'] = api_key
llm = ModelFactory.get_nlp_features_model()
# Prepare prompts and data
prompt_template = get_news_analysis_template()
output_parser = get_news_output_parser()
# Separate regular and significant news, filtering out low significance items
# Only include news with significance >= moderate_threshold in the analysis
moderate_news = [item for item in all_news if item not in significant_news and item.get('significance_score', 0.0) >= config.news_moderate_threshold]
# Format data (company_info removed - focusing only on news content)
formatted_regular_news = format_news_data(moderate_news)
formatted_significant_news = format_significant_news_data(significant_news)
# Clean strings
formatted_regular_news = clean_input_string(formatted_regular_news)
formatted_significant_news = clean_input_string(formatted_significant_news)
# Prepare prompt input
prompt_input = {
'symbol': symbol,
'news': formatted_regular_news,
'significant_news': formatted_significant_news,
'format_instructions': output_parser.get_format_instructions()
}
# Execute chain
chain = prompt_template | llm | output_parser
result = await chain.ainvoke(prompt_input)
# Validate and convert results
if isinstance(result, dict):
nlp_features = {}
required_features = [
'news_relevance', 'sentiment', 'price_impact_potential',
'trend_direction', 'earnings_impact', 'investor_confidence',
'risk_profile_change'
]
for feature in required_features:
value = result.get(feature)
if value is not None:
try:
int_value = int(value)
# Validate that value is in allowed range: -2, -1, 0, 1, 2
if int_value not in [-2, -1, 0, 1, 2]:
print(f"Invalid {feature} value: {int_value} (must be -2, -1, 0, 1, or 2)")
return None
nlp_features[feature] = int_value
except (ValueError, TypeError):
print(f"Invalid {feature} value: {value} (must be integer)")
return None
else:
print(f"Missing {feature} in result")
return None
return nlp_features
else:
print(f"Invalid result format: {type(result)}")
return None
except Exception as e:
print(f"Error extracting NLP features for {symbol}: {e}")
return None
async def news_intelligence_agent_node(state: AgentState) -> dict:
"""LangGraph node for news intelligence. Returns partial state updates."""
try:
symbol = state['symbols'][0] if state['symbols'] else 'AAPL'
analysis_date = state['analysis_date']
technical_data = state.get('technical_analysis_results')
company_data = state.get('data_collection_results')
result = await analyze_news(symbol, analysis_date, technical_data, company_data)
updates: dict = {
"news_intelligence_results": result,
"current_step": "news_intelligence_complete",
}
if not result['success']:
updates["error"] = result.get('error', 'News intelligence failed')
return updates
except Exception as e:
print(f"News intelligence node error: {e}")
return {"error": str(e), "current_step": "error"}