# At the top of utils.py
"""Helpers for generating, post-processing and rendering market research reports.

The module talks to an OpenAI chat model through LangChain to extract
structured data from free text, and renders the results with Streamlit and
Plotly.
"""
import html
import json
import re
from collections import Counter
from datetime import datetime

import pandas as pd
import plotly.express as px
import streamlit as st
from langchain_openai import ChatOpenAI
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, landscape
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, PageBreak


# Placeholder replaced by the source text in the prompt templates below.
_TEXT_SLOT = "{text}"

_METRICS_PROMPT = (
    "Extract the following metrics as JSON from the text: "
    "- Market size (with currency) - CAGR (%) - Market leader's share (%) "
    "- Number of key players - Key regions - Dominant segment "
    "Text: {text} "
    "Return in JSON format with these exact keys: market_size, cagr, "
    "leader_share, key_players, key_regions, dominant_segment "
    'If a metric is not found, use "N/A" as the value.'
)

# This template contains literal JSON braces, so it must never be passed
# through ``str.format``; see ``_fill_prompt``.
_MARKET_DATA_PROMPT = (
    "Extract the following data points in JSON format: "
    "1. Market shares of key players 2. Growth trends over years "
    "3. Regional distribution 4. Technology adoption rates "
    "5. Company profiles with recent developments "
    "Text: {text} "
    "Return as JSON with these keys: "
    '{ "marketShares": [{"company": "Company Name", "share": number}], '
    '"growthTrend": [{"year": "YYYY", "growth": number}], '
    '"regionalDistribution": [{"region": "Region Name", "share": number}], '
    '"techAdoption": [{"name": "Tech Name", "adoptionRate": number}], '
    '"keyPlayers": [{"company": "Company Name", "marketShare": number, '
    '"strengths": "text", "developments": "text"}] } '
    'Use "N/A" for missing values.'
)

_REPORT_STRUCTURE = (
    "Structure the report with: "
    "# Executive Summary - Brief overview - Key findings - Market highlights "
    "# Market Overview - Current market size and growth "
    "- Geographic distribution - Market segmentation "
    "# Competitive Analysis - Key players and market shares "
    "- Competitive strategies - SWOT analysis "
    "# Market Dynamics - Growth drivers - Market challenges - Entry barriers "
    "# Industry Trends - Technology trends - Consumer behavior "
    "- Regulatory landscape "
    "# Future Outlook - Market projections - Emerging opportunities "
    "- Risk factors "
    "# Strategic Recommendations - Short-term strategies "
    "- Long-term opportunities - Risk mitigation"
)

# (substring looked for in the agent role, key in agent_outputs, fallback text)
_ROLE_ROUTING = (
    ('research', 'researcher', "No research output available"),
    ('analyst', 'analyst', "No analysis output available"),
    ('writer', 'writer', "No writer output available"),
)

# (tab label, heading keywords routed to that tab). The keywords cover the
# section titles requested in _REPORT_STRUCTURE so every generated section
# lands under a tab whose label actually describes it.
_REPORT_TABS = (
    ("Executive Summary", ("executive",)),
    ("Market Analysis", ("market overview", "market analysis",
                         "market dynamics", "industry trends")),
    ("Competitive Landscape", ("competitive", "competitor", "competition")),
    ("Regional Analysis", ("regional", "geographic", "region")),
    ("Future Outlook", ("outlook", "recommendation")),
)

# NOTE(review): these wrappers are rendered with unsafe_allow_html=True but
# contain no markup in this copy of the file - the original HTML appears to
# have been lost. Restore the opening/closing tags here if they are recovered.
_SECTION_OPEN = "\n"
_SECTION_CLOSE = "\n"

_HEADING_RE = re.compile(r'^(#{1,6})[ \t]+(.*\S)[ \t]*$', re.MULTILINE)


def _default_metrics():
    """Return a fresh metrics dict with every key set to ``'N/A'``."""
    return {
        'market_size': 'N/A',
        'cagr': 'N/A',
        'leader_share': 'N/A',
        'key_players': 'N/A',
        'key_regions': 'N/A',
        'dominant_segment': 'N/A',
    }


