"""
Competitive Analysis Agent - All-in-One Application

Complete system with analysis logic and Gradio web interface.
No separate MCP server needed - everything runs in one process.
"""

import re
import time
from collections import Counter
from typing import Optional

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from openai import OpenAI
import gradio as gr

# ============================================================================
# ANALYSIS TOOLS (Consolidated from mcp_server.py)
# ============================================================================

# Prefix web_search_tool() puts on its return value when the search raised.
_SEARCH_FAILED_PREFIX = "Search failed:"

# Pause inserted between consecutive searches of one analysis step.
_SEARCH_DELAY_SECONDS = 0.5

# Strings identify_competitors() returns instead of a competitor list.
_NO_COMPETITORS = "No competitors identified"
_COMPETITOR_ERROR_PREFIX = "Error identifying competitors"


def web_search_tool(query: str, max_results: int = 5) -> str:
    """Perform web search using DuckDuckGo.

    Args:
        query: Search query text.
        max_results: Maximum number of hits to request.

    Returns:
        The hits formatted as ``Title / URL / Snippet`` blocks separated by
        ``---`` lines (an empty string when there are no hits), or a string
        starting with ``"Search failed:"`` when anything raised.
    """
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
        formatted_results = [
            f"Title: {result['title']}\nURL: {result['href']}\nSnippet: {result['body']}\n"
            for result in results
        ]
        return "\n---\n".join(formatted_results)
    except Exception as e:
        # Boundary of the search backend: callers expect a string, never a raise.
        return f"{_SEARCH_FAILED_PREFIX} {e}"


def _search_failed(results: str) -> bool:
    """Return True when *results* is web_search_tool()'s error string."""
    return results.startswith(_SEARCH_FAILED_PREFIX)


def _run_searches(queries) -> list:
    """Run each query in turn, pausing between them; drop failed searches.

    Failed searches are skipped so that the error text is never parsed as if
    it were search results (it would otherwise yield bogus sectors/companies).
    """
    successful = []
    for index, query in enumerate(queries):
        if index:
            time.sleep(_SEARCH_DELAY_SECONDS)
        results = web_search_tool(query)
        if not _search_failed(results):
            successful.append(results)
    return successful


# ============================================================================
# COMPANY VALIDATION
# ============================================================================

def validate_company(company_name: str) -> str:
    """Validate if company exists using web search.

    Returns:
        A status line starting with ``"✓ VALID COMPANY:"``, ``"✗ NOT VALID:"``
        or ``"Validation error:"`` (the latter when the search itself failed,
        so a backend outage is not reported as "company does not exist").
    """
    print(f"[ANALYSIS]: Validating '{company_name}'")

    try:
        search_query = f"{company_name} company business official site"
        results = web_search_tool(search_query)

        if _search_failed(results):
            return f"Validation error: {results}"

        if is_company_valid_based_on_search(results, company_name):
            return f"✓ VALID COMPANY: {company_name} (verified via web search)"
        return f"✗ NOT VALID: No substantial evidence found for '{company_name}'"
    except Exception as e:
        return f"Validation error: {str(e)}"


def is_company_valid_based_on_search(search_results: str, company_name: str) -> bool:
    """Analyze search results to determine if company is valid.

    Five independent, case-insensitive heuristics are evaluated; the company
    is considered valid when at least two of them hold:

    1. a ``<name>.com`` / ``<name>.io`` domain appears (also tried with the
       spaces removed from the name, since domains cannot contain spaces),
    2. the text mentions an "official site"/"official website",
    3. the text contains both "company" and the company name,
    4. a generic business term appears,
    5. "wikipedia", "news" or "about" appears.
    """
    results_lower = search_results.lower()
    company_lower = company_name.lower().strip()

    # An empty name would match everything ("" is a substring of any text).
    domain_stems = {company_lower, "".join(company_lower.split())} - {""}
    business_terms = ("corporation", "inc", "ltd", "llc", "business", "enterprise", "founded")

    evidence = (
        any(
            f"{stem}{tld}" in results_lower
            for stem in domain_stems
            for tld in (".com", ".io")
        ),
        "official site" in results_lower or "official website" in results_lower,
        bool(company_lower) and "company" in results_lower and company_lower in results_lower,
        any(term in results_lower for term in business_terms),
        any(marker in results_lower for marker in ("wikipedia", "news", "about")),
    )
    return sum(evidence) >= 2


