| import re |
| import os |
| from datetime import datetime |
| import openai |
| from google.cloud import firestore |
| from dotenv import load_dotenv |
| import pandas as pd |
| import ast |
| import json |
| from pandasai.responses.response_parser import ResponseParser |
| |
# Pull settings from a local .env file into the process environment before
# anything below reads os.environ.
load_dotenv()

# Module-wide Firestore client built from the service-account key file that
# is expected next to the working directory.
db = firestore.Client.from_service_account_json("firestore-key.json")
| from langchain_google_genai import ChatGoogleGenerativeAI |
| |
| from langchain_community.chat_models.sambanova import ChatSambaNovaCloud |
| import google.generativeai as genai |
|
|
# Chat model instance shared by the helpers in this module (read_datalake
# hands it to pandasai).
# NOTE(review): top_k=1 with top_p=0.01 presumably makes sampling close to
# deterministic despite temperature=0.7 — confirm this is intended.
llm = ChatSambaNovaCloud(
    model="Meta-Llama-3.1-70B-Instruct",
    max_tokens=1024,
    temperature=0.7,
    top_k=1,
    top_p=0.01,
)
|
|
|
|
|
|
| |
class FlaskResponse(ResponseParser):
    """Response parser that returns values instead of displaying them.

    DataFrames are rendered to HTML, plot results are handed back as-is
    (presumably the saved image path — confirm against the pandasai version
    in use), and every other result is converted to a string.
    """

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Return the DataFrame under ``result['value']`` as an HTML table."""
        return result['value'].to_html()

    def format_plot(self, result):
        """Return ``result['value']`` unchanged, echoing it to stdout."""
        # A plain key lookup cannot raise ValueError, so no fallback branch
        # is needed around it; a missing key still surfaces as KeyError.
        img_path = result['value']
        print("response_class_path:", img_path)
        return img_path

    def format_other(self, result):
        """Return any other result value converted with ``str``."""
        return str(result['value'])
| |
# Prompt fragments sent to the model on every call; kept at module level so
# they are built once and the function body stays readable.
_RELEVANT_INFO_TEMPLATE = """
Intent: The CRUD operation (create, read, update, delete)
Transaction Type: e.g. Purchase, Sale, Inventory
Details: A list of key detail fields extracted.
"""

_SAMPLE_SINGLE_TRANSACTION_TEMPLATE = """
*Intent*: Create
*Transaction Type*: Purchase
*Details*: - Item: Car, - Quantity: 1, - Cost: 10000, etc.
"""

_SAMPLE_MULTI_TRANSACTION_TEMPLATE = """
*Intent*: Create
Transaction 1:
*Transaction Type*: Purchase
*Details*: - Item: Car, - Quantity: 1, etc.
Transaction 2:
*Transaction Type*: Sale
*Details*: - Item: Chair, - Quantity: 2, etc.
"""


def generateResponse(prompt, model='Meta-Llama-3.1-70B-Instruct'):
    """Ask the SambaNova chat endpoint to classify a transaction message.

    Args:
        prompt: The user's free-text message.
        model: Model name passed to the chat-completions endpoint.

    Returns:
        The text of the first completion choice, or ``None`` when the
        request or the response handling fails (the error is printed).
    """
    system_prompt = (
        f"You are a helpful assistant that classifies transactions. Format your output with these guidelines: {_RELEVANT_INFO_TEMPLATE} "
        f"Sample single transaction: {_SAMPLE_SINGLE_TRANSACTION_TEMPLATE} "
        f"Sample multi-transaction: {_SAMPLE_MULTI_TRANSACTION_TEMPLATE}"
    )
    try:
        # The request itself is the step most likely to fail (missing key,
        # network, rate limits), so it lives inside the try together with
        # the response access; otherwise the None fallback is never reached.
        client = openai.OpenAI(
            api_key=os.environ.get("SAMBANOVA_API_KEY"),
            base_url="https://api.sambanova.ai/v1",
        )
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt},
            ],
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f'An error occurred: {str(e)}')
        return None
|
|
# A currency marker is either a three-letter upper-case code that is not part
# of a longer word (so "LAPTOP" no longer yields "LAP"), or a common symbol.
_CURRENCY_PATTERN = re.compile(r"(?<![A-Za-z])[A-Z]{3}(?![A-Za-z])|[$€£]")

# Optional minus sign, then digits with at most one decimal point.
_NUMBER_PATTERN = re.compile(r"-?(?:\d+\.?\d*|\.\d+)")


def parse_value(value):
    """Convert a raw detail string into a typed value plus a currency marker.

    Args:
        value: Raw text captured for one detail field.

    Returns:
        A ``(parsed, currency)`` tuple. ``parsed`` is a ``float`` for
        percentages and decimals, an ``int`` for whole numbers, and the
        stripped input string otherwise. ``currency`` is the first currency
        code/symbol found in the text, or ``None``. A malformed percentage
        yields ``(value, None)``.
    """
    value = value.strip()

    currency_match = _CURRENCY_PATTERN.search(value)
    currency = currency_match.group(0) if currency_match else None

    # Drop currency markers and thousands separators before numeric checks.
    cleaned_value = _CURRENCY_PATTERN.sub("", value).replace(",", "").strip()

    if "%" in cleaned_value:
        try:
            return float(cleaned_value.replace("%", "")), currency
        except ValueError:
            return value, None

    if _NUMBER_PATTERN.fullmatch(cleaned_value):
        if "." in cleaned_value:
            return float(cleaned_value), currency
        return int(cleaned_value), currency

    return value, currency
|
|
# One "- Field: value" entry. A value ends at a line break, at the end of the
# text, or at a comma that is directly followed by the next "- Field:" entry,
# so the single-line layout "- Item: Car, - Quantity: 1" yields every field
# instead of only the last one.
_DETAIL_PATTERN = re.compile(
    r"-\s*\*{0,2}([\w\s]+)\*{0,2}:\s*([\w\s,.$%-]+?)"
    r"(?:\s*\n|\s*,\s*(?=-\s*\*{0,2}[\w\s]+\*{0,2}:)|$)"
)


def extract_transaction_details(text):
    """Collect the ``- Field: value`` entries of a response into a dict.

    Field names are lower-cased with spaces replaced by underscores; values
    are converted by ``parse_value``. The first currency marker detected in
    any value is stored under ``"currency"``.

    Args:
        text: Model response (or one transaction section of it). ``None``
            and empty strings are accepted.

    Returns:
        A dict of parsed detail fields; empty when nothing matches.
    """
    details = {}
    if not text:
        return details

    transaction_currency = None
    for field, value in _DETAIL_PATTERN.findall(text):
        field_key = field.strip().lower().replace(" ", "_")
        parsed_value, detected_currency = parse_value(value)
        # Only the first currency seen is kept for the whole transaction.
        if detected_currency and not transaction_currency:
            transaction_currency = detected_currency
        details[field_key] = parsed_value

    if transaction_currency:
        details["currency"] = transaction_currency
    return details
|
|
def parse_ai_response(response_text):
    """Parse a single-transaction model response into a record dict.

    The ``Intent`` and ``Transaction Type`` headers are matched
    case-insensitively with zero to two surrounding asterisks, mirroring the
    tolerance already applied to detail fields, so ``Intent:``, ``*Intent*:``
    and ``**Intent**:`` are all recognised.

    Args:
        response_text: Text returned by the model; may be ``None`` or empty
            when the model call failed.

    Returns:
        A dict with ``intent``, ``transaction_type``, ``details`` and an
        ISO-formatted ``created_at`` timestamp. Fields that cannot be found
        stay ``None`` (``details`` stays empty).
    """
    data = {
        "intent": None,
        "transaction_type": None,
        "details": {},
        "created_at": datetime.now().isoformat(),
    }
    # A failed model call yields None; return the empty record rather than
    # letting re.search raise TypeError.
    if not response_text:
        return data

    intent_match = re.search(
        r"\*{0,2}Intent\*{0,2}:\s*\*{0,2}(\w+)", response_text, re.IGNORECASE
    )
    if intent_match:
        data["intent"] = intent_match.group(1)

    transaction_type_match = re.search(
        r"\*{0,2}Transaction Type\*{0,2}:\s*\*{0,2}(\w+)", response_text, re.IGNORECASE
    )
    if transaction_type_match:
        data["transaction_type"] = transaction_type_match.group(1)

    data["details"] = extract_transaction_details(response_text)
    return data
|
|
def parse_multiple_transactions(response_text):
    """Split a model response into one record per ``Transaction N:`` section.

    The intent is read once from the whole response and shared by every
    record. A leading section that carries no ``Transaction Type`` header
    (the preamble holding only the intent) is discarded. Headers are matched
    case-insensitively with zero to two surrounding asterisks.

    Args:
        response_text: Text returned by the model; may be ``None`` or empty
            when the model call failed.

    Returns:
        A list of dicts with ``intent``, ``transaction_type``, ``details``
        and ``created_at``; empty when there is nothing to parse.
    """
    # A failed model call yields None; report "no transactions" instead of
    # letting re.split raise TypeError.
    if not response_text:
        return []

    sections = [
        chunk.strip()
        for chunk in re.split(r"Transaction \d+:", response_text, flags=re.IGNORECASE)
        if chunk.strip()
    ]
    if sections and not re.search(r"Transaction Type", sections[0], re.IGNORECASE):
        sections.pop(0)

    intent_match = re.search(
        r"\*{0,2}Intent\*{0,2}:\s*\*{0,2}(\w+)", response_text, re.IGNORECASE
    )
    intent = intent_match.group(1) if intent_match else None

    # Same tolerance as the preamble check above, so a section that passes
    # that check also has its type extracted.
    type_pattern = re.compile(
        r"\*{0,2}Transaction Type\*{0,2}:\s*\*{0,2}(\w+)", re.IGNORECASE
    )

    transactions = []
    for section in sections:
        type_match = type_pattern.search(section)
        transactions.append({
            "intent": intent,
            "transaction_type": type_match.group(1) if type_match else None,
            "details": extract_transaction_details(section),
            "created_at": datetime.now().isoformat(),
        })
    return transactions
|
|
def read_datalake(user_phone, user_question):
    """Answer a natural-language question over the user's stored records.

    Loads every document of the user's ``inventory`` and ``sales``
    collections into two DataFrames and passes them, with the module-level
    ``llm``, to a pandasai ``SmartDatalake``.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        user_question: Question forwarded to ``SmartDatalake.chat``.

    Returns:
        Whatever ``SmartDatalake.chat`` returns after formatting by
        ``FlaskResponse``.
    """
    user_doc = db.collection("users").document(user_phone)
    inventory_rows = [snap.to_dict() for snap in user_doc.collection("inventory").stream()]
    sales_rows = [snap.to_dict() for snap in user_doc.collection("sales").stream()]
    frames = [pd.DataFrame(inventory_rows), pd.DataFrame(sales_rows)]

    # Imported here, after the Firestore reads, exactly as the module has
    # always done; pandasai is not needed by the other helpers.
    from pandasai import SmartDatalake

    settings = {
        "llm": llm,
        "custom_whitelisted_dependencies": ["ast"],
        "response_parser": FlaskResponse,
        "enable_cache": False,
        "save_logs": False,
    }
    return SmartDatalake(frames, config=settings).chat(user_question)
|
|
def create_inventory(user_phone, transaction_data):
    """Store each transaction as an inventory document keyed by item name.

    Transactions whose details carry no ``item`` are skipped.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: Iterable of parsed transaction dicts.

    Returns:
        ``True`` when at least one document was written, ``False`` when
        nothing could be recorded, so the caller's failure message is
        reachable instead of success being reported unconditionally.
    """
    inventory_ref = db.collection("users").document(user_phone).collection("inventory")
    recorded = False
    for transaction in transaction_data:
        item_name = transaction['details'].get('item')
        if not item_name:
            continue
        # Parsed values may be numbers (e.g. an item called "123"); document
        # ids are given as strings.
        inventory_ref.document(str(item_name)).set(transaction)
        recorded = True
    return recorded
|
|
def create_sale(user_phone, transaction_data):
    """Record sales and subtract the sold quantity from matching inventory.

    For every transaction with an ``item``: if an inventory document with
    that name exists and both quantities are numeric, the stock is reduced
    and written back; the sale itself is then stored in the ``sales``
    collection under the item name. Transactions without an ``item`` are
    skipped.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: Iterable of parsed transaction dicts.

    Returns:
        ``True`` when at least one sale was written, ``False`` otherwise.
    """
    user_ref = db.collection("users").document(user_phone)
    recorded = False
    for transaction in transaction_data:
        sale_details = transaction['details']
        item_name = sale_details.get('item')
        if not item_name:
            continue
        doc_id = str(item_name)

        inventory = fetch_transaction(user_phone, doc_id)
        stock_details = (inventory.get('details') or {}) if inventory else {}
        stock = stock_details.get('quantity')
        sold = sale_details.get('quantity', 0)
        if stock is not None:
            # Quantities come from free text and may be strings such as
            # "two"; subtracting those would raise TypeError mid-loop.
            if isinstance(stock, (int, float)) and isinstance(sold, (int, float)):
                stock_details['quantity'] = stock - sold
                inventory['details'] = stock_details
                user_ref.collection("inventory").document(doc_id).set(inventory)
            else:
                print(f"Stock for {doc_id} not adjusted: non-numeric quantity")

        # NOTE(review): sales are keyed by item name, so a later sale of the
        # same item replaces the earlier record — confirm this is intended.
        user_ref.collection("sales").document(doc_id).set(transaction)
        recorded = True
    return recorded
|
|
def update_transaction(user_phone, transaction_id, update_data):
    """Apply ``update_data`` to one document of the user's ``transactions`` collection.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the document to update.
        update_data: Field values passed to the document's ``update`` call.

    Returns:
        Always ``True``; nothing is caught here, so any error raised by the
        update call propagates to the caller.
    """
    user_doc = db.collection("users").document(user_phone)
    target = user_doc.collection("transactions").document(transaction_id)
    target.update(update_data)
    return True
|
|
def fetch_transaction(user_phone, transaction_id=None):
    """Read one inventory document, or all of them, for a user.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the inventory document to read. When falsy,
            the whole ``inventory`` collection is returned.

    Returns:
        With an id: the document as a dict, or ``None`` if it does not
        exist. Without an id: a list of dicts, one per inventory document.
    """
    inventory = db.collection("users").document(user_phone).collection("inventory")
    if not transaction_id:
        return [snapshot.to_dict() for snapshot in inventory.stream()]

    snapshot = inventory.document(transaction_id).get()
    return snapshot.to_dict() if snapshot.exists else None
|
|
def delete_transaction(user_phone, transaction_data):
    """Delete the document named by the first transaction's item.

    The transaction type selects the collection using the same grouping as
    the create path: purchase/purchases/inventory records live in
    ``inventory`` and sale/sales records live in ``sales``. Any other type
    falls back to its lower-cased name, and a missing type to ``inventory``.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: List of parsed transaction dicts; only the first
            entry is used.

    Returns:
        ``True`` if the document existed and was deleted, ``False`` when the
        input is empty, no item is named, or the document does not exist.
    """
    if not transaction_data:
        return False

    first = transaction_data[0]
    transaction_id = first['details'].get('item')
    if not transaction_id:
        # Without an item there is no document id to look up.
        return False

    transaction_type = (first.get('transaction_type') or "inventory").lower()
    # Records are written to "inventory" and "sales", not to a collection
    # named after the raw type, so translate before looking up.
    collection_by_type = {
        "purchase": "inventory",
        "purchases": "inventory",
        "inventory": "inventory",
        "sale": "sales",
        "sales": "sales",
    }
    collection_name = collection_by_type.get(transaction_type, transaction_type)

    doc_ref = (
        db.collection("users")
        .document(user_phone)
        .collection(collection_name)
        .document(str(transaction_id))
    )
    if not doc_ref.get().exists:
        return False
    doc_ref.delete()
    return True
|
|
def persist_temporary_transaction(transactions, mobile):
    """Save transactions awaiting user action in a fixed per-user document.

    The payload is written to ``users/<mobile>/temp_transactions/
    pending-user-action`` with ``set``, together with a ``pending`` status
    and an ISO-formatted timestamp.

    Args:
        transactions: Parsed transactions to hold.
        mobile: Document id of the user under the ``users`` collection.

    Returns:
        Always ``True``; errors from the write propagate to the caller.
    """
    pending_doc = (
        db.collection("users")
        .document(mobile)
        .collection("temp_transactions")
        .document('pending-user-action')
    )
    pending_doc.set({
        "transactions": transactions,
        "status": "pending",
        "created_at": datetime.now().isoformat(),
    })
    return True
|
|
def process_intent(parsed_trans_data, mobile):
    """Dispatch parsed transactions according to the first record's intent.

    The intent and transaction type are read from the first transaction and
    lower-cased. ``create`` routes purchase/inventory types to
    ``create_inventory`` and sale types to ``create_sale``; ``delete`` calls
    ``delete_transaction``; ``update`` is acknowledged but not implemented.

    Args:
        parsed_trans_data: List of parsed transaction dicts (may be empty).
        mobile: Document id of the user under the ``users`` collection.

    Returns:
        A user-facing message describing the outcome.
    """
    first = parsed_trans_data[0] if parsed_trans_data else None
    has_first = first is not None
    intent = first['intent'].lower() if has_first and first.get('intent') else None
    trans_type = (
        first['transaction_type'].lower()
        if has_first and first.get('transaction_type')
        else None
    )

    if intent == 'create':
        if trans_type in ('purchase', 'purchases', 'inventory'):
            recorder = create_inventory
        elif trans_type in ('sale', 'sales'):
            recorder = create_sale
        else:
            return f"Transaction type '{trans_type}' is not supported for create operations."
        if recorder(mobile, parsed_trans_data):
            return "Transaction recorded successfully!"
        return "Sorry, could not record transaction!"

    if intent == 'update':
        return "Update operation not implemented yet."

    if intent == 'delete':
        item = first['details'].get('item', 'item')
        if delete_transaction(mobile, parsed_trans_data):
            return f"You successfully deleted {item} from {trans_type}!"
        return f"Sorry, could not delete {item} from {trans_type}!"

    return f"The detected intent, {intent}, is not currently supported!"