| """ |
| Jira Sprint Management REST API |
| Built with FastAPI - allows other applications to interact with Jira programmatically |
| """ |
|
|
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel, Field |
| from typing import Optional |
| from datetime import datetime, timedelta, timezone |
| import requests |
| from requests.auth import HTTPBasicAuth |
| import json |
| import os |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| |
| BASE_URL = os.environ.get("JIRA_BASE_URL", "") |
| EMAIL = os.environ.get("JIRA_EMAIL", "") |
| API_TOKEN = os.environ.get("JIRA_API_TOKEN", "") |
|
|
| PROJECT_KEY = os.environ.get("JIRA_PROJECT_KEY", "SCRUM") |
| BOARD_ID = int(os.environ.get("JIRA_BOARD_ID", "1")) |
|
|
| if not BASE_URL: |
| raise ValueError("Missing required environment variable: JIRA_BASE_URL") |
| if not EMAIL: |
| raise ValueError("Missing required environment variable: JIRA_EMAIL") |
| if not API_TOKEN: |
| raise ValueError("Missing required environment variable: JIRA_API_TOKEN") |
|
|
| auth = HTTPBasicAuth(EMAIL, API_TOKEN) |
| headers = { |
| "Accept": "application/json", |
| "Content-Type": "application/json" |
| } |
|
|
| |
| app = FastAPI( |
| title="Jira Sprint Management API", |
| description="REST API for managing Jira sprints, stories, and issues", |
| version="1.0.0" |
| ) |
|
|
|
|
| |
| class StoryCreate(BaseModel): |
| name: str = Field(..., description="Story name/summary") |
| description: Optional[str] = Field("", description="Story description") |
|
|
|
|
| class SprintAddIssues(BaseModel): |
| issue_keys: list[str] = Field(..., description="List of issue keys to add to sprint") |
|
|
|
|
| class IssueTransition(BaseModel): |
| status: str = Field(..., description="Status: '1'=To Do, '2'=In Progress, '3'=Testing, '4'=Done") |
| comment: Optional[str] = Field("", description="Reason/comment for the status change") |
|
|
|
|
| class SprintRollover(BaseModel): |
| new_sprint_name: str = Field(..., description="Name for the new sprint") |
| add_backlog_to_new_sprint: bool = Field( |
| False, |
| description="Whether to add remaining backlog items to the new sprint" |
| ) |
|
|
|
|
| |
|
|
| |
| STATUS_MAP = { |
| "1": {"id": "11", "name": "To Do"}, |
| "2": {"id": "21", "name": "In Progress"}, |
| "3": {"id": "31", "name": "Testing"}, |
| "4": {"id": "41", "name": "Done"} |
| } |
|
|
| def get_transition_id(status_code): |
| """Convert simple status code (1,2,3,4) to Jira transition ID""" |
| if status_code not in STATUS_MAP: |
| raise ValueError(f"Invalid status code: {status_code}. Use 1=To Do, 2=In Progress, 3=Testing, 4=Done") |
| return STATUS_MAP[status_code] |
|
|
|
|
| def get_next_saturday_friday(): |
| today = datetime.now(timezone.utc) |
| days_until_sat = (5 - today.weekday()) % 7 |
| if days_until_sat == 0: |
| days_until_sat = 7 |
| start = today + timedelta(days=days_until_sat) |
| end = start + timedelta(days=6) |
| return ( |
| start.strftime("%Y-%m-%dT%H:%M:%S.000+0000"), |
| end.strftime("%Y-%m-%dT%H:%M:%S.000+0000") |
| ) |
|
|
|
|
| def parse_description(desc_data): |
| """Parse Jira description JSON to plain text""" |
| if not desc_data or "content" not in desc_data: |
| return "" |
| text = "" |
| for block in desc_data.get("content", []): |
| if "content" in block: |
| for item in block["content"]: |
| if "text" in item: |
| text += item["text"] |
| return text |
|
|
|
|
| |
|
|
| @app.get("/") |
| def root(): |
| """API health check""" |
| return { |
| "status": "ok", |
| "message": "Jira Sprint Management API is running", |
| "version": "1.0.0" |
| } |
|
|
|
|
| @app.get("/api/backlog") |
| def get_backlog(): |
| """ |
| Get all backlog issues (issues not in any sprint and not done) |
| Returns: List of issues with id, name, description, status |
| """ |
| try: |
| jql = f'project={PROJECT_KEY} AND sprint IS EMPTY AND statusCategory != Done' |
| url = f"{BASE_URL}/rest/api/3/search/jql" |
| params = { |
| "jql": jql, |
| "fields": "summary,status,description", |
| "maxResults": 100 |
| } |
| res = requests.get(url, headers=headers, auth=auth, params=params) |
| res.raise_for_status() |
| issues = res.json()["issues"] |
|
|
| return { |
| "success": True, |
| "count": len(issues), |
| "issues": [ |
| { |
| "id": issue["key"], |
| "name": issue["fields"]["summary"], |
| "description": parse_description(issue["fields"].get("description")), |
| "status": issue["fields"]["status"]["name"] |
| } |
| for issue in issues |
| ] |
| } |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to fetch backlog: {str(e)}") |
|
|
|
|
| @app.post("/api/story") |
| def create_story(story: StoryCreate): |
| """ |
| Create a new story in Jira |
| Returns: Created issue details with id and key |
| """ |
| try: |
| url = f"{BASE_URL}/rest/api/3/issue" |
| payload = { |
| "fields": { |
| "project": {"key": PROJECT_KEY}, |
| "summary": story.name, |
| "issuetype": {"name": "Story"}, |
| "description": { |
| "type": "doc", |
| "version": 1, |
| "content": [ |
| { |
| "type": "paragraph", |
| "content": [ |
| { |
| "type": "text", |
| "text": story.description if story.description else "No description provided" |
| } |
| ] |
| } |
| ] |
| } |
| } |
| } |
| res = requests.post(url, headers=headers, auth=auth, data=json.dumps(payload)) |
| res.raise_for_status() |
| data = res.json() |
|
|
| return { |
| "success": True, |
| "message": "Story created successfully", |
| "issue": { |
| "id": data["id"], |
| "key": data["key"], |
| "name": story.name, |
| "description": story.description |
| } |
| } |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to create story: {str(e)}") |
|
|
|
|
| @app.get("/api/sprint/active") |
| def get_active_sprint(): |
| """ |
| Get the current active sprint and its issues |
| Returns: Sprint details with all issues and their statuses |
| """ |
| try: |
| url = f"{BASE_URL}/rest/agile/1.0/board/{BOARD_ID}/sprint?state=active" |
| res = requests.get(url, headers=headers, auth=auth) |
| res.raise_for_status() |
| sprints = res.json()["values"] |
|
|
| if not sprints: |
| return { |
| "success": True, |
| "active_sprint": None, |
| "message": "No active sprint found" |
| } |
|
|
| sprint = sprints[0] |
| sprint_id = sprint["id"] |
|
|
| |
| issues_url = f"{BASE_URL}/rest/agile/1.0/sprint/{sprint_id}/issue" |
| issues_res = requests.get(issues_url, headers=headers, auth=auth) |
| issues_res.raise_for_status() |
| issues = issues_res.json()["issues"] |
|
|
| return { |
| "success": True, |
| "active_sprint": { |
| "id": sprint["id"], |
| "name": sprint["name"], |
| "state": sprint["state"], |
| "start_date": sprint["startDate"], |
| "end_date": sprint["endDate"], |
| "issues": [ |
| { |
| "id": issue["key"], |
| "name": issue["fields"]["summary"], |
| "status": issue["fields"]["status"]["name"], |
| "status_category": issue["fields"]["status"]["statusCategory"]["name"] |
| } |
| for issue in issues |
| ], |
| "issue_count": len(issues) |
| } |
| } |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to fetch active sprint: {str(e)}") |
|
|
|
|
| @app.post("/api/sprint/{sprint_id}/add-issues") |
| def add_issues_to_sprint(sprint_id: int, body: SprintAddIssues): |
| """ |
| Add issues to a sprint |
| Returns: Success message with count of added issues |
| """ |
| try: |
| url = f"{BASE_URL}/rest/agile/1.0/sprint/{sprint_id}/issue" |
| payload = {"issues": body.issue_keys} |
| res = requests.post(url, headers=headers, auth=auth, data=json.dumps(payload)) |
| res.raise_for_status() |
|
|
| return { |
| "success": True, |
| "message": f"Added {len(body.issue_keys)} issues to sprint", |
| "added_issues": body.issue_keys, |
| "sprint_id": sprint_id |
| } |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to add issues to sprint: {str(e)}") |
|
|
|
|
| @app.post("/api/issue/{issue_key}/transition") |
| def transition_issue(issue_key: str, body: IssueTransition): |
| """ |
| Transition an issue to a new status with an optional comment/reason |
| Status codes: 1=To Do, 2=In Progress, 3=Testing, 4=Done |
| Returns: Success message |
| """ |
| try: |
| |
| transition = get_transition_id(body.status) |
| transition_id = transition["id"] |
| status_name = transition["name"] |
|
|
| |
| url = f"{BASE_URL}/rest/api/3/issue/{issue_key}/transitions" |
| payload = {"transition": {"id": transition_id}} |
|
|
| res = requests.post(url, headers=headers, auth=auth, data=json.dumps(payload)) |
|
|
| if res.status_code != 204: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Transition failed: {res.text}" |
| ) |
|
|
| |
| comment_added = None |
| if body.comment: |
| comment_url = f"{BASE_URL}/rest/api/3/issue/{issue_key}/comment" |
| comment_payload = { |
| "body": { |
| "type": "doc", |
| "version": 1, |
| "content": [ |
| { |
| "type": "paragraph", |
| "content": [ |
| {"type": "text", "text": body.comment} |
| ] |
| } |
| ] |
| } |
| } |
| comment_res = requests.post(comment_url, headers=headers, auth=auth, data=json.dumps(comment_payload)) |
| if comment_res.status_code == 201: |
| comment_added = body.comment |
| else: |
| |
| print(f"Warning: Comment not added for {issue_key}: {comment_res.text}") |
|
|
| return { |
| "success": True, |
| "message": f"Issue {issue_key} moved to {status_name}", |
| "issue_key": issue_key, |
| "status": status_name, |
| "status_code": body.status, |
| "comment_added": comment_added |
| } |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| except HTTPException: |
| raise |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to transition issue: {str(e)}") |
|
|
|
|
| @app.post("/api/sprint/rollover") |
| def sprint_rollover(body: SprintRollover): |
| """ |
| End current sprint and start a new one |
| - Closes current sprint |
| - Carries forward unfinished issues |
| - Creates and starts new sprint |
| - Optionally adds backlog items to new sprint |
| |
| Returns: Details of the new sprint and carried forward issues |
| """ |
| try: |
| |
| url = f"{BASE_URL}/rest/agile/1.0/board/{BOARD_ID}/sprint?state=active" |
| res = requests.get(url, headers=headers, auth=auth) |
| res.raise_for_status() |
| sprints = res.json()["values"] |
|
|
| if not sprints: |
| raise HTTPException(status_code=400, detail="No active sprint to close") |
|
|
| active_sprint = sprints[0] |
|
|
| |
| issues_url = f"{BASE_URL}/rest/agile/1.0/sprint/{active_sprint['id']}/issue" |
| issues_res = requests.get(issues_url, headers=headers, auth=auth) |
| issues_res.raise_for_status() |
| sprint_issues = issues_res.json()["issues"] |
|
|
| |
| unfinished = [ |
| issue["key"] |
| for issue in sprint_issues |
| if issue["fields"]["status"]["statusCategory"]["name"] != "Done" |
| ] |
|
|
| |
| close_url = f"{BASE_URL}/rest/agile/1.0/sprint/{active_sprint['id']}" |
| close_payload = { |
| "state": "closed", |
| "name": active_sprint["name"], |
| "startDate": active_sprint["startDate"], |
| "endDate": active_sprint["endDate"] |
| } |
| close_res = requests.put(close_url, headers=headers, auth=auth, data=json.dumps(close_payload)) |
| if close_res.status_code != 200: |
| raise HTTPException(status_code=400, detail=f"Failed to close sprint: {close_res.text}") |
|
|
| |
| start, end = get_next_saturday_friday() |
| create_url = f"{BASE_URL}/rest/agile/1.0/sprint" |
| create_payload = { |
| "name": body.new_sprint_name, |
| "originBoardId": BOARD_ID, |
| "startDate": start, |
| "endDate": end |
| } |
| create_res = requests.post(create_url, headers=headers, auth=auth, data=json.dumps(create_payload)) |
| create_res.raise_for_status() |
| new_sprint = create_res.json() |
|
|
| |
| start_url = f"{BASE_URL}/rest/agile/1.0/sprint/{new_sprint['id']}" |
| start_payload = { |
| "state": "active", |
| "startDate": start, |
| "endDate": end, |
| "name": body.new_sprint_name |
| } |
| start_res = requests.put(start_url, headers=headers, auth=auth, data=json.dumps(start_payload)) |
| start_res.raise_for_status() |
|
|
| |
| if unfinished: |
| add_url = f"{BASE_URL}/rest/agile/1.0/sprint/{new_sprint['id']}/issue" |
| add_payload = {"issues": unfinished} |
| add_res = requests.post(add_url, headers=headers, auth=auth, data=json.dumps(add_payload)) |
| add_res.raise_for_status() |
|
|
| |
| added_backlog = [] |
| if body.add_backlog_to_new_sprint: |
| backlog_jql = f'project={PROJECT_KEY} AND sprint IS EMPTY AND statusCategory != Done' |
| backlog_url = f"{BASE_URL}/rest/api/3/search/jql" |
| backlog_params = {"jql": backlog_jql, "maxResults": 100} |
| backlog_res = requests.get(backlog_url, headers=headers, auth=auth, params=backlog_params) |
| backlog_res.raise_for_status() |
| backlog_issues = backlog_res.json()["issues"] |
| backlog_keys = [issue["key"] for issue in backlog_issues] |
|
|
| if backlog_keys: |
| add_backlog_url = f"{BASE_URL}/rest/agile/1.0/sprint/{new_sprint['id']}/issue" |
| add_backlog_payload = {"issues": backlog_keys} |
| add_backlog_res = requests.post(add_backlog_url, headers=headers, auth=auth, data=json.dumps(add_backlog_payload)) |
| add_backlog_res.raise_for_status() |
| added_backlog = backlog_keys |
|
|
| return { |
| "success": True, |
| "message": "Sprint rolled over successfully", |
| "old_sprint": { |
| "id": active_sprint["id"], |
| "name": active_sprint["name"], |
| "status": "closed" |
| }, |
| "new_sprint": { |
| "id": new_sprint["id"], |
| "name": body.new_sprint_name, |
| "start_date": start, |
| "end_date": end, |
| "status": "active" |
| }, |
| "carried_forward": unfinished, |
| "carried_forward_count": len(unfinished), |
| "added_from_backlog": added_backlog, |
| "backlog_count": len(added_backlog) |
| } |
| except HTTPException: |
| raise |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to rollover sprint: {str(e)}") |
|
|
|
|
| |
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8001) |
|
|