File size: 4,204 Bytes
e062359 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | """
This file contains code from https://github.com/pydantic/pydantic/blob/main/pydantic/v1/datetime_parse.py
without the Pydantic v1 specific errors.
"""
from __future__ import annotations
import re
from typing import Dict, Union, Optional
from datetime import date, datetime, timezone, timedelta
from .._types import StrBytesIntFloat
date_expr = r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
time_expr = (
r"(?P<hour>\d{1,2}):(?P<minute>\d{1,2})"
r"(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?"
r"(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$"
)
date_re = re.compile(f"{date_expr}$")
datetime_re = re.compile(f"{date_expr}[T ]{time_expr}")
EPOCH = datetime(1970, 1, 1)
# if greater than this, the number is in ms, if less than or equal it's in seconds
# (in seconds this is 11th October 2603, in ms it's 20th August 1970)
MS_WATERSHED = int(2e10)
# slightly more than datetime.max in ns - (datetime.max - EPOCH).total_seconds() * 1e9
MAX_NUMBER = int(3e20)
def _get_numeric(value: StrBytesIntFloat, native_expected_type: str) -> Union[None, int, float]:
if isinstance(value, (int, float)):
return value
try:
return float(value)
except ValueError:
return None
except TypeError:
raise TypeError(f"invalid type; expected {native_expected_type}, string, bytes, int or float") from None
def _from_unix_seconds(seconds: Union[int, float]) -> datetime:
if seconds > MAX_NUMBER:
return datetime.max
elif seconds < -MAX_NUMBER:
return datetime.min
while abs(seconds) > MS_WATERSHED:
seconds /= 1000
dt = EPOCH + timedelta(seconds=seconds)
return dt.replace(tzinfo=timezone.utc)
def _parse_timezone(value: Optional[str]) -> Union[None, int, timezone]:
if value == "Z":
return timezone.utc
elif value is not None:
offset_mins = int(value[-2:]) if len(value) > 3 else 0
offset = 60 * int(value[1:3]) + offset_mins
if value[0] == "-":
offset = -offset
return timezone(timedelta(minutes=offset))
else:
return None
def parse_datetime(value: Union[datetime, StrBytesIntFloat]) -> datetime:
"""
Parse a datetime/int/float/string and return a datetime.datetime.
This function supports time zone offsets. When the input contains one,
the output uses a timezone with a fixed offset from UTC.
Raise ValueError if the input is well formatted but not a valid datetime.
Raise ValueError if the input isn't well formatted.
"""
if isinstance(value, datetime):
return value
number = _get_numeric(value, "datetime")
if number is not None:
return _from_unix_seconds(number)
if isinstance(value, bytes):
value = value.decode()
assert not isinstance(value, (float, int))
match = datetime_re.match(value)
if match is None:
raise ValueError("invalid datetime format")
kw = match.groupdict()
if kw["microsecond"]:
kw["microsecond"] = kw["microsecond"].ljust(6, "0")
tzinfo = _parse_timezone(kw.pop("tzinfo"))
kw_: Dict[str, Union[None, int, timezone]] = {k: int(v) for k, v in kw.items() if v is not None}
kw_["tzinfo"] = tzinfo
return datetime(**kw_) # type: ignore
def parse_date(value: Union[date, StrBytesIntFloat]) -> date:
"""
Parse a date/int/float/string and return a datetime.date.
Raise ValueError if the input is well formatted but not a valid date.
Raise ValueError if the input isn't well formatted.
"""
if isinstance(value, date):
if isinstance(value, datetime):
return value.date()
else:
return value
number = _get_numeric(value, "date")
if number is not None:
return _from_unix_seconds(number).date()
if isinstance(value, bytes):
value = value.decode()
assert not isinstance(value, (float, int))
match = date_re.match(value)
if match is None:
raise ValueError("invalid date format")
kw = {k: int(v) for k, v in match.groupdict().items()}
try:
return date(**kw)
except ValueError:
raise ValueError("invalid date format") from None
|