|
|
"""Utility functions for Salesforce B2B Commerce Migration Assistant""" |
|
|
|
|
|
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
APEX_PATTERNS = { |
|
|
"class_declaration": r"(?:public|private|global|protected)\s+(?:virtual|abstract|with sharing|without sharing|inherited sharing)?\s*class\s+\w+", |
|
|
"trigger_declaration": r"trigger\s+\w+\s+on\s+\w+\s*\([^)]+\)", |
|
|
"method_declaration": r"(?:public|private|global|protected)\s+(?:static)?\s*(?:void|\w+)\s+\w+\s*\([^)]*\)", |
|
|
"soql_query": r"(?:\[|Database\.query\s*\()\s*SELECT\s+.*?\s+FROM\s+\w+.*?(?:\]|\))", |
|
|
"dml_operation": r"(?:insert|update|delete|undelete|upsert|merge)\s+\w+", |
|
|
"bulkification_issue": r"for\s*\([^)]+\)\s*{[^}]*(?:insert|update|delete|undelete)\s+", |
|
|
"hardcoded_id": r"(?:\'[a-zA-Z0-9]{15}\'|\'[a-zA-Z0-9]{18}\')", |
|
|
"missing_null_check": r"(\w+)\.(\w+)(?!\s*(?:!=|==)\s*null)", |
|
|
"governor_limit_risk": r"(?:for\s*\([^)]+\)\s*{[^}]*\[SELECT|Database\.query)", |
|
|
} |
|
|
|
|
|
|
|
|
APEX_ERRORS = { |
|
|
"missing_semicolon": { |
|
|
"pattern": r"[^{};]\s*\n\s*(?:public|private|global|protected|if|for|while|try)", |
|
|
"message": "Missing semicolon at end of statement", |
|
|
"severity": "error" |
|
|
}, |
|
|
"unclosed_bracket": { |
|
|
"pattern": r"(?:\{(?:[^{}]|(?:\{[^{}]*\}))*$)|(?:^[^{}]*\})", |
|
|
"message": "Unclosed or extra bracket detected", |
|
|
"severity": "error" |
|
|
}, |
|
|
"invalid_soql": { |
|
|
"pattern": r"\[\s*SELECT\s+FROM\s+\w+", |
|
|
"message": "Invalid SOQL: Missing field selection", |
|
|
"severity": "error" |
|
|
}, |
|
|
"missing_try_catch_dml": { |
|
|
"pattern": r"(?<!try\s{[^}]*)(insert|update|delete|upsert)\s+(?!.*catch)", |
|
|
"message": "DML operation without try-catch block", |
|
|
"severity": "warning" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
B2B_COMMERCE_PATTERNS = { |
|
|
"cloudcraze_reference": r"(?:ccrz__|E_\w+|CC_\w+)", |
|
|
"b2b_lex_object": r"(?:OrderSummary|CartItem|WebCart|ProductCatalog|BuyerGroup|CommerceEntitlementPolicy)", |
|
|
"deprecated_method": r"(?:ccrz\.cc_CallContext|ccrz\.ccAPI|cc_bean_\w+)", |
|
|
"migration_required": r"(?:E_Product__|E_Cart__|E_Order__|CC_Promotions__|CC_Tax__)" |
|
|
} |
|
|
|
|
|
VALIDATION_SCHEMA = { |
|
|
"quality_rating": "int (1–10)", |
|
|
"accuracy": "float (0.0–1.0)", |
|
|
"completeness": "float (0.0–1.0)", |
|
|
"best_practices_alignment": "float (0.0–1.0)", |
|
|
"syntax_validity": "float (0.0–1.0)", |
|
|
"security_score": "float (0.0–1.0)", |
|
|
"performance_score": "float (0.0–1.0)", |
|
|
"explanations": { |
|
|
"quality_rating": "string", |
|
|
"accuracy": "string", |
|
|
"completeness": "string", |
|
|
"best_practices_alignment": "string", |
|
|
"syntax_validity": "string", |
|
|
"security_score": "string", |
|
|
"performance_score": "string" |
|
|
}, |
|
|
"errors": ["list of syntax errors"], |
|
|
"warnings": ["list of potential issues"], |
|
|
"suggestions": ["list of improvement suggestions"] |
|
|
} |
|
|
|
|
|
def validate_apex_syntax(code: str) -> Tuple[bool, List[Dict[str, str]]]: |
|
|
"""Validate Apex syntax and return errors/warnings.""" |
|
|
issues = [] |
|
|
|
|
|
|
|
|
for error_type, error_info in APEX_ERRORS.items(): |
|
|
matches = re.finditer(error_info["pattern"], code, re.MULTILINE | re.DOTALL) |
|
|
for match in matches: |
|
|
issues.append({ |
|
|
"type": error_info["severity"], |
|
|
"message": error_info["message"], |
|
|
"line": code[:match.start()].count('\n') + 1, |
|
|
"position": match.start() |
|
|
}) |
|
|
|
|
|
|
|
|
if not re.search(APEX_PATTERNS["class_declaration"], code) and \ |
|
|
not re.search(APEX_PATTERNS["trigger_declaration"], code): |
|
|
issues.append({ |
|
|
"type": "error", |
|
|
"message": "No valid Apex class or trigger declaration found", |
|
|
"line": 1, |
|
|
"position": 0 |
|
|
}) |
|
|
|
|
|
|
|
|
bulk_issues = re.finditer(APEX_PATTERNS["bulkification_issue"], code, re.DOTALL) |
|
|
for match in bulk_issues: |
|
|
issues.append({ |
|
|
"type": "error", |
|
|
"message": "DML operation inside loop - violates bulkification best practices", |
|
|
"line": code[:match.start()].count('\n') + 1, |
|
|
"position": match.start() |
|
|
}) |
|
|
|
|
|
|
|
|
hardcoded_ids = re.finditer(APEX_PATTERNS["hardcoded_id"], code) |
|
|
for match in hardcoded_ids: |
|
|
issues.append({ |
|
|
"type": "warning", |
|
|
"message": "Hardcoded Salesforce ID detected - use Custom Settings or Custom Metadata", |
|
|
"line": code[:match.start()].count('\n') + 1, |
|
|
"position": match.start() |
|
|
}) |
|
|
|
|
|
|
|
|
gov_limit_risks = re.finditer(APEX_PATTERNS["governor_limit_risk"], code, re.DOTALL) |
|
|
for match in gov_limit_risks: |
|
|
issues.append({ |
|
|
"type": "warning", |
|
|
"message": "SOQL query inside loop - potential governor limit issue", |
|
|
"line": code[:match.start()].count('\n') + 1, |
|
|
"position": match.start() |
|
|
}) |
|
|
|
|
|
has_errors = any(issue["type"] == "error" for issue in issues) |
|
|
return not has_errors, issues |
|
|
|
|
|
def perform_skeptical_evaluation(code: str, context: str = "trigger") -> Dict[str, any]: |
|
|
"""Perform skeptical evaluation of code looking for common issues.""" |
|
|
evaluation = { |
|
|
"syntax_issues": [], |
|
|
"security_concerns": [], |
|
|
"performance_issues": [], |
|
|
"best_practice_violations": [], |
|
|
"b2b_commerce_issues": [] |
|
|
} |
|
|
|
|
|
|
|
|
is_valid, syntax_issues = validate_apex_syntax(code) |
|
|
evaluation["syntax_issues"] = syntax_issues |
|
|
|
|
|
|
|
|
if re.search(r"without\s+sharing", code, re.IGNORECASE): |
|
|
evaluation["security_concerns"].append({ |
|
|
"type": "warning", |
|
|
"message": "Class declared 'without sharing' - ensure this is intentional" |
|
|
}) |
|
|
|
|
|
if not re.search(r"\.stripInaccessible\(", code) and re.search(r"(insert|update)\s+", code): |
|
|
evaluation["security_concerns"].append({ |
|
|
"type": "warning", |
|
|
"message": "DML operations without stripInaccessible - potential FLS violation" |
|
|
}) |
|
|
|
|
|
|
|
|
nested_loops = re.findall(r"for\s*\([^)]+\)\s*\{[^}]*for\s*\([^)]+\)", code, re.DOTALL) |
|
|
if nested_loops: |
|
|
evaluation["performance_issues"].append({ |
|
|
"type": "warning", |
|
|
"message": f"Nested loops detected ({len(nested_loops)} occurrences) - review for O(n²) complexity" |
|
|
}) |
|
|
|
|
|
|
|
|
if re.search(r"@isTest|testMethod", code, re.IGNORECASE): |
|
|
if not re.search(r"System\.assert|Assert\.", code): |
|
|
evaluation["best_practice_violations"].append({ |
|
|
"type": "error", |
|
|
"message": "Test class without assertions - tests must verify behavior" |
|
|
}) |
|
|
|
|
|
|
|
|
cloudcraze_refs = re.findall(B2B_COMMERCE_PATTERNS["cloudcraze_reference"], code) |
|
|
if cloudcraze_refs: |
|
|
evaluation["b2b_commerce_issues"].append({ |
|
|
"type": "error", |
|
|
"message": f"CloudCraze references found ({len(set(cloudcraze_refs))} unique) - must be migrated to B2B LEX" |
|
|
}) |
|
|
|
|
|
deprecated_methods = re.findall(B2B_COMMERCE_PATTERNS["deprecated_method"], code) |
|
|
if deprecated_methods: |
|
|
evaluation["b2b_commerce_issues"].append({ |
|
|
"type": "error", |
|
|
"message": f"Deprecated CloudCraze methods found: {', '.join(set(deprecated_methods))}" |
|
|
}) |
|
|
|
|
|
return evaluation |
|
|
|
|
|
def extract_code_blocks(text: str) -> str: |
|
|
"""Enhanced code extraction with multiple strategies.""" |
|
|
|
|
|
pattern = r"```(?:apex|java|Apex|Java|APEX|JAVA)?\s*(.*?)```" |
|
|
matches = re.findall(pattern, text, re.DOTALL | re.IGNORECASE) |
|
|
|
|
|
code_blocks = [] |
|
|
for block in matches: |
|
|
cleaned_block = block.strip() |
|
|
if cleaned_block: |
|
|
code_blocks.append(cleaned_block) |
|
|
|
|
|
|
|
|
if not code_blocks: |
|
|
apex_patterns = [ |
|
|
|
|
|
r"((?:public|private|global|protected)\s+(?:virtual|abstract|with sharing|without sharing|inherited sharing)?\s*class\s+\w+(?:\s+extends\s+\w+)?(?:\s+implements\s+[\w\s,]+)?\s*\{(?:[^{}]|\{[^{}]*\})*\})", |
|
|
|
|
|
r"(trigger\s+\w+\s+on\s+\w+\s*\([^)]+\)\s*\{(?:[^{}]|\{[^{}]*\})*\})", |
|
|
|
|
|
r"((?:public|private|global)\s+interface\s+\w+(?:\s+extends\s+[\w\s,]+)?\s*\{(?:[^{}]|\{[^{}]*\})*\})", |
|
|
|
|
|
r"((?:public|private|global)\s+enum\s+\w+\s*\{[^}]+\})", |
|
|
|
|
|
r"(@\w+(?:\([^)]*\))?\s*(?:public|private|global|protected).*?(?:\{(?:[^{}]|\{[^{}]*\})*\}|;))" |
|
|
] |
|
|
|
|
|
for pattern in apex_patterns: |
|
|
found = re.findall(pattern, text, re.DOTALL | re.MULTILINE) |
|
|
code_blocks.extend(found) |
|
|
|
|
|
|
|
|
if not code_blocks: |
|
|
|
|
|
marker_patterns = [ |
|
|
r"(?:corrected|fixed|updated|converted|modified)\s+code\s*:\s*\n((?:(?:public|private|global|trigger).*?)(?=\n\n|\Z))", |
|
|
r"(?:here'?s?|below is)\s+(?:the|your)\s+(?:corrected|fixed|updated)\s+\w+\s*:\s*\n((?:(?:public|private|global|trigger).*?)(?=\n\n|\Z))" |
|
|
] |
|
|
|
|
|
for pattern in marker_patterns: |
|
|
found = re.findall(pattern, text, re.DOTALL | re.IGNORECASE) |
|
|
code_blocks.extend(found) |
|
|
|
|
|
return '\n\n'.join(filter(None, code_blocks)) |
|
|
|
|
|
def format_structured_explanation(response: str, code_output: str) -> str: |
|
|
"""Format the explanation in a structured, brief manner.""" |
|
|
|
|
|
sections = { |
|
|
"key_changes": "", |
|
|
"critical_issues": "", |
|
|
"warnings": "" |
|
|
} |
|
|
|
|
|
|
|
|
key_match = re.search(r"##\s*KEY CHANGES.*?\n((?:[-•]\s*.*?\n)+)", response, re.IGNORECASE | re.DOTALL) |
|
|
if key_match: |
|
|
sections["key_changes"] = key_match.group(1).strip() |
|
|
|
|
|
|
|
|
critical_match = re.search(r"##\s*CRITICAL ISSUES.*?\n((?:\d+\..*?\n)+)", response, re.IGNORECASE | re.DOTALL) |
|
|
if critical_match: |
|
|
sections["critical_issues"] = critical_match.group(1).strip() |
|
|
|
|
|
|
|
|
warning_match = re.search(r"##\s*REMAINING WARNINGS.*?\n((?:[-•]\s*.*?\n)*)", response, re.IGNORECASE | re.DOTALL) |
|
|
if warning_match: |
|
|
sections["warnings"] = warning_match.group(1).strip() |
|
|
|
|
|
|
|
|
formatted = "### Summary of Changes\n\n" |
|
|
|
|
|
if sections["key_changes"]: |
|
|
formatted += "**Key Changes:**\n" + sections["key_changes"] + "\n\n" |
|
|
|
|
|
if sections["critical_issues"]: |
|
|
formatted += "**Critical Issues Fixed:**\n" + sections["critical_issues"] + "\n\n" |
|
|
|
|
|
if sections["warnings"]: |
|
|
formatted += "**⚠️ Remaining Warnings:**\n" + sections["warnings"] |
|
|
|
|
|
|
|
|
if not any(sections.values()): |
|
|
|
|
|
formatted = "### Code Correction Summary\n\n" |
|
|
formatted += "The code has been corrected and optimized. " |
|
|
formatted += "Check the code output for inline comments explaining specific changes.\n\n" |
|
|
formatted += "For detailed analysis, see the Full Model Response." |
|
|
|
|
|
return formatted.strip() |
|
|
|
|
|
def format_object_conversion_explanation(response: str, code_output: str) -> str: |
|
|
"""Format the object conversion explanation in a structured manner.""" |
|
|
sections = { |
|
|
"mapping": "", |
|
|
"field_table": "", |
|
|
"steps": "", |
|
|
"warnings": "" |
|
|
} |
|
|
|
|
|
|
|
|
mapping_match = re.search(r"##\s*B2B LEX OBJECT MAPPING.*?\n((?:[-•]\s*.*?\n)+)", response, re.IGNORECASE | re.DOTALL) |
|
|
if mapping_match: |
|
|
sections["mapping"] = mapping_match.group(1).strip() |
|
|
|
|
|
|
|
|
table_match = re.search(r"##\s*FIELD MAPPINGS.*?\n((?:\|.*?\|.*?\n)+)", response, re.IGNORECASE | re.DOTALL) |
|
|
if table_match: |
|
|
sections["field_table"] = table_match.group(1).strip() |
|
|
|
|
|
|
|
|
steps_match = re.search(r"##\s*MIGRATION STEPS.*?\n((?:\d+\..*?\n)+)", response, re.IGNORECASE | re.DOTALL) |
|
|
if steps_match: |
|
|
sections["steps"] = steps_match.group(1).strip() |
|
|
|
|
|
|
|
|
warning_match = re.search(r"##\s*WARNINGS.*?\n((?:[-•]\s*.*?\n)*)", response, re.IGNORECASE | re.DOTALL) |
|
|
if warning_match: |
|
|
sections["warnings"] = warning_match.group(1).strip() |
|
|
|
|
|
|
|
|
formatted = "### Conversion Summary\n\n" |
|
|
|
|
|
if sections["mapping"]: |
|
|
formatted += "**Object Mapping:**\n" + sections["mapping"] + "\n\n" |
|
|
|
|
|
if sections["field_table"]: |
|
|
formatted += "**Field Mappings:**\n" + sections["field_table"] + "\n\n" |
|
|
|
|
|
if sections["steps"]: |
|
|
formatted += "**Migration Steps:**\n" + sections["steps"] + "\n\n" |
|
|
|
|
|
if sections["warnings"]: |
|
|
formatted += "**⚠️ Important Notes:**\n" + sections["warnings"] |
|
|
|
|
|
|
|
|
if not any(sections.values()): |
|
|
formatted = "### Conversion Summary\n\n" |
|
|
formatted += "The CloudCraze object has been converted to B2B Lightning Experience format. " |
|
|
formatted += "Check the code output for the complete implementation.\n\n" |
|
|
formatted += "For detailed field mappings and migration steps, see the Full Model Response." |
|
|
|
|
|
return formatted.strip() |
|
|
|
|
|
def extract_validation_metrics(validation_text: str) -> Optional[Dict[str, float]]: |
|
|
"""Enhanced JSON extraction for validation metrics.""" |
|
|
try: |
|
|
|
|
|
json_patterns = [ |
|
|
r'(?:json|JSON|assessment|Assessment)[\s:]*({[^{}]*(?:{[^{}]*}[^{}]*)*})', |
|
|
r'```json\s*({[^`]+})\s*```', |
|
|
r'({[^{}]*"quality_rating"[^{}]*(?:{[^{}]*}[^{}]*)*})' |
|
|
] |
|
|
|
|
|
for pattern in json_patterns: |
|
|
matches = re.findall(pattern, validation_text, re.DOTALL) |
|
|
for match in matches: |
|
|
try: |
|
|
data = json.loads(match) |
|
|
if "quality_rating" in data: |
|
|
return normalize_metrics(data) |
|
|
except json.JSONDecodeError: |
|
|
continue |
|
|
|
|
|
|
|
|
metrics = {} |
|
|
metric_patterns = { |
|
|
"quality_rating": r"quality_rating[\"']?\s*:\s*(\d+(?:\.\d+)?)", |
|
|
"accuracy": r"accuracy[\"']?\s*:\s*(\d+(?:\.\d+)?)", |
|
|
"completeness": r"completeness[\"']?\s*:\s*(\d+(?:\.\d+)?)", |
|
|
"best_practices_alignment": r"best_practices_alignment[\"']?\s*:\s*(\d+(?:\.\d+)?)", |
|
|
"syntax_validity": r"syntax_validity[\"']?\s*:\s*(\d+(?:\.\d+)?)", |
|
|
"security_score": r"security_score[\"']?\s*:\s*(\d+(?:\.\d+)?)", |
|
|
"performance_score": r"performance_score[\"']?\s*:\s*(\d+(?:\.\d+)?)" |
|
|
} |
|
|
|
|
|
for metric, pattern in metric_patterns.items(): |
|
|
match = re.search(pattern, validation_text, re.IGNORECASE) |
|
|
if match: |
|
|
metrics[metric] = float(match.group(1)) |
|
|
|
|
|
if metrics: |
|
|
return normalize_metrics(metrics) |
|
|
|
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error extracting metrics: {e}") |
|
|
return None |
|
|
|
|
|
def normalize_metrics(data: Dict) -> Dict[str, float]: |
|
|
"""Ensure metrics are in the correct format and range.""" |
|
|
normalized = { |
|
|
"quality_rating": min(10, max(0, float(data.get("quality_rating", 0)))), |
|
|
"accuracy": min(1.0, max(0.0, float(data.get("accuracy", 0.0)))), |
|
|
"completeness": min(1.0, max(0.0, float(data.get("completeness", 0.0)))), |
|
|
"best_practices_alignment": min(1.0, max(0.0, float(data.get("best_practices_alignment", 0.0)))), |
|
|
"syntax_validity": min(1.0, max(0.0, float(data.get("syntax_validity", 0.0)))), |
|
|
"security_score": min(1.0, max(0.0, float(data.get("security_score", 0.0)))), |
|
|
"performance_score": min(1.0, max(0.0, float(data.get("performance_score", 0.0)))) |
|
|
} |
|
|
return normalized |
|
|
|
|
|
def generate_test_cases(code_type: str, code: str) -> str: |
|
|
"""Generate test cases for the given code.""" |
|
|
if code_type == "trigger": |
|
|
return f""" |
|
|
// Test class for the trigger |
|
|
@isTest |
|
|
private class Test_MigratedTrigger {{ |
|
|
@TestSetup |
|
|
static void setup() {{ |
|
|
// Create test data |
|
|
// TODO: Add specific test data setup |
|
|
}} |
|
|
|
|
|
@isTest |
|
|
static void testBulkInsert() {{ |
|
|
// Test bulk insert scenario |
|
|
List<SObject> testRecords = new List<SObject>(); |
|
|
for(Integer i = 0; i < 200; i++) {{ |
|
|
// TODO: Create test records |
|
|
}} |
|
|
|
|
|
Test.startTest(); |
|
|
insert testRecords; |
|
|
Test.stopTest(); |
|
|
|
|
|
// TODO: Add assertions |
|
|
System.assert(true, 'Bulk insert test needs implementation'); |
|
|
}} |
|
|
|
|
|
@isTest |
|
|
static void testBulkUpdate() {{ |
|
|
// Test bulk update scenario |
|
|
// TODO: Implement bulk update test |
|
|
}} |
|
|
|
|
|
@isTest |
|
|
static void testErrorHandling() {{ |
|
|
// Test error scenarios |
|
|
// TODO: Test validation rules, required fields, etc. |
|
|
}} |
|
|
|
|
|
@isTest |
|
|
static void testGovernorLimits() {{ |
|
|
// Test near governor limits |
|
|
// TODO: Test with large data volumes |
|
|
}} |
|
|
}} |
|
|
""" |
|
|
else: |
|
|
return f""" |
|
|
// Test data creation for migrated object |
|
|
@isTest |
|
|
public class Test_MigratedObjectData {{ |
|
|
public static SObject createTestRecord() {{ |
|
|
// TODO: Create and return test instance |
|
|
return null; |
|
|
}} |
|
|
|
|
|
public static List<SObject> createBulkTestRecords(Integer count) {{ |
|
|
List<SObject> records = new List<SObject>(); |
|
|
for(Integer i = 0; i < count) {{ |
|
|
// TODO: Create test records |
|
|
}} |
|
|
return records; |
|
|
}} |
|
|
|
|
|
public static void validateMigrationMapping() {{ |
|
|
// Validate that all fields are properly mapped |
|
|
// TODO: Add field mapping validation |
|
|
}} |
|
|
}} |
|
|
""" |
|
|
|
|
|
def handle_api_error(status_code: int, response_text: str) -> str: |
|
|
"""Handle API errors with appropriate user-friendly messages.""" |
|
|
if status_code == 401: |
|
|
return "Authentication failed. Please check API configuration." |
|
|
elif status_code == 429: |
|
|
return "Rate limit exceeded. Please try again later." |
|
|
elif status_code == 403: |
|
|
return "Access forbidden. Please check your permissions." |
|
|
elif status_code >= 500: |
|
|
return "Service temporarily unavailable. Please try again." |
|
|
else: |
|
|
return f"Request failed with status {status_code}" |