# desolo-2918's picture
# Move sync action to .github/workflows
# 5d1056c
"""
Competitive Analysis Agent - All-in-One Application
Complete system with analysis logic and Gradio web interface.
No separate MCP server needed - everything runs in one process.
"""
import re
import time
from collections import Counter
from typing import Optional

import gradio as gr
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from openai import OpenAI
# ============================================================================
# ANALYSIS TOOLS (Consolidated from mcp_server.py)
# ============================================================================
def web_search_tool(query: str, max_results: int = 5) -> str:
    """Run a DuckDuckGo text search and return the hits as one string.

    Every hit is rendered as a ``Title`` / ``URL`` / ``Snippet`` stanza and
    the stanzas are joined with ``---`` separator lines.

    Args:
        query: Search phrase passed to DuckDuckGo.
        max_results: Upper bound on the number of hits requested.

    Returns:
        The formatted hits, or a ``"Search failed: ..."`` message when any
        exception is raised while searching or formatting.
    """
    try:
        with DDGS() as search_client:
            hits = list(search_client.text(query, max_results=max_results))
            stanzas = [
                f"Title: {hit['title']}\nURL: {hit['href']}\nSnippet: {hit['body']}\n"
                for hit in hits
            ]
            return "\n---\n".join(stanzas)
    except Exception as e:
        # Failures are reported in-band so callers always receive a string.
        return f"Search failed: {e!s}"
# ============================================================================
# COMPANY VALIDATION
# ============================================================================
def validate_company(company_name: str) -> str:
    """Check via web search whether ``company_name`` looks like a real company.

    Args:
        company_name: Name of the company to verify.

    Returns:
        One of three status strings:
        ``"✓ VALID COMPANY: ..."`` when the search results carry enough
        evidence, ``"✗ NOT VALID: ..."`` when they do not, or
        ``"Validation error: ..."`` when the search itself failed or an
        exception was raised.
    """
    print(f"[ANALYSIS]: Validating '{company_name}'")
    try:
        search_query = f"{company_name} company business official site"
        results = web_search_tool(search_query)
        # web_search_tool reports failures in-band as "Search failed: ...".
        # That text is an error message, not search evidence, so it must not
        # be fed to the keyword heuristic below.
        if results.startswith("Search failed:"):
            return f"Validation error: {results}"
        if is_company_valid_based_on_search(results, company_name):
            return f"✓ VALID COMPANY: {company_name} (verified via web search)"
        return f"✗ NOT VALID: No substantial evidence found for '{company_name}'"
    except Exception as e:
        return f"Validation error: {e}"
def is_company_valid_based_on_search(search_results: str, company_name: str) -> bool:
    """Decide from raw search text whether a company appears to exist.

    Five independent, case-insensitive signals are checked; the company is
    considered valid when at least two of them are present.

    Args:
        search_results: Text returned by the web search.
        company_name: Company being validated.

    Returns:
        ``True`` when two or more signals are found, otherwise ``False``.
    """
    haystack = search_results.lower()
    name = company_name.lower()
    business_terms = ("corporation", "inc", "ltd", "llc", "business", "enterprise", "founded")

    signals = (
        # A domain that is literally "<name>.com" or "<name>.io".
        f"{name}.com" in haystack or f"{name}.io" in haystack,
        # Wording that points at an official web presence.
        "official site" in haystack or "official website" in haystack,
        # The name appears together with the word "company".
        "company" in haystack and name in haystack,
        # Any generic business vocabulary.
        any(term in haystack for term in business_terms),
        # Reference-style coverage.
        "wikipedia" in haystack or "news" in haystack or "about" in haystack,
    )
    return sum(signals) >= 2
# ============================================================================
# SECTOR IDENTIFICATION
# ============================================================================
def identify_sector(company_name: str) -> str:
    """Work out the company's industry sector from three web searches.

    Each search result is scanned for sector keywords; the votes from all
    three searches are pooled and the dominant sector is returned.

    Args:
        company_name: Company whose sector should be identified.

    Returns:
        The sector name, ``"Unknown sector"`` when no sector wins, or an
        ``"Error identifying sector: ..."`` message on any exception.
    """
    print(f"[ANALYSIS]: Identifying sector for '{company_name}'")
    try:
        queries = (
            f"what does {company_name} do business industry",
            f"{company_name} industry type sector",
            f"{company_name} sector industry news",
        )
        votes = []
        for position, query in enumerate(queries):
            if position:
                # Pause between consecutive searches (not before the first).
                time.sleep(0.5)
            votes += extract_sectors_advanced(web_search_tool(query), company_name)
        return determine_primary_sector(votes) or "Unknown sector"
    except Exception as e:
        return f"Error identifying sector: {e!s}"
