cryogenic22's picture
Update utils.py
20e7e83 verified
# At the top of utils.py
import re
import json
import streamlit as st
import pandas as pd
import plotly.express as px
from datetime import datetime
from langchain_openai import ChatOpenAI
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, landscape
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
def update_progress(container, percentage, message=""):
    """Report progress and a status message through ``container``.

    Args:
        container: Object exposing ``progress(value)`` and ``write(text)``.
            When it is falsy (e.g. ``None``) the call is a no-op.
        percentage: Completion on a 0-100 scale. Values outside that range
            are clamped so the fraction handed to ``progress`` stays within
            0.0-1.0.
        message: Text passed to ``container.write`` after the progress update.
    """
    if not container:
        return
    # Clamp first: a caller that overshoots (e.g. 105) must not turn a
    # cosmetic progress update into an out-of-range error.
    fraction = min(max(percentage, 0), 100) / 100
    container.progress(fraction)
    container.write(message)
def extract_metrics(text):
    """Ask GPT-4 for headline market metrics found in ``text``.

    Args:
        text: Report text to analyse; it is substituted into the prompt.

    Returns:
        dict with the keys ``market_size``, ``cagr``, ``leader_share``,
        ``key_players``, ``key_regions`` and ``dominant_segment``. Keys the
        model leaves out keep the value ``'N/A'``; the all-``'N/A'`` dict is
        returned when the call or the JSON parsing fails.
    """
    defaults = {
        'market_size': 'N/A',
        'cagr': 'N/A',
        'leader_share': 'N/A',
        'key_players': 'N/A',
        'key_regions': 'N/A',
        'dominant_segment': 'N/A'
    }
    metrics_prompt = """Extract the following metrics as JSON from the text:
- Market size (with currency)
- CAGR (%)
- Market leader's share (%)
- Number of key players
- Key regions
- Dominant segment
Text: {text}
Return in JSON format with these exact keys:
market_size, cagr, leader_share, key_players, key_regions, dominant_segment
If a metric is not found, use "N/A" as the value."""
    try:
        # Build the client inside the try so a construction failure is
        # reported through the same path as a failed request.
        llm = ChatOpenAI(temperature=0, model="gpt-4")
        response = llm.invoke(metrics_prompt.format(text=text))
        if response and response.content:
            # The reply may wrap the JSON in prose; grab the outermost {...}.
            json_match = re.search(r'\{.*\}', response.content, re.DOTALL)
            if json_match:
                parsed = json.loads(json_match.group())
                # Overlay on the defaults so every expected key is present.
                return {**defaults, **parsed}
    except Exception as e:
        st.error(f"Error extracting metrics: {str(e)}")
    # Nothing usable came back: fall through to the placeholder metrics.
    return defaults
def enhance_report_with_gpt4(base_report, topic):
    """Rewrite ``base_report`` as a structured market research report via GPT-4.

    Args:
        base_report: Raw research text to build the report from.
        topic: Subject of the report; interpolated into the prompt.

    Returns:
        The model's report text. Falls back to ``base_report`` when the call
        fails or returns no content, and to a fixed notice string when
        ``base_report`` is empty.
    """
    # Check the input before creating a client: there is nothing to enhance,
    # so no model setup (or its possible failure) should be involved.
    if not base_report:
        return "No base report provided to enhance."
    try:
        llm = ChatOpenAI(temperature=0.7, model="gpt-4")
        prompt = f"""Create a professional market research report for {topic} based on this research:
{base_report}
Structure the report with:
# Executive Summary
- Brief overview
- Key findings
- Market highlights
# Market Overview
- Current market size and growth
- Geographic distribution
- Market segmentation
# Competitive Analysis
- Key players and market shares
- Competitive strategies
- SWOT analysis
# Market Dynamics
- Growth drivers
- Market challenges
- Entry barriers
# Industry Trends
- Technology trends
- Consumer behavior
- Regulatory landscape
# Future Outlook
- Market projections
- Emerging opportunities
- Risk factors
# Strategic Recommendations
- Short-term strategies
- Long-term opportunities
- Risk mitigation"""
        response = llm.invoke(prompt)
        enhanced = getattr(response, "content", None)
        # An empty reply is treated like a missing one so the caller never
        # receives a blank report.
        return enhanced if enhanced else base_report
    except Exception as e:
        st.error(f"Error enhancing report: {str(e)}")
        return base_report
