| from flask import Flask, request, jsonify |
| import asyncio |
| from gemini_webapi import GeminiClient |
| from functools import wraps |
| import os |
| import base64 |
| import json |
| from io import BytesIO |
| from PIL import Image |
|
|
# Module-level Flask application; the @app.route decorators in this file
# register their view functions on this object.
app = Flask(__name__)
|
|
def async_route(f):
    """Adapt a coroutine function so it can be called synchronously.

    The returned callable forwards its arguments to ``f``, runs the resulting
    coroutine to completion with ``asyncio.run`` and returns its result.
    Metadata of ``f`` (name, docstring, ...) is copied via ``functools.wraps``.
    """

    def run_to_completion(*args, **kwargs):
        coroutine = f(*args, **kwargs)
        return asyncio.run(coroutine)

    return wraps(f)(run_to_completion)
|
|
def _cookies_from_entries(entries):
    """Build ``{name: value}`` from a list of ``{"name": ..., "value": ...}`` dicts.

    Entries that are not dicts, or whose name or value is missing/empty,
    are skipped.
    """
    collected = {}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        name = entry.get('name', '')
        value = entry.get('value', '')
        if name and value:
            collected[name] = value
    return collected


def _cookies_from_header(header):
    """Parse a ``name=value; name=value`` cookie string into a dict."""
    collected = {}
    for pair in header.split(';'):
        # Split on the first '=' only: cookie values may contain '='.
        name, sep, value = pair.strip().partition('=')
        if sep:
            collected[name.strip()] = value.strip()
    return collected


def parse_cookies(cookies_input):
    """Extract the Gemini session cookies from several input formats.

    Accepted inputs:
      * a JSON array string: ``'[{"name": ..., "value": ...}, ...]'``
      * a JSON object string: ``'{"__Secure-1PSID": "..."}'``
      * a cookie header string: ``'__Secure-1PSID=...; __Secure-1PSIDTS=...'``
      * a dict mapping cookie names to values
      * a list of ``{"name": ..., "value": ...}`` dicts

    Returns:
        A ``(secure_1psid, secure_1psidts)`` tuple. The second item is taken
        from ``__Secure-1PSIDTS``, ``__Secure-3PSIDTS`` or ``PSIDTS`` (first
        non-empty one) and is ``''`` when none is present.

    Raises:
        ValueError: if the input cannot be parsed, or if ``__Secure-1PSID``
            is missing or empty.
    """
    cookies_dict = {}

    try:
        if isinstance(cookies_input, str):
            text = cookies_input.strip()

            if text.startswith('['):
                cookies_dict = _cookies_from_entries(json.loads(text))
            elif text.startswith('{'):
                # A dict sent through a form field arrives as a JSON object
                # string. If it is not valid JSON, fall back to the cookie
                # header parser so earlier behaviour is kept.
                try:
                    decoded = json.loads(text)
                except ValueError:
                    decoded = None
                if isinstance(decoded, dict):
                    cookies_dict = decoded
                else:
                    cookies_dict = _cookies_from_header(text)
            else:
                cookies_dict = _cookies_from_header(text)

        elif isinstance(cookies_input, dict):
            cookies_dict = cookies_input

        elif isinstance(cookies_input, list):
            cookies_dict = _cookies_from_entries(cookies_input)

    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise ValueError(f"Failed to parse cookies: {str(e)}") from e

    secure_1psid = cookies_dict.get('__Secure-1PSID')
    secure_1psidts = (
        cookies_dict.get('__Secure-1PSIDTS')
        or cookies_dict.get('__Secure-3PSIDTS')
        or cookies_dict.get('PSIDTS')
        or ''
    )

    if not secure_1psid:
        raise ValueError("__Secure-1PSID not found in cookies")

    return secure_1psid, secure_1psidts
|
|
def process_image(image_data):
    """Decode a base64 string into an image opened with ``Image.open``.

    ``image_data`` may be raw base64 or start with ``data:image``; in the
    latter case the second comma-separated field is used as the base64
    payload.

    Raises:
        ValueError: wrapping any exception raised while decoding or opening.
    """
    try:
        payload = image_data
        if payload.startswith('data:image'):
            # Drop the "data:image/...;base64" prefix of a data URI.
            payload = payload.split(',')[1]

        raw_bytes = base64.b64decode(payload)
        return Image.open(BytesIO(raw_bytes))
    except Exception as e:
        raise ValueError(f"Invalid image data: {str(e)}")
