Business_Chatbot / src /database_manager.py
Ancastal's picture
Upload folder using huggingface_hub
401b16c verified
import sqlite3
from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from models import Base, Supplier, Customer, Product, Purchase, Sale, EntityExtraction
class DatabaseManager:
def __init__(self, db_path: str = "chatbot.db"):
self.db_path = db_path
self.engine = create_engine(f"sqlite:///{db_path}")
Base.metadata.create_all(self.engine)
Session = sessionmaker(bind=self.engine)
self.session = Session()
self._initialize_data()
def _initialize_data(self):
"""Initialize database with sample data"""
# Add default suppliers if they don't exist
suppliers = ["TechMart", "Office Supplies Co", "Electronics Plus"]
for supplier_name in suppliers:
existing = self.session.query(Supplier).filter_by(name=supplier_name).first()
if not existing:
supplier = Supplier(name=supplier_name)
self.session.add(supplier)
# Add default products
products = [
("USB drives", "Electronics"),
("Office chairs", "Furniture"),
("Laptops", "Electronics"),
("Monitors", "Electronics"),
("Keyboards", "Electronics")
]
for product_name, category in products:
existing = self.session.query(Product).filter_by(name=product_name).first()
if not existing:
product = Product(name=product_name, category=category)
self.session.add(product)
self.session.commit()
def process_transaction(self, entities: EntityExtraction):
"""Process a transaction based on extracted entities"""
try:
if entities.transaction_type == "purchase":
return self._process_purchase(entities)
elif entities.transaction_type == "sale":
return self._process_sale(entities)
else:
return None, "Could not determine transaction type"
except Exception as e:
self.session.rollback()
return None, f"Error processing transaction: {str(e)}"
def _process_purchase(self, entities: EntityExtraction) -> str:
"""Process a purchase transaction"""
# Get or create supplier
supplier = None
if entities.supplier:
supplier = self.session.query(Supplier).filter_by(name=entities.supplier).first()
if not supplier:
supplier = Supplier(name=entities.supplier)
self.session.add(supplier)
self.session.flush()
# Get or create product
product = None
if entities.product:
product = self.session.query(Product).filter_by(name=entities.product).first()
if not product:
product = Product(name=entities.product)
self.session.add(product)
self.session.flush()
# Create purchase record
purchase = Purchase(
supplier_id=supplier.id if supplier else None,
product_id=product.id if product else None,
quantity=entities.quantity or 1,
unit_price=entities.unit_price or 0,
total_cost=entities.total_amount or (entities.quantity or 1) * (entities.unit_price or 0),
notes=entities.notes
)
self.session.add(purchase)
self.session.commit()
return purchase.id, f"Purchase recorded: {entities.quantity or 1}x {entities.product or 'Unknown'} from {entities.supplier or 'Unknown'} for €{entities.total_amount or 0}"
def _process_sale(self, entities: EntityExtraction) -> str:
"""Process a sale transaction"""
# Get or create customer
customer = None
if entities.customer:
customer = self.session.query(Customer).filter_by(name=entities.customer).first()
if not customer:
customer = Customer(name=entities.customer)
self.session.add(customer)
self.session.flush()
# Get or create product
product = None
if entities.product:
product = self.session.query(Product).filter_by(name=entities.product).first()
if not product:
product = Product(name=entities.product)
self.session.add(product)
self.session.flush()
# Create sale record
sale = Sale(
customer_id=customer.id if customer else None,
product_id=product.id if product else None,
quantity=entities.quantity or 1,
unit_price=entities.unit_price or 0,
total_amount=entities.total_amount or (entities.quantity or 1) * (entities.unit_price or 0),
notes=entities.notes
)
self.session.add(sale)
self.session.commit()
return sale.id, f"Sale recorded: {entities.quantity or 1}x {entities.product or 'Unknown'} to {entities.customer or 'Unknown'} for €{entities.total_amount or 0}"
def query_data(self, query: str) -> List[Dict[str, Any]]:
"""Execute a query and return results"""
try:
result = self.session.execute(text(query))
columns = result.keys()
rows = result.fetchall()
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
return [{"error": str(e)}]
def get_recent_transactions(self, limit: int = 10) -> Dict[str, List[Dict]]:
"""Get recent purchases and sales"""
purchases = self.session.query(Purchase).order_by(Purchase.purchase_date.desc()).limit(limit).all()
sales = self.session.query(Sale).order_by(Sale.sale_date.desc()).limit(limit).all()
purchase_data = []
for p in purchases:
purchase_data.append({
"id": p.id,
"supplier": p.supplier.name if p.supplier else "Unknown",
"product": p.product.name if p.product else "Unknown",
"quantity": p.quantity,
"unit_price": float(p.unit_price),
"total_cost": float(p.total_cost),
"date": p.purchase_date.isoformat(),
"type": "purchase"
})
sale_data = []
for s in sales:
sale_data.append({
"id": s.id,
"customer": s.customer.name if s.customer else "Unknown",
"product": s.product.name if s.product else "Unknown",
"quantity": s.quantity,
"unit_price": float(s.unit_price),
"total_amount": float(s.total_amount),
"date": s.sale_date.isoformat(),
"type": "sale"
})
return {"purchases": purchase_data, "sales": sale_data}
def search_transactions(self, search_term: str) -> List[Dict[str, Any]]:
"""Search transactions by supplier, customer, or product"""
results = []
# Search purchases
purchases = self.session.query(Purchase).join(Supplier, Purchase.supplier_id == Supplier.id, isouter=True)\
.join(Product, Purchase.product_id == Product.id, isouter=True)\
.filter(
(Supplier.name.contains(search_term)) |
(Product.name.contains(search_term)) |
(Purchase.notes.contains(search_term))
).all()
for p in purchases:
results.append({
"id": p.id,
"type": "purchase",
"supplier": p.supplier.name if p.supplier else "Unknown",
"product": p.product.name if p.product else "Unknown",
"quantity": p.quantity,
"unit_price": float(p.unit_price),
"total": float(p.total_cost),
"date": p.purchase_date.isoformat()
})
# Search sales
sales = self.session.query(Sale).join(Customer, Sale.customer_id == Customer.id, isouter=True)\
.join(Product, Sale.product_id == Product.id, isouter=True)\
.filter(
(Customer.name.contains(search_term)) |
(Product.name.contains(search_term)) |
(Sale.notes.contains(search_term))
).all()
for s in sales:
results.append({
"id": s.id,
"type": "sale",
"customer": s.customer.name if s.customer else "Unknown",
"product": s.product.name if s.product else "Unknown",
"quantity": s.quantity,
"unit_price": float(s.unit_price),
"total": float(s.total_amount),
"date": s.sale_date.isoformat()
})
return sorted(results, key=lambda x: x["date"], reverse=True)
def get_transaction_by_id(self, transaction_id: int, transaction_type: str) -> Optional[Dict[str, Any]]:
"""Retrieve a specific transaction by ID and type"""
try:
if transaction_type == "purchase":
transaction = self.session.query(Purchase).filter_by(id=transaction_id).first()
if transaction:
return {
"id": transaction.id,
"type": "purchase",
"supplier_id": transaction.supplier_id,
"product_id": transaction.product_id,
"quantity": transaction.quantity,
"unit_price": transaction.unit_price,
"total_cost": transaction.total_cost,
"purchase_date": transaction.purchase_date.isoformat() if transaction.purchase_date else None,
"notes": transaction.notes
}
elif transaction_type == "sale":
transaction = self.session.query(Sale).filter_by(id=transaction_id).first()
if transaction:
return {
"id": transaction.id,
"type": "sale",
"customer_id": transaction.customer_id,
"product_id": transaction.product_id,
"quantity": transaction.quantity,
"unit_price": transaction.unit_price,
"total_amount": transaction.total_amount,
"sale_date": transaction.sale_date.isoformat() if transaction.sale_date else None,
"notes": transaction.notes
}
return None
except Exception as e:
print(f"Error retrieving transaction by ID: {e}")
return None
def close(self):
"""Close database connection"""
self.session.close()