<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-GLhlTQ8iRABdZLl6O3oVMWSktQOp6b7In1Zl3/Jr59b6EGGoI1aFkw7cmDA6j6gD" crossorigin="anonymous">
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.3.0/css/all.min.css"
        integrity="sha512-SzlrxWUlpfuzQ+pcUCosxcglQRNAq/DZjVsC0lE40xsADsfeQoEypE+enwcOiGjk/bSuGGKHEyjSoQ1zVisanQ=="
        crossorigin="anonymous" referrerpolicy="no-referrer" />
</head>
</html>
import functools
import logging
import threading
import time
import traceback

from playhouse.sqlite_ext import SqliteExtDatabase

from defence360agent.internals.global_scope import g


class OverridingReset(Exception):
    """Signal an attempt to overwrite an already recorded thread identity.

    Such an overwrite may point at a logic error, so every call site
    where this exception can legitimately occur is expected to handle it
    explicitly.
    """


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Per-thread storage: reset() records a ``thread_ident_memo`` attribute on
# it and _validate() reads that attribute back.
_thread_local_storage = threading.local()

# Duration in seconds; _TimedAtomic logs a warning for any transaction that
# is held longer than this.
_SLOW_TXN_THRESHOLD_S = 5.0


class _TimedAtomic:
    """Time how long a wrapped transaction context manager is held.

    ``__enter__`` and ``__exit__`` are delegated to ``inner``.  When the
    time between the two exceeds ``_SLOW_TXN_THRESHOLD_S`` seconds, a
    warning is logged together with a short excerpt of the call stack
    captured on entry.

    Like the object it wraps, an instance can be used either in a
    ``with`` statement or as a function decorator.
    """

    def __init__(self, inner: object):
        """Store the context manager to delegate to.

        :param inner: object providing ``__enter__`` and ``__exit__``.
        """
        self._inner = inner
        self._start: float = 0.0
        self._caller: str = ""

    def __enter__(self):
        """Record start time and caller, then enter the wrapped manager.

        :return: whatever the wrapped manager's ``__enter__`` returns.
        """
        self._start = time.monotonic()
        # Keep a few frames of the caller's stack; the last frame is this
        # method itself and is dropped.
        self._caller = "".join(traceback.format_stack(limit=4)[:-1])
        return self._inner.__enter__()

    def __exit__(self, *args):
        """Leave the wrapped manager and report a slow transaction.

        :param args: the ``(exc_type, exc_value, traceback)`` triple.
        :return: the wrapped manager's ``__exit__`` result, so exception
            suppression behaves exactly as it does without the wrapper.
        """
        try:
            return self._inner.__exit__(*args)
        finally:
            # Measured in ``finally`` so that a slow transaction is still
            # reported when the wrapped ``__exit__`` itself raises.
            elapsed = time.monotonic() - self._start
            if elapsed > _SLOW_TXN_THRESHOLD_S:
                logger.warning(
                    "Slow transaction held for %.2fs\n%s",
                    elapsed,
                    self._caller,
                )

    def __call__(self, fn):
        """Decorate ``fn`` so that every call runs inside this manager.

        Without this method the wrapper could only be used in a ``with``
        statement, and decorator-style use would raise ``TypeError``.

        :param fn: callable to wrap.
        :return: a function with the same metadata as ``fn``.
        """

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            with self:
                return fn(*args, **kwargs)

        return wrapper


class SqliteDatabaseWrapper(SqliteExtDatabase):
    """SQLite database that checks thread identity before each statement."""

    def execute_sql(self, *args, **kwargs):
        """Run ``_validate`` on the arguments, then execute the statement.

        All arguments are forwarded unchanged to the parent implementation
        and its result is returned.
        """
        _validate(*args, **kwargs)
        return super().execute_sql(*args, **kwargs)

    def atomic(self, lock_type: str = "IMMEDIATE"):
        """Return the parent's atomic block, timed when ``DEBUG`` is set.

        :param lock_type: passed to the parent ``atomic()``.
        :return: the parent's object, wrapped in ``_TimedAtomic`` when
            ``g.get("DEBUG")`` is truthy.
        """
        txn = super().atomic(lock_type)
        return _TimedAtomic(txn) if g.get("DEBUG") else txn


def reset(new_value=None):
    """Record the thread identity for the current thread.

    :param new_value: identity to store; when falsy, the value of
        ``threading.get_ident()`` is stored instead.
    :raises OverridingReset: if an identity has already been recorded in
        the thread-local storage of the calling thread.
    """
    already_recorded = hasattr(_thread_local_storage, "thread_ident_memo")
    if already_recorded:
        raise OverridingReset()

    ident = new_value if new_value else threading.get_ident()
    _thread_local_storage.thread_ident_memo = ident


def _validate(*args, **kwargs):
    """Log an error if the calling thread does not match the recorded one.

    The identity stored by ``reset()`` in thread-local storage is compared
    with ``threading.get_ident()``.  Nothing is raised: a missing or
    mismatching identity is only reported through the module logger.

    :param args: positional arguments, included in the mismatch report.
    :param kwargs: keyword arguments, included in the mismatch report.
    """
    expected = getattr(_thread_local_storage, "thread_ident_memo", None)

    if expected is None:
        logger.error("wrong thread or _validate() was not preceded by reset()")
        return

    current = threading.get_ident()
    if expected != current:
        logger.error(
            "thread_ident_memo check failed [%r != %r]\n"
            "context:\nargs: %s\nkwargs: %s",
            expected,
            current,
            args,
            kwargs,
        )
