Spaces:
Build error
Build error
import asyncio
import json
import os
import pickle
from typing import List

import aiohttp
import google.generativeai as gemini
from dotenv import load_dotenv
from openai import OpenAI, AsyncClient
from PIL import Image
from tenacity import retry, stop_after_attempt, wait_random_exponential

from .VectorDatabase import AdvancedClient
from .HelperFunctions import web_search_result_processor
from .prompts import PROMPTS

# Load API credentials from the project-local .env file.
load_dotenv("utils/.env")
TOGETHER_API = os.getenv("TOGETHER_API")
GEMINI_API = os.getenv("GEMINI_API")
X_API_KEY = os.getenv("X_API_KEY")

# Module-wide vector-store client shared by the retrieval helpers below.
client = AdvancedClient(vector_database_path="VectorDB")

# SECURITY NOTE(review): pickle.load executes arbitrary code if the file is
# tampered with — only ship utils/HyDE.bin from a trusted build step.
with open("utils/HyDE.bin", "rb") as file:
    HyDE = pickle.load(file)
def image_data_extractor(img: Image.Image, text: str) -> str:
    """Extract information from *img* with Gemini, guided by *text*.

    The text is interpolated into the "gemini-image" prompt template and
    sent together with the image; the model's textual reply is returned.
    """
    gemini.configure(api_key=GEMINI_API)
    vision_model = gemini.GenerativeModel("gemini-1.5-flash")
    request_parts = [PROMPTS["gemini-image"].format(text=text), img]
    result = vision_model.generate_content(request_parts, stream=False)
    return result.text
def generate_embedding(
    texts: List[str], embedding_model: str = "BAAI/bge-large-en-v1.5"
) -> List[List[float]]:
    """Generate embeddings for the given pieces of text.

    Args:
        texts: Strings to embed.
        embedding_model: Together-hosted embedding model name.

    Returns:
        One embedding vector (list of floats) per input string, in order.
    """
    # Local name chosen so we don't shadow the module-level vector-DB `client`.
    api_client = OpenAI(api_key=TOGETHER_API, base_url="https://api.together.xyz/v1")
    embeddings_response = api_client.embeddings.create(
        input=texts, model=embedding_model
    ).data
    return [item.embedding for item in embeddings_response]
def industry_finder(collection_id):
    """Infer the industry/niche a document collection pertains to.

    Retrieves the most relevant chunks for a probing question, asks an LLM
    to classify them (system prompt "industry-finder"), and parses the
    model's JSON reply.

    Args:
        collection_id: Identifier of the collection in the vector store.

    Returns:
        dict parsed from the model's JSON reply, or {} when the reply is
        not valid JSON (e.g. the "NONE" sentinel response() emits on API
        failure).
    """
    question = (
        "What is the name and its specific niche business this document pertains to."
    )
    docs = client.retrieve_chunks(
        collection_id=collection_id, query=question, number_of_chunks=5
    )
    context = "\n\n".join(docs)
    message = f"CONTEXT\n\n{context}\n\n"
    response_str = response(
        message=message,
        model="meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
        SysPrompt=PROMPTS["industry-finder"],
        temperature=0,
    )
    # response() returns the literal string "NONE" on failure; that is not
    # valid JSON, so guard instead of letting json.loads raise.
    try:
        return json.loads(response_str)
    except json.JSONDecodeError:
        return {}
async def web_search(session, question):
    """POST *question* to the search-assistant endpoint and return its reply.

    Returns the raw response body on HTTP 200; otherwise a human-readable
    "Error: ..." string (non-200 status, timeout, or client error).
    """
    payload = {"query": question, "model_id": "openai/gpt-4o-mini"}
    headers = {"X-API-KEY": X_API_KEY, "Content-Type": "application/json"}
    try:
        async with session.post(
            "https://general-chat.elevatics.cloud/search-assistant",
            json=payload,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=60),  # Increase timeout to 60 seconds
        ) as response:
            print(f"Status: {response.status}")
            if response.status != 200:
                return f"Error: HTTP {response.status}"
            return await response.text()
    except asyncio.TimeoutError:
        return "Error: Request timed out"
    except aiohttp.ClientError as e:
        return f"Error: {str(e)}"
