Hi, I have two simple tasks, one is getting the list of ids, and the other has to shows the list of ids with echo command. The result in xcom push seems right . I have a list of tuple as below.
The output of return function(xcom push) is a list of tuple, as below:
[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]
Here is my code:
def read_sql(file_name):
with open(SQL_PATH + file_name) as f:
sql = f.read()
return sql
def query_and_push(sql):
pg_hook = PostgresHook(postgres_conn_id='redshift')
records = pg_hook.get_records(sql=sql)
return records
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'xcom_using_jinja_template',
default_args=default_args,
description='',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['test'],
) as dag:
t1 = PythonOperator(
task_id='get_query_id',
python_callable=query_and_push,
provide_context=True,
op_kwargs={
'sql' : read_sql('warmupqueryid.sql')
}
)
templated_command = dedent(
"""
{% for item in params.query_ids %}
echo {{ item[0] }};
{% endfor %}
"""
)
t2 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id'), key='return_value' }}"},
)
t1 >> t2
My last task is failing due to this error, and I don’t understand why it’s not getting the value of xcom push. I am not sure if this is a bug, or if I 've just missed something.
Reading remote log from s3://ob-airflow-pre/logs/xcom_using_jinja_template/templated/2021-05-26T17:22:44.023533+00:00/1.log.
[2021-05-26 17:22:45,633] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,663] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-26 17:22:45,664] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,675] {taskinstance.py:1089} INFO - Executing <Task(BashOperator): templated> on 2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,679] {standard_task_runner.py:52} INFO - Started process 413 to run task
[2021-05-26 17:22:45,683] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xcom_using_jinja_template', 'templated', '2021-05-26T17:22:44.023533+00:00', '--job-id', '1811', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/xcom_test.py', '--cfg-path', '/tmp/tmpkk2x0gyd', '--error-file', '/tmp/tmpc2ka7x4x']
[2021-05-26 17:22:45,683] {standard_task_runner.py:77} INFO - Job 1811: Subtask templated
[2021-05-26 17:22:45,859] {logging_mixin.py:104} INFO - Running <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-26 17:22:45,945] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_using_jinja_template
AIRFLOW_CTX_TASK_ID=templated
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T17:22:44.023533+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,946] {bash.py:135} INFO - Tmp dir root location:
/tmp
[2021-05-26 17:22:45,947] {bash.py:158} INFO - Running command:
echo ;
echo {;
echo {;
echo ;
echo t;
echo i;
echo .;
echo x;
echo c;
echo o;
echo m;
echo _;
echo p;
echo u;
echo l;
echo l;
echo (;
echo t;
echo a;
echo s;
echo k;
echo _;
echo i;
echo d;
echo s;
echo =;
echo ';
echo g;
echo e;
echo t;
echo _;
echo q;
echo u;
echo e;
echo r;
echo y;
echo _;
echo i;
echo d;
echo ';
echo );
echo ,;
echo ;
echo k;
echo e;
echo y;
echo =;
echo ';
echo r;
echo e;
echo t;
echo u;
echo r;
echo n;
echo _;
echo v;
echo a;
echo l;
echo u;
echo e;
echo ';
echo ;
echo };
echo };
[2021-05-26 17:22:45,954] {bash.py:169} INFO - Output:
[2021-05-26 17:22:45,955] {bash.py:173} INFO -
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO -
[2021-05-26 17:22:45,955] {bash.py:173} INFO - t
[2021-05-26 17:22:45,955] {bash.py:173} INFO - i
[2021-05-26 17:22:45,955] {bash.py:173} INFO - .
[2021-05-26 17:22:45,955] {bash.py:173} INFO - x
[2021-05-26 17:22:45,955] {bash.py:173} INFO - c
[2021-05-26 17:22:45,955] {bash.py:173} INFO - o
[2021-05-26 17:22:45,955] {bash.py:173} INFO - m
[2021-05-26 17:22:45,955] {bash.py:173} INFO - _
[2021-05-26 17:22:45,955] {bash.py:173} INFO - p
[2021-05-26 17:22:45,956] {bash.py:173} INFO - u
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: syntax error near unexpected token `;'
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: ` echo (;'
[2021-05-26 17:22:45,956] {bash.py:177} INFO - Command exited with return code 1
[2021-05-26 17:22:45,976] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 180, in execute
raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
[2021-05-26 17:22:45,978] {taskinstance.py:1525} INFO - Marking task as UP_FOR_RETRY. dag_id=xcom_using_jinja_template, task_id=templated, execution_date=20210526T172244, start_date=20210526T172245, end_date=20210526T172245
[2021-05-26 17:22:46,014] {local_task_job.py:146} INFO - Task exited with return code 1