Structured streaming from Event Hub using Databricks

Real-time processing and live dashboarding have gained enormous traction in modern data engineering as a way to enable data-driven decisions. With the growing need to solve critical business problems, the appeal of streaming frameworks has only increased.

With features such as no data loss, fault tolerance, checkpointing, and real-time processing, the streaming frameworks of the current data engineering era have left no stone unturned.

What is structured streaming?

Structured Streaming is a real-time data processing approach built on top of the Spark SQL engine, providing numerous benefits such as fault tolerance, optimized computations, and built-in book-keeping and logging.

In this blog, we will discuss Spark Structured Streaming on Databricks to stream data through an Event Hub instance.

Here is the list of prerequisites to proceed:

    • An active Azure subscription.
    • An Azure Event Hubs namespace with an Event Hub instance provisioned.
    • An Azure Databricks workspace with a Spark cluster.

Steps to stream data through the Event Hub instance

1. Installation of required libraries on Databricks cluster:

We assume that a cluster has already been provisioned in your Databricks workspace. If not, you can follow the guidelines in the “Create Spark Cluster in Databricks” document to create one.

In this blog, we use the Azure Event Hubs Connector for Apache Spark to read data from Azure Event Hubs. To use these APIs as part of our Databricks cluster, we need to add them as libraries to our Spark cluster in Azure Databricks.

The below steps demonstrate how to add a library to an existing Spark cluster:

a. In your Azure Databricks workspace, select “Compute”, and navigate to the Spark cluster on which the libraries are to be installed.

Azure Databricks workspace

From the cluster menu, select “Libraries” and click “Install New”.

Azure Databricks cluster

b. On the Install library page, for Library Source, select “Maven”. Then enter the following coordinates for the Spark Event Hubs connector into the “Coordinates” field: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22

Make sure you install the connector library version that matches your Spark and Scala versions (see the Databricks Runtime version on the cluster configuration page, as shown in the image above); otherwise it may cause issues during cluster initialization.

Azure Databricks library source

And click Install.

2. Getting connection string from Event Hub instance

A connection string is essential for connecting our Databricks workspace to the provisioned Event Hub using the Spark streaming framework.

To get the connection string from the Azure portal, we first need to navigate to the “Event Hub Namespace”, find the Entities section inside it, and click on “Event Hubs” as shown below:

Azure Databricks Events Hub

Once you have navigated to the Event Hub instance in the Azure portal, scroll to the left-hand options and, under the Settings section, select “Shared access policies”. Here you can create new policies and grant permissions as per your requirements by clicking the “+ Add” button.

Test event hub namespace

In our case, we only require the “Listen” permission, since we will be streaming data from the Event Hub and writing it to an ADLS container storage location. You could, however, use the “Manage” permission if two-way communication must be established.

You can see the “Connection string-primary key” highlighted above; copy it and keep it safe, as it will be needed later to establish connectivity.

Recommendation: Store the Event Hub connection string as a secret in Azure Key Vault

As a best practice, we recommend storing the Event Hub instance connection string as a secret in Azure Key Vault so that keys and secrets can be managed or accessed more securely over a network.

To implement this, you will need to provision a Key Vault instance on Azure and configure a Key Vault-backed secret scope for the Databricks environment.

We recommend following the guidelines in the blog Create secret scope in Azure Databricks to configure a Key Vault-backed secret scope in Databricks.

You can then securely access the connection string using the scope in Databricks; alternatively, for the time being, you can hardcode the connection string value (*not recommended) and use it as shown below:

Continuous streaming with processing time
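For reference, here is a minimal sketch of retrieving the connection string from a secret scope and passing it to the connector in a Python notebook. The scope and key names below are illustrative placeholders, not values from the screenshots; substitute your own.

    # Retrieve the Event Hub connection string from a Key Vault-backed secret scope.
    # "test-secret-scope" and "test-eventhub-connection-string" are placeholder names.
    connection_string = dbutils.secrets.get(
        scope="test-secret-scope",
        key="test-eventhub-connection-string"
    )

    # The Event Hubs connector expects the connection string to be encrypted
    # when used from PySpark.
    eh_conf = {
        "eventhubs.connectionString":
            spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
    }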

3. Using Databricks to stream data through the Event Hub instance

Now that we have installed the required library on the Databricks cluster and captured the connection string from the Event Hub instance, we will create a Python notebook that leverages the Spark Structured Streaming framework to capture messages from the Event Hub in two different ways:

a. Continuous streaming using the processing time trigger

b. Streaming using the run once trigger (recommended for cost-cutting)

a. Continuous Streaming using the processing time trigger approach

This approach suits scenarios where even a minute of latency is not acceptable, such as live dashboarding for stock market statistics, or logistics management to track delivery timings and current locations and manage outstanding orders. Here, the cluster will be up and running 24/7, which significantly impacts infrastructure cost.

In this approach, although the query fires at intervals based on the processing time trigger, the cluster is still required to be up and running continuously, 24/7.

One of the key use cases for the processing time trigger in streaming is to avoid out-of-memory issues when the incoming data is huge, since the data is divided into micro-batches based on the processing time declared in the query.

By default, the processing time is set to 500 milliseconds. Choose a processing time interval large enough to avoid out-of-memory issues, but not so short that the query constantly checks for data availability at the source and ends up processing undersized micro-batches.

Below is the snippet used to read a stream from the Event Hub, with the connection string encrypted at line 5 of the snippet; the read format specified for the stream here is “eventhubs”.

Continuous streaming with processing time using Databricks
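As a sketch of the read side, assuming the eh_conf dictionary built from the connection string earlier, the stream can be read as follows; the Event Hub payload arrives as binary, so the body column is typically cast to a string:

    # Read a stream from the Event Hub using the "eventhubs" format.
    df = (
        spark.readStream
             .format("eventhubs")
             .options(**eh_conf)
             .load()
    )

    # The Event Hub payload ("body") arrives as binary; cast it to string for downstream use.
    messages = df.withColumn("body", df["body"].cast("string"))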

The format for writeStream here is specified as “parquet”, i.e., we are writing a parquet file to our destination folder.

We have also specified our destination folder using the “path” option, i.e., in our case, a new parquet file would be written to location: /mnt/raw/data/source=test-eventhub/dataset=test-eventhub-stream/ at ADLS.

Specify destination folder path Databricks

The “checkpointLocation” option plays an essential role in our query: it stores metadata such as the run ID and enables one of Structured Streaming’s best features, fault tolerance, which allows the query to recover from its last left-off state. It is therefore always recommended to enable checkpointing. Remember that no two queries should share the same checkpoint location, and changing the checkpoint location of an existing query is also not recommended.

Changing check point location for query

With the trigger applied here as processingTime="30 minutes", the query fires every 30 minutes and fetches only the new input stream data since the last left-off query state, i.e., the data that arrived in the last 30 minutes. You can customize the processing time as per your requirement, in minutes or even in seconds.
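Putting the write-side options together, a minimal sketch of the query might look like the following; the checkpoint path here is an assumed example rather than a value taken from the screenshots:

    # Write the stream as parquet files, with checkpointing and a 30-minute processing time trigger.
    query = (
        messages.writeStream
                .format("parquet")
                .option("path", "/mnt/raw/data/source=test-eventhub/dataset=test-eventhub-stream/")
                .option("checkpointLocation", "/mnt/raw/checkpoints/test-eventhub-stream/")  # assumed path
                .trigger(processingTime="30 minutes")
                .start()
    )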

Even though the query fires only every 30 minutes, you will still find the notebook cell running continuously. The cluster would therefore be up 24/7, which can lead to unnecessarily high cluster costs from an idle cluster and wasted resources if no reasonable amount of data is processed at each interval.

You can see the last updated time highlighted below:

Continuous streaming with processing time - data processing

If your requirement matches this approach and minimal latency is essential, one way to productionize this activity is to set up a Databricks job. Since the notebook will be executing 24/7, apply an immediate and unlimited retry policy on the notebook. Make sure you set the maximum concurrent runs to 1, and configure alerts to an email address if required, as shown below:

Test continuous streaming with processing time

b. Streaming using the run once trigger approach

Structured streaming generally refers to streaming live data 24/7, or on a real-time basis. Still, there are scenarios where data arrives only at a fixed interval or within a specific timeframe during the day, such as calculating stock age for a chain of retail stores or warehouses once a day. In such cases, we can use the triggered approach, which is recommended to keep the Databricks cluster from staying up and running 24/7 and driving up infrastructure costs.

Snippet for streaming with trigger set as once:

Streaming using the run once trigger approach
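A minimal sketch of the same write query with the run once trigger is shown below (same assumed paths as earlier); the cell completes once the backlog has been processed:

    # Same write query, but triggered once: process all new data since the last run, then stop.
    query = (
        messages.writeStream
                .format("parquet")
                .option("path", "/mnt/raw/data/source=test-eventhub/dataset=test-eventhub-stream/")
                .option("checkpointLocation", "/mnt/raw/checkpoints/test-eventhub-stream/")  # assumed path
                .trigger(once=True)
                .start()
    )

    query.awaitTermination()  # returns once all available data has been processed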

If a latency of a couple of hours is acceptable for your solution, you won’t need to worry about cluster costs, as the cluster does not have to stay up and active 24/7. The trigger once=True will do the job for us: since book-keeping is handled by Structured Streaming, it figures out the newly ingested data on its own.

Triggering the stream once daily, or at specific intervals, can be automated in two ways:

    • Using Databricks Jobs to trigger the notebook.
    • Using Azure Data Factory to trigger the notebook.

You could create a Databricks job as discussed above and simply add a schedule to it as per your requirement, as shown below:

Streaming using the run once trigger approach - Schedule

In the above snapshot, we have set a scheduled trigger that fires the notebook run daily at 14:21 UTC.

Another way is to use Azure Data Factory to trigger the notebook. Here, you can leverage the Databricks Notebook activity in ADF to execute the notebook, as demonstrated below:

Databricks notebook activity from ADF

You can also apply triggers to the pipeline as per your requirement, as shown below:

Streaming using the run once trigger approach - edit trigger

In the above snapshot from ADF triggers, we have set a scheduled trigger that fires the pipeline every day at 14:21 IST.

Conclusion

Streaming using Databricks and Azure services can help solve numerous problems around analyzing real-time data, which can be used for live dashboarding and help businesses make proactive, data-driven decisions. The approach fits both scenarios where even minimal latency is not acceptable, such as live data visualization, and scenarios where periodic processing suffices, such as calculating stock age.

In one of our projects for a large conglomerate, we used the “streaming using the run once trigger” approach for the retail industry to calculate the stock age for multiple products across their chain of stores and warehouses and maintain a dashboard to keep track of stock aging KPI.

We at Neal Analytics can guide your data modernization journey by adopting streaming approaches that can lead to higher visibility, proactive business decisions, and minimal downtime. Contact us to learn more!

 

References:

  1. Structured Streaming Programming Guide – Spark 3.3.0 Documentation (apache.org)
  2. Create Your Azure Free Account Today | Microsoft Azure
  3. Azure Quickstart – Create an event hub using the Azure portal – Azure Event Hubs | Microsoft Docs
  4. Quickstart – Run a Spark job on Azure Databricks Workspace using Azure portal | Microsoft Docs
  5. GitHub – Azure/azure-event-hubs-spark: Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
  6. Create Secret Scope in Azure Databricks (bigdataprogrammers.com)
  7. Running Streaming Jobs Once a Day For 10x Cost Savings – The Databricks Blog
  8. Recover from Structured Streaming query failures | Databricks on AWS
  9. Configure Structured Streaming trigger intervals on Databricks | Databricks on AWS