Azure Data Factory (Advanced)

Azure Data Factory, including a few techniques that are more advanced

Get Started. It's Free
or sign up with your email address
Azure Data Factory (Advanced) by Mind Map: Azure Data Factory (Advanced)

1. Continuous Integration and Delivery with Azure DevOps

1.1. Azure DevOps is hosted at dev.azure.com

1.2. Within your DevOps account, you can create multiple organisations and within each organisation you can create multiple projects

1.2.1. When working on a real world ADF project, you will typically be working within a particular DevOps organisation and project

1.2.2. I have an organisation named argento-it-training and a project named Argento Training

1.2.3. As soon as you create a new project, a new Git repo is created, named after the project, and you have configuration options for the repo via Project Settings

1.3. In ADF, in the Manage page, you can link (and unlink) the data factory to your DevOps Git repo

1.3.1. By default, the branch where your changes will be committed is the master branch (as by default this is set as the "collaboration branch")

1.3.1.1. This is probably the wrong choice in most projects, and it will better to create feature branches for your own work

1.3.2. The adf_publish branch is just another branch to Git but in the ADF development environment, it's reserved for publishing changes to your data factory in the designated Azure development environment, and you can see it's greyed out in ADF so you can't link your data factory work directly to this

1.3.2.1. The adf_publish branch contains the ARM templates that represent the latest published version of the data factory you've designated for your development environment

1.3.3. You should only link the data factory designated for your Dev environment to source control - other data factories that represent QA/UAT/PROD should all be unlinked as they will get their changes from Azure DevOps CI/CD release pipelines

1.3.4. It is important to understand the difference between saving and publishing in ADF, as saving changes only becomes an option after you have linked your ADF instance to Git

1.3.4.1. Save (or Save All) allows you to make any kind of change and commit it to your chosen Git branch

1.3.4.1.1. This allows you to commit development work as you go without needing to complete all changes necessary for publishing a successful change

1.3.4.1.2. It also means that changes that are saved but not yet published are ignored when you run your pipelines (either via debug or via a trigger)

1.3.4.1.3. For example, we can make an edit to a pipeline and save the change

1.4. If you try to publish changes in ADF but are on a non collaboration branch, you will be prevented from doing so

1.4.1. There is only ever one branch designated for collaboration and by default it is the master branch

1.4.2. If you click the link to "merge the changes to master", this will automatically launch an Azure DevOps master branch pull request in a new browser tab

1.4.2.1. Once merge request is completed successfully, you can switch back to ADF change to your collaboration branch (e.g. master) and then you can publish your changes

1.4.2.2. You can also process the pull request directly from DevOps; there is no need to rely on clicking this link from ADF

1.4.2.3. You can also easily reverse out your changes here if you want by swapping the direction of pull request to be: master->feature

1.5. When you publish your changes in ADF (from your freshly merged collaboration branch) this will trigger a refresh of the ARM templates that are then automatically committed into your adf_publish branch

1.6. Another approach you can use is to capture individual ADF objects (such as pipelines) as Json files and upload the file using Azure DevOps (via a branch commit)

1.6.1. First thing to note is that every object in ADF can be serialised to Json using the Code button { } in the top right area of screen

1.6.1.1. You can copy the Json and save to file, noting that to be valid in ADF, the file must be named exactly the same as the name property with a .json file extension

1.6.1.1.1. If the filename and name property are misaligned and you upload to ADF, the pipeline will be flagged as being in error and won't be usable

1.6.2. Example: upload a pipeline to ADF via DevOps

1.6.2.1. Note that before starting, there is a single existing pipeline in ADF for our feature branch

1.6.2.2. In DevOps, we upload the pipeline Json file to the feature branch

1.6.2.2.1. The new file is committed to the feature branch

1.6.2.3. Back in ADF, after a refresh, we now have the newly added pipeline

1.7. DevOps Release Pipelines

1.7.1. A typical release pipeline for ADF will take two inputs, have a continuous deployment trigger tied to updates made to the adf_publish branch and include at least one stage that invokes one or more Azure tasks that get the data factory published to another ADF that represents a downstream environment

1.7.1.1. Inputs are the ARM templates in the adf_publish branch and a parameters file

1.7.1.1.1. Parameters file is for controlling the variables that are particular to an environment (QA/TEST/PROD, etc.)

1.7.1.2. Typical tasks include:

1.7.1.2.1. Azure Key Vault

1.7.1.2.2. Azure PowerShell

1.7.1.2.3. Azure resource group deployment

1.7.1.2.4. Azure Functions

1.7.2. Setting up a release pipeline for data factory

1.7.2.1. Our starting point is two data factories provisioned in Azure, one for Dev and one for QA

1.7.2.1.1. Only Dev data factory should be configured for Git integration

1.7.2.2. In DevOps, we create a new release pipeline, selecting "Empty Job" rather than a template

1.7.2.3. Add an artifact to the release pipeline, setting the source to be an Azure repo, and pointing it to the adf_publish branch

1.7.2.3.1. Think of artifacts as the pipeline source

1.7.2.3.2. Remember that the adf_publish branch holds the complete set of ARM templates required to deploy the entire Dev data factory, updated whenever we publish from the collaboration branch

1.7.2.4. Add a trigger to the artifact, enabling continuous automated deployment whenever a commit is made to adf_publish branch

1.7.2.5. Add a new task to the stage

1.7.2.5.1. Choose ARM template deployment

1.7.2.6. Rename the stage from "Stage 1" to something that is more descriptive

1.7.2.6.1. e.g. QA Release

1.7.2.7. Set pre-deployment conditions for the stage

1.7.2.7.1. Note that the "After release" trigger means that stage deployment activity will automatically commence after the upstream artifacts trigger has fired

1.7.2.7.2. In the real world, you may have stages for more controlled environments like Pre Prod and Prod, and in these cases you will probably choose the deployment to be manual and/or to have pre approvals before the deployment proceeds

1.7.2.8. Rename the pipeline from "New Release Pipeline" to something that is more descriptive

1.7.2.8.1. e.g. Argento_DataFactory_Release

1.7.3. Create a manual release

1.7.3.1. A release can be created in two ways:

1.7.3.1.1. via the continuous deployment trigger, if enabled on the artifacts section of the release pipeline

1.7.3.1.2. via a manual trigger

