<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-GLhlTQ8iRABdZLl6O3oVMWSktQOp6b7In1Zl3/Jr59b6EGGoI1aFkw7cmDA6j6gD" crossorigin="anonymous">
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.3.0/css/all.min.css"
        integrity="sha512-SzlrxWUlpfuzQ+pcUCosxcglQRNAq/DZjVsC0lE40xsADsfeQoEypE+enwcOiGjk/bSuGGKHEyjSoQ1zVisanQ=="
        crossorigin="anonymous" referrerpolicy="no-referrer" />
</head>
</html>
import sys
from typing import TYPE_CHECKING

from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import capture_internal_exceptions, reraise

from ..spans import (
    handoff_span,
    invoke_agent_span,
    update_invoke_agent_span,
)

if TYPE_CHECKING:
    from typing import Any, Awaitable, Callable, Optional

    from agents.run_internal.run_steps import SingleStepResult

    from sentry_sdk.tracing import Span

try:
    import agents
except ImportError:
    raise DidNotEnable("OpenAI Agents not installed")


def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
    """Check if there's an active agent span for this context"""
    return getattr(context_wrapper, "_sentry_current_agent", None) is not None


def _get_current_agent(
    context_wrapper: "agents.RunContextWrapper",
) -> "Optional[agents.Agent]":
    """Get the current agent from context wrapper"""
    return getattr(context_wrapper, "_sentry_current_agent", None)


def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
    """Close the workflow span for streaming executions if it exists."""
    if agent and hasattr(agent, "_sentry_workflow_span"):
        workflow_span = agent._sentry_workflow_span
        workflow_span.__exit__(*sys.exc_info())
        delattr(agent, "_sentry_workflow_span")


def _maybe_start_agent_span(
    context_wrapper: "agents.RunContextWrapper",
    agent: "agents.Agent",
    should_run_agent_start_hooks: bool,
    span_kwargs: "dict[str, Any]",
    is_streaming: bool = False,
) -> "Optional[Span]":
    """
    Start an agent invocation span if conditions are met.
    Handles ending any existing span for a different agent.

    Returns the new span if started, or the existing span if conditions aren't met.
    """
    if not (should_run_agent_start_hooks and agent and context_wrapper):
        return getattr(context_wrapper, "_sentry_agent_span", None)

    # End any existing span for a different agent
    if _has_active_agent_span(context_wrapper):
        current_agent = _get_current_agent(context_wrapper)
        if current_agent and current_agent != agent:
            span = getattr(context_wrapper, "_sentry_agent_span", None)
            if span:
                update_invoke_agent_span(
                    span=span, context=context_wrapper, agent=agent
                )
                span.__exit__(None, None, None)
                delattr(context_wrapper, "_sentry_agent_span")

    # Store the agent on the context wrapper so we can access it later
    context_wrapper._sentry_current_agent = agent
    span = invoke_agent_span(context_wrapper, agent, span_kwargs)
    context_wrapper._sentry_agent_span = span
    agent._sentry_agent_span = span

    if is_streaming:
        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

    return span


async def _run_single_turn(
    original_run_single_turn: "Callable[..., Awaitable[SingleStepResult]]",
    *args: "Any",
    **kwargs: "Any",
) -> "SingleStepResult":
    """
    Patched _run_single_turn that
    - creates agent invocation spans if there is no already active agent invocation span.
    - ends the agent invocation span if and only if an exception is raised in `_run_single_turn()`.
    """
    # openai-agents >= 0.14 passes `bindings: AgentBindings` instead of `agent`.
    bindings = kwargs.get("bindings")
    agent = (
        getattr(bindings, "public_agent", None)
        if bindings is not None
        else kwargs.get("agent")
    )
    context_wrapper = kwargs.get("context_wrapper")
    should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)

    span = _maybe_start_agent_span(
        context_wrapper, agent, should_run_agent_start_hooks, kwargs
    )

    if span is None or span.timestamp is not None:
        return await original_run_single_turn(*args, **kwargs)

    try:
        result = await original_run_single_turn(*args, **kwargs)
    except Exception:
        exc_info = sys.exc_info()
        with capture_internal_exceptions():
            span = getattr(context_wrapper, "_sentry_agent_span", None)
            if span:
                update_invoke_agent_span(
                    span=span, context=context_wrapper, agent=agent
                )
                span.__exit__(*exc_info)
                delattr(context_wrapper, "_sentry_agent_span")
        reraise(*exc_info)

    return result


