Akane710 commited on
Commit
616d8b2
·
verified ·
1 Parent(s): 661757d

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +217 -126
main.py CHANGED
@@ -1,18 +1,24 @@
1
- import asyncio
2
  import os
3
- from enkacard import encbanner
4
- import concurrent.futures
5
- import requests
6
- from fastapi import FastAPI
7
- from io import BytesIO
8
  from fastapi.responses import JSONResponse
9
- import enkacard
10
- import starrailcard
11
- import enkanetwork
12
- import uvicorn
13
  import cloudinary
14
  import cloudinary.uploader
15
  from cloudinary.utils import cloudinary_url
 
 
 
 
 
 
 
 
 
16
 
17
  app = FastAPI()
18
 
@@ -24,148 +30,233 @@ cloudinary.config(
24
  secure=True
25
  )
26
 
27
- # Genshin Impact card creation
28
- async def genshin_card(id, designtype,character_id=None, character_art_url=None):
29
- # Use the provided character ID and character art URL from user input, if available
30
- character_art = {str(character_id): character_art_url} if character_id and character_art_url else None
31
- async with encbanner.ENC(uid=str(id), character_art=character_art) as encard:
32
- return await encard.creat(akasha=True, template=(2 if str(designtype) == "2" else 1))
33
-
34
- # Star Rail card creation with optional character ID
35
- async def starrail_card(id, designtype, character_id=None, character_art_url=None):
36
- # Use the provided character ID and character art URL from user input, if available
37
- character_art = {str(character_id): character_art_url} if character_id and character_art_url else None
38
-
39
- async with starrailcard.Card(seeleland=True, remove_logo=True, character_art=character_art) as card:
40
- return await card.create(id, style=(2 if str(designtype) == "2" else 1))
41
-
42
-
43
- # Star Rail profile creation
44
- async def starrail_profile(id):
45
- async with starrailcard.Card(remove_logo=True, seeleland=True) as card:
46
- return await card.create_profile(id, style=2)
47
-
48
- # Genshin profile creation
49
- async def genshinprofile(id):
50
- async with encbanner.ENC(uid=id) as encard:
51
- return await encard.profile(card=True)
52
-
53
- # Route for Genshin Impact
54
- @app.get("/genshin/{id}")
55
- async def genshin_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
56
- try:
57
- result = await genshin_card(id, design, character_id, character_art_url)
58
- characters = process_images(result, id)
59
- return JSONResponse(content={'response': characters})
60
 
61
- except enkanetwork.exception.VaildateUIDError:
62
- return JSONResponse(content={'error': 'Invalid UID. Please check your UID.'}, status_code=400)
 
 
 
 
 
 
 
 
 
 
 
63
 
64
- except enkacard.enc_error.ENCardError:
65
- return JSONResponse(content={'error': 'Enable display of the showcase in the game or add characters there.'}, status_code=400)
 
 
 
 
66
 
67
- except Exception as e:
68
- return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
- # Route for Star Rail with optional character ID
71
- @app.get("/starrail/{id}")
72
- async def starrail_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
73
- try:
74
- # Call starrail_card function with optional character ID and art URL
75
- result = await starrail_card(id, design, character_id, character_art_url)
76
- characters = process_images(result, id)
77
- return JSONResponse(content={'response': characters})
78
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  except Exception as e:
80
- return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
 
 
 
 
 
 
 
 
 
 
 
 
81
 
 
 
 
 
 
82
 
83
- # Route for Star Rail profile
84
- @app.get("/starrail/profile/{id}")
85
- async def starrail_profile_route(id: int):
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  try:
87
- result = await starrail_profile(id)
88
- profile_data = process_profile(result)
89
- return JSONResponse(content={'response': profile_data})
 
 
 
 
 
 
 
 
 
 
 
 
 
90
 
 
 
 
 
 
 
 
91
  except Exception as e:
92
- return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
93
 
94
- # Route for Genshin profile
95
  @app.get("/genshin/profile/{id}")
96
  async def genshin_profile_route(id: int):
97
  try:
98
  result = await genshinprofile(id)
99
  profile_data = process_profile(result)
100
  return JSONResponse(content={'response': profile_data})
101
-
102
  except Exception as e:
103
- return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
104
-
105
- @app.get("/")
106
- def root():
107
- return "خدتك عليه"
108
 
109
- # Helper function to upload the image to Cloudinary
110
- def upload_image(data, character_id, player_id):
 
 
 
 
 
111
  try:
