paijo77 commited on
Commit
58b0cb8
·
verified ·
1 Parent(s): a8009db

update app/oauth.py

Browse files
Files changed (1) hide show
  1. app/oauth.py +194 -0
app/oauth.py ADDED
@@ -0,0 +1,194 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import httpx
2
+ from fastapi import HTTPException
3
+ from sqlalchemy.ext.asyncio import AsyncSession
4
+ from sqlalchemy import select
5
+ from app.db_models import User
6
+ from app.auth import create_access_token
7
+ from app.config import settings
8
+
9
+
10
+ async def check_github_repo_admin(access_token: str, username: str) -> bool:
11
+ """
12
+ Check if user is admin/owner/collaborator of the configured GitHub repo.
13
+ Returns True if user has admin privileges on the repo.
14
+ """
15
+ if not settings.GITHUB_REPO_OWNER or not settings.GITHUB_REPO_NAME:
16
+ return False
17
+
18
+ async with httpx.AsyncClient() as client:
19
+ # Check if user is a collaborator
20
+ collab_response = await client.get(
21
+ f"https://api.github.com/repos/{settings.GITHUB_REPO_OWNER}/{settings.GITHUB_REPO_NAME}/collaborators/{username}",
22
+ headers={
23
+ "Authorization": f"Bearer {access_token}",
24
+ "Accept": "application/vnd.github+json",
25
+ },
26
+ )
27
+
28
+ if collab_response.status_code == 200:
29
+ collab_data = collab_response.json()
30
+ permission = collab_data.get("permission", "")
31
+ # Admin permissions: admin, maintain, triage
32
+ if permission in ["admin", "maintain", "triage"]:
33
+ return True
34
+
35
+ # Check if user is the repo owner (for organization repos)
36
+ repo_response = await client.get(
37
+ f"https://api.github.com/repos/{settings.GITHUB_REPO_OWNER}/{settings.GITHUB_REPO_NAME}",
38
+ headers={
39
+ "Authorization": f"Bearer {access_token}",
40
+ "Accept": "application/vnd.github+json",
41
+ },
42
+ )
43
+
44
+ if repo_response.status_code == 200:
45
+ repo_data = repo_response.json()
46
+ # Check if user is the owner
47
+ if repo_data.get("owner", {}).get("login") == username:
48
+ return True
49
+
50
+ return False
51
+
52
+
53
+ class OAuthHandler:
54
+ @staticmethod
55
+ async def github_callback(code: str, session: AsyncSession) -> tuple[User, str]:
56
+ async with httpx.AsyncClient() as client:
57
+ token_response = await client.post(
58
+ "https://github.com/login/oauth/access_token",
59
+ headers={"Accept": "application/json"},
60
+ data={
61
+ "client_id": settings.GITHUB_CLIENT_ID,
62
+ "client_secret": settings.GITHUB_CLIENT_SECRET,
63
+ "code": code,
64
+ },
65
+ )
66
+
67
+ if token_response.status_code != 200:
68
+ raise HTTPException(status_code=400, detail="GitHub OAuth failed")
69
+
70
+ token_data = token_response.json()
71
+ access_token = token_data.get("access_token")
72
+
73
+ user_response = await client.get(
74
+ "https://api.github.com/user",
75
+ headers={"Authorization": f"Bearer {access_token}"},
76
+ )
77
+
78
+ if user_response.status_code != 200:
79
+ raise HTTPException(
80
+ status_code=400, detail="Failed to fetch GitHub user"
81
+ )
82
+
83
+ github_user = user_response.json()
84
+ github_username = github_user["login"]
85
+ github_id = str(github_user["id"])
86
+
87
+ # Check if user is admin of the configured repo
88
+ is_repo_admin = await check_github_repo_admin(access_token, github_username)
89
+
90
+ result = await session.execute(
91
+ select(User).where(
92
+ User.oauth_provider == "github",
93
+ User.oauth_id == github_id,
94
+ )
95
+ )
96
+ user = result.scalar_one_or_none()
97
+
98
+ # Determine role: admin if repo collaborator, otherwise user
99
+ role = "admin" if is_repo_admin else "user"
100
+
101
+ if not user:
102
+ user = User(
103
+ oauth_provider="github",
104
+ oauth_id=github_id,
105
+ email=github_user.get("email") or f"{github_username}@github.local",
106
+ username=github_username,
107
+ avatar_url=github_user.get("avatar_url"),
108
+ role=role,
109
+ )
110
+ session.add(user)
111
+ else:
112
+ user.last_login = None
113
+ user.username = github_username
114
+ user.avatar_url = github_user.get("avatar_url")
115
+ user.role = role # Update role based on repo access
116
+
117
+ await session.commit()
118
+ await session.refresh(user)
119
+
120
+ jwt_token = create_access_token(
121
+ data={"sub": str(user.id), "email": user.email, "role": user.role}
122
+ )
123
+
124
+ return user, jwt_token
125
+
126
+ @staticmethod
127
+ async def google_callback(
128
+ code: str, redirect_uri: str, session: AsyncSession
129
+ ) -> tuple[User, str]:
130
+ async with httpx.AsyncClient() as client:
131
+ token_response = await client.post(
132
+ "https://oauth2.googleapis.com/token",
133
+ data={
134
+ "client_id": settings.GOOGLE_CLIENT_ID,
135
+ "client_secret": settings.GOOGLE_CLIENT_SECRET,
136
+ "code": code,
137
+ "grant_type": "authorization_code",
138
+ "redirect_uri": redirect_uri,
139
+ },
140
+ )
141
+
142
+ if token_response.status_code != 200:
143
+ raise HTTPException(status_code=400, detail="Google OAuth failed")
144
+
145
+ token_data = token_response.json()
146
+ access_token = token_data.get("access_token")
147
+
148
+ user_response = await client.get(
149
+ "https://www.googleapis.com/oauth2/v2/userinfo",
150
+ headers={"Authorization": f"Bearer {access_token}"},
151
+ )
152
+
153
+ if user_response.status_code != 200:
154
+ raise HTTPException(
155
+ status_code=400, detail="Failed to fetch Google user"
156
+ )
157
+
158
+ google_user = user_response.json()
159
+
160
+ result = await session.execute(
161
+ select(User).where(
162
+ User.oauth_provider == "google", User.oauth_id == google_user["id"]
163
+ )
164
+ )
165
+ user = result.scalar_one_or_none()
166
+
167
+ if not user:
168
+ user = User(
169
+ oauth_provider="google",
170
+ oauth_id=google_user["id"],
171
+ email=google_user["email"],
172
+ username=google_user.get(
173
+ "name", google_user["email"].split("@")[0]
174
+ ),
175
+ avatar_url=google_user.get("picture"),
176
+ role="user",
177
+ )
178
+ session.add(user)
179
+ else:
180
+ user.last_login = None
181
+ user.username = google_user.get("name", user.username)
182
+ user.avatar_url = google_user.get("picture")
183
+
184
+ await session.commit()
185
+ await session.refresh(user)
186
+
187
+ jwt_token = create_access_token(
188
+ data={"sub": str(user.id), "email": user.email, "role": user.role}
189
+ )
190
+
191
+ return user, jwt_token
192
+
193
+
194
+ oauth_handler = OAuthHandler()