def generate_visual_data(metrics):
    """Map selected metric keys to display labels.

    Args:
        metrics: Mapping queried with ``.get`` for ``market_size``, ``cagr``,
            ``leader_share`` and ``key_players``.

    Returns:
        dict keyed by ``'Market Size'``, ``'CAGR'``, ``'Leader Share'`` and
        ``'Key Players'``; a missing metric becomes ``'N/A'``. An empty dict
        is returned (after reporting via ``st.error``) if the lookup raises.
    """
    label_to_key = (
        ('Market Size', 'market_size'),
        ('CAGR', 'cagr'),
        ('Leader Share', 'leader_share'),
        ('Key Players', 'key_players'),
    )
    try:
        return {label: metrics.get(key, 'N/A') for label, key in label_to_key}
    except Exception as e:
        st.error(f"Error generating visualizations: {str(e)}")
        return {}
def _default_metrics():
    """Return the placeholder metrics dict used when extraction fails."""
    return {
        'market_size': 'N/A',
        'cagr': 'N/A',
        'leader_share': 'N/A',
        'key_players': 'N/A',
        'key_regions': 'N/A',
        'dominant_segment': 'N/A'
    }


def _default_market_data():
    """Return the empty market-data structure used when extraction fails."""
    return {
        "marketShares": [],
        "growthTrend": [],
        "regionalDistribution": [],
        "techAdoption": [],
        "keyPlayers": []
    }


def _task_role(task):
    """Return the lower-cased role of ``task``'s agent, or '' if it has none."""
    role = getattr(getattr(task, 'agent', None), 'role', None)
    return str(role).lower() if role else ''


def process_crew_output(crew_result, topic):
    """Turn a crew result into the report structure used by the display code.

    Args:
        crew_result: Result object of the crew run. Its ``str()`` form is the
            base report; if it has a ``tasks`` attribute, each task's
            ``output`` is filed under researcher/analyst/writer according to
            the task agent's role.
        topic: Report topic, forwarded to ``enhance_report_with_gpt4``.

    Returns:
        dict with the keys ``metrics``, ``content``, ``raw``,
        ``agent_outputs`` and ``market_data``. Each step that fails is
        replaced by its placeholder value, so a dict of this shape is
        returned even when processing as a whole fails.
    """
    # Built outside the try block so the fallback return below can always
    # reference it, whatever raised.
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    agent_outputs = {
        'researcher': {
            'raw_output': '',
            'timestamp': timestamp,
            'analysis_type': 'Market Research'
        },
        'analyst': {
            'raw_output': '',
            'timestamp': timestamp,
            'analysis_type': 'Data Analysis'
        },
        'writer': {
            'raw_output': '',
            'timestamp': timestamp,
            'analysis_type': 'Report Writing'
        }
    }
    try:
        base_report = str(crew_result) if crew_result else "No report generated"

        # File each task's output under the agent whose role it matches.
        for task in getattr(crew_result, 'tasks', None) or []:
            # A task without an agent/role is skipped instead of aborting
            # the whole report.
            role = _task_role(task)
            output = getattr(task, 'output', None)
            if 'research' in role:
                agent_outputs['researcher']['raw_output'] = output or "No research output available"
            elif 'analyst' in role:
                agent_outputs['analyst']['raw_output'] = output or "No analysis output available"
            elif 'writer' in role:
                agent_outputs['writer']['raw_output'] = output or "No writer output available"

        # Each enrichment step degrades independently to its placeholder.
        try:
            metrics = extract_metrics(base_report)
        except Exception as e:
            st.warning(f"Warning extracting metrics: {str(e)}")
            metrics = _default_metrics()

        try:
            enhanced_content = enhance_report_with_gpt4(base_report, topic)
        except Exception as e:
            st.warning(f"Warning enhancing report: {str(e)}")
            enhanced_content = base_report

        try:
            market_data = extract_market_data(base_report)
        except Exception as e:
            st.warning(f"Warning extracting market data: {str(e)}")
            market_data = _default_market_data()

        return {
            'metrics': metrics,
            'content': enhanced_content,
            'raw': base_report,
            'agent_outputs': agent_outputs,
            'market_data': market_data
        }
    except Exception as e:
        st.error(f"Error processing report: {str(e)}")
        # Return the default structure so callers can still render a page.
        return {
            'metrics': _default_metrics(),
            'content': "Error generating report content",
            'raw': str(crew_result) if crew_result else "No report generated",
            'agent_outputs': agent_outputs,
            'market_data': _default_market_data()
        }
