import sqlite3 from typing import Optional, List, Dict, Any from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker from models import Base, Supplier, Customer, Product, Purchase, Sale, EntityExtraction class DatabaseManager: def __init__(self, db_path: str = "chatbot.db"): self.db_path = db_path self.engine = create_engine(f"sqlite:///{db_path}") Base.metadata.create_all(self.engine) Session = sessionmaker(bind=self.engine) self.session = Session() self._initialize_data() def _initialize_data(self): """Initialize database with sample data""" # Add default suppliers if they don't exist suppliers = ["TechMart", "Office Supplies Co", "Electronics Plus"] for supplier_name in suppliers: existing = self.session.query(Supplier).filter_by(name=supplier_name).first() if not existing: supplier = Supplier(name=supplier_name) self.session.add(supplier) # Add default products products = [ ("USB drives", "Electronics"), ("Office chairs", "Furniture"), ("Laptops", "Electronics"), ("Monitors", "Electronics"), ("Keyboards", "Electronics") ] for product_name, category in products: existing = self.session.query(Product).filter_by(name=product_name).first() if not existing: product = Product(name=product_name, category=category) self.session.add(product) self.session.commit() def process_transaction(self, entities: EntityExtraction): """Process a transaction based on extracted entities""" try: if entities.transaction_type == "purchase": return self._process_purchase(entities) elif entities.transaction_type == "sale": return self._process_sale(entities) else: return None, "Could not determine transaction type" except Exception as e: self.session.rollback() return None, f"Error processing transaction: {str(e)}" def _process_purchase(self, entities: EntityExtraction) -> str: """Process a purchase transaction""" # Get or create supplier supplier = None if entities.supplier: supplier = self.session.query(Supplier).filter_by(name=entities.supplier).first() if not supplier: supplier = Supplier(name=entities.supplier) self.session.add(supplier) self.session.flush() # Get or create product product = None if entities.product: product = self.session.query(Product).filter_by(name=entities.product).first() if not product: product = Product(name=entities.product) self.session.add(product) self.session.flush() # Create purchase record purchase = Purchase( supplier_id=supplier.id if supplier else None, product_id=product.id if product else None, quantity=entities.quantity or 1, unit_price=entities.unit_price or 0, total_cost=entities.total_amount or (entities.quantity or 1) * (entities.unit_price or 0), notes=entities.notes ) self.session.add(purchase) self.session.commit() return purchase.id, f"Purchase recorded: {entities.quantity or 1}x {entities.product or 'Unknown'} from {entities.supplier or 'Unknown'} for €{entities.total_amount or 0}" def _process_sale(self, entities: EntityExtraction) -> str: """Process a sale transaction""" # Get or create customer customer = None if entities.customer: customer = self.session.query(Customer).filter_by(name=entities.customer).first() if not customer: customer = Customer(name=entities.customer) self.session.add(customer) self.session.flush() # Get or create product product = None if entities.product: product = self.session.query(Product).filter_by(name=entities.product).first() if not product: product = Product(name=entities.product) self.session.add(product) self.session.flush() # Create sale record sale = Sale( customer_id=customer.id if customer else None, product_id=product.id if product else None, quantity=entities.quantity or 1, unit_price=entities.unit_price or 0, total_amount=entities.total_amount or (entities.quantity or 1) * (entities.unit_price or 0), notes=entities.notes ) self.session.add(sale) self.session.commit() return sale.id, f"Sale recorded: {entities.quantity or 1}x {entities.product or 'Unknown'} to {entities.customer or 'Unknown'} for €{entities.total_amount or 0}" def query_data(self, query: str) -> List[Dict[str, Any]]: """Execute a query and return results""" try: result = self.session.execute(text(query)) columns = result.keys() rows = result.fetchall() return [dict(zip(columns, row)) for row in rows] except Exception as e: return [{"error": str(e)}] def get_recent_transactions(self, limit: int = 10) -> Dict[str, List[Dict]]: """Get recent purchases and sales""" purchases = self.session.query(Purchase).order_by(Purchase.purchase_date.desc()).limit(limit).all() sales = self.session.query(Sale).order_by(Sale.sale_date.desc()).limit(limit).all() purchase_data = [] for p in purchases: purchase_data.append({ "id": p.id, "supplier": p.supplier.name if p.supplier else "Unknown", "product": p.product.name if p.product else "Unknown", "quantity": p.quantity, "unit_price": float(p.unit_price), "total_cost": float(p.total_cost), "date": p.purchase_date.isoformat(), "type": "purchase" }) sale_data = [] for s in sales: sale_data.append({ "id": s.id, "customer": s.customer.name if s.customer else "Unknown", "product": s.product.name if s.product else "Unknown", "quantity": s.quantity, "unit_price": float(s.unit_price), "total_amount": float(s.total_amount), "date": s.sale_date.isoformat(), "type": "sale" }) return {"purchases": purchase_data, "sales": sale_data} def search_transactions(self, search_term: str) -> List[Dict[str, Any]]: """Search transactions by supplier, customer, or product""" results = [] # Search purchases purchases = self.session.query(Purchase).join(Supplier, Purchase.supplier_id == Supplier.id, isouter=True)\ .join(Product, Purchase.product_id == Product.id, isouter=True)\ .filter( (Supplier.name.contains(search_term)) | (Product.name.contains(search_term)) | (Purchase.notes.contains(search_term)) ).all() for p in purchases: results.append({ "id": p.id, "type": "purchase", "supplier": p.supplier.name if p.supplier else "Unknown", "product": p.product.name if p.product else "Unknown", "quantity": p.quantity, "unit_price": float(p.unit_price), "total": float(p.total_cost), "date": p.purchase_date.isoformat() }) # Search sales sales = self.session.query(Sale).join(Customer, Sale.customer_id == Customer.id, isouter=True)\ .join(Product, Sale.product_id == Product.id, isouter=True)\ .filter( (Customer.name.contains(search_term)) | (Product.name.contains(search_term)) | (Sale.notes.contains(search_term)) ).all() for s in sales: results.append({ "id": s.id, "type": "sale", "customer": s.customer.name if s.customer else "Unknown", "product": s.product.name if s.product else "Unknown", "quantity": s.quantity, "unit_price": float(s.unit_price), "total": float(s.total_amount), "date": s.sale_date.isoformat() }) return sorted(results, key=lambda x: x["date"], reverse=True) def get_transaction_by_id(self, transaction_id: int, transaction_type: str) -> Optional[Dict[str, Any]]: """Retrieve a specific transaction by ID and type""" try: if transaction_type == "purchase": transaction = self.session.query(Purchase).filter_by(id=transaction_id).first() if transaction: return { "id": transaction.id, "type": "purchase", "supplier_id": transaction.supplier_id, "product_id": transaction.product_id, "quantity": transaction.quantity, "unit_price": transaction.unit_price, "total_cost": transaction.total_cost, "purchase_date": transaction.purchase_date.isoformat() if transaction.purchase_date else None, "notes": transaction.notes } elif transaction_type == "sale": transaction = self.session.query(Sale).filter_by(id=transaction_id).first() if transaction: return { "id": transaction.id, "type": "sale", "customer_id": transaction.customer_id, "product_id": transaction.product_id, "quantity": transaction.quantity, "unit_price": transaction.unit_price, "total_amount": transaction.total_amount, "sale_date": transaction.sale_date.isoformat() if transaction.sale_date else None, "notes": transaction.notes } return None except Exception as e: print(f"Error retrieving transaction by ID: {e}") return None def close(self): """Close database connection""" self.session.close()