Spaces:
Runtime error
Runtime error
| """ | |
| Competitive Analysis Agent - All-in-One Application | |
| Complete system with analysis logic and Gradio web interface. | |
| No separate MCP server needed - everything runs in one process. | |
| """ | |
| import re | |
| import time | |
| from collections import Counter | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from duckduckgo_search import DDGS | |
| from openai import OpenAI | |
| import gradio as gr | |
| # ============================================================================ | |
| # ANALYSIS TOOLS (Consolidated from mcp_server.py) | |
| # ============================================================================ | |
def web_search_tool(query: str, max_results: int = 5) -> str:
    """Run a DuckDuckGo text search and return the hits as one formatted string.

    Each hit is rendered as a ``Title:`` / ``URL:`` / ``Snippet:`` stanza and
    the stanzas are separated by ``---`` lines.  Any exception is reported
    in-band as a string starting with ``"Search failed: "`` rather than raised.
    """
    try:
        with DDGS() as ddgs:
            hits = list(ddgs.text(query, max_results=max_results))
        stanzas = [
            f"Title: {hit['title']}\nURL: {hit['href']}\nSnippet: {hit['body']}\n"
            for hit in hits
        ]
        return "\n---\n".join(stanzas)
    except Exception as e:
        return f"Search failed: {str(e)}"
| # ============================================================================ | |
| # COMPANY VALIDATION | |
| # ============================================================================ | |
def validate_company(company_name: str) -> str:
    """Check via web search whether ``company_name`` looks like a real business.

    Returns a status string: one containing ``"VALID COMPANY:"`` when the
    search evidence is sufficient, one containing ``"NOT VALID:"`` when it is
    not, or one starting with ``"Validation error:"`` when the search itself
    could not be performed.  Never raises.
    """
    print(f"[ANALYSIS]: Validating '{company_name}'")
    try:
        search_query = f"{company_name} company business official site"
        results = web_search_tool(search_query)
        # The search helper reports failures in-band ("Search failed: ...").
        # Scoring that text as if it were results would mislabel a network
        # problem as "company not valid", so surface it as an error instead.
        if results.startswith("Search failed:"):
            return f"Validation error: {results}"
        if is_company_valid_based_on_search(results, company_name):
            return f"β VALID COMPANY: {company_name} (verified via web search)"
        return f"β NOT VALID: No substantial evidence found for '{company_name}'"
    except Exception as e:
        return f"Validation error: {str(e)}"
def is_company_valid_based_on_search(search_results: str, company_name: str) -> bool:
    """Score search-result text for signs that the company exists.

    Five independent, case-insensitive substring signals are evaluated; the
    company is considered valid when at least two of them fire.
    """
    haystack = search_results.lower()
    name = company_name.lower()
    business_terms = ("corporation", "inc", "ltd", "llc", "business", "enterprise", "founded")

    signals = (
        # A .com / .io domain built from the company name.
        f"{name}.com" in haystack or f"{name}.io" in haystack,
        # An explicit "official site" style mention.
        "official site" in haystack or "official website" in haystack,
        # The word "company" together with the name itself.
        "company" in haystack and name in haystack,
        # Generic business vocabulary.
        any(term in haystack for term in business_terms),
        # Encyclopaedic / press / "about" pages.
        "wikipedia" in haystack or "news" in haystack or "about" in haystack,
    )
    return sum(signals) >= 2
| # ============================================================================ | |
| # SECTOR IDENTIFICATION | |
| # ============================================================================ | |
def identify_sector(company_name: str) -> str:
    """Guess the company's industry sector from three differently phrased searches.

    Sector votes from every search are pooled and the dominant one is
    returned.  Returns ``"Unknown sector"`` when no sector dominates and an
    ``"Error identifying sector: ..."`` string if anything raises.
    """
    print(f"[ANALYSIS]: Identifying sector for '{company_name}'")
    try:
        queries = (
            f"what does {company_name} do business industry",
            f"{company_name} industry type sector",
            f"{company_name} sector industry news",
        )
        votes = []
        for position, query in enumerate(queries):
            if position:
                # Pause between consecutive searches (not before the first).
                time.sleep(0.5)
            votes.extend(extract_sectors_advanced(web_search_tool(query), company_name))
        return determine_primary_sector(votes) or "Unknown sector"
    except Exception as e:
        return f"Error identifying sector: {str(e)}"
