K8SPodOperator Guide Inaccurate/Incomplete?

I’m trying to follow this guide to using astronomer with KubernetesPodOperator, particularly with a locally running kube cluster:

I have followed the instructions closely, but am struggling with execution of the example DAG in the this tutorial. I get the following error:

[2023-12-13, 14:49:28 UTC] {pod.py:1028} ERROR - 'NoneType' object has no attribute 'metadata'
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
                       ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 404, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 363, in connect
    self.sock = conn = self._new_conn()
                       ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0xffff6efe27d0>: Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 599, in execute_sync
    self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 556, in get_or_create_pod
    pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 538, in find_pod
    pod_list = self.client.list_namespaced_pod(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15697, in list_namespaced_pod
    return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15812, in list_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 240, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 213, in request
    r = self.pool_manager.request(method, url,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 77, in request
    return self.request_encode_url(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 99, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
              ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='127.0.0.1', port=6443): Max retries exceeded with url: /api/v1/namespaces/default/pods?labelSelector=%3Cpod-label%3E%3D%3Clabel-name%3E%2Cdag_id%3Dexample_kubernetes_pod%2Ckubernetes_pod_operator%3DTrue%2Crun_id%3Dmanual__2023-12-13T144926.5556220000-aa4116bc8%2Ctask_id%3Dtask-one%2Calready_checked%21%3DTrue%2C%21airflow-worker (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0xffff6efe27d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 866, in patch_already_checked
    name=pod.metadata.name,
         ^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'metadata'
[2023-12-13, 14:49:28 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
                       ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 404, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 363, in connect
    self.sock = conn = self._new_conn()
                       ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0xffff6efe27d0>: Failed to establish a new connection: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 599, in execute_sync
    self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 556, in get_or_create_pod
    pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 538, in find_pod
    pod_list = self.client.list_namespaced_pod(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15697, in list_namespaced_pod
    return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15812, in list_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 240, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 213, in request
    r = self.pool_manager.request(method, url,
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 77, in request
    return self.request_encode_url(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 99, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
              ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='127.0.0.1', port=6443): Max retries exceeded with url: /api/v1/namespaces/default/pods?labelSelector=%3Cpod-label%3E%3D%3Clabel-name%3E%2Cdag_id%3Dexample_kubernetes_pod%2Ckubernetes_pod_operator%3DTrue%2Crun_id%3Dmanual__2023-12-13T144926.5556220000-aa4116bc8%2Ctask_id%3Dtask-one%2Calready_checked%21%3DTrue%2C%21airflow-worker (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0xffff6efe27d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 593, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 633, in execute_sync
    self.cleanup(
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 766, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod airflow-test-pod-j1lpevz7 returned a failure.
remote_pod: None

What’s odd to me is that the dag points to a config_file that doesn’t actually exist If I try to replace the config_file in the dag, which points to a file that doesn’t exist on my system, with the one that the guide says to create, I get this error instead:

[2023-12-12, 16:50:42 UTC] {pod.py:1028} ERROR - Invalid kube-config file. No configuration found.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 598, in execute_sync
    self.pod_request_obj = self.build_pod_request_obj(context)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 983, in build_pod_request_obj
    "airflow_kpo_in_cluster": str(self.hook.is_in_cluster),
                                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 271, in is_in_cluster
    self.api_client  # so we can determine if we are in_cluster or not
    ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 279, in api_client
    return self.get_conn()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 227, in get_conn
    config.load_kube_config(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/config/kube_config.py", line 808, in load_kube_config
    loader = _get_kube_config_loader(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/config/kube_config.py", line 767, in _get_kube_config_loader
    raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 865, in patch_already_checked
    self.client.patch_namespaced_pod(
    ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 533, in client
    return self.hook.core_v1_client
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 283, in core_v1_client
    return client.CoreV1Api(api_client=self.api_client)
                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 279, in api_client
    return self.get_conn()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 227, in get_conn
    config.load_kube_config(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/config/kube_config.py", line 808, in load_kube_config
    loader = _get_kube_config_loader(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/config/kube_config.py", line 767, in _get_kube_config_loader
    raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
[2023-12-12, 16:50:42 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 598, in execute_sync
    self.pod_request_obj = self.build_pod_request_obj(context)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 983, in build_pod_request_obj
    "airflow_kpo_in_cluster": str(self.hook.is_in_cluster),
                                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 271, in is_in_cluster
    self.api_client  # so we can determine if we are in_cluster or not
    ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 279, in api_client
    return self.get_conn()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 227, in get_conn
    config.load_kube_config(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/config/kube_config.py", line 808, in load_kube_config
    loader = _get_kube_config_loader(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/config/kube_config.py", line 767, in _get_kube_config_loader
    raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 593, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 633, in execute_sync
    self.cleanup(
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 766, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod None returned a failure.

Any help appreciated. It feels like there is a bit of info missing from this guide related to how to get the DAG hooked up with the kubernetes. I also tried specifying a connection via the airflow UI, since I’m not running airflow in the k8s cluster itself. No luck there either.

Hey @Amr

Apologies that you are facing this issue and we would work on fixing this in our docs.

But to unblock you and identify the issue, would it be possible for you to share the DAG file and the config_file that you created? If not, can you confirm the following:

  • You have a local Airflow running using Astro CLI
  • You have a local K8s cluster where you intend to run the KPO (or specifically you are using the example in this section?

Thanks
Manmeet

Hello Manmeet, I am specifically using the instructions listed at the URL you provided:

So that’s why I expect that this should work. The config I generated using the instructions in this doc is this:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data:  <REDACTED>
    server: https://127.0.0.1:6443
  name: docker-desktop
contexts:
- context:
    cluster: docker-desktop
    user: docker-desktop
  name: docker-desktop
current-context: docker-desktop
kind: Config
preferences: {}
users:
- name: docker-desktop
  user:
    client-certificate-data: <REDACTED>
    client-key-data:  <REDACTED>

Thanks @Amr

Give us some time to check and fix this. We will update you soon.

Thanks
Manmeet

Hey @Amr

The server in your kubeconfig file should be https://kubernetes.docker.internal:6443/ . If K8s is running on your local system in a docker container, you cannot access it as 127.0.0.1. Your error is also pointing in that direction.

I have verified the code in the guide, and it seems okay to me. Could you please try again.

Thanks
Manmeet