# lensfastapi / main.py
# tachibanaa710's picture
# Update main.py
# c3098a3 verified
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, FileResponse
from typing import Dict, Optional, Literal
from dataclasses import dataclass, asdict
import httpx
import asyncio
from functools import wraps
import zipfile
import os
import akinator
import json
import os
import time
from typing import Dict, Optional
from datetime import datetime, timedelta
from fastapi import HTTPException, Query
from pydantic import BaseModel
from typing import Literal
import threading
import pickle
import random
import shutil
from pathlib import Path
from typing import Optional, List
import uuid
from datetime import datetime, timedelta
import logging
import asyncio
from zenka import zenka, ZZZError, CacheConfig, Lang
import os
from enum import Enum
from enkacard import encbanner
from enkanetwork import EnkaNetworkAPI
from starrailcard.src.api import enka
import concurrent.futures
import requests
import traceback
from fastapi import FastAPI,Query
from io import BytesIO
from fastapi.responses import JSONResponse
import enkacard
import starrailcard
import enkanetwork
import uvicorn
import cloudinary
import cloudinary.uploader
from cloudinary.utils import cloudinary_url
import pydantic
from pydantic import BaseModel
import genshin
from packaging import version
import mangadex as md
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from reportlab.pdfgen import canvas
import tempfile
from PIL import Image as PILImage
import httpx
import pikepdf
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from genshin import Client
import logging
import json
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional
from dataclasses import dataclass, asdict
import akinator
from fastapi import HTTPException, Query
from pydantic import BaseModel
from typing import Literal
import asyncio
from functools import wraps
app = FastAPI(title="my api", version="1.0.0")

# Working directories: served files live under static/, scratch data under temp/.
STATIC_DIR = Path("static")
TEMP_DIR = Path("temp")
IMAGES_DIR = STATIC_DIR / "images"
ZIPS_DIR = STATIC_DIR / "zips"
# Parents are listed before their children so each mkdir finds its parent.
for directory in (STATIC_DIR, TEMP_DIR, IMAGES_DIR, ZIPS_DIR):
    directory.mkdir(exist_ok=True)

# Expose everything under static/ at the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
Path("static").mkdir(exist_ok=True)

# Limits and endpoints used by the image-search routes.
MAX_IMAGES = 1000  # upper bound applied to the requested image count
MAX_INDIVIDUAL_IMAGES = 10  # requests above this are delivered as a ZIP
SAFEBOORU_API_URL = "https://safebooru.org/index.php"
CLEANUP_INTERVAL_HOURS = 24  # age after which downloaded files are removed
class SafebooruService:
    """Async helper around the Safebooru post index and image host.

    One ``httpx.AsyncClient`` is created lazily on first use and reused by
    every later call made through this object.
    """

    def __init__(self):
        # Filled in by get_session() the first time a request is made.
        self.session = None

    async def get_session(self):
        """Return the shared HTTP client, creating it on the first call."""
        if self.session is None:
            self.session = httpx.AsyncClient(timeout=30.0)
        return self.session

    async def search_images(self, tags: str, limit: int = 1000) -> List[dict]:
        """Search the Safebooru API and return the JPEG/PNG posts.

        Args:
            tags: Search text; spaces are replaced with underscores.
            limit: Value sent as the API ``limit`` parameter.

        Returns:
            The post dicts whose ``image`` name ends in .jpg, .jpeg or .png;
            an empty list when the API returns nothing.

        Raises:
            HTTPException: 500 when the request or response parsing fails.
        """
        session = await self.get_session()
        params = {
            "page": "dapi",
            "s": "post",
            "q": "index",
            "json": "1",
            "limit": limit,
            "tags": tags.replace(" ", "_")
        }
        try:
            response = await session.get(SAFEBOORU_API_URL, params=params)
            response.raise_for_status()
            # An empty body cannot be parsed as JSON; treat it as "no results"
            # rather than an error.
            # NOTE(review): Safebooru appears to answer no-match queries with
            # an empty body -- confirm against the live API.
            if not response.content.strip():
                return []
            data = response.json()
            if not data:
                return []
            # Filter for supported image formats
            return [
                img for img in data
                if img.get("image", "").lower().endswith((".jpg", ".jpeg", ".png"))
            ]
        except Exception as e:
            logger.error(f"Error searching images: {e}")
            raise HTTPException(status_code=500, detail="Failed to search images") from e

    async def download_image(self, img_data: dict, filename: str, max_retries: int = 3) -> Optional[str]:
        """Download one post's image into IMAGES_DIR, retrying on failure.

        Args:
            img_data: Post dict providing the ``directory`` and ``image`` keys.
            filename: File name to create inside IMAGES_DIR.
            max_retries: Maximum number of download attempts.

        Returns:
            The saved file path as a string, or None when every attempt failed.
        """
        session = await self.get_session()
        image_url = f"https://safebooru.org/images/{img_data['directory']}/{img_data['image']}"
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Linear backoff: wait 1s before the 2nd try, 2s before the 3rd, ...
                    await asyncio.sleep(1 * attempt)
                response = await session.get(image_url)
                response.raise_for_status()
                file_path = IMAGES_DIR / filename
                with open(file_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"Downloaded: {filename}")
                return str(file_path)
            except Exception as e:
                logger.error(f"Download attempt {attempt + 1} failed for {filename}: {e}")
                if attempt == max_retries - 1:
                    logger.error(f"Failed to download {filename} after {max_retries} attempts")
                    return None
        # Only reached when max_retries is not positive.
        return None


# Module-wide service instance used by the search routes.
safebooru_service = SafebooruService()
async def cleanup_old_files():
    """Delete files in IMAGES_DIR and ZIPS_DIR older than CLEANUP_INTERVAL_HOURS.

    Age is taken from each file's modification time. A failure on one file is
    logged and the sweep continues with the next one; missing directories are
    skipped.
    """
    cutoff_time = datetime.now() - timedelta(hours=CLEANUP_INTERVAL_HOURS)
    for directory in [IMAGES_DIR, ZIPS_DIR]:
        if not directory.is_dir():
            continue
        for file_path in directory.iterdir():
            # The stat call sits inside the try as well: a file can disappear
            # between listing and inspection when two cleanups overlap.
            try:
                if not file_path.is_file():
                    continue
                file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
                if file_time >= cutoff_time:
                    continue
                file_path.unlink()
                logger.info(f"Cleaned up old file: {file_path}")
            except Exception as e:
                logger.error(f"Failed to clean up {file_path}: {e}")
@app.on_event("startup")
async def startup_event():
    """Log the start-up banner, then run one cleanup pass over old files."""
    logger.info("FastAPI Safebooru Clone starting up...")
    # Remove anything stale before requests start arriving.
    await cleanup_old_files()