# ============================================================================
# SECTOR IDENTIFICATION
# ============================================================================

# Sector name -> keywords whose presence in search results votes for it.
_SECTOR_KEYWORDS = {
    "Technology": ["technology", "software", "hardware", "saas", "cloud", "ai",
                   "artificial intelligence", "platform"],
    "Finance": ["financial", "banking", "investment", "fintech", "insurance", "bank", "payments"],
    "Healthcare": ["healthcare", "medical", "pharmaceutical", "biotech", "hospital",
                   "health", "clinical"],
    "Education": ["education", "edtech", "e-learning", "online learning", "educational", "training"],
    "Retail": ["retail", "e-commerce", "online shopping", "marketplace", "commerce"],
    "Manufacturing": ["manufacturing", "industrial", "automotive", "electronics", "factory"],
    "Energy": ["energy", "renewable", "oil and gas", "solar", "power", "utility"],
    "Telecommunications": ["telecom", "communications", "network", "5g", "broadband"],
}


def identify_sector(company_name: str) -> str:
    """Determine industry sector using multiple search strategies.

    Runs three differently-worded searches, pools the sector votes from all
    of them and returns the winning sector name, ``"Unknown sector"`` when no
    sector wins, or ``"Error identifying sector: ..."`` on an exception.
    """
    print(f"[ANALYSIS]: Identifying sector for '{company_name}'")

    try:
        queries = (
            f"what does {company_name} do business industry",
            f"{company_name} industry type sector",
            f"{company_name} sector industry news",
        )
        all_sectors = []
        for results in _run_searches(queries):
            all_sectors.extend(extract_sectors_advanced(results, company_name))

        final_sector = determine_primary_sector(all_sectors)
        return final_sector if final_sector else "Unknown sector"
    except Exception as e:
        return f"Error identifying sector: {str(e)}"


def extract_sectors_advanced(search_results: str, company_name: str) -> list:
    """Advanced sector extraction with context analysis.

    Returns a list of sector names with repetition: every keyword found in
    the text contributes one vote for its sector, or two votes when the
    company name appears in the text or the keyword follows "is a"/"in the".

    Keywords are matched as whole words (optionally plural) rather than as
    raw substrings, so short keywords such as "ai" no longer fire on words
    like "said" or "available".
    """
    results_lower = search_results.lower()
    company_lower = company_name.lower().strip()
    # Loop-invariant; an empty name must not count as "mentioned".
    company_mentioned = bool(company_lower) and company_lower in results_lower

    found_sectors = []
    for sector, keywords in _SECTOR_KEYWORDS.items():
        for keyword in keywords:
            if not re.search(rf"\b{re.escape(keyword)}s?\b", results_lower):
                continue
            in_context = company_mentioned or any(
                phrase in results_lower
                for phrase in (f"is a {keyword}", f"in the {keyword}")
            )
            found_sectors.extend([sector] * (2 if in_context else 1))

    return found_sectors


def determine_primary_sector(sectors_list: list) -> str:
    """Determine primary sector from list of found sectors.

    Returns the most frequent sector when it has at least two votes, or when
    it is the only sector present; otherwise an empty string.
    """
    if not sectors_list:
        return ""

    sector_counts = Counter(sectors_list)
    top_sector, votes = sector_counts.most_common(1)[0]

    if votes >= 2 or len(sector_counts) == 1:
        return top_sector
    return ""


# ============================================================================
# COMPETITOR IDENTIFICATION
# ============================================================================

# Runs of capitalised words, optionally ending in a legal-form suffix.
_CAPITALIZED_NAME_RE = re.compile(r'\b[A-Z][a-zA-Z\s&]+(?:Inc|Corp|Ltd|LLC|AG|SE)?')

# Phrases that typically introduce an enumeration of companies.
_COMPETITOR_LIST_RES = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'(?:competitors?|companies|players|include)[:\s]+([^\.]+)',
        r'(?:including|such as)[:\s]+([^\.]+)',
        r'(?:top|leading|major)\s+\d*\s*([^:\.]+companies[^:\.]*)',
    )
)