def extract_sectors_advanced(search_results: str, company_name: str) -> list:
    """Collect sector "votes" from search-result text.

    Every sector keyword that occurs in the text as a whole word (optionally
    pluralised with a trailing "s") contributes votes for its sector: two
    votes when the company name also appears in the text or the keyword is
    used in an "is a <keyword>" / "in the <keyword>" phrase, otherwise one.

    Returns:
        A list of sector names, one entry per vote (duplicates intended).
    """
    results_lower = search_results.lower()
    company_mentioned = company_name.lower() in results_lower
    sector_patterns = {
        "Technology": ["technology", "software", "hardware", "saas", "cloud", "ai", "artificial intelligence", "platform"],
        "Finance": ["financial", "banking", "investment", "fintech", "insurance", "bank", "payments"],
        "Healthcare": ["healthcare", "medical", "pharmaceutical", "biotech", "hospital", "health", "clinical"],
        "Education": ["education", "edtech", "e-learning", "online learning", "educational", "training"],
        "Retail": ["retail", "e-commerce", "online shopping", "marketplace", "commerce"],
        "Manufacturing": ["manufacturing", "industrial", "automotive", "electronics", "factory"],
        "Energy": ["energy", "renewable", "oil and gas", "solar", "power", "utility"],
        "Telecommunications": ["telecom", "communications", "network", "5g", "broadband"],
    }
    found_sectors = []
    for sector, keywords in sector_patterns.items():
        for keyword in keywords:
            # Match whole words only: a plain substring test lets short
            # keywords such as "ai" fire inside "retail", "said" or "chain".
            if not re.search(rf"\b{re.escape(keyword)}s?\b", results_lower):
                continue
            strong_context = (
                company_mentioned
                or f"is a {keyword}" in results_lower
                or f"in the {keyword}" in results_lower
            )
            found_sectors.extend([sector] * (2 if strong_context else 1))
    return found_sectors
def determine_primary_sector(sectors_list: list) -> str:
    """Pick the winning sector from a list of votes.

    The most frequent sector wins when it has at least two votes, or when it
    is the only sector that received any vote.  Otherwise (or for an empty
    list) an empty string is returned.
    """
    if not sectors_list:
        return ""
    tally = Counter(sectors_list)
    leader, votes = tally.most_common(1)[0]
    has_clear_lead = votes >= 2
    is_sole_candidate = len(tally) == 1 and votes >= 1
    return leader if (has_clear_lead or is_sole_candidate) else ""
| # ============================================================================ | |
| # COMPETITOR IDENTIFICATION | |
| # ============================================================================ | |
def identify_competitors(sector: str, company_name: str) -> str:
    """Find up to three competitor names via three web searches.

    Candidates from all searches are pooled and ranked; the top three are
    returned as a comma-separated string.  Returns
    ``"No competitors identified"`` when nothing survives ranking and an
    ``"Error identifying competitors: ..."`` string if anything raises.
    """
    print(f"[ANALYSIS]: Finding competitors in '{sector}' sector (excluding '{company_name}')")
    try:
        queries = (
            f"top {sector} companies competitors market leaders",
            f"competitors of {company_name}",
            f"{sector} industry leaders key players",
        )
        pool = []
        for position, query in enumerate(queries):
            if position:
                # Pause between consecutive searches (not before the first).
                time.sleep(0.5)
            hits = web_search_tool(query)
            pool.extend(extract_competitors_advanced(hits, company_name, sector))
        ranked = rank_competitors(pool, company_name)
        if not ranked:
            return "No competitors identified"
        return ", ".join(ranked[:3])
    except Exception as e:
        return f"Error identifying competitors: {str(e)}"
