Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| from ..workflows.state import AgentState | |
| from ..tools.finnhub_tool import get_company_news | |
| from ..tools.firecrawl_tool import scrape_url | |
| from ..config.model_factory import ModelFactory | |
| from ..config import config | |
| from ..prompts import ( | |
| get_news_analysis_template, | |
| get_news_output_parser, | |
| get_news_significance_assessment_template, | |
| get_article_summarization_template, | |
| format_news_data, | |
| format_significant_news_data, | |
| clean_input_string | |
| ) | |
| def format_date_for_display(date_str: str) -> str: | |
| """Format date string from YYYYMMDDHHMMSS to YYYY-MM-DD HH:MM:SS for better readability.""" | |
| try: | |
| if len(date_str) == 14 and date_str.isdigit(): | |
| dt = datetime.strptime(date_str, '%Y%m%d%H%M%S') | |
| return dt.strftime('%Y-%m-%d %H:%M:%S') | |
| return date_str # Return original if parsing fails | |
| except ValueError: | |
| return date_str # Return original if parsing fails | |
| def sample_random_news(news_items: List[Dict], max_count: int = 5) -> List[Dict]: | |
| """Sample random news items, filtering out promotional content.""" | |
| if not news_items: | |
| return [] | |
| # Filter out promotional content | |
| filtered = [ | |
| item for item in news_items | |
| if not item.get('summary', '').startswith("Looking for stock market analysis") | |
| ] | |
| return random.sample(filtered, min(max_count, len(filtered))) | |
| async def assess_significance( | |
| headline: str, | |
| summary: str, | |
| date: str, | |
| symbol: str, | |
| company_info: Optional[Dict[str, Any]] = None | |
| ) -> float: | |
| """Assess news significance using AI with company context.""" | |
| try: | |
| api_key = config.get_api_key('openai') | |
| if not api_key: | |
| return 0.0 | |
| os.environ['OPENAI_API_KEY'] = api_key | |
| llm = ModelFactory.get_assess_significance_model() | |
| prompt_template = get_news_significance_assessment_template() | |
| # Extract company information for context | |
| if company_info: | |
| company_name = company_info.get('name', company_info.get('longName', symbol)) | |
| industry = company_info.get('industry', company_info.get('sector', 'Unknown Industry')) | |
| market_cap = company_info.get('marketCap', company_info.get('market_cap', 'N/A')) | |
| business_description = company_info.get('longBusinessSummary', company_info.get('description', 'No description available')) | |
| # Format market cap if it's a number | |
| if isinstance(market_cap, (int, float)) and market_cap > 0: | |
| market_cap = f"${market_cap:,.0f}" | |
| elif market_cap == 'N/A' or not market_cap: | |
| market_cap = "N/A" | |
| else: | |
| company_name = symbol | |
| industry = "Unknown Industry" | |
| market_cap = "N/A" | |
| business_description = "No company information available" | |
| response = await (prompt_template | llm).ainvoke({ | |
| 'headline': headline, | |
| 'summary': summary, | |
| 'date': format_date_for_display(date), | |
| 'symbol': symbol, | |
| 'company_name': company_name, | |
| 'industry': industry, | |
| 'market_cap': market_cap, | |
| 'business_description': business_description | |
| }) | |
| significance_text = str(response.content).strip() | |
| return max(0.0, min(1.0, float(significance_text))) | |
| except Exception as e: | |
| print(f"Significance assessment failed: {e}") | |
| return 0.0 | |
| async def scrape_article_content(news_item: Dict) -> Optional[str]: | |
| """Scrape full article content using Firecrawl.""" | |
| try: | |
| url = news_item.get('url') | |
| if not url: | |
| return None | |
| result = await scrape_url(url) | |
| if result.success and result.data: | |
| return result.data.get('content', '') | |
| return None | |
| except Exception as e: | |
| print(f"Article scraping failed: {e}") | |
| return None | |
| async def create_enhanced_summary(title: str, original_summary: str, full_content: str, symbol: str) -> Optional[str]: | |
| """Create enhanced summary from scraped content.""" | |
| try: | |
| api_key = config.get_api_key('openai') | |
| if not api_key: | |
| return None | |
| os.environ['OPENAI_API_KEY'] = api_key | |
| llm = ModelFactory.get_enhanced_summary_model() | |
| prompt_template = get_article_summarization_template() | |
| response = await (prompt_template | llm).ainvoke({ | |
| 'title': title, | |
| 'source': 'News', | |
| 'original_summary': original_summary, | |
| 'full_content': full_content[:8000], | |
| 'symbol': symbol | |
| }) | |
| return str(response.content).strip() | |
| except Exception as e: | |
| print(f"Enhanced summary failed: {e}") | |
| return None | |
| async def analyze_news( | |
| symbol: str, | |
| analysis_date: Optional[str] = None, | |
| technical_data: Optional[Dict[str, Any]] = None, | |
| company_data: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Complete news analysis with PrimoGPT workflow: | |
| 1. Get news -> 2. Sample random articles -> 3. Assess significance | |
| 4. Scrape significant articles -> 5. Generate NLP features | |
| """ | |
| try: | |
| symbol = symbol.upper() | |
| # Extract company information from data_collection_results | |
| company_info = None | |
| if company_data: | |
| company_info = company_data.get('company_info') or company_data.get('company_profile') | |
| # 1. Get news data for analysis | |
| news_result = await get_company_news(symbol, analysis_date) | |
| # Extract news data | |
| news_data = [] | |
| if news_result and news_result.success and news_result.data: | |
| if isinstance(news_result.data, dict): | |
| news_data = news_result.data.get('news', []) | |
| elif isinstance(news_result.data, list): | |
| news_data = news_result.data | |
| if not news_data: | |
| return { | |
| 'symbol': symbol, | |
| 'success': False, | |
| 'error': 'No news data available' | |
| } | |
| # 2. Sample random news items per config | |
| sampled_news = sample_random_news(news_data, max_count=config.news_sample_count) | |
| # 3. Process each news item for significance with company context | |
| processed_news = [] | |
| significant_news = [] | |
| for news_item in sampled_news: | |
| # Assess significance with company context | |
| significance = await assess_significance( | |
| news_item.get('headline', ''), | |
| news_item.get('summary', ''), | |
| news_item.get('date', ''), | |
| symbol, | |
| company_info | |
| ) | |
| news_item['significance_score'] = significance | |
| # 4. If significant (threshold per config), scrape and enhance | |
| if significance >= config.news_significance_threshold: | |
| full_content = await scrape_article_content(news_item) | |
| if full_content: | |
| enhanced_summary = await create_enhanced_summary( | |
| news_item.get('headline', ''), | |
| news_item.get('summary', ''), | |
| full_content, | |
| symbol | |
| ) | |
| if enhanced_summary: | |
| news_item['enhanced_summary'] = enhanced_summary | |
| significant_news.append(news_item) | |
| processed_news.append(news_item) | |
| # 5. Generate NLP features | |
| nlp_features = await extract_nlp_features( | |
| symbol, | |
| processed_news, | |
| significant_news, | |
| technical_data | |
| ) | |
| if not nlp_features: | |
| return { | |
| 'symbol': symbol, | |
| 'success': False, | |
| 'error': 'NLP feature extraction failed' | |
| } | |
| return { | |
| 'symbol': symbol, | |
| 'nlp_features': nlp_features, | |
| 'news_count': len(processed_news), | |
| 'significant_count': len(significant_news), | |
| 'success': True | |
| } | |
| except Exception as e: | |
| print(f"Error in news analysis for {symbol}: {e}") | |
| return { | |
| 'symbol': symbol, | |
| 'success': False, | |
| 'error': str(e) | |
| } | |
| async def extract_nlp_features( | |
| symbol: str, | |
| all_news: List[Dict], | |
| significant_news: List[Dict], | |
| technical_data: Optional[Dict[str, Any]] = None | |
| ) -> Optional[Dict[str, int]]: | |
| """Extract 7 NLP features using proper separation of regular vs significant news.""" | |
| try: | |
| api_key = config.get_api_key('openai') | |
| if not api_key: | |
| print(f"No OpenAI API key available for {symbol}") | |
| return None | |
| os.environ['OPENAI_API_KEY'] = api_key | |
| llm = ModelFactory.get_nlp_features_model() | |
| # Prepare prompts and data | |
| prompt_template = get_news_analysis_template() | |
| output_parser = get_news_output_parser() | |
| # Separate regular and significant news, filtering out low significance items | |
| # Only include news with significance >= moderate_threshold in the analysis | |
| moderate_news = [item for item in all_news if item not in significant_news and item.get('significance_score', 0.0) >= config.news_moderate_threshold] | |
| # Format data (company_info removed - focusing only on news content) | |
| formatted_regular_news = format_news_data(moderate_news) | |
| formatted_significant_news = format_significant_news_data(significant_news) | |
| # Clean strings | |
| formatted_regular_news = clean_input_string(formatted_regular_news) | |
| formatted_significant_news = clean_input_string(formatted_significant_news) | |
| # Prepare prompt input | |
| prompt_input = { | |
| 'symbol': symbol, | |
| 'news': formatted_regular_news, | |
| 'significant_news': formatted_significant_news, | |
| 'format_instructions': output_parser.get_format_instructions() | |
| } | |
| # Execute chain | |
| chain = prompt_template | llm | output_parser | |
| result = await chain.ainvoke(prompt_input) | |
| # Validate and convert results | |
| if isinstance(result, dict): | |
| nlp_features = {} | |
| required_features = [ | |
| 'news_relevance', 'sentiment', 'price_impact_potential', | |
| 'trend_direction', 'earnings_impact', 'investor_confidence', | |
| 'risk_profile_change' | |
| ] | |
| for feature in required_features: | |
| value = result.get(feature) | |
| if value is not None: | |
| try: | |
| int_value = int(value) | |
| # Validate that value is in allowed range: -2, -1, 0, 1, 2 | |
| if int_value not in [-2, -1, 0, 1, 2]: | |
| print(f"Invalid {feature} value: {int_value} (must be -2, -1, 0, 1, or 2)") | |
| return None | |
| nlp_features[feature] = int_value | |
| except (ValueError, TypeError): | |
| print(f"Invalid {feature} value: {value} (must be integer)") | |
| return None | |
| else: | |
| print(f"Missing {feature} in result") | |
| return None | |
| return nlp_features | |
| else: | |
| print(f"Invalid result format: {type(result)}") | |
| return None | |
| except Exception as e: | |
| print(f"Error extracting NLP features for {symbol}: {e}") | |
| return None | |
| async def news_intelligence_agent_node(state: AgentState) -> dict: | |
| """LangGraph node for news intelligence. Returns partial state updates.""" | |
| try: | |
| symbol = state['symbols'][0] if state['symbols'] else 'AAPL' | |
| analysis_date = state['analysis_date'] | |
| technical_data = state.get('technical_analysis_results') | |
| company_data = state.get('data_collection_results') | |
| result = await analyze_news(symbol, analysis_date, technical_data, company_data) | |
| updates: dict = { | |
| "news_intelligence_results": result, | |
| "current_step": "news_intelligence_complete", | |
| } | |
| if not result['success']: | |
| updates["error"] = result.get('error', 'News intelligence failed') | |
| return updates | |
| except Exception as e: | |
| print(f"News intelligence node error: {e}") | |
| return {"error": str(e), "current_step": "error"} |