Spaces:
Paused
Paused
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from typing import Dict, Optional, Literal | |
| from dataclasses import dataclass, asdict | |
| import httpx | |
| import asyncio | |
| from functools import wraps | |
| import zipfile | |
| import os | |
| import akinator | |
| import json | |
| import os | |
| import time | |
| from typing import Dict, Optional | |
| from datetime import datetime, timedelta | |
| from fastapi import HTTPException, Query | |
| from pydantic import BaseModel | |
| from typing import Literal | |
| import threading | |
| import pickle | |
| import random | |
| import shutil | |
| from pathlib import Path | |
| from typing import Optional, List | |
| import uuid | |
| from datetime import datetime, timedelta | |
| import logging | |
| import asyncio | |
| from zenka import zenka, ZZZError, CacheConfig, Lang | |
| import os | |
| from enum import Enum | |
| from enkacard import encbanner | |
| from enkanetwork import EnkaNetworkAPI | |
| from starrailcard.src.api import enka | |
| import concurrent.futures | |
| import requests | |
| import traceback | |
| from fastapi import FastAPI,Query | |
| from io import BytesIO | |
| from fastapi.responses import JSONResponse | |
| import enkacard | |
| import starrailcard | |
| import enkanetwork | |
| import uvicorn | |
| import cloudinary | |
| import cloudinary.uploader | |
| from cloudinary.utils import cloudinary_url | |
| import pydantic | |
| from pydantic import BaseModel | |
| import genshin | |
| from packaging import version | |
| import mangadex as md | |
| from fastapi import FastAPI, Query, HTTPException | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from reportlab.pdfgen import canvas | |
| import tempfile | |
| from PIL import Image as PILImage | |
| import httpx | |
| import pikepdf | |
| from pathlib import Path | |
# Root logging configuration; the per-module logger below is used throughout this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| from genshin import Client | |
| import logging | |
| import json | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Optional | |
| from dataclasses import dataclass, asdict | |
| import akinator | |
| from fastapi import HTTPException, Query | |
| from pydantic import BaseModel | |
| from typing import Literal | |
| import asyncio | |
| from functools import wraps | |
app = FastAPI(title="my api", version="1.0.0")

# Directory layout for served and staging files.
STATIC_DIR = Path("static")
TEMP_DIR = Path("temp")
IMAGES_DIR = STATIC_DIR / "images"
ZIPS_DIR = STATIC_DIR / "zips"

# parents=True so the nested dirs (static/images, static/zips) are created
# even if "static" itself does not exist yet.
for directory in (STATIC_DIR, TEMP_DIR, IMAGES_DIR, ZIPS_DIR):
    directory.mkdir(parents=True, exist_ok=True)

# Serve everything under ./static at /static.
# (The original additionally re-ran Path("static").mkdir here — redundant,
# since STATIC_DIR is already created by the loop above.)
app.mount("/static", StaticFiles(directory="static"), name="static")

# Configuration
MAX_IMAGES = 1000              # hard cap on images per request
MAX_INDIVIDUAL_IMAGES = 10     # above this, results are delivered as one ZIP
SAFEBOORU_API_URL = "https://safebooru.org/index.php"
CLEANUP_INTERVAL_HOURS = 24    # files older than this are purged
class SafebooruService:
    """Thin async wrapper around the Safebooru public API.

    Lazily creates one shared ``httpx.AsyncClient`` that is reused for all
    searches and downloads (closed by the app's shutdown hook).
    """

    def __init__(self):
        # Created on first use so the client is bound to the running event loop.
        self.session = None

    async def get_session(self):
        """Return the shared AsyncClient, creating it on first call."""
        if self.session is None:
            self.session = httpx.AsyncClient(timeout=30.0)
        return self.session

    async def search_images(self, tags: str, limit: int = 1000) -> List[dict]:
        """Search Safebooru and return posts whose image is jpg/jpeg/png.

        Raises HTTPException(500) on any network or parse failure.
        """
        session = await self.get_session()
        params = {
            "page": "dapi",
            "s": "post",
            "q": "index",
            "json": "1",
            "limit": limit,
            # Safebooru tag syntax uses underscores instead of spaces.
            "tags": tags.replace(" ", "_"),
        }
        try:
            response = await session.get(SAFEBOORU_API_URL, params=params)
            response.raise_for_status()
            data = response.json()
            if not data:
                return []
            # Keep only formats we can reliably serve / zip.
            return [
                img for img in data
                if img.get("image", "").lower().endswith((".jpg", ".jpeg", ".png"))
            ]
        except Exception as e:
            logger.error(f"Error searching images: {e}")
            raise HTTPException(status_code=500, detail="Failed to search images")

    async def download_image(self, img_data: dict, filename: str, max_retries: int = 3) -> Optional[str]:
        """Download one post's image into IMAGES_DIR/filename.

        Retries up to ``max_retries`` times with a linearly increasing delay.
        Returns the saved path, or None once every attempt has failed.
        """
        session = await self.get_session()
        image_url = f"https://safebooru.org/images/{img_data['directory']}/{img_data['image']}"
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    await asyncio.sleep(1 * attempt)  # linear backoff: 1s, 2s, ...
                response = await session.get(image_url)
                response.raise_for_status()
                file_path = IMAGES_DIR / filename
                with open(file_path, "wb") as f:
                    f.write(response.content)
                # Bug fix: original logged the literal text "(unknown)" here
                # instead of the actual filename.
                logger.info(f"Downloaded: {filename}")
                return str(file_path)
            except Exception as e:
                logger.error(f"Download attempt {attempt + 1} failed for {filename}: {e}")
                if attempt == max_retries - 1:
                    logger.error(f"Failed to download {filename} after {max_retries} attempts")
                    return None
        return None


# Module-level singleton shared by all request handlers.
safebooru_service = SafebooruService()
async def cleanup_old_files():
    """Delete images/zips whose mtime is older than CLEANUP_INTERVAL_HOURS."""
    cutoff = datetime.now() - timedelta(hours=CLEANUP_INTERVAL_HOURS)
    for folder in (IMAGES_DIR, ZIPS_DIR):
        for entry in folder.iterdir():
            if not entry.is_file():
                continue
            modified = datetime.fromtimestamp(entry.stat().st_mtime)
            if modified >= cutoff:
                continue
            try:
                entry.unlink()
                logger.info(f"Cleaned up old file: {entry}")
            except Exception as e:
                logger.error(f"Failed to clean up {entry}: {e}")