async def other_info(company_data):
    """Research risks, entry barriers, competitors, and challenges for a
    company, fanning the four web searches out concurrently.

    Args:
        company_data: dict with "industry" and "niche" keys.

    Returns:
        dict mapping each category name to its web-search answer (or an
        "Error: ..." string from web_search).
    """
    industry_company = company_data.get("industry")
    niche = company_data.get("niche")
    # One search question per research category (wording kept verbatim).
    questions = {
        "Risk Involved": f"What are risk involved in the starting a {niche} business in {industry_company}?, please be concise.",
        "Barrier To Entry": f"What are barrier to entry for a {niche} business in {industry_company}?, please be concise.",
        "Competitors": f"Who are the main competitors in the market for {niche} business in {industry_company}?, please be concise.",
        "Challenges": f"What are in the challenges in the {niche} business for {industry_company}?, please be concise.",
    }
    # Run all searches over a single shared HTTP session.
    async with aiohttp.ClientSession() as session:
        answers = await asyncio.gather(
            *(web_search(session, q) for q in questions.values())
        )
    return dict(zip(questions, answers))
async def answer(client, context: str, SysPrompt: str):
    """Ask the 70B chat model to answer *SysPrompt*'s task over *context*.

    Args:
        client: An async OpenAI-compatible chat client (Together endpoint).
        context: Retrieved document text, injected as the user message.
        SysPrompt: System prompt steering the model for this question.

    Returns:
        The model's reply text.
    """
    messages = [
        {"role": "system", "content": SysPrompt},
        {"role": "user", "content": f"CONTEXT:\n\n{context}"},
    ]
    # Debug prints ("herere"/"nononon") removed — leftover scaffolding.
    completion = await client.chat.completions.create(
        messages=messages,
        model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        temperature=0,
    )
    return completion.choices[0].message.content
async def business_information(collection_id):
    """Build the full report: one LLM answer per (section, sub-question).

    For every sub-question under the three report sections, retrieves
    relevant chunks using the precomputed HyDE embedding and asks the LLM,
    fanning the calls out concurrently (staggered to rate-limit).

    Args:
        collection_id: Identifier of the collection in the vector store.

    Returns:
        Nested dict: {section_key: {sub_question_key: answer_text}}.
    """
    async_client = AsyncClient(
        api_key=TOGETHER_API, base_url="https://api.together.xyz/v1"
    )
    keys = ["product-and-market", "team-and-strategy", "financials"]
    async with async_client as aclient:
        tasks = []
        task_index = []  # (section, sub-key) per task, in submission order
        for i_key in keys:
            for j_key in PROMPTS[i_key]:
                embedding = HyDE[i_key][j_key]
                sys_prompt = PROMPTS[i_key][j_key]
                chunks = client.retrieve_chunks(
                    collection_id=collection_id, query_embedding=embedding
                )
                context = "\n\n".join(chunks)
                tasks.append(
                    asyncio.create_task(
                        answer(client=aclient, context=context, SysPrompt=sys_prompt)
                    )
                )
                task_index.append((i_key, j_key))
                # Crude rate-limit: stagger request submission.
                await asyncio.sleep(1.5)
        responses = await asyncio.gather(*tasks)
    # Map answers back by the recorded (section, sub-key) pairs rather than
    # the original `responses[i_count * 4 + j_count]`, which hard-coded 4
    # prompts per section and silently misaligned answers if any section
    # had a different number of sub-questions.
    response_dict = {key: {} for key in keys}
    for (i_key, j_key), text in zip(task_index, responses):
        response_dict[i_key][j_key] = text
    return response_dict
def response(
    message: str,
    model: str = "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
    SysPrompt: str = PROMPTS["default"],
    temperature: float = 0.2,
) -> str:
    """Synchronous chat completion against the Together API with retries.

    Args:
        message: User message content.
        model: Model identifier.
        SysPrompt: System prompt (defaults to PROMPTS["default"]).
        temperature: Sampling temperature.

    Returns:
        The model's reply text, or the sentinel string "NONE" if the
        request ultimately fails after all retries.
    """
    # Local name chosen so we don't shadow the module-level vector-DB `client`.
    api_client = OpenAI(api_key=TOGETHER_API, base_url="https://api.together.xyz/v1")
    messages = [
        {"role": "system", "content": SysPrompt},
        {"role": "user", "content": message},
    ]

    # tenacity is imported at module level and the inner function prints
    # "RETRY", but the decorator was never applied — so no backoff actually
    # happened. Add bounded exponential backoff.
    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
    def completion_with_backoff(**kwargs):
        print("RETRY")
        return api_client.chat.completions.create(**kwargs)

    try:
        completion = completion_with_backoff(
            model=model,
            messages=messages,
            temperature=temperature,
            frequency_penalty=0.2,
        )
        return str(completion.choices[0].message.content)
    except Exception as e:
        # Best-effort contract: callers (e.g. industry_finder) expect "NONE"
        # on failure rather than an exception.
        print(f"An error occurred: {e}")
        return "NONE"