Hi,
I'm fairly new to Airflow and I'm trying to use XComArg, but I don't understand how it works. I'm trying to implement lineage, and I have written 3 tasks: a PythonOperator task that gets a list of files, a PythonOperator task that checks the files it got, and a DatahubEmitterOperator task that stores the lineage information:
```python
test_operator_task = PythonOperator(
    task_id="test_operator_task",
    python_callable=get_files,
)

file_list_clean = XComArg(test_operator_task)['file_list']

test_pull_task = PythonOperator(
    task_id="test_pull_task",
    python_callable=test,
)

emitter_task = DatahubEmitterOperator(
    task_id="emitter_task",
    datahub_conn_id="datahub_rest_default",
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[builder.make_dataset_urn(platform="file", name=filename) for filename in file_list_clean],
            downstream_urn=builder.make_dataset_urn(platform="file", name="/nfs/data/archival/archive_test.tar.gz")
        )
    ]
)
```
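For reference, here is a minimal sketch of what `get_files` might look like. Its real body is not shown here, so this is an assumption. The important part is that the list is pushed under the key `"file_list"`, the same key read by `XComArg(test_operator_task)['file_list']` and by `ti.xcom_pull(..., key="file_list")`. `SOURCE_DIR` is a hypothetical placeholder path.

```python
import glob
import os

# Hypothetical source directory: the post does not say where the files come from.
SOURCE_DIR = "/path/to/source"


def get_files(ti, source_dir=SOURCE_DIR):
    """Collect file paths and push them to XCom under the key "file_list".

    PythonOperator passes `ti` from the task context because it is a named
    parameter; `source_dir` is not a context key, so its default is used.
    """
    files = sorted(glob.glob(os.path.join(source_dir, "*")))
    # Must match the key used downstream (XComArg(...)['file_list'] / xcom_pull).
    ti.xcom_push(key="file_list", value=files)
```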
The test_pull_task was created to verify the list of files, and its callable is defined like this:
```python
def test(ti):
    print(file_list_clean)
    print(ti.xcom_pull(task_ids="test_operator_task", key="file_list"))
```
I expected both prints to show the same result, but the first one prints "{{ task_instance.xcom_pull(task_ids='test_operator_task', dag_id='test_lineage_drop_partition', key='file_list') }}" while the second one prints the list. Should XComArg(test_operator_task)['file_list'] return the actual value, or is it only meant to produce a string for use in a template?
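A toy model may help explain the first print. The class below is hypothetical and is NOT Airflow's `XComArg`; it only imitates the behaviour seen above: indexing returns another unresolved reference, and converting it to a string gives a Jinja `xcom_pull` expression rather than the pulled value. `print(file_list_clean)` just converts the object with `str()`, so nothing is read from XCom at that point; the value would only be substituted where Airflow itself resolves the reference.

```python
class LazyXComRef:
    """Hypothetical stand-in for an XCom reference (NOT Airflow's XComArg)."""

    def __init__(self, task_id, dag_id, key="return_value"):
        self.task_id = task_id
        self.dag_id = dag_id
        self.key = key

    def __getitem__(self, key):
        # Selecting a key returns a new reference; still nothing is pulled.
        return LazyXComRef(self.task_id, self.dag_id, key)

    def __str__(self):
        # str() (and therefore print()) yields a Jinja expression, not the value.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', dag_id='{self.dag_id}', key='{self.key}'"
            ") }}"
        )


ref = LazyXComRef("test_operator_task", "test_lineage_drop_partition")["file_list"]
print(ref)
```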
After that I tried to use file_list_clean in the DatahubEmitterOperator as shown above, but then the DAG fails to import. If I use the template string directly instead:
```python
emitter_task = DatahubEmitterOperator(
    task_id="emitter_task",
    datahub_conn_id="datahub_rest_default",
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[builder.make_dataset_urn(platform="file", name=filename) for filename in "{{ ti.xcom_pull(task_ids='test_operator_task', key='file_list') }}"],
            downstream_urn=builder.make_dataset_urn(platform="file", name="/nfs/data/archival/archive_test.tar.gz")
        )
    ]
)
```
The template is never evaluated, though. How can I pass the file list produced by test_operator_task to emitter_task?
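The list comprehension inside `mces=[...]` is ordinary Python, so it runs while the DAG file is being parsed, before any Jinja rendering could happen. Iterating over the template string therefore iterates over its characters. The plain-Python sketch below (no Airflow or DataHub needed) reproduces that with the same string. `lineage_inputs` is a hypothetical helper showing the deferred alternative: the comprehension runs inside a task that already holds the real list, for example from `ti.xcom_pull(...)`.

```python
# The template string exactly as it appears in the DAG file.
TEMPLATE = "{{ ti.xcom_pull(task_ids='test_operator_task', key='file_list') }}"


def names_at_parse_time():
    # Runs while the DAG file is imported, before any Jinja rendering,
    # so the loop walks over the individual characters of TEMPLATE.
    return [filename for filename in TEMPLATE]


def lineage_inputs(file_list):
    # Hypothetical helper, meant to be called inside a running task where
    # file_list is the real list. Each dict holds keyword arguments that
    # could be passed as builder.make_dataset_urn(**kwargs).
    return [{"platform": "file", "name": filename} for filename in file_list]


print(names_at_parse_time()[:3])  # ['{', '{', ' ']
print(lineage_inputs(["/tmp/a.csv"]))  # [{'platform': 'file', 'name': '/tmp/a.csv'}]
```

In the DAG this points toward building the lineage MCEs inside a task's callable (for example a PythonOperator that pulls `file_list` and constructs the MCEs there) rather than at parse time. Whether `DatahubEmitterOperator` renders templates inside `mces` depends on the plugin version and is not assumed here.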
Thanks a lot!