Hi Team,
We are getting the error "Unknown string format: {{ti.xcom_pull(task_ids='get_execution_date', key='dag_execution_date')}}". Could someone help me fix this issue? It would be very helpful for my test.
How do we use an xcom_pull inside the json field of DatabricksSubmitRunOperator?
Error: dateutil.parser._parser.ParserError: Unknown string format: {{ti.xcom_pull(task_ids='get_execution_date', key='dag_execution_date')}}
def _print_exec_date(**kwargs):
    pprint(kwargs)
    execution_date_val = kwargs["execution_date"]
    # execution_date = start.strftime("%Y-%m-%dT%H")
    print("execution_date:", execution_date_val)
    kwargs["ti"].xcom_push(key="dag_execution_date", value=execution_date_val)
get_exec_date = PythonOperator(
    task_id="get_execution_date",
    python_callable=_print_exec_date,
    provide_context=True,
    dag=dag,
)
task1 = DatabricksSubmitRunOperator(
    task_id="ETL_PROCESS",
    dag=dag,
    databricks_conn_id="dx_conn",
    trigger_rule="all_done",
    raw_path="path/directory/",
    table_name="etl_table",
    json=directory_name.get_task_params(
        timeout_seconds=700,
        driver_type="", worker_type="", num_workers=,
        execution_date_param="{{ti.xcom_pull(task_ids='get_execution_date', key='dag_execution_date')}}"
    ),
    provide_context=True
)
get_exec_date >> task1
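One thing I suspect: directory_name.get_task_params(...) is called when the DAG file is parsed, so parse() receives the literal Jinja string rather than the rendered date. Since json is a templated field on DatabricksSubmitRunOperator, one option is to leave the template string untouched in the returned dict and let Airflow render it at task run time. A minimal sketch (the dict structure here is assumed, not the actual job spec):

```python
# Sketch (assumed job-spec structure): return the Jinja string as-is.
# `json` is a templated field on DatabricksSubmitRunOperator, so Airflow
# renders nested string values at task run time, not at DAG-parse time.
def get_task_params(execution_date_param, raw_path, table_name):
    return {
        "notebook_task": {
            "base_parameters": {
                "raw_path": raw_path,
                "table_name": table_name,
                # still a template string here; rendered later by Airflow
                "execution_date": execution_date_param,
            }
        }
    }

params = get_task_params(
    "{{ti.xcom_pull(task_ids='get_execution_date', key='dag_execution_date')}}",
    "path/directory/",
    "etl_table",
)
```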
# method to check
from dateutil.parser import parse

def get_task_params(execution_date_param, raw_path, table_name):
    """TODO"""
    execution_date1 = parse(execution_date_param)  # <-- throwing the error
    print("execution_date1:", execution_date1)
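If the date really has to be parsed in Python, that has to happen at runtime, after the XCom value exists and the template has been rendered; parse() handles the rendered ISO-8601 string fine. A small sketch, using the timestamp format that appears in the logs:

```python
from dateutil.parser import parse

# Sketch: at runtime the rendered XCom value is a real timestamp string
# (format taken from the log output), so dateutil can parse it.
rendered = "2021-03-25T10:05:00+00:00"
execution_date1 = parse(rendered)
print("execution_date1:", execution_date1)
```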
In the logs I can see the following messages:
[2021-03-25 11:36:05,326] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: flatten_test.get_execution_date 2021-03-25T10:05:00+00:00 [running]>
[2021-03-25 11:36:05,745] {{logging_mixin.py:112}} INFO - execution_date: 2021-03-25T10:05:00+00:00
[2021-03-25 11:36:05,801] {{taskinstance.py:1150}} ERROR - Object of type Pendulum is not JSON serializable
Warnings:
[2021-03-25 12:19:49,992] {{logging_mixin.py:112}} WARNING - /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/databricks_operator.py:245: PendingDeprecationWarning: Invalid arguments were passed to DatabricksSubmitRunOperator (task_id: task1). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'provide_context': True}
  super(DatabricksSubmitRunOperator, self).__init__(**kwargs)
Thanks
Anbu