Spaces:
Paused
Paused
| from asyncio import create_subprocess_shell, create_task, gather, run, sleep | |
| from json import dumps | |
| from logging import ERROR, INFO, basicConfig, getLogger | |
| from pathlib import Path | |
| from shutil import rmtree | |
| from subprocess import CalledProcessError, PIPE | |
| from typing import Any, List | |
| from uuid import uuid4 | |
| from PIL import Image | |
| from aiorentry.client import Client as RentryClient | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import PlainTextResponse | |
| from httpx import AsyncClient, HTTPStatusError, RequestError | |
| from lxml import html | |
| from markdown import markdown | |
| from pydantic import BaseModel, HttpUrl | |
| from uvicorn import run as uvicorn_run | |
from os import environ

# Base URL of the proxy through which every Telegraph API call in this module is sent.
proxy_endpoint = 'https://telegraphprxver.vercel.app'

# Set to True to see INFO-level log records; otherwise only errors are shown.
need_logging = False
basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)

# The oxipng executable is expected to sit next to this file.
oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.is_file():
    # Do not abort the import: only the PNG optimisation step needs the binary.
    logger.error('oxipng binary not found at %s; PNG optimisation will fail', oxipng_bin)
elif not oxipng_bin.stat().st_mode & 0o111:
    # No execute bit is set for anyone, so make the binary runnable.
    oxipng_bin.chmod(0o755)

# imgbb API keys (https://api.imgbb.com/), tried in order when uploading.
# SECURITY: keys committed to source are visible to anyone who can read the
# file. A comma-separated IMGBB_TOKENS environment variable takes precedence;
# the built-in list is kept only as a backward-compatible default.
tokens = [
    token.strip()
    for token in environ.get('IMGBB_TOKENS', '').split(',')
    if token.strip()
] or [
    '4cf4defe2fe567d23cbbeda7d1c83849',
    '847d3a233726220a07f5cdb2205f555f',
    '25e8af988ace6f59308edb34004215fb',
    '0689cddb67dbc43daa3f0fecefccb77f',
    'b57bc7d3220ba635a9c00eb0f94451c7',
]
class UserInfo:
    """Credentials and profile fields of a Telegraph account.

    Attributes are plain public fields; the ``get_*`` methods are kept for
    existing callers and simply return the matching attribute.
    """

    def __init__(self, token: str, short_name: str, author_name: str, author_url: str, auth_link: str):
        self.token = token
        self.short_name = short_name
        self.author_name = author_name
        self.author_url = author_url
        self.auth_link = auth_link

    def get_token(self) -> str:
        """Return the account access token."""
        return self.token

    def get_short_name(self) -> str:
        """Return the account short name."""
        return self.short_name

    def get_author_name(self) -> str:
        """Return the author name."""
        return self.author_name

    def get_author_url(self) -> str:
        """Return the author URL."""
        return self.author_url

    def get_auth_link(self) -> str:
        """Return the authorisation link."""
        return self.auth_link

    def __repr__(self) -> str:
        # Use the real class name; the previous hard-coded text named a
        # class ("UserAuthInfo") that does not exist.
        return (
            f'{type(self).__name__}(token={self.token}, short_name={self.short_name}, '
            f'author_name={self.author_name}, author_url={self.author_url}, '
            f'auth_link={self.auth_link})'
        )
async def create_account(short_name: str, author_name: str, author_url: str) -> UserInfo | None:
    """Create a Telegraph account through the proxy.

    Args:
        short_name: Account short name sent as ``short_name``.
        author_name: Author name sent as ``author_name``.
        author_url: Author URL sent as ``author_url``.

    Returns:
        A ``UserInfo`` built from the ``result`` object of the reply, or
        ``None`` when the reply does not carry a truthy ``ok`` field.

    Raises:
        Whatever httpx raises for transport errors, ``ValueError`` for a
        non-JSON body, and ``KeyError`` if ``result`` lacks an expected field.
    """
    params = {
        'short_name': short_name,
        'author_name': author_name,
        'author_url': author_url,
    }
    # NOTE(review): verify=False disables TLS certificate checking — confirm
    # this is really required for the proxy.
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
        response = await client.get(f'{proxy_endpoint}/createAccount', params=params)
    json_response: dict = response.json()
    if not json_response.get('ok'):
        # Previously this path fell off the end of the function and returned
        # None without any trace; make it explicit and visible.
        logger.error('Telegraph createAccount failed: %s', json_response)
        return None
    result = json_response.get('result', {})
    return UserInfo(
        result['access_token'],
        result['short_name'],
        result['author_name'],
        result['author_url'],
        result['auth_url'],
    )
