Spaces:
Running
Running
| """ | |
| DemoBuilder Class for managing demo preparation workflow state | |
| """ | |
| from typing import Dict, Optional, List | |
| from dataclasses import dataclass | |
| import json | |
| import zipfile | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
| class DemoBuilder: | |
| """Manages state and workflow for demo preparation process""" | |
| def __init__(self, use_case: str, company_url: str): | |
| self.use_case = use_case | |
| self.company_url = company_url | |
| # Workflow state | |
| self.current_stage = "research" # research, create, populate, complete | |
| self.stage_status = "ready" # ready, processing, complete | |
| # Raw prompt results for building next stage | |
| self.company_analysis_results = None | |
| self.industry_research_results = None | |
| self.combined_research_results = None | |
| self.schema_generation_results = None | |
| self.data_population_results = None | |
| # Processed/structured data | |
| self.website_data = None # Website object with content, CSS, logos | |
| self.database_schema = None | |
| self.demo_dataset = None | |
| self.demo_narrative = None | |
| # Prompt storage for tabs (original generated versions) | |
| self.company_analysis_prompt = None | |
| self.industry_research_prompt = None | |
| self.ddl_generation_prompt = None | |
| self.population_prompt = None | |
| # Edited prompt versions (for Expert mode) | |
| self.company_analysis_prompt_edited = None | |
| self.industry_research_prompt_edited = None | |
| self.ddl_generation_prompt_edited = None | |
| self.population_prompt_edited = None | |
| # Stage completion tracking | |
| self.stages_completed = { | |
| "research": False, | |
| "create": False, | |
| "populate": False, | |
| "deploy": False | |
| } | |
| # Asset generation | |
| self.generated_zip_path = None | |
| def get_current_button_text(self) -> str: | |
| """Get button text based on current stage and status""" | |
| if self.stage_status == "processing": | |
| if self.current_stage == "research": | |
| return "Researching..." | |
| elif self.current_stage == "create": | |
| return "Creating DDL..." | |
| elif self.current_stage == "populate": | |
| return "Creating Code..." | |
| elif self.current_stage == "deploy": | |
| return "Deploying..." | |
| else: | |
| if self.current_stage == "research": | |
| return "Start Research" | |
| elif self.current_stage == "create": | |
| return "Create DDL" | |
| elif self.current_stage == "populate": | |
| return "Create Population Code" | |
| elif self.current_stage == "deploy": | |
| return "Deploy" | |
| elif self.current_stage == "complete": | |
| return "DEPLOYED!" | |
| return "Start Research" | |
| def is_button_disabled(self) -> bool: | |
| """Check if button should be disabled""" | |
| return self.stage_status == "processing" | |
| def advance_stage(self): | |
| """Move to next stage after current stage completes""" | |
| self.stages_completed[self.current_stage] = True | |
| self.stage_status = "ready" | |
| if self.current_stage == "research": | |
| self.current_stage = "create" | |
| elif self.current_stage == "create": | |
| self.current_stage = "populate" | |
| elif self.current_stage == "populate": | |
| self.current_stage = "deploy" | |
| elif self.current_stage == "deploy": | |
| self.current_stage = "complete" | |
| def set_processing(self): | |
| """Mark current stage as processing""" | |
| self.stage_status = "processing" | |
| def set_ready(self): | |
| """Mark current stage as ready""" | |
| self.stage_status = "ready" | |
| def get_effective_prompt(self, prompt_type: str) -> str: | |
| """Get the effective prompt (edited version if exists, otherwise original)""" | |
| edited_prompt = getattr(self, f"{prompt_type}_edited", None) | |
| if edited_prompt: | |
| return edited_prompt | |
| return getattr(self, prompt_type, None) | |
| def save_edited_prompt(self, prompt_type: str, edited_content: str): | |
| """Save an edited version of a prompt""" | |
| setattr(self, f"{prompt_type}_edited", edited_content) | |
| def reset_prompt_to_original(self, prompt_type: str): | |
| """Reset prompt to original version by clearing edited version""" | |
| setattr(self, f"{prompt_type}_edited", None) | |
| def get_research_context(self) -> str: | |
| """Get combined research results for subsequent stages""" | |
| if self.combined_research_results: | |
| return self.combined_research_results | |
| context = "" | |
| if self.company_analysis_results: | |
| context += "COMPANY ANALYSIS:\n" + self.company_analysis_results + "\n\n" | |
| if self.industry_research_results: | |
| context += "INDUSTRY RESEARCH:\n" + self.industry_research_results + "\n\n" | |
| self.combined_research_results = context | |
| return context | |
| def get_schema_context(self) -> str: | |
| """Get schema results for data population stage""" | |
| if self.schema_generation_results: | |
| return f"DATABASE SCHEMA:\n{self.schema_generation_results}\n\nRESEARCH CONTEXT:\n{self.get_research_context()}" | |
| return self.get_research_context() | |
| def reset_stage(self, stage: str): | |
| """Reset specific stage (for redo functionality)""" | |
| if stage == "research": | |
| self.company_analysis_results = None | |
| self.industry_research_results = None | |
| self.combined_research_results = None | |
| self.current_stage = "research" | |
| elif stage == "create": | |
| self.schema_generation_results = None | |
| self.database_schema = None | |
| self.current_stage = "create" | |
| elif stage == "populate": | |
| self.data_population_results = None | |
| self.demo_dataset = None | |
| self.demo_narrative = None | |
| self.current_stage = "populate" | |
| self.stage_status = "ready" | |
| self.stages_completed[stage] = False | |
| def get_progress_display(self) -> str: | |
| """Get progress display string""" | |
| stages = [ | |
| ("Research", self.stages_completed["research"]), | |
| ("Create", self.stages_completed["create"]), | |
| ("Populate", self.stages_completed["populate"]) | |
| ] | |
| progress_parts = [] | |
| for stage_name, completed in stages: | |
| if completed: | |
| progress_parts.append(f"✓ {stage_name}") | |
| elif stage_name.lower() == self.current_stage: | |
| status = "Processing..." if self.stage_status == "processing" else "Current" | |
| progress_parts.append(f"→ {stage_name} ({status})") | |
| else: | |
| progress_parts.append(f"◦ {stage_name}") | |
| return " | ".join(progress_parts) | |
| def extract_company_name(self) -> str: | |
| """Extract clean company name from website data or URL""" | |
| if self.website_data and hasattr(self.website_data, 'title') and self.website_data.title: | |
| title = self.website_data.title | |
| # Skip placeholder/error titles - fall back to URL instead | |
| if title.lower() not in ['no title found', 'untitled', 'none', '']: | |
| # Clean up title - remove common suffixes | |
| name = title.replace(" - ", " ").split("|")[0].split("-")[0].strip() | |
| # Remove common words | |
| for remove_word in ["Home", "Welcome", "Official Site", "Website"]: | |
| name = name.replace(remove_word, "").strip() | |
| if name: # Only return if we got something useful | |
| return name[:30] # Limit length | |
| # Fallback to domain name (user's original input like "Amazon.com" -> "Amazon") | |
| domain = self.company_url.replace("https://", "").replace("http://", "").replace("www.", "").split("/")[0] | |
| return domain.split(".")[0].title() | |
| def generate_formatted_assets(self) -> str: | |
| """Generate ZIP file with formatted demo assets""" | |
| company_name = self.extract_company_name() | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # Create temporary directory | |
| temp_dir = tempfile.mkdtemp() | |
| zip_filename = f"demo_prep_{company_name}_{self.use_case}_{timestamp}.zip" | |
| zip_path = os.path.join(temp_dir, zip_filename) | |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
| # 1. Company Analysis Summary | |
| if self.company_analysis_results: | |
| company_summary = self._format_company_summary() | |
| zipf.writestr("company_analysis.md", company_summary) | |
| # 2. Database Schema | |
| if self.schema_generation_results: | |
| schema_formatted = self._format_database_schema() | |
| zipf.writestr("database_schema.sql", schema_formatted) | |
| # 3. Data Population Script | |
| if self.data_population_results: | |
| data_script = self._format_data_script() | |
| zipf.writestr("data_population_script.py", data_script) | |
| # 4. Demo Crib Notes | |
| crib_notes = self._generate_crib_notes() | |
| zipf.writestr("demo_crib_notes.md", crib_notes) | |
| # 5. Assets Info | |
| if self.website_data: | |
| assets_info = self._format_assets_info() | |
| zipf.writestr("assets/brand_assets.md", assets_info) | |
| self.generated_zip_path = zip_path | |
| return zip_path | |
| def _format_company_summary(self) -> str: | |
| """Format company analysis for executive summary""" | |
| return f"""# Company Analysis Summary | |
| **Company:** {self.extract_company_name()} | |
| **Use Case:** {self.use_case} | |
| **Analysis Date:** {datetime.now().strftime("%Y-%m-%d")} | |
| ## Executive Summary | |
| {self.company_analysis_results[:500] if self.company_analysis_results else "Not available"}... | |
| ## Full Analysis | |
| {self.company_analysis_results or "Not available"} | |
| ## Industry Context | |
| {self.industry_research_results or "Not available"} | |
| """ | |
| def _format_database_schema(self) -> str: | |
| """Format database schema with headers""" | |
| return f"""-- Demo Database Schema | |
| -- Company: {self.extract_company_name()} | |
| -- Use Case: {self.use_case} | |
| -- Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
| {self.schema_generation_results or "Not available"} | |
| """ | |
| def _format_data_script(self) -> str: | |
| """Format data population script""" | |
| return f"""# Demo Data Population Script | |
| # Company: {self.extract_company_name()} | |
| # Use Case: {self.use_case} | |
| # Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
| {self.data_population_results or "Not available"} | |
| """ | |
| def _generate_crib_notes(self) -> str: | |
| """Generate demo crib notes with key talking points""" | |
| return f"""# Demo Crib Notes | |
| **Company:** {self.extract_company_name()} | |
| **Use Case:** {self.use_case} | |
| **Prepared:** {datetime.now().strftime("%Y-%m-%d")} | |
| ## Key Talking Points | |
| - Focus on {self.use_case} challenges | |
| - Highlight specific pain points identified in research | |
| - Demo shows real-world scenarios | |
| ## Demo Flow Suggestions | |
| 1. Start with business problem context | |
| 2. Show baseline data patterns | |
| 3. Highlight outlier scenarios | |
| 4. Connect insights to ROI | |
| ## Research Insights | |
| ### Company Analysis | |
| {self.company_analysis_results[:300] if self.company_analysis_results else "Not available"}... | |
| ### Industry Context | |
| {self.industry_research_results[:300] if self.industry_research_results else "Not available"}... | |
| ## Database Schema Overview | |
| {self.schema_generation_results[:200] if self.schema_generation_results else "Not available"}... | |
| ## Demo Data Scenarios | |
| {self.data_population_results[:300] if self.data_population_results else "Not available"}... | |
| """ | |
| def _format_assets_info(self) -> str: | |
| """Format visual assets information""" | |
| assets_content = f"""# Brand Assets | |
| **Company:** {self.extract_company_name()} | |
| **Website:** {self.company_url} | |
| ## Logo Candidates | |
| """ | |
| if self.website_data and hasattr(self.website_data, 'logo_candidates') and self.website_data.logo_candidates: | |
| for i, logo in enumerate(self.website_data.logo_candidates): | |
| assets_content += f"- Logo {i+1}: {logo.get('src', 'N/A')}\n" | |
| else: | |
| assets_content += "- No logo candidates found\n" | |
| assets_content += f"\n## CSS Resources\n" | |
| if self.website_data and hasattr(self.website_data, 'css_links') and self.website_data.css_links: | |
| for css in self.website_data.css_links[:5]: # Limit to first 5 | |
| assets_content += f"- {css}\n" | |
| else: | |
| assets_content += "- No CSS links found\n" | |
| return assets_content | |
| def get_formatted_tab_content(self, tab_type: str) -> str: | |
| """Get formatted content for specific tab""" | |
| if tab_type == "company": | |
| if self.company_analysis_results: | |
| return f"""## Company Analysis Summary | |
| {self.company_analysis_results} | |
| """ | |
| return "*Company analysis will appear here after research*" | |
| elif tab_type == "industry": | |
| if self.industry_research_results: | |
| return f"""## Industry Research Results | |
| {self.industry_research_results} | |
| """ | |
| return "*Industry research will appear here after research*" | |
| elif tab_type == "assets": | |
| if self.website_data: | |
| content = f"""## Brand Assets Extracted | |
| **Website Title:** {getattr(self.website_data, 'title', 'Unknown')} | |
| **URL:** {self.website_data.url if hasattr(self.website_data, 'url') else self.company_url} | |
| ### Logo Candidates Found | |
| """ | |
| if hasattr(self.website_data, 'logo_candidates') and self.website_data.logo_candidates: | |
| for i, logo in enumerate(self.website_data.logo_candidates): | |
| content += f"- **Logo {i+1}:** {logo.get('src', 'N/A')}\n" | |
| else: | |
| content += "- No logo candidates identified\n" | |
| content += f"\n### CSS Resources\n" | |
| if hasattr(self.website_data, 'css_links') and self.website_data.css_links: | |
| for css in self.website_data.css_links[:3]: | |
| content += f"- {css}\n" | |
| if len(self.website_data.css_links) > 3: | |
| content += f""" | |
| <details> | |
| <summary><strong>... and {len(self.website_data.css_links) - 3} more CSS files (click to expand)</strong></summary> | |
| """ | |
| for css in self.website_data.css_links[3:]: | |
| content += f"- {css}\n" | |
| content += "\n</details>\n" | |
| else: | |
| content += "- No CSS links found\n" | |
| return content | |
| return "*Asset information will appear here after research*" | |
| elif tab_type == "demo": | |
| if self.current_stage == "complete": | |
| return f"""## Demo Resources Generated | |
| ### Database Schema | |
| ```sql | |
| {self.schema_generation_results[:500] if self.schema_generation_results else "Not available"}... | |
| ``` | |
| ### Data Population Overview | |
| ```python | |
| {self.data_population_results[:500] if self.data_population_results else "Not available"}... | |
| ``` | |
| ### Download Assets | |
| Complete demo preparation package available for download above. | |
| """ | |
| return "*Generated demo tables and SQL scripts will appear here*" | |
| return "" | |
| def to_dict(self) -> Dict: | |
| """Convert to dictionary for JSON serialization""" | |
| return { | |
| "use_case": self.use_case, | |
| "company_url": self.company_url, | |
| "current_stage": self.current_stage, | |
| "stage_status": self.stage_status, | |
| "company_analysis_results": self.company_analysis_results, | |
| "industry_research_results": self.industry_research_results, | |
| "combined_research_results": self.combined_research_results, | |
| "schema_generation_results": self.schema_generation_results, | |
| "data_population_results": self.data_population_results, | |
| "stages_completed": self.stages_completed, | |
| "generated_zip_path": self.generated_zip_path | |
| } | |
| def from_dict(cls, data: Dict) -> 'DemoBuilder': | |
| """Create instance from dictionary""" | |
| instance = cls(data["use_case"], data["company_url"]) | |
| instance.current_stage = data["current_stage"] | |
| instance.stage_status = data["stage_status"] | |
| instance.company_analysis_results = data.get("company_analysis_results") | |
| instance.industry_research_results = data.get("industry_research_results") | |
| instance.combined_research_results = data.get("combined_research_results") | |
| instance.schema_generation_results = data.get("schema_generation_results") | |
| instance.data_population_results = data.get("data_population_results") | |
| instance.stages_completed = data.get("stages_completed", {"research": False, "create": False, "populate": False}) | |
| instance.generated_zip_path = data.get("generated_zip_path") | |
| return instance |