def _default_market_data():
    """Return a fresh market-data dict with every series empty."""
    return {
        "marketShares": [],
        "growthTrend": [],
        "regionalDistribution": [],
        "techAdoption": [],
        "keyPlayers": [],
    }


def _fill_prompt(template, text):
    """Insert *text* into *template* without interpreting any other braces."""
    return template.replace(_TEXT_SLOT, str(text))


def _request_json(prompt):
    """Send *prompt* to GPT-4 and return the first JSON object in the reply.

    Returns ``None`` when the reply is empty, contains no ``{...}`` span, or
    the span does not decode to a JSON object. Client and decoding errors
    propagate to the caller.
    """
    llm = ChatOpenAI(temperature=0, model="gpt-4")
    response = llm.invoke(prompt)
    content = getattr(response, 'content', None)
    if not content:
        return None
    match = re.search(r'\{.*\}', content, re.DOTALL)
    if match is None:
        return None
    parsed = json.loads(match.group())
    return parsed if isinstance(parsed, dict) else None


def _as_text(value):
    """Turn a metric value (str, number, list, ``None``) into display text."""
    if value is None:
        return 'N/A'
    if isinstance(value, (list, tuple, set)):
        return ", ".join(str(item) for item in value)
    return str(value)


def _chart_frame(records, label_col, value_col):
    """Build a plottable DataFrame from *records*, or ``None`` if unusable.

    Rows whose *value_col* is not numeric (the prompts allow ``"N/A"``) are
    dropped, because Plotly cannot chart them.
    """
    if not records:
        return None
    try:
        frame = pd.DataFrame(records)
        if label_col not in frame.columns or value_col not in frame.columns:
            return None
        frame[value_col] = pd.to_numeric(frame[value_col], errors='coerce')
    except (ValueError, TypeError):
        return None
    frame = frame.dropna(subset=[value_col])
    return None if frame.empty else frame


def update_progress(container, percentage, message=""):
    """Show *percentage* (0-100) and an optional *message* in *container*.

    Does nothing when *container* is falsy. The percentage is clamped so an
    out-of-range value is never passed to ``progress()``.
    """
    if not container:
        return
    fraction = min(max(percentage / 100, 0.0), 1.0)
    container.progress(fraction)
    if message:
        container.write(message)


def extract_metrics(text):
    """Extract metrics from text with error handling.

    Returns a dict with the keys ``market_size``, ``cagr``, ``leader_share``,
    ``key_players``, ``key_regions`` and ``dominant_segment``. Any key the
    model did not supply, and every key on failure, is ``'N/A'``.
    """
    metrics = _default_metrics()
    try:
        extracted = _request_json(_fill_prompt(_METRICS_PROMPT, text))
        if extracted:
            metrics.update(extracted)
    except Exception as e:
        st.error(f"Error extracting metrics: {str(e)}")
    return metrics


def enhance_report_with_gpt4(base_report, topic):
    """Enhance report with GPT-4.

    Returns the model's rewritten report, *base_report* itself when the call
    fails or yields nothing, or a fixed notice when *base_report* is empty.
    """
    # Check the input first so an empty report never needs an API client.
    if not base_report:
        return "No base report provided to enhance."
    try:
        llm = ChatOpenAI(temperature=0.7, model="gpt-4")
        prompt = (
            f"Create a professional market research report for {topic} "
            f"based on this research: {base_report} "
            + _REPORT_STRUCTURE
        )
        response = llm.invoke(prompt)
        return response.content if response else base_report
    except Exception as e:
        st.error(f"Error enhancing report: {str(e)}")
        return base_report


def generate_visual_data(metrics):
    """Pick the headline metrics out of *metrics* under display-ready labels.

    Returns an empty dict (after reporting the error) if *metrics* does not
    support ``.get``.
    """
    try:
        # Prepare data for visualizations
        return {
            'Market Size': metrics.get('market_size', 'N/A'),
            'CAGR': metrics.get('cagr', 'N/A'),
            'Leader Share': metrics.get('leader_share', 'N/A'),
            'Key Players': metrics.get('key_players', 'N/A'),
        }
    except Exception as e:
        st.error(f"Error generating visualizations: {str(e)}")
        return {}


