Feedback on my implementation

Hello, I am quite new to using Airflow and I would like to ask for some feedback on my situation and use case. I do not see many examples online, so I have no reference point for what I'm doing.

I am working on a data analysis pipeline where I read in a dataset from a source, run several transformations, run a couple of statistical algorithms, and then do some more transformations. At various points along the way, the idea is to store the intermediate results in a database (currently just .csv files).

Previously, I was performing all of this in Python and pandas, working entirely in RAM. Since moving to Airflow, I have been forced (for the better!) to be more modular in my approach, which means each step in my pipeline will be its own task, with its own data inputs and outputs. I will therefore no longer be working from RAM, since it is clearly bad practice to pass large amounts of data between DAG tasks.

My approach: I have a simple DAG with a few tasks, each of which is a Python function. Each function reads in the dataset, does its work, and writes its result out to a file. Additionally, it pushes a dictionary to XCom containing the task instance name (grabbed from the context itself) and the output path(s) of the file(s). The next task then pulls that dictionary from XCom, so it has the path to the file that it should read in and process further, before pushing a similar dictionary to XCom itself.
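
Roughly what this looks like at the moment, heavily simplified (the task names, paths and XCom key are made up for illustration):

import pandas as pd


def transform_step(**context):
    ti = context['ti']  # task instance from the Airflow context (provide_context=True)
    df = pd.read_csv('/data/input.csv')
    # ... do the actual transformation here ...
    out_path = f'/data/{ti.task_id}.csv'
    df.to_csv(out_path, index=False)
    # push a dict keyed by the task name so the next task can find the file
    ti.xcom_push(key='outputs', value={'task': ti.task_id, 'output_path': out_path})


def next_step(**context):
    ti = context['ti']
    # in my real code the upstream task id isn't hardcoded, which is part of the clunkiness
    prev = ti.xcom_pull(task_ids='transform_step', key='outputs')
    df = pd.read_csv(prev['output_path'])
    # ... more transformations, then push a similar dict again ...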

My question: this feels really clunky to me. I didn't want to hardcode task names in my functions, hence grabbing the task name from the context at runtime and using it as a key in the output dict. However, I really just wanted to have a set of functions that I can use either in Airflow or completely separately from Airflow, and using XCom to pass parameters around disrupts this goal: I now have a logic step that checks whether a **context parameter was provided (if so, the function must be running in Airflow and can therefore grab the dictionary). This was completely my own idea to overcome the issues I was facing, so I am sure it is not a typical approach. I wonder if somebody more experienced can offer some advice?
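
Concretely, the check I've ended up with looks something like this (a simplified sketch; the upstream task name and the argument are placeholders):

import pandas as pd


def feature_scaling(input_path=None, **context):
    if context:
        # running under Airflow with provide_context=True: pull the upstream dict from XCom
        prev = context['ti'].xcom_pull(task_ids='clean_data', key='outputs')
        input_path = prev['output_path']
    # otherwise input_path was passed directly, e.g. when calling the function outside Airflow
    df = pd.read_csv(input_path)
    # ... scale features and write the result out as before ...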

However, I really just wanted to have a set of functions that I can use either in Airflow or completely separately from Airflow

It sounds like you have a set of PythonOperators whose callables rely on the context, which is passed to the function when provide_context is set to True; however, this prevents you from having functions that are platform agnostic, i.e. usable both in Airflow and as a regular Python script.

I have provided an example below that doesn't require you to pass the context to the python callable. This is possible because I am leveraging Jinja templating; more information can be found under the XCom entry in the Airflow documentation.

Another piece that might be confusing: if the python callable returns a value, it is pushed to XCom under the key return_value. With this XCom interaction, I don't have to pass the context to the python callable in order to push to XCom.

Give this a go and let me know if it accomplishes what you want!

from airflow.operators.python_operator import PythonOperator


def func_a():
    # The returned value is pushed to XCom under the key "return_value".
    return 1


def func_b(param):
    # Receives whatever the templated op_kwargs below pulled from XCom.
    print(param)


# `dag` is the DAG object defined elsewhere in the file.
a = PythonOperator(
    task_id='a',
    python_callable=func_a,
    dag=dag,
)

b = PythonOperator(
    task_id='b',
    python_callable=func_b,
    op_kwargs={
        # Jinja template, rendered at runtime to task a's return value.
        'param': '{{ task_instance.xcom_pull(task_ids="' + a.task_id + '", key="return_value") }}'
    },
    dag=dag,
)

# Ensure a runs before b so the XCom value exists when b's template is rendered.
a >> b

Thank you, this looks very useful!

Do you have any thoughts on the way I'm planning to implement this? Basically, it's a machine learning process that I want to use Airflow to schedule. Each task is a stage in that process: (1) cleaning data, (2) feature scaling, (3) classification, etc. I'm using Python to carry out each task, but I'm a bit uncertain about handling the inputs and outputs of the data. My only idea so far is to read/write the data inside the PythonOperator callables (hence using XCom to pass input/output file paths as arguments to those functions), but since there are database operators I feel like I should really be using those. In that case, I'm not quite sure how I should go about it. Any feedback would be appreciated!
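
For concreteness, here is roughly what I have in mind after your suggestion, using the templated XCom pull from your example; the stage names and file paths are just placeholders:

from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('ml_pipeline', start_date=datetime(2020, 1, 1), schedule_interval=None)


def clean_data():
    df = pd.read_csv('/data/raw.csv')
    # ... cleaning ...
    out = '/data/cleaned.csv'
    df.to_csv(out, index=False)
    return out  # pushed to XCom as "return_value"


def scale_features(input_path):
    df = pd.read_csv(input_path)
    # ... feature scaling ...
    out = '/data/scaled.csv'
    df.to_csv(out, index=False)
    return out


clean = PythonOperator(task_id='clean_data', python_callable=clean_data, dag=dag)

scale = PythonOperator(
    task_id='scale_features',
    python_callable=scale_features,
    # default XCom key is "return_value", so no key argument is needed here
    op_kwargs={'input_path': '{{ task_instance.xcom_pull(task_ids="clean_data") }}'},
    dag=dag,
)

clean >> scale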

Given that Airflow is more of an orchestration tool, I would highly suggest keeping the computation inside Airflow to a minimum if possible. Ideally, you would move the heavy data wrangling jobs to services that specialize in handling them, like Spark or Hadoop; there are operators such as SparkSubmitOperator and HiveOperator to kick off those processes.
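
For example, here is a minimal sketch of handing one of the heavy transformation steps to Spark instead of running it in a PythonOperator; the application path, connection id and arguments are assumptions, and `dag` is the DAG defined in your file:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

transform = SparkSubmitOperator(
    task_id='transform_dataset',
    application='/jobs/transform_dataset.py',  # your PySpark script
    conn_id='spark_default',                   # Spark connection configured in Airflow
    application_args=['--input', '/data/raw.csv', '--output', '/data/cleaned.csv'],
    dag=dag,
)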

Depending on what infrastructure is already in place, you should probably try them first to get a working version and iterate from there. Hopefully that helps.


Thanks for the advice again, I'll keep it in mind for the future. One issue is that I am using a specific package for a model that is not implemented in the Spark ML library. I am not sure how else to accomplish that, unless I set Airflow up to trigger a procedure outside of Airflow itself (maybe via a BashOperator to run the Python code?) and then use a FileSensor to detect whether the outputs exist before continuing.

I am not sure how else to accomplish that, unless I set Airflow up to trigger a procedure outside of Airflow itself

As mentioned before, this is actually what I think Airflow should be doing: triggering jobs, not processing data. Of course, that's not always feasible, and we sometimes end up needing more resources for the computation or having to execute it in a special environment. If you are able to move those actions out of the Airflow context, that would be best.

A common template is exactly what you described: an operator triggers the job and a downstream sensor checks when it has finished.
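
A minimal sketch of that template; the script path, output path and connection id are placeholders, and `dag` is assumed to be defined as usual:

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator

run_model = BashOperator(
    task_id='run_model',
    bash_command='python /opt/jobs/train_model.py',  # launches the training script as a separate process
    dag=dag,
)

wait_for_output = FileSensor(
    task_id='wait_for_output',
    filepath='/data/model_output.csv',  # file the external job writes when it finishes
    fs_conn_id='fs_default',
    poke_interval=60,   # check every minute
    timeout=60 * 60,    # give up after an hour
    dag=dag,
)

run_model >> wait_for_output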
