<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-GLhlTQ8iRABdZLl6O3oVMWSktQOp6b7In1Zl3/Jr59b6EGGoI1aFkw7cmDA6j6gD" crossorigin="anonymous">
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.3.0/css/all.min.css"
        integrity="sha512-SzlrxWUlpfuzQ+pcUCosxcglQRNAq/DZjVsC0lE40xsADsfeQoEypE+enwcOiGjk/bSuGGKHEyjSoQ1zVisanQ=="
        crossorigin="anonymous" referrerpolicy="no-referrer" />
</head>
</html>
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
from sentry_sdk.tracing import TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import event_from_exception

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable
    from typing import Any, Optional


try:
    import grpc
    from grpc import HandlerCallDetails, RpcMethodHandler
    from grpc.aio import AbortError, ServicerContext
except ImportError:
    raise DidNotEnable("grpcio is not installed")


class ServerInterceptor(grpc.aio.ServerInterceptor):  # type: ignore
    def __init__(
        self: "ServerInterceptor",
        find_name: "Callable[[ServicerContext], str] | None" = None,
    ) -> None:
        self._custom_find_name = find_name

        super().__init__()

    async def intercept_service(
        self: "ServerInterceptor",
        continuation: "Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]]",
        handler_call_details: "HandlerCallDetails",
    ) -> "Optional[Awaitable[RpcMethodHandler]]":
        handler = await continuation(handler_call_details)
        if handler is None:
            return None

        method_name = handler_call_details.method
        custom_find_name = self._custom_find_name

        if not handler.request_streaming and not handler.response_streaming:
            handler_factory = grpc.unary_unary_rpc_method_handler

            async def wrapped(request: "Any", context: "ServicerContext") -> "Any":
                with sentry_sdk.isolation_scope():
                    name = (
                        custom_find_name(context) if custom_find_name else method_name
                    )
                    if not name:
                        return await handler(request, context)

                    span_streaming = has_span_streaming_enabled(
                        sentry_sdk.get_client().options
                    )
                    if span_streaming:
                        # What if the headers are empty?
                        sentry_sdk.traces.continue_trace(
                            dict(context.invocation_metadata())
                        )

                        with sentry_sdk.traces.start_span(
                            name=name,
                            attributes={
                                "sentry.op": OP.GRPC_SERVER,
                                "sentry.span.source": TransactionSource.CUSTOM.value,
                                "sentry.origin": SPAN_ORIGIN,
                            },
                            parent_span=None,
                        ):
                            try:
                                return await handler.unary_unary(request, context)
                            except AbortError:
                                raise
                            except Exception as exc:
                                event, hint = event_from_exception(
                                    exc,
                                    mechanism={"type": "grpc", "handled": False},
                                )
                                sentry_sdk.capture_event(event, hint=hint)
                                raise
                    else:
                        # What if the headers are empty?
                        transaction = sentry_sdk.continue_trace(
                            dict(context.invocation_metadata()),
                            op=OP.GRPC_SERVER,
                            name=name,
                            source=TransactionSource.CUSTOM,
                            origin=SPAN_ORIGIN,
                        )

                        with sentry_sdk.start_transaction(transaction=transaction):
                            try:
                                return await handler.unary_unary(request, context)
                            except AbortError:
                                raise
                            except Exception as exc:
                                event, hint = event_from_exception(
                                    exc,
                                    mechanism={"type": "grpc", "handled": False},
                                )
                                sentry_sdk.capture_event(event, hint=hint)
                                raise

        elif not handler.request_streaming and handler.response_streaming:
            handler_factory = grpc.unary_stream_rpc_method_handler

            async def wrapped(request: "Any", context: "ServicerContext") -> "Any":  # type: ignore
                async for r in handler.unary_stream(request, context):
                    yield r

        elif handler.request_streaming and not handler.response_streaming:
            handler_factory = grpc.stream_unary_rpc_method_handler

            async def wrapped(request: "Any", context: "ServicerContext") -> "Any":
                response = handler.stream_unary(request, context)
                return await response

        elif handler.request_streaming and handler.response_streaming:
            handler_factory = grpc.stream_stream_rpc_method_handler

            async def wrapped(request: "Any", context: "ServicerContext") -> "Any":  # type: ignore
                async for r in handler.stream_stream(request, context):
                    yield r

        return handler_factory(
            wrapped,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
