Duo-Guardian / src /providers.py
Daksh C Jain
feat: improve AI test generation, premium dashboard, and GitLab MR widgets
0ccacbd
import os
import json
import gitlab
from google.cloud import logging as gcp_logging
from google.cloud import errorreporting_v1beta1 as gcp_error
import google.generativeai as genai
class GitLabProvider:
    """Wrapper around one GitLab project for reading and annotating merge requests.

    Most methods are best-effort: they catch any exception, print a short
    message and return a sentinel (``None``, ``False`` or an error string)
    instead of raising. ``__init__`` and ``post_comment`` let errors propagate.
    """

    # Binary/asset extensions whose diffs are skipped to save LLM context window.
    _IGNORED_EXTS = frozenset({
        '.jpeg', '.jpg', '.png', '.gif', '.svg', '.bin', '.exe',
        '.dll', '.so', '.iso', '.zip', '.tar', '.gz',
    })
    # Generated lock files, skipped for the same reason.
    _IGNORED_FILES = frozenset({
        'package-lock.json', 'yarn.lock', 'pnpm-lock.yaml',
        'poetry.lock', 'Pipfile.lock', 'Cargo.lock',
    })

    def __init__(self, project_id, token, url="https://gitlab.com"):
        """Connect to GitLab and load the project.

        Args:
            project_id: ID or path of the project, passed to ``projects.get``.
            token: Private token used for authentication.
            url: Base URL of the GitLab instance; defaults to gitlab.com so
                existing two-argument callers are unaffected.
        """
        self.gl = gitlab.Gitlab(url, private_token=token)
        self.project = self.gl.projects.get(project_id)

    def fetch_mr_bundle(self, mr_iid):
        """Gather MR details and the filtered diff in one go.

        Returns:
            A dict with ``title``, ``description``, ``labels``, ``diff`` and
            ``author`` keys, or ``None`` if anything fails (the error is printed).
        """
        try:
            mr = self.project.mergerequests.get(mr_iid)
            changes = mr.changes()
            diff_parts = []
            for change in changes.get('changes', []):
                # Guard against a missing/None path so one odd entry cannot
                # discard the whole bundle.
                new_path = change.get('new_path') or ''
                ext = os.path.splitext(new_path)[1].lower()
                # Smart filtering: ignore noise to save LLM context window.
                if ext in self._IGNORED_EXTS or os.path.basename(new_path) in self._IGNORED_FILES:
                    print(f"[!] Ignoring noisy file for LLM context: {new_path}")
                    continue
                # Include the full diff for valid files.
                diff_parts.append(f"File: {new_path}\nDiff:\n{change.get('diff', '')}\n\n")
            return {
                "title": mr.title,
                "description": mr.description,
                "labels": mr.labels,
                "diff": "".join(diff_parts),
                "author": mr.author["name"]
            }
        except Exception as e:
            print(f"GitLab Error: {e}")
            return None

    def post_comment(self, mr_iid, body):
        """Post a comment (note) with the given body on the MR. Errors propagate."""
        mr = self.project.mergerequests.get(mr_iid)
        mr.notes.create({"body": body})

    def fetch_job_logs(self):
        """Fetch the tail of the log of the first failed job in the newest pipeline.

        Returns:
            The last 2000 characters of the job trace, ``"No failed jobs found."``
            when there is no pipeline or no failed job, or a
            ``"Log Fetch Error: ..."`` string if any call fails.
        """
        try:
            # per_page=1 actually limits the page size; get_all=False asks for
            # just that first page.
            pipelines = self.project.pipelines.list(
                order_by="id", sort="desc", per_page=1, get_all=False
            )
            if pipelines:
                failed_jobs = pipelines[0].jobs.list(scope="failed")
                if failed_jobs:
                    # Objects listed through a pipeline do not expose job
                    # methods such as trace(); go through project.jobs instead.
                    job = self.project.jobs.get(failed_jobs[0].id, lazy=True)
                    raw = job.trace()
                    if isinstance(raw, bytes):
                        # Logs may contain bytes that are not valid UTF-8.
                        log_text = raw.decode('utf-8', errors='replace')
                    else:
                        log_text = str(raw or "")
                    return log_text[-2000:]  # Last 2k chars
            return "No failed jobs found."
        except Exception as e:
            return f"Log Fetch Error: {e}"

    def update_mr_metadata(self, mr_iid, labels=None, description_prefix=None):
        """Update MR labels and prepend a summary block to the description.

        Args:
            mr_iid: IID of the merge request.
            labels: Optional iterable of labels; replaces the MR's labels,
                de-duplicated with first-seen order kept.
            description_prefix: Optional text wrapped in ``CB_START``/``CB_END``
                markers and placed above the existing description.

        Returns:
            ``True`` after a successful save, ``False`` on any error.
        """
        import re
        try:
            mr = self.project.mergerequests.get(mr_iid)
            if labels:
                # dict.fromkeys de-duplicates while keeping a stable order.
                mr.labels = list(dict.fromkeys(labels))
            if description_prefix:
                desc = mr.description or ""
                # 1. Remove ANY block wrapped in CB_START/CB_END
                desc = re.sub(r"<!-- CB_START -->.*?<!-- CB_END -->\n*", "", desc, flags=re.DOTALL)
                # 2. Clean up legacy/hallucinated summaries that were written
                #    without the markers, in their different formatting variations.
                desc = re.sub(r"(> )?\*\*?🧠\s*Context Brain Summary\*\*?.*?\n(> - \*\*.*?\*\*: .*?\n?)*", "", desc, flags=re.IGNORECASE | re.DOTALL)
                desc = re.sub(r"🧠\s*Context Brain Summary:.*?\n", "", desc, flags=re.IGNORECASE)
                # 3. Final trim and wrap in the standardized block
                mr.description = f"<!-- CB_START -->\n{description_prefix}\n<!-- CB_END -->\n\n{desc.strip()}"
            mr.save()
            print("Successfully updated MR metadata.")
            return True
        except Exception as e:
            print(f"Metadata Update Error: {e}")
            return False

    def create_inline_suggestion(self, mr_iid, file_path, line, suggestion_code, discussion_body="Context Brain Suggestion:"):
        """Create a discussion thread on the MR carrying a ``suggestion`` code block.

        The thread is positioned on ``line`` of ``file_path`` using the MR's
        ``diff_refs`` SHAs. Returns ``True`` on success, ``False`` on any error.
        """
        try:
            mr = self.project.mergerequests.get(mr_iid)
            diff_refs = mr.diff_refs
            mr.discussions.create({
                'body': f"{discussion_body}\n\n```suggestion\n{suggestion_code}\n```",
                'position': {
                    'base_sha': diff_refs.get('base_sha'),
                    'start_sha': diff_refs.get('start_sha'),
                    'head_sha': diff_refs.get('head_sha'),
                    'position_type': 'text',
                    'new_path': file_path,
                    'new_line': line
                }
            })
            return True
        except Exception as e:
            print(f"Failed to create inline suggestion: {e}")
            return False

    def approve_mr(self, mr_iid):
        """Approve the merge request. Returns ``True`` on success, ``False`` on error."""
        try:
            mr = self.project.mergerequests.get(mr_iid)
            mr.approve()
            return True
        except Exception as e:
            print(f"Failed to approve MR: {e}")
            return False

    def commit_test_file(self, mr_iid, file_path, content, commit_message="test: add AI generated unit tests"):
        """Commit ``content`` as ``file_path`` on the MR's source branch.

        Tries a ``create`` action first; if that fails with a message
        containing "exists", retries as an ``update``. Returns ``True`` on
        success, ``False`` on any other failure.
        """
        try:
            mr = self.project.mergerequests.get(mr_iid)
            branch = mr.source_branch
            data = {
                'branch': branch,
                'commit_message': commit_message,
                'actions': [{'action': 'create', 'file_path': file_path, 'content': content}]
            }
            try:
                self.project.commits.create(data)
                print(f" -> 📦 Successfully pushed '{file_path}' to '{branch}'!")
            except Exception as e:
                # Fall back to updating if the file already exists from a prior
                # pipeline run ('exists' also covers the "already exists" wording).
                if 'exists' not in str(e).lower():
                    raise
                data['actions'][0]['action'] = 'update'
                self.project.commits.create(data)
                print(f" -> 📦 Successfully updated '{file_path}' on '{branch}'!")
            return True
        except Exception as e:
            print(f"Failed to auto-commit test file: {e}")
            return False
class GCPProvider:
    """Optional source of production signals from Google Cloud Error Reporting.

    The provider is only enabled when a project id is given, the
    ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable is set, and both
    clients can be constructed; otherwise ``get_context`` returns a fixed
    "not connected" message.
    """

    def __init__(self, project_id):
        """Try to create the GCP clients; leave ``enabled`` False on any failure."""
        self.project_id = project_id
        self.enabled = False
        if project_id and os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
            try:
                self.logging_client = gcp_logging.Client(project=project_id)
                self.error_client = gcp_error.ErrorStatsServiceClient()
                self.enabled = True
            except Exception as e:
                print(f"GCP Init Error: {e}")

    @staticmethod
    def _describe_group(stats):
        """Format one error-group stats entry as a Markdown bullet.

        The title is the first line of the representative event's message;
        if that message is empty, the group's id is used instead.
        """
        message = (stats.representative.message or "").strip()
        title = message.splitlines()[0] if message else stats.group.group_id
        return f"- {title} (Seen {stats.count} times)"

    def get_context(self):
        """Fetch production signals if enabled.

        Returns:
            A text block listing up to three error groups from the last day,
            a fixed message when the provider is disabled, or a
            ``"GCP Error: ..."`` string if the query fails.
        """
        if not self.enabled:
            return "GCP Context: Not connected or credentials missing."
        from itertools import islice
        try:
            # 1. Fetch Errors
            time_range = gcp_error.QueryTimeRange(period=gcp_error.QueryTimeRange.Period.PERIOD_1_DAY)
            request = gcp_error.ListGroupStatsRequest(project_name=f"projects/{self.project_id}", time_range=time_range)
            errors = self.error_client.list_group_stats(request=request)
            # Only three entries are reported, so stop iterating the result
            # after three instead of materializing every group.
            error_list = [self._describe_group(s) for s in islice(errors, 3)]
            return "--- Recent GCP Production Errors ---\n" + ("\n".join(error_list) if error_list else "No recent errors found.")
        except Exception as e:
            return f"GCP Error: {e}"
class GeminiBrain:
    """Prompt builder and response parser around a Gemini generative model.

    Prompt bodies are written flush-left inside the triple-quoted strings so
    that no source indentation ends up in the text sent to the model.
    """

    def __init__(self, api_key, model_name="gemini-2.5-flash"):
        """Configure the SDK with ``api_key`` and create the model.

        Args:
            api_key: API key passed to ``genai.configure``.
            model_name: Model identifier; defaults to the previously
                hard-coded ``"gemini-2.5-flash"``.
        """
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name)

    @staticmethod
    def _strip_code_fences(text):
        """Return ``text`` trimmed, without a surrounding Markdown code fence.

        If the trimmed text starts with a fence, its first line (the fence and
        any language tag) and everything from the last fence onward are
        removed. For single-line input only the leading backticks are dropped.
        """
        cleaned = text.strip()
        if cleaned.startswith("```"):
            _, newline, rest = cleaned.partition("\n")
            cleaned = rest if newline else cleaned[3:]
            cleaned = cleaned.rsplit("```", 1)[0]
        return cleaned.strip()

    @staticmethod
    def _loads_lenient(payload):
        """Parse JSON, retrying on the outermost ``{...}`` span if needed.

        Raises:
            json.JSONDecodeError: If neither attempt yields valid JSON.
        """
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            # The model sometimes surrounds the object with prose.
            start = payload.find("{")
            end = payload.rfind("}")
            if start == -1 or end < start:
                raise
            return json.loads(payload[start:end + 1])

    def synthesize(self, gitlab_data, gcp_data, log_data=""):
        """Ask the model for the MR report; return the raw response text.

        Args:
            gitlab_data: JSON-serializable MR data embedded in the prompt.
            gcp_data: Production-signal text embedded in the prompt.
            log_data: Optional CI log text embedded in the prompt.
        """
        prompt = f"""
Act as a Principal Engineer reviewing a GitLab Merge Request.
MR DATA:
{json.dumps(gitlab_data, indent=2)}
PRODUCTION SIGNALS:
{gcp_data}
CI LOG DATA (if any):
{log_data}
TASK:
1. Generate a highly concise 'Context Brain' intelligence report in Markdown (max 3 bullet points).
2. Identify specific line-level issues for 'Code Quality'. IMPORTANT: Only suggest issues for lines that were explicitly ADDED or MODIFIED in the provided 'Diff' (lines starting with '+'). Do not suggest issues on untouched lines.
3. Suggest automated LABELS (e.g., brain::risk-high).
4. Generate a 1-sentence SUMMARY to be added to the MR description.
OUTPUT FORMAT (Return strictly as JSON matching this structure):
{{
"report_markdown": "Full markdown report string",
"code_quality": [
{{
"file": "path/to/file.ext",
"line": 10,
"description": "Issue description",
"severity": "major",
"suggestion": "optimized code replacement without markdown formatting"
}}
],
"metadata": {{
"labels": ["brain::label1"],
"summary": "One sentence summary"
}}
}}
"""
        # Ask the model to return strict JSON (prompt already instructs this)
        response = self.model.generate_content(prompt)
        return response.text

    def parse_response(self, text):
        """Extract the structured fields from the LLM output.

        Args:
            text: Raw model output, optionally wrapped in a code fence or
                surrounded by prose.

        Returns:
            A ``(report, code_quality, metadata)`` tuple of ``str``, ``list``
            and ``dict``. Any field that is missing, null or of the wrong type
            falls back to its empty value, and unparseable or non-object
            output yields ``("", [], {})``; this method does not raise for
            bad model output.
        """
        if not isinstance(text, str):
            print("❌ LLM Output JSON Parse Error: response is not text")
            return "", [], {}
        clean = self._strip_code_fences(text)
        try:
            data = self._loads_lenient(clean)
        except json.JSONDecodeError as e:
            print(f"❌ LLM Output JSON Parse Error: {e}")
            print(f"Raw Output: {text[:500]}...")
            return "", [], {}
        if not isinstance(data, dict):
            # Valid JSON that is not an object (e.g. a bare list) has no fields.
            print(f"❌ LLM Output JSON Parse Error: expected an object, got {type(data).__name__}")
            return "", [], {}
        report = data.get("report_markdown")
        code_quality = data.get("code_quality")
        metadata = data.get("metadata")
        # Normalize null/wrong-typed fields so callers can iterate and index safely.
        if not isinstance(report, str):
            report = ""
        if not isinstance(code_quality, list):
            code_quality = []
        if not isinstance(metadata, dict):
            metadata = {}
        return report, code_quality, metadata

    def estimate_cost(self, mr_data):
        """Ask the model for a short FinOps cost-impact note; return its trimmed text."""
        prompt = f"""
You are a FinOps AI. Look at this Gitlab MR diff:
{json.dumps(mr_data)}
Estimate if these changes increase or decrease cloud infrastructure costs (e.g., added DB calls, enlarged images, inefficient loops).
Respond in 1 to 2 exact sentences. State the impact clearly.
"""
        response = self.model.generate_content(prompt)
        return response.text.strip()

    def review_devops(self, mr_data):
        """Ask the model for a short DevOps/SRE impact note; return its trimmed text."""
        prompt = f"""
You are an SRE / DevOps AI architect. Analyze this MR diff:
{json.dumps(mr_data)}
Focus ONLY on infrastructure (Docker, CI/CD, K8s, Terraform, pipelines) or production reliability (missing logs, timeouts).
Summarize any optimizations or risks in precisely 1 to 2 sentences. If no infrastructure changes or risks exist, reply exactly with: "No infrastructure or CI/CD changes detected."
"""
        response = self.model.generate_content(prompt)
        return response.text.strip()

    def generate_tests(self, gitlab_data):
        """Ask the model for pytest code covering the MR; return it without fences.

        The prompt tells the model to reply ``NO_TESTS_NEEDED`` when nothing is
        testable; that reply is returned unchanged for the caller to handle.
        """
        prompt = f"""
You are a QA engineer. Your ONLY job is to write simple, working pytest tests.
MR DATA:
{json.dumps(gitlab_data, indent=2)}
STRICT RULES - follow every single one or the tests will fail:
RULE 1 - ONLY TEST PURE PYTHON FUNCTIONS.
Only write tests for functions that contain plain Python logic (math, string manipulation, conditionals, loops).
If a function imports or uses: databases, cloud SDKs (google, boto3, azure), HTTP clients, file I/O, or any external service (even if it's just 'json' or 'requests') — DO NOT WRITE A TEST FOR IT. Skip it entirely.
RULE 2 - NO MOCKING ALLOWED.
NEVER use `unittest.mock`, `pytest-mock`, `@patch`, `patch(...)`, `MagicMock`, `mock.patch`, or `autospec`.
If you think you need to mock something, it violates Rule 1. Skip that function. Do not try to be clever with mocks.
RULE 3 - SIMPLE IMPORTS ONLY.
Only write: `import pytest` at the top. Then directly import only the specific function under test.
Example: `from module_name import function_name`
Never import an entire module.
RULE 4 - NO DECORATORS.
Do not use any decorators except `@pytest.mark.parametrize` if needed.
NEVER use `@patch` or any mock-related decorators.
RULE 5 - FOCUS ON FUNCTIONALITY.
Test the business logic with various inputs (edge cases, typical values).
NEGATIVE CONSTRAINTS (DO NOT DO THESE):
- DO NOT write tests for functions that interact with the GitLab API.
- DO NOT write tests for functions that interact with GCP.
- DO NOT write tests for functions that read/write files.
- DO NOT use the `json` module in your tests if it requires patching.
- DO NOT explain your work.
- DO NOT include markdown backticks in the response.
RULE 6 - RETURN ONLY RAW PYTHON CODE. No markdown, no backticks, no explanations.
RULE 7 - If there are NO pure Python functions to test (only config, YAML, Dockerfiles, cloud code), return exactly: NO_TESTS_NEEDED
"""
        response = self.model.generate_content(prompt)
        # Strip markdown fences if the model ignores Rule 6.
        return self._strip_code_fences(response.text)