# Separators between the items of such an enumeration.
_LIST_SEPARATOR_RE = re.compile(r',|\band\b|\bor\b|;')


def identify_competitors(sector: str, company_name: str) -> str:
    """Identify top 3 competitors using comprehensive web search.

    Returns:
        Up to three names joined by ``", "``, ``"No competitors identified"``
        when nothing was found, or ``"Error identifying competitors: ..."``
        on an exception.
    """
    print(f"[ANALYSIS]: Finding competitors in '{sector}' sector (excluding '{company_name}')")

    try:
        queries = (
            f"top {sector} companies competitors market leaders",
            f"competitors of {company_name}",
            f"{sector} industry leaders key players",
        )
        competitor_candidates = []
        for results in _run_searches(queries):
            competitor_candidates.extend(
                extract_competitors_advanced(results, company_name, sector)
            )

        final_competitors = rank_competitors(competitor_candidates, company_name)
        if final_competitors:
            return ", ".join(final_competitors[:3])
        return _NO_COMPETITORS
    except Exception as e:
        return f"{_COMPETITOR_ERROR_PREFIX}: {str(e)}"


def extract_competitors_advanced(search_results: str, exclude_company: str, sector: str) -> list:
    """Advanced competitor extraction with context awareness.

    Collects, in order of first appearance and without duplicates, every
    plausible company name found either as a capitalised run of words or as
    an item of an enumeration ("competitors: A, B and C", "such as ...").
    The company under analysis is left out.

    Args:
        search_results: Text returned by web_search_tool().
        exclude_company: Name to leave out (compared case-insensitively).
        sector: Currently unused; kept for interface compatibility.
    """
    exclude_lower = exclude_company.lower()
    competitors = []
    seen = set()  # O(1) duplicate check; `competitors` keeps the order

    def remember(name: str) -> None:
        if (name.lower() != exclude_lower
                and name not in seen
                and is_likely_company_name(name)):
            seen.add(name)
            competitors.append(name)

    for match in _CAPITALIZED_NAME_RE.findall(search_results):
        name = match.strip()
        if len(name) > 2:
            remember(name)

    for pattern in _COMPETITOR_LIST_RES:
        for listing in pattern.findall(search_results):
            for piece in _LIST_SEPARATOR_RE.split(listing):
                remember(piece.strip())

    return competitors


def is_likely_company_name(text: str) -> bool:
    """Check if text looks like a company name.

    A candidate qualifies when it is 2-60 characters long, starts with an
    upper-case character, contains at least one letter and none of its words
    is a common function word.
    """
    if not text or len(text) < 2 or len(text) > 60:
        return False

    non_company_words = {
        'the', 'and', 'or', 'but', 'with', 'for', 'from', 'that', 'this',
        'these', 'those', 'their', 'other', 'some', 'such', 'including',
        'etc', 'etc.', 'among', 'various', 'several', 'many',
    }

    if any(word in non_company_words for word in text.lower().split()):
        return False

    return text[0].isupper() and any(c.isalpha() for c in text)


def rank_competitors(competitor_candidates: list, exclude_company: str) -> list:
    """Rank competitors by frequency and relevance.

    Returns the distinct candidate names ordered by how often they occur
    (ties keep first-seen order), without blank entries and without the
    excluded company.
    """
    exclude_lower = exclude_company.lower()
    competitor_counts = Counter(
        comp for comp in competitor_candidates
        if comp.strip() and comp.lower() != exclude_lower
    )
    return [comp for comp, _count in competitor_counts.most_common()]


# ============================================================================
# WEB BROWSING
# ============================================================================

def browse_page(url: str, instructions: str) -> str:
    """Browse a webpage and extract information.

    Args:
        url: Page address; ``https://`` is prepended when no scheme is given.
        instructions: Free text whose words select the relevant sentences.

    Returns:
        The extracted text, or a human-readable failure message.
    """
    print(f"[ANALYSIS]: Browsing {url}")

    try:
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        content = fetch_webpage_content(url)
        if not content:
            return f"Failed to fetch content from {url}"

        extracted_text = extract_relevant_content(content, instructions)
        return extracted_text if extracted_text else "No relevant content found"
    except Exception as e:
        return f"Error browsing page: {str(e)}"


