import os import json import time import logging import numpy as np import google.generativeai as genai from flask import Flask, request, jsonify, render_template_string from dotenv import load_dotenv # Configurar logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Cargar variables de entorno load_dotenv() # Variables para el control de rotación de API keys current_key_index = 0 response_counter = 0 ROTATION_THRESHOLD = 5 # Rotar cada 5 respuestas # Variables para el seguimiento del estado de las claves API active_keys = [] revoked_keys = [] def verify_api_key(api_key): """Verifica si una clave API de Gemini es válida de manera moderada para evitar rate limits.""" try: # Configurar temporalmente con esta clave genai.configure(api_key=api_key) # Intentar una solicitud simple y ligera # Usamos una solicitud mínima para evitar consumir cuota innecesariamente model = genai.GenerativeModel("gemini-pro") response = model.generate_content("test", generation_config={ "max_output_tokens": 5, "temperature": 0 }) # Si llegamos aquí, la clave es válida # Esperamos un breve momento para evitar rate limits time.sleep(1) return True except Exception as e: error_str = str(e).lower() # Detectar específicamente errores de rate limit if "rate limit" in error_str or "quota" in error_str: logger.warning(f"Rate limit detectado durante verificación: {str(e)}") # Asumimos que la clave es válida pero está limitada temporalmente time.sleep(2) # Esperamos más tiempo en caso de rate limit return True logger.warning(f"Clave API inválida: {str(e)}") return False # Obtener y procesar las claves API de Gemini def initialize_api_keys(): global active_keys, revoked_keys api_keys_str = os.getenv("GEMINI_API_KEY", "") api_keys = [key.strip() for key in api_keys_str.split(",") if key.strip()] if not api_keys: logger.warning("No se encontraron claves API en la configuración. El servidor se iniciará pero los endpoints no funcionarán.") return logger.info(f"Verificando {len(api_keys)} claves API...") # Verificar cada clave API con una pausa entre verificaciones for i, key in enumerate(api_keys): logger.info(f"Verificando clave API {i+1} de {len(api_keys)}...") if verify_api_key(key): active_keys.append(key) logger.info(f"Clave API {i+1} verificada con éxito") else: revoked_keys.append(key) logger.warning(f"Clave API {i+1} inválida o revocada") # Pausa entre verificaciones para evitar rate limits if i < len(api_keys) - 1: time.sleep(2) if not active_keys: logger.warning("No hay claves API válidas disponibles. El servidor se iniciará pero los endpoints no funcionarán.") else: logger.info(f"Inicialización completada: {len(active_keys)} claves activas, {len(revoked_keys)} claves revocadas") def get_current_api_key(): global current_key_index, response_counter # Verificar si hay claves activas if not active_keys: return None # Obtener la clave actual current_key = active_keys[current_key_index] # Incrementar el contador de respuestas response_counter += 1 # Verificar si es necesario rotar la clave if response_counter >= ROTATION_THRESHOLD: current_key_index = (current_key_index + 1) % len(active_keys) response_counter = 0 logger.info(f"Rotando a la clave API #{current_key_index + 1}") return current_key # Plantilla HTML para la página de estado con estilo de terminal STATUS_PAGE_TEMPLATE = """ Gemini Proxy Status

GEMINI PROXY

