123 lines
4.2 KiB
Python
123 lines
4.2 KiB
Python
import datetime
|
|
import re
|
|
from functools import wraps
|
|
from typing import Any, Callable, Optional, cast
|
|
|
|
import sentry_sdk
|
|
from grpc import StatusCode
|
|
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
|
|
from sentry_sdk.types import Event
|
|
|
|
from ._errors import TFunc
|
|
from .constants import ERROR_HUB_DSN, PACKAGE_NAME
|
|
from .exceptions import AioRequestError, RequestError
|
|
|
|
__all__ = (
|
|
"init_error_hub",
|
|
"async_init_error_hub",
|
|
"handle_error_hub_gen",
|
|
"handle_aio_error_hub_gen",
|
|
)
|
|
|
|
BEARER_PATTERN = re.compile(r"Bearer\s+[a-zA-Z0-9\-_\.]+", re.IGNORECASE)
|
|
|
|
|
|
def sanitize_bearer_tokens(event, hint) -> Optional[Event]:
|
|
def _sanitize_value(value):
|
|
if isinstance(value, str):
|
|
return BEARER_PATTERN.sub("[***Filtered***]", value)
|
|
elif isinstance(value, dict):
|
|
return {k: _sanitize_value(v) for k, v in value.items()}
|
|
elif isinstance(value, (list, tuple)):
|
|
return [_sanitize_value(item) for item in value]
|
|
else:
|
|
return value
|
|
|
|
if "exc_info" in hint:
|
|
exc_type, exc_value, traceback = hint["exc_info"]
|
|
if exc_type == RequestError or exc_type == AioRequestError:
|
|
status_code = exc_value.code
|
|
if status_code == StatusCode.UNAVAILABLE:
|
|
return _sanitize_value(event)
|
|
|
|
return None
|
|
|
|
|
|
def init_error_hub(client):
|
|
import sentry_sdk
|
|
from sentry_sdk.integrations.modules import ModulesIntegration
|
|
|
|
# noinspection PyProtectedMember
|
|
sentry_sdk.init(
|
|
dsn=ERROR_HUB_DSN,
|
|
send_default_pii=False,
|
|
integrations=[],
|
|
auto_enabling_integrations=False,
|
|
disabled_integrations=[ModulesIntegration(), ExcepthookIntegration()],
|
|
before_send=sanitize_bearer_tokens,
|
|
release=PACKAGE_NAME,
|
|
environment=client._target,
|
|
add_full_stack=False,
|
|
max_stack_frames=3,
|
|
max_breadcrumbs=3,
|
|
enable_logs=False,
|
|
)
|
|
|
|
|
|
async def async_init_error_hub(client):
|
|
init_error_hub(client)
|
|
|
|
|
|
def handle_error_hub_gen() -> Callable[[TFunc], TFunc]:
|
|
def decorator(func: TFunc) -> TFunc:
|
|
# noinspection DuplicatedCode
|
|
@wraps(func)
|
|
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
|
with sentry_sdk.new_scope() as scope:
|
|
stream_start = datetime.datetime.now()
|
|
scope.set_extra("stream_start", stream_start)
|
|
try:
|
|
for message in func(*args, **kwargs):
|
|
scope.set_extra("last_message", message)
|
|
yield message
|
|
except RequestError as e:
|
|
metadata = e.metadata
|
|
tracking_id = metadata.tracking_id if metadata else None
|
|
scope.set_extra("tracking_id", tracking_id)
|
|
scope.set_extra(
|
|
"stream_duration", datetime.datetime.now() - stream_start
|
|
)
|
|
sentry_sdk.capture_exception(e)
|
|
raise
|
|
|
|
return cast(TFunc, wrapper)
|
|
|
|
return decorator
|
|
|
|
|
|
def handle_aio_error_hub_gen() -> Callable[[TFunc], TFunc]:
|
|
def decorator(func: TFunc) -> TFunc:
|
|
# noinspection DuplicatedCode
|
|
@wraps(func)
|
|
async def wrapper(*args: Any, **kwargs: Any) -> Any:
|
|
with sentry_sdk.new_scope() as scope:
|
|
stream_start = datetime.datetime.now()
|
|
scope.set_extra("stream_start", stream_start)
|
|
try:
|
|
async for message in func(*args, **kwargs):
|
|
scope.set_extra("last_message", message)
|
|
yield message
|
|
except AioRequestError as e:
|
|
metadata = e.metadata
|
|
tracking_id = metadata.tracking_id if metadata else None
|
|
scope.set_extra("tracking_id", tracking_id)
|
|
scope.set_extra(
|
|
"stream_duration", datetime.datetime.now() - stream_start
|
|
)
|
|
sentry_sdk.capture_exception(e)
|
|
raise
|
|
|
|
return cast(TFunc, wrapper)
|
|
|
|
return decorator
|