Making a Simple PySpark Job 20x Faster with the DataFrame API

November 17, 2021

At Abnormal Security, we use a data science-based approach to keep our customers safe from the most advanced email attacks. This requires processing huge amounts of data to train machine learning models, build datasets, and otherwise model the typical behavior of the organizations we’re protecting.

One of the tools we use for data processing is PySpark, a Python layer on top of Apache Spark’s Java API. PySpark allows us to iterate rapidly on our ML products in Python and also deploy them for training in a highly scalable environment. But there’s one major downside: compared to native (JVM) Spark, performing a task on PySpark can be an order of magnitude more expensive.

In this blog post, I’ll provide a brief overview of the design choices that lead to this performance disparity. Then, I’ll walk through an example job where we saw a 20x performance improvement by re-writing a simple filter with Spark’s DataFrame API.

Overheads, Under the Hood

To begin, it’s necessary to understand the reasons behind the difference in performance between PySpark and native Spark.

It’s not as simple as saying Python is slower than Java. It’s true that, from a pure performance perspective, a program written in Python will usually run significantly more slowly than an equivalent one written in Java. But there’s more at play under the hood that makes PySpark even slower than language performance alone would suggest. To explain this larger difference, we have to look at how PySpark builds on top of core Spark functionality.

[Figure: PySpark data flow between the JVM worker and Python subprocesses]

An old-but-still-accurate document on PySpark’s internals gives a good overview of how PySpark works on top of core Spark. Remember all of those great data science libraries we wanted to use in Python? That code, along with the rest of our heavy-lifting, non-driver application code, all runs in Python subprocesses on each worker in the Spark cluster.

To run our Python program there, all of our input data, broadcast variables, serialized Python code, and any other required context is sent over a Unix pipe from the JVM-based Spark worker process to the Python subprocesses. These are the same Unix pipes you use when redirecting output from one command as the input to another, like when you run cat my_file | grep xyz. There is some I/O cost associated with this operation, but it’s relatively cheap. The main performance penalty comes from the fact that all of this data must be (de-)serialized every time we communicate across these pipes.
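To get a feel for this cost, here is a standalone sketch of the round-trip every batch of records pays when crossing the JVM-to-Python boundary. It uses plain pickle as a stand-in; PySpark actually uses its own framed serializers, but the idea is the same:

```python
import pickle

# A batch of records of the sort a Spark partition might hold.
records = [{"id": i, "attribute": f"attr_{i % 10}"} for i in range(1000)]

# To send data like this over the pipe to the Python worker, it must be
# serialized to bytes, then deserialized on the other side (and the same
# again for any results flowing back to the JVM).
payload = pickle.dumps(records)
round_tripped = pickle.loads(payload)

assert round_tripped == records  # the data survives, but the CPU time doesn't
```

Every task repeats this round-trip for its partition of data, which is why the penalty grows with the volume of data being processed rather than staying a fixed overhead.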

A smaller but still important consideration is that for n Python processes, we have to create n copies of any shared data, such as broadcast variables, which can create a much larger memory footprint. JVM Spark can use multi-threading, so this common data can be shared across threads. In practice, this means a PySpark job is more likely to be memory-constrained, which leads to expensive vertical scaling.

A Simple Example Job

Although these performance penalties often lead to much more expensive batch jobs, we’re happy with the tradeoff; for us, being able to develop these pipelines in Python is totally worth it for the ecosystem of data science it unlocks.

But there are times when we don’t need to run any special Python code, and just want to apply some simple ETL logic. Let’s take a real-world example job where we just want to loop over a set of attributes, filter our input dataset to records matching the current attribute, and run a side effect on the filtered set, like writing back out to storage. Sample code might look like this:

# Cache the input RDD, since we will be using it many times
rdd = _read_input_rdd(...).cache()

for target_attribute in target_attributes:
    filtered_rdd = rdd.filter(lambda x: x.attribute == target_attribute)
    _write_to_storage(filtered_rdd, target_attribute, ...)

For each iteration of our loop here, we get the Spark stage shown below. This isn’t too surprising: `filter` is implemented with a MapPartitions operation; after that, we run our write operation.

[Figure: Spark stage showing the MapPartitions filter followed by the write operation]

There are some obvious issues with our code here (for one, why don’t we just do a single shuffle to repartition our records by the `attribute` property?), but let’s assume for now that, due to constraints outside the scope of this post, we can’t improve on the overall loop-and-filter algorithm. This is a case where we’re not using any of those fancy Python libraries to process our data. Do we really need to run this in Python and incur all the costs associated with the PySpark design?

I Declare Efficiency!

It turns out that the folks working on Spark have thought about this quite a bit, and they offer a solution called the DataFrame API. This is probably familiar to anyone who’s worked with Spark before, but it’s worth thinking about why one might use this functionality rather than the core RDD API, which allows the user to define everything in simple, native Python.

At a high level, the DataFrame API constrains the programming model to a more relational, declarative style. Just as a relational database compiles SQL code into lower-level instructions, a query optimizer compiles this DataFrame code into the lower-level RDD API. This query optimizer, called Catalyst, applies a variety of clever logical tricks that the application developer probably doesn’t want to think about with a deadline looming. The declarative interface here limits the user’s expressiveness, but the simplification also allows the library to automatically incorporate reusable and sometimes drastic optimizations under the hood.
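To make the distinction concrete, here is a toy illustration (not Catalyst itself) of why a declarative predicate is optimizable while a Python lambda is not: the expression is a data structure the engine can inspect before running anything.

```python
# A lambda predicate is opaque: the only thing an engine can do with it
# is call it, row by row, in a Python process.
opaque = lambda row: row["attribute"] == "foo"

# A declarative predicate is data the engine can analyze before execution,
# e.g. to decide which columns it actually needs to read.
declarative = {"column": "attribute", "op": "==", "value": "foo"}

def columns_needed(predicate):
    """A toy 'optimizer' pass: list the columns a predicate touches."""
    return {predicate["column"]}

assert columns_needed(declarative) == {"attribute"}
```

Catalyst performs far more sophisticated analysis than this, but the enabling property is the same: because the predicate is inspectable, the engine can plan around it rather than blindly executing user code.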

Let’s try rewriting our above code in this DataFrame API and see if we receive any performance improvements:

# Initialize the SQLContext so that we can use DataFrames
sql_context = SQLContext(spark_context)

# Convert to Row objects for the DataFrame, pre-serializing our object for storage
attribute_and_blob_rdd = rdd.map(
    lambda x: Row(attribute=x.attribute, blob=bytearray(_serialize(x)))
)

# Convert to DataFrame and cache, as before
attribute_and_blob_df = attribute_and_blob_rdd.toDF().cache()

for target_attribute in target_attributes:
    df_by_attribute = attribute_and_blob_df.filter(
        attribute_and_blob_df.attribute == target_attribute
    )

    # Convert back to storage-compatible RDD and write
    blob_rdd = df_by_attribute.rdd.map(lambda row: row.blob)
    _write_to_storage(blob_rdd, target_attribute, ...)

Here we make a few improvements over the original code. First, we convert to a tabular format and call .toDF() so that our filter operation runs in the DataFrame API. This may look like a trivial change, but in a moment we’ll see just how dramatic an optimization it enables.

The second change we make here is to immediately serialize each record into our output format. While we have to do this serialization work before writing no matter what, doing it here allows the filter operations to process a compressed form of the data. We won’t need to inspect or modify this object again before writing to storage, anyway.
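As a rough sketch of this idea (using plain dicts and pickle as stand-ins for our real records and _serialize), pre-serializing turns each record into a small key plus an opaque blob that downstream operators can move without ever touching its contents:

```python
import pickle

# Stand-ins for our real records, each with a small key and a larger body.
records = [{"attribute": "a", "payload": list(range(100))},
           {"attribute": "b", "payload": list(range(100))}]

# Pre-serialize each record into an (attribute, blob) pair. From here on,
# only the small attribute key is ever inspected; the blob is just bytes.
pairs = [(r["attribute"], pickle.dumps(r)) for r in records]

# The filter compares keys only and passes the opaque blobs straight through.
matching = [blob for attr, blob in pairs if attr == "a"]

assert pickle.loads(matching[0])["attribute"] == "a"
```

Since the blob is written to storage as-is, the serialization work is not wasted; it simply happens earlier in the pipeline, where it lets every intermediate step handle less structure.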

Let’s see what our Spark job looks like this time. Here’s the Spark UI’s DAG visualization of our critical stage:

[Figure: Spark UI DAG visualization of the critical stage, ending in InMemoryTableScan]

The operations shown here give us a glimpse of some of the magic that the query optimizer now provides. The key stage here is the last one: Spark has automatically cached an in-memory data structure in order to perform a special operation called InMemoryTableScan. At a high level, this operation seems to be building up an in-memory columnar data structure so that our filter operation only has to scan our small `attribute` column and can ignore the much larger serialized blob. Even if you could figure out how to make this happen yourself—and you probably shouldn’t try, because you care about your application logic—you don’t have to. Spark already knows when to do this automatically, as long as you use the DataFrame API.
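Here is a toy model (not Spark’s actual implementation) of why a columnar layout makes this filter cheap: the scan touches only the small attribute column, and the large blobs are fetched by index only for rows that match.

```python
import pickle

# Stand-in rows: a small attribute key alongside a larger payload object.
rows = [("a", {"big": "payload1"}),
        ("b", {"big": "payload2"}),
        ("a", {"big": "payload3"})]

# Columnar layout: the tiny attribute column lives apart from the blobs.
attributes = [attr for attr, _ in rows]
blobs = [pickle.dumps(obj) for _, obj in rows]

# The filter scans only the attribute column; blobs for matching rows are
# looked up by index, and non-matching blobs are never read at all.
matching_blobs = [blobs[i] for i, attr in enumerate(attributes)
                  if attr == "a"]

assert len(matching_blobs) == 2
```

In our job, the blob column dwarfs the attribute column, so skipping the blobs on every one of the loop’s filter passes is where the bulk of the savings comes from.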

In practice, we found that this optimization improved the performance of the Spark job by about 20x. So why do we care? Scaling existing jobs to run more efficiently means more time to focus on new development and other team priorities. Plus, we’ve been able to repeat this pattern for other jobs still using the RDD API for quick performance improvements.

If you’re excited about scaling ML products while fighting cybercrime, check out our careers page or stay in touch by following us on Twitter and LinkedIn!
