# AI-Powered Document Intelligence System for NAVADA
"""
Advanced document intelligence system providing:
- Smart content suggestions while editing documents
- Auto-completion of financial projections based on industry data
- Compliance checking for regulatory requirements
- Risk assessment with real-time scoring
- Version control with diff tracking
"""
| import json | |
| import re | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any, Tuple | |
| import pandas as pd | |
| import numpy as np | |
| from openai import OpenAI | |
| import asyncio | |
| import logging | |
| from difflib import SequenceMatcher | |
| import hashlib | |
class DocumentIntelligenceEngine:
    """AI-powered document intelligence and assistance system.

    Provides AI-backed analysis of fundraising documents with heuristic
    fallbacks when the AI call fails: content-quality scoring, regulatory
    compliance checks, risk assessment, benchmark-driven completion hints,
    readability metrics, and lightweight version tracking with diff stats.
    """

    def __init__(self, openai_client: "OpenAI"):
        """Store the client and load the static rule/benchmark tables.

        Args:
            openai_client: OpenAI client used by the async AI helpers. It is
                only touched lazily and any failure falls back to the non-AI
                heuristics, so a placeholder is acceptable for offline use.
        """
        self.openai_client = openai_client
        # document_id -> chronological list of version-info dicts.
        self.document_versions: Dict[str, List[Dict[str, Any]]] = {}
        self.compliance_rules = self._load_compliance_rules()
        self.industry_benchmarks = self._load_industry_benchmarks()
        self.risk_factors = self._load_risk_factors()

    def _load_compliance_rules(self) -> Dict[str, List[str]]:
        """Load regulatory compliance rules keyed by document type."""
        return {
            'business_case': [
                'Include forward-looking statement disclaimers',
                'Verify market size claims with sources',
                'Ensure financial projections include assumptions',
                'Include risk disclosures for material factors'
            ],
            'investor_memo': [
                'Include securities law disclaimers',
                'Verify accredited investor requirements',
                'Ensure material risk disclosures',
                'Include subscription agreement references'
            ],
            'term_sheet': [
                'Verify liquidation preference terms',
                'Include anti-dilution provisions',
                'Specify board composition clearly',
                'Include standard protective provisions'
            ],
            'executive_summary': [
                'Include company formation jurisdiction',
                'Verify intellectual property claims',
                'Include material contract disclosures',
                'Ensure competitive landscape accuracy'
            ]
        }

    def _load_industry_benchmarks(self) -> Dict[str, Dict[str, Any]]:
        """Load industry benchmark data (min/median/max) for auto-completion."""
        return {
            'saas': {
                'gross_margin': {'min': 65, 'median': 75, 'max': 85},
                'churn_rate': {'min': 3, 'median': 7, 'max': 15},
                'cac_ltv_ratio': {'min': 3, 'median': 5, 'max': 8},
                'growth_rate': {'min': 20, 'median': 50, 'max': 100},
                'burn_multiple': {'min': 1.2, 'median': 2.0, 'max': 3.5}
            },
            'fintech': {
                'gross_margin': {'min': 45, 'median': 60, 'max': 80},
                'customer_acquisition_cost': {'min': 50, 'median': 200, 'max': 500},
                'transaction_volume_growth': {'min': 30, 'median': 80, 'max': 150},
                'regulatory_capital_ratio': {'min': 8, 'median': 12, 'max': 20}
            },
            'ecommerce': {
                'gross_margin': {'min': 20, 'median': 35, 'max': 60},
                'conversion_rate': {'min': 1, 'median': 3, 'max': 8},
                'average_order_value': {'min': 25, 'median': 75, 'max': 200},
                'customer_lifetime_value': {'min': 100, 'median': 300, 'max': 800}
            },
            'biotech': {
                'rd_expense_ratio': {'min': 40, 'median': 70, 'max': 90},
                'clinical_trial_success_rate': {'min': 10, 'median': 25, 'max': 45},
                'time_to_market': {'min': 5, 'median': 8, 'max': 12},
                'patent_portfolio_size': {'min': 5, 'median': 25, 'max': 100}
            }
        }

    def _load_risk_factors(self) -> Dict[str, List[Dict[str, Any]]]:
        """Load common risk factors keyed by company stage."""
        return {
            'early_stage': [
                {'risk': 'Market Risk', 'probability': 0.7, 'impact': 'high',
                 'description': 'Unproven market demand for product/service'},
                {'risk': 'Execution Risk', 'probability': 0.6, 'impact': 'high',
                 'description': 'Team may lack experience in scaling operations'},
                {'risk': 'Funding Risk', 'probability': 0.5, 'impact': 'critical',
                 'description': 'Difficulty raising subsequent funding rounds'},
                {'risk': 'Technology Risk', 'probability': 0.4, 'impact': 'medium',
                 'description': 'Technical challenges in product development'}
            ],
            'growth_stage': [
                {'risk': 'Competition Risk', 'probability': 0.8, 'impact': 'high',
                 'description': 'Increased competition from established players'},
                {'risk': 'Scaling Risk', 'probability': 0.6, 'impact': 'high',
                 'description': 'Challenges in scaling operations efficiently'},
                {'risk': 'Regulatory Risk', 'probability': 0.4, 'impact': 'medium',
                 'description': 'Changing regulatory environment'},
                {'risk': 'Key Person Risk', 'probability': 0.3, 'impact': 'high',
                 'description': 'Dependence on key management personnel'}
            ]
        }

    async def _chat_completion(self, prompt: str, temperature: float = 0.3,
                               max_tokens: Optional[int] = None) -> str:
        """Run a blocking chat-completion call off the event loop.

        Centralizes the OpenAI call pattern that was previously duplicated
        across the async helpers. Returns the raw message text; exceptions
        propagate to the caller, which selects a heuristic fallback.
        """
        extra: Dict[str, Any] = {}
        if max_tokens is not None:
            extra['max_tokens'] = max_tokens
        response = await asyncio.to_thread(
            self.openai_client.chat.completions.create,
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            **extra,
        )
        return response.choices[0].message.content

    async def analyze_document_content(self, content: str, document_type: str,
                                       industry: Optional[str] = None) -> Dict[str, Any]:
        """Analyze document content and provide intelligent suggestions.

        Returns a dict bundling all sub-analyses, or {'error': ...} if any
        non-AI step raises (AI steps already fall back internally).
        """
        try:
            analysis_results = {
                'content_analysis': await self._analyze_content_quality(content, document_type),
                'compliance_check': self._check_compliance(content, document_type),
                'risk_assessment': self._assess_risks(content, industry),
                'completion_suggestions': await self._generate_completion_suggestions(content, document_type, industry),
                'improvement_suggestions': await self._generate_improvement_suggestions(content, document_type),
                'readability_score': self._calculate_readability_score(content),
                'timestamp': datetime.now().isoformat()
            }
            return analysis_results
        except Exception as e:
            # Lazy %-args: avoids formatting work when the log level is off.
            logging.error("Document analysis error: %s", e)
            return {'error': str(e)}

    async def _analyze_content_quality(self, content: str, document_type: str) -> Dict[str, Any]:
        """Analyze content quality using AI; fall back to heuristics on error."""
        try:
            prompt = f"""
            Analyze this {document_type} document content for quality, completeness, and professionalism.
            Content: {content[:3000]}...
            Provide analysis in this JSON format:
            {{
                "completeness_score": 0.85,
                "professionalism_score": 0.92,
                "clarity_score": 0.78,
                "missing_sections": ["Financial Projections", "Risk Analysis"],
                "strengths": ["Clear problem statement", "Strong market analysis"],
                "weaknesses": ["Vague revenue model", "Limited competitive analysis"],
                "overall_score": 0.85
            }}
            """
            raw = await self._chat_completion(prompt)
            try:
                return json.loads(raw)
            except json.JSONDecodeError:
                # Model returned non-JSON text: degrade to basic analysis.
                return self._basic_content_analysis(content, document_type)
        except Exception as e:
            logging.error("AI content analysis error: %s", e)
            return self._basic_content_analysis(content, document_type)

    def _basic_content_analysis(self, content: str, document_type: str) -> Dict[str, Any]:
        """Basic heuristic content analysis used when the AI path fails."""
        word_count = len(content.split())
        # Assume ~2000 words constitutes a complete document.
        completeness_score = min(1.0, word_count / 2000)
        # Keyword-based check for expected sections per document type.
        key_sections = {
            'business_case': ['executive summary', 'problem', 'solution', 'market', 'financial'],
            'investor_memo': ['investment', 'team', 'market', 'traction', 'financial'],
            'term_sheet': ['valuation', 'investment', 'liquidation', 'board', 'rights']
        }
        sections = key_sections.get(document_type, [])
        found_sections = sum(1 for section in sections if section in content.lower())
        section_score = found_sections / len(sections) if sections else 0.5
        return {
            'completeness_score': completeness_score,
            'professionalism_score': 0.7,  # Default: not measurable heuristically.
            'clarity_score': section_score,
            'missing_sections': [s for s in sections if s not in content.lower()],
            'strengths': ['Document structure present'],
            'weaknesses': ['Needs AI analysis for detailed feedback'],
            'overall_score': (completeness_score + section_score) / 2
        }

    def _check_compliance(self, content: str, document_type: str) -> Dict[str, Any]:
        """Check the document against keyword-based compliance rules.

        Returns counts, the list of violated rules, and a 0-1 score. Rules
        with no matching keyword category are counted as compliant (the
        heuristic cannot verify them, so it does not penalize).
        """
        rules = self.compliance_rules.get(document_type, [])
        compliance_results = {
            'total_rules': len(rules),
            'compliant_count': 0,
            'violations': [],
            'warnings': [],
            'compliance_score': 0.0
        }
        content_lower = content.lower()
        for rule in rules:
            is_compliant = False
            if 'disclaimer' in rule.lower():
                is_compliant = any(term in content_lower for term in
                                   ['disclaimer', 'forward-looking', 'risk', 'projection'])
            elif 'source' in rule.lower():
                is_compliant = any(term in content_lower for term in
                                   ['source', 'reference', 'data from', 'according to'])
            elif 'assumption' in rule.lower():
                is_compliant = any(term in content_lower for term in
                                   ['assumption', 'estimate', 'projection', 'forecast'])
            elif 'risk' in rule.lower():
                is_compliant = any(term in content_lower for term in
                                   ['risk', 'uncertainty', 'challenge', 'limitation'])
            else:
                # Unverifiable rule category: assume compliant by default.
                is_compliant = True
            if is_compliant:
                compliance_results['compliant_count'] += 1
            else:
                compliance_results['violations'].append(rule)
        compliance_results['compliance_score'] = (
            compliance_results['compliant_count'] / compliance_results['total_rules']
            if compliance_results['total_rules'] > 0 else 1.0
        )
        return compliance_results

    def _assess_risks(self, content: str, industry: Optional[str] = None) -> Dict[str, Any]:
        """Detect risks mentioned in the document and suggest additional ones.

        Note: `industry` is currently unused; stage is inferred from keywords.
        """
        content_lower = content.lower()
        mentioned_risks = []
        risk_keywords = {
            'market risk': ['market', 'demand', 'customer', 'competition'],
            'technology risk': ['technology', 'technical', 'development', 'infrastructure'],
            'financial risk': ['financial', 'funding', 'cash', 'revenue'],
            'regulatory risk': ['regulatory', 'compliance', 'legal', 'policy'],
            'execution risk': ['execution', 'operational', 'management', 'team'],
            'competitive risk': ['competitive', 'competition', 'competitor', 'market share']
        }
        for risk_type, keywords in risk_keywords.items():
            if any(keyword in content_lower for keyword in keywords):
                mentioned_risks.append(risk_type)
        # Crude stage heuristic: "startup"/"early" implies early stage.
        stage = 'early_stage' if 'startup' in content_lower or 'early' in content_lower else 'growth_stage'
        suggested_risks = self.risk_factors.get(stage, [])
        total_possible_risks = len(risk_keywords)
        risk_coverage = len(mentioned_risks) / total_possible_risks
        return {
            'mentioned_risks': mentioned_risks,
            'suggested_additional_risks': suggested_risks[:3],  # Top 3 suggestions.
            'risk_coverage_score': risk_coverage,
            'risk_level': 'high' if risk_coverage < 0.4 else 'medium' if risk_coverage < 0.7 else 'low',
            'recommendations': self._generate_risk_recommendations(mentioned_risks, suggested_risks)
        }

    def _generate_risk_recommendations(self, mentioned_risks: List[str],
                                       suggested_risks: List[Dict]) -> List[str]:
        """Generate risk-related recommendations from coverage gaps."""
        recommendations = []
        if len(mentioned_risks) < 3:
            recommendations.append("Consider adding more comprehensive risk analysis")
        if 'financial risk' not in mentioned_risks:
            recommendations.append("Include financial and funding risks in your analysis")
        if 'regulatory risk' not in mentioned_risks:
            recommendations.append("Assess potential regulatory and compliance risks")
        # Highlight the first high-probability suggested risk, if any.
        high_prob_risks = [r for r in suggested_risks if r['probability'] > 0.6]
        if high_prob_risks:
            recommendations.append(f"Pay special attention to {high_prob_risks[0]['risk'].lower()}")
        return recommendations

    async def _generate_completion_suggestions(self, content: str, document_type: str,
                                               industry: Optional[str] = None) -> Dict[str, Any]:
        """Generate completion suggestions based on industry benchmarks."""
        suggestions = {
            'financial_metrics': [],
            'market_sizing': [],
            'competitive_analysis': [],
            'growth_projections': []
        }
        if industry and industry.lower() in self.industry_benchmarks:
            benchmarks = self.industry_benchmarks[industry.lower()]
            # Suggest a gross-margin range only if the document lacks one.
            if 'gross margin' not in content.lower():
                margin_data = benchmarks.get('gross_margin', {})
                if margin_data:
                    suggestions['financial_metrics'].append({
                        'metric': 'Gross Margin',
                        'suggested_range': f"{margin_data['min']}-{margin_data['max']}%",
                        'industry_median': f"{margin_data['median']}%",
                        'context': f"Typical for {industry} companies"
                    })
            # NOTE(review): growth suggestion fires only when "growth" is
            # already mentioned — looks intentional (refine existing claims),
            # confirm with product owner.
            growth_data = benchmarks.get('growth_rate', {})
            if growth_data and 'growth' in content.lower():
                suggestions['growth_projections'].append({
                    'metric': 'Annual Growth Rate',
                    'suggested_range': f"{growth_data['min']}-{growth_data['max']}%",
                    'industry_median': f"{growth_data['median']}%",
                    'context': f"Based on {industry} industry benchmarks"
                })
        # Suggest TAM/SAM/SOM analysis when market is discussed without it.
        if 'market' in content.lower() and 'tam' not in content.lower():
            suggestions['market_sizing'].append({
                'suggestion': 'Include TAM/SAM/SOM analysis',
                'template': 'Total Addressable Market (TAM): $X billion\nServiceable Addressable Market (SAM): $Y billion\nServiceable Obtainable Market (SOM): $Z million',
                'priority': 'high'
            })
        return suggestions

    async def _generate_improvement_suggestions(self, content: str,
                                                document_type: str) -> List[Dict[str, Any]]:
        """Generate AI-powered improvement suggestions with heuristic fallback."""
        try:
            prompt = f"""
            Review this {document_type} content and suggest 3-5 specific improvements.
            Focus on structure, clarity, persuasiveness, and completeness.
            Content: {content[:2000]}...
            Provide suggestions in this JSON format:
            {{
                "suggestions": [
                    {{
                        "category": "Structure",
                        "suggestion": "Add executive summary at the beginning",
                        "priority": "high",
                        "rationale": "Investors typically read executive summary first"
                    }}
                ]
            }}
            """
            raw = await self._chat_completion(prompt)
            try:
                result = json.loads(raw)
                return result.get('suggestions', [])
            except json.JSONDecodeError:
                return self._basic_improvement_suggestions(content, document_type)
        except Exception as e:
            logging.error("AI improvement suggestions error: %s", e)
            return self._basic_improvement_suggestions(content, document_type)

    def _basic_improvement_suggestions(self, content: str, document_type: str) -> List[Dict[str, Any]]:
        """Generate basic improvement suggestions without AI."""
        suggestions = []
        word_count = len(content.split())
        if word_count < 500:
            suggestions.append({
                'category': 'Content',
                'suggestion': 'Expand content with more detailed analysis',
                'priority': 'high',
                'rationale': 'Document appears too brief for comprehensive evaluation'
            })
        if 'financial' not in content.lower() and document_type != 'term_sheet':
            suggestions.append({
                'category': 'Financial Analysis',
                'suggestion': 'Include financial projections and metrics',
                'priority': 'high',
                'rationale': 'Financial data is critical for investor evaluation'
            })
        return suggestions

    def _calculate_readability_score(self, content: str) -> Dict[str, Any]:
        """Calculate simple readability metrics (rough Flesch-style banding).

        Sentences are counted by terminal punctuation, so abbreviations
        inflate the count slightly; acceptable for coarse banding.
        """
        words = content.split()
        sentences = content.count('.') + content.count('!') + content.count('?')
        if not words or not sentences:
            return {'score': 0, 'level': 'unclear'}
        avg_words_per_sentence = len(words) / sentences
        if avg_words_per_sentence < 15:
            score = 85
            level = 'easy'
        elif avg_words_per_sentence < 20:
            score = 70
            level = 'moderate'
        else:
            score = 50
            level = 'difficult'
        return {
            'score': score,
            'level': level,
            'avg_words_per_sentence': avg_words_per_sentence,
            'total_words': len(words),
            'total_sentences': sentences
        }

    def track_document_version(self, document_id: str, content: str,
                               author: str = 'user') -> Dict[str, Any]:
        """Track document versions and changes.

        Dedupes identical content via an MD5 fingerprint (not used for
        security). Only the first 1000 characters of each version are
        retained, so diff statistics are computed over those snippets.
        """
        content_hash = hashlib.md5(content.encode()).hexdigest()
        timestamp = datetime.now().isoformat()
        if document_id not in self.document_versions:
            self.document_versions[document_id] = []
        # No-op when the content is byte-identical to the latest version.
        if (self.document_versions[document_id] and
                self.document_versions[document_id][-1]['content_hash'] == content_hash):
            return {'message': 'No changes detected'}
        version_number = len(self.document_versions[document_id]) + 1
        version_info = {
            'version': version_number,
            'content_hash': content_hash,
            'author': author,
            'timestamp': timestamp,
            'content_length': len(content),
            'word_count': len(content.split())
        }
        if self.document_versions[document_id]:
            prev_version = self.document_versions[document_id][-1]
            if 'content' in prev_version:
                # FIX: compare equal-length snippets. Previously the stored
                # 1000-char snippet was diffed against the *full* new content,
                # which skewed change_ratio for documents over 1000 chars.
                new_snippet = content[:1000]
                diff_ratio = SequenceMatcher(None, prev_version['content'], new_snippet).ratio()
                version_info['change_ratio'] = 1 - diff_ratio
                version_info['changes'] = self._calculate_changes(prev_version['content'], new_snippet)
        # Store a snippet (not the full document) for future diffs.
        version_info['content'] = content[:1000]
        self.document_versions[document_id].append(version_info)
        return {
            'version_created': version_number,
            'timestamp': timestamp,
            'changes_detected': version_info.get('change_ratio', 0) > 0.1
        }

    def _calculate_changes(self, old_content: str, new_content: str) -> Dict[str, Any]:
        """Calculate word-set differences between two document versions."""
        old_words = set(old_content.split())
        new_words = set(new_content.split())
        added_words = new_words - old_words
        removed_words = old_words - new_words
        return {
            'words_added': len(added_words),
            'words_removed': len(removed_words),
            'new_words': list(added_words)[:10],      # First 10 new words.
            'removed_words': list(removed_words)[:10]  # First 10 removed words.
        }

    def get_version_history(self, document_id: str) -> List[Dict[str, Any]]:
        """Get the stored version history for a document (empty if unknown)."""
        return self.document_versions.get(document_id, [])

    async def suggest_next_content(self, current_content: str, cursor_position: int,
                                   document_type: str, industry: Optional[str] = None) -> List[str]:
        """Suggest up to three next sentences based on context near the cursor."""
        try:
            # Window of 200 chars before and 50 after the cursor.
            context_start = max(0, cursor_position - 200)
            context_end = min(len(current_content), cursor_position + 50)
            context = current_content[context_start:context_end]
            prompt = f"""
            Given this document context for a {document_type}, suggest 3 possible next sentences or phrases:
            Context: ...{context}...
            Provide 3 suggestions that would logically continue this content:
            1. [suggestion 1]
            2. [suggestion 2]
            3. [suggestion 3]
            """
            raw = await self._chat_completion(prompt, temperature=0.7, max_tokens=200)
            suggestions = []
            # Parse "1. ..." style numbered lines from the model output.
            for line in raw.split('\n'):
                if re.match(r'^\d+\.', line.strip()):
                    suggestion = re.sub(r'^\d+\.\s*', '', line.strip())
                    if suggestion:
                        suggestions.append(suggestion)
            return suggestions[:3]
        except Exception as e:
            logging.error("Content suggestion error: %s", e)
            return self._basic_content_suggestions(current_content, document_type)

    def _basic_content_suggestions(self, current_content: str, document_type: str) -> List[str]:
        """Generate canned content suggestions without AI."""
        suggestions = []
        if 'market' in current_content.lower():
            suggestions.append("Our target market consists of...")
            suggestions.append("Market research indicates that...")
            suggestions.append("The competitive landscape shows...")
        elif 'financial' in current_content.lower():
            suggestions.append("Revenue projections for the next 3 years...")
            suggestions.append("Our unit economics demonstrate...")
            suggestions.append("Key financial metrics include...")
        else:
            suggestions.append("Additionally, it's important to note that...")
            suggestions.append("This approach provides several benefits...")
            suggestions.append("The strategic implications include...")
        return suggestions
# Declare the module's public API: only the engine class is exported.
__all__ = ["DocumentIntelligenceEngine"]