# gitgud-ai / app / services / reviewer_service.py
# CodeCommunity's picture
# Update app/services/reviewer_service.py
# 1716602 verified
import json
import logging
import re
from typing import List, Dict
from app.core.model_loader import llm_engine
# Module-level logger named after this module; used for batch and JSON-parse diagnostics below.
logger = logging.getLogger(__name__)
class AIReviewerService:
    """Review source files in batches through the shared LLM engine.

    Files are grouped into small batches, minified to save prompt tokens and
    sent to ``llm_engine.generate`` together with a prompt that asks for a
    JSON array.  The reply is parsed and normalized so that exactly one
    result dict is produced per input file, in input order.
    """

    # Files per LLM request (kept small to stay within Gemini Free Tier limits, 15 RPM).
    BATCH_SIZE = 5
    # Characters of each file that are considered, to stay within the context window.
    MAX_CHARS_PER_FILE = 6000
    # Output token budget requested for one batch analysis.
    MAX_OUTPUT_TOKENS = 8192
    # Suggestion text injected when the model reports a vulnerability without one.
    DEFAULT_VULN_SUGGESTION = "Review the implementation logic for improved safety."

    # Substrings that mark a line as a logging/print statement to be dropped.
    _LOG_MARKERS = ('console.log', 'print(', 'logger.', 'Log.d', 'Log.e')
    # Extensions treated as using ``//`` and ``/* ... */`` comments.
    _JS_STYLE_EXTENSIONS = ('.js', '.ts', '.jsx', '.tsx', '.java', '.kt', '.c', '.cpp')
    # Line prefixes treated as comment lines in those languages.
    _JS_COMMENT_PREFIXES = ('//', '/*', '*')

    def __init__(self):
        pass

    def review_batch_code(self, files: list) -> list:
        """Review ``files`` and return one result dict per file.

        Args:
            files: Objects exposing ``fileName`` and ``content`` attributes.

        Returns:
            A list with exactly one dict per input file, in input order.  Each
            dict has the keys ``fileName``, ``vulnerabilities``,
            ``improvement_suggestions`` and ``metrics``.  When a batch fails
            (LLM error, unusable reply) every file of that batch receives an
            empty result with zeroed metrics.
        """
        results = []
        if not files:
            return results

        for start in range(0, len(files), self.BATCH_SIZE):
            batch = files[start:start + self.BATCH_SIZE]
            file_names = [f.fileName for f in batch]
            prompt = self._build_prompt(batch)

            try:
                response_text = llm_engine.generate(
                    prompt, max_tokens=self.MAX_OUTPUT_TOKENS
                )
                batch_results = self._merge_batch_results(
                    file_names, self._parse_json(response_text)
                )
            except Exception as exc:
                logger.exception("Batch processing error: %s", exc)
                batch_results = [self._failed_result(fn) for fn in file_names]

            # Extend only once per batch so a failure halfway through can
            # never leave both a real result and a fallback for the same file.
            results.extend(batch_results)

        return results

    def _build_prompt(self, batch: list) -> str:
        """Return the review prompt for one batch of files."""
        sections = []
        for f in batch:
            raw_content = f.content or ""
            # Truncate first, then minify, to bound the prompt size per file.
            minified_content = self._minify_code(
                raw_content[:self.MAX_CHARS_PER_FILE], f.fileName
            )
            sections.append(f"\n--- FILE: {f.fileName} ---\n{minified_content}\n")
        combined_code = "".join(sections)

        # The template lines are flush-left on purpose: any indentation here
        # would become part of the prompt text.
        return f"""
Analyze the following {len(batch)} source code files for security vulnerabilities and code quality.
{combined_code}
Task:
1. Detect severe security/logic issues (Vulnerabilities).
2. If a vulnerability is found, provide a concise 'suggestion' on how to fix it.
3. If NO vulnerabilities are found in a file, provide a list of 'improvement_suggestions' (clean code, performance, or architecture tips).
4. Provide metrics for complexity and maintainability (scale 1-10).
Output a JSON array (exactly 1 object per file):
[
{{
"fileName": "exact/path/from/header",
"vulnerabilities": [
{{
"type": "SQLi/Logic/etc",
"line": 10,
"description": "Short explanation",
"suggestion": "Specific code fix"
}}
],
"improvement_suggestions": ["Tip 1", "Tip 2"],
"metrics": {{"complexity": 3, "maintainability": 8}}
}}
]
"""

    def _merge_batch_results(self, file_names: list, batch_results) -> list:
        """Match parsed LLM items to ``file_names`` and fill the gaps.

        Items that are not dicts, or whose ``fileName`` is not a string, are
        ignored.  Files the model did not report on receive the "skipped"
        fallback.  The returned list has one entry per name, in order.
        """
        by_name = {}
        if isinstance(batch_results, list):
            for item in batch_results:
                if isinstance(item, dict) and isinstance(item.get("fileName"), str):
                    by_name[item["fileName"]] = item

        merged = []
        for fn in file_names:
            if fn in by_name:
                merged.append(self._normalize_result(by_name[fn]))
            else:
                # Fallback if the AI skipped a file in its response
                merged.append(self._skipped_result(fn))
        return merged

    def _normalize_result(self, item: dict) -> dict:
        """Return a copy of ``item`` with every required key present and well-typed.

        Ensures all required keys exist to prevent Android serialization
        errors: ``vulnerabilities`` is a list of dicts each carrying a
        ``suggestion``, ``improvement_suggestions`` is a list and ``metrics``
        is a dict.  Vulnerability entries that are not dicts are dropped
        because they cannot be represented as vulnerability objects.
        """
        normalized = dict(item)

        vulnerabilities = []
        raw_vulnerabilities = normalized.get("vulnerabilities")
        if isinstance(raw_vulnerabilities, list):
            for entry in raw_vulnerabilities:
                if not isinstance(entry, dict):
                    continue
                vuln = dict(entry)
                vuln.setdefault("suggestion", self.DEFAULT_VULN_SUGGESTION)
                vulnerabilities.append(vuln)
        normalized["vulnerabilities"] = vulnerabilities

        suggestions = normalized.get("improvement_suggestions")
        normalized["improvement_suggestions"] = (
            suggestions if isinstance(suggestions, list) else []
        )

        metrics = normalized.get("metrics")
        normalized["metrics"] = (
            metrics if isinstance(metrics, dict)
            else {"complexity": 5, "maintainability": 5}
        )
        return normalized

    def _skipped_result(self, file_name: str) -> dict:
        """Return the placeholder used when the model omitted ``file_name``."""
        return {
            "fileName": file_name,
            "vulnerabilities": [],
            "improvement_suggestions": ["No immediate improvements identified."],
            "metrics": {"complexity": 1, "maintainability": 10},
        }

    def _failed_result(self, file_name: str) -> dict:
        """Return the placeholder used when the whole batch failed."""
        return {
            "fileName": file_name,
            "vulnerabilities": [],
            "improvement_suggestions": [],
            "metrics": {"complexity": 0, "maintainability": 0},
        }

    def _minify_code(self, code: str, filename: str) -> str:
        """
        Removes comments, empty lines, and logs to optimize token usage.

        The filtering is line-based text matching, not parsing: a line is
        dropped when it is blank, starts with a comment prefix for the file's
        language family, or contains one of the logging markers anywhere.
        Trailing inline comments are cut at the first `` #`` (Python) or
        `` //`` (JS-style), which also cuts such sequences inside string
        literals.  Files with other extensions only lose blank and log lines.
        """
        is_python = filename.endswith('.py')
        is_js_style = filename.endswith(self._JS_STYLE_EXTENSIONS)

        cleaned_lines = []
        for line in code.split('\n'):
            stripped = line.strip()
            if not stripped:
                continue

            # Skip comments
            if is_python and stripped.startswith('#'):
                continue
            if is_js_style and stripped.startswith(self._JS_COMMENT_PREFIXES):
                continue

            # Skip common logging
            if any(marker in stripped for marker in self._LOG_MARKERS):
                continue

            # Strip inline comments
            if is_python and ' #' in line:
                line = line.split(' #', 1)[0]
            if is_js_style and ' //' in line:
                line = line.split(' //', 1)[0]

            if not line.strip():
                continue
            cleaned_lines.append(line.rstrip())

        return '\n'.join(cleaned_lines)

    def _parse_json(self, text: str):
        """
        Cleans and parses the LLM response into a Python list.

        The response is tried as-is first; if that fails, the outermost
        ``[...]`` and ``{...}`` spans are tried, starting with whichever
        opens first, so Markdown fences or prose around the payload are
        tolerated.  A single JSON object is wrapped in a one-element list.

        Returns:
            The parsed list, or ``[]`` when the text is empty or no candidate
            yields a JSON array/object.
        """
        cleaned = (text or "").strip()
        if not cleaned:
            return []

        spans = []
        for opener, closer in (("[", "]"), ("{", "}")):
            start = cleaned.find(opener)
            end = cleaned.rfind(closer)
            if 0 <= start < end:
                spans.append((start, cleaned[start:end + 1]))
        # Earliest opener first: it decides whether the payload is an array or an object.
        candidates = [cleaned] + [span for _, span in sorted(spans)]

        last_error = None
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError as exc:
                last_error = exc
                continue
            if isinstance(parsed, list):
                return parsed
            if isinstance(parsed, dict):
                return [parsed]

        if last_error is not None:
            logger.warning("JSON Decode Error: %s", last_error)
        else:
            logger.warning("LLM response was valid JSON but not an array or object")
        return []
# Module-level AIReviewerService instance, created once at import time.
# NOTE(review): presumably imported by callers elsewhere in the app — not visible from this file.
service = AIReviewerService()