How to use Python for data engineering in ADF

Consider a scenario where you need to migrate your existing data engineering workload to Azure. Let’s say that while creating your new Azure Data Factory (ADF) pipeline, you want to reuse some of your existing business logic written in Python, logic that is already tested and approved. How can you reuse these Python data engineering routines and call them from within an ADF pipeline? Or, in another scenario, let’s say you have resources proficient in Python, and you want to write some data engineering logic in Python and use it in an ADF pipeline.

Problem statement

To understand the problem statement in detail, let’s take a simple scenario: we have an employee file on Azure Storage containing two columns, Employee Name and Date of Joining. The business wants to know how many employees joined in each year. This aggregated information should be made available as a new file in your Data Lake store. Based on your team’s current competency, you want to write a small Python routine that reads the employee file from Azure Blob storage, performs the required aggregation, and writes the result back to your Data Lake area. Once this business logic is tested and approved, you want to use this piece of code in an ADF pipeline.
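For illustration only, a hypothetical employee.csv and its aggregated output could look like the sample below. The column names (employee_name, joining_date) are assumptions chosen to match the code used later in this article.

Input (employee.csv):

employee_name,joining_date
John,2018-02-15
Priya,2018-07-01
Wei,2019-03-20

Aggregated output (employeecountbyyear.csv):

joining_year,no_of_emp
2018,2
2019,1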

Solution approach

We are designing a solution using the following steps:

  • Make an Employee file available on Azure Blob
  • Create an Azure Function using Python which will do the required job
  • Call this Azure Function in ADF pipeline

Upload file to Azure Blob

Let’s create a similar file and upload it manually to the Azure Blob location. We’re using an example employee.csv.

example employee csv screenshot

file uploaded to azure blob screenshot

If you need help uploading a file to an Azure Blob location, you can use options such as the Azure Portal, Storage Explorer, or AzCopy. A scripted alternative is sketched below.
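If you prefer to script the upload in Python, here is a minimal sketch using the azure-storage-blob (v12) SDK. The connection string, container name, and file name are placeholders; replace them with your own values.

from azure.storage.blob import BlobServiceClient

# Placeholder values - replace with your own storage account details
CONNECTION_STRING = "{Connection string to your storage account}"

blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
container_client = blob_service_client.get_container_client("input")

# Upload the local employee.csv file to the "input" container
with open("employee.csv", "rb") as data:
    container_client.upload_blob(name="employee.csv", data=data, overwrite=True)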

Create an Azure function using Python

Azure Functions lets you write a small piece of code that runs in a serverless manner. You don’t need to worry about application infrastructure because the Azure cloud provides all the hardware required to run your code at scale. Azure Functions supports multiple languages such as C#, Java, JavaScript, and PowerShell, as well as a Python runtime. Because of its simplicity and power, Azure Functions becomes a critical solution component in IoT scenarios, bulk data processing at scale, and microservices environments. You can refer to the Microsoft documentation for more information on Azure Functions.

Let’s create our Azure function using VSCode and deploy it to Azure. You can refer to the article for creating your Azure Function using Python. During the creation of an Azure function, you might have noticed that we can create different types of functions, such as trigger-based or HTTP request-based. We will create an HTTP request-triggered function so that later we can use its endpoint in ADF. Once you are done with the configuration, start making changes to your Azure function. Let’s open __init__.py and start putting in our logic.

screenshot example Azure function adding logic to __init__.py

 

Let’s add the dependencies we need for our code. Since we are dealing with Azure Blob storage, we need the related libraries to work with it. We will also need Pandas for our data wrangling.

example code


import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient  # pip install azure-storage-blob
import pandas as pd  # pip install pandas
import datetime

You may want to install these libraries in your local environment; you can do this by running the commented lines (pip install) in your Python command prompt.

Please open requirements.txt in your solution and add the required dependencies there as well.

requirements txt to add dependency screenshot

This ensures the required libraries are set up when we deploy this function; the respective libraries will be added to the Python runtime.
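For reference, a minimal requirements.txt for this function might look like the following (versions are left unpinned here for brevity; you may want to pin specific versions):

azure-functions
azure-storage-blob
pandas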

Now, within the main function, let’s create two variables which will hold our SAS keys for Azure Blob storage.

example code


def main(req: func.HttpRequest) -> func.HttpResponse: 
    logging.info('Python HTTP trigger function processed a request.') 
    INPUTSTORAGEACCOUNTURLSAS= "{SAS Key to Employee file}" 
    STORAGEACCOUNTSAS="{SAS Key to Output Location}" 

To access Azure Blob storage, you will need a key that allows you to read/write in a specific location. There are multiple ways to authenticate yourself: you can use storage account keys, you can use SAS keys, which are valid for a specific period with specific permissions, or you can use Azure Key Vault with a client ID and secret. To keep it simple, we will use SAS keys, but Microsoft recommends that you use Azure Key Vault for greater security.
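If you would rather generate the SAS for the input blob programmatically, a sketch like the following could be used. The account name, account key, container, and blob names are placeholder assumptions; generate_blob_sas and BlobSasPermissions are part of the azure-storage-blob v12 SDK.

import datetime
from azure.storage.blob import generate_blob_sas, BlobSasPermissions

# Placeholder storage account details - replace with your own
ACCOUNT_NAME = "mystorageaccount"
ACCOUNT_KEY = "{storage account key}"

# Create a read-only SAS token valid for one hour
sas_token = generate_blob_sas(
    account_name=ACCOUNT_NAME,
    container_name="input",
    blob_name="employee.csv",
    account_key=ACCOUNT_KEY,
    permission=BlobSasPermissions(read=True),
    expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1),
)

# Full SAS URL that pandas can read directly
INPUTSTORAGEACCOUNTURLSAS = (
    f"https://{ACCOUNT_NAME}.blob.core.windows.net/input/employee.csv?{sas_token}"
)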

Once you have set up the keys, let’s start reading our employee file and create a DataFrame.

example code

 
#Read CSV directly from Blob location
df = pd.read_csv(INPUTSTORAGEACCOUNTURLSAS,delimiter = ',')

There are multiple options for reading files from Azure Storage with Pandas. Please note that in this example we are using the Python SDK v12.
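As an alternative sketch (still assuming the v12 SDK), you could download the blob explicitly and hand the bytes to Pandas; INPUTSTORAGEACCOUNTURLSAS is the same placeholder SAS URL as above.

import io
import pandas as pd
from azure.storage.blob import BlobClient

# Alternative: download the blob contents first, then parse them with pandas
blob_client = BlobClient.from_blob_url(INPUTSTORAGEACCOUNTURLSAS)
csv_bytes = blob_client.download_blob().readall()
df = pd.read_csv(io.BytesIO(csv_bytes), delimiter=',')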

Now, we can start manipulating data on a given DataFrame.

code example


#Do the data manipulation
df['no_of_emp']= 1
df['joining_year']= pd.DatetimeIndex(df['joining_date']).year
df_result = df.groupby('joining_year').count()[['no_of_emp']]
output = df_result.to_csv(encoding = "utf-8")

Here, we add a couple of columns to the existing data set. We extract the year component from the joining date column, which lets us run a group-by aggregation on the year and get the count of employees who joined in each specific year. Then, we convert our DataFrame back to CSV.

Once we have the desired output data, it is time to write it back into our Data Lake area, which in our case is another Blob storage account.

code example


#Connect to Output location of Azure Blob
blob_service_client = BlobServiceClient.from_connection_string(STORAGEACCOUNTSAS)
# Instantiate a new ContainerClient
container_client = blob_service_client.get_container_client("output")
# Instantiate a new BlobClient
blob_client = container_client.get_blob_client("employeecountbyyear.csv")
if blob_client.exists():
    blob_client.delete_blob()

container_client.upload_blob(name="employeecountbyyear.csv", data=output)

Here, we connect to our output location and create a reference object for the blob container "output". With the help of the container object, we check whether a specific blob is already present. If it is, we delete the existing blob and then write a blob with the same name containing the latest data.
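As a side note, the delete-then-upload step could also be collapsed into a single call, since upload_blob in the v12 SDK accepts an overwrite flag. This is just an alternative sketch, not what the walkthrough’s code uses:

# Alternative: overwrite the existing blob in one call instead of delete + upload
container_client.upload_blob(name="employeecountbyyear.csv", data=output, overwrite=True)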

We have finished writing our core logic for the Azure function. Here is the complete code for the same:

code example


import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient  # pip install azure-storage-blob
import pandas as pd  # pip install pandas
import datetime

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    
    INPUTSTORAGEACCOUNTURLSAS= "{SAS Key to Employee file}"
    STORAGEACCOUNTSAS="{SAS Key to Output Location}"

    #Read CSV directly from Blob location
    df = pd.read_csv(INPUTSTORAGEACCOUNTURLSAS,delimiter = ',')
    
    #Do the data manipulation
    df['no_of_emp']= 1
    df['joining_year']= pd.DatetimeIndex(df['joining_date']).year
    df_result = df.groupby('joining_year').count()[['no_of_emp']]
    output = df_result.to_csv(encoding = "utf-8")

    #Connect to Output location of Azure Blob
    blob_service_client = BlobServiceClient.from_connection_string(STORAGEACCOUNTSAS)
    # Instantiate a new ContainerClient
    container_client = blob_service_client.get_container_client("output")
    # Instantiate a new BlobClient
    blob_client = container_client.get_blob_client("employeecountbyyear.csv")
    if blob_client.exists():
        blob_client.delete_blob()

    container_client.upload_blob(name="employeecountbyyear.csv", data=output)

    if blob_client.exists():
        return func.HttpResponse(body=f"{{\"msg\":\"Aggregated data created successfully at {blob_client.url} via function.\"}}",status_code=200)    
    else:
        return func.HttpResponse(body=f"{{\"msg\":\"Error while creating Blob at {blob_client.url} via function.\"}}",status_code=404) 

Test and deploy your function

Now it’s time to test and deploy your function to a specific environment. You can test this function locally in VSCode or in the Azure Portal as well. Let’s deploy our function and test it in the Azure Portal.

test and deploy function screenshot

Please select the Azure Function App you created earlier.

Once deployed, go to Azure Portal > All resources > {Your Function App} > Functions.

azure portal your functions screenshot

Open the respective function from the right pane and click on Code + Test. Test your function. It should create an aggregated file at the output Azure Blob location.

code and test function screenshot

You can add query parameters if required. Since we are not passing any parameters in our example, you can simply click Run. Once execution completes, you should see our message along with the file creation path.

http response message screenshot

Now we have the Azure function up and running. You can also test it using its endpoint URL, for example as sketched below.
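A quick test from Python might look like this. The function URL is a placeholder assumption; copy the actual URL (including any function key) from the portal.

import requests  # pip install requests

# Placeholder endpoint - replace with the real URL of your deployed function
FUNCTION_URL = "https://{your-function-app}.azurewebsites.net/api/{your-function-name}"

response = requests.get(FUNCTION_URL)
print(response.status_code)
print(response.text)  # should contain the JSON message with the output blob URL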

Call your Azure function in ADF pipeline

Let’s create a Data Factory. Create a new pipeline, add an Azure Function activity, and configure its settings to create a new Linked Service. Configure the name of the function you want to invoke and select the method to use. Please refer to this blog for detailed information. A sketch of what the activity definition might look like follows the screenshot below.

create a data factory and configure function screenshot example
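For reference, the JSON behind an Azure Function activity in the pipeline might look roughly like this. The activity, linked service, and function names here are hypothetical placeholders, not values taken from this walkthrough.

{
    "name": "CallEmployeeAggregationFunction",
    "type": "AzureFunctionActivity",
    "linkedServiceName": {
        "referenceName": "AzureFunctionLinkedService",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "functionName": "HttpTriggerEmployeeAggregation",
        "method": "GET"
    }
}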

 

Meanwhile, let’s clean up our output Azure Blob area so we can confirm that the ADF run generates the output at the specified location.

clean up output azure blob area screenshot

Let’s run our ADF pipeline and monitor the output location in Azure Blob storage. A new aggregated file should be created by our ADF run. You can also see an output message along with the output file path.

output message screenshot

output file path screenshot

When you open this file, you can see that the aggregated data is available. This is how we can use Python in ADF.

employee count by year success python in ADF screenshot
