How to use the pipeline pattern in Python data engineering projects

Functional pipeline

A functional pipeline is a design pattern mostly used in the functional programming paradigm, in which data flows through a sequence of stages and the output of each stage is the input of the next. Each stage can be thought of as a filter that transforms the data in some way.

This pattern is well suited to map, filter and reduce operations. It also helps keep code in data engineering projects clean, readable and maintainable.
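To make the idea concrete before bringing in any library, here is a minimal sketch using only the standard library. The helper `pipeline` and the stage names `strip_all`, `drop_empty` and `total_length` are hypothetical, chosen just for this illustration; they are not part of fastcore or of the code later in this article.

```python
from functools import partial, reduce


def pipeline(*stages):
    """Return a function that feeds its argument through each stage in order."""
    def run(data):
        # The output of one stage becomes the input of the next.
        return reduce(lambda acc, stage: stage(acc), stages, data)
    return run


# Hypothetical stages: each one takes an iterable of strings.
strip_all = partial(map, str.strip)   # map: trim whitespace from every item
drop_empty = partial(filter, None)    # filter: discard empty strings


def total_length(items):
    # reduce: fold the remaining items into a single number
    return reduce(lambda acc, s: acc + len(s), items, 0)


clean_and_measure = pipeline(strip_all, drop_empty, total_length)
result = clean_and_measure(["  ab ", "", " cde"])
print(result)  # 5
```

Each stage knows nothing about its neighbours, so stages can be reordered, removed or tested one at a time.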

For example, let's take an input text that has to go through a series of transformations before the final output is produced:

  • remove white spaces
  • remove special characters
  • lowercase all letters

These pipeline functions are simplified to demonstrate the use case. In a real-life scenario, they will be a lot more complex.

Let's create the simple transformation functions.

#pipeline_functions.py

import re


def remove_spaces(string):
    output = string.replace(' ', '')
    print(f"""{remove_spaces.__name__}() ==> {output}""")
    return output


def remove_special_chars(string):
    output = re.sub("[^A-Za-z0-9]", "", string)
    print(f"""{remove_special_chars.__name__}() ==> {output}""")
    return output


def lowercase(string):
    output = string.lower()
    print(f"""{lowercase.__name__}() ==> {output}""")
    return output

fastcore

fastcore is a utility library with a lot of Python goodies that make coding faster, easier, and more maintainable. It borrows ideas from other languages such as Julia, Ruby and Haskell. It also adds functional programming patterns, simplified parallel processing, and a lot more. For our pipeline implementation, we will use the fastcore transform module.

Do check it out -> fastcore.fast.ai

Install fastcore

$ pip install fastcore

Creating pipeline using fastcore

#main.py

from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase


def main(input_string):
    # Creates a pipeline with a list of functions
    pipe = Pipeline([remove_spaces, remove_special_chars, lowercase])

    # Invokes pipeline
    output = pipe(input_string)

    print(f"""output ==> {output}""")


if __name__ == '__main__':
    text = input("Enter input string: ")
    main(text)

Run the program

$ python main.py
Enter input string: Hello World!
remove_spaces() ==> HelloWorld!
remove_special_chars() ==> HelloWorld
lowercase() ==> helloworld
output ==> helloworld

As you can see, the input text passes through the pipeline functions from left to right: each step transforms the text, and the last step returns the final output. We can go one step further and make the pipeline more dynamic by choosing the pipeline functions at runtime. Because the pipeline is then defined by a list of function names, that list can also be serialised and persisted for later use.

Dynamic pipeline

#main_dynamic.py

import sys
from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase


def main(input_string, pipe_funcs):
    # Creates a pipeline by looking up each function name in globals()
    pipe = Pipeline([globals()[func] for func in pipe_funcs])

    # Invokes pipeline
    output = pipe(input_string)

    print(f"""output ==> {output}""")


if __name__ == '__main__':
    text = input("Enter input string: ")
    funcs = sys.argv[1:]
    main(text, funcs)

Run the program

$ python main_dynamic.py remove_spaces lowercase 
Enter input string: Hello World 123$    
remove_spaces() ==> HelloWorld123$
lowercase() ==> helloworld123$
output ==> helloworld123$
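The globals() lookup above resolves any name that happens to exist in the module, and a misspelled name fails with a bare KeyError. One possible alternative, sketched below under assumptions, is an explicit registry of allowed steps. The same sketch shows the list of names being persisted as JSON and rebuilt later. `REGISTRY`, `build_steps` and `run_steps` are hypothetical names introduced here, and the three functions are quiet stand-ins for those in pipeline_functions.py so the example runs on its own.

```python
import json
import re


# Stand-ins for the functions in pipeline_functions.py (without the prints).
def remove_spaces(string):
    return string.replace(" ", "")


def remove_special_chars(string):
    return re.sub("[^A-Za-z0-9]", "", string)


def lowercase(string):
    return string.lower()


# Explicit allow-list: only these names can be requested at runtime.
REGISTRY = {f.__name__: f for f in (remove_spaces, remove_special_chars, lowercase)}


def build_steps(names):
    """Turn a list of step names into a list of functions, rejecting unknown names."""
    unknown = [name for name in names if name not in REGISTRY]
    if unknown:
        raise ValueError(f"unknown pipeline step(s): {unknown}")
    return [REGISTRY[name] for name in names]


def run_steps(steps, value):
    """Apply each step in order, passing the result along."""
    for step in steps:
        value = step(value)
    return value


# Persist the pipeline definition as JSON text, then rebuild it from that text.
saved = json.dumps(["remove_spaces", "lowercase"])
steps = build_steps(json.loads(saved))
restored_output = run_steps(steps, "Hello World 123$")
print(restored_output)  # helloworld123$
```

The list returned by `build_steps` is a plain list of functions, so it could be handed to Pipeline in place of the globals() comprehension in main_dynamic.py.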

Conclusion

The pipeline pattern makes complex data processing operations in data engineering components easier to write, and the fastcore utility makes it even better.

If you found this tutorial helpful or have any suggestions, do let me know!