1.7.3.2. Ensure you've selected your release pipeline and click Create Release button

1.7.3.2.1. Optionally add description for release and click Create button

1.7.3.3. All being well, the release will succeed

1.7.3.3.1. First time I tried, it failed due to me misnaming the target data factory name parameter value

1.7.3.4. Check the target data factory to verify the changes are successfully deployed

1.7.4. Automated release

1.7.4.1. According to the way we configured the continuous deployment trigger and pre deployment conditions of our release pipeline, we can expect automated release from Dev to QA data factory whenever we publish a change in Dev

1.7.4.2. Imagine we have a change in our Dev data factory that is currently committed only to our feature branch

1.7.4.2.1. In DevOps, we create a pull request from our feature branch

1.7.4.2.2. In ADF, we now see the change (a new pipeline in this case) merged to the master branch, and we can click the Publish button

1.7.4.2.3. In DevOps, we see that a new release from our release pipeline is automatically created and deployment in progress

1.7.4.2.4. In ADF, we now see the change in our QA data factory, automatically deployed there by our DevOps release branch

1.7.5. You can also re-deploy old releases, which is useful when a later release deployed a change that you want to undo

1.7.5.1. As our configured action for the ARM Template deployment task in the DevOps release pipeline is "Create or update" this will not result in deleting any unwanted data factory components

1.7.5.1.1. To delete an unwanted component in our downstream data factory, this can be done manually using eh ADF web interface or you could have a DevOps release pipeline with the ARM Template deployment task configured with the "Delete" action

1.7.6. Create individual parameter files for each data factory environment

1.7.6.1. Rather than configure the ARM Template deployment task in each stage (where stage typically has a 1-to-1 relationship with environment) to use the development parameter file + override values, it is better to set up separate parameter files for each environment and commit these directly into the adf_publish branch

1.7.6.2. From DevOps, you can download a copy of the development ADF parameter file

1.7.6.2.1. Once downloaded, rename file to put an environment suffix on

1.7.6.2.2. Edit the parameter file in Notepad++ and then upload/commit new file directly into adf_publish branch in same location as the development version

1.7.6.2.3. Once you have dedicated parameter files available in adf_publish, you can choose these in the ARM Template deployment task and remove parameter overrides

2. ARM Templates

2.1. ARM = Azure Resource Manager

2.2. Provides means of developing infrastructure as code

2.2.1. Templates all stored in JSON format

2.3. Although Azure DevOps offers a very good way to deploy ARM templates, there are other methods too

2.3.1. Importing ARM templates directly via Azure Portal will trigger a deployment

2.3.2. ARM templates can be deployed via PowerShell

2.4. Deploy ARM templates via Export->Import method

2.4.1. In ADF, click ARM template menu and choose the export option

2.4.2. Unzip the ARM template

2.4.2.1. You get the main ARM template that includes everything you need to deploy a copy of the data factory somewhere else and a paired parameter file that holds values particular to your data factory that you want to change when deploying to different targets

2.4.2.2. Every ARM template has a common JSON structure, consisting of a sequence of embedded objects, only some of which are mandatory - a typical ARM template for a data factory deployment consists of:

2.4.2.2.1. $schema

2.4.2.2.2. contentVersion

2.4.2.2.3. parameters

2.4.2.2.4. variables

2.4.2.2.5. resources

2.4.3. In ADF, deploy a new empty data factory and then click ARM template menu and choose the import option

2.4.3.1. This kicks off a custom deployment

2.4.3.2. Click hotlink "Build your own template in the editor"

2.4.3.2.1. Click Load file, point to your ARM template file and then Save

2.5. You can create ARM templates from scratch as well (i.e. not based on Azure development + export as ARM template)

2.5.1. A great starting point for this is Azure Quickstart Templates

2.5.1.1. If you search "data factory", you'll see a bunch of ARM templates for various data factory deployment scenarios

2.5.2. ARM templates can be developed in VS Code or Visual Studio

3. Pipeline templates

3.1. Rather than creating ADF pipelines from scratch, they can be created from templates

3.2. In order to create your own templates, you must have your data factory Git integrated

3.3. Creating a pipeline template

3.3.1. Every pipeline has the option to save as a template

3.3.1.1. You can set the template name and description

3.3.1.1.1. Note the Git location (non editable), which will commit the template to the linked repo in the templates directory

3.3.1.1.2. You can also check the various services used by the ADF pipeline template, add tags and a hyperlink to custom documentation if you have that set up

3.3.1.1.3. As an alternative to saving the template in your Git repo, you also have the option to export the template to your local drive

3.4. Create new pipeline from template

3.4.1. When creating a new pipeline, choose Pipeline from template

3.4.1.1. You are presented with the template gallery and can choose one of your own templates or someone else's templates

3.4.1.1.1. Note that there are a bunch of built in templates supplied by Microsoft, which will be interesting to explore

3.4.1.1.2. Complete user inputs for template and click the Use this template button

4. Integrating ADF with Azure Key Vault

4.1. You must not store sensitive information within the components of your data factory, such as API keys, passwords, certificates, etc. - all of these are secrets that should be stored in Azure Key Vault and securely retrieved by ADF at runtime

4.2. Secrets are added to Key Vault but in order for any person or application to access those secrets you must create an appropriate access policy

4.3. Recommended best practice is to have different key vaults for each environment (i.e. one for Dev, one for QA, one for PRod, etc.)

4.4. Two options for using key vault from data factory:

4.4.1. ADF linked service to key vault

4.4.2. DevOps release pipeline that pulls from key vault

4.5. Example: ADF with Key Vault to manage secrets for secure connections to Azure storage account + Azure SQL Database

4.5.1. Go to Azure Storage Account | Access Keys and copy the access key

4.5.2. Add new secret to your key vault

4.5.2.1. Name the secret (e.g. DataLakeAccessKey) and paste the storage account access key into the secret value before clicking Create

4.5.2.1.1. My original choice of secret name, DevDataLakeConnStr, was poor as the idea is not to name secrets with environment references (i.e. "Dev" in this case) but to maintain secrets with identical names across separate key vaults for each environment, and secret values differing per environment

4.5.3. Add a second secret to hold your SQL Database admin user password

4.5.3.1. In the real world, you'd probably go for a less privileged SQL account than the admin one set up at time of provisioning the resource

4.5.4. Go to ADF and add a new linked service for Azure Key Vault

