# Investor-API / utils / ModelCallingFunctions.py
# (Hugging Face Space file header — author: ashishbangwal, commit 0937fac:
#  "increased sleep time due to TogetherAI rate limit")
import os
from typing import List
from PIL import Image
from dotenv import load_dotenv
import json
import pickle
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_random_exponential
from openai import OpenAI, AsyncClient
import google.generativeai as gemini
from .VectorDatabase import AdvancedClient
from .HelperFunctions import web_search_result_processor
from .prompts import PROMPTS
# --- Module-level initialisation (runs once at import time) -----------------
# Load API keys from the package-local .env file.
load_dotenv("utils/.env")
TOGETHER_API = os.getenv("TOGETHER_API")  # Together AI key (OpenAI-compatible API)
GEMINI_API = os.getenv("GEMINI_API")  # Google Gemini key
X_API_KEY = os.getenv("X_API_KEY")  # key for the elevatics search-assistant service
# Shared vector-store client used by the retrieval helpers below.
client = AdvancedClient(vector_database_path="VectorDB")
# Precomputed HyDE query embeddings, keyed by report section / sub-prompt.
# NOTE(review): pickle is only safe on trusted files — this assumes HyDE.bin
# ships with the repo and is never user-supplied.
with open("utils/HyDE.bin", "rb") as file:
    HyDE = pickle.load(file)
def image_data_extractor(img: Image.Image, text: str) -> str:
    """Extract information from an image with Gemini 1.5 Flash.

    The surrounding document ``text`` is interpolated into the
    ``gemini-image`` prompt template so the model sees the image in context.
    Returns the model's text response.
    """
    gemini.configure(api_key=GEMINI_API)
    vision_model = gemini.GenerativeModel("gemini-1.5-flash")
    extraction_prompt = PROMPTS["gemini-image"].format(text=text)
    result = vision_model.generate_content([extraction_prompt, img], stream=False)
    return result.text
def generate_embedding(
    texts: List[str], embedding_model: str = "BAAI/bge-large-en-v1.5"
) -> List[List[float]]:
    """Embed each text via Together's OpenAI-compatible embeddings endpoint.

    Returns one embedding vector per input text, in input order.
    """
    together_client = OpenAI(
        api_key=TOGETHER_API, base_url="https://api.together.xyz/v1"
    )
    response_items = together_client.embeddings.create(
        input=texts, model=embedding_model
    ).data
    return [item.embedding for item in response_items]
def industry_finder(collection_id):
    """Infer the company's industry/niche from its uploaded documents.

    Retrieves the five most relevant chunks for a fixed probing question,
    asks a small Llama model (via ``response``) with the industry-finder
    system prompt, and parses the model's JSON reply into a dict.
    """
    probe_question = (
        "What is the name and its specific niche business this document pertains to."
    )
    retrieved = client.retrieve_chunks(
        collection_id=collection_id, query=probe_question, number_of_chunks=5
    )
    user_message = "CONTEXT\n\n" + "\n\n".join(retrieved) + "\n\n"
    raw_reply = response(
        message=user_message,
        model="meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
        SysPrompt=PROMPTS["industry-finder"],
        temperature=0,
    )
    return json.loads(raw_reply)
async def web_search(session, question):
    """POST ``question`` to the elevatics search-assistant service.

    Returns the response body on HTTP 200; on any failure (non-200 status,
    timeout, client error) returns a human-readable "Error: ..." string
    instead of raising, so ``asyncio.gather`` callers never see exceptions.
    """
    payload = {"query": question, "model_id": "openai/gpt-4o-mini"}
    headers = {"X-API-KEY": X_API_KEY, "Content-Type": "application/json"}
    try:
        async with session.post(
            "https://general-chat.elevatics.cloud/search-assistant",
            json=payload,
            headers=headers,
            # 60s total budget — searches can be slow.
            timeout=aiohttp.ClientTimeout(total=60),
        ) as resp:
            print(f"Status: {resp.status}")
            if resp.status != 200:
                return f"Error: HTTP {resp.status}"
            return await resp.text()
    except asyncio.TimeoutError:
        return "Error: Request timed out"
    except aiohttp.ClientError as exc:
        return f"Error: {str(exc)}"