# Telegraph account shared by every page this module creates. It is created
# once at import time. A failure here must not stop the service from starting:
# the code that uses tgph_acc already wraps each Telegraph call in try/except
# and carries on with Rentry only.
try:
    tgph_acc = run(create_account('prolapse', 'prolapse', ''))
except Exception:
    logger.exception('could not create the Telegraph account')
    tgph_acc = None
def md_to_dom(markdown_text: str) -> list[dict[str, str | list | dict | None]]:
    """Convert Markdown into a list of Telegraph-style content nodes.

    The text is rendered to HTML with ``markdown`` (``extra`` and
    ``sane_lists`` extensions), parsed with lxml, and every top-level element
    becomes a ``{'tag': ..., 'attrs': ..., 'children': [...]}`` dict.

    Mapping rules:
        * ``h1`` -> ``h3``, ``h2`` -> ``h4``, ``h3``..``h6`` -> ``p`` wrapping ``strong``.
        * ``ul`` / ``ol`` keep their tag; only direct ``li`` children are kept.
        * ``a`` keeps ``href``; ``img`` / ``iframe`` keep ``src`` and have no children.
        * Any other element keeps its tag and its parsed children.

    Text keeps its original whitespace so that words next to inline elements
    do not run together; text consisting only of whitespace is dropped.

    Args:
        markdown_text: Markdown source.

    Returns:
        The node list; empty when the rendered HTML contains no content.
    """
    Node = dict[str, str | list | dict | None]

    def parse_children(element) -> list[str | Node]:
        children: list[str | Node] = []
        if element.text and element.text.strip():
            children.append(element.text)
        for child in element.iterchildren():
            # Comments and processing instructions have a non-string tag and
            # cannot be represented as a node; their trailing text still counts.
            if isinstance(child.tag, str):
                children.append(parse_element(child))
            # In lxml the text that follows a child's closing tag is stored on
            # the child as ``tail`` but belongs to *this* element, right after
            # the child.
            if child.tail and child.tail.strip():
                children.append(child.tail)
        return children

    def handle_heading(element) -> Node:
        if element.tag == 'h1':
            return {'tag': 'h3', 'children': parse_children(element)}
        if element.tag == 'h2':
            return {'tag': 'h4', 'children': parse_children(element)}
        return {'tag': 'p', 'children': [{'tag': 'strong', 'children': parse_children(element)}]}

    def handle_list(element) -> Node:
        items = [{'tag': 'li', 'children': parse_children(li)} for li in element.xpath('./li')]
        return {'tag': element.tag, 'children': items}

    def handle_link(element) -> Node:
        return {'tag': 'a', 'attrs': {'href': element.get('href')}, 'children': parse_children(element)}

    def handle_media(element) -> Node:
        return {'tag': element.tag, 'attrs': {'src': element.get('src')}}

    def handle_generic(element) -> Node:
        return {'tag': element.tag, 'children': parse_children(element)}

    handlers = {
        'h1': handle_heading, 'h2': handle_heading, 'h3': handle_heading,
        'h4': handle_heading, 'h5': handle_heading, 'h6': handle_heading,
        'ul': handle_list, 'ol': handle_list,
        'a': handle_link,
        'img': handle_media, 'iframe': handle_media,
    }

    def parse_element(element) -> Node:
        return handlers.get(element.tag, handle_generic)(element)

    html_content = markdown(markdown_text, extensions=['extra', 'sane_lists'])

    # fragments_fromstring returns the top-level nodes themselves, so a document
    # consisting of a single block keeps that block instead of being unwrapped.
    nodes: list[Node] = []
    for fragment in html.fragments_fromstring(html_content):
        if isinstance(fragment, str):
            # Leading text outside any element: wrap it so the result stays a
            # list of element nodes.
            nodes.append({'tag': 'p', 'children': [fragment]})
            continue
        if isinstance(fragment.tag, str):
            nodes.append(parse_element(fragment))
        if fragment.tail and fragment.tail.strip():
            nodes.append({'tag': 'p', 'children': [fragment.tail]})
    return nodes