def extract_competitors_advanced(search_results: str, exclude_company: str, sector: str) -> list:
    """Pull candidate competitor names out of search-result text.

    Two passes are made: capitalised word runs (optionally ending in a
    corporate suffix), then items of comma/"and"/"or"-separated lists that
    follow cue words such as "competitors", "including" or "top".  The
    excluded company (case-insensitive exact match) and duplicates are
    dropped.  ``sector`` is accepted but not used.

    Returns:
        Candidate names in first-seen order, without duplicates.
    """
    excluded = exclude_company.lower()
    found = []

    def acceptable(name: str) -> bool:
        # Shared filter for both passes.
        return (
            is_likely_company_name(name)
            and name.lower() != excluded
            and name not in found
        )

    # Pass 1: capitalised phrases; this pass also requires more than 2 chars.
    for raw in re.findall(r'\b[A-Z][a-zA-Z\s&]+(?:Inc|Corp|Ltd|LLC|AG|SE)?', search_results):
        name = raw.strip()
        if acceptable(name) and len(name) > 2:
            found.append(name)

    # Pass 2: enumerations introduced by a cue word.
    list_patterns = (
        r'(?:competitors?|companies|players|include)[:\s]+([^\.]+)',
        r'(?:including|such as)[:\s]+([^\.]+)',
        r'(?:top|leading|major)\s+\d*\s*([^:\.]+companies[^:\.]*)',
    )
    for pattern in list_patterns:
        for listing in re.findall(pattern, search_results, re.IGNORECASE):
            for piece in re.split(r',|\band\b|\bor\b|;', listing):
                name = piece.strip()
                if acceptable(name):
                    found.append(name)
    return found
def is_likely_company_name(text: str) -> bool:
    """Heuristically decide whether ``text`` could be a company name.

    Accepts strings of 2-60 characters that start with an uppercase
    character, contain at least one letter, and contain none of a small set
    of filler words (articles, conjunctions, "etc", ...).
    """
    if not text or not 2 <= len(text) <= 60:
        return False
    filler_words = {
        'the', 'and', 'or', 'but', 'with', 'for', 'from', 'that', 'this',
        'these', 'those', 'their', 'other', 'some', 'such', 'including',
        'etc', 'etc.', 'among', 'various', 'several', 'many',
    }
    if not filler_words.isdisjoint(text.lower().split()):
        return False
    starts_upper = text[0].isupper()
    has_letter = any(ch.isalpha() for ch in text)
    return starts_upper and has_letter
def rank_competitors(competitor_candidates: list, exclude_company: str) -> list:
    """Order candidate names by how often they occur, most frequent first.

    Blank entries and the excluded company (case-insensitive) are removed
    before counting.  Returns each remaining name once.
    """
    excluded = exclude_company.lower()
    tally = Counter(
        name
        for name in competitor_candidates
        if name.strip() and name.lower() != excluded
    )
    return [name for name, _ in tally.most_common()]
| # ============================================================================ | |
| # WEB BROWSING | |
| # ============================================================================ | |
def browse_page(url: str, instructions: str) -> str:
    """Fetch a page and return the sentences relevant to ``instructions``.

    A missing scheme is defaulted to ``https://``.  Returns a failure message
    when nothing could be fetched, ``"No relevant content found"`` when
    extraction yields nothing, and an ``"Error browsing page: ..."`` string if
    anything raises.
    """
    print(f"[ANALYSIS]: Browsing {url}")
    try:
        has_scheme = url.startswith(('http://', 'https://'))
        target = url if has_scheme else 'https://' + url
        page_text = fetch_webpage_content(target)
        if not page_text:
            return f"Failed to fetch content from {target}"
        return extract_relevant_content(page_text, instructions) or "No relevant content found"
    except Exception as e:
        return f"Error browsing page: {str(e)}"
