import os
import json
import pickle
import asyncio

import aiohttp
from typing import List
from PIL import Image
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_random_exponential
from openai import OpenAI, AsyncClient
import google.generativeai as gemini

from .VectorDatabase import AdvancedClient
from .HelperFunctions import web_search_result_processor
from .prompts import PROMPTS

load_dotenv("utils/.env")
TOGETHER_API = os.getenv("TOGETHER_API")
GEMINI_API = os.getenv("GEMINI_API")
X_API_KEY = os.getenv("X_API_KEY")

# Shared vector-database client and the precomputed HyDE query embeddings,
# keyed the same way as PROMPTS (section key -> question key).
client = AdvancedClient(vector_database_path="VectorDB")
with open("utils/HyDE.bin", "rb") as file:
    HyDE = pickle.load(file)


def image_data_extractor(img: Image.Image, text: str) -> str:
    """Extract information from an image with Gemini, guided by the accompanying text."""
    gemini.configure(api_key=GEMINI_API)
    model = gemini.GenerativeModel("gemini-1.5-flash")
    prompt = PROMPTS["gemini-image"].format(text=text)
    response = model.generate_content([prompt, img], stream=False)
    return response.text


def generate_embedding(
    texts: List[str], embedding_model: str = "BAAI/bge-large-en-v1.5"
) -> List[List[float]]:
    """Generate embeddings for the given pieces of text."""
    client = OpenAI(api_key=TOGETHER_API, base_url="https://api.together.xyz/v1")
    embeddings_response = client.embeddings.create(
        input=texts, model=embedding_model
    ).data
    return [item.embedding for item in embeddings_response]


def industry_finder(collection_id):
    """Identify the business name and specific niche described by a document collection."""
    question = (
        "What is the name of the business this document pertains to, "
        "and what is its specific niche?"
    )
    docs = client.retrieve_chunks(
        collection_id=collection_id, query=question, number_of_chunks=5
    )
    context = "\n\n".join(docs)
    message = f"CONTEXT\n\n{context}\n\n"
    model = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
    # The industry-finder system prompt instructs the model to answer in JSON.
    response_str = response(
        message=message,
        model=model,
        SysPrompt=PROMPTS["industry-finder"],
        temperature=0,
    )
    industry = json.loads(response_str)
    return industry


async def web_search(session, question):
    """Query the search-assistant endpoint; return its text or an error string."""
    data = {"query": question, "model_id": "openai/gpt-4o-mini"}
    try:
        async with session.post(
            "https://general-chat.elevatics.cloud/search-assistant",
            json=data,
            headers={"X-API-KEY": X_API_KEY, "Content-Type": "application/json"},
            timeout=aiohttp.ClientTimeout(total=60),  # 60-second overall timeout
        ) as resp:
            print(f"web_search status: {resp.status}")
            if resp.status == 200:
                return await resp.text()
            return f"Error: HTTP {resp.status}"
    except asyncio.TimeoutError:
        return "Error: Request timed out"
    except aiohttp.ClientError as e:
        return f"Error: {str(e)}"


async def other_info(company_data):
    """Collect market context (risks, barriers, competitors, challenges) via web search."""
    industry_company = company_data.get("industry")
    niche = company_data.get("niche")

    # One question per report category.
    questions = {
        "Risk Involved": f"What are the risks involved in starting a {niche} business in {industry_company}? Please be concise.",
        "Barrier To Entry": f"What are the barriers to entry for a {niche} business in {industry_company}? Please be concise.",
        "Competitors": f"Who are the main competitors in the market for a {niche} business in {industry_company}? Please be concise.",
        "Challenges": f"What are the challenges in the {niche} business for {industry_company}? Please be concise.",
    }

    # Run all searches concurrently and map answers back to their categories.
    results = {}
    async with aiohttp.ClientSession() as session:
        tasks = [web_search(session, question) for question in questions.values()]
        responses = await asyncio.gather(*tasks)
        for category, search_result in zip(questions, responses):
            results[category] = search_result

    return results


async def answer(client, context: str, SysPrompt: str) -> str:
    """Answer a single report question from retrieved context."""
    message = f"CONTEXT:\n\n{context}"
    model = "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo"
    messages = [
        {"role": "system", "content": SysPrompt},
        {"role": "user", "content": message},
    ]
    completion = await client.chat.completions.create(
        messages=messages, model=model, temperature=0
    )
    return completion.choices[0].message.content


async def business_information(collection_id):
    """Build the product/market, team/strategy, and financials sections of the report."""
    async_client = AsyncClient(
        api_key=TOGETHER_API, base_url="https://api.together.xyz/v1"
    )
    keys = ["product-and-market", "team-and-strategy", "financials"]
    async with async_client as aclient:
        tasks = []
        for i_key in keys:
            for j_key in PROMPTS[i_key]:
                # Retrieve with the precomputed HyDE embedding for this question.
                embedding = HyDE[i_key][j_key]
                sys_prompt = PROMPTS[i_key][j_key]
                chunks = client.retrieve_chunks(
                    collection_id=collection_id, query_embedding=embedding
                )
                context = "\n\n".join(chunks)
                tasks.append(
                    asyncio.create_task(
                        answer(client=aclient, context=context, SysPrompt=sys_prompt)
                    )
                )
                # Stagger task creation to stay under the API rate limit.
                await asyncio.sleep(1.5)
        responses = await asyncio.gather(*tasks)

    # Map responses back to their prompt keys. Tasks were created in the same
    # nested order, so a running index is used rather than assuming exactly
    # four prompts per section.
    response_dict = {}
    flat_index = 0
    for i_key in keys:
        response_dict[i_key] = {}
        for j_key in PROMPTS[i_key]:
            response_dict[i_key][j_key] = responses[flat_index]
            flat_index += 1
    return response_dict


def response(
    message: str,
    model: str = "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
    SysPrompt: str = PROMPTS["default"],
    temperature: float = 0.2,
) -> str:
    """Synchronous chat completion with exponential-backoff retries."""
    client = OpenAI(api_key=TOGETHER_API, base_url="https://api.together.xyz/v1")
    messages = [
        {"role": "system", "content": SysPrompt},
        {"role": "user", "content": message},
    ]

    # Retry with jittered exponential backoff on transient API failures.
    @retry(wait=wait_random_exponential(min=1, max=10), stop=stop_after_attempt(6))
    def completion_with_backoff(**kwargs):
        return client.chat.completions.create(**kwargs)

    try:
        completion = completion_with_backoff(
            model=model,
            messages=messages,
            temperature=temperature,
            frequency_penalty=0.2,
        )
        return str(completion.choices[0].message.content)
    except Exception as e:
        print(f"An error occurred: {e}")
        return "NONE"