async def tgph_create_page(token: str, title: str, markdown_text: str) -> str:
    """Create a Telegraph page from Markdown via the proxy's ``createPage`` method.

    Args:
        token: Telegraph access token.
        title: Page title.
        markdown_text: Markdown body; converted with ``md_to_dom`` and sent as JSON.

    Returns:
        The ``path`` field of the created page. When the reply is not ``ok``
        the reply, the Markdown and the serialised content are printed and
        ``None`` is returned.
    """
    serialized_nodes = dumps(md_to_dom(markdown_text))
    query = dict(
        access_token=token,
        title=title,
        content=serialized_nodes,
        return_content=False,
    )
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as http:
        reply = await http.get(f'{proxy_endpoint}/createPage', params=query)
    body: dict = reply.json()
    if not body.get('ok'):
        print(f'ошибка создания страницы: {body}')
        print(markdown_text)
        print(serialized_nodes)
        return None
    return body.get('result', {}).get('path')
async def tgph_edit_page(token: str, page: str, title: str, markdown_text: str) -> str:
    """Replace the content of a Telegraph page via the proxy's ``editPage`` method.

    Args:
        token: Telegraph access token.
        page: Path of the page to edit.
        title: New page title.
        markdown_text: Markdown body; converted with ``md_to_dom`` and sent as JSON.

    Returns:
        The ``path`` field of the edited page, or ``''`` when the reply is not
        ``ok`` or carries no path.
    """
    form = dict(
        access_token=token,
        path=page,
        title=title,
        content=dumps(md_to_dom(markdown_text)),
        return_content=False,
    )
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as http:
        reply = await http.post(f'{proxy_endpoint}/editPage', data=form)
    body = reply.json()
    if not body.get('ok'):
        return ''
    return body.get('result', {}).get('path', '')
async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
    """Download ``url`` into ``<this file's directory>/<folder>/<uuid>.png``.

    HTTP status errors and request errors are retried, sleeping
    ``2 ** attempt`` seconds between attempts; the error of the last attempt
    is re-raised.

    Args:
        url: Address to fetch.
        folder: Sub-directory (relative to this file) that receives the file.
        client: Shared httpx client used for the request.
        retries: Maximum number of attempts.

    Returns:
        Path of the written file.
    """
    final_attempt = retries - 1
    for attempt in range(retries):
        try:
            reply = await client.get(url)
            reply.raise_for_status()
        except (HTTPStatusError, RequestError):
            if attempt == final_attempt:
                raise
            await sleep(2 ** attempt)
            continue
        target = Path(__file__).parent / folder / f'{uuid4()}.png'
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(reply.content)
        return target