async def startup_event():
    """Run once at application start: announce startup and purge stale files."""
    logger.info("FastAPI Safebooru Clone starting up...")
    await cleanup_old_files()
async def shutdown_event():
    """Close the shared HTTP client, if one was ever created."""
    client = safebooru_service.session
    if client:
        await client.aclose()
async def root():
    """Return the API's self-describing documentation payload."""
    endpoints = {
        "/search": "Search and get images",
        "/cleanup": "Manual cleanup of old files",
        "/health": "Health check",
    }
    usage = {
        "individual_images": "/search?tags=hakurei_reimu&count=5",
        "zip_file": "/search?tags=hakurei_reimu&count=20",
    }
    return {
        "message": "Safebooru API Clone",
        "endpoints": endpoints,
        "usage": usage,
    }
async def search_safebooru(
    tags: str,
    count: Optional[int] = 5,
    background_tasks: BackgroundTasks = None
):
    """
    Search Safebooru and return direct links to images.

    - **tags**: Search tags (space-separated, converted to underscores)
    - **count**: Number of images to return, clamped to 1..MAX_IMAGES.
      An explicit None falls back to the default of 5 (bug fix: the original
      compared ``None < 1`` and raised TypeError).

    Requests for up to MAX_INDIVIDUAL_IMAGES images get per-image links;
    anything larger is bundled into a single ZIP file.
    """
    if not tags.strip():
        raise HTTPException(status_code=400, detail="Tags parameter is required")
    # Normalize count: guard against None (the parameter is Optional), then clamp.
    if count is None:
        count = 5
    if count < 1:
        count = 1
    elif count > MAX_IMAGES:
        count = MAX_IMAGES
    try:
        logger.info(f"Searching for tags: {tags}, count: {count}")
        images_data = await safebooru_service.search_images(tags, limit=1000)
        if not images_data:
            raise HTTPException(status_code=404, detail=f"No results found for tags: {tags}")
        # Take a random sample of the requested size.
        random.shuffle(images_data)
        selected_images = images_data[:count]
        # Per-request identifiers keep concurrent requests from colliding on disk.
        session_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        if count <= MAX_INDIVIDUAL_IMAGES:
            # Small request: download straight into the public images dir.
            download_tasks = [
                safebooru_service.download_image(img, f"{session_id}_{i+1}_{img['image']}")
                for i, img in enumerate(selected_images)
            ]
            downloaded_paths = await asyncio.gather(*download_tasks, return_exceptions=True)
            image_links = []
            for i, path in enumerate(downloaded_paths):
                # Skip failed downloads (None) and exceptions surfaced by gather.
                if isinstance(path, str) and path:
                    relative_path = Path(path).relative_to(STATIC_DIR)
                    image_links.append({
                        "index": i + 1,
                        "filename": Path(path).name,
                        "url": f"/static/{relative_path}",
                        "direct_link": f"https://tachibanaa710-lensfastapi.hf.space/static/{relative_path}"
                    })
            # Schedule age-based cleanup after the response is sent.
            if background_tasks:
                background_tasks.add_task(cleanup_old_files)
            return {
                "query": tags,
                "total_found": len(images_data),
                "requested_count": count,
                "delivered_count": len(image_links),
                "type": "individual_images",
                "images": image_links,
                "session_id": session_id
            }
        else:
            # Large request: stage downloads in a temp dir, then zip them.
            zip_filename = f"safebooru_{tags.replace(' ', '_')}_{count}images_{timestamp}_{session_id[:8]}.zip"
            zip_path = ZIPS_DIR / zip_filename
            temp_session_dir = TEMP_DIR / session_id
            temp_session_dir.mkdir(exist_ok=True)
            try:
                download_tasks = [
                    download_image_to_temp(img, temp_session_dir / f"{i+1:02d}_{img['image']}")
                    for i, img in enumerate(selected_images)
                ]
                downloaded_files = await asyncio.gather(*download_tasks, return_exceptions=True)
                successful_downloads = [f for f in downloaded_files if isinstance(f, str) and f]
                if not successful_downloads:
                    raise HTTPException(status_code=500, detail="Failed to download any images")
                with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    for file_path in successful_downloads:
                        file_path_obj = Path(file_path)
                        if file_path_obj.exists():
                            zipf.write(file_path_obj, file_path_obj.name)
                # Staged copies are no longer needed once zipped.
                shutil.rmtree(temp_session_dir, ignore_errors=True)
                file_size_mb = zip_path.stat().st_size / (1024 * 1024)
                if background_tasks:
                    background_tasks.add_task(cleanup_old_files)
                return {
                    "query": tags,
                    "total_found": len(images_data),
                    "requested_count": count,
                    "delivered_count": len(successful_downloads),
                    "type": "zip_file",
                    "zip_info": {
                        "filename": zip_filename,
                        "size_mb": round(file_size_mb, 2),
                        "url": f"/static/zips/{zip_filename}",
                        "direct_link": f"https://tachibanaa710-lensfastapi.hf.space/static/zips/{zip_filename}",
                        "download_url": f"/download/{zip_filename}"
                    },
                    "session_id": session_id
                }
            except Exception as e:
                # Best-effort cleanup of partial output before reporting failure.
                shutil.rmtree(temp_session_dir, ignore_errors=True)
                if zip_path.exists():
                    zip_path.unlink()
                raise HTTPException(status_code=500, detail=f"Failed to create ZIP file: {str(e)}")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in search endpoint: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
