How We Made a Machine Learning Pipeline 96 Times Faster

SILVR
Feb 2, 2023

Recently, Silvr helped a data science team at a client in the financial services industry improve the efficiency and price performance of building machine learning models. We increased the throughput of the Python application that builds the models by a factor of 96. At the same time, we reduced computing costs by 75%. This post describes how we did that.

Here was the initial state:

  • The application was a single-threaded Python 3.9 program.
  • Each run of the program computed thousands of models using statsmodels.
  • The client manually ran multiple copies of the program on a Windows instance on AWS.

The goal was to increase the throughput, measured in runs/day, and reduce the time for each run. We did this by:

  • Moving the application to Linux
  • Introducing parallelism into the Python application
  • Using a Dask cluster of AWS instances
  • Optimizing the performance of the Python application

Moving the application to Linux

When we started, the application ran on a Windows c5.24xlarge (96 vCPUs, 192 GiB memory) AWS instance. To reduce computing costs, the client asked us to explore moving the application to Linux. At the time of this writing, in the us-east-1 AWS region for on-demand instances:

  • A Windows c5.24xlarge instance costs $8.496/hour, and
  • A Linux c5.24xlarge instance costs $4.08/hour.

Silvr ported the application to Linux, and that alone reduced the price of each AWS instance by 52%.

But wait! There’s more!

When we ran multiple copies of the application on Windows, we observed that we could not use more than half of the CPUs on a Windows instance; on Linux, we could use all of them. Since the price performance on Linux was already better, it was not worthwhile to investigate why Windows left half of the CPUs idle. Because the application could use twice as many CPUs on a Linux instance as on the same instance type running Windows, we needed only half as many Linux instances for the same throughput. Combining the lower hourly price with half the instance count gives (0.5 × $4.08) / $8.496 ≈ 0.24 of the original cost, so moving to Linux alone reduced computing costs by approximately 75%.

Introducing parallelism

We observed the application spent most of the time in an opaque, long-running, CPU-bound, single-threaded statsmodels regression function. We will describe how we measured that in the Python optimization section below.

The statsmodels function was called thousands of times in one run, and each of the calls was independent of any other call to the same statsmodels function. By performing calls to the statsmodels function in parallel, we could reduce the time for a complete run of the application.

To call statsmodels in parallel, we selected the Python joblib package. With joblib, it is easy to run functions in parallel, and it will also be easy to run those same functions on a Dask cluster. More on that later.

Suppose you have a Python program with a long-running function:

import time

def slow_function(x):
    time.sleep(10)
    return x * x

result = [slow_function(i) for i in range(10)]
print(result)

Which prints:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

To call slow_function in parallel, wrap the function with joblib.delayed and pass an iterable with the delayed function calls to joblib.Parallel.

import time
from joblib import Parallel, delayed

def slow_function(x):
    time.sleep(10)
    return x * x

result = Parallel(n_jobs=-1, verbose=10)(delayed(slow_function)(i) for i in range(10))
print(result)

The n_jobs=-1 argument allows Parallel to use all of the CPUs. The verbose=10 argument enables progress messages. Notice that no changes to slow_function were required.

Use a Dask local cluster

In the previous example, the number of CPUs that can run slow_function is limited to the number of CPUs on the current instance. Using Dask, we can combine multiple AWS instances into a compute cluster that allows us to run slow_function on multiple AWS instances in parallel.

As a first step, we used Dask.distributed to create a local cluster on the current instance. This allowed us to debug the use of Dask without the overhead of setting up and tearing down a Dask cluster on multiple AWS instances. For example:

import os
import time
from distributed import Client
from joblib import Parallel, delayed, parallel_backend

os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'

def slow_function(x):
    time.sleep(10)
    return x * x

def main():
    with Client(processes=True, n_workers=os.cpu_count(),
                threads_per_worker=1) as client, \
            parallel_backend('dask', client=client), \
            Parallel(verbose=10) as parallel:
        result = parallel(delayed(slow_function)(i) for i in range(10))
        print(result)

if __name__ == '__main__':
    main()

Each invocation of slow_function runs on a Dask worker. The distributed.Client call creates a Dask local cluster with workers running on the current AWS instance. Because the client's long-running statsmodels function is CPU-bound and single-threaded, we achieved the most parallelism in Python by using a separate process for each worker (processes=True), one thread per worker (threads_per_worker=1), and one worker per CPU (n_workers=os.cpu_count()).

Since we provisioned the cluster with one thread per worker, we set the MKL_NUM_THREADS, OMP_NUM_THREADS, and OPENBLAS_NUM_THREADS environment variables to limit the number of threads used by the lower-level math libraries. Although the client's application spends the majority of its time in a single-threaded statsmodels function, each worker also calls other functions that would not otherwise be limited to one thread.

The call to the joblib.parallel_backend context manager tells joblib.Parallel to use the workers on the Dask cluster. This example also shows how to invoke joblib.Parallel as a context manager: this is useful if the program has other functions to run on the cluster as well as the one invocation of slow_function shown here.

Since each Dask worker imports the main program in order to access slow_function, we need a guard (if __name__ == '__main__': main()) to ensure we create the Dask cluster and run the main program only once, at the top level.

Use a Dask remote cluster

Now we are ready to use a Dask remote cluster. A Dask cluster has two components: a single scheduler process and multiple worker processes that connect to the scheduler. We ran the scheduler and each worker on their own AWS instances.

Invoke the dask scheduler command to start the Dask scheduler:

dask scheduler --port 8786 --no-dashboard --no-show

Invoke the dask worker command to start a Dask worker:

SCHEDULER_IP_ADDR=XX.XX.XX.XX  # IP address of AWS instance running the Dask scheduler
export MKL_NUM_THREADS=1
export OMP_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
dask worker --nworkers=-1 --nthreads=1 --no-dashboard tcp://${SCHEDULER_IP_ADDR}:8786

In the main program, to specify the use of a Dask remote cluster, all you need to do is change the arguments to distributed.Client. Change:

Client(processes=True, n_workers=os.cpu_count(), threads_per_worker=1)

to:

Client(address='tcp://XX.XX.XX.XX:8786')

where XX.XX.XX.XX is the IP address of the AWS instance running the Dask scheduler.

To encrypt communication between the client, scheduler, and workers, additional steps are needed:

  1. Create a TLS certificate in PEM format (a command-line sketch of steps 1 and 2 follows these steps).
  2. When invoking the dask scheduler and dask worker commands, include these options: --tls-ca-file, --tls-cert, and --tls-key.
  3. In the main program, create a distributed.security.Security object:
from distributed.security import Security

security = Security(tls_ca_file='my_ca.pem',
                    tls_client_cert='my_cert.pem',
                    tls_client_key='my_key.pem',
                    require_encryption=True)

  4. In the main program, when creating a distributed.Client object, specify the Dask scheduler address using the tls protocol and pass in the security object using the security parameter:

Client(address='tls://XX.XX.XX.XX:8786', security=security)
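
For example, steps 1 and 2 might look like the following on the command line. This is only a sketch: the openssl options and the use of a single self-signed certificate as the CA, certificate, and key for every component are illustrative assumptions, not the client's actual setup.

# Create a self-signed certificate in PEM format (illustrative; a real deployment may use a proper CA)
openssl req -x509 -newkey rsa:4096 -nodes -days 365 \
    -subj "/CN=dask" -keyout my_key.pem -out my_cert.pem
cp my_cert.pem my_ca.pem

# Start the scheduler and a worker with the TLS options from step 2
dask scheduler --port 8786 --no-dashboard --no-show \
    --tls-ca-file my_ca.pem --tls-cert my_cert.pem --tls-key my_key.pem

dask worker --nworkers=-1 --nthreads=1 --no-dashboard \
    --tls-ca-file my_ca.pem --tls-cert my_cert.pem --tls-key my_key.pem \
    tls://${SCHEDULER_IP_ADDR}:8786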

Infrastructure automation

We ran the Dask scheduler and workers inside Docker containers. In order for the Dask scheduler and workers to communicate with each other and the client, we specified the --network=host option on the docker run command.
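
For example, the scheduler container might be started like this (a sketch: the image name dask-env is an assumption, and the dask scheduler options are the ones shown earlier; workers are started the same way with the dask worker command):

# Run the scheduler image with host networking (image name is illustrative)
docker run --rm -d --network=host dask-env \
    dask scheduler --port 8786 --no-dashboard --no-show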

Here is an example of a Dockerfile that creates a Python environment for Dask.distributed applications. The same Docker image can be used for the Dask client application, the Dask scheduler, and Dask workers.

FROM python:3.10

ENV MPLCONFIGDIR=/tmp/matplotlib
ENV PIP_CACHE_DIR=/tmp/pip
RUN mkdir ${MPLCONFIGDIR} && chmod 777 ${MPLCONFIGDIR} \
    && mkdir ${PIP_CACHE_DIR} && chmod 777 ${PIP_CACHE_DIR}

ENV DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG=4096
ENV DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=1m
ENV MKL_NUM_THREADS=1
ENV OMP_NUM_THREADS=1
ENV OPENBLAS_NUM_THREADS=1

RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir \
    distributed \
    lxml \
    lz4 \
    matplotlib \
    pandas \
    scikit-learn \
    seaborn \
    statsmodels

CMD ["python3"]

To give the Dask scheduler a private IP address and a stable DNS name, we put the scheduler behind an AWS Elastic Load Balancer.
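
With the load balancer in place, the main program connects by DNS name instead of an IP address. The host name below is an illustrative placeholder, not an actual endpoint:

Client(address='tls://dask-scheduler.internal.example.com:8786', security=security)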

We then used Terraform to automate the creation of a Dask cluster.

Dask cluster tuning

To maximize the size of the Dask cluster, we adjusted a few tuning parameters (sketched after this list):

  • Increased the maximum number of open file descriptors resource limit: ulimit -n
  • Increased the Dask connection timeout: DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT environment variable
  • Increased the Dask socket backlog: DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG environment variable
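
On each instance, these adjustments looked roughly like the following. The ulimit value is an illustrative assumption; the two environment variable values match the ones baked into the Docker image above.

# Raise the open file descriptor limit before starting Dask processes
ulimit -n 65536

# Increase the Dask connection timeout and socket backlog
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=1m
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG=4096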

Notwithstanding all the tuning, we found there was a limit to the maximum number of workers a Dask scheduler can handle. The limit depends on the workload that is run on the cluster. When the number of workers exceeded the limit, we started seeing errors, such as timeout errors, from the scheduler, workers, and client. For the client’s application, the limit was 480 workers on five c5.24xlarge AWS instances.

Python optimization

Using a Dask cluster improved the throughput of the portion of the application that could be parallelized. Once that was done, other portions of the application, which ran before and after the long-running statsmodels calls, became hotspots. The next step was to measure the performance of those parts of the application.

We analyzed the performance of the Python application in two steps. First, we found the functions that took the most time. In the functions that were hotspots, we then measured how much time was spent in each line.

Profiling Python functions

To find the functions that took the most time, we measured the performance using the cProfile package in the Python standard library. Per its documentation, “cProfile is recommended for most users; it’s a C extension with reasonable overhead that makes it suitable for profiling long-running programs.”

To demonstrate this process, here is an example, slow.py:

def squares():
    return [i**2 for i in range(1000)]

def cubes():
    return [i**3 for i in range(1000000)]

def more_powers():
    a = [i**2 for i in range(1000000)]
    b = [i**10 for i in range(10000000)]
    c = [i**20 for i in range(1000000)]
    return a, b, c

def a():
    squares()
    more_powers()

def main():
    a()
    cubes()

if __name__ == '__main__':
    main()

To collect performance data, run the program using cProfile:

python -m cProfile -o slow.prof slow.py

This command runs the program and writes profile data to the file slow.prof.

To visualize the profile data, we used tuna. After installing tuna:

tuna slow.prof

This command opens a web browser and displays a flame graph of the time spent in each function call.

For our example program, the flame graph shows that 87.5% of the time is spent in the more_powers function. In this case, it also indicates which line in more_powers took the most time, but cProfile, in general, only instruments function calls.

Profiling Python line-by-line

To measure line-by-line performance, we used the line-profiler Python package.

After installing line-profiler, decorate the function you want to instrument with the @profile decorator. For example:

@profile
def more_powers():
    a = [i**2 for i in range(1000000)]
    b = [i**10 for i in range(10000000)]
    c = [i**20 for i in range(1000000)]
    return a, b, c

Next, run the program using the kernprof command from the line-profiler package.

kernprof -l slow_decorated.py

The results are written to the file slow_decorated.py.lprof. To view the results, use the following:

python -m line_profiler slow_decorated.py.lprof

Timing information is displayed for each line in the profiled functions:

Timer unit: 1e-06 s

Total time: 3.24499 s
File: slow_decorated.py
Function: more_powers at line 7

Line #      Hits         Time    Per Hit   % Time  Line Contents
==============================================================
     7                                             @profile
     8                                             def more_powers():
     9         1     173470.0   173470.0      5.3      a = [i**2 for i in range(1000000)]
    10         1    2710952.0  2710952.0     83.5      b = [i**10 for i in range(10000000)]
    11         1     360566.0   360566.0     11.1      c = [i**20 for i in range(1000000)]
    12         1          1.0        1.0      0.0      return a, b, c

The results show that 83.5% of the time in more_powers was spent in the middle list comprehension.

Conclusion

By measuring the performance of the Python application, we learned the following:

  • The application spent most of its time in function calls to statsmodels.
  • Once the statsmodels calls were run in parallel on a Dask remote cluster, another function became a hotspot. We optimized this function:
    - The function computed a large number of results that were never used. We changed the function to only compute the results that were needed.
    - We replaced slow Pandas operations with Pandas operations that had better performance.
  • The application created several PDF files containing plots. We used joblib to write the PDF files in parallel (a sketch follows this list).
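
As a rough illustration, the parallel PDF writing looked something like this. The plotting function and data below are placeholders, not the client's actual code; the pattern is the same joblib pattern shown earlier.

import matplotlib
matplotlib.use('Agg')  # non-interactive backend, safe in worker processes
import matplotlib.pyplot as plt
from joblib import Parallel, delayed

def write_plot_pdf(name, data):
    # Hypothetical plotting function: write one PDF per dataset
    fig, ax = plt.subplots()
    ax.plot(data)
    fig.savefig(f'{name}.pdf')
    plt.close(fig)

datasets = {'model_a': [1, 4, 9], 'model_b': [2, 3, 5], 'model_c': [8, 6, 7]}
Parallel(n_jobs=-1)(delayed(write_plot_pdf)(name, data) for name, data in datasets.items())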

Additional improvements we made:

  • Moving from Python 3.9 to the then-current Python 3.10 gave us another small performance improvement.
  • We created a driver script to run the application multiple times automatically.

Taking all of the performance improvements into account, by using a Dask remote cluster and optimizing the Python program we increased throughput by a factor of 96. As a bonus, the client's data scientists now have a Dask remote cluster available as a self-service computing resource.
