import os
import json
import time
import logging
import numpy as np
import google.generativeai as genai
from flask import Flask, request, jsonify, render_template_string
from dotenv import load_dotenv
# Configurar logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Cargar variables de entorno
load_dotenv()
# Variables para el control de rotación de API keys
current_key_index = 0
response_counter = 0
ROTATION_THRESHOLD = 5 # Rotar cada 5 respuestas
# Variables para el seguimiento del estado de las claves API
active_keys = []
revoked_keys = []
def verify_api_key(api_key):
"""Verifica si una clave API de Gemini es válida de manera moderada para evitar rate limits."""
try:
# Configurar temporalmente con esta clave
genai.configure(api_key=api_key)
# Intentar una solicitud simple y ligera
# Usamos una solicitud mínima para evitar consumir cuota innecesariamente
model = genai.GenerativeModel("gemini-pro")
response = model.generate_content("test",
generation_config={
"max_output_tokens": 5,
"temperature": 0
})
# Si llegamos aquí, la clave es válida
# Esperamos un breve momento para evitar rate limits
time.sleep(1)
return True
except Exception as e:
error_str = str(e).lower()
# Detectar específicamente errores de rate limit
if "rate limit" in error_str or "quota" in error_str:
logger.warning(f"Rate limit detectado durante verificación: {str(e)}")
# Asumimos que la clave es válida pero está limitada temporalmente
time.sleep(2) # Esperamos más tiempo en caso de rate limit
return True
logger.warning(f"Clave API inválida: {str(e)}")
return False
# Obtener y procesar las claves API de Gemini
def initialize_api_keys():
global active_keys, revoked_keys
api_keys_str = os.getenv("GEMINI_API_KEY", "")
api_keys = [key.strip() for key in api_keys_str.split(",") if key.strip()]
if not api_keys:
logger.warning("No se encontraron claves API en la configuración. El servidor se iniciará pero los endpoints no funcionarán.")
return
logger.info(f"Verificando {len(api_keys)} claves API...")
# Verificar cada clave API con una pausa entre verificaciones
for i, key in enumerate(api_keys):
logger.info(f"Verificando clave API {i+1} de {len(api_keys)}...")
if verify_api_key(key):
active_keys.append(key)
logger.info(f"Clave API {i+1} verificada con éxito")
else:
revoked_keys.append(key)
logger.warning(f"Clave API {i+1} inválida o revocada")
# Pausa entre verificaciones para evitar rate limits
if i < len(api_keys) - 1:
time.sleep(2)
if not active_keys:
logger.warning("No hay claves API válidas disponibles. El servidor se iniciará pero los endpoints no funcionarán.")
else:
logger.info(f"Inicialización completada: {len(active_keys)} claves activas, {len(revoked_keys)} claves revocadas")
def get_current_api_key():
global current_key_index, response_counter
# Verificar si hay claves activas
if not active_keys:
return None
# Obtener la clave actual
current_key = active_keys[current_key_index]
# Incrementar el contador de respuestas
response_counter += 1
# Verificar si es necesario rotar la clave
if response_counter >= ROTATION_THRESHOLD:
current_key_index = (current_key_index + 1) % len(active_keys)
response_counter = 0
logger.info(f"Rotando a la clave API #{current_key_index + 1}")
return current_key
# Plantilla HTML para la página de estado con estilo de terminal
STATUS_PAGE_TEMPLATE = """
Gemini Proxy Status
status
{
host: "{{ host }}:{{ port }}",
activeKeys: {{ active_keys }},
revokedKeys: {{ revoked_keys }}
}
{% if active_keys == 0 %}
[ERROR] No hay claves API activas. Los endpoints no funcionarán.
{% endif %}
endpoints --list
POST http://{{ host }}:{{ port }}/v1/chat/completions
POST http://{{ host }}:{{ port }}/v1/embeddings
GET http://{{ host }}:{{ port }}/v1/models
POST http://{{ host }}:{{ port }}/gemini/v1/models/{model_id}:generateContent
POST http://{{ host }}:{{ port }}/gemini/v1/models/{model_id}:embedContent
GET http://{{ host }}:{{ port }}/gemini/v1/models
models --list
gemini-pro
gemini-pro-vision
gemini-2.5-pro-exp-03-25
gemini-2.0-flash
gemini-2.0-flash-lite
gemini-1.5-flash
gemini-1.5-flash-8b
gemini-1.5-pro
gemini-embedding-exp
Gemini Proxy v1.0.0 [Running on {{ host }}:{{ port }}] (c) 2024
"""
app = Flask(__name__)
@app.route("/")
def index():
# Renderizar la plantilla con la información de estado
host = request.host.split(':')[0]
port = request.host.split(':')[1] if ':' in request.host else "7860"
return render_template_string(
STATUS_PAGE_TEMPLATE,
active_keys=len(active_keys),
revoked_keys=len(revoked_keys),
host=host,
port=port
)
@app.route("/v1/chat/completions", methods=["POST"])
def chat_completions():
try:
request_id = f"req-{int(time.time())}-{hash(str(request.json))}"[:15]
logger.info(f"[{request_id}] Recibida solicitud de chat")
# Verificar si hay claves API disponibles
if not active_keys:
error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini."
logger.error(f"[{request_id}] {error_msg}")
return jsonify({"error": error_msg}), 503
data = request.json
# Extraer y transformar los parámetros desde formato OpenAI a Gemini
messages = data.get("messages", [])
model = data.get("model", "gemini-pro")
temperature = data.get("temperature", 0.7)
max_tokens = data.get("max_tokens", 1024)
# Obtener la clave API actual y configurar Gemini
current_key = get_current_api_key()
genai.configure(api_key=current_key)
# Iniciar el modelo
model_instance = genai.GenerativeModel(model)
# Construir el prompt a partir de los mensajes
prompt = ""
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "system":
prompt += f"Instructions: {content}\n\n"
elif role == "user":
prompt += f"User: {content}\n\n"
elif role == "assistant":
prompt += f"Assistant: {content}\n\n"
# Generar respuesta
response = model_instance.generate_content(
prompt,
generation_config={
"temperature": temperature,
"max_output_tokens": max_tokens,
}
)
# Formatear la respuesta en formato OpenAI-like
response_data = {
"id": f"chatcmpl-gemini-proxy-{int(time.time())}",
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": response.text
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": -1, # No disponible exactamente como en OpenAI
"completion_tokens": -1, # No disponible exactamente como en OpenAI
"total_tokens": -1 # No disponible exactamente como en OpenAI
}
}
logger.info(f"[{request_id}] Respuesta generada exitosamente")
return jsonify(response_data)
except Exception as e:
logger.error(f"Error al procesar la solicitud: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/v1/models", methods=["GET"])
def list_models():
# Devuelve una lista simplificada de modelos disponibles
models = [
{"id": "gemini-pro", "object": "model", "created": 1677610602, "owned_by": "google"},
{"id": "gemini-pro-vision", "object": "model", "created": 1677649963, "owned_by": "google"},
{"id": "gemini-2.5-pro-exp-03-25", "object": "model", "created": 1709042403, "owned_by": "google"},
{"id": "gemini-2.0-flash", "object": "model", "created": 1708531204, "owned_by": "google"},
{"id": "gemini-2.0-flash-lite", "object": "model", "created": 1708617605, "owned_by": "google"},
{"id": "gemini-1.5-flash", "object": "model", "created": 1706536800, "owned_by": "google"},
{"id": "gemini-1.5-flash-8b", "object": "model", "created": 1706536801, "owned_by": "google"},
{"id": "gemini-1.5-pro", "object": "model", "created": 1706450400, "owned_by": "google"},
{"id": "gemini-embedding-exp", "object": "model", "created": 1705168800, "owned_by": "google"},
]
logger.info("Solicitud de lista de modelos")
return jsonify({"data": models, "object": "list"})
# Endpoint compatible con la API nativa de Gemini
@app.route("/gemini/v1/models/:generateContent", methods=["POST"])
def gemini_generate_content(model_id):
try:
request_id = f"gemini-req-{int(time.time())}-{hash(str(request.json))}"[:20]
logger.info(f"[{request_id}] Recibida solicitud directa de Gemini para el modelo {model_id}")
# Verificar si hay claves API disponibles
if not active_keys:
error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini."
logger.error(f"[{request_id}] {error_msg}")
return jsonify({"error": error_msg}), 503
data = request.json
# Obtener parámetros de la solicitud en formato Gemini
contents = data.get("contents", [])
generation_config = data.get("generationConfig", {})
# Obtener la temperatura y otros parámetros de configuración
temperature = generation_config.get("temperature", 0.7)
max_output_tokens = generation_config.get("maxOutputTokens", 1024)
# Obtener la clave API actual y configurar Gemini
current_key = get_current_api_key()
genai.configure(api_key=current_key)
# Iniciar el modelo
model_instance = genai.GenerativeModel(model_id)
# Generar respuesta usando la API nativa de Gemini
response = model_instance.generate_content(
contents,
generation_config={
"temperature": temperature,
"max_output_tokens": max_output_tokens,
}
)
# Formatear la respuesta en formato Gemini nativo
response_data = {
"candidates": [
{
"content": {
"parts": [
{
"text": response.text
}
],
"role": "model"
},
"finishReason": "STOP",
"index": 0,
"safetyRatings": []
}
],
"promptFeedback": {
"safetyRatings": []
}
}
logger.info(f"[{request_id}] Respuesta generada exitosamente con API nativa de Gemini")
return jsonify(response_data)
except Exception as e:
logger.error(f"Error al procesar la solicitud nativa de Gemini: {str(e)}")
return jsonify({"error": str(e)}), 500
# Endpoint para listar modelos en formato API nativa de Gemini
@app.route("/gemini/v1/models", methods=["GET"])
def gemini_list_models():
models = [
{
"name": "models/gemini-pro",
"version": "001",
"displayName": "Gemini Pro",
"description": "The best Gemini model for scaling across a wide range of tasks",
"inputTokenLimit": 30720,
"outputTokenLimit": 2048,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-pro-vision",
"version": "001",
"displayName": "Gemini Pro Vision",
"description": "The best Gemini model for multimodal tasks",
"inputTokenLimit": 12288,
"outputTokenLimit": 4096,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-2.5-pro-exp-03-25",
"version": "001",
"displayName": "Gemini 2.5 Pro Experimental",
"description": "Experimental version of Gemini 2.5 Pro",
"inputTokenLimit": 32768,
"outputTokenLimit": 8192,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-2.0-flash",
"version": "001",
"displayName": "Gemini 2.0 Flash",
"description": "Fast and efficient model for everyday tasks",
"inputTokenLimit": 16384,
"outputTokenLimit": 2048,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-2.0-flash-lite",
"version": "001",
"displayName": "Gemini 2.0 Flash Lite",
"description": "Lighter version of Gemini 2.0 Flash",
"inputTokenLimit": 16384,
"outputTokenLimit": 2048,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-1.5-flash",
"version": "001",
"displayName": "Gemini 1.5 Flash",
"description": "Fast and efficient model for everyday tasks",
"inputTokenLimit": 16384,
"outputTokenLimit": 2048,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-1.5-flash-8b",
"version": "001",
"displayName": "Gemini 1.5 Flash 8B",
"description": "Lightweight model for fast responses",
"inputTokenLimit": 16384,
"outputTokenLimit": 2048,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-1.5-pro",
"version": "001",
"displayName": "Gemini 1.5 Pro",
"description": "Advanced model with strong reasoning capabilities",
"inputTokenLimit": 30720,
"outputTokenLimit": 4096,
"supportedGenerationMethods": ["generateContent"]
},
{
"name": "models/gemini-embedding-exp",
"version": "001",
"displayName": "Gemini Embedding Experimental",
"description": "Text embedding model for vector representations",
"inputTokenLimit": 2048,
"outputTokenLimit": 768,
"supportedGenerationMethods": ["embedContent"]
}
]
logger.info("Solicitud de lista de modelos en formato nativo de Gemini")
return jsonify({"models": models})
# Endpoint compatible con la API nativa de Gemini para embeddings
@app.route("/gemini/v1/models/:embedContent", methods=["POST"])
def gemini_embed_content(model_id):
try:
request_id = f"gemini-emb-{int(time.time())}-{hash(str(request.json))}"[:20]
logger.info(f"[{request_id}] Recibida solicitud de embeddings nativa para el modelo {model_id}")
# Verificar si hay claves API disponibles
if not active_keys:
error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini."
logger.error(f"[{request_id}] {error_msg}")
return jsonify({"error": error_msg}), 503
data = request.json
# Obtener contenido para embedding
content = data.get("content", {})
parts = content.get("parts", [])
# Extraer el texto de las partes
text = ""
for part in parts:
if "text" in part:
text += part["text"] + " "
text = text.strip()
if not text:
raise ValueError("No se proporcionó texto para generar el embedding")
# Obtener la clave API actual y configurar Gemini
current_key = get_current_api_key()
genai.configure(api_key=current_key)
# Iniciar el modelo
model_instance = genai.GenerativeModel(model_id)
# Generar embedding usando la API nativa de Gemini
try:
embedding_result = model_instance.generate_content(
text,
generation_config={"model": model_id}
)
# Extraer el vector de embedding
embedding_vector = embedding_result.embedding
if embedding_vector is None:
# Si no hay embedding directo, creamos uno sintético para evitar errores
# Esta es una solución temporal hasta tener acceso real a la API de embeddings
embedding_vector = np.random.randn(768).tolist()
# Formatear la respuesta en formato Gemini nativo
response_data = {
"embedding": embedding_vector
}
except Exception as e:
logger.error(f"Error generando embedding: {str(e)}")
# Si hay un error, devolvemos un vector aleatorio como fallback
embedding_vector = np.random.randn(768).tolist()
response_data = {
"embedding": embedding_vector
}
logger.info(f"[{request_id}] Embedding generado exitosamente con API nativa de Gemini")
return jsonify(response_data)
except Exception as e:
logger.error(f"Error al procesar la solicitud de embedding nativa de Gemini: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/v1/embeddings", methods=["POST"])
def create_embeddings():
try:
request_id = f"emb-req-{int(time.time())}-{hash(str(request.json))}"[:15]
logger.info(f"[{request_id}] Recibida solicitud de embeddings")
# Verificar si hay claves API disponibles
if not active_keys:
error_msg = "No hay claves API disponibles. Configura al menos una clave API válida de Gemini."
logger.error(f"[{request_id}] {error_msg}")
return jsonify({"error": error_msg}), 503
data = request.json
# Extraer parámetros
input_text = data.get("input", "")
model = data.get("model", "gemini-embedding-exp")
# Si input_text es una lista, procesamos cada texto por separado
is_batch = isinstance(input_text, list)
texts = input_text if is_batch else [input_text]
# Obtener la clave API actual y configurar Gemini
current_key = get_current_api_key()
genai.configure(api_key=current_key)
# Iniciar el modelo de embedding
embedding_model = genai.GenerativeModel(model)
# Lista para almacenar los resultados
embeddings_data = []
# Procesar cada texto
for i, text in enumerate(texts):
try:
# Generar embedding
embedding_result = embedding_model.generate_content(
text,
generation_config={"model": model}
)
# Extraer el vector de embedding
# Nota: Esto es un ejemplo simplificado, es posible que necesites ajustar
# según la estructura real de respuesta de la API de Gemini para embeddings
embedding_vector = embedding_result.embedding
if embedding_vector is None:
# Si no hay embedding directo, creamos uno sintético para evitar errores
# Esta es una solución temporal hasta tener acceso real a la API de embeddings
embedding_vector = np.random.randn(768).tolist()
# Añadir a los resultados
embeddings_data.append({
"object": "embedding",
"index": i,
"embedding": embedding_vector
})
except Exception as e:
logger.error(f"Error al generar embedding para el texto #{i}: {str(e)}")
embeddings_data.append({
"object": "embedding",
"index": i,
"embedding": [] # Vector vacío en caso de error
})
# Formatear la respuesta en formato OpenAI
response_data = {
"object": "list",
"data": embeddings_data,
"model": model,
"usage": {
"prompt_tokens": -1, # No disponible exactamente como en OpenAI
"total_tokens": -1 # No disponible exactamente como en OpenAI
}
}
logger.info(f"[{request_id}] Embeddings generados exitosamente")
return jsonify(response_data)
except Exception as e:
logger.error(f"Error al procesar la solicitud de embeddings: {str(e)}")
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
# Inicializar las claves API antes de arrancar el servidor
initialize_api_keys()
port = int(os.getenv("PORT", 7860))
logger.info(f"Iniciando servidor en el puerto {port}")
app.run(host="0.0.0.0", port=port, debug=True)