Databricks async operator fails to retrieve response['state']

Hi! I’ve got a ‘tiny’ problem with DatabricksSubmitRunOperatorAsync: it cannot retrieve the state of the job, and every run I try fails with the following error:

[2022-06-02, 10:24:20 UTC] {taskinstance.py:1507} ERROR - Trigger failed:
Traceback (most recent call last):

  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/triggerer_job.py", line 296, in cleanup_finished_triggers
    result = details["task"].result()

  File "/usr/local/lib/python3.9/site-packages/airflow/jobs/triggerer_job.py", line 360, in run_trigger
    async for event in trigger.run():

  File "/usr/local/lib/python3.9/site-packages/astronomer/providers/databricks/triggers/databricks.py", line 72, in run
    run_state = await hook.get_run_state_async(self.run_id)

  File "/usr/local/lib/python3.9/site-packages/astronomer/providers/databricks/hooks/databricks.py", line 44, in get_run_state_async
    state = response["state"]

KeyError: 'state'

[2022-06-02, 10:24:20 UTC] {taskinstance.py:1743} WARNING - We expected to get frame set in local storage but it was not. Please report this as an issue with full logs at https://github.com/apache/airflow/issues/new
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1340, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1477, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1508, in _execute_task
    raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger failure

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1741, in get_truncated_error_traceback
    execution_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
AttributeError: '_thread._local' object has no attribute 'frame'
[2022-06-02, 10:24:20 UTC] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1340, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1477, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1508, in _execute_task
    raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger failure

Per the traceback, the hook’s get_run_state_async reads response["state"] from the API response, but the response does not contain that key. Funnily enough, this does not happen with the non-async version of the operator.
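To narrow this down, it would help to see what the response actually contains when the key is missing. Below is a minimal sketch of the kind of check I have in mind. `extract_run_state` is a hypothetical helper of my own, not part of the astronomer provider, and both sample payloads are made up for illustration. I'm assuming a healthy response carries a `state` object with a `life_cycle_state` field.

```python
from typing import Any, Dict


def extract_run_state(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return the 'state' object from a run-status response, or fail loudly.

    Hypothetical helper for debugging; not part of the astronomer provider.
    """
    if "state" not in response:
        # Show what the API actually returned instead of a bare KeyError.
        raise ValueError(
            f"Response has no 'state' key; keys present: {sorted(response)}; "
            f"body: {response!r}"
        )
    return response["state"]


# Made-up payloads, for illustration only.
healthy = {"run_id": 123, "state": {"life_cycle_state": "RUNNING"}}
unexpected = {"error_code": "EXAMPLE_ERROR", "message": "example message"}

print(extract_run_state(healthy))

try:
    extract_run_state(unexpected)
except ValueError as exc:
    print(f"Could not read run state: {exc}")
```

With a check like this, the trigger log would show the keys and body of the offending response, which should tell us whether the API is returning an error payload or something else entirely.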

I’m running Airflow 2.2.5 on Astronomer, deployed locally at our company.

I’d appreciate any pointers on how to rectify this issue.