| from pymongo.mongo_client import MongoClient |
| from pymongo.server_api import ServerApi |
| from datetime import datetime, timedelta |
| import json |
|
|
| uri = "mongodb+srv://transyltoonia:meradatabase@transyltoonia.xdrsx1s.mongodb.net/?retryWrites=true&w=majority&appName=transyltoonia" |
|
|
| class DatabaseManager: |
| def __init__(self, uri=uri, db_name='FlipkartGrid_DB'): |
| """ |
| Comprehensive Database Management System |
| Combines basic and advanced database operations |
| |
| """ |
| |
| client = MongoClient(uri, server_api=ServerApi('1')) |
|
|
| try: |
| client.admin.command('ping') |
| print("Pinged your deployment. You successfully connected to MongoDB!") |
| except Exception as e: |
| print("An error occurred while connecting to MongoDB: ", e) |
| try: |
| |
| self.client = MongoClient(uri) |
| self.db = self.client[db_name] |
| |
| |
| self.brand_collection = self.db['brand_recognition'] |
| self.ocr_collection = self.db['ocr'] |
| self.freshness_collection = self.db['freshness'] |
| |
| |
| self._create_indexes() |
| except Exception as e: |
| print(f"Database connection error: {e}") |
| raise |
|
|
| def _create_indexes(self): |
| """ |
| Create performance and unique indexes |
| """ |
| self.brand_collection.create_index([("brand", 1)], unique=True) |
| self.ocr_collection.create_index([("brand", 1), ("expiry_date", 1)]) |
| self.freshness_collection.create_index([("produce", 1)]) |
|
|
| |
| from datetime import datetime |
|
|
| def add_brand_record(self, brand, count): |
| """ |
| Add or update a brand record with validation. |
| If the brand exists, increment its count. Otherwise, add a new record. |
| """ |
| try: |
| |
| if not brand or not isinstance(brand, str): |
| return "Invalid brand name" |
| if not isinstance(count, (int, float)) or count < 0: |
| return "Invalid count value" |
|
|
| |
| brand = brand.strip().title() |
|
|
| |
| existing_record = self.brand_collection.find_one({"brand": brand}) |
|
|
| if existing_record: |
| |
| new_count = existing_record["count"] + count |
| self.brand_collection.update_one( |
| {"brand": brand}, |
| {"$set": {"count": new_count, "last_updated": datetime.now()}} |
| ) |
| return f"Updated brand record: Brand = {brand}, New Count = {new_count}" |
| else: |
| |
| record = { |
| "S.No": self.brand_collection.count_documents({}) + 1, |
| "timestamp": datetime.now(), |
| "brand": brand, |
| "count": count, |
| "last_updated": datetime.now() |
| } |
| self.brand_collection.insert_one(record) |
| return f"Added brand record: Brand = {brand}, Count = {count}" |
| except Exception as e: |
| return f"Error adding brand record: {e}" |
|
|
|
|
| def get_brand_records(self, filter_criteria=None, sort_by='timestamp', ascending=False): |
| """ |
| Retrieve brand records with flexible filtering and sorting |
| """ |
| try: |
| filter_criteria = filter_criteria or {} |
| sort_direction = 1 if ascending else -1 |
| |
| records = self.brand_collection.find(filter_criteria).sort(sort_by, sort_direction) |
| |
| return [ |
| { |
| "S.No": record['S.No'], |
| "Brand": record['brand'], |
| "Count": record['count'], |
| "Timestamp": record['timestamp'] |
| } for record in records |
| ] |
| except Exception as e: |
| return f"Error retrieving brand records: {e}" |
|
|
| def add_ocr_record(self, brand, expiry_date, manufacture_date, mrp): |
| """ |
| Add OCR record with comprehensive validation |
| """ |
| try: |
| |
| def validate_date(date_str): |
| try: |
| return datetime.strptime(date_str, "%Y-%m-%d") |
| except ValueError: |
| return None |
|
|
| exp_date = validate_date(expiry_date) |
| man_date = validate_date(manufacture_date) |
|
|
| if not exp_date or not man_date: |
| return "Invalid date format. Use YYYY-MM-DD" |
|
|
| |
| if exp_date <= man_date: |
| return "Expiry date must be after manufacture date" |
|
|
| |
| try: |
| mrp = float(mrp) |
| if mrp <= 0: |
| return "MRP must be a positive number" |
| except ValueError: |
| return "Invalid MRP value" |
|
|
| record = { |
| "S.No": self.ocr_collection.count_documents({}) + 1, |
| "timestamp": datetime.now(), |
| "brand": brand.strip().title(), |
| "expiry_date": exp_date, |
| "manufacture_date": man_date, |
| "mrp": mrp, |
| "days_to_expiry": (exp_date - datetime.now().date()).days |
| } |
| self.ocr_collection.insert_one(record) |
| return f"Added OCR record: Brand = {brand}, Expiry Date = {expiry_date}" |
| except Exception as e: |
| return f"Error adding OCR record: {e}" |
| |
| |
| def get_ocr_records(self, filter_criteria=None, sort_by='timestamp', ascending=False): |
| """ |
| Retrieve OCR records with flexible filtering and sorting |
| """ |
| try: |
| filter_criteria = filter_criteria or {} |
| sort_direction = 1 if ascending else -1 |
| |
| records = self.ocr_collection.find(filter_criteria).sort(sort_by, sort_direction) |
| |
| return [ |
| { |
| "S.No": record['S.No'], |
| "Brand": record['brand'], |
| "Expiry Date": record['expiry_date'], |
| "Manufacture Date": record['manufacture_date'], |
| "MRP": record['mrp'] |
| } for record in records |
| ] |
| except Exception as e: |
| return f"Error retrieving OCR records: {e}" |
|
|
| |
| def add_freshness_record(self, produce, shelf_life, characteristics, eatable): |
| """ |
| Add freshness record with enhanced validation |
| """ |
| try: |
| |
| if not produce or not isinstance(produce, str): |
| return "Invalid produce name" |
| |
| |
| if isinstance(eatable, str): |
| eatable = eatable.lower() in ['true', 'yes', '1'] |
| |
| record = { |
| "S.No": self.freshness_collection.count_documents({}) + 1, |
| "timestamp": datetime.now(), |
| "produce": produce.strip().title(), |
| "Shelf-Life": shelf_life, |
| "Characteristics": characteristics, |
| "Eatable": bool(eatable) |
| } |
| self.freshness_collection.insert_one(record) |
| return f"Added freshness record: Produce = {produce}, Shelf-Life = {shelf_life}" |
| except Exception as e: |
| return f"Error adding freshness record: {e}" |
|
|
| |
| def analyze_brand_trends(self, time_period=30): |
| """ |
| Analyze brand trends over a specified time period |
| """ |
| try: |
| cutoff_date = datetime.now() - timedelta(days=time_period) |
| |
| pipeline = [ |
| {"$match": {"timestamp": {"$gte": cutoff_date}}}, |
| {"$group": { |
| "_id": "$brand", |
| "total_count": {"$sum": "$count"}, |
| "avg_count": {"$avg": "$count"}, |
| "first_seen": {"$min": "$timestamp"}, |
| "last_seen": {"$max": "$timestamp"} |
| }}, |
| {"$sort": {"total_count": -1}} |
| ] |
| |
| trends = list(self.brand_collection.aggregate(pipeline)) |
| |
| |
| for trend in trends: |
| trend['brand'] = trend.pop('_id') |
| trend['first_seen'] = trend['first_seen'].strftime('%Y-%m-%d %H:%M:%S') |
| trend['last_seen'] = trend['last_seen'].strftime('%Y-%m-%d %H:%M:%S') |
| trend['total_count'] = round(trend['total_count'], 2) |
| trend['avg_count'] = round(trend['avg_count'], 2) |
| |
| return trends |
| except Exception as e: |
| return f"Error analyzing brand trends: {e}" |
|
|
| |
| def get_all_brand_records(self): |
| """ |
| Retrieve all brand records |
| """ |
| try: |
| records = self.brand_collection.find() |
| result = [ |
| f"S.No: {record['S.No']}, Brand: {record['brand']}, Count: {record['count']}" |
| for record in records |
| ] |
| return "\n".join(result) if result else "No brand records found." |
| except Exception as e: |
| return f"An error occurred while fetching brand records: {e}" |
|
|
| def get_all_ocr_records(self): |
| """ |
| Retrieve all OCR records |
| """ |
| try: |
| records = self.ocr_collection.find() |
| result = [ |
| f"S.No: {record['S.No']}, Brand: {record['brand']}, Expiry Date: {record['expiry_date']}, " |
| f"Manufacture Date: {record['manufacture_date']}, MRP: {record['mrp']}" |
| for record in records |
| ] |
| return "\n".join(result) if result else "No OCR records found." |
| except Exception as e: |
| return f"An error occurred while fetching OCR records: {e}" |
|
|
| def get_all_freshness_records(self): |
| """ |
| Retrieve all freshness records |
| """ |
| try: |
| records = self.freshness_collection.find() |
| result = [ |
| f"S.No: {record['S.No']}, Produce: {record['produce']}, Shelf-Life: {record['Shelf-Life']}, " |
| f"Characteristics: {record['Characteristics']}, Eatable: {record['Eatable']}" |
| for record in records |
| ] |
| return "\n".join(result) if result else "No freshness records found." |
| except Exception as e: |
| return f"An error occurred while fetching freshness records: {e}" |
| |
| def get_freshness_records(self, filter_criteria=None, sort_by='timestamp', ascending=False): |
| """ |
| Retrieve freshness records with flexible filtering and sorting |
| """ |
| try: |
| filter_criteria = filter_criteria or {} |
| sort_direction = 1 if ascending else -1 |
| |
| records = self.freshness_collection.find(filter_criteria).sort(sort_by, sort_direction) |
| |
| return [ |
| { |
| "S.No": record['S.No'], |
| "Produce": record['produce'], |
| "Shelf-Life": record['Shelf-Life'], |
| "Characteristics": record['Characteristics'], |
| "Eatable": record['Eatable'] |
| } for record in records |
| ] |
| except Exception as e: |
| return f"Error retrieving freshness records: {e}" |
| |
| |
|
|
| |
| def advanced_search(self, collection_name, search_criteria=None, projection=None): |
| """ |
| Advanced search method with flexible filtering |
| """ |
| try: |
| collection_map = { |
| 'brand': self.brand_collection, |
| 'ocr': self.ocr_collection, |
| 'freshness': self.freshness_collection |
| } |
| |
| if collection_name not in collection_map: |
| return "Invalid collection name" |
| |
| search_criteria = search_criteria or {} |
| projection = projection or {} |
| |
| results = list(collection_map[collection_name].find(search_criteria, projection)) |
| |
| |
| for result in results: |
| if '_id' in result: |
| result['_id'] = str(result['_id']) |
| |
| return results |
| except Exception as e: |
| return f"Error performing advanced search: {e}" |
|
|
| |
| def export_collection_to_json(self, collection_name, filename=None): |
| """ |
| Export a collection to a JSON file |
| """ |
| try: |
| collection_map = { |
| 'brand': self.brand_collection, |
| 'ocr': self.ocr_collection, |
| 'freshness': self.freshness_collection |
| } |
| |
| if collection_name not in collection_map: |
| return "Invalid collection name" |
| |
| documents = list(collection_map[collection_name].find()) |
| |
| for doc in documents: |
| doc['_id'] = str(doc['_id']) |
| |
| if not filename: |
| filename = f"{collection_name}_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" |
| |
| with open(filename, 'w') as f: |
| json.dump(documents, f, indent=2, default=str) |
| |
| return f"Successfully exported {len(documents)} documents to {filename}" |
| except Exception as e: |
| return f"Error exporting collection: {e}" |
|
|
| def close_connection(self): |
| """ |
| Properly close the MongoDB connection |
| """ |
| try: |
| self.client.close() |
| print("MongoDB connection closed successfully") |
| except Exception as e: |
| print(f"Error closing connection: {e}") |
|
|
| |
| def main(): |
| try: |
| |
| db_manager = DatabaseManager() |
|
|
| |
| print(db_manager.add_brand_record("Nike", 150)) |
| print(db_manager.add_ocr_record("Adidas", "2024-12-31", "2023-06-01", 75.50)) |
| print(db_manager.add_freshness_record("Apple", "14 days", "Red, crisp", True)) |
|
|
| |
| print("\nBrand Trends:") |
| print(db_manager.analyze_brand_trends()) |
|
|
| |
| print("\nAdvanced Search:") |
| search_results = db_manager.advanced_search('brand', |
| search_criteria={'count': {'$gt': 100}}, |
| projection={'brand': 1, 'count': 1} |
| ) |
| print(json.dumps(search_results, indent=2)) |
|
|
| except Exception as e: |
| print(f"An error occurred: {e}") |
| finally: |
| db_manager.close_connection() |
| |
| |
|
|
| if __name__ == "__main__": |
| main() |