Pytest to mock xcom_pull using Double

Hi Guys,

I had a customized operator that is using xcom_pull to get results from other tasks and I would like to run unit test by using pytest for this operator and mock xcom_pull

def execute(self, context):
        export_task = self.xcom_pull(
            context=context, task_ids=self.rds_task_id, dag_id=self.dag_id, key="some_key"
        )

I found that it didn’t work

    from doubles import allow
    mock_run = mocker.patch.object(AWSAthenaHook, "run_query")
    my_operator = CustomizedOperator(..)
    allow(my_operator).xcom_pull.with_args(...).and_return(some_value)

which will return error

doubles.exceptions.UnallowedMethodCallError: Received unexpected call to 'xcom_pull' on <Task(CustomizedOperator): 

Any idea how to mock the xcom_pull method. It seems that this method belongs to TaskInstance Class which will be created once the Operator task is being created. Should I mock the TaskInstance object instead?

Hi @BenLi!

Have you checked out the existing TaskInstance unit tests in the Airflow repo for some inspiration?

@josh-fell Thanks for sending this! I actually tried to mock the TaskInstance with

    task_instance = InstanceDouble('airflow.models.taskinstance.TaskInstance')
    allow(task_instance).xcom_pull.with_args(...).and_return(...)
    my_operator = CustomizedOperator(..,task_instance=task_instance)

And it didn’t work properly as well.

I think my question is when the CustomizedOperator is created, will an instance of TaskInstance will be created as well? or the TaskInstance is created when the Dag is defined and created?