import os
import traceback

import numpy as np
import gradio as gr
from openai import AsyncOpenAI
from langsmith import traceable
from sklearn.metrics.pairwise import cosine_similarity

from src.prompts import system_prompt
# from src.name_extractor import extract_name_gliner
from src.models import CacheEntry
from src.config import Config
from src.utils import FileReader

# ---------------------------------------------------------------------
# CHAT CLASS
# ---------------------------------------------------------------------
class MyProfileAvatarChat(Config, FileReader):
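    """Chat avatar that answers in character as the profile owner.

    Context comes from a LinkedIn profile plus additional notes (loaded via
    Config/FileReader). A semantic QA cache lets repeated or paraphrased
    questions reuse earlier answers instead of triggering a fresh model call.
    """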
    def __init__(self, max_history_turns: int = 10, similarity_thresh: float = 0.80):
        Config.__init__(self)
        FileReader.__init__(self)

        # 1. Try to load the name from the environment
        self.name = os.getenv("PROFIL_NAME")
        # if not self.name:
        #     name = extract_name_gliner(self.linkedin_profile)
        #     self.name = name["person"][0]
        #     print(f"Name found on LinkedIn profile: {self.name}")

        self.openai = AsyncOpenAI(api_key=self.openai_api_key)

        # Build the system prompt once
        self.system_prompt = system_prompt
        self.system_prompt += f"## LinkedIn Profile:\n{self.linkedin_profile}\n\n"
        self.system_prompt += f"## Additional Information:\n{self.additional_info}\n\n"
        self.system_prompt += f"With this context, please chat with the user, always staying in character as {self.name}."

        # Settings
        self.max_history_turns = max_history_turns
        self.similarity_threshold = similarity_thresh

        # QA cache (question -> answer -> embedding)
        self.qa_cache = []  # list of dicts: {"question": str, "answer": str, "embedding": np.ndarray}

    def format_history(self, history):
        return "\n".join(f"{turn['role'].upper()}: {turn['content']}" for turn in history)

    async def embed(self, text: str):
        """Return embedding vector for text (uses OpenAI embeddings)."""
        resp = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(resp.data[0].embedding)
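
    # text-embedding-3-small produces 1536-dimensional vectors by default, so
    # all cached embeddings are directly comparable via the cosine similarity
    # helper below.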
    def cosine_sim(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0])

    async def find_similar_question(self, new_question: str):
        if not self.qa_cache:
            return None, 0.0
        new_emb = await self.embed(new_question)
        best = None
        best_sim = 0.0
        for item in self.qa_cache:
            if item["embedding"] is None:
                # entry was cached while embed() failed; nothing to compare against
                continue
            sim = self.cosine_sim(new_emb, item["embedding"])
            if sim > best_sim:
                best_sim = sim
                best = item
        if best and best_sim >= self.similarity_threshold:
            return best, best_sim
        return None, best_sim
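
    # Rough intuition for similarity_thresh=0.80: paraphrases such as
    # "What's your job?" vs. "What do you do for work?" typically score above
    # the threshold, while unrelated questions score well below it
    # (illustrative examples, not measured values).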

    async def chat(self, message: str, history: list, **kwargs):
        """Main chat entry point. Uses the semantic QA cache and a sliding
        window over the history to keep token usage bounded.
        Args:
            message: user message string
            history: existing list of dicts [{"role": ..., "content": ...}]
        Returns:
            reply string
        """
        # Exact-match cache short-circuit
        for qa in self.qa_cache:
            if qa["question"] == message:
                print("Using exact cached reply")
                history.append({"role": "user", "content": message})
                history.append({"role": "assistant", "content": qa["answer"]})
                return qa["answer"]

        # Check for a semantically similar previous question
        similar, sim_score = await self.find_similar_question(message)
        if similar:
            print(f"Reusing past answer (similarity={sim_score:.2%})")
            refine_prompt = (
                "The user previously asked a similar question:\n"
                f"Old question: {similar['question']}\n"
                f"Old answer: {similar['answer']}\n\n"
                f"Now the user asks: {message}\n\n"
                "Please update or refine the old answer to match the new question."
            )
            messages = [{"role": "system", "content": self.system_prompt},
                        {"role": "user", "content": refine_prompt}]
            try:
                response = await self.openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages
                )
                reply = response.choices[0].message.content
            except Exception as e:
                print(f"Error calling OpenAI for refinement: {e}")
                reply = similar["answer"]  # fall back to the cached answer
        else:
            # Build a token-efficient context (sliding window)
            temp_history = history + [{"role": "user", "content": message}]
            context_for_api = temp_history[-self.max_history_turns:]
            messages = [{"role": "system", "content": self.system_prompt}] + context_for_api
            try:
                response = await self.openai.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages
                )
                reply = response.choices[0].message.content
            except Exception as e:
                print(f"Error calling OpenAI: {e}")
                # keep `reply` defined so the cache append and return below don't raise
                reply = "Sorry, I ran into an error while generating a reply. Please try again."

        try:
            emb = await self.embed(message)
        except Exception as e:
            print(f"Embedding error: {e}")
            traceback.print_exc()
            emb = None

        self.qa_cache.append({
            "question": message,
            "answer": reply,
            "embedding": emb
        })
        return reply
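
    # Note: qa_cache lives in process memory and grows without bound for the
    # life of the app; restarting the Space clears it.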

    @traceable
    async def chat_traced(self, *args, **kwargs):
        """Wrapper for LangSmith tracing. Accepts any extra arguments
        (like from Gradio) and passes only message/history to chat()."""
        if len(args) >= 2:
            message, history = args[0], args[1]
        else:
            message = kwargs.get("message")
            history = kwargs.get("history")
        return await self.chat(message, history)
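

# Hypothetical quick check outside Gradio (assumes OPENAI_API_KEY and
# PROFIL_NAME are set and the Config/FileReader data files are present):
#
#   import asyncio
#   bot = MyProfileAvatarChat()
#   print(asyncio.run(bot.chat("What do you do for work?", [])))
#   # A close paraphrase should now be served from the semantic cache:
#   print(asyncio.run(bot.chat("What's your job?", [])))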

if __name__ == "__main__":
    my_profile = MyProfileAvatarChat()

    with gr.Blocks() as demo:
        # Per-user chat history state
        state = gr.State([])

        # Chat interface; type="messages" so history arrives as role/content dicts
        chat = gr.ChatInterface(
            my_profile.chat_traced,
            type="messages"
        )

    demo.queue(max_size=10).launch(
        server_name="0.0.0.0",
        server_port=8000,
        show_error=True,
        share=True
    )