Akane710 commited on
Commit
5dcf7d0
·
verified ·
1 Parent(s): 2b8e58c

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +105 -233
main.py CHANGED
@@ -1,24 +1,18 @@
 
1
  import os
2
- import base64
3
- import hashlib
4
- import hmac
5
- from fastapi import FastAPI, Query, HTTPException
6
- from fastapi.responses import JSONResponse
7
- from typing import Optional
8
  from io import BytesIO
9
- from Crypto.Cipher import AES
 
 
 
 
10
  import cloudinary
11
  import cloudinary.uploader
12
  from cloudinary.utils import cloudinary_url
13
- import requests
14
- import asyncio
15
- import concurrent.futures
16
- from ytube_api import Ytube
17
- from BerogovPyCAI import get_client
18
- from BerogovPyCAI.exceptions import SessionClosedError
19
- import enkanetwork
20
- import enkacard
21
- import starrailcard
22
 
23
  app = FastAPI()
24
 
@@ -30,270 +24,148 @@ cloudinary.config(
30
  secure=True
31
  )
32
 
33
- TOKEN = "529e24b4173b29dbc3054fef02a380e1e5b41949"
34
- CHARACTER_ID = "smtV3Vyez6ODkwS8BErmBAdgGNj-1XWU73wIFVOY1hQ"
35
- client = None # BerogovPyCAI client to be initialized on startup
36
-
37
- appInfo = {
38
- "image": b"WhatsApp Image Keys",
39
- "video": b"WhatsApp Video Keys",
40
- "audio": b"WhatsApp Audio Keys",
41
- "document": b"WhatsApp Document Keys",
42
- "image/webp": b"WhatsApp Image Keys",
43
- "image/jpeg": b"WhatsApp Image Keys",
44
- "image/png": b"WhatsApp Image Keys",
45
- "video/mp4": b"WhatsApp Video Keys",
46
- "audio/aac": b"WhatsApp Audio Keys",
47
- "audio/ogg": b"WhatsApp Audio Keys",
48
- "audio/wav": b"WhatsApp Audio Keys",
49
- }
50
-
51
- extension = {
52
- "image": "jpg",
53
- "video": "mp4",
54
- "audio": "ogg",
55
- "document": "bin",
56
- }
57
-
58
- # Key Derivation Function using HKDF
59
- def HKDF(key, length, appInfo=b""):
60
- key = hmac.new(b"\0"*32, key, hashlib.sha256).digest()
61
- keyStream = b""
62
- keyBlock = b""
63
- blockIndex = 1
64
- while len(keyStream) < length:
65
- keyBlock = hmac.new(
66
- key,
67
- msg=keyBlock + appInfo + (chr(blockIndex).encode("utf-8")),
68
- digestmod=hashlib.sha256
69
- ).digest()
70
- blockIndex += 1
71
- keyStream += keyBlock
72
- return keyStream[:length]
73
-
74
- def AESUnpad(s):
75
- return s[:-ord(s[len(s)-1:])]
76
-
77
- def AESDecrypt(key, ciphertext, iv):
78
- cipher = AES.new(key, AES.MODE_CBC, iv)
79
- plaintext = cipher.decrypt(ciphertext)
80
- return AESUnpad(plaintext)
81
-
82
- # Helper functions for Cloudinary upload
83
- def upload_image(data, public_id=None):
84
- try:
85
- upload_result = cloudinary.uploader.upload(
86
- data, folder="card_images", public_id=public_id, invalidate=True
87
- )
88
- return upload_result["secure_url"]
89
- except Exception as e:
90
- raise Exception(f"Cloudinary upload error: {str(e)}")
91
-
92
- # Helper functions to create genshin and starrail cards/profiles
93
- async def genshin_card(id, designtype, character_id=None, character_art_url=None):
94
  character_art = {str(character_id): character_art_url} if character_id and character_art_url else None
95
- async with enkacard.ENC(uid=str(id), character_art=character_art) as encard:
96
  return await encard.creat(akasha=True, template=(2 if str(designtype) == "2" else 1))
97
 
 
98
  async def starrail_card(id, designtype, character_id=None, character_art_url=None):
 
99
  character_art = {str(character_id): character_art_url} if character_id and character_art_url else None
 
100
  async with starrailcard.Card(seeleland=True, remove_logo=True, character_art=character_art) as card:
101
  return await card.create(id, style=(2 if str(designtype) == "2" else 1))
102
 
103
- async def genshinprofile(id):
104
- async with enkacard.ENC(uid=id) as encard:
105
- return await encard.profile(card=True)
106
 
 
107
  async def starrail_profile(id):
108
  async with starrailcard.Card(remove_logo=True, seeleland=True) as card:
109
  return await card.create_profile(id, style=2)
110
 
111
- # Utility to process images in card generation endpoints
112
- def process_images(result, player_id):
113
- characters = []
114
- with concurrent.futures.ThreadPoolExecutor() as executor:
115
- futures = [executor.submit(process_image, dt, player_id) for dt in result.card]
116
- for future in concurrent.futures.as_completed(futures):
117
- try:
118
- characters.append(future.result())
119
- except Exception as e:
120
- print(f"Error processing image: {e}")
121
- return characters
122
-
123
- # FastAPI startup event to initialize BerogovPyCAI client
124
- @app.on_event("startup")
125
- async def startup_event():
126
- global client
127
- client = await get_client(token=TOKEN)
128
-
129
- @app.on_event("shutdown")
130
- async def shutdown_event():
131
- await client.close_session()
132
-
133
- # Root endpoint
134
- @app.get("/")
135
- def root():
136
- return "Welcome to the Unified API for Media Decryption, Cloudinary Uploads, Genshin/StarRail Profiles, Character AI, and YouTube Downloader"
137
-
138
- # Decrypt and Upload Endpoint
139
- @app.get("/decrypt_and_upload")
140
- async def decrypt_and_upload(
141
- url: str = Query(..., description="URL of the encrypted file"),
142
- key: str = Query(..., description="Base64-encoded decryption key"),
143
- media_type: str = Query("image", description="Media type (image, video, audio, document)")
144
- ):
145
- try:
146
- response = requests.get(url)
147
- if response.status_code != 200:
148
- return JSONResponse(content={"error": "Failed to download the file."}, status_code=400)
149
- mediaData = response.content
150
- mediaKey = base64.b64decode(key)
151
- mediaKeyExpanded = HKDF(mediaKey, 112, appInfo[media_type])
152
- decrypted_data = AESDecrypt(
153
- mediaKeyExpanded[16:48], mediaData[:-10], mediaKeyExpanded[:16]
154
- )
155
- upload_result = cloudinary.uploader.upload(
156
- decrypted_data, resource_type="raw", format=extension.get(media_type, "bin")
157
- )
158
- file_url = upload_result.get("url")
159
- return JSONResponse(content={"decrypted_file_url": file_url})
160
- except Exception as e:
161
- return JSONResponse(content={"error": str(e)}, status_code=500)
162
 
163
- # Genshin Impact and Star Rail card/profile creation endpoints
164
  @app.get("/genshin/{id}")
165
  async def genshin_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
166
  try:
167
  result = await genshin_card(id, design, character_id, character_art_url)
168
  characters = process_images(result, id)
169
  return JSONResponse(content={'response': characters})
170
- except Exception as e:
171
- return JSONResponse(content={'error': str(e)}, status_code=500)
172
 
173
- @app.get("/genshin/profile/{id}")
174
- async def genshin_profile_route(id: int):
175
- try:
176
- result = await genshinprofile(id)
177
- profile_data = process_profile(result)
178
- return JSONResponse(content={'response': profile_data})
179
  except Exception as e:
180
- return JSONResponse(content={'error': str(e)}, status_code=500)
181
 
 
182
  @app.get("/starrail/{id}")
183
  async def starrail_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
184
  try:
 
185
  result = await starrail_card(id, design, character_id, character_art_url)
186
  characters = process_images(result, id)
187
  return JSONResponse(content={'response': characters})
 
188
  except Exception as e:
189
- return JSONResponse(content={'error': str(e)}, status_code=500)
 
190
 
 
191
  @app.get("/starrail/profile/{id}")
192
  async def starrail_profile_route(id: int):
193
  try:
194
  result = await starrail_profile(id)
195
  profile_data = process_profile(result)
196
  return JSONResponse(content={'response': profile_data})
197
- except Exception as e:
198
- return JSONResponse(content={'error': str(e)}, status_code=500)
199
 
200
- # Character AI message endpoint
201
- @app.get("/send_message")
202
- async def send_message(
203
- message: str,
204
- character_id: str = CHARACTER_ID,
205
- chat_id: Optional[str] = Query(None, description="ID of the existing chat session, if available.")
206
- ):
207
- try:
208
- if not chat_id:
209
- chat, greeting_message = await client.chat.create_chat(character_id)
210
- chat_id = chat.chat_id
211
- greeting_text = {
212
- "author": greeting_message.author_name,
213
- "text": greeting_message.get_primary_candidate().text
214
- }
215
- else:
216
- greeting_text = None
217
- answer = await client.chat.send_message(character_id, chat_id, message)
218
- response_data = {
219
- "chat_id": chat_id,
220
- "author": answer.author_name,
221
- "response": answer.get_primary_candidate().text
222
- }
223
- if greeting_text:
224
- response_data["greeting_message"] = greeting_text
225
- return response_data
226
- except SessionClosedError:
227
- raise HTTPException(status_code=500, detail="Session closed. Please restart the application.")
228
  except Exception as e:
229
- raise HTTPException(status_code=500, detail=str(e))
230
 
231
- # Image compression endpoint
232
- @app.get("/compress_image")
233
- async def compress_image(image_url: str = Query(..., description="URL of the image to compress")):
234
  try:
235
- upload_result = cloudinary.uploader.upload(image_url)
236
- compressed_url, options = cloudinary_url(
237
- upload_result["public_id"], fetch_format="auto", quality="auto"
238
- )
239
- return JSONResponse(content={"compressed_image_url": compressed_url})
240
  except Exception as e:
241
- return JSONResponse(content={"error": str(e)}, status_code=500)
 
 
 
 
242
 
243
- # YouTube video download and options endpoints
244
- @app.get("/download_youtube_video")
245
- async def download_youtube_video(
246
- query: str = Query(..., description="Search query for the YouTube video"),
247
- format: str = Query("mp4", description="Video format to download"),
248
- quality: str = Query("1080", description="Video quality")
249
- ):
250
  try:
251
- yt = Ytube()
252
- search_results = yt.search_videos(query)
253
- if not search_results.items:
254
- return JSONResponse(content={"error": "No videos found for the query"}, status_code=404)
255
- target_video = search_results.items[0]
256
- download_link = yt.get_download_link(target_video, format=format, quality=quality)
257
- return JSONResponse(content={
258
- "download_url": download_link.url,
259
- "filename": download_link.filename,
260
- "status": download_link.status
261
- })
262
  except Exception as e:
263
- return JSONResponse(content={"error": str(e)}, status_code=500)
264
 
265
- @app.get("/youtube_video")
266
- async def youtube_video_options(query: str = Query(..., description="Search query for the YouTube video")):
267
  try:
268
- yt = Ytube()
269
- search_results = yt.search_videos(query)
270
- if not search_results.items:
271
- return JSONResponse(content={"error": "No videos found for the query"}, status_code=404)
272
- target_video = search_results.items[0]
273
- available_downloads = []
274
- qualities = ["144", "240", "360", "480", "720", "1080"]
275
- formats = ["mp4"]
276
- for quality in qualities:
277
- for fmt in formats:
278
- try:
279
- download_link = yt.get_download_link(target_video, format=fmt, quality=quality)
280
- if download_link.url:
281
- available_downloads.append({
282
- "quality": quality,
283
- "format": fmt,
284
- "download_url": download_link.url
285
- })
286
- except Exception:
287
- continue
288
- if not available_downloads:
289
- return JSONResponse(content={"error": "No available download options found."}, status_code=404)
290
- return JSONResponse(content={
291
- "video_title": target_video.title,
292
- "available_downloads": available_downloads
293
- })
294
  except Exception as e:
295
- return JSONResponse(content={"error": str(e)}, status_code=500)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
296
 
297
  if __name__ == "__main__":
298
- import uvicorn
299
- uvicorn.run("main:app", host="0.0.0.0", port=8000)
 
1
+ import asyncio
2
  import os
3
+ from enkacard import encbanner
4
+ import concurrent.futures
5
+ import requests
6
+ from fastapi import FastAPI
 
 
7
  from io import BytesIO
8
+ from fastapi.responses import JSONResponse
9
+ import enkacard
10
+ import starrailcard
11
+ import enkanetwork
12
+ import uvicorn
13
  import cloudinary
14
  import cloudinary.uploader
15
  from cloudinary.utils import cloudinary_url
 
 
 
 
 
 
 
 
 
16
 
17
  app = FastAPI()
18
 
 
24
  secure=True
25
  )
26
 
27
+ # Genshin Impact card creation
28
+ async def genshin_card(id, designtype,character_id=None, character_art_url=None):
29
+ # Use the provided character ID and character art URL from user input, if available
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  character_art = {str(character_id): character_art_url} if character_id and character_art_url else None
31
+ async with encbanner.ENC(uid=str(id), character_art=character_art) as encard:
32
  return await encard.creat(akasha=True, template=(2 if str(designtype) == "2" else 1))
33
 
34
+ # Star Rail card creation with optional character ID
35
  async def starrail_card(id, designtype, character_id=None, character_art_url=None):
36
+ # Use the provided character ID and character art URL from user input, if available
37
  character_art = {str(character_id): character_art_url} if character_id and character_art_url else None
38
+
39
  async with starrailcard.Card(seeleland=True, remove_logo=True, character_art=character_art) as card:
40
  return await card.create(id, style=(2 if str(designtype) == "2" else 1))
41
 
 
 
 
42
 
43
+ # Star Rail profile creation
44
  async def starrail_profile(id):
45
  async with starrailcard.Card(remove_logo=True, seeleland=True) as card:
46
  return await card.create_profile(id, style=2)
47
 
48
+ # Genshin profile creation
49
+ async def genshinprofile(id):
50
+ async with encbanner.ENC(uid=id) as encard:
51
+ return await encard.profile(card=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
 
53
+ # Route for Genshin Impact
54
  @app.get("/genshin/{id}")
55
  async def genshin_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
56
  try:
57
  result = await genshin_card(id, design, character_id, character_art_url)
58
  characters = process_images(result, id)
59
  return JSONResponse(content={'response': characters})
 
 
60
 
61
+ except enkanetwork.exception.VaildateUIDError:
62
+ return JSONResponse(content={'error': 'Invalid UID. Please check your UID.'}, status_code=400)
63
+
64
+ except enkacard.enc_error.ENCardError:
65
+ return JSONResponse(content={'error': 'Enable display of the showcase in the game or add characters there.'}, status_code=400)
66
+
67
  except Exception as e:
68
+ return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
69
 
70
+ # Route for Star Rail with optional character ID
71
  @app.get("/starrail/{id}")
72
  async def starrail_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
73
  try:
74
+ # Call starrail_card function with optional character ID and art URL
75
  result = await starrail_card(id, design, character_id, character_art_url)
76
  characters = process_images(result, id)
77
  return JSONResponse(content={'response': characters})
78
+
79
  except Exception as e:
80
+ return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
81
+
82
 
83
+ # Route for Star Rail profile
84
  @app.get("/starrail/profile/{id}")
85
  async def starrail_profile_route(id: int):
86
  try:
87
  result = await starrail_profile(id)
88
  profile_data = process_profile(result)
89
  return JSONResponse(content={'response': profile_data})
 
 
90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  except Exception as e:
92
+ return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
93
 
94
+ # Route for Genshin profile
95
+ @app.get("/genshin/profile/{id}")
96
+ async def genshin_profile_route(id: int):
97
  try:
98
+ result = await genshinprofile(id)
99
+ profile_data = process_profile(result)
100
+ return JSONResponse(content={'response': profile_data})
101
+
 
102
  except Exception as e:
103
+ return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
104
+
105
+ @app.get("/")
106
+ def root():
107
+ return "خدتك عليه"
108
 
109
+ # Helper function to upload the image to Cloudinary
110
+ def upload_image(data, character_id, player_id):
 
 
 
 
 
111
  try:
112
+ # Set the public_id to include the character ID and the given player ID
113
+ public_id = f"{character_id}_{player_id}"
114
+
115
+ # Upload image to Cloudinary with the specified public_id
116
+ upload_result = cloudinary.uploader.upload(data, folder="card_images", public_id=public_id, invalidate=True)
117
+
118
+ # Get the secure URL of the uploaded image
119
+ return upload_result["secure_url"]
 
 
 
120
  except Exception as e:
121
+ raise Exception(f"Cloudinary upload error: {str(e)}")
122
 
123
+ # Helper function to upload the image to Cloudinary without player or character ID
124
+ def upload_imagee(data):
125
  try:
126
+ # Upload image to Cloudinary without specifying a public_id
127
+ upload_result = cloudinary.uploader.upload(data, folder="card_images", invalidate=True)
128
+
129
+ # Get the secure URL of the uploaded image
130
+ return upload_result["secure_url"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
  except Exception as e:
132
+ raise Exception(f"Cloudinary upload error: {str(e)}")
133
+
134
+ # Process individual image card
135
+ def process_image(dt, player_id):
136
+ with BytesIO() as byte_io:
137
+ dt.card.save(byte_io, "PNG")
138
+ byte_io.seek(0)
139
+ # Upload the image using the character's ID and player ID
140
+ image_url = upload_image(byte_io, dt.id, player_id)
141
+ return {
142
+ "name": dt.name,
143
+ "id": dt.id,
144
+ "url": image_url
145
+ }
146
+
147
+ # Process the profile image
148
+ def process_profile(profile_card):
149
+ with BytesIO() as byte_io:
150
+ profile_card.card.save(byte_io, "PNG")
151
+ byte_io.seek(0)
152
+ # Upload the image without using the character or player ID
153
+ image_url = upload_imagee(byte_io)
154
+ return {
155
+ "url": image_url,
156
+ }
157
+
158
+ # Process all the images returned
159
+ def process_images(result, player_id):
160
+ characters = []
161
+ with concurrent.futures.ThreadPoolExecutor() as executor:
162
+ futures = [executor.submit(process_image, dt, player_id) for dt in result.card]
163
+ for future in concurrent.futures.as_completed(futures):
164
+ try:
165
+ characters.append(future.result())
166
+ except Exception as e:
167
+ print(f"Error processing image: {e}")
168
+ return characters
169
 
170
  if __name__ == "__main__":
171
+ uvicorn.run("main:app", host="0.0.0.0", port=7860, workers=8, timeout_keep_alive=60000)