def extract_sectors_advanced(search_results: str, company_name: str) -> list:
    """Collect sector votes from search text using keyword matching.

    Every keyword found in the text contributes one vote for its sector, or
    two votes when the company name also occurs somewhere in the text or the
    keyword appears in an "is a <keyword>" / "in the <keyword>" phrase.

    Args:
        search_results: Text returned by the web search.
        company_name: Company the search was about.

    Returns:
        A list of sector names with repetitions (one entry per vote), in the
        order the sectors and keywords are declared below.
    """
    results_lower = search_results.lower()
    company_lower = company_name.lower()
    sector_patterns = {
        "Technology": ["technology", "software", "hardware", "saas", "cloud", "ai", "artificial intelligence", "platform"],
        "Finance": ["financial", "banking", "investment", "fintech", "insurance", "bank", "payments"],
        "Healthcare": ["healthcare", "medical", "pharmaceutical", "biotech", "hospital", "health", "clinical"],
        "Education": ["education", "edtech", "e-learning", "online learning", "educational", "training"],
        "Retail": ["retail", "e-commerce", "online shopping", "marketplace", "commerce"],
        "Manufacturing": ["manufacturing", "industrial", "automotive", "electronics", "factory"],
        "Energy": ["energy", "renewable", "oil and gas", "solar", "power", "utility"],
        "Telecommunications": ["telecom", "communications", "network", "5g", "broadband"],
    }

    def mentions(keyword: str) -> bool:
        # Very short tokens ("ai", "5g") occur inside unrelated words such as
        # "retail", "chain" or "said", so they must match as whole words.
        # Longer keywords keep plain substring matching so that variants like
        # "ecommerce" or "telecommunications" are still recognised.
        if len(keyword) <= 3:
            return re.search(rf"\b{re.escape(keyword)}\b", results_lower) is not None
        return keyword in results_lower

    # Does not depend on the keyword, so evaluate it once.
    company_mentioned = company_lower in results_lower

    found_sectors = []
    for sector, keywords in sector_patterns.items():
        for keyword in keywords:
            if not mentions(keyword):
                continue
            in_context = (
                company_mentioned
                or f"is a {keyword}" in results_lower
                or f"in the {keyword}" in results_lower
            )
            found_sectors.extend([sector] * (2 if in_context else 1))
    return found_sectors
def determine_primary_sector(sectors_list: list) -> str:
    """Pick the winning sector from a list of sector votes.

    A sector wins when it has at least two votes, or when it is the only
    sector that received any vote at all.

    Args:
        sectors_list: Sector names, one entry per vote.

    Returns:
        The winning sector name, or an empty string when there are no votes
        or no sector qualifies.
    """
    if not sectors_list:
        return ""
    tally = Counter(sectors_list)
    [(leader, votes)] = tally.most_common(1)
    if votes >= 2 or len(tally) == 1:
        return leader
    return ""
# ============================================================================
# COMPETITOR IDENTIFICATION
# ============================================================================
def identify_competitors(sector: str, company_name: str) -> str:
    """Find up to three competitors by pooling candidates from three searches.

    Args:
        sector: Industry sector used to phrase two of the searches.
        company_name: Company under analysis; it is excluded from the result.

    Returns:
        A comma-separated string with at most three names,
        ``"No competitors identified"`` when nothing was found, or an
        ``"Error identifying competitors: ..."`` message on any exception.
    """
    print(f"[ANALYSIS]: Finding competitors in '{sector}' sector (excluding '{company_name}')")
    try:
        queries = (
            f"top {sector} companies competitors market leaders",
            f"competitors of {company_name}",
            f"{sector} industry leaders key players",
        )
        pool = []
        for position, query in enumerate(queries):
            if position:
                # Pause between consecutive searches (not before the first).
                time.sleep(0.5)
            pool.extend(
                extract_competitors_advanced(web_search_tool(query), company_name, sector)
            )
        ranked = rank_competitors(pool, company_name)
        if not ranked:
            return "No competitors identified"
        return ", ".join(ranked[:3])
    except Exception as e:
        return f"Error identifying competitors: {e!s}"
