import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from dotenv import load_dotenv
from sentence_transformers import (
    SentenceTransformer,
    CrossEncoder,
)  # SentenceTransformer -> embedding model, CrossEncoder -> re-ranker
from ctransformers import AutoModelForCausalLM
from torch import Tensor
from google import genai
from google.genai import types

from app.core.chunks import Chunk
from app.settings import settings, BASE_DIR, GeminiEmbeddingSettings

load_dotenv()


class Embedder:
    def __init__(self, model: str = "BAAI/bge-m3"):
        self.device: str = settings.device
        self.model_name: str = model
        self.model: SentenceTransformer = SentenceTransformer(model, device=self.device)

    def encode(self, text: str | list[str]) -> Tensor | list[Tensor]:
        """Encode a string (or list of strings) into dense vectors."""
        return self.model.encode(sentences=text, show_progress_bar=False, batch_size=32)

    def get_vector_dimensionality(self) -> int | None:
        """Return the dimensionality of the dense vectors."""
        return self.model.get_sentence_embedding_dimension()
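
# Usage sketch (not part of the original module; assumes settings.device is set
# and the "BAAI/bge-m3" weights are available locally or downloadable):
#
#   embedder = Embedder()
#   vectors = embedder.encode(["first passage", "second passage"])
#   dim = embedder.get_vector_dimensionality()  # 1024 for bge-m3
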

class Reranker:
    def __init__(self, model: str = "cross-encoder/ms-marco-MiniLM-L6-v2"):
        self.device: str = settings.device
        self.model_name: str = model
        self.model: CrossEncoder = CrossEncoder(model, device=self.device)

    def rank(self, query: str, chunks: list[Chunk]) -> list[dict[str, int]]:
        """
        Return the chunks re-sorted by relevance as a list of dicts; we only
        need the 'corpus_id' key, since it is the position of the chunk in the
        original list.
        """
        return self.model.rank(query, [chunk.get_raw_text() for chunk in chunks])
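
# Usage sketch (not part of the original module; `chunks` is a hypothetical
# list of Chunk objects):
#
#   reranker = Reranker()
#   hits = reranker.rank("query text", chunks)
#   # each hit is a dict like {"corpus_id": 3, "score": 7.2}; corpus_id indexes
#   # into the original `chunks` list
#   best_chunk = chunks[hits[0]["corpus_id"]]
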

# TODO: add model parameters to the global config file
# TODO: add exception handling for when the response has more tokens than the configured limit
# TODO: find a way to keep the model from producing overly long answers
class LocalLLM:
    def __init__(self):
        self.model = AutoModelForCausalLM.from_pretrained(
            **settings.local_llm.model_dump()
        )

    def get_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = True,
    ) -> str:
        """
        Produce a response to the user's prompt.

        stream -> determines whether we wait until the whole response is ready
        or emit it token by token.
        TODO: invent a way to really stream the answer (as a return value)
        """
        with open("../prompt.txt", "w") as f:  # dump the prompt for debugging
            f.write(prompt)
        generated_text = ""
        tokenized_text: list[int] = self.model.tokenize(text=prompt)
        # generate() yields token ids one at a time
        response = self.model.generate(
            tokens=tokenized_text, **settings.local_llm.model_dump()
        )
        if logging:
            print(response)
        if not stream:
            return self.model.detokenize(list(response))
        for token in response:
            chunk = self.model.detokenize([token])
            generated_text += chunk
            if logging:
                print(chunk, end="", flush=True)  # flush -> write the buffer out immediately
        return generated_text
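
# Usage sketch (not part of the original module; assumes settings.local_llm
# points ctransformers at a valid local GGML/GGUF model):
#
#   llm = LocalLLM()
#   answer = llm.get_response("Answer using the context...", stream=False, logging=False)
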

class GeminiLLM:
    def __init__(self, model="gemini-2.0-flash"):
        self.client = genai.Client(api_key=settings.api_key)
        self.model = model

    def get_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = False,
    ) -> str:
        """Return the full response to the user's prompt in one call."""
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)  # dump the prompt for debugging
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=(
                types.GenerateContentConfig(**settings.gemini_generation.model_dump())
                if use_default_config
                else None
            ),
        )
        return response.text

    async def get_streaming_response(
        self,
        prompt: str,
        stream: bool = True,
        logging: bool = True,
        use_default_config: bool = False,
    ):
        """Yield the response chunk by chunk as the API streams it back."""
        path_to_prompt = os.path.join(BASE_DIR, "prompt.txt")
        with open(path_to_prompt, "w", encoding="utf-8", errors="replace") as f:
            f.write(prompt)
        response = self.client.models.generate_content_stream(
            model=self.model,
            contents=prompt,
            config=(
                types.GenerateContentConfig(**settings.gemini_generation.model_dump())
                if use_default_config
                else None
            ),
        )
        for chunk in response:
            yield chunk
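
# Usage sketch (not part of the original module; assumes settings.api_key holds
# a valid Gemini API key):
#
#   llm = GeminiLLM()
#   print(llm.get_response("Hello", logging=False))
#
#   # the streaming variant is an async generator, so consume it in an event loop:
#   async def demo():
#       async for chunk in llm.get_streaming_response("Hello"):
#           print(chunk.text, end="", flush=True)
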

class GeminiEmbed:
    def __init__(self, model="text-embedding-004"):
        self.client = genai.Client(api_key=settings.api_key)
        self.model = model
        self.settings = GeminiEmbeddingSettings()
        self.max_workers = 5

    def _embed_batch(self, batch: list[str], idx: int) -> dict:
        """Embed one batch and tag it with its index so order can be restored."""
        response = self.client.models.embed_content(
            model=self.model,
            contents=batch,
            config=types.EmbedContentConfig(**settings.gemini_embedding.model_dump()),
        ).embeddings
        return {"idx": idx, "embeddings": response}

    def encode(self, text: str | list[str]) -> list[list[float]]:
        if isinstance(text, str):
            text = [text]
        groups: list[dict] = []
        max_batch_size = 100  # cannot be increased: Google caps one request at 100 inputs
        batches: list[list[str]] = [
            text[i : i + max_batch_size] for i in range(0, len(text), max_batch_size)
        ]
        print(*[len(batch) for batch in batches])  # debug: batch sizes
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self._embed_batch, batch, idx)
                for idx, batch in enumerate(batches)
            ]
            for future in as_completed(futures):
                groups.append(future.result())
        groups.sort(key=lambda x: x["idx"])  # as_completed yields in finish order
        result: list[list[float]] = []
        for group in groups:
            for vec in group["embeddings"]:
                result.append(vec.values)
        return result

    def get_vector_dimensionality(self) -> int | None:
        return self.settings.output_dimensionality
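
# Usage sketch (not part of the original module; assumes a configured
# GeminiEmbeddingSettings):
#
#   embedder = GeminiEmbed()
#   vectors = embedder.encode([f"chunk {i}" for i in range(250)])
#   # 250 inputs -> batches of 100, 100 and 50, embedded by up to 5 worker threads
#   assert len(vectors) == 250
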

class Wrapper:
    def __init__(self, model: str = "gemini-2.0-flash"):
        self.model = model
        self.client = genai.Client(api_key=settings.api_key)

    def wrap(self, prompt: str) -> str:
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt,
            config=types.GenerateContentConfig(**settings.gemini_wrapper.model_dump()),
        )
        return response.text
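
# Usage sketch (not part of the original module; assumes settings.gemini_wrapper
# carries the generation config for this helper):
#
#   wrapper = Wrapper()
#   rewritten = wrapper.wrap("text to be transformed by the wrapper prompt")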