async def download_pngs(urls: str | list[str]) -> list[Any]:
    """Download several images concurrently into one fresh batch folder.

    Args:
        urls: A single URL or a list of URLs.

    Returns:
        Paths of the downloaded files, in the order of the URLs that passed
        validation. Note that rejected URLs are dropped, so the result can be
        shorter than ``urls``.

    Raises:
        The first error raised by any individual download.
    """
    urls = [urls] if isinstance(urls, str) else urls
    # The coze bot has a bug: with chat context enabled its workflow appends
    # the previous answer to the links, so anything containing a newline
    # (real or escaped) is discarded.
    valid_urls = [
        url for url in urls
        if url and url.strip() != '' and '\n' not in url and '\\n' not in url
    ]
    if len(valid_urls) != len(urls):
        print(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
    if not valid_urls:
        return []
    # One folder per batch (file names are already unique uuids). The caller
    # removes the parent directory of the first image only, so giving each
    # image its own folder left all the other directories behind.
    batch_folder = str(uuid4())
    async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as client:
        downloads = [download_png(url, batch_folder, client) for url in valid_urls]
        return list(await gather(*downloads))
async def optimize_png(image_path: Path, retries: int = 3) -> None:
    """Optimise a PNG in place with oxipng (``--opt 2 --strip safe``).

    Args:
        image_path: File to optimise; it is overwritten with the result.
        retries: Maximum number of attempts; ``2 ** attempt`` seconds are
            slept between attempts.

    Raises:
        CalledProcessError: When the last attempt exits with a non-zero
            status or the binary cannot be started.
    """
    from asyncio import create_subprocess_exec

    # Pass an argument vector instead of a shell string so that paths with
    # spaces or shell metacharacters are handed to oxipng verbatim.
    argv = [
        str(oxipng_bin.resolve()),
        '--opt', '2',
        '--strip', 'safe',
        '--out', str(image_path),
        str(image_path),
    ]
    for attempt in range(retries):
        try:
            try:
                process = await create_subprocess_exec(*argv, stdout=PIPE, stderr=PIPE)
            except OSError as error:
                # A shell reported an unlaunchable binary as a non-zero exit
                # status; keep surfacing it as CalledProcessError.
                raise CalledProcessError(127, argv, stderr=str(error).encode()) from error
            stdout, stderr = await process.communicate()
            if process.returncode != 0:
                raise CalledProcessError(process.returncode, argv, output=stdout, stderr=stderr)
            return
        except CalledProcessError:
            print(f'ошибка при оптимизации {image_path}')
            if attempt >= retries - 1:
                raise
            await sleep(2 ** attempt)
async def convert_to_jpeg(image_path: Path) -> Path:
    """Re-encode an image as JPEG (quality 98) next to the original.

    On success the source file is removed and the ``.jpg`` path is returned.
    On any failure a message is printed, a partially written output is
    discarded and the original path is returned unchanged.

    Args:
        image_path: Image to convert.

    Returns:
        Path of the JPEG, or ``image_path`` when the conversion failed.
    """
    output_path = image_path.with_suffix('.jpg')
    try:
        # The context manager closes the source file handle, which was
        # previously left open.
        with Image.open(image_path) as source:
            # JPEG has no alpha channel or palette: RGBA / LA / P images must
            # be converted first or saving raises and the PNG would be kept.
            image = source if source.mode in ('RGB', 'L', 'CMYK') else source.convert('RGB')
            image.save(output_path, 'JPEG', quality=98, optimize=True)
    except Exception:
        print(f'ошибка при конвертации {image_path}')
        if output_path != image_path:
            output_path.unlink(missing_ok=True)
        return image_path
    # Never delete the file we just wrote when the input already was a .jpg.
    if output_path != image_path:
        image_path.unlink(missing_ok=True)
    return output_path
| async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> tuple[Path]: | |
| image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)] | |
| # print(f'оптимизируется список список из {len(image_paths)}: {image_paths}') | |
| tasks = [convert_to_jpeg(image_path) for image_path in image_paths] | |
| return await gather(*tasks) | |
| async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None: | |
| image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)] | |
| # print(f'оптимизируется список список из {len(image_paths)}: {image_paths}') | |
| tasks = [optimize_png(image_path) for image_path in image_paths] | |
| await gather(*tasks) | |
async def upload_image_to_imgbb(file_path: Path, file_type: str = 'png') -> str | None:
    """Upload a file to imgbb, trying each key in ``tokens`` in turn.

    Args:
        file_path: File to upload.
        file_type: Image subtype used for the ``image/<file_type>`` content type.

    Returns:
        The hosted URL from the first reply flagged ``success``, or ``None``
        when every key failed. Errors are printed and the next key is tried.
    """
    mime_type = f'image/{file_type}'
    for token in tokens:
        endpoint = f'https://api.imgbb.com/1/upload?key={token}'
        try:
            with file_path.open('rb') as stream:
                form_files = {'image': (file_path.name, stream, mime_type)}
                async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as http:
                    reply = await http.post(endpoint, files=form_files, data={}, timeout=30)
                    reply.raise_for_status()
                    payload = reply.json()
            if payload.get('success'):
                return payload['data']['url']
        except Exception as e:
            print(f"ошибка при загрузке с {token}: {e}")
    return None
