# Maintainer: pranav8tripathi@gmail.com — fixed in commit cbdfb07
"""
FastAPI application for RivalLens - Competitor Intelligence API
"""
import os
import asyncio
import uuid
import logging
from datetime import datetime
from typing import List, Optional, Dict, Any
from app.data_sources.wikidata import WikidataClient
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from app.config import settings
from app.services.search import search_adapter
from app.utils.logging_config import setup_logger
from app.models.schemas import CompanyData, CompetitorInsight, UserPayload, ReportResponse
from app.services.llm_client import llm
# Initialize logger
logger = setup_logger(__name__)
# Create FastAPI app
# Create the FastAPI application instance.
app = FastAPI(
    title=settings.APP_NAME,
    description="API for generating competitive intelligence reports",
    version="1.0.0",
    debug=settings.DEBUG
)

# Log application startup state so deployments are easy to diagnose.
logger.info(f"{settings.APP_NAME} v1.0.0 starting up...")
logger.info(f"Environment: {'development' if settings.DEBUG else 'production'}")
logger.info(f"API Key: {'Configured' if settings.DEEPSEEK_API_KEY else 'Not configured'}")

# CORS: restrict to the known frontend origins only.
# SECURITY FIX: the previous configuration also listed "*" alongside the
# specific origins. Starlette treats a "*" entry as allow-all, and combined
# with allow_credentials=True that exposes credentialed requests to every
# origin, defeating the allow-list below — so the wildcard is removed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://devyugensys-bizinsights-frontend.hf.space",
        "https://devyugensys-bizinsight-rasa.hf.space",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["Content-Disposition"]  # lets browsers read download filenames
)
# Constants for research modes.
# Maps each mode to its LLM token budget and user-facing metadata.
# NOTE: "custom" deliberately has no fixed "max_tokens"; its budget is
# derived as base_tokens + per_insight_tokens * number_of_selected_insights.
RESEARCH_MODES = {
    "quick": {
        "max_tokens": 250,
        "time_estimate": "2-3 minutes",
        "description": "Quick snapshot for rapid insights"
    },
    "standard": {
        "max_tokens": 600,
        "time_estimate": "5-7 minutes",
        "description": "Balanced overview with key insights"
    },
    "deep": {
        "max_tokens": 1100,
        "time_estimate": "12-15 minutes",
        "description": "Comprehensive research with detailed analysis"
    },
    "custom": {
        "base_tokens": 100,
        "per_insight_tokens": 200,
        "time_estimate": "Variable",
        "description": "Custom research based on selected areas"
    }
}
# Helper functions
# Helper functions
async def build_system_prompt(
    company_name: str,
    insight_selection: List[str],
    research_mode: str = "standard",
    deep_dive: Optional[List[str]] = None
) -> str:
    """Build the system prompt for the LLM to generate a single formatted summary.

    Args:
        company_name: Name of the company to analyze. Currently not used in
            the prompt text itself; kept for interface stability.
        insight_selection: Insight categories to include, in the exact order
            the LLM should follow.
        research_mode: One of 'quick', 'standard', 'deep', or 'custom'.
            NOTE: the prompt text does not vary by mode; the token budget is
            handled by the caller (generate_insights).
        deep_dive: Optional list of categories to call out for deeper analysis.

    Returns:
        The system prompt string.
    """
    # Dead code removed: the original computed a max_tokens budget, a joined
    # "categories" string, and a per-category description section here, but
    # none of them appeared in the returned prompt. Token budgeting lives in
    # generate_insights(); the returned text below is unchanged.
    deep_dive_text = f"\nFor deeper analysis, focus on: {', '.join(deep_dive)}." if deep_dive else ""
    return (
        "You are a competitive intelligence analyst. Generate a comprehensive analysis "
        "as a structured, well-formatted text with emojis. Follow this structure for each category:\n\n"
        "[Category Name]\n"
        "Key Metrics\n"
        "• [Relevant metrics with context, one per line]\n\n"
        "Analysis\n"
        "[1-2 paragraph analysis specific to this category]\n\n"
        "Key Insights\n"
        "• [Key insights, one per line]\n\n"
        "Strategic Recommendations\n"
        "• [Actionable recommendations, one per line]\n\n"
        "Data Sources\n"
        "• [Source Title] - [URL] (one per line, include all relevant sources)\n\n"
        "Analysis Guidelines:\n"
        f"- Follow the exact structure above, with separate sections for each category\n"
        f"- Use the following categories in this exact order: {', '.join(insight_selection)}\n"
        f"{deep_dive_text}\n"
        "- Be specific and data-driven in your analysis\n"
        "- Use bullet points for better readability\n"
        "- Include relevant metrics with context\n"
        "- Provide actionable recommendations\n"
        "- Cite sources with titles and URLs\n"
        "- Keep the tone professional but engaging"
    )
async def generate_insights(
    company: CompanyData,
    categories: list,
    business_name: str = "your business",
    research_mode: str = "standard"
) -> CompetitorInsight:
    """Generate insights for a single company using the LLM.

    Args:
        company: The company data to analyze.
        categories: List of categories to focus the analysis on.
        business_name: Name of the business being analyzed (for context in the prompt).
        research_mode: One of 'quick', 'standard', 'deep', or 'custom'.

    Returns:
        A CompetitorInsight wrapping the LLM's formatted summary.

    Raises:
        ValueError: If the LLM returns an empty response.
        Exception: Any failure is logged with traceback and re-raised.
    """
    try:
        system_prompt = await build_system_prompt(company.name, categories, research_mode)

        # Resolve mode metadata once (the original looked it up twice);
        # unknown modes fall back to "standard".
        mode_info = RESEARCH_MODES.get(research_mode, RESEARCH_MODES["standard"])

        # Token budget: fixed per mode, or derived for "custom".
        # FIX: "custom" mode previously fell through to a hard-coded 2000;
        # use the budget declared in RESEARCH_MODES instead.
        if "max_tokens" in mode_info:
            max_tokens = mode_info["max_tokens"]
        else:
            max_tokens = mode_info["base_tokens"] + len(categories) * mode_info["per_insight_tokens"]

        # Mode-specific framing sentence for the user prompt.
        analysis_scope = {
            "quick": "Provide a concise analysis focusing on key highlights and immediate insights.",
            "standard": "Provide a balanced analysis with key insights and recommendations.",
            "deep": "Provide a comprehensive, detailed analysis with in-depth insights and strategic recommendations.",
            "custom": f"Provide analysis focusing on the selected categories: {', '.join(categories)}."
        }.get(research_mode, "Provide a balanced analysis.")

        # FIX: the categories bullet list used chr(10)+'• '.join(categories),
        # which (by operator precedence) joined categories with "• " on a
        # single line instead of one bullet per line. Join with "\n• ".
        user_prompt = (
            f"Company Analysis: {company.name}\n\n"
            f"Research Mode: {research_mode.capitalize()}\n"
            f"{analysis_scope}\n\n"
            f"Company Overview\n"
            f"Description: {company.description or 'No description available'}\n"
            f"Industry: {getattr(company, 'industry', 'Not specified')}\n"
            f"Location: {getattr(company, 'location', 'Not specified')}\n"
            f"CEO: {getattr(company, 'ceo', 'Not specified')}\n"
            f"Founded: {getattr(company, 'founded', 'Not available')}\n\n"
            f"Analysis Categories\n"
            f"Please analyze this company's position in the following areas:\n"
            f"• {(chr(10) + '• ').join(categories)}\n\n"
            f"Additional Context\n"
            f"- Compare with {business_name} where relevant\n"
            f"- Focus on specific, data-driven insights\n"
            f"- Include actionable recommendations\n"
            f"- Be concise yet comprehensive\n\n"
            f"Output Format\n"
            f"Please structure your response with clear headings for each category."
        )

        logger.info(f"Generating insights for {company.name}...")

        # Enhanced logging for better visibility of the effective budget.
        logger.info("\n" + "="*80)
        logger.info(f"RESEARCH MODE: {research_mode.upper()}")
        logger.info(f"MAX TOKENS: {max_tokens}")
        logger.info(f"MODE INFO: {mode_info}")
        logger.info("="*80 + "\n")
        logger.info(f"Generating insights with max_tokens={max_tokens} for research_mode={research_mode}")

        llm_response = await llm.summarize(system_prompt, user_prompt, max_tokens=max_tokens)
        if not llm_response:
            raise ValueError("Empty response received from LLM")

        # Wrap the formatted summary; the per-category breakdown currently
        # repeats the full summary for each category.
        return CompetitorInsight(
            company=company,
            summary=llm_response,
            confidence="high",  # Default confidence level
            category_breakdown={cat: llm_response for cat in categories},
            sources=[]  # Will be populated from the sources in the response
        )
    except Exception as e:
        error_msg = f"Failed to generate insights for {company.name}: {str(e)}"
        logger.error(error_msg, exc_info=True)
        raise
# API Endpoints
# API Endpoints
@app.get("/")
async def root():
    """Health/identity endpoint: report the app name, status, and version."""
    payload = {
        "app": settings.APP_NAME,
        "status": "running",
        "version": "1.0.0",
    }
    return payload