4.5.4.1. Configure linked service to connect to the key vault instance that holds your secret storage account connection string

4.5.4.1.1. Note that the dialog warns and prompts you to ensure an access policy is set up on the key vault based on the managed identity of your data factory instance

4.5.5. In Key Vault, add a new access policy

4.5.5.1. Configure the data factory managed identity for secret management and click Add to create new access policy

4.5.5.1.1. Note that you can retrieve the managed identity ID from ADF Properties

4.5.5.1.2. Don't forget to click Save to confirm the new access policy

4.5.6. In ADF, configure your linked service for the Azure Storage Account to use key vault instead of connection string

4.5.6.1. If the key vault secret is correctly set up and referenced and the key vault policy is in place, you should see a successful connection test

4.5.6.2. If you did not set up the key vault access policy for your data factory instance, you will get a connection error:

4.5.6.2.1. Caller was not found on any access policy in this key vault

4.5.6.3. If you used the wrong value for the key vault secret that is supposed to hold your storage account access key, you will get a connection error:

4.5.6.3.1. The specified account key is invalid. Check your ADF configuration.

4.5.7. In ADF, configure your linked service for the Azure SQL Database to use key vault instead of password

4.5.7.1. If your Azure SQL Server firewall does not permit access from Azure services, you will get a connection error

4.5.7.1.1. Cannot connect to SQL Database... Client with IP address... is not allowed to access the server

4.5.8. In ADF, you can set up a quick and simple connectivity test for the linked services that use key vault integration to retrieve secrets

4.5.8.1. Create a new pipeline with two Get Metadata tasks, one pointed to a flat file dataset hosted in your Azure storage account and the other pointed to a SQL Database table

4.5.8.1.1. After publishing and running debug on the pipeline, you should get a successful result

4.6. As an aside, you can retrieve your secrets manually from Azure Key Vault using the portal, as long as the key vault access policy allows it for your user account

4.6.1. Just navigate to the key vault instance and down to the particular secret version, and there you will see an option to show secret value and/or copy it

5. Leveraging ADF Managed Identity to simplify access to other Azure resources

5.1. Managed Identity key features and benefits:

5.1.1. Data Factory automatically registers a managed identity in Azure Active Directory at the time it is provisioned

5.1.2. Allows you to keep passwords, credentials and/or certificates out of code

5.1.2.1. Good security practice!

5.1.3. No additional cost for managed identities

5.1.4. Requires object to be held in same AAD tenant

5.1.4.1. So you cannot use managed identity held in one AAD to grant access to resources held in a separate Azure tenant (i.e. different Azure account)

5.2. Example: grant data factory instance access to Azure storage account using RBAC and the data factory managed identity

5.2.1. Go to storage account Access Control (IAM) and add a new role to the specific data factory

5.2.1.1. The role will typically be one of the built in ones that begin "Storage Blob"

5.2.1.2. When you type in part of the data factory name in the Select field, you will see listed the name of the data factory you are interested in and you click to move it to Selected members

5.2.1.3. Click Save to complete

5.2.2. Note that the managed identity is distinguishable from regular service principals in the list of role assignments for the storage account

5.2.2.1. It bears the data factory icon rather than the generic icon for registered apps (service principals)

5.2.2.2. The type is given as "Data Factory" instead of "App"

5.2.3. In ADF, configure the storage account linked service to use managed identity

5.2.3.1. Authentication method = Managed Identity

5.2.3.2. Account selection method = From Azure subscription

5.2.4. You can make a simple test again using a pipeline with a Get Metadata task that's bound to the linked service for the storage account

5.2.5. This example illustrates that Managed Identity offers a simpler but equally secure method for application authentication than Azure Key Vault

5.3. As an aside, managed identities are represented as an Enterprise Application

5.3.1. Enterprise applications are contained by Azure Active Directory, and you can browse many of these applications using AAD, but it seems like managed identities are hidden

6. Copy Data activity

6.1. SQL Database auto table creation

6.1.1. Copy activity allows you to have ADF auto-create a table in SQL Database sink

6.1.1.1. This means you can have delimited file data load into a SQL staging table without needing to predefine schema on either the source or sink of the Copy data activity

