Xcom_push will be deprecated in Airflow version 2.0

Hi there, I noticed that xcom_push in operators like PostgresOperator is being deprecated in Airflow version 2.0.

What is your take on this? How are we supposed to pass the result set of a select query to downstream tasks, where we want to use that result set (via an XCom variable) for business intelligence/ETL? Am I missing something?

[2020-06-08 10:25:19,259] {taskinstance.py:900} INFO - Executing <Task(PythonOperator): extract_last_batch_date> on 2020-06-07T11:00:00+00:00
/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/postgres_operator.py:54: PendingDeprecationWarning: Invalid arguments were passed to PostgresOperator (task_id: current_batch_date). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'postgress_conn_id': 'va_target', 'xcom_push': True}
  super(PostgresOperator, self).__init__(*args, **kwargs)

Many thanks,
Ozgur


Hey, @ozgurgul! Thanks for reaching out.

Depending on the operator you are using, there could be an xcom_push parameter associated with the operator’s __init__ method. As far as I know, BashOperator is the only operator that has had that parameter in the past.

On a side note, it looks like even that parameter is on its way out in favour of do_xcom_push, which will be universal for all operators that are children of BaseOperator.

Up to and including 1.10.9, BashOperator returned the output of the bash command if xcom_push was set to True.

In 1.10.10, that conditional was removed, since the same logic has lived in TaskInstance since 1.10.5.
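To make that concrete, here is a minimal sketch (assuming Airflow 1.10.10+ and an existing dag object; push_date is a hypothetical task name): the last line printed by the bash command is pushed to XCom as the task’s return value, and you can turn that off with do_xcom_push=False.

from airflow.operators.bash_operator import BashOperator

# the last line of stdout becomes the task's XCom return_value
push_date = BashOperator(
    task_id='push_date',
    bash_command='date +%F',
    do_xcom_push=True,  # set to False to skip pushing the command's output
    dag=dag
)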

For clarification, the official Airflow documentation describes this interaction under the XCom section, and it has been this way since as early as 1.8.1.

In addition, if a task returns a value (either from its Operator’s execute() method, or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.

Back to your example: PostgresOperator never had any parameter associated with XCom until 1.10.5, when do_xcom_push was added as a parameter on BaseOperator. Even then, PostgresOperator’s execute() method has no return statement, so it is impossible for any value to be pushed to XCom as is.
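For intuition, PostgresOperator’s execute() boils down to something like the following (a paraphrase of the 1.10.x behaviour, not the exact library source): it runs the SQL through a hook and returns nothing.

def execute(self, context):
    hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
    hook.run(self.sql, self.autocommit, parameters=self.parameters)
    # no return statement, so nothing is pushed to XCom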

On the other hand, there are various routes you can take to communicate the result of a select query through XCom. One easy way is to use a PythonOperator, since it is one of the most flexible operators and lets you do practically anything in Python. An example is provided in the XCom section of the documentation, and I have provided a more tailored example below.

from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

def query_and_push(sql):
    pg_hook = PostgresHook(postgres_conn_id='pg_conn')
    records = pg_hook.get_records(sql=sql)
    # the returned records are automatically pushed to XCom as the task's return_value
    return records

action = PythonOperator(
    task_id='push_result',
    python_callable=query_and_push,
    provide_context=True,
    op_kwargs={
        'sql': 'select * from table;'
    },
    dag=dag
)
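For completeness, here is a hedged sketch of the other side (use_records and use_result are hypothetical names; the task_id matches the push_result task above): a downstream PythonOperator can pull those records from XCom through the task instance in its context.

def use_records(**context):
    # pull the return value of the push_result task from XCom
    records = context['ti'].xcom_pull(task_ids='push_result')
    for row in records:
        print(row)

consume = PythonOperator(
    task_id='use_result',
    python_callable=use_records,
    provide_context=True,  # required on 1.10.x so the keyword arguments reach **context
    dag=dag
)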

Please let me know if you still have questions about XCom! I am happy to explain in further detail :smiley:


Great answer @Alan :rocket: and welcome to the forum @ozgurgul


Hi Alan, thanks so much for your clarification. It makes perfect sense.

I ran this, but it didn’t return anything and failed as well.

Update: I tried removing provide_context=True and then it worked. Any idea why?

*** Reading local file: /root/airflow/logs/sampledag/push_result/2020-10-14T11:40:55.146280+00:00/1.log
[2020-10-14 11:55:58,447] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: sampledag.push_result 2020-10-14T11:40:55.146280+00:00 [queued]>
[2020-10-14 11:55:58,474] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: sampledag.push_result 2020-10-14T11:40:55.146280+00:00 [queued]>
[2020-10-14 11:55:58,474] {taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
[2020-10-14 11:55:58,474] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2020-10-14 11:55:58,474] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2020-10-14 11:55:58,482] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): push_result> on 2020-10-14T11:40:55.146280+00:00
[2020-10-14 11:55:58,484] {standard_task_runner.py:54} INFO - Started process 29321 to run task
[2020-10-14 11:55:58,506] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'sampledag', 'push_result', '2020-10-14T11:40:55.146280+00:00', '--job_id', '4116', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/test1.py', '--cfg_path', '/tmp/tmp0kp_lvfx']
[2020-10-14 11:55:58,506] {standard_task_runner.py:78} INFO - Job 4116: Subtask push_result
[2020-10-14 11:55:58,548] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: sampledag.push_result 2020-10-14T11:40:55.146280+00:00 [running]> bhuvi.c.searce-academy.internal
[2020-10-14 11:55:58,575] {taskinstance.py:1150} ERROR - query_and_push() got an unexpected keyword argument 'conf'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: query_and_push() got an unexpected keyword argument 'conf'
[2020-10-14 11:55:58,577] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=sampledag, task_id=push_result, execution_date=20201014T114055, start_date=20201014T115558, end_date=20201014T115558
[2020-10-14 11:56:03,438] {local_task_job.py:102} INFO - Task exited with return code 1

Ahhh! My mistake there! I shouldn’t have set provide_context to True, since I did not need the context in the python callable.

When you set provide_context to True, the python callable needs to have a parameter that accepts the keyword arguments Airflow passes in. The fix for the above example is either to remove the provide_context parameter:

action = PythonOperator(
    task_id='push_result',
    python_callable=query_and_push,
    op_kwargs={
        'sql': 'select * from table;'
    },
    dag=dag
)

or to add **context (any name will suffice) to the python callable:

def query_and_push(sql, **context):
    pg_hook = PostgresHook(postgres_conn_id='pg_conn')
    records = pg_hook.get_records(sql=sql)
    return records

@Alan Thanks for your example. Actually, I was looking for how to pass a query result from one task to another: the first query returns a list, and the second query uses this list as a parameter in its WHERE clause. I am not sure how to do that part. But I have tried your code in my use case, and I have two questions for you; I would appreciate your response.
1. You used the get_records method, but this method is not implemented in the latest version of PostgresHook (I’m using Airflow 2.0.2). I don’t get how that works here :wink:
https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_modules/airflow/providers/postgres/hooks/postgres.html#PostgresHook
2. How can I pass a SQL file instead of the SQL itself, i.e. pass a file_name?

I believe PostgresHook has always been a child of DbApiHook, which does have the get_records method.

I see the dilemma here: you want to template your SQL, but PythonOperator doesn’t have the .sql extension in template_ext.

In this case, I would suggest creating your own custom operator that is a child of PythonOperator. You will need to include the .sql extension in template_ext, and Airflow will then render the content of .sql files as the true value of the given parameter.

class SqlPythonOperator(PythonOperator):
    # render any templated field value ending in .sql (e.g. in op_kwargs) from the file
    template_ext = ('.sql',)

Since PythonOperator already has templated fields (like op_kwargs) that you can use to pass in the SQL file name (see source), all you need to do is tell Airflow to render .sql files.

With the rendered SQL script (instead of the SQL file path), you can then use PostgresHook as outlined in the post above.
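Putting it together, a hedged sketch (query.sql is a hypothetical file that Airflow’s template loader can find, e.g. in the DAG folder): the rendered contents of the file are passed to query_and_push as the sql argument.

run_sql_file = SqlPythonOperator(
    task_id='push_result_from_file',
    python_callable=query_and_push,
    op_kwargs={
        'sql': 'query.sql'  # rendered from the file before the callable runs
    },
    dag=dag
)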