async def _run_single_turn_streamed(
    original_run_single_turn_streamed: "Callable[..., Awaitable[SingleStepResult]]",
    *args: "Any",
    **kwargs: "Any",
) -> "SingleStepResult":
    """
    Patched _run_single_turn_streamed that
    - creates agent invocation spans for streaming if there is no already active agent invocation span.
    - ends the agent invocation span if and only if `_run_single_turn_streamed()` raises an exception.

    Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
    _run_single_turn_streamed uses positional arguments. The call signature <v0.14 is:
    _run_single_turn_streamed(
        streamed_result,              # args[0]
        agent,                        # args[1]
        hooks,                        # args[2]
        context_wrapper,              # args[3]
        run_config,                   # args[4]
        should_run_agent_start_hooks, # args[5]
        tool_use_tracker,             # args[6]
        all_tools,                    # args[7]
        server_conversation_tracker,  # args[8] (optional)
    )

    The call signature >=v0.14 is:
    _run_single_turn_streamed(
        streamed_result,              # args[0]
        bindings,                     # args[1]
        hooks,                        # args[2]
        context_wrapper,              # args[3]
        run_config,                   # args[4]
        should_run_agent_start_hooks, # args[5]
        tool_use_tracker,             # args[6]
        all_tools,                    # args[7]
        server_conversation_tracker,  # args[8] (optional)
    )
    """
    streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
    # openai-agents >= 0.14 passes `bindings: AgentBindings` at args[1] instead of `agent`.
    agent_or_bindings = (
        args[1] if len(args) > 1 else kwargs.get("bindings", kwargs.get("agent"))
    )
    agent = getattr(agent_or_bindings, "public_agent", agent_or_bindings)
    context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
    should_run_agent_start_hooks = bool(
        args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks", False)
    )

    span_kwargs: "dict[str, Any]" = {}
    if streamed_result and hasattr(streamed_result, "input"):
        span_kwargs["original_input"] = streamed_result.input

    span = _maybe_start_agent_span(
        context_wrapper,
        agent,
        should_run_agent_start_hooks,
        span_kwargs,
        is_streaming=True,
    )

    if span is None or span.timestamp is not None:
        return await original_run_single_turn_streamed(*args, **kwargs)

    try:
        result = await original_run_single_turn_streamed(*args, **kwargs)
    except Exception:
        exc_info = sys.exc_info()
        with capture_internal_exceptions():
            span = getattr(context_wrapper, "_sentry_agent_span", None)
            if span:
                update_invoke_agent_span(
                    span=span, context=context_wrapper, agent=agent
                )
                span.__exit__(*exc_info)
                delattr(context_wrapper, "_sentry_agent_span")
            _close_streaming_workflow_span(agent)
        reraise(*exc_info)

    return result


async def _execute_handoffs(
    original_execute_handoffs: "Callable[..., SingleStepResult]",
    *args: "Any",
    **kwargs: "Any",
) -> "SingleStepResult":
    """
    Patched execute_handoffs that
    - creates and manages handoff spans.
    - ends the agent invocation span.
    - ends the workflow span if the response is streamed and an exception is raised in `execute_handoffs()`.
    """

    context_wrapper = kwargs.get("context_wrapper")
    run_handoffs = kwargs.get("run_handoffs")
    # openai-agents >= 0.14 renamed `agent` to `public_agent`.
    agent = kwargs.get("public_agent", kwargs.get("agent"))

    # Create Sentry handoff span for the first handoff (agents library only processes the first one)
    if run_handoffs:
        first_handoff = run_handoffs[0]
        handoff_agent_name = first_handoff.handoff.agent_name
        handoff_span(context_wrapper, agent, handoff_agent_name)

    if not agent or not context_wrapper or not _has_active_agent_span(context_wrapper):
        # Call original method with all parameters
        try:
            return await original_execute_handoffs(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            with capture_internal_exceptions():
                _close_streaming_workflow_span(agent)
            reraise(*exc_info)

    # Call original method with all parameters
    try:
        result = await original_execute_handoffs(*args, **kwargs)
    except Exception:
        exc_info = sys.exc_info()
        with capture_internal_exceptions():
            _close_streaming_workflow_span(agent)
            span = getattr(context_wrapper, "_sentry_agent_span", None)
            if span:
                update_invoke_agent_span(
                    span=span, context=context_wrapper, agent=agent
                )
                span.__exit__(*exc_info)
                delattr(context_wrapper, "_sentry_agent_span")
        reraise(*exc_info)

    span = getattr(context_wrapper, "_sentry_agent_span", None)
    if span:
        update_invoke_agent_span(span=span, context=context_wrapper, agent=agent)
        span.__exit__(None, None, None)
        delattr(context_wrapper, "_sentry_agent_span")

    return result


async def _execute_final_output(
    original_execute_final_output: "Callable[..., SingleStepResult]",
    *args: "Any",
    **kwargs: "Any",
) -> "SingleStepResult":
    """
    Patched execute_final_output that
    - ends the agent invocation span.
    - ends the workflow span if the response is streamed.
    """

    # openai-agents >= 0.14 renamed `agent` to `public_agent`.
    agent = kwargs.get("public_agent", kwargs.get("agent"))
    context_wrapper = kwargs.get("context_wrapper")
    final_output = kwargs.get("final_output")

    if not agent or not context_wrapper or not _has_active_agent_span(context_wrapper):
        try:
            return await original_execute_final_output(*args, **kwargs)
        finally:
            with capture_internal_exceptions():
                # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
                _close_streaming_workflow_span(agent)

    try:
        result = await original_execute_final_output(*args, **kwargs)
    except Exception:
        exc_info = sys.exc_info()
        with capture_internal_exceptions():
            # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
            _close_streaming_workflow_span(agent)
            span = getattr(context_wrapper, "_sentry_agent_span", None)
            if span:
                update_invoke_agent_span(
                    span=span, context=context_wrapper, agent=agent, output=final_output
                )
                span.__exit__(*exc_info)
                delattr(context_wrapper, "_sentry_agent_span")
        reraise(*exc_info)

    span = getattr(context_wrapper, "_sentry_agent_span", None)
    if span:
        update_invoke_agent_span(
            span=span, context=context_wrapper, agent=agent, output=final_output
        )
        span.__exit__(None, None, None)
        delattr(context_wrapper, "_sentry_agent_span")

    return result