async def download_image_to_temp(img_data: dict, file_path: Path, max_retries: int = 3) -> Optional[str]:
    """Download a single post's image to ``file_path`` in the staging area.

    Retries with a linearly growing delay; returns the path on success and
    None once every attempt has failed.
    """
    session = await safebooru_service.get_session()
    image_url = f"https://safebooru.org/images/{img_data['directory']}/{img_data['image']}"
    for attempt in range(max_retries):
        try:
            if attempt > 0:
                await asyncio.sleep(1 * attempt)
            response = await session.get(image_url)
            response.raise_for_status()
            file_path.write_bytes(response.content)
            return str(file_path)
        except Exception as e:
            logger.error(f"Download attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return None
    return None
async def download_zip(filename: str):
    """Serve a previously built ZIP from ZIPS_DIR as an attachment download."""
    target = ZIPS_DIR / filename
    if not target.exists():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(path=target, filename=filename, media_type="application/zip")
async def health_check():
    """Report liveness plus current counts of stored images and zips."""
    image_total = sum(1 for _ in IMAGES_DIR.glob("*"))
    zip_total = sum(1 for _ in ZIPS_DIR.glob("*"))
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "images_count": image_total,
        "zips_count": zip_total,
    }
async def manual_cleanup():
    """Endpoint helper: run the age-based file cleanup immediately."""
    await cleanup_old_files()
    return {"message": "Cleanup completed", "timestamp": datetime.now().isoformat()}
# Detect the installed Pydantic major version; kept as a module-level name
# in case later code wants to branch on it.
pydantic_version = version.parse(pydantic.__version__)
if pydantic_version.major < 2:
    # Older code paths here are already v1-compatible; nothing to adjust.
    print("Running with Pydantic v1")
else:
    # Use Pydantic v2-compatible imports or replacements as needed.
    print("Running with Pydantic v2")
# Cloudinary configuration — credentials come from environment variables
# (cloudname / key / secret); secure=True forces https delivery URLs.
# NOTE(review): os.getenv returns None for unset variables; uploads would then
# fail at call time rather than here — confirm the env is provisioned.
cloudinary.config(
    cloud_name=os.getenv("cloudname"),
    api_key=os.getenv("key"),
    api_secret=os.getenv("secret"),
    secure=True
)
# Genshin Impact card creation
async def genshin_card(id, designtype, character_id=None, character_art_url=None):
    """Render Genshin showcase cards for a UID via enkacard.

    When both a character id and an art URL are supplied, the override is
    forwarded to the renderer. designtype "2" selects template 2, else 1.
    """
    art_override = None
    if character_id and character_art_url:
        art_override = {str(character_id): character_art_url}
    template = 2 if str(designtype) == "2" else 1
    async with encbanner.ENC(uid=str(id), character_art=art_override) as encard:
        return await encard.creat(template=template)
# Star Rail card creation with optional character ID and cookies
async def starrail_card(id, designtype, character_id=None, character_art_url=None, ltmid_v2=None, ltoken_v2=None, ltuid_v2=None):
    """Render Star Rail cards for a UID.

    Uses HoYoLab cookie auth when all three cookie values are supplied;
    otherwise falls back to public Enka data. designtype "2" → style 2.
    """
    art_override = None
    if character_id and character_art_url:
        art_override = {str(character_id): character_art_url}
    style = 2 if str(designtype) == "2" else 1
    if ltmid_v2 and ltoken_v2 and ltuid_v2:
        cookie = {
            "ltmid_v2": ltmid_v2,
            "ltoken_v2": ltoken_v2,
            "ltuid_v2": ltuid_v2
        }
        async with starrailcard.HoYoCard(cookie=cookie, seeleland=True, remove_logo=True, character_art=art_override, boost_speed=True) as card:
            return await card.create(id, force_update=True, style=style)
    # No cookies: public Enka-backed renderer.
    async with starrailcard.Card(seeleland=True, remove_logo=True, character_art=art_override, boost_speed=True, enka=True) as card:
        return await card.create(id, force_update=True, style=style)
# Star Rail profile creation
async def starrail_profile(id):
    """Render a full Star Rail profile card (style 2) from Enka data."""
    async with starrailcard.Card(remove_logo=True, seeleland=True, boost_speed=True, enka=True) as renderer:
        return await renderer.create_profile(id, force_update=True, style=2)
# Genshin profile creation
async def genshinprofile(id):
    """Render a Genshin profile banner card for the given UID."""
    async with encbanner.ENC(uid=id) as renderer:
        return await renderer.profile(card=True)
# Route for Genshin Impact
async def genshin_characters(id: int, design: str = "1", character_id: int = None, character_art_url: str = None):
    """Route handler: render Genshin cards, upload them, and return their URLs."""
    try:
        rendered = await genshin_card(id, design, character_id, character_art_url)
        uploaded = process_images(rendered, id)
        return JSONResponse(content={'response': uploaded})
    except enkanetwork.exception.VaildateUIDError:
        # (exception class name is misspelled upstream in enkanetwork)
        return JSONResponse(content={'error': 'Invalid UID. Please check your UID.'}, status_code=400)
    except enkacard.enc_error.ENCardError:
        return JSONResponse(content={'error': 'Enable display of the showcase in the game or add characters there.'}, status_code=400)
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# Route for Star Rail with optional character ID
async def starrail_characters(
    id: int,
    design: str = "1",
    character_id: int = None,
    character_art_url: str = None,
    ltmid_v2: str = None,
    ltoken_v2: str = None,
    ltuid_v2: str = None
):
    """Route handler: render Star Rail cards (cookie auth optional),
    upload them, and return the uploaded URLs."""
    try:
        rendered = await starrail_card(id, design, character_id, character_art_url, ltmid_v2, ltoken_v2, ltuid_v2)
        uploaded = process_images(rendered, id)
        return JSONResponse(content={'response': uploaded})
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# Route for Star Rail profile
async def starrail_profile_route(id: int):
    """Route handler: render and upload a Star Rail profile; return its URL."""
    try:
        rendered = await starrail_profile(id)
        payload = process_profile(rendered)
        return JSONResponse(content={'response': payload})
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# Route for Genshin profile
async def genshin_profile_route(id: int):
    """Route handler: render and upload a Genshin profile; return its URL."""
    try:
        rendered = await genshinprofile(id)
        payload = process_profile(rendered)
        return JSONResponse(content={'response': payload})
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# NOTE(review): duplicate definition — `root` is bound three times in this
# module (an async handler earlier, then twice here); only the last binding
# survives at import time. Confirm which one the route decorator should use.
def root():
    return "خدتك عليه"
# NOTE(review): exact duplicate of the `root` defined immediately above;
# this redefinition is the binding that actually survives. Candidate for removal.
def root():
    return "خدتك عليه"
