"""Market trends analysis: search the web, scrape pages, summarise with Gemini.

Pipeline (entry point: ``get_trends_data``):
    1. ``perform_search`` finds candidate URLs for several market-related queries.
    2. ``scrape_with_retry`` fetches each URL as markdown through Firecrawl.
    3. ``generate_analysis`` asks Gemini for a sectioned, bullet-point analysis.
    4. ``process_scraped_content`` turns that text into a nested dict.
"""

import logging
from datetime import datetime
from firecrawl import FirecrawlApp
import os
import time
import google.generativeai as genai
import requests  # Import requests for making API calls
from googlesearch import search  # Fallback search provider
import json
from urllib.parse import urlparse

# Initialize logging
logging.basicConfig(level=logging.DEBUG)

# Initialize Firecrawl
# NOTE(security): this key was committed in source. It is kept as a fallback so
# existing deployments keep working, but it should be rotated and supplied via
# the FIRECRAWL_API_KEY environment variable only.
FIRECRAWL_API_KEY = os.getenv(
    'FIRECRAWL_API_KEY', "fc-5fadfeae30314d4ea8a3d9afaa75c493"
)
firecrawl_app = FirecrawlApp(api_key=FIRECRAWL_API_KEY)
logging.info("Firecrawl initialized")

# Initialize Gemini
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY', '')
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)
    model = genai.GenerativeModel('gemini-1.5-flash')
    logging.info("Gemini initialized")
else:
    # Bind the name so later code can test for it instead of hitting NameError.
    model = None
    logging.warning("No Gemini API key found")

# Google Custom Search settings.
# NOTE(security): same remark as for FIRECRAWL_API_KEY - rotate the committed
# key and provide it through the environment.
CUSTOM_SEARCH_URL = "https://www.googleapis.com/customsearch/v1"
CUSTOM_SEARCH_API_KEY = os.getenv(
    'GOOGLE_CUSTOM_SEARCH_API_KEY', "AIzaSyAxeLlJ6vZxOl-TblUJg_dInBS3vNxaFVY"
)
CUSTOM_SEARCH_ENGINE_ID = os.getenv(
    'GOOGLE_CUSTOM_SEARCH_ENGINE_ID', "37793b12975da4e35"
)
CUSTOM_SEARCH_TIMEOUT_SECONDS = 10

# Directory that receives the saved analysis text files.
OUTPUT_DIR = 'gemini_outputs'

# Section headers that terminate the section currently being collected in
# extract_bullet_points. They mirror the headers requested in the Gemini prompt.
_KNOWN_SECTION_HEADERS = (
    "Market Size",
    "Market Segments",
    "Regional Distribution",
    "Market Leaders",
    "Market Differentiators",
    "Industry Dynamics",
    "Consumer Segments",
    "Behavior Patterns",
    "Pain Points",
    "Current Trends",
    "Emerging Technologies",
    "Growth Forecast",
    "Opportunities",
    "Challenges",
)


