Bhanumani12 committed on
Commit
692050b
·
verified ·
1 Parent(s): 34f2d02

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +209 -87
app.py CHANGED
@@ -1,10 +1,14 @@
1
  import os
2
- import gradio as gr
 
 
 
3
  from datetime import datetime
4
- from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
 
 
5
  from simple_salesforce import Salesforce, SalesforceLogin
6
  from dotenv import load_dotenv
7
- import xml.etree.ElementTree as ET
8
 
9
  # ---------- Load Environment Variables ----------
10
  load_dotenv()
@@ -12,10 +16,47 @@ SF_USERNAME = os.getenv("SF_USERNAME")
12
  SF_PASSWORD = os.getenv("SF_PASSWORD")
13
  SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  # ---------- Logging ----------
16
  def log_to_console(data, log_type):
17
  timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
18
- print(f"[{timestamp}] {log_type}: {data}")
19
 
20
  # ---------- Salesforce Connection ----------
21
  try:
@@ -30,60 +71,87 @@ except Exception as e:
30
  sf = None
31
  print(f"❌ Failed to connect to Salesforce: {e}")
32
 
33
- # ---------- Load Hugging Face Models ----------
34
- print("⏳ Loading Hugging Face models...")
35
-
36
- # CodeBERT for code analysis
37
- codebert_tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
38
- codebert_model = AutoModelForSequenceClassification.from_pretrained(
39
- "microsoft/codebert-base", num_labels=4
40
- )
41
- codebert_pipeline = pipeline("text-classification", model=codebert_model, tokenizer=codebert_tokenizer)
42
-
43
- # Flan-T5 for Q&A / metadata
44
- qa_pipeline = pipeline("text2text-generation", model="google/flan-t5-large")
45
-
46
- print("✅ Models loaded")
47
-
48
- # ---------- Label Mapping ----------
49
- label_to_issue_type = {
50
- 0: "Performance",
51
- 1: "Error",
52
- 2: "Security",
53
- 3: "Best Practice"
54
- }
55
-
56
- suggestions = {
57
- "Performance": "Optimize loops and reduce SOQL queries inside loops.",
58
- "Error": "Add error handling (try-catch) and null checks for safer execution.",
59
- "Security": "Avoid dynamic SOQL; use bind variables to prevent SOQL injection.",
60
- "Best Practice": "Refactor for readability, use bulk-safe patterns."
61
- }
62
-
63
- severities = {
64
- "Performance": "Medium",
65
- "Error": "High",
66
- "Security": "High",
67
- "Best Practice": "Low"
68
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
  # ---------- Code Analyzer ----------
71
  def analyze_code(code):
72
- if not code.strip():
73
  return "No code provided.", "", ""
74
 
75
- result = codebert_pipeline(code[:512]) # truncate to prevent model overload
76
- label_id = int(result[0]["label"].replace("LABEL_", "")) if "LABEL_" in result[0]["label"] else 0
77
-
78
- issue_type = label_to_issue_type[label_id]
79
- suggestion = suggestions[issue_type]
80
- severity = severities[issue_type]
81
 
82
  review_data = {
83
  "Name": f"Review_{issue_type}",
84
  "CodeSnippet__c": code,
85
  "IssueType__c": issue_type,
86
- "Suggestion__c": suggestion,
87
  "Severity__c": severity
88
  }
89
 
@@ -94,34 +162,75 @@ def analyze_code(code):
94
  result = sf.CodeReviewResult__c.create(review_data)
95
  if result.get("success"):
96
  log_to_console({"Salesforce Record ID": result["id"]}, "Salesforce Create")
 
 
97
  except Exception as e:
98
  log_to_console({"Salesforce Exception": str(e)}, "Salesforce Error")
 
 
99
 
100
- return issue_type, suggestion, severity
101
 
102
  # ---------- Metadata Validator ----------
103
  def validate_metadata(metadata, admin_id=None):
104
- if not metadata.strip():
105
  return "No metadata provided.", "", ""
106
 
107
- mtype, issue, recommendation = "Field", "", ""
 
 
108
 
109
  try:
110
  root = ET.fromstring(metadata)
111
- description_found = any(elem.tag.endswith('description') for elem in root)
112
-
113
- if not description_found:
114
- issue = "Missing description"
115
- recommendation = "Add a <description> tag to improve clarity."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
  else:
117
- # Use AI for deeper suggestions
118
- response = qa_pipeline(
119
- f"Review this Salesforce metadata and suggest improvements:\n{metadata}",
120
- max_new_tokens=60
121
- )
122
- issue = "Potential optimization"
123
- recommendation = response[0]["generated_text"].strip()
124
  except Exception as e:
 
125
  issue = "Invalid XML"
126
  recommendation = f"Could not parse metadata XML. Error: {str(e)}"
127
 
@@ -132,6 +241,7 @@ def validate_metadata(metadata, admin_id=None):
132
  "Recommendation__c": recommendation,
133
  "Status__c": "Open"
134
  }
 
