# MCP_ClaimVerificationSystem / kognieLlama.py
import requests
from typing import List, Optional, Sequence, Any, AsyncGenerator
from llama_index.legacy.llms import LLM, LLMMetadata
from llama_index.core.llms.callbacks import llm_chat_callback, llm_completion_callback
from llama_index.core.base.llms.types import (
    ChatMessage,
    ChatResponse,
    ChatResponseAsyncGen,
    CompletionResponse,
    CompletionResponseAsyncGen,
    CompletionResponseGen,
    MessageRole,
)
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
class Kognie(LLM):
"""
A custom LLM that calls a FastAPI server at /text endpoint.
"""
base_url: str = 'http://api2.kognie.com'
api_key: str
model: str
response_format: str = 'url'
@property
def metadata(self) -> LLMMetadata:
# Provide info about your model to LlamaIndex (adjust as needed)
return LLMMetadata(
model_name=self.model
)
def _generate_text(
self,
prompt: str,
model: Optional[str] = None,
**kwargs
) -> str:
"""
The single-turn text generation method.
LlamaIndex calls `_generate_text` internally whenever it needs a completion.
"""
# Decide on mode and model to use, falling back to defaults
selected_model = model if model else self.model
endpoint = f"{self.base_url}/text"
# Prepare GET request parameters
params = {
"question": prompt,
"model": selected_model
}
# Prepare HTTP headers
headers = {
"X-KEY": self.api_key
}
try:
# Send request
response = requests.get(endpoint, params=params, headers=headers)
response.raise_for_status()
except requests.HTTPError as exc:
raise ValueError(f"FastAPI /text endpoint error: {exc}") from exc
data = response.json()
text_output = data.get("response", "")
return text_output
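    # For reference, `_generate_text` issues a plain GET request; a standalone
    # sketch with hypothetical prompt, model, and key values would look like:
    #   requests.get(
    #       "http://api2.kognie.com/text",
    #       params={"question": "Hello", "model": "gpt-4o-mini"},
    #       headers={"X-KEY": "YOUR_KOGNIE_API_KEY"},
    #   ).json().get("response", "")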
def _generate_image(
self,
prompt: str,
model: str,
response_format: str,
**kwargs
) -> str:
"""
The single-turn text generation method.
LlamaIndex calls `_generate_text` internally whenever it needs a completion.
"""
# Decide on mode and model to use, falling back to defaults
selected_model = model if model else self.model
endpoint = f"{self.base_url}/image"
# Prepare GET request parameters
params = {
"question": prompt,
"model": selected_model,
"response_format": response_format
}
# Prepare HTTP headers
headers = {
"X-KEY": self.api_key
}
try:
# Send request
response = requests.get(endpoint, params=params, headers=headers)
response.raise_for_status()
except requests.HTTPError as exc:
raise ValueError(f"FastAPI /text endpoint error: {exc}") from exc
# Parse JSON
data = response.json()
text_output = data.get("response", "")
return text_output
def generate_img(
self,
prompt: str,
model: str,
response_format: str,
) -> ChatMessage:
img_output = self._generate_image(
prompt=prompt,
model=model,
response_format=response_format
)
return ChatMessage(role="assistant", content=img_output)
# (Optional) Multi-turn chat approach
def chat(
self,
messages: List[ChatMessage],
model: Optional[str] = None,
**kwargs
    ) -> ChatResponse:
        """
        Multi-turn chat entry point. Some LlamaIndex indices or chat modules
        call `chat(messages=...)` directly; the message history is flattened
        into a single prompt and sent to the `/text` endpoint.
        """
# Merge messages into a single prompt
# e.g. if you want to pass a conversation log:
conversation_log = ""
for m in messages:
role = m.role # "system", "user", or "assistant"
content = m.content
if role == "user":
conversation_log += f"User: {content}\n"
else:
conversation_log += f"{role.capitalize()}: {content}\n"
# Now just call your single-turn generation on the entire conversation log
# This is simplistic; you can implement more advanced chat logic if needed
text_output = self._generate_text(
prompt=conversation_log,
model=model,
**kwargs
)
return ChatMessage(role="assistant", content=text_output)
    def messages_to_prompt(self, messages: Sequence[ChatMessage]) -> str:
        prompt = ""
        for message in messages:
            if message.role == MessageRole.SYSTEM:
                prompt += f"<|system|>\n{message.content}</s>\n"
elif message.role == MessageRole.USER:
prompt += f"<|user|>\n{message.content}</s>\n"
elif message.role == MessageRole.ASSISTANT:
prompt += f"<|assistant|>\n{message.content}</s>\n"
# Ensure the prompt starts with a system message
if not prompt.startswith("<|system|>\n"):
prompt = "<|system|>\n</s>\n" + prompt
# Add a final assistant prompt
prompt += "<|assistant|>\n"
return prompt
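    # For a single user message "Hello", the helper above yields:
    #   <|system|>
    #   </s>
    #   <|user|>
    #   Hello</s>
    #   <|assistant|>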
    async def stream_chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> AsyncGenerator[ChatResponse, None]:
        # `astream_complete` is an async method that returns an async generator
        # of CompletionResponse objects, so await it before iterating
        async for completion_response in await self.astream_complete(self.messages_to_prompt(messages), **kwargs):
            # Convert each CompletionResponse to a ChatResponse
            chat_response = self.convert_completion_to_chat(completion_response)
            yield chat_response
def convert_completion_to_chat(self, completion_response: CompletionResponse) -> ChatResponse:
# Implement your conversion logic here
# For simplicity, we're directly using the completion text as the chat content
return ChatResponse(message=ChatMessage(role="assistant", content=completion_response.text))
@llm_chat_callback()
async def achat(
self,
messages: Sequence[ChatMessage],
**kwargs: Any,
) -> ChatResponse:
return self.chat(messages, **kwargs)
@llm_chat_callback()
async def astream_chat(
self,
messages: Sequence[ChatMessage],
**kwargs: Any,
) -> ChatResponseAsyncGen:
        async def gen() -> ChatResponseAsyncGen:
            # `stream_chat` is an async generator, so iterate with `async for`
            async for message in self.stream_chat(messages, **kwargs):
                yield message
        # NOTE: return the async generator without awaiting it
        return gen()
@llm_completion_callback()
async def acomplete(
self, prompt: str, formatted: bool = False, **kwargs: Any
) -> CompletionResponse:
return self.complete(prompt, formatted=formatted, **kwargs)
@llm_completion_callback()
    def complete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        # Delegate to the single-turn `/text` call and wrap the result
        text_output = self._generate_text(prompt=prompt, **kwargs)
        return CompletionResponse(text=text_output)
@llm_completion_callback()
async def astream_complete(
self, prompt: str, formatted: bool = False, **kwargs: Any
) -> CompletionResponseAsyncGen:
async def gen() -> CompletionResponseAsyncGen:
for message in self.stream_complete(prompt, formatted=formatted, **kwargs):
yield message
# NOTE: convert generator to async generator
return gen()
@llm_completion_callback()
    def stream_complete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponseGen:
        def gen() -> CompletionResponseGen:
            # The Kognie `/text` endpoint is not streaming, so yield the
            # full completion as a single chunk
            response = self.complete(prompt, formatted=formatted, **kwargs)
            yield CompletionResponse(text=response.text, delta=response.text)
        return gen()
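    # Usage sketch (hypothetical `llm` instance): the backend is non-streaming,
    # so this prints the whole completion in one chunk.
    #   for chunk in llm.stream_complete("Say hi"):
    #       print(chunk.delta or chunk.text, end="")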
@classmethod
def class_name(cls) -> str:
return "custom_llm"
# # 1) Initialize your custom LLM
# custom_llm = Kognie(
# api_key="YOUR_KOGNIE_API_KEY",
# model="gpt-4o-mini"
# )
# answer = custom_llm.chat(messages=[ChatMessage(role="user", content="Who was the first president of the United States?")])
# print(answer)
# answer = custom_llm.generate_img(prompt='a dog', model='flux-pro-1.1', response_format='url')
# documents = SimpleDirectoryReader("./data").load_data()
# vector_index = VectorStoreIndex.from_documents(documents)
# query_engine = vector_index.as_query_engine()
# answer = query_engine.query(
# "what is the documents about?"
# )
# print(answer)
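# # To route the query engine above through Kognie instead of the default
# # OpenAI LLM, one option (a sketch, assuming a llama_index version that
# # exposes `Settings` in `llama_index.core`) is to register it globally
# # before building the index:
# from llama_index.core import Settings
# Settings.llm = custom_llm
# # Note that embeddings are configured separately (`Settings.embed_model`),
# # so the default embedding model is still used unless it is overridden too.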