def extract_market_data(text):
    """Extract structured market data for visualizations.

    Args:
        text: Report text to analyse; it is substituted into the prompt.

    Returns:
        dict with the keys ``marketShares``, ``growthTrend``,
        ``regionalDistribution``, ``techAdoption`` and ``keyPlayers``. Keys
        missing from the model's reply keep an empty list; the all-empty
        structure is returned when the call or the JSON parsing fails.
    """
    defaults = {
        "marketShares": [],
        "growthTrend": [],
        "regionalDistribution": [],
        "techAdoption": [],
        "keyPlayers": []
    }
    data_prompt = """Extract the following data points in JSON format:
1. Market shares of key players
2. Growth trends over years
3. Regional distribution
4. Technology adoption rates
5. Company profiles with recent developments
Text: {text}
Return as JSON with these keys:
{
"marketShares": [{"company": "Company Name", "share": number}],
"growthTrend": [{"year": "YYYY", "growth": number}],
"regionalDistribution": [{"region": "Region Name", "share": number}],
"techAdoption": [{"name": "Tech Name", "adoptionRate": number}],
"keyPlayers": [{"company": "Company Name", "marketShare": number, "strengths": "text", "developments": "text"}]
}
Use "N/A" for missing values."""
    try:
        # The template contains literal JSON braces, which str.format() would
        # try to parse as replacement fields and raise on. Substitute the one
        # real placeholder by plain replacement instead.
        prompt = data_prompt.replace("{text}", str(text))
        llm = ChatOpenAI(temperature=0, model="gpt-4")
        response = llm.invoke(prompt)
        if response and response.content:
            # The reply may wrap the JSON in prose; grab the outermost {...}.
            json_match = re.search(r'\{.*\}', response.content, re.DOTALL)
            if json_match:
                parsed = json.loads(json_match.group())
                # Overlay on the defaults so every expected key is present.
                return {**defaults, **parsed}
    except Exception as e:
        st.error(f"Error extracting market data: {str(e)}")
    # Nothing usable came back: fall through to the empty structure.
    return defaults
