"""
DemoBuilder class for managing demo preparation workflow state.
"""

from typing import Any, Dict, Optional, List
from dataclasses import dataclass  # kept so existing imports of this module keep working; not applied below
import json
import re
import zipfile
import tempfile
import os
from datetime import datetime


class DemoBuilder:
    """Manages state and workflow for the demo preparation process.

    The workflow walks through the stages ``research`` -> ``create`` ->
    ``populate`` -> ``deploy`` and ends in the terminal marker ``complete``.
    Each stage is either ``ready`` or ``processing``.

    NOTE: this class used to be decorated with ``@dataclass`` although it
    declares no dataclass fields. That generated an ``__eq__`` under which
    every instance compared equal to every other one and made instances
    unhashable. The decorator was removed, so instances now use identity
    equality and are hashable.
    """

    # Ordered workflow; the last entry is the terminal marker, not a real stage.
    _STAGE_ORDER = ("research", "create", "populate", "deploy", "complete")

    # Button captions while a stage is running.
    _PROCESSING_LABELS = {
        "research": "Researching...",
        "create": "Creating DDL...",
        "populate": "Creating Code...",
        "deploy": "Deploying...",
    }

    # Button captions while a stage is waiting to be started.
    _READY_LABELS = {
        "research": "Start Research",
        "create": "Create DDL",
        "populate": "Create Population Code",
        "deploy": "Deploy",
        "complete": "DEPLOYED!",
    }

    # Caption used for any stage/status combination not listed above.
    _DEFAULT_LABEL = "Start Research"

    # Placeholder shown wherever a result has not been produced yet.
    _NOT_AVAILABLE = "Not available"

    # Page titles that carry no company information.
    _PLACEHOLDER_TITLES = frozenset({"no title found", "untitled", "none", ""})

    # Boilerplate words removed from page titles. Word boundaries keep names
    # such as "Homebase" intact (plain substring removal turned it into "base").
    _TITLE_BOILERPLATE = re.compile(r"\b(?:Home|Welcome|Official Site|Website)\b")

    # Characters that are path separators or otherwise invalid in file names.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]+')

    def __init__(self, use_case: str, company_url: str):
        """Create a builder positioned at the start of the workflow.

        Args:
            use_case: Name of the use case the demo is prepared for.
            company_url: Company URL or bare domain as entered by the user.
        """
        self.use_case = use_case
        self.company_url = company_url

        # Workflow state
        self.current_stage = "research"  # research, create, populate, deploy, complete
        self.stage_status = "ready"  # ready, processing

        # Raw prompt results for building next stage
        self.company_analysis_results: Optional[str] = None
        self.industry_research_results: Optional[str] = None
        self.combined_research_results: Optional[str] = None
        self.schema_generation_results: Optional[str] = None
        self.data_population_results: Optional[str] = None

        # Processed/structured data
        self.website_data: Any = None  # Website object with content, CSS, logos
        self.database_schema = None
        self.demo_dataset = None
        self.demo_narrative = None

        # Prompt storage for tabs (original generated versions)
        self.company_analysis_prompt: Optional[str] = None
        self.industry_research_prompt: Optional[str] = None
        self.ddl_generation_prompt: Optional[str] = None
        self.population_prompt: Optional[str] = None

        # Edited prompt versions (for Expert mode)
        self.company_analysis_prompt_edited: Optional[str] = None
        self.industry_research_prompt_edited: Optional[str] = None
        self.ddl_generation_prompt_edited: Optional[str] = None
        self.population_prompt_edited: Optional[str] = None

        # Stage completion tracking
        self.stages_completed: Dict[str, bool] = {
            "research": False,
            "create": False,
            "populate": False,
            "deploy": False,
        }

        # Asset generation
        self.generated_zip_path: Optional[str] = None

    def __repr__(self) -> str:
        """Return a short description of the builder's identity and position."""
        return (
            f"{type(self).__name__}(use_case={self.use_case!r}, "
            f"company_url={self.company_url!r}, "
            f"current_stage={self.current_stage!r}, "
            f"stage_status={self.stage_status!r})"
        )

    # ------------------------------------------------------------------
    # Workflow state
    # ------------------------------------------------------------------

    def get_current_button_text(self) -> str:
        """Return the action-button caption for the current stage and status.

        Any stage/status combination without a dedicated caption falls back
        to ``"Start Research"``.
        """
        if self.stage_status == "processing":
            labels = self._PROCESSING_LABELS
        else:
            labels = self._READY_LABELS
        return labels.get(self.current_stage, self._DEFAULT_LABEL)

    def is_button_disabled(self) -> bool:
        """Return True while the current stage is processing."""
        return self.stage_status == "processing"

    def advance_stage(self):
        """Mark the current stage as completed and move to the next one.

        The status is always reset to ``ready``. When the workflow is already
        ``complete`` (or ``current_stage`` holds an unknown value) nothing
        else changes, so ``stages_completed`` never receives stray keys.
        """
        self.stage_status = "ready"
        try:
            position = self._STAGE_ORDER.index(self.current_stage)
        except ValueError:
            return  # unknown stage: nothing sensible to advance to
        if position == len(self._STAGE_ORDER) - 1:
            return  # already at the terminal marker
        self.stages_completed[self.current_stage] = True
        self.current_stage = self._STAGE_ORDER[position + 1]

    def set_processing(self):
        """Mark current stage as processing."""
        self.stage_status = "processing"

    def set_ready(self):
        """Mark current stage as ready."""
        self.stage_status = "ready"

    # ------------------------------------------------------------------
    # Prompts
    # ------------------------------------------------------------------

    def get_effective_prompt(self, prompt_type: str) -> Optional[str]:
        """Return the edited prompt if one exists, otherwise the original.

        Args:
            prompt_type: Attribute name of the original prompt, e.g.
                ``"company_analysis_prompt"``.

        Returns:
            The prompt text, or None when neither version is set.
        """
        edited_prompt = getattr(self, f"{prompt_type}_edited", None)
        if edited_prompt:
            return edited_prompt
        return getattr(self, prompt_type, None)

    def save_edited_prompt(self, prompt_type: str, edited_content: str):
        """Store an edited version of the prompt named ``prompt_type``."""
        setattr(self, f"{prompt_type}_edited", edited_content)

    def reset_prompt_to_original(self, prompt_type: str):
        """Discard the edited version so the original prompt is used again."""
        setattr(self, f"{prompt_type}_edited", None)

    # ------------------------------------------------------------------
    # Context assembly
    # ------------------------------------------------------------------

    def get_research_context(self) -> str:
        """Return the combined research results for subsequent stages.

        An already stored ``combined_research_results`` wins. Otherwise the
        context is assembled from the company analysis and the industry
        research. The assembled text is cached only once both parts exist;
        caching a partial context would hide a part that arrives later.
        """
        if self.combined_research_results:
            return self.combined_research_results

        sections = []
        if self.company_analysis_results:
            sections.append(
                "COMPANY ANALYSIS:\n" + self.company_analysis_results + "\n\n"
            )
        if self.industry_research_results:
            sections.append(
                "INDUSTRY RESEARCH:\n" + self.industry_research_results + "\n\n"
            )

        context = "".join(sections)
        if len(sections) == 2:
            self.combined_research_results = context
        return context

    def get_schema_context(self) -> str:
        """Return schema plus research context for the data population stage.

        Falls back to the research context alone when no schema exists yet.
        """
        research = self.get_research_context()
        if not self.schema_generation_results:
            return research
        return (
            f"DATABASE SCHEMA:\n{self.schema_generation_results}"
            f"\n\nRESEARCH CONTEXT:\n{research}"
        )

    def reset_stage(self, stage: str):
        """Reset a specific stage so it can be redone.

        Clears the results produced by ``stage``, makes it the current stage
        with status ``ready`` and clears its completion flag. Unknown stage
        names are ignored so they cannot pollute ``stages_completed``.

        Args:
            stage: One of ``"research"``, ``"create"``, ``"populate"`` or
                ``"deploy"``.
        """
        if stage == "research":
            self.company_analysis_results = None
            self.industry_research_results = None
            self.combined_research_results = None
        elif stage == "create":
            self.schema_generation_results = None
            self.database_schema = None
        elif stage == "populate":
            self.data_population_results = None
            self.demo_dataset = None
            self.demo_narrative = None
        elif stage == "deploy":
            pass  # deploy stores no results of its own in this class
        else:
            return

        self.current_stage = stage
        self.stage_status = "ready"
        self.stages_completed[stage] = False

    def get_progress_display(self) -> str:
        """Return a one-line progress summary such as ``✓ Research | → Create (Current)``.

        NOTE(review): the deploy stage is not part of this display even
        though it is part of the workflow - confirm that this is intended.
        """
        progress_parts = []
        for stage_name in ("Research", "Create", "Populate"):
            stage_key = stage_name.lower()
            if self.stages_completed.get(stage_key, False):
                progress_parts.append(f"✓ {stage_name}")
            elif stage_key == self.current_stage:
                if self.stage_status == "processing":
                    status = "Processing..."
                else:
                    status = "Current"
                progress_parts.append(f"→ {stage_name} ({status})")
            else:
                progress_parts.append(f"◦ {stage_name}")
        return " | ".join(progress_parts)

    # ------------------------------------------------------------------
    # Naming helpers
    # ------------------------------------------------------------------

    def extract_company_name(self) -> str:
        """Extract a clean company name from the website title or the URL.

        The page title is preferred: everything after the first ``|`` or
        ``-`` is dropped, boilerplate words are removed and the result is
        limited to 30 characters. Placeholder titles and titles that end up
        empty fall back to the first label of the domain
        (``"Amazon.com"`` -> ``"Amazon"``).
        """
        title = getattr(self.website_data, "title", None)
        if title and title.strip().lower() not in self._PLACEHOLDER_TITLES:
            # Clean up title - keep only the part before the first separator
            name = title.replace(" - ", " ").split("|")[0].split("-")[0]
            name = self._TITLE_BOILERPLATE.sub("", name)
            name = " ".join(name.split())  # collapse gaps left by removed words
            if name:  # only use the title if something useful is left
                return name[:30].strip()

        # Fallback to domain name (user's original input)
        host = (self.company_url or "").strip()
        if "://" in host:
            host = host.split("://", 1)[1]  # scheme-agnostic and case-insensitive
        host = host.split("/")[0]
        if host.lower().startswith("www."):
            host = host[4:]  # strip only a leading "www.", not one inside the host
        return host.split(".")[0].title()

    @classmethod
    def _safe_filename_part(cls, value: Any) -> str:
        """Return ``value`` as text with path-unsafe characters replaced by ``_``."""
        return cls._UNSAFE_FILENAME_CHARS.sub("_", str(value))

    @classmethod
    def _preview(cls, text: Optional[str], limit: int) -> str:
        """Return at most ``limit`` characters of ``text``.

        An ellipsis is appended only when text was actually cut off; missing
        text yields the "Not available" placeholder without an ellipsis.
        """
        if not text:
            return cls._NOT_AVAILABLE
        if len(text) <= limit:
            return text
        return text[:limit] + "..."

    # ------------------------------------------------------------------
    # Asset generation
    # ------------------------------------------------------------------

    def generate_formatted_assets(self) -> str:
        """Generate a ZIP file with the formatted demo assets.

        The archive is written to a fresh temporary directory which is not
        removed here because the returned path is handed to the caller.
        Company name and use case are sanitised before they become part of
        the file name so that characters such as ``/`` cannot redirect the
        path or make the write fail.

        Returns:
            Path of the created ZIP file (also stored in
            ``generated_zip_path``).
        """
        company_part = self._safe_filename_part(self.extract_company_name())
        use_case_part = self._safe_filename_part(self.use_case)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Create temporary directory
        temp_dir = tempfile.mkdtemp()
        zip_filename = f"demo_prep_{company_part}_{use_case_part}_{timestamp}.zip"
        zip_path = os.path.join(temp_dir, zip_filename)

        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            # 1. Company Analysis Summary
            if self.company_analysis_results:
                zipf.writestr("company_analysis.md", self._format_company_summary())

            # 2. Database Schema
            if self.schema_generation_results:
                zipf.writestr("database_schema.sql", self._format_database_schema())

            # 3. Data Population Script
            if self.data_population_results:
                zipf.writestr("data_population_script.py", self._format_data_script())

            # 4. Demo Crib Notes (always included)
            zipf.writestr("demo_crib_notes.md", self._generate_crib_notes())

            # 5. Assets Info
            if self.website_data:
                zipf.writestr("assets/brand_assets.md", self._format_assets_info())

        self.generated_zip_path = zip_path
        return zip_path

    def _format_company_summary(self) -> str:
        """Format the company analysis as a markdown executive summary."""
        today = datetime.now().strftime("%Y-%m-%d")
        lines = [
            "# Company Analysis Summary",
            "",
            f"**Company:** {self.extract_company_name()}",
            f"**Use Case:** {self.use_case}",
            f"**Analysis Date:** {today}",
            "",
            "## Executive Summary",
            self._preview(self.company_analysis_results, 500),
            "",
            "## Full Analysis",
            self.company_analysis_results or self._NOT_AVAILABLE,
            "",
            "## Industry Context",
            self.industry_research_results or self._NOT_AVAILABLE,
            "",
        ]
        return "\n".join(lines)

    def _format_database_schema(self) -> str:
        """Format the generated schema with a SQL comment header."""
        generated = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        lines = [
            "-- Demo Database Schema",
            f"-- Company: {self.extract_company_name()}",
            f"-- Use Case: {self.use_case}",
            f"-- Generated: {generated}",
            "",
            self.schema_generation_results or self._NOT_AVAILABLE,
            "",
        ]
        return "\n".join(lines)

    def _format_data_script(self) -> str:
        """Format the data population script with a comment header."""
        generated = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        lines = [
            "# Demo Data Population Script",
            f"# Company: {self.extract_company_name()}",
            f"# Use Case: {self.use_case}",
            f"# Generated: {generated}",
            "",
            self.data_population_results or self._NOT_AVAILABLE,
            "",
        ]
        return "\n".join(lines)

    def _generate_crib_notes(self) -> str:
        """Generate markdown crib notes with talking points and result previews."""
        today = datetime.now().strftime("%Y-%m-%d")
        lines = [
            "# Demo Crib Notes",
            "",
            f"**Company:** {self.extract_company_name()}",
            f"**Use Case:** {self.use_case}",
            f"**Prepared:** {today}",
            "",
            "## Key Talking Points",
            f"- Focus on {self.use_case} challenges",
            "- Highlight specific pain points identified in research",
            "- Demo shows real-world scenarios",
            "",
            "## Demo Flow Suggestions",
            "1. Start with business problem context",
            "2. Show baseline data patterns",
            "3. Highlight outlier scenarios",
            "4. Connect insights to ROI",
            "",
            "## Research Insights",
            "",
            "### Company Analysis",
            self._preview(self.company_analysis_results, 300),
            "",
            "### Industry Context",
            self._preview(self.industry_research_results, 300),
            "",
            "## Database Schema Overview",
            self._preview(self.schema_generation_results, 200),
            "",
            "## Demo Data Scenarios",
            self._preview(self.data_population_results, 300),
            "",
        ]
        return "\n".join(lines)

    def _format_assets_info(self) -> str:
        """Format logo candidates and CSS links as a markdown document.

        Logo candidates are read with ``.get('src')``, i.e. they are expected
        to be dict-like. At most five CSS links are listed.
        """
        parts = [
            "# Brand Assets\n\n",
            f"**Company:** {self.extract_company_name()}\n",
            f"**Website:** {self.company_url}\n\n",
            "## Logo Candidates\n",
        ]

        logos = getattr(self.website_data, "logo_candidates", None)
        if logos:
            parts.extend(
                f"- Logo {number}: {logo.get('src', 'N/A')}\n"
                for number, logo in enumerate(logos, start=1)
            )
        else:
            parts.append("- No logo candidates found\n")

        parts.append("\n## CSS Resources\n")
        css_links = getattr(self.website_data, "css_links", None)
        if css_links:
            parts.extend(f"- {css}\n" for css in css_links[:5])  # Limit to first 5
        else:
            parts.append("- No CSS links found\n")

        return "".join(parts)

    # ------------------------------------------------------------------
    # Tab rendering
    # ------------------------------------------------------------------

    def get_formatted_tab_content(self, tab_type: str) -> str:
        """Return the markdown shown in the tab named ``tab_type``.

        Args:
            tab_type: ``"company"``, ``"industry"``, ``"assets"`` or
                ``"demo"``. Any other value yields an empty string.
        """
        if tab_type == "company":
            if self.company_analysis_results:
                return f"## Company Analysis Summary\n\n{self.company_analysis_results}\n"
            return "*Company analysis will appear here after research*"

        if tab_type == "industry":
            if self.industry_research_results:
                return f"## Industry Research Results\n\n{self.industry_research_results}\n"
            return "*Industry research will appear here after research*"

        if tab_type == "assets":
            return self._assets_tab_content()

        if tab_type == "demo":
            return self._demo_tab_content()

        return ""

    def _assets_tab_content(self) -> str:
        """Render the brand-assets tab from ``website_data``.

        The first three CSS links are listed directly; any further links are
        wrapped in a collapsible ``<details>`` element, which is what the
        "(click to expand)" caption refers to.
        """
        site = self.website_data
        if not site:
            return "*Asset information will appear here after research*"

        parts = [
            "## Brand Assets Extracted\n\n",
            f"**Website Title:** {getattr(site, 'title', 'Unknown')}\n",
            f"**URL:** {getattr(site, 'url', self.company_url)}\n\n",
            "### Logo Candidates Found\n",
        ]

        logos = getattr(site, "logo_candidates", None)
        if logos:
            parts.extend(
                f"- **Logo {number}:** {logo.get('src', 'N/A')}\n"
                for number, logo in enumerate(logos, start=1)
            )
        else:
            parts.append("- No logo candidates identified\n")

        parts.append("\n### CSS Resources\n")
        css_links = getattr(site, "css_links", None)
        if css_links:
            parts.extend(f"- {css}\n" for css in css_links[:3])
            hidden_links = css_links[3:]
            if hidden_links:
                parts.append(
                    "\n<details>\n"
                    f"<summary>... and {len(hidden_links)} more CSS files "
                    "(click to expand)</summary>\n\n"
                )
                parts.extend(f"- {css}\n" for css in hidden_links)
                parts.append("\n</details>\n")
        else:
            parts.append("- No CSS links found\n")

        return "".join(parts)

    def _demo_tab_content(self) -> str:
        """Render the demo tab; real content is shown once the workflow is complete."""
        if self.current_stage != "complete":
            return "*Generated demo tables and SQL scripts will appear here*"

        lines = [
            "## Demo Resources Generated",
            "",
            "### Database Schema",
            "```sql",
            self._preview(self.schema_generation_results, 500),
            "```",
            "",
            "### Data Population Overview",
            "```python",
            self._preview(self.data_population_results, 500),
            "```",
            "",
            "### Download Assets",
            "Complete demo preparation package available for download above.",
            "",
        ]
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    def to_dict(self) -> Dict[str, Any]:
        """Convert the workflow state to a JSON-serialisable dictionary.

        ``stages_completed`` is copied so that changes to the returned
        dictionary do not leak back into this instance. Prompts and
        ``website_data`` are not part of the snapshot.
        """
        return {
            "use_case": self.use_case,
            "company_url": self.company_url,
            "current_stage": self.current_stage,
            "stage_status": self.stage_status,
            "company_analysis_results": self.company_analysis_results,
            "industry_research_results": self.industry_research_results,
            "combined_research_results": self.combined_research_results,
            "schema_generation_results": self.schema_generation_results,
            "data_population_results": self.data_population_results,
            "stages_completed": dict(self.stages_completed),
            "generated_zip_path": self.generated_zip_path,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'DemoBuilder':
        """Create an instance from a dictionary produced by ``to_dict``.

        Only ``use_case`` and ``company_url`` are required; every other key
        falls back to the value a freshly constructed instance has. Saved
        completion flags are merged over the defaults, so snapshots that
        lack the ``deploy`` key still restore a complete mapping.

        Raises:
            KeyError: If ``use_case`` or ``company_url`` is missing.
        """
        instance = cls(data["use_case"], data["company_url"])
        instance.current_stage = data.get("current_stage", instance.current_stage)
        instance.stage_status = data.get("stage_status", instance.stage_status)
        instance.company_analysis_results = data.get("company_analysis_results")
        instance.industry_research_results = data.get("industry_research_results")
        instance.combined_research_results = data.get("combined_research_results")
        instance.schema_generation_results = data.get("schema_generation_results")
        instance.data_population_results = data.get("data_population_results")
        # Merge into the defaults instead of aliasing the caller's dict.
        instance.stages_completed.update(data.get("stages_completed") or {})
        instance.generated_zip_path = data.get("generated_zip_path")
        return instance