How much effort and code are needed to implement a dynamic ETL solution using Azure Data Factory (ADF) and SQL Server Change Tracking?
As you will see, not much, and certainly not much compared to the benefits. But, as always, the devil lies in the details.
The task I gave myself was to implement an ETL solution/proof of concept without any knowledge of the data to be loaded, other than the table names.
Design
The solution consists of an ADF instance with two datasets and one pipeline.
ADF is used as the engine that makes things happen, but the statements to be executed all come from stored procedures. Keeping the logic in stored procedures minimizes the need to modify the ADF pipeline. It also makes debugging much easier, as the stored procedures can be executed and tested one by one.
The source dataset is located on an on-premises SQL Server, and a self-hosted integration runtime gives ADF secure access to the database. The source database is AdventureWorks2017, which has 68 tables in the schemas HumanResources, Person, Production, Purchasing and Sales.
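Change tracking has to be enabled on the source database and on every table before the pipeline can run; the text does not show this step. Below is a minimal sketch, assuming the source is AdventureWorks2017 on SQL Server 2017 or later (STRING_AGG is used) and that the default retention of two days covers the longest gap between pipeline runs. Change tracking requires a primary key on each tracked table, which all AdventureWorks2017 tables in these schemas have.

```sql
-- Run once on the source database (assumed: AdventureWorks2017 on the on-premises SQL Server).
-- Fails if change tracking is already enabled on the database.
-- If a run is older than CHANGE_RETENTION, the pipeline falls back to a full load.
ALTER DATABASE AdventureWorks2017
SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
GO

-- Enable change tracking for every user table in the five schemas that is not tracked yet.
DECLARE @stmt nvarchar(max);

SELECT @stmt = STRING_AGG(
    CAST(N'ALTER TABLE ' + QUOTENAME(sch.name) + N'.' + QUOTENAME(t.name) + N' ENABLE CHANGE_TRACKING;' AS nvarchar(max)),
    N' ')
FROM sys.tables AS t
INNER JOIN sys.schemas AS sch ON (sch.schema_id = t.schema_id)
LEFT OUTER JOIN sys.change_tracking_tables AS ctt ON (ctt.object_id = t.object_id)
WHERE (sch.name IN (N'HumanResources', N'Person', N'Production', N'Purchasing', N'Sales'))
  AND (ctt.object_id IS NULL);

IF (@stmt IS NOT NULL)
BEGIN
    EXEC sp_executesql @stmt;
END;
```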
An Azure SQL database is used as the sink (destination) database. The sink database also hosts the 2 tables and 4 stored procedures used by the solution.
The list of tables to be processed by the solution is kept in dbo.Tables. It is a simple table with two configuration columns, Schema and Object (plus an Id identity column), and it holds the only configuration the solution needs.
The solution uses two separate schemas: LZ (landing zone), which contains the tables where the data extracted from the source lands, and DATA, which contains the final tables that the data is merged into.
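Both schemas must exist in the sink database before the first run. A minimal sketch of that one-time setup, which the text does not show: CREATE SCHEMA must be the first statement in its batch, hence the dynamic EXEC, and the bracketed names are only a precaution.

```sql
-- Run once on the sink database (Azure SQL Database).
IF SCHEMA_ID(N'LZ') IS NULL
BEGIN
    EXEC (N'CREATE SCHEMA [LZ] AUTHORIZATION [dbo];');
END;

IF SCHEMA_ID(N'DATA') IS NULL
BEGIN
    EXEC (N'CREATE SCHEMA [DATA] AUTHORIZATION [dbo];');
END;
```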
I have chosen to use regular tables in the LZ schema rather than, e.g., temporary tables, because keeping the extracted data available after the load helps with debugging and troubleshooting.
Another design decision was to drop the LZ tables before each data load and let the Copy Data activity recreate them (the sink uses the autoCreate table option). The reasoning was to obtain the schema of the source tables as easily as possible. This blog does not take schema drift, where the source table schemas change over time, into account, but because the LZ tables always reflect the current source schemas, investigating schema drift becomes easier.
The DATA tables are created at the first execution, using the source tables as templates.
Here is a screen capture from SSMS where the tables have been filtered to show only the tables from the schema HumanResources. The LZ and DATA tables are named by concatenating the source schema and table name, separated by an underscore ('_'), e.g. LZ.HumanResources_Department and DATA.HumanResources_Department.
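The same filtered view can be produced with a catalog query. This sketch assumes the naming convention described above; the underscore is written as [_] because it is a single-character wildcard in LIKE.

```sql
-- Run on the sink: list the LZ and DATA tables created for the HumanResources schema.
SELECT sch.name AS SchemaName, t.name AS TableName, t.create_date
FROM sys.tables AS t
INNER JOIN sys.schemas AS sch ON (sch.schema_id = t.schema_id)
WHERE (sch.name IN (N'LZ', N'DATA'))
  AND (t.name LIKE N'HumanResources[_]%')
ORDER BY t.name, sch.name;
```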
Implementation
The implementation uses dynamic SQL statements that in turn generate more dynamic SQL. This is how the design goal is reached: a solution that requires a minimum of configuration.
ADF pipeline
Main flow
The main flow in the pipeline consists of a Lookup activity, GetListOfTables, that calls the stored procedure dbo.GetTables, followed by a ForEach activity, IterateOverTables, that iterates over the result set from the stored procedure one table at a time (the ForEach is sequential).
IterateOverTables activities
The flow executed for every table consists of 6 activities:
Dataset | Activity | Description |
Sink | GetCreateTableQueryStmt | The stored procedure dbo.GetCreateTableStmt checks whether the DATA table exists and, if not, returns a query statement to be used by the False activities of the If Condition activity CheckForCreateTableStmt |
(none) | CheckForCreateTableStmt | Checks whether a query statement was returned by the Lookup activity GetCreateTableQueryStmt. If a statement was returned, meaning that the DATA table does not exist, the False activities sub-flow is executed |
Sink | GetExtractQueryStmt | The stored procedure dbo.GetExtractStmt drops the LZ table (the pipeline passes dropSinkTable = 1) and returns a query statement to be executed on the source by the Lookup activity GetExtractStmt |
Source | GetExtractStmt | Executes the query statement to get the extract statement, i.e. the actual SQL statement to be executed by the Copy Data activity ExecuteExtractStmt |
Source, Sink | ExecuteExtractStmt | Executes the SQL statement that performs either 1) a full load, if this is the very first execution or the last tracking version is older than the minimum valid change tracking version, or 2) a delta load, extracting the changes made between the last execution and now. Important: this Copy Data activity must be configured to use the Snapshot isolation level, so that the change tracking version checks and the extracted rows are all based on one consistent snapshot. See screenshot below. |
Sink | LoadData | Executes the stored procedure dbo.LoadData, which merges the data from the LZ table into the DATA table and records the result in dbo.LoadStatus |
Copy Data activity isolation level:
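Choosing Snapshot on the Copy Data activity only works if the source database allows snapshot isolation; otherwise SQL Server rejects the snapshot transaction. A minimal one-time setup sketch, assuming the source database is AdventureWorks2017:

```sql
-- Run once on the source database.
ALTER DATABASE AdventureWorks2017 SET ALLOW_SNAPSHOT_ISOLATION ON;

-- Verify the setting (snapshot_isolation_state 1 = ON).
SELECT d.name, d.snapshot_isolation_state, d.snapshot_isolation_state_desc
FROM sys.databases AS d
WHERE (d.name = N'AdventureWorks2017');
```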
CheckForCreateTableStmt activities
Dataset | Activity | Description |
Source | GetCreateDataTableStmt | Executes the query statement to get the CREATE TABLE statement to be executed on the sink database by the Lookup activity ExecuteCreateDataTableStmt |
Sink | ExecuteCreateDataTableStmt | Executes the CREATE TABLE statement |
Tables
dbo.Tables
As mentioned earlier, this is the only table that needs to be configured in order to use the ETL solution.
CREATE TABLE dbo.Tables(
Id int IDENTITY NOT NULL,
[Schema] nvarchar(128) NOT NULL,
Object nvarchar(128) NOT NULL,
CONSTRAINT PK_Tables PRIMARY KEY CLUSTERED(Id)
);
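Configuring a table for loading is a single INSERT. The rows below are illustrative examples, not the author's full configuration; HumanResources.Employee has a hierarchyid column and HumanResources.JobCandidate an xml column bound to a schema collection, so together they exercise the data type exceptions.

```sql
-- Run on the sink database: register the tables to be processed by the pipeline.
INSERT INTO dbo.Tables([Schema], Object)
VALUES
    (N'HumanResources', N'Department'),
    (N'HumanResources', N'Employee'),
    (N'HumanResources', N'JobCandidate');

-- Check the configuration.
SELECT t.Id, t.[Schema], t.Object
FROM dbo.Tables AS t
ORDER BY t.[Schema], t.Object;
```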
dbo.LoadStatus
This table records the result of every execution. The most important column is TrackingVersion: it is used to check whether we are too late (the stored version is lower than CHANGE_TRACKING_MIN_VALID_VERSION()), and otherwise it is used to capture the changes made since that version.
CREATE TABLE dbo.LoadStatus(
Id int IDENTITY NOT NULL,
[Schema] nvarchar(128) NOT NULL,
Object nvarchar(128) NOT NULL,
Rows int NOT NULL,
DeltaRows int NOT NULL,
TrackingVersion bigint NOT NULL,
Operation nchar(1) NOT NULL,
Duration int NOT NULL,
TimestampUTC datetime2(3) CONSTRAINT DEF_LoadStatus DEFAULT(SYSUTCDATETIME()),
CONSTRAINT PK_LoadStatus PRIMARY KEY CLUSTERED(Id)
);
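For monitoring, the most recent row per table is usually what matters, and it is also the row dbo.GetExtractStmt reads (the highest Id). A sketch of such a query: Operation is 'F' after a full load and '-' after a delta load, as written by the internal row of the extract statement, and Duration is the copyDuration of the Copy Data activity in seconds.

```sql
-- Run on the sink: the latest load per table.
SELECT ls.[Schema], ls.Object, ls.Operation, ls.Rows, ls.DeltaRows, ls.TrackingVersion, ls.Duration, ls.TimestampUTC
FROM (
    SELECT s.*, ROW_NUMBER() OVER (PARTITION BY s.[Schema], s.Object ORDER BY s.Id DESC) AS RowNo
    FROM dbo.LoadStatus AS s
) AS ls
WHERE (ls.RowNo = 1)
ORDER BY ls.[Schema], ls.Object;
```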
Stored procedures
The stored procedures are listed in the order in which they are used. As mentioned earlier, they rely on dynamic SQL to deliver the design goal: minimal configuration.
dbo.GetTables
A very simple stored procedure but, again, by keeping the logic in stored procedures and not accessing tables directly from ADF, the solution is much easier to maintain and to adapt to future requirements.
CREATE OR ALTER PROC dbo.GetTables
AS
BEGIN
SET NOCOUNT ON;
SELECT t.[Schema], t.Object
FROM dbo.Tables AS t
ORDER BY t.[Schema], t.Object;
END;
GO
dbo.GetCreateTableStmt
The purpose of the stored procedure is to check whether the DATA table exists for the source table <@schema>.<@object>.
If the DATA table does not exist, the stored procedure returns a statement to be executed on the source database, which in turn returns the CREATE TABLE statement.
The DATA table is created with a clustered primary key matching the primary key of the source table.
Regarding data types, the stored procedure only handles those used by AdventureWorks2017, and it also includes exceptions for the data types that AdventureWorks2017 uses but ADF does not support. See the section The devil lies in the details.
CREATE OR ALTER PROC dbo.GetCreateTableStmt
@schema nvarchar(128),
@object nvarchar(128)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @loadSchema nvarchar(128);
DECLARE @stmt nvarchar(max);
SET @loadSchema = 'DATA';
SET @stmt = '';
IF NOT EXISTS (
SELECT *
FROM sys.schemas AS sch
INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
WHERE (sch.name = @loadSchema) AND (o.name = @schema + '_' + @object)
)
BEGIN
SET @stmt += '
DECLARE @columnIsDescendingKey bit;
DECLARE @columnIsNullable bit;
DECLARE @columnMaxLength smallint;
DECLARE @columnName nvarchar(128);
DECLARE @columnPrecision tinyint;
DECLARE @columnScale tinyint;
DECLARE @columnType nvarchar(128);
DECLARE @sinkObject nvarchar(128);
DECLARE @sinkSchema nvarchar(128);
DECLARE @sourceObject nvarchar(128);
DECLARE @sourceSchema nvarchar(128);
DECLARE @stmt nvarchar(max);
';
SET @stmt += '
SET @sourceSchema = ''' + @schema + ''';
SET @sourceObject = ''' + @object + ''';
SET @sinkSchema = ''' + @loadSchema + ''';
SET @sinkObject = @sourceSchema + ''_'' + @sourceObject;
SET @stmt = ''CREATE TABLE '' + QUOTENAME(@sinkSchema) + ''.'' + QUOTENAME(@sinkObject) + ''('';
DECLARE tCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
SELECT
c.name AS ColumnName
,COALESCE(nativeT.name, t.name) AS ColumnType
,c.max_length AS ColumnMaxLength
,c.precision AS ColumnPrecision
,c.scale AS ColumnScale
,c.is_nullable AS ColumnIsNullable
FROM sys.schemas AS sch
INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
INNER JOIN sys.types AS t ON (t.system_type_id = c.system_type_id) AND (t.user_type_id = c.user_type_id)
LEFT OUTER JOIN sys.types AS nativeT on (nativeT.user_type_id = t.system_type_id)
WHERE (sch.name = @sourceSchema) AND (o.name = @sourceObject) AND (o.type = ''U'')
ORDER BY c.column_id;
';
SET @stmt += '
OPEN tCur;
WHILE (1 = 1)
BEGIN
FETCH NEXT FROM tCur
INTO @columnName, @columnType, @columnMaxLength, @columnPrecision, @columnScale, @columnIsNullable;
IF (@@FETCH_STATUS <> 0)
BEGIN
BREAK;
END;
SET @stmt += CHAR(13) + CHAR(10) + QUOTENAME(@columnName) + '' '' +
CASE @columnType
WHEN ''varbinary'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
WHEN ''char'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
WHEN ''varchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
WHEN ''nchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''
WHEN ''nvarchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''
WHEN ''decimal'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''
WHEN ''numeric'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''
WHEN ''time'' THEN @columnType + ''('' + CAST(@columnScale AS nvarchar) + '')''
-- SQL data types that can not be transferred directly using ADF
WHEN ''geography'' THEN ''nvarchar(max)''
WHEN ''hierarchyid'' THEN ''nvarchar(4000)''
ELSE @columnType
END +
CASE @columnIsNullable WHEN 0 THEN '' NOT'' ELSE '''' END + '' NULL,'';
END;
CLOSE tCur;
DEALLOCATE tCur;
';
SET @stmt += '
SET @stmt += CHAR(13) + CHAR(10) + ''CONSTRAINT [PK_'' + @sinkSchema + ''_'' + @sinkObject + ''] PRIMARY KEY CLUSTERED('';
DECLARE iCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
SELECT c.name, ic.is_descending_key
FROM sys.schemas AS sch
INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
INNER JOIN sys.indexes AS i ON (i.object_id = o.object_id)
INNER JOIN sys.index_columns AS ic ON (ic.object_id = i.object_id) AND (ic.index_id = i.index_id)
INNER JOIN sys.columns AS c ON (c.object_id = ic.object_id) AND (c.column_id = ic.column_id)
WHERE (sch.name = @sourceSchema) AND (o.name = @sourceObject) AND (o.type = ''U'') AND (i.is_primary_key = 1)
ORDER BY ic.key_ordinal;
';
SET @stmt += '
OPEN iCur;
WHILE (1 = 1)
BEGIN
FETCH NEXT FROM iCur
INTO @columnName, @columnIsDescendingKey;
IF (@@FETCH_STATUS <> 0)
BEGIN
BREAK;
END;
SET @stmt += CHAR(13) + CHAR(10) + QUOTENAME(@columnName) + '' '' + CASE @columnIsDescendingKey WHEN 0 THEN ''ASC'' ELSE ''DESC'' END + '','';
END;
CLOSE iCur;
DEALLOCATE iCur;
';
SET @stmt += '
SET @stmt = LEFT(@stmt, LEN(@stmt) - 1);
SET @stmt += CHAR(13) + CHAR(10) + '')'';
SET @stmt += CHAR(13) + CHAR(10) + '');'';
-- Needed as a Lookup activity must return something
SET @stmt += CHAR(13) + CHAR(10) + ''SELECT 1 AS Dummy;'';
SELECT @stmt AS Statement;
';
END;
SELECT @stmt AS Statement;
END;
GO
dbo.GetExtractStmt
This stored procedure drops the LZ table (when @dropSinkTable = 1) and generates a query statement to be executed on the source database; that statement in turn returns the extract statement executed by the Copy Data activity.
Regarding data types, the stored procedure only handles those used by AdventureWorks2017, and it also includes exceptions for the data types that AdventureWorks2017 uses but ADF does not support. See the section The devil lies in the details.
CREATE OR ALTER PROC dbo.GetExtractStmt
@schema nvarchar(128),
@object nvarchar(128),
@dropSinkTable int
AS
BEGIN
SET NOCOUNT ON;
DECLARE @sinkSchema nvarchar(128);
DECLARE @sinkTable nvarchar(128);
DECLARE @stmt nvarchar(max);
DECLARE @trackingVersion bigint;
SET @sinkSchema = 'LZ';
SET @sinkTable = @schema + '_' + @object;
SET @trackingVersion = COALESCE((
SELECT TOP(1) ls.TrackingVersion
FROM dbo.LoadStatus AS ls
WHERE (ls.[Schema] = @schema) AND (ls.Object = @object)
ORDER BY ls.Id DESC
), -1);
IF (@dropSinkTable = 1)
BEGIN
IF OBJECT_ID(QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@sinkTable)) IS NOT NULL
BEGIN
SET @stmt = 'DROP TABLE ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@sinkTable) + ';';
EXEC (@stmt);
END;
END;
SET @stmt = '
DECLARE @columnDataType nvarchar(128);
DECLARE @columnIsInPrimaryKey bit;
DECLARE @columnMaxLength smallint;
DECLARE @columnName nvarchar(128);
DECLARE @columnPrecision tinyint;
DECLARE @columnScale tinyint;
DECLARE @columnType nvarchar(128);
DECLARE @internalColumns nvarchar(max);
DECLARE @joinConditions nvarchar(max);
DECLARE @resultStmt nvarchar(max);
DECLARE @srcColumnsCT nvarchar(max);
DECLARE @srcColumnsFull nvarchar(max);
DECLARE @sourceObject nvarchar(128);
DECLARE @sourceSchema nvarchar(128);
DECLARE @trackingVersion bigint;
SET @sourceSchema = ''' + @schema + ''';
SET @sourceObject = ''' + @object + ''';
SET @trackingVersion = ' + CAST(@trackingVersion AS nvarchar) + ';
';
SET @stmt += '
-- Create column blocks
BEGIN
DECLARE iCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
SELECT
c.name AS ColumnName
,COALESCE(nativeT.name, t.name) AS ColumnType
,c.max_length AS ColumnMaxLength
,c.precision AS ColumnPrecision
,c.scale AS ColumnScale
,COALESCE((
SELECT 1
FROM sys.indexes AS pi
INNER JOIN sys.index_columns AS pic ON (pic.object_id = pi.object_id) AND (pic.index_id = pi.index_id)
INNER JOIN sys.columns AS pc ON (pc.object_id = pic.object_id) AND (pc.column_id = pic.column_id)
WHERE (pi.object_id = o.object_id) AND (pi.is_primary_key = 1) AND (pc.name = c.name)
), 0) AS ColumnIsInPrimaryKey
FROM sys.schemas AS sch
INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
INNER JOIN sys.types AS t ON (t.system_type_id = c.system_type_id) AND (t.user_type_id = c.user_type_id)
LEFT OUTER JOIN sys.types AS nativeT on (nativeT.user_type_id = t.system_type_id)
WHERE (sch.name = @sourceSchema) AND (o.name = @sourceObject) AND (o.type = ''U'')
ORDER BY ColumnIsInPrimaryKey DESC, c.column_id;
SET @srcColumnsCT = '''';
SET @srcColumnsFull = '''';
SET @internalColumns = '''';
SET @joinConditions = '''';
';
SET @stmt += '
OPEN iCur;
WHILE (1 = 1)
BEGIN
FETCH NEXT FROM iCur
INTO @columnName, @columnType, @columnMaxLength, @columnPrecision, @columnScale, @columnIsInPrimaryKey;
IF (@@FETCH_STATUS <> 0)
BEGIN
BREAK;
END;
SET @columnDataType =
CASE @columnType
WHEN ''varbinary'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
WHEN ''char'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
WHEN ''varchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
WHEN ''nchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''
WHEN ''nvarchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''
WHEN ''decimal'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''
WHEN ''numeric'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''
WHEN ''time'' THEN @columnType + ''('' + CAST(@columnScale AS nvarchar) + '')''
-- SQL data types that can not be transferred directly using ADF
WHEN ''geography'' THEN ''nvarchar(max)''
WHEN ''hierarchyid'' THEN ''nvarchar(4000)''
ELSE @columnType
END;
IF (@columnType IN (''geography'', ''hierarchyid'', ''xml''))
BEGIN
SET @srcColumnsCT += '', CAST('' + CASE @columnIsInPrimaryKey WHEN 1 THEN ''ct'' ELSE ''t'' END + ''.'' + QUOTENAME(@columnName) + '' AS '' + @columnDataType + '') AS '' + QUOTENAME(@columnName);
SET @srcColumnsFull += '', CAST(t.'' + QUOTENAME(@columnName) + '' AS '' + @columnDataType + '') AS '' + QUOTENAME(@columnName);
END
ELSE BEGIN
SET @srcColumnsCT += '', '' + CASE @columnIsInPrimaryKey WHEN 1 THEN ''ct'' ELSE ''t'' END + ''.'' + QUOTENAME(@columnName);
SET @srcColumnsFull += '', t.'' + QUOTENAME(@columnName);
END;
IF (@columnIsInPrimaryKey = 1)
BEGIN
IF (@joinConditions <> '''')
BEGIN
SET @joinConditions += '' AND '';
END;
SET @joinConditions += ''(ct.'' + QUOTENAME(@columnName) + '' = t.'' + QUOTENAME(@columnName) + '')'';
END;
SET @internalColumns += '', CAST(NULL AS '' + @columnDataType + '') AS '' + QUOTENAME(@columnName);
END;
CLOSE iCur;
DEALLOCATE iCur;
';
SET @stmt += '
END;
SET @resultStmt = ''
DECLARE @currentTrackingVersion bigint;
DECLARE @trackingVersion bigint;
SET @trackingVersion = '' + CAST(@trackingVersion AS nvarchar) + '';
-- Obtain the version to use next time.
SET @currentTrackingVersion = CHANGE_TRACKING_CURRENT_VERSION();
-- Verify that version of the previous synchronization is valid.
IF (@trackingVersion < CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(QUOTENAME('''''' + @sourceSchema + '''''') + ''''.'''' + QUOTENAME('''''' + @sourceObject + ''''''))))
BEGIN
-- Handle invalid version and do not enumerate changes.
-- Client must be reinitialized.
SELECT CAST(0 AS bit) _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, CAST(''''F'''' AS nchar(1)) AS _CTOperation_'' + @srcColumnsFull + ''
FROM '' + QUOTENAME(@sourceSchema) + ''.'' + QUOTENAME(@sourceObject) + '' AS t
UNION ALL
SELECT CAST(1 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, CAST(''''F'''' AS nchar(1)) AS _CTOperation_'' + @internalColumns + ''
END
ELSE BEGIN
-- Obtain changes.
SELECT CAST(0 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, ct.SYS_CHANGE_OPERATION AS _CTOperation_'' + @srcColumnsCT + ''
FROM '' + QUOTENAME(@sourceSchema) + ''.'' + QUOTENAME(@sourceObject) + '' AS t
RIGHT OUTER JOIN CHANGETABLE(CHANGES '' + QUOTENAME(@sourceSchema) + ''.'' + QUOTENAME(@sourceObject) + '', @trackingVersion) AS ct ON '' + @joinConditions + ''
UNION ALL
SELECT CAST(1 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, CAST(''''-'''' AS nchar(1)) AS _CTOperation_'' + @internalColumns + ''
END;
'';
SELECT @resultStmt AS Statement;
';
SELECT
@stmt AS Statement,
@sinkSchema AS SinkSchema,
@sinkTable AS SinkTable;
END;
GO
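The three levels of quoting make the generated code hard to read. For a single table, the statement that finally reaches ExecuteExtractStmt follows the change tracking pattern from the Microsoft article listed under References. Below is a hand-written sketch for HumanResources.Department (primary key DepartmentID), leaving out the _CTInternal_ marker row. The explicit SNAPSHOT transaction stands in for the isolation level the Copy Data activity sets, and the starting version of -1 is an assumed value that mirrors the first run; normally it comes from dbo.LoadStatus.

```sql
-- Run on the source. Requires ALLOW_SNAPSHOT_ISOLATION ON.
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
BEGIN TRANSACTION;

DECLARE @trackingVersion bigint = -1;  -- assumed: -1 = no previous load, which forces a full load
DECLARE @currentTrackingVersion bigint = CHANGE_TRACKING_CURRENT_VERSION();

IF (@trackingVersion < CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'[HumanResources].[Department]')))
BEGIN
    -- First run or version too old: full load.
    SELECT @currentTrackingVersion AS _CTTrackingVersion_, CAST(N'F' AS nchar(1)) AS _CTOperation_,
           t.[DepartmentID], t.[Name], t.[GroupName], t.[ModifiedDate]
    FROM [HumanResources].[Department] AS t;
END
ELSE
BEGIN
    -- Delta load: key from the change table, other columns from the base table (NULL for deletes).
    SELECT @currentTrackingVersion AS _CTTrackingVersion_, ct.SYS_CHANGE_OPERATION AS _CTOperation_,
           ct.[DepartmentID], t.[Name], t.[GroupName], t.[ModifiedDate]
    FROM [HumanResources].[Department] AS t
    RIGHT OUTER JOIN CHANGETABLE(CHANGES [HumanResources].[Department], @trackingVersion) AS ct
        ON (ct.[DepartmentID] = t.[DepartmentID]);
END;

COMMIT TRANSACTION;
```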
dbo.LoadData
This stored procedure is responsible for merging the data from the LZ tables into the DATA tables and for registering the result in dbo.LoadStatus afterwards.
CREATE OR ALTER PROC dbo.LoadData
@schema nvarchar(128),
@object nvarchar(128),
@duration int
AS
BEGIN
SET NOCOUNT ON;
DECLARE @columnIsInPrimaryKey bit;
DECLARE @columnName nvarchar(128);
DECLARE @fields nvarchar(max);
DECLARE @joinConditions nvarchar(max);
DECLARE @loadSchema nvarchar(128);
DECLARE @sinkSchema nvarchar(128);
DECLARE @srcFields nvarchar(max);
DECLARE @stmt nvarchar(max);
DECLARE @table nvarchar(128);
DECLARE @updateStmt nvarchar(max);
SET @sinkSchema = 'LZ';
SET @loadSchema = 'DATA';
SET @table = @schema + '_' + @object;
DECLARE cCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
SELECT
c.name AS ColumnName
,COALESCE((
SELECT 1
FROM sys.indexes AS pi
INNER JOIN sys.index_columns AS pic ON (pic.object_id = pi.object_id) AND (pic.index_id = pi.index_id)
INNER JOIN sys.columns AS pc ON (pc.object_id = pic.object_id) AND (pc.column_id = pic.column_id)
WHERE (pi.object_id = o.object_id) AND (pi.is_primary_key = 1) AND (pc.name = c.name)
), 0) AS ColumnIsInPrimaryKey
FROM sys.schemas AS sch
INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
WHERE (sch.name = @loadSchema) AND (o.name = @table) AND (o.type = 'U')
ORDER BY c.column_id;
SET @fields = '';
SET @srcFields = '';
SET @joinConditions = '';
SET @updateStmt = '';
OPEN cCur;
WHILE (1 = 1)
BEGIN
FETCH NEXT FROM cCur
INTO @columnName, @columnIsInPrimaryKey;
IF (@@FETCH_STATUS <> 0)
BEGIN
BREAK;
END;
IF (@srcFields <> '')
BEGIN
SET @srcFields += ', ';
END;
SET @srcFields += 'src.' + QUOTENAME(@columnName);
IF (@fields <> '')
BEGIN
SET @fields += ', ';
END;
SET @fields += QUOTENAME(@columnName);
IF (@columnIsInPrimaryKey = 1)
BEGIN
IF (@joinConditions <> '')
BEGIN
SET @joinConditions += ' AND ';
END;
SET @joinConditions += '(src.' + QUOTENAME(@columnName) + ' = tgt.' + QUOTENAME(@columnName) + ')';
END
ELSE BEGIN
IF (@updateStmt <> '')
BEGIN
SET @updateStmt += ', ';
END;
SET @updateStmt += QUOTENAME(@columnName) + ' = src.' + QUOTENAME(@columnName);
END;
END;
CLOSE cCur;
DEALLOCATE cCur;
SET @stmt = '
DECLARE @deltaRows int;
DECLARE @operation nchar(1);
DECLARE @rows int;
DECLARE @trackingVersion bigint;
IF EXISTS (SELECT * FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTOperation_ = ''F''))
BEGIN
TRUNCATE TABLE ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ';
INSERT INTO ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + '(' + @fields + ')
SELECT ' + @srcFields + ' FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src
WHERE (src._CTInternal_ = 0);
END
ELSE BEGIN
MERGE ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ' AS tgt
USING ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src
ON ' + @joinConditions + '
WHEN MATCHED AND (src._CTOperation_ = ''U'') THEN
UPDATE SET ' + @updateStmt + '
WHEN NOT MATCHED AND (src._CTOperation_ = ''I'') THEN
INSERT(' + @fields + ')
VALUES(' + @srcFields + ');
DELETE tgt
FROM ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ' AS tgt
INNER JOIN ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src ON ' + @joinConditions + '
WHERE (src._CTOperation_ = ''D'');
END;
SET @rows = (SELECT COUNT(*) FROM ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ');
SET @deltaRows = (SELECT COUNT(*) FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTInternal_ = 0));
SET @operation = (SELECT src._CTOperation_ FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTInternal_ = 1));
SET @trackingVersion = (SELECT src._CTTrackingVersion_ FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTInternal_ = 1));
INSERT INTO dbo.LoadStatus([Schema], Object, Rows, DeltaRows, TrackingVersion, Operation, Duration)
SELECT @schema, @object, @rows, @deltaRows, @trackingVersion, @operation, @duration;
';
EXEC sp_executesql
@stmt
,N'@schema nvarchar(128), @object nvarchar(128), @duration int'
,@schema = @schema
,@object = @object
,@duration = @duration;
END;
GO
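Because every step is a stored procedure, the per-table flow of IterateOverTables can be reproduced by hand in SSMS. A sketch for one table (HumanResources.Department is only an example): @dropSinkTable = 0 keeps the existing LZ table, and the @duration value is arbitrary because no Copy Data activity measured it.

```sql
-- 1) Sink: statement to run on the source if the DATA table is missing (empty string otherwise).
EXEC dbo.GetCreateTableStmt @schema = N'HumanResources', @object = N'Department';
-- Run the returned Statement on the source, then run the CREATE TABLE it returns on the sink.

-- 2) Sink: query statement for the source; the LZ table is kept because @dropSinkTable = 0.
EXEC dbo.GetExtractStmt @schema = N'HumanResources', @object = N'Department', @dropSinkTable = 0;
-- Run the returned Statement on the source; its result is the extract statement
-- that ExecuteExtractStmt copies into LZ.HumanResources_Department.

-- 3) Sink: once LZ.HumanResources_Department has been loaded, merge it and log the result.
EXEC dbo.LoadData @schema = N'HumanResources', @object = N'Department', @duration = 0;

-- 4) Sink: inspect the logged result.
SELECT TOP (1) ls.*
FROM dbo.LoadStatus AS ls
WHERE (ls.[Schema] = N'HumanResources') AND (ls.Object = N'Department')
ORDER BY ls.Id DESC;
```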
Transformation – the T in ETL
The solution does not perform any transformations, but they would be easy to add: because all the logic is kept in the stored procedures, the ADF pipeline does not need to be changed to implement them.
One way to implement transformations would be to create a table, e.g. dbo.Columns, containing the transformation expression to be applied to a particular column.
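A possible shape for such a table, purely as a hypothetical sketch: only the name dbo.Columns comes from the text, while the columns [Column], Expression and IsEnabled are assumptions. Expression is assumed to be a T-SQL expression that refers to the LZ row through the alias src, which is the alias dbo.LoadData already uses; IsEnabled would cover the column selection discussed next.

```sql
-- Hypothetical configuration table for column-level transformations (sink database).
CREATE TABLE dbo.Columns(
    Id int IDENTITY NOT NULL,
    [Schema] nvarchar(128) NOT NULL,
    Object nvarchar(128) NOT NULL,
    [Column] nvarchar(128) NOT NULL,
    Expression nvarchar(max) NULL,  -- NULL = load the column as-is
    IsEnabled bit NOT NULL CONSTRAINT DEF_Columns_IsEnabled DEFAULT(1),
    CONSTRAINT PK_Columns PRIMARY KEY CLUSTERED(Id),
    CONSTRAINT UQ_Columns UNIQUE([Schema], Object, [Column])
);

-- Example: upper-case GroupName when loading HumanResources.Department.
INSERT INTO dbo.Columns([Schema], Object, [Column], Expression)
VALUES (N'HumanResources', N'Department', N'GroupName', N'UPPER(src.[GroupName])');
```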
Besides the table, it is then the stored procedure dbo.LoadData that must be changed to use the transformation expressions in dbo.Columns.
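Inside dbo.LoadData, the change would mainly affect how @srcFields is built: use the configured expression where there is one, and src.<column> otherwise. A standalone sketch of that idea, assuming a hypothetical dbo.Columns table with the columns [Schema], Object, [Column] and Expression; it runs on the sink (Azure SQL Database supports STRING_AGG with WITHIN GROUP). The UPDATE SET list of the MERGE would need the same treatment.

```sql
-- Sketch: build the source column list from the DATA table's columns,
-- substituting a transformation expression where one is configured (dbo.Columns is hypothetical).
DECLARE @schema nvarchar(128) = N'HumanResources';
DECLARE @object nvarchar(128) = N'Department';
DECLARE @srcFields nvarchar(max);

SELECT @srcFields = STRING_AGG(
           CAST(COALESCE(col.Expression, N'src.' + QUOTENAME(c.name)) AS nvarchar(max)), N', ')
       WITHIN GROUP (ORDER BY c.column_id)
FROM sys.columns AS c
LEFT OUTER JOIN dbo.Columns AS col
    ON (col.[Schema] = @schema) AND (col.Object = @object) AND (col.[Column] = c.name)
WHERE (c.object_id = OBJECT_ID(QUOTENAME(N'DATA') + N'.' + QUOTENAME(@schema + N'_' + @object)));

SELECT @srcFields AS SrcFields;
```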
dbo.Columns could also be used to enable or disable individual columns for loading. Currently the solution loads everything, which, depending on the business needs, could be far too much. If dbo.Columns were used this way, all three stored procedures, dbo.GetCreateTableStmt, dbo.GetExtractStmt and dbo.LoadData, would have to be changed.
The devil lies in the details
When I started on this task, I expected challenges, but as usual they turned out somewhat differently than expected.
As mentioned earlier, there are data type issues.
AdventureWorks2017 uses 3 data types that the ADF Copy Data activity cannot handle natively:
- geography
- hierarchyid
- xml using a schema collection
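These data types had to be cast in the extract statement: geography to nvarchar(max), hierarchyid to nvarchar(4000), and xml to untyped xml, which drops the schema collection. The DATA tables are created with the same substitute types.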
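Because geography and hierarchyid values arrive as text, the DATA tables hold their string representations (WKT for geography, the /1/2/ path notation for hierarchyid). If the native types are needed on the sink, they can be restored when querying. A sketch, assuming the AdventureWorks columns Person.Address.SpatialLocation (SRID 4326) and HumanResources.Employee.OrganizationNode have been loaded:

```sql
-- Run on the sink: turn the nvarchar columns back into the native types.
SELECT
    a.AddressID
    ,geography::STGeomFromText(a.SpatialLocation, 4326) AS SpatialLocation  -- assumed SRID 4326
FROM [DATA].[Person_Address] AS a;

SELECT
    e.BusinessEntityID
    ,hierarchyid::Parse(e.OrganizationNode) AS OrganizationNode
    ,hierarchyid::Parse(e.OrganizationNode).GetLevel() AS OrganizationLevel
FROM [DATA].[HumanResources_Employee] AS e;
```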
As also mentioned, only the data types that AdventureWorks2017 uses have been implemented, but it is easy to add others if needed by updating the stored procedures dbo.GetCreateTableStmt and dbo.GetExtractStmt.
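To see which data types a source database actually uses, and therefore which WHEN branches might be missing, the type resolution from the stored procedures can be run on its own. A sketch to run on the source: any type that needs a length or precision but has no WHEN branch falls through to the ELSE branch and gets its default size, which for binary is a single byte.

```sql
-- Run on the source: data types used by user tables, resolved as in the stored procedures
-- (alias types such as dbo.Name are reported as their base type).
SELECT COALESCE(nativeT.name, t.name) AS ColumnType, COUNT(*) AS ColumnCount
FROM sys.objects AS o
INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
INNER JOIN sys.types AS t ON (t.system_type_id = c.system_type_id) AND (t.user_type_id = c.user_type_id)
LEFT OUTER JOIN sys.types AS nativeT ON (nativeT.user_type_id = t.system_type_id)
WHERE (o.type = 'U')
GROUP BY COALESCE(nativeT.name, t.name)
ORDER BY ColumnType;
```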
ADF pipeline – JSON
{
"$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"type": "string",
"metadata": "Data Factory name"
},
"ADFETLSink": {
"type": "string"
},
"AdventureWorks2017onSQL2019": {
"type": "string"
}
},
"variables": {
"factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]"
},
"resources": [
{
"name": "[concat(parameters('factoryName'), '/ADFETL')]",
"type": "Microsoft.DataFactory/factories/pipelines",
"apiVersion": "2018-06-01",
"properties": {
"activities": [
{
"name": "GetListOfTables",
"type": "Lookup",
"dependsOn": [
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderStoredProcedureName": "dbo.GetTables",
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ADFETLSink",
"type": "DatasetReference",
"parameters": {
"Schema": "Dummy",
"Table": "Dummy"
}
},
"firstRowOnly": false
}
},
{
"name": "IterateOverTables",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetListOfTables",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [
],
"typeProperties": {
"items": {
"value": "@activity('GetListOfTables').output.value",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "CheckForCreateTableStmt",
"description": "If returned statement is an empty string (True) we don't do anything.\nIf it is not an empty string it should be executed against the source",
"type": "IfCondition",
"dependsOn": [
{
"activity": "GetCreateTableQueryStmt",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [
],
"typeProperties": {
"expression": {
"value": "@equals(activity('GetCreateTableQueryStmt').output.firstRow.Statement, '')",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "GetCreateDataTableStmt",
"type": "Lookup",
"dependsOn": [
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": {
"value": "@{activity('GetCreateTableQueryStmt').output.firstRow.Statement}",
"type": "Expression"
},
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ADFETLSource",
"type": "DatasetReference",
"parameters": {
}
}
}
},
{
"name": "ExecuteCreateDataTableStmt",
"type": "Lookup",
"dependsOn": [
{
"activity": "GetCreateDataTableStmt",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "@{activity('GetCreateDataTableStmt').output.firstRow.Statement}",
"type": "Expression"
},
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ADFETLSink",
"type": "DatasetReference",
"parameters": {
"Schema": "Dummy",
"Table": "Dummy"
}
}
}
}
]
}
},
{
"name": "GetCreateTableQueryStmt",
"type": "Lookup",
"dependsOn": [
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderStoredProcedureName": "[[dbo].[GetCreateTableStmt]",
"storedProcedureParameters": {
"object": {
"type": "String",
"value": {
"value": "@item().Object",
"type": "Expression"
}
},
"schema": {
"type": "String",
"value": {
"value": "@item().Schema",
"type": "Expression"
}
}
},
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ADFETLSink",
"type": "DatasetReference",
"parameters": {
"Schema": "Dummy",
"Table": "Dummy"
}
}
}
},
{
"name": "GetExtractQueryStmt",
"type": "Lookup",
"dependsOn": [
{
"activity": "CheckForCreateTableStmt",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderStoredProcedureName": "[[dbo].[GetExtractStmt]",
"storedProcedureParameters": {
"dropSinkTable": {
"type": "Int32",
"value": "1"
},
"object": {
"type": "String",
"value": "@item().Object"
},
"schema": {
"type": "String",
"value": "@item().Schema"
}
},
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ADFETLSink",
"type": "DatasetReference",
"parameters": {
"Schema": "Dummy",
"Table": "Dummy"
}
}
}
},
{
"name": "ExecuteExtractStmt",
"type": "Copy",
"dependsOn": [
{
"activity": "GetExtractStmt",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": {
"value": "@{activity('GetExtractStmt').output.firstRow.Statement}",
"type": "Expression"
},
"queryTimeout": "02:00:00",
"isolationLevel": "Snapshot"
},
"sink": {
"type": "AzureSqlSink",
"tableOption": "autoCreate",
"disableMetricsCollection": false
},
"enableStaging": false,
"enableSkipIncompatibleRow": false
},
"inputs": [
{
"referenceName": "ADFETLSource",
"type": "DatasetReference",
"parameters": {
}
}
],
"outputs": [
{
"referenceName": "ADFETLSink",
"type": "DatasetReference",
"parameters": {
"Schema": "@{activity('GetExtractQueryStmt').output.firstRow.SinkSchema}",
"Table": "@{activity('GetExtractQueryStmt').output.firstRow.SinkTable}"
}
}
]
},
{
"name": "GetExtractStmt",
"type": "Lookup",
"dependsOn": [
{
"activity": "GetExtractQueryStmt",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": {
"value": "@{activity('GetExtractQueryStmt').output.firstRow.Statement}",
"type": "Expression"
},
"queryTimeout": "02:00:00"
},
"dataset": {
"referenceName": "ADFETLSource",
"type": "DatasetReference",
"parameters": {
}
}
}
},
{
"name": "LoadData",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "ExecuteExtractStmt",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [
],
"typeProperties": {
"storedProcedureName": "[[dbo].[LoadData]",
"storedProcedureParameters": {
"duration": {
"value": "@{activity('ExecuteExtractStmt').output.copyDuration}",
"type": "Int32"
},
"object": {
"value": "@item().Object",
"type": "String"
},
"schema": {
"value": "@item().Schema",
"type": "String"
}
}
},
"linkedServiceName": {
"referenceName": "[parameters('ADFETLSink')]",
"type": "LinkedServiceReference"
}
}
]
}
}
],
"annotations": [
]
},
"dependsOn": [
"[concat(variables('factoryId'), '/datasets/ADFETLSink')]",
"[concat(variables('factoryId'), '/datasets/ADFETLSource')]"
]
},
{
"name": "[concat(parameters('factoryName'), '/ADFETLSink')]",
"type": "Microsoft.DataFactory/factories/datasets",
"apiVersion": "2018-06-01",
"properties": {
"linkedServiceName": {
"referenceName": "[parameters('ADFETLSink')]",
"type": "LinkedServiceReference"
},
"parameters": {
"Schema": {
"type": "string",
"defaultValue": "Dummy"
},
"Table": {
"type": "string",
"defaultValue": "Dummy"
}
},
"annotations": [
],
"type": "AzureSqlTable",
"schema": [
],
"typeProperties": {
"schema": {
"value": "@dataset().Schema",
"type": "Expression"
},
"table": {
"value": "@dataset().Table",
"type": "Expression"
}
}
},
"dependsOn": [
]
},
{
"name": "[concat(parameters('factoryName'), '/ADFETLSource')]",
"type": "Microsoft.DataFactory/factories/datasets",
"apiVersion": "2018-06-01",
"properties": {
"linkedServiceName": {
"referenceName": "[parameters('AdventureWorks2017onSQL2019')]",
"type": "LinkedServiceReference"
},
"annotations": [
],
"type": "SqlServerTable",
"schema": [
],
"typeProperties": {
}
},
"dependsOn": [
]
}
]
}
References
This Microsoft article explains in detail how to use SQL Server Change Tracking and how to do it safely: https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/work-with-change-tracking-sql-server?view=sql-server-2017
This article describes the ADF SQL Server connector used by the Copy Data activity, including the isolationLevel source setting: https://docs.microsoft.com/en-us/azure/data-factory/connector-sql-server