135
  if admin_id:
136
  log_data["Admin__c"] = admin_id
137
 
@@ -141,54 +251,64 @@ def validate_metadata(metadata, admin_id=None):
141
  try:
142
  result = sf.MetadataAuditLog__c.create(log_data)
143
  if result.get("success"):
144
- log_to_console({"Salesforce MetadataAuditLog ID": result["id"]}, "Salesforce Create")
 
 
145
  except Exception as e:
146
  log_to_console({"Salesforce Exception": str(e)}, "Salesforce Error")
 
 
147
 
148
  return mtype, issue, recommendation
149
 
150
- # ---------- Salesforce Chatbot ----------
151
  conversation_history = []
152
 
153
  def salesforce_chatbot(query, history=[]):
154
  global conversation_history
155
- if not query.strip():
156
  return "Please provide a valid Salesforce-related question."
157
 
158
  salesforce_keywords = [
159
- "apex", "soql", "trigger", "lwc", "visualforce", "salesforce",
160
- "governor limits", "dml", "metadata", "batch apex", "queueable"
161
  ]
162
- if not any(k in query.lower() for k in salesforce_keywords):
 
163
  return "Please ask a Salesforce-related question."
164
 
165
  history_summary = "\n".join([f"User: {q}\nAssistant: {a}" for q, a in conversation_history[-4:]])
166
 
167
- prompt = f"""
168
- You are a certified Salesforce architect. Always provide accurate, production-safe answers
169
- with examples, governor limits, and Trailhead references when possible.
170
-
171
- Conversation history:
172
- {history_summary}
173
 
174
- User: {query}
175
- Assistant:
176
- """
177
  try:
178
- result = qa_pipeline(prompt, max_new_tokens=256, do_sample=False, temperature=0.1)
179
- output = result[0]["generated_text"].strip()
180
- conversation_history.append((query, output))
 
 
 
 
 
 
 
181
  conversation_history = conversation_history[-6:]
182
- return output
 
183
  except Exception as e:
184
- return f"⚠️ Error: {str(e)}"
185
 
186
  # ---------- Gradio UI ----------
187
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
188
- gr.Markdown("# 🤖 Salesforce AI Code Review & Metadata Auditor")
189
 
190
  with gr.Tab("Code Review"):
191
- code_input = gr.Textbox(label="Apex / LWC Code", lines=8)
192
  issue_type = gr.Textbox(label="Issue Type")
193
  suggestion = gr.Textbox(label="AI Suggestion")
194
  severity = gr.Textbox(label="Severity")
@@ -196,7 +316,7 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
196
  code_button.click(analyze_code, inputs=code_input, outputs=[issue_type, suggestion, severity])
197
 
198
  with gr.Tab("Metadata Validation"):
199
- metadata_input = gr.Textbox(label="Metadata XML", lines=8)
200
  mtype = gr.Textbox(label="Type")