|
|
@app.route('/api/gemini', methods=['POST'])
@async_route
async def gemini_generate():
    """Handle ``POST /api/gemini``: send a prompt, optionally with an image.

    Expects a JSON object with:
      * ``prompt`` (required)
      * ``model`` (optional, defaults to ``gemini-2.5-flash``)
      * ``image`` (optional base64 / data-URI string, see ``process_image``)
      * credentials, either ``cookies`` (any format ``parse_cookies``
        accepts) or ``secure_1psid`` plus optional ``secure_1psidts``

    Returns:
        ``(json_response, status)``: 200 with the model output; 400 for
        missing or invalid input (including any ``ValueError``); 500 for any
        other exception.
    """
    try:
        # silent=True makes a wrong content type or malformed JSON return
        # None instead of raising, so the client gets the 400 below rather
        # than the generic 500 from the catch-all handler.
        data = request.get_json(silent=True)

        if not data:
            return jsonify({"success": False, "error": "No JSON data provided"}), 400
        if not isinstance(data, dict):
            # Arrays and scalars are valid JSON but have no .get().
            return jsonify({"success": False, "error": "JSON body must be an object"}), 400

        cookies_input = data.get('cookies')
        prompt = data.get('prompt')
        model_name = data.get('model', 'gemini-2.5-flash')
        image_data = data.get('image')

        if not cookies_input:
            secure_1psid = data.get('secure_1psid')
            secure_1psidts = data.get('secure_1psidts', '')

            if not secure_1psid:
                return jsonify({
                    "success": False,
                    "error": "cookies or secure_1psid is required"
                }), 400
        else:
            secure_1psid, secure_1psidts = parse_cookies(cookies_input)

        if not prompt:
            return jsonify({"success": False, "error": "prompt is required"}), 400

        # Decode the image before opening a Gemini session so bad image data
        # is rejected without initialising a client first.
        image = process_image(image_data) if image_data else None

        client = GeminiClient(secure_1psid, secure_1psidts, proxy=None)
        await client.init(timeout=30, auto_close=True, close_delay=300)

        try:
            if image is not None:
                # NOTE(review): a PIL image object is passed in `files`;
                # confirm the installed gemini_webapi accepts that type.
                response = await client.generate_content(prompt, files=[image], model=model_name)
            else:
                response = await client.generate_content(prompt, model=model_name)

            return jsonify({
                "success": True,
                "response": response.text,
                "model": model_name,
                "prompt": prompt,
                "has_image": bool(image_data),
                "images": [
                    {"title": img.title, "url": img.url, "alt": img.alt}
                    for img in (response.images or [])
                ],
                "thoughts": getattr(response, 'thoughts', None)
            }), 200

        finally:
            await client.close()

    except ValueError as e:
        return jsonify({"success": False, "error": str(e), "error_type": "ValidationError"}), 400
    except Exception as e:
        return jsonify({"success": False, "error": str(e), "error_type": type(e).__name__}), 500