def extract_competitors_advanced(search_results: str, exclude_company: str, sector: str) -> list:
    """Pull candidate competitor names out of formatted search results.

    Two passes are made over the text, one line at a time:

    1. capitalised word runs (optionally ending in a legal suffix);
    2. comma/"and"/"or" separated lists that follow cue words such as
       "competitors", "including" or "top ... companies".

    Args:
        search_results: Text produced by ``web_search_tool``.
        exclude_company: Name of the analysed company; never returned.
        sector: Currently unused; kept so existing callers keep working.

    Returns:
        Unique candidate names in order of first appearance (pass 1 before
        pass 2).
    """
    # Field labels that web_search_tool puts in front of every result line.
    # They are capitalised words, so without this filter they would be
    # harvested as "companies" from every single search.
    result_labels = {"title", "url", "snippet"}
    exclude_lower = exclude_company.lower()
    competitors = []
    seen = set()

    def consider(candidate: str, min_length: int = 0) -> None:
        name = candidate.strip()
        lowered = name.lower()
        if (
            len(name) > min_length
            and lowered not in result_labels
            and lowered != exclude_lower
            and name not in seen
            and is_likely_company_name(name)
        ):
            seen.add(name)
            competitors.append(name)

    # Work per line: the patterns below accept whitespace, and a name must
    # not run across a line break into the next result field.
    lines = search_results.splitlines()

    capitalized_pattern = r'\b[A-Z][a-zA-Z\s&]+(?:Inc|Corp|Ltd|LLC|AG|SE)?'
    for line in lines:
        for match in re.findall(capitalized_pattern, line):
            consider(match, min_length=2)

    list_patterns = [
        r'(?:competitors?|companies|players|include)[:\s]+([^\.]+)',
        r'(?:including|such as)[:\s]+([^\.]+)',
        r'(?:top|leading|major)\s+\d*\s*([^:\.]+companies[^:\.]*)',
    ]
    for pattern in list_patterns:
        for line in lines:
            for match in re.findall(pattern, line, re.IGNORECASE):
                for piece in re.split(r',|\band\b|\bor\b|;', match):
                    consider(piece)
    return competitors
def is_likely_company_name(text: str) -> bool:
    """Heuristically decide whether ``text`` could be a company name.

    The text must be 2 to 60 characters long, contain none of a small set of
    filler words, start with an uppercase character and contain at least one
    alphabetic character.

    Args:
        text: Candidate name.

    Returns:
        ``True`` when every condition holds, otherwise ``False``.
    """
    if not text or not 2 <= len(text) <= 60:
        return False
    filler_words = frozenset((
        'the', 'and', 'or', 'but', 'with', 'for', 'from', 'that', 'this',
        'these', 'those', 'their', 'other', 'some', 'such', 'including',
        'etc', 'etc.', 'among', 'various', 'several', 'many',
    ))
    tokens = text.lower().split()
    if not filler_words.isdisjoint(tokens):
        return False
    return text[0].isupper() and any(ch.isalpha() for ch in text)
def rank_competitors(competitor_candidates: list, exclude_company: str) -> list:
    """Order candidate names by how often they were seen.

    Blank entries and entries equal to ``exclude_company`` (ignoring case)
    are dropped before counting.

    Args:
        competitor_candidates: Candidate names, possibly with repetitions.
        exclude_company: Name that must not appear in the ranking.

    Returns:
        Unique names, most frequent first; an empty list when nothing
        remains after filtering.
    """
    target = exclude_company.lower()
    tally = Counter(
        name
        for name in competitor_candidates
        if name.lower() != target and name.strip()
    )
    return [name for name, _ in tally.most_common()]
# ============================================================================
# WEB BROWSING
# ============================================================================
def browse_page(url: str, instructions: str) -> str:
    """Fetch a web page and return the parts that relate to ``instructions``.

    Args:
        url: Page address; ``https://`` is prepended when no scheme is given.
        instructions: Free text describing what to look for on the page.

    Returns:
        The extracted text, ``"No relevant content found"`` when extraction
        yields nothing, a ``"Failed to fetch content from ..."`` message when
        the page could not be loaded, or an ``"Error browsing page: ..."``
        message on any exception.
    """
    print(f"[ANALYSIS]: Browsing {url}")
    try:
        target = url if url.startswith(('http://', 'https://')) else 'https://' + url
        page_text = fetch_webpage_content(target)
        if not page_text:
            return f"Failed to fetch content from {target}"
        return extract_relevant_content(page_text, instructions) or "No relevant content found"
    except Exception as e:
        return f"Error browsing page: {e!s}"
