I am using this tutorial code from Marc Lamberti. Here is the code:
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG("new_dag", start_date=datetime(2021, 1, 1),
schedule="@daily", catchup=False):
@task
def training_model(accuracy):
return accuracy
@task.branch
def choose_best_model(accuracies):
best_accuracy = max(accuracies)
if best_accuracy > 8:
return 'accurate'
return 'inaccurate'
accurate = BashOperator(
task_id="accurate",
bash_command="echo 'accurate'"
)
inaccurate = BashOperator(
task_id="inaccurate",
bash_command="echo 'inaccurate'"
)
choose_best_model(training_model.expand(accuracy=[3, 9, 2])) >> [accurate, inaccurate]
But when I run the dag from terminal the output I see is this:
Loading DAGs...
[2023-10-31T23:03:50.762+0000] {plugin.py:31} WARNING - Astro managed secrets backend is disabled
Running task training_model[0]... [2023-10-31T23:03:51.170+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=training_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=0, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.PythonOperator
SUCCESS ✅
Running task training_model[1]... [2023-10-31T23:03:51.227+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=training_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.PythonOperator
[2023-10-31T23:03:51.227+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=training_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.PythonOperator
SUCCESS ✅
Running task training_model[2]... [2023-10-31T23:03:51.305+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=training_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=2, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.PythonOperator
[2023-10-31T23:03:51.305+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=training_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=2, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.PythonOperator
[2023-10-31T23:03:51.305+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=training_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=2, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.PythonOperator
SUCCESS ✅
Running choose_best_model... [2023-10-31T23:03:51.438+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=choose_best_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.branch_python._BranchPythonDecoratedOperator, airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.BranchPythonOperator, airflow.operators.python.PythonOperator, airflow.models.skipmixin.SkipMixin
[2023-10-31T23:03:51.438+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=choose_best_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.branch_python._BranchPythonDecoratedOperator, airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.BranchPythonOperator, airflow.operators.python.PythonOperator, airflow.models.skipmixin.SkipMixin
[2023-10-31T23:03:51.438+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=choose_best_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.branch_python._BranchPythonDecoratedOperator, airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.BranchPythonOperator, airflow.operators.python.PythonOperator, airflow.models.skipmixin.SkipMixin
[2023-10-31T23:03:51.438+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=choose_best_model, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.decorators.branch_python._BranchPythonDecoratedOperator, airflow.decorators.python._PythonDecoratedOperator, airflow.decorators.base.DecoratedOperator, airflow.operators.python.BranchPythonOperator, airflow.operators.python.PythonOperator, airflow.models.skipmixin.SkipMixin
SUCCESS ✅
Running accurate... [2023-10-31T23:03:51.601+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=accurate, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.operators.bash.BashOperator
[2023-10-31T23:03:51.601+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=accurate, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.operators.bash.BashOperator
[2023-10-31T23:03:51.601+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=accurate, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.operators.bash.BashOperator
[2023-10-31T23:03:51.601+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=accurate, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.operators.bash.BashOperator
[2023-10-31T23:03:51.601+0000] {listener.py:32} INFO - TaskInstance Details: dag_id=accurate, task_id=accurate, dagrun_id=manual__2023-10-31T23:03:51.022743+00:00, map_index=-1, run_start_date=None, try_number=0, job_id=None, op_classpath=airflow.operators.bash.BashOperator
SUCCESS ✅
Completed running the DAG accurate. 🚀
Total elapsed time: 0.63s
As you can see, no output from echo command is visible here. Compared this with print from python operator (in other codes), I am able to clearly see the output.
PS, running dag from UI is not much help, as I am unable to see the logs at all. Here is what see for every task:
fe446e389477
*** Could not read served logs: Client error '404 NOT FOUND' for url 'http://fe446e389477:8793/log/dag_id=accurate/run_id=manual__2023-10-31T22:45:45.727814+00:00/task_id=choose_best_model/attempt=1.log'
For more information check: https://httpstatuses.com/404
Please advice, thanks.