prefect.logging
configuration
load_logging_config
Loads logging configuration from a path allowing override from the environment
Source code in prefect/logging/configuration.py
def load_logging_config(path: Path) -> dict:
    """
    Loads logging configuration from a path allowing override from the environment
    """
    template = string.Template(path.read_text())
    config = yaml.safe_load(
        # Substitute settings into the template in format $SETTING / ${SETTING}
        template.substitute(
            {
                setting.name: str(setting.value())
                for setting in SETTING_VARIABLES.values()
            }
        )
    )

    # Load overrides from the environment
    flat_config = dict_to_flatdict(config)

    for key_tup, val in flat_config.items():
        env_val = os.environ.get(
            # Generate a valid environment variable with nesting indicated with '_'
            to_envvar("PREFECT_LOGGING_" + "_".join(key_tup)).upper()
        )
        if env_val:
            val = env_val

        # reassign the updated value
        flat_config[key_tup] = val

    return flatdict_to_dict(flat_config)
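To illustrate the environment override (a hedged sketch: it assumes a logging.yml whose handlers.console.level key exists and that the standard Prefect settings are available), a nested key is flattened into an underscore-separated, upper-cased variable with the PREFECT_LOGGING_ prefix:

# Hypothetical usage sketch; "logging.yml" and the handlers.console.level
# key are assumptions, not part of the documented API.
import os
from pathlib import Path

os.environ["PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL"] = "DEBUG"

config = load_logging_config(Path("logging.yml"))
print(config["handlers"]["console"]["level"])  # "DEBUG", taken from the environment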
formatters
JsonFormatter
Formats log records as a JSON string.
If a log record attribute is not JSON serializable, it will be dropped from the output.
The format may be specified as "pretty" to format the JSON with indents and newlines.
Source code in prefect/logging/formatters.py
class JsonFormatter(logging.Formatter):
    """
    Formats log records as a JSON string.

    If a log record attribute is not JSON serializable, it will be dropped from the
    output.

    The format may be specified as "pretty" to format the JSON with indents and
    newlines.
    """

    def __init__(self, fmt, datefmt, style) -> None:
        if fmt not in ["pretty", "default"]:
            raise ValueError("Format must be either 'pretty' or 'default'.")
        self.fmt = fmt

    def format(self, record: logging.LogRecord):
        format_fn = pformat if self.fmt == "pretty" else str
        return format_fn(
            {key: safe_jsonable(val) for key, val in record.__dict__.items()}
        )
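A minimal usage sketch: attach the formatter to a handler and emit a record. The datefmt and style arguments are accepted for signature compatibility with logging.Formatter but are otherwise unused here, so placeholder values are passed:

# Hedged usage sketch for JsonFormatter; logger and handler names are arbitrary.
import logging
import sys

handler = logging.StreamHandler(sys.stdout)
# "pretty" pretty-prints the record dict; "default" renders it on a single line
handler.setFormatter(JsonFormatter("pretty", None, "%"))

logger = logging.getLogger("example")
logger.addHandler(handler)
logger.warning("something happened")  # prints the record's JSON-safe attributes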
JsonFormatter.format
Format the specified record as text.
The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
Source code in prefect/logging/formatters.py
def format(self, record: logging.LogRecord):
    format_fn = pformat if self.fmt == "pretty" else str
    return format_fn(
        {key: safe_jsonable(val) for key, val in record.__dict__.items()}
    )
handlers
OrionHandler
A logging handler that sends logs to the Orion API.
Sends log records to the OrionLogWorker, which manages sending batches of logs in the background.
Source code in prefect/logging/handlers.py
class OrionHandler(logging.Handler):
    """
    A logging handler that sends logs to the Orion API.

    Sends log records to the `OrionLogWorker` which manages sending batches of logs in
    the background.
    """

    workers: Dict[prefect.context.SettingsContext, OrionLogWorker] = {}

    def get_worker(self, context: prefect.context.SettingsContext) -> OrionLogWorker:
        if context not in self.workers:
            worker = self.workers[context] = OrionLogWorker(context)
            worker.start()
        return self.workers[context]

    @classmethod
    def flush(cls, block: bool = False):
        """
        Tell the `OrionLogWorker` to send any currently enqueued logs.

        Blocks until enqueued logs are sent if `block` is set.
        """
        for worker in cls.workers.values():
            worker.flush(block)

    def emit(self, record: logging.LogRecord):
        """
        Send a log to the `OrionLogWorker`
        """
        try:
            profile = prefect.context.get_settings_context()

            if not PREFECT_LOGGING_ORION_ENABLED.value_from(profile.settings):
                return  # Respect the global settings toggle
            if not getattr(record, "send_to_orion", True):
                return  # Do not send records that have opted out

            self.get_worker(profile).enqueue(self.prepare(record, profile.settings))
        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        _, exc, _ = sys.exc_info()

        # Warn when a logger is used outside of a run context, the stack level here
        # gets us to the user logging call
        if isinstance(exc, MissingContextError):
            warnings.warn(exc, stacklevel=8)
            return

        # Display a longer traceback for other errors
        return super().handleError(record)

    def prepare(
        self, record: logging.LogRecord, settings: prefect.settings.Settings
    ) -> LogCreate:
        """
        Convert a `logging.LogRecord` to the Orion `LogCreate` schema and serialize.

        This infers the linked flow or task run from the log record or the current
        run context.

        If a flow run id cannot be found, the log will be dropped.

        Logs exceeding the maximum size will be dropped.
        """
        flow_run_id = getattr(record, "flow_run_id", None)
        task_run_id = getattr(record, "task_run_id", None)

        if not flow_run_id:
            try:
                context = prefect.context.get_run_context()
            except MissingContextError:
                raise MissingContextError(
                    f"Logger {record.name!r} attempted to send logs to Orion without a "
                    "flow run id. The Orion log handler can only send logs within flow "
                    "run contexts unless the flow run id is manually provided."
                ) from None

            if hasattr(context, "flow_run"):
                flow_run_id = context.flow_run.id
            elif hasattr(context, "task_run"):
                flow_run_id = context.task_run.flow_run_id
                task_run_id = task_run_id or context.task_run.id
            else:
                raise ValueError(
                    "Encountered malformed run context. Does not contain flow or task "
                    "run information."
                )

        # Parsing to a `LogCreate` object here gives us nice parsing error messages
        # from the standard lib `handleError` method if something goes wrong and
        # prevents malformed logs from entering the queue
        log = LogCreate(
            flow_run_id=flow_run_id,
            task_run_id=task_run_id,
            name=record.name,
            level=record.levelno,
            timestamp=pendulum.from_timestamp(
                getattr(record, "created", None) or time.time()
            ),
            message=self.format(record),
        ).dict(json_compatible=True)

        log_size = sys.getsizeof(log)
        if log_size > PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value():
            raise ValueError(
                f"Log of size {log_size} is greater than the max size of "
                f"{PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value()}"
            )

        return log

    def close(self) -> None:
        """
        Shuts down this handler and flushes the `OrionLogWorkers`
        """
        for worker in self.workers.values():
            # Flush instead of closing because another instance may be using the worker
            worker.flush()
        return super().close()
OrionHandler.close
Shuts down this handler and flushes the OrionLogWorkers
Source code in prefect/logging/handlers.py
def close(self) -> None:
    """
    Shuts down this handler and flushes the `OrionLogWorkers`
    """
    for worker in self.workers.values():
        # Flush instead of closing because another instance may be using the worker
        worker.flush()
    return super().close()
OrionHandler.emit
Send a log to the OrionLogWorker
Source code in prefect/logging/handlers.py
def emit(self, record: logging.LogRecord):
    """
    Send a log to the `OrionLogWorker`
    """
    try:
        profile = prefect.context.get_settings_context()

        if not PREFECT_LOGGING_ORION_ENABLED.value_from(profile.settings):
            return  # Respect the global settings toggle
        if not getattr(record, "send_to_orion", True):
            return  # Do not send records that have opted out

        self.get_worker(profile).enqueue(self.prepare(record, profile.settings))
    except Exception:
        self.handleError(record)
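Because emit consults the send_to_orion attribute on each record, a single log call can opt out of shipping to the API by passing the flag through extra. A sketch, assuming the handler is attached to the run loggers as in Prefect's default logging configuration:

logger = get_run_logger()  # inside a flow or task run
logger.info("kept out of Orion", extra={"send_to_orion": False})
logger.info("sent to Orion as usual")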
OrionHandler.flush (classmethod)
Tell the OrionLogWorker to send any currently enqueued logs.
Blocks until enqueued logs are sent if block is set.
Source code in prefect/logging/handlers.py
@classmethod
def flush(cls, block: bool = False):
    """
    Tell the `OrionLogWorker` to send any currently enqueued logs.

    Blocks until enqueued logs are sent if `block` is set.
    """
    for worker in cls.workers.values():
        worker.flush(block)
OrionHandler.handleError
Handle errors which occur during an emit() call.
This method should be called from handlers when an exception is encountered during an emit() call. If raiseExceptions is false, exceptions get silently ignored. This is what is mostly wanted for a logging system - most users will not care about errors in the logging system, they are more interested in application errors. You could, however, replace this with a custom handler if you wish. The record which was being processed is passed in to this method.
Source code in prefect/logging/handlers.py
def handleError(self, record: logging.LogRecord) -> None:
    _, exc, _ = sys.exc_info()

    # Warn when a logger is used outside of a run context, the stack level here
    # gets us to the user logging call
    if isinstance(exc, MissingContextError):
        warnings.warn(exc, stacklevel=8)
        return

    # Display a longer traceback for other errors
    return super().handleError(record)
OrionHandler.prepare
Convert a logging.LogRecord to the Orion LogCreate schema and serialize.
This infers the linked flow or task run from the log record or the current run context.
If a flow run id cannot be found, the log will be dropped.
Logs exceeding the maximum size will be dropped.
Source code in prefect/logging/handlers.py
def prepare(
    self, record: logging.LogRecord, settings: prefect.settings.Settings
) -> LogCreate:
    """
    Convert a `logging.LogRecord` to the Orion `LogCreate` schema and serialize.

    This infers the linked flow or task run from the log record or the current
    run context.

    If a flow run id cannot be found, the log will be dropped.

    Logs exceeding the maximum size will be dropped.
    """
    flow_run_id = getattr(record, "flow_run_id", None)
    task_run_id = getattr(record, "task_run_id", None)

    if not flow_run_id:
        try:
            context = prefect.context.get_run_context()
        except MissingContextError:
            raise MissingContextError(
                f"Logger {record.name!r} attempted to send logs to Orion without a "
                "flow run id. The Orion log handler can only send logs within flow "
                "run contexts unless the flow run id is manually provided."
            ) from None

        if hasattr(context, "flow_run"):
            flow_run_id = context.flow_run.id
        elif hasattr(context, "task_run"):
            flow_run_id = context.task_run.flow_run_id
            task_run_id = task_run_id or context.task_run.id
        else:
            raise ValueError(
                "Encountered malformed run context. Does not contain flow or task "
                "run information."
            )

    # Parsing to a `LogCreate` object here gives us nice parsing error messages
    # from the standard lib `handleError` method if something goes wrong and
    # prevents malformed logs from entering the queue
    log = LogCreate(
        flow_run_id=flow_run_id,
        task_run_id=task_run_id,
        name=record.name,
        level=record.levelno,
        timestamp=pendulum.from_timestamp(
            getattr(record, "created", None) or time.time()
        ),
        message=self.format(record),
    ).dict(json_compatible=True)

    log_size = sys.getsizeof(log)
    if log_size > PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value():
        raise ValueError(
            f"Log of size {log_size} is greater than the max size of "
            f"{PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value()}"
        )

    return log
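Since prepare only falls back to the run context when the record carries no flow_run_id, logs can be sent from outside a run context by attaching the ids manually via extra. A hedged sketch; the UUID is a placeholder and the Orion handler is assumed to be attached to this logger:

import logging

logger = logging.getLogger("prefect.example")
logger.info(
    "logged outside of a run context",
    # Placeholder id; record attributes set here are read by `prepare`
    extra={"flow_run_id": "d1a6e9f0-1234-5678-9abc-def012345678"},
)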
OrionLogWorker
Manages the submission of logs to Orion in a background thread.
Source code in prefect/logging/handlers.py
class OrionLogWorker:
    """
    Manages the submission of logs to Orion in a background thread.
    """

    def __init__(self, profile_context: prefect.context.SettingsContext) -> None:
        self.profile_context = profile_context.copy()

        self._queue: queue.Queue[dict] = queue.Queue()
        self._send_thread = threading.Thread(
            target=self._send_logs_loop,
            name="orion-log-worker",
            daemon=True,
        )
        self._flush_event = threading.Event()
        self._stop_event = threading.Event()
        self._send_logs_finished_event = threading.Event()
        self._lock = threading.Lock()
        self._started = False
        self._stopped = False  # Cannot be started again after stopped

        # Tracks logs that have been pulled from the queue but not sent successfully
        self._pending_logs: List[dict] = []
        self._pending_size: int = 0

        self._retries = 0
        self._max_retries = 3

        # Ensure stop is called at exit
        if sys.version_info < (3, 9):
            atexit.register(self.stop)
        else:
            # See related issue at https://bugs.python.org/issue42647
            # The http client uses a thread pool executor to make requests and in 3.9+
            # new threads cannot be spawned after the interpreter finalizes threads
            # which happens _before_ the normal `atexit` hook is called resulting in
            # the failure to send logs.
            from threading import _register_atexit

            _register_atexit(self.stop)

    def _send_logs_loop(self):
        """
        Should only be the target of the `send_thread` as it creates a new event loop.

        Runs until the `stop_event` is set.
        """
        # Initialize prefect in this new thread, but do not reconfigure logging
        try:
            with self.profile_context:
                while not self._stop_event.is_set():
                    # Wait until flush is called or the batch interval is reached
                    self._flush_event.wait(PREFECT_LOGGING_ORION_BATCH_INTERVAL.value())
                    self._flush_event.clear()

                    anyio.run(self.send_logs)

                    # Notify watchers that logs were sent
                    self._send_logs_finished_event.set()
                    self._send_logs_finished_event.clear()

                # After the stop event, we are exiting...
                # Try to send any remaining logs
                anyio.run(self.send_logs, True)
        except Exception:
            if logging.raiseExceptions and sys.stderr:
                sys.stderr.write("--- Orion logging error ---\n")
                sys.stderr.write("The log worker encountered a fatal error.\n")
                traceback.print_exc(file=sys.stderr)
                sys.stderr.write(self.worker_info())
        finally:
            # Set the finished event so anyone waiting on worker completion does not
            # continue to block if an exception is encountered
            self._send_logs_finished_event.set()

    async def send_logs(self, exiting: bool = False) -> None:
        """
        Send all logs in the queue in batches to avoid network limits.

        If a client error is encountered, the logs pulled from the queue are retained
        and will be sent on the next call.

        Note that if there is a single bad log in the queue, this will repeatedly
        fail as we do not ever drop logs. We may want to adjust this behavior in the
        future if there are issues.
        """
        done = False

        # Determine the batch size by removing the max size of a single log to avoid
        # exceeding the max size in normal operation. If the single log size is greater
        # than this difference, we use that instead so logs will still be sent.
        max_batch_size = max(
            PREFECT_LOGGING_ORION_BATCH_SIZE.value()
            - PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
            PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
        )

        # Loop until the queue is empty or we encounter an error
        while not done:

            # Pull logs from the queue until it is empty or we reach the batch size
            try:
                while self._pending_size < max_batch_size:
                    log = self._queue.get_nowait()
                    self._pending_logs.append(log)
                    self._pending_size += sys.getsizeof(log)
            except queue.Empty:
                done = True

            if not self._pending_logs:
                continue

            client = get_client()
            client.manage_lifespan = False
            async with client:
                try:
                    await client.create_logs(self._pending_logs)
                    self._pending_logs = []
                    self._pending_size = 0
                    self._retries = 0
                except Exception:
                    # Attempt to send these logs on the next call instead
                    done = True
                    self._retries += 1

                    # Roughly replicate the behavior of the stdlib logger error handling
                    if logging.raiseExceptions and sys.stderr:
                        sys.stderr.write("--- Orion logging error ---\n")
                        traceback.print_exc(file=sys.stderr)
                        sys.stderr.write(self.worker_info())
                        if exiting:
                            sys.stderr.write(
                                "The log worker is stopping and these logs will not be sent.\n"
                            )
                        elif self._retries > self._max_retries:
                            sys.stderr.write(
                                "The log worker has tried to send these logs "
                                f"{self._retries} times and will now drop them."
                            )
                        else:
                            sys.stderr.write(
                                "The log worker will attempt to send these logs again in "
                                f"{PREFECT_LOGGING_ORION_BATCH_INTERVAL.value()}s\n"
                            )

                    if self._retries > self._max_retries:
                        # Drop this batch of logs
                        self._pending_logs = []
                        self._pending_size = 0
                        self._retries = 0

    def worker_info(self) -> str:
        """Returns a debugging string with worker log stats"""
        return (
            "Worker information:\n"
            f" Approximate queue length: {self._queue.qsize()}\n"
            f" Pending log batch length: {len(self._pending_logs)}\n"
            f" Pending log batch size: {self._pending_size}\n"
        )

    def enqueue(self, log: LogCreate):
        if self._stopped:
            raise RuntimeError(
                "Logs cannot be enqueued after the Orion log worker is stopped."
            )
        self._queue.put(log)

    def flush(self, block: bool = False) -> None:
        with self._lock:
            if not self._started and not self._stopped:
                raise RuntimeError("Worker was never started.")
            self._flush_event.set()
            if block:
                # TODO: Sometimes the log worker will deadlock and never finish
                # so we will only block for 30 seconds here. When logging is
                # refactored, this deadlock should be resolved.
                self._send_logs_finished_event.wait(30)

    def start(self) -> None:
        """Start the background thread"""
        with self._lock:
            if not self._started and not self._stopped:
                self._send_thread.start()
                self._started = True
            elif self._stopped:
                raise RuntimeError(
                    "The Orion log worker cannot be started after stopping."
                )

    def stop(self) -> None:
        """Flush all logs and stop the background thread"""
        with self._lock:
            if self._started:
                self._flush_event.set()
                self._stop_event.set()
                self._send_thread.join()
                self._started = False
                self._stopped = True

    def is_stopped(self) -> bool:
        with self._lock:
            return self._stopped
OrionLogWorker.send_logs (async)
Send all logs in the queue in batches to avoid network limits.
If a client error is encountered, the logs pulled from the queue are retained and will be sent on the next call.
Note that if there is a single bad log in the queue, this will repeatedly fail as we do not ever drop logs. We may want to adjust this behavior in the future if there are issues.
Source code in prefect/logging/handlers.py
async def send_logs(self, exiting: bool = False) -> None:
    """
    Send all logs in the queue in batches to avoid network limits.

    If a client error is encountered, the logs pulled from the queue are retained
    and will be sent on the next call.

    Note that if there is a single bad log in the queue, this will repeatedly
    fail as we do not ever drop logs. We may want to adjust this behavior in the
    future if there are issues.
    """
    done = False

    # Determine the batch size by removing the max size of a single log to avoid
    # exceeding the max size in normal operation. If the single log size is greater
    # than this difference, we use that instead so logs will still be sent.
    max_batch_size = max(
        PREFECT_LOGGING_ORION_BATCH_SIZE.value()
        - PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
        PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
    )

    # Loop until the queue is empty or we encounter an error
    while not done:

        # Pull logs from the queue until it is empty or we reach the batch size
        try:
            while self._pending_size < max_batch_size:
                log = self._queue.get_nowait()
                self._pending_logs.append(log)
                self._pending_size += sys.getsizeof(log)
        except queue.Empty:
            done = True

        if not self._pending_logs:
            continue

        client = get_client()
        client.manage_lifespan = False
        async with client:
            try:
                await client.create_logs(self._pending_logs)
                self._pending_logs = []
                self._pending_size = 0
                self._retries = 0
            except Exception:
                # Attempt to send these logs on the next call instead
                done = True
                self._retries += 1

                # Roughly replicate the behavior of the stdlib logger error handling
                if logging.raiseExceptions and sys.stderr:
                    sys.stderr.write("--- Orion logging error ---\n")
                    traceback.print_exc(file=sys.stderr)
                    sys.stderr.write(self.worker_info())
                    if exiting:
                        sys.stderr.write(
                            "The log worker is stopping and these logs will not be sent.\n"
                        )
                    elif self._retries > self._max_retries:
                        sys.stderr.write(
                            "The log worker has tried to send these logs "
                            f"{self._retries} times and will now drop them."
                        )
                    else:
                        sys.stderr.write(
                            "The log worker will attempt to send these logs again in "
                            f"{PREFECT_LOGGING_ORION_BATCH_INTERVAL.value()}s\n"
                        )

                if self._retries > self._max_retries:
                    # Drop this batch of logs
                    self._pending_logs = []
                    self._pending_size = 0
                    self._retries = 0
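To make the batch cutoff concrete (the byte values below are illustrative assumptions, not necessarily the shipped defaults): filling stops one maximum-sized log short of the batch limit, so appending the final log can never push a request past it:

# Illustrative arithmetic only; real values come from the Prefect settings.
batch_size = 4_000_000    # stand-in for PREFECT_LOGGING_ORION_BATCH_SIZE
max_log_size = 1_000_000  # stand-in for PREFECT_LOGGING_ORION_MAX_LOG_SIZE

# Stop filling once pending bytes reach this; one more log of at most
# max_log_size bytes still fits under batch_size.
max_batch_size = max(batch_size - max_log_size, max_log_size)
assert max_batch_size == 3_000_000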
OrionLogWorker.start
Start the background thread
Source code in prefect/logging/handlers.py
def start(self) -> None:
    """Start the background thread"""
    with self._lock:
        if not self._started and not self._stopped:
            self._send_thread.start()
            self._started = True
        elif self._stopped:
            raise RuntimeError(
                "The Orion log worker cannot be started after stopping."
            )
OrionLogWorker.stop
Flush all logs and stop the background thread
Source code in prefect/logging/handlers.py
def stop(self) -> None:
    """Flush all logs and stop the background thread"""
    with self._lock:
        if self._started:
            self._flush_event.set()
            self._stop_event.set()
            self._send_thread.join()
            self._started = False
            self._stopped = True
OrionLogWorker.worker_info
Returns a debugging string with worker log stats
Source code in prefect/logging/handlers.py
def worker_info(self) -> str:
    """Returns a debugging string with worker log stats"""
    return (
        "Worker information:\n"
        f" Approximate queue length: {self._queue.qsize()}\n"
        f" Pending log batch length: {len(self._pending_logs)}\n"
        f" Pending log batch size: {self._pending_size}\n"
    )
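Taken together, the worker's public surface is a small lifecycle: construct it with a settings context, start the thread, enqueue prepared logs, flush (optionally blocking), and stop at shutdown. A hedged sketch; the enqueued dict is an abbreviated placeholder for output of OrionHandler.prepare:

import prefect.context

context = prefect.context.get_settings_context()
worker = OrionLogWorker(context)
worker.start()

serialized_log = {"name": "prefect.example", "level": 20}  # abbreviated placeholder
worker.enqueue(serialized_log)
worker.flush(block=True)  # block (up to 30s) until enqueued logs are sent
worker.stop()             # flush remaining logs and join the thread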
loggers
PrefectLogAdapter
Adapter that ensures extra kwargs are passed through correctly; without this the extra fields set on the adapter would overshadow any provided on a log-by-log basis.
See https://bugs.python.org/issue32732: the Python team has declared that this is not a bug in the LoggingAdapter and subclassing is the intended workaround.
Source code in prefect/logging/loggers.py
class PrefectLogAdapter(logging.LoggerAdapter):
    """
    Adapter that ensures extra kwargs are passed through correctly; without this
    the `extra` fields set on the adapter would overshadow any provided on a
    log-by-log basis.

    See https://bugs.python.org/issue32732 — the Python team has declared that this is
    not a bug in the LoggingAdapter and subclassing is the intended workaround.
    """

    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
        return (msg, kwargs)
PrefectLogAdapter.process
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you'll only need to override this one method in a LoggerAdapter subclass for your specific needs.
Source code in prefect/logging/loggers.py
def process(self, msg, kwargs):
    kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
    return (msg, kwargs)
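The effect of the merge is that adapter-level extra and call-site extra combine, with the call site winning on key collisions. A small sketch with arbitrary field names:

import logging

adapter = PrefectLogAdapter(
    logging.getLogger("example"),
    extra={"flow_run_id": "abc", "color": "blue"},
)

# The emitted record carries flow_run_id="abc" and color="red": the call-site
# key overrides the adapter's, while unrelated adapter keys are preserved.
adapter.info("hello", extra={"color": "red"})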
flow_run_logger
Create a flow run logger with the run's metadata attached.
Additional keyword arguments can be provided to attach custom data to the log records.
If the context is available, see run_logger instead.
Source code in prefect/logging/loggers.py
def flow_run_logger(flow_run: "FlowRun", flow: "Flow" = None, **kwargs: str):
    """
    Create a flow run logger with the run's metadata attached.

    Additional keyword arguments can be provided to attach custom data to the log
    records.

    If the context is available, see `run_logger` instead.
    """
    return PrefectLogAdapter(
        get_logger("prefect.flow_runs"),
        extra={
            **{
                "flow_run_name": flow_run.name,
                "flow_run_id": str(flow_run.id),
                "flow_name": flow.name if flow else "<unknown>",
            },
            **kwargs,
        },
    )
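A usage sketch, assuming flow_run and flow objects obtained from a run context or an API query; the stage keyword is an arbitrary example of custom data:

logger = flow_run_logger(flow_run, flow, stage="ingest")
logger.info("loading data")  # record carries flow_run_id, flow_name, and stage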
get_logger
Get a prefect logger. For use within Prefect.
Source code in prefect/logging/loggers.py
@lru_cache()
def get_logger(name: str = None) -> logging.Logger:
    """
    Get a `prefect` logger. For use within Prefect.
    """
    parent_logger = logging.getLogger("prefect")

    if name:
        # Append the name if given but allow explicit full names e.g. "prefect.test"
        # should not become "prefect.prefect.test"
        if not name.startswith(parent_logger.name + "."):
            logger = parent_logger.getChild(name)
        else:
            logger = logging.getLogger(name)
    else:
        logger = parent_logger

    return logger
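The name handling means a short name and its fully qualified form resolve to the same logger object:

import logging

a = get_logger("test")          # -> "prefect.test"
b = get_logger("prefect.test")  # -> "prefect.test", not "prefect.prefect.test"
assert a is b
assert get_logger() is logging.getLogger("prefect")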
get_run_logger
Get a Prefect logger for the current task run or flow run.
The logger will be named either prefect.task_runs or prefect.flow_runs.
Contextual data about the run will be attached to the log records.
Parameters:

Name | Type | Description | Default
---|---|---|---
context | RunContext | A specific context may be provided as an override. By default, the context is inferred from global state and this should not be needed. | None
**kwargs | str | Additional keyword arguments will be attached to the log records in addition to the run metadata | {}

Exceptions:

Type | Description
---|---
RuntimeError | If no context can be found
Source code in prefect/logging/loggers.py
def get_run_logger(context: "RunContext" = None, **kwargs: str) -> logging.Logger:
    """
    Get a Prefect logger for the current task run or flow run.

    The logger will be named either `prefect.task_runs` or `prefect.flow_runs`.
    Contextual data about the run will be attached to the log records.

    Arguments:
        context: A specific context may be provided as an override. By default, the
            context is inferred from global state and this should not be needed.
        **kwargs: Additional keyword arguments will be attached to the log records in
            addition to the run metadata

    Raises:
        RuntimeError: If no context can be found
    """
    # Check for existing contexts
    task_run_context = prefect.context.TaskRunContext.get()
    flow_run_context = prefect.context.FlowRunContext.get()

    # Apply the context override
    if context:
        if isinstance(context, prefect.context.FlowRunContext):
            flow_run_context = context
        elif isinstance(context, prefect.context.TaskRunContext):
            task_run_context = context

    # Determine if this is a task or flow run logger
    if task_run_context:
        logger = task_run_logger(
            task_run=task_run_context.task_run,
            task=task_run_context.task,
            flow_run=flow_run_context.flow_run if flow_run_context else None,
            flow=flow_run_context.flow if flow_run_context else None,
            **kwargs,
        )
    elif flow_run_context:
        logger = flow_run_logger(
            flow_run=flow_run_context.flow_run, flow=flow_run_context.flow, **kwargs
        )
    else:
        raise RuntimeError("There is no active flow or task run context.")

    return logger
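In practice this is called from inside a flow or task body; a minimal sketch:

from prefect import flow, task, get_run_logger

@task
def my_task():
    get_run_logger().info("task message")  # logger named "prefect.task_runs"

@flow
def my_flow():
    logger = get_run_logger(user="alice")  # extra kwargs attach to every record
    logger.info("flow message")            # logger named "prefect.flow_runs"
    my_task()

my_flow()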
task_run_logger
Create a task run logger with the run's metadata attached.
Additional keyword arguments can be provided to attach custom data to the log records.
If the context is available, see run_logger instead.
Source code in prefect/logging/loggers.py
def task_run_logger(
    task_run: "TaskRun",
    task: "Task" = None,
    flow_run: "FlowRun" = None,
    flow: "Flow" = None,
    **kwargs: str,
):
    """
    Create a task run logger with the run's metadata attached.

    Additional keyword arguments can be provided to attach custom data to the log
    records.

    If the context is available, see `run_logger` instead.
    """
    return PrefectLogAdapter(
        get_logger("prefect.task_runs"),
        extra={
            **{
                "task_run_id": str(task_run.id),
                "flow_run_id": str(task_run.flow_run_id),
                "task_run_name": task_run.name,
                "task_name": task.name if task else "<unknown>",
                "flow_run_name": flow_run.name if flow_run else "<unknown>",
                "flow_name": flow.name if flow else "<unknown>",
            },
            **kwargs,
        },
    )