"""Load sales/KPI CSV exports into MySQL and run reporting queries over them."""

import decimal
import json
import os
from collections import defaultdict
from typing import Dict, List

import mysql.connector
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from langchain_core.tools import tool
from mysql.connector import Error

load_dotenv()

# DATE_FORMAT pattern used for monthly buckets. It is always sent as a bound
# parameter so that no '%' ever appears in the SQL text next to the driver's
# own '%s' markers (drivers disagree on whether '%%' is unescaped).
_MONTH_FORMAT = "%Y-%m"

# Number of CSV rows sent per executemany() call, to keep each packet small.
_INSERT_BATCH_SIZE = 1000


def _quote_identifier(name: str) -> str:
    """Validate a table/column name and return it wrapped in backticks.

    Identifiers cannot be bound as query parameters, so they are interpolated
    into the SQL text; this check keeps arbitrary SQL out of that text.
    Dotted names (``schema.table``) are accepted and quoted part by part.

    Raises:
        ValueError: If ``name`` is not a string or contains anything other
            than letters, digits, ``_`` or ``$`` between the dots.
    """
    parts = name.split(".") if isinstance(name, str) else []
    if not parts or not all(
        part and all(ch.isalnum() or ch in "_$" for ch in part) for part in parts
    ):
        raise ValueError(f"Invalid SQL identifier: {name!r}")
    return ".".join(f"`{part}`" for part in parts)


def _to_float(value):
    """Convert a numeric SQL result to float, passing SQL NULL through as None."""
    return None if value is None else float(value)


