Building a Python parallel processing pipeline package (R-3PO)
Sat Aug 29 2020
tags: programming build public internship inzura
Introduction
Richard Jelbert (CEO of Inzura) and I whipped up a Python parallel processing pipeline package (just rolls off the tongue, doesn't it) over a couple of days. We used it to process a large number (over 3 million) of JSON files.
Richard's Parallel Processing Protocol, or R-3PO for short (like C-3PO), is an embarrassingly trivial package built on top of Ray to make embarrassingly parallel problems embarrassingly easy.
Use case: Suppose you have lots of data files that need to be processed in exactly the same way with the same function. And suppose you want to save the results of that processing into a CSV file. This is an embarrassingly parallel problem: it should be easy.
Embarrassingly easy, even: R3PO lets you do it with a config.yaml file and two lines of code. It automatically handles the distribution of tasks to processes, saves your progress so you can stop and restart the job anytime, and logs all errors automatically.
This package is probably useless to you unless your workflow and use case are very similar to mine. But if they are, I believe it could save you an hour or two of writing boilerplate code.
1-minute guide
Have the following two files in your directory.
config.yaml:
job_name: count_produce
processes: 2
source_path: /home/lieu/produce_log
source_file_part: .json
working_dir: /home/lieu/working_dir
output_path: /home/lieu/output_dir
main.py:
# main.py
from r3po import jobbuilder, jobrunner
from count_fruits import count_fruits
# Build jobs
jobbuilder.build_jobs('./config.yaml')
# Now run them
jobrunner.run_jobs('./config.yaml', count_fruits)
Running python3 main.py will run the count_fruits function on all .json files in /home/lieu/produce_log/ and log the results in numbered CSV files inside /home/lieu/output_dir/.
Sound like something you'd find useful? Read on...
Quickstart guide
We'll work through a very simple example available on the GitHub repo.
First, let's clone the repo and go to the sample directory.
Cloning the repo:
git clone https://github.com/lieuzhenghong/r3po
cd r3po/sample
The directory will look like this:
./r3po/sample/
├── produce_log/
│   ├── file_0.json
│   ├── file_1.json
│   ├── file_2.json
│   ├── file_3.json
│   ├── file_4.json
│   └── file_5.json
├── config.yaml
├── count_fruits.py
└── main.py
Setting the scene
Imagine you're the king of Cornucopia and you own one bajillion acres of land. You've allocated an acre of land to each one of your loyal subjects for them to farm as they see fit.
One day, you decide that you'd like to know how much food is produced in Cornucopia.
So you ask your subjects to report their yields.
Your subjects are well-acquainted with both farming and the JSON spec, so let's take a look at file_0.json:
{
    "produce": {
        "fruits": {
            "apples": 24,
            "bananas": 33,
            "cantaloupes": 3
        },
        "vegetables": {
            "carrots": 15,
            "cucumbers": 22,
            "cabbages": 21
        },
        "meat": {
            "beef": 12,
            "pork": 2
        }
    }
}
Now, as the king, you're not that interested in the individual items per se; perhaps you only want the total for each type of produce. Let's say you want something like the following:
{
    "fruits": 60,
    "vegetables": 58,
    "meat": 14
}
So you write a simple function that does exactly that. The details are not important, but the key is that any such function must take in a filepath string and return a dictionary for R3PO to work with it.
# count_fruits.py
import json

def count_fruits(produce_log_filepath):
    # Sum the counts within each produce type (fruits, vegetables, meat)
    with open(produce_log_filepath, 'r') as f:
        produce_counts = {}
        data = dict(json.load(f))
        all_produce = data['produce']
        for produce_type in all_produce:
            produce_counts[produce_type] = 0
            for produce in all_produce[produce_type]:
                produce_counts[produce_type] += int(
                    all_produce[produce_type][produce])
    return produce_counts
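As a quick sanity check (a hypothetical snippet, assuming you run it from inside r3po/sample/ so the relative path resolves), calling the function on file_0.json gives exactly the per-type totals we worked out above:
# Hypothetical quick check, run from inside r3po/sample/
from count_fruits import count_fruits

print(count_fruits('./produce_log/file_0.json'))
# {'fruits': 60, 'vegetables': 58, 'meat': 14}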
So you'd want to run this function for every file. However, you have a bajillion acres of land and thus a bajillion JSON files (instead of the six files in the sample folder), so you'd like to speed things up by running the code in parallel. Let's see how to do that with the R-3PO pipeline.
First, let's open config.yaml and take a look at the file.
job_name: count_produce
processes: 2
source_path: /home/lieu/produce_log
source_file_part: .json
working_dir: /home/lieu/working_dir
output_path: /home/lieu/output_dir
Most of these are pretty self-explanatory (more details later), but basically this means that:
- R3PO will spin up two parallel processes,
- look for any file ending with .json in /home/lieu/produce_log/, and
- write the results in /home/lieu/output_dir.
Now let's open the main.py file and see what's in it (a whopping two lines of code):
# main.py
from r3po import jobbuilder, jobrunner
from count_fruits import count_fruits
# Build jobs
jobbuilder.build_jobs('./config.yaml')
# Now run them
jobrunner.run_jobs('./config.yaml', count_fruits)
Not much to say about this. Let's run it and see what happens:
Running the code:
pip3 install r3po
python3 main.py
Building job called: count_produce
Reading job source directory and generating master job CSV file...
Scanning for files ending with .json
Pending files generated.
Removing old node jobfiles...
Writing new node jobfiles...
Number of files: 6
Number of pending files: 6
2020-08-30 16:38:05,916 INFO resource_spec.py:223 -- Starting Ray with 5.47 GiB memory available for workers and up to 2.73 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-08-30 16:38:06,302 INFO services.py:1191 -- View the Ray dashboard at localhost:8265
Spinning up Ray processes...
Process 0 spawned!
Process 1 spawned!
None
None
(pid=336291) Processed trip /home/lieu/dev/r3po/sample/output_dir/0.results.csv in node 0.
(pid=336291) Processed trip /home/lieu/dev/r3po/sample/output_dir/0.results.csv in node 0.
(pid=336291) Processed trip /home/lieu/dev/r3po/sample/output_dir/0.results.csv in node 0.
(pid=336283) Processed trip /home/lieu/dev/r3po/sample/output_dir/1.results.csv in node 1.
(pid=336283) Processed trip /home/lieu/dev/r3po/sample/output_dir/1.results.csv in node 1.
(pid=336283) Processed trip /home/lieu/dev/r3po/sample/output_dir/1.results.csv in node 1.
Okay, everything looks good! Let's take a look at what has happened. There were a lot of print statements, but they're mostly self-explanatory.
We can see that Ray has spun up two parallel processes (exactly what we specified in the config.yaml file). Additionally, we can see that R3PO has created an output_dir/ with two results.csv files. We also have a working_dir/ with done.job and nodejobfile.txt files, but we won't worry about those for now.
./r3po/sample/
├── output_dir/
│   ├── 0.results.csv
│   └── 1.results.csv
├── produce_log/
│   ├── file_0.json
│   ├── file_1.json
│   ├── file_2.json
│   ├── file_3.json
│   ├── file_4.json
│   └── file_5.json
├── working_dir/
│   ├── tracking/
│   │   ├── ..../file_0.json
│   │   │   └── done.job
│   │   ├── ..../file_1.json
│   │   │   └── done.job
│   │   ├── ..../file_2.json
│   │   │   └── done.job
│   │   ├── ..../file_3.json
│   │   │   └── done.job
│   │   ├── ..../file_4.json
│   │   │   └── done.job
│   │   └── ..../file_5.json
│   │       └── done.job
│   ├── 0.nodejobfile.txt
│   └── 1.nodejobfile.txt
├── config.yaml
├── count_fruits.py
└── main.py
Opening up the two results.csv files, we can see that the function has indeed been run on all of the input JSON files. Because we spun up two processes, there are two results.csv files:
# 0.results.csv
fruits,vegetables,meat
60,58,14
3939,58,14
6456,3743,14
# 1.results.csv
fruits,vegetables,meat
21,1539,79
60,58,14
5666,15,15
And we're done! These CSVs can then be read into pandas DataFrames and concatenated with pandas's pd.concat function, since DataFrames are the format in which I do the majority of my data analysis.
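As an illustration, that last step might look something like this (a minimal sketch, assuming the output_path from the quickstart config; the glob pattern and variable names are just illustrative):
# Hypothetical sketch: combine all the per-process result CSVs into one DataFrame
import glob
import pandas as pd

result_files = glob.glob('/home/lieu/output_dir/*.results.csv')
df = pd.concat((pd.read_csv(f) for f in result_files), ignore_index=True)
print(df)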
How it works in more detail
Overall abstraction
This pipeline runs a function f on a large number of input files in parallel and logs the results into a CSV. It is made up of two files.
The first file, jobbuilder.py, reads the source_path from config.yaml. It looks for all files inside source_path that match a user-specified criterion, and writes these file paths into nodejobfile text files that tell each process which files to work on.
The second file, jobrunner.py, spins up the processes. Each process looks at its nodejobfile text file and works through the list. For each $FILEPATH listed in nodejobfile:
- the process opens the file,
- runs some function f on that file,
- and (tries to) append the output to a .csv file.
If the function successfully runs, an empty file called done.job is created in <$WORKING_DIR>/tracking/<$FILEPATH>/ as a record of completion. If the function throws an exception, then a file called error.job is created instead.
Keeping a record of file completion means that the processes can be terminated and restarted at any time without going through the same files again.
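Putting that together, each process's per-file loop looks roughly like the following (a simplified sketch, not the package's actual code; process_node_job, the CSV handling, and the marker logic are my own illustration):
# Hypothetical sketch of what each jobrunner process does
import csv
import os

def process_node_job(nodejobfile, working_dir, output_csv, f):
    with open(nodejobfile) as jobs:
        filepaths = [line.strip() for line in jobs if line.strip()]
    for filepath in filepaths:
        tracking_dir = os.path.join(working_dir, 'tracking', filepath.lstrip('/'))
        os.makedirs(tracking_dir, exist_ok=True)
        try:
            row = f(filepath)  # must return a dict with consistent keys
            write_header = not os.path.exists(output_csv)
            with open(output_csv, 'a', newline='') as out:
                writer = csv.DictWriter(out, fieldnames=list(row.keys()))
                if write_header:
                    writer.writeheader()
                writer.writerow(row)
            marker = 'done.job'
        except Exception:
            marker = 'error.job'
        # An empty marker file records completion (or failure) so restarts skip this file
        open(os.path.join(tracking_dir, marker), 'w').close()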
Limitations and requirements
The pipeline requires the following to work:
- A config.yaml file that follows a particular specification
- A function that follows a specific contract (see next section)
- All your input files must end with the same suffix.
The function contract
- The function you call must take as input an absolute filepath to the file.
- It must return a Dictionary that will be passed to csv.DictWriter.
- Furthermore, every Dictionary object returned must have the same keys. If it is not able to return such a Dictionary, it must raise an Exception.
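In other words, any function shaped roughly like this will do (a hypothetical template, not part of the package):
# Hypothetical template for a function that satisfies the contract
import json

def my_processing_function(filepath):
    with open(filepath, 'r') as f:
        data = json.load(f)
    if 'produce' not in data:
        # Can't build a row with the expected keys, so raise instead of returning
        raise ValueError(f"{filepath} has no 'produce' key")
    # Always return the same keys so csv.DictWriter gets a consistent header
    return {
        'fruits': sum(data['produce'].get('fruits', {}).values()),
        'vegetables': sum(data['produce'].get('vegetables', {}).values()),
        'meat': sum(data['produce'].get('meat', {}).values()),
    }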
The config.yaml file
This is what a config.yaml file should look like. All fields are required.
job_name: generate-trip-visualisations
output_path: /home/lieu/RJ/trip_processing_research/.generate-trip-visualisations
processes: 12
source_file_part: formatted.lz4
source_path: /media/lieu/bigdata/anonymous/rjrun
working_dir: /home/lieu/RJ/trip_processing_research/.generate-trip-visualisations
count_all_files and count_job_files are added by the program, and list the number of files in the job and the number of files remaining in the job respectively.
job_name is the name of the processing job. It is used in print statements only.
processes is the number of parallel processes to spin up.
source_path is a filepath string pointing to the folder where the source files are.
source_file_part is used for filepath matching: R3PO will traverse all subfolders inside source_path and find all files ending with source_file_part (in the quickstart example, all the .json files).
working_dir is a filepath string that points to a folder to store intermediate files: this is where the nodejobfile text files will be, as well as the done.job and error.job files.
output_path is a filepath string that points to a folder where the results of the computations will be written as CSV files. If the folder does not exist, one will be automatically created.
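For what it's worth, loading and sanity-checking such a config takes only a few lines. Something along these lines would do it (a hypothetical snippet using PyYAML, not the package's actual loader):
# Hypothetical config loader: read config.yaml and check the required fields
import yaml

REQUIRED_FIELDS = [
    'job_name', 'processes', 'source_path',
    'source_file_part', 'working_dir', 'output_path',
]

def load_config(config_path):
    with open(config_path) as f:
        config = yaml.safe_load(f)
    missing = [field for field in REQUIRED_FIELDS if field not in config]
    if missing:
        raise ValueError(f"config.yaml is missing required fields: {missing}")
    return config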
What jobbuilder.py does
The jobbuilder module is responsible for getting the filepaths of all the input files and writing these filepaths to a CSV that jobrunner can read later. It does not write the filepath if the file has already been processed or if an error occurred whilst processing that file.
This is how it does it:
- Read the config.yaml file and find source_path, working_dir, and source_file_part
- Create the folder working_dir if one does not exist
- Look for file paths that match source_path/**/*.source_file_part and do not already have a done.job or error.job
- Write X.nodejobfile.txt files by dividing the file paths up equally across all X process nodejobfiles
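Pulling those steps into code, a rough sketch might look like this (hypothetical; build_node_jobfiles and the glob-based matching are my own illustration, not the package's actual implementation):
# Hypothetical sketch of the jobbuilder steps
import glob
import os

def build_node_jobfiles(source_path, working_dir, source_file_part, processes):
    os.makedirs(working_dir, exist_ok=True)
    # Find every matching file that has not already been marked done or errored
    pending = []
    for filepath in glob.glob(os.path.join(source_path, '**', f'*{source_file_part}'),
                              recursive=True):
        tracking_dir = os.path.join(working_dir, 'tracking', filepath.lstrip('/'))
        if os.path.exists(os.path.join(tracking_dir, 'done.job')):
            continue
        if os.path.exists(os.path.join(tracking_dir, 'error.job')):
            continue
        pending.append(filepath)
    # Divide the pending files up (roughly equally) across the X process nodejobfiles
    for i in range(processes):
        with open(os.path.join(working_dir, f'{i}.nodejobfile.txt'), 'w') as out:
            out.write('\n'.join(pending[i::processes]))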
Why does this even exist?
Admittedly, the code is pretty trivial, and quite overfitted to my specific use-case. Why did I even bother packaging it up in the first place? I offer three reasons:
1. If you think about it, the code is not that trivial. It's easy enough to parallelise code in Python with the multiprocessing library. But it can be tricky to write code that allows jobs to terminate and restart arbitrarily, while not reprocessing already-processed files.
2. It follows from point 1 that this package will save you an hour or two of reinventing the wheel if your use-case and dataflow are similar to mine.
3. I was lazy and wanted to be able to pip install rather than git clone-ing and moving the files around manually.
Conclusion
Thanks for reading!