Airflow 2.0.2: Dag doesn't render correctly the template

Hi, I have two simple tasks, one is getting the list of ids, and the other has to shows the list of ids with echo command. The result in xcom push seems right . I have a list of tuple as below.

The output of return function(xcom push) is a list of tuple, as below:
[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]

Here is my code:

def read_sql(file_name):
    with open(SQL_PATH + file_name) as f:
        sql = f.read()

    return sql

def query_and_push(sql):
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    records = pg_hook.get_records(sql=sql)
    return records

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'xcom_using_jinja_template',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['test'],
) as dag:


    t1 = PythonOperator(
        task_id='get_query_id',
        python_callable=query_and_push,
        provide_context=True,
        op_kwargs={
            'sql' : read_sql('warmupqueryid.sql')
                    }
    )
    
    
    templated_command = dedent(
        """
    {% for item in params.query_ids %}
        echo {{ item[0] }};
    {% endfor %}
    """
    )


    t2 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id'), key='return_value' }}"},
    )

  
    t1 >> t2

My last task is failing due to this error, and I don’t understand why it’s not getting the value of xcom push. I am not sure if this is a bug, or if I 've just missed something.

Reading remote log from s3://ob-airflow-pre/logs/xcom_using_jinja_template/templated/2021-05-26T17:22:44.023533+00:00/1.log.
[2021-05-26 17:22:45,633] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,663] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-26 17:22:45,664] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,675] {taskinstance.py:1089} INFO - Executing <Task(BashOperator): templated> on 2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,679] {standard_task_runner.py:52} INFO - Started process 413 to run task
[2021-05-26 17:22:45,683] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xcom_using_jinja_template', 'templated', '2021-05-26T17:22:44.023533+00:00', '--job-id', '1811', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/xcom_test.py', '--cfg-path', '/tmp/tmpkk2x0gyd', '--error-file', '/tmp/tmpc2ka7x4x']
[2021-05-26 17:22:45,683] {standard_task_runner.py:77} INFO - Job 1811: Subtask templated
[2021-05-26 17:22:45,859] {logging_mixin.py:104} INFO - Running <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-26 17:22:45,945] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_using_jinja_template
AIRFLOW_CTX_TASK_ID=templated
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T17:22:44.023533+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,946] {bash.py:135} INFO - Tmp dir root location: 
 /tmp
[2021-05-26 17:22:45,947] {bash.py:158} INFO - Running command: 

    echo  ;

    echo {;

    echo {;

    echo  ;

    echo t;

    echo i;

    echo .;

    echo x;

    echo c;

    echo o;

    echo m;

    echo _;

    echo p;

    echo u;

    echo l;

    echo l;

    echo (;

    echo t;

    echo a;

    echo s;

    echo k;

    echo _;

    echo i;

    echo d;

    echo s;

    echo =;

    echo ';

    echo g;

    echo e;

    echo t;

    echo _;

    echo q;

    echo u;

    echo e;

    echo r;

    echo y;

    echo _;

    echo i;

    echo d;

    echo ';

    echo );

    echo ,;

    echo  ;

    echo k;

    echo e;

    echo y;

    echo =;

    echo ';

    echo r;

    echo e;

    echo t;

    echo u;

    echo r;

    echo n;

    echo _;

    echo v;

    echo a;

    echo l;

    echo u;

    echo e;

    echo ';

    echo  ;

    echo };

    echo };

[2021-05-26 17:22:45,954] {bash.py:169} INFO - Output:
[2021-05-26 17:22:45,955] {bash.py:173} INFO - 
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO - 
[2021-05-26 17:22:45,955] {bash.py:173} INFO - t
[2021-05-26 17:22:45,955] {bash.py:173} INFO - i
[2021-05-26 17:22:45,955] {bash.py:173} INFO - .
[2021-05-26 17:22:45,955] {bash.py:173} INFO - x
[2021-05-26 17:22:45,955] {bash.py:173} INFO - c
[2021-05-26 17:22:45,955] {bash.py:173} INFO - o
[2021-05-26 17:22:45,955] {bash.py:173} INFO - m
[2021-05-26 17:22:45,955] {bash.py:173} INFO - _
[2021-05-26 17:22:45,955] {bash.py:173} INFO - p
[2021-05-26 17:22:45,956] {bash.py:173} INFO - u
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: syntax error near unexpected token `;'
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: `    echo (;'
[2021-05-26 17:22:45,956] {bash.py:177} INFO - Command exited with return code 1
[2021-05-26 17:22:45,976] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 180, in execute
    raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
[2021-05-26 17:22:45,978] {taskinstance.py:1525} INFO - Marking task as UP_FOR_RETRY. dag_id=xcom_using_jinja_template, task_id=templated, execution_date=20210526T172244, start_date=20210526T172245, end_date=20210526T172245
[2021-05-26 17:22:46,014] {local_task_job.py:146} INFO - Task exited with return code 1

Hi @maryampashmi!

Unfortunately you won’t be able to use params in a Jinja-templated way directly with the bash_command as written since params is not a template_field for the BashOperator. However, you can reference the return_value XCom from the get_query_id task as a variable in Jinja like so:

templated_command = dedent(
    """
    {% set query_ids = ti.xcom_pull(task_ids='get_query_id', key='return_value') %}
    {% for item in query_ids %}
        echo {{ item[0] }};
    {% endfor %}
    """
    )


    t2 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

Now the templated_command directly references the XCom you need and sets it to a variable in the Jinja string and get the output you expect:
image

If you still want to use params, you can create a .sh or .bash file with your original templated_command and reference the file as the argument for the bash_command parameter for the BashOperator.

I hope this helps!

1 Like