async def update():
    """Refresh the local Enka asset caches for the Genshin and Star Rail
    renderers; returns a JSON success/failure payload."""
    try:
        async def update_assets() -> None:
            # Genshin assets first, then Star Rail.
            async with EnkaNetworkAPI() as client:
                await client.update_assets()
            await enka.ApiEnkaNetwork().update_assets()
        # Await the coroutine directly — the original wrapped it in
        # asyncio.create_task() and immediately awaited it, which adds nothing.
        await update_assets()
        return JSONResponse(content={'response': 'Assets updated successfully'})
    except Exception as e:
        error_details = traceback.format_exc()
        return JSONResponse(
            content={'error': f'UNKNOWN ERR: {str(e)}', 'details': error_details},
            status_code=500
        )
async def maiin():
    """Refresh zenka (Zenless Zone Zero) assets; maps ZZZError to a 500 payload."""
    try:
        async with zenka.Client() as client:
            await client.update_asset()
            return JSONResponse(content={'response': 'Assets updated successfully'})
    except ZZZError as e:
        details = traceback.format_exc()
        return JSONResponse(
            content={'error': f'UNKNOWN ERR: {str(e)}', 'details': details},
            status_code=500
        )
# Helper function to upload the image to Cloudinary
def upload_image(data, character_id, player_id):
    """Upload a rendered card to Cloudinary and return its https URL.

    The public id combines character and player ids so a re-render replaces
    the previous upload; invalidate=True busts the CDN cache.
    """
    try:
        public_id = f"{character_id}_{player_id}"
        result = cloudinary.uploader.upload(data, folder="card_images", public_id=public_id, invalidate=True)
        return result["secure_url"]
    except Exception as e:
        raise Exception(f"Cloudinary upload error: {str(e)}")
# Helper function to upload the image to Cloudinary without player or character ID
def upload_imagee(data):
    """Upload an image to Cloudinary with an auto-generated public id;
    returns the secure (https) URL."""
    try:
        result = cloudinary.uploader.upload(data, folder="card_images", invalidate=True)
        return result["secure_url"]
    except Exception as e:
        raise Exception(f"Cloudinary upload error: {str(e)}")
# Process individual image card
def process_image(dt, player_id):
    """Serialize one rendered card to PNG, upload it, and describe it as a dict."""
    with BytesIO() as buffer:
        dt.card.save(buffer, "PNG")
        buffer.seek(0)
        # Upload keyed by character id + player id so re-renders overwrite.
        uploaded_url = upload_image(buffer, dt.id, player_id)
        return {
            "name": dt.name,
            "id": dt.id,
            "url": uploaded_url
        }
# Process the profile image
def process_profile(profile_card):
    """Serialize a profile card to PNG, upload it anonymously, return its URL."""
    with BytesIO() as buffer:
        profile_card.card.save(buffer, "PNG")
        buffer.seek(0)
        return {"url": upload_imagee(buffer)}
| from io import BytesIO | |
def process_profilee(profile_card):
    """Upload the first card of a multi-card profile render and return its URL.

    Raises ValueError when the result carries no usable ``cards`` list.
    """
    cards = profile_card.cards
    if not cards or not isinstance(cards, list):
        raise ValueError("Invalid profile_card: 'cards' is missing or not a list.")
    first_image = cards[0].card  # PIL image of the first rendered card
    with BytesIO() as buffer:
        first_image.save(buffer, "PNG")
        buffer.seek(0)
        return {"url": upload_imagee(buffer)}
# Process all the images returned
def process_images(result, player_id):
    """Upload every card in ``result.card`` concurrently; return their dicts.

    Failures are printed and skipped, so results arrive in completion order.
    """
    uploaded = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [pool.submit(process_image, card, player_id) for card in result.card]
        for done in concurrent.futures.as_completed(pending):
            try:
                uploaded.append(done.result())
            except Exception as e:
                print(f"Error processing image: {e}")
    return uploaded
def process_imagess(result, player_id):
    """Like process_images, but for renderers exposing ``result.cards``
    (plural) — used by the Zenless Zone Zero flow."""
    uploaded = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [pool.submit(process_image, card, player_id) for card in result.cards]
        for done in concurrent.futures.as_completed(pending):
            try:
                uploaded.append(done.result())
            except Exception as e:
                print(f"Error processing image: {e}")
    return uploaded
# ZZZ Card and Profile creation functions
# NOTE(review): `zenless_card` is redefined later in this module; that later
# definition wins at import time, so this version never runs. Kept verbatim
# pending cleanup.
async def zenless_card(uid, character_id=None, character_art=None):
    """Render ZZZ character cards for ``uid`` via zenka (superseded below)."""
    config = zenka.Config(
        asset_save=True,
        hide_uid=False,
    )
    try:
        # Simplified to match your example exactly
        async with zenka.Client(
            lang=Lang.EN,
            config=config,
            character_art=character_art,
            character_id=character_id
        ) as client:
            data = await client.card(uid)
            return data
    except ZZZError as e:
        raise Exception(f"Zenless Zone Zero Error: Code:{e.code} Message: {e.text}")
    except Exception as e:
        # Add more detailed error information
        import traceback
        error_details = traceback.format_exc()
        raise Exception(f"Zenless Zone Zero Error: {str(e)}\n{error_details}")
async def zenless_profile(uid):
    """Render a Zenless Zone Zero profile card for ``uid`` via zenka.

    Wraps both ZZZError and generic failures into Exception with details.
    """
    cfg = zenka.Config(asset_save=True, hide_uid=False)
    try:
        async with zenka.Client(lang=Lang.EN, config=cfg) as client:
            return await client.profile(uid)
    except ZZZError as e:
        raise Exception(f"Zenless Zone Zero Error: Code:{e.code} Message: {e.text}")
    except Exception as e:
        import traceback
        details = traceback.format_exc()
        raise Exception(f"Zenless Zone Zero Error: {str(e)}\n{details}")