def fetch_webpage_content(url: str) -> Optional[str]:
    """Fetch webpage content with proper headers.

    Returns:
        The visible text of the page's main/article/div/p elements joined by
        spaces, or ``None`` when the request or parsing failed (the error is
        printed).
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }

        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop elements that carry no article text.
        for tag in soup(["script", "style", "nav", "footer", "header", "meta"]):
            tag.decompose()

        text_parts = []
        for element in soup.find_all(['main', 'article', 'div', 'p']):
            text = element.get_text(strip=True)
            if text and len(text) > 20:
                text_parts.append(text)

        # NOTE(review): this caps the number of text fragments, not characters,
        # and nested elements repeat their children's text - confirm intent.
        return ' '.join(text_parts[:5000])
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None


def extract_relevant_content(content: str, instructions: str) -> str:
    """Extract content relevant to the instructions.

    Splits *content* on ``.`` and keeps up to ten sentences (longer than ten
    characters) that share at least one lower-cased word with *instructions*.
    When none match, the first five sentences followed by ``...`` are
    returned instead; empty content yields an empty string.
    """
    sentences = [s.strip() for s in content.split('.') if s.strip()]
    # Loop-invariant: the instruction vocabulary is the same for every sentence.
    instruction_words = set(instructions.lower().split())

    relevant_sentences = [
        sentence for sentence in sentences
        if len(sentence) > 10 and instruction_words & set(sentence.lower().split())
    ]

    if not relevant_sentences and sentences:
        return '. '.join(sentences[:5]) + '...'

    return '. '.join(relevant_sentences[:10])


# ============================================================================
# REPORT GENERATION
# ============================================================================

# Labelled enumerations of competitors inside an analysis context string.
_CONTEXT_COMPETITOR_RES = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'competitors?[:\s]+([^\.\n]+)',
        r'top.*companies?[:\s]+([^\.\n]+)',
    )
)


def generate_report(company_name: str, context: str) -> str:
    """Generate a competitive analysis report.

    Args:
        company_name: Company the report is about.
        context: Analysis notes; competitor names are extracted from it to
            fill the comparison table (placeholders are used when none are
            found).

    Returns:
        The Markdown report, stripped of surrounding whitespace.
    """
    print("[ANALYSIS]: Generating report")

    competitors = extract_competitors_from_context(context)

    rows = [
        f"| {competitor} | Strategic insights | Pricing | Product | Market Position |"
        for competitor in competitors[:3]
    ]
    if not rows:
        rows = [
            "| Competitor A | - | - | - | - |",
            "| Competitor B | - | - | - | - |",
            "| Competitor C | - | - | - | - |",
        ]
    competitor_rows = "\n".join(rows)

    report = f"""
# Competitive Analysis Report: {company_name}

## Executive Summary

Comprehensive analysis of {company_name}'s competitive position based on market research and strategic data.

## Key Findings

- Industry position and market share indicators
- Competitor strategic approaches
- Differentiation opportunities

## Competitor Comparison

| Competitor | Strategy | Pricing | Product Focus | Market Position |
|------------|----------|---------|----------------|-----------------|
{competitor_rows}

## Strategic Insights for {company_name}

### Strengths to Leverage

- Define unique value propositions
- Identify operational advantages
- Highlight customer loyalty factors

### Competitive Opportunities

- Market gaps and underserved segments
- Innovation areas competitors are missing
- Customer pain points to address

### Recommendations

1. **Differentiation**: Develop distinct positioning vs competitors
2. **Innovation**: Invest in unique features and capabilities
3. **Customer Focus**: Enhance engagement and retention strategies
4. **Market Expansion**: Identify new market segments and geographies
5. **Efficiency**: Optimize operations to improve margins

### Next Steps

- Conduct detailed SWOT analysis
- Develop targeted competitor response strategies
- Monitor market movements and competitive activities
- Implement differentiation initiatives

---

*Report generated on {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""

    return report.strip()


def extract_competitors_from_context(context: str) -> list:
    """Extract competitor names from context string.

    Labelled enumerations ("Competitors: A, B and C") are preferred; only
    when none is present is the context treated as a bare ``", "``-separated
    list of capitalised names.  Lines holding identify_competitors()'s
    "nothing found"/error messages are ignored so they never become table
    rows.

    Returns:
        At most five distinct names, in order of first appearance (the
        result is deterministic, unlike a ``set``-based de-duplication).
    """
    usable_text = "\n".join(
        line for line in context.splitlines()
        if _NO_COMPETITORS not in line and _COMPETITOR_ERROR_PREFIX not in line
    )

    candidates = []
    for pattern in _CONTEXT_COMPETITOR_RES:
        for listing in pattern.findall(usable_text):
            candidates.extend(piece.strip() for piece in re.split(r',|\band\b', listing))

    if not any(candidates) and ", " in usable_text:
        for piece in usable_text.split(", "):
            piece = piece.strip()
            if len(piece) > 2 and piece[0].isupper():
                candidates.append(piece)

    # dict.fromkeys() de-duplicates while preserving insertion order.
    unique_names = list(dict.fromkeys(name for name in candidates if name))
    return unique_names[:5]


# ============================================================================
# COMPETITIVE ANALYSIS ENGINE (Consolidated from mcp_client.py)
# ============================================================================

class CompetitiveAnalysisAgent:
    """Runs the validate -> sector -> competitors -> report pipeline."""

    # Prefix of validate_company()'s answer when the company was not found.
    _NOT_VALID_PREFIX = "✗ NOT VALID"

    def __init__(self, openai_api_key: str):
        """Initialize the competitive analysis agent.

        Args:
            openai_api_key: Key used to create the OpenAI client.
        """
        self.client = OpenAI(api_key=openai_api_key)
        self.model = "gpt-4"

        self.system_prompt = (
            "\n"
            "You are an expert Competitive Analysis Agent. Your role is to:\n"
            "1. Validate that the input company is a real business\n"
            "2. Identify its primary industry sector\n"
            "3. Discover its top 3 competitors\n"
            "4. Gather strategic data about competitors (pricing, products, marketing)\n"
            "5. Generate a comprehensive competitive analysis report with actionable insights\n"
            "\n"
            "Use logical reasoning to gather information and synthesize insights.\n"
            "Focus exclusively on the provided company and its top 3 competitors.\n"
            "Generate insights that help the company outperform its competitors.\n"
        )

    def analyze_company(self, company_name: str) -> str:
        """Perform comprehensive competitive analysis for a company.

        Steps: validate the company, identify its sector, identify its
        competitors, ask the chat model for strategic insights, then append
        those insights to the templated report.

        Returns:
            The Markdown report, or a message starting with ``❌`` when the
            company could not be validated or any step raised.
        """
        print(f"\n{'='*60}")
        print(f"Starting competitive analysis for: {company_name}")
        print(f"{'='*60}\n")

        try:
            analysis_steps = []

            # Step 1: Validate company
            print("Step 1: Validating company...")
            validation = validate_company(company_name)
            analysis_steps.append(validation)

            # The previous test (`"NOT" in v and "VALID" not in v`) could never
            # be true because "NOT VALID" contains "VALID"; match the prefix.
            if validation.startswith(self._NOT_VALID_PREFIX):
                return (
                    f"❌ Company validation failed:\n{validation}\n\n"
                    "Please check the company name and try again."
                )

            # Step 2: Identify sector
            print("Step 2: Identifying sector...")
            sector = identify_sector(company_name)
            analysis_steps.append(f"Sector: {sector}")

            # Step 3: Identify competitors
            print("Step 3: Finding competitors...")
            competitors = identify_competitors(sector, company_name)
            analysis_steps.append(f"Competitors: {competitors}")

            # Step 4: Generate report using OpenAI
            print("Step 4: Generating strategic insights...")
            context = "\n".join(analysis_steps)

            user_prompt = (
                "\n"
                "Based on this analysis so far:\n"
                f"{context}\n"
                "\n"
                f"Generate a detailed competitive analysis report for {company_name} including:\n"
                "- Company overview and market position\n"
                "- Top competitors analysis\n"
                "- Competitive advantages and disadvantages\n"
                "- If possible, specific strategic recommendations\n"
                "\n"
                "Format as a professional Markdown report.\n"
            )
            messages = [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_prompt},
            ]

            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.7,
                max_tokens=2000,
            )

            # The message content may be None; avoid rendering "None".
            openai_insights = response.choices[0].message.content or ""

            # Templated report first, model-written insights appended.
            report = generate_report(company_name, context)
            return f"{report}\n\n## AI-Generated Strategic Insights\n\n{openai_insights}"

        except Exception as e:
            return f"❌ Error during analysis: {str(e)}\n\nPlease check your API key and try again."


# ============================================================================
# GRADIO INTERFACE
# ============================================================================

def analyze_competitors_interface(company: str, openai_key: str) -> str:
    """Interface function for Gradio.

    Validates the two text inputs, then runs the analysis with the
    whitespace-stripped values (the same values the validation looked at).

    Returns:
        The Markdown report or a Markdown error message.
    """
    company = (company or "").strip()
    openai_key = (openai_key or "").strip()

    # Validate inputs
    if len(company) < 2:
        return "❌ **Error**: Please enter a valid company name."

    if len(openai_key) < 10:
        return "❌ **Error**: Please enter a valid OpenAI API key."

    # Perform analysis
    try:
        agent = CompetitiveAnalysisAgent(openai_key)
        return agent.analyze_company(company)
    except Exception as e:
        return f"❌ **Error during analysis**: {str(e)}\n\nPlease check your API key and try again."


def create_interface():
    """Create and configure the Gradio interface.

    Returns:
        The ``gr.Blocks`` app: a company-name box, an API-key box, an analyze
        button and a Markdown output, with both the button click and Enter in
        the company box wired to analyze_competitors_interface().
    """
    with gr.Blocks(title="Competitive Analysis Agent") as demo:
        gr.Markdown(
            """
            # 🏆 Competitive Analysis Agent

            Analyze competitors for any company using AI-powered research and strategic insights.

            ### How it works:
            1. **Enter** a company name you want to analyze
            2. **Provide** your OpenAI API key (kept securely, not stored)
            3. **Click** "Analyze" to generate a comprehensive competitive analysis report

            The agent will identify competitors, analyze their strategies, and provide actionable insights.
            """
        )

        with gr.Row():
            with gr.Column(scale=1):
                company_input = gr.Textbox(
                    label="Company Name",
                    placeholder="e.g., Tesla, Spotify, Microsoft",
                    lines=1
                )

                api_key_input = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                    lines=1
                )

                analyze_button = gr.Button(
                    "🔍 Analyze Competitors",
                    variant="primary",
                    scale=1
                )

        with gr.Row():
            output = gr.Markdown(
                label="Competitive Analysis Report",
                value="*Enter a company name and submit to generate analysis report...*"
            )

        # Both triggers share the same handler and wiring.
        handler_wiring = dict(
            fn=analyze_competitors_interface,
            inputs=[company_input, api_key_input],
            outputs=output,
        )

        # Set up button click action
        analyze_button.click(**handler_wiring)

        # Allow Enter key to trigger analysis
        company_input.submit(**handler_wiring)

        # Add footer with information
        gr.Markdown(
            """
            ---

            ### 📋 What's Included in the Report:
            - ✅ Company validation and industry sector identification
            - ✅ Top 3 competitor identification
            - ✅ Competitor strategy analysis and comparison
            - ✅ Executive summary with key findings
            - ✅ Actionable recommendations for competitive advantage

            ### 🔒 Privacy & Security:
            Your OpenAI API key is **NEVER stored or logged**. It's used only for this analysis session.

            ### ⚡ Tips for Better Results:
            - Use well-known company names for more accurate analysis
            - The analysis is generated using latest market data and AI models
            - For best results, provide accurate company names
            """
        )

    return demo


# ============================================================================
# MAIN ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    interface = create_interface()
    # NOTE(review): passing `theme` to launch() depends on the installed Gradio
    # release (older releases take it on gr.Blocks instead) - confirm against
    # the pinned version.
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        theme=gr.themes.Soft()
    )