def fetch_webpage_content(url: str) -> Optional[str]:
    """Download a page and return its visible text.

    Script, style, navigation, footer, header and meta elements are removed
    before text is collected from ``main``/``article``/``div``/``p``
    containers.

    Args:
        url: Absolute URL of the page to download.

    Returns:
        The collected text joined with single spaces, or ``None`` when the
        request or parsing fails (the error is printed).
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        for tag in soup(["script", "style", "nav", "footer", "header", "meta"]):
            tag.decompose()

        container_tags = ['main', 'article', 'div', 'p']
        text_parts = []
        for element in soup.find_all(container_tags):
            # Only take outermost containers: a container's text already
            # includes the text of everything nested inside it, so visiting
            # nested containers as well would emit the same text repeatedly.
            if element.find_parent(container_tags) is not None:
                continue
            # A separator keeps words apart across inline tags; with
            # strip=True alone, "<p>Hello <b>world</b></p>" becomes
            # "Helloworld".
            text = element.get_text(separator=' ', strip=True)
            if len(text) > 20:
                text_parts.append(text)
        # The slice caps the number of text blocks, not characters.
        return ' '.join(text_parts[:5000])
    except Exception as e:
        # Best effort: report the problem and signal failure with None.
        print(f"Error fetching {url}: {e}")
        return None
def extract_relevant_content(content: str, instructions: str) -> str:
    """Return the sentences of ``content`` that share a word with ``instructions``.

    The content is split on ``.``; a sentence is kept when it is longer than
    10 characters and has at least one whitespace-separated word in common
    with the instructions (case-insensitive).

    Args:
        content: Page text to search.
        instructions: Free text describing what to look for.

    Returns:
        Up to ten matching sentences joined with ``". "``. When nothing
        matches, the first five sentences followed by ``"..."``. An empty
        string when ``content`` has no sentences.
    """
    # Built once; it does not depend on the sentence being examined.
    instruction_words = set(instructions.lower().split())
    sentences = [s.strip() for s in content.split('.') if s.strip()]
    relevant_sentences = [
        sentence
        for sentence in sentences
        if len(sentence) > 10
        and not instruction_words.isdisjoint(sentence.lower().split())
    ]
    if not relevant_sentences and sentences:
        return '. '.join(sentences[:5]) + '...'
    return '. '.join(relevant_sentences[:10])
# ============================================================================
# REPORT GENERATION
# ============================================================================
def generate_report(company_name: str, context: str) -> str:
    """Render the Markdown competitive-analysis report template.

    Args:
        company_name: Company the report is about.
        context: Text from earlier analysis steps; competitor names are
            extracted from it for the comparison table.

    Returns:
        The report as Markdown with surrounding whitespace stripped. The
        table lists up to three extracted competitors, or three placeholder
        rows when none were found.
    """
    print("[ANALYSIS]: Generating report")
    top_competitors = extract_competitors_from_context(context)[:3]
    competitor_rows = "".join(
        f"| {name} | Strategic insights | Pricing | Product | Market Position |\n"
        for name in top_competitors
    )
    if not competitor_rows:
        # Nothing extracted: fall back to placeholder rows.
        competitor_rows = "| Competitor A | - | - | - | - |\n| Competitor B | - | - | - | - |\n| Competitor C | - | - | - | - |"
    report = f"""