class Database:
    """Thin wrapper around one MySQL connection/cursor plus reporting queries.

    The query methods use ``self.cursor`` directly, so ``connect()`` must have
    succeeded before any of them is called.
    """

    def __init__(self):
        self.connection = None
        self.cursor = None
        # Directory the CSV files are read from (relative to the working dir).
        self.data_dir = "sales_data"

    def connect(self):
        """Establish connection to MySQL database.

        Connection settings are read from the MYSQL_HOST, MYSQL_USER,
        MYSQL_PASSWORD, MYSQL_DB and MYSQL_PORT environment variables.

        Returns:
            bool: True when connected and a cursor was created, False otherwise.
        """
        try:
            self.connection = mysql.connector.connect(
                host=os.getenv('MYSQL_HOST'),
                user=os.getenv('MYSQL_USER'),
                password=os.getenv('MYSQL_PASSWORD'),
                database=os.getenv('MYSQL_DB'),
                port=int(os.getenv('MYSQL_PORT', 3306))
            )
            if self.connection.is_connected():
                self.cursor = self.connection.cursor()
                return True
        except Error as e:
            print(f"Error connecting to MySQL: {e}")
        # Reached on a connector error or when the connection is not usable.
        return False

    def close(self):
        """Close database connection"""
        if self.connection and self.connection.is_connected():
            if self.cursor:
                self.cursor.close()
            self.connection.close()
            print("MySQL connection closed")

    def _load_csv_table(self, table: str, create_sql: str, csv_name: str,
                        insert_sql: str, csv_columns: List[str]) -> bool:
        """Create ``table`` if needed and fill it from ``csv_name`` when empty.

        Args:
            table (str): Name of the target table.
            create_sql (str): ``CREATE TABLE IF NOT EXISTS`` statement.
            csv_name (str): CSV file name inside ``self.data_dir``.
            insert_sql (str): Parameterized INSERT statement.
            csv_columns (list[str]): CSV columns, in INSERT placeholder order.

        Returns:
            bool: True on success (including "already populated"), else False.
        """
        try:
            self.cursor.execute(create_sql)

            # The tables have no unique key besides the auto-increment id, so
            # INSERT IGNORE cannot detect duplicates; importing into a
            # populated table would double every row on each run.
            self.cursor.execute(f"SELECT COUNT(*) FROM {_quote_identifier(table)}")
            if self.cursor.fetchone()[0]:
                print(f"{table} already contains data; skipping CSV import")
                return True

            frame = pd.read_csv(os.path.join(self.data_dir, csv_name))
            selected = frame[list(csv_columns)].astype(object)
            # Missing CSV cells arrive as NaN; send them to MySQL as NULL.
            rows = [
                tuple(None if pd.isna(cell) else cell for cell in record)
                for record in selected.itertuples(index=False, name=None)
            ]
            for start in range(0, len(rows), _INSERT_BATCH_SIZE):
                self.cursor.executemany(
                    insert_sql, rows[start:start + _INSERT_BATCH_SIZE]
                )

            print(f"Successfully created {table} table")
            return True
        except (Error, OSError, KeyError, ValueError) as e:
            # OSError: CSV missing/unreadable; KeyError: expected column absent;
            # ValueError: pandas could not parse the file.
            print(f"Error creating {table} table: {e}")
            return False

    def create_salein_class(self):
        """Create and populate salein_class table"""
        return self._load_csv_table(
            "salein_class",
            """
                CREATE TABLE IF NOT EXISTS salein_class (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    date DATE,
                    employee_name VARCHAR(255),
                    department VARCHAR(255),
                    item_code VARCHAR(50),
                    product_name VARCHAR(255),
                    quantity INT
                )
            """,
            "salein_class.csv",
            """
                INSERT IGNORE INTO salein_class
                    (date, employee_name, department, item_code, product_name, quantity)
                VALUES (%s, %s, %s, %s, %s, %s)
            """,
            ['Date', 'Employee_Name', 'Department', 'item_code',
             'product_name', 'Số lượng'],
        )

    def create_salein_thuc_xuat(self):
        """Create and populate salein_thuc_xuat table"""
        return self._load_csv_table(
            "salein_thuc_xuat",
            """
                CREATE TABLE IF NOT EXISTS salein_thuc_xuat (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    date DATE,
                    employee_name VARCHAR(255),
                    unit_code VARCHAR(255),
                    product_name VARCHAR(255),
                    quantity INT,
                    province VARCHAR(255)
                )
            """,
            "salein_thuc_xuat.csv",
            """
                INSERT IGNORE INTO salein_thuc_xuat
                    (date, employee_name, unit_code, product_name, quantity, province)
                VALUES (%s, %s, %s, %s, %s, %s)
            """,
            ['Date', 'Employee_Name', 'unit_code', 'product_name',
             'Số lượng xuất', 'province'],
        )

    def create_kpi_thuc_xuat(self):
        """Create and populate kpi_thuc_xuat table"""
        return self._load_csv_table(
            "kpi_thuc_xuat",
            """
                CREATE TABLE IF NOT EXISTS kpi_thuc_xuat (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    date DATE,
                    employee_name VARCHAR(255),
                    kpi_code VARCHAR(50),
                    region VARCHAR(255),
                    product_name VARCHAR(255),
                    kpi_score INT
                )
            """,
            "kpi_thuc_xuat.csv",
            """
                INSERT IGNORE INTO kpi_thuc_xuat
                    (date, employee_name, kpi_code, region, product_name, kpi_score)
                VALUES (%s, %s, %s, %s, %s, %s)
            """,
            ['Date', 'Employee_Name', 'KPI_code', 'region',
             'product_name', 'Số KPI'],
        )

    def create_tables(self):
        """Create all necessary tables and insert data into them.

        Opens its own connection and always closes it. The inserts are
        committed only when every table loaded successfully; otherwise the
        pending inserts are rolled back.
        """
        if not self.connect():
            return

        steps = (
            (self.create_salein_class, "Salein Class table created and data inserted."),
            (self.create_salein_thuc_xuat, "Salein Thuc Xuat table created and data inserted."),
            (self.create_kpi_thuc_xuat, "KPI Thuc Xuat table created and data inserted."),
        )
        try:
            all_ok = True
            # Every loader is attempted even after a failure so that each
            # problem gets reported in one run.
            for loader, message in steps:
                if loader():
                    print(message)
                else:
                    all_ok = False

            if all_ok:
                self.connection.commit()
                print("All tables created and data inserted successfully.")
            else:
                self.connection.rollback()
                print("Some tables could not be created or populated; inserts were rolled back.")
        except Error as e:
            print(f"Error in database creation or data insertion: {e}")
        finally:
            self.close()

    def extract_tables_schemas(self):
        """Extract schemas of all tables in the database.

        Opens its own connection and always closes it.

        Returns:
            dict: ``{table_name: {'columns': [...], 'foreign_keys': [...]}}``;
            an empty dict when the connection or a query fails.
        """
        if not self.connect():
            return {}

        db_name = os.getenv('MYSQL_DB')
        try:
            # Get list of tables
            self.cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = %s
            """, (db_name,))
            tables = self.cursor.fetchall()

            schemas = {}
            for (table_name,) in tables:
                # Get columns info for each table
                self.cursor.execute("""
                    SELECT column_name, column_type, is_nullable,
                           column_key, extra, column_comment
                    FROM information_schema.columns
                    WHERE table_schema = %s AND table_name = %s
                    ORDER BY ordinal_position
                """, (db_name, table_name))
                columns = self.cursor.fetchall()

                # Get foreign keys info
                self.cursor.execute("""
                    SELECT column_name, referenced_table_name, referenced_column_name
                    FROM information_schema.key_column_usage
                    WHERE table_schema = %s AND table_name = %s
                      AND referenced_table_name IS NOT NULL
                """, (db_name, table_name))
                foreign_keys = self.cursor.fetchall()

                # Format schema information
                schemas[table_name] = {
                    'columns': [
                        {
                            'name': col[0],
                            'type': col[1],
                            'nullable': col[2],
                            'key': col[3],
                            'extra': col[4],
                            'comment': col[5]
                        }
                        for col in columns
                    ],
                    'foreign_keys': [
                        {
                            'column': fk[0],
                            'references_table': fk[1],
                            'references_column': fk[2]
                        }
                        for fk in foreign_keys
                    ]
                }
            return schemas
        except Error as e:
            print(f"Error extracting table schemas: {e}")
            return {}
        finally:
            self.close()

    def get_distinct_values(self, table: str, column: str) -> list:
        """
        Get distinct values of a column in a table.

        Args:
            table (str): Name of the table.
            column (str): Column name to retrieve distinct values from.

        Returns:
            list: A list of unique values found in the specified column.

        Raises:
            ValueError: If ``table`` or ``column`` is not a plain identifier.
        """
        self.cursor.execute(
            f"SELECT DISTINCT {_quote_identifier(column)} FROM {_quote_identifier(table)}"
        )
        return [row[0] for row in self.cursor.fetchall()]

    def get_total(self, table: str, value_col: str) -> float:
        """
        Get the total sum of a numeric column in a table.

        Args:
            table (str): Name of the table.
            value_col (str): Name of the column to sum.

        Returns:
            The SUM value exactly as the driver returns it (None when the
            table has no non-NULL values in that column).

        Raises:
            ValueError: If ``table`` or ``value_col`` is not a plain identifier.
        """
        self.cursor.execute(
            f"SELECT SUM({_quote_identifier(value_col)}) FROM {_quote_identifier(table)}"
        )
        return self.cursor.fetchone()[0]

    def get_count(self, table: str) -> int:
        """
        Count total number of records in a table.

        Args:
            table (str): Name of the table.

        Returns:
            int: Total number of rows in the table.

        Raises:
            ValueError: If ``table`` is not a plain identifier.
        """
        self.cursor.execute(f"SELECT COUNT(*) FROM {_quote_identifier(table)}")
        return self.cursor.fetchone()[0]

    def get_total_by_group(self, table: str, group_col: str, value_col: str) -> list:
        """
        Get total value grouped by a specified column.

        Args:
            table (str): Name of the table.
            group_col (str): Column to group by (e.g. 'region', 'department').
            value_col (str): Column to aggregate using SUM.

        Returns:
            list[tuple]: List of (group_value, total) pairs sorted by total descending.
            e.g: [("North", 5000), ("South", 3000)]

        Raises:
            ValueError: If any name argument is not a plain identifier.
        """
        group_sql = _quote_identifier(group_col)
        self.cursor.execute(f"""
            SELECT {group_sql}, SUM({_quote_identifier(value_col)}) AS total
            FROM {_quote_identifier(table)}
            GROUP BY {group_sql}
            ORDER BY total DESC
        """)
        return self.cursor.fetchall()

    def get_avg_by_group(self, table: str, group_col: str, value_col: str) -> list:
        """
        Get average value grouped by a specified column.

        Args:
            table (str): Name of the table.
            group_col (str): Column to group by (e.g. 'product_name').
            value_col (str): Column to aggregate using AVG.

        Returns:
            list[tuple]: List of (group_value, average) pairs sorted by average descending.
            e.g: [("Product A", 105.4), ("Product B", 89.2)]

        Raises:
            ValueError: If any name argument is not a plain identifier.
        """
        group_sql = _quote_identifier(group_col)
        self.cursor.execute(f"""
            SELECT {group_sql}, AVG({_quote_identifier(value_col)}) AS avg_value
            FROM {_quote_identifier(table)}
            GROUP BY {group_sql}
            ORDER BY avg_value DESC
        """)
        return self.cursor.fetchall()

    def get_total_by_month(self, table: str, date_col: str, value_col: str) -> list:
        """
        Get total value grouped by month (from a date column).

        Args:
            table (str): Name of the table.
            date_col (str): Column containing date values.
            value_col (str): Column to aggregate using SUM.

        Returns:
            list[tuple]: List of (month, total) pairs in chronological order.
            e.g: [("2024-01", 1200), ("2024-02", 1800)]

        Raises:
            ValueError: If any name argument is not a plain identifier.
        """
        self.cursor.execute(f"""
            SELECT DATE_FORMAT({_quote_identifier(date_col)}, %s) AS month,
                   SUM({_quote_identifier(value_col)}) AS total
            FROM {_quote_identifier(table)}
            GROUP BY month
            ORDER BY month
        """, (_MONTH_FORMAT,))
        return self.cursor.fetchall()

    def get_entity_trend(self, table: str, entity_col: str, entity_value: str,
                         date_col: str, value_col: str) -> list:
        """
        Get monthly trend of total value for a specific entity (e.g. employee, product).

        Args:
            table (str): Name of the table.
            entity_col (str): Column to filter by (e.g. 'employee_name').
            entity_value (str): Value of the entity to track.
            date_col (str): Date column to group by month.
            value_col (str): Column to sum.

        Returns:
            list[tuple]: List of (month, total) for the specified entity.
            e.g: [("2024-01", 300), ("2024-02", 500)]

        Raises:
            ValueError: If any name argument is not a plain identifier.
        """
        # Parameter order follows the textual order of the %s markers.
        self.cursor.execute(f"""
            SELECT DATE_FORMAT({_quote_identifier(date_col)}, %s) AS month,
                   SUM({_quote_identifier(value_col)}) AS total
            FROM {_quote_identifier(table)}
            WHERE {_quote_identifier(entity_col)} = %s
            GROUP BY month
            ORDER BY month
        """, (_MONTH_FORMAT, entity_value))
        return self.cursor.fetchall()

    def compare_plan_vs_actual(self, plan_table: str, actual_table: str,
                               match_col: str, value_col: str) -> list:
        """
        Compare planned vs actual values for a common attribute (e.g. product_name).

        Each table is summed per ``match_col`` value first and the two
        summaries are then joined. Joining the raw rows would pair every plan
        row with every actual row of the same key and inflate both sums.

        Args:
            plan_table (str): Table containing planned values.
            actual_table (str): Table containing actual values.
            match_col (str): Column used to join the two tables (e.g. 'product_name').
            value_col (str): Column to compare (e.g. 'quantity').

        Returns:
            list[tuple]: List of (item, planned, actual, difference) sorted by
            difference descending. Only keys present in both tables appear.
            e.g: [("Product A", 1000, 800, -200)]

        Raises:
            ValueError: If any name argument is not a plain identifier.
        """
        match_sql = _quote_identifier(match_col)
        value_sql = _quote_identifier(value_col)
        self.cursor.execute(f"""
            SELECT p.match_key,
                   p.planned,
                   a.actual,
                   a.actual - p.planned AS difference
            FROM (
                SELECT {match_sql} AS match_key, SUM({value_sql}) AS planned
                FROM {_quote_identifier(plan_table)}
                GROUP BY {match_sql}
            ) p
            JOIN (
                SELECT {match_sql} AS match_key, SUM({value_sql}) AS actual
                FROM {_quote_identifier(actual_table)}
                GROUP BY {match_sql}
            ) a ON p.match_key = a.match_key
            ORDER BY difference DESC
        """)
        return self.cursor.fetchall()

    def get_monthly_growth(self, table: str, date_col: str, value_col: str) -> list:
        """
        Calculate monthly growth rate based on summed values.

        Args:
            table (str): Name of the table.
            date_col (str): Date column to group by month.
            value_col (str): Column to aggregate using SUM.

        Returns:
            list[tuple]: List of (month, total, previous_total, growth_rate) for
            each month in chronological order; the first month has no previous
            total, so its last two fields are NULL/None.
            e.g: [("2024-02", 1200, 1000, 20.0)]

        Raises:
            ValueError: If any name argument is not a plain identifier.
        """
        self.cursor.execute(f"""
            SELECT
                month,
                total,
                LAG(total) OVER (ORDER BY month) AS prev_total,
                ROUND(
                    (total - LAG(total) OVER (ORDER BY month))
                    / LAG(total) OVER (ORDER BY month) * 100, 2
                ) AS growth_rate
            FROM (
                SELECT DATE_FORMAT({_quote_identifier(date_col)}, %s) AS month,
                       SUM({_quote_identifier(value_col)}) AS total
                FROM {_quote_identifier(table)}
                GROUP BY month
            ) AS subquery
            ORDER BY month
        """, (_MONTH_FORMAT,))
        return self.cursor.fetchall()

    def get_best_employees_by_score(self, kpi_table: str, salein_table: str,
                                    top_n: int = 5) -> List[Dict]:
        """
        Calculate a composite score for employees based on average KPI and total quantity.

        The score is ``0.6 * avg_kpi + 0.4 * total_quantity``. Each table is
        aggregated per employee before joining so that an employee's quantity
        is not multiplied by the number of their KPI rows.

        Args:
            kpi_table (str): Name of the KPI table (columns: employee_name, kpi_score).
            salein_table (str): Name of the actual sales table (columns: employee_name, quantity).
            top_n (int): Number of top employees to return.

        Returns:
            list[dict]: List of employees with their average KPI, total quantity,
            and composite score (a value is None where the SQL result is NULL).
            Example:
            [{"employee_name": "A", "avg_kpi": 85.5, "quantity": 1200, "score": 91.3}]

        Raises:
            ValueError: If a table name is not a plain identifier.
        """
        self.cursor.execute(f"""
            SELECT
                k.employee_name,
                k.avg_kpi,
                s.total_quantity,
                ROUND(0.6 * k.avg_kpi + 0.4 * s.total_quantity, 2) AS composite_score
            FROM (
                SELECT employee_name, AVG(kpi_score) AS avg_kpi
                FROM {_quote_identifier(kpi_table)}
                GROUP BY employee_name
            ) k
            JOIN (
                SELECT employee_name, SUM(quantity) AS total_quantity
                FROM {_quote_identifier(salein_table)}
                GROUP BY employee_name
            ) s ON k.employee_name = s.employee_name
            ORDER BY composite_score DESC
            LIMIT %s
        """, (top_n,))

        return [
            {
                "employee_name": row[0],
                "avg_kpi": _to_float(row[1]),
                "quantity": _to_float(row[2]),
                "score": _to_float(row[3])
            }
            for row in self.cursor.fetchall()
        ]

    def get_best_products_by_region(self, table: str, top_n: int = 5) -> List[Dict]:
        """
        Find the most prominent products across regions based on quantity and regional coverage.

        The score is ``0.5 * distinct_provinces + 0.5 * total_quantity``.

        Args:
            table (str): Table name (columns: product_name, province, quantity).
            top_n (int): Number of top products to return.

        Returns:
            list[dict]: List of products with number of provinces, total quantity,
            and score (quantity/score are None where the SQL result is NULL).
            Example:
            [{"product_name": "X", "provinces": 8, "quantity": 3200, "score": 1604.0}]

        Raises:
            ValueError: If ``table`` is not a plain identifier.
        """
        self.cursor.execute(f"""
            SELECT
                product_name,
                COUNT(DISTINCT province) AS province_coverage,
                SUM(quantity) AS total_quantity,
                ROUND(0.5 * COUNT(DISTINCT province) + 0.5 * SUM(quantity), 2) AS composite_score
            FROM {_quote_identifier(table)}
            GROUP BY product_name
            ORDER BY composite_score DESC
            LIMIT %s
        """, (top_n,))

        return [
            {
                "product_name": row[0],
                "provinces": int(row[1]),
                "quantity": _to_float(row[2]),
                "score": _to_float(row[3])
            }
            for row in self.cursor.fetchall()
        ]

    def get_deliver_by_region_per_month(self, regions: List[str], department: str,
                                        year: int) -> List[dict]:
        """
        Get total deliver value by region per month for a specific department and year.

        NOTE(review): this query reads Organization, Department and Deliver from
        kpi_thuc_xuat, but the table definition in create_kpi_thuc_xuat has none
        of these columns; confirm against the live schema.

        Args:
            regions (list[str]): Organization values to include.
            department (str): Department value to filter by.
            year (int): Calendar year to filter by.

        Returns:
            list[dict]: Rows of {"organization", "month", "total_deliver"};
            an empty list when ``regions`` is empty.
        """
        # "IN ()" is a SQL syntax error, and no region can match anyway.
        if not regions:
            return []

        placeholders = ','.join(['%s'] * len(regions))
        query = f"""
            SELECT Organization, DATE_FORMAT(Date, %s) AS Month, SUM(Deliver) AS Total_Deliver
            FROM kpi_thuc_xuat
            WHERE YEAR(Date) = %s AND Department = %s AND Organization IN ({placeholders})
            GROUP BY Organization, Month
            ORDER BY Organization, Month
        """
        params = [_MONTH_FORMAT, year, department, *regions]
        self.cursor.execute(query, params)
        return [
            {"organization": r[0], "month": r[1], "total_deliver": _to_float(r[2])}
            for r in self.cursor.fetchall()
        ]

    def get_plan_vs_actual_same_day(self, date: str, department: str,
                                    regions: List[str]) -> List[dict]:
        """
        Compare planned vs actual values for a given date, department, and regions.

        Actual values are SUM(Deliver) from kpi_thuc_xuat and planned values are
        SUM(SaleIn) from salein_class, both per Organization.

        NOTE(review): Organization, Deliver and SaleIn are not among the columns
        created for these tables in this file; confirm against the live schema.

        Args:
            date (str): Date to match exactly.
            department (str): Department value to filter by.
            regions (list[str]): Organization values to include.

        Returns:
            list[dict]: Rows of {"organization", "planned", "actual",
            "difference"} sorted by organization, where a side with no data
            counts as 0; an empty list when ``regions`` is empty.
        """
        if not regions:
            return []

        placeholders = ','.join(['%s'] * len(regions))
        params = [date, department, *regions]

        self.cursor.execute(f"""
            SELECT Organization, SUM(Deliver)
            FROM kpi_thuc_xuat
            WHERE Date = %s AND Department = %s AND Organization IN ({placeholders})
            GROUP BY Organization
        """, params)
        # A NULL sum (all values NULL) is treated as 0 so the subtraction works.
        actual = {r[0]: _to_float(r[1]) or 0.0 for r in self.cursor.fetchall()}

        self.cursor.execute(f"""
            SELECT Organization, SUM(SaleIn)
            FROM salein_class
            WHERE Date = %s AND Department = %s AND Organization IN ({placeholders})
            GROUP BY Organization
        """, params)
        plan = {r[0]: _to_float(r[1]) or 0.0 for r in self.cursor.fetchall()}

        return [
            {
                "organization": org,
                "planned": plan.get(org, 0),
                "actual": actual.get(org, 0),
                "difference": actual.get(org, 0) - plan.get(org, 0)
            }
            for org in sorted(set(actual) | set(plan))
        ]

    def get_completion_rate_by_department_per_month(self, year: int) -> List[dict]:
        """
        Get monthly completion rate (actual / planned) per department.

        salein_class supplies the denominator and salein_thuc_xuat the numerator.
        Both are summed per (Department, day) before being joined so that rows
        are not multiplied by the join; only days present in both tables count.

        NOTE(review): SaleIn (both tables) and Department (salein_thuc_xuat) are
        not among the columns created in this file; confirm the live schema.

        Args:
            year (int): Calendar year to report on.

        Returns:
            list[dict]: Rows of {"department", "month", "completion_rate"};
            the rate is None when the planned total is zero or NULL.
        """
        self.cursor.execute("""
            SELECT s.Department,
                   DATE_FORMAT(s.sale_day, %s) AS Month,
                   ROUND(SUM(t.total_salein) / SUM(s.total_salein) * 100, 2) AS Completion_Rate
            FROM (
                SELECT Department, DATE(Date) AS sale_day, SUM(SaleIn) AS total_salein
                FROM salein_class
                WHERE YEAR(Date) = %s
                GROUP BY Department, DATE(Date)
            ) s
            JOIN (
                SELECT Department, DATE(Date) AS sale_day, SUM(SaleIn) AS total_salein
                FROM salein_thuc_xuat
                WHERE YEAR(Date) = %s
                GROUP BY Department, DATE(Date)
            ) t ON s.Department = t.Department AND s.sale_day = t.sale_day
            GROUP BY s.Department, Month
            ORDER BY s.Department, Month
        """, (_MONTH_FORMAT, year, year))
        return [
            {"department": r[0], "month": r[1], "completion_rate": _to_float(r[2])}
            for r in self.cursor.fetchall()
        ]

    def get_avg_kpi_by_month(self, year: int) -> List[dict]:
        """
        Get average KPI score per month for a given year.

        Args:
            year (int): Calendar year to report on.

        Returns:
            list[dict]: Rows of {"month", "avg_kpi"} in chronological order.
        """
        self.cursor.execute("""
            SELECT DATE_FORMAT(Date, %s) AS Month, ROUND(AVG(kpi_score), 2)
            FROM kpi_thuc_xuat
            WHERE YEAR(Date) = %s
            GROUP BY Month
            ORDER BY Month
        """, (_MONTH_FORMAT, year))
        return [
            {"month": r[0], "avg_kpi": _to_float(r[1])}
            for r in self.cursor.fetchall()
        ]

    def get_salein_comparison_by_region_year(self, years: List[int]) -> List[dict]:
        """
        Compare sale-in performance by region across multiple years.

        NOTE(review): Organization and SaleIn are not among the columns created
        for salein_thuc_xuat in this file; confirm against the live schema.

        Args:
            years (list[int]): Calendar years to include.

        Returns:
            list[dict]: Rows of {"organization", "year", "total_salein"} ordered
            by organization then year; an empty list when ``years`` is empty.
        """
        if not years:
            return []

        placeholders = ','.join(['%s'] * len(years))
        self.cursor.execute(f"""
            SELECT Organization, YEAR(Date), SUM(SaleIn)
            FROM salein_thuc_xuat
            WHERE YEAR(Date) IN ({placeholders})
            GROUP BY Organization, YEAR(Date)
            ORDER BY Organization, YEAR(Date)
        """, list(years))
        return [
            {"organization": r[0], "year": r[1], "total_salein": _to_float(r[2])}
            for r in self.cursor.fetchall()
        ]


def main():
    """Load the CSV data, then print one sample result from each report query."""
    db = Database()
    db.create_tables()

    try:
        # connect() already printed the reason; without a cursor every query
        # below would fail with AttributeError.
        if not db.connect():
            return

        print("\n📌 DISTINCT VALUES IN COLUMN:")
        regions = db.get_distinct_values("kpi_thuc_xuat", "region")
        print("Regions:", regions)

        print("\n📌 TOTAL KPI:")
        total_kpi = db.get_total("kpi_thuc_xuat", "kpi_score")
        print("Total KPI Score:", total_kpi)

        print("\n📌 RECORD COUNT:")
        row_count = db.get_count("salein_thuc_xuat")
        print("Rows in salein_thuc_xuat:", row_count)

        print("\n📌 TOTAL BY GROUP:")
        quantity_by_province = db.get_total_by_group("salein_thuc_xuat", "province", "quantity")
        print("Quantity by province:", quantity_by_province)

        print("\n📌 AVERAGE KPI BY REGION:")
        avg_kpi = db.get_avg_by_group("kpi_thuc_xuat", "region", "kpi_score")
        print("Avg KPI per region:", avg_kpi)

        print("\n📌 TOTAL BY MONTH:")
        total_monthly = db.get_total_by_month("salein_thuc_xuat", "date", "quantity")
        print("Monthly totals:", total_monthly)

        print("\n📌 EMPLOYEE TREND (test name in your dataset):")
        trend = db.get_entity_trend("salein_thuc_xuat", "employee_name", "Bảo Thế Nguyễn", "date", "quantity")
        print("Trend for 'Bảo Thế Nguyễn':", trend)

        print("\n📌 COMPARE PLAN VS ACTUAL:")
        comparison = db.compare_plan_vs_actual("salein_class", "salein_thuc_xuat", "product_name", "quantity")
        print("Plan vs Actual:", comparison)

        print("\n📌 MONTHLY GROWTH:")
        growth = db.get_monthly_growth("salein_thuc_xuat", "date", "quantity")
        print("Monthly Growth:", growth)

        print("\n📌 TOP EMPLOYEES BY KPI + QUANTITY:")
        top_employees = db.get_best_employees_by_score("kpi_thuc_xuat", "salein_thuc_xuat", top_n=5)
        print(json.dumps(top_employees, indent=4, ensure_ascii=False))

        print("\n📌 TOP PRODUCTS BY REGION + QUANTITY:")
        top_products = db.get_best_products_by_region("salein_thuc_xuat", top_n=5)
        print(json.dumps(top_products, indent=4, ensure_ascii=False))

        print("\n📌 DELIVERY BY REGION PER MONTH (BH1 - 2024):")
        delivery_stats = db.get_deliver_by_region_per_month(["TV01", "TV02", "TV03"], "BH1", 2024)
        print(json.dumps(delivery_stats, indent=4, ensure_ascii=False))

        print("\n📌 PLAN VS ACTUAL SAME DAY (1/2/2024):")
        plan_vs_actual = db.get_plan_vs_actual_same_day("2024-02-01", "BH1", ["TV01", "TV02", "TV03"])
        print(json.dumps(plan_vs_actual, indent=4, ensure_ascii=False))

        print("\n📌 COMPLETION RATE PER DEPARTMENT (2024):")
        completion_rate = db.get_completion_rate_by_department_per_month(2024)
        print(json.dumps(completion_rate, indent=4, ensure_ascii=False))

        print("\n📌 AVG KPI PER MONTH (2024):")
        avg_kpi_month = db.get_avg_kpi_by_month(2024)
        print(json.dumps(avg_kpi_month, indent=4, ensure_ascii=False))

        print("\n📌 COMPARE SALEIN BY REGION BY YEAR:")
        salein_years = db.get_salein_comparison_by_region_year([2023, 2024])
        print(json.dumps(salein_years, indent=4, ensure_ascii=False))

    except Error as e:
        print(f"❌ MySQL Error: {e}")
    finally:
        db.close()


if __name__ == "__main__":
    main()