Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import gitlab | |
| from google.cloud import logging as gcp_logging | |
| from google.cloud import errorreporting_v1beta1 as gcp_error | |
| import google.generativeai as genai | |
class GitLabProvider:
    """Wrapper around one GitLab project, accessed through python-gitlab.

    Most methods are best-effort: they catch any exception, print it and
    return a sentinel (``None`` / ``False`` / an error string) instead of
    raising. ``post_comment`` is the exception and lets errors propagate.
    """

    # Files whose diffs are skipped when building LLM context (binary assets
    # and lock files).
    _IGNORED_EXTS = frozenset({
        '.jpeg', '.jpg', '.png', '.gif', '.svg', '.bin', '.exe', '.dll',
        '.so', '.iso', '.zip', '.tar', '.gz',
    })
    _IGNORED_FILES = frozenset({
        'package-lock.json', 'yarn.lock', 'pnpm-lock.yaml', 'poetry.lock',
        'Pipfile.lock', 'Cargo.lock',
    })

    def __init__(self, project_id, token):
        """Connect to gitlab.com with a private token and load the project.

        Args:
            project_id: Identifier passed to ``projects.get``.
            token: GitLab private token used for authentication.
        """
        self.gl = gitlab.Gitlab("https://gitlab.com", private_token=token)
        self.project = self.gl.projects.get(project_id)

    def fetch_mr_bundle(self, mr_iid):
        """Gather MR details and the filtered diffs in one go.

        Args:
            mr_iid: Project-level IID of the merge request.

        Returns:
            A dict with keys ``title``, ``description``, ``labels``, ``diff``
            and ``author``, or ``None`` if anything failed (the error is
            printed).
        """
        try:
            mr = self.project.mergerequests.get(mr_iid)
            changes = mr.changes()

            diff_parts = []
            for change in changes.get('changes', []):
                new_path = change.get('new_path', '')

                # Smart filtering: ignore noise to save LLM context window.
                ext = os.path.splitext(new_path)[1].lower()
                if ext in self._IGNORED_EXTS or os.path.basename(new_path) in self._IGNORED_FILES:
                    print(f"[!] Ignoring noisy file for LLM context: {new_path}")
                    continue

                # Include the full diff for valid files.
                diff_parts.append(f"File: {new_path}\nDiff:\n{change.get('diff', '')}\n\n")

            return {
                "title": mr.title,
                "description": mr.description,
                "labels": mr.labels,
                "diff": "".join(diff_parts),
                "author": mr.author["name"],
            }
        except Exception as e:
            print(f"GitLab Error: {e}")
            return None

    def post_comment(self, mr_iid, body):
        """Post ``body`` as a note on the MR. Errors are not caught here."""
        mr = self.project.mergerequests.get(mr_iid)
        mr.notes.create({"body": body})

    def fetch_job_logs(self):
        """Fetch the log tail of a failed job in the project's newest pipeline.

        NOTE(review): this looks at the newest pipeline of the whole project,
        not the pipeline of a particular MR -- confirm that is intended.

        Returns:
            The last 2000 characters of the first failed job's trace,
            ``"No failed jobs found."`` when there is no pipeline or no failed
            job, or ``"Log Fetch Error: ..."`` if anything raised.
        """
        try:
            # Only the newest pipeline is needed, so request a single item.
            pipelines = self.project.pipelines.list(
                order_by="id", sort="desc", per_page=1, get_all=False
            )
            if pipelines:
                failed = pipelines[0].jobs.list(scope="failed", per_page=1, get_all=False)
                if failed:
                    # Objects listed through a pipeline do not expose job
                    # methods such as trace(); go through the project-level
                    # jobs manager to get one that does.
                    job = self.project.jobs.get(failed[0].id, lazy=True)
                    # errors="replace": a stray invalid byte in the CI output
                    # must not cost us the whole log.
                    log_text = job.trace().decode('utf-8', errors='replace')
                    return log_text[-2000:]  # Last 2k chars
            return "No failed jobs found."
        except Exception as e:
            return f"Log Fetch Error: {e}"

    def update_mr_metadata(self, mr_iid, labels=None, description_prefix=None):
        """Update MR labels and prepend a summary block to the description.

        Args:
            mr_iid: Project-level IID of the merge request.
            labels: If truthy, assigned to the MR's labels after removing
                duplicates (first occurrence wins, order preserved).
            description_prefix: If truthy, wrapped in CB_START/CB_END marker
                comments and placed before the existing description, after
                earlier summary blocks have been stripped out.

        Returns:
            ``True`` once the MR has been saved, ``False`` if anything raised.
        """
        import re
        try:
            mr = self.project.mergerequests.get(mr_iid)

            if labels:
                # dict.fromkeys de-duplicates while keeping a stable order.
                mr.labels = list(dict.fromkeys(labels))

            if description_prefix:
                desc = mr.description or ""

                # 1. Remove ANY block wrapped in CB_START/CB_END.
                desc = re.sub(r"<!-- CB_START -->.*?<!-- CB_END -->\n*", "", desc, flags=re.DOTALL)

                # 2. Clean up legacy / hallucinated summaries that were written
                #    without the markers, in their different formatting variants.
                desc = re.sub(r"(> )?\*\*?🧠\s*Context Brain Summary\*\*?.*?\n(> - \*\*.*?\*\*: .*?\n?)*", "", desc, flags=re.IGNORECASE | re.DOTALL)
                desc = re.sub(r"🧠\s*Context Brain Summary:.*?\n", "", desc, flags=re.IGNORECASE)

                # 3. Final trim and wrap in the standardized block.
                mr.description = f"<!-- CB_START -->\n{description_prefix}\n<!-- CB_END -->\n\n{desc.strip()}"

            mr.save()
            print("Successfully updated MR metadata.")
            return True
        except Exception as e:
            print(f"Metadata Update Error: {e}")
            return False

    def create_inline_suggestion(self, mr_iid, file_path, line, suggestion_code, discussion_body="Context Brain Suggestion:"):
        """Create a GitLab discussion thread with a code suggestion block.

        Args:
            mr_iid: Project-level IID of the merge request.
            file_path: Sent as the position's ``new_path``.
            line: Sent as the position's ``new_line``.
            suggestion_code: Code placed inside the ``suggestion`` fence.
            discussion_body: Text placed before the suggestion fence.

        Returns:
            ``True`` on success, ``False`` if anything raised.
        """
        try:
            mr = self.project.mergerequests.get(mr_iid)
            diff_refs = mr.diff_refs

            mr.discussions.create({
                'body': f"{discussion_body}\n\n```suggestion\n{suggestion_code}\n```",
                'position': {
                    'base_sha': diff_refs.get('base_sha'),
                    'start_sha': diff_refs.get('start_sha'),
                    'head_sha': diff_refs.get('head_sha'),
                    'position_type': 'text',
                    'new_path': file_path,
                    'new_line': line,
                },
            })
            return True
        except Exception as e:
            print(f"Failed to create inline suggestion: {e}")
            return False

    def approve_mr(self, mr_iid):
        """Approve the merge request; return ``True`` on success, else ``False``."""
        try:
            mr = self.project.mergerequests.get(mr_iid)
            mr.approve()
            return True
        except Exception as e:
            print(f"Failed to approve MR: {e}")
            return False

    def commit_test_file(self, mr_iid, file_path, content, commit_message="test: add AI generated unit tests"):
        """Commit ``content`` as ``file_path`` on the MR's source branch.

        A ``create`` action is tried first. If it fails with a message
        containing "exists", the same commit is retried as an ``update``.

        Returns:
            ``True`` if a commit was created, ``False`` if anything raised.
        """
        try:
            mr = self.project.mergerequests.get(mr_iid)
            branch = mr.source_branch

            data = {
                'branch': branch,
                'commit_message': commit_message,
                'actions': [{'action': 'create', 'file_path': file_path, 'content': content}],
            }

            try:
                self.project.commits.create(data)
                print(f" -> 📦 Successfully pushed '{file_path}' to '{branch}'!")
            except Exception as e:
                # Fall back to updating when the file is already there, e.g.
                # from a prior AI pipeline run. Anything else is a real error.
                if 'exists' not in str(e).lower():
                    raise
                data['actions'][0]['action'] = 'update'
                self.project.commits.create(data)
                print(f" -> 📦 Successfully updated '{file_path}' on '{branch}'!")
            return True
        except Exception as e:
            print(f"Failed to auto-commit test file: {e}")
            return False
class GCPProvider:
    """Optional source of production signals from Google Cloud.

    The provider is only enabled when a project id is given, the
    ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable is set, and both
    clients can be constructed. Otherwise ``enabled`` stays ``False`` and
    ``get_context`` returns a placeholder string.
    """

    def __init__(self, project_id):
        """Try to create the logging and error-stats clients.

        Args:
            project_id: GCP project id; falsy values leave the provider
                disabled.
        """
        self.project_id = project_id
        self.enabled = False
        # Always define the client attributes so they exist even when the
        # provider stays disabled or client construction fails.
        self.logging_client = None
        self.error_client = None

        if project_id and os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
            try:
                self.logging_client = gcp_logging.Client(project=project_id)
                self.error_client = gcp_error.ErrorStatsServiceClient()
                self.enabled = True
            except Exception as e:
                print(f"GCP Init Error: {e}")

    def get_context(self, max_errors=3):
        """Fetch production signals if enabled.

        Args:
            max_errors: Maximum number of error groups to list (default 3).

        Returns:
            A text block listing up to ``max_errors`` error groups from the
            last day, a "not connected" placeholder when the provider is
            disabled, or ``"GCP Error: ..."`` if the query raised.
        """
        from itertools import islice

        if not self.enabled:
            return "GCP Context: Not connected or credentials missing."

        try:
            # 1. Fetch Errors
            time_range = gcp_error.QueryTimeRange(period=gcp_error.QueryTimeRange.Period.PERIOD_1_DAY)
            request = gcp_error.ListGroupStatsRequest(project_name=f"projects/{self.project_id}", time_range=time_range)
            errors = self.error_client.list_group_stats(request=request)

            # Stop iterating as soon as enough entries are collected instead
            # of draining the whole result set only to discard most of it.
            error_list = [
                f"- {s.group.representative_issue.title} (Seen {s.count} times)"
                for s in islice(errors, max(max_errors, 0))
            ]

            return "--- Recent GCP Production Errors ---\n" + ("\n".join(error_list) if error_list else "No recent errors found.")
        except Exception as e:
            return f"GCP Error: {e}"
class GeminiBrain:
    """Prompt builder and response parser around a Gemini generative model.

    Prompt text is kept flush-left inside the triple-quoted strings so that
    no source-code indentation is sent to the model.
    """

    def __init__(self, api_key):
        """Configure the Gemini SDK with ``api_key`` and select the model."""
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.5-flash")

    @staticmethod
    def _strip_code_fence(text):
        """Return ``text`` stripped, without a surrounding markdown code fence.

        If the stripped text starts with a fence, its first line and
        everything from the last fence marker onward are dropped.
        """
        clean = text.strip()
        if clean.startswith("```"):
            clean = clean.split("\n", 1)[-1]   # remove first fence line
            clean = clean.rsplit("```", 1)[0]  # remove closing fence
        return clean.strip()

    def synthesize(self, gitlab_data, gcp_data, log_data=""):
        """Ask the model for the combined review report.

        Args:
            gitlab_data: JSON-serialisable MR data, embedded via ``json.dumps``.
            gcp_data: Production-signal text, embedded as-is.
            log_data: Optional CI log text, embedded as-is.

        Returns:
            The raw response text; the prompt asks for JSON, and
            ``parse_response`` is the method that decodes it.
        """
        prompt = f"""
Act as a Principal Engineer reviewing a GitLab Merge Request.
MR DATA:
{json.dumps(gitlab_data, indent=2)}
PRODUCTION SIGNALS:
{gcp_data}
CI LOG DATA (if any):
{log_data}
TASK:
1. Generate a highly concise 'Context Brain' intelligence report in Markdown (max 3 bullet points).
2. Identify specific line-level issues for 'Code Quality'. IMPORTANT: Only suggest issues for lines that were explicitly ADDED or MODIFIED in the provided 'Diff' (lines starting with '+'). Do not suggest issues on untouched lines.
3. Suggest automated LABELS (e.g., brain::risk-high).
4. Generate a 1-sentence SUMMARY to be added to the MR description.
OUTPUT FORMAT (Return strictly as JSON matching this structure):
{{
"report_markdown": "Full markdown report string",
"code_quality": [
{{
"file": "path/to/file.ext",
"line": 10,
"description": "Issue description",
"severity": "major",
"suggestion": "optimized code replacement without markdown formatting"
}}
],
"metadata": {{
"labels": ["brain::label1"],
"summary": "One sentence summary"
}}
}}
"""
        # Ask the model to return strict JSON (prompt already instructs this)
        response = self.model.generate_content(prompt)
        return response.text

    def parse_response(self, text):
        """Extract the structured data from the LLM output.

        Args:
            text: Raw model output, optionally wrapped in a markdown fence.

        Returns:
            A ``(report, code_quality, metadata)`` tuple of ``str``, ``list``
            and ``dict``. ``("", [], {})`` is returned when the text is not
            valid JSON or does not decode to a JSON object; individual fields
            of the wrong type fall back to their empty value.
        """
        raw = text or ""
        try:
            data = json.loads(self._strip_code_fence(raw))
        except json.JSONDecodeError as e:
            print(f"❌ LLM Output JSON Parse Error: {e}")
            print(f"Raw Output: {raw[:500]}...")
            return "", [], {}

        # Valid JSON is not necessarily an object: a bare list, string or
        # number would otherwise blow up on .get() below.
        if not isinstance(data, dict):
            print(f"❌ LLM Output JSON Parse Error: expected a JSON object, got {type(data).__name__}")
            print(f"Raw Output: {raw[:500]}...")
            return "", [], {}

        report = data.get("report_markdown")
        code_quality = data.get("code_quality")
        metadata = data.get("metadata")
        # Guard against nulls / wrong types so callers always get the shapes
        # promised in the docstring.
        if not isinstance(report, str):
            report = ""
        if not isinstance(code_quality, list):
            code_quality = []
        if not isinstance(metadata, dict):
            metadata = {}
        return report, code_quality, metadata

    def estimate_cost(self, mr_data):
        """Analyze code for FinOps cost impact; return the stripped reply text."""
        prompt = f"""
You are a FinOps AI. Look at this Gitlab MR diff:
{json.dumps(mr_data)}
Estimate if these changes increase or decrease cloud infrastructure costs (e.g., added DB calls, enlarged images, inefficient loops).
Respond in 1 to 2 exact sentences. State the impact clearly.
"""
        response = self.model.generate_content(prompt)
        return response.text.strip()

    def review_devops(self, mr_data):
        """Analyze code for DevOps / SRE impact; return the stripped reply text."""
        prompt = f"""
You are an SRE / DevOps AI architect. Analyze this MR diff:
{json.dumps(mr_data)}
Focus ONLY on infrastructure (Docker, CI/CD, K8s, Terraform, pipelines) or production reliability (missing logs, timeouts).
Summarize any optimizations or risks in precisely 1 to 2 sentences. If no infrastructure changes or risks exist, reply exactly with: "No infrastructure or CI/CD changes detected."
"""
        response = self.model.generate_content(prompt)
        return response.text.strip()

    def generate_tests(self, gitlab_data):
        """Ask the model for pytest tests covering the MR's pure-Python code.

        Args:
            gitlab_data: JSON-serialisable MR data, embedded via ``json.dumps``.

        Returns:
            The reply text with any surrounding markdown fence removed. The
            prompt tells the model to answer ``NO_TESTS_NEEDED`` when nothing
            is testable; that value is passed through unchanged.
        """
        prompt = f"""
You are a QA engineer. Your ONLY job is to write simple, working pytest tests.
MR DATA:
{json.dumps(gitlab_data, indent=2)}
STRICT RULES - follow every single one or the tests will fail:
RULE 1 - ONLY TEST PURE PYTHON FUNCTIONS.
Only write tests for functions that contain plain Python logic (math, string manipulation, conditionals, loops).
If a function imports or uses: databases, cloud SDKs (google, boto3, azure), HTTP clients, file I/O, or any external service (even if it's just 'json' or 'requests') — DO NOT WRITE A TEST FOR IT. Skip it entirely.
RULE 2 - NO MOCKING ALLOWED.
NEVER use `unittest.mock`, `pytest-mock`, `@patch`, `patch(...)`, `MagicMock`, `mock.patch`, or `autospec`.
If you think you need to mock something, it violates Rule 1. Skip that function. Do not try to be clever with mocks.
RULE 3 - SIMPLE IMPORTS ONLY.
Only write: `import pytest` at the top. Then directly import only the specific function under test.
Example: `from module_name import function_name`
Never import an entire module.
RULE 4 - NO DECORATORS.
Do not use any decorators except `@pytest.mark.parametrize` if needed.
NEVER use `@patch` or any mock-related decorators.
RULE 5 - FOCUS ON FUNCTIONALITY.
Test the business logic with various inputs (edge cases, typical values).
NEGATIVE CONSTRAINTS (DO NOT DO THESE):
- DO NOT write tests for functions that interact with the GitLab API.
- DO NOT write tests for functions that interact with GCP.
- DO NOT write tests for functions that read/write files.
- DO NOT use the `json` module in your tests if it requires patching.
- DO NOT explain your work.
- DO NOT include markdown backticks in the response.
RULE 6 - RETURN ONLY RAW PYTHON CODE. No markdown, no backticks, no explanations.
RULE 7 - If there are NO pure Python functions to test (only config, YAML, Dockerfiles, cloud code), return exactly: NO_TESTS_NEEDED
"""
        response = self.model.generate_content(prompt)
        # Strip markdown fences if model ignores Rule 6
        return self._strip_code_fence(response.text)