275 lines
9.8 KiB
Python
275 lines
9.8 KiB
Python
import ast
|
|
import dataclasses
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
from decimal import Decimal
|
|
from enum import Enum
|
|
from typing import Any, Callable, Generator, Iterable, List, Protocol, Tuple
|
|
|
|
import dateutil.parser
|
|
|
|
from .schemas import (
|
|
CandleInterval,
|
|
HistoricCandle,
|
|
MoneyValue,
|
|
Quotation,
|
|
SubscriptionInterval,
|
|
)
|
|
|
|
__all__ = (
|
|
"get_intervals",
|
|
"quotation_to_decimal",
|
|
"money_to_decimal",
|
|
"decimal_to_quotation",
|
|
"decimal_to_money",
|
|
"candle_interval_to_subscription_interval",
|
|
"now",
|
|
"candle_interval_to_timedelta",
|
|
"ceil_datetime",
|
|
"floor_datetime",
|
|
"dataclass_from_dict",
|
|
"round_datetime_range",
|
|
"empty_or_uuid",
|
|
)
|
|
|
|
DAYS_IN_YEAR = 365
|
|
|
|
|
|
MAX_INTERVALS = {
|
|
CandleInterval.CANDLE_INTERVAL_5_SEC: timedelta(minutes=200),
|
|
CandleInterval.CANDLE_INTERVAL_10_SEC: timedelta(minutes=200),
|
|
CandleInterval.CANDLE_INTERVAL_30_SEC: timedelta(hours=20),
|
|
CandleInterval.CANDLE_INTERVAL_1_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_2_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_3_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_5_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_10_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_15_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_30_MIN: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_HOUR: timedelta(weeks=1),
|
|
CandleInterval.CANDLE_INTERVAL_2_HOUR: timedelta(weeks=1),
|
|
CandleInterval.CANDLE_INTERVAL_4_HOUR: timedelta(weeks=1),
|
|
CandleInterval.CANDLE_INTERVAL_DAY: timedelta(days=DAYS_IN_YEAR),
|
|
CandleInterval.CANDLE_INTERVAL_WEEK: timedelta(days=DAYS_IN_YEAR),
|
|
CandleInterval.CANDLE_INTERVAL_MONTH: timedelta(days=DAYS_IN_YEAR * 3),
|
|
}
|
|
|
|
INTERVAL_LENGTHS = {
|
|
CandleInterval.CANDLE_INTERVAL_5_SEC: timedelta(seconds=5),
|
|
CandleInterval.CANDLE_INTERVAL_10_SEC: timedelta(seconds=10),
|
|
CandleInterval.CANDLE_INTERVAL_30_SEC: timedelta(seconds=30),
|
|
CandleInterval.CANDLE_INTERVAL_1_MIN: timedelta(minutes=1),
|
|
CandleInterval.CANDLE_INTERVAL_2_MIN: timedelta(minutes=2),
|
|
CandleInterval.CANDLE_INTERVAL_3_MIN: timedelta(minutes=3),
|
|
CandleInterval.CANDLE_INTERVAL_5_MIN: timedelta(minutes=5),
|
|
CandleInterval.CANDLE_INTERVAL_10_MIN: timedelta(minutes=10),
|
|
CandleInterval.CANDLE_INTERVAL_15_MIN: timedelta(minutes=15),
|
|
CandleInterval.CANDLE_INTERVAL_30_MIN: timedelta(minutes=30),
|
|
CandleInterval.CANDLE_INTERVAL_HOUR: timedelta(hours=1),
|
|
CandleInterval.CANDLE_INTERVAL_2_HOUR: timedelta(hours=2),
|
|
CandleInterval.CANDLE_INTERVAL_4_HOUR: timedelta(hours=4),
|
|
CandleInterval.CANDLE_INTERVAL_DAY: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_WEEK: timedelta(days=7),
|
|
CandleInterval.CANDLE_INTERVAL_MONTH: timedelta(days=30),
|
|
}
|
|
|
|
|
|
def get_intervals(
|
|
interval: CandleInterval, from_: datetime, to: datetime
|
|
) -> Generator[Tuple[datetime, datetime], None, None]:
|
|
max_interval = MAX_INTERVALS[interval]
|
|
interval_length = INTERVAL_LENGTHS[interval]
|
|
local_from = from_
|
|
while local_from <= to:
|
|
yield local_from, min(local_from + max_interval, to)
|
|
local_from += max_interval + interval_length
|
|
|
|
|
|
def quotation_to_decimal(quotation: Quotation) -> Decimal:
|
|
return money_to_decimal(quotation)
|
|
|
|
|
|
def decimal_to_quotation(decimal: Decimal) -> Quotation:
|
|
fractional = decimal % 1
|
|
return Quotation(units=int(decimal // 1), nano=int(fractional * Decimal("10e8")))
|
|
|
|
|
|
def quotation_to_money(quotation: Quotation, currency: str) -> MoneyValue:
|
|
return MoneyValue(units=quotation.units, nano=quotation.nano, currency=currency)
|
|
|
|
|
|
def decimal_to_money(decimal: Decimal, currency: str) -> MoneyValue:
|
|
quotation = decimal_to_quotation(decimal)
|
|
return quotation_to_money(quotation, currency)
|
|
|
|
|
|
class MoneyProtocol(Protocol):
|
|
units: int
|
|
nano: int
|
|
|
|
|
|
def money_to_decimal(money: MoneyProtocol) -> Decimal:
|
|
fractional = money.nano / Decimal("10e8")
|
|
return Decimal(money.units) + fractional
|
|
|
|
|
|
# fmt: off
|
|
_CANDLE_INTERVAL_TO_SUBSCRIPTION_INTERVAL_MAPPING = {
|
|
CandleInterval.CANDLE_INTERVAL_1_MIN:
|
|
SubscriptionInterval.SUBSCRIPTION_INTERVAL_ONE_MINUTE,
|
|
CandleInterval.CANDLE_INTERVAL_5_MIN:
|
|
SubscriptionInterval.SUBSCRIPTION_INTERVAL_FIVE_MINUTES,
|
|
CandleInterval.CANDLE_INTERVAL_UNSPECIFIED:
|
|
SubscriptionInterval.SUBSCRIPTION_INTERVAL_UNSPECIFIED,
|
|
}
|
|
# fmt: on
|
|
|
|
|
|
def candle_interval_to_subscription_interval(
|
|
candle_interval: CandleInterval,
|
|
) -> SubscriptionInterval:
|
|
return _CANDLE_INTERVAL_TO_SUBSCRIPTION_INTERVAL_MAPPING.get(
|
|
candle_interval, SubscriptionInterval.SUBSCRIPTION_INTERVAL_UNSPECIFIED
|
|
)
|
|
|
|
|
|
def now() -> datetime:
|
|
return datetime.utcnow().replace(tzinfo=timezone.utc)
|
|
|
|
|
|
_CANDLE_INTERVAL_TO_TIMEDELTA_MAPPING = {
|
|
CandleInterval.CANDLE_INTERVAL_5_SEC: timedelta(seconds=5),
|
|
CandleInterval.CANDLE_INTERVAL_10_SEC: timedelta(seconds=10),
|
|
CandleInterval.CANDLE_INTERVAL_30_SEC: timedelta(seconds=30),
|
|
CandleInterval.CANDLE_INTERVAL_1_MIN: timedelta(minutes=1),
|
|
CandleInterval.CANDLE_INTERVAL_2_MIN: timedelta(minutes=2),
|
|
CandleInterval.CANDLE_INTERVAL_3_MIN: timedelta(minutes=3),
|
|
CandleInterval.CANDLE_INTERVAL_5_MIN: timedelta(minutes=5),
|
|
CandleInterval.CANDLE_INTERVAL_10_MIN: timedelta(minutes=10),
|
|
CandleInterval.CANDLE_INTERVAL_15_MIN: timedelta(minutes=15),
|
|
CandleInterval.CANDLE_INTERVAL_30_MIN: timedelta(minutes=30),
|
|
CandleInterval.CANDLE_INTERVAL_HOUR: timedelta(hours=1),
|
|
CandleInterval.CANDLE_INTERVAL_2_HOUR: timedelta(hours=2),
|
|
CandleInterval.CANDLE_INTERVAL_4_HOUR: timedelta(hours=4),
|
|
CandleInterval.CANDLE_INTERVAL_DAY: timedelta(days=1),
|
|
CandleInterval.CANDLE_INTERVAL_WEEK: timedelta(weeks=1),
|
|
CandleInterval.CANDLE_INTERVAL_MONTH: timedelta(days=30),
|
|
CandleInterval.CANDLE_INTERVAL_UNSPECIFIED: timedelta(minutes=1),
|
|
}
|
|
|
|
|
|
def candle_interval_to_timedelta(candle_interval: CandleInterval) -> timedelta:
|
|
if delta := _CANDLE_INTERVAL_TO_TIMEDELTA_MAPPING.get(candle_interval):
|
|
return delta
|
|
raise ValueError(f"Cannot convert {candle_interval} to timedelta")
|
|
|
|
|
|
_DATETIME_MIN = datetime.min.replace(tzinfo=timezone.utc)
|
|
|
|
|
|
def ceil_datetime(datetime_: datetime, delta: timedelta):
|
|
return datetime_ + (_DATETIME_MIN - datetime_) % delta
|
|
|
|
|
|
def floor_datetime(datetime_: datetime, delta: timedelta):
|
|
return datetime_ - (datetime_ - _DATETIME_MIN) % delta
|
|
|
|
|
|
def dataclass_from_dict(klass, d):
|
|
if issubclass(int, klass):
|
|
return int(d)
|
|
if issubclass(bool, klass):
|
|
return bool(d)
|
|
if issubclass(klass, datetime):
|
|
return dateutil.parser.parse(d).replace(tzinfo=timezone.utc)
|
|
if issubclass(klass, Quotation):
|
|
d = ast.literal_eval(d)
|
|
if issubclass(klass, Enum):
|
|
return klass(int(d))
|
|
|
|
fieldtypes = {f.name: f.type for f in dataclasses.fields(klass)}
|
|
return klass(**{f: dataclass_from_dict(fieldtypes[f], d[f]) for f in d})
|
|
|
|
|
|
_datetime_range_replace_floor_by_interval = {
|
|
CandleInterval.CANDLE_INTERVAL_1_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_2_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_3_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_5_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_10_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_15_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_30_MIN: lambda r: r.replace(second=0, microsecond=0),
|
|
CandleInterval.CANDLE_INTERVAL_HOUR: lambda r: r.replace(
|
|
minute=0, second=0, microsecond=0
|
|
),
|
|
CandleInterval.CANDLE_INTERVAL_2_HOUR: lambda r: r.replace(
|
|
minute=0, second=0, microsecond=0
|
|
),
|
|
CandleInterval.CANDLE_INTERVAL_4_HOUR: lambda r: r.replace(
|
|
minute=0, second=0, microsecond=0
|
|
),
|
|
CandleInterval.CANDLE_INTERVAL_DAY: lambda r: r.replace(
|
|
hour=0, minute=0, second=0, microsecond=0
|
|
),
|
|
CandleInterval.CANDLE_INTERVAL_WEEK: lambda r: r.replace(
|
|
hour=0, minute=0, second=0, microsecond=0
|
|
),
|
|
CandleInterval.CANDLE_INTERVAL_MONTH: lambda r: r.replace(
|
|
hour=0, minute=0, second=0, microsecond=0
|
|
),
|
|
}
|
|
|
|
|
|
def round_datetime_range(
|
|
date_range: Tuple[datetime, datetime],
|
|
interval: CandleInterval,
|
|
) -> Tuple[datetime, datetime]:
|
|
"""Expand datetime range to nearest round range.
|
|
|
|
interval = CandleInterval.CANDLE_INTERVAL_DAY,
|
|
date_range = [
|
|
2023-09-11 21:39:04.988646+00:00
|
|
2023-09-14 21:39:04.988660+00:00
|
|
].
|
|
|
|
Returns
|
|
-------
|
|
[
|
|
2023-09-11 00:00:00+00:00
|
|
2023-09-15 00:00:00+00:00
|
|
].
|
|
|
|
"""
|
|
floor = _datetime_range_replace_floor_by_interval[interval]
|
|
start, end = date_range
|
|
start = floor(start)
|
|
interval_delta = candle_interval_to_timedelta(interval)
|
|
end = floor(end + interval_delta)
|
|
return start, end
|
|
|
|
|
|
def filter_distinct_candles(candles: List[HistoricCandle]) -> List[HistoricCandle]:
|
|
filtered = []
|
|
for candle1, candle2 in zip(candles, candles[1:]):
|
|
if candle2.time - candle1.time > timedelta():
|
|
filtered.append(candle1)
|
|
filtered.extend(candles[-1:])
|
|
return filtered
|
|
|
|
|
|
def with_filtering_distinct_candles(f: Callable[[Any], Iterable[HistoricCandle]]):
|
|
def _(*args: Any, **kwargs: Any) -> Iterable[HistoricCandle]:
|
|
yield from filter_distinct_candles(list(f(*args, **kwargs)))
|
|
|
|
return _
|
|
|
|
|
|
def empty_or_uuid(s: str) -> bool:
|
|
return (
|
|
s == ""
|
|
or re.fullmatch(
|
|
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", s
|
|
)
|
|
is not None
|
|
)
|