def _new_agent_outputs():
    """Return the per-agent output skeleton, stamped with the current time."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    analysis_types = (
        ('researcher', 'Market Research'),
        ('analyst', 'Data Analysis'),
        ('writer', 'Report Writing'),
    )
    return {
        agent: {
            'raw_output': '',
            'timestamp': timestamp,
            'analysis_type': analysis_type,
        }
        for agent, analysis_type in analysis_types
    }


def _collect_agent_outputs(crew_result, agent_outputs):
    """Copy each task's output into *agent_outputs*, routed by agent role."""
    for task in getattr(crew_result, 'tasks', None) or []:
        role = getattr(getattr(task, 'agent', None), 'role', '') or ''
        role = str(role).lower()
        for needle, agent_key, fallback in _ROLE_ROUTING:
            if needle in role:
                output = getattr(task, 'output', None)
                agent_outputs[agent_key]['raw_output'] = output if output else fallback
                break  # first matching role wins


def process_crew_output(crew_result, topic):
    """Turn a crew run into the report structure consumed by ``display_report``.

    Returns a dict with the keys ``metrics``, ``content``, ``raw``,
    ``agent_outputs`` and ``market_data``. Each stage falls back to a default
    value on failure, so the returned dict always has the same shape.
    """
    # Built outside the try block because the error path below returns it too.
    agent_outputs = _new_agent_outputs()
    try:
        # Get base report
        base_report = str(crew_result) if crew_result else "No report generated"

        # Extract individual agent outputs from crew_result
        _collect_agent_outputs(crew_result, agent_outputs)

        # Extract metrics with error handling
        try:
            metrics = extract_metrics(base_report)
        except Exception as e:
            st.warning(f"Warning extracting metrics: {str(e)}")
            metrics = _default_metrics()

        # Generate enhanced report content
        try:
            enhanced_content = enhance_report_with_gpt4(base_report, topic)
        except Exception as e:
            st.warning(f"Warning enhancing report: {str(e)}")
            enhanced_content = base_report

        # Extract market data
        try:
            market_data = extract_market_data(base_report)
        except Exception as e:
            st.warning(f"Warning extracting market data: {str(e)}")
            market_data = _default_market_data()

        return {
            'metrics': metrics,
            'content': enhanced_content,
            'raw': base_report,
            'agent_outputs': agent_outputs,
            'market_data': market_data,
        }
    except Exception as e:
        st.error(f"Error processing report: {str(e)}")
        # Return default structure
        return {
            'metrics': _default_metrics(),
            'content': "Error generating report content",
            'raw': str(crew_result) if crew_result else "No report generated",
            'agent_outputs': agent_outputs,
            'market_data': _default_market_data(),
        }


def extract_market_data(text):
    """Extract structured market data for visualizations.

    Returns a dict with the keys ``marketShares``, ``growthTrend``,
    ``regionalDistribution``, ``techAdoption`` and ``keyPlayers``. Keys the
    model did not supply, and every key on failure, are empty lists.
    """
    market_data = _default_market_data()
    try:
        # The template holds literal JSON braces; str.format would raise on
        # them, so the text is spliced in with a plain replacement instead.
        extracted = _request_json(_fill_prompt(_MARKET_DATA_PROMPT, text))
        if extracted:
            market_data.update(extracted)
    except Exception as e:
        st.error(f"Error extracting market data: {str(e)}")
    return market_data


def display_presentation_slide(slide, slide_num, total_slides):
    """Display a single presentation slide.

    *slide* is a dict with ``title``, ``type`` and ``content``; chart slides
    also carry ``chart_type``. *slide_num* is zero-based.
    """
    st.markdown(f"## {slide['title']}")
    slide_type = slide['type']

    if slide_type == 'title':
        # Escaped because the string is rendered with unsafe_allow_html=True.
        safe_title = html.escape(_as_text(slide['title']), quote=False)
        st.markdown(f"\n\n{safe_title}\n\n", unsafe_allow_html=True)
        st.markdown(slide['content'])
    elif slide_type == 'metrics':
        metrics = slide['content']
        layout = (
            (("Market Size", 'market_size'), ("CAGR", 'cagr')),
            (("Market Leader Share", 'leader_share'), ("Key Players", 'key_players')),
            (("Key Region", 'key_regions'), ("Dominant Segment", 'dominant_segment')),
        )
        for column, entries in zip(st.columns(3), layout):
            with column:
                for label, key in entries:
                    # st.metric rejects lists, so normalise to text first.
                    st.metric(label, _as_text(metrics.get(key, 'N/A')))
    elif slide_type == 'chart':
        chart_type = slide.get('chart_type')
        if chart_type == 'pie':
            frame = _chart_frame(slide['content'], 'company', 'share')
            if frame is not None:
                fig = px.pie(
                    frame,
                    values='share',
                    names='company',
                    title='Market Share Distribution',
                )
                st.plotly_chart(fig, use_container_width=True)
        elif chart_type == 'bar':
            frame = _chart_frame(slide['content'], 'region', 'share')
            if frame is not None:
                fig = px.bar(
                    frame,
                    x='region',
                    y='share',
                    title='Regional Distribution',
                )
                st.plotly_chart(fig, use_container_width=True)
    elif slide_type == 'text':
        st.markdown(slide['content'])

    # Navigation controls
    # NOTE(review): the buttons' return values are discarded here, so any
    # slide switching has to be handled elsewhere - confirm against the caller.
    col1, col2, col3 = st.columns([1, 2, 1])
    with col1:
        if slide_num > 0:
            st.button("← Previous", key=f"prev_{slide_num}", help="Go to previous slide")
    with col2:
        st.markdown(f"\nSlide {slide_num + 1} of {total_slides}\n", unsafe_allow_html=True)
    with col3:
        if slide_num < total_slides - 1:
            st.button("Next →", key=f"next_{slide_num}", help="Go to next slide")


def _metric_tile(title, value, caption, caption_value):
    """Return the markdown for one metric tile, with the values HTML-escaped."""
    safe_value = html.escape(_as_text(value), quote=False)
    safe_caption = html.escape(_as_text(caption_value), quote=False)
    return f"\n\n{title}\n\n{safe_value}\n\n{caption}: {safe_caption}\n\n"


def _split_sections(content):
    """Split markdown *content* into ``(title, body)`` pairs.

    Only real heading lines are treated as boundaries, never a stray ``#``
    inside the text. The shallowest heading level that occurs more than once
    is used, so a single document title does not swallow the whole report.
    """
    content = content or ''
    headings = list(_HEADING_RE.finditer(content))
    if not headings:
        return []
    per_level = Counter(len(match.group(1)) for match in headings)
    levels = sorted(per_level)
    level = next((lv for lv in levels if per_level[lv] > 1), levels[0])
    boundaries = [match for match in headings if len(match.group(1)) == level]
    ends = [match.start() for match in boundaries[1:]] + [len(content)]
    return [
        (match.group(2).strip(), content[match.end():end].strip())
        for match, end in zip(boundaries, ends)
    ]


def _assign_sections(sections):
    """Map each tab label to the report sections that belong under it.

    Sections are routed by heading keyword. When no heading matches any tab
    (for example an unenhanced raw report), sections are assigned to the tabs
    in document order instead.
    """
    assigned = {label: [] for label, _ in _REPORT_TABS}
    for section in sections:
        lowered = section[0].lower()
        for label, keywords in _REPORT_TABS:
            if any(keyword in lowered for keyword in keywords):
                assigned[label].append(section)
                break
    if not any(assigned.values()):
        for (label, _), section in zip(_REPORT_TABS, sections):
            assigned[label].append(section)
    return assigned


def display_report(report_data):
    """Render the metric tiles and the tabbed report for *report_data*.

    *report_data* is the dict produced by ``process_crew_output``. Any error
    while rendering is reported through ``st.error`` instead of propagating.
    """
    try:
        report_data = report_data or {}

        # Display key metrics in tiles
        st.write("### 📊 Key Market Insights")
        metrics = report_data.get('metrics') or {}

        # Create metric tiles in a grid
        tiles = (
            ("Market Size", 'market_size', "CAGR", 'cagr'),
            ("Market Leadership", 'leader_share', "Key Players", 'key_players'),
            ("Regional Focus", 'key_regions', "Dominant Segment", 'dominant_segment'),
        )
        for column, (title, value_key, caption, caption_key) in zip(st.columns(3), tiles):
            with column:
                st.markdown(
                    _metric_tile(
                        title,
                        metrics.get(value_key, 'N/A'),
                        caption,
                        metrics.get(caption_key, 'N/A'),
                    ),
                    unsafe_allow_html=True,
                )

        # Create tabs for different sections of the report
        report_tabs = st.tabs([label for label, _ in _REPORT_TABS])

        # Split content into sections
        assigned = _assign_sections(_split_sections(report_data.get('content', '')))
        market_data = report_data.get('market_data')
        has_market_data = isinstance(market_data, dict)

        for tab, (label, _keywords) in zip(report_tabs, _REPORT_TABS):
            with tab:
                st.markdown(_SECTION_OPEN, unsafe_allow_html=True)
                tab_sections = assigned[label]
                if tab_sections:
                    for title, body in tab_sections:
                        st.markdown(f"### {title}\n\n{body}")
                else:
                    st.markdown(f"{label} not available")
                st.markdown(_SECTION_CLOSE, unsafe_allow_html=True)

                if label == "Market Analysis" and has_market_data:
                    # Add market visualizations if available
                    display_market_visualizations(market_data)
                elif (label == "Competitive Landscape" and has_market_data
                        and 'keyPlayers' in market_data):
                    # Add competitor table if available
                    display_competitor_table(market_data['keyPlayers'])
    except Exception as e:
        st.error(f"Error displaying report: {str(e)}")


def display_market_visualizations(market_data):
    """Display market visualizations using Plotly.

    Draws the market-share pie chart when ``marketShares`` holds at least one
    row with a numeric ``share``; otherwise draws nothing.
    """
    if not market_data:
        return
    # An empty list or non-numeric shares would make px.pie raise, so the
    # data is validated before plotting.
    frame = _chart_frame(market_data.get('marketShares'), 'company', 'share')
    if frame is None:
        return
    fig = px.pie(
        frame,
        values='share',
        names='company',
        title='Market Share Distribution',
    )
    st.plotly_chart(fig, use_container_width=True)


def display_competitor_table(competitors_data):
    """Display competitor information in a styled table.

    Draws nothing when *competitors_data* is empty or not tabular.
    """
    if not competitors_data:
        return
    try:
        df = pd.DataFrame(competitors_data)
    except (ValueError, TypeError):
        # e.g. the model answered "N/A" instead of a list of records.
        return
    if 'marketShare' in df.columns:
        # The column is rendered as a number; "N/A" placeholders become NaN.
        df['marketShare'] = pd.to_numeric(df['marketShare'], errors='coerce')
    st.dataframe(
        df,
        column_config={
            "company": "Company",
            "marketShare": st.column_config.NumberColumn(
                "Market Share (%)",
                format="%.1f%%"
            ),
            "strengths": "Key Strengths",
            "developments": "Recent Developments"
        },
        use_container_width=True,
        hide_index=True
    )


def apply_report_styling():
    """Return the styling snippet for the report.

    NOTE(review): the returned string is a single space in this copy of the
    file - the stylesheet appears to be missing; confirm and restore.
    """
    return """ """


def extract_sources(text):
    """Return the sources cited in *text* on ``Source:`` / ``Reference:`` lines.

    Matching is case-insensitive. Returns a one-element placeholder list when
    *text* is empty or names no source.
    """
    placeholder = ["Sources not explicitly mentioned"]
    if not text:
        return placeholder
    pattern = r'(?:Source|Reference):\s*(.*?)(?:\n|$)'
    cited = (entry.strip() for entry in re.findall(pattern, text, re.IGNORECASE))
    sources = [entry for entry in cited if entry]
    return sources if sources else placeholder