Based on https://github.com/getsentry/sentry-python/blob/46c24ea70a47ced2411f9d69ffccb9d2dc8f3e1d/sentry_sdk/utils.py
TimeoutThread
Bases: Thread
Creates a Thread which waits for up to waiting_time seconds and, unless stop() is called before the wait ends, reports the timeout and then raises a custom SafeInitTimeoutWarning exception from within the thread.
Source code in safe_init/timeout.py
class TimeoutThread(threading.Thread):
    """
    Watchdog thread that reports a timeout unless it is stopped in time.

    Once started, the thread waits for up to ``waiting_time`` seconds (minus
    the time it spent preloading). If ``stop()`` has not been called by the
    end of the wait, it reports the timeout and raises a
    ``SafeInitTimeoutWarning`` from ``run()``, i.e. inside this thread.

    Args:
        waiting_time: Number of seconds to wait before the timeout is reported.
        timeout_message: Message used for the ``SafeInitTimeoutWarning`` and
            for the error log entry.
        call_args: Positional arguments of the guarded call. They are forwarded
            to ``push_event_to_dlq`` and inspected by ``is_lambda_handler``;
            when that check passes, ``call_args[1]`` is used as the Lambda
            context.
        call_kwargs: Keyword arguments of the guarded call, forwarded to
            ``push_event_to_dlq``.
        execution_fingerprint: Passed to ``sentry_capture`` as ``fingerprint``.
    """

    def __init__(
        self,
        waiting_time: float,
        timeout_message: str,
        call_args: tuple[Any, ...],
        call_kwargs: dict[str, Any],
        execution_fingerprint: list[Any],
    ) -> None:
        super().__init__()
        self.waiting_time = waiting_time
        self.timeout_message = timeout_message
        self.call_args = call_args
        self.call_kwargs = call_kwargs
        self.execution_fingerprint = execution_fingerprint
        self._stop_event = threading.Event()
        # Set by run() right before it raises. Initialised here so that tests
        # can read it even when the thread was stopped or has not run yet.
        self._exception: Exception | None = None

    def stop(self) -> None:
        """Signal the thread to skip the timeout handling and finish."""
        self._stop_event.set()

    def run(self) -> None:
        """
        Wait for the timeout and, unless stopped, report it and raise.

        Raises:
            SafeInitTimeoutWarning: When the wait ends without ``stop()``
                having been called. The same instance is stored in
                ``self._exception`` first.
        """
        # Save the thread start time so preloading can be deducted from the wait
        thread_start_time = time.time()
        log_debug("Timeout thread started", waiting_time=self.waiting_time, start_time=thread_start_time)

        # Execute ddtrace's patching in case preloading writes something to logs
        patch(logging=True)

        # Preload as much as possible to make timeout actions faster
        try:
            preload_sqs_client()
            get_logger()
        except Exception as e:
            log_warning("Preloading failed in the timeout thread, proceeding anyway", exc_info=e)

        # Subtract the time spent in preloading from waiting time. Clamp at zero
        # so a slow preload never hands a negative timeout to Event.wait().
        actual_waiting_time = max(0.0, self.waiting_time - (time.time() - thread_start_time))
        log_debug("Timeout thread waiting", actual_waiting_time=actual_waiting_time)
        self._stop_event.wait(actual_waiting_time)
        log_debug("Timeout thread finished waiting")

        # Re-check the event instead of using wait()'s return value so that a
        # stop() arriving right after the wait expired is still honoured.
        if self._stop_event.is_set():
            log_debug("Timeout thread was stopped")
            return

        log_debug("Timeout thread processing data before raising")

        if context_has_dlq():
            log_debug("Pushing to DLQ")
            push_event_to_dlq(*self.call_args, **self.call_kwargs)
            log_debug("Pushed to DLQ")

        include_lambda_data = is_lambda_handler(self.call_args)

        # Bound up front so the final logging step never hits an unbound local
        fn_calls = None
        calls_by_time = []

        # Check if execution was traced to optionally include the trace in timeout warnings
        if include_lambda_data:
            log_debug("Checking for traces")
            if tracer.is_traced():
                log_debug("Tracer is active")
                fn_calls = tracer.get_function_calls()
                aggregated_calls = aggregate_traced_fn_calls(fn_calls)
                log_debug("Got aggregated calls", aggregated_calls=aggregated_calls)
                # Order by the third field of each aggregated entry, largest first
                calls_by_time = sorted(aggregated_calls, key=lambda x: x[2], reverse=True)
                log_debug("Sorted calls by time", calls_by_time=calls_by_time)

        additional_log_data: dict[str, Any] = {}
        if include_lambda_data:
            additional_log_data["lambda_name"] = os.environ.get(
                "AWS_LAMBDA_FUNCTION_NAME",
                self.call_args[1].function_name,
            )

        exc = SafeInitTimeoutWarning(self.timeout_message)
        exc.traces = calls_by_time

        sentry_result = sentry_capture(
            exc,
            fingerprint=self.execution_fingerprint,
            tags={
                "is_timeout": "true",
                "timeout_value_seconds": str(self.waiting_time),
                "is_lambda": "true" if include_lambda_data else "false",
                "has_dlq": "true" if context_has_dlq() else "false",
                **additional_log_data,
            },
            attachments={"longest_calls": calls_by_time} if calls_by_time else None,
        )
        log_debug("Sentry capture result", sentry_capture_result=sentry_result)

        # Added only after the Sentry call so the call list is not sent as a tag
        if calls_by_time:
            additional_log_data["longest_calls"] = calls_by_time[:40]

        log_error(
            self.timeout_message,
            sentry_capture_result=sentry_result,
            exc_info=exc,
            **additional_log_data,
        )

        if not bool_env("SAFE_INIT_NO_SLACK_TIMEOUT_NOTIFICATIONS"):
            slack_notify(
                str(exc),
                exc,
                lambda_context=self.call_args[1] if include_lambda_data else None,
                sentry_capture_result=sentry_result,
                additional_context=format_traces(calls_by_time, 15),
            )

        # Write tracing details to logs
        if calls_by_time:
            log_warning("All function call summaries", calls_by_time=calls_by_time)
            log_warning("All function calls", raw_call_list=fn_calls)

        # Save Exception in a private field for tests to assert on
        self._exception = exc

        # Raising Exception after timeout duration is reached
        raise exc