class CompanyDetailsRequest(BaseModel):
    """Request body for POST /api/v1/get-company-details."""
    # Name of the company to look up (passed to the search adapter).
    company_asked: str
class CompanyDetailsResponse(BaseModel):
    """Response body for POST /api/v1/get-company-details.

    Every field is a plain string; "Not available" placeholders are the
    defaults when the upstream lookup yields nothing.
    """
    name: str = "Not available"
    location: str = "Not available"
    industry: str = "Not available"
    ceo: str = "Not available"
    founded: str = "Not available"
    details: str = "No details available"
@app.post("/api/v1/get-company-details", response_model=CompanyDetailsResponse)
async def get_company_details(request: CompanyDetailsRequest):
    """
    Get more detailed information about a company using Wikidata.

    Args:
        request: CompanyDetailsRequest containing the company name to look up.

    Returns:
        CompanyDetailsResponse with company information. Every field is
        coerced to a non-None string.

    Raises:
        HTTPException: 500 if the lookup fails for any reason.
    """
    try:
        # Delegate the lookup/enrichment to the shared search adapter.
        company_data = await search_adapter.enrich_company(
            company_name=request.company_asked,
            citation_depth=1,
            geography=None
        )
        # FIX: dict.get(key, default) still returns an *explicit* None value,
        # which the original then rendered as the literal string "None".
        # Use `or`-fallbacks so both missing keys and None values become
        # the intended placeholder text.
        return CompanyDetailsResponse(
            name=str(company_data.get('name') or request.company_asked),
            location=str(company_data.get('location') or 'Not available'),
            industry=str(company_data.get('industry') or 'Not available'),
            ceo=str(company_data.get('ceo') or 'Not available'),
            founded=str(company_data.get('founded') or 'Not available'),
            details=str(company_data.get('details') or f"No details available for {request.company_asked}")
        )
    except Exception as e:
        # Log the full traceback (the original logged only str(e)).
        logger.error(f"Error fetching company details: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to fetch company details: {str(e)}"
        )
class CompanyAnalysisRequest(BaseModel):
    """Request model for company analysis (POST /api/v1/analyze)."""
    # Free-form company info; only 'name' is required downstream, other
    # keys (location, industry, ceo, founded, description) override Wikidata.
    company_info: Dict[str, Any]
    # One of: quick, standard, deep, custom (validated in the endpoint).
    research_mode: str = "standard"
    # Required (non-empty) only when research_mode == "custom"; preset modes
    # overwrite this list in the endpoint.
    insight_selection: List[str]

    class Config:
        # NOTE(review): `schema_extra` is pydantic v1 naming; under pydantic
        # v2 this would be `json_schema_extra` — confirm the installed version.
        schema_extra = {
            "example": {
                "company_info": {
                    "name": "Tesla, Inc.",
                    "location": "Austin, Texas, USA",
                    "industry": "Automotive & Energy",
                    "ceo": "Elon Musk",
                    "founded": 2003
                },
                "research_mode": "standard",
                "insight_selection": [
                    "company_profile",
                    "financials",
                    "market_position",
                    "competitors"
                ]
            }
        }
