Spaces:
Paused
Paused
| from typing import Dict, Any, Optional | |
| from pydantic import BaseModel, Field | |
| from uuid import uuid4 | |
| import os | |
| from typing import Dict, Any, List, Optional, Tuple # Ensure Tuple is imported | |
| from github import Github, PullRequest | |
| from github.GithubException import GithubException, UnknownObjectException | |
| import requests # Make sure requests is imported for patch_url | |
| from dotenv import load_dotenv | |
class PRReviewState(BaseModel):
    """Shared state passed between the LangGraph nodes of one PR review run.

    Only ``pr_id`` and ``repo_name`` are mandatory; every other field is
    filled in by a node as the review progresses.
    """

    # GitHub PR information (mandatory)
    pr_id: int  # Pull request number
    repo_name: str  # e.g., "owner/repo"

    # GitHub PR information (optional)
    diff_url: Optional[str] = None
    pr_title: Optional[str] = None
    pr_author: Optional[str] = None

    # Code content (optional)
    code_diff: Optional[str] = None  # The fetched raw diff content
    # Map of filename -> full file content for context. Use default_factory
    # (the canonical pydantic idiom) instead of a literal `{}` default so the
    # per-instance default is explicit.
    file_contents: Dict[str, str] = Field(default_factory=dict)

    # LLM review outputs
    llm_markdown_review: Optional[str] = None  # Raw Markdown output from the LLM (generate_code_review_markdown)
    parsed_llm_review_data: Optional[Dict[str, Any]] = None  # Structured dict parsed from the Markdown (parse_llm_review_markdown)

    # Human-in-the-loop (simplified for Phase 1)
    require_human_approval: bool = False  # Config flag, set at graph initialization
    human_approval_status: Optional[bool] = None  # True if approved, False if rejected
    human_feedback_message: Optional[str] = None  # Any message from human rejection

    # System status
    review_status: str = "initiated"  # e.g., "initiated", "fetching_code", "code_retrieved", "generating_llm_review", "llm_review_generated", "parsing_llm_review", "review_parsed", "awaiting_human_approval", "posting_review", "posted", "rejected", "failed"
    last_error: Optional[str] = None  # Stores the last encountered error message
    review_id: Optional[int] = None  # Pull-request review id (pending -> approved/discarded based on HIL)
    review_comment_url: Optional[str] = None  # URL of the main posted GitHub review comment
# --- Credentials / environment setup ---------------------------------------
# On Kaggle the secrets would instead be read via:
#   from kaggle_secrets import UserSecretsClient
#   user_secrets = UserSecretsClient()
#   git_hub_token = user_secrets.get_secret("GITHUB_token_ID")
#   google_api_key = user_secrets.get_secret("GOOGLE_API_KEY")

# Load a local .env file *before* reading the variables below, so values
# defined there are actually picked up. (Previously load_dotenv() ran after
# the os.getenv() calls and was guarded on "GITHUB_TOKEN", a name the rest of
# this module never uses.) In a deployed environment the variables would
# likely be set directly and this is a no-op.
if not os.getenv("GITHUB_token_ID"):
    load_dotenv()

google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    print("Google API key not found in environment variables.")
else:
    # Only print the truncated key when it exists; subscripting None here
    # previously raised a TypeError whenever the variable was unset.
    print(f"Using Google API key: {google_api_key[:4]}... (truncated for security)")

git_hub_token = os.getenv("GITHUB_token_ID")
if not git_hub_token:
    print("git_hub_token not found in environment variables.")
else:
    print(f"git_hub_token : {git_hub_token[:4]}... (truncated for security)")