def display_presentation_slide(slide, slide_num, total_slides):
    """Render one presentation slide followed by a navigation row.

    Args:
        slide: Mapping with ``'title'`` and ``'type'``. ``'content'`` is read
            for the ``title``, ``metrics``, ``chart`` and ``text`` types, and
            ``'chart_type'`` (``'pie'`` or ``'bar'``) for ``chart`` slides.
        slide_num: Zero-based index of this slide.
        total_slides: Number of slides in the deck.
    """
    st.markdown(f"## {slide['title']}")

    slide_type = slide['type']
    if slide_type == 'title':
        st.markdown(f"<div style='text-align: center; padding: 20px;'><h1>{slide['title']}</h1></div>",
                    unsafe_allow_html=True)
        st.markdown(slide['content'])
    elif slide_type == 'metrics':
        columns = st.columns(3)
        values = slide['content']
        # Two metrics per column, left to right.
        layout = (
            (("Market Size", 'market_size'), ("CAGR", 'cagr')),
            (("Market Leader Share", 'leader_share'), ("Key Players", 'key_players')),
            (("Key Region", 'key_regions'), ("Dominant Segment", 'dominant_segment')),
        )
        for column, entries in zip(columns, layout):
            with column:
                for label, key in entries:
                    st.metric(label, values.get(key, 'N/A'))
    elif slide_type == 'chart':
        chart_type = slide['chart_type']
        # Charts are only drawn when the slide actually carries data.
        if chart_type == 'pie' and slide['content']:
            frame = pd.DataFrame(slide['content'])
            figure = px.pie(
                frame,
                values='share',
                names='company',
                title='Market Share Distribution'
            )
            st.plotly_chart(figure, use_container_width=True)
        elif chart_type == 'bar' and slide['content']:
            frame = pd.DataFrame(slide['content'])
            figure = px.bar(
                frame,
                x='region',
                y='share',
                title='Regional Distribution'
            )
            st.plotly_chart(figure, use_container_width=True)
    elif slide_type == 'text':
        st.markdown(slide['content'])

    # Navigation row: previous button, position indicator, next button.
    # NOTE(review): the buttons' return values are not used here; presumably
    # the caller reacts to them through the widget keys - confirm.
    left, middle, right = st.columns([1, 2, 1])
    with left:
        if slide_num > 0:
            st.button("← Previous", key=f"prev_{slide_num}",
                      help="Go to previous slide")
    with middle:
        st.markdown(f"<div style='text-align: center;'>Slide {slide_num + 1} of {total_slides}</div>",
                    unsafe_allow_html=True)
    with right:
        if slide_num < total_slides - 1:
            st.button("Next →", key=f"next_{slide_num}",
                      help="Go to next slide")
def _metric_tile_html(background, accent, heading, headline, caption, detail):
    """Build the HTML for one metric tile, escaping the two metric values."""
    import html

    # headline/detail come from model output and are rendered as raw HTML,
    # so they are escaped; the remaining arguments are fixed literals.
    return """
<div style='background-color: {}; padding: 20px; border-radius: 10px; height: 150px;'>
<h4 style='color: {};'>{}</h4>
<h2>{}</h2>
<p>{}: {}</p>
</div>
""".format(background, accent, heading, html.escape(str(headline)),
           caption, html.escape(str(detail)))


def _split_top_level_sections(content):
    """Split markdown text at its shallowest heading level.

    Element 0 is the text before the first such heading; every later element
    starts with that heading's title line. Text without headings comes back
    as a single-element list.
    """
    levels = [len(hashes)
              for hashes in re.findall(r'^(#{1,6})[ \t]+\S', content, re.MULTILINE)]
    if not levels:
        return [content]
    return re.split(r'^#{%d}[ \t]+' % min(levels), content, flags=re.MULTILINE)


def _pick_section(sections, keywords, position, missing):
    """Pick a section by title keyword, then by position, else ``missing``."""
    for chunk in sections[1:]:
        title = chunk.lstrip().split('\n', 1)[0].casefold()
        if any(word in title for word in keywords):
            return chunk
    if len(sections) > position:
        return sections[position]
    return missing


