
How to use Python for data engineering in ADF
Consider a scenario where you need to migrate your existing data engineering workload to Azure. Let's say that while creating your new Azure Data Factory (ADF) pipeline, you want to reuse some of your existing business logic written in Python that is already tested and approved. How can you reuse these data engineering routines written in Python and call them from an ADF pipeline? Or, in another scenario, let's say you have team members proficient in Python, and you want them to write some data engineering logic in Python and use it in an ADF pipeline.
Problem statement
To understand the problem in detail, let's take a simple scenario. Say we have an employee file on Azure Storage containing two columns, Employee Name and Date of Joining. The business wants to know how many employees joined in each year, and this aggregated information should be made available as a new file in your Data Lake store. Based on your current team competency, you want to write a small Python routine that reads the employee file from Azure Blob, does the required aggregation, and writes the result back to your Data Lake area. Once this business logic is tested and approved, you want to use this piece of code in an ADF pipeline.
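For illustration, the input and the expected aggregated output might look like this (the column names and rows are assumptions used throughout this walkthrough):

employee.csv (input):
employee_name,joining_date
Alice,2019-03-15
Bob,2019-08-01
Carol,2020-01-20

employeecountbyyear.csv (expected output):
joining_year,no_of_emp
2019,2
2020,1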
Solution approach
We will build the solution using the following steps:
- Make an Employee file available on Azure Blob
- Create an Azure Function using Python which will do the required job
- Call this Azure Function in ADF pipeline
Upload file to Azure Blob
Let's create a similar file and upload it manually to the Azure Blob location. We're using an example file, employee.csv.
If you need help uploading a file to an Azure Blob location, you can use any of several options, such as the Azure Portal, Storage Explorer, or AzCopy.
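If you prefer to script the upload, here is a minimal sketch using the azure-storage-blob SDK (v12); the connection string, container name, and local file path are assumptions for this example:

from azure.storage.blob import BlobServiceClient
# pip install azure-storage-blob

# Assumed values; replace with your own storage account details
conn_str = "{Connection string for your storage account}"
blob_service_client = BlobServiceClient.from_connection_string(conn_str)
blob_client = blob_service_client.get_blob_client(container="input", blob="employee.csv")

# Upload the local employee.csv, overwriting any existing blob with the same name
with open("employee.csv", "rb") as data:
    blob_client.upload_blob(data, overwrite=True)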
Create an Azure function using Python
Azure Functions lets you write a small piece of code that runs in a serverless manner. You don't need to worry about application infrastructure, because the Azure cloud provides the hardware required to run your code at scale. Azure Functions supports multiple languages, such as C#, Java, JavaScript, and PowerShell, as well as a Python runtime. Because of its simplicity and power, Azure Functions is often a critical solution component in IoT scenarios, bulk data processing at scale, and microservices environments. You can refer to the Microsoft documentation for more information on Azure Functions.
Let's create our Azure function using VS Code and deploy it to Azure. You can refer to the article on creating your Azure function using Python. While creating an Azure function, you might have noticed that we can choose from different trigger types, such as timer triggers or HTTP triggers. We will create an HTTP-triggered function so that later we can use its endpoint in ADF. Once all the configuration is in place, start making changes to your Azure function. Let's open __init__.py and start adding our logic.
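For reference, the scaffolded __init__.py for an HTTP-triggered Python function looks roughly like this (the exact content varies by template version); we will replace its body with our own logic below:

import logging
import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    name = req.params.get('name')
    if name:
        return func.HttpResponse(f"Hello, {name}.")
    return func.HttpResponse("Pass a name in the query string.", status_code=200)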
Let's add the dependencies we need for our code. Since we are dealing with Azure Blob, we need the related libraries to work with it. We will also need pandas for data wrangling.
import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient
#pip install azure-storage-blob
import pandas as pd
#pip install pandas
import datetime
You may want to install these packages in your local environment; you can do this by running the commented lines (pip install) at your Python command prompt.
Please open requirements.txt in your solution and add the required dependencies there as well.
This ensures that the required libraries are installed into the Python runtime when we deploy the function.
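As a rough sketch, requirements.txt might look like the following (these are the packages we import above; pin versions as appropriate for your environment):

azure-functions
azure-storage-blob
pandas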
Now, within the main function, let's create two variables that will hold the SAS details for our input and output Azure Blob locations.
def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    # SAS URL pointing to the employee file (input)
    INPUTSTORAGEACCOUNTURLSAS = "{SAS Key to Employee file}"
    # Connection details (with SAS) for the output storage account
    STORAGEACCOUNTSAS = "{SAS Key to Output Location}"
To access Azure Blob, you will need a key that allows you to read/write at a specific location. There are multiple ways to authenticate: you can use storage account keys, you can use SAS keys, which are valid for a specific period with specific permissions, or you can use Azure Key Vault to hold a client ID and secret. To keep it simple, we will use SAS keys, but Microsoft recommends that you use Azure Key Vault for greater security.
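If you do go the Key Vault route, a minimal sketch could look like the following, assuming the azure-identity and azure-keyvault-secrets packages and hypothetical vault and secret names:

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
# pip install azure-identity azure-keyvault-secrets

# Hypothetical vault and secret names; replace with your own
vault_url = "https://my-keyvault-name.vault.azure.net"
secret_client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())

INPUTSTORAGEACCOUNTURLSAS = secret_client.get_secret("employee-file-sas-url").value
STORAGEACCOUNTSAS = secret_client.get_secret("output-storage-connection-string").value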
Once you have set up the keys, let's read our employee file and create a DataFrame.
#Read CSV directly from Blob location
df = pd.read_csv(INPUTSTORAGEACCOUNTURLSAS, delimiter=',')
There are multiple options for reading files from Azure Storage with pandas. Note that in this example we are using the Azure Storage Python SDK v12.
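For example, instead of passing the SAS URL directly to pandas, you could download the blob through the SDK and read it from memory; this sketch assumes the same SAS URL variable as above:

import io
from azure.storage.blob import BlobClient

# Download the blob via the SDK and load it into pandas from memory
blob_client = BlobClient.from_blob_url(INPUTSTORAGEACCOUNTURLSAS)
csv_bytes = blob_client.download_blob().readall()
df = pd.read_csv(io.BytesIO(csv_bytes))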
Now, we can start manipulating data on a given DataFrame.
#Do the data manipulation
df['no_of_emp'] = 1
df['joining_year'] = pd.DatetimeIndex(df['joining_date']).year
df_result = df.groupby('joining_year').count()[['no_of_emp']]
output = df_result.to_csv(encoding="utf-8")
Here, we are adding a couple of columns to the existing data set. We extract the year from the joining_date column, which lets us run a group-by aggregation on joining_year to get the count of employees who joined in each year. Then we convert the resulting DataFrame back to CSV.
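To see the transformation in isolation, here is a small self-contained sketch using an in-memory sample (the rows are made up for illustration, matching the assumed employee.csv layout):

import pandas as pd

df = pd.DataFrame({
    'employee_name': ['Alice', 'Bob', 'Carol'],
    'joining_date': ['2019-03-15', '2019-08-01', '2020-01-20'],
})
df['no_of_emp'] = 1
df['joining_year'] = pd.DatetimeIndex(df['joining_date']).year
# Prints the count per year: 2019 -> 2, 2020 -> 1
print(df.groupby('joining_year').count()[['no_of_emp']])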
Once we have the desired output data, it's time to write it back to our Data Lake area, which in our case is another Blob storage account.
#Connect to Output location of Azure Blob
blob_service_client = BlobServiceClient.from_connection_string(STORAGEACCOUNTSAS)
# Instantiate a new ContainerClient
container_client = blob_service_client.get_container_client("output")
# Instantiate a new BlobClient
blob_client = container_client.get_blob_client("employeecountbyyear.csv")
if blob_client.exists():
    blob_client.delete_blob()
container_client.upload_blob(name="employeecountbyyear.csv", data=output)
Now we connect to the output location and create a reference object for the blob container "output". With the help of the container client, we check whether the specific blob is already present. If so, we delete the existing blob and write a blob with the same name containing the latest data.
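As a design note, the delete-then-upload pattern above could likely be replaced with a single call using the SDK's overwrite flag; a possible variation (same container and blob names as above):

# Overwrite the blob in place instead of deleting it first
container_client.upload_blob(name="employeecountbyyear.csv", data=output, overwrite=True)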
We have finished writing our core logic for the Azure function. Here is the complete code for the same:
import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient
#pip install azure-storage-blob
import pandas as pd
#pip install pandas
import datetime


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    INPUTSTORAGEACCOUNTURLSAS = "{SAS Key to Employee file}"
    STORAGEACCOUNTSAS = "{SAS Key to Output Location}"

    #Read CSV directly from Blob location
    df = pd.read_csv(INPUTSTORAGEACCOUNTURLSAS, delimiter=',')

    #Do the data manipulation
    df['no_of_emp'] = 1
    df['joining_year'] = pd.DatetimeIndex(df['joining_date']).year
    df_result = df.groupby('joining_year').count()[['no_of_emp']]
    output = df_result.to_csv(encoding="utf-8")

    #Connect to Output location of Azure Blob
    blob_service_client = BlobServiceClient.from_connection_string(STORAGEACCOUNTSAS)
    # Instantiate a new ContainerClient
    container_client = blob_service_client.get_container_client("output")
    # Instantiate a new BlobClient
    blob_client = container_client.get_blob_client("employeecountbyyear.csv")

    if blob_client.exists():
        blob_client.delete_blob()
    container_client.upload_blob(name="employeecountbyyear.csv", data=output)

    if blob_client.exists():
        return func.HttpResponse(body=f"{{\"msg\":\"Aggregated data created successfully at {blob_client.url} via function.\"}}", status_code=200)
    else:
        return func.HttpResponse(body=f"{{\"msg\":\"Error while creating Blob at {blob_client.url} via function.\"}}", status_code=404)
Test and deploy your function
Now it's time to test and deploy your function to a specific environment. You can test this function locally in VS Code or in the portal as well. Let's deploy our function and test it in the Azure Portal.
During deployment, select the Azure Function App you created earlier.
Once deployed, go to Azure Portal > All resources > {Your Function App} > Functions.
Open the respective function from the right pane and click on Code + Test. Test your function. It should create an aggregated file at the output Azure Blob location.
You can add query parameters if required. Since we are not passing any parameters in our example, you can simply click Run. Once execution completes, you should see our message along with the path of the created file.
Now we have the Azure function up and running. You can also test it using the endpoint URL.
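For example, you could call the endpoint from any HTTP client; here is a minimal sketch using the requests library, where the function URL and key are placeholders you would copy from the portal:

import requests
# pip install requests

# Placeholder URL; copy the real function URL (including the code= key) from the portal
function_url = "https://{your-function-app}.azurewebsites.net/api/{your-function-name}?code={function-key}"

response = requests.get(function_url)
print(response.status_code, response.text)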
Call your Azure function in ADF pipeline
Let's create a Data Factory. Create a new pipeline, add an Azure Function activity, and configure its settings to create a new Azure Function linked service. Enter the function name you want to call and select the method you want to invoke. Please refer to this blog for detailed information.
Meanwhile, let's clean up our output Azure Blob area so that we can confirm the ADF run generates the output at the specified location.
Let's run our ADF pipeline and monitor the output location in Azure Blob storage. A new aggregated file should be created by our ADF run. You can also see the output message along with the output file path.
When you open this file, you can see the aggregated data. This is how we can use Python in ADF.
Resources:
- An introduction to Azure Functions
- Quickstart: Create your first function in Azure using Visual Studio Code
- Azure Functions now supported as a step in Azure Data Factory pipelines