# Competitive Analysis Report: {company_name}
## Executive Summary
Comprehensive analysis of {company_name}'s competitive position based on market research and strategic data.
## Key Findings
- Industry position and market share indicators
- Competitor strategic approaches
- Differentiation opportunities
## Competitor Comparison
| Competitor | Strategy | Pricing | Product Focus | Market Position |
|------------|----------|---------|----------------|-----------------|
{competitor_rows}
## Strategic Insights for {company_name}
### Strengths to Leverage
- Define unique value propositions
- Identify operational advantages
- Highlight customer loyalty factors
### Competitive Opportunities
- Market gaps and underserved segments
- Innovation areas competitors are missing
- Customer pain points to address
### Recommendations
1. **Differentiation**: Develop distinct positioning vs competitors
2. **Innovation**: Invest in unique features and capabilities
3. **Customer Focus**: Enhance engagement and retention strategies
4. **Market Expansion**: Identify new market segments and geographies
5. **Efficiency**: Optimize operations to improve margins
### Next Steps
- Conduct detailed SWOT analysis
- Develop targeted competitor response strategies
- Monitor market movements and competitive activities
- Implement differentiation initiatives
---
*Report generated on {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
    return report.strip()
def extract_competitors_from_context(context: str) -> list:
    """Extract up to five competitor names from free-form context text.

    Names are taken first from labelled lists ("Competitors: A, B" or
    "top ... companies: A, B") and then from a bare ``", "``-separated list.

    Args:
        context: Text that may contain competitor names.

    Returns:
        At most five unique names, in order of first appearance. The order
        is deterministic, so the same context always yields the same list.
    """
    candidates = []

    # Labelled lists are the most reliable source, so they come first.
    competitor_patterns = [
        r'competitors?[:\s]+([^\.\n]+)',
        r'top.*companies?[:\s]+([^\.\n]+)',
    ]
    for pattern in competitor_patterns:
        for match in re.findall(pattern, context, re.IGNORECASE):
            for piece in re.split(r',|\band\b', match):
                piece = piece.strip()
                if piece:
                    candidates.append(piece)

    if ", " in context:
        for piece in context.split(", "):
            # A fragment that spans lines or carries a "Label:" prefix is a
            # chunk of surrounding context, not a bare name.
            if "\n" in piece or ":" in piece:
                continue
            if len(piece) > 2 and piece[0].isupper():
                candidates.append(piece)

    # dict.fromkeys removes duplicates while keeping first-seen order; a set
    # would make both the order and the chosen five vary between runs.
    return list(dict.fromkeys(candidates))[:5]
# ============================================================================
# COMPETITIVE ANALYSIS ENGINE (Consolidated from mcp_client.py)
# ============================================================================
class CompetitiveAnalysisAgent:
    """Runs the full analysis pipeline and asks an OpenAI model for insights.

    The pipeline validates the company, identifies its sector and
    competitors via web search, then combines a templated report with
    model-generated strategic commentary.
    """

    def __init__(self, openai_api_key: str):
        """Create the OpenAI client and store the model name and system prompt.

        Args:
            openai_api_key: API key passed to the OpenAI client.
        """
        self.client = OpenAI(api_key=openai_api_key)
        self.model = "gpt-4"
        self.system_prompt = """
You are an expert Competitive Analysis Agent. Your role is to:
1. Validate that the input company is a real business
2. Identify its primary industry sector
3. Discover its top 3 competitors
4. Gather strategic data about competitors (pricing, products, marketing)
5. Generate a comprehensive competitive analysis report with actionable insights
Use logical reasoning to gather information and synthesize insights.
Focus exclusively on the provided company and its top 3 competitors.
Generate insights that help the company outperform its competitors.
"""

    def analyze_company(self, company_name: str) -> str:
        """Perform the complete competitive analysis for one company.

        Args:
            company_name: Company to analyse.

        Returns:
            The Markdown report followed by an "AI-Generated Strategic
            Insights" section. When validation reports the company as not
            valid, or any exception is raised, a message starting with "❌"
            is returned instead.
        """
        print(f"\n{'='*60}")
        print(f"Starting competitive analysis for: {company_name}")
        print(f"{'='*60}\n")
        try:
            analysis_steps = []

            # Step 1: Validate company
            print("Step 1: Validating company...")
            validation = validate_company(company_name)
            analysis_steps.append(validation)
            # Only look at the status part before the first colon, so the
            # company name inside the message cannot influence the decision.
            # The previous test ('"NOT" in v and "VALID" not in v') could
            # never be true because "NOT VALID" itself contains "VALID".
            verdict = validation.split(":", 1)[0]
            if "NOT VALID" in verdict:
                return f"❌ Company validation failed:\n{validation}\n\nPlease check the company name and try again."

            # Step 2: Identify sector
            print("Step 2: Identifying sector...")
            sector = identify_sector(company_name)
            analysis_steps.append(f"Sector: {sector}")

            # Step 3: Identify competitors
            print("Step 3: Finding competitors...")
            competitors = identify_competitors(sector, company_name)
            analysis_steps.append(f"Competitors: {competitors}")

            # Step 4: Generate report using OpenAI
            print("Step 4: Generating strategic insights...")
            context = "\n".join(analysis_steps)
            messages = [
                {
                    "role": "system",
                    "content": self.system_prompt
                },
                {
                    "role": "user",
                    "content": f"""
Based on this analysis so far:
{context}
Generate a detailed competitive analysis report for {company_name} including:
- Company overview and market position
- Top competitors analysis
- Competitive advantages and disadvantages
- If possible, specific strategic recommendations
Format as a professional Markdown report.
"""
                }
            ]
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.7,
                max_tokens=2000,
            )
            # The message content can be None; fall back to an empty string
            # so the report never contains the literal text "None".
            openai_insights = response.choices[0].message.content or ""

            # Templated report first, model commentary appended after it.
            report = generate_report(company_name, context)
            return f"{report}\n\n## AI-Generated Strategic Insights\n\n{openai_insights}"
        except Exception as e:
            return f"❌ Error during analysis: {e}\n\nPlease check your API key and try again."
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
def analyze_competitors_interface(company: str, openai_key: str) -> str:
    """Validate the form inputs and run the analysis for the Gradio UI.

    Args:
        company: Company name typed by the user.
        openai_key: OpenAI API key typed by the user.

    Returns:
        The analysis report, or a Markdown error message starting with "❌"
        when an input is missing/too short or the analysis raises.
    """
    # Normalise once and use the cleaned values everywhere, so stray
    # whitespace from copy/paste is not passed on to the search queries or
    # the API client. None is treated like an empty field.
    company = (company or "").strip()
    openai_key = (openai_key or "").strip()

    if len(company) < 2:
        return "❌ **Error**: Please enter a valid company name."
    if len(openai_key) < 10:
        return "❌ **Error**: Please enter a valid OpenAI API key."

    try:
        agent = CompetitiveAnalysisAgent(openai_key)
        return agent.analyze_company(company)
    except Exception as e:
        return f"❌ **Error during analysis**: {e}\n\nPlease check your API key and try again."