|
|
@app.route('/api/gemini/image', methods=['POST'])
@async_route
async def gemini_image():
    """Handle ``POST /api/gemini/image``: analyse an uploaded image.

    Expects multipart form data with an ``image`` file part and the fields
    ``prompt`` (required), ``model`` (optional, defaults to
    ``gemini-2.5-flash``) and credentials as either ``cookies`` (any string
    format ``parse_cookies`` accepts) or ``secure_1psid`` plus optional
    ``secure_1psidts``.

    Returns:
        ``(json_response, status)``: 200 with the model output; 400 for
        missing or invalid input (including any ``ValueError``); 500 for any
        other exception.
    """
    try:
        if 'image' not in request.files:
            return jsonify({"success": False, "error": "No image file provided"}), 400

        upload = request.files['image']
        prompt = request.form.get('prompt')
        cookies_input = request.form.get('cookies')
        model_name = request.form.get('model', 'gemini-2.5-flash')

        if not cookies_input:
            secure_1psid = request.form.get('secure_1psid')
            secure_1psidts = request.form.get('secure_1psidts', '')

            if not secure_1psid:
                return jsonify({"success": False, "error": "cookies or secure_1psid is required"}), 400
        else:
            secure_1psid, secure_1psidts = parse_cookies(cookies_input)

        if not prompt:
            return jsonify({"success": False, "error": "prompt is required"}), 400

        try:
            image = Image.open(upload.stream)
        except (OSError, ValueError) as exc:
            # An unreadable upload is a client error, not a server fault.
            raise ValueError(f"Invalid image data: {exc}") from exc

        client = GeminiClient(secure_1psid, secure_1psidts, proxy=None)
        await client.init(timeout=30, auto_close=True, close_delay=300)

        try:
            # NOTE(review): a PIL image object is passed in `files`; confirm
            # the installed gemini_webapi accepts that type.
            response = await client.generate_content(prompt, files=[image], model=model_name)

            return jsonify({
                "success": True,
                "response": response.text,
                "model": model_name,
                "prompt": prompt,
                "images": [
                    {"title": img.title, "url": img.url, "alt": img.alt}
                    for img in (response.images or [])
                ]
            }), 200

        finally:
            await client.close()

    except ValueError as e:
        # Same mapping as /api/gemini: validation problems (bad cookies,
        # bad image) are reported as 400 instead of 500.
        return jsonify({"success": False, "error": str(e), "error_type": "ValidationError"}), 400
    except Exception as e:
        return jsonify({"success": False, "error": str(e), "error_type": type(e).__name__}), 500
|
|
@app.route('/api/gemini/chat', methods=['POST'])
@async_route
async def gemini_chat():
    """Handle ``POST /api/gemini/chat``: run a multi-turn conversation.

    Expects a JSON object with:
      * ``messages``: non-empty list of objects; each object's ``content``
        (default ``''``) is sent in order within one chat session
      * ``model`` (optional, defaults to ``gemini-2.5-flash``)
      * credentials, either ``cookies`` (any format ``parse_cookies``
        accepts) or ``secure_1psid`` plus optional ``secure_1psidts``

    Returns:
        ``(json_response, status)``: 200 with one entry per message in
        ``responses`` plus the chat ``metadata``; 400 for missing or invalid
        input (including any ``ValueError``); 500 for any other exception.
    """
    try:
        # silent=True makes a wrong content type or malformed JSON return
        # None instead of raising, so the client gets a 400 below rather
        # than the generic 500 from the catch-all handler.
        data = request.get_json(silent=True)

        if not data:
            return jsonify({"success": False, "error": "No JSON data provided"}), 400
        if not isinstance(data, dict):
            # Arrays and scalars are valid JSON but have no .get().
            return jsonify({"success": False, "error": "JSON body must be an object"}), 400

        cookies_input = data.get('cookies')
        messages = data.get('messages', [])
        model_name = data.get('model', 'gemini-2.5-flash')

        if not cookies_input:
            secure_1psid = data.get('secure_1psid')
            secure_1psidts = data.get('secure_1psidts', '')

            if not secure_1psid:
                return jsonify({"success": False, "error": "cookies or secure_1psid is required"}), 400
        else:
            secure_1psid, secure_1psidts = parse_cookies(cookies_input)

        if not messages or not isinstance(messages, list):
            return jsonify({"success": False, "error": "messages array is required"}), 400
        if not all(isinstance(msg, dict) for msg in messages):
            # Checked up front so a bad entry cannot fail mid-conversation.
            return jsonify({"success": False, "error": "each message must be an object"}), 400

        client = GeminiClient(secure_1psid, secure_1psidts, proxy=None)
        await client.init(timeout=30, auto_close=True, close_delay=300)

        try:
            chat = client.start_chat(model=model_name)

            responses = []
            # Messages are sent one at a time: each turn depends on the
            # session state left by the previous one.
            for msg in messages:
                response = await chat.send_message(msg.get('content', ''))
                responses.append({
                    "text": response.text,
                    "images": [
                        {"title": img.title, "url": img.url, "alt": img.alt}
                        for img in (response.images or [])
                    ]
                })

            return jsonify({
                "success": True,
                "responses": responses,
                "model": model_name,
                "metadata": chat.metadata
            }), 200

        finally:
            await client.close()

    except ValueError as e:
        # Same mapping as /api/gemini: validation problems such as bad
        # cookies are reported as 400 instead of 500.
        return jsonify({"success": False, "error": str(e), "error_type": "ValidationError"}), 400
    except Exception as e:
        return jsonify({"success": False, "error": str(e), "error_type": type(e).__name__}), 500
