airflow.providers.cncf.kubernetes.utils.pod_manager
Launches pods.
Attributes
- EMPTY_XCOM_RESULT: Sentinel for no XCom result.
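Exceptions
- PodLaunchFailedException: Raised when pod launching fails in KubernetesPodOperator.
- PodLaunchTimeoutException: Raised when the pod does not leave the Pending phase within the specified timeout.
- PodNotFoundException: Raised when an expected pod does not exist in the Kubernetes API.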
Classes
- PodPhase: Possible pod phases.
- PodLogsConsumer: Pulls pod logs from a stream, checking the container status before reading data.
- PodLoggingStatus: Status of the pod and the last log time, returned when exiting from fetch_container_logs.
- PodManager: Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator.
- OnFinishAction: Action to take when the pod finishes.
- AsyncPodManager: Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTriggerer.
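Functions
- should_retry_start_pod: Check whether an exception indicates a transient error that warrants retrying.
- check_exception_is_kubernetes_api_unauthorized
- watch_pod_events: Read pod events and write them to the log.
- await_pod_start: Monitor the startup phase of a Kubernetes pod, waiting for it to leave the Pending state.
- is_log_group_marker: Check whether the line is a log group marker such as ::group:: or ::endgroup::.
- parse_log_line: Parse a Kubernetes log line and return its timestamp and message.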
Module Contents
- airflow.providers.cncf.kubernetes.utils.pod_manager.EMPTY_XCOM_RESULT = '__airflow_xcom_result_empty__'
Sentinel for no xcom result.
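An empty result file is not valid JSON, so decoding it directly would fail; a sentinel lets the reader tell "no result was written" apart from real content (including a legitimate JSON null). The sketch below is illustrative only: `resolve_xcom_result` is a hypothetical helper, and it assumes the sidecar prints the sentinel for an empty file and the raw JSON otherwise.

```python
import json
from typing import Any

# Sentinel value documented for this module.
EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"


def resolve_xcom_result(raw: str) -> Any:
    """Hypothetical helper: decode sidecar output, mapping the sentinel to None.

    Assumption: the sidecar prints EMPTY_XCOM_RESULT when the result file is
    empty and otherwise prints the file's JSON content verbatim.
    """
    if raw.strip() == EMPTY_XCOM_RESULT:
        return None  # nothing was written, so there is nothing to decode
    return json.loads(raw)  # a real value, which may itself be JSON null


no_result = resolve_xcom_result(EMPTY_XCOM_RESULT)
some_result = resolve_xcom_result('{"rows": 3}')
```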
- exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException
Bases: airflow.exceptions.AirflowException
Raised when pod launching fails in KubernetesPodOperator.
- airflow.providers.cncf.kubernetes.utils.pod_manager.should_retry_start_pod(exception)
Check whether an exception indicates a transient error that warrants retrying.
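A predicate like this is typically passed to a retry policy so that only transient API errors trigger another attempt. The sketch below is an assumption, not the module's code: `ApiException` is a local stand-in for the Kubernetes client's exception, and treating HTTP 409 Conflict as the only transient status is illustrative.

```python
class ApiException(Exception):
    """Stand-in for the Kubernetes client's ApiException; only .status is modelled."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


# Assumption: the set of statuses treated as transient is illustrative only.
TRANSIENT_STATUSES = frozenset({409})


def should_retry_start_pod_sketch(exception: BaseException) -> bool:
    """Return True only for API errors whose HTTP status is treated as transient."""
    return isinstance(exception, ApiException) and exception.status in TRANSIENT_STATUSES
```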
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodPhase
Possible pod phases.
See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
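The linked Kubernetes page defines five phases: Pending, Running, Succeeded, Failed, and Unknown. A minimal sketch of how such constants are commonly grouped, assuming (not confirmed by this page) that Succeeded and Failed are treated as terminal; `PodPhaseSketch` and `is_terminal` are hypothetical names.

```python
class PodPhaseSketch:
    """Hypothetical mirror of the Kubernetes pod phases from the linked docs."""

    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"

    # Assumption: a pod in either of these phases will not change phase again.
    terminal_states = frozenset({SUCCEEDED, FAILED})


def is_terminal(phase: str) -> bool:
    return phase in PodPhaseSketch.terminal_states
```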
- airflow.providers.cncf.kubernetes.utils.pod_manager.check_exception_is_kubernetes_api_unauthorized(exc)
- async airflow.providers.cncf.kubernetes.utils.pod_manager.watch_pod_events(pod_manager, pod, check_interval=1)
Read pod events and write them to the log.
This function supports both asynchronous and synchronous pod managers.
- Parameters:
pod_manager (PodManager | AsyncPodManager) – The pod manager instance (PodManager or AsyncPodManager).
pod (kubernetes.client.models.v1_pod.V1Pod) – The pod object to monitor.
check_interval (float) – Interval (in seconds) between checks.
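Supporting both sync and async pod managers can be done by calling the reader and awaiting the result only when it is awaitable. The sketch below shows that pattern plus de-duplication so each event is logged once. It is a hypothetical stand-in: events are plain dicts with assumed "uid" and "message" keys, and `max_polls` exists only so the demo terminates.

```python
from __future__ import annotations

import asyncio
import inspect
from typing import Awaitable, Callable, Dict, Iterable, Union

Event = Dict[str, str]  # hypothetical shape: {"uid": ..., "message": ...}
Reader = Callable[[], Union[Iterable[Event], Awaitable[Iterable[Event]]]]


async def watch_events_sketch(
    read_events: Reader,
    log: Callable[[str], None],
    check_interval: float = 1,
    max_polls: int | None = None,
) -> None:
    """Poll for events and log each one once; max_polls is for demos only."""
    seen: set[str] = set()
    polls = 0
    while max_polls is None or polls < max_polls:
        result = read_events()
        # Accept a plain (sync) return value as well as an awaitable (async) one.
        if inspect.isawaitable(result):
            result = await result
        for event in result:
            if event["uid"] not in seen:
                seen.add(event["uid"])
                log(event["message"])
        polls += 1
        await asyncio.sleep(check_interval)


BATCHES = [
    [{"uid": "a", "message": "Scheduled"}],
    [{"uid": "a", "message": "Scheduled"}, {"uid": "b", "message": "Pulled"}],
]

sync_log: list[str] = []
sync_batches = iter(BATCHES)
asyncio.run(watch_events_sketch(lambda: next(sync_batches), sync_log.append, 0, max_polls=2))

async_log: list[str] = []
async_batches = iter(BATCHES)


async def read_async() -> list[Event]:
    return next(async_batches)


asyncio.run(watch_events_sketch(read_async, async_log.append, 0, max_polls=2))
```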
- async airflow.providers.cncf.kubernetes.utils.pod_manager.await_pod_start(pod_manager, pod, schedule_timeout=120, startup_timeout=120, check_interval=1)
Monitor the startup phase of a Kubernetes pod, waiting for it to leave the Pending state.
This function is shared by PodManager and AsyncPodManager to provide consistent pod startup tracking.
- Parameters:
pod_manager (PodManager | AsyncPodManager) – The pod manager instance (PodManager or AsyncPodManager).
pod (kubernetes.client.models.v1_pod.V1Pod) – The pod object to monitor.
schedule_timeout (int) – Maximum time (in seconds) to wait for the pod to be scheduled.
startup_timeout (int) – Maximum time (in seconds) to wait for the pod to start running after being scheduled.
check_interval (float) – Interval (in seconds) between status checks.
is_async – Set to True if called in an async context; otherwise, False.
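The two timeouts measure different intervals: `schedule_timeout` runs from the start of waiting until the pod is scheduled, and `startup_timeout` runs from scheduling until the pod leaves Pending. A minimal sketch of that logic, assuming a hypothetical `read_status` callable that returns the phase and a "scheduled" flag (which in Kubernetes could be derived from the pod's PodScheduled condition); the injectable clock exists only so the demo is deterministic.

```python
from __future__ import annotations

import asyncio
import itertools
import time
from typing import Callable


class PodLaunchTimeoutError(Exception):
    """Stand-in for PodLaunchTimeoutException."""


async def await_pod_start_sketch(
    read_status: Callable[[], tuple[str, bool]],
    schedule_timeout: float = 120,
    startup_timeout: float = 120,
    check_interval: float = 1,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Return the first non-Pending phase, or raise if either timeout expires."""
    start = clock()
    scheduled_at: float | None = None
    while True:
        phase, scheduled = read_status()
        if phase != "Pending":
            return phase
        now = clock()
        if scheduled and scheduled_at is None:
            scheduled_at = now  # first time we see the pod bound to a node
        if scheduled_at is None:
            if now - start > schedule_timeout:
                raise PodLaunchTimeoutError("pod was not scheduled in time")
        elif now - scheduled_at > startup_timeout:
            raise PodLaunchTimeoutError("pod did not start in time after scheduling")
        await asyncio.sleep(check_interval)


statuses = iter([("Pending", False), ("Pending", True), ("Running", True)])
ticks = itertools.count()
phase = asyncio.run(
    await_pod_start_sketch(lambda: next(statuses), check_interval=0, clock=lambda: next(ticks))
)

slow_clock = itertools.count(0, 50)  # each reading advances 50 "seconds"
try:
    asyncio.run(
        await_pod_start_sketch(lambda: ("Pending", False), check_interval=0, clock=lambda: next(slow_clock))
    )
    timed_out = False
except PodLaunchTimeoutError:
    timed_out = True
```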
- exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchTimeoutException
Bases: airflow.exceptions.AirflowException
Raised when the pod does not leave the Pending phase within the specified timeout.
- exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodNotFoundException
Bases: airflow.exceptions.AirflowException
Raised when an expected pod does not exist in the Kubernetes API.
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer(response, pod, pod_manager, container_name, post_termination_timeout=120, read_pod_cache_timeout=120)
Responsible for pulling pod logs from a stream with checking a container status before reading data.
This class is a workaround for the issue https://github.com/apache/airflow/issues/23497.
- Parameters:
response (urllib3.response.HTTPResponse) – HTTP response with logs
pod (kubernetes.client.models.v1_pod.V1Pod) – Pod instance from Kubernetes client
pod_manager (PodManager) – Pod manager instance
container_name (str) – Name of the container that we’re reading logs from
post_termination_timeout (int) – (Optional) How long, in seconds, logs remain available after the container terminates.
read_pod_cache_timeout (int) – (Optional) The container’s status cache lifetime. The container status is cached to reduce API calls.
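The two optional parameters combine into one availability check: keep reading while the container runs, or for up to `post_termination_timeout` seconds after it terminates, re-reading the status only when the cached copy is older than `read_pod_cache_timeout`. The sketch below illustrates that idea under stated assumptions; `ContainerState`, `LogAvailabilitySketch`, and the `read_state` callable are hypothetical and do not reproduce the class's real internals.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable


@dataclass
class ContainerState:
    """Hypothetical, simplified view of one container's status."""

    running: bool
    finished_at: datetime | None = None  # set once the container has terminated


class LogAvailabilitySketch:
    """Decide whether a container's log stream may still produce data."""

    def __init__(
        self,
        read_state: Callable[[], ContainerState],
        post_termination_timeout: int = 120,
        read_pod_cache_timeout: int = 120,
        now: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    ) -> None:
        self._read_state = read_state
        self._post_termination = timedelta(seconds=post_termination_timeout)
        self._cache_ttl = timedelta(seconds=read_pod_cache_timeout)
        self._now = now
        self._cached: ContainerState | None = None
        self._cached_at: datetime | None = None

    def _state(self) -> ContainerState:
        now = self._now()
        # Re-read the status only when the cached copy is older than the TTL.
        if self._cached is None or now - self._cached_at > self._cache_ttl:
            self._cached = self._read_state()
            self._cached_at = now
        return self._cached

    def logs_available(self) -> bool:
        state = self._state()
        if state.running:
            return True
        if state.finished_at is None:
            return False  # neither running nor terminated (e.g. still waiting)
        # Terminated: keep reading only within the post-termination window.
        return self._now() - state.finished_at <= self._post_termination


T0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
clock = [T0]
reads: list[int] = []


def read_state() -> ContainerState:
    reads.append(1)  # count simulated API calls
    return ContainerState(running=False, finished_at=T0)


checker = LogAvailabilitySketch(read_state, now=lambda: clock[0])
at_exit = checker.logs_available()  # terminated 0 s ago
clock[0] = T0 + timedelta(seconds=60)
after_60s = checker.logs_available()  # served from the cache
clock[0] = T0 + timedelta(seconds=200)
after_200s = checker.logs_available()  # cache expired and window passed
```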
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLoggingStatus
Status of the pod and the last log time, returned when exiting from fetch_container_logs.
- class airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager(kube_client, callbacks=None)
Bases: airflow.utils.log.logging_mixin.LoggingMixin
Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodOperator.
- async await_pod_start(pod, schedule_timeout=120, startup_timeout=120, check_interval=1)
Wait for the pod to reach a phase other than Pending.
- Parameters:
pod (kubernetes.client.models.v1_pod.V1Pod) – The pod to monitor.
schedule_timeout (int) – Maximum time (in seconds) the pod may wait to be scheduled; if scheduling takes longer, the task fails.
startup_timeout (int) – Maximum time (in seconds) the pod may remain pending after being scheduled; if startup takes longer, the task fails.
check_interval (int) – Interval (in seconds) between checks.
- Return type:
None
- fetch_container_logs(pod, container_name, *, follow=False, since_time=None, post_termination_timeout=120, container_name_log_prefix_enabled=True, log_formatter=None)
Follow the logs of a container and stream them to Airflow logging.
Returns when the container exits.
Logs may not be available immediately after the pod starts, because the CSR (certificate signing request) may not have been approved and signed yet. In that situation an ApiException is raised, which is why this method retries on that specific exception.
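Retrying only on one exception type, with a growing delay, keeps a short CSR approval window from failing the task while still surfacing other errors immediately. The sketch below is a generic stand-in: `ApiException` is a local class, and `call_with_retry`, the attempt count, and the doubling backoff are assumptions rather than this method's actual retry policy.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ApiException(Exception):
    """Stand-in for the Kubernetes client's ApiException."""


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only on ApiException with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except ApiException:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(delay * 2 ** (attempt - 1))  # 1x, 2x, 4x, ... the base delay
    raise AssertionError("unreachable")


failures = [ApiException("CSR not approved"), ApiException("CSR not approved")]


def flaky() -> str:
    if failures:
        raise failures.pop(0)
    return "log stream opened"


slept: list[float] = []
result = call_with_retry(flaky, attempts=3, delay=0.5, sleep=slept.append)
```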
- fetch_requested_init_container_logs(pod, init_containers, follow_logs=False, container_name_log_prefix_enabled=True, log_formatter=None)
Follow the logs of the specified init containers in the pod and publish them to Airflow logging.
Returns when all the containers exit.
- fetch_requested_container_logs(pod, containers, follow_logs=False, container_name_log_prefix_enabled=True, log_formatter=None)
Follow the logs of the specified containers in the pod and publish them to Airflow logging.
Returns when all the containers exit.
- await_container_completion(pod, container_name, polling_time=1)
Wait for the given container in the given pod to be completed.
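Waiting for completion amounts to polling a termination check every `polling_time` seconds. A minimal sketch, assuming a hypothetical `is_terminated` callable in place of a real pod read; returning the number of checks is only for the demo.

```python
from __future__ import annotations

import time
from typing import Callable


def await_container_completion_sketch(
    is_terminated: Callable[[], bool],
    polling_time: float = 1,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until is_terminated() is True; return how many checks were made."""
    checks = 0
    while True:
        checks += 1
        if is_terminated():
            return checks
        sleep(polling_time)  # wait before asking the API again


states = iter([False, False, True])
slept: list[float] = []
checks = await_container_completion_sketch(lambda: next(states), polling_time=1, sleep=slept.append)
```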
- await_pod_completion(pod, istio_enabled=False, container_name='base')
Monitor a pod and return the final state.
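The `istio_enabled` and `container_name` parameters suggest a second exit condition: an injected Istio sidecar can keep the pod Running after the main container exits, so the pod phase alone may never become terminal. The sketch below assumes that interpretation; `read_phase` and `base_container_terminated` are hypothetical callables, and the sketch returns only the phase string rather than a pod object.

```python
from __future__ import annotations

import time
from typing import Callable

TERMINAL_PHASES = frozenset({"Succeeded", "Failed"})


def await_pod_completion_sketch(
    read_phase: Callable[[], str],
    base_container_terminated: Callable[[], bool],
    istio_enabled: bool = False,
    polling_time: float = 2,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the pod is terminal (or, with Istio, the base container exits)."""
    while True:
        phase = read_phase()
        if phase in TERMINAL_PHASES:
            return phase
        # With a sidecar still running, let the base container's state decide.
        if istio_enabled and base_container_terminated():
            return phase
        sleep(polling_time)


phases = iter(["Running", "Succeeded"])
plain = await_pod_completion_sketch(lambda: next(phases), lambda: False, sleep=lambda _: None)

base_done = iter([False, True])
with_istio = await_pod_completion_sketch(
    lambda: "Running", lambda: next(base_done), istio_enabled=True, sleep=lambda _: None
)
```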
- container_is_terminated(pod, container_name)
Read the pod and check whether the container is terminated.
- read_pod_logs(pod, container_name, tail_lines=None, timestamps=False, since_seconds=None, follow=True, post_termination_timeout=120, **kwargs)
Read logs from the pod.
- get_init_container_names(pod)
Return the names of the pod's init containers.
- get_container_names(pod)
Return the container names from the pod, excluding the airflow-xcom-sidecar container.
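Excluding the sidecar is a simple filter over the container spec, preserving order. A minimal sketch in which `Container` is a hypothetical stand-in for a container spec; only the sidecar name comes from the description above.

```python
from dataclasses import dataclass
from typing import List

XCOM_SIDECAR_NAME = "airflow-xcom-sidecar"  # container name given in the description


@dataclass
class Container:
    """Hypothetical stand-in for a container spec; only the name matters here."""

    name: str


def get_container_names_sketch(containers: List[Container]) -> List[str]:
    """Keep the spec order, dropping the XCom sidecar."""
    return [c.name for c in containers if c.name != XCOM_SIDECAR_NAME]


names = get_container_names_sketch(
    [Container("base"), Container(XCOM_SIDECAR_NAME), Container("log-shipper")]
)
```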
- class airflow.providers.cncf.kubernetes.utils.pod_manager.OnFinishAction
Action to take when the pod finishes.
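A string-valued enum makes the choice easy to set from configuration and to branch on once the pod finishes. The member names and values below are assumptions for illustration (keep, delete, or delete only on success), as is the hypothetical `should_delete_pod` helper.

```python
from enum import Enum


class OnFinishActionSketch(str, Enum):
    """Hypothetical members; the real enum may differ."""

    KEEP_POD = "keep_pod"
    DELETE_POD = "delete_pod"
    DELETE_SUCCEEDED_POD = "delete_succeeded_pod"


def should_delete_pod(action: OnFinishActionSketch, pod_succeeded: bool) -> bool:
    """Decide whether to delete a finished pod under the given action."""
    if action is OnFinishActionSketch.DELETE_POD:
        return True
    if action is OnFinishActionSketch.DELETE_SUCCEEDED_POD:
        return pod_succeeded  # keep failed pods around for debugging
    return False
```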
- airflow.providers.cncf.kubernetes.utils.pod_manager.is_log_group_marker(line)
Check if the line is a log group marker like ::group:: or ::endgroup::.
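The markers use the same `::group::` / `::endgroup::` syntax as GitHub Actions workflow commands. A minimal sketch, assuming a marker must appear at the very start of the line; the real function may be more or less lenient.

```python
def is_log_group_marker_sketch(line: str) -> bool:
    """True if the line opens or closes a log group (assumed: prefix match only)."""
    return line.startswith(("::group::", "::endgroup::"))
```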
- airflow.providers.cncf.kubernetes.utils.pod_manager.parse_log_line(line)
Parse a Kubernetes log line and return its timestamp and message.
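When logs are requested with timestamps, each line starts with an RFC 3339 timestamp followed by a space. The sketch below splits on the first space and parses the prefix with the standard library, assuming UTC with a trailing "Z" and up to nanosecond precision; `parse_log_line_sketch` is a hypothetical name, and returning `(None, line)` for unparsable input is a design choice of the sketch.

```python
from __future__ import annotations

import re
from datetime import datetime, timezone

# Assumed format: RFC 3339 UTC timestamp, fractional seconds up to nanoseconds.
_TS = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z")


def parse_log_line_sketch(line: str) -> tuple[datetime | None, str]:
    """Split 'TIMESTAMP MESSAGE' into (datetime, message); (None, line) if no timestamp."""
    stamp, sep, message = line.strip().partition(" ")
    match = _TS.fullmatch(stamp)
    if not sep or match is None:
        return None, line
    base, fraction = match.groups()
    # datetime stores only microseconds, so keep at most six fractional digits.
    micros = int((fraction or "0")[:6].ljust(6, "0"))
    parsed = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(microsecond=micros, tzinfo=timezone.utc), message


ts, msg = parse_log_line_sketch("2024-01-02T03:04:05.123456789Z hello world")
no_ts = parse_log_line_sketch("plain line without timestamp")
```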
- class airflow.providers.cncf.kubernetes.utils.pod_manager.AsyncPodManager(async_hook, callbacks=None)
Bases: airflow.utils.log.logging_mixin.LoggingMixin
Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTriggerer.
- async await_pod_start(pod, schedule_timeout=120, startup_timeout=120, check_interval=1)
Wait for the pod to reach a phase other than Pending.
- Parameters:
pod (kubernetes.client.models.v1_pod.V1Pod) – The pod to monitor.
schedule_timeout (int) – Maximum time (in seconds) the pod may wait to be scheduled; if scheduling takes longer, the task fails.
startup_timeout (int) – Maximum time (in seconds) the pod may remain pending after being scheduled; if startup takes longer, the task fails.
check_interval (float) – Interval (in seconds) between checks.
- Return type:
None
- async fetch_container_logs_before_current_sec(pod, container_name, since_time=None)
Asynchronously read the log file of the specified pod.
This method streams logs from the base container, skipping log lines from the current second to prevent duplicate entries on subsequent reads. It is designed to handle long-running containers and gracefully suppresses transient interruptions.
- Parameters:
pod (kubernetes.client.models.v1_pod.V1Pod) – The pod specification to monitor.
container_name (str) – The name of the container within the pod.
since_time (pendulum.DateTime | None) – The timestamp from which to start reading logs.
- Returns:
The timestamp to use for the next log read, representing the start of the current second. Returns None if an exception occurred.
- Return type:
pendulum.DateTime | None
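Holding back lines from the current second and returning the start of that second as the next `since_time` means consecutive reads neither duplicate nor drop lines, even if more lines arrive later within the same second. The sketch below shows only that windowing, over an in-memory list of `(timestamp, message)` pairs; `read_before_current_second` and the entry shape are hypothetical, and it uses `datetime` instead of pendulum.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Iterable, Tuple

Entry = Tuple[datetime, str]


def read_before_current_second(
    entries: Iterable[Entry], now: datetime, since_time: datetime | None = None
) -> tuple[list[str], datetime]:
    """Emit messages older than the current second; return the next since_time."""
    cutoff = now.replace(microsecond=0)  # start of the current second
    emitted = [
        message
        for stamp, message in entries
        if (since_time is None or stamp >= since_time) and stamp < cutoff
    ]
    return emitted, cutoff


def at(second: int, micro: int = 0) -> datetime:
    return datetime(2024, 1, 1, 12, 0, second, micro, tzinfo=timezone.utc)


first, next_since = read_before_current_second(
    [(at(4, 900000), "a"), (at(5, 100000), "b")], now=at(5, 700000)
)
second, _ = read_before_current_second(
    [(at(4, 900000), "a"), (at(5, 100000), "b"), (at(6, 200000), "c")],
    now=at(7),
    since_time=next_since,
)
```

Line "b" is withheld on the first read because it belongs to the still-open second, then delivered exactly once on the next read.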