def fetch_webpage_content(url: str) -> str:
    """Download ``url`` and return its visible text as one space-joined string.

    Script, style, nav, footer, header and meta elements are removed first.
    Text is then taken from every main/article/div/p element that yields more
    than 20 characters, so text inside nested containers can appear more than
    once.  At most the first 5000 text fragments are kept.

    Returns ``None`` (after printing the error) when the request or parsing
    fails.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    }
    try:
        response = requests.get(url, headers=request_headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Strip non-content markup before collecting text.
        for unwanted in soup(["script", "style", "nav", "footer", "header", "meta"]):
            unwanted.decompose()

        fragments = (
            element.get_text(strip=True)
            for element in soup.find_all(['main', 'article', 'div', 'p'])
        )
        kept = [fragment for fragment in fragments if fragment and len(fragment) > 20]
        return ' '.join(kept[:5000])
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
def extract_relevant_content(content: str, instructions: str) -> str:
    """Return the sentences of ``content`` that share a word with ``instructions``.

    Sentences are split on ``.``.  A sentence qualifies when it is longer than
    10 characters and has at least one whitespace-delimited word
    (case-insensitive) in common with the instructions.  Up to 10 qualifying
    sentences are returned; if none qualify, the first 5 sentences are
    returned followed by ``...``.
    """
    wanted_words = set(instructions.lower().split())
    sentences = [piece.strip() for piece in content.split('.') if piece.strip()]
    relevant = [
        sentence
        for sentence in sentences
        if len(sentence) > 10 and wanted_words & set(sentence.lower().split())
    ]
    if sentences and not relevant:
        # Nothing matched: fall back to the opening of the page.
        return '. '.join(sentences[:5]) + '...'
    return '. '.join(relevant[:10])
| # ============================================================================ | |
| # REPORT GENERATION | |
| # ============================================================================ | |
def generate_report(company_name: str, context: str) -> str:
    """Render the fixed Markdown report skeleton for ``company_name``.

    Up to three competitor names extracted from ``context`` fill the
    comparison table; placeholder rows are used when none are found.  The
    remaining sections are static template text, followed by a generation
    timestamp.
    """
    print("[ANALYSIS]: Generating report")
    named = extract_competitors_from_context(context)[:3]
    competitor_rows = "".join(
        f"| {name} | Strategic insights | Pricing | Product | Market Position |\n"
        for name in named
    )
    if not competitor_rows:
        competitor_rows = "| Competitor A | - | - | - | - |\n| Competitor B | - | - | - | - |\n| Competitor C | - | - | - | - |"
    generated_at = time.strftime('%Y-%m-%d %H:%M:%S')
    # The template is kept flush-left so no indentation leaks into the Markdown.
    report = f"""
# Competitive Analysis Report: {company_name}
## Executive Summary
Comprehensive analysis of {company_name}'s competitive position based on market research and strategic data.
## Key Findings
- Industry position and market share indicators
- Competitor strategic approaches
- Differentiation opportunities
## Competitor Comparison
| Competitor | Strategy | Pricing | Product Focus | Market Position |
|------------|----------|---------|----------------|-----------------|
{competitor_rows}
## Strategic Insights for {company_name}
### Strengths to Leverage
- Define unique value propositions
- Identify operational advantages
- Highlight customer loyalty factors
### Competitive Opportunities
- Market gaps and underserved segments
- Innovation areas competitors are missing
- Customer pain points to address
### Recommendations
1. **Differentiation**: Develop distinct positioning vs competitors
2. **Innovation**: Invest in unique features and capabilities
3. **Customer Focus**: Enhance engagement and retention strategies
4. **Market Expansion**: Identify new market segments and geographies
5. **Efficiency**: Optimize operations to improve margins
### Next Steps
- Conduct detailed SWOT analysis
- Develop targeted competitor response strategies
- Monitor market movements and competitive activities
- Implement differentiation initiatives
---
*Report generated on {generated_at}*
"""
    return report.strip()
def extract_competitors_from_context(context: str) -> list:
    """Extract up to five candidate competitor names from free-form context text.

    Two heuristics are combined, in this order:

    1. If the text contains ``", "``, every comma-separated piece that is
       longer than 2 characters and starts with an uppercase character.
    2. Names listed after "competitor(s)" or "top ... companies" cue phrases,
       split on commas and the word "and".

    Duplicates are removed while keeping first-seen order, so the result (and
    which names survive the five-item cut) is deterministic.
    """
    competitors = []
    if ", " in context:
        competitors.extend(
            piece
            for piece in context.split(", ")
            if piece and len(piece) > 2 and piece[0].isupper()
        )
    competitor_patterns = (
        r'competitors?[:\s]+([^\.\n]+)',
        r'top.*companies?[:\s]+([^\.\n]+)',
    )
    for pattern in competitor_patterns:
        for listing in re.findall(pattern, context, re.IGNORECASE):
            competitors.extend(
                name.strip()
                for name in re.split(r',|\band\b', listing)
                if name.strip()
            )
    # dict.fromkeys de-duplicates while preserving insertion order; a plain
    # set() would make both the order and the [:5] truncation arbitrary.
    return list(dict.fromkeys(competitors))[:5]
| # ============================================================================ | |
| # COMPETITIVE ANALYSIS ENGINE (Consolidated from mcp_client.py) | |
| # ============================================================================ | |
class CompetitiveAnalysisAgent:
    """Runs the validate -> sector -> competitors -> report pipeline for one company."""

    def __init__(self, openai_api_key: str):
        """Create the OpenAI client and set the model and system prompt."""
        self.client = OpenAI(api_key=openai_api_key)
        self.model = "gpt-4"
        # Prompt text is kept flush-left so no indentation leaks into it.
        self.system_prompt = """
