Rauhan's picture
UPDATE: claude prompts
1b179fe
from ..components.metadataGenerator import MetadataGenerator
from ..workflows.reportingWorkflow import reportingToolWorkflow
from ..components.speechToText import SpeechToText
from ..utils.exceptions import CustomException
from ..utils.functions import readYaml
from ..components import replManager
from supabase import create_client
from urllib.request import urlopen
from ..utils.logger import logger
import orjson
import time
import os
class CompletePipeline:
def __init__(self):
logger.info("Initializing CompletePipeline components.")
self.speechToTextModule = SpeechToText()
self.metadataGenerator = MetadataGenerator()
self.yamlPath = os.path.join(os.getcwd(), "params.yaml")
self.supabaseClient = create_client(
supabase_url = os.environ["SUPABASE_URL"],
supabase_key = os.environ["SUPABASE_KEY"]
)
def generateMetadata(self, projectId: str) -> dict:
try:
dataFiles = [x.get("name") for x in self.supabaseClient.storage.from_("AnalyticsHub").list(path = projectId) if x.get("name").endswith(".parquet")]
results = ""
for fileName in dataFiles:
dataframeName = fileName.replace(".parquet", "")
codeString = readYaml(self.yamlPath)["attributeInfoCode"].format(dataframeName = dataframeName, projectId = projectId)
results += replManager.run(codeString)
metadataChain = self.metadataGenerator.getMetadataChain()
metadata = metadataChain.invoke({"metadata": results})
metadataParts = metadata.split("```")
metadata = metadataParts[-2]
metadata = orjson.loads("\n".join(metadata.split("\n")[1:]).encode())
return metadata
except Exception as e:
logger.error(CustomException(e))
raise CustomException(e)
def generateChart(self, inputQuery: str, metadata: dict, projectId: str) -> dict:
try:
response = reportingToolWorkflow.invoke({
"inputQuery": inputQuery,
"metadata": metadata,
"projectId": projectId
})
return response
except Exception as e:
logger.error(CustomException(e))
raise CustomException(e)
def generateChartFromPanel(self, projectId: str, chartType: str, xAxis: str, yAxis: str, aggregationMetric: str, dataSource: str) -> dict:
try:
blendConfigUrl = os.environ["FILE_URL"].format(projectId = projectId, fileName = "blendConfig.json").replace(".parquet", "") + f"?cb={int(time.time())}"
blendConfig = orjson.loads(urlopen(blendConfigUrl).read())
blendedTables = list(blendConfig.keys())
if dataSource in blendedTables:
tablesUsed = blendConfig[dataSource].get("tables")
joinTypes = blendConfig[dataSource].get("joinTypes")
blendOn = blendConfig[dataSource].get("blendOn")
response = replManager.run(f"getDataForChart(projectId='{projectId}', chartType='{chartType}', xAxis='{xAxis}', yAxis='{yAxis}', aggregationMetric='{aggregationMetric}', tablesUsed={tablesUsed}, joinTypes={joinTypes}, blendOn={blendOn})")
else:
response = replManager.run(f"getDataForChart(projectId='{projectId}', chartType='{chartType}', xAxis='{xAxis}', yAxis='{yAxis}', aggregationMetric='{aggregationMetric}', tablesUsed='{dataSource}')")
response = orjson.loads(response.encode())
return response
except Exception as e:
logger.error(CustomException(e))
raise CustomException(e)
def speechToText(self, b64String: str) -> str:
try:
return self.speechToTextModule.getTranscript(b64String = b64String)
except Exception as e:
logger.error(CustomException(e))
raise CustomException(e)