|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from datetime import datetime
|
|
|
| import peewee
|
|
|
| from api.db.db_models import DB
|
| from api.utils import datetime_format, current_timestamp, get_uuid
|
|
|
|
|
| class CommonService:
|
| model = None
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def query(cls, cols=None, reverse=None, order_by=None, **kwargs):
|
| return cls.model.query(cols=cols, reverse=reverse,
|
| order_by=order_by, **kwargs)
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def get_all(cls, cols=None, reverse=None, order_by=None):
|
| if cols:
|
| query_records = cls.model.select(*cols)
|
| else:
|
| query_records = cls.model.select()
|
| if reverse is not None:
|
| if not order_by or not hasattr(cls, order_by):
|
| order_by = "create_time"
|
| if reverse is True:
|
| query_records = query_records.order_by(
|
| cls.model.getter_by(order_by).desc())
|
| elif reverse is False:
|
| query_records = query_records.order_by(
|
| cls.model.getter_by(order_by).asc())
|
| return query_records
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def get(cls, **kwargs):
|
| return cls.model.get(**kwargs)
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def get_or_none(cls, **kwargs):
|
| try:
|
| return cls.model.get(**kwargs)
|
| except peewee.DoesNotExist:
|
| return None
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def save(cls, **kwargs):
|
|
|
|
|
| sample_obj = cls.model(**kwargs).save(force_insert=True)
|
| return sample_obj
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def insert(cls, **kwargs):
|
| if "id" not in kwargs:
|
| kwargs["id"] = get_uuid()
|
| kwargs["create_time"] = current_timestamp()
|
| kwargs["create_date"] = datetime_format(datetime.now())
|
| kwargs["update_time"] = current_timestamp()
|
| kwargs["update_date"] = datetime_format(datetime.now())
|
| sample_obj = cls.model(**kwargs).save(force_insert=True)
|
| return sample_obj
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def insert_many(cls, data_list, batch_size=100):
|
| with DB.atomic():
|
| for d in data_list:
|
| d["create_time"] = current_timestamp()
|
| d["create_date"] = datetime_format(datetime.now())
|
| for i in range(0, len(data_list), batch_size):
|
| cls.model.insert_many(data_list[i:i + batch_size]).execute()
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def update_many_by_id(cls, data_list):
|
| with DB.atomic():
|
| for data in data_list:
|
| data["update_time"] = current_timestamp()
|
| data["update_date"] = datetime_format(datetime.now())
|
| cls.model.update(data).where(
|
| cls.model.id == data["id"]).execute()
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def update_by_id(cls, pid, data):
|
| data["update_time"] = current_timestamp()
|
| data["update_date"] = datetime_format(datetime.now())
|
| num = cls.model.update(data).where(cls.model.id == pid).execute()
|
| return num
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def get_by_id(cls, pid):
|
| try:
|
| obj = cls.model.query(id=pid)[0]
|
| return True, obj
|
| except Exception as e:
|
| return False, None
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def get_by_ids(cls, pids, cols=None):
|
| if cols:
|
| objs = cls.model.select(*cols)
|
| else:
|
| objs = cls.model.select()
|
| return objs.where(cls.model.id.in_(pids))
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def delete_by_id(cls, pid):
|
| return cls.model.delete().where(cls.model.id == pid).execute()
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def filter_delete(cls, filters):
|
| with DB.atomic():
|
| num = cls.model.delete().where(*filters).execute()
|
| return num
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def filter_update(cls, filters, update_data):
|
| with DB.atomic():
|
| return cls.model.update(update_data).where(*filters).execute()
|
|
|
| @staticmethod
|
| def cut_list(tar_list, n):
|
| length = len(tar_list)
|
| arr = range(length)
|
| result = [tuple(tar_list[x:(x + n)]) for x in arr[::n]]
|
| return result
|
|
|
| @classmethod
|
| @DB.connection_context()
|
| def filter_scope_list(cls, in_key, in_filters_list,
|
| filters=None, cols=None):
|
| in_filters_tuple_list = cls.cut_list(in_filters_list, 20)
|
| if not filters:
|
| filters = []
|
| res_list = []
|
| if cols:
|
| for i in in_filters_tuple_list:
|
| query_records = cls.model.select(
|
| *
|
| cols).where(
|
| getattr(
|
| cls.model,
|
| in_key).in_(i),
|
| *
|
| filters)
|
| if query_records:
|
| res_list.extend(
|
| [query_record for query_record in query_records])
|
| else:
|
| for i in in_filters_tuple_list:
|
| query_records = cls.model.select().where(
|
| getattr(cls.model, in_key).in_(i), *filters)
|
| if query_records:
|
| res_list.extend(
|
| [query_record for query_record in query_records])
|
| return res_list
|
|
|