Spark is critical to the modern data stack. As such, it’s extremely important to have the right level of observability for your Spark environments. There are plenty of options for monitoring Spark—including SaaS programs that provide you with pre-configured dashboards for Spark and Spark SQL metrics. What if that’s not enough?
A typical Spark application setup, whether self-hosted or managed, includes some operational dashboards for cluster health monitoring. Those dashboards are very useful, but they only give us an infrastructure overview and not the actual metrics related to data. Yes, we can assume there may be something wrong with the app when CPU usage increases or the cluster is running out of RAM, but that doesn’t help when the source changed its schema or the data that came from another department is broken. Most issues that engineers face are caused by data and not by the underlying infrastructure, so they have to spend a lot of time reproducing issues or tinkering with files and buckets like detectives. This is where actual application monitoring can help.
Every situation calls for a different level of visibility, and data engineers need to have the ability to go a level deeper than execution metrics. Otherwise, you can spend a significant amount of time debugging data quality issues in Spark.
In this guide, you’ll learn how to get both low-level and high-level data observability for Spark. For the low level, you’ll use Spark’s internal systems like the Listener API and Query Execution Listeners. For the high level, you’ll learn how to use libraries to track data quality metrics.
After learning to do both, you’ll have the option to pick whichever one works best for the problem you’re trying to solve.
Spark Listeners are a long-established and reliable way of getting metrics. In fact, the Spark UI uses the very same mechanism to visualize metrics. The Spark Listener API allows developers to track events that Spark emits during application execution. Those events are typically application start/end, job start/end, stage start/end and so on. You can find the full list in the Spark JavaDoc. Spark Listeners are easy to configure and easy to use for grabbing metrics. After performing each operation, Spark calls the Spark Listener and passes some metadata to its method. This includes things like execution time, records read/written, bytes read/written and others.
This very basic and low-level data quality monitoring will check records count and size. Imagine you have some job which runs on a daily basis and executes some transformation/analytics on incoming data sets. You can write a listener which checks how many records were read from input and compare it with the previous day’s result. When the difference is significant, we can assume that something can be wrong with the datasource.
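The day-over-day comparison described above can be sketched in plain Java. This is a minimal illustration, not part of Spark: the class name `RecordCountCheck`, its methods and the 50% threshold are all hypothetical, and it assumes the two record counts have already been collected (for example, from listener metrics) and that the previous day's value was stored somewhere.

```java
/** Hypothetical helper: flags a run whose record count deviates too far from the previous run. */
final class RecordCountCheck {

    private RecordCountCheck() {
    }

    /** Relative change between two counts, e.g. 0.25 means a 25% move from the previous value. */
    static double relativeChange(long previousCount, long currentCount) {
        if (previousCount == 0) {
            // No baseline to divide by: treat any non-empty input as a 100% change.
            return currentCount == 0 ? 0.0 : 1.0;
        }
        return Math.abs(currentCount - previousCount) / (double) previousCount;
    }

    /** True when the change is larger than the allowed threshold (an assumed, tunable value). */
    static boolean isAnomalous(long previousCount, long currentCount, double threshold) {
        return relativeChange(previousCount, currentCount) > threshold;
    }

    public static void main(String[] args) {
        long yesterday = 1_000_000L; // assumed to be loaded from wherever metrics are stored
        long today = 400_000L;       // assumed to come from the current run's listener metrics
        if (isAnomalous(yesterday, today, 0.5)) {
            System.out.println("Record count changed significantly, check the data source");
        }
    }
}
```

A relative threshold is used here rather than an absolute one so the same check can apply to data sets of different sizes; a suitable value depends on how much the input normally varies.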
However, this approach requires writing an in-house monitoring solution. Metric values have to be stored somewhere, and alerting mechanisms have to be configured. When application code changes, all metric keys will change too, and this has to be handled properly.
Still, even a simple Spark Listener can give you some insight into your data.
Here is an example of such a Spark Listener:
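    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerStageCompleted;
    import org.apache.spark.scheduler.StageInfo;
    import org.apache.spark.util.AccumulatorV2;
    import scala.collection.Iterator;

    public class SomeSparkListener extends SparkListener {

        /**
         * This very simple Spark listener prints metrics collected for every stage.
         *
         * @param stageCompleted
         */
        @Override
        public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
            StageInfo stageInfo = stageCompleted.stageInfo();
            Iterator<AccumulatorV2<?, ?>> it = stageInfo.taskMetrics().accumulators().iterator();
            while (it.hasNext()) {
                AccumulatorV2<?, ?> next = it.next();
                // The source listing is cut off after "AccumulatorV2";
                // the rest of this loop body is an assumed reconstruction.
                String key = next.name().isDefined() ? next.name().get() : "unnamed";
                System.out.printf("key: %s, value: %s%n", key, next.value());
            }
        }
    }

You can add a Spark Listener to your application in several ways: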
Add it programmatically:
    SparkSession spark = SparkSession.builder().getOrCreate();
    spark.sparkContext().addSparkListener(new SomeSparkListener());

Or pass it via spark-submit/Spark cluster driver options:
    spark-submit --conf "spark.extraListeners=ai.databand.SomeSparkListener"

Query Execution Listener is another mechanism for Spark monitoring provided out of the box. Instead of focusing on very low-level metrics, it allows developers to subscribe to query completion events. It provides more high-level metadata about the executed query, like logical and physical plans and execution metrics.
You can get metrics like records read/written by query, but this time aggregated for the whole query instead of specific tasks/jobs/stages.
Very useful information, like data location and schema, can also be extracted from plans. You can extract and store the schema along with dataframe dimensions and compare them to previous runs, triggering alerts when something goes wrong.
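Comparing a stored schema with the current one can be sketched in plain Java. This is only an illustration under stated assumptions: the class `SchemaDiff` is hypothetical, and it assumes each schema has already been extracted from the plan and flattened into a map of column name to type name. How the previous schema is stored and loaded is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: reports differences between two schemas given as column name -> type name. */
final class SchemaDiff {

    private SchemaDiff() {
    }

    static List<String> diff(Map<String, String> previous, Map<String, String> current) {
        List<String> changes = new ArrayList<>();
        // Columns that disappeared or changed type since the previous run.
        for (Map.Entry<String, String> column : previous.entrySet()) {
            String currentType = current.get(column.getKey());
            if (currentType == null) {
                changes.add("removed: " + column.getKey());
            } else if (!currentType.equals(column.getValue())) {
                changes.add("type changed: " + column.getKey() + " "
                        + column.getValue() + " -> " + currentType);
            }
        }
        // Columns that are new in the current run.
        for (String name : current.keySet()) {
            if (!previous.containsKey(name)) {
                changes.add("added: " + name);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> yesterday = new LinkedHashMap<>();
        yesterday.put("id", "bigint");
        yesterday.put("price", "double");
        Map<String, String> today = new LinkedHashMap<>();
        today.put("id", "string");
        today.put("currency", "string");
        // A non-empty result is what would trigger an alert.
        diff(yesterday, today).forEach(System.out::println);
    }
}
```

An empty list means the schema is unchanged; anything else can be forwarded to whatever alerting mechanism is in place.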
However, extracting data from a plan can be complicated because you’re forced to use a low-level Spark API.
Also, all the operational burden of implementing metrics storage and alerting mechanisms is still present. What you get from Spark is just metadata. It’s the developer’s responsibility to utilise it.
Here is an example of a simple Query Execution Listener which prints the plan and metrics:
    import org.apache.spark.sql.execution.QueryExecution;
    import org.apache.spark.sql.execution.metric.SQLMetric;
    import org.apache.spark.sql.util.QueryExecutionListener;
    import scala.Tuple2;
    import scala.collection.Iterator;

    public class ExampleQueryExecutionListener implements QueryExecutionListener {

        /**
         * Print plan and query metrics
         *
         * @param funcName
         * @param qe
         * @param durationNs
         */
        @Override
        public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
            System.out.println(qe.executedPlan().prettyJson());
            Iterator<Tuple2<String, SQLMetric>> it = qe.executedPlan().metrics().iterator();
            while (it.hasNext()) {
                Tuple2<String, SQLMetric> next = it.next();
                System.out.printf("Key: %s, value: %s%n", next._1(), next._2().value());
            }
        }

        @Override
        public void onFailure(String funcName, QueryExecution qe, Exception exception) {
        }
    }

Query execution listeners can be added either programmatically or via configuration:
In application code:
    SparkSession spark = SparkSession.builder().getOrCreate();
    spark.listenerManager().register(new ExampleQueryExecutionListener());

Via spark-submit:
    spark-submit --conf "spark.sql.queryExecutionListeners=ai.databand.ExampleQueryExecutionListener"

Implementing low-level monitoring can be some serious heavy lifting. However, the “system” way of monitoring has a huge benefit: it doesn’t introduce computational overhead. Since the metadata is emitted and recorded by Spark internals, it doesn’t add any penalty to query execution times.
Using Listeners for monitoring allows you to avoid touching any application code. This can have huge benefits when you want to track data on existing and legacy applications but don’t have the budget to make changes. Just write a listener, pass it via spark configuration and get a picture of your data.
You can greatly increase your confidence in incoming data by validating it manually. Let’s say we expect some number of records in the input data source, and this number usually shouldn’t be lower than X. We can write something very simple like:
    // X is the expected minimum number of records
    // load() reads with the default data source format (Parquet unless configured otherwise)
    val df = spark.read.load("path")
    if (df.count() < X) {
      throw new RuntimeException("Input data is missing")
    }
The possibilities here are unlimited. We can compare counts, non-null values count, inferred schemas, etc.
Since many quality checks are more or less trivial, like ensuring your dataframe has the proper shape and contents, the community has developed convenient libraries for such checks. One of those libraries is Deequ. It provides a rich domain-specific language (DSL) that covers most cases. Check it out. It also has advanced features, like the ability to profile columns (calculate min/max/mean/percentiles and histograms), detect anomalies and much more.
Consider the following example from the Deequ docs:
    val verificationResult = VerificationSuite()
      .onData(data)
      .addCheck(
        Check(CheckLevel.Error, "unit testing my data")
          .hasSize(_ == 5) // we expect 5 rows
          .isComplete("id") // should never be NULL
          .isUnique("id") // should not contain duplicates
          .isComplete("productName") // should never be NULL
          // should only contain the values "high" and "low"
          .isContainedIn("priority", Array("high", "low"))
          .isNonNegative("numViews") // should not contain negative values
          // at least half of the descriptions should contain a url
          .containsURL("description", _ >= 0.5)
          // half of the items should have less than 10 views
          .hasApproxQuantile("numViews", 0.5, _ <= 10))
      .run()
You can see we have a huge set of checks wrapped in a nice and ready-to-use DSL.
More importantly, Deequ provides the ability to store check results and automatically run comparisons with previous runs. This can be done by utilizing Metrics Repositories. You can write your own implementation and seamlessly integrate Deequ into your existing monitoring infrastructure.
While high-level application quality checks are far more flexible than low-level approaches, they come with a big downside: performance penalties. Since every calculation triggers a Spark operation, the overhead can be very significant in some cases, especially on large data sets. Each “count” and “where” can lead to a full scan. Spark will do its best internally to optimize execution plans, but you should consider these implications and ensure data profiling won’t harm your performance.
We have reviewed several ways of monitoring data quality for Spark applications. The low-level approach uses the Spark Event Listeners API and gives access to low-level metrics like records read/written and logical/physical plans. It can be useful for building trends, making sure a data pipeline produces proper results, and getting an overview of existing applications without any code modifications. High-level approaches, like checking data by hand or using data quality libraries, are much more convenient but have drawbacks like performance penalties.
As in any real-world situation, there are always trade-offs, and each approach has scenarios where it fits better, depending on your application type. Use them wisely.
At IBM® Databand®, we utilize both ways to provide a comprehensive set of options to track Spark applications. While at our core we use Spark Listeners to build metric trends and data lineage, we also provide a convenient Metrics Store for Deequ as well as the ability to track individual manually calculated metrics.
Learn more about Databand’s continuous data observability platform and how it helps detect data incidents earlier, resolve them faster and deliver more trustworthy data to the business. If you’re ready to take a deeper look, book a demo today.