# Add these routes to your FastAPI app
async def zenless_characters(uid: int, character_id: str = None, character_art_url: str = None):
    """Route handler: render ZZZ character cards and return uploaded URLs.

    ``character_id`` may be comma-separated; numeric entries become ints.
    When art is supplied it is mapped to the first id — the renderer expects
    the art dict to be keyed by the id as a *string* (per the comments on
    the zenless_card definition below).
    """
    try:
        parsed_ids = None
        if character_id:
            # Bug fix: the original comprehension variable shadowed builtin `id`.
            parsed_ids = [int(part) if part.isdigit() else part for part in character_id.split(',')]
        art_map = None
        if character_id and character_art_url and parsed_ids:
            # Key coerced to str — the renderer requires string keys
            # (the original could pass an int key here).
            art_map = {str(parsed_ids[0]): character_art_url}
        result = await zenless_card(uid, parsed_ids, art_map)
        characters = process_imagess(result, uid)
        return JSONResponse(content={'response': characters})
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# NOTE(review): this redefinition replaces the earlier `zenless_card` above —
# it is the version that actually runs. The two should be consolidated.
async def zenless_card(uid, character_id=None, character_art=None):
    """Render ZZZ character cards for ``uid`` via zenka.

    character_art: dict keyed by character-id strings,
    e.g. {"1121": "https://example.com/image.webp"}.
    character_id: list of ids, e.g. [1151, "1121"].
    """
    config = zenka.Config(
        asset_save=True,
        hide_uid=False,  # Fixed typo from Falde to False
    )
    try:
        # Make sure character_art is properly formatted as a dictionary
        async with zenka.Client(
            lang=Lang.EN,
            config=config,
            character_art=character_art,  # This should be a dict like {"1121": "https://example.com/image.webp"}
            character_id=character_id  # This should be a list like [1151, "1121"]
        ) as client:
            data = await client.card(uid)
            return data
    except ZZZError as e:
        raise Exception(f"Zenless Zone Zero Error: Code:{e.code} Message: {e.text}")
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        raise Exception(f"Zenless Zone Zero Error: {str(e)}\n{error_details}")
async def zenless_profile_route(uid: int):
    """Route handler: render and upload a ZZZ profile; return its URL."""
    try:
        rendered = await zenless_profile(uid)
        payload = process_profilee(rendered)
        return JSONResponse(content={'response': payload})
    except Exception as e:
        return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
async def check_train_score(ltoken_v2: str, ltuid_v2: str):
    """
    Check if the Star Rail training score is at maximum (500).

    Parameters:
    - ltoken_v2: Your ltoken_v2 cookie value
    - ltuid_v2: Your ltuid_v2 cookie value as an integer

    Returns "yes" when the daily training score has reached 500, else "no".
    Raises 401 for invalid cookies, 500 for anything else.
    """
    try:
        auth_cookies = {
            "ltoken_v2": ltoken_v2,
            # Accept string input; the API expects an integer uid.
            "ltuid_v2": int(ltuid_v2)
        }
        api = genshin.Client(auth_cookies)
        notes = await api.get_starrail_notes()
        return "yes" if notes.current_train_score >= 500 else "no"
    except genshin.errors.InvalidCookies:
        raise HTTPException(status_code=401, detail="Invalid cookies provided")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error checking train score: {str(e)}")
class GameOptions(str, Enum):
    """Hoyoverse games selectable for daily-reward claiming.

    Values are the short slugs accepted in the `game` query parameter.
    """
    STARRAIL = "hsr"
    GENSHIN = "genshin"
    HONKAI = "hi3rd"
    ZZZ = "zenless"
class RewardInfo(BaseModel):
    """Shape of a claimed daily reward returned to the client."""
    name: str
    amount: int
    # Explicitly Optional: Pydantic v2 (which this app supports, see the
    # version probe above) rejects the implicit `icon: str = None` form.
    icon: Optional[str] = None
