# FastAPI chatbot service for WipsHub (Hugging Face Space).
# (Replaced non-Python scrape residue: the HF Space status banner "Spaces: Sleeping".)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import chromadb
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
import os
from openai import OpenAI
import zipfile
from typing import Literal
import json
import requests

# FastAPI application instance.
app = FastAPI()

# Deployment credentials, all read from the environment.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
PHONE_NUMBER_ID = os.getenv("PHONE_NUMBER_ID")
RECIPIENT_NUMBER = os.getenv("RECIPIENT_NUMBER")

# OpenAI client used for chat completions.
client = OpenAI(api_key=OPENAI_API_KEY)

# Persistent ChromaDB client (local "chroma_db" directory in the Space).
chroma_client = chromadb.PersistentClient(path="chroma_db")

# Chroma vector store over the "docs" collection, embedded with OpenAI.
vectorstore = Chroma(
    client=chroma_client,
    collection_name="docs",
    embedding_function=OpenAIEmbeddings(
        model="text-embedding-3-small",
        openai_api_key=OPENAI_API_KEY,
    ),
)

# Retriever consumed by obtener_extractos().
retriever = vectorstore.as_retriever()
def obtener_extractos(pregunta: str):
    """Retrieve relevant documents from ChromaDB and format them as plain text.

    Each hit becomes a numbered excerpt with its source URL (taken from the
    document metadata). Returns a fixed fallback message when the retriever
    finds nothing.
    """
    documentos = retriever.invoke(pregunta)
    if not documentos:
        return "No se encontró información relevante en la base de datos."

    partes = []
    for indice, documento in enumerate(documentos, start=1):
        texto = documento.page_content.strip()
        fuente = documento.metadata.get("url", "URL no disponible")
        partes.append(f"🔹 Extracto {indice}:\n{texto}\n🔗 Fuente: {fuente}")
    return "\n\n".join(partes)
def enviar_contacto(access_token: str, phone_number_id: str, recipient_number: str, formatted_name: str, first_name: str):
    """Send a WhatsApp contact card via the Meta Graph API (v19.0).

    Args:
        access_token: Bearer token for the Graph API.
        phone_number_id: WhatsApp Business phone-number id that sends the message.
        recipient_number: Destination phone number.
        formatted_name: Full display name of the contact.
        first_name: Contact's first name.

    Returns:
        The decoded JSON response from the Graph API.
    """
    url = f"https://graph.facebook.com/v19.0/{phone_number_id}/messages"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    data = {
        "messaging_product": "whatsapp",
        "to": recipient_number,
        "type": "contacts",
        "contacts": [
            {
                "name": {
                    "formatted_name": formatted_name,
                    "first_name": first_name
                }
            }
        ]
    }
    # timeout keeps the endpoint from hanging indefinitely if the Graph API stalls
    response = requests.post(url, headers=headers, json=data, timeout=30)
    # BUG FIX: the original printed the bound method (`response.json`) instead
    # of calling it, so the debug output was a method repr, not the payload.
    print(response.json())
    return response.json()
def enviar_ubicacion(access_token: str, phone_number_id: str, recipient_number: str, latitude: str, longitude: str):
    """Send a WhatsApp location message via the Meta Graph API (v19.0).

    Args:
        access_token: Bearer token for the Graph API.
        phone_number_id: WhatsApp Business phone-number id that sends the message.
        recipient_number: Destination phone number.
        latitude: Latitude as a string, as accepted by the Graph API.
        longitude: Longitude as a string.

    Returns:
        The decoded JSON response from the Graph API.
    """
    url = f"https://graph.facebook.com/v19.0/{phone_number_id}/messages"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    data = {
        "messaging_product": "whatsapp",
        "recipient_type": "individual",
        "to": recipient_number,
        "type": "location",
        "location": {
            "latitude": latitude,
            "longitude": longitude
        }
    }
    # timeout added for robustness (and consistency with enviar_contacto):
    # a request with no timeout can block the event loop's worker forever.
    response = requests.post(url, headers=headers, json=data, timeout=30)
    return response.json()