def fetch_pr_code_changes(repo_name: str, pr_id: int) -> Tuple[Optional[str], Optional[Dict[str, str]], Optional[str], Optional[str]]:
    """
    Fetch the raw diff, the full contents of changed files, and the head
    commit SHA for a pull request.

    Args:
        repo_name (str): Full repository name (e.g., "octocat/Spoon-Knife").
        pr_id (int): The pull request number.

    Returns:
        Tuple[Optional[str], Optional[Dict[str, str]], Optional[str], Optional[str]]:
            - raw_diff_content: the raw diff/patch of the PR, or None on error.
            - file_contents: filename -> content *after* the change, or None on error.
            - head_commit_sha: SHA of the PR's head commit, or None on error.
            - error_message: description of the failure, or None on success.
    """
    github_token = os.getenv("GITHUB_token_ID")
    if not github_token:
        print("Error: GITHUB_TOKEN environment variable not set.")
        return None, None, None, "GitHub token not found in environment variables."
    try:
        g = Github(github_token)
        repo = g.get_repo(repo_name)
        pull_request = repo.get_pull(pr_id)

        # Pin everything to the PR's head commit so the diff and the fetched
        # file contents describe the same snapshot.
        head_commit_sha = pull_request.head.sha
        print(f"Fetched PR {pr_id} head commit SHA: {head_commit_sha}")

        # 1. Fetch the raw diff content (patch). requests is used directly as
        # PyGithub's patch handling can be rate-limited differently.
        # timeout= prevents a hung connection from blocking the node forever;
        # raise_for_status() turns an HTTP error into an exception (caught by
        # the generic handler below) instead of silently storing an error page
        # as the "diff".
        patch_url = pull_request.patch_url
        headers = {"Authorization": f"token {github_token}"}
        patch_response = requests.get(patch_url, headers=headers, timeout=30)
        patch_response.raise_for_status()
        raw_diff_content = patch_response.text

        # 2. Fetch the full content of each changed file at the head commit.
        file_contents: Dict[str, str] = {}
        for file in pull_request.get_files():
            # Deleted files have no content at the head commit.
            if file.status == 'deleted':
                file_contents[file.filename] = "[FILE DELETED]"
                continue
            try:
                # Use the head SHA (not the mutable branch ref) so the content
                # is exactly what the diff was computed against.
                file_content_obj = repo.get_contents(file.filename, ref=head_commit_sha)
                if isinstance(file_content_obj, list):
                    # get_contents returns a list for directories.
                    print(f"Warning: '{file.filename}' is a directory or multiple files, skipping content retrieval for now.")
                    file_contents[file.filename] = "[DIRECTORY OR MULTIPLE FILES]"
                    continue
                file_contents[file.filename] = file_content_obj.decoded_content.decode('utf-8')
            except GithubException as e:
                print(f"Warning: GitHub API error fetching content for {file.filename} (PR {pr_id}, Repo {repo_name}): {e.status} - {e.data.get('message', 'No message')}")
                file_contents[file.filename] = f"[ERROR: Could not fetch content. Status: {e.status}, Message: {e.data.get('message', 'No message')}]"
            except Exception as e:
                print(f"Unexpected error fetching content for {file.filename} (PR {pr_id}, Repo {repo_name}): {e}")
                file_contents[file.filename] = f"[ERROR: Unexpected error fetching content: {e}]"

        return raw_diff_content, file_contents, head_commit_sha, None  # No error message if successful
    except UnknownObjectException as e:
        error_msg = f"GitHub object not found (repo or PR): {e.data.get('message', 'No message')}"
        print(f"Error in fetch_pr_code_changes: {error_msg}")
        return None, None, None, error_msg
    except GithubException as e:
        error_msg = f"GitHub API error for PR {pr_id} from {repo_name}: {e.status} - {e.data.get('message', 'No message')}"
        print(f"Error in fetch_pr_code_changes: {error_msg}")
        return None, None, None, error_msg
    except Exception as e:
        error_msg = f"An unexpected error occurred while fetching PR {pr_id} from {repo_name}: {e}"
        print(f"Error in fetch_pr_code_changes: {error_msg}")
        return None, None, None, error_msg
def code_retriever_node(state: PRReviewState):
    """LangGraph node: fetch the PR diff and changed-file contents into state.

    On failure, the error is now recorded in ``last_error`` and
    ``review_status`` is set to "failed"; previously the error message from
    fetch_pr_code_changes was silently discarded and the status still
    claimed "code_retrieved".
    """
    repo_name = state.repo_name
    pull_req_id = state.pr_id
    print(f"repo_name :{repo_name}-------- pull_req_id:{pull_req_id}")
    diff, contents, head_commit_sha, error = fetch_pr_code_changes(repo_name, pull_req_id)
    if error:
        # Surface the failure for downstream nodes (and humans) to see.
        return state.model_copy(update={
            "review_status": "failed",
            "last_error": error,
        })
    # Nodes in LangGraph should always return an updated state; keep the
    # original immutable via model_copy.
    return state.model_copy(update={
        "review_status": "code_retrieved",
        "code_diff": diff,
        "file_contents": contents,
    })
import os
from typing import Dict, Any
from langchain_core.prompts import ChatPromptTemplate
# Ensure you have your LLM provider installed, e.g., pip install langchain-google-genai
from langchain_google_genai import ChatGoogleGenerativeAI  # Using Gemini as per your preference

