- Functional pipeline
- fastcore
- Install fastcore
- Creating pipeline using fastcore
- Dynamic pipeline using fastcore
Functional pipeline
A functional pipeline is a design pattern mostly used in the functional programming paradigm, where data flows through a sequence of stages and the output of one stage is the input of the next. Each stage can be thought of as a filter that transforms the data in some way.
This pattern is best suited to map, filter and reduce operations. It also leads to cleaner, more readable and more maintainable code in data engineering projects.
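The stage-to-stage flow described above can be sketched in plain Python, without any library, as a fold over a list of functions. This is only a minimal sketch: `compose_pipeline` and the two sample stages are hypothetical names chosen for illustration and are not part of fastcore.

```python
from functools import reduce


def compose_pipeline(*stages):
    """Return a function that feeds its input through each stage in order."""
    def run(data):
        # reduce threads the running value through every stage, left to right
        return reduce(lambda value, stage: stage(value), stages, data)
    return run


# Hypothetical stages, used only to show the data flow
strip_edges = str.strip
shout = str.upper

pipeline = compose_pipeline(strip_edges, shout)
result = pipeline("  hello  ")
print(result)
```

Each stage only needs to accept one value and return one value, which is what makes the stages easy to reorder or swap.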
For example, let's take an input text that has to go through a series of transformations before the final output is produced:
- remove spaces
- remove special characters
- lowercase all letters
These pipeline functions are simplified to demonstrate the use case. In a real-life scenario, they would be a lot more complex.
Let's create the simple transformation functions.
#pipeline_functions.py
import re

def remove_spaces(string):
    # Drop every space character from the input
    output = string.replace(' ', '')
    print(f"{remove_spaces.__name__}() ==> {output}")
    return output

def remove_special_chars(string):
    # Keep only ASCII letters and digits
    output = re.sub(r"[^A-Za-z0-9]", "", string)
    print(f"{remove_special_chars.__name__}() ==> {output}")
    return output

def lowercase(string):
    # Convert all letters to lowercase
    output = string.lower()
    print(f"{lowercase.__name__}() ==> {output}")
    return output
fastcore
fastcore is a utility library with a lot of Python goodies that make coding faster, easier, and more maintainable. It borrows ideas from other languages such as Julia, Ruby and Haskell. It also adds functional programming patterns, simplified parallel processing, and a lot more. For our pipeline implementation, we will use the fastcore transform module.
Do check out fastcore.fast.ai
Install fastcore
$ pip install fastcore
Creating pipeline using fastcore
#main.py
from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase

def main(input_string):
    # Creates a pipeline with a list of functions
    pipe = Pipeline([remove_spaces, remove_special_chars, lowercase])
    # Invokes pipeline
    output = pipe(input_string)
    print(f"output ==> {output}")

if __name__ == '__main__':
    text = input("Enter input string: ")
    main(text)
Run the program
$ python main.py
Enter input string: Hello World!
remove_spaces() ==> HelloWorld!
remove_special_chars() ==> HelloWorld
lowercase() ==> helloworld
output ==> helloworld
As you can see, the input text passes through the pipeline from left to right: each step transforms the text, and the last step returns the final output. We can go one step further and make the pipeline dynamic by choosing its functions at runtime. Because the functions are then selected by name, the list of names can also be serialised and persisted for later use.
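The idea of persisting the list of steps can be sketched as follows: store the function names as JSON, then map the names back to functions when the pipeline is needed again. This is a rough sketch under stated assumptions: `STEPS`, `dump_steps` and `load_steps` are hypothetical helpers, the two functions are print-free copies of the ones above, and `functools.reduce` stands in for fastcore's `Pipeline` so the snippet runs without extra dependencies.

```python
import json
from functools import reduce


def remove_spaces(string):
    return string.replace(' ', '')


def lowercase(string):
    return string.lower()


# Hypothetical lookup table from a persisted name to the function it names
STEPS = {"remove_spaces": remove_spaces, "lowercase": lowercase}


def dump_steps(funcs):
    """Serialise a list of functions as a JSON array of their names."""
    return json.dumps([func.__name__ for func in funcs])


def load_steps(payload):
    """Turn a JSON array of names back into the functions they refer to."""
    return [STEPS[name] for name in json.loads(payload)]


saved = dump_steps([remove_spaces, lowercase])  # could be written to a file
restored = load_steps(saved)

# Run the restored steps in order (stand-in for Pipeline(restored))
output = reduce(lambda value, step: step(value), restored, "Hello World 123$")
print(saved)
print(output)
```

Only the names are stored, not the code itself, so the program that reloads the list must still define or import the functions.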
Dynamic pipeline
#main_dynamic.py
import sys
from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase

def main(input_string, pipe_funcs):
    # Creates a pipeline by looking up each function name in globals()
    pipe = Pipeline([globals()[func] for func in pipe_funcs])
    # Invokes pipeline
    output = pipe(input_string)
    print(f"output ==> {output}")

if __name__ == '__main__':
    text = input("Enter input string: ")
    # Function names are taken from the command-line arguments
    funcs = sys.argv[1:]
    main(text, funcs)
Run the program
$ python main_dynamic.py remove_spaces lowercase
Enter input string: Hello World 123$
remove_spaces() ==> HelloWorld123$
lowercase() ==> helloworld123$
output ==> helloworld123$
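Looking names up in `globals()` raises a bare `KeyError` for a mistyped name and also accepts any other global in the module. One possible alternative, shown here only as a sketch, is an explicit registry. `REGISTRY`, `register` and `resolve` are hypothetical names introduced for this example, and the functions are print-free copies of the ones in pipeline_functions.py.

```python
import re

REGISTRY = {}  # hypothetical name -> function table, filled by @register


def register(func):
    """Decorator that makes a function selectable by its name."""
    REGISTRY[func.__name__] = func
    return func


@register
def remove_spaces(string):
    return string.replace(' ', '')


@register
def remove_special_chars(string):
    return re.sub(r"[^A-Za-z0-9]", "", string)


@register
def lowercase(string):
    return string.lower()


def resolve(names):
    """Map step names to functions, rejecting anything not registered."""
    unknown = [name for name in names if name not in REGISTRY]
    if unknown:
        raise ValueError(f"unknown steps {unknown}; choose from {sorted(REGISTRY)}")
    return [REGISTRY[name] for name in names]


steps = resolve(["remove_spaces", "lowercase"])
print([func.__name__ for func in steps])
```

The resolved list could then be passed to `Pipeline(steps)` in place of the `globals()` comprehension.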
Conclusion
Implementing the pipeline pattern in data engineering components makes it easier to write complex data processing operations, and the fastcore utility makes it even better.
If you found this tutorial helpful or have any suggestions, do let me know!