This article previously appeared on the Feature Labs blog on 20 Aug 2018.
How to scale automated feature engineering using parallel processing
When a computation is prohibitively slow, the most important question to ask is: “What is the bottleneck?” Once you know the answer, the logical next step is to figure out how to get around that bottleneck.
Often, as we’ll see, the bottleneck is that we aren’t taking full advantage of our hardware resources: for example, running a calculation on only one core when our computer has eight. Simply getting a bigger machine, in terms of RAM or cores, will not solve the problem if our code isn’t written to use all of our resources. The solution, therefore, is to rewrite the code to use whatever hardware we do have as efficiently as possible.
In this article, we’ll see how to refactor our automated feature engineering code to run in parallel on all of our laptop’s cores, reducing computation time by over 8x in the process. We’ll make use of two open-source libraries, Featuretools for automated feature engineering and Dask for parallel processing, and solve a problem with a real-world dataset.
We’ll combine two important technologies: automated feature engineering in Featuretools and parallel computation in Dask.
Our exact solution is specific to this problem, but the general approach we develop can be used to scale your own computations to larger datasets.
Although here we’ll stick to using one computer and multiple cores, in the future we’ll use this same method to run computations on multiple machines.
The complete code implementation is available in a Jupyter Notebook on GitHub. If you aren’t yet familiar with Featuretools, check out the documentation or this article. Here we’ll focus mainly on using Featuretools with Dask and skip over some of the automated feature engineering details.
The Problem: Too Much Data, Not Enough Time
The primary issue when applying automated feature engineering with Featuretools to the Home Credit Default Risk problem (a machine learning competition currently running on Kaggle where the objective is to predict whether or not a client will repay a loan) is that we have a lot of data, which results in a very long feature calculation time. Using Deep Feature Synthesis, we are able to automatically generate 1,820 features from 7 data tables and 58 million rows of client information, but a call to this function with only one core takes 25 hours, even on an EC2 instance with 64 GB of RAM!
Given that our EC2 instance (and even our laptop) has 8 cores, we don’t need more RAM to speed up the calculation; we need to make use of those cores. Featuretools does allow for parallel processing by setting the n_jobs parameter in a call to Deep Feature Synthesis. However, the function currently must send the entire EntitySet to every worker (core) on the machine. With a large EntitySet, this can cause problems if the memory of each worker is exhausted. We are working on better parallelization at Feature Labs, but for now we solve our problem with Dask.
Solution: Create Lots of Small Problems
The approach is to break one large problem up into many smaller ones and then use Dask to run multiple small problems at a time, each one on a different core. The important point here is that we make each problem (task) independent of the others so they can run simultaneously. Because we are making features for each client in the dataset, each task is to make a feature matrix for a subset of clients.
When one problem is too hard, make lots of little problems.
Our approach is outlined below:
Make a large problem into many small problems by partitioning data
Write functions to make a feature matrix from each partition of data
Use Dask to run Step 2 in parallel on all our cores
At the end, we’ll have a number of smaller feature matrices that we can then join together into a final feature matrix. This same method (breaking one large problem into numerous smaller ones that are run in parallel) can be scaled to datasets of any size and implemented in other distributed computing libraries, such as Spark using PySpark.
Whatever resources we have, we want to use them as efficiently as possible, and we can take this same framework to scale to larger datasets.
Partitioning Data: Divide and Conquer
Our first step is to create small partitions of the original dataset, each one containing information from all seven tables for a subset of the clients. Each partition can then be used to independently calculate a feature matrix for a group of clients.
This operation is done by taking a list of all clients, breaking it into 104 sub-lists, and then iterating through these sub-lists. Each time, we subset the data to include only clients from the current sub-list and save the resulting data to disk.
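Below is a minimal sketch of this partitioning step. It assumes the tables are already loaded as pandas DataFrames in a dict keyed by table name and that every table carries the client ID column. It also assumes each partition is written as one CSV per table in its own p{num} directory. The helper names (split_ids, create_partitions) and the file layout are hypothetical; this is not the article’s code. Tables that reach clients only through another table would need an extra filtering step, which this sketch leaves out.

```python
import os

import pandas as pd


def split_ids(ids, n_chunks):
    """Split a list of IDs into n_chunks contiguous sub-lists of near-equal size."""
    size, extra = divmod(len(ids), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional ID each
        end = start + size + (1 if i < extra else 0)
        chunks.append(ids[start:end])
        start = end
    return chunks


def create_partitions(tables, id_col, id_table, n_partitions, out_dir):
    """Write one directory per partition, each holding every table subset to its clients.

    Assumes every DataFrame in `tables` has an `id_col` column (hypothetical layout).
    Returns the list of partition directories that were written.
    """
    ids = list(tables[id_table][id_col].unique())
    paths = []
    for num, chunk in enumerate(split_ids(ids, n_partitions), start=1):
        part_dir = os.path.join(out_dir, f"p{num}")
        os.makedirs(part_dir, exist_ok=True)
        for name, df in tables.items():
            # Keep only the rows that belong to this partition's clients
            subset = df[df[id_col].isin(chunk)]
            subset.to_csv(os.path.join(part_dir, f"{name}.csv"), index=False)
        paths.append(part_dir)
    return paths
```

For the setup described here, a call would look like create_partitions(tables, "client_id", "app", 104, "partitions"), where "client_id" and "app" stand in for the real ID column and main table.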
We selected 104 partitions based on trial and error and three general guidelines:
We want at least as many partitions as workers (cores), and the number should be a multiple of the number of workers
Each partition must be small enough to fit in memory of a single worker
More partitions means less variation in time to complete each task
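The article settled on 104 by trial and error. The hypothetical helper below only turns the three guidelines into arithmetic. It takes a memory-based lower bound on the partition count, enforces a floor of tasks_per_worker tasks per worker, and rounds up to a multiple of the worker count. The size figures in the usage line are illustrative, not measurements from this dataset.

```python
import math


def choose_n_partitions(n_workers, data_size_mb, max_partition_mb, tasks_per_worker=1):
    """Pick a partition count following the three partitioning guidelines.

    1. At least as many partitions as workers, and a multiple of the worker count.
    2. No partition larger than `max_partition_mb`, so each fits in one worker's memory.
    3. `tasks_per_worker` > 1 asks for more, smaller tasks, evening out finish times.
    """
    by_memory = math.ceil(data_size_mb / max_partition_mb)
    target = max(by_memory, n_workers * tasks_per_worker)
    # Round up to the next multiple of n_workers so every worker gets equal task counts
    return math.ceil(target / n_workers) * n_workers
```

Consider 8 workers, 2,000 MB of data and a 20 MB cap per partition. Then choose_n_partitions(8, 2000, 20) returns 104: the memory bound gives 100 partitions, which rounds up to 13 tasks per worker.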
(As an additional optimization, we convert the pandas object data types to category where applicable to reduce memory usage. This shrinks our entire dataset from 4 GB to about 2 GB. I recommend reading the pandas documentation for category data types so you use them effectively.)
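Here is a sketch of this conversion in pandas. It assumes "where applicable" means string columns with relatively few distinct values. The 50% uniqueness cutoff is an arbitrary choice for illustration, and convert_to_category is a hypothetical name.

```python
import pandas as pd


def convert_to_category(df, max_unique_fraction=0.5):
    """Return a copy of df with low-cardinality string columns stored as 'category'.

    The 0.5 cutoff is an assumption: a column of mostly unique values gains little,
    because every distinct value must still be stored once in the categories.
    """
    out = df.copy()
    for col in out.columns:
        # Only object/string columns are candidates; numeric columns are left alone
        if not pd.api.types.is_string_dtype(out[col].dtype):
            continue
        if len(out) and out[col].nunique() / len(out) <= max_unique_fraction:
            out[col] = out[col].astype("category")
    return out
```

To see how much each table shrinks, compare df.memory_usage(deep=True).sum() before and after the conversion. The deep=True argument is needed to count the actual string storage.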
Saving all 104 partitions to disk took about 30 minutes, but this is a process that only must be done once.
Each partition contains all the data needed to make a feature matrix for a subset of clients.
Entity Sets from Partitions
An EntitySet in Featuretools is a useful data structure because it holds multiple tables and the relationships between them. To create an EntitySet from a partition, we write a function that reads a partition from disk and then generates the EntitySet with the tables and the relationships linking them.
Notice that this function returns the EntitySet rather than saving it, as we did with the partitions of data. Saving the raw data is a better option for this problem because we might want to modify the EntitySets (say, by adding interesting values or domain knowledge features), while the raw data is never altered. The EntitySets are generated on the fly and then passed to the next stage: calculating the feature matrix.
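Here is a sketch of such a function, written against the Featuretools 1.x API (EntitySet, add_dataframe, add_relationship). That API differs from the 2018-era API the article used. The two-table schema (INDEXES, RELATIONSHIPS), the CSV layout, and the load_partition helper are hypothetical. Featuretools is imported inside the function so that the pandas loading step can be tried on its own.

```python
import os

import pandas as pd

# Hypothetical schema: table name -> index column, and (parent, child, key) links
INDEXES = {"app": "client_id", "loans": "loan_id"}
RELATIONSHIPS = [("app", "loans", "client_id")]


def load_partition(path, table_names):
    """Read one CSV per table from a partition directory into a dict of DataFrames."""
    return {name: pd.read_csv(os.path.join(path, f"{name}.csv")) for name in table_names}


def entity_set_from_partition(path):
    """Build and return (not save) an EntitySet for the clients in one partition."""
    import featuretools as ft  # deferred so load_partition works without Featuretools

    tables = load_partition(path, list(INDEXES))
    es = ft.EntitySet(id=f"clients_{os.path.basename(path)}")
    for name, df in tables.items():
        es.add_dataframe(dataframe_name=name, dataframe=df, index=INDEXES[name])
    for parent, child, key in RELATIONSHIPS:
        # The parent's index column links to the same-named column in the child table
        es.add_relationship(parent, key, child, key)
    return es
```

Because nothing is written back to disk, changing the schema or adding domain features only requires editing this function. The partitions themselves stay untouched.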
Feature Matrix from an Entity Set
The function feature_matrix_from_entityset does exactly what the name suggests: it takes in the EntitySet created previously and generates a feature matrix with 1,820 features using Deep Feature Synthesis. The feature matrix is then saved to disk. To ensure we make exactly the same features for each partition, we generate the feature definitions once and then use the Featuretools function calculate_feature_matrix.
We pass in a dictionary containing the EntitySet and the partition number so that each feature matrix can be saved under a unique name.
Creating a feature matrix from an EntitySet and saving it to disk.
The chunk_size parameter of calculate_feature_matrix is the only tricky part. It is used to break the feature matrix calculation into smaller parts, but since we have already partitioned the data, this is no longer necessary. As long as the entire EntitySet fits in memory, I found it more time-efficient to calculate all of the rows at once by setting chunk_size equal to the number of observations.
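Here is a sketch of this function, again written against the Featuretools 1.x API. The feature definitions are passed in as an argument. They would come from a single call such as ft.dfs(entityset=es, target_dataframe_name="app", features_only=True) on one partition’s EntitySet, so that every partition gets identical features. The "app" target name, the output file naming, and the extra parameters are hypothetical.

```python
import os


def feature_matrix_path(out_dir, num):
    """File name for one partition's feature matrix (hypothetical naming scheme)."""
    return os.path.join(out_dir, f"fm_p{num}.csv")


def feature_matrix_from_entityset(es_dict, feature_defs, out_dir, target="app"):
    """Calculate one partition's feature matrix from shared definitions and save it.

    `es_dict` holds the EntitySet under "es" and the partition number under "num".
    """
    import featuretools as ft  # deferred import: Featuretools is only needed here

    es, num = es_dict["es"], es_dict["num"]
    # A single chunk covering every row, since the data is already partitioned
    n_rows = len(es[target])
    fm = ft.calculate_feature_matrix(feature_defs, entityset=es, chunk_size=n_rows)
    fm.to_csv(feature_matrix_path(out_dir, num))
    return fm.shape
```

With Dask, the extra arguments can be supplied alongside the mapped bag items, for example bag.map(feature_matrix_from_entityset, feature_defs=defs, out_dir="fms"). Bag.map broadcasts non-bag keyword arguments to every call.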
We now have all the individual parts we need to go from a data partition on disk to a feature matrix. These steps comprise the bulk of the work, and getting Dask to run the tasks in parallel is surprisingly simple.
Dask: Unleash Your Machine(s)
Dask is a parallel computing library that allows us to run many computations at the same time, either using processes or threads on one machine (local), or using many separate computers (cluster). For a single machine, Dask allows us to run computations in parallel using either threads or processes.
Processes do not share memory, and each uses a single core, so they are better suited to compute-intensive tasks that do not need to communicate. Threads share memory, but in Python the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode. As a result, only some operations (those that release the GIL) can run in parallel using threads. (For more on threads and processes, see this excellent article.)
Since calculating a feature matrix is compute-intensive and can be done independently for each partition, we want to use processes. The tasks do not need to share memory because each feature matrix does not depend on the others. In computer science terms, by partitioning the data, we have made our problem embarrassingly parallel because there is no need for communication between the workers.
If we start a Dask Client with processes=True, we get 8 workers, one for each core, with each worker allotted 2 GB of memory (16 GB total / 8 workers; this will vary depending on your machine).
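The hypothetical helper below spells out that layout: one single-threaded worker process per core, with the machine’s memory split evenly between them. The keys it returns are keyword arguments that dask.distributed.Client forwards to LocalCluster. The helper pins the layout explicitly because Dask’s defaults may instead group cores into fewer multi-threaded workers.

```python
import os


def local_cluster_settings(total_memory_gb, n_workers=None):
    """Settings for one single-threaded worker process per core, memory split evenly.

    Hypothetical helper; defaults to one worker per CPU core reported by the OS.
    """
    n_workers = n_workers or os.cpu_count() or 1
    per_worker_gb = total_memory_gb / n_workers
    return {
        "n_workers": n_workers,
        "threads_per_worker": 1,
        "processes": True,  # separate processes sidestep the GIL for CPU-bound work
        "memory_limit": f"{per_worker_gb:g}GB",  # e.g. 16 GB / 8 workers -> "2GB"
    }
```

In a script, client = Client(**local_cluster_settings(16)) (after from dask.distributed import Client) would start the cluster. The client.dashboard_link attribute then gives the dashboard address, which defaults to port 8787.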
To check that everything worked out, we can navigate to localhost:8787 where Dask has set up a Bokeh dashboard for us. On the Workers tab, we see 8 workers each with 2 GB of memory:
Workers created by Dask with processes = True (run on a MacBook with 8 cores and 16 GB of RAM).
At the moment, all 8 workers are idle because we haven’t given them anything to do. The next step is to create a “Dask bag,” a parallel collection whose items (here, the partition paths) Dask can hand out to workers. We make the bag using the db.from_sequence method and the list of partition paths.
Then, we map computation tasks onto the bag. To map means to take a function and a list of inputs and apply the function to each element in the list. Since we first need to make an EntitySet from each partition, we map the associated function, entity_set_from_partition, onto the bag.
Next, we apply a second mapping, this time with feature_matrix_from_entityset, to make the feature matrices.
This code takes the output of the first map (the EntitySet) and passes it to the second map. These steps don’t actually run the computations; rather, they build a list of tasks that Dask will then allocate to workers. To run the tasks and make the feature matrices, we call compute() on the bag.
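Here is a runnable sketch of the whole bag pipeline using dask.bag. The two mapped functions are hypothetical stand-ins that only pass a partition number along, so the example runs without Featuretools or data on disk. In the article, they build an EntitySet and then compute and save a feature matrix. build_feature_matrices is likewise a hypothetical wrapper.

```python
import os

import dask.bag as db


def entity_set_from_partition(path):
    # Hypothetical stand-in: the real function reads the partition's tables
    # from `path` and returns a Featuretools EntitySet.
    num = int(os.path.basename(path).lstrip("p"))
    return {"es": f"EntitySet for {path}", "num": num}


def feature_matrix_from_entityset(es_dict):
    # Hypothetical stand-in: the real function calculates the feature matrix and
    # saves it to disk; here we only return the file name it would use.
    return f"fm_p{es_dict['num']}.csv"


def build_feature_matrices(paths, scheduler=None):
    """Map both steps over a Dask bag of partition paths, then run the task graph."""
    # One bag partition per path, so each path can be processed by a different worker
    b = db.from_sequence(paths, npartitions=len(paths))
    b = b.map(entity_set_from_partition)
    b = b.map(feature_matrix_from_entityset)
    # Nothing has run yet. compute() executes the graph; scheduler=None uses the
    # default scheduler, or an active distributed Client if one exists.
    return b.compute(scheduler=scheduler)
```

Bags default to the processes scheduler, and a running distributed Client takes over as the default. For that reason, a script should call build_feature_matrices(paths) inside an if __name__ == "__main__": guard, since new worker processes may re-import the script. Passing scheduler="synchronous" runs everything in the current process, which is handy for debugging.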
Dask automatically allocates tasks to workers based on the task graph (a Directed Acyclic Graph) constructed from the mappings. We can view the task graph and status on the Bokeh dashboard as the computation occurs.
Dask task graph midway through the computation process.
The set of blocks on the left represents the entity_set_from_partition function calls, and the blocks on the right are feature_matrix_from_entityset. From this graph, we can see that there is a dependency between the two functions but not between the feature matrix calculations for different partitions.
There are a number of other visualizations on the Bokeh dashboard including a task stream (left below) and a profile of operations (right below):
Task Stream (left) and Profile (right) of ongoing computation.
From the task stream, we can see that all eight workers are in use at once, with a total of 208 tasks to complete (two per partition, one EntitySet and one feature matrix for each of the 104 partitions). The profile tells us that the longest operation is calculating the feature matrix for each partition.
On my MacBook, it took 6,200 seconds (about 1.7 hours) to build and save all 104 feature matrices, compared with 25 hours for the single-core run. That’s quite an improvement, all from rewriting our code to use our available hardware as efficiently as possible.
Instead of getting a larger computer, we rewrite our code to make the most efficient use of the resources we have. Then, when we do get a bigger computer, we’ll be able to use the same code to minimize computation time.
Building One Feature Matrix
Once we have the individual feature matrices, we can use them directly for modeling if we are using an algorithm that allows online (also called incremental) learning. Another option is to create one feature matrix, which can be done in pure Python using pandas.
Code to join together feature matrices.
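Below is a minimal sketch of the joining step. It assumes each partition’s feature matrix was saved as a CSV named like fm_p{num}.csv, with the client ID as its index (a hypothetical layout). pd.concat stacks the matrices row-wise. With verify_integrity=True, it raises an error if any client ID appears in more than one partition, which should never happen when the partitions split clients cleanly.

```python
import glob
import os

import pandas as pd


def join_feature_matrices(directory, pattern="fm_p*.csv", index_col=0):
    """Read every saved feature matrix in `directory` and stack them into one DataFrame.

    Assumes each matrix was written with to_csv using the client ID as its index.
    """
    paths = sorted(glob.glob(os.path.join(directory, pattern)))
    matrices = [pd.read_csv(p, index_col=index_col) for p in paths]
    # verify_integrity catches any client that landed in two partitions
    return pd.concat(matrices, axis=0, verify_integrity=True)
```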
The single feature matrix has 350,000 rows and 1,820 columns, the same shape as when I first made it using a single core.
Subset of complete feature matrix.
Rather than thinking of how to get more computational power, we should think about how to use the hardware we do have as efficiently as possible. In this article, we walked through how we can parallelize our code using Dask, which lets us use a laptop to complete a calculation 8 times faster than on a single core.
The solution we engineered makes use of a few key concepts:
Break up the problem into smaller, independent chunks
Write functions to process one chunk at a time
Delegate each chunk to a worker and compute in parallel
Now we can not only take advantage of the speed and modeling performance of automated feature engineering with Featuretools, but we can use Dask to carry out our computations in parallel and get results quicker. Moreover, we can use the same methods to scale to larger datasets, leaving us well-situated for whatever machine learning problems come our way.
If building meaningful, high-performance predictive models is something you care about, then get in touch with us at Feature Labs. While this project was completed with the open-source Featuretools, the commercial product offers additional tools and support for creating machine learning solutions.