Based on https://github.com/getsentry/sentry-python/blob/46c24ea70a47ced2411f9d69ffccb9d2dc8f3e1d/sentry_sdk/utils.py

TimeoutThread

Bases: Thread

Creates a Thread which runs (sleeps) for a time duration equal to waiting_time and then raises a custom SafeInitTimeoutWarning exception.

Source code in safe_init/timeout.py
class TimeoutThread(threading.Thread):
    """
    Watchdog thread that reports a timeout unless it is stopped first.

    The thread waits for up to ``waiting_time`` seconds (minus whatever time
    it spends preloading helpers). If :meth:`stop` is called before the wait
    elapses, ``run`` returns quietly. Otherwise it reports the timeout
    (optional DLQ push, Sentry capture, error log, optional Slack
    notification) and finally raises a ``SafeInitTimeoutWarning``.
    """

    def __init__(
        self,
        waiting_time: float,
        timeout_message: str,
        call_args: tuple[Any, ...],
        call_kwargs: dict[str, Any],
        execution_fingerprint: list[Any],
    ) -> None:
        """
        Store the timeout configuration; the thread is not started here.

        Args:
            waiting_time: Seconds to wait before the timeout is reported.
            timeout_message: Message used for the raised warning and the error log.
            call_args: Positional arguments of the guarded call. When the call is
                a Lambda handler, index 1 is read as the Lambda context object.
            call_kwargs: Keyword arguments of the guarded call.
            execution_fingerprint: Fingerprint passed to the Sentry capture.
        """
        super().__init__()
        self.waiting_time = waiting_time
        self.timeout_message = timeout_message
        self.call_args = call_args
        self.call_kwargs = call_kwargs
        self.execution_fingerprint = execution_fingerprint
        self._stop_event = threading.Event()
        # Populated by run() right before it raises. Initialised here so that
        # reading it on a stopped or unfinished thread yields None instead of
        # raising AttributeError.
        self._exception: SafeInitTimeoutWarning | None = None

    def stop(self) -> None:
        """Signal the thread to exit without reporting a timeout."""
        self._stop_event.set()

    def run(self) -> None:
        """
        Wait for the timeout and, unless stopped, report it and raise.

        Raises:
            SafeInitTimeoutWarning: Always raised when the wait elapses without
                :meth:`stop` having been called; the same instance is stored in
                ``self._exception`` beforehand.
        """
        # Save the thread start time so preloading can be deducted from the wait
        thread_start_time = time.time()
        log_debug("Timeout thread started", waiting_time=self.waiting_time, start_time=thread_start_time)

        # Execute ddtrace's patching in case preloading writes something to logs
        patch(logging=True)

        # Preload as much as possible to make timeout actions faster
        try:
            preload_sqs_client()
            get_logger()
        except Exception as e:
            log_warning("Preloading failed in the timeout thread, proceeding anyway", exc_info=e)

        # Subtract the time spent in preloading from waiting time, then schedule the action
        actual_waiting_time = self.waiting_time - (time.time() - thread_start_time)
        log_debug("Timeout thread waiting", actual_waiting_time=actual_waiting_time)
        # Preloading may have used up the whole budget; clamp so the wait is an
        # explicit non-blocking check rather than relying on negative-timeout handling.
        self._stop_event.wait(max(actual_waiting_time, 0.0))

        log_debug("Timeout thread finished waiting")
        if self._stop_event.is_set():
            log_debug("Timeout thread was stopped")
            return

        log_debug("Timeout thread processing data before raising")

        # NOTE(review): a failure in the DLQ push aborts the thread before any
        # timeout reporting happens - confirm that this is intended.
        if context_has_dlq():
            log_debug("Pushing to DLQ")
            push_event_to_dlq(*self.call_args, **self.call_kwargs)
            log_debug("Pushed to DLQ")

        include_lambda_data = is_lambda_handler(self.call_args)

        # Both default to empty so the later logging never reads an unbound name
        fn_calls: list[Any] = []
        calls_by_time: list[Any] = []
        # Check if execution was traced to optionally include the trace in timeout warnings
        if include_lambda_data:
            log_debug("Checking for traces")
            if tracer.is_traced():
                log_debug("Tracer is active")
                fn_calls = tracer.get_function_calls()
                aggregated_calls = aggregate_traced_fn_calls(fn_calls)
                log_debug("Got aggregated calls", aggregated_calls=aggregated_calls)
                # Largest value at index 2 first (presumably the time spent - confirm
                # against aggregate_traced_fn_calls)
                calls_by_time = sorted(aggregated_calls, key=lambda x: x[2], reverse=True)
                log_debug("Sorted calls by time", calls_by_time=calls_by_time)

        additional_log_data: dict[str, Any] = {}
        if include_lambda_data:
            # Prefer the environment variable; fall back to the name on the context object
            additional_log_data["lambda_name"] = os.environ.get(
                "AWS_LAMBDA_FUNCTION_NAME",
                self.call_args[1].function_name,
            )

        exc = SafeInitTimeoutWarning(self.timeout_message)
        exc.traces = calls_by_time

        sentry_result = sentry_capture(
            exc,
            fingerprint=self.execution_fingerprint,
            tags={
                "is_timeout": "true",
                "timeout_value_seconds": str(self.waiting_time),
                "is_lambda": "true" if include_lambda_data else "false",
                "has_dlq": "true" if context_has_dlq() else "false",
                **additional_log_data,
            },
            attachments={"longest_calls": calls_by_time} if calls_by_time else None,
        )
        log_debug("Sentry capture result", sentry_capture_result=sentry_result)

        # Added only after the Sentry call so the call list does not end up in the tags
        if calls_by_time:
            additional_log_data["longest_calls"] = calls_by_time[:40]

        log_error(
            self.timeout_message,
            sentry_capture_result=sentry_result,
            exc_info=exc,
            **additional_log_data,
        )

        if not bool_env("SAFE_INIT_NO_SLACK_TIMEOUT_NOTIFICATIONS"):
            slack_notify(
                str(exc),
                exc,
                lambda_context=self.call_args[1] if include_lambda_data else None,
                sentry_capture_result=sentry_result,
                additional_context=format_traces(calls_by_time, 15),
            )

        # Write tracing details to logs
        if calls_by_time:
            log_warning("All function call summaries", calls_by_time=calls_by_time)
            log_warning("All function calls", raw_call_list=fn_calls)

        # Save Exception in a private field for tests to assert on
        self._exception = exc

        # Raising Exception after timeout duration is reached
        raise exc