201
  issue = gr.Textbox(label="Issue")
202
  recommendation = gr.Textbox(label="Recommendation")
@@ -205,13 +325,15 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
205
 
206
  with gr.Tab("Salesforce Chatbot"):
207
  chatbot_output = gr.Chatbot(label="Conversation History", height=400)
208
- query_input = gr.Textbox(label="Your Question", placeholder="e.g., How many DML operations are allowed?")
209
  with gr.Row():
210
  chatbot_button = gr.Button("Ask")
211
  clear_button = gr.Button("Clear Chat")
212
  chat_state = gr.State(value=[])
213
 
214
  def update_chatbot(query, chat_history):
 
 
215
  response = salesforce_chatbot(query, chat_history)
216
  chat_history.append((query, response))
217
  return chat_history, ""
 
1
  import os
2
+ import re
3
+ import json
4
+ import random
5
+ import xml.etree.ElementTree as ET
6
  from datetime import datetime
7
+
8
+ import gradio as gr
9
+ from transformers import pipeline
10
  from simple_salesforce import Salesforce, SalesforceLogin
11
  from dotenv import load_dotenv
 
12
 
13
  # ---------- Load Environment Variables ----------
14
  load_dotenv()
 
16
  SF_PASSWORD = os.getenv("SF_PASSWORD")
17
  SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
18
 
19
# ---------- Label Mapping (kept; now used as fallback) ----------
# Raw classifier label ("LABEL_<n>") -> issue category name.
label_to_issue_type = {
    f"LABEL_{index}": category
    for index, category in enumerate(
        ("Performance", "Error", "Security", "Best Practice")
    )
}

# Canned advice per issue category.
suggestions = {
    "Performance": (
        "Consider optimizing loops and database access. "
        "Use collections to reduce SOQL/DML calls, avoid SOQL/DML inside loops, "
        "and add selective WHERE clauses."
    ),
    "Error": (
        "Add proper error handling and null checks. "
        "Wrap DML in try/catch and use Database methods for partial success."
    ),
    "Security": (
        "Avoid dynamic SOQL. "
        "Use bind variables, withSharing, and field-level security checks where applicable."
    ),
    "Best Practice": (
        "Refactor for readability and bulk-safety (Batchable/Queueable where needed). "
        "Limit fields and records in queries."
    ),
}

# Severity label reported for each issue category.
severities = dict(
    zip(
        ("Performance", "Error", "Security", "Best Practice"),
        ("Medium", "High", "High", "Low"),
    )
)
40
+
41
# ---------- Hugging Face Models (Hugging Face only, per BRD/SDD) ----------
def _load_pipeline(task, model_name, failure_label):
    """Build a transformers pipeline; on any failure print a warning and return None."""
    try:
        return pipeline(task, model=model_name)
    except Exception as e:
        print(f"⚠️ Could not load {failure_label}: {e}")
        return None


# Lightweight BLOOMZ for natural language support
nlp_pipeline = _load_pipeline("text-generation", "bigscience/bloomz-560m", "BLOOMZ model")

# Optional: simple classifier (kept minimal; not strictly required)
clf_pipeline = _load_pipeline("text-classification", "microsoft/codebert-base", "CodeBERT classifier")
55
+
56
  # ---------- Logging ----------
57
  def log_to_console(data, log_type):
58
  timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
59
+ print(f"[{timestamp}] {log_type} Log: {data}")
60
 
61
  # ---------- Salesforce Connection ----------
62
  try:
 
71
  sf = None
72
  print(f"❌ Failed to connect to Salesforce: {e}")
73
 
74
# ---------- Heuristic Rules for Apex/LWC ----------
SOQL_PATTERN = re.compile(r"\b(?:Database\.query|SELECT\s+[\s\S]+?FROM\b)", re.IGNORECASE)
DML_PATTERN = re.compile(r"\b(insert|update|upsert|delete|undelete|merge)\b", re.IGNORECASE)
LOOP_PATTERN = re.compile(r"\b(for\s*\(|while\s*\()", re.IGNORECASE)
DEBUG_PATTERN = re.compile(r"\bSystem\.debug\s*\(", re.IGNORECASE)
DYNAMIC_SOQL_PATTERN = re.compile(r"['\"].*SELECT.*FROM.*['\"]\s*\+\s*", re.IGNORECASE)
UNBOUNDED_QUERY_PATTERN = re.compile(r"SELECT\s+\*\s+FROM", re.IGNORECASE)  # LWC/JS cases
NULL_GUARD_PATTERN = re.compile(r"\b(\w+)\.(\w+)\(", re.IGNORECASE)  # very rough

# Window (in characters) scanned when a loop's delimiters cannot be balanced.
_LOOP_LOOKAHEAD = 400


def _find_closing(code, open_index, opener, closer):
    """Return the index of the delimiter that closes code[open_index], or -1.

    This is a plain depth counter: delimiters inside string literals or
    comments are counted too, so the result is a heuristic, not a parse.
    """
    depth = 0
    for index in range(open_index, len(code)):
        char = code[index]
        if char == opener:
            depth += 1
        elif char == closer:
            depth -= 1
            if depth == 0:
                return index
    return -1


def _loop_body(code, header_start):
    """Return the text of the loop body for the loop keyword at header_start.

    The parenthesised header is skipped so that a query in the header itself
    (a SOQL for-loop) is not treated as a query executed per iteration.
    Falls back to a fixed-size window when parentheses/braces do not balance.
    """
    open_paren = code.find("(", header_start)
    close_paren = _find_closing(code, open_paren, "(", ")") if open_paren != -1 else -1
    if close_paren == -1:
        return code[header_start:header_start + _LOOP_LOOKAHEAD]

    body_start = close_paren + 1
    while body_start < len(code) and code[body_start].isspace():
        body_start += 1

    if body_start < len(code) and code[body_start] == "{":
        close_brace = _find_closing(code, body_start, "{", "}")
        if close_brace == -1:
            return code[body_start:body_start + _LOOP_LOOKAHEAD]
        return code[body_start:close_brace + 1]

    # Brace-less loop: the body is the single statement up to the next ';'.
    statement_end = code.find(";", body_start)
    if statement_end == -1:
        return code[body_start:]
    return code[body_start:statement_end + 1]


def analyze_code_rules(code: str):
    """Run regex heuristics (plus an optional classifier hint) over a code snippet.

    Args:
        code: Apex/LWC source text to inspect.

    Returns:
        A list of ``(issue_type, message)`` tuples in detection order, with
        duplicate messages removed. Empty input yields an empty list.
    """
    if not code:
        return []

    issues = []

    # SOQL/DML inside loops: only the loop's own body is inspected, so code
    # that merely follows a loop is not reported.
    for loop in LOOP_PATTERN.finditer(code):
        loop_block = _loop_body(code, loop.start())
        if SOQL_PATTERN.search(loop_block):
            issues.append(("Performance", "SOQL query inside a loop detected. Move query outside the loop or use collections."))
        if DML_PATTERN.search(loop_block):
            issues.append(("Performance", "DML operation inside a loop detected. Bulkify by collecting records and performing DML once."))

    # Dynamic SOQL
    if DYNAMIC_SOQL_PATTERN.search(code):
        issues.append(("Security", "Dynamic SOQL concatenation detected. Use bind variables to prevent injection."))

    # Excessive debug statements
    dbg_count = len(DEBUG_PATTERN.findall(code))
    if dbg_count > 2:
        issues.append(("Best Practice", f"Found {dbg_count} System.debug statements. Remove or gate them for production."))

    # Unbounded queries (JS/LWC anti-patterns)
    if UNBOUNDED_QUERY_PATTERN.search(code):
        issues.append(("Performance", "Unbounded SELECT * detected. Query only required fields."))

    # (Very) rough null guard hint: many `obj.method(` calls suggest places
    # where a null receiver could surface.
    dot_calls = len(NULL_GUARD_PATTERN.findall(code))
    if dot_calls > 15:
        issues.append(("Error", "Multiple chained calls detected. Ensure null checks and guard clauses to avoid NullPointerExceptions."))

    # If classifier is available, add its hint as a final tag.
    # NOTE(review): the checkpoint name looks like a base model rather than a
    # fine-tuned classifier -- confirm its labels are meaningful before relying on them.
    if clf_pipeline:
        try:
            # Character slice keeps the input small; truncation=True additionally
            # caps it at the tokenizer's maximum length.
            pred = clf_pipeline(code[:1000], truncation=True)[0]
            mapped = label_to_issue_type.get(pred.get("label"), "Best Practice")
            issues.append((mapped, f"Model hint: {mapped} issue likely. Confidence ~{pred.get('score', 0):.2f}"))
        except Exception as exc:
            # The hint is best-effort: keep the rule-based findings, but leave a trace.
            log_to_console({"Classifier Exception": str(exc)}, "Classifier Error")

    # Deduplicate by message, keeping the first occurrence.
    unique = {}
    for issue_type, msg in issues:
        unique.setdefault(msg, (issue_type, msg))
    return list(unique.values())
130
+
131
def pick_primary(issues):
    """Collapse a list of findings into one (issue_type, suggestion, severity) triple.

    Args:
        issues: iterable of ``(issue_type, message)`` tuples.

    Returns:
        The highest-priority issue type, up to three messages joined with "; "
        (highest priority first), and that type's severity. With no findings
        the "Best Practice" defaults are returned. An issue type missing from
        the lookup tables falls back to the "Best Practice" entries instead of
        raising ``KeyError``.
    """
    # Priority: Security/Error > Performance > Best Practice
    prio = {"Security": 3, "Error": 3, "Performance": 2, "Best Practice": 1}
    fallback_type = "Best Practice"
    if not issues:
        return (fallback_type, suggestions[fallback_type], severities[fallback_type])

    # sorted() is stable, so findings of equal priority keep detection order.
    ranked = sorted(issues, key=lambda item: prio.get(item[0], 0), reverse=True)
    top_type = ranked[0][0]

    # Merge the top messages into one suggestion; blank messages are skipped so
    # the canned suggestion is used when nothing meaningful remains.
    merged = "; ".join(msg for _, msg in ranked[:3] if msg)
    return (
        top_type,
        merged or suggestions.get(top_type, suggestions[fallback_type]),
        severities.get(top_type, severities[fallback_type]),
    )
141
 
142
  # ---------- Code Analyzer ----------
143
  def analyze_code(code):
144
+ if not code or not code.strip():
145
  return "No code provided.", "", ""
146
 
147
+ issues = analyze_code_rules(code)
148
+ issue_type, suggestion_text, severity = pick_primary(issues)
 
 
 
 
149
 
150
  review_data = {
151
  "Name": f"Review_{issue_type}",
152
  "CodeSnippet__c": code,
153
  "IssueType__c": issue_type,
154
+ "Suggestion__c": suggestion_text,
155
  "Severity__c": severity
156
  }
157
 
 
162
  result = sf.CodeReviewResult__c.create(review_data)
163
  if result.get("success"):
164
  log_to_console({"Salesforce Record ID": result["id"]}, "Salesforce Create")
165
+ else:
166
+ log_to_console(result, "Salesforce Error")
167
  except Exception as e:
168
  log_to_console({"Salesforce Exception": str(e)}, "Salesforce Error")
169
+ else:
170
+ log_to_console("Salesforce not connected.", "Salesforce Error")
171
 
172
+ return issue_type, suggestion_text, severity
173
 
174
  # ---------- Metadata Validator ----------
175
  def validate_metadata(metadata, admin_id=None):
176
+ if not metadata or not metadata.strip():
177
  return "No metadata provided.", "", ""
178
 
179
+ mtype = "Object"
180
+ issue = "No issues detected."
181
+ recommendation = "Looks good."
182
 
183
  try:
184
  root = ET.fromstring(metadata)
185
+ # 1) Description present?
186
+ has_description = any(elem.tag.lower().endswith('description') and (elem.text or '').strip() for elem in root.iter())
187
+ # 2) Duplicate <fullName> or field names?
188
+ names = []
189
+ duplicates = set()
190
+ for elem in root.iter():
191
+ tag = elem.tag.lower()
192
+ if tag.endswith('fullname') or tag.endswith('name'):
193
+ if elem.text:
194
+ val = elem.text.strip()
195
+ if val in names:
196
+ duplicates.add(val)
197
+ names.append(val)
198
+ # 3) Fields missing helpText/description
199
+ missing_help = []
200
+ for f in root.iter():
201
+ if f.tag.lower().endswith('fields'):
202
+ # look for nested field fullName
203
+ fname = None
204
+ fdesc = None
205
+ fhelp = None
206
+ for ch in f:
207
+ t = ch.tag.lower()
208
+ if t.endswith('fullname') and ch.text:
209
+ fname = ch.text.strip()
210
+ if t.endswith('description') and ch.text:
211
+ fdesc = ch.text.strip()
212
+ if t.endswith('helptext') and ch.text:
213
+ fhelp = ch.text.strip()
214
+ if fname and not (fdesc or fhelp):
215
+ missing_help.append(fname)
216
+
217
+ problems = []
218
+ if not has_description:
219
+ problems.append("Missing <description> on the object/metadata.")
220
+ if duplicates:
221
+ problems.append(f"Duplicate names detected: {', '.join(sorted(list(duplicates)))}.")
222
+ if missing_help:
223
+ problems.append(f"Fields missing description/helpText: {', '.join(missing_help[:10])}" + ("..." if len(missing_help) > 10 else ""))
224
+
225
+ if problems:
226
+ issue = " | ".join(problems)
227
+ recommendation = "Add descriptions/helpText; remove duplicates; follow naming standards."
228
  else:
229
+ issue = "No high-severity issues detected."
230
+ recommendation = "Consider adding descriptions and reviewing picklists for inactive values."
231
+
 
 
 
 
232
  except Exception as e:
233
+ mtype = "Unknown"
234
  issue = "Invalid XML"
235
  recommendation = f"Could not parse metadata XML. Error: {str(e)}"
236
 
 
241
  "Recommendation__c": recommendation,
242
  "Status__c": "Open"
243
  }
244
+
245
  if admin_id:
246
  log_data["Admin__c"] = admin_id
247
 
 
251
  try:
252
  result = sf.MetadataAuditLog__c.create(log_data)
253
  if result.get("success"):
254
+ log_to_console({"Salesforce MetadataAuditLog Record ID": result["id"]}, "Salesforce Create")
255
+ else:
256
+ log_to_console(result, "Salesforce Metadata Error")
257
  except Exception as e:
258
  log_to_console({"Salesforce Exception": str(e)}, "Salesforce Error")
259
+ else:
260
+ log_to_console("Salesforce not connected.", "Salesforce Error")
261
 
262
  return mtype, issue, recommendation
263
 
264
# ---------- Salesforce Chatbot (BLOOMZ) ----------
# Rolling (question, answer) history shared by all calls; trimmed to the last 6 turns.
conversation_history = []


def salesforce_chatbot(query, history=None):
    """Answer a Salesforce-related question using the text-generation pipeline.

    Args:
        query: the user's question.
        history: accepted for interface compatibility; not read by this
            function, which keeps its own module-level history.

    Returns:
        The answer text, a fixed prompt asking for a (Salesforce-related)
        question when the query is empty or off-topic, or an error string if
        generation fails.
    """
    global conversation_history
    if not query or not query.strip():
        return "Please provide a valid Salesforce-related question."

    salesforce_keywords = [
        "apex", "soql", "trigger", "lwc", "aura", "visualforce", "salesforce", "governor limits",
        "dml", "metadata", "batch apex", "queueable", "future method", "api", "sfdc", "heap", "limits"
    ]

    # Plain substring match against the lower-cased query.
    lowered_query = query.lower()
    if not any(keyword.lower() in lowered_query for keyword in salesforce_keywords):
        return "Please ask a Salesforce-related question."

    # Only the last four turns are fed back into the prompt.
    history_summary = "\n".join([f"User: {q}\nAssistant: {a}" for q, a in conversation_history[-4:]])

    system_prompt = (
        "You are a certified Salesforce developer and architect. Answer with correct, production-safe guidance. "
        "When relevant, mention governor limits (e.g., 100 SOQL queries per transaction, 150 DML statements). "
        "Use bullets or code snippets. Prefer bulk-safe patterns and official docs."
    )
    prompt = f"{system_prompt}\n\nConversation History:\n{history_summary}\n\nUser: {query.strip()}\nAssistant:"

    try:
        if nlp_pipeline:
            # return_full_text=False asks the pipeline for the continuation only;
            # without it the prompt is echoed into the answer and the stored history.
            generated = nlp_pipeline(
                prompt, max_new_tokens=220, do_sample=False, return_full_text=False
            )[0]["generated_text"]
            # Defensive: strip the prompt if it was echoed back anyway.
            if generated.startswith(prompt):
                generated = generated[len(prompt):]
            out = generated.strip()
        else:
            out = "Governor limits matter (e.g., 100 SOQL queries/tx, 150 DML). Use bulk patterns, selective queries, and proper error handling."

        # Pad very short answers with a generic tip.
        if len(out.split()) < 15:
            out += "\n\nTip: Use Database.insert with allOrNone=false for partial success and check Limits class."

        conversation_history.append((query, out))
        conversation_history = conversation_history[-6:]
        log_to_console({"Question": query, "Answer": out}, "Chatbot Query")
        return out
    except Exception as e:
        return f"⚠️ Error generating response: {str(e)}"
305
 
306
  # ---------- Gradio UI ----------
307
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
308
+ gr.Markdown("# 🤖 Advanced Salesforce AI Code Review & Chatbot")
309
 
310
  with gr.Tab("Code Review"):
311
+ code_input = gr.Textbox(label="Apex / LWC Code", lines=8, placeholder="Enter your Apex or LWC code here")
312
  issue_type = gr.Textbox(label="Issue Type")
313
  suggestion = gr.Textbox(label="AI Suggestion")
314
  severity = gr.Textbox(label="Severity")
 
316
  code_button.click(analyze_code, inputs=code_input, outputs=[issue_type, suggestion, severity])
317
 
318
  with gr.Tab("Metadata Validation"):
319
+ metadata_input = gr.Textbox(label="Metadata XML", lines=8, placeholder="Enter your metadata XML here")
320
  mtype = gr.Textbox(label="Type")
321
  issue = gr.Textbox(label="Issue")
322
  recommendation = gr.Textbox(label="Recommendation")
 
325
 
326
  with gr.Tab("Salesforce Chatbot"):
327
  chatbot_output = gr.Chatbot(label="Conversation History", height=400)
328
+ query_input = gr.Textbox(label="Your Question", placeholder="e.g., How many DML operations are allowed in Apex?")
329
  with gr.Row():
330
  chatbot_button = gr.Button("Ask")
331
  clear_button = gr.Button("Clear Chat")
332
  chat_state = gr.State(value=[])
333
 
334
  def update_chatbot(query, chat_history):
335
+ if not query.strip():
336
+ return chat_history, "Please enter a valid question."
337
  response = salesforce_chatbot(query, chat_history)
338
  chat_history.append((query, response))
339
  return chat_history, ""