Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| from typing import Optional, List, Dict, Any | |
| from sqlalchemy import create_engine, text | |
| from sqlalchemy.orm import sessionmaker | |
| from models import Base, Supplier, Customer, Product, Purchase, Sale, EntityExtraction | |
| class DatabaseManager: | |
| def __init__(self, db_path: str = "chatbot.db"): | |
| self.db_path = db_path | |
| self.engine = create_engine(f"sqlite:///{db_path}") | |
| Base.metadata.create_all(self.engine) | |
| Session = sessionmaker(bind=self.engine) | |
| self.session = Session() | |
| self._initialize_data() | |
| def _initialize_data(self): | |
| """Initialize database with sample data""" | |
| # Add default suppliers if they don't exist | |
| suppliers = ["TechMart", "Office Supplies Co", "Electronics Plus"] | |
| for supplier_name in suppliers: | |
| existing = self.session.query(Supplier).filter_by(name=supplier_name).first() | |
| if not existing: | |
| supplier = Supplier(name=supplier_name) | |
| self.session.add(supplier) | |
| # Add default products | |
| products = [ | |
| ("USB drives", "Electronics"), | |
| ("Office chairs", "Furniture"), | |
| ("Laptops", "Electronics"), | |
| ("Monitors", "Electronics"), | |
| ("Keyboards", "Electronics") | |
| ] | |
| for product_name, category in products: | |
| existing = self.session.query(Product).filter_by(name=product_name).first() | |
| if not existing: | |
| product = Product(name=product_name, category=category) | |
| self.session.add(product) | |
| self.session.commit() | |
| def process_transaction(self, entities: EntityExtraction): | |
| """Process a transaction based on extracted entities""" | |
| try: | |
| if entities.transaction_type == "purchase": | |
| return self._process_purchase(entities) | |
| elif entities.transaction_type == "sale": | |
| return self._process_sale(entities) | |
| else: | |
| return None, "Could not determine transaction type" | |
| except Exception as e: | |
| self.session.rollback() | |
| return None, f"Error processing transaction: {str(e)}" | |
| def _process_purchase(self, entities: EntityExtraction) -> str: | |
| """Process a purchase transaction""" | |
| # Get or create supplier | |
| supplier = None | |
| if entities.supplier: | |
| supplier = self.session.query(Supplier).filter_by(name=entities.supplier).first() | |
| if not supplier: | |
| supplier = Supplier(name=entities.supplier) | |
| self.session.add(supplier) | |
| self.session.flush() | |
| # Get or create product | |
| product = None | |
| if entities.product: | |
| product = self.session.query(Product).filter_by(name=entities.product).first() | |
| if not product: | |
| product = Product(name=entities.product) | |
| self.session.add(product) | |
| self.session.flush() | |
| # Create purchase record | |
| purchase = Purchase( | |
| supplier_id=supplier.id if supplier else None, | |
| product_id=product.id if product else None, | |
| quantity=entities.quantity or 1, | |
| unit_price=entities.unit_price or 0, | |
| total_cost=entities.total_amount or (entities.quantity or 1) * (entities.unit_price or 0), | |
| notes=entities.notes | |
| ) | |
| self.session.add(purchase) | |
| self.session.commit() | |
| return purchase.id, f"Purchase recorded: {entities.quantity or 1}x {entities.product or 'Unknown'} from {entities.supplier or 'Unknown'} for €{entities.total_amount or 0}" | |
| def _process_sale(self, entities: EntityExtraction) -> str: | |
| """Process a sale transaction""" | |
| # Get or create customer | |
| customer = None | |
| if entities.customer: | |
| customer = self.session.query(Customer).filter_by(name=entities.customer).first() | |
| if not customer: | |
| customer = Customer(name=entities.customer) | |
| self.session.add(customer) | |
| self.session.flush() | |
| # Get or create product | |
| product = None | |
| if entities.product: | |
| product = self.session.query(Product).filter_by(name=entities.product).first() | |
| if not product: | |
| product = Product(name=entities.product) | |
| self.session.add(product) | |
| self.session.flush() | |
| # Create sale record | |
| sale = Sale( | |
| customer_id=customer.id if customer else None, | |
| product_id=product.id if product else None, | |
| quantity=entities.quantity or 1, | |
| unit_price=entities.unit_price or 0, | |
| total_amount=entities.total_amount or (entities.quantity or 1) * (entities.unit_price or 0), | |
| notes=entities.notes | |
| ) | |
| self.session.add(sale) | |
| self.session.commit() | |
| return sale.id, f"Sale recorded: {entities.quantity or 1}x {entities.product or 'Unknown'} to {entities.customer or 'Unknown'} for €{entities.total_amount or 0}" | |
| def query_data(self, query: str) -> List[Dict[str, Any]]: | |
| """Execute a query and return results""" | |
| try: | |
| result = self.session.execute(text(query)) | |
| columns = result.keys() | |
| rows = result.fetchall() | |
| return [dict(zip(columns, row)) for row in rows] | |
| except Exception as e: | |
| return [{"error": str(e)}] | |
| def get_recent_transactions(self, limit: int = 10) -> Dict[str, List[Dict]]: | |
| """Get recent purchases and sales""" | |
| purchases = self.session.query(Purchase).order_by(Purchase.purchase_date.desc()).limit(limit).all() | |
| sales = self.session.query(Sale).order_by(Sale.sale_date.desc()).limit(limit).all() | |
| purchase_data = [] | |
| for p in purchases: | |
| purchase_data.append({ | |
| "id": p.id, | |
| "supplier": p.supplier.name if p.supplier else "Unknown", | |
| "product": p.product.name if p.product else "Unknown", | |
| "quantity": p.quantity, | |
| "unit_price": float(p.unit_price), | |
| "total_cost": float(p.total_cost), | |
| "date": p.purchase_date.isoformat(), | |
| "type": "purchase" | |
| }) | |
| sale_data = [] | |
| for s in sales: | |
| sale_data.append({ | |
| "id": s.id, | |
| "customer": s.customer.name if s.customer else "Unknown", | |
| "product": s.product.name if s.product else "Unknown", | |
| "quantity": s.quantity, | |
| "unit_price": float(s.unit_price), | |
| "total_amount": float(s.total_amount), | |
| "date": s.sale_date.isoformat(), | |
| "type": "sale" | |
| }) | |
| return {"purchases": purchase_data, "sales": sale_data} | |
| def search_transactions(self, search_term: str) -> List[Dict[str, Any]]: | |
| """Search transactions by supplier, customer, or product""" | |
| results = [] | |
| # Search purchases | |
| purchases = self.session.query(Purchase).join(Supplier, Purchase.supplier_id == Supplier.id, isouter=True)\ | |
| .join(Product, Purchase.product_id == Product.id, isouter=True)\ | |
| .filter( | |
| (Supplier.name.contains(search_term)) | | |
| (Product.name.contains(search_term)) | | |
| (Purchase.notes.contains(search_term)) | |
| ).all() | |
| for p in purchases: | |
| results.append({ | |
| "id": p.id, | |
| "type": "purchase", | |
| "supplier": p.supplier.name if p.supplier else "Unknown", | |
| "product": p.product.name if p.product else "Unknown", | |
| "quantity": p.quantity, | |
| "unit_price": float(p.unit_price), | |
| "total": float(p.total_cost), | |
| "date": p.purchase_date.isoformat() | |
| }) | |
| # Search sales | |
| sales = self.session.query(Sale).join(Customer, Sale.customer_id == Customer.id, isouter=True)\ | |
| .join(Product, Sale.product_id == Product.id, isouter=True)\ | |
| .filter( | |
| (Customer.name.contains(search_term)) | | |
| (Product.name.contains(search_term)) | | |
| (Sale.notes.contains(search_term)) | |
| ).all() | |
| for s in sales: | |
| results.append({ | |
| "id": s.id, | |
| "type": "sale", | |
| "customer": s.customer.name if s.customer else "Unknown", | |
| "product": s.product.name if s.product else "Unknown", | |
| "quantity": s.quantity, | |
| "unit_price": float(s.unit_price), | |
| "total": float(s.total_amount), | |
| "date": s.sale_date.isoformat() | |
| }) | |
| return sorted(results, key=lambda x: x["date"], reverse=True) | |
| def get_transaction_by_id(self, transaction_id: int, transaction_type: str) -> Optional[Dict[str, Any]]: | |
| """Retrieve a specific transaction by ID and type""" | |
| try: | |
| if transaction_type == "purchase": | |
| transaction = self.session.query(Purchase).filter_by(id=transaction_id).first() | |
| if transaction: | |
| return { | |
| "id": transaction.id, | |
| "type": "purchase", | |
| "supplier_id": transaction.supplier_id, | |
| "product_id": transaction.product_id, | |
| "quantity": transaction.quantity, | |
| "unit_price": transaction.unit_price, | |
| "total_cost": transaction.total_cost, | |
| "purchase_date": transaction.purchase_date.isoformat() if transaction.purchase_date else None, | |
| "notes": transaction.notes | |
| } | |
| elif transaction_type == "sale": | |
| transaction = self.session.query(Sale).filter_by(id=transaction_id).first() | |
| if transaction: | |
| return { | |
| "id": transaction.id, | |
| "type": "sale", | |
| "customer_id": transaction.customer_id, | |
| "product_id": transaction.product_id, | |
| "quantity": transaction.quantity, | |
| "unit_price": transaction.unit_price, | |
| "total_amount": transaction.total_amount, | |
| "sale_date": transaction.sale_date.isoformat() if transaction.sale_date else None, | |
| "notes": transaction.notes | |
| } | |
| return None | |
| except Exception as e: | |
| print(f"Error retrieving transaction by ID: {e}") | |
| return None | |
| def close(self): | |
| """Close database connection""" | |
| self.session.close() |