File size: 15,369 Bytes
a176aa6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from datetime import datetime, timedelta
import json

uri = "mongodb+srv://transyltoonia:meradatabase@transyltoonia.xdrsx1s.mongodb.net/?retryWrites=true&w=majority&appName=transyltoonia"

class DatabaseManager:
    def __init__(self, uri=uri, db_name='FlipkartGrid_DB'):
        """
        Comprehensive Database Management System
        Combines basic and advanced database operations
        
        """
        
        client = MongoClient(uri, server_api=ServerApi('1'))

        try:
            client.admin.command('ping')
            print("Pinged your deployment. You successfully connected to MongoDB!")
        except Exception as e:
            print("An error occurred while connecting to MongoDB: ", e)
        try:
            # Database Connection
            self.client = MongoClient(uri)
            self.db = self.client[db_name]
            
            # Collections
            self.brand_collection = self.db['brand_recognition']
            self.ocr_collection = self.db['ocr']
            self.freshness_collection = self.db['freshness']
            
            # Create performance indexes
            self._create_indexes()
        except Exception as e:
            print(f"Database connection error: {e}")
            raise

    def _create_indexes(self):
        """
        Create performance and unique indexes
        """
        self.brand_collection.create_index([("brand", 1)], unique=True)
        self.ocr_collection.create_index([("brand", 1), ("expiry_date", 1)])
        self.freshness_collection.create_index([("produce", 1)])

    # Original Basic Methods
    from datetime import datetime

    def add_brand_record(self, brand, count):
        """
        Add or update a brand record with validation.
        If the brand exists, increment its count. Otherwise, add a new record.
        """
        try:
            # Input validation
            if not brand or not isinstance(brand, str):
                return "Invalid brand name"
            if not isinstance(count, (int, float)) or count < 0:
                return "Invalid count value"

            # Prepare the brand name
            brand = brand.strip().title()

            # Check if the brand already exists
            existing_record = self.brand_collection.find_one({"brand": brand})

            if existing_record:
                # Increment the count if the brand exists
                new_count = existing_record["count"] + count
                self.brand_collection.update_one(
                    {"brand": brand},
                    {"$set": {"count": new_count, "last_updated": datetime.now()}}
                )
                return f"Updated brand record: Brand = {brand}, New Count = {new_count}"
            else:
                # Add a new record if the brand does not exist
                record = {
                    "S.No": self.brand_collection.count_documents({}) + 1,
                    "timestamp": datetime.now(),
                    "brand": brand,
                    "count": count,
                    "last_updated": datetime.now()
                }
                self.brand_collection.insert_one(record)
                return f"Added brand record: Brand = {brand}, Count = {count}"
        except Exception as e:
            return f"Error adding brand record: {e}"


    def get_brand_records(self, filter_criteria=None, sort_by='timestamp', ascending=False):
        """
        Retrieve brand records with flexible filtering and sorting
        """
        try:
            filter_criteria = filter_criteria or {}
            sort_direction = 1 if ascending else -1
            
            records = self.brand_collection.find(filter_criteria).sort(sort_by, sort_direction)
            
            return [
                {
                    "S.No": record['S.No'],
                    "Brand": record['brand'],
                    "Count": record['count'],
                    "Timestamp": record['timestamp']
                } for record in records
            ]
        except Exception as e:
            return f"Error retrieving brand records: {e}"

    def add_ocr_record(self, brand, expiry_date, manufacture_date, mrp):
        """
        Add OCR record with comprehensive validation
        """
        try:
            # Date validation helper
            def validate_date(date_str):
                try:
                    return datetime.strptime(date_str, "%Y-%m-%d")
                except ValueError:
                    return None

            exp_date = validate_date(expiry_date)
            man_date = validate_date(manufacture_date)

            if not exp_date or not man_date:
                return "Invalid date format. Use YYYY-MM-DD"

            # Check expiry date is after manufacture date
            if exp_date <= man_date:
                return "Expiry date must be after manufacture date"

            # Validate MRP
            try:
                mrp = float(mrp)
                if mrp <= 0:
                    return "MRP must be a positive number"
            except ValueError:
                return "Invalid MRP value"

            record = {
                "S.No": self.ocr_collection.count_documents({}) + 1,
                "timestamp": datetime.now(),
                "brand": brand.strip().title(),
                "expiry_date": exp_date,
                "manufacture_date": man_date,
                "mrp": mrp,
                "days_to_expiry": (exp_date - datetime.now().date()).days
            }
            self.ocr_collection.insert_one(record)
            return f"Added OCR record: Brand = {brand}, Expiry Date = {expiry_date}"
        except Exception as e:
            return f"Error adding OCR record: {e}"
        
        
    def get_ocr_records(self, filter_criteria=None, sort_by='timestamp', ascending=False):
        """
        Retrieve OCR records with flexible filtering and sorting
        """
        try:
            filter_criteria = filter_criteria or {}
            sort_direction = 1 if ascending else -1
            
            records = self.ocr_collection.find(filter_criteria).sort(sort_by, sort_direction)
            
            return [
                {
                    "S.No": record['S.No'],
                    "Brand": record['brand'],
                    "Expiry Date": record['expiry_date'],
                    "Manufacture Date": record['manufacture_date'],
                    "MRP": record['mrp']
                } for record in records
            ]
        except Exception as e:
            return f"Error retrieving OCR records: {e}"

    
    def add_freshness_record(self, produce, shelf_life, characteristics, eatable):
        """
        Add freshness record with enhanced validation
        """
        try:
            # Input validations
            if not produce or not isinstance(produce, str):
                return "Invalid produce name"
            
            # Convert to boolean or validate eatable status
            if isinstance(eatable, str):
                eatable = eatable.lower() in ['true', 'yes', '1']
            
            record = {
                "S.No": self.freshness_collection.count_documents({}) + 1,
                "timestamp": datetime.now(),
                "produce": produce.strip().title(),
                "Shelf-Life": shelf_life,
                "Characteristics": characteristics,
                "Eatable": bool(eatable)
            }
            self.freshness_collection.insert_one(record)
            return f"Added freshness record: Produce = {produce}, Shelf-Life = {shelf_life}"
        except Exception as e:
            return f"Error adding freshness record: {e}"

    # Advanced Analysis Methods
    def analyze_brand_trends(self, time_period=30):
        """
        Analyze brand trends over a specified time period
        """
        try:
            cutoff_date = datetime.now() - timedelta(days=time_period)
            
            pipeline = [
                {"$match": {"timestamp": {"$gte": cutoff_date}}},
                {"$group": {
                    "_id": "$brand",
                    "total_count": {"$sum": "$count"},
                    "avg_count": {"$avg": "$count"},
                    "first_seen": {"$min": "$timestamp"},
                    "last_seen": {"$max": "$timestamp"}
                }},
                {"$sort": {"total_count": -1}}
            ]
            
            trends = list(self.brand_collection.aggregate(pipeline))
            
            # Enrich the results
            for trend in trends:
                trend['brand'] = trend.pop('_id')
                trend['first_seen'] = trend['first_seen'].strftime('%Y-%m-%d %H:%M:%S')
                trend['last_seen'] = trend['last_seen'].strftime('%Y-%m-%d %H:%M:%S')
                trend['total_count'] = round(trend['total_count'], 2)
                trend['avg_count'] = round(trend['avg_count'], 2)
            
            return trends
        except Exception as e:
            return f"Error analyzing brand trends: {e}"

    # Additional methods from previous implementations
    def get_all_brand_records(self):
        """
        Retrieve all brand records
        """
        try:
            records = self.brand_collection.find()
            result = [
                f"S.No: {record['S.No']}, Brand: {record['brand']}, Count: {record['count']}"
                for record in records
            ]
            return "\n".join(result) if result else "No brand records found."
        except Exception as e:
            return f"An error occurred while fetching brand records: {e}"

    def get_all_ocr_records(self):
        """
        Retrieve all OCR records
        """
        try:
            records = self.ocr_collection.find()
            result = [
                f"S.No: {record['S.No']}, Brand: {record['brand']}, Expiry Date: {record['expiry_date']}, "
                f"Manufacture Date: {record['manufacture_date']}, MRP: {record['mrp']}"
                for record in records
            ]
            return "\n".join(result) if result else "No OCR records found."
        except Exception as e:
            return f"An error occurred while fetching OCR records: {e}"

    def get_all_freshness_records(self):
        """
        Retrieve all freshness records
        """
        try:
            records = self.freshness_collection.find()
            result = [
                f"S.No: {record['S.No']}, Produce: {record['produce']}, Shelf-Life: {record['Shelf-Life']}, "
                f"Characteristics: {record['Characteristics']}, Eatable: {record['Eatable']}"
                for record in records
            ]
            return "\n".join(result) if result else "No freshness records found."
        except Exception as e:
            return f"An error occurred while fetching freshness records: {e}"
       
    def get_freshness_records(self, filter_criteria=None, sort_by='timestamp', ascending=False):
        """
        Retrieve freshness records with flexible filtering and sorting
        """
        try:
            filter_criteria = filter_criteria or {}
            sort_direction = 1 if ascending else -1
            
            records = self.freshness_collection.find(filter_criteria).sort(sort_by, sort_direction)
            
            return [
                {
                    "S.No": record['S.No'],
                    "Produce": record['produce'],
                    "Shelf-Life": record['Shelf-Life'],
                    "Characteristics": record['Characteristics'],
                    "Eatable": record['Eatable']
                } for record in records
            ]
        except Exception as e:
            return f"Error retrieving freshness records: {e}"
        
    

    # Advanced Search and Filtering
    def advanced_search(self, collection_name, search_criteria=None, projection=None):
        """
        Advanced search method with flexible filtering
        """
        try:
            collection_map = {
                'brand': self.brand_collection,
                'ocr': self.ocr_collection,
                'freshness': self.freshness_collection
            }
            
            if collection_name not in collection_map:
                return "Invalid collection name"
            
            search_criteria = search_criteria or {}
            projection = projection or {}
            
            results = list(collection_map[collection_name].find(search_criteria, projection))
            
            # Convert ObjectId to string for JSON serialization
            for result in results:
                if '_id' in result:
                    result['_id'] = str(result['_id'])
            
            return results
        except Exception as e:
            return f"Error performing advanced search: {e}"

    # More Advanced Methods (Export, Statistical Analysis, etc.)
    def export_collection_to_json(self, collection_name, filename=None):
        """
        Export a collection to a JSON file
        """
        try:
            collection_map = {
                'brand': self.brand_collection,
                'ocr': self.ocr_collection,
                'freshness': self.freshness_collection
            }
            
            if collection_name not in collection_map:
                return "Invalid collection name"
            
            documents = list(collection_map[collection_name].find())
            
            for doc in documents:
                doc['_id'] = str(doc['_id'])
            
            if not filename:
                filename = f"{collection_name}_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            
            with open(filename, 'w') as f:
                json.dump(documents, f, indent=2, default=str)
            
            return f"Successfully exported {len(documents)} documents to {filename}"
        except Exception as e:
            return f"Error exporting collection: {e}"

    def close_connection(self):
        """
        Properly close the MongoDB connection
        """
        try:
            self.client.close()
            print("MongoDB connection closed successfully")
        except Exception as e:
            print(f"Error closing connection: {e}")

# Example usage
def main():
    try:
        # Initialize database manager
        db_manager = DatabaseManager()

        # Example of using various methods
        print(db_manager.add_brand_record("Nike", 150))
        print(db_manager.add_ocr_record("Adidas", "2024-12-31", "2023-06-01", 75.50))
        print(db_manager.add_freshness_record("Apple", "14 days", "Red, crisp", True))

        # Retrieve records
        print("\nBrand Trends:")
        print(db_manager.analyze_brand_trends())

        # Advanced search example
        print("\nAdvanced Search:")
        search_results = db_manager.advanced_search('brand', 
            search_criteria={'count': {'$gt': 100}},
            projection={'brand': 1, 'count': 1}
        )
        print(json.dumps(search_results, indent=2))

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        db_manager.close_connection()
        
    

if __name__ == "__main__":
    main()