|
|
import traceback |
|
|
from typing import Dict, Any, List |
|
|
from llama_cpp import Llama |
|
|
|
|
|
from langchain_core.language_models.llms import LLM |
|
|
from langchain.chains import ConversationalRetrievalChain |
|
|
from langchain.memory import ConversationBufferMemory |
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain_core.prompts import PromptTemplate |
|
|
|
|
|
class LlamaLangChain(LLM):
    """LangChain ``LLM`` adapter around an already-loaded ``llama_cpp.Llama`` model.

    The wrapped model is called directly with the prompt text and the text of
    the first returned choice is handed back as the completion.
    """

    # The loaded llama.cpp model that performs the completion.
    llama_instance: Llama
    # Upper bound on tokens generated per completion. Defaults to 512, the
    # value that used to be hard-coded inside ``_call``.
    max_tokens: int = 512

    @property
    def _llm_type(self) -> str:
        """Return the type label reported for this LLM implementation."""
        return "custom"

    def _call(self, prompt: str, stop: List[str] | None = None, **kwargs) -> str:
        """Run one blocking, non-streaming completion and return its text.

        Args:
            prompt: Full prompt text sent to the model.
            stop: Optional stop sequences forwarded to the model.
            **kwargs: Accepted for interface compatibility; not forwarded to
                the model.

        Returns:
            The ``text`` field of the first entry in the response's
            ``choices`` list (the prompt is not echoed back).
        """
        completion = self.llama_instance(
            prompt,
            max_tokens=self.max_tokens,
            stop=stop,
            stream=False,
            echo=False,
        )
        return completion["choices"][0]["text"]

    async def _acall(self, prompt: str, stop: List[str] | None = None, **kwargs) -> str:
        """Async entry point; delegates to the synchronous ``_call``.

        NOTE(review): the completion runs inline, so the event loop is blocked
        for its whole duration. Off-loading it to a thread would avoid that
        but would allow overlapping calls into the model - confirm the model
        tolerates that before changing it.
        """
        return self._call(prompt, stop, **kwargs)
|
|
|
|
|
def format_docs(docs):
    """Concatenate the ``page_content`` of each document into one string.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string
            attribute. ``None`` is treated as an empty collection.

    Returns:
        The page contents separated by one blank line, or ``""`` when there
        are no documents.
    """
    if docs is None:
        return ""
    return "\n\n".join(doc.page_content for doc in docs)
