# File: eu-scrapper/app/core/database.py
# Repository page metadata: uploader "brestok", commit "init" (b60402f)
"""
Database utilities for ClipboardHealthAI application.
This module provides database-related utilities, including:
- Custom Pydantic types for MongoDB ObjectID handling
- Base model classes for MongoDB document models with serialization support
- Background task for periodically refreshing Snowflake database connections
"""
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Type, get_args, get_origin

from bson import ObjectId
from pydantic import (
    AnyUrl,
    BaseModel,
    ConfigDict,
    Field,
    GetCoreSchemaHandler,
    GetJsonSchemaHandler,
)
from pydantic.json_schema import JsonSchemaValue
from pydantic_core import core_schema
class PyObjectId:
    """
    Custom type for handling MongoDB ObjectId in Pydantic models.

    Used as a field annotation, this type accepts either a string or an
    existing ``ObjectId`` and validates to a ``bson.ObjectId``; when the model
    is dumped to JSON the id is written as a string.

    Used as a plain object, an instance wraps an ``ObjectId`` in ``self.value``
    and forwards unknown attribute lookups to it.
    """

    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source: type, _handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        """
        Define the core schema for Pydantic validation.

        Args:
            _source: The source type (unused)
            _handler: Schema handler instance (unused)

        Returns:
            CoreSchema: The schema for validation
        """
        # ``validate`` takes only the value, so it must be registered with the
        # "no_info" variant; the "with_info" variant would call it with an
        # extra ValidationInfo argument and fail with a TypeError.
        from_string = core_schema.no_info_after_validator_function(
            cls.validate, core_schema.str_schema()
        )
        return core_schema.union_schema(
            [
                # Documents read back from MongoDB already carry ObjectId
                # instances; let them through unchanged.
                core_schema.is_instance_schema(ObjectId),
                from_string,
            ],
            # ObjectId is not a JSON-native type; emit it as a string in JSON.
            serialization=core_schema.to_string_ser_schema(),
        )

    @classmethod
    def __get_pydantic_json_schema__(
        cls, _schema: core_schema.CoreSchema, _handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        """
        Define the JSON schema representation.

        Args:
            _schema: The core schema (unused)
            _handler: Schema handler instance (unused)

        Returns:
            JsonSchemaValue: The JSON schema representation
        """
        return {"type": "string"}

    @classmethod
    def validate(cls, value: str) -> ObjectId:
        """
        Validate and convert a string to MongoDB ObjectId.

        Args:
            value: String representation of ObjectId

        Returns:
            ObjectId: MongoDB ObjectId instance

        Raises:
            ValueError: If the value is not a valid ObjectId
        """
        if not ObjectId.is_valid(value):
            raise ValueError(f"Invalid ObjectId: {value}")
        return ObjectId(value)

    def __init__(self, value: str | None = None):
        """
        Initialize with a string value or create a new ObjectId.

        Args:
            value: Optional string representation of ObjectId

        Raises:
            ValueError: If ``value`` is given but is not a valid ObjectId
        """
        if value is None:
            self.value = ObjectId()
        else:
            self.value = self.validate(value)

    def __getattr__(self, item):
        """
        Delegate attribute access to the wrapped ObjectId.

        Only invoked when normal attribute lookup fails.

        Args:
            item: The attribute name

        Returns:
            The attribute value from the wrapped ObjectId

        Raises:
            AttributeError: If no ObjectId has been stored yet, or the wrapped
                ObjectId does not have the attribute
        """
        try:
            wrapped = self.__dict__["value"]
        except KeyError:
            # An instance created without __init__ (e.g. by copy/pickle) has
            # no wrapped value yet. Attribute probes such as hasattr() expect
            # AttributeError here, not KeyError.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            ) from None
        return getattr(wrapped, item)

    def __str__(self):
        """
        Convert to string representation.

        Returns:
            str: String representation of the ObjectId
        """
        return str(self.value)
def _enum_class(annotation: Any) -> Type[Enum] | None:
    """Return ``annotation`` if it is an Enum subclass, otherwise ``None``."""
    if not isinstance(annotation, type):
        return None
    try:
        return annotation if issubclass(annotation, Enum) else None
    except TypeError:
        # Parameterized builtin generics such as ``list[str]`` can pass the
        # isinstance check on some Python versions, yet issubclass rejects
        # them. They are never Enum classes.
        return None


def _item_enum_class(annotation: Any, container: type) -> Type[Enum] | None:
    """Return the Enum class of the items of a ``container[...]`` annotation.

    For ``List[E]`` this inspects ``E``; for ``Dict[K, E]`` it inspects the
    value type ``E``. Returns ``None`` for any other annotation.
    """
    if get_origin(annotation) is not container:
        return None
    args = get_args(annotation)
    return _enum_class(args[-1]) if args else None


def _to_mongo_value(value: Any) -> Any:
    """Recursively convert one field value into a MongoDB-storable value."""
    if isinstance(value, BaseModel):
        return _model_to_document(value)
    if isinstance(value, Enum):
        # No truthiness check: members whose value is 0 or "" must be
        # converted as well.
        return value.value
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, AnyUrl):
        return str(value)
    if isinstance(value, list):
        return [_to_mongo_value(item) for item in value]
    if isinstance(value, dict):
        return {key: _to_mongo_value(item) for key, item in value.items()}
    return value