def display_report(report_data):
    """Render the report: metric tiles on top, then one tab per section.

    Args:
        report_data: Mapping read for ``'metrics'`` (tile values),
            ``'content'`` (markdown report text) and, when present,
            ``'market_data'`` (passed to the chart and competitor-table
            helpers).

    Any exception raised while rendering is reported with ``st.error``
    instead of propagating.
    """
    try:
        # Display key metrics in tiles
        st.write("### 📊 Key Market Insights")
        metrics = report_data.get('metrics') or {}
        tiles = (
            ("#f0f7ff", "#1e88e5", "Market Size",
             metrics.get('market_size', 'N/A'), "CAGR", metrics.get('cagr', 'N/A')),
            ("#fff8e1", "#ffa000", "Market Leadership",
             metrics.get('leader_share', 'N/A'), "Key Players", metrics.get('key_players', 'N/A')),
            ("#e8f5e9", "#43a047", "Regional Focus",
             metrics.get('key_regions', 'N/A'), "Dominant Segment", metrics.get('dominant_segment', 'N/A')),
        )
        for column, tile in zip(st.columns(3), tiles):
            with column:
                st.markdown(_metric_tile_html(*tile), unsafe_allow_html=True)

        # Create tabs for different sections of the report
        report_tabs = st.tabs([
            "Executive Summary",
            "Market Analysis",
            "Competitive Landscape",
            "Regional Analysis",
            "Future Outlook"
        ])

        # Split on real heading lines only; splitting on every '#' character
        # would also cut at '##' subheadings and at '#' inside running text.
        content = report_data.get('content') or ''
        sections = _split_top_level_sections(str(content))
        market_data = report_data.get('market_data')

        # Per tab: border colour, title keywords, positional fallback index,
        # and the text shown when neither lookup finds a section.
        tab_specs = (
            ("#1e88e5", ("executive summary",), 1, "Executive Summary not available"),
            ("#43a047", ("market overview", "market analysis"), 2, "Market Analysis not available"),
            ("#ffa000", ("competitive",), 3, "Competitive Landscape not available"),
            ("#e91e63", ("regional", "geographic"), 4, "Regional Analysis not available"),
            ("#9c27b0", ("outlook",), 5, "Future Outlook not available"),
        )
        for index, (tab, spec) in enumerate(zip(report_tabs, tab_specs)):
            border, keywords, position, missing = spec
            with tab:
                # NOTE(review): the box is opened and closed in separate
                # st.markdown calls, as before; confirm that Streamlit
                # actually nests the section text inside it.
                st.markdown("""
<div style='background-color: white; padding: 20px; border-radius: 10px; border-left: 5px solid {};'>
""".format(border), unsafe_allow_html=True)
                st.markdown(_pick_section(sections, keywords, position, missing))
                st.markdown("</div>", unsafe_allow_html=True)

                if index == 1 and 'market_data' in report_data:
                    # Add market visualizations if available
                    display_market_visualizations(market_data)
                elif index == 2 and isinstance(market_data, dict) and 'keyPlayers' in market_data:
                    # Add competitor table if available
                    display_competitor_table(market_data['keyPlayers'])
    except Exception as e:
        st.error(f"Error displaying report: {str(e)}")
def display_market_visualizations(market_data):
    """Display market visualizations using Plotly.

    Draws a market-share pie chart from ``market_data['marketShares']``.
    Nothing is drawn when that entry is absent, empty, lacks the ``company``
    or ``share`` fields, or holds no numeric share (for example ``"N/A"``).

    Args:
        market_data: Mapping whose ``'marketShares'`` entry holds records
            with ``company`` and ``share`` fields.
    """
    if not isinstance(market_data, dict):
        return
    raw_shares = market_data.get('marketShares')
    if raw_shares is None:
        return
    try:
        frame = pd.DataFrame(raw_shares)
    except (ValueError, TypeError):
        # Not tabular (e.g. a bare "N/A" string): nothing to chart.
        return
    # The placeholder structure carries an empty list, which yields a frame
    # without the columns the chart needs.
    if not {'company', 'share'}.issubset(frame.columns):
        return
    # Non-numeric placeholders become NaN and are dropped before plotting.
    frame = frame.assign(
        share=pd.to_numeric(frame['share'], errors='coerce')
    ).dropna(subset=['share'])
    if frame.empty:
        return
    fig = px.pie(
        frame,
        values='share',
        names='company',
        title='Market Share Distribution'
    )
    st.plotly_chart(fig, use_container_width=True)