You are an expert Competitive Analysis Agent. Your role is to:
1. Validate that the input company is a real business
2. Identify its primary industry sector
3. Discover its top 3 competitors
4. Gather strategic data about competitors (pricing, products, marketing)
5. Generate a comprehensive competitive analysis report with actionable insights
Use logical reasoning to gather information and synthesize insights.
Focus exclusively on the provided company and its top 3 competitors.
Generate insights that help the company outperform its competitors.
"""

    @staticmethod
    def _validation_failed(validation: str) -> bool:
        """Return True when a validation status string reports a rejected company.

        A rejection contains ``"NOT VALID:"``; an acceptance contains
        ``"VALID COMPANY:"``.  Anything else (for example a
        ``"Validation error: ..."`` string) is not treated as a rejection.
        """
        return "NOT VALID:" in validation and "VALID COMPANY:" not in validation

    def analyze_company(self, company_name: str) -> str:
        """Perform the full competitive analysis and return a Markdown report.

        Steps: validate the company, identify its sector, find competitors,
        ask the OpenAI chat model for insights, then append those insights to
        the templated report.  Returns early with an explanatory message when
        validation rejects the company.  Any exception is converted into an
        error string; nothing is raised to the caller.
        """
        print(f"\n{'='*60}")
        print(f"Starting competitive analysis for: {company_name}")
        print(f"{'='*60}\n")
        try:
            analysis_steps = []

            # Step 1: Validate company
            print("Step 1: Validating company...")
            validation = validate_company(company_name)
            analysis_steps.append(validation)
            # The previous check required "NOT" to be present while "VALID"
            # was absent, which can never hold for "NOT VALID", so rejected
            # companies were still analysed.
            if self._validation_failed(validation):
                return f"β Company validation failed:\n{validation}\n\nPlease check the company name and try again."

            # Step 2: Identify sector
            print("Step 2: Identifying sector...")
            sector = identify_sector(company_name)
            analysis_steps.append(f"Sector: {sector}")

            # Step 3: Identify competitors
            print("Step 3: Finding competitors...")
            competitors = identify_competitors(sector, company_name)
            analysis_steps.append(f"Competitors: {competitors}")

            # Step 4: Generate report using OpenAI
            print("Step 4: Generating strategic insights...")
            context = "\n".join(analysis_steps)
            messages = [
                {
                    "role": "system",
                    "content": self.system_prompt
                },
                {
                    "role": "user",
                    "content": f"""
