import logging from datetime import datetime from firecrawl import FirecrawlApp import google.generativeai as genai import os import time from googlesearch import search from flask import Flask, request, jsonify from flask_cors import CORS import requests import uuid # Import uuid for unique file naming # Initialize logging logging.basicConfig(level=logging.DEBUG) # Initialize Firecrawl and Gemini FIRECRAWL_API_KEY = "fc-b69d6504ab0a42b79e87b7827a538199" firecrawl_app = FirecrawlApp(api_key=FIRECRAWL_API_KEY) logging.info("Firecrawl initialized") GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY', '') if GOOGLE_API_KEY: genai.configure(api_key=GOOGLE_API_KEY) model = genai.GenerativeModel('gemini-1.5-flash') logging.info("Gemini initialized") def get_competitor_data(query): """ Get competitor data with improved rate limit handling """ logging.info(f"\n{'='*50}\nAnalyzing competitors for: {query}\n{'='*50}") result = { "main_competitors": [], "competitor_strengths": [], "key_findings": [], "sources": [] } def scrape_with_retry(url, section): """Helper function to scrape URL with retry logic""" max_retries = 3 for attempt in range(max_retries): try: response = firecrawl_app.scrape_url( url=url, params={'formats': ['markdown']} ) if response and response.get('markdown'): result["sources"].append({ 'url': url, 'domain': extract_domain(url), 'section': section, 'date': datetime.now().strftime("%Y-%m-%d") }) return response.get('markdown') except Exception as e: if "429" in str(e): wait_time = (attempt + 1) * 10 # Exponential backoff logging.info(f"Rate limit hit, waiting {wait_time} seconds...") time.sleep(wait_time) continue logging.error(f"Error scraping {url}: {str(e)}") time.sleep(2) # Basic delay between attempts return None def search_with_retry(search_query): """Helper function to perform search with retry logic""" max_retries = 3 for attempt in range(max_retries): try: return list(search( search_query, num_results=2, # Reduced number lang="en" )) except Exception as e: if "429" in str(e): wait_time = (attempt + 1) * 10 logging.info(f"Search rate limit hit, waiting {wait_time} seconds...") time.sleep(wait_time) continue logging.error(f"Search error: {str(e)}") break return [] # Create directory for output files output_dir = 'gemini_outputs' os.makedirs(output_dir, exist_ok=True) # Phase 1: Top Competitors logging.info("\nPhase 1: Getting Top Competitors") search_query = f"top competitors of {query} list" urls = search_with_retry(search_query) if urls: content = scrape_with_retry(urls[0], 'Top Competitors') if content: prompt = f""" Analyze this content and list the top 5 competitors of {query}. Format each competitor as: [Name] - [Brief description] Content: {content} """ try: competitors = model.generate_content(prompt).text result["main_competitors"] = extract_section(competitors, "") logging.info(f"Found competitors: {result['main_competitors']}") # Create output file for competitors with open(os.path.join(output_dir, 'compitoone.txt'), 'w') as f: f.write(competitors) except Exception as e: logging.error(f"Error in Gemini analysis: {str(e)}") time.sleep(5) # Delay between phases # Phase 2: Competitor Strengths logging.info("\nPhase 2: Getting Competitor Strengths") search_query = f"{query} competitors strengths advantages" urls = search_with_retry(search_query) if urls: content = scrape_with_retry(urls[0], 'Competitor Strengths') if content: prompt = f""" List the key strengths of {query}'s main competitors. Format as: [Competitor Name]: [Key strength] Content: {content} """ try: strengths = model.generate_content(prompt).text result["competitor_strengths"] = extract_section(strengths, "") logging.info(f"Found strengths: {result['competitor_strengths']}") # Create output file for strengths with open(os.path.join(output_dir, 'compitoone.txt'), 'a') as f: f.write(strengths) except Exception as e: logging.error(f"Error in Gemini analysis: {str(e)}") time.sleep(5) # Delay between phases # Phase 3: Key Findings logging.info("\nPhase 3: Getting Key Findings") search_query = f"{query} competitive landscape analysis" urls = search_with_retry(search_query) if urls: content = scrape_with_retry(urls[0], 'Key Findings') if content: prompt = f""" Provide 2-3 key insights about {query}'s competitive landscape. Format as numbered points. Content: {content} """ try: findings = model.generate_content(prompt).text result["key_findings"] = extract_section(findings, "") logging.info(f"Found key findings: {findings}") # Create output file for findings with open(os.path.join(output_dir, 'compitoone.txt'), 'a') as f: f.write(findings) except Exception as e: logging.error(f"Error in Gemini analysis: {str(e)}") # Return fallback if no data found if not any([result["main_competitors"], result["competitor_strengths"], result["key_findings"]]): return create_empty_response() return result def extract_section(text, section_name): """Extract content from a specific section""" try: # Find section start = text.find(section_name + ":") if start == -1: return [] # Find next section or end next_section = float('inf') for section in ["TOP COMPETITORS:", "COMPETITOR STRENGTHS:", "KEY FINDINGS:"]: if section != section_name + ":": pos = text.find(section, start + len(section_name)) if pos != -1: next_section = min(next_section, pos) # Extract content content = text[start + len(section_name) + 1:next_section if next_section != float('inf') else None] # Split into lines and clean lines = [line.strip() for line in content.strip().split('\n') if line.strip()] return lines except Exception as e: logging.error(f"Error extracting {section_name}: {str(e)}") return [] def extract_domain(url): """Extract domain from a URL""" try: return url.split('/')[2] except Exception as e: logging.error(f"Error extracting domain: {str(e)}") return "" def create_empty_response(): return { "main_competitors": [], "competitor_strengths": [], "key_findings": [], "sources": [] }