def create_interface():
    """Build the Gradio Blocks UI for the competitive analysis agent.

    The page has an introduction, a company-name box, a password-style API
    key box, an "Analyze" button, a Markdown output area and an
    informational footer. Clicking the button or pressing Enter in the
    company box runs ``analyze_competitors_interface``.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    intro_markdown = """
# 🏆 Competitive Analysis Agent
Analyze competitors for any company using AI-powered research and strategic insights.
### How it works:
1. **Enter** a company name you want to analyze
2. **Provide** your OpenAI API key (kept securely, not stored)
3. **Click** "Analyze" to generate a comprehensive competitive analysis report
The agent will identify competitors, analyze their strategies, and provide actionable insights.
"""
    footer_markdown = """
---
### 📋 What's Included in the Report:
- ✅ Company validation and industry sector identification
- ✅ Top 3 competitor identification
- ✅ Competitor strategy analysis and comparison
- ✅ Executive summary with key findings
- ✅ Actionable recommendations for competitive advantage
### 🔒 Privacy & Security:
Your OpenAI API key is **NEVER stored or logged**. It's used only for this analysis session.
### ⚡ Tips for Better Results:
- Use well-known company names for more accurate analysis
- The analysis is generated using latest market data and AI models
- For best results, provide accurate company names
"""

    with gr.Blocks(title="Competitive Analysis Agent") as demo:
        gr.Markdown(intro_markdown)

        with gr.Row():
            with gr.Column(scale=1):
                company_input = gr.Textbox(
                    label="Company Name",
                    placeholder="e.g., Tesla, Spotify, Microsoft",
                    lines=1
                )
                api_key_input = gr.Textbox(
                    label="OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                    lines=1
                )
                analyze_button = gr.Button(
                    "🔍 Analyze Competitors",
                    variant="primary",
                    scale=1
                )

        with gr.Row():
            output = gr.Markdown(
                label="Competitive Analysis Report",
                value="*Enter a company name and submit to generate analysis report...*"
            )

        # The button click and the Enter key in the company box trigger the
        # same handler with the same inputs and output.
        for register in (analyze_button.click, company_input.submit):
            register(
                fn=analyze_competitors_interface,
                inputs=[company_input, api_key_input],
                outputs=output
            )

        # Informational footer
        gr.Markdown(footer_markdown)
    return demo
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================
if __name__ == "__main__":
    # Build the UI first, then assemble the launch settings and start the app.
    app = create_interface()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "theme": gr.themes.Soft(),
    }
    app.launch(**launch_options)