112
- # Set the public_id to include the character ID and the given player ID
113
- public_id = f"{character_id}_{player_id}"
114
-
115
- # Upload image to Cloudinary with the specified public_id
116
- upload_result = cloudinary.uploader.upload(data, folder="card_images", public_id=public_id, invalidate=True)
117
-
118
- # Get the secure URL of the uploaded image
119
- return upload_result["secure_url"]
 
 
 
 
 
 
 
 
 
 
 
 
120
  except Exception as e:
121
- raise Exception(f"Cloudinary upload error: {str(e)}")
122
 
123
- # Helper function to upload the image to Cloudinary without player or character ID
124
- def upload_imagee(data):
 
125
  try:
126
- # Upload image to Cloudinary without specifying a public_id
127
- upload_result = cloudinary.uploader.upload(data, folder="card_images", invalidate=True)
128
-
129
- # Get the secure URL of the uploaded image
130
- return upload_result["secure_url"]
131
  except Exception as e:
132
- raise Exception(f"Cloudinary upload error: {str(e)}")
133
-
134
- # Process individual image card
135
- def process_image(dt, player_id):
136
- with BytesIO() as byte_io:
137
- dt.card.save(byte_io, "PNG")
138
- byte_io.seek(0)
139
- # Upload the image using the character's ID and player ID
140
- image_url = upload_image(byte_io, dt.id, player_id)
141
- return {
142
- "name": dt.name,
143
- "id": dt.id,
144
- "url": image_url
145
- }
146
 
147
- # Process the profile image
148
- def process_profile(profile_card):
149
- with BytesIO() as byte_io:
150
- profile_card.card.save(byte_io, "PNG")
151
- byte_io.seek(0)
152
- # Upload the image without using the character or player ID
153
- image_url = upload_imagee(byte_io)
154
- return {
155
- "url": image_url,
156
- }
 
 
 
 
 
 
 
 
 
 
 
157
 
158
- # Process all the images returned
159
- def process_images(result, player_id):
160
- characters = []
161
- with concurrent.futures.ThreadPoolExecutor() as executor:
162
- futures = [executor.submit(process_image, dt, player_id) for dt in result.card]
163
- for future in concurrent.futures.as_completed(futures):
164
- try:
165
- characters.append(future.result())
166
- except Exception as e:
167
- print(f"Error processing image: {e}")
168
- return characters
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169
 
170
  if __name__ == "__main__":
171
- uvicorn.run("main:app", host="0.0.0.0", port=7860, workers=8, timeout_keep_alive=60000)
 
 
 
1
  import os
2
+ import base64
3
+ import hashlib
4
+ import hmac
5
+ from fastapi import FastAPI, Query, HTTPException
 
6
  from fastapi.responses import JSONResponse
7
+ from typing import Optional
8
+ from io import BytesIO
9
+ from Crypto.Cipher import AES
 
10
  import cloudinary
11
  import cloudinary.uploader
12
  from cloudinary.utils import cloudinary_url
13
+ import requests
14
+ import asyncio
15
+ import concurrent.futures
16
+ from ytube_api import Ytube
17
+ from BerogovPyCAI import get_client
18
+ from BerogovPyCAI.exceptions import SessionClosedError
19
+ import enkanetwork
20
+ import enkacard
21
+ import starrailcard
22
 
23
  app = FastAPI()
24
 
 
30
  secure=True
31
  )
32
 
33
+ TOKEN = "529e24b4173b29dbc3054fef02a380e1e5b41949"
34
+ CHARACTER_ID = "smtV3Vyez6ODkwS8BErmBAdgGNj-1XWU73wIFVOY1hQ"
35
+ client = None # BerogovPyCAI client to be initialized on startup
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
 
37
+ appInfo = {
38
+ "image": b"WhatsApp Image Keys",
39
+ "video": b"WhatsApp Video Keys",
40
+ "audio": b"WhatsApp Audio Keys",
41
+ "document": b"WhatsApp Document Keys",
42
+ "image/webp": b"WhatsApp Image Keys",
43
+ "image/jpeg": b"WhatsApp Image Keys",
44
+ "image/png": b"WhatsApp Image Keys",
45
+ "video/mp4": b"WhatsApp Video Keys",
46
+ "audio/aac": b"WhatsApp Audio Keys",
47
+ "audio/ogg": b"WhatsApp Audio Keys",
48
+ "audio/wav": b"WhatsApp Audio Keys",
49
+ }
50
 
51
+ extension = {
52
+ "image": "jpg",
53
+ "video": "mp4",
54
+ "audio": "ogg",
55
+ "document": "bin",
56
+ }
57
 