async def upload_image_to_freeimagehost(image_path: Path, file_type: str = 'png') -> str | None:
    """Upload a file to freeimage.host.

    Args:
        image_path: File to upload.
        file_type: Image subtype used for the ``image/<file_type>`` content type.

    Returns:
        The ``image.url`` field of the JSON reply, or ``None`` after printing
        the error when anything goes wrong.
    """
    form_fields = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'}
    mime_type = f'image/{file_type}'
    try:
        async with AsyncClient(verify=False, follow_redirects=True, timeout=30.0) as http:
            with image_path.open("rb") as stream:
                reply = await http.post(
                    'https://freeimage.host/api/1/upload',
                    data=form_fields,
                    files={'source': (image_path.name, stream, mime_type)},
                )
            reply.raise_for_status()
            return reply.json()['image']['url']
    except Exception as e:
        print(f'ошибка при загрузке {image_path}: {e}')
        return None
async def upload_image(file_path: Path | str, file_type: str = 'png') -> str | None:
    """Upload an image to imgbb, falling back to freeimage.host.

    The fallback is used whenever imgbb yields a falsy result. Returns the
    hosted URL, or whatever the fallback returns when it is tried.
    """
    file_path = Path(file_path)
    hosted_url = await upload_image_to_imgbb(file_path, file_type)
    if hosted_url:
        return hosted_url
    return await upload_image_to_freeimagehost(file_path, file_type)
async def process_image(old_url: str, image_path: Path, convert: bool) -> tuple[str, Path]:
    """Upload a local image, then delete the local file.

    Args:
        old_url: Original address, used as the result when the upload fails.
        image_path: Local file to upload and remove.
        convert: When true the file is uploaded as ``jpeg``, otherwise as ``png``.

    Returns:
        ``(url, image_path)`` where ``url`` is the new hosted URL or ``old_url``.
    """
    subtype = 'jpeg' if convert else 'png'
    uploaded_url = await upload_image(image_path, subtype)
    if not uploaded_url:
        uploaded_url = old_url
        print(f'не удалось загрузить изображение {image_path}, оставим старую ссылку: {old_url}')
    try:
        image_path.unlink()
    except Exception as e:
        print(f'не удалось удалить файл {image_path}: {e}')
    return uploaded_url, image_path
# Strong references to background jobs that are still running. The event loop
# keeps only weak references to tasks, so a task that nothing else refers to
# can be garbage-collected before it finishes.
_background_tasks: set = set()