@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared Safebooru HTTP client if one was ever created."""
    open_client = safebooru_service.session
    if not open_client:
        return
    await open_client.aclose()
@app.get("/")
async def root():
    """Return a short description of the image-search endpoints and sample queries."""
    endpoints = {
        "/search": "Search and get images",
        "/cleanup": "Manual cleanup of old files",
        "/health": "Health check"
    }
    usage = {
        "individual_images": "/search?tags=hakurei_reimu&count=5",
        "zip_file": "/search?tags=hakurei_reimu&count=20"
    }
    return {
        "message": "Safebooru API Clone",
        "endpoints": endpoints,
        "usage": usage
    }
@app.get("/search")
async def search_safebooru(
    tags: str,
    count: Optional[int] = 5,
    background_tasks: BackgroundTasks = None
):
    """
    Search Safebooru, download a random selection and return links to the copies

    - **tags**: Search tags (space-separated, will be converted to underscores)
    - **count**: Number of images to return (clamped to 1-1000, default: 5)

    Requests for up to 10 images (MAX_INDIVIDUAL_IMAGES) return one link per
    image; larger requests return a single ZIP archive.
    """
    if not tags.strip():
        raise HTTPException(status_code=400, detail="Tags parameter is required")
    # Validate count: fall back to the default, then clamp into 1..MAX_IMAGES.
    if count is None:
        count = 5
    count = max(1, min(count, MAX_IMAGES))
    try:
        # Search for images
        logger.info(f"Searching for tags: {tags}, count: {count}")
        images_data = await safebooru_service.search_images(tags, limit=1000)
        if not images_data:
            raise HTTPException(status_code=404, detail=f"No results found for tags: {tags}")
        # Shuffle and select requested number of images
        random.shuffle(images_data)
        selected_images = images_data[:count]
        # Generate unique session ID
        session_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if count <= MAX_INDIVIDUAL_IMAGES:
            # Return individual image links
            download_tasks = [
                safebooru_service.download_image(
                    img_data, f"{session_id}_{i+1}_{img_data['image']}"
                )
                for i, img_data in enumerate(selected_images)
            ]
            # Download all images concurrently; failures come back as
            # None or exception objects and are skipped below.
            downloaded_paths = await asyncio.gather(*download_tasks, return_exceptions=True)
            image_links = []
            for i, path in enumerate(downloaded_paths):
                if isinstance(path, str) and path:
                    # as_posix keeps the URL forward-slashed on every platform.
                    relative_path = Path(path).relative_to(STATIC_DIR).as_posix()
                    image_links.append({
                        "index": i + 1,
                        "filename": Path(path).name,
                        "url": f"/static/{relative_path}",
                        "direct_link": f"https://tachibanaa710-lensfastapi.hf.space/static/{relative_path}"
                    })
            # Schedule cleanup
            if background_tasks:
                background_tasks.add_task(cleanup_old_files)
            return {
                "query": tags,
                "total_found": len(images_data),
                "requested_count": count,
                "delivered_count": len(image_links),
                "type": "individual_images",
                "images": image_links,
                "session_id": session_id
            }

        # Create ZIP file for multiple images.
        # The tags are user input and end up in a file name and a URL, so
        # anything other than letters, digits and "-_()" becomes "_"
        # (a "/" in a tag would otherwise point into a missing directory).
        safe_tags = "".join(
            ch if ch.isalnum() or ch in "-_()" else "_" for ch in tags
        )
        zip_filename = f"safebooru_{safe_tags}_{count}images_{timestamp}_{session_id[:8]}.zip"
        zip_path = ZIPS_DIR / zip_filename
        # Create temporary directory for downloads
        temp_session_dir = TEMP_DIR / session_id
        temp_session_dir.mkdir(parents=True, exist_ok=True)
        try:
            # Download images to temp directory
            download_tasks = [
                download_image_to_temp(
                    img_data, temp_session_dir / f"{i+1:02d}_{img_data['image']}"
                )
                for i, img_data in enumerate(selected_images)
            ]
            downloaded_files = await asyncio.gather(*download_tasks, return_exceptions=True)
            successful_downloads = [f for f in downloaded_files if isinstance(f, str) and f]
            if not successful_downloads:
                raise HTTPException(status_code=500, detail="Failed to download any images")
            # Create ZIP file
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path in successful_downloads:
                    file_path_obj = Path(file_path)
                    if file_path_obj.exists():
                        zipf.write(file_path_obj, file_path_obj.name)
            # Get file size
            file_size_mb = zip_path.stat().st_size / (1024 * 1024)
            # Schedule cleanup
            if background_tasks:
                background_tasks.add_task(cleanup_old_files)
            return {
                "query": tags,
                "total_found": len(images_data),
                "requested_count": count,
                "delivered_count": len(successful_downloads),
                "type": "zip_file",
                "zip_info": {
                    "filename": zip_filename,
                    "size_mb": round(file_size_mb, 2),
                    "url": f"/static/zips/{zip_filename}",
                    "direct_link": f"https://tachibanaa710-lensfastapi.hf.space/static/zips/{zip_filename}",
                    "download_url": f"/download/{zip_filename}"
                },
                "session_id": session_id
            }
        except HTTPException:
            # Already a finished HTTP error; do not re-wrap it.
            raise
        except Exception as e:
            # Drop a partially written archive before reporting the failure.
            if zip_path.exists():
                zip_path.unlink()
            raise HTTPException(status_code=500, detail=f"Failed to create ZIP file: {str(e)}")
        finally:
            # The downloaded originals are never needed after this point.
            shutil.rmtree(temp_session_dir, ignore_errors=True)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in search endpoint: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
async def download_image_to_temp(img_data: dict, file_path: Path, max_retries: int = 3) -> Optional[str]:
    """Fetch one Safebooru image and write it to ``file_path``.

    Makes up to ``max_retries`` attempts, pausing ``attempt`` seconds before
    each retry. Returns the path as a string, or None if no attempt succeeded.
    """
    http = await safebooru_service.get_session()
    source_url = "https://safebooru.org/images/{}/{}".format(
        img_data['directory'], img_data['image']
    )
    last_attempt = max_retries - 1
    attempt = 0
    while attempt < max_retries:
        try:
            if attempt > 0:
                await asyncio.sleep(1 * attempt)
            reply = await http.get(source_url)
            reply.raise_for_status()
            with open(file_path, "wb") as out:
                out.write(reply.content)
            return str(file_path)
        except Exception as e:
            logger.error(f"Download attempt {attempt + 1} failed: {e}")
            if attempt == last_attempt:
                return None
        attempt += 1
    return None
@app.get("/download/{filename}")
async def download_zip(filename: str):
    """Serve a previously generated ZIP archive from ZIPS_DIR.

    Args:
        filename: Bare name of the archive inside ZIPS_DIR.

    Raises:
        HTTPException: 404 when the name is not a plain file name or no such
            file exists.
    """
    # The name comes straight from the URL: refuse anything that could step
    # outside ZIPS_DIR instead of joining it into a path.
    if filename in {".", ".."} or "/" in filename or "\\" in filename:
        raise HTTPException(status_code=404, detail="File not found")
    zip_path = ZIPS_DIR / filename
    # is_file() also rejects directories, which exists() would let through.
    if not zip_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(
        path=zip_path,
        filename=filename,
        media_type="application/zip"
    )
@app.get("/health")
async def health_check():
    """Report service status, the current time and how many entries each output folder holds."""
    report = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
    }
    report["images_count"] = sum(1 for _ in IMAGES_DIR.glob("*"))
    report["zips_count"] = sum(1 for _ in ZIPS_DIR.glob("*"))
    return report
@app.post("/cleanup")
async def manual_cleanup():
    """Run the old-file cleanup immediately and report when it finished."""
    await cleanup_old_files()
    finished_at = datetime.now().isoformat()
    return {"message": "Cleanup completed", "timestamp": finished_at}
pydantic_version = version.parse(pydantic.__version__)
# Informational only: announce which Pydantic major line is installed.
# Neither branch changes imports or behaviour.
print(
    "Running with Pydantic v2"
    if pydantic_version.major >= 2
    else "Running with Pydantic v1"
)

# Cloudinary configuration: credentials are read from the environment.
cloudinary.config(
    cloud_name=os.getenv("cloudname"),
    api_key=os.getenv("key"),
    api_secret=os.getenv("secret"),
    secure=True,
)
# Genshin Impact card creation
async def genshin_card(id, designtype, character_id=None, character_art_url=None):
    """Create Genshin cards for a UID through ``encbanner.ENC``.

    Custom art is only passed on when both ``character_id`` and
    ``character_art_url`` are given. Template 2 is used when ``designtype``
    is "2" (as text); every other value selects template 1.
    """
    character_art = None
    if character_id and character_art_url:
        character_art = {str(character_id): character_art_url}
    template = 2 if str(designtype) == "2" else 1
    async with encbanner.ENC(uid=str(id), character_art=character_art) as encard:
        return await encard.creat(template=template)
# Star Rail card creation with optional character ID and cookies
async def starrail_card(id, designtype, character_id=None, character_art_url=None, ltmid_v2=None, ltoken_v2=None, ltuid_v2=None):
    """Create Star Rail cards for a UID.

    When all three cookie values are supplied the cards are built with
    ``starrailcard.HoYoCard`` and those cookies; otherwise with
    ``starrailcard.Card`` and ``enka=True``. Style 2 is used when
    ``designtype`` is "2" (as text), style 1 otherwise.
    """
    character_art = None
    if character_id and character_art_url:
        character_art = {str(character_id): character_art_url}
    style = 2 if str(designtype) == "2" else 1

    if ltmid_v2 and ltoken_v2 and ltuid_v2:
        # Use cookies if provided
        cookie = {
            "ltmid_v2": ltmid_v2,
            "ltoken_v2": ltoken_v2,
            "ltuid_v2": ltuid_v2
        }
        card_source = starrailcard.HoYoCard(
            cookie=cookie, seeleland=True, remove_logo=True,
            character_art=character_art, boost_speed=True,
        )
    else:
        # Fallback to the existing process
        card_source = starrailcard.Card(
            seeleland=True, remove_logo=True,
            character_art=character_art, boost_speed=True, enka=True,
        )
    async with card_source as card:
        return await card.create(id, force_update=True, style=style)
# Star Rail profile creation
async def starrail_profile(id):
    """Create the Star Rail profile card (style 2) for ``id`` with a forced refresh."""
    card_source = starrailcard.Card(remove_logo=True, seeleland=True, boost_speed=True, enka=True)
    async with card_source as card:
        profile = await card.create_profile(id, force_update=True, style=2)
        return profile
# Genshin profile creation
async def genshinprofile(id):
    """Create the Genshin profile card for ``id`` via ``encbanner.ENC``."""
    async with encbanner.ENC(uid=id) as encard:
        profile = await encard.profile(card=True)
        return profile
# Route for Genshin Impact
@app.get("/genshin/{id}")
async def genshin_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
    """Build Genshin character cards for a UID and return their uploaded URLs.

    Invalid UIDs and card-generation errors are answered with status 400,
    anything else with status 500.
    """
    try:
        cards = await genshin_card(id, design, character_id, character_art_url)
        payload = {'response': process_images(cards, id)}
        return JSONResponse(content=payload)
    except enkanetwork.exception.VaildateUIDError:
        return JSONResponse(
            status_code=400,
            content={'error': 'Invalid UID. Please check your UID.'},
        )
    except enkacard.enc_error.ENCardError:
        return JSONResponse(
            status_code=400,
            content={'error': 'Enable display of the showcase in the game or add characters there.'},
        )
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={'error': f'UNKNOWN ERR: {str(e)}'},
        )
# Route for Star Rail with optional character ID
@app.get("/starrail/{id}")
async def starrail_characters(
    id: int,
    design: str = "1",
    character_id: int = None,
    character_art_url: str = None,
    ltmid_v2: str = None,
    ltoken_v2: str = None,
    ltuid_v2: str = None
):
    """Build Star Rail character cards for a UID and return their uploaded URLs.

    The optional cookie values are forwarded to ``starrail_card``. Any failure
    is answered with status 500 and the error text.
    """
    try:
        # Call starrail_card with cookies if provided
        cards = await starrail_card(
            id, design, character_id, character_art_url,
            ltmid_v2, ltoken_v2, ltuid_v2,
        )
        payload = {'response': process_images(cards, id)}
        return JSONResponse(content=payload)
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={'error': f'UNKNOWN ERR: {str(e)}'},
        )
# Route for Star Rail profile
@app.get("/starrail/profile/{id}")
async def starrail_profile_route(id: int):
    """Create the Star Rail profile card for a UID and return its uploaded URL."""
    try:
        profile_card = await starrail_profile(id)
        payload = {'response': process_profile(profile_card)}
        return JSONResponse(content=payload)
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={'error': f'UNKNOWN ERR: {str(e)}'},
        )
# Route for Genshin profile
@app.get("/genshin/profile/{id}")
async def genshin_profile_route(id: int):
    """Create the Genshin profile card for a UID and return its uploaded URL."""
    try:
        profile_card = await genshinprofile(id)
        payload = {'response': process_profile(profile_card)}
        return JSONResponse(content=payload)
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={'error': f'UNKNOWN ERR: {str(e)}'},
        )
@app.get("/")
def root():
    """Return a fixed Arabic reply string.

    NOTE(review): GET "/" is already registered earlier in this file, so this
    handler is presumably never reached -- confirm and remove or re-route.
    """
    reply = "خدتك عليه"
    return reply
@app.get("/ايه")
def root():
    """Return a fixed Arabic reply string."""
    reply = "خدتك عليه"
    return reply
@app.get("/update")
async def update():
    """Refresh the EnkaNetwork and StarRailCard asset data.

    On failure the response has status 500 and carries the error text plus
    the formatted traceback.
    """
    try:
        # Update assets using EnkaNetworkAPI and ApiEnkaNetwork
        # NOTE(review): the source lost its indentation; the second refresh is
        # placed after the client block here -- confirm the intended nesting.
        async def update_assets() -> None:
            async with EnkaNetworkAPI() as client:
                await client.update_assets()
            await enka.ApiEnkaNetwork().update_assets()

        refresh = asyncio.create_task(update_assets())
        await refresh
        return JSONResponse(content={'response': 'Assets updated successfully'})
    except Exception as e:
        failure = {
            'error': f'UNKNOWN ERR: {str(e)}',
            'details': traceback.format_exc(),
        }
        return JSONResponse(content=failure, status_code=500)
@app.get("/updatezzz")
async def maiin():
    """Refresh the zenka asset data.

    A ``ZZZError`` is answered with status 500, the error text and the
    formatted traceback; other exception types are not handled here.
    """
    try:
        async with zenka.Client() as client:
            await client.update_asset()
        return JSONResponse(content={'response': 'Assets updated successfully'})
    except ZZZError as e:
        failure = {
            'error': f'UNKNOWN ERR: {str(e)}',
            'details': traceback.format_exc(),
        }
        return JSONResponse(content=failure, status_code=500)
# Helper function to upload the image to Cloudinary
def upload_image(data, character_id, player_id):
    """Upload ``data`` to the "card_images" folder and return its secure URL.

    The public id is "<character_id>_<player_id>". Any failure is re-raised
    as a plain ``Exception`` prefixed with "Cloudinary upload error".
    """
    try:
        upload_result = cloudinary.uploader.upload(
            data,
            folder="card_images",
            public_id=f"{character_id}_{player_id}",
            invalidate=True,
        )
        return upload_result["secure_url"]
    except Exception as e:
        raise Exception(f"Cloudinary upload error: {str(e)}")
# Helper function to upload the image to Cloudinary without player or character ID
def upload_imagee(data):
    """Upload ``data`` to the "card_images" folder without a fixed public id.

    Returns the secure URL of the upload. Any failure is re-raised as a plain
    ``Exception`` prefixed with "Cloudinary upload error".
    """
    try:
        upload_result = cloudinary.uploader.upload(
            data,
            folder="card_images",
            invalidate=True,
        )
        return upload_result["secure_url"]
    except Exception as e:
        raise Exception(f"Cloudinary upload error: {str(e)}")
# Process individual image card
def process_image(dt, player_id):
    """Encode one card image as PNG, upload it and describe the result.

    Returns a dict with the card's ``name``, ``id`` and the uploaded ``url``.
    """
    buffer = BytesIO()
    try:
        dt.card.save(buffer, "PNG")
        buffer.seek(0)  # rewind so the upload reads from the start
        # Upload the image using the character's ID and player ID
        image_url = upload_image(buffer, dt.id, player_id)
    finally:
        buffer.close()
    return {"name": dt.name, "id": dt.id, "url": image_url}
# Process the profile image
def process_profile(profile_card):
    """Encode the profile card image as PNG, upload it and return ``{"url": ...}``."""
    buffer = BytesIO()
    try:
        profile_card.card.save(buffer, "PNG")
        buffer.seek(0)  # rewind so the upload reads from the start
        # Upload the image without using the character or player ID
        image_url = upload_imagee(buffer)
    finally:
        buffer.close()
    return {"url": image_url}
from io import BytesIO
def process_profilee(profile_card):
    """Upload the image of the first entry in ``profile_card.cards``.

    Returns ``{"url": ...}``. Raises ``ValueError`` when ``cards`` is empty
    or is not a list.
    """
    cards = profile_card.cards
    if not (cards and isinstance(cards, list)):
        raise ValueError("Invalid profile_card: 'cards' is missing or not a list.")
    # The first card carries the image to publish.
    profile_image = cards[0].card
    buffer = BytesIO()
    try:
        profile_image.save(buffer, "PNG")
        buffer.seek(0)
        image_url = upload_imagee(buffer)
    finally:
        buffer.close()
    return {"url": image_url}
# Process all the images returned
def process_images(result, player_id):
    """Upload every card in ``result.card`` on a thread pool.

    Returns the per-card dicts in completion order. A card whose processing
    fails is reported with ``print`` and left out of the result.
    """
    characters = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = []
        for dt in result.card:
            pending.append(pool.submit(process_image, dt, player_id))
        for finished in concurrent.futures.as_completed(pending):
            try:
                entry = finished.result()
            except Exception as e:
                print(f"Error processing image: {e}")
            else:
                characters.append(entry)
    return characters
def process_imagess(result, player_id):
    """Upload every card in ``result.cards`` on a thread pool.

    Same as ``process_images`` except that the cards are read from the
    ``cards`` attribute. Failed cards are printed and skipped.
    """
    characters = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = []
        for dt in result.cards:
            pending.append(pool.submit(process_image, dt, player_id))
        for finished in concurrent.futures.as_completed(pending):
            try:
                entry = finished.result()
            except Exception as e:
                print(f"Error processing image: {e}")
            else:
                characters.append(entry)
    return characters
# ZZZ Card and Profile creation functions
async def zenless_card(uid, character_id=None, character_art=None):
    """Create Zenless Zone Zero cards for ``uid`` through ``zenka.Client``.

    Failures are re-raised as plain ``Exception``: a ``ZZZError`` with its
    code and text, anything else with the message and formatted traceback.

    NOTE(review): a function with this same name is defined again later in
    this file and rebinds the name.
    """
    config = zenka.Config(asset_save=True, hide_uid=False)
    try:
        async with zenka.Client(
            lang=Lang.EN,
            config=config,
            character_art=character_art,
            character_id=character_id,
        ) as client:
            return await client.card(uid)
    except ZZZError as e:
        raise Exception(f"Zenless Zone Zero Error: Code:{e.code} Message: {e.text}")
    except Exception as e:
        # Include the traceback so the caller sees where the failure started.
        import traceback
        error_details = traceback.format_exc()
        raise Exception(f"Zenless Zone Zero Error: {str(e)}\n{error_details}")
async def zenless_profile(uid):
    """Create the Zenless Zone Zero profile for ``uid`` through ``zenka.Client``.

    Failures are re-raised as plain ``Exception``: a ``ZZZError`` with its
    code and text, anything else with the message and formatted traceback.
    """
    config = zenka.Config(asset_save=True, hide_uid=False)
    try:
        async with zenka.Client(lang=Lang.EN, config=config) as client:
            return await client.profile(uid)
    except ZZZError as e:
        raise Exception(f"Zenless Zone Zero Error: Code:{e.code} Message: {e.text}")
    except Exception as e:
        # Include the traceback so the caller sees where the failure started.
        import traceback
        error_details = traceback.format_exc()
        raise Exception(f"Zenless Zone Zero Error: {str(e)}\n{error_details}")
# Add these routes to your FastAPI app
@app.get("/zenless/{uid}")
async def zenless_characters(uid: int, character_id: str = None, character_art_url: str = None):
    """Build Zenless Zone Zero cards for a UID and return their uploaded URLs.

    Args:
        uid: Player UID.
        character_id: Optional comma-separated character ids; numeric entries
            are converted to int, blank entries are ignored.
        character_art_url: Optional art URL applied to the first listed id.

    Returns:
        JSON with the card list under ``response``, or status 500 with the
        error text on any failure.
    """
    try:
        # Process character_id from string to list if provided
        char_id = None
        if character_id:
            parts = [part.strip() for part in character_id.split(',')]
            char_id = [
                int(part) if part.isdigit() else part
                for part in parts
                if part
            ] or None
        # Process character_art from URL to dict if provided
        char_art = None
        if char_id and character_art_url:
            # The key in the dictionary must be a string, even though the id
            # list above holds ints for numeric ids.
            char_art = {str(char_id[0]): character_art_url}
        result = await zenless_card(uid, char_id, char_art)
        characters = process_imagess(result, uid)
        return JSONResponse(content={'response': characters})
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
async def zenless_card(uid, character_id=None, character_art=None):
    """Create Zenless Zone Zero cards for ``uid`` through ``zenka.Client``.

    ``character_art`` should be a dict like {"1121": "https://example.com/image.webp"}
    and ``character_id`` a list like [1151, "1121"]. Failures are re-raised as
    plain ``Exception`` with the error details in the message.

    NOTE(review): a function with this same name is also defined earlier in
    this file; this later definition is the one bound after import.
    """
    config = zenka.Config(asset_save=True, hide_uid=False)
    client_options = dict(
        lang=Lang.EN,
        config=config,
        character_art=character_art,
        character_id=character_id,
    )
    try:
        async with zenka.Client(**client_options) as client:
            return await client.card(uid)
    except ZZZError as e:
        raise Exception(f"Zenless Zone Zero Error: Code:{e.code} Message: {e.text}")
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        raise Exception(f"Zenless Zone Zero Error: {str(e)}\n{error_details}")
@app.get("/zenless/profile/{uid}")
async def zenless_profile_route(uid: int):
    """Create the Zenless Zone Zero profile card for a UID and return its uploaded URL."""
    try:
        profile_card = await zenless_profile(uid)
        payload = {'response': process_profilee(profile_card)}
        return JSONResponse(content=payload)
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={'error': f'UNKNOWN ERR: {str(e)}'},
        )
@app.get("/check_train_score")
async def check_train_score(ltoken_v2: str, ltuid_v2: str):
    """
    Check if the Star Rail training score is at maximum (500).

    Parameters:
    - ltoken_v2: Your ltoken_v2 cookie value
    - ltuid_v2: Your ltuid_v2 cookie value as an integer

    Returns:
    - "yes" if train score is 500 or more
    - "no" if train score is less than 500

    Errors:
    - 400 if ltuid_v2 is not an integer
    - 401 if the cookies are rejected
    - 500 for any other failure
    """
    # A malformed id is the caller's mistake, so report it as 400 instead of
    # letting it fall into the generic 500 handler below.
    try:
        ltuid_v2_int = int(ltuid_v2)
    except ValueError:
        raise HTTPException(status_code=400, detail="ltuid_v2 must be an integer") from None
    try:
        # Set up cookies for authentication
        auth_cookies = {
            "ltoken_v2": ltoken_v2,
            "ltuid_v2": ltuid_v2_int
        }
        # Initialize genshin client
        hoyo_client = genshin.Client(auth_cookies)
        # Fetch Star Rail notes data
        data = await hoyo_client.get_starrail_notes()
        # Check train score
        return "yes" if data.current_train_score >= 500 else "no"
    except genshin.errors.InvalidCookies:
        raise HTTPException(status_code=401, detail="Invalid cookies provided")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error checking train score: {str(e)}")
class GameOptions(str, Enum):
    """Games accepted by the ``game`` query parameter; values are the query strings."""

    STARRAIL = "hsr"
    GENSHIN = "genshin"
    HONKAI = "hi3rd"
    ZZZ = "zenless"
class RewardInfo(BaseModel):
    """Shape of one claimed reward: name, amount and an optional icon."""

    name: str
    amount: int
    # Declared Optional because the default is None.
    icon: Optional[str] = None
@app.get("/claim_daily_reward", response_model=dict)
async def claim_daily_reward(
    ltoken_v2: str,
    ltuid_v2: str,
    game: GameOptions = Query(..., description="Game to claim rewards for")
):
    """
    Claim daily reward for selected Hoyoverse game.

    Parameters:
    - ltoken_v2: Your ltoken_v2 cookie value
    - ltuid_v2: Your ltuid_v2 cookie value as an integer
    - game: Game to claim rewards for (hsr, genshin, hi3rd, zenless)

    Returns:
    - Claim result information including reward details, or a
      success=False message when the reward was already claimed

    Errors:
    - 400 if ltuid_v2 is not an integer
    - 401 if the cookies are rejected
    - 500 for any other failure
    """
    # A malformed id is the caller's mistake, so report it as 400 instead of
    # letting it fall into the generic 500 handler below.
    try:
        ltuid_v2_int = int(ltuid_v2)
    except ValueError:
        raise HTTPException(status_code=400, detail="ltuid_v2 must be an integer") from None
    # Use the plain value ("hsr", ...) in messages: how an Enum member with a
    # str mixin renders inside an f-string differs between Python versions.
    game_label = game.value
    try:
        # Set up cookies for authentication
        auth_cookies = {
            "ltoken_v2": ltoken_v2,
            "ltuid_v2": ltuid_v2_int
        }
        # Initialize genshin client
        hoyo_client = genshin.Client(auth_cookies)
        # Map the game option to genshin library enum
        game_map = {
            GameOptions.STARRAIL: genshin.Game.STARRAIL,
            GameOptions.GENSHIN: genshin.Game.GENSHIN,
            GameOptions.HONKAI: genshin.Game.HONKAI,
            GameOptions.ZZZ: genshin.Game.ZZZ
        }
        selected_game = game_map[game]
        # Claim daily reward
        result = await hoyo_client.claim_daily_reward(game=selected_game)
        # Parse reward information, tolerating missing attributes
        reward = {
            "name": getattr(result, "name", "Unknown"),
            "amount": getattr(result, "amount", 0),
            "icon": getattr(result, "icon", None)
        }
        return {
            "success": True,
            "message": f"Successfully claimed daily reward for {game_label}",
            "reward": reward
        }
    except genshin.errors.InvalidCookies:
        raise HTTPException(status_code=401, detail="Invalid cookies provided")
    except genshin.errors.AlreadyClaimed:
        return {
            "success": False,
            "message": f"Daily reward for {game_label} already claimed today"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error claiming daily reward: {str(e)}")
# Shared MangaDex auth object passed to the md.series clients in the routes below.
auth = md.auth.Auth()
@app.get("/mangadex")
async def fetch_and_upload_pdf(
    chapter_id: str = Query(..., description="Comma-separated MangaDex chapter IDs to fetch images for"),
    as_pdf: bool = Query(True, description="Set to False to return image URLs without creating a PDF"),
    page: str = Query(None, description="Specific page(s) to fetch, e.g., '1' or '1-5'")
):
    """
    Fetch images from specified chapters and optionally combine them into a single PDF.

    Returns the image URLs when ``as_pdf`` is false, otherwise the URL of a
    PDF written under static/ and named after the last requested chapter ID.

    Errors:
    - 400 for a missing chapter ID or a malformed ``page`` value
    - 404 when a chapter or the selected pages yield no images
    - 500 (JSON ``error`` body) for any other failure
    """
    try:
        chapter_ids = [cid.strip() for cid in chapter_id.split(",") if cid.strip()]
        if not chapter_ids:
            raise HTTPException(status_code=400, detail="No chapter ID provided.")
        all_images = []
        # Fetch images for each chapter
        chapter = md.series.Chapter(auth=auth)
        for current_id in chapter_ids:
            chapter_data = chapter.get_chapter_by_id(chapter_id=current_id)
            images = chapter_data.fetch_chapter_images()
            if not images:
                raise HTTPException(status_code=404, detail=f"No images found for chapter ID {current_id}.")
            all_images.extend(images)

        # Filter images by page if applicable
        if page:
            try:
                if '-' in page:
                    start, end = map(int, page.split('-'))
                    pages = list(range(start, end + 1))
                else:
                    pages = [int(page)]
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid page query format.")
            # Page numbers are 1-based; out-of-range numbers are ignored.
            all_images = [all_images[i - 1] for i in pages if 1 <= i <= len(all_images)]
            if not all_images:
                raise HTTPException(status_code=404, detail="No images found for the selected pages.")

        # Return image URLs if not converting to PDF
        if not as_pdf:
            return JSONResponse(content={"image_urls": all_images})

        # The output is named after the last requested chapter. The ID is
        # request input, so keep only characters that are safe in a file name.
        output_stem = "".join(
            ch for ch in chapter_ids[-1] if ch.isalnum() or ch in "-_"
        )
        compressed_pdf_path = f"static/{output_stem}_compressed.pdf"

        # Everything temporary lives in one directory that is removed on exit,
        # whether the build succeeds or fails.
        with tempfile.TemporaryDirectory() as tmp_name:
            work_dir = Path(tmp_name)
            pdf_path = work_dir / "chapter.pdf"
            c = canvas.Canvas(str(pdf_path))
            async with httpx.AsyncClient() as client:
                for index, image_url in enumerate(all_images):
                    response = await client.get(image_url)
                    response.raise_for_status()
                    # Each page gets its own file names.
                    # NOTE(review): reportlab appears to key embedded images
                    # by file name, so names must not be reused -- confirm.
                    raw_path = work_dir / f"page_{index}_raw"
                    raw_path.write_bytes(response.content)
                    converted_image_path = work_dir / f"page_{index}.jpg"
                    # Convert to RGB JPEG; the source handle is closed on exit.
                    with PILImage.open(raw_path) as raw_image:
                        img = raw_image.convert("RGB")
                    img.save(converted_image_path, "JPEG")
                    # One PDF page per image, sized to the image.
                    width, height = img.size
                    c.setPageSize((width, height))
                    c.drawImage(str(converted_image_path), 0, 0, width, height)
                    c.showPage()
            c.save()
            # Re-save through pikepdf into the served directory.
            with pikepdf.open(str(pdf_path)) as pdf:
                pdf.save(compressed_pdf_path)

        # Return the URL to the compressed PDF
        return JSONResponse(content={"file_url": f"/static/{output_stem}_compressed.pdf"})
    except HTTPException:
        # Keep the intended 400/404 status instead of folding it into a 500.
        raise
    except Exception as e:
        return JSONResponse(content={"error": f"Failed to process the chapters: {str(e)}"}, status_code=500)
@app.get("/name")
async def fetch_manga_details(
    title: str = Query(..., description="Manga title to search for")
):
    """
    Look up manga on MangaDex by title and return a summary of each match.

    Answers 404 when nothing matches and 500 with the error text on failure.
    """
    try:
        manga_list = md.series.Manga(auth=auth).get_manga_list(title=title)
        if not manga_list:
            return JSONResponse(
                status_code=404,
                content={"error": f"No manga found with the title '{title}'"},
            )
        manga_details = [
            {
                "id": m.manga_id,
                "title": m.title.get("en", "No English Title"),
                "alt_titles": m.altTitles,
                "description": m.description.get("en", "No English Description"),
                "status": m.status,
                "content_rating": m.contentRating,
                "last_volume": m.lastVolume,
                "last_chapter": m.lastChapter,
                "year": m.year,
                "original_language": m.originalLanguage,
                "created_at": str(m.createdAt) if m.createdAt else None,
                "uploaded_at": str(getattr(m, "uploadedAt", None))
            }
            for m in manga_list
        ]
        return JSONResponse(content={"manga_list": manga_details})
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to fetch manga details: {str(e)}"},
        )
# Account cookies for the shared genshin client; each value is read from the
# environment variable of the same name.
cookies = {
    name: os.getenv(name)
    for name in ("ltuid_v2", "ltoken_v2", "ltmid_v2", "cookie_token_v2")
}
client = Client(cookies)
# Helper function to format data
def format_user_data(user):
    """Render a user record as a four-section plain-text summary.

    The sections are Info, Chests, Exploration and Teapot, separated by blank
    lines. Missing statistics are shown as "N/A" and missing names as
    "Unknown".

    Args:
        user: Object exposing ``stats`` and ``teapot`` and, optionally,
            ``characters`` and ``explorations`` lists.

    Returns:
        The formatted text, or "Error formatting user data: ..." if anything
        goes wrong while reading the record.
    """
    try:
        stats = user.stats

        def stat(name):
            # Missing statistics are displayed as N/A rather than failing.
            return getattr(stats, name, 'N/A')

        # Info section
        if hasattr(user, 'characters') and isinstance(user.characters, list):
            character_count = len(user.characters)
        else:
            character_count = 'N/A'
        info_lines = [
            "• Info •",
            f"{stat('days_active')} Active Days",
            f"{stat('achievements')} Achievements",
            f"{character_count} Characters",
            f"{stat('unlocked_waypoints')} Waypoints",
            f"{stat('unlocked_domains')} Domains",
            f"{stat('dendroculi')} Dendroculus",
            f"{stat('electroculi')} Electroculus",
            f"{stat('anemoculi')} Anemoculus",
            f"{stat('geoculi')} Geoculus",
            f"Spiral Abyss {stat('spiral_abyss')}",
        ]

        # Chests section
        chest_lines = [
            "• Chests •",
            f"{stat('remarkable_chests')} Remarkable",
            f"{stat('luxurious_chests')} Luxurious",
            f"{stat('precious_chests')} Precious",
            f"{stat('exquisite_chests')} Exquisite",
            f"{stat('common_chests')} Common",
        ]

        # Exploration section
        exploration_lines = ["• Exploration •"]
        if hasattr(user, 'explorations') and isinstance(user.explorations, list):
            for exploration_data in user.explorations:
                # raw_explored is divided by 10 to give a one-decimal percentage.
                percent = f"{getattr(exploration_data, 'raw_explored', 0) / 10.0:.1f}%"
                exploration_lines.append(
                    f"{getattr(exploration_data, 'name', 'Unknown')} {percent}"
                )
                if hasattr(exploration_data, 'offerings') and isinstance(exploration_data.offerings, list):
                    for offering in exploration_data.offerings:
                        exploration_lines.append(
                            f" {getattr(offering, 'name', 'Unknown')} level {getattr(offering, 'level', 'N/A')}"
                        )
        else:
            exploration_lines.append("No exploration data available.")

        # Teapot section
        teapot = user.teapot
        teapot_lines = ["• Teapot •"]
        if hasattr(teapot, 'realms') and isinstance(teapot.realms, list) and len(teapot.realms) > 0:
            teapot_lines.append(f"{getattr(teapot.realms[0], 'name', 'Unknown')}")
        if hasattr(teapot, 'visitors') and isinstance(teapot.visitors, list):
            visit_count = len(teapot.visitors)
        else:
            visit_count = 'N/A'
        teapot_lines.extend([
            f"Level {getattr(teapot, 'level', 'N/A')} ({getattr(teapot, 'comfort_name', 'Unknown')})",
            f"Comfort {getattr(teapot, 'comfort', 'N/A')}",
            f"Visits {visit_count}",
            f"Items {getattr(teapot, 'items', 'N/A')}",
        ])

        # Every section ends with a newline; joining with another newline
        # leaves one blank line between sections.
        sections = (info_lines, chest_lines, exploration_lines, teapot_lines)
        return "\n".join("\n".join(lines) + "\n" for lines in sections)
    except Exception as e:
        # Best effort: report formatting problems as text instead of raising.
        return f"Error formatting user data: {e}"
@app.get("/user/{uid}")
async def get_user_info(uid: int):
    """Return a formatted text summary of a Genshin account.

    Args:
        uid: UID taken from the request path.

    Returns:
        ``{"uid": uid, "data": <formatted text>}`` on success, or
        ``{"error": <message>}`` when the lookup or the formatting raises.
        Failures are reported in the response body, not as an HTTP error.
    """
    try:
        user = await client.get_genshin_user(uid)
        return {"uid": uid, "data": format_user_data(user)}
    except Exception as e:
        # The error payload is kept as-is for existing API consumers; the
        # traceback is logged so the failure is also visible server-side.
        logger.exception("Failed to fetch user info for uid %s", uid)
        return {"error": str(e)}
async def fetch_spiral_abyss_data(uid: int, previous: bool = False):
    """Retrieve Spiral Abyss data for a UID through the shared client.

    Args:
        uid: UID of the player to look up.
        previous: Forwarded unchanged as the client's ``previous`` keyword.

    Returns:
        Whatever ``client.get_spiral_abyss`` resolves to.
    """
    abyss_record = await client.get_spiral_abyss(uid, previous=previous)
    return abyss_record
def format_abyss_data(data):
    """Turn a Spiral Abyss record into a dictionary of plain values.

    Args:
        data: Object exposing ``season``, ``start_time``, ``end_time``,
            ``total_battles``, ``total_wins``, ``max_floor``, ``total_stars``,
            ``ranks`` and ``floors`` attributes.

    Returns:
        A dict with the season summary, the rank highlights and a per-floor
        breakdown (each floor listing its chambers).
    """

    def format_rank(entries):
        """Describe the first entry of a ranking, or "N/A" when it is empty."""
        if not entries:
            return "N/A"
        leader = entries[0]
        return f"{leader.value} ({leader.name})"

    def describe_chamber(chamber):
        """Summarise one chamber as its number and star score."""
        return {
            "Chamber": chamber.chamber,
            "Stars": f"{chamber.stars}/{chamber.max_stars}",
        }

    def describe_floor(floor):
        """Summarise one floor together with all of its chambers."""
        return {
            "Floor": floor.floor,
            "Unlocked": floor.unlocked,
            "Stars": f"{floor.stars}/{floor.max_stars}",
            "Chambers": list(map(describe_chamber, floor.chambers)),
        }

    summary = {
        "Season": data.season,
        "Start Time": data.start_time.isoformat(),
        "End Time": data.end_time.isoformat(),
        "Total Battles": data.total_battles,
        "Total Wins": data.total_wins,
        "Max Floor": data.max_floor,
        "Total Stars": data.total_stars,
    }

    ranks = data.ranks
    summary["Ranks"] = {
        # "Most Played" lists every entry; the other ranks show only the leader.
        "Most Played": [f"{char.name} ({char.value})" for char in ranks.most_played],
        "Most Kills": format_rank(ranks.most_kills),
        "Strongest Strike": format_rank(ranks.strongest_strike),
        "Most Damage Taken": format_rank(ranks.most_damage_taken),
        "Most Bursts Used": format_rank(ranks.most_bursts_used),
        "Most Skills Used": format_rank(ranks.most_skills_used),
    }
    summary["Floors"] = [describe_floor(floor) for floor in data.floors]
    return summary
@app.get("/spiral-abyss/")
async def get_spiral_abyss(
    id: int = Query(...),  # name kept: it is the public query-string parameter
    previous: bool = Query(False)
):
    """
    Fetch and return Spiral Abyss data for the given user ID.

    Query parameters:
    - `id`: The user ID to fetch data for.
    - `previous`: Whether to fetch the previous season's data (default: False).

    Returns the formatted Spiral Abyss dictionary on success, or
    `{"error": <message>}` when fetching or formatting raises.
    """
    try:
        data = await fetch_spiral_abyss_data(id, previous)
        return format_abyss_data(data)
    except Exception as e:
        # The error payload is kept as-is for existing API consumers; the
        # traceback is logged so the failure is also visible server-side.
        logger.exception(
            "Failed to fetch Spiral Abyss data for uid %s (previous=%s)", id, previous
        )
        return {"error": str(e)}
# Lookup table from numeric character ID to the display name used in API
# responses. IDs missing from this table are reported by get_character_name()
# as "Unknown Character (<id>)".
#
# Some names are listed under more than one ID (for example 1212 and 2212 both
# map to "Jingliu", and each Trailblazer element has two IDs).
# NOTE(review): presumably these are alternate variants of the same character;
# confirm against the upstream ID list.
CHARACTER_MAPPING = {
    8001: "Trailblazer (Physical)",
    8002: "Trailblazer (Physical)",
    8003: "Trailblazer (Fire)",
    8004: "Trailblazer (Fire)",
    8005: "Trailblazer (Imaginary)",
    8006: "Trailblazer (Imaginary)",
    8007: "Trailblazer (Ice)",
    8008: "Trailblazer (Ice)",
    1310: "Firefly",
    1212: "Jingliu",
    2212: "Jingliu",
    1303: "Ruan Mei",
    1208: "Fu Xuan",
    1305: "Dr. Ratio",
    1301: "Gallagher",
    1306: "Sparkle",
    1102: "Seele",
    1205: "Blade",
    2205: "Blade",
    1006: "Silver Wolf",
    2006: "Silver Wolf",
    1005: "Kafka",
    2005: "Kafka",
    1307: "Black Swan",
    1309: "Robin",
    1112: "Topaz",
    1211: "Bailu",
    1107: "Clara",
    1003: "Himeko",
    1004: "Welt",
    1221: "Yunli",
    1104: "Gepard",
    1101: "Bronya",
    1203: "Luocha",
    1204: "Jing Yuan",
    1002: "Dan Heng",
    1213: "Dan Heng IL",
    1001: "March 7th",
    1224: "March 7th (Hunt)",
    1217: "Huohuo",
    1302: "Argenti",
    1009: "Asta",
    1013: "Herta",
    1401: "The Herta",
    1103: "Serval",
    1105: "Natasha",
    1106: "Pela",
    1108: "Sampo",
    1109: "Hook",
    1110: "Lynx",
    1111: "Luka",
    1201: "Qingque",
    1202: "Tingyun",
    1206: "Sushang",
    1207: "Yukong",
    1209: "Yanqing",
    1314: "Jade",
    1210: "Guinaifen",
    1214: "Xueyi",
    1215: "Hanya",
    1312: "Misha",
    1308: "Acheron",
    1304: "Aventurine",
    1315: "Boothill",
    1218: "Jiaoqiu",
    # NOTE(review): this table has no 1223 entry; confirm that 1228 is the
    # intended ID for Moze and not a typo.
    1228: "Moze",
    1220: "Feixiao",
    1222: "Lingsha",
    1313: "Sunday",
    1225: "Fugue",
    1402: "Aglaea",
    1403: "Tribbie",
    1404: "Mydei",
    1317: "Rappa",
    1407: "Castorice",
    1405: "Anaxa",
    1409: "Hyacine",
    1406: "Cipher",
    1408: "Phainon",
    1410: "Hysilens",
    1412: "Cerydra",
    1014: "Saber",
    1015: "Archer"
}
def get_character_name(char_id: int) -> str:
    """Resolve a character ID to its display name.

    Args:
        char_id: Numeric character ID to look up in ``CHARACTER_MAPPING``.

    Returns:
        The mapped name, or ``"Unknown Character (<id>)"`` when the ID is not
        in the table.
    """
    try:
        return CHARACTER_MAPPING[char_id]
    except KeyError:
        return f"Unknown Character ({char_id})"
def format_eidolon(rank: int) -> str:
    """Return the short eidolon label for a rank, e.g. ``0`` -> ``"E0"``."""
    label = f"E{rank}"
    return label
def get_element_emoji(element: str) -> str:
    """Return the emoji that represents an element name.

    The lookup ignores letter case. Names outside the table yield "❓".

    Args:
        element: Element name such as ``"fire"`` or ``"Quantum"``.
    """
    key = element.lower()
    icons = dict(
        physical="⚪",
        fire="🔥",
        ice="❄️",
        lightning="⚡",
        wind="🌪️",
        quantum="🔮",
        imaginary="🌌",
    )
    if key in icons:
        return icons[key]
    return "❓"
def get_rarity_stars(rarity: int) -> str:
    """Return one star emoji per rarity level (empty for zero or below)."""
    star = "⭐"
    return star * rarity
def _format_apc_season(season):
    """Convert one Apocalyptic Shadow season entry into plain values."""
    upper_boss = season.upper_boss
    lower_boss = season.lower_boss
    return {
        "name": season.name,
        "status": season.status,
        "begin_time": str(season.begin_time),
        "end_time": str(season.end_time),
        "upper_boss": upper_boss.name_mi18n if upper_boss else None,
        "lower_boss": lower_boss.name_mi18n if lower_boss else None,
    }


def _format_apc_character(char):
    """Convert one team member into plain values plus a display label."""
    name = get_character_name(char.id)
    eidolon = format_eidolon(char.rank)
    return {
        "id": char.id,
        "name": name,
        "eidolon": eidolon,
        "level": char.level,
        "element": char.element,
        "rarity": char.rarity,
        "display_name": f"{eidolon} {name}",
    }


def _format_apc_node(number, node):
    """Convert one node of a floor (score, buff and team) into plain values."""
    buff = node.buff
    return {
        "node_number": number,
        "challenge_time": str(node.challenge_time) if node.challenge_time else None,
        "score": node.score,
        "boss_defeated": node.boss_defeated,
        "buff": {"name": buff.name, "description": buff.description} if buff else None,
        "team": [_format_apc_character(char) for char in node.avatars],
    }


def _format_apc_floor(floor):
    """Convert one floor and its played nodes into plain values."""
    # Nodes that are missing or have no avatars are left out, but the
    # numbering still reflects the node's position (1 or 2).
    nodes = [
        _format_apc_node(number, node)
        for number, node in enumerate((floor.node_1, floor.node_2), 1)
        if node and node.avatars
    ]
    return {
        "id": floor.id,
        "name": floor.name,
        "stars": floor.star_num,
        "is_quick_clear": floor.is_quick_clear,
        "last_update": str(floor.last_update_time) if floor.last_update_time else None,
        "nodes": nodes,
    }


@app.get("/apc/{uid}")
async def get_apc_data(
    uid: int,
):
    """Get Apocalyptic Shadow data for a specific UID.

    Cookies are read from the ``ltuid_v2``, ``ltoken_v2``, ``ltmid_v2`` and
    ``cookie_token_v2`` environment variables on every request.

    Args:
        uid: UID taken from the request path.

    Returns:
        A dict with ``uid``, a ``summary``, the ``current_season`` (the first
        season whose status is ``"New"``, else ``None``) and the ``floors``.

    Raises:
        HTTPException: With status 500 when fetching or formatting fails.
    """
    try:
        cookies = {
            name: os.getenv(name)
            for name in ("ltuid_v2", "ltoken_v2", "ltmid_v2", "cookie_token_v2")
        }
        hsr_client = genshin.Client(cookies, uid=uid)
        apc_data = await hsr_client.get_starrail_apc_shadow(uid=uid)

        current_season = None
        if apc_data.seasons:
            current_season = next(
                (s for s in apc_data.seasons if s.status == "New"), None
            )

        return {
            "uid": uid,
            "summary": {
                "total_stars": apc_data.total_stars,
                "max_floor": apc_data.max_floor,
                "total_battles": apc_data.total_battles,
                "has_data": apc_data.has_data,
            },
            "current_season": (
                _format_apc_season(current_season) if current_season else None
            ),
            "floors": [_format_apc_floor(floor) for floor in apc_data.floors],
        }
    except Exception as e:
        # Log the traceback server-side and chain the cause so the original
        # error is not lost behind the generic 500 response.
        logger.exception("Failed to fetch Apocalyptic Shadow data for uid %s", uid)
        raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}") from e
if __name__ == "__main__":
    # Start the ASGI server only when this file is executed as a script.
    # NOTE(review): uvicorn documents timeout_keep_alive in seconds, so 60000
    # is roughly 16.7 hours — confirm this is intended and not milliseconds.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "workers": 8,
        "timeout_keep_alive": 60000,
    }
    uvicorn.run("main:app", **server_options)