Im planning to use an airflow operator inside a function and then call it from a different task. For me, the task ran successfully, but it didn’t trigger the operator inside the function.
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2)
}
dag = DAG(
'bhuvitest',
default_args=args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
catchup=False
)
def func():
t1 = BashOperator(
task_id='print_date',
bash_command='touch /tmp/aaaaaaaaaaaaa'
)
t1
main_dag = DAG(
'bhuvitest',
default_args=args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
catchup=False
)
bhuvitest = PythonOperator(
task_id='python_task',
python_callable=func,
dag = main_dag)
bhuvitest
This actually ran without any issues, but it didn’t trigger the bashoperator.
Log
*** Reading local file: /root/airflow/logs/bhuvitest/python_task/2020-10-11T10:10:00.285929+00:00/1.log
[2020-10-11 10:10:07,568] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: bhuvitest.python_task 2020-10-11T10:10:00.285929+00:00 [queued]>
[2020-10-11 10:10:07,586] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: bhuvitest.python_task 2020-10-11T10:10:00.285929+00:00 [queued]>
[2020-10-11 10:10:07,586] {taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
[2020-10-11 10:10:07,586] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2020-10-11 10:10:07,586] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2020-10-11 10:10:07,599] {taskinstance.py:901} INFO - Executing <Task(PythonOperator): python_task> on 2020-10-11T10:10:00.285929+00:00
[2020-10-11 10:10:07,601] {standard_task_runner.py:54} INFO - Started process 15874 to run task
[2020-10-11 10:10:07,619] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'bhuvitest', 'python_task', '2020-10-11T10:10:00.285929+00:00', '--job_id', '3805', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/bhuvitest.py', '--cfg_path', '/tmp/tmp0b59kioj']
[2020-10-11 10:10:07,620] {standard_task_runner.py:78} INFO - Job 3805: Subtask python_task
[2020-10-11 10:10:07,658] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: bhuvitest.python_task 2020-10-11T10:10:00.285929+00:00 [running]> bhuvi.c.searce-academy.internal
[2020-10-11 10:10:07,681] {python_operator.py:114} INFO - Done. Returned value was: None
[2020-10-11 10:10:07,692] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=bhuvitest, task_id=python_task, execution_date=20201011T101000, start_date=20201011T101007, end_date=20201011T101007
[2020-10-11 10:10:12,554] {local_task_job.py:102} INFO - Task exited with return code 0
Task Instance details
Can someone help me to fix it?