|
|
@app.route('/api/gemini/generate-image', methods=['POST'])
@async_route
async def gemini_generate_image():
    """Handle ``POST /api/gemini/generate-image``: ask Gemini for an image.

    Expects a JSON object with ``prompt`` (required), ``model`` (optional,
    defaults to ``gemini-2.5-flash``) and credentials as either ``cookies``
    (any format ``parse_cookies`` accepts) or ``secure_1psid`` plus optional
    ``secure_1psidts``. The prompt is sent prefixed with
    ``"Generate an image: "``.

    Returns:
        ``(json_response, status)``: 200 with the response text and image
        descriptors; 400 for missing or invalid input (including any
        ``ValueError``); 500 for any other exception.
    """
    try:
        # silent=True makes a wrong content type or malformed JSON return
        # None instead of raising, so the client gets a 400 below rather
        # than the generic 500 from the catch-all handler.
        data = request.get_json(silent=True)

        if not data:
            return jsonify({"success": False, "error": "No JSON data provided"}), 400
        if not isinstance(data, dict):
            # Arrays and scalars are valid JSON but have no .get().
            return jsonify({"success": False, "error": "JSON body must be an object"}), 400

        cookies_input = data.get('cookies')
        prompt = data.get('prompt')
        model_name = data.get('model', 'gemini-2.5-flash')

        if not cookies_input:
            secure_1psid = data.get('secure_1psid')
            secure_1psidts = data.get('secure_1psidts', '')

            if not secure_1psid:
                return jsonify({"success": False, "error": "cookies or secure_1psid is required"}), 400
        else:
            secure_1psid, secure_1psidts = parse_cookies(cookies_input)

        if not prompt:
            return jsonify({"success": False, "error": "prompt is required"}), 400

        client = GeminiClient(secure_1psid, secure_1psidts, proxy=None)
        await client.init(timeout=30, auto_close=True, close_delay=300)

        try:
            response = await client.generate_content(f"Generate an image: {prompt}", model=model_name)

            # `or []` guards against a missing image list, matching the
            # guard used by the other handlers in this file.
            generated_images = [
                {"title": img.title, "url": img.url, "alt": img.alt}
                for img in (response.images or [])
            ]

            return jsonify({
                "success": True,
                "text": response.text,
                "images": generated_images,
                "model": model_name,
                "prompt": prompt
            }), 200

        finally:
            await client.close()

    except ValueError as e:
        # Same mapping as /api/gemini: validation problems such as bad
        # cookies are reported as 400 instead of 500.
        return jsonify({"success": False, "error": str(e), "error_type": "ValidationError"}), 400
    except Exception as e:
        return jsonify({"success": False, "error": str(e), "error_type": type(e).__name__}), 500