@app.post("/api/v1/analyze", response_model=ReportResponse)
async def analyze_competitors(
    payload: CompanyAnalysisRequest,
    background_tasks: BackgroundTasks
):
    """
    Main endpoint for company analysis with configurable research depth.

    This endpoint:
    1. Validates the research mode and insight selection
    2. Searches for the company on Wikidata
    3. Fetches detailed information based on selected insights
    4. Generates insights using the LLM with appropriate detail level

    Research Modes:
    - quick: Brief overview (2-3 minutes)
    - standard: Balanced analysis (5-7 minutes)
    - deep: Comprehensive research (12-15 minutes)
    - custom: Tailored to selected insights (variable time)
    """
    # Wikidata is the only external data source used for enrichment here.
    wikidata = WikidataClient()
    # Unique ID for this request; also names the (future) PDF export.
    request_id = str(uuid.uuid4())

    # Validate research mode up front so we fail fast with a 400.
    if payload.research_mode not in RESEARCH_MODES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid research mode. Must be one of: {', '.join(RESEARCH_MODES.keys())}"
        )

    # Preset insight lists for the non-custom research modes.
    RESEARCH_INSIGHTS = {
        "quick": [
            "Company Profile",
            "Competitors",
            "Products",
            "Latest News"
        ],
        "standard": [
            "Company Profile",
            "Financials",
            "Competitors",
            "Leadership & Team",
            "Funding",
            "Products",
            "Latest News",
            "Strategy"
        ],
        "deep": [
            "Company Profile",
            "Financials",
            "Market Position",
            "Competitors",
            "Leadership & Team",
            "Funding",
            "Geographic Presence",
            "Products",
            "Latest News",
            "Strategy",
            "Risk Analysis",
            "Growth Outlook"
        ]
    }

    # Preset modes always use their fixed insight list; custom mode requires
    # the caller to supply at least one category.
    if payload.research_mode != "custom":
        payload.insight_selection = RESEARCH_INSIGHTS[payload.research_mode]
    elif not payload.insight_selection:
        raise HTTPException(
            status_code=400,
            detail="For 'custom' research mode, at least one insight category must be selected in 'insight_selection'"
        )

    # BUGFIX: the preset insight labels are title-cased ("Competitors"), but
    # the original code tested `'competitors' in payload.insight_selection`,
    # which never matched — so competitor analysis silently never ran for the
    # quick/standard/deep modes. Compare case-insensitively instead.
    wants_competitors = any(
        cat.strip().lower() == "competitors" for cat in payload.insight_selection
    )

    # Company name is mandatory — all downstream lookups key off it.
    company_info = payload.company_info
    company_name = company_info.get('name')
    if not company_name:
        raise HTTPException(
            status_code=400,
            detail="Company name is required in company_info"
        )

    try:
        # Step 1: Resolve the company name to a Wikidata entity.
        company_search = await wikidata.search_entity(company_name, limit=1)
        if not company_search:
            raise HTTPException(
                status_code=404,
                detail=f"Could not find company '{company_name}' on Wikidata"
            )
        company_id = company_search[0]['id']

        # Step 2: Get company details with all available properties.
        company_details = await wikidata.get_entity_details(company_id)

        # Prepare company data for analysis. Caller-supplied fields win over
        # Wikidata values; everything is coerced to str for CompanyData.
        company_data = {
            'name': str(company_info.get('name', '')),
            'description': str(company_info.get('description') or
                              company_details.get('descriptions', {}).get('en', {}).get('value', '')),
            'industry': str(company_info.get('industry') or
                            ', '.join(company_details.get('industry', []))),
            'founded': str(company_info.get('founded') or
                           company_details.get('inception', 'Not available')),
            'location': str(company_info.get('location') or
                            company_details.get('location', 'Not specified')),
            'ceo': str(company_info.get('ceo') or
                       company_details.get('ceo', 'Not specified')),
            'website': str(company_details.get('official_website', '')),
            'metrics': {
                'revenue': company_details.get('revenue'),
                'employees': company_details.get('number_of_employees')
            }
        }

        # Step 3: Fetch competitors when that category was selected.
        competitors = []
        if wants_competitors:
            competitors = await wikidata.get_related_entities(
                company_id,
                relation_type='competitor',  # Wikidata property P1592
                limit=min(5, getattr(settings, 'MAX_COMPETITORS', 5))
            )
            # Record competitor names (top 5) on the company payload.
            company_data['competitors'] = [
                comp.get('labels', {}).get('en', {}).get('value', 'Unknown')
                for comp in competitors[:5]
            ]

        # Step 4: Generate insights for the target company via the LLM.
        company_insight = await generate_insights(
            company=CompanyData(**company_data),
            categories=payload.insight_selection,
            business_name=company_name,
            research_mode=payload.research_mode
        )

        # Step 5: Generate competitor insights if needed.
        competitor_insights = []
        if wants_competitors and competitors:
            competitor_tasks = []
            for comp in competitors[:3]:  # analyze at most the top 3 competitors
                try:
                    comp_details = await wikidata.get_entity_details(comp['id'])
                    comp_data = {
                        'name': str(comp.get('labels', {}).get('en', {}).get('value', 'Unknown')),
                        'description': str(comp_details.get('descriptions', {}).get('en', {}).get('value', '')),
                        'industry': str(', '.join(comp_details.get('industry', []))),
                        'founded': str(comp_details.get('inception', '')),
                        'location': str(comp_details.get('location', 'Unknown')),
                        'website': str(comp_details.get('official_website', '')),
                        'metrics': {
                            'revenue': comp_details.get('revenue'),
                            'employees': comp_details.get('number_of_employees')
                        }
                    }
                    task = generate_insights(
                        company=CompanyData(**comp_data),
                        categories=payload.insight_selection,
                        business_name=company_name,
                        research_mode=payload.research_mode
                    )
                    competitor_tasks.append(task)
                except Exception as e:
                    # Best-effort: one bad competitor must not fail the report.
                    logger.warning(f"Failed to process competitor {comp.get('id')}: {str(e)}")
            # Run the competitor LLM calls concurrently and drop any failures.
            if competitor_tasks:
                competitor_insights = await asyncio.gather(*competitor_tasks, return_exceptions=True)
                competitor_insights = [ci for ci in competitor_insights if not isinstance(ci, Exception)]

        # Assemble the combined report text: header + metadata first.
        combined_summary = f"# {company_name} Analysis Report\n\n"
        mode_info = RESEARCH_MODES.get(payload.research_mode, {})
        combined_summary += f"Report Overview\n"
        combined_summary += f"- Research Mode: {payload.research_mode.capitalize()} ({mode_info.get('description', '')})\n"
        combined_summary += f"- Generated At: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
        combined_summary += f"- Analysis Scope: {', '.join(payload.insight_selection)}\n\n"

        # Company section: the LLM summary verbatim.
        combined_summary += "Company Analysis\n"
        combined_summary += company_insight.summary + "\n\n"

        # Competitor section: name, description, and the first few bullets.
        if competitor_insights:
            combined_summary += "Competitive Analysis\n\n"
            for insight in competitor_insights:
                if insight and insight.summary:
                    combined_summary += f"{insight.company.name}\n"
                    combined_summary += f"*{insight.company.description or 'No description available'}*\n\n"
                    # Pull up to 3 bullet points as a quick summary.
                    points = [line.strip() for line in insight.summary.split('•') if line.strip()][:3]
                    if points:
                        combined_summary += "Key Insights:\n"
                        for point in points:
                            combined_summary += f"• {point.strip()}\n"
                        combined_summary += "\n"

        # Closing sections: takeaways keyed off the research mode.
        combined_summary += "Summary & Recommendations\n\n"
        combined_summary += "Key Takeaways\n\n"
        if payload.research_mode == "quick":
            combined_summary += f"Quick snapshot of {company_name}'s current position and immediate opportunities.\n\n"
        elif payload.research_mode == "standard":
            combined_summary += f"Comprehensive analysis of {company_name}'s market position and strategic outlook.\n\n"
        elif payload.research_mode == "deep":
            combined_summary += f"In-depth analysis of {company_name} with detailed strategic insights and long-term projections.\n\n"
        else:  # custom
            combined_summary += f"Focused analysis of {company_name} with emphasis on: {', '.join(payload.insight_selection)}.\n\n"

        # Data sources and disclaimer.
        combined_summary += "Data Sources\n\n"
        combined_summary += "This report was generated using data from the following sources:\n"
        combined_summary += "1. Wikidata - Free and open knowledge base\n"
        combined_summary += "2. Public Company Data - Various public sources\n\n"
        combined_summary += "*Note: This report is for informational purposes only and should not be considered as financial or investment advice.*\n"

        # PDF export: CompanyAnalysisRequest declares no 'preferences' field
        # today, so this guard never fires and pdf_url stays None — kept for
        # forward compatibility with a future preferences model.
        pdf_url = None
        if hasattr(payload, 'preferences') and payload.preferences and hasattr(payload.preferences, 'export_format') and payload.preferences.export_format == 'pdf':
            pdf_url = f"/api/v1/exports/{request_id}.pdf"
            # In a real implementation, you would generate the PDF here:
            # background_tasks.add_task(generate_pdf_export, request_id, combined_summary, [])

        # Prepare the final response; individual insights/recommendations/
        # sources live inside the combined summary text.
        report = ReportResponse(
            request_id=request_id,
            company_name=company_name,
            generated_at=datetime.utcnow().isoformat(),
            summary=combined_summary,
            metrics=company_insight.metrics if hasattr(company_insight, 'metrics') else [],
            insights=[],
            recommendations=[],
            sources=[],
            pdf_url=pdf_url
        )
        return report
    except HTTPException:
        # Preserve deliberate HTTP errors (400/404) instead of wrapping in 500.
        raise
    except Exception as e:
        logger.error(f"Error in analyze_competitors: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred while analyzing competitors: {str(e)}"
        )
async def generate_pdf_export(request_id: str, report_data: dict, charts: list):
    """Background task to generate and store a PDF report.

    Currently a no-op placeholder, referenced (commented out) from the
    analyze endpoint's background_tasks hook.

    A real implementation would:
    1. Generate the PDF from report_data/charts
    2. Store it in persistent storage (S3, filesystem, etc.)
    3. Update the report status in the database
    """
    pass
# Run a local development server when this module is executed directly.
# (reload=True requires the "main:app" import-string form.)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)