# Initialize the LLM once at import time, using the `google_api_key` read from
# the environment above. Other models like "gemini-1.5-flash" or
# "gemini-1.5-pro" can be configured here instead. temperature=0.0 keeps the
# review output as deterministic as possible.
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0.0, api_key=google_api_key)
def generate_code_review_markdown(code_diff: str, file_contents: Dict[str, str]) -> str:
    """
    Generates a detailed, human-readable code review in Markdown format from the LLM.

    The prompt is designed to elicit structured Markdown output that can then be
    parsed for GitHub PR comments, grouped by file and function.

    Args:
        code_diff (str): The string representation of the code diff.
        file_contents (Dict[str, str]): A dictionary where keys are file paths
                                        and values are their full content.

    Returns:
        str: A Markdown string representing the code review (or an error
             message string if the LLM call fails).
    """
    # Prepare the full-contents context: one clearly separated fenced block
    # per file so the LLM can attribute content to the right path.
    full_contents_str = ""
    if file_contents:
        for filename, content in file_contents.items():
            # BUG FIX: the header previously hard-coded a placeholder instead
            # of interpolating the file name (the loop variable was unused),
            # so every file's block carried the same anonymous header.
            full_contents_str += f"--- Full Content of {filename} ---\n```python\n{content}\n```\n\n"
    else:
        full_contents_str = "No full file contents provided for additional context."

    # Construct the prompt template: a system message describing the required
    # review structure, and a human message carrying the diff and context.
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system",
             "You are an expert Senior Software Engineer and a meticulous code reviewer.\n"
             "Your task is to review the provided code changes in a Pull Request.\n"
             "Analyze the `code_diff` for potential bugs, performance issues, security vulnerabilities, code style violations, maintainability concerns, and missing tests or documentation.\n"
             "Refer to the `full_file_contents` for additional context if the diff alone is insufficient to understand the changes or their implications.\n"
             "Provide a comprehensive, actionable, and constructive review.\n"
             "Format your review clearly using Markdown. Structure it with the following top-level sections:\n"
             "1. **Overall Impression:** A brief summary of the PR's purpose and overall quality.\n"
             "2. **Specific Observations and Suggestions:** Detailed feedback, grouped by file.\n"
             "   - Within each file's section, group related comments, ideally by function or logical block.\n"
             "   - For each observation/suggestion, include relevant line numbers from the *new* file for context (e.g., 'Line X-Y:').\n"
             "3. **Potential Issues and Edge Cases:** Discuss any missed scenarios or potential problems.\n"
             "4. **Security Implications:** Highlight any security concerns.\n"
             "5. **Adherence to Best Practices (PEP 8):** Comment on style and best practice compliance.\n"
             "6. **Performance Considerations:** Discuss performance aspects.\n"
             "7. **Unit Testing Suggestions:** Recommend additional tests.\n"
             "8. **Docstring/Comment Improvements:** Suggest documentation enhancements.\n"
             "9. **Clarity and Conciseness:** Feedback on code readability.\n"
             "10. **Summary:** A concise conclusion and recommended action (e.g., 'Approve', 'Request Changes', 'Comment').\n\n"
             "For code suggestions, use GitHub's Markdown code block with 'suggestion' annotation, like this:\n"
             "```suggestion\n"
             "your_suggested_code_here\n"
             "```\n"
             "Ensure file paths are correctly formatted (e.g., `src/utils/data_processor.py`)."
             ),
            ("human",
             "Here are the code changes (diff):\n"
             "```diff\n"
             "{code_diff}\n"
             "```\n\n"
             "Here are the full contents of the changed files (for additional context, use only if necessary to understand the diff):\n"
             "{full_contents_context}\n\n"
             "Please provide your structured code review in Markdown."
             ),
        ]
    )

    # Create and invoke the chain.
    review_chain = prompt | llm
    try:
        review_markdown = review_chain.invoke({
            "code_diff": code_diff,
            "full_contents_context": full_contents_str
        }).content  # Access the content attribute for Chat model output
        return review_markdown
    except Exception as e:
        # Best-effort: return a readable error string rather than raising, so
        # the graph can carry on and report the failure.
        print(f"Error generating code review: {e}")
        return f"Error: Could not generate code review. {e}\n\n" \
               f"Please check the LLM API call or token limits."
def code_reviewer_node(state: PRReviewState):
    """LangGraph node: run the LLM review over the diff currently in state."""
    markdown = generate_code_review_markdown(state.code_diff, state.file_contents)
    # Nodes should always hand back an updated state; model_copy keeps the
    # incoming state immutable.
    return state.model_copy(update={
        "review_status": "code_reviewed",
        "llm_markdown_review": markdown,
    })
| import re | |
| from typing import List, Dict, Tuple, Optional, Any | |
class ParsedComment:
    """A single review comment parsed from the LLM's Markdown output.

    Carries the comment text plus an optional ```suggestion``` code block
    that was embedded in it, ready for grouping by file/function.
    """

    def __init__(self, message: str, suggestion: Optional[str] = None):
        self.message = message
        self.suggestion = suggestion

    def __repr__(self):
        preview = self.message[:50]
        has_suggestion = self.suggestion is not None
        return f"ParsedComment(msg='{preview}...', has_suggestion={has_suggestion})"
class ParsedReviewSection:
    """One categorized section of the review (e.g. 'Potential Issues')."""

    def __init__(self, title: str, content: str):
        self.title = title
        self.content = content

    def __repr__(self):
        snippet = self.content[:50]
        return f"ParsedReviewSection(title='{self.title}', content='{snippet}...')"
| # Helper to extract suggestion block and clean message | |
| def _extract_suggestion(text: str) -> Tuple[Optional[str], str]: | |
| """Helper to extract suggestion block and clean message.""" | |
| suggestion_match = re.search(r"```suggestion\n([\s\S]*?)\n```", text, re.MULTILINE) | |
| suggestion_code = suggestion_match.group(1).strip() if suggestion_match else None | |
| # Remove suggestion from the main message | |
| cleaned_message = re.sub(r"```suggestion[\s\S]*?```", "", text).strip() | |
| return suggestion_code, cleaned_message | |
def _parse_bullet_comments(text_block: str) -> List[ParsedComment]:
    """Parse a text block into one ParsedComment per bullet item.

    A bullet runs from its '-'/'*' marker up to the next bullet marker or the
    end of the block, so multi-line comments and embedded ```suggestion```
    blocks stay attached to the bullet they belong to.
    """
    parsed: List[ParsedComment] = []
    bullet_re = r"(^ *[-*]\s*[\s\S]*?)(?=\n *[-*]\s*|\Z)"
    for match in re.finditer(bullet_re, text_block, re.MULTILINE | re.DOTALL):
        item = match.group(1).strip()
        if not item:
            continue
        suggestion, message = _extract_suggestion(item)
        parsed.append(ParsedComment(message=message, suggestion=suggestion))
    return parsed
def parse_llm_review_markdown(markdown_review: str) -> Dict[str, Any]:
    """
    Parses the LLM-generated Markdown review into a structured dictionary.

    It extracts the overall summary, file-specific/function-specific comments,
    and other general review sections.

    Args:
        markdown_review (str): The full Markdown string generated by the LLM.

    Returns:
        Dict[str, Any]: A dictionary containing structured review data:
            - 'overall_impression': str
            - 'file_comments': Dict[str, Dict[str, List[ParsedComment]]]
              (file_path -> function_name -> List[ParsedComment])
            - 'general_sections': List[ParsedReviewSection]
            - 'summary': str
            - 'approval_status': str (extracted from summary, if present)
    """
    structured_review: Dict[str, Any] = {
        'overall_impression': '',
        'file_comments': {},
        'general_sections': [],
        'summary': '',
        'approval_status': 'Comment'  # Default status
    }

    # Helper to extract the content between two '##'/'###' headers. Headers
    # may carry optional numbering (e.g. "## 1. Overall Impression:") and are
    # expected to end with a colon.
    def extract_section_content(text: str, start_header_text: str, end_header_text: str) -> Optional[str]:
        # Pattern to match headers with optional numbering and flexible spacing
        start_pattern = r"^(?:##|###)\s*\d*\.?\s*" + re.escape(start_header_text) + r":\s*$"
        end_pattern = r"^(?:##|###)\s*\d*\.?\s*" + re.escape(end_header_text) + r":\s*$"
        # Use re.DOTALL to allow . to match newlines
        match = re.search(f"{start_pattern}([\\s\\S]*?)(?={end_pattern}|\\Z)", text, re.MULTILINE | re.DOTALL)
        if match:
            return match.group(1).strip()
        return None

    # --- 1. Extract Overall Impression ---
    overall_impression_content = extract_section_content(markdown_review, "Overall Impression", "Specific Observations and Suggestions")
    if overall_impression_content:
        structured_review['overall_impression'] = overall_impression_content

    # --- 2. Extract Specific Observations and Suggestions (File/Function Comments) ---
    specific_obs_section_content = extract_section_content(markdown_review, "Specific Observations and Suggestions", "Potential Issues and Edge Cases")

    # Debug prints for specific_obs_section_content (kept for verification)
    print(f"\n--- DEBUG: specific_obs_section_content (extracted from markdown_review) ---")
    if specific_obs_section_content is None:
        print("specific_obs_section_content is None")
    elif not specific_obs_section_content.strip():
        print("specific_obs_section_content is empty or only whitespace")
    else:
        print(specific_obs_section_content[:500] + "..." if len(specific_obs_section_content) > 500 else specific_obs_section_content)
    print(f"--- END DEBUG: specific_obs_section_content ---\n")

    print(f"\n--- DEBUG: Raw specific_obs_section_content (using repr()):")
    if specific_obs_section_content is not None:
        print(repr(specific_obs_section_content))
        print(f"Length of specific_obs_section_content: {len(specific_obs_section_content)}")
        print(f"Does it start with '### `data_processor.py`'? {specific_obs_section_content.startswith('### `data_processor.py`')}")
        starts_as_file_header = False
        if specific_obs_section_content.startswith('### `') or specific_obs_section_content.startswith('**File:'):
            starts_as_file_header = True
        print(f"Does it start with a common file header pattern? {starts_as_file_header}")
    else:
        print("specific_obs_section_content is None.")
    print(f"--- END DEBUG: Raw specific_obs_section_content ---\n")

    if specific_obs_section_content:
        # Parsing strategy: locate every file-header line first, then slice
        # the text between consecutive headers into per-file content blocks.
        # A file header is either '**File: `path`**' (group 1) or
        # '### `path`' (group 2).
        file_header_line_pattern = re.compile(
            r"^(?:\*\*File:\s*`?([\w\/\.\-_]+\.\w+)`?\*\*|###\s*`?([\w\/\.\-_]+\.\w+)`?)\s*$",
            re.MULTILINE
        )
        header_matches = list(file_header_line_pattern.finditer(specific_obs_section_content))
        print(f"--- DEBUG: Number of file_header_line_pattern matches found (New Strategy): {len(header_matches)} ---")
        if not header_matches:
            print("No file headers were found. Cannot parse file blocks.")
            pass
        else:
            # Step 2: Iterate through header matches and extract content blocks
            for i, header_match in enumerate(header_matches):
                # Whichever capture group matched holds the path.
                file_name = (header_match.group(1) or header_match.group(2)).strip().replace('`', '')
                # Determine the start of the content block (after the header line)
                content_start_index = header_match.end()
                # Determine the end of the content block (start of next header or end of section content)
                content_end_index = len(specific_obs_section_content)
                if i + 1 < len(header_matches):
                    content_end_index = header_matches[i+1].start()
                file_content_block = specific_obs_section_content[content_start_index:content_end_index].strip()
                print(f"\n--- DEBUG: Processing file (new strategy): {file_name} ---")
                print(f"File content block (first 200 chars):\n{file_content_block[:200]}..." if len(file_content_block) > 200 else file_content_block)
                if not file_name: continue
                structured_review['file_comments'][file_name] = {}
                general_comments_for_file: List[ParsedComment] = []
                # Within a file block, '####' headers mark sub-sections:
                # either '#### Function: `name`' (group 2 captures the bare
                # function name) or any other '#### <title>' line.
                sub_section_header_pattern = re.compile(
                    r"^(####\s*(?:Function:\s*`?([\w_]+)`?|[\s\S]+?))\s*$",
                    re.MULTILINE
                )
                sub_section_matches_list = list(sub_section_header_pattern.finditer(file_content_block))
                print(f"--- DEBUG: Number of sub-section (####) matches for {file_name}: {len(sub_section_matches_list)} ---")
                if not sub_section_matches_list:
                    # No '####' headers at all: treat the whole block as
                    # file-level bullet comments.
                    print(f"No '####' sub-sections were found in the block for {file_name}. All content will be general comments or missed.")
                    if file_content_block.strip():
                        parsed_general_comments = _parse_bullet_comments(file_content_block.strip())
                        structured_review['file_comments'][file_name]["General_File_Comments"] = parsed_general_comments
                        print(f" - DEBUG: Parsed {len(parsed_general_comments)} general comments for {file_name}.")
                    continue
                # If sub-sections (#### headers) ARE found, bullets appearing
                # before the first one are file-level comments.
                first_match_start_index = sub_section_matches_list[0].start()
                pre_section_comments_content = file_content_block[:first_match_start_index].strip()
                if pre_section_comments_content:
                    general_comments_for_file.extend(_parse_bullet_comments(pre_section_comments_content))
                    print(f" - DEBUG: Added {len(general_comments_for_file)} general comments (before first sub-section) for {file_name}.")
                # Process each sub-section
                for k, current_match in enumerate(sub_section_matches_list):
                    section_header_raw = current_match.group(1).strip()
                    func_name_from_group = current_match.group(2)
                    section_title_key = ""
                    if func_name_from_group:
                        section_title_key = func_name_from_group.replace('`', '')
                    else:
                        # Fall back to the raw header text after the '####'.
                        section_title_key = section_header_raw[section_header_raw.find('####') + 4:].strip().replace('`', '')
                    content_start_index = current_match.end()
                    # Sub-section content runs to the next '####' header or to
                    # the end of this file's block.
                    content_end_index = (sub_section_matches_list[k+1].start()
                                         if k + 1 < len(sub_section_matches_list)
                                         else len(file_content_block))
                    sub_section_content = file_content_block[content_start_index:content_end_index].strip()
                    print(f" - DEBUG: Sub-section '{section_title_key}' content (first 100 chars): {sub_section_content[:100]}..." if len(sub_section_content) > 100 else sub_section_content)
                    if sub_section_content:
                        parsed_comments_for_section = _parse_bullet_comments(sub_section_content)
                        structured_review['file_comments'][file_name][section_title_key] = parsed_comments_for_section
                        print(f" - DEBUG: Parsed {len(parsed_comments_for_section)} comments for '{section_title_key}'.")
                    else:
                        structured_review['file_comments'][file_name][section_title_key] = []
                        print(f" - DEBUG: No content for sub-section '{section_title_key}'.")
                if general_comments_for_file:
                    structured_review['file_comments'][file_name]["General_File_Comments"] = general_comments_for_file

    # --- 3. Extract General Sections ---
    # Pairs of (title used in the output, header text expected in the Markdown).
    general_section_headers = [
        ("Potential Issues and Edge Cases", "Potential Issues and Edge Cases"),
        ("Security Implications", "Security Implications"),
        ("Adherence to Best Practices (PEP 8)", "Adherence to Best Practices (PEP 8)"),
        ("Performance Considerations", "Performance Considerations"),
        ("Unit Testing Suggestions", "Unit Testing Suggestions"),
        ("Docstring/Comment Improvements", "Docstring/Comment Improvements"),
        ("Clarity and Conciseness", "Clarity and Conciseness"),
        ("Summary", "Summary"),
    ]
    current_markdown_to_parse = markdown_review
    # Skip past everything before "Potential Issues and Edge Cases" so the
    # loop below does not re-match headers inside the sections already parsed.
    start_parsing_from_match = re.search(r"^##\s*\d*\.?\s*Potential Issues and Edge Cases:\s*$", current_markdown_to_parse, re.MULTILINE)
    if not start_parsing_from_match:
        # That header is absent; try to at least advance past the
        # "Specific Observations and Suggestions" section instead.
        specific_obs_end_idx = 0
        specific_obs_match = re.search(r"^##\s*\d*\.?\s*Specific Observations and Suggestions:\s*([\s\S]*?)(?=^##\s*\d*\.?\s*[\w\s\(\)\/]+:|\Z)", current_markdown_to_parse, re.MULTILINE | re.DOTALL)
        if specific_obs_match:
            current_markdown_to_parse = current_markdown_to_parse[specific_obs_match.end():].strip()
        else:
            pass
    else:
        current_markdown_to_parse = current_markdown_to_parse[start_parsing_from_match.start():].strip()
    # Walk the expected section headers in order, consuming the text as we go.
    for i, (title, header_text) in enumerate(general_section_headers):
        current_header_pattern = r"^##\s*\d*\.?\s*" + re.escape(header_text) + r":\s*$"
        start_match = re.search(current_header_pattern, current_markdown_to_parse, re.MULTILINE)
        if not start_match:
            # Section absent from this review; move on.
            continue
        section_start_idx = start_match.end()
        section_end_idx = len(current_markdown_to_parse)
        if i + 1 < len(general_section_headers):
            # Section content ends where the next expected header begins.
            next_header_text = general_section_headers[i+1][1]
            next_header_pattern = r"^##\s*\d*\.?\s*" + re.escape(next_header_text) + r":\s*$"
            next_match = re.search(next_header_pattern, current_markdown_to_parse[section_start_idx:], re.MULTILINE)
            if next_match:
                section_end_idx = section_start_idx + next_match.start()
        content_raw = current_markdown_to_parse[section_start_idx:section_end_idx].strip()
        if title == "Summary":
            structured_review['summary'] = content_raw
            # Strip a trailing code fence the LLM sometimes leaves behind.
            structured_review['summary'] = re.sub(r'(`{3,})\s*$', '', structured_review['summary']).strip()
            # Pull the recommended action (Approve / Request Changes / ...)
            # out of the summary, defaulting to plain 'Comment'.
            approval_match = re.search(r"^\s*\*\*(?:Action|Recommended Action|Status):\*\*\s*(Approve|Request Changes|Comment|No action required)", structured_review['summary'], re.IGNORECASE | re.MULTILINE)
            if approval_match:
                structured_review['approval_status'] = approval_match.group(1).strip().replace(' ', '').capitalize()
            else:
                structured_review['approval_status'] = 'Comment'
        else:
            structured_review['general_sections'].append(ParsedReviewSection(title=title, content=content_raw))
        # Consume this section so later headers are searched only in the
        # remaining text.
        current_markdown_to_parse = current_markdown_to_parse[section_end_idx:].strip()
    # Fallback: if the sequential scan never captured a summary, search the
    # whole original Markdown for it directly.
    if not structured_review['summary']:
        summary_match = re.search(r"^##\s*\d*\.?\s*Summary:\s*([\s\S]*)$", markdown_review, re.MULTILINE | re.DOTALL)
        if summary_match:
            structured_review['summary'] = summary_match.group(1).strip()
            structured_review['summary'] = re.sub(r'(`{3,})\s*$', '', structured_review['summary']).strip()
            approval_match = re.search(r"^\s*\*\*(?:Action|Recommended Action|Status):\*\*\s*(Approve|Request Changes|Comment|No action required)", structured_review['summary'], re.IGNORECASE | re.MULTILINE)
            if approval_match:
                structured_review['approval_status'] = approval_match.group(1).strip().replace(' ', '').capitalize()
            else:
                structured_review['approval_status'] = 'Comment'
        else:
            structured_review['summary'] = "Automated review completed."
    return structured_review
def feedback_formatter_node(state: PRReviewState):
    """LangGraph node: parse the raw LLM Markdown review into structured data."""
    structured = parse_llm_review_markdown(state.llm_markdown_review)
    # Hand back a fresh copy of the state, as LangGraph nodes should.
    return state.model_copy(update={
        "review_status": "review_parsed",
        "parsed_llm_review_data": structured,
    })
| from github import Github, PullRequest | |
| from github.GithubException import GithubException, UnknownObjectException | |
| from github.Commit import Commit # Import Commit type for clarity and correctness | |
| from typing import Dict, Any, List, Optional | |
| import os | |
| import re | |
| import logging | |
# NOTE(review): this re-declares the ParsedComment defined earlier in this
# module; it should ideally be imported from src.utils.markdown_parser instead
# of duplicated here.
class ParsedComment:
    """A parsed review comment: message text plus an optional suggestion block."""

    def __init__(self, message: str, suggestion: Optional[str] = None):
        self.message = message
        self.suggestion = suggestion

    def __repr__(self):
        flag = self.suggestion is not None
        return f"ParsedComment(msg='{self.message[:50]}...', has_suggestion={flag})"
# NOTE(review): duplicate of the ParsedReviewSection defined earlier in this
# module; ideally import it instead of re-declaring.
class ParsedReviewSection:
    """One titled section of the review and its raw Markdown content."""

    def __init__(self, title: str, content: str):
        self.title = title
        self.content = content

    def __repr__(self):
        head = self.content[:50]
        return f"ParsedReviewSection(title='{self.title}', content='{head}...')"
# Configure root logging once at import time: INFO level with timestamps.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def post_review_comments_on_github(
    repo_name: str,
    pr_id: int,
    parsed_review_data: Dict[str, Any],
    github_token: Optional[str] = None
) -> Dict[str, Any]:
    """
    Posts a structured code review to a GitHub Pull Request.

    Args:
        repo_name (str): The full name of the repository (e.g., "owner/repo").
        pr_id (int): The Pull Request number.
        parsed_review_data (Dict[str, Any]): The structured review data
                                             as returned by parse_llm_review_markdown.
        github_token (str, optional): GitHub Personal Access Token.
                                      If None, tries to read from GITHUB_TOKEN env var.

    Returns:
        Dict[str, Any]: Details of the posted review:
                        {'status': 'success', 'review_url': ..., 'review_id': ...,
                         'main_comment_body': ...}.

    Raises:
        ValueError: If no token is available, or the repo/PR cannot be found.
        RuntimeError: On GitHub API errors or any other unexpected failure.
    """
    if github_token is None:
        # BUG FIX: previously read os.getenv("GITHUB_token_ID"), which contradicted
        # both the docstring and the error message below, so the env fallback
        # could never work as documented. Use GITHUB_TOKEN.
        github_token = os.getenv("GITHUB_TOKEN")
    if github_token is None:
        logging.error("GitHub token not provided and GITHUB_TOKEN environment variable not set.")
        raise ValueError("GitHub token not provided and GITHUB_TOKEN environment variable not set.")
    try:
        g = Github(github_token)
        repo = g.get_repo(repo_name)
        pr = repo.get_pull(pr_id)
        logging.info(f"Connected to GitHub repo '{repo_name}', PR #{pr_id}.")

        # --- 1. Prepare the Main Review Body ---
        # ROBUSTNESS: use `or` fallbacks so keys present with an explicit None
        # value do not crash the .strip()/.upper() calls below (dict.get only
        # defaults on *missing* keys, not on keys holding None).
        overall_impression = parsed_review_data.get('overall_impression') or ''
        general_sections = parsed_review_data.get('general_sections') or []
        summary = parsed_review_data.get('summary') or ''
        approval_status = (parsed_review_data.get('approval_status') or 'COMMENT').upper()

        main_review_body = f"### 🤖 Automated Code Review\n\n"
        if overall_impression.strip():
            main_review_body += f"**Overall Impression:**\n{overall_impression}\n\n---\n\n"
        for section in general_sections:
            # Each section is a ParsedReviewSection (title + markdown content);
            # skip sections whose content is empty/whitespace-only.
            if section.content.strip():
                main_review_body += f"### {section.title}\n{section.content}\n\n---\n\n"
        if summary.strip():
            main_review_body += f"### Summary\n{summary}\n\n"
        main_review_body += f"**Recommended Action:** {approval_status}\n"

        # Map the LLM's recommendation onto GitHub's review-event vocabulary;
        # anything unrecognized falls back to a neutral COMMENT.
        github_event = "COMMENT"
        if approval_status == "APPROVE":
            github_event = "APPROVE"
        elif approval_status == "REQUEST CHANGES":
            github_event = "REQUEST_CHANGES"
        logging.info(f"Calculated GitHub review event: {github_event}")

        # --- 2. Prepare Line/File Comments ---
        github_comments = []
        file_comments_data = parsed_review_data.get('file_comments') or {}
        head_commit_sha = pr.head.sha
        # PyGithub's create_review expects a Commit object, not a bare SHA string.
        pr_commit_obj = repo.get_commit(head_commit_sha)
        logging.info(f"Using head commit SHA: {head_commit_sha} (as Commit object)")

        if file_comments_data:
            logging.info(f"Preparing {len(file_comments_data)} file-specific comments.")
            for file_path, functions_data in file_comments_data.items():
                consolidated_file_comment_body = f"### Review for `{file_path}`\n\n"
                # Emit "General_File_Comments" first, then functions alphabetically.
                sorted_func_names = sorted(
                    functions_data.keys(),
                    key=lambda x: (0 if x == "General_File_Comments" else 1, x)
                )
                for func_name in sorted_func_names:
                    comments_for_func = functions_data[func_name]
                    if not comments_for_func:
                        continue
                    if func_name != "General_File_Comments":
                        consolidated_file_comment_body += f"#### ⚙️ Function: `{func_name}`\n\n"
                    else:
                        if len(sorted_func_names) > 1 or (len(sorted_func_names) == 1 and func_name == "General_File_Comments"):
                            consolidated_file_comment_body += f"#### 📄 General File Comments\n\n"
                    for comment in comments_for_func:
                        # Each comment is a ParsedComment (message + optional suggestion).
                        consolidated_file_comment_body += f"{comment.message}\n"
                        if comment.suggestion:
                            consolidated_file_comment_body += f"\n```suggestion\n{comment.suggestion}\n```\n\n"
                    consolidated_file_comment_body += "\n---\n\n"
                # Only attach the comment if at least one function contributed text
                # beyond the bare header.
                if consolidated_file_comment_body.strip() != f"### Review for `{file_path}`":
                    github_comments.append({
                        "path": file_path,
                        # NOTE(review): position 1 anchors every comment to the first
                        # diff position of the file — brittle if that position is not
                        # part of the PR diff; consider mapping real diff positions.
                        "position": 1,
                        "body": consolidated_file_comment_body.strip(),
                    })

        # --- 3. Submit the Review ---
        review = pr.create_review(
            commit=pr_commit_obj,
            body=main_review_body,
            event=github_event,
            comments=github_comments
        )
        logging.info(f"Successfully posted GitHub review. URL: {review.html_url}")
        return {
            'status': 'success',
            'review_url': review.html_url,
            'review_id': review.id,
            'main_comment_body': main_review_body
        }
    except UnknownObjectException as e:
        logging.error(f"GitHub object not found (repo or PR): {e}")
        raise ValueError(f"GitHub object not found (repo or PR): {e}")
    except GithubException as e:
        logging.error(f"GitHub API error: {e}")
        raise RuntimeError(f"GitHub API error: {e}")
    except Exception as e:
        logging.critical(f"An unexpected error occurred while posting review: {e}", exc_info=True)
        raise RuntimeError(f"An unexpected error occurred while posting review: {e}")
def post_review_coments_on_github_node(state: PRReviewState) -> PRReviewState:
    """
    LangGraph node: publish the parsed LLM review to the GitHub PR.

    Reads the repo/PR identifiers and the parsed review from `state`, posts
    the review via post_review_comments_on_github, and returns an updated
    copy of the state recording where the review landed.
    """
    # BUG FIX: the original passed an undefined name `git_hub_token`, which
    # raised NameError at runtime. Source the token from the environment
    # here; the helper applies its own env fallback when given None.
    result = post_review_comments_on_github(
        state.repo_name,
        state.pr_id,
        state.parsed_llm_review_data,
        os.getenv("GITHUB_TOKEN"),
    )
    # Nodes in LangGraph must return an updated state, never mutate in place.
    updated_state = state.model_copy(update={
        "review_status": "posted",  # posting succeeded (helper raises on failure)
        "review_comment_url": result['review_url'],
        "review_id": result['review_id'],
        "last_error": result['status'],  # TODO: repurpose this field for real errors
    })
    return updated_state
#from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
# Build graph: four pipeline nodes sharing the PRReviewState pydantic model.
builder = StateGraph(PRReviewState)
builder.add_node("code_retriever_node", code_retriever_node)
builder.add_node("code_reviewer_node", code_reviewer_node)
builder.add_node("feedback_formatter_node", feedback_formatter_node)
builder.add_node("post_review_coments_on_github_node", post_review_coments_on_github_node)
# Logic: strictly linear flow — fetch code, review it, format feedback, post to GitHub.
builder.add_edge(START, "code_retriever_node")
builder.add_edge("code_retriever_node", "code_reviewer_node")
builder.add_edge("code_reviewer_node", "feedback_formatter_node")
builder.add_edge("feedback_formatter_node", "post_review_coments_on_github_node")
builder.add_edge("post_review_coments_on_github_node", END)
# TODO: checkpointing is disabled because ParsedComment instances in the state
# are not serializable by the checkpointer; fix before re-enabling the line below.
#graph = builder.compile(checkpointer=memory)
graph = builder.compile()
# View the graph diagram (requires IPython; also uncomment the import at the top):
#display(Image(graph.get_graph().draw_mermaid_png()))