58
+ # Key Derivation Function using HKDF
59
+ def HKDF(key, length, appInfo=b""):
60
+ key = hmac.new(b"\0"*32, key, hashlib.sha256).digest()
61
+ keyStream = b""
62
+ keyBlock = b""
63
+ blockIndex = 1
64
+ while len(keyStream) < length:
65
+ keyBlock = hmac.new(
66
+ key,
67
+ msg=keyBlock + appInfo + (chr(blockIndex).encode("utf-8")),
68
+ digestmod=hashlib.sha256
69
+ ).digest()
70
+ blockIndex += 1
71
+ keyStream += keyBlock
72
+ return keyStream[:length]
73
 
74
+ def AESUnpad(s):
75
+ return s[:-ord(s[len(s)-1:])]
 
 
 
 
 
 
76
 
77
+ def AESDecrypt(key, ciphertext, iv):
78
+ cipher = AES.new(key, AES.MODE_CBC, iv)
79
+ plaintext = cipher.decrypt(ciphertext)
80
+ return AESUnpad(plaintext)
81
+
82
+ # Helper functions for Cloudinary upload
83
+ def upload_image(data, public_id=None):
84
+ try:
85
+ upload_result = cloudinary.uploader.upload(
86
+ data, folder="card_images", public_id=public_id, invalidate=True
87
+ )
88
+ return upload_result["secure_url"]
89
  except Exception as e:
90
+ raise Exception(f"Cloudinary upload error: {str(e)}")
91
+
92
+ # Utility to process images in card generation endpoints
93
+ def process_images(result, player_id):
94
+ characters = []
95
+ with concurrent.futures.ThreadPoolExecutor() as executor:
96
+ futures = [executor.submit(process_image, dt, player_id) for dt in result.card]
97
+ for future in concurrent.futures.as_completed(futures):
98
+ try:
99
+ characters.append(future.result())
100
+ except Exception as e:
101
+ print(f"Error processing image: {e}")
102
+ return characters
103
 
104
+ # FastAPI startup event to initialize BerogovPyCAI client
105
+ @app.on_event("startup")
106
+ async def startup_event():
107
+ global client
108
+ client = await get_client(token=TOKEN)
109
 
110
+ @app.on_event("shutdown")
111
+ async def shutdown_event():
112
+ await client.close_session()
113
+
114
+ # Root endpoint
115
+ @app.get("/")
116
+ def root():
117
+ return "Welcome to the Unified API for Media Decryption, Cloudinary Uploads, Genshin/StarRail Profiles, Character AI, and YouTube Downloader"
118
+
119
+ # Decrypt and Upload Endpoint
120
+ @app.get("/decrypt_and_upload")
121
+ async def decrypt_and_upload(
122
+ url: str = Query(..., description="URL of the encrypted file"),
123
+ key: str = Query(..., description="Base64-encoded decryption key"),
124
+ media_type: str = Query("image", description="Media type (image, video, audio, document)")
125
+ ):
126
  try:
127
+ response = requests.get(url)
128
+ if response.status_code != 200:
129
+ return JSONResponse(content={"error": "Failed to download the file."}, status_code=400)
130
+ mediaData = response.content
131
+ mediaKey = base64.b64decode(key)
132
+ mediaKeyExpanded = HKDF(mediaKey, 112, appInfo[media_type])
133
+ decrypted_data = AESDecrypt(
134
+ mediaKeyExpanded[16:48], mediaData[:-10], mediaKeyExpanded[:16]
135
+ )
136
+ upload_result = cloudinary.uploader.upload(
137
+ decrypted_data, resource_type="raw", format=extension.get(media_type, "bin")
138
+ )
139
+ file_url = upload_result.get("url")
140
+ return JSONResponse(content={"decrypted_file_url": file_url})
141
+ except Exception as e:
142
+ return JSONResponse(content={"error": str(e)}, status_code=500)
143
 
144
+ # Genshin Impact and Star Rail card/profile creation endpoints (Example for Genshin)
145
+ @app.get("/genshin/{id}")
146
+ async def genshin_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
147
+ try:
148
+ result = await genshin_card(id, design, character_id, character_art_url)
149
+ characters = process_images(result, id)
150
+ return JSONResponse(content={'response': characters})
151
  except Exception as e:
152
+ return JSONResponse(content={'error': str(e)}, status_code=500)
153
 
 
154
  @app.get("/genshin/profile/{id}")
155
  async def genshin_profile_route(id: int):
156
  try:
157
  result = await genshinprofile(id)