|
|
|
|
|
class SupportAgent:
    """Retrieval-backed support assistant plus a few one-shot text generators.

    On construction the agent wraps the given llama.cpp model in
    ``LlamaLangChain``, creates HuggingFace embeddings, opens a persisted
    Chroma vector store, and starts with an empty per-conversation memory map.
    """

    # Prompt used for the answer-generation step of the QA chain.
    # ``{live_data}`` and ``{question}`` come from the request payload;
    # ``{context}`` and ``{chat_history}`` are presumably filled in by the
    # chain (retrieved documents / stored memory) - verify against LangChain.
    _ANSWER_TEMPLATE = (
        "You are Sparky, a helpful AI assistant for Reachify.\n"
        "Your job is to provide a direct and concise answer to the user's question.\n"
        "Use the Live Data and Context to find the answer. Do not talk about yourself.\n"
        "\n"
        "**Live Data (Facts from the user's account):**\n"
        "{live_data}\n"
        "\n"
        "**Context (General Knowledge):**\n"
        "{context}\n"
        "\n"
        "**Chat History:**\n"
        "{chat_history}\n"
        "\n"
        "**User's Question:** {question}\n"
        "\n"
        "**Direct Answer:**\n"
    )

    # Markers the model tends to echo from the prompt. The longer marker must
    # come first: "Direct Answer:" contains "Answer:", so cutting on the short
    # one first would leave a dangling "Direct" behind.
    _ANSWER_MARKERS = ("Direct Answer:", "Answer:")

    # Supported ``action`` values for ``generate_caption_variant`` and the
    # instruction text each one maps to.
    _CAPTION_INSTRUCTIONS = {
        "improve_writing": "Improve the writing...",
        "make_punchier": "Make it punchier...",
        "generate_alternatives": "Generate three new, creative...",
    }

    def __init__(self, llm_instance: Llama, embedding_path: str, db_path: str):
        """Build the LLM wrapper, embeddings, vector store and memory map.

        Args:
            llm_instance: Loaded llama.cpp model; must not be ``None``.
            embedding_path: Passed to ``HuggingFaceEmbeddings`` as ``model_name``.
            db_path: Passed to ``Chroma`` as ``persist_directory``.

        Raises:
            ValueError: If ``llm_instance`` is ``None``.
        """
        print("--- Initializing Support Agent (Final Version) ---")
        if llm_instance is None:
            raise ValueError("SupportAgent received an invalid LLM instance.")

        self.langchain_llm_wrapper = LlamaLangChain(llama_instance=llm_instance)
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_path)
        self.vector_store = Chroma(
            persist_directory=db_path,
            embedding_function=self.embeddings,
        )
        # One memory object per conversation id, created lazily.
        # NOTE(review): entries are never evicted, so this grows with the
        # number of distinct conversation ids.
        self.conversations: Dict[str, ConversationBufferMemory] = {}
        # The emoji was restored from a mis-decoded byte sequence that had
        # split this literal across two lines.
        print("✅ Agent and core components initialized successfully.")

    def _get_or_create_memory(self, conversation_id: str) -> ConversationBufferMemory:
        """Return the memory for ``conversation_id``, creating it on first use."""
        if conversation_id not in self.conversations:
            self.conversations[conversation_id] = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True,
                input_key="question",
                output_key="answer",
            )
        return self.conversations[conversation_id]

    @staticmethod
    def _clean_answer(raw_answer: str) -> str:
        """Strip echoed prompt markers from a raw model answer.

        A marker at the very start is dropped (keeping the text after it);
        anything from the next marker onwards is cut off.
        """
        text = raw_answer.strip()
        # A leading marker is just the prompt tail being repeated; cutting at
        # it would throw the whole answer away.
        for marker in SupportAgent._ANSWER_MARKERS:
            if text.startswith(marker):
                text = text[len(marker):].lstrip()
                break
        # Anything after a later marker is the model continuing the dialogue.
        for marker in SupportAgent._ANSWER_MARKERS:
            text = text.split(marker, 1)[0]
        return text.strip()

    def answer(self, payload: dict, conversation_id: str) -> dict:
        """Answer a user question using retrieval, live data and chat memory.

        Args:
            payload: Request data; ``"question"`` and ``"live_data"`` are read,
                each defaulting to ``""``.
            conversation_id: Key selecting which conversation memory to use.

        Returns:
            A dict with ``"response"`` (the cleaned answer) and ``"context"``
            (the retrieved documents joined by ``format_docs``). If anything
            raises while building or running the chain, the traceback is
            printed and ``"response"`` holds a fixed error message while
            ``"context"`` holds ``str(exception)``.
        """
        question = payload.get("question", "")
        live_data_context = payload.get("live_data", "")
        memory = self._get_or_create_memory(conversation_id)

        try:
            final_prompt = PromptTemplate.from_template(self._ANSWER_TEMPLATE)
            qa_chain = ConversationalRetrievalChain.from_llm(
                llm=self.langchain_llm_wrapper,
                retriever=self.vector_store.as_retriever(),
                memory=memory,
                combine_docs_chain_kwargs={"prompt": final_prompt},
                # Without this the result never contains "source_documents"
                # and the returned "context" is always empty.
                return_source_documents=True,
            )
            result = qa_chain.invoke(
                {"question": question, "live_data": live_data_context}
            )
            raw_answer = result.get("answer", "I'm sorry, I could not find an answer.")
            return {
                "response": self._clean_answer(raw_answer),
                "context": format_docs(result.get("source_documents", [])),
            }
        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of raising.
            traceback.print_exc()
            return {
                "response": "A critical server error occurred in the AI agent.",
                "context": str(e),
            }

    def generate_caption_variant(self, caption: str, action: str) -> str:
        """Rewrite ``caption`` according to ``action``.

        Args:
            caption: The caption text to rework.
            action: One of ``'improve_writing'``, ``'make_punchier'`` or
                ``'generate_alternatives'``.

        Returns:
            The stripped model output, or an error message string when the
            model is unavailable, the action is unknown, or generation fails.
        """
        if not self.langchain_llm_wrapper:
            return "Error: The AI model is not available."

        user_instruction = self._CAPTION_INSTRUCTIONS.get(action)
        if user_instruction is None:
            return "Error: Invalid action specified."

        system_prompt = "You are an expert social media copywriter..."
        # The prompt used to be a fixed literal, so neither the instruction
        # nor the caption ever reached the model.
        # NOTE(review): the section labels below are a reconstruction -
        # confirm they match the intended prompt format.
        final_prompt = (
            f"[SYSTEM INSTRUCTIONS]\n{system_prompt}\n\n"
            f"[TASK]\n{user_instruction}\n\n"
            f"[CAPTION]\n{caption}\n\n"
            "[RESULT]\n"
        )

        try:
            return self.langchain_llm_wrapper._call(final_prompt).strip()
        except Exception:
            traceback.print_exc()
            return "An error occurred while generating the caption."

    def generate_marketing_strategy(self, prompt: str) -> str:
        """Send ``prompt`` to the model unchanged and return its raw output.

        Returns an error message string when the model is unavailable or the
        call raises (the traceback is printed in the latter case).
        """
        if not self.langchain_llm_wrapper:
            return "Error: The AI model is not available."
        try:
            return self.langchain_llm_wrapper._call(prompt)
        except Exception as e:
            traceback.print_exc()
            return f"An error occurred: {e}"

    def generate_content_outline(self, title: str) -> str:
        """Ask the model for a content outline for ``title``.

        Returns the raw model output, or an error message string when the
        model is unavailable or the call raises (the traceback is printed in
        the latter case).
        """
        if not self.langchain_llm_wrapper:
            return "Error: The AI model is not available."

        prompt = (
            "You are a professional content writer...\n"
            f'**Title:** "{title}"\n'
            "**Your Outline:**\n"
        )
        try:
            return self.langchain_llm_wrapper._call(prompt)
        except Exception as e:
            traceback.print_exc()
            return f"An error occurred: {e}"