async def other_info(company_data):
    """Gather market context for a company via concurrent web searches.

    ``company_data`` must contain ``industry`` and ``niche`` keys. Fires one
    ``web_search`` per report section (risks, barriers to entry, competitors,
    challenges) concurrently and returns a dict mapping section name to the
    search result (or an "Error: ..." string from ``web_search``).
    """
    industry_company = company_data.get("industry")
    niche = company_data.get("niche")
    # One concise search question per report section.
    questions = {
        "Risk Involved": f"What are risk involved in the starting a {niche} business in {industry_company}?, please be concise.",
        "Barrier To Entry": f"What are barrier to entry for a {niche} business in {industry_company}?, please be concise.",
        "Competitors": f"Who are the main competitors in the market for {niche} business in {industry_company}?, please be concise.",
        "Challenges": f"What are in the challenges in the {niche} business for {industry_company}?, please be concise.",
    }
    async with aiohttp.ClientSession() as session:
        answers = await asyncio.gather(
            *(web_search(session, question) for question in questions.values())
        )
    # zip over the dict iterates its keys in insertion order, matching tasks.
    return dict(zip(questions, answers))
async def answer(client, context: str, SysPrompt: str) -> str:
    """Run one async chat completion over retrieved document context.

    Args:
        client: an open OpenAI-compatible async client (AsyncClient pointed
            at Together's API).
        context: retrieved document chunks, sent as the user message.
        SysPrompt: system prompt framing the task for this section.

    Returns:
        The assistant message content.
    """
    message = f"CONTEXT:\n\n{context}"
    model = "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo"
    messages = [
        {"role": "system", "content": SysPrompt},
        {"role": "user", "content": message},
    ]
    # Fix: removed leftover debug prints ("herere"/"nononon") that spammed
    # stdout on every concurrent call.
    response = await client.chat.completions.create(
        messages=messages, model=model, temperature=0
    )
    return response.choices[0].message.content
async def business_information(collection_id):
    """Build the full analysis report for one document collection.

    For every sub-prompt in the three report sections, retrieves chunks using
    the precomputed HyDE embedding and asks the 70B model concurrently.

    Fix: the original mapped gathered responses back with the hard-coded
    index ``i_count * 4 + j_count``, which silently mis-assigns answers if
    any section does not have exactly 4 sub-prompts. We now record the
    (section, sub-key) pair alongside each task and zip them back.

    Returns:
        ``{section: {sub_key: answer_text}}`` for the three sections.
    """
    async_client = AsyncClient(
        api_key=TOGETHER_API, base_url="https://api.together.xyz/v1"
    )
    keys = ["product-and-market", "team-and-strategy", "financials"]
    async with async_client as aclient:
        tasks = []
        task_keys = []  # (section, sub-key) parallel to tasks
        for i_key in keys:
            for j_key in PROMPTS[i_key]:
                embedding = HyDE[i_key][j_key]
                sys_prompt = PROMPTS[i_key][j_key]
                chunks = client.retrieve_chunks(
                    collection_id=collection_id, query_embedding=embedding
                )
                context = "\n\n".join(chunks)
                tasks.append(
                    asyncio.create_task(
                        answer(client=aclient, context=context, SysPrompt=sys_prompt)
                    )
                )
                task_keys.append((i_key, j_key))
                # Throttle task creation — TogetherAI rate limit.
                await asyncio.sleep(1.5)
        responses = await asyncio.gather(*tasks)
    response_dict = {key: {} for key in keys}
    for (i_key, j_key), section_answer in zip(task_keys, responses):
        response_dict[i_key][j_key] = section_answer
    return response_dict
def response(
    message: str,
    model: str = "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
    SysPrompt: str = PROMPTS["default"],
    temperature: float = 0.2,
) -> str:
    """Synchronous chat completion against Together's OpenAI-compatible API.

    Fixes: parameters were all annotated ``object`` (now accurate types,
    backward-compatible); the ``print("RETRY")`` inside the retried function
    fired on the *first* attempt too and was misleading, so it is removed.

    Args:
        message: user message content.
        model: Together model identifier.
        SysPrompt: system prompt (project-wide default if omitted).
        temperature: sampling temperature.

    Returns:
        The assistant's reply, or the literal string "NONE" when all six
        retry attempts fail — callers depend on this sentinel, so it is kept.
    """
    client = OpenAI(api_key=TOGETHER_API, base_url="https://api.together.xyz/v1")
    messages = [
        {"role": "system", "content": SysPrompt},
        {"role": "user", "content": message},
    ]

    # Exponential backoff for transient failures (e.g. rate limits): up to
    # 6 attempts, random wait between 1 and 10 seconds.
    @retry(wait=wait_random_exponential(min=1, max=10), stop=stop_after_attempt(6))
    def completion_with_backoff(**kwargs):
        return client.chat.completions.create(**kwargs)

    try:
        completion = completion_with_backoff(
            model=model,
            messages=messages,
            temperature=temperature,
            frequency_penalty=0.2,
        )
        return str(completion.choices[0].message.content)
    except Exception as e:  # tenacity re-raises after the final attempt
        print(f"An error occurred: {e}")
        return "NONE"