| import os |
| import pandas as pd |
| import streamlit as st |
| import ast |
| import pymongo |
| import json |
| from pymongo import MongoClient |
| from pymongo.errors import ConnectionFailure |
| from pandasai import SmartDataframe |
| from pandasai.responses.response_parser import ResponseParser |
| from pandasai.connectors import ( |
| PostgreSQLConnector, MySQLConnector, SqliteConnector, SQLConnector |
| ) |
| from pandasai.ee.connectors import SnowFlakeConnector, DatabricksConnector |
| from langchain_groq import ChatGroq |
| from groq import Groq |
| import chardet |
|
|
| |
| |
| |
| os.environ['GROQ_API_KEY'] = "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" |
| |
|
|
| llm = Groq() |
| model = ChatGroq(temperature=0.8,groq_api_key="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", model_name="llama-3.1-70b-versatile") |
|
|
| class StreamlitResponse(ResponseParser): |
| def __init__(self, context) -> None: |
| super().__init__(context) |
|
|
| def format_dataframe(self, result): |
| st.dataframe(result["value"]) |
| return |
|
|
| def format_plot(self, result): |
| st.image(result["value"], width=900, use_column_width=True) |
| return |
|
|
| def format_other(self, result): |
| st.write(result["value"]) |
| return |
|
|
| def detect_encoding(file): |
| |
| raw_data = file.read(1024) |
| result = chardet.detect(raw_data) |
| file.seek(0) |
| return result['encoding'] |
|
|
| def read_csv_with_encoding(file): |
| encodings = ['utf-8', 'latin1', 'iso-8859-1', 'utf-16', 'cp1252'] |
| for encoding in encodings: |
| try: |
| file.seek(0) |
| df = pd.read_csv(file, encoding=encoding) |
| if not df.empty: |
| return df |
| except UnicodeDecodeError: |
| continue |
| except pd.errors.EmptyDataError: |
| st.error("The CSV file is empty or malformed.") |
| return None |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
| return None |
| st.error("Unable to decode the CSV file with common encodings.") |
| return None |
|
|
| def load_file(file) -> pd.DataFrame: |
| if file.type == "text/csv": |
| df = read_csv_with_encoding(file) |
| elif file.type == "application/json": |
| try: |
| df = pd.read_json(file) |
| except ValueError as e: |
| st.error(f"An error occurred while reading JSON file: {e}") |
| return None |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
| return None |
| elif file.type in ["application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"]: |
| try: |
| df = pd.read_excel(file, engine='openpyxl') |
| except Exception as e: |
| st.error(f"An error occurred while reading Excel file: {e}") |
| return None |
| else: |
| st.error("Unsupported file type. Please upload a CSV, JSON, or Excel file.") |
| return None |
|
|
| return df |
|
|
| def get_document_structure(doc): |
| """ |
| Converts the structure of a MongoDB document into a text representation. |
| |
| Args: |
| - doc (dict): The MongoDB document. |
| |
| Returns: |
| - str: A text representation of the document structure. |
| |
| """ |
| lines = [] |
| for key, value in doc.items(): |
| |
| value_type = type(value).__name__ |
| lines.append(f"{key}: {value_type}") |
| |
| |
| structure_text = "\n".join(lines) |
| return structure_text |
|
|
| def preprocess_json_string(json_string): |
| |
| json_string = json_string.replace("'", '"') |
| |
| |
| if json_string.startswith("{") and json_string.endswith("}"): |
| return json_string |
| return f"{{{json_string}}}" |
|
|
| def mon_query(ques,structure,database,collection): |
| prompt=f""" |
| Task: You are a Mongodb query expert, you are given with the document structure. |
| Databse is already connected. |
| You primary task is to understand users task, based on the given information provide |
| the correct query to fetch the required data. |
| |
| structure:{structure} |
| query:{ques} |
| database:{database} |
| collection:{collection} |
| |
| response: |
| Only provide query to extract data. |
| |
| collection.find_one({{"name": "alice"}}) |
| |
| Requirements: |
| Only provide the query no explainantion |
| Only provide mongodb query. |
| follow structure of document |
| Use collection to get the query |
| never provide explaination |
| only mongodb query. |
| """ |
|
|
| completion = llm.chat.completions.create( |
| messages=[ |
| { |
| "role": "user", |
| "content": prompt |
| } |
| ], |
| model="llama-3.1-70b-versatile", |
| ) |
|
|
| return completion.choices[0].message.content |
|
|
| |
| if "show_db" not in st.session_state: |
| st.session_state.show_db = False |
| if "df" not in st.session_state: |
| st.session_state.df = None |
|
|
| |
| st.write("# DATA CHAD") |
| st.write("This product belongs to NAMA AI") |
|
|
| |
| st.sidebar.title("Navigation") |
| option = st.sidebar.radio("Select Option", ["File Upload", "Database Connection"], key="option_radio") |
|
|
| if option == "File Upload": |
| st.session_state.show_db = False |
| uploaded_file = st.file_uploader("Upload your CSV, JSON, or Excel file", type=["csv", "json", "xlsx", "xls"]) |
|
|
| if uploaded_file: |
| df = load_file(uploaded_file) |
| st.session_state.df = df |
|
|
| if df is not None: |
| with st.expander("🔎 Dataframe Preview"): |
| st.write(df) |
|
|
| query = st.text_area("🗣️ Chat with Dataframe") |
| if query: |
| try: |
| llm = model |
| query_engine = SmartDataframe( |
| df, |
| config={ |
| "llm": llm, |
| "response_parser": StreamlitResponse, |
| "code_to_run": True |
| }, |
| ) |
| if st.button("GENERATE"): |
| answer = query_engine.chat(f"Provide formatted answer as you are a business analyst: {query} and provide the most suitable plot. Also, explain plots and provide a complete plot, ignoring null and irrelevant values.") |
| if answer is not None: |
| st.title(f"👾 Here is your Analysis: {answer}") |
| except Exception as e: |
| st.error(f"An error occurred while processing the query: {e}") |
| else: |
| st.info("Please upload a CSV, JSON, or Excel file to get started.") |
|
|
| elif option == "Database Connection": |
| st.session_state.show_db = True |
| st.sidebar.title("Database Connection Configuration") |
| db_type = st.sidebar.selectbox("Select Database Type", ["PostgreSQL", "MySQL", "SQLite", "SQL", "Snowflake", "Databricks","MongoDB"]) |
|
|
| |
|
|
| if db_type == "PostgreSQL": |
| host = st.sidebar.text_input("Host", "postgres") |
| port = st.sidebar.number_input("Port", 5432) |
| database = st.sidebar.text_input("Database", "mydb") |
| username = st.sidebar.text_input("Username", "root") |
| password = st.sidebar.text_input("Password", "root", type="password") |
| table = st.sidebar.text_input("Table", "payments") |
| where = st.sidebar.text_input("Filter (optional)", "") |
| query= st.sidebar.text_input("Enter your Query") |
| if st.sidebar.button("Connect PostgreSQL"): |
| try: |
| where_clause = eval(where) if where else None |
| postgres_connector = PostgreSQLConnector( |
| config={ |
| "host": host, |
| "port": port, |
| "database": database, |
| "username": username, |
| "password": password, |
| "table": table, |
| "where": where_clause, |
| } |
| ) |
| df = SmartDataframe(postgres_connector) |
| st.session_state.df = df |
| st.success("Connected to PostgreSQL") |
| answer = df.chat(query) |
| st.title(answer) |
| except Exception as e: |
| st.error(f"An unexpected error occurred: {e}") |
|
|
| elif db_type == "MySQL": |
| host = st.sidebar.text_input("Host", "localhost") |
| port = st.sidebar.number_input("Port", 3306) |
| database = st.sidebar.text_input("Database", "mydb") |
| username = st.sidebar.text_input("Username", "root") |
| password = st.sidebar.text_input("Password", "root", type="password") |
| table = st.sidebar.text_input("Table", "loans") |
| where = st.sidebar.text_input("Filter (optional)", "[['loan_status', '=', 'PAIDOFF']]") |
| query= st.sidebar.text_input("Enter your Query") |
| if st.sidebar.button("Connect MySQL"): |
| try: |
| mysql_connector = MySQLConnector( |
| config={ |
| "host": host, |
| "port": port, |
| "database": database, |
| "username": username, |
| "password": password, |
| "table": table, |
| "where": eval(where), |
| } |
| ) |
| df = SmartDataframe(mysql_connector) |
| st.success("Connected to MySQL") |
| answer = df.chat(query) |
| st.title(answer) |
| except Exception as e : |
| st.error(f"An error occurred: {e}") |
|
|
| elif db_type == "SQLite": |
| database = st.sidebar.text_input("Database Path", "path_to_db") |
| table = st.sidebar.text_input("Table", "actor") |
| where = st.sidebar.text_input("Filter (optional)", "[['first_name', '=', 'PENELOPE']]") |
| query= st.sidebar.text_input("Enter your Query") |
| if st.sidebar.button("Connect SQLite"): |
| try: |
| sqlite_connector = SqliteConnector( |
| config={ |
| "database": database, |
| "table": table, |
| "where": eval(where), |
| } |
| ) |
| df = SmartDataframe(sqlite_connector) |
| st.success("Connected to SQLite") |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
|
|
| elif db_type == "SQL": |
| dialect = st.sidebar.text_input("Dialect", "sqlite") |
| driver = st.sidebar.text_input("Driver", "pysqlite") |
| host = st.sidebar.text_input("Host", "localhost") |
| port = st.sidebar.number_input("Port", 3306) |
| database = st.sidebar.text_input("Database", "mydb") |
| username = st.sidebar.text_input("Username", "root") |
| password = st.sidebar.text_input("Password", "root", type="password") |
| table = st.sidebar.text_input("Table", "loans") |
| where = st.sidebar.text_input("Filter (optional)", "[['loan_status', '=', 'PAIDOFF']]") |
| query= st.sidebar.text_input("Enter your Query") |
| if st.sidebar.button("Connect SQL"): |
| try: |
| sql_connector = SQLConnector( |
| config={ |
| "dialect": dialect, |
| "driver": driver, |
| "host": host, |
| "port": port, |
| "database": database, |
| "username": username, |
| "password": password, |
| "table": table, |
| "where": eval(where), |
| } |
| ) |
| df = SmartDataframe(sql_connector) |
| st.success("Connected to SQL") |
| answer = df.chat(query) |
| st.title(answer) |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
|
|
| elif db_type == "Snowflake": |
| account = st.sidebar.text_input("Account", "your_account") |
| database = st.sidebar.text_input("Database", "SNOWFLAKE_SAMPLE_DATA") |
| username = st.sidebar.text_input("Username", "test") |
| password = st.sidebar.text_input("Password", "*****", type="password") |
| table = st.sidebar.text_input("Table", "lineitem") |
| warehouse = st.sidebar.text_input("Warehouse", "COMPUTE_WH") |
| dbSchema = st.sidebar.text_input("Schema", "tpch_sf1") |
| where = st.sidebar.text_input("Filter (optional)", "[['l_quantity', '>', '49']]") |
| query= st.sidebar.text_input("Enter your Query") |
| if st.sidebar.button("Connect Snowflake"): |
| try: |
| snowflake_connector = SnowFlakeConnector( |
| config={ |
| "account": account, |
| "database": database, |
| "username": username, |
| "password": password, |
| "table": table, |
| "warehouse": warehouse, |
| "dbSchema": dbSchema, |
| "where": eval(where), |
| } |
| ) |
| df = SmartDataframe(snowflake_connector) |
| st.success("Connected to Snowflake") |
| answer = df.chat(query) |
| st.title(answer) |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
|
|
| elif db_type == "Databricks": |
| host = st.sidebar.text_input("Host", "adb-*****.azuredatabricks.net") |
| database = st.sidebar.text_input("Database", "default") |
| token = st.sidebar.text_input("Token", "dapidfd412321", type="password") |
| port = st.sidebar.number_input("Port", 443) |
| table = st.sidebar.text_input("Table", "loan_payments_data") |
| httpPath = st.sidebar.text_input("HTTP Path", "/sql/1.0/warehouses/213421312") |
| where = st.sidebar.text_input("Filter (optional)", "[['loan_status', '=', 'PAIDOFF']]") |
| query= st.sidebar.text_input("Enter your Query") |
| if st.sidebar.button("Connect Databricks"): |
| try: |
| databricks_connector = DatabricksConnector( |
| config={ |
| "host": host, |
| "database": database, |
| "token": token, |
| "port": port, |
| "table": table, |
| "httpPath": httpPath, |
| "where": eval(where), |
| } |
| ) |
| df = SmartDataframe(databricks_connector) |
| st.success("Connected to Databricks") |
| answer = df.chat(query) |
| st.title(answer) |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
|
|
| elif db_type == "MongoDB": |
| host = st.sidebar.text_input("Host", "localhost:27017") |
| username = st.sidebar.text_input("Username", value="", key="mongo_user") |
| password = st.sidebar.text_input("Password", value="", type="password", key="mongo_pass") |
| database = st.sidebar.text_input("Database Name") |
| collect = st.sidebar.text_input("Collection Name") |
| query = st.sidebar.text_input("Enter your Query") |
|
|
| if st.sidebar.button("Connect MongoDB"): |
| try: |
| |
| mongo_uri = f"mongodb://{username}:{password}@{host}" |
| client = MongoClient(mongo_uri) |
|
|
| |
| db = client[database] |
| collection = db[collect] |
|
|
| |
| client.server_info() |
| st.success("Connected to MongoDB") |
|
|
| ans=(f"Collection 'my_new_collection' created with document: {collection.find_one()}") |
| document_structure = collection.find_one() |
| if document_structure: |
| structure = get_document_structure(document_structure) |
| generated_query = mon_query(query, structure,database,collection) |
| ans = eval(generated_query) |
| document = ans |
| st.write("Found document:", document) |
| except ConnectionFailure as e: |
| st.error(f"An error occurred: {e}") |
|
|
|
|
|
|
| |
|
|
| if st.session_state.df is not None: |
| with st.container(): |
| query = st.text_area("🗣️ Chat with Dataframe", key="chat_query") |
| if query: |
| try: |
| llm = model |
| query_engine = SmartDataframe( |
| st.session_state.df, |
| config={ |
| "llm": llm, |
| "response_parser": StreamlitResponse, |
| "code_to_run": True, |
| "use_vector_store": False |
| }, |
| ) |
| if st.button("GENERATE", key="generate_button"): |
| try: |
| answer = query_engine.chat(f"Provide formatted answer as you are a business analyst: {query} and provide the most suitable plot. Also, explain plots and provide a complete plot, ignoring null and irrelevant values.") |
| if answer is not None: |
| st.title(f"👾 Here is your Analysis: {answer}") |
| except pandasai.exceptions.PandasAIApiCallError as e: |
| st.error(f"PandasAI API error: {str(e)}. Please try again later.") |
| except Exception as e: |
| st.error(f"An unexpected error occurred: {str(e)}") |
| except Exception as e: |
| st.error(f"An error occurred while processing the query: {e}") |