def _custom_search(query):
    """Query the Google Custom Search API and return the result links.

    Returns an empty list when the API answers with a non-200 status, returns
    no items, or the request/JSON decoding fails.
    """
    try:
        # Passing ``params`` lets requests URL-encode the query; a timeout
        # prevents an unresponsive endpoint from blocking the whole pipeline.
        response = requests.get(
            CUSTOM_SEARCH_URL,
            params={
                'key': CUSTOM_SEARCH_API_KEY,
                'cx': CUSTOM_SEARCH_ENGINE_ID,
                'q': query,
                'num': 2,
            },
            timeout=CUSTOM_SEARCH_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            return []
        items = response.json().get('items', [])
    except (requests.RequestException, ValueError) as e:
        logging.warning(f"Custom Search API request failed: {str(e)}")
        return []
    return [item['link'] for item in items if item.get('link')]


def perform_search(query, use_custom_api=True):
    """
    Perform search with fallback mechanism.

    First tries the Custom Search API (when ``use_custom_api`` is true), then
    falls back to the googlesearch package.

    Args:
        query: Search string.
        use_custom_api: When false, skip the Custom Search API entirely.

    Returns:
        A list of result URLs; an empty list if every provider failed.
    """
    try:
        if use_custom_api:
            links = _custom_search(query)
            if links:
                return links
            logging.warning("Custom Search API failed, falling back to googlesearch")

        # Fallback to googlesearch package
        logging.info("Using googlesearch package")
        return list(search(query, num_results=2, lang="en"))
    except Exception as e:
        # Best-effort boundary: callers treat an empty list as "no results".
        logging.error(f"Search error: {str(e)}")
        return []


def scrape_with_retry(url, max_retries=3, timeout=15):
    """Scrape ``url`` through Firecrawl, retrying on timeouts and rate limits.

    Args:
        url: Page to scrape.
        max_retries: Maximum number of scrape attempts.
        timeout: Base timeout; attempt ``k`` (1-based) passes ``timeout * k``.

    Returns:
        The page markdown when it is longer than 200 characters (after
        stripping), otherwise ``None``. ``None`` is also returned for URLs on
        the skip list and when all attempts fail.
    """
    # Substrings of URLs that often time out (sites and file types).
    problematic_domains = [
        'sparktoro.com',
        'j-jdis.com',
        'linkedin.com',
        'facebook.com',
        'twitter.com',
        'reddit.com',
        '.pdf',
    ]

    # Skip problematic URLs immediately
    lowered_url = url.lower()
    if any(domain in lowered_url for domain in problematic_domains):
        logging.info(f"Skipping known problematic URL: {url}")
        return None

    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        try:
            # Increase timeout with each retry
            current_timeout = timeout * (attempt + 1)
            logging.info(f"Attempting to scrape {url} (timeout: {current_timeout}s)")

            # NOTE(review): the value is passed through unchanged; confirm the
            # unit Firecrawl expects for 'timeout' (seconds vs milliseconds)
            # and that 'wait' / 'max_retries' are recognised by the SDK in use.
            response = firecrawl_app.scrape_url(
                url=url,
                params={
                    'formats': ['markdown'],
                    'timeout': current_timeout,
                    'wait': True,
                    'max_retries': 2,
                },
            )

            if response and response.get('markdown'):
                content = response.get('markdown')
                if len(content.strip()) > 200:  # Verify content quality
                    logging.info(f"Successfully scraped {url}")
                    return content
                logging.warning(f"Content too short from {url}")
                return None

        except Exception as e:
            # The SDK's exception types are not visible here, so errors are
            # classified by message text.
            error_msg = str(e).lower()
            wait_time = (attempt + 1) * 5

            if "timeout" in error_msg or "408" in error_msg:
                if is_last_attempt:
                    logging.error(f"Final timeout for {url} after {max_retries} attempts")
                    break
                logging.warning(f"Timeout error for {url}, attempt {attempt + 1}")
                logging.info(f"Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
                continue
            elif "429" in error_msg:  # Rate limit
                if is_last_attempt:
                    # No further attempt follows, so waiting would be wasted.
                    logging.error(f"Rate limit hit for {url}, no retries left")
                    break
                logging.info(f"Rate limit hit, waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            else:
                logging.error(f"Error scraping {url}: {error_msg}")
                break

        # Reached only when the scrape returned no markdown: short pause, then
        # try again.
        time.sleep(1)

    return None


def _ensure_output_dir():
    """Create OUTPUT_DIR if it does not exist yet."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)


def _save_trends_report(result, scraped_content, query):
    """Write the structured analysis and its data sources to a text file."""
    _ensure_output_dir()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = os.path.join(OUTPUT_DIR, f'market_trends_{timestamp}.txt')
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(f"Market Trends Analysis for: {query}\n")
        f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("="*50 + "\n\n")
        f.write(json.dumps(result, indent=2))
        f.write("\n\nData Sources:\n")
        for source in scraped_content:
            f.write(f"- {source['domain']} ({source['date']})\n")


def get_trends_data(query):
    """Get market trends data for ``query``.

    Runs up to five search queries, scrapes the results until two pages have
    been collected, and builds the structured analysis from them. The analysis
    is also saved to a file under OUTPUT_DIR; a failure to save is logged but
    does not discard the analysis.

    Args:
        query: Business or market to analyse.

    Returns:
        The structured analysis dict, or the placeholder dict from
        ``generate_fallback_response`` when the query is empty, nothing could
        be scraped, or processing failed.
    """
    try:
        if not query:
            logging.error("No query provided")
            return generate_fallback_response("Unknown Business")

        logging.info(f"\n{'='*50}\nGathering trends data for: {query}\n{'='*50}")

        # Define search queries
        search_queries = [
            # Market Overview
            f"{query} market size revenue statistics analysis",
            # Industry Trends
            f"{query} industry trends growth forecast analysis",
            # Competition Analysis
            f"{query} market share competitive landscape analysis",
            # Technology & Innovation
            f"{query} technology innovation disruption analysis",
            # Future Outlook
            f"{query} market future outlook predictions analysis",
        ]

        scraped_content = []
        use_custom_api = True
        successful_scrapes = 0
        min_required_content = 2
        max_attempts_per_url = 2

        for search_query in search_queries:
            if successful_scrapes >= min_required_content:
                break

            try:
                logging.info(f"\nSearching for: {search_query}")
                search_results = perform_search(search_query, use_custom_api)

                if not search_results and use_custom_api:
                    # Stop using the Custom Search API for the remaining
                    # queries once it has produced nothing.
                    use_custom_api = False
                    search_results = perform_search(search_query, use_custom_api=False)

                if search_results:
                    attempts = 0
                    for url in search_results:
                        if (successful_scrapes >= min_required_content
                                or attempts >= max_attempts_per_url):
                            break

                        content = scrape_with_retry(url, timeout=15)
                        if content:
                            scraped_content.append({
                                'url': url,
                                'domain': extract_domain(url),
                                'section': 'Market Trends',
                                'date': datetime.now().strftime("%Y-%m-%d"),
                                # Cap per-page text to keep the prompt small.
                                'content': content[:2000],
                            })
                            successful_scrapes += 1
                        attempts += 1

                time.sleep(1)  # Delay between queries

            except Exception as e:
                logging.error(f"Error in search for query '{search_query}': {str(e)}")
                continue

        if not scraped_content:
            logging.warning("No content scraped, returning fallback response")
            return generate_fallback_response(query)

        try:
            result = process_scraped_content(scraped_content, query)
        except Exception as e:
            logging.error(f"Error processing content: {str(e)}")
            return generate_fallback_response(query)

        # Saving is a side effect only: a missing directory or unwritable disk
        # must not throw away an analysis that was produced successfully.
        try:
            _save_trends_report(result, scraped_content, query)
        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Error saving analysis to file: {str(e)}")

        return result

    except Exception as e:
        logging.error(f"Error during market trends analysis: {str(e)}")
        return generate_fallback_response(query)


def process_scraped_content(scraped_content, query):
    """Generate the Gemini analysis and structure it into the response dict.

    Args:
        scraped_content: List of dicts with 'url', 'domain', 'section', 'date'
            and 'content' keys, as built by ``get_trends_data``.
        query: Business or market being analysed.

    Returns:
        A dict with 'market_size_growth', 'competitive_landscape',
        'consumer_analysis', 'metrics' and 'sources' keys. If anything fails,
        the placeholder dict from ``generate_fallback_response``.
    """
    try:
        # Generate analysis using the scraped content
        analysis = generate_analysis(scraped_content, query)

        # Structure the response
        result = {
            "market_size_growth": {
                "total_market_value": extract_bullet_points(analysis, "Market Size"),
                "market_segments": extract_bullet_points(analysis, "Market Segments"),
                "regional_distribution": extract_bullet_points(analysis, "Regional Distribution"),
            },
            "competitive_landscape": {
                "market_leaders": extract_bullet_points(analysis, "Market Leaders"),
                "market_differentiators": extract_bullet_points(analysis, "Market Differentiators"),
                "industry_dynamics": extract_bullet_points(analysis, "Industry Dynamics"),
            },
            "consumer_analysis": {
                "segments": extract_bullet_points(analysis, "Consumer Segments"),
                "behavior_patterns": extract_bullet_points(analysis, "Behavior Patterns"),
                "pain_points": extract_bullet_points(analysis, "Pain Points"),
            },
            "metrics": extract_metrics(scraped_content),
            "sources": [
                {
                    'url': item['url'],
                    'domain': item['domain'],
                    'section': item['section'],
                    'date': item['date'],
                }
                for item in scraped_content
            ],
        }

        return result

    except Exception as e:
        logging.error(f"Error processing scraped content: {str(e)}")
        return generate_fallback_response(query)


def extract_domain(url):
    """Extract the host name from ``url``, without a leading ``www.``.

    Returns ``url`` unchanged when it cannot be parsed.
    """
    try:
        domain = urlparse(url).netloc
        # Only strip the prefix; 'www.' elsewhere in the host is significant.
        if domain.startswith('www.'):
            domain = domain[len('www.'):]
        return domain
    except (ValueError, TypeError, AttributeError):
        return url


def generate_fallback_response(query):
    """Return a placeholder response with the same shape as a real analysis.

    Every list holds a single "(Inferred)" placeholder message; 'metrics' and
    'sources' are empty.
    """
    return {
        "market_size_growth": {
            "total_market_value": [f"Market size analysis for {query} pending (Inferred)"],
            "market_segments": ["Market segmentation analysis needed (Inferred)"],
            "regional_distribution": ["Regional analysis to be conducted (Inferred)"],
        },
        "competitive_landscape": {
            "market_leaders": ["Market leader analysis pending (Inferred)"],
            "market_differentiators": ["Differentiator analysis needed (Inferred)"],
            "industry_dynamics": ["Industry dynamics to be evaluated (Inferred)"],
        },
        "consumer_analysis": {
            "segments": ["Consumer segmentation pending (Inferred)"],
            "behavior_patterns": ["Behavior analysis needed (Inferred)"],
            "pain_points": ["Pain point identification required (Inferred)"],
        },
        "metrics": {},
        "sources": [],
    }


def process_analysis(analysis, scraped_content):
    """Process and structure the analysis for frontend consumption.

    Builds a wider result skeleton than ``process_scraped_content`` and fills
    every subsection by looking up the title-cased subsection key (for example
    ``"growth_drivers"`` -> ``"Growth Drivers"``) in ``analysis``.

    Args:
        analysis: Analysis text to extract bullet points from.
        scraped_content: Scraped items, used only for ``extract_metrics``.

    Returns:
        The filled result dict; 'sources' is left as an empty list.
    """
    result = {
        "market_size_growth": {
            "total_market_value": [],
            "market_segments": [],
            "regional_distribution": [],
            "growth_drivers": [],
        },
        "competitive_landscape": {
            "market_leaders": [],
            "market_differentiators": [],
            "industry_dynamics": [],
            "entry_barriers": [],
        },
        "consumer_analysis": {
            "segments": [],
            "behavior_patterns": [],
            "pain_points": [],
            "decision_factors": [],
        },
        "technology_innovation": {
            "current_trends": [],
            "emerging_tech": [],
            "digital_impact": [],
            "innovation_opportunities": [],
        },
        "regulatory_environment": {
            "key_regulations": [],
            "compliance_requirements": [],
            "environmental_impact": [],
            "sustainability": [],
        },
        "future_outlook": {
            "growth_forecast": [],
            "opportunities": [],
            "challenges": [],
            "evolution_scenarios": [],
        },
        "strategic_recommendations": {
            "entry_strategies": [],
            "product_development": [],
            "tech_investments": [],
            "risk_mitigation": [],
        },
        "metrics": extract_metrics(scraped_content),
        "sources": [],
    }

    # 'metrics' and 'sources' are not section dicts, so they are skipped.
    for section, subsections in result.items():
        if section in ("metrics", "sources"):
            continue
        for subsection in subsections:
            subsections[subsection] = extract_bullet_points(
                analysis, subsection.replace('_', ' ').title()
            )

    return result


def _collect_numeric(values, label, target):
    """Store each value convertible to float in ``target`` as '<label> <n>'."""
    for i, raw in enumerate(values):
        try:
            target[f'{label} {i+1}'] = float(raw)
        except (ValueError, TypeError):
            # Skip entries that are not numeric (including None).
            continue


def extract_metrics(scraped_content):
    """Extract and structure metrics from scraped content.

    Only items carrying a 'metrics' key contribute. The items built by
    ``get_trends_data`` have no such key, so for that pipeline all three
    sub-dicts come back empty.

    Keys are numbered per item ('Company 1', 'Period 1', 'Entity 1', ...), so
    a later item overwrites the same positions written by an earlier one.

    Returns:
        A dict with 'market_share', 'growth_rates' and 'revenue' sub-dicts
        mapping generated labels to floats.
    """
    metrics = {
        "market_share": {},
        "growth_rates": {},
        "revenue": {},
    }

    for item in scraped_content:
        if 'metrics' not in item:
            continue
        item_metrics = item['metrics']
        _collect_numeric(item_metrics.get('market_share', []), 'Company', metrics['market_share'])
        _collect_numeric(item_metrics.get('growth_rates', []), 'Period', metrics['growth_rates'])
        # Revenue figures are read from the 'money' key of the item metrics.
        _collect_numeric(item_metrics.get('money', []), 'Entity', metrics['revenue'])

    return metrics


def extract_bullet_points(text, section_name):
    """Extract the '•' bullet points listed under ``section_name`` in ``text``.

    A non-bullet line containing ``section_name`` opens the section; a
    non-bullet line containing any other known section header closes it.
    Bullet lines are never treated as headers, so a bullet that merely
    mentions a section name (e.g. "• Opportunities in Asia ...") is collected
    instead of ending or restarting the section.

    Args:
        text: Analysis text, one header or bullet per line.
        section_name: Header text to look for (substring match).

    Returns:
        The cleaned bullet texts, or a single-element placeholder list when
        none are found or extraction fails.
    """
    try:
        lines = []
        in_section = False

        for line in text.split('\n'):
            stripped = line.strip()

            if stripped.startswith('•'):
                if in_section:
                    # Strip surrounding whitespace first so that tab-indented
                    # bullets lose their marker as well.
                    cleaned_line = stripped.strip('• ').strip()
                    if cleaned_line and not cleaned_line.endswith(':'):
                        lines.append(cleaned_line)
                continue

            if section_name in line:
                in_section = True
            elif any(header in line for header in _KNOWN_SECTION_HEADERS):
                in_section = False

        return lines if lines else [f"Analysis for {section_name} pending (Inferred)"]

    except Exception as e:
        logging.error(f"Error extracting bullet points for {section_name}: {str(e)}")
        return [f"Error extracting {section_name} data (Inferred)"]


def _save_raw_analysis(analysis, content_text, query):
    """Write the raw Gemini output plus an excerpt of its input to a file."""
    _ensure_output_dir()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    raw_output_file = os.path.join(OUTPUT_DIR, f'market_trends_raw_{timestamp}.txt')
    with open(raw_output_file, 'w', encoding='utf-8') as f:
        f.write(f"Raw Market Trends Analysis for: {query}\n")
        f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("="*50 + "\n\n")
        f.write("Input Content:\n")
        f.write("-"*30 + "\n")
        f.write(content_text[:1000] + "...\n\n")
        f.write("Generated Analysis:\n")
        f.write("-"*30 + "\n")
        f.write(analysis)


def generate_analysis(scraped_content, query):
    """Generate market trends analysis using Gemini.

    Args:
        scraped_content: List of dicts whose 'content' values are joined into
            the prompt.
        query: Business or market being analysed.

    Returns:
        The analysis text returned by Gemini.

    Raises:
        Exception: When Gemini is not configured, returns no text, or the
            request fails. The error is logged and re-raised.
    """
    try:
        if model is None:
            raise RuntimeError("Gemini model is not configured (GOOGLE_API_KEY missing)")

        # Prepare content for analysis
        content_text = "\n\n".join([item['content'] for item in scraped_content])

        # The section headers below must stay in sync with the names looked up
        # by process_scraped_content and with _KNOWN_SECTION_HEADERS.
        analysis_prompt = f"""
        Task: Analyze the provided content to create a detailed market trends analysis for {query}.

        Content to analyze:
        {content_text}

        Please provide a structured analysis covering these exact sections:

        Market Size & Growth:
        Market Size:
        • [Provide market size estimates with specific numbers where available]
        • [Include year-over-year growth rates]
        Market Segments:
        • [Identify key market segments]
        • [Provide segment-wise breakdown]
        Regional Distribution:
        • [Analyze geographical distribution]
        • [Identify key markets and growth regions]

        Competitive Landscape:
        Market Leaders:
        • [List top companies and their market positions]
        • [Include market share data where available]
        Market Differentiators:
        • [Identify key competitive advantages]
        • [Analyze unique selling propositions]
        Industry Dynamics:
        • [Analyze industry trends and changes]
        • [Identify market drivers and challenges]

        Consumer Analysis:
        Consumer Segments:
        • [Identify key customer segments]
        • [Analyze segment characteristics]
        Behavior Patterns:
        • [Analyze purchasing patterns]
        • [Identify decision factors]
        Pain Points:
        • [List key customer challenges]
        • [Identify unmet needs]

        Technology & Innovation:
        Current Trends:
        • [Identify current technology trends]
        • [Analyze adoption rates]
        Emerging Technologies:
        • [List emerging technologies]
        • [Assess potential impact]

        Future Outlook:
        Growth Forecast:
        • [Provide growth projections]
        • [Identify growth drivers]
        Opportunities:
        • [List market opportunities]
        • [Identify potential areas for expansion]
        Challenges:
        • [Identify market challenges]
        • [List potential risks]

        Format each point with specific data where available.
        Mark inferences with (Inferred).
        Prioritize insights based on confidence and impact.
        """

        # Generate analysis using Gemini
        response = model.generate_content(analysis_prompt)
        if not response or not response.text:
            raise Exception("No response from Gemini")

        analysis = response.text

        # Saving the raw output is for debugging only; failing to write it
        # must not discard an analysis that was generated successfully.
        try:
            _save_raw_analysis(analysis, content_text, query)
        except OSError as e:
            logging.error(f"Error saving raw analysis to file: {str(e)}")

        return analysis

    except Exception as e:
        logging.error(f"Error generating analysis: {str(e)}")
        raise