158
  profile_data = process_profile(result)
159
  return JSONResponse(content={'response': profile_data})
 
160
  except Exception as e:
161
+ return JSONResponse(content={'error': str(e)}, status_code=500)
 
 
 
 
162
 
163
+ # Character AI message endpoint
164
+ @app.get("/send_message")
165
+ async def send_message(
166
+ message: str,
167
+ character_id: str = CHARACTER_ID,
168
+ chat_id: Optional[str] = Query(None, description="ID of the existing chat session, if available.")
169
+ ):
170
  try:
171
+ if not chat_id:
172
+ chat, greeting_message = await client.chat.create_chat(character_id)
173
+ chat_id = chat.chat_id
174
+ greeting_text = {
175
+ "author": greeting_message.author_name,
176
+ "text": greeting_message.get_primary_candidate().text
177
+ }
178
+ else:
179
+ greeting_text = None
180
+ answer = await client.chat.send_message(character_id, chat_id, message)
181
+ response_data = {
182
+ "chat_id": chat_id,
183
+ "author": answer.author_name,
184
+ "response": answer.get_primary_candidate().text
185
+ }
186
+ if greeting_text:
187
+ response_data["greeting_message"] = greeting_text
188
+ return response_data
189
+ except SessionClosedError:
190
+ raise HTTPException(status_code=500, detail="Session closed. Please restart the application.")
191
  except Exception as e:
192
+ raise HTTPException(status_code=500, detail=str(e))
193
 
194
+ # Image compression endpoint
195
+ @app.get("/compress_image")
196
+ async def compress_image(image_url: str = Query(..., description="URL of the image to compress")):
197
  try:
198
+ upload_result = cloudinary.uploader.upload(image_url)
199
+ compressed_url, options = cloudinary_url(
200
+ upload_result["public_id"], fetch_format="auto", quality="auto"
201
+ )
202
+ return JSONResponse(content={"compressed_image_url": compressed_url})
203
  except Exception as e:
204
+ return JSONResponse(content={"error": str(e)}, status_code=500)
 
 
 
 
 
 
 
 
 
 
 
 
 
205
 
206
+ # YouTube video download and options endpoints
207
+ @app.get("/download_youtube_video")
208
+ async def download_youtube_video(
209
+ query: str = Query(..., description="Search query for the YouTube video"),
210
+ format: str = Query("mp4", description="Video format to download"),
211
+ quality: str = Query("1080", description="Video quality")
212
+ ):
213
+ try:
214
+ yt = Ytube()
215
+ search_results = yt.search_videos(query)
216
+ if not search_results.items:
217
+ return JSONResponse(content={"error": "No videos found for the query"}, status_code=404)
218
+ target_video = search_results.items[0]
219
+ download_link = yt.get_download_link(target_video, format=format, quality=quality)
220
+ return JSONResponse(content={
221
+ "download_url": download_link.url,
222
+ "filename": download_link.filename,
223
+ "status": download_link.status
224
+ })
225
+ except Exception as e:
226
+ return JSONResponse(content={"error": str(e)}, status_code=500)
227
 
228
+ @app.get("/youtube_video")
229
+ async def youtube_video_options(query: str = Query(..., description="Search query for the YouTube video")):
230
+ try:
231
+ yt = Ytube()
232
+ search_results = yt.search_videos(query)
233
+ if not search_results.items:
234
+ return JSONResponse(content={"error": "No videos found for the query"}, status_code=404)
235
+ target_video = search_results.items[0]
236
+ available_downloads = []
237
+ qualities = ["144", "240", "360", "480", "720", "1080"]
238
+ formats = ["mp4"]
239
+ for quality in qualities:
240
+ for fmt in formats:
241
+ try:
242
+ download_link = yt.get_download_link(target_video, format=fmt, quality=quality)
243
+ if download_link.url:
244
+ available_downloads.append({
245
+ "quality": quality,
246
+ "format": fmt,
247
+ "download_url": download_link.url
248
+ })
249
+ except Exception:
250
+ continue
251
+ if not available_downloads:
252
+ return JSONResponse(content={"error": "No available download options found."}, status_code=404)
253
+ return JSONResponse(content={
254
+ "video_title": target_video.title,
255
+ "available_downloads": available_downloads
256
+ })
257
+ except Exception as e:
258
+ return JSONResponse(content={"error": str(e)}, status_code=500)
259
 
260
  if __name__ == "__main__":
261
+ import uvicorn
262
+ uvicorn.run("main:app", host="0.0.0.0", port=8000)