def _finish_background_task(task) -> None:
    """Forget a finished background task and report its failure, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error('background image processing failed', exc_info=error)


async def optimize_and_upload(images_urls: List[str] | str, convert: bool = False) -> List[str]:
    """Optimise and re-host images, either inline or in the background.

    With ``convert=False`` the images are processed before returning and the
    new image URLs are returned.

    With ``convert=True`` placeholder pages are created first (Telegraph,
    best effort, and Rentry), the processing is started as a background task
    that later fills those pages, and the page links are returned right away.

    Args:
        images_urls: A single URL or a list of URLs.
        convert: Selects the background / page-publishing mode.

    Returns:
        New image URLs (``convert=False``) or page links (``convert=True``).
    """
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    if not convert:
        return await continue_optimizing_and_uploading(images_urls)

    ph_link = None
    try:
        ph_link = await tgph_create_page(tgph_acc.get_token(), 'DAll-E v3', '*изображения скоро появятся, обнови страницу...*')
    except Exception as e:
        print(f'не получилось создать страницу на телеграфе: {e}')

    async with RentryClient('https://rentry.org') as client:
        page = await client.new_page('изображения скоро появятся, обнови страницу...')
    page_id, code = page.url, page.edit_code

    continue_task = create_task(continue_optimizing_and_uploading(images_urls, page_id, code, ph_link))
    _background_tasks.add(continue_task)
    continue_task.add_done_callback(_finish_background_task)

    print(f'https://telegra.ph/{ph_link}', f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}')
    links = [f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}']
    if ph_link:
        links.insert(0, f'https://telegra.ph/{ph_link}')
    return links
async def continue_optimizing_and_uploading(images_urls: list[str], page_id: str | None = None, code: str | None = None, ph_link: str | None = None) -> list[str]:
    """Download, optimise or convert, re-host the images and publish the result.

    Without ``page_id`` the PNGs are optimised with oxipng; with ``page_id``
    they are converted to JPEG instead. Every image is then uploaded, the
    temporary files are removed, and a Markdown document with one image per
    paragraph is written to the Telegraph page (``ph_link``) and the Rentry
    page (``page_id`` + ``code``) when those are given.

    Args:
        images_urls: Source image URLs.
        page_id: Rentry page to fill; its presence selects JPEG conversion.
        code: Edit code of the Rentry page.
        ph_link: Path of the Telegraph page to fill.

    Returns:
        The new image URLs, or ``images_urls`` when there is no usable result.
    """
    converting = bool(page_id)

    images_paths = await download_pngs(images_urls)
    if converting:
        images_paths = await convert_to_jpegs(images_paths)
    else:
        await optimize_pngs(images_paths)

    # NOTE(review): download_pngs drops malformed URLs, so if any were dropped
    # this zip pairs files with the wrong source URL — confirm whether callers
    # can ever pass such URLs.
    uploads = [
        process_image(old_url, image_path, converting)
        for old_url, image_path in zip(images_urls, images_paths)
    ]
    results = await gather(*uploads)
    new_images_urls = [new_url for new_url, _ in results]
    print(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')

    # Remove every temporary directory that was used; iterating also copes
    # with an empty download list, which used to raise inside the handler.
    for folder in {image_path.parent for image_path in images_paths}:
        try:
            rmtree(folder)
        except Exception as e:
            print(f'не удалось удалить файл {folder}: {e}')

    # One Markdown image per paragraph; the literal used to be empty, which
    # produced a page without any images.
    content = '\n\n'.join(f'![]({url})' for url in new_images_urls)
    print(content)

    if ph_link:
        try:
            await tgph_edit_page(tgph_acc.get_token(), ph_link, 'DAll-E v3', content)
        except Exception as e:
            print(f'не удалось отредактировать на телеграфе: {e}')
    else:
        print(f'страница телеграф не предоставлена: {ph_link}')

    if page_id and code:
        try:
            async with RentryClient('https://rentry.org') as client:
                await client.edit_page(text=content, url=page_id, edit_code=code)
        except Exception as e:
            print(f'не удалось создать страницу в rentry: {e}')

    return new_images_urls if new_images_urls and new_images_urls[0] else images_urls
# ASGI application object; it is passed to uvicorn in the __main__ guard.
app = FastAPI()
class ImageURLs(BaseModel):
    """Request body of the image endpoints: the list of image URLs to process."""

    # Each entry is validated as an HTTP(S) URL by pydantic.
    urls: list[HttpUrl]
# The handler was never registered on ``app``, so it could not be reached.
# NOTE(review): '/' is inferred from the handler name — confirm.
@app.get('/')
async def read_root():
    """Liveness probe: answer with a fixed plain-text body and status 200."""
    return PlainTextResponse('ну пролапс, ну и что', status_code=200)
# Both handlers used to be defined under the same name, so the second
# definition replaced the first, and neither was registered on ``app``.
# The module-level name ``optimize_images_endpoint`` still refers to the
# converting variant, exactly as it did before.
# NOTE(review): no route paths are visible in this file; the two paths below
# are placeholders chosen here and must be checked against existing clients.
@app.post('/optimize_images/')
async def optimize_images_endpoint_png(image_urls: ImageURLs):
    """Optimise the PNGs, re-host them and return the new image URLs.

    Any failure is reported as HTTP 500 with the error text as detail.
    """
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
        return {"optimized_urls": optimized_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post('/optimize_and_convert_images/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Start background JPEG conversion and return the links of the result pages.

    Any failure is reported as HTTP 500 with the error text as detail.
    """
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls], convert=True)
        return {"optimized_urls": optimized_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Serve the application on all interfaces, port 7860, when run as a script.
    server_options = dict(
        host='0.0.0.0',
        port=7860,
        timeout_keep_alive=90,
        log_level='info',
        use_colors=False,
    )
    uvicorn_run(app, **server_options)