import os
import traceback

import numpy as np
import gradio as gr
from openai import AsyncOpenAI
from langsmith import traceable
from sklearn.metrics.pairwise import cosine_similarity

from src.prompts import system_prompt
# from src.name_extractor import extract_name_gliner
from src.models import CacheEntry
from src.config import Config
from src.utils import FileReader

# ---------------------------------------------------------------------
# CHAT CLASS
# ---------------------------------------------------------------------
class MyProfileAvatarChat(Config, FileReader):
    def __init__(self, max_history_turns: int = 10, similarity_thresh: float = 0.80):
        Config.__init__(self)
        FileReader.__init__(self)

        # 1. Try to load the profile name from the environment
        self.name = os.getenv("PROFIL_NAME")
        # if not self.name:
        #     name = extract_name_gliner(self.linkedin_profile)
        #     self.name = name["person"][0]
        #     print(f"Name found on Linkedin profile: {self.name}")

        self.openai = AsyncOpenAI(api_key=self.openai_api_key)

        # Build the system prompt once
        self.system_prompt = system_prompt
        self.system_prompt += f"## LinkedIn Profile:\n{self.linkedin_profile}\n\n"
        self.system_prompt += f"## Additional Information:\n{self.additional_info}\n\n"
        self.system_prompt += f"With this context, please chat with the user, always staying in character as {self.name}."

        # Settings
        self.max_history_turns = max_history_turns
        self.similarity_threshold = similarity_thresh

        # Semantic QA cache: list of dicts {"question": str, "answer": str, "embedding": np.ndarray | None}
        self.qa_cache = []

    def format_history(self, history):
        return "\n".join(f"{turn['role'].upper()}: {turn['content']}" for turn in history)

    async def embed(self, text: str):
        """Return the embedding vector for text (uses OpenAI embeddings)."""
        resp = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(resp.data[0].embedding)

    def cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0])

    async def find_similar_question(self, new_question: str):
        """Return (cached entry, similarity) if a past question is close enough, else (None, best similarity)."""
        if not self.qa_cache:
            return None, 0.0
        new_emb = await self.embed(new_question)
        best = None
        best_sim = 0.0
        for item in self.qa_cache:
            if item["embedding"] is None:  # skip entries whose embedding call failed
                continue
            sim = self.cosine_sim(new_emb, item["embedding"])
            if sim > best_sim:
                best_sim = sim
                best = item
        if best and best_sim >= self.similarity_threshold:
            return best, best_sim
        return None, best_sim

    async def chat(self, message: str, history: list, **kwargs):
        """Main chat. Uses a semantic QA cache and a sliding history window to limit tokens.

        Args:
            message: user message string
            history: existing list of dicts [{"role": ..., "content": ...}]

        Returns:
            reply string
        """
        # Exact-match cache short-circuit
        for qa in self.qa_cache:
            if qa["question"] == message:
                print("Using exact cached reply")
                history.append({"role": "user", "content": message})
                history.append({"role": "assistant", "content": qa["answer"]})
                return qa["answer"]

        # Check for a semantically similar previous question
        similar, sim_score = await self.find_similar_question(message)
        if similar:
            print(f"Reusing past answer (similarity={sim_score:.2%})")
            refine_prompt = (
                "The user previously asked a similar question:\n"
                f"Old question: {similar['question']}\n"
                f"Old answer: {similar['answer']}\n\n"
                f"Now the user asks: {message}\n\n"
                "Please update or refine the old answer to match the new question."
            )
            messages = [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": refine_prompt},
            ]
            try:
                response = await self.openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages
                )
                reply = response.choices[0].message.content
            except Exception as e:
                print(f"Error calling OpenAI for refinement: {e}")
                reply = similar["answer"]  # fall back to the cached answer
        else:
            # Build a token-efficient context (sliding window over recent turns)
            temp_history = history + [{"role": "user", "content": message}]
            context_for_api = temp_history[-self.max_history_turns:]
            messages = [{"role": "system", "content": self.system_prompt}] + context_for_api
            try:
                response = await self.openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages
                )
                reply = response.choices[0].message.content
            except Exception as e:
                print(f"Error calling OpenAI: {e}")
                # Keep `reply` defined so the cache append below cannot raise
                reply = "Sorry, I could not generate a response right now."

        # Cache the new QA pair together with its embedding
        try:
            emb = await self.embed(message)
        except Exception as e:
            print(f"Embedding Error: {e}")
            traceback.print_exc()
            emb = None
        self.qa_cache.append({
            "question": message,
            "answer": reply,
            "embedding": emb,
        })

        return reply

    @traceable(run_type="chain", name="ProfileChat")
    async def chat_traced(self, *args, **kwargs):
        """Wrapper for LangSmith tracing.

        Accepts any extra arguments (like those Gradio passes) and forwards
        only message/history to chat().
        """
        if len(args) >= 2:
            message, history = args[0], args[1]
        else:
            message = kwargs.get("message")
            history = kwargs.get("history")
        return await self.chat(message, history)


if __name__ == "__main__":
    my_profile = MyProfileAvatarChat()

    with gr.Blocks() as demo:
        # Per-user chat history state
        state = gr.State([])

        # Chat interface
        chat = gr.ChatInterface(
            my_profile.chat_traced,
            type="messages",  # history as [{"role": ..., "content": ...}] dicts, matching chat()
        )

    demo.queue(max_size=10).launch(
        server_name="0.0.0.0",
        server_port=8000,
        show_error=True,
        share=True,
    )
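
# ---------------------------------------------------------------------
# OPTIONAL: offline smoke test for the semantic QA cache.
# A minimal sketch, not part of the app: embed() is stubbed with fake
# vectors so no OpenAI key or network access is needed. The helper name
# `_cache_smoke_test` and the 1536-dim vector size (matching
# text-embedding-3-small) are assumptions for this sketch; it still
# needs Config/FileReader to initialize locally. Uncomment and run it
# instead of launching the Gradio demo above.
# ---------------------------------------------------------------------
# import asyncio
# import hashlib
#
# async def _cache_smoke_test():
#     bot = MyProfileAvatarChat()
#
#     async def fake_embed(text: str):
#         # Deterministic pseudo-embedding derived from a hash of the text.
#         seed = int.from_bytes(hashlib.md5(text.encode()).digest()[:4], "big")
#         return np.random.default_rng(seed).random(1536)
#
#     bot.embed = fake_embed  # bypass the OpenAI embeddings endpoint
#
#     # Seed the cache as if this question had already been answered.
#     bot.qa_cache.append({
#         "question": "What do you do for work?",
#         "answer": "I build machine-learning systems.",
#         "embedding": await fake_embed("What do you do for work?"),
#     })
#
#     # An exact repeat is served straight from the cache: no model call.
#     print(await bot.chat("What do you do for work?", []))
#
# asyncio.run(_cache_smoke_test())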