Managing incremental loads through ADF V2 using the Lookup activity

Introduction

Among the many tools available on Microsoft’s Azure platform, Azure Data Factory (ADF) stands out as an effective data management tool for extract, transform, and load (ETL) processes. This continues to hold true with Microsoft’s most recent version, version 2, which expands ADF’s versatility with a wider range of activities. These activities provide new functionality, including conditional statements, loops, lookups, and more. This new functionality opens the door for simpler pipelines and fewer workarounds compared with traditional ETL approaches to data management. In particular, the new Lookup Activity provides a more elegant way to develop incremental loads when managing exceptionally large transactional datasets. My goal today is to walk through an example pipeline that demonstrates this functionality for users new to ADF version 2 (ADFv2). As a note, I will not be explaining ADFv2 datasets or linked services, but rather focusing on the activities used when developing an incremental load workflow.

Approach to managing incremental loads

When developing ETL pipelines for transactional datasets, I tend to think simple is best. Therefore, I usually start with a three-step process and adjust from there. In this case, we will stick to the basics and discuss those three steps, but first let us discuss the architecture we are dealing with. Again, to keep things simple, our architecture will use an On-Premises SQL Server (On-Prem) as our source and an Azure Synapse Analytics instance (formerly Azure SQL Data Warehouse, referred to here as ADW) as the destination. Within the On-Prem database, our table will be called “source_table”. In ADW we will have two tables: a staging table to hold our data deltas (the new or updated records we are loading) called “staging_table”, and a destination for our data, aptly named “destination_table”.

Now that the architecture is defined, let’s take a quick look at the data. We have two tables, source_table and destination_table. The source table contains all transactions to date, including updated records. The destination table is a week behind, and we need our pipeline to update destination_table as efficiently as possible. The most efficient approach is to extract only the new or updated transactions (the delta transactions) from source_table rather than the entire dataset. We know a transaction is new or updated based upon its Modified_Date column (referenced as date_modified in the queries below), which represents when the transaction was created and/or last updated. We do not use Transaction_Date, as that only captures when the transaction was created, not whether it has been updated.
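
For context, here is a minimal sketch of the table shape this example assumes. Only Transaction_ID, Transaction_Date, and date_modified are actually referenced in the article; any other columns (such as Amount) are purely illustrative.

-- Illustrative only: the article relies on just Transaction_ID, Transaction_Date,
-- and date_modified; any other columns (e.g. Amount) are assumed for the example.
CREATE TABLE dbo.source_table (
    Transaction_ID   INT           NOT NULL,
    Transaction_Date DATETIME2     NOT NULL,  -- when the transaction was created
    Amount           DECIMAL(18,2) NULL,      -- hypothetical payload column
    date_modified    DATETIME2     NOT NULL   -- when the row was created or last updated
);

-- staging_table and destination_table (in ADW) are assumed to mirror this shape
-- so that the SELECT * copy and INSERT ... SELECT * upsert line up column-for-column.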

Alright, we now have context around the architecture and the dataset. Now let us discuss the three-step ETL process. The steps are as follows:

  • Identify the latest Modified_Date in the destination_table
  • Extract the delta transactions from the source_table
  • Upsert the new/updated transactions into the destination_table

Each of these steps has a corresponding ADFv2 activity within the pipeline: the Lookup Activity, the Copy Activity, and the SPROC Activity, respectively. Let us explore each of these activities below.

Lookup Activity

One of the new activities available in ADFv2 is the Lookup Activity. This control flow activity gives the user the ability to extract data points for reference in later activities within a pipeline. In this example, we will use it to extract the latest Modified_Date from our destination_table. The activity itself includes three major components: type, dataset, and source. The type field is simply ‘Lookup’, which informs ADFv2 that we are extracting data for reference rather than loading it into a new location. The dataset field references an ADFv2 dataset that defines the destination_table in our ADW instance. Lastly, the source contains a sqlReaderQuery, which lets us run a SQL select statement to grab the latest date_modified value. In this case, the value should be 12/10/2017, meaning our destination_table contains no transactions modified after 12/10/2017.

LOOKUP MAX DATE

{
    "name": "Lookup Max Date",
    "type": "Lookup",
    "typeProperties": {
        "dataset": {
            "referenceName": "destination_table",
            "type": "DatasetReference"
        },
        "source": {
            "type": "SqlDWSource",
            "sqlReaderQuery": "select max(date_modified) as MaxDate from dbo.destination_table"
        }
    }
}
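
When this activity runs with the default firstRowOnly behavior, its output exposes the query result under firstRow, which is how the next activity will reference it. A sketch of roughly what that output looks like (the exact timestamp format is illustrative):

{
    "firstRow": {
        "MaxDate": "2017-12-10T00:00:00Z"
    }
}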

Copy Activity

Once we have identified the latest Modified_Date from our destination_table, we can use this information in our Copy Activity. The Copy Activity consists of a series of components, the important ones being sqlReaderQuery, dependsOn, inputs, and outputs. The inputs and outputs components are ADFv2 datasets that reference the source_table and staging_table, respectively. The dependsOn component is important for process flow and references the Lookup Activity. This reference ensures that the Copy Activity does not start until the Lookup Activity completes successfully.

Within the Copy Activity, the goal is to extract the delta transactions from our On-Prem source_table. We accomplish this through the SQL select statement in the sqlReaderQuery component. The query includes a WHERE clause that references the output of the ‘Lookup Max Date’ activity, grabbing the MaxDate value from its first row. With this query, we capture only the delta transactions from source_table. Note that Transaction_ID 100002 is included in this set because it was updated on 12/12/2017, after our Modified_Date cutoff.

LOAD INCREMENTAL DATA

{
    "name": "Load Incremental Data",
    "type": "Copy",
    "typeProperties": {
        "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "select * from dbo.source_table where [date_modified] > '@{activity('Lookup Max Date').output.firstRow.MaxDate}'"
        },
        "sink": {
            "type": "SqlDWSink",
            "allowPolyBase": true,
            "polyBaseSettings": {
                "rejectType": "percentage",
                "rejectValue": 10.0,
                "rejectSampleValue": 100,
                "useTypeDefault": true
            }
        }
    },
    "dependsOn": [
        {
            "activity": "Lookup Max Date",
            "dependencyConditions": [ "Succeeded" ]
        }
    ],
    "inputs": [
        {
            "referenceName": "source_table",
            "type": "DatasetReference"
        }
    ],
    "outputs": [
        {
            "referenceName": "staging_table",
            "type": "DatasetReference"
        }
    ]
}
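
At runtime, the @{...} expression is evaluated before the query is sent to the source, so the statement that actually runs against the On-Prem server would look roughly like the following (the exact timestamp format depends on how the Lookup returns the value):

select *
from dbo.source_table
where [date_modified] > '2017-12-10T00:00:00Z'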

SPROC Activity

With the delta transactions loaded into our staging_table, only one step remains in the ETL process: the upsert step that loads the new/updated transactions into our destination_table. This process is managed by a stored procedure (SPROC) within ADW. The SPROC has three steps, which are shown in the query below. First, we delete from the destination_table any transactions that exist in the staging_table. We then insert the delta transactions from the staging_table into the destination_table. Lastly, we wipe the staging_table clean. These three steps ensure we have no duplicate transactions in the destination_table, because updated transactions are deleted and then reinserted from the staging_table. The SPROC is called through a SqlServerStoredProcedure activity. There are no datasets in this activity; instead, we reference the ADW instance through a linked service and call the specific stored procedure under typeProperties. Lastly, we configure the activity to depend on the Copy Activity so it will not run until that activity completes successfully.

SPROC MANAGE UPSERTS

{
    "name": "SPROC Manage Upserts",
    "description": "SPROC To Delete Duplicates and Manage Upserts",
    "type": "SqlServerStoredProcedure",
    "linkedServiceName": {
        "referenceName": "destination_sqldw",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "storedProcedureName": "sp_upsert_staging"
    },
    "dependsOn": [
        {
            "activity": "Load Incremental Data",
            "dependencyConditions": [ "Succeeded" ]
        }
    ]
}

SP_UPSERT_STAGING

DELETE FROM destination_table
WHERE Transaction_ID in (SELECT DISTINCT Transaction_ID FROM staging_table)

INSERT INTO destination_table
SELECT * FROM staging_table

DELETE FROM staging_table
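
For completeness, here is a minimal sketch of how these three statements might be wrapped into the sp_upsert_staging procedure referenced in the activity above; the procedure name comes from the activity JSON, while the exact definition shown here is an assumption.

CREATE PROCEDURE dbo.sp_upsert_staging
AS
BEGIN
    -- Remove transactions that are about to be replaced by newer versions
    DELETE FROM destination_table
    WHERE Transaction_ID IN (SELECT DISTINCT Transaction_ID FROM staging_table);

    -- Insert the delta transactions (new and updated rows)
    INSERT INTO destination_table
    SELECT * FROM staging_table;

    -- Clear the staging table for the next run
    DELETE FROM staging_table;
END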

Conclusion

As you can see, ADFv2’s Lookup Activity is an excellent addition to the toolbox and allows for a simple and elegant way to manage incremental loads into Azure. While ADFv2 was still in preview at the time of this example, version 2 is already miles ahead of the original. Not only do the new activities provide a broad set of much-needed features, but they mark an important milestone in the tool’s history, one where Azure Data Factory becomes a more realistic replacement for some of Microsoft’s more traditional ETL tools such as SSIS. Furthermore, this was just one example of the new activities, with many others available. For those who are interested, you can find Microsoft’s documentation for ADFv2 at https://docs.microsoft.com/en-us/azure/data-factory/.

Completed pipeline

{
    "name": "Incremental_Pipeline",
    "properties": {
        "activities": [
            {
                "name": "Lookup Max Date",
                "type": "Lookup",
                "typeProperties": {
                    "dataset": {
                        "referenceName": "destination_table",
                        "type": "DatasetReference"
                    },
                    "source": {
                        "type": "SqlDWSource",
                        "sqlReaderQuery": "select max(date_modified) as MaxDate from dbo.destination_table"
                    }
                }
            },
            {
                "name": "Load Incremental Data",
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from dbo.source_table where [date_modified] > '@{activity('Lookup Max Date').output.firstRow.MaxDate}'"
                    },
                    "sink": {
                        "type": "SqlDWSink",
                        "allowPolyBase": true,
                        "polyBaseSettings": {
                            "rejectType": "percentage",
                            "rejectValue": 10.0,
                            "rejectSampleValue": 100,
                            "useTypeDefault": true
                        }
                    }
                },
                "dependsOn": [
                    {
                        "activity": "Lookup Max Date",
                        "dependencyConditions": [ "Succeeded" ]
                    }
                ],
                "inputs": [
                    {
                        "referenceName": "source_table",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "staging_table",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "SPROC Manage Upserts",
                "description": "SPROC To Delete Duplicates and Manage Upserts",
                "type": "SqlServerStoredProcedure",
                "linkedServiceName": {
                    "referenceName": "destination_sqldw",
                    "type": "LinkedServiceReference"
                },
                "typeProperties": {
                    "storedProcedureName": "sp_upsert_staging"
                },
                "dependsOn": [
                    {
                        "activity": "Load Incremental Data",
                        "dependencyConditions": [ "Succeeded" ]
                    }
                ]
            }
        ]
    }
}