6.1.1.2. When the pipeline runs again and the SQL table already exists, it will simply append data to the table (assuming you don't add a separate stored procedure activity to truncate it)

6.1.1.3. It won't cope with schema drift though

6.1.1.3.1. If the source file gains a schema change, this feature will not add, modify or drop columns in the SQL Database table, so you'll get an error

6.1.1.3.2. However, if the load pattern was source->stage->load repeated in that sequence, and the auto-created table was for staging, you could add a stored procedure activity to drop the table if exists before the copy activity, and that would work as an automated schema drift solution of sorts

6.1.2. Example: Load files on wildcard pattern match from ADLS Gen2 to SQL Database

6.1.2.1. Create a new dataset for the source based on Azure Data Lake Storage Gen2 and Delimited File

6.1.2.1.1. When setting the properties of the dataset, specify only the file path, not the actual file

6.1.2.2. Create a new pipeline with a copy activity

6.1.2.2.1. When setting the Source properties, pick the dataset previously created and configure a wildcard pattern

6.1.2.2.2. When creating a new dataset for the sink, you can edit the table name manually and defer the import schema to allow ADF to create it on the fly

6.1.2.2.3. When setting the Sink properties, select Table option = Auto create table

6.1.2.3. Ensure the source file is uploaded to ADLS (see attached sample of file)

6.1.2.4. Debug the pipeline

6.1.2.4.1. If successful, you should see data loaded into newly created SQL Database table

7. Copy + Delete activities to archive or move files

7.1. This technique uses two datasets, one for the original file and another for the target archive location

7.1.1. One issue with this approach is that the number of datasets can get out of hand quite quickly

7.1.2. Leveraging parameters and dynamic expressions is a more advanced technique that can keep your data factory components more compact (i.e. less of them) and more flexible

7.2. Example: pipeline to archive "employee" source files after they've been processed (by another pipeline)

7.2.1. Create a new pipeline with the Copy and Delete activities connected in sequence

7.2.2. Configure Copy activity Source properties

7.2.2.1. Dataset from earlier pipeline (i.e. the one that loads the file into a database) is re-used in this example

7.2.2.2. As the dataset references only the source file directory and not the filename, we must specify a wildcard path so the copy activity knows what files need to be copied

7.2.3. Create dataset for the file archive target

7.2.3.1. We can specify a new container for the archive file that doesn't yet exist, as the Copy activity will create it for us if required

7.2.3.2. I forgot to change the default delimiter character, which had the interesting effect of transforming the file on archive from pipe delimited to comma delimited

7.2.4. Configure Copy activity Sink properties

7.2.4.1. We only need to point it to the dataset - no need for further mapping or setting config

7.2.5. Configure Delete activity Source properties

7.2.5.1. We only need to point this to the source dataset of the Copy activity

7.2.5.2. As the source dataset references simply a directory, we should be aware that it will delete all files from this directory

7.2.5.2.1. Because I used a specific wildcard pattern on the Copy activity Source properties, I verified that this results in deleting a completely unrelated file from test-files (e.g. test3.json)

7.2.6. Configure Delete activity Logging settings

7.2.6.1. We specify a storage account and a directory for the logging

7.2.6.1.1. The Delete activity will create the directory for us if it doesn't already exist

7.2.6.2. The logs are stored in subfolders named with the Delete activity run ID (a unique GUID)

7.2.6.2.1. The log file itself is a simple csv that can be opened in Excel - it tells us what file(s) got deleted and whether any errors occurred

7.2.7. Debug the archive pipeline to verify success

7.3. An alternative technique for archiving files is to use Azure Logic Apps

8. Dynamic content

8.1. Key to more advanced usage of ADF

8.1.1. Parameters are fundamental

8.1.1.1. You can parameterise linked services, datasets, pipelines and data flows

8.1.1.2. Parameter values are passed in at run time (i.e. when pipeline is triggered)

8.1.2. Expressions use parameters and variables to create flexible and dynamic configuration

8.1.2.1. Within the expression editor, you have access to functions, parameters, variables (system and user), arrays (list objects) and metadata (e.g. capturing error messages, rows read, etc.)

8.1.3. Variables are different to parameters but also fundamental for enabling dynamic configuration

8.1.3.1. Unlike parameters that are set externally and passed in at run time and do not change throughout that run time, variables exist within the context of the run time and can change throughout it

8.2. Dataset parameterisation

8.2.1. Wherever you see the "Add dynamic content" prompt in a dataset property, this is a candidate for setting via a parameter

8.2.2. We can add a parameter to hold the file name of a file-based dataset for example

8.2.2.1. The default value for the parameter can be set at run time, which will typically take its value from a separate pipeline parameter

8.2.3. Once we have dataset parameters, we can use them to set dataset properties where dynamic content is supported

8.2.3.1. Once set, the dataset property holds a value that represents the dataset parameter, based on the notation of the ADF expression language

8.3. Dynamic content helps us to implement design patterns in a flexible, re-usable way

8.3.1. Example design pattern: If File Exists

8.3.1.1. Based on the Until activity, which runs a sequence of sub activities (Get Metadata, Set variable and Wait) until a variable (set by the sub activity) changes from false to true

8.3.1.2. 1. Create new pipeline and add Boolean variable for tracking file existence, defaulting it to false

8.3.1.2.1. In this case we named the variable fileexists

8.3.1.3. 2. Add Until activity and under Settings, set the Expression to the fileexists pipeline variable

8.3.1.3.1. The expression is set for you - note that you could type it in directly using the expression language syntax but the dynamic content editor makes building expressions easier

8.3.1.3.2. By design, the Until activity will run its activities (not yet defined) in a loop until the expression evaluates to true

8.3.1.4. 3. Edit the Until activities

8.3.1.4.1. 3.1 Add Get Metadata activity and configure the Dataset properties

8.3.1.4.2. 3.2 Add Set variable activity, connect the output of Get Metadata to the Set variable input and select the pipeline variable (fileexists)

8.3.1.4.3. 3.3 Add Wait activity, connect the output of Set variable to the Wait input and set the Wait time in seconds

8.3.1.5. Debug the pipeline

8.3.1.5.1. Initially you should see the pipeline iterating through the child activities of the Until activity, and you can check the output of the Set variable task for fileexists

8.3.1.5.2. Part way into the debug session, we upload a file with a name that the pipeline is configured to wait for

8.3.1.5.3. The pipeline now completes successfully and when we check the output of the final Set variable activity, we see that fileexists was set to true

8.3.1.5.4. The Get Metadata activity does not support filename wildcards

8.3.2. Example: count Until activity loop iterations and exit loop when counter is X

8.3.2.1. This technique is based on variables but requires a hack due to two limitations:

8.3.2.1.1. Integer variable types are not supported: only string, boolean and array

8.3.2.1.2. Expression language does not support variable self referencing, so it's not possible to have a variable value set via direct reference to itself (i.e. x += 1 pattern is not allowed)

8.3.2.1.3. To work around these limitations, we need to use two string variables, each set using two separate Set variable activities, and then we must leverage type conversion functions in the expressions

8.3.2.2. Add two variables to the pipeline containing the Until activity and set the default values to 0

8.3.2.2.1. In this example, I set the names to retries and retriesTemp, the types to String (because the only other options are Boolean and Array) and a default value of 0 for both

8.3.2.2.2. Having two variables instead of one is due to the limitation of the ADF expression language not allowing retries to be updated via a self reference (i.e. retries = retries + 1)

8.3.2.2.3. Modify the Until expression to use the or() function and add the expression for retries, which includes use of the greaterOrEquals() function and the int() function

8.4. Parent Child design pattern

8.4.1. The idea here is to have a parent pipeline that looks up metadata from a SQL Database source in the form of a list of target tables to be loaded and uses a ForEach activity to iterate the list, calling a common dynamic pipeline to handle the processing for each table

8.4.1.1. Parameters form the basis of this design

8.4.1.2. We want a parent pipeline that uses a ForEach loop to iterate over a table name list retrieved from a SQL Database

8.4.1.2.1. The child pipeline has parameters that will take different values on each iteration of parent pipeline ForEach loop activity

8.4.1.2.2. The dataset has parameters that will take their values from the child pipeline

8.4.2. This design involves a number SQL Database metadata objects, which are generic enough to re-use in many scenarios

8.4.2.1. Tables

8.4.2.1.1. dbo.ADFErrorLog

8.4.2.1.2. dbo.PipelineExecutionHistory

8.4.2.1.3. dbo.TaskQueue

8.4.2.2. Stored procs

8.4.2.2.1. dbo.usp_ErrorLog

8.4.2.2.2. dbo.usp_GetTablesToProcess

8.4.2.2.3. dbo.usp_PipelineExecutionHistory

8.4.2.2.4. dbo.UpdateTaskQueue

8.4.3. Example: load multiple files dynamically from ADLS Gen2 to SQL Database tables

8.4.3.1. To keep things simple, I've prepared two very basic test files, both flat text files with a pipe delimiter

8.4.3.1.1. argento_employees.txt

8.4.3.1.2. orders_20201121.txt

8.4.3.1.3. The files are uploaded to ADLS Gen2 using Azure Storage Explorer

8.4.3.2. Create child pipeline, add Copy activity and add 3 parameters to the child pipeline

8.4.3.2.1. SourceFilenamePattern

8.4.3.2.2. Schema

8.4.3.2.3. TableName

8.4.3.3. Create dynamic dataset for the source file

8.4.3.3.1. Set only a file path, not a file name, check the first row as header option and set Import schema = None

8.4.3.3.2. Add string parameter to the dataset, SourceFilenamePattern

8.4.3.3.3. Under Connection, set the column delimiter to pipe and set the file name property via dynamic expression that pulls in the SourceFilenamePattern parameter value (i.e. the dataset parameter)

8.4.3.4. Create dynamic dataset for the target table

8.4.3.4.1. When initially creating the new dataset, don't specify a table and ensure Import schema = None

8.4.3.4.2. Add two string parameters to the dataset: Schema and TableName

8.4.3.4.3. Under Connection, check the Edit option for the table name and set schema and table name using dynamic expressions that reference the two dataset parameters: Schema and TableName

8.4.3.5. Create parent pipeline

8.4.3.5.1. Add a Lookup activity and set it to return its content via the stored procedure, dbo.usp_GetTablesToProcess

8.4.3.5.2. Add a ForEach activity and set the Items via dynamic expression, selecting the lookup activity output and appending ".Value"

8.4.3.5.3. Add an Execute Pipeline child activity to the ForEach activity, point it to the child pipeline and configure the parameters using dynamic expressions

8.4.3.6. Complete child pipeline configuration

8.4.3.6.1. Set the Source dataset to point to the dynamic source file dataset and set the SourceFilenamePattern value via dynamic expression that references the child pipeline parameter of same name

8.4.3.6.2. Set the Sink dataset to point to the dynamic target table dataset and set the Schema and TableName values via dynamic expressions that reference the child pipeline parameters of same name

8.4.3.7. Set up dbo.TaskQueue (i.e. the metadata for driving our parent-child pipeline)

8.4.3.7.1. But take care over the SourceFilenamePattern values!

8.4.3.8. Debug the parent pipeline

8.4.3.8.1. If all is well, you should see the parent pipeline successfully invoke the child pipeline twice

8.4.3.8.2. We can verify that the source file content made it into the target tables in SQL Database

8.5. Source database to data lake with dynamic directory and file name

8.5.1. Imagine using data factory to extract data from a source system database and land these into a data lake where the directory structure is "raw/argento/yyyy/mm/dd"

8.5.2. We start by making our dynamic dataset for the ADLS Gen2 test delimited file even more dynamic

8.5.2.1. 1. Ensure the dynamic dataset has 3 parameters: ContainerName, DirectoryName and FileName

8.5.2.2. 2. Parameterise the container, directory and filename in the Connection file path settings of the dataset

8.5.3. Create a pipeline and add a Copy activity

8.5.3.1. 1. Add a pair of parameters to the pipeline for Schema and TableName

8.5.3.1.1. In this case we will demonstrate the pipeline via a single execution that relies on the parameter default values, which is just of simplicity because the focus of this technique is on the dynamic sink, but we can easily imagine this pipeline being invoked repeatedly as the child in the parent child design pattern

8.5.3.2. 2. Use the dynamic SQL Database dataset for the source

8.5.3.2.1. We set the Schema and TableName using the pipeline parameters

8.5.3.3. 3. Use the dynamic ADLS Gen2 delimited text file for the sink and set the 3 parameters using dynamic expressions

8.5.3.3.1. For FileName we use a combination of 3 functions: concat(), utcnow() and formatDate(), plus the TableName pipeline parameter

8.5.3.3.2. For ContainerName I chose to set this statically to "raw"

8.5.3.3.3. For DirectoryName we use a combination of 3 functions: concat(), utcnow() and formatDate()

8.5.4. Debug pipeline

8.5.4.1. If all is well, you should see a successful result

8.5.4.2. Using Storage Explorer we can confirm that the requested date-based directory path has been created and the file is named after the table with a date suffix

8.6. Restartability

8.6.1. A key idea for restartability is the concept of the task queue, which can be implemented as a table in SQL Database

8.6.1.1. To demonstrate this concept, we'll build on the parent child concept by adding a pair of stored procedure activities to follow the child pipeline copy acitivity

8.6.1.1.1. As a first step, we'll tweak the dbo.usp_UpdateTaskQueue stored procedure to include @IsProcessed as an input parameter and no longer use ProcessedDate Is Null as a filter

8.6.1.1.2. We will also add an intentionally dodgy item into the dbo.TaskQueue table

8.6.1.1.3. Add a stored procedure task to child pipeline and connect to success output from the copy activity, then set to call the dbo.usp_UpdateTaskQueue proc, import the parameters and set the values

8.6.1.1.4. Add a stored procedure task to child pipeline and connect to failure output from the copy activity, then set to call the dbo.usp_UpdateTaskQueue proc, import the parameters and set the values

8.6.1.1.5. Debug parent pipeline

8.7. Custom logging

8.7.1. This solution is based on a SQL Database table for a custom log table (dbo.PipelineExecutionHistory) and another for logging errors (dbo.ADFErrorLog)

8.7.1.1. To demonstrate this concept, we'll build on the same parent child pipelines we used for restartability, by adding a couple of stored procedures to the child pipeline for updating our custom logging tables, leveraging metadata available from a combination of system variables and the outputs from the Copy activity

8.7.1.1.1. As a first step we'll reset the dbo.TaskQueue items so that IsProcessed = 0 for all of them

8.7.1.1.2. Add a stored procedure task to child pipeline and connect to success output from the copy activity, then set to call the dbo.usp_PipelineExecutionHistory proc, import the parameters and set the values

8.7.1.1.3. Add a stored procedure task to child pipeline and connect to failure output from the copy activity, then set to call the dbo.usp_ErrorLog proc, import the parameters and set the values

8.7.1.1.4. Debug parent pipeline

9. Databricks integration

9.1. ADF data flows are built on top of Azure Databricks

9.2. ADF data flows provide an easy-to-use GUI for developing ETL pipelines, whilst Databricks provides notebooks that you can use to develop ETL pipelines using code

9.3. You can call Azure Databricks notebooks from ADF pipelines to perform your ETL (i.e. the transformations)

9.4. The advantage of using Databricks instead of ADF data flows is sheer flexibility due to the code-based approach

9.4.1. Any time you can't get the job done using ADF data flows, you can almost certainly get it done using Databricks

9.5. Example: Databricks notebook to load multiple csv files (with same schema) into Spark dataframe and then save dataframe into JSON format, targeting directory with name specified via Databricks input parameter, and demonstrate calling this notebook from ADF pipeline, setting the input parameter in the process

9.5.1. In Databricks, we create a notebook

9.5.1.1. Cell 1 uses dbutils.widgets to create a string input parameter named Table and then set a local variable named table_name to that parameter value

9.5.1.2. Cell 2 sets up some Spark config so that the notebook can connect to ADLS Gen2 storage account using the access key (blacked out in screenshot)

9.5.1.2.1. More secure practice is to use secrets for the access key (i.e. Azure Key Vault or Databricks secret scope)

9.5.1.2.2. We also test out our connection to ADLS Gen2 by issuing a dbutils.fs.ls command

9.5.1.3. Cell 3 calls spark.read with some options to load the csv files in from the data lake, storing it in a variable, df, and then displaying the resultant dataframe

9.5.1.4. Cell 4 invokes the write.json method of the dataframe, appending the table_name variable to the output path

9.5.2. Test run the notebook in Databricks

9.5.2.1. Note that we set the Table parameter to "aaaa" for this test run

9.5.3. Check result in ADLS Gen2 using Storage Explorer

9.5.3.1. In this example, it ingested 21 csv files into a Spark dataframe and exported that dataframe as 4 JSON files, grouped logically in a Table folder labelled "aaaa"

9.5.4. Create a new Databricks access token

9.5.4.1. Copy the access token when it displays - if you close the window, you'll have to re-generate the access token

9.5.5. Create a new ADF pipeline and add a parameter for Table

9.5.5.1. In this example, we set the default value to "InternetSales_JSON"

9.5.6. Add a new pipeline, and drag on a Databricks Notebook activity

9.5.6.1. Create a new Databricks linked service and paste in the access key

9.5.6.1.1. I chose "Existing interactive cluster" here for speed but normally you would choose "New job cluster"

9.5.6.1.2. Pasting in the access key to the linked service config is not best practice - better to use Azure Key Vault

9.5.6.2. Under Settings, browse to your Databricks notebook and then add a parameter that you name Table and map via the pipeline parameter of same name

9.5.6.3. Debug the pipeline and verify that it completes successfully

9.5.6.3.1. Using Storage Explorer, verify that the "InternetSales_JSON" input parameter was used to capture the JSON files produced by the Databricks notebook

10. Wrangling data flows

10.1. Provides Power Query UI for building out your transformations, based on the M language

10.1.1. Same experience as you get in Power BI

10.2. Example: use a wrangling data flow to transform a csv file in ADLS Gen2, sinking the transformed file back into data lake

10.2.1. Upload raw csv file to data lake using Storage Explorer

10.2.1.1. In this example the file is called BMI.csv

10.2.2. In ADF, create a new data flow and choose Wrangling Data Flow for the type

10.2.2.1. Set source properties, pointing to the raw csv file in the data lake

10.2.2.1.1. Although screenshot shows data lake connection, I later changed to a Blob storage account due to an issue with my ADLS Gen2 linked service using managed identity authentication

10.2.2.2. Set sink properties, pointing to the target folder "transformed" in the data lake, but with no schema import (because the file does not exist yet)

10.2.2.2.1. Screenshot shows a Blob storage account, not data lake, which is due to my data lake linked service using managed identity authentication (unsupported at time of writing)

10.2.2.3. Click OK to create new wrangling data flow, and you will see Power Query editor (familiar UI for Power BI developers)

10.2.2.3.1. Gotcha! If your data lake linked service uses Managed Identity for authentication, you will get an error when creating the wrangling data flow, as at time of writing (30-Nov-2020) it only supports access key or service principal

10.2.2.4. Make some transformations and save data flow

10.2.2.4.1. In this example, I did the following:

10.2.2.5. Create a pipeline and add Data Flow activity

10.2.2.6. Debug pipeline

10.2.2.6.1. Gotcha! Many unsupported transformations, despite being available in the editor

10.2.2.6.2. Verify that pipeline debug run completes successfully

10.2.2.6.3. Verify transformations in Azure storage account

11. Mapping data flows

11.1. One of the nice features of ADF mapping data flows is the ability to automatically handle schema drift and automatic inference of data types

11.2. Key things to remember when you want to leverage the features for allowing schema drift

11.2.1. Datasets for data flow source and sink should not have the schema imported

11.3. Example: load a csv file into a SQL Database table using options to allow schema drift and infer drifted column types

11.3.1. See attached how the source file looks; a csv file named Sales.csv

11.3.1.1. I uploaded it into my data lake to a container named data-flow-no-schema-files and a folder named "in"

11.3.2. For the source dataset, I will re-use an existing dynamic one for csv files, which is identified by parameters

11.3.2.1. The schema for this dataset is NOT imported

11.3.3. Create a new data flow and set the source settings

11.3.3.1. We point to the dynamic, schema-less dataset and enable options to allow schema drift and infer drifted column types

11.3.4. Add the sink and enable schema drift

11.3.4.1. Initially we will skip the transformation step, as we'll be using debug to see the outcome of auto-mapping

11.3.4.2. Under Settings, we'll set Table action to Recreate table

11.3.4.2.1. Under Mapping, we enable auto-mapping

11.3.5. Enable data flow debug and then go to the Data preview for the source, where we are prompted to enter values for the parameters relating to the source and sink

11.3.5.1. When we refresh the preview, we can see the data for our source file

11.3.5.1.1. Note how Customer column has been inferred as an integer type and the SoldDate inferred as a datetime type

11.3.5.2. Gotcha! My attempt to use an expression, toLower(''), for the Directory part of the source file path failed because expressions are not supported in this context and it treats the expression as a string literal in the path

11.3.5.2.1. I overcame this problem by deleting and recreating the data flow, and changing the default values for the parameters in my dynamic dataset (see attached)

11.3.6. Add a Derived Column transformation

11.3.6.1. In this example, we set the State column to transform always to upper case

11.3.6.1.1. The challenge with schema-less is that the column name is not directly addressable, but as you can see from attached screenshot, we can overcome this by using the byName() function

11.3.6.2. Data preview shows that our State column is transformed to uppercase

11.3.7. Create a new pipeline and add parameters for the data flow

11.3.7.1. 6 parameters added, 4 for the data flow source and 2 for the sink

11.3.7.1.1. ContainerName, DirectoryName, FileName, ColumnDelimiter, Schema, TableName

11.3.7.2. Add a data flow and under Settings specify that the values for the data flow source and sink shall come from the corresponding pipeline parameter

11.3.7.2.1. For example, the data flow source file name is set via the following expression:

11.3.8. Debug the pipeline

11.3.8.1. Enter the pipeline parameter values to test your pipeline against

11.3.8.2. Verify that pipeline debug completes successfully

11.3.8.3. Using SSMS, verify that dbo.RetailSales table is successfully created and loaded

11.4. Rule-based mapping

11.4.1. With rule-based mapping, we leverage the flexibility of data flows to automatically handle change, including the following changes that can occur in both source and sink

11.4.1.1. Source changes

11.4.1.1.1. Column names changing

11.4.1.1.2. Data types changing

11.4.1.1.3. New columns being added

11.4.1.1.4. Columns being removed

11.4.1.2. Sink changes

11.4.1.2.1. Columns added

11.4.1.2.2. Columns removed

11.4.2. Rule-based mappings are a specific feature of the Select transformation, and involve turning off the auto-mapping in that transformation and replacing it with expression-based mappings

11.4.3. Example: we have two sales files from two different vendors, both in csv format but with different names for some of the columns and one of the files with an extra column not present in the other

11.4.3.1. Here's what the two source files look like:

11.4.3.1.1. Sales.csv

11.4.3.1.2. ConferenceSales.csv

11.4.3.2. This builds on the previous example

11.4.3.2.1. Add a Select transformation after the source, and before the Derived column transformation

12. Log Analytics integration

12.1. Azure Log Analytics provides a means of centralising all of your Azure resource logging in one place and monitoring system health via interactive dashboard functionality

12.1.1. It is most suited to enterprise environments where you will likely have multiple different data factories, and keeping track of all of them is a challenge

12.2. At a high level, integrating ADF with Log Analytics is a 3-step process:

12.2.1. 1. Provision an Azure Log Analytics workspace

12.2.2. 2. Configure your data factory instance diagnostic settings to log all metrics and send to your Log Analytics workspace

12.2.3. 3. Provision an Azure Data Factory Analytics workspace and bind this to your Log Analytics workspace

12.3. Provision Azure Log Analytics workspace

12.3.1. In Azure Portal, create a new resource and choose Log Analytics

12.3.1.1. Set the subscription, resource group, name (of new Log Analytics workspace) and region

12.3.1.1.1. Set the pricing tier (e.g. Pay-as-you-go)

12.4. Configure Data Factory to send diagnostic log data to Log Analytics

12.4.1. Go to your data factory and under Diagnostic settings, click the "Add diagnostic setting" option

12.4.1.1. Log ActivityRuns, PipelineRuns and TriggerRuns for AllMetrics to your Log Analytics workspace

12.4.1.1.1. Note other possible destinations for your diagnostics logs are Azure storage account or event hub

12.4.1.1.2. Destination table should be Resource specific

12.5. Provision Azure Data Factory Analytics workspace

12.5.1. In Azure Portal, create a new resource and choose Data Factory Analytics

12.5.1.1. Bind your new Data Factory Analytics workspace to your Log Analytics workspace and click Create

12.6. At this point, you will need to either initiate (or wait for) some pipeline runs, ideally including a mic of success and failure

12.7. Build your Log Analytics dashboard for ADF

12.7.1. Go to your ADF Analytics solution

12.7.1.1. This is the Azure Data Analytics workspace, not the Log Analytics workspace

12.7.2. Under Workbooks, click the built in AzureDataFactoryAnalytics

12.7.3. Click the pin option and create a new dashboard for the items to be pinned to (e.g. ADF Dashboard)

12.7.3.1. The new dashboard will pin the overall workbook, which means the info we are interested in is effectively hidden one level down, so to remedy this we can edit the workbook, click on the pin options and selectively pin specific elements we are interested in

12.7.4. Once we've pinned the individual items we want and re-arranged the dashboard, it might look something like the attached

12.7.4.1. The experience of pinning and arranging the tiles is somewhat awkward at time of writing

12.7.4.2. Whilst we can see a pipeline failure on the dashboard, the only thing clickable is a link to the data factory instance itself, so you would need to go rummaging within the ADF monitoring to get to the actual error details

12.7.4.3. One notable feature is that some tiles have a button (highlighted in the previous screenshot) that when clicked, take you directly into Log Analytics

12.7.4.3.1. Log Analytics has its own query language and you can customise the dashboard tile by editing the underlying Log Analytics query

13. Alerts

13.1. This was not part of my Pragmatic Works course but I wanted to test out the built in alert feature to see if I can receive alert messages whenever a pipeline failure occurs in my data factory

13.2. Go the ADF Author & Monitor

13.2.1. Under Monitor | Alerts & metrics, click option to create a new alert rule

13.2.1.1. Give the alert rule a name such as PipelineFailure, set a severity level and then click the Add criteria option

13.2.1.2. Select Failed pipeline run metrics

13.2.1.3. Set options for the Failed pipeline metrics threshold criteria

13.2.1.3.1. Under Dimension, I just selected all pipelines (under Name) and all failure types

13.2.1.3.2. For the threshold, I went with the default Greater than Total of 0 (i.e. alert promptly if any type of failure occurs in any pipeline)

13.2.1.4. Configure notification by assigning an action group

13.2.1.4.1. I already had an action group set up, which is configured with an email address, but you can create a new one if required

13.2.1.5. Click the Create alert rule button once the configuration of the new rule is completed

13.2.1.5.1. This will create and immediately enable the new alert rule

13.3. Test out the new alert by setting up a particular pipeline for a failure and triggering that pipeline

13.3.1. You should receive an email alert notification within 5 minutes of a triggered pipeline failure

13.3.2. Click the option to "View the alert in Azure Monitor"

13.3.2.1. In order to actually drill into the details of what failed and what the error message was, you will need to click the link to the Data Factory instance

13.3.2.1.1. Under the ADF pipeline runs you will see the failures and the error message details are available

13.3.3. Interestingly, you will get an automatically generated email to say the alert has been resolved within a few minutes, which is probably due to the alert criteria looking over the period of last 1 minute

13.3.3.1. So, because there have been no failures noted within the last 1 minute, you get a second alert notification generated to say the alert condition is resolved, which is a bit misleading

14. Pipeline execution

14.1. Debugging

14.1.1. One more advanced option you can use is to provision your own IR for data flow debugging

14.1.1.1. To save costs, you can provision a new IR that is compute-optimized

14.1.1.2. To boost debug performance, you can provision a new IR that is memory-optimized

14.1.1.3. After you've provisioned a dedicated IR for data flow debugging, you get the option to set this when you start data flow debugging

14.2. Event based triggers using Logic Apps

14.2.1. The built-in event-based trigger for data factory supports only two events: Blob creation and Blob deletion, but by leveraging Logic Apps we can hugely expand on this

14.2.1.1. In this advanced scenario, we can use a Logic App to trigger our data factory pipeline, and we'll use the creation of a new row in a SQL Database table as the trigger

14.2.2. Demo setup details:

14.2.2.1. We have a new file named argento_employees.txt that has arrived in our data lake

14.2.2.1.1. We will be adding this to our TaskQueue table in order to trigger processing of the file

14.2.2.2. We are going to use our TaskQueue table in SQL Database as a trigger for our Logic App

14.2.2.2.1. We can imagine another process (perhaps another Logic App) automatically adding a row to the TaskQueue table as part of (or in response to) the file being landed in our data lake location, but for our demo, we'll do the insert into TaskQueue manually

14.2.2.3. We already have a dynamic parent pipeline for processing the task queue, which we named Pipeline_Load_Dynamic_Parent

14.2.2.3.1. This is what we'll have our Logic App call after receiving the trigger of a new row added to the TaskQueue table

14.2.2.4. Our Logic App will need to authenticate to our SQL Database, so I decided to create a SQL credential for this

14.2.2.4.1. In SSMS, create login for argento-sql-logic-app against the master database

14.2.2.4.2. In SSMS, create user for argento-sql-logic-app against the main database (ArgentoTraining in this case)

14.2.3. Demo

14.2.3.1. Provision a new Logic App

14.2.3.2. Search for "sql" and choose the SQL Server trigger "When an item is created"

14.2.3.2.1. Despite the "SQL Server" reference being suggestive of on-prem, this trigger works for Azure SQL Database and Managed Instance too

14.2.3.3. Create a new API connection to SQL Database by selecting SQL Server Authentication and entering the previously created user credentials

14.2.3.4. Configure the SQL Server trigger step

14.2.3.4.1. For server name and database name we use the values from the API connections settings previously created

14.2.3.4.2. For table name, we use a custom value to specify the table of interest to us

14.2.3.4.3. Set the the trigger frequency (e.g. every 1 minute)

14.2.3.5. Add a second step and search for "data factory", then select the "Create a pipeline run" action

14.2.3.5.1. Click the Sign in button

14.2.3.6. Configure the ADF "Create a pipeline run" pipeline and click Save to complete the logic app

14.2.3.7. Insert a new row into the SQL table to cause the logic app trigger to fire

14.2.3.8. Gotcha! The SQL Database table must feature two columns with the following characteristics:

14.2.3.8.1. An identity column

14.2.3.8.2. A column typed ROWVERSION

14.2.3.8.3. In order to get this demo to work, I had to recreate the dbo.TaskQueue table as per the attached screenshot

14.2.3.9. Check the Logic App run history to verify that a successful run of the logic app got triggered by the SQL table insert

14.2.3.9.1. If you drill into the trigger history, you should see the successful trigger event corresponding to the time you inserted the new table row in SQL Database

14.2.3.10. Check the ADF pipeline runs to verify that the expected pipeline got triggered at the same time the Logic App trigger fired

14.2.3.10.1. Note: in this example, the pipeline in question failed, but this is incidental to the main purpose of the demo, which is to prove that a Logic App can trigger an ADF pipeline based on the event of adding a new row into a SQL Database table

14.3. Call child pipeline in child data factory from a separate parent data factory

14.3.1. This idea illustrates how you can implement the idea of a "parent" data factory coordinating the work of many pipelines being run by separate "child" data factories

14.3.1.1. It is based on a technique described in a Microsoft blog

14.3.1.2. The main elements of the solution are as follows:

14.3.1.2.1. Azure Managed Identities for data factory

14.3.1.2.2. Web activity or Webhook activity

14.3.2. Demo: call pipeline in child data factory from parent data factory using the Web activity

14.3.2.1. Provision a separate data factory to act as the parent

14.3.2.1.1. In this example, I named my data factory argento-parent-datafactory

14.3.2.2. Grant the parent data factory membership of the "Data Factory Contributor" RBAC role for the child data factory

14.3.2.2.1. In this example, argento-datafactory is my child data factory - you use the Access Control to add a role assignment and then search for + select the parent DF before clicking Save

14.3.2.3. Create a child pipeline in the child data factory

14.3.2.3.1. In this example, I just created a very simple test pipeline that just runs a basic Wait activity

14.3.2.4. Create a parent pipeline in the parent data factory and add a Web activity to it

14.3.2.4.1. Settings:

14.3.2.5. Publish all changes in both child and parent data factories

14.3.2.6. Trigger the parent pipeline in the parent data factory

14.3.2.6.1. Verify successful run of parent pipeline in parent data factory

14.3.2.6.2. Verify successful run of child pipeline in child data factory