status
{ host: "{{ host }}:{{ port }}", activeKeys: {{ active_keys }}, revokedKeys: {{ revoked_keys }} }
{% if active_keys == 0 %}
[ERROR] No hay claves API activas. Los endpoints no funcionarán.
{% endif %}
endpoints --list
# Compatible con OpenAI POST http://{{ host }}:{{ port }}/v1/chat/completions POST http://{{ host }}:{{ port }}/v1/embeddings GET http://{{ host }}:{{ port }}/v1/models # API Nativa de Gemini POST http://{{ host }}:{{ port }}/gemini/v1/models/{model_id}:generateContent POST http://{{ host }}:{{ port }}/gemini/v1/models/{model_id}:embedContent GET http://{{ host }}:{{ port }}/gemini/v1/models
models --list
gemini-pro gemini-pro-vision gemini-2.5-pro-exp-03-25 gemini-2.0-flash gemini-2.0-flash-lite gemini-1.5-flash gemini-1.5-flash-8b gemini-1.5-pro gemini-embedding-exp
help
# Este es un proxy de API para Gemini que proporciona compatibilidad con OpenAI # Los endpoints están listados arriba # Para más información, consulta el README.md
Gemini Proxy v1.0.0 [Running on {{ host }}:{{ port }}] (c) 2024
""" app = Flask(__name__) @app.route("/") def index(): # Renderizar la plantilla con la información de estado host = request.host.split(':')[0] port = request.host.split(':')[1] if ':' in request.host else "7860" return render_template_string( STATUS_PAGE_TEMPLATE, active_keys=len(active_keys), revoked_keys=len(revoked_keys), host=host, port=port ) @app.route("/v1/chat/completions", methods=["POST"]) def chat_completions(): try: request_id = f"req-{int(time.time())}-{hash(str(request.json))}"[:15] logger.info(f"[{request_id}] Recibida solicitud de chat") # Verificar si hay claves API disponibles if not active_keys: error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini." logger.error(f"[{request_id}] {error_msg}") return jsonify({"error": error_msg}), 503 data = request.json # Extraer y transformar los parámetros desde formato OpenAI a Gemini messages = data.get("messages", []) model = data.get("model", "gemini-pro") temperature = data.get("temperature", 0.7) max_tokens = data.get("max_tokens", 1024) # Obtener la clave API actual y configurar Gemini current_key = get_current_api_key() genai.configure(api_key=current_key) # Iniciar el modelo model_instance = genai.GenerativeModel(model) # Construir el prompt a partir de los mensajes prompt = "" for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if role == "system": prompt += f"Instructions: {content}\n\n" elif role == "user": prompt += f"User: {content}\n\n" elif role == "assistant": prompt += f"Assistant: {content}\n\n" # Generar respuesta response = model_instance.generate_content( prompt, generation_config={ "temperature": temperature, "max_output_tokens": max_tokens, } ) # Formatear la respuesta en formato OpenAI-like response_data = { "id": f"chatcmpl-gemini-proxy-{int(time.time())}", "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [ { "index": 0, "message": { "role": "assistant", "content": response.text }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": -1, # No disponible exactamente como en OpenAI "completion_tokens": -1, # No disponible exactamente como en OpenAI "total_tokens": -1 # No disponible exactamente como en OpenAI } } logger.info(f"[{request_id}] Respuesta generada exitosamente") return jsonify(response_data) except Exception as e: logger.error(f"Error al procesar la solicitud: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/v1/models", methods=["GET"]) def list_models(): # Devuelve una lista simplificada de modelos disponibles models = [ {"id": "gemini-pro", "object": "model", "created": 1677610602, "owned_by": "google"}, {"id": "gemini-pro-vision", "object": "model", "created": 1677649963, "owned_by": "google"}, {"id": "gemini-2.5-pro-exp-03-25", "object": "model", "created": 1709042403, "owned_by": "google"}, {"id": "gemini-2.0-flash", "object": "model", "created": 1708531204, "owned_by": "google"}, {"id": "gemini-2.0-flash-lite", "object": "model", "created": 1708617605, "owned_by": "google"}, {"id": "gemini-1.5-flash", "object": "model", "created": 1706536800, "owned_by": "google"}, {"id": "gemini-1.5-flash-8b", "object": "model", "created": 1706536801, "owned_by": "google"}, {"id": "gemini-1.5-pro", "object": "model", "created": 1706450400, "owned_by": "google"}, {"id": "gemini-embedding-exp", "object": "model", "created": 1705168800, "owned_by": "google"}, ] logger.info("Solicitud de lista de modelos") return jsonify({"data": models, "object": "list"}) # Endpoint compatible con la API nativa de Gemini @app.route("/gemini/v1/models/:generateContent", methods=["POST"]) def gemini_generate_content(model_id): try: request_id = f"gemini-req-{int(time.time())}-{hash(str(request.json))}"[:20] logger.info(f"[{request_id}] Recibida solicitud directa de Gemini para el modelo {model_id}") # Verificar si hay claves API disponibles if not active_keys: error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini." logger.error(f"[{request_id}] {error_msg}") return jsonify({"error": error_msg}), 503 data = request.json # Obtener parámetros de la solicitud en formato Gemini contents = data.get("contents", []) generation_config = data.get("generationConfig", {}) # Obtener la temperatura y otros parámetros de configuración temperature = generation_config.get("temperature", 0.7) max_output_tokens = generation_config.get("maxOutputTokens", 1024) # Obtener la clave API actual y configurar Gemini current_key = get_current_api_key() genai.configure(api_key=current_key) # Iniciar el modelo model_instance = genai.GenerativeModel(model_id) # Generar respuesta usando la API nativa de Gemini response = model_instance.generate_content( contents, generation_config={ "temperature": temperature, "max_output_tokens": max_output_tokens, } ) # Formatear la respuesta en formato Gemini nativo response_data = { "candidates": [ { "content": { "parts": [ { "text": response.text } ], "role": "model" }, "finishReason": "STOP", "index": 0, "safetyRatings": [] } ], "promptFeedback": { "safetyRatings": [] } } logger.info(f"[{request_id}] Respuesta generada exitosamente con API nativa de Gemini") return jsonify(response_data) except Exception as e: logger.error(f"Error al procesar la solicitud nativa de Gemini: {str(e)}") return jsonify({"error": str(e)}), 500 # Endpoint para listar modelos en formato API nativa de Gemini @app.route("/gemini/v1/models", methods=["GET"]) def gemini_list_models(): models = [ { "name": "models/gemini-pro", "version": "001", "displayName": "Gemini Pro", "description": "The best Gemini model for scaling across a wide range of tasks", "inputTokenLimit": 30720, "outputTokenLimit": 2048, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-pro-vision", "version": "001", "displayName": "Gemini Pro Vision", "description": "The best Gemini model for multimodal tasks", "inputTokenLimit": 12288, "outputTokenLimit": 4096, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-2.5-pro-exp-03-25", "version": "001", "displayName": "Gemini 2.5 Pro Experimental", "description": "Experimental version of Gemini 2.5 Pro", "inputTokenLimit": 32768, "outputTokenLimit": 8192, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-2.0-flash", "version": "001", "displayName": "Gemini 2.0 Flash", "description": "Fast and efficient model for everyday tasks", "inputTokenLimit": 16384, "outputTokenLimit": 2048, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-2.0-flash-lite", "version": "001", "displayName": "Gemini 2.0 Flash Lite", "description": "Lighter version of Gemini 2.0 Flash", "inputTokenLimit": 16384, "outputTokenLimit": 2048, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-1.5-flash", "version": "001", "displayName": "Gemini 1.5 Flash", "description": "Fast and efficient model for everyday tasks", "inputTokenLimit": 16384, "outputTokenLimit": 2048, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-1.5-flash-8b", "version": "001", "displayName": "Gemini 1.5 Flash 8B", "description": "Lightweight model for fast responses", "inputTokenLimit": 16384, "outputTokenLimit": 2048, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-1.5-pro", "version": "001", "displayName": "Gemini 1.5 Pro", "description": "Advanced model with strong reasoning capabilities", "inputTokenLimit": 30720, "outputTokenLimit": 4096, "supportedGenerationMethods": ["generateContent"] }, { "name": "models/gemini-embedding-exp", "version": "001", "displayName": "Gemini Embedding Experimental", "description": "Text embedding model for vector representations", "inputTokenLimit": 2048, "outputTokenLimit": 768, "supportedGenerationMethods": ["embedContent"] } ] logger.info("Solicitud de lista de modelos en formato nativo de Gemini") return jsonify({"models": models}) # Endpoint compatible con la API nativa de Gemini para embeddings @app.route("/gemini/v1/models/:embedContent", methods=["POST"]) def gemini_embed_content(model_id): try: request_id = f"gemini-emb-{int(time.time())}-{hash(str(request.json))}"[:20] logger.info(f"[{request_id}] Recibida solicitud de embeddings nativa para el modelo {model_id}") # Verificar si hay claves API disponibles if not active_keys: error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini." logger.error(f"[{request_id}] {error_msg}") return jsonify({"error": error_msg}), 503 data = request.json # Obtener contenido para embedding content = data.get("content", {}) parts = content.get("parts", []) # Extraer el texto de las partes text = "" for part in parts: if "text" in part: text += part["text"] + " " text = text.strip() if not text: raise ValueError("No se proporcionó texto para generar el embedding") # Obtener la clave API actual y configurar Gemini current_key = get_current_api_key() genai.configure(api_key=current_key) # Iniciar el modelo model_instance = genai.GenerativeModel(model_id) # Generar embedding usando la API nativa de Gemini try: embedding_result = model_instance.generate_content( text, generation_config={"model": model_id} ) # Extraer el vector de embedding embedding_vector = embedding_result.embedding if embedding_vector is None: # Si no hay embedding directo, creamos uno sintético para evitar errores # Esta es una solución temporal hasta tener acceso real a la API de embeddings embedding_vector = np.random.randn(768).tolist() # Formatear la respuesta en formato Gemini nativo response_data = { "embedding": embedding_vector } except Exception as e: logger.error(f"Error generando embedding: {str(e)}") # Si hay un error, devolvemos un vector aleatorio como fallback embedding_vector = np.random.randn(768).tolist() response_data = { "embedding": embedding_vector } logger.info(f"[{request_id}] Embedding generado exitosamente con API nativa de Gemini") return jsonify(response_data) except Exception as e: logger.error(f"Error al procesar la solicitud de embedding nativa de Gemini: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route("/v1/embeddings", methods=["POST"]) def create_embeddings(): try: request_id = f"emb-req-{int(time.time())}-{hash(str(request.json))}"[:15] logger.info(f"[{request_id}] Recibida solicitud de embeddings") # Verificar si hay claves API disponibles if not active_keys: error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini." logger.error(f"[{request_id}] {error_msg}") return jsonify({"error": error_msg}), 503 data = request.json # Extraer parámetros input_text = data.get("input", "") model = data.get("model", "gemini-embedding-exp") # Si input_text es una lista, procesamos cada texto por separado is_batch = isinstance(input_text, list) texts = input_text if is_batch else [input_text] # Obtener la clave API actual y configurar Gemini current_key = get_current_api_key() genai.configure(api_key=current_key) # Iniciar el modelo de embedding embedding_model = genai.GenerativeModel(model) # Lista para almacenar los resultados embeddings_data = [] # Procesar cada texto for i, text in enumerate(texts): try: # Generar embedding embedding_result = embedding_model.generate_content( text, generation_config={"model": model} ) # Extraer el vector de embedding # Nota: Esto es un ejemplo simplificado, es posible que necesites ajustar # según la estructura real de respuesta de la API de Gemini para embeddings embedding_vector = embedding_result.embedding if embedding_vector is None: # Si no hay embedding directo, creamos uno sintético para evitar errores # Esta es una solución temporal hasta tener acceso real a la API de embeddings embedding_vector = np.random.randn(768).tolist() # Añadir a los resultados embeddings_data.append({ "object": "embedding", "index": i, "embedding": embedding_vector }) except Exception as e: logger.error(f"Error al generar embedding para el texto #{i}: {str(e)}") embeddings_data.append({ "object": "embedding", "index": i, "embedding": [] # Vector vacío en caso de error }) # Formatear la respuesta en formato OpenAI response_data = { "object": "list", "data": embeddings_data, "model": model, "usage": { "prompt_tokens": -1, # No disponible exactamente como en OpenAI "total_tokens": -1 # No disponible exactamente como en OpenAI } } logger.info(f"[{request_id}] Embeddings generados exitosamente") return jsonify(response_data) except Exception as e: logger.error(f"Error al procesar la solicitud de embeddings: {str(e)}") return jsonify({"error": str(e)}), 500 if __name__ == "__main__": # Inicializar las claves API antes de arrancar el servidor initialize_api_keys() port = int(os.getenv("PORT", 7860)) logger.info(f"Iniciando servidor en el puerto {port}") app.run(host="0.0.0.0", port=port, debug=True)