def display_competitor_table(competitors_data):
    """Show competitor records as a Streamlit dataframe.

    Args:
        competitors_data: Records to tabulate. The column configuration
            relabels the ``company``, ``marketShare``, ``strengths`` and
            ``developments`` columns. Nothing is rendered when this is falsy.
    """
    if not competitors_data:
        return

    table = pd.DataFrame(competitors_data)
    # Market share is displayed with one decimal place and a percent sign.
    share_column = st.column_config.NumberColumn(
        "Market Share (%)",
        format="%.1f%%"
    )
    column_labels = {
        "company": "Company",
        "marketShare": share_column,
        "strengths": "Key Strengths",
        "developments": "Recent Developments"
    }
    st.dataframe(
        table,
        column_config=column_labels,
        use_container_width=True,
        hide_index=True
    )
def apply_report_styling():
    """Return the report's CSS as a ``<style>`` block string.

    The stylesheet covers the summary, findings and section boxes, the
    h1-h3 headers, bullet lists, the metrics container, key-player cards
    and tables.
    """
    stylesheet = """
<style>
/* Executive Summary Box */
.executive-summary {
background-color: #f8f9fa;
border-left: 5px solid #0d6efd;
padding: 20px;
margin: 20px 0;
border-radius: 5px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
/* Key Findings Box */
.key-findings {
background-color: #e7f5ff;
border: 1px solid #74c0fc;
padding: 20px;
margin: 20px 0;
border-radius: 5px;
}
/* Section Boxes */
.section-box {
background-color: white;
border: 1px solid #dee2e6;
border-radius: 5px;
padding: 20px;
margin: 20px 0;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
/* Headers */
h1 {
color: #0d6efd;
font-size: 2.5em;
font-weight: 700;
margin-bottom: 30px;
padding-bottom: 10px;
border-bottom: 3px solid #0d6efd;
}
h2 {
color: #1a73e8;
font-size: 2em;
font-weight: 600;
margin-top: 40px;
margin-bottom: 20px;
}
h3 {
color: #2c3e50;
font-size: 1.5em;
font-weight: 500;
margin-top: 30px;
margin-bottom: 15px;
}
/* Lists */
.bullet-points {
background-color: #f8f9fa;
padding: 15px 30px;
border-radius: 5px;
margin: 10px 0;
}
/* Metrics Dashboard */
.metrics-container {
background: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
margin: 20px 0;
}
/* Key Players Section */
.key-players {
background-color: #f1f8ff;
padding: 20px;
border-radius: 5px;
margin: 20px 0;
}
.player-card {
background: white;
padding: 15px;
margin: 10px 0;
border-radius: 5px;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
/* Tables */
table {
width: 100%;
border-collapse: collapse;
margin: 20px 0;
}
th, td {
padding: 12px;
border: 1px solid #dee2e6;
}
th {
background-color: #f8f9fa;
font-weight: 600;
}
</style>
"""
    return stylesheet
def extract_sources(text):
    """Collect the citations that follow ``Source:`` / ``Reference:`` labels.

    The label is matched case-insensitively as a whole word, with an
    optional plural ``s``; the citation is the rest of that line.

    Args:
        text: Text to scan. ``None`` or an empty string is treated as
            containing no sources.

    Returns:
        list of citation strings with surrounding whitespace removed, or
        ``["Sources not explicitly mentioned"]`` when none are found.
    """
    placeholder = ["Sources not explicitly mentioned"]
    if not text:
        return placeholder
    # \b keeps "Resource:" from matching as "source:"; [ \t]* (rather than
    # \s*) stops a bare label from swallowing the following line.
    pattern = r'\b(?:Source|Reference)s?:[ \t]*([^\r\n]*)'
    candidates = (match.strip() for match in re.findall(pattern, text, re.IGNORECASE))
    # A label with nothing after it is not a citation.
    sources = [entry for entry in candidates if entry]
    return sources if sources else placeholder