def _model_to_document(model: BaseModel) -> Dict[str, Any]:
    """Convert a model into a dict keyed by field alias (or field name)."""
    fields = type(model).model_fields
    document: Dict[str, Any] = {}
    # Iterating a model yields (field name, value) pairs.
    for name, value in model:
        field = fields.get(name)  # None for extra (undeclared) attributes
        key = field.alias if field is not None and field.alias else name
        document[key] = _to_mongo_value(value)
    return document


def _restore_enums(model: BaseModel) -> None:
    """Replace raw enum values on ``model`` (recursively) with Enum members."""
    for name, field in type(model).model_fields.items():
        value = getattr(model, name)
        enum_cls = _enum_class(field.annotation)
        if enum_cls is not None:
            # Skip values that are already members (or None) so the field is
            # not needlessly re-assigned and marked as explicitly set.
            if value is not None and not isinstance(value, enum_cls):
                setattr(model, name, enum_cls(value))
        elif isinstance(value, BaseModel):
            _restore_enums(value)
        elif isinstance(value, list):
            item_enum = _item_enum_class(field.annotation, list)
            for index, item in enumerate(value):
                if isinstance(item, BaseModel):
                    _restore_enums(item)
                elif (
                    item_enum is not None
                    and item is not None
                    and not isinstance(item, item_enum)
                ):
                    value[index] = item_enum(item)
        elif isinstance(value, dict):
            item_enum = _item_enum_class(field.annotation, dict)
            # Only values are replaced, never keys, so mutating while
            # iterating is safe.
            for key, item in value.items():
                if isinstance(item, BaseModel):
                    _restore_enums(item)
                elif (
                    item_enum is not None
                    and item is not None
                    and not isinstance(item, item_enum)
                ):
                    value[key] = item_enum(item)


class MongoBaseModel(BaseModel):
    """
    Base model for MongoDB documents with serialization support.

    This class extends Pydantic's BaseModel to provide MongoDB-specific
    serialization/deserialization and handling of special types.
    """

    # String form of a freshly generated ObjectId unless the caller supplies one.
    id: str = Field(default_factory=lambda: str(PyObjectId()))

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def to_mongo(self) -> Dict[str, Any]:
        """
        Convert the model instance to a MongoDB-compatible dictionary.

        Keys are field aliases where defined, otherwise field names. Values
        are converted recursively, including inside lists and dicts:
        nested models become dicts, enums become their ``.value``, datetimes
        become ISO-8601 strings, and URLs become strings.

        Returns:
            Dict[str, Any]: A dictionary suitable for MongoDB storage
        """
        return _model_to_document(self)

    @classmethod
    def from_mongo(cls, data: Dict[str, Any]):
        """
        Create a model instance from MongoDB document data.

        After normal model validation, raw values in fields annotated with an
        Enum class (directly, or as the item type of a ``List[...]`` /
        ``Dict[..., ...]``) are converted to Enum members, recursing into
        nested models.

        Args:
            data: Dictionary containing MongoDB document data

        Returns:
            MongoBaseModel: An instance of the model class, or ``None`` when
            ``data`` is ``None``
        """
        if data is None:
            return None
        instance = cls(**data)
        _restore_enums(instance)
        return instance