|
|
@app.route('/api/gemini/edit-image', methods=['POST'])
@async_route
async def gemini_edit_image():
    """Handle ``POST /api/gemini/edit-image``: ask Gemini to edit an upload.

    Expects multipart form data with an ``image`` file part and the fields
    ``prompt`` (required), ``model`` (optional, defaults to
    ``gemini-2.5-flash``) and credentials as either ``cookies`` (any string
    format ``parse_cookies`` accepts) or ``secure_1psid`` plus optional
    ``secure_1psidts``. The prompt is sent prefixed with
    ``"Edit this image: "``.

    Returns:
        ``(json_response, status)``: 200 with the response text and image
        descriptors; 400 for missing or invalid input (including any
        ``ValueError``); 500 for any other exception.
    """
    try:
        if 'image' not in request.files:
            return jsonify({"success": False, "error": "No image file provided"}), 400

        upload = request.files['image']
        prompt = request.form.get('prompt')
        cookies_input = request.form.get('cookies')
        model_name = request.form.get('model', 'gemini-2.5-flash')

        if not cookies_input:
            secure_1psid = request.form.get('secure_1psid')
            secure_1psidts = request.form.get('secure_1psidts', '')

            if not secure_1psid:
                return jsonify({"success": False, "error": "cookies or secure_1psid is required"}), 400
        else:
            secure_1psid, secure_1psidts = parse_cookies(cookies_input)

        if not prompt:
            return jsonify({"success": False, "error": "prompt is required"}), 400

        try:
            image = Image.open(upload.stream)
        except (OSError, ValueError) as exc:
            # An unreadable upload is a client error, not a server fault.
            raise ValueError(f"Invalid image data: {exc}") from exc

        client = GeminiClient(secure_1psid, secure_1psidts, proxy=None)
        await client.init(timeout=30, auto_close=True, close_delay=300)

        try:
            edit_prompt = f"Edit this image: {prompt}"
            # NOTE(review): a PIL image object is passed in `files`; confirm
            # the installed gemini_webapi accepts that type.
            response = await client.generate_content(edit_prompt, files=[image], model=model_name)

            # `or []` guards against a missing image list, matching the
            # guard used by the other handlers in this file.
            edited_images = [
                {"title": img.title, "url": img.url, "alt": img.alt}
                for img in (response.images or [])
            ]

            return jsonify({
                "success": True,
                "text": response.text,
                "images": edited_images,
                "model": model_name,
                "prompt": prompt
            }), 200

        finally:
            await client.close()

    except ValueError as e:
        # Same mapping as /api/gemini: validation problems (bad cookies,
        # bad image) are reported as 400 instead of 500.
        return jsonify({"success": False, "error": str(e), "error_type": "ValidationError"}), 400
    except Exception as e:
        return jsonify({"success": False, "error": str(e), "error_type": type(e).__name__}), 500
|
|
@app.route('/api/health', methods=['GET'])
def health_check():
    """Return a static status payload listing the service's features."""
    supported_features = ["text", "image", "multimodal", "chat", "generate_image", "edit_image"]
    payload = {
        "status": "ok",
        "service": "Gemini Flask API",
        "features": supported_features,
    }
    return jsonify(payload), 200
|
|
@app.route('/api/models', methods=['GET'])
def list_models():
    """Return the hard-coded catalogue of model names this API advertises."""
    # (name, description) pairs; every listed model is marked image-capable.
    catalogue = (
        ("gemini-2.5-flash", "Gemini 2.5 Flash"),
        ("gemini-2.5-pro", "Gemini 2.5 Pro"),
        ("gemini-2.0-flash", "Gemini 2.0 Flash"),
    )
    models = [
        {"name": model_id, "description": label, "supports_image": True}
        for model_id, label in catalogue
    ]
    return jsonify({"success": True, "models": models}), 200
|
|
@app.route('/', methods=['GET'])
def index():
    """Return a static, self-describing summary of the API as JSON."""
    feature_list = [
        "text", "image", "multimodal", "chat",
        "generate_image", "edit_image", "auto_cookie_parser",
    ]

    endpoint_help = {
        "POST /api/gemini": "Generate text with optional image",
        "POST /api/gemini/image": "Analyze uploaded image",
        "POST /api/gemini/chat": "Multi-turn conversation",
        "POST /api/gemini/generate-image": "Generate images",
        "POST /api/gemini/edit-image": "Edit images",
        "GET /api/health": "Health check",
        "GET /api/models": "List models",
        "GET /": "API documentation",
    }

    # Examples of the accepted shapes of the `cookies` field.
    cookie_formats = {
        "json_array": "[{\"name\":\"__Secure-1PSID\",\"value\":\"...\"},...]",
        "cookie_string": "__Secure-1PSID=...; __Secure-1PSIDTS=...",
        "dict": "{\"__Secure-1PSID\": \"...\", \"__Secure-1PSIDTS\": \"...\"}",
    }

    sample_request = {
        "url": "POST /api/gemini",
        "body": {
            "cookies": "[{\"name\":\"__Secure-1PSID\",\"value\":\"g.a000...\"}]",
            "prompt": "Hello Gemini",
            "model": "gemini-2.5-flash",
        },
    }

    document = {
        "service": "Gemini Flask API",
        "version": "4.0.0",
        "features": feature_list,
        "endpoints": endpoint_help,
        "usage": {
            "cookies_format": cookie_formats,
            "example_request": sample_request,
        },
    }
    return jsonify(document), 200
|
|
if __name__ == '__main__':
    # Script entry point: listen on all interfaces, on the port given by
    # the PORT environment variable (7860 when it is unset).
    listen_port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port, debug=False)