async def claim_daily_reward(
    ltoken_v2: str,
    ltuid_v2: str,
    game: GameOptions = Query(..., description="Game to claim rewards for")
):
    """
    Claim daily reward for selected Hoyoverse game.

    Parameters:
    - ltoken_v2: Your ltoken_v2 cookie value
    - ltuid_v2: Your ltuid_v2 cookie value as an integer
    - game: Game to claim rewards for (hsr, genshin, hi3rd, zenless)

    Returns claim result information including reward details; an already
    claimed reward yields success=False rather than an error status.
    """
    # One-to-one mapping from the public enum onto the genshin library's enum.
    game_lookup = {
        GameOptions.STARRAIL: genshin.Game.STARRAIL,
        GameOptions.GENSHIN: genshin.Game.GENSHIN,
        GameOptions.HONKAI: genshin.Game.HONKAI,
        GameOptions.ZZZ: genshin.Game.ZZZ,
    }
    try:
        auth_cookies = {
            "ltoken_v2": ltoken_v2,
            # Accept string input; the API expects an integer uid.
            "ltuid_v2": int(ltuid_v2)
        }
        api = genshin.Client(auth_cookies)
        result = await api.claim_daily_reward(game=game_lookup[game])
        # getattr with defaults: tolerate reward objects missing fields.
        reward = {
            "name": getattr(result, "name", "Unknown"),
            "amount": getattr(result, "amount", 0),
            "icon": getattr(result, "icon", None),
        }
        return {
            "success": True,
            "message": f"Successfully claimed daily reward for {game}",
            "reward": reward,
        }
    except genshin.errors.InvalidCookies:
        raise HTTPException(status_code=401, detail="Invalid cookies provided")
    except genshin.errors.AlreadyClaimed:
        return {
            "success": False,
            "message": f"Daily reward for {game} already claimed today"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error claiming daily reward: {str(e)}")
| auth = md.auth.Auth() | |
async def fetch_and_upload_pdf(
    chapter_id: str = Query(..., description="Comma-separated MangaDex chapter IDs to fetch images for"),
    as_pdf: bool = Query(True, description="Set to False to return image URLs without creating a PDF"),
    page: str = Query(None, description="Specific page(s) to fetch, e.g., '1' or '1-5'")
):
    """
    Fetch images from specified chapters and optionally combine them into a single PDF.
    """
    try:
        chapter_ids = chapter_id.split(",")
        all_images = []
        # Fetch images for each chapter
        chapter = md.series.Chapter(auth=auth)
        # NOTE(review): the loop variable shadows the `chapter_id` parameter;
        # after the loop it holds the LAST chapter id, which is what names the
        # output PDF below — confirm this is intended for multi-chapter input.
        for chapter_id in chapter_ids:
            chapter_data = chapter.get_chapter_by_id(chapter_id=chapter_id)
            images = chapter_data.fetch_chapter_images()
            if not images:
                raise HTTPException(status_code=404, detail=f"No images found for chapter ID {chapter_id}.")
            all_images.extend(images)
        # Filter images by page if applicable: a single 1-based page ('3')
        # or an inclusive range ('1-5').
        if page:
            pages = []
            try:
                if '-' in page:
                    start, end = map(int, page.split('-'))
                    pages = list(range(start, end + 1))
                else:
                    pages = [int(page)]
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid page query format.")
            # Out-of-range page numbers are silently dropped.
            all_images = [all_images[i - 1] for i in pages if 1 <= i <= len(all_images)]
            if not all_images:
                raise HTTPException(status_code=404, detail="No images found for the selected pages.")
        # Return image URLs if not converting to PDF
        if not as_pdf:
            return JSONResponse(content={"image_urls": all_images})
        # Generate PDF from images — the NamedTemporaryFile just reserves a
        # path on disk (delete=False); reportlab writes to it afterwards.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
            pdf_path = temp_pdf.name
        c = canvas.Canvas(pdf_path)
        async with httpx.AsyncClient() as client:
            for image_url in all_images:
                response = await client.get(image_url)
                response.raise_for_status()
                # Save image to temporary file
                with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as temp_image:
                    temp_image.write(response.content)
                    temp_image_path = temp_image.name
                # Re-encode to RGB JPEG so reportlab can always embed it.
                img = PILImage.open(temp_image_path)
                img = img.convert("RGB")
                converted_image_path = temp_image_path + "_converted.jpg"
                img.save(converted_image_path, "JPEG")
                # One PDF page per image, sized exactly to the image.
                width, height = img.size
                c.setPageSize((width, height))
                c.drawImage(converted_image_path, 0, 0, width, height)
                c.showPage()
                # Clean up temporary files
                os.remove(temp_image_path)
                os.remove(converted_image_path)
        c.save()
        # Compress the PDF by re-saving through pikepdf into the public dir.
        compressed_pdf_path = f"static/{chapter_id}_compressed.pdf"
        with pikepdf.open(pdf_path) as pdf:
            pdf.save(compressed_pdf_path)
        # Clean up temporary PDF file
        os.remove(pdf_path)
        # Return the URL to the compressed PDF
        return JSONResponse(content={"file_url": f"/static/{chapter_id}_compressed.pdf"})
    except Exception as e:
        return JSONResponse(content={"error": f"Failed to process the chapters: {str(e)}"}, status_code=500)
async def fetch_manga_details(
    title: str = Query(..., description="Manga title to search for")
):
    """
    Search MangaDex for *title* and return a JSON list of matching series.

    Returns 404 when no series matches, 500 on any upstream/library error.
    NOTE(review): relies on a module-level `auth` session — defined elsewhere
    in this file.
    """
    try:
        series_api = md.series.Manga(auth=auth)
        matches = series_api.get_manga_list(title=title)
        if not matches:
            return JSONResponse(
                content={"error": f"No manga found with the title '{title}'"},
                status_code=404
            )
        # Flatten each library object into a plain JSON-serializable dict.
        payload = [
            {
                "id": entry.manga_id,
                "title": entry.title.get("en", "No English Title"),
                "alt_titles": entry.altTitles,
                "description": entry.description.get("en", "No English Description"),
                "status": entry.status,
                "content_rating": entry.contentRating,
                "last_volume": entry.lastVolume,
                "last_chapter": entry.lastChapter,
                "year": entry.year,
                "original_language": entry.originalLanguage,
                "created_at": str(entry.createdAt) if entry.createdAt else None,
                "uploaded_at": str(getattr(entry, "uploadedAt", None))
            }
            for entry in matches
        ]
        return JSONResponse(content={"manga_list": payload})
    except Exception as e:
        return JSONResponse(
            content={"error": f"Failed to fetch manga details: {str(e)}"},
            status_code=500
        )
# HoYoLAB session cookies read from the environment; all four values are
# needed for authenticated genshin.py requests.
cookies = {"ltuid_v2": os.getenv("ltuid_v2"), "ltoken_v2": os.getenv("ltoken_v2"),"ltmid_v2":os.getenv("ltmid_v2"),"cookie_token_v2":os.getenv("cookie_token_v2")}
# BUG FIX: the bare name `Client` is never imported in this module (the header
# only has `import genshin`), so `Client(cookies)` raised NameError at import
# time. Use the qualified `genshin.Client`, matching get_apc_data below.
client = genshin.Client(cookies)
def format_user_data(user):
    """Render a genshin.py user object as a plain-text profile summary.

    The output has four newline-separated sections — Info, Chests,
    Exploration, Teapot. Missing attributes degrade to 'N/A' rather than
    raising; any unexpected failure is reported as an error string instead
    of propagating.
    """
    def stat(name):
        # All scalar stats live on user.stats; absent fields become 'N/A'.
        return getattr(user.stats, name, 'N/A')

    try:
        info_rows = ["• Info •"]
        info_rows.append(f"{stat('days_active')} Active Days")
        info_rows.append(f"{stat('achievements')} Achievements")
        if hasattr(user, 'characters') and isinstance(user.characters, list):
            info_rows.append(f"{len(user.characters)} Characters")
        else:
            info_rows.append("N/A Characters")
        info_rows += [
            f"{stat('unlocked_waypoints')} Waypoints",
            f"{stat('unlocked_domains')} Domains",
            f"{stat('dendroculi')} Dendroculus",
            f"{stat('electroculi')} Electroculus",
            f"{stat('anemoculi')} Anemoculus",
            f"{stat('geoculi')} Geoculus",
            f"Spiral Abyss {stat('spiral_abyss')}",
        ]

        chest_rows = [
            "• Chests •",
            f"{stat('remarkable_chests')} Remarkable",
            f"{stat('luxurious_chests')} Luxurious",
            f"{stat('precious_chests')} Precious",
            f"{stat('exquisite_chests')} Exquisite",
            f"{stat('common_chests')} Common",
        ]

        exploration_rows = ["• Exploration •"]
        if hasattr(user, 'explorations') and isinstance(user.explorations, list):
            for region in user.explorations:
                # raw_explored looks like permille (0-1000) — rendered as a
                # one-decimal percentage.
                pct = f"{getattr(region, 'raw_explored', 0) / 10.0:.1f}%"
                exploration_rows.append(f"{getattr(region, 'name', 'Unknown')} {pct}")
                if hasattr(region, 'offerings') and isinstance(region.offerings, list):
                    for offering in region.offerings:
                        exploration_rows.append(
                            f" {getattr(offering, 'name', 'Unknown')} level {getattr(offering, 'level', 'N/A')}"
                        )
        else:
            exploration_rows.append("No exploration data available.")

        teapot_rows = ["• Teapot •"]
        if hasattr(user.teapot, 'realms') and isinstance(user.teapot.realms, list) and len(user.teapot.realms) > 0:
            teapot_rows.append(f"{getattr(user.teapot.realms[0], 'name', 'Unknown')}")
            teapot_rows.append(f"Level {getattr(user.teapot, 'level', 'N/A')} ({getattr(user.teapot, 'comfort_name', 'Unknown')})")
            teapot_rows.append(f"Comfort {getattr(user.teapot, 'comfort', 'N/A')}")
            visits = len(user.teapot.visitors) if hasattr(user.teapot, 'visitors') and isinstance(user.teapot.visitors, list) else 'N/A'
            teapot_rows.append(f"Visits {visits}")
            teapot_rows.append(f"Items {getattr(user.teapot, 'items', 'N/A')}")

        sections = (info_rows, chest_rows, exploration_rows, teapot_rows)
        # Each section ends with a trailing newline; sections are separated
        # by one blank line, matching the original += construction.
        return "\n".join("\n".join(rows) + "\n" for rows in sections)
    except Exception as e:
        # Never let a formatting problem crash the caller.
        return f"Error formatting user data: {e}"
async def get_user_info(uid: int):
    """Fetch a Genshin profile for *uid* via the shared module-level client
    and return it pre-formatted.

    Returns {"uid", "data"} on success, or {"error": ...} if the API call
    or formatting fails.
    """
    try:
        profile = await client.get_genshin_user(uid)
        return {"uid": uid, "data": format_user_data(profile)}
    except Exception as exc:
        # Surface the failure to the caller instead of raising.
        return {"error": str(exc)}
async def fetch_spiral_abyss_data(uid: int, previous: bool = False):
    """Fetch Spiral Abyss data for *uid* via the shared genshin client.

    Pass previous=True for the prior rotation's results.
    """
    result = await client.get_spiral_abyss(uid, previous=previous)
    return result
def format_abyss_data(data):
    """Convert a genshin.py Spiral Abyss object into a JSON-friendly dict.

    Covers the season window, aggregate battle stats, per-category character
    ranks, and a floor/chamber star breakdown.
    """
    def format_rank(rank):
        # Rank lists may be empty; show "value (character)" for the top entry.
        if not rank:
            return "N/A"
        top = rank[0]
        return f"{top.value} ({top.name})"

    ranks = data.ranks
    rank_section = {
        # Most Played is intentionally "name (value)" — the value is a play
        # count, not a score like the other categories.
        "Most Played": [f"{char.name} ({char.value})" for char in ranks.most_played],
        "Most Kills": format_rank(ranks.most_kills),
        "Strongest Strike": format_rank(ranks.strongest_strike),
        "Most Damage Taken": format_rank(ranks.most_damage_taken),
        "Most Bursts Used": format_rank(ranks.most_bursts_used),
        "Most Skills Used": format_rank(ranks.most_skills_used),
    }

    floor_section = []
    for floor in data.floors:
        chambers = [
            {
                "Chamber": chamber.chamber,
                "Stars": f"{chamber.stars}/{chamber.max_stars}",
            }
            for chamber in floor.chambers
        ]
        floor_section.append({
            "Floor": floor.floor,
            "Unlocked": floor.unlocked,
            "Stars": f"{floor.stars}/{floor.max_stars}",
            "Chambers": chambers,
        })

    return {
        "Season": data.season,
        "Start Time": data.start_time.isoformat(),
        "End Time": data.end_time.isoformat(),
        "Total Battles": data.total_battles,
        "Total Wins": data.total_wins,
        "Max Floor": data.max_floor,
        "Total Stars": data.total_stars,
        "Ranks": rank_section,
        "Floors": floor_section,
    }
async def get_spiral_abyss(
    id: int = Query(...),
    previous: bool = Query(False)
):
    """
    Return formatted Spiral Abyss data for a user.

    Query parameters:
    - `id`: the user ID to fetch data for (required).
    - `previous`: fetch the previous season instead of the current one
      (default: False).

    On any failure an {"error": ...} payload is returned instead of raising.
    """
    try:
        raw = await fetch_spiral_abyss_data(id, previous)
        return format_abyss_data(raw)
    except Exception as exc:
        return {"error": str(exc)}
# Honkai: Star Rail character ID -> display name, used to resolve the numeric
# avatar IDs returned by the HoYoLAB API (see get_character_name below).
# NOTE(review): IDs 8001-8008 are the Trailblazer variants (two IDs per
# element — presumably one per player gender; confirm against API data).
# NOTE(review): 2xxx entries (e.g. 2212 Jingliu) mirror 1xxx IDs for the same
# character — presumably alternate/enhanced forms; confirm.
CHARACTER_MAPPING = {
    8001: "Trailblazer (Physical)",
    8002: "Trailblazer (Physical)",
    8003: "Trailblazer (Fire)",
    8004: "Trailblazer (Fire)",
    8005: "Trailblazer (Imaginary)",
    8006: "Trailblazer (Imaginary)",
    8007: "Trailblazer (Ice)",
    8008: "Trailblazer (Ice)",
    1310: "Firefly",
    1212: "Jingliu",
    2212: "Jingliu",
    1303: "Ruan Mei",
    1208: "Fu Xuan",
    1305: "Dr. Ratio",
    1301: "Gallagher",
    1306: "Sparkle",
    1102: "Seele",
    1205: "Blade",
    2205: "Blade",
    1006: "Silver Wolf",
    2006: "Silver Wolf",
    1005: "Kafka",
    2005: "Kafka",
    1307: "Black Swan",
    1309: "Robin",
    1112: "Topaz",
    1211: "Bailu",
    1107: "Clara",
    1003: "Himeko",
    1004: "Welt",
    1221: "Yunli",
    1104: "Gepard",
    1101: "Bronya",
    1203: "Luocha",
    1204: "Jing Yuan",
    1002: "Dan Heng",
    1213: "Dan Heng IL",
    1001: "March 7th",
    1224: "March 7th (Hunt)",
    1217: "Huohuo",
    1302: "Argenti",
    1009: "Asta",
    1013: "Herta",
    1401: "The Herta",
    1103: "Serval",
    1105: "Natasha",
    1106: "Pela",
    1108: "Sampo",
    1109: "Hook",
    1110: "Lynx",
    1111: "Luka",
    1201: "Qingque",
    1202: "Tingyun",
    1206: "Sushang",
    1207: "Yukong",
    1209: "Yanqing",
    1314: "Jade",
    1210: "Guinaifen",
    1214: "Xueyi",
    1215: "Hanya",
    1312: "Misha",
    1308: "Acheron",
    1304: "Aventurine",
    1315: "Boothill",
    1218: "Jiaoqiu",
    1228: "Moze",
    1220: "Feixiao",
    1222: "Lingsha",
    1313: "Sunday",
    1225: "Fugue",
    1402: "Aglaea",
    1403: "Tribbie",
    1404: "Mydei",
    1317: "Rappa",
    1407: "Castorice",
    1405: "Anaxa",
    1409: "Hyacine",
    1406: "Cipher",
    1408: "Phainon",
    1410: "Hysilens",
    1412: "Cerydra",
    1014: "Saber",
    1015: "Archer"
}
def get_character_name(char_id: int) -> str:
    """Resolve a numeric character ID to its display name.

    Unknown IDs yield a placeholder that embeds the raw ID so it still
    shows up usefully in formatted output.
    """
    try:
        return CHARACTER_MAPPING[char_id]
    except KeyError:
        return f"Unknown Character ({char_id})"
def format_eidolon(rank: int) -> str:
    """Format an eidolon rank in the conventional short form, e.g. 'E6'."""
    return "E" + str(rank)
def get_element_emoji(element: str) -> str:
    """Map an HSR element name (case-insensitive) to its emoji.

    Unrecognized elements fall back to '❓'.
    """
    table = {
        "physical": "⚪",
        "fire": "🔥",
        "ice": "❄️",
        "lightning": "⚡",
        "wind": "🌪️",
        "quantum": "🔮",
        "imaginary": "🌌",
    }
    key = element.lower()
    if key in table:
        return table[key]
    return "❓"
def get_rarity_stars(rarity: int) -> str:
    """Render a rarity tier as that many star emoji (empty for rarity <= 0)."""
    return "".join("⭐" for _ in range(rarity))
async def get_apc_data(
    uid: int,
):
    """Fetch and format Apocalyptic Shadow data for a Star Rail UID.

    Builds a fresh authenticated client from the same environment cookies as
    the module-level client, then returns a dict with an overall summary, the
    currently-running season (if any), and per-floor node/team breakdowns.

    Raises:
        HTTPException(500) if the upstream call or formatting fails.
    """
    try:
        env_cookies = {
            "ltuid_v2": os.getenv("ltuid_v2"),
            "ltoken_v2": os.getenv("ltoken_v2"),
            "ltmid_v2": os.getenv("ltmid_v2"),
            "cookie_token_v2": os.getenv("cookie_token_v2"),
        }
        api = genshin.Client(env_cookies, uid=uid)
        apc_data = await api.get_starrail_apc_shadow(uid=uid)

        result = {
            "uid": uid,
            "summary": {
                "total_stars": apc_data.total_stars,
                "max_floor": apc_data.max_floor,
                "total_battles": apc_data.total_battles,
                "has_data": apc_data.has_data,
            },
            "current_season": None,
            "floors": [],
        }

        # The season whose status is "New" is the one currently running.
        if apc_data.seasons:
            active = next((s for s in apc_data.seasons if s.status == "New"), None)
            if active is not None:
                result["current_season"] = {
                    "name": active.name,
                    "status": active.status,
                    "begin_time": str(active.begin_time),
                    "end_time": str(active.end_time),
                    "upper_boss": active.upper_boss.name_mi18n if active.upper_boss else None,
                    "lower_boss": active.lower_boss.name_mi18n if active.lower_boss else None,
                }

        for floor in apc_data.floors:
            entry = {
                "id": floor.id,
                "name": floor.name,
                "stars": floor.star_num,
                "is_quick_clear": floor.is_quick_clear,
                "last_update": str(floor.last_update_time) if floor.last_update_time else None,
                "nodes": [],
            }
            # Each floor has two half-run nodes; nodes without a lineup are
            # skipped entirely, matching the original behavior.
            for number, node in enumerate((floor.node_1, floor.node_2), start=1):
                if not (node and node.avatars):
                    continue
                team = []
                for avatar in node.avatars:
                    name = get_character_name(avatar.id)
                    eidolon = format_eidolon(avatar.rank)
                    team.append({
                        "id": avatar.id,
                        "name": name,
                        "eidolon": eidolon,
                        "level": avatar.level,
                        "element": avatar.element,
                        "rarity": avatar.rarity,
                        "display_name": f"{eidolon} {name}",
                    })
                entry["nodes"].append({
                    "node_number": number,
                    "challenge_time": str(node.challenge_time) if node.challenge_time else None,
                    "score": node.score,
                    "boss_defeated": node.boss_defeated,
                    "buff": {
                        "name": node.buff.name,
                        "description": node.buff.description,
                    } if node.buff else None,
                    "team": team,
                })
            result["floors"].append(entry)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching data: {str(e)}")
# Script entry point: serve the FastAPI app declared in main.py on all
# interfaces at port 7860 with 8 worker processes.
# NOTE(review): uvicorn's timeout_keep_alive is in seconds, so 60000 keeps
# idle connections open for ~16 hours — confirm this is intentional.
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=7860, workers=8, timeout_keep_alive=60000)