Successor skipped despite trigger_rule ALL_DONE

Hello,

I’m pretty new to Airflow (2.0.1) and workflow engineering.
I need to build a dynamically scaled workflow in Airflow, which mostly works: the tasks are generated from an Airflow Variable.

Unfortunately I can’t get the workflow to finish: neither trigger_rule ALL_DONE nor ONE_SUCCESS causes the task process_storage_meta (which joins the branches) to run… The task is skipped even though one of its predecessors succeeded. I need the flow to resume for logging even if every upstream task fails, so I assumed these rules would do the job…

I’m not sure whether this is related to the dynamic branching or whether I’m getting something wrong.
Note that process_storage_meta is an instance of a custom operator…

It would be great if someone could help, thanks in advance :clap:t4:

I think you are on the right track by specifying trigger rules for the operator.

One must be aware of how trigger rules and skipped tasks interact at the scheduling level. Skipped tasks cascade through the trigger rules all_success and all_failed, but not through all_done, one_failed, one_success, none_failed, none_failed_or_skipped, none_skipped and dummy.
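To illustrate, here is a minimal sketch of that behaviour (all DAG and task names are made up for the demo):

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

with DAG("trigger_rule_demo",
         start_date=days_ago(1),
         schedule_interval=None) as dag:

    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=lambda: "chosen")

    chosen = DummyOperator(task_id="chosen")
    not_chosen = DummyOperator(task_id="not_chosen")  # gets skipped by the branch

    # Default trigger_rule is all_success: one skipped upstream is
    # enough to skip this join too (the skip cascades).
    join_default = DummyOperator(task_id="join_default")

    # all_done fires once every upstream has reached a terminal state,
    # skipped included, so this join still runs.
    join_all_done = DummyOperator(
        task_id="join_all_done",
        trigger_rule=TriggerRule.ALL_DONE)

    branch >> [chosen, not_chosen]
    [chosen, not_chosen] >> join_default
    [chosen, not_chosen] >> join_all_done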

Can you provide your DAG file? You may be setting the trigger rule incorrectly.

[...]
from airflow.utils.trigger_rule import TriggerRule

[...]

with DAG("A2_scheduled_discovery",
         start_date=days_ago(1),
         schedule_interval="@monthly") as dag:

    storages = {}
    try:
        storages = Variable.get("storages", deserialize_json=True)
    except KeyError:
        # Variable.get raises KeyError when "storages" is not defined yet
        pass

    process_storage_meta = Process_code_analysis(
        task_id="process_storage_meta",
        repository_names=list(storages.keys()),
        trigger_rule=TriggerRule.ALL_DONE)

    for storage_name, storage in storages.items():
        # collects traces and short-circuits the branch if they are incomplete;
        # has_complete_traces(storage) returns the predicate to evaluate
        collect_uri = ShortCircuitOperator(
            task_id=f'collect_uri_short_circuit_{storage_name}',
            python_callable=has_complete_traces(storage))

        branch_by_storage = BranchPythonOperator(
            task_id=f'branch_by_storage_{storage_name}',
            python_callable=get_task_handler(storage, storage_name))

        retrieve_mongo_storage_meta = Retrieve_mongo_meta(
            task_id=f"retrieve_mongo_meta_data_{storage_name}",
            storage=storage,
            storage_name=storage_name)

        retrieve_postgres_storage_meta = Retrieve_postgres_meta(
            task_id=f"retrieve_postgres_meta_data_{storage_name}",
            storage=storage,
            storage_name=storage_name)

        collect_uri >> branch_by_storage >> [retrieve_mongo_storage_meta, retrieve_postgres_storage_meta] >> process_storage_meta

    append_report_from_MR = Reporter(
        task_id="appending_report_of_meta_data_retrieval",
        loggin_task="meta_data")

    process_storage_meta >> append_report_from_MR

Thanks for the feedback, I appreciate your work in this forum.

Ah! I see the issue.

The ShortCircuitOperator ignores trigger rules, as documented in this Airflow GitHub issue.

The logic that skips all descendant tasks has not been changed (see the source).
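Condensed from the 2.0.x source (logging stripped), the operator looks roughly like this:

class ShortCircuitOperator(PythonOperator, SkipMixin):
    def execute(self, context):
        condition = super().execute(context)
        if condition:
            return  # truthy condition: proceed normally

        # Falsy condition: skip *all* downstream tasks, recursively,
        # without consulting their trigger rules.
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date,
                      downstream_tasks)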

An interim solution would be to drop the ShortCircuitOperator and build its logic into the BranchPythonOperator, adding a third branch that points directly to the process_storage_meta task. This way we avoid the skip-all-descendants behaviour while keeping the skipping effect the ShortCircuitOperator provided.
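A sketch of that idea, assuming get_task_handler is a factory along the lines below (its real body wasn’t posted, so the storage-type dispatch is a guess):

def get_task_handler(storage, storage_name):
    # has_complete_traces(storage) is assumed to return the predicate,
    # matching how it was passed as python_callable above.
    check_traces = has_complete_traces(storage)

    def choose_branch():
        if not check_traces():
            # Third branch: jump straight to the join task, so only this
            # storage's retrieval tasks get skipped.
            return "process_storage_meta"
        # Guessed dispatch on the storage type:
        if storage.get("type") == "mongo":
            return f"retrieve_mongo_meta_data_{storage_name}"
        return f"retrieve_postgres_meta_data_{storage_name}"

    return choose_branch

For process_storage_meta to be a valid branch target it must also be wired as a direct downstream of the branch operator, i.e. inside the loop:

        branch_by_storage >> [retrieve_mongo_storage_meta,
                              retrieve_postgres_storage_meta,
                              process_storage_meta]

In Airflow 2 the branch operator treats everything downstream of the followed branch as followed, so process_storage_meta is not skipped when a retrieval task is chosen, and your existing ALL_DONE rule then lets it run once every upstream is in a terminal state, failed ones included.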

Thanks a lot, it works! :bulb: