Spaces:
Runtime error
Runtime error
| """Util that calls GitHub.""" | |
| from __future__ import annotations | |
| import json | |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional | |
| from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator | |
| from langchain.utils import get_from_dict_or_env | |
| if TYPE_CHECKING: | |
| from github.Issue import Issue | |
class GitHubAPIWrapper(BaseModel):
    """Wrapper for GitHub API.

    Authenticates as a GitHub App and exposes a small set of repository
    operations (issues, files, pull requests) whose inputs and outputs are
    plain strings, dispatched by name through :meth:`run`.
    """

    # PyGithub client for the app installation; filled by validate_environment.
    github: Any  #: :meta private:
    # PyGithub repository object for ``github_repository``; filled by
    # validate_environment.
    github_repo_instance: Any  #: :meta private:
    # Repository name passed to ``get_repo`` (env: GITHUB_REPOSITORY).
    github_repository: Optional[str] = None
    # GitHub App id (env: GITHUB_APP_ID).
    github_app_id: Optional[str] = None
    # Path to the file holding the app's private key
    # (env: GITHUB_APP_PRIVATE_KEY).
    github_app_private_key: Optional[str] = None
    # Branch the wrapper commits to (env: GITHUB_BRANCH, default "master").
    github_branch: Optional[str] = None
    # Branch pull requests are opened against
    # (env: GITHUB_BASE_BRANCH, default "master").
    github_base_branch: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment.

        Resolves every setting from ``values`` or the matching environment
        variable, authenticates as the GitHub App and stores the resulting
        client and repository objects back into ``values``.

        Parameters:
            values(Dict): The field values collected by pydantic
        Returns:
            Dict: ``values`` with all settings resolved and the ``github``
                and ``github_repo_instance`` entries populated
        Raises:
            ImportError: If PyGithub is not installed
        """
        github_repository = get_from_dict_or_env(
            values, "github_repository", "GITHUB_REPOSITORY"
        )
        github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
        github_app_private_key = get_from_dict_or_env(
            values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
        )
        github_branch = get_from_dict_or_env(
            values, "github_branch", "GITHUB_BRANCH", default="master"
        )
        github_base_branch = get_from_dict_or_env(
            values, "github_base_branch", "GITHUB_BASE_BRANCH", default="master"
        )

        try:
            from github import Auth, GithubIntegration
        except ImportError as import_error:
            raise ImportError(
                "PyGithub is not installed. "
                "Please install it with `pip install PyGithub`"
            ) from import_error

        # The setting is a path to the key file, not the key itself.
        with open(github_app_private_key, "r", encoding="utf-8") as f:
            private_key = f.read()

        auth = Auth.AppAuth(
            github_app_id,
            private_key,
        )
        gi = GithubIntegration(auth=auth)
        # The first installation of the app is the one that gets used.
        installation = gi.get_installations()[0]

        # create a GitHub instance:
        g = installation.get_github_for_installation()

        values["github"] = g
        values["github_repo_instance"] = g.get_repo(github_repository)
        values["github_repository"] = github_repository
        values["github_app_id"] = github_app_id
        values["github_app_private_key"] = github_app_private_key
        values["github_branch"] = github_branch
        values["github_base_branch"] = github_base_branch

        return values

    @staticmethod
    def _split_first_line(query: str) -> List[str]:
        """
        Splits a tool query into its first line and the remaining text
        Parameters:
            query(str): The raw query; the first line is the header and
                the rest is the payload, separated by one or two newlines
        Returns:
            List[str]: A two-item list ``[first_line, remainder]``
        """
        first_line, _, remainder = query.partition("\n")
        # Both "header\nbody" and "header\n\nbody" are accepted, so drop at
        # most one extra newline and leave the rest of the payload untouched.
        if remainder.startswith("\n"):
            remainder = remainder[1:]
        return [first_line, remainder]

    def parse_issues(self, issues: List[Issue]) -> List[dict]:
        """
        Extracts title and number from each Issue and puts them in a dictionary
        Parameters:
            issues(List[Issue]): A list of Github Issue objects
        Returns:
            List[dict]: A list of dictionaries, one per issue, each holding
                the issue's "title" and "number"
        """
        return [{"title": issue.title, "number": issue.number} for issue in issues]

    def get_issues(self) -> str:
        """
        Fetches all open issues from the repo
        Returns:
            str: A plaintext report containing the number of issues
            and each issue's title and number.
        """
        open_issues = self.github_repo_instance.get_issues(state="open")
        parsed_issues = self.parse_issues(open_issues)
        if not parsed_issues:
            return "No open issues available"
        return f"Found {len(parsed_issues)} issues:\n{parsed_issues}"

    def get_issue(self, issue_number: int) -> Dict[str, Any]:
        """
        Fetches a specific issue and its first 10 comments
        Parameters:
            issue_number(int): The number for the github issue
        Returns:
            dict: A dictionary containing the issue's title,
            body, and comments as a string
        """
        max_comments = 10
        issue = self.github_repo_instance.get_issue(number=issue_number)
        # Build the paginated list once and walk its pages, instead of
        # creating a new one for every page request.
        paginated_comments = issue.get_comments()
        comments: List[dict] = []
        page = 0
        while len(comments) < max_comments:
            comments_page = paginated_comments.get_page(page)
            if not comments_page:
                break
            # A page can hold more entries than are still needed; only take
            # what keeps the total at ``max_comments``.
            remaining = max_comments - len(comments)
            comments.extend(
                {"body": comment.body, "user": comment.user.login}
                for comment in comments_page[:remaining]
            )
            page += 1

        return {
            "title": issue.title,
            "body": issue.body,
            "comments": str(comments),
        }

    def create_pull_request(self, pr_query: str) -> str:
        """
        Makes a pull request from the bot's branch to the base branch
        Parameters:
            pr_query(str): a string which contains the PR title
            and the PR body. The title is the first line
            in the string, and the body are the rest of the string.
            For example, "Updated README\nmade changes to add info"
        Returns:
            str: A success or failure message
        """
        if self.github_base_branch == self.github_branch:
            return (
                "Cannot make a pull request because commits are already "
                f"in the {self.github_base_branch} branch"
            )
        try:
            title, body = self._split_first_line(pr_query)
            pr = self.github_repo_instance.create_pull(
                title=title,
                body=body,
                head=self.github_branch,
                base=self.github_base_branch,
            )
            return f"Successfully created PR number {pr.number}"
        except Exception as e:
            return "Unable to make pull request due to error:\n" + str(e)

    def comment_on_issue(self, comment_query: str) -> str:
        """
        Adds a comment to a github issue
        Parameters:
            comment_query(str): a string which contains the issue number,
            two newlines, and the comment.
            for example: "1\n\nWorking on it now"
            adds the comment "working on it now" to issue 1
        Returns:
            str: A success or failure message
        Raises:
            ValueError: If the text before the first blank line is not
                an integer
        """
        # Split on the separator itself so padding or leading zeros in the
        # issue number cannot shift where the comment text starts.
        number_text, _, comment = comment_query.partition("\n\n")
        issue_number = int(number_text)
        try:
            issue = self.github_repo_instance.get_issue(number=issue_number)
            issue.create_comment(comment)
            return "Commented on issue " + str(issue_number)
        except Exception as e:
            return "Unable to make comment due to error:\n" + str(e)

    def create_file(self, file_query: str) -> str:
        """
        Creates a new file on the Github repo
        Parameters:
            file_query(str): a string which contains the file path
            and the file contents. The file path is the first line
            in the string, and the contents are the rest of the string.
            For example, "hello_world.md\n# Hello World!"
        Returns:
            str: A success or failure message
        """
        file_path, file_contents = self._split_first_line(file_query)
        try:
            try:
                # Look on the branch the file will be written to.
                existing = self.github_repo_instance.get_contents(
                    file_path, ref=self.github_branch
                )
            except Exception as lookup_error:
                # A 404 status on the lookup error means the path is not
                # there yet, which is the expected case when creating.
                if getattr(lookup_error, "status", None) != 404:
                    raise
                existing = None

            if existing is not None:
                return f"File already exists at {file_path}. Use update_file instead"

            self.github_repo_instance.create_file(
                path=file_path,
                message="Create " + file_path,
                content=file_contents,
                branch=self.github_branch,
            )
            return "Created file " + file_path
        except Exception as e:
            return "Unable to make file due to error:\n" + str(e)

    def read_file(self, file_path: str) -> str:
        """
        Reads a file from the github repo
        Parameters:
            file_path(str): the file path
        Returns:
            str: The file decoded as a string
        """
        # Read from the working branch so the text matches what
        # update_file and delete_file operate on.
        file = self.github_repo_instance.get_contents(
            file_path, ref=self.github_branch
        )
        return file.decoded_content.decode("utf-8")

    def update_file(self, file_query: str) -> str:
        """
        Updates a file with new content.
        Parameters:
            file_query(str): Contains the file path and the file contents.
                The old file contents is wrapped in OLD <<<< and >>>> OLD
                The new file contents is wrapped in NEW <<<< and >>>> NEW
                For example:
                /test/hello.txt
                OLD <<<<
                Hello Earth!
                >>>> OLD
                NEW <<<<
                Hello Mars!
                >>>> NEW
        Returns:
            A success or failure message
        """
        try:
            file_path = file_query.split("\n")[0]
            old_file_contents = (
                file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
            )
            new_file_contents = (
                file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
            )

            # str.replace("", x) would insert x between every character of
            # the file, so an empty OLD block must never reach replace().
            if not old_file_contents:
                return (
                    "File content was not updated because no old content "
                    "was provided. It may be helpful to use the read_file "
                    "action to get the current file contents."
                )

            # One fetch supplies both the text and the sha, so the sha sent
            # with the update always belongs to the content that was edited.
            current_file = self.github_repo_instance.get_contents(
                file_path, ref=self.github_branch
            )
            file_content = current_file.decoded_content.decode("utf-8")
            updated_file_content = file_content.replace(
                old_file_contents, new_file_contents
            )

            if file_content == updated_file_content:
                return (
                    "File content was not updated because old content was not found. "
                    "It may be helpful to use the read_file action to get "
                    "the current file contents."
                )

            self.github_repo_instance.update_file(
                path=file_path,
                message="Update " + file_path,
                content=updated_file_content,
                branch=self.github_branch,
                sha=current_file.sha,
            )
            return "Updated file " + file_path
        except Exception as e:
            return "Unable to update file due to error:\n" + str(e)

    def delete_file(self, file_path: str) -> str:
        """
        Deletes a file from the repo
        Parameters:
            file_path(str): Where the file is
        Returns:
            str: Success or failure message
        """
        try:
            # The sha has to come from the branch the deletion targets.
            file = self.github_repo_instance.get_contents(
                file_path, ref=self.github_branch
            )
            self.github_repo_instance.delete_file(
                path=file_path,
                message="Delete " + file_path,
                branch=self.github_branch,
                sha=file.sha,
            )
            return "Deleted file " + file_path
        except Exception as e:
            return "Unable to delete file due to error:\n" + str(e)

    def run(self, mode: str, query: str) -> str:
        """
        Dispatches a named operation to the matching method
        Parameters:
            mode(str): One of "get_issues", "get_issue", "comment_on_issue",
                "create_file", "create_pull_request", "read_file",
                "update_file" or "delete_file"
            query(str): The input for the operation; ignored by
                "get_issues" and parsed as an integer by "get_issue"
        Returns:
            str: The operation's result; "get_issue" is JSON-encoded
        Raises:
            ValueError: If ``mode`` is not a known operation
        """
        if mode == "get_issues":
            return self.get_issues()
        if mode == "get_issue":
            return json.dumps(self.get_issue(int(query)))

        # Every remaining operation takes the raw query string unchanged.
        handlers = {
            "comment_on_issue": self.comment_on_issue,
            "create_file": self.create_file,
            "create_pull_request": self.create_pull_request,
            "read_file": self.read_file,
            "update_file": self.update_file,
            "delete_file": self.delete_file,
        }
        handler = handlers.get(mode)
        if handler is None:
            raise ValueError("Invalid mode: " + mode)
        return handler(query)