def handle_tool_call(tool_call, contexto):
    """Build the two chat-history messages produced by one tool invocation.

    Produces, in order:
      1. an ``assistant`` message echoing the tool call, and
      2. a ``tool`` message carrying the function's result (``contexto``).

    Returns them as a list ready to be extended onto the message history.
    """
    call_id = tool_call.id
    fn_name = tool_call.function.name

    # The chat API requires `arguments` as a JSON string; serialize dicts.
    raw_args = tool_call.function.arguments
    if isinstance(raw_args, dict):
        raw_args = json.dumps(raw_args)

    assistant_msg = {
        "role": "assistant",
        "content": None,
        "tool_calls": [
            {
                "id": call_id,
                "type": "function",
                "function": {
                    "name": fn_name,
                    "arguments": raw_args,
                },
            }
        ],
    }
    tool_msg = {
        "role": "tool",
        "tool_call_id": call_id,
        "name": fn_name,
        "content": contexto,
    }
    return [assistant_msg, tool_msg]
# Request payload for the chat endpoint.
class ChatRequest(BaseModel):
    """Incoming chat request with optional generation parameters."""

    message: str
    # Generation settings; defaults mirror a typical chat configuration.
    system_message: str = "Eres un asistente virtual."
    max_tokens: int = 512
    temperature: float = 0.7
    top_p: float = 0.95
# NOTE(review): the scraped file never registers this coroutine as a route, so
# the app would expose no endpoint at all. The decorator below restores that;
# the "/chat" path is assumed — confirm against the client that calls it.
@app.post("/chat")
async def chat(request: ChatRequest):
    """Answer a chat message with GPT-4o-mini, optionally dispatching tools.

    Flow:
      1. Ask the model with three tools available (contact card, location,
         vector-store lookup).
      2. If it picks ``obtener_extractos``, run the retrieval, append the
         tool exchange to the history and ask the model again with context.
      3. The WhatsApp tools are executed with fixed demo payloads.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    messages = [
        {"role": "system", "content": request.system_message},
        {"role": "user", "content": request.message}
    ]
    tools = [
        {
            "type": "function",
            "function": {
                "name": "enviar_contacto",
                "description": "Envía el contacto de WhatsApp de soporte",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "formatted_name": {"type": "string"},
                        "first_name": {"type": "string"}
                    },
                    "required": ["formatted_name", "first_name"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "enviar_ubicacion",
                "description": "Envía la ubicación de las oficinas de WipsHub",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "latitude": {"type": "string"},
                        "longitude": {"type": "string"}
                    },
                    "required": ["latitude", "longitude"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "obtener_extractos",
                "description": "Busca información relevante en la base de datos vectorial con manuales de WipsHub.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"}
                    },
                    "required": ["query"]
                }
            }
        }
    ]
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
            tool_choice="auto",
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            top_p=request.top_p
        )
        choice = response.choices[0]

        # The model may answer directly or request one of the tools above.
        if choice.message.tool_calls:
            for tool_call in choice.message.tool_calls:
                name = tool_call.function.name
                args = json.loads(tool_call.function.arguments)

                if name == "obtener_extractos":
                    # Retrieve context, append the tool exchange to the
                    # history and ask again so the model can use it.
                    contexto = obtener_extractos(args["query"])
                    messages.extend(handle_tool_call(tool_call, contexto))
                    response = client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=messages,
                        max_tokens=request.max_tokens,
                        temperature=request.temperature,
                        top_p=request.top_p
                    )
                    return {"response": response.choices[0].message.content, "context": contexto}

                elif name == "enviar_contacto":
                    # Fixed demo contact; the model's args are not used here.
                    result = enviar_contacto(
                        ACCESS_TOKEN, PHONE_NUMBER_ID, RECIPIENT_NUMBER,
                        formatted_name="Pedro J. Johnson",
                        first_name="Pedro"
                    )
                    return {"response": "✅ Contacto enviado.", "result": result}

                elif name == "enviar_ubicacion":
                    # Fixed demo coordinates; the model's args are not used here.
                    result = enviar_ubicacion(
                        ACCESS_TOKEN, PHONE_NUMBER_ID, RECIPIENT_NUMBER,
                        latitude="37.44216251868683",
                        longitude="-122.16153582049394"
                    )
                    return {"response": "📍 Ubicación enviada.", "result": result}

        # No tool requested: return the model's direct answer.
        return {"response": choice.message.content}

    except Exception as e:
        # Surface any failure (OpenAI, Chroma, WhatsApp) as an HTTP 500.
        raise HTTPException(status_code=500, detail=str(e))
# Entry point when executed directly (e.g. inside the Hugging Face Space
# container): serve the app on the Space's standard port 7860.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)