Based on this analysis so far:
{context}
Generate a detailed competitive analysis report for {company_name} including:
- Company overview and market position
- Top competitors analysis
- Competitive advantages and disadvantages
- If possible, specific strategic recommendations
Format as a professional Markdown report.
"""
                }
            ]
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.7,
                max_tokens=2000,
            )

            # Combine the templated report with the model's insights.
            openai_insights = response.choices[0].message.content
            report = generate_report(company_name, context)
            final_report = f"{report}\n\n## AI-Generated Strategic Insights\n\n{openai_insights}"
            return final_report
        except Exception as e:
            return f"β Error during analysis: {str(e)}\n\nPlease check your API key and try again."
| # ============================================================================ | |
| # GRADIO INTERFACE | |
| # ============================================================================ | |
def analyze_competitors_interface(company: str, openai_key: str) -> str:
    """Gradio callback: validate the form inputs and run the analysis.

    Both inputs are stripped of surrounding whitespace before they are
    validated *and* before they are used, so a pasted API key with a trailing
    space or newline is not passed on verbatim.

    Returns:
        The Markdown report, or a Markdown-formatted error message when an
        input is missing/too short or the analysis raises.
    """
    company = (company or "").strip()
    openai_key = (openai_key or "").strip()

    # Validate inputs
    if len(company) < 2:
        return "β **Error**: Please enter a valid company name."
    if len(openai_key) < 10:
        return "β **Error**: Please enter a valid OpenAI API key."

    # Perform analysis
    try:
        agent = CompetitiveAnalysisAgent(openai_key)
        return agent.analyze_company(company)
    except Exception as e:
        return f"β **Error during analysis**: {str(e)}\n\nPlease check your API key and try again."
def create_interface():
    """Build the Gradio Blocks UI and return it (without launching).

    The page has an intro, a company-name box, a password-style API-key box,
    an analyze button, a Markdown output area and a footer.  Clicking the
    button or pressing Enter in the company box runs
    ``analyze_competitors_interface``.
    """
    # NOTE(review): the layout nesting below (three inputs inside one Column,
    # output in its own Row) was reconstructed from a source whose
    # indentation was lost - confirm against the deployed layout.
    intro_markdown = """
# π Competitive Analysis Agent
Analyze competitors for any company using AI-powered research and strategic insights.
### How it works:
1. **Enter** a company name you want to analyze
2. **Provide** your OpenAI API key (kept securely, not stored)
3. **Click** "Analyze" to generate a comprehensive competitive analysis report
The agent will identify competitors, analyze their strategies, and provide actionable insights.
"""
    footer_markdown = """
---
### π What's Included in the Report:
- β Company validation and industry sector identification
- β Top 3 competitor identification
- β Competitor strategy analysis and comparison
- β Executive summary with key findings
- β Actionable recommendations for competitive advantage
### π Privacy & Security:
Your OpenAI API key is **NEVER stored or logged**. It's used only for this analysis session.
### β‘ Tips for Better Results:
- Use well-known company names for more accurate analysis
- The analysis is generated using latest market data and AI models
- For best results, provide accurate company names
"""
    with gr.Blocks(title="Competitive Analysis Agent") as demo:
        gr.Markdown(intro_markdown)

        with gr.Row():
            with gr.Column(scale=1):
                company_input = gr.Textbox(
                    label="Company Name",
                    placeholder="e.g., Tesla, Spotify, Microsoft",
                    lines=1,
                )
                api_key_input = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                    lines=1,
                )
                analyze_button = gr.Button(
                    "π Analyze Competitors",
                    variant="primary",
                    scale=1,
                )

        with gr.Row():
            output = gr.Markdown(
                label="Competitive Analysis Report",
                value="*Enter a company name and submit to generate analysis report...*",
            )

        # The button click and the Enter key in the company box share one handler.
        for register in (analyze_button.click, company_input.submit):
            register(
                fn=analyze_competitors_interface,
                inputs=[company_input, api_key_input],
                outputs=output,
            )

        # Footer with report contents, privacy note and usage tips.
        gr.Markdown(footer_markdown)
    return demo
| # ============================================================================ | |
| # MAIN ENTRY POINT | |
| # ============================================================================ | |
if __name__ == "__main__":
    # Build the UI, then serve it on all interfaces at port 7860 with no
    # public share link and with errors shown in the UI.
    interface = create_interface()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "theme": gr.themes.Soft(),
    }
    interface.launch(**launch_options)