# process_discovery_engine.py

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import spacy
import json
import re
import networkx as nx
from sklearn.cluster import DBSCAN


class ProcessDiscoveryEngine:
    """
    Discovers and analyzes business processes from various data sources
    including logs, documents, and recorded user activities.
    """

    def __init__(self, config: Dict):
        """
        Initialize the process discovery engine.

        Args:
            config: Configuration dictionary with parameters
        """
        self.min_frequency = config.get('min_frequency', 0.05)
        self.time_threshold = config.get('time_threshold', 60)  # seconds
        self.similarity_threshold = config.get('similarity_threshold', 0.75)
        self.process_graph = nx.DiGraph()

    def ingest_log_data(self, log_data: pd.DataFrame) -> bool:
        """
        Ingest process log data from system logs.

        Args:
            log_data: DataFrame containing log entries with timestamp, user, action columns

        Returns:
            bool: Success status
        """
        if 'timestamp' not in log_data.columns or 'action' not in log_data.columns:
            return False

        # Sort by timestamp
        sorted_logs = log_data.sort_values('timestamp')

        # Group by case_id if available
        if 'case_id' in sorted_logs.columns:
            case_groups = sorted_logs.groupby('case_id')
            for case_id, case_data in case_groups:
                self._process_sequence(case_data['action'].tolist(),
                                       source=f"log:{case_id}")
        else:
            # Try to identify sessions based on time gaps
            self._segment_and_process_logs(sorted_logs)

        return True

    def ingest_screen_recordings(self, recording_analysis: List[Dict]) -> bool:
        """
        Ingest analyzed screen recording data.

        Args:
            recording_analysis: List of dictionaries containing screen activities

        Returns:
            bool: Success status
        """
        for session in recording_analysis:
            if 'actions' in session and isinstance(session['actions'], list):
                action_sequence = [a['activity'] for a in session['actions']
                                   if 'activity' in a]
                self._process_sequence(action_sequence,
                                       source=f"recording:{session.get('id', 'unknown')}")

        return True

    def _segment_and_process_logs(self, logs: pd.DataFrame) -> None:
        """
        Segment logs into probable process instances based on time gaps.

        Args:
            logs: DataFrame of logs sorted by timestamp
        """
        logs['timestamp'] = pd.to_datetime(logs['timestamp'])
        logs['time_diff'] = logs['timestamp'].diff().dt.total_seconds()

        # Mark new sequences where time difference exceeds threshold
        new_sequence = logs['time_diff'] > self.time_threshold
        logs['sequence_id'] = new_sequence.cumsum()

        # Process each sequence
        for seq_id, sequence in logs.groupby('sequence_id'):
            self._process_sequence(sequence['action'].tolist(),
                                   source=f"timegap:{seq_id}")

    def _process_sequence(self, actions: List[str], source: str) -> None:
        """
        Process a sequence of actions into the process graph.

        Args:
            actions: List of action names in sequence
            source: Data source identifier
        """
        for i in range(len(actions) - 1):
            current = actions[i]
            next_action = actions[i + 1]

            # Add nodes if they don't exist
            if current not in self.process_graph:
                self.process_graph.add_node(current, count=0, sources=set())
            if next_action not in self.process_graph:
                self.process_graph.add_node(next_action, count=0, sources=set())

            # Update node data
            self.process_graph.nodes[current]['count'] += 1
            self.process_graph.nodes[current]['sources'].add(source)

            # Add or update edge
            if self.process_graph.has_edge(current, next_action):
                self.process_graph[current][next_action]['weight'] += 1
                self.process_graph[current][next_action]['sources'].add(source)
            else:
                self.process_graph.add_edge(current, next_action,
                                            weight=1, sources={source})

    def discover_main_process_paths(self) -> List[Dict]:
        """
        Discover the main process paths from the constructed graph.

        Returns:
            List of dictionaries describing main process paths
        """
        # Filter edges by frequency
        total_transitions = sum(data['weight'] for _, _, data in self.process_graph.edges(data=True))
        if total_transitions == 0:
            return []

        min_edge_weight = total_transitions * self.min_frequency
        significant_edges = [(u, v) for u, v, d in self.process_graph.edges(data=True)
                             if d['weight'] > min_edge_weight]

        # Create subgraph with only significant edges
        significant_graph = self.process_graph.edge_subgraph(significant_edges).copy()

        # Find all simple paths from potential start nodes to end nodes
        start_nodes = [n for n in significant_graph.nodes()
                       if significant_graph.in_degree(n) == 0 or
                       significant_graph.in_degree(n) < significant_graph.out_degree(n)]
        end_nodes = [n for n in significant_graph.nodes()
                     if significant_graph.out_degree(n) == 0 or
                     significant_graph.out_degree(n) < significant_graph.in_degree(n)]

        # If no clear start/end, use nodes with highest centrality
        if not start_nodes:
            centrality = nx.degree_centrality(significant_graph)
            start_nodes = [max(centrality, key=centrality.get)]
        if not end_nodes:
            centrality = nx.degree_centrality(significant_graph)
            end_nodes = [max(centrality, key=centrality.get)]

        # Find all paths between start and end nodes
        all_paths = []
        for start in start_nodes:
            for end in end_nodes:
                try:
                    paths = list(nx.all_simple_paths(significant_graph, start, end))
                    all_paths.extend(paths)
                except nx.NetworkXNoPath:
                    continue

        # Calculate path frequency and return top paths
        path_data = []
        for path in all_paths:
            # Calculate path strength as minimum edge weight along path
            edge_weights = [significant_graph[path[i]][path[i + 1]]['weight']
                            for i in range(len(path) - 1)]
            path_strength = min(edge_weights) if edge_weights else 0

            path_data.append({
                'path': path,
                'strength': path_strength,
                'length': len(path),
                'avg_edge_weight': sum(edge_weights) / len(edge_weights) if edge_weights else 0
            })

        # Sort by path strength descending
        path_data.sort(key=lambda x: x['strength'], reverse=True)

        return path_data

    def identify_process_variants(self) -> List[Dict]:
        """
        Identify variants of the same basic process.

        Returns:
            List of process variant clusters
        """
        if len(self.process_graph) < 2:
            return []

        # Extract features for clustering
        paths = self.discover_main_process_paths()
        if not paths:
            return []

        # Create feature vectors from paths
        all_activities = sorted(list(self.process_graph.nodes()))
        activity_indices = {act: i for i, act in enumerate(all_activities)}

        # Create feature vectors (activity presence and position)
        feature_vectors = []
        for path_data in paths:
            path = path_data['path']
            vector = np.zeros(len(all_activities) * 2)

            # Mark presence and relative position of activities
            for pos, activity in enumerate(path):
                idx = activity_indices[activity]
                vector[idx] = 1  # presence
                vector[idx + len(all_activities)] = pos / len(path)  # relative position

            feature_vectors.append(vector)

        # Cluster paths using DBSCAN
        if len(feature_vectors) < 2:
            return [{'variant_id': 0, 'paths': paths}]

        clustering = DBSCAN(eps=0.3, min_samples=1).fit(feature_vectors)
        labels = clustering.labels_

        # Group paths by cluster
        variants = {}
        for i, label in enumerate(labels):
            label_str = str(label)
            if label_str not in variants:
                variants[label_str] = []
            variants[label_str].append(paths[i])

        # Format result
        result = [
            {'variant_id': variant_id, 'paths': variant_paths}
            for variant_id, variant_paths in variants.items()
        ]

        return result

    def get_process_stats(self) -> Dict:
        """
        Get statistics about the discovered process.

        Returns:
            Dictionary with process statistics
        """
        if not self.process_graph:
            return {"error": "No process data available"}

        stats = {
            "num_activities": len(self.process_graph.nodes()),
            "num_transitions": len(self.process_graph.edges()),
            "most_frequent_activities": [],
            "most_frequent_transitions": [],
            "process_complexity": 0,
            "data_sources": set()
        }

        # Most frequent activities
        activities = [(node, data['count'])
                      for node, data in self.process_graph.nodes(data=True)]
        activities.sort(key=lambda x: x[1], reverse=True)
        stats["most_frequent_activities"] = activities[:10]

        # Most frequent transitions
        transitions = [(u, v, data['weight'])
                       for u, v, data in self.process_graph.edges(data=True)]
        transitions.sort(key=lambda x: x[2], reverse=True)
        stats["most_frequent_transitions"] = transitions[:10]

        # Process complexity (simplified control-flow complexity: total out-degree)
        stats["process_complexity"] = sum(self.process_graph.out_degree(n) for n in self.process_graph.nodes())

        # Data sources
        for _, data in self.process_graph.nodes(data=True):
            if 'sources' in data:
                stats["data_sources"].update(data['sources'])
        stats["data_sources"] = list(stats["data_sources"])

        return stats

    def export_process_model(self, format_type: str = 'bpmn') -> Dict:
        """
        Export the discovered process in the specified format.

        Args:
            format_type: Output format ('bpmn', 'petri_net', or 'json')

        Returns:
            Dictionary with export data and metadata
        """
        if format_type == 'json':
            nodes = [{"id": n, "count": data.get('count', 0)}
                     for n, data in self.process_graph.nodes(data=True)]
            edges = [{"source": u, "target": v, "weight": data.get('weight', 0)}
                     for u, v, data in self.process_graph.edges(data=True)]
            return {
                "format": "json",
                "process_model": {
                    "nodes": nodes,
                    "edges": edges
                }
            }
        elif format_type == 'bpmn':
            # Basic BPMN conversion (simplified)
            # In a real implementation, this would generate actual BPMN XML
            return {
                "format": "bpmn",
                "process_model": {
                    "process_id": "discovered_process",
                    "activities": list(self.process_graph.nodes()),
                    "flows": [(u, v) for u, v in self.process_graph.edges()],
                    "gateways": self._identify_potential_gateways()
                }
            }
        elif format_type == 'petri_net':
            # Basic Petri net conversion (simplified)
            return {
                "format": "petri_net",
                "process_model": {
                    "places": self._generate_petri_net_places(),
                    "transitions": list(self.process_graph.nodes()),
                    "arcs": self._generate_petri_net_arcs()
                }
            }
        else:
            return {"error": f"Unsupported export format: {format_type}"}

    def _identify_potential_gateways(self) -> List[Dict]:
        """
        Identify potential gateways in the process based on branching.

        Returns:
            List of potential gateway nodes
        """
        gateways = []
        for node in self.process_graph.nodes():
            in_degree = self.process_graph.in_degree(node)
            out_degree = self.process_graph.out_degree(node)

            # Potential XOR-split (one input, multiple outputs)
            if in_degree == 1 and out_degree > 1:
                gateways.append({
                    "id": f"xor_split_{node}",
                    "type": "exclusive_gateway",
                    "direction": "split",
                    "attached_to": node
                })
            # Potential XOR-join (multiple inputs, one output)
            elif in_degree > 1 and out_degree == 1:
                gateways.append({
                    "id": f"xor_join_{node}",
                    "type": "exclusive_gateway",
                    "direction": "join",
                    "attached_to": node
                })
            # Potential AND-split/join or complex gateway
            elif in_degree > 1 and out_degree > 1:
                gateways.append({
                    "id": f"complex_{node}",
                    "type": "complex_gateway",
                    "direction": "mixed",
                    "attached_to": node
                })

        return gateways

    def _generate_petri_net_places(self) -> List[str]:
        """
        Generate places for a Petri net representation.

        Returns:
            List of place IDs
        """
        places = []

        # Generate places between each pair of activities
        for u, v in self.process_graph.edges():
            places.append(f"p_{u}_{v}")

        # Add start and end places
        start_nodes = [n for n in self.process_graph.nodes()
                       if self.process_graph.in_degree(n) == 0]
        for node in start_nodes:
            places.append(f"p_start_{node}")

        end_nodes = [n for n in self.process_graph.nodes()
                     if self.process_graph.out_degree(n) == 0]
        for node in end_nodes:
            places.append(f"p_{node}_end")

        return places

    def _generate_petri_net_arcs(self) -> List[Tuple[str, str]]:
        """
        Generate arcs for a Petri net representation.

        Returns:
            List of (source, target) tuples representing arcs
        """
        arcs = []

        # Connect transitions through places
        for u, v in self.process_graph.edges():
            place = f"p_{u}_{v}"
            arcs.append((u, place))
            arcs.append((place, v))

        # Connect start places to initial transitions
        start_nodes = [n for n in self.process_graph.nodes()
                       if self.process_graph.in_degree(n) == 0]
        for node in start_nodes:
            arcs.append((f"p_start_{node}", node))

        # Connect final transitions to end places
        end_nodes = [n for n in self.process_graph.nodes()
                     if self.process_graph.out_degree(n) == 0]
        for node in end_nodes:
            arcs.append((node, f"p_{node}_end"))

        return arcs
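
# --- Illustrative usage sketch (not part of the original module) ---
# A minimal, assumed example of driving ProcessDiscoveryEngine with a small
# synthetic event log. The column names ('timestamp', 'case_id', 'action')
# follow what ingest_log_data expects; the sample data and config values
# below are hypothetical.
if __name__ == "__main__":
    sample_logs = pd.DataFrame({
        "timestamp": pd.to_datetime([
            "2024-01-01 09:00:00", "2024-01-01 09:01:30", "2024-01-01 09:03:00",
            "2024-01-01 10:00:00", "2024-01-01 10:02:00", "2024-01-01 10:05:00",
        ]),
        "case_id": ["c1", "c1", "c1", "c2", "c2", "c2"],
        "action": ["open_invoice", "validate_invoice", "approve_payment",
                   "open_invoice", "validate_invoice", "approve_payment"],
    })

    engine = ProcessDiscoveryEngine({"min_frequency": 0.05, "time_threshold": 60})
    engine.ingest_log_data(sample_logs)

    # Inspect the discovered structure.
    print(engine.get_process_stats())
    print(engine.discover_main_process_paths())
    print(engine.export_process_model("json"))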


# requirements_analysis_module.py

class RequirementsAnalysisModule:
    """
    Analyzes business requirements and connects them to processes.
    Extracts structured data from natural language requirements.
    """

    def __init__(self, config: Optional[Dict] = None):
        """
        Initialize the requirements analysis module.

        Args:
            config: Configuration dictionary
        """
        self.config = config or {}

        # Load NLP model
        try:
            self.nlp = spacy.load("en_core_web_md")
        except OSError:
            # Fall back to the small model if the medium one is not installed
            self.nlp = spacy.load("en_core_web_sm")

        # Initialize requirements storage
        self.requirements = []

        # Initialize taxonomy and patterns
        self._load_taxonomies()
        self._compile_requirement_patterns()

    def _load_taxonomies(self) -> None:
        """Load or initialize the business process taxonomy."""
        # In production, this would load from a file or database
        self.process_taxonomy = {
            "financial": [
                "invoice processing", "accounts payable", "accounts receivable",
                "payment processing", "financial reporting", "expense management"
            ],
            "hr": [
                "onboarding", "offboarding", "payroll", "recruitment",
                "employee management", "benefits administration", "time tracking"
            ],
            "customer_service": [
                "ticket management", "customer support", "inquiry handling",
                "complaint resolution", "feedback processing"
            ],
            "operations": [
                "inventory management", "supply chain", "logistics",
                "order processing", "shipping", "receiving", "quality control"
            ],
            "sales": [
                "lead management", "opportunity tracking", "quote generation",
                "contract management", "sales reporting", "commission calculation"
            ],
            "it": [
                "access management", "incident management", "change management",
                "service request", "problem management", "release management"
            ]
        }

        # Complexity indicators for requirements
        self.complexity_indicators = {
            "high": [
                "complex", "multiple systems", "integration", "decision tree",
                "exception handling", "compliance", "regulatory", "manual review",
                "approval workflow", "conditional logic", "business rules"
            ],
            "medium": [
                "validation", "verification", "notification", "alert",
                "scheduled", "reporting", "dashboard", "data transformation"
            ],
            "low": [
                "simple", "straightforward", "data entry", "form filling",
                "standard", "single system", "fixed path", "static rules"
            ]
        }

    def _compile_requirement_patterns(self) -> None:
        """Compile regex patterns for requirement extraction."""
        # Action patterns
        self.action_patterns = [
            r"(?:need|should|must|will|shall) (?:to )?([a-z]+)",
            r"responsible for ([a-z]+ing)",
            r"capability to ([a-z]+)",
            r"ability to ([a-z]+)"
        ]

        # System patterns
        self.system_patterns = [
            r"(?:in|from|to|using|within) (?:the )?([A-Za-z0-9]+)(?: system| application| platform| software| tool)?",
            r"([A-Za-z0-9]+)(?: system| application| platform| software| tool)",
            r"([A-Za-z0-9]+) (?:database|interface|API|server)"
        ]

        # Frequency patterns
        self.frequency_patterns = [
            r"(daily|weekly|monthly|quarterly|yearly|annually)",
            r"every ([0-9]+) (day|week|month|quarter|year)s?",
            r"([0-9]+) times per (day|week|month|year)"
        ]

        # Compile all patterns
        self.action_regex = [re.compile(pattern) for pattern in self.action_patterns]
        self.system_regex = [re.compile(pattern) for pattern in self.system_patterns]
        self.frequency_regex = [re.compile(pattern) for pattern in self.frequency_patterns]

    def analyze_text_requirement(self, requirement_text: str, source: Optional[str] = None) -> Dict:
        """
        Analyze a natural language requirement and extract structured information.

        Args:
            requirement_text: The text of the requirement
            source: Source of the requirement

        Returns:
            Dictionary with extracted requirement information
        """
        # Parse with spaCy
        doc = self.nlp(requirement_text)

        # Basic requirement object
        requirement = {
            "id": f"REQ-{len(self.requirements) + 1}",
            "text": requirement_text,
            "source": source,
            "extracted": {
                "actions": self._extract_actions(doc, requirement_text),
                "systems": self._extract_systems(doc, requirement_text),
                "frequency": self._extract_frequency(requirement_text),
                "business_domain": self._classify_business_domain(doc),
                "complexity": self._assess_complexity(doc, requirement_text),
                "data_elements": self._extract_data_elements(doc)
            },
            "automation_potential": None  # Will be filled later
        }

        # Store the requirement
        self.requirements.append(requirement)

        return requirement

    def _extract_actions(self, doc, text: str) -> List[str]:
        """
        Extract action verbs from requirement text.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            List of action verbs
        """
        # Method 1: Use spaCy to find verbs
        verbs = [token.lemma_ for token in doc if token.pos_ == "VERB"]

        # Method 2: Use regex patterns
        pattern_matches = []
        for pattern in self.action_regex:
            matches = pattern.findall(text.lower())
            pattern_matches.extend(matches)

        # Combine and deduplicate
        all_actions = list(set(verbs + pattern_matches))

        # Filter out common non-action verbs
        stopwords = ["be", "is", "are", "was", "were", "have", "has", "had"]
        filtered_actions = [v for v in all_actions if v not in stopwords and len(v) > 2]

        return filtered_actions

    def _extract_systems(self, doc, text: str) -> List[str]:
        """
        Extract system names from requirement text.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            List of system names
        """
        # Method 1: Named Entity Recognition for PRODUCT entities
        ner_systems = [ent.text for ent in doc.ents
                       if ent.label_ in ["PRODUCT", "ORG", "GPE"]]

        # Method 2: Pattern matching
        pattern_systems = []
        for pattern in self.system_regex:
            matches = pattern.findall(text)
            pattern_systems.extend(matches)

        # Combine results
        all_systems = list(set(ner_systems + pattern_systems))

        # Filter out common false positives
        stopwords = ["system", "process", "application", "data", "information", "this", "the"]
        filtered_systems = [s for s in all_systems if s.lower() not in stopwords and len(s) > 2]

        return filtered_systems

    def _extract_frequency(self, text: str) -> Optional[str]:
        """
        Extract frequency information from requirement text.

        Args:
            text: Requirement text

        Returns:
            Extracted frequency or None
        """
        text_lower = text.lower()

        # Check all frequency patterns
        for pattern in self.frequency_regex:
            match = pattern.search(text_lower)
            if match:
                return match.group(0)

        # Check for specific frequency words
        frequency_words = ["daily", "weekly", "monthly", "quarterly", "annually", "yearly"]
        for word in frequency_words:
            if word in text_lower:
                return word

        return None

    def _classify_business_domain(self, doc) -> List[Tuple[str, float]]:
        """
        Classify the business domain of the requirement.

        Args:
            doc: spaCy processed document

        Returns:
            List of (domain, confidence) tuples
        """
        text = doc.text.lower()
        domain_scores = {}

        # Calculate score for each domain based on keyword matches
        for domain, keywords in self.process_taxonomy.items():
            domain_score = 0
            for keyword in keywords:
                if keyword in text:
                    domain_score += 1
            if domain_score > 0:
                # Normalize by number of keywords
                domain_scores[domain] = domain_score / len(keywords)

        # If no direct matches, use semantic similarity
        if not domain_scores:
            for domain, keywords in self.process_taxonomy.items():
                # Calculate average similarity between doc and each keyword
                similarities = [doc.similarity(self.nlp(keyword)) for keyword in keywords]
                avg_similarity = sum(similarities) / len(similarities) if similarities else 0
                if avg_similarity > 0.5:  # Threshold for relevance
                    domain_scores[domain] = avg_similarity

        # Sort by score and return
        sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)

        return sorted_domains

    def _assess_complexity(self, doc, text: str) -> str:
        """
        Assess the complexity of the requirement.

        Args:
            doc: spaCy processed document
            text: Original text

        Returns:
            Complexity level ("high", "medium", or "low")
        """
        text_lower = text.lower()

        # Count indicators for each complexity level
        scores = {level: 0 for level in self.complexity_indicators.keys()}
        for level, indicators in self.complexity_indicators.items():
            for indicator in indicators:
                if indicator in text_lower:
                    scores[level] += 1

        # Check sentence structure complexity
        sentence_count = len(list(doc.sents))
        avg_tokens_per_sentence = len(doc) / sentence_count if sentence_count > 0 else 0

        # Adjust scores based on structural complexity
        if avg_tokens_per_sentence > 25:
            scores["high"] += 1
        elif avg_tokens_per_sentence > 15:
            scores["medium"] += 1

        # Check for conditional statements (if/then)
        if "if" in text_lower and ("then" in text_lower or "else" in text_lower):
            scores["high"] += 1

        # Determine final complexity
        if scores["high"] > 0:
            return "high"
        elif scores["medium"] > 0:
            return "medium"
        else:
            return "low"

    def _extract_data_elements(self, doc) -> List[str]:
        """
        Extract data elements from the requirement text.

        Args:
            doc: spaCy processed document

        Returns:
            List of data elements
        """
        # Find noun chunks that could be data elements
        data_elements = []
        for chunk in doc.noun_chunks:
            # Check if this looks like a data field
            if (any(token.pos_ == "NOUN" for token in chunk) and
                    len(chunk) <= 4 and  # Not too long
                    not any(token.is_stop for token in chunk)):  # Contains no stopwords
                data_elements.append(chunk.text)

        # Look for specific data patterns
        data_patterns = [
            (r"\b[A-Z][a-z]+ ID\b", "ID field"),
            (r"\b[A-Z][a-z]+ Number\b", "Number field"),
            (r"\b[A-Z][a-z]+ Code\b", "Code field"),
            (r"\b[A-Z][a-z]+ Date\b", "Date field"),
            (r"\bstatus\b", "Status field")
        ]
        for pattern, field_type in data_patterns:
            if re.search(pattern, doc.text):
                data_elements.append(field_type)

        return list(set(data_elements))

    def analyze_requirements_batch(self, requirements: List[Dict]) -> List[Dict]:
        """
        Analyze a batch of requirements and find relationships between them.

        Args:
            requirements: List of requirement dictionaries with 'text' field

        Returns:
            List of analyzed requirements
        """
        # Process each requirement
        processed_requirements = []
        for req in requirements:
            req_text = req.get('text', '')
            source = req.get('source', 'batch')
            processed = self.analyze_text_requirement(req_text, source)
            processed_requirements.append(processed)

        # Find relationships between requirements
        self._find_requirement_relationships(processed_requirements)

        return processed_requirements

    def _find_requirement_relationships(self, requirements: List[Dict]) -> None:
        """
        Find and add relationships between requirements.

        Args:
            requirements: List of processed requirements
        """
        if len(requirements) < 2:
            return

        # Extract text from requirements
        texts = [req["text"] for req in requirements]

        # Create TF-IDF matrix
        vectorizer = TfidfVectorizer(stop_words='english')
        tfidf_matrix = vectorizer.fit_transform(texts)

        # Calculate similarity matrix
        similarity_matrix = cosine_similarity(tfidf_matrix)

        # Add relationships to requirements
        for i, req in enumerate(requirements):
            related = []
            for j, similarity in enumerate(similarity_matrix[i]):
                if i != j and similarity > 0.3:  # Threshold for relationship
                    related.append({
                        "id": requirements[j]["id"],
                        "similarity": float(similarity),
                        "relationship_type": self._determine_relationship_type(req, requirements[j])
                    })

            # Sort by similarity
            related.sort(key=lambda x: x["similarity"], reverse=True)

            # Add to requirement
            req["related_requirements"] = related[:5]  # Top 5 related requirements

    def _determine_relationship_type(self, req1: Dict, req2: Dict) -> str:
        """
        Determine the type of relationship between two requirements.

        Args:
            req1: First requirement
            req2: Second requirement

        Returns:
            Relationship type string
        """
        # Check for system relationships
        systems1 = set(req1["extracted"]["systems"])
        systems2 = set(req2["extracted"]["systems"])
        if systems1.intersection(systems2):
            return "same_system"

        # Check for business domain relationships
        domains1 = [d[0] for d in req1["extracted"]["business_domain"]]
        domains2 = [d[0] for d in req2["extracted"]["business_domain"]]
        if set(domains1).intersection(set(domains2)):
            return "same_domain"

        # Check for action relationships
        actions1 = set(req1["extracted"]["actions"])
        actions2 = set(req2["extracted"]["actions"])
        if actions1.intersection(actions2):
            return "similar_action"

        # Default relationship type
        return "related"

    def map_requirements_to_processes(self, requirements: List[Dict], process_models: List[Dict]) -> Dict:
        """
        Map requirements to process models based on content matching.

        Args:
            requirements: List of analyzed requirements
            process_models: List of process model dictionaries

        Returns:
            Dictionary mapping process IDs to requirement IDs
        """
        process_to_reqs = {}
        req_to_process = {}

        for process in process_models:
            process_id = process.get("id", "unknown")
            process_text = process.get("description", "") + " " + process.get("name", "")
            process_doc = self.nlp(process_text)

            # Find matching requirements
            matching_reqs = []
            for req in requirements:
                req_text = req["text"]
                req_doc = self.nlp(req_text)

                # Calculate similarity
                similarity = process_doc.similarity(req_doc)
                if similarity > 0.6:  # Threshold for matching
                    matching_reqs.append({
                        "req_id": req["id"],
                        "similarity": float(similarity)
                    })
                    req_to_process[req["id"]] = process_id

            # Sort by similarity
            matching_reqs.sort(key=lambda x: x["similarity"], reverse=True)
            process_to_reqs[process_id] = matching_reqs

        return {
            "process_to_requirements": process_to_reqs,
            "requirement_to_process": req_to_process
        }

    def evaluate_automation_potential(self, requirement: Dict) -> Dict:
        """
        Evaluate the automation potential of a requirement.

        Args:
            requirement: Analyzed requirement

        Returns:
            Automation potential assessment
        """
        # Basic score starts at 5 out of 10
        score = 5

        # Complexity factor (high complexity decreases score)
        complexity = requirement["extracted"]["complexity"]
        if complexity == "high":
            score -= 2
        elif complexity == "low":
            score += 2

        # Action factor (certain actions are more automatable)
        automatable_actions = ["extract", "transfer", "copy", "move", "calculate",
                               "update", "generate", "validate", "verify", "send",
                               "notify", "schedule", "retrieve", "check"]
        for action in requirement["extracted"]["actions"]:
            if action in automatable_actions:
                score += 0.5

        # System factor (presence of systems increases score)
        if requirement["extracted"]["systems"]:
            score += len(requirement["extracted"]["systems"]) * 0.5

        # Data elements factor (more data elements suggests more structure)
        data_elements = requirement["extracted"]["data_elements"]
        if data_elements:
            score += min(len(data_elements) * 0.3, 2)  # Cap at +2

        # Cap score between 1 and 10
        score = max(1, min(10, score))

        # Determine category
        category = "high" if score >= 7.5 else "medium" if score >= 5 else "low"

        # Identify automation technology
        tech = self._recommend_automation_technology(requirement, score)

        return {
            "automation_score": round(score, 1),
            "automation_category": category,
            "recommended_technology": tech,
            "rationale": self._generate_automation_rationale(requirement, score, category)
        }

    def _recommend_automation_technology(self, requirement: Dict, score: float) -> str:
        """
        Recommend suitable automation technology.

        Args:
            requirement: Analyzed requirement
            score: Automation score

        Returns:
            Recommended technology
        """
        complexity = requirement["extracted"]["complexity"]
        actions = requirement["extracted"]["actions"]

        # Decision tree for technology recommendation
        if score >= 8:
            if any(a in actions for a in ["extract", "scrape", "read"]):
                return "RPA with OCR/Document Understanding"
            else:
                return "Traditional RPA"
        elif score >= 5:
            if complexity == "high":
                return "RPA with Human-in-the-Loop"
            elif any(a in actions for a in ["decide", "evaluate", "assess"]):
                return "RPA with Decision Automation"
            else:
                return "Traditional RPA"
        else:
            if any(a in actions for a in ["review", "approve"]):
                return "Workflow Automation"
            else:
                return "Partial Automation with Human Tasks"

    def _generate_automation_rationale(self, requirement: Dict, score: float, category: str) -> str:
        """
        Generate explanation for automation assessment.

        Args:
            requirement: Analyzed requirement
            score: Automation score
            category: Automation category

        Returns:
            Rationale text
        """
        complexity = requirement["extracted"]["complexity"]

        if category == "high":
            return (f"This requirement has {complexity} complexity but shows strong automation "
                    f"potential due to clear structure and defined data elements. "
                    f"Score of {score}/10 indicates this is a prime automation candidate.")
        elif category == "medium":
            return (f"This {complexity} complexity requirement has moderate automation potential. "
                    f"Score of {score}/10 suggests partial automation with some human oversight.")
        else:
            return (f"The {complexity} complexity and ambiguous nature of this requirement "
                    f"limits automation potential. Score of {score}/10 indicates this may "
                    f"require significant human involvement or process redesign.")

    def assess_requirements_automation_potential(self, requirements: List[Dict]) -> List[Dict]:
        """
        Assess automation potential for a batch of requirements.

        Args:
            requirements: List of analyzed requirements

        Returns:
            Requirements with automation assessment added
        """
        for req in requirements:
            req["automation_potential"] = self.evaluate_automation_potential(req)

        return requirements

    def generate_requirements_report(self, requirements: List[Dict]) -> Dict:
        """
        Generate a summary report of requirements analysis.

        Args:
            requirements: List of analyzed requirements

        Returns:
            Report dictionary
        """
        # Count by complexity
        complexity_counts = {"high": 0, "medium": 0, "low": 0}
        for req in requirements:
            complexity = req["extracted"]["complexity"]
            complexity_counts[complexity] += 1

        # Count by automation potential (the key is pre-set to None, so check the value)
        if all(req.get("automation_potential") for req in requirements):
            automation_counts = {"high": 0, "medium": 0, "low": 0}
            for req in requirements:
                category = req["automation_potential"]["automation_category"]
                automation_counts[category] += 1
        else:
            automation_counts = None

        # Find common systems
        all_systems = []
        for req in requirements:
            all_systems.extend(req["extracted"]["systems"])

        system_counts = {}
        for system in all_systems:
            if system in system_counts:
                system_counts[system] += 1
            else:
                system_counts[system] = 1

        # Sort systems by frequency
        top_systems = sorted(system_counts.items(), key=lambda x: x[1], reverse=True)[:5]

        # Generate report
        report = {
            "total_requirements": len(requirements),
            "complexity_distribution": complexity_counts,
            "automation_potential": automation_counts,
            "top_systems": top_systems,
            "recommendations": self._generate_overall_recommendations(requirements)
        }

        return report

    def _generate_overall_recommendations(self, requirements: List[Dict]) -> List[str]:
        """
        Generate overall recommendations based on requirements analysis.

        Args:
            requirements: List of analyzed requirements

        Returns:
            List of recommendation strings
        """
        recommendations = []

        # Check if automation assessment is available (the key is pre-set to None)
        automation_available = all(req.get("automation_potential") for req in requirements)

        if automation_available:
            # Count high automation potential requirements
            high_potential = [r for r in requirements
                              if r["automation_potential"]["automation_category"] == "high"]

            if len(high_potential) >= len(requirements) * 0.7:
                recommendations.append(
                    "High automation potential across most requirements. "
                    "Consider an end-to-end automation solution."
                )
            elif len(high_potential) >= len(requirements) * 0.3:
                recommendations.append(
                    "Significant automation potential in a subset of requirements. "
                    "Consider a phased automation approach starting with high-potential areas."
                )
            else:
                recommendations.append(
                    "Limited automation potential in current requirements. "
                    "Consider process redesign to increase automation potential."
                )

            # Recommend technologies
            tech_counts = {}
            for req in requirements:
                tech = req["automation_potential"]["recommended_technology"]
                tech_counts[tech] = tech_counts.get(tech, 0) + 1

            if tech_counts:
                top_tech = max(tech_counts.items(), key=lambda x: x[1])[0]
                recommendations.append(f"Primary recommended technology: {top_tech}")

        # Requirements quality recommendations
        completeness_issues = False
        for req in requirements:
            if (not req["extracted"]["actions"] or
                    not req["extracted"]["systems"] or
                    not req["extracted"]["data_elements"]):
                completeness_issues = True
                break

        if completeness_issues:
            recommendations.append(
                "Some requirements lack necessary details. "
                "Consider refining requirements to specify actions, systems, and data elements."
            )

        return recommendations
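
# --- Illustrative usage sketch (not part of the original module) ---
# An assumed end-to-end example: analyze a couple of free-text requirements,
# score their automation potential, and print the summary report. The
# requirement texts and sources are hypothetical; running this needs a spaCy
# English model (en_core_web_md or en_core_web_sm) to be installed.
if __name__ == "__main__":
    analyzer = RequirementsAnalysisModule()
    raw_requirements = [
        {"text": "The system must extract invoice numbers from the SAP system daily "
                 "and send a validation report to the finance team.",
         "source": "workshop_notes"},
        {"text": "Employees should be able to submit expense reports for manual review "
                 "and an approval workflow in the HR portal.",
         "source": "workshop_notes"},
    ]

    analyzed = analyzer.analyze_requirements_batch(raw_requirements)
    analyzed = analyzer.assess_requirements_automation_potential(analyzed)
    report = analyzer.generate_requirements_report(analyzed)
    print(json.dumps(report, indent=2, default=str))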