zhangyi617's picture
Upload folder using huggingface_hub
129cd69
"""Util that calls GitHub."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain.utils import get_from_dict_or_env
if TYPE_CHECKING:
from github.Issue import Issue
class GitHubAPIWrapper(BaseModel):
"""Wrapper for GitHub API.

Holds the GitHub App settings together with the client objects that
``validate_environment`` builds from them. Every repository operation
in this class goes through ``github_repo_instance``.
"""
# Client returned by ``installation.get_github_for_installation()``;
# filled in by ``validate_environment``.
github: Any #: :meta private:
# Result of ``github.get_repo(github_repository)``; filled in by
# ``validate_environment``.
github_repo_instance: Any #: :meta private:
# Repository identifier passed to ``get_repo``; resolved from the field or
# the ``GITHUB_REPOSITORY`` environment variable.
github_repository: Optional[str] = None
# GitHub App id handed to ``Auth.AppAuth``; env var ``GITHUB_APP_ID``.
github_app_id: Optional[str] = None
# Path of the private key file (it is opened and read by
# ``validate_environment``); env var ``GITHUB_APP_PRIVATE_KEY``.
github_app_private_key: Optional[str] = None
# Branch that file commits target and that pull requests use as ``head``;
# env var ``GITHUB_BRANCH``, resolved with default "master".
github_branch: Optional[str] = None
# Branch that pull requests use as ``base``; env var
# ``GITHUB_BASE_BRANCH``, resolved with default "master".
github_base_branch: Optional[str] = None
class Config:
"""Configuration for this pydantic object."""
# NOTE(review): Extra.forbid presumably makes pydantic reject fields that
# are not declared above - confirm against the pydantic v1 docs.
extra = Extra.forbid
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
    """
    Resolves the GitHub App settings and builds the authenticated clients.

    Every setting is looked up with ``get_from_dict_or_env`` using the field
    name and its environment variable (``GITHUB_REPOSITORY``,
    ``GITHUB_APP_ID``, ``GITHUB_APP_PRIVATE_KEY``, ``GITHUB_BRANCH``,
    ``GITHUB_BASE_BRANCH``). Both branch settings default to "master".
    Parameters:
        values(Dict): The raw field values handed to the validator
    Returns:
        Dict: ``values`` with the resolved settings plus the ``github``
        client and ``github_repo_instance`` filled in
    Raises:
        ImportError: If PyGithub is not installed
        ValueError: If the GitHub App has no installation to authenticate as
    """
    github_repository = get_from_dict_or_env(
        values, "github_repository", "GITHUB_REPOSITORY"
    )
    github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
    github_app_private_key = get_from_dict_or_env(
        values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
    )
    github_branch = get_from_dict_or_env(
        values, "github_branch", "GITHUB_BRANCH", default="master"
    )
    github_base_branch = get_from_dict_or_env(
        values, "github_base_branch", "GITHUB_BASE_BRANCH", default="master"
    )

    try:
        from github import Auth, GithubIntegration
    except ImportError as exc:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "PyGithub is not installed. "
            "Please install it with `pip install PyGithub`"
        ) from exc

    # The setting is the path of the key file, not the key material itself.
    with open(github_app_private_key, "r", encoding="utf-8") as f:
        private_key = f.read()

    auth = Auth.AppAuth(
        github_app_id,
        private_key,
    )
    gi = GithubIntegration(auth=auth)
    try:
        # The first installation of the app is the one used.
        installation = gi.get_installations()[0]
    except IndexError as exc:
        # Without this the caller only sees a bare "list index out of range".
        raise ValueError(
            "The GitHub App has no installations. Install the app on the "
            "target account or repository and try again."
        ) from exc

    # create a GitHub instance:
    g = installation.get_github_for_installation()

    values["github"] = g
    values["github_repo_instance"] = g.get_repo(github_repository)
    values["github_repository"] = github_repository
    values["github_app_id"] = github_app_id
    values["github_app_private_key"] = github_app_private_key
    values["github_branch"] = github_branch
    values["github_base_branch"] = github_base_branch
    return values
def parse_issues(self, issues: List[Issue]) -> List[dict]:
    """
    Reduces GitHub Issue objects to their title and number
    Parameters:
        issues(List[Issue]): The Github Issue objects to summarise
    Returns:
        List[dict]: One {"title": ..., "number": ...} dictionary per issue,
        in the order the issues were given
    """
    return [{"title": entry.title, "number": entry.number} for entry in issues]
def get_issues(self) -> str:
    """
    Lists the open issues of the repo
    Returns:
        str: "No open issues available" when there are none, otherwise a
        plaintext report with the issue count followed by the title and
        number of every issue.
    """
    open_issues = self.github_repo_instance.get_issues(state="open")
    # Guard clause: nothing to report when the repo has no open issues.
    if open_issues.totalCount <= 0:
        return "No open issues available"
    summaries = self.parse_issues(open_issues)
    return f"Found {len(summaries)} issues:\n{summaries}"
def get_issue(self, issue_number: int) -> Dict[str, Any]:
    """
    Fetches a specific issue and its first 10 comments
    Parameters:
        issue_number(int): The number for the github issue
    Returns:
        dict: A dictionary containing the issue's title, body, and at most
        10 comments (each with its body and author login) rendered as a
        string
    """
    max_comments = 10
    issue = self.github_repo_instance.get_issue(number=issue_number)
    # Reuse one paginated list for every page request instead of asking the
    # issue for a new one on each iteration.
    paginated_comments = issue.get_comments()
    comments: List[dict] = []
    page = 0
    while len(comments) < max_comments:
        comments_page = paginated_comments.get_page(page)
        if not comments_page:
            break
        # A page can hold more entries than are still needed, so take only
        # the remainder; this is what enforces the documented cap.
        remaining = max_comments - len(comments)
        comments.extend(
            {"body": comment.body, "user": comment.user.login}
            for comment in comments_page[:remaining]
        )
        page += 1

    return {
        "title": issue.title,
        "body": issue.body,
        "comments": str(comments),
    }
def create_pull_request(self, pr_query: str) -> str:
    """
    Makes a pull request from the bot's branch to the base branch
    Parameters:
        pr_query(str): a string which contains the PR title
        and the PR body. The title is the first line
        in the string, and the body are the rest of the string.
        For example, "Updated README\nmade changes to add info".
        A blank line between title and body is accepted as well.
    Returns:
        str: A success or failure message
    """
    if self.github_base_branch == self.github_branch:
        # Name the configured base branch rather than assuming "master".
        return (
            "Cannot make a pull request because commits are already in "
            f"the {self.github_base_branch} branch"
        )
    try:
        title, _, remainder = pr_query.partition("\n")
        # Drop at most one extra newline so both "title\nbody" and
        # "title\n\nbody" give the full body; the old fixed "+ 2" offset
        # cut the first character off single-newline bodies.
        body = remainder[1:] if remainder.startswith("\n") else remainder
        pr = self.github_repo_instance.create_pull(
            title=title,
            body=body,
            head=self.github_branch,
            base=self.github_base_branch,
        )
        return f"Successfully created PR number {pr.number}"
    except Exception as e:
        return "Unable to make pull request due to error:\n" + str(e)
def comment_on_issue(self, comment_query: str) -> str:
    """
    Adds a comment to a github issue
    Parameters:
        comment_query(str): a string which contains the issue number,
        two newlines, and the comment.
        for example: "1\n\nWorking on it now"
        adds the comment "working on it now" to issue 1
    Returns:
        str: A success or failure message. A query whose first part is not
        an integer yields a failure message as well.
    """
    try:
        # Split on the separator itself so the comment text does not depend
        # on how the number was written ("01", " 1", ...).
        number_text, _, comment = comment_query.partition("\n\n")
        # Parsed inside the try so a malformed number is reported like any
        # other failure instead of escaping as an exception.
        issue_number = int(number_text)
        issue = self.github_repo_instance.get_issue(number=issue_number)
        issue.create_comment(comment)
        return "Commented on issue " + str(issue_number)
    except Exception as e:
        return "Unable to make comment due to error:\n" + str(e)
def create_file(self, file_query: str) -> str:
    """
    Creates a new file on the Github repo
    Parameters:
        file_query(str): a string which contains the file path
        and the file contents. The file path is the first line
        in the string, and the contents are the rest of the string.
        For example, "hello_world.md\n# Hello World!".
        A blank line between path and contents is accepted as well.
    Returns:
        str: A success or failure message
    """
    file_path, _, remainder = file_query.partition("\n")
    # Drop at most one extra newline so both "path\ncontents" and
    # "path\n\ncontents" keep the contents intact; the old fixed "+ 2"
    # offset cut the first character off single-newline contents.
    file_contents = remainder[1:] if remainder.startswith("\n") else remainder
    try:
        try:
            # Look on the branch the file will be committed to.
            existing = self.github_repo_instance.get_contents(
                file_path, ref=self.github_branch
            )
        except Exception as lookup_error:
            # A missing path is reported as an error carrying HTTP status
            # 404 rather than as a None result; anything else is a real
            # failure and is handed to the outer handler.
            if getattr(lookup_error, "status", None) != 404:
                raise
            existing = None
        if existing is not None:
            return f"File already exists at {file_path}. Use update_file instead"
        self.github_repo_instance.create_file(
            path=file_path,
            message="Create " + file_path,
            content=file_contents,
            branch=self.github_branch,
        )
        return "Created file " + file_path
    except Exception as e:
        return "Unable to make file due to error:\n" + str(e)
def read_file(self, file_path: str) -> str:
    """
    Reads a file from the github repo
    Parameters:
        file_path(str): the file path
    Returns:
        str: The file decoded as a string (UTF-8), as it exists on
        ``github_branch``
    """
    # Read from the branch that create/update/delete commit to, so content
    # written there can be read back.
    file = self.github_repo_instance.get_contents(
        file_path, ref=self.github_branch
    )
    return file.decoded_content.decode("utf-8")
def update_file(self, file_query: str) -> str:
    """
    Updates a file with new content.
    Parameters:
        file_query(str): Contains the file path and the file contents.
        The old file contents is wrapped in OLD <<<< and >>>> OLD
        The new file contents is wrapped in NEW <<<< and >>>> NEW
        For example:
        /test/hello.txt
        OLD <<<<
        Hello Earth!
        >>>> OLD
        NEW <<<<
        Hello Mars!
        >>>> NEW
    Returns:
        A success or failure message
    """
    try:
        file_path = file_query.split("\n")[0]
        old_file_contents = (
            file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
        )
        new_file_contents = (
            file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
        )

        # Fetch the file once, from the branch being committed to, so the
        # text that gets edited and the sha sent with the commit describe
        # the same revision.
        current_file = self.github_repo_instance.get_contents(
            file_path, ref=self.github_branch
        )
        file_content = current_file.decoded_content.decode("utf-8")
        # Every occurrence of the old snippet is replaced.
        updated_file_content = file_content.replace(
            old_file_contents, new_file_contents
        )

        if file_content == updated_file_content:
            return (
                "File content was not updated because old content was not found. "
                "It may be helpful to use the read_file action to get "
                "the current file contents."
            )

        self.github_repo_instance.update_file(
            path=file_path,
            message="Update " + file_path,
            content=updated_file_content,
            branch=self.github_branch,
            sha=current_file.sha,
        )
        return "Updated file " + file_path
    except Exception as e:
        return "Unable to update file due to error:\n" + str(e)
def delete_file(self, file_path: str) -> str:
    """
    Deletes a file from the repo
    Parameters:
        file_path(str): Where the file is
    Returns:
        str: Success or failure message
    """
    try:
        # Take the sha from the branch the deletion is committed to, so it
        # identifies the revision of the file that is being removed.
        file = self.github_repo_instance.get_contents(
            file_path, ref=self.github_branch
        )
        self.github_repo_instance.delete_file(
            path=file_path,
            message="Delete " + file_path,
            branch=self.github_branch,
            sha=file.sha,
        )
        return "Deleted file " + file_path
    except Exception as e:
        return "Unable to delete file due to error:\n" + str(e)
def run(self, mode: str, query: str) -> str:
    """
    Runs the operation selected by mode
    Parameters:
        mode(str): One of "get_issues", "get_issue", "comment_on_issue",
        "create_file", "create_pull_request", "read_file", "update_file"
        or "delete_file"
        query(str): The input for the operation. Ignored by "get_issues";
        converted to an int issue number for "get_issue"; passed through
        unchanged to every other operation
    Returns:
        str: The operation's result ("get_issue" results are JSON encoded)
    Raises:
        ValueError: If mode is not one of the names listed above
    """
    if mode == "get_issues":
        return self.get_issues()
    if mode == "get_issue":
        return json.dumps(self.get_issue(int(query)))
    # The remaining modes share their name with the method that implements
    # them and all take the raw query string. The explicit allow-list keeps
    # getattr from reaching any other attribute.
    query_modes = {
        "comment_on_issue",
        "create_file",
        "create_pull_request",
        "read_file",
        "update_file",
        "delete_file",
    }
    if mode in query_modes:
        return getattr(self, mode)(query)
    raise ValueError(f"Invalid mode: {mode}")