Implementing dynamic ETL using Azure Data Factory and SQL Server Change Tracking

How much effort and code are needed to implement a dynamic ETL solution using Azure Data Factory (ADF) and SQL Server Change Tracking?

As you will see, not much, and very little compared to the benefits. But as always, the devil lies in the details.

The task I gave myself was to implement an ETL solution/proof of concept without any knowledge of the data to be loaded except for the table names.

Design

The solution consists of one Azure Data Factory with two datasets and one pipeline.

The ADF is used as the engine that makes things happen, but the statements to be executed all come from stored procedures. By keeping the logic in stored procedures, the need to modify the ADF pipeline is minimized. It also makes debugging much easier, as the stored procedures can be executed and tested one by one.

The source dataset is located on an on-premises SQL Server and uses the Microsoft self-hosted integration runtime to securely give ADF access to the database. The source database is AdventureWorks2017, which has 68 tables in the schemas HumanResources, Person, Production, Purchasing and Sales.
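
Before the pipeline can extract deltas, change tracking has to be switched on at the source. The following is a minimal sketch of that setup, not the exact commands used for this proof of concept. The retention period of 2 days and the choice of HumanResources.Department are assumptions for illustration only.

```sql
-- Run on the source SQL Server instance.
-- Assumption: 2 days of retention covers the longest gap between two loads.
ALTER DATABASE AdventureWorks2017
SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);

-- Snapshot isolation must be allowed before a reader can request it.
ALTER DATABASE AdventureWorks2017
SET ALLOW_SNAPSHOT_ISOLATION ON;

-- Change tracking is enabled per table; the table must have a primary key.
ALTER TABLE HumanResources.Department
ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = OFF);
```

The last statement has to be repeated for every table that is registered for loading.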

An Azure SQL database is used as the sink (destination) database. The sink database also hosts the 2 tables and 4 stored procedures used by the solution.

The only table that contains information about which tables are to be processed by the solution is dbo.Tables. It is a simple table with 2 columns, Schema and Object, and it contains the only configuration that is needed.

The solution uses two separate schemas, LZ (landing zone) and DATA. The LZ schema contains the tables where the data extracted from the source lands before it is merged into the final tables in the DATA schema.
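
Both schemas have to exist in the sink database before the first run. A small sketch of how they could be created idempotently (the guard with SCHEMA_ID is my own addition, not part of the original scripts):

```sql
-- Run on the sink (Azure SQL) database.
-- CREATE SCHEMA must be the only statement in its batch, hence the EXEC wrapper.
IF SCHEMA_ID(N'LZ') IS NULL
	EXEC (N'CREATE SCHEMA [LZ];');

IF SCHEMA_ID(N'DATA') IS NULL
	EXEC (N'CREATE SCHEMA [DATA];');
```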

I have chosen to use regular tables in the LZ schema rather than, for example, temporary tables, because having the extracted data available for a longer time helps with debugging and troubleshooting.

Another design decision was to drop the LZ tables prior to each data load and let the Copy Data activity recreate them. The reasoning was to obtain the schema of the source tables as easily as possible. This blog post does not take schema drift into account, where the source table schemas change over time, but having the LZ tables always reflect the source schemas will make it easier to investigate schema drift.

The DATA tables are created at the first execution, using the source tables as a template.

Here is a screen capture from SSMS where the tables have been filtered to only show the tables from the schema HumanResources. The LZ and DATA tables are named by concatenating the source schema and table name, separated by an underscore '_', for example LZ.HumanResources_Department.

Implementation

The implementation uses dynamic SQL statements that in turn generate dynamic SQL. That is how the design goal is reached: a solution that needs a minimum of configuration.

ADF pipeline

Main flow

The main flow in the pipeline consists of a Lookup activity calling the stored procedure dbo.GetTables and then a ForEach activity iterating over the result set from the stored procedure.

IterateOverTables activities

The flow executed for every table consists of 6 activities:

  • GetCreateTableQueryStmt (sink): The stored procedure dbo.GetCreateTableStmt checks whether the DATA table exists. If it does not, the procedure returns a query statement to be used by the False activities of the If Condition activity CheckForCreateTableStmt.
  • CheckForCreateTableStmt: Checks whether a query statement was returned by the Lookup activity GetCreateTableQueryStmt. If a statement was returned, meaning that the DATA table does not exist, the False activities sub-flow is executed.
  • GetExtractQueryStmt (sink): The stored procedure dbo.GetExtractStmt returns the query statement to be executed on the source by the Lookup activity GetExtractStmt.
  • GetExtractStmt (source): Executes that query statement to get the extract statement, which contains the actual SQL statement to be executed by the Copy Data activity ExecuteExtractStmt.
  • ExecuteExtractStmt (source and sink): Executes the SQL statement that performs either:
    1) a full load, if we are behind the minimum valid change tracking version or this is the very first execution, or
    2) a delta load, extracting the changes made since the last execution.
    Important: This Copy Data activity must be configured to use the isolation level Snapshot in order to guarantee that the change tracking code is 100% watertight.
  • LoadData (sink): Executes the stored procedure dbo.LoadData, which merges the data from the LZ table into the DATA table.

Screenshot: Copy Data activity isolation level set to Snapshot.

CheckForCreateTableStmt activities

  • GetCreateDataTableStmt (source): Executes the query statement to get the CREATE TABLE statement, which is then executed on the sink database by the Lookup activity ExecuteCreateDataTableStmt.
  • ExecuteCreateDataTableStmt (sink): Executes the CREATE TABLE statement.

Tables

dbo.Tables

As said earlier this is the only table to be configured in order to use the ETL solution.

CREATE TABLE dbo.Tables(
	Id int IDENTITY NOT NULL,
	[Schema] nvarchar(128) NOT NULL,
	Object nvarchar(128) NOT NULL,
	CONSTRAINT PK_Tables PRIMARY KEY CLUSTERED(Id)
);
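
Configuring the solution then comes down to inserting one row per source table. A short sketch, using three AdventureWorks2017 tables picked purely as examples:

```sql
-- Run on the sink database: register the source tables to be loaded.
INSERT INTO dbo.Tables([Schema], Object)
VALUES
	(N'HumanResources', N'Department'),
	(N'HumanResources', N'Employee'),
	(N'Person', N'Address');
```

Each registered table is expected to have a primary key and change tracking enabled on the source, since the stored procedures build their join conditions from the primary key columns.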

dbo.LoadStatus

This table contains the result of every execution. The most crucial value in the table is TrackingVersion. It is used to verify whether we are too late, that is, whether the stored version is lower than CHANGE_TRACKING_MIN_VALID_VERSION(). If the version is still valid, it is used to capture the changes that have happened since that version.

CREATE TABLE dbo.LoadStatus(
	Id int IDENTITY NOT NULL,
	[Schema] nvarchar(128) NOT NULL,
	Object nvarchar(128) NOT NULL,
	Rows int NOT NULL,
	DeltaRows int NOT NULL,
	TrackingVersion bigint NOT NULL,
	Operation nchar(1) NOT NULL,
	Duration int NOT NULL,
	TimestampUTC datetime2(3) CONSTRAINT DEF_LoadStatus DEFAULT(SYSUTCDATETIME()),
	CONSTRAINT PK_LoadStatus PRIMARY KEY CLUSTERED(Id)
);
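
Because every execution adds a row, the table doubles as a simple load history. As an illustration (this query is not part of the solution itself), the latest status per table could be inspected like this:

```sql
-- Latest load result for every table that has been processed at least once.
SELECT ls.[Schema], ls.Object, ls.Operation, ls.Rows, ls.DeltaRows,
	ls.TrackingVersion, ls.Duration, ls.TimestampUTC
FROM dbo.LoadStatus AS ls
WHERE ls.Id = (
	SELECT MAX(l2.Id)
	FROM dbo.LoadStatus AS l2
	WHERE (l2.[Schema] = ls.[Schema]) AND (l2.Object = ls.Object)
)
ORDER BY ls.[Schema], ls.Object;
```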

Stored procedures

The stored procedures are listed in the sequence they are used. And as said earlier they rely solely on dynamic SQL in order to deliver the design goal – minimal configuration.

dbo.GetTables

A very simple stored procedure. But again, by keeping the logic in stored procedures and not accessing tables directly from ADF, the solution is much easier to maintain and to adapt to future requirements.

CREATE OR ALTER PROC dbo.GetTables
AS
BEGIN
	SET NOCOUNT ON;

	SELECT t.[Schema], t.Object
	FROM dbo.Tables AS t
	ORDER BY t.[Schema], t.Object;
END;
GO

dbo.GetCreateTableStmt

The purpose of the stored procedure is to check if the DATA table exists for the source table <@schema>.<@object>.

If the DATA table does not exist, the stored procedure returns a statement to be executed on the source database in order to get the CREATE TABLE statement in return.

The DATA table will be created with a clustered primary key matching the primary key on the source table.

Regarding data types, the stored procedure only handles those used by AdventureWorks2017, and it also includes exceptions for the data types that AdventureWorks2017 uses but that ADF does not support. See the section "The devil lies in the details".

CREATE OR ALTER PROC dbo.GetCreateTableStmt
	@schema nvarchar(128),
	@object nvarchar(128)
AS
BEGIN
	SET NOCOUNT ON;

	DECLARE @loadSchema nvarchar(128);
	DECLARE @stmt nvarchar(max);

	SET @loadSchema = 'DATA';

	SET @stmt = '';

	IF NOT EXISTS (
		SELECT *
		FROM sys.schemas AS sch
		INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
		WHERE (sch.name = @loadSchema) AND (o.name = @schema + '_' + @object)
	)
	BEGIN
		SET @stmt += '
			DECLARE @columnIsDescendingKey bit;
			DECLARE @columnIsNullable bit;
			DECLARE @columnMaxLength smallint;
			DECLARE @columnName nvarchar(128);
			DECLARE @columnPrecision tinyint;
			DECLARE @columnScale tinyint;
			DECLARE @columnType nvarchar(128);
			DECLARE @sinkObject nvarchar(128);
			DECLARE @sinkSchema nvarchar(128);
			DECLARE @sourceObject nvarchar(128);
			DECLARE @sourceSchema nvarchar(128);
			DECLARE @stmt nvarchar(max);
		';
		SET @stmt += '

			SET @sourceSchema = ''' + @schema + ''';
			SET @sourceObject = ''' + @object + ''';
			SET @sinkSchema = ''' + @loadSchema + ''';
			SET @sinkObject = @sourceSchema + ''_'' + @sourceObject;

			SET @stmt = ''CREATE TABLE '' + QUOTENAME(@sinkSchema) + ''.'' + QUOTENAME(@sinkObject) + ''('';

			DECLARE tCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
			SELECT
				c.name AS ColumnName
				,COALESCE(nativeT.name, t.name) AS ColumnType
				,c.max_length AS ColumnMaxLength
				,c.precision AS ColumnPrecision
				,c.scale AS ColumnScale
				,c.is_nullable AS ColumnIsNullable
			FROM sys.schemas AS sch
			INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
			INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
			INNER JOIN sys.types AS t ON (t.system_type_id = c.system_type_id) AND (t.user_type_id = c.user_type_id)
			LEFT OUTER JOIN sys.types AS nativeT on (nativeT.user_type_id = t.system_type_id)
			WHERE (sch.name = @sourceSchema) AND (o.name = @sourceObject) AND (o.type = ''U'')
			ORDER BY c.column_id;
		';
		SET @stmt += '

			OPEN tCur;

			WHILE (1 = 1)
			BEGIN
				FETCH NEXT FROM tCur
				INTO @columnName, @columnType, @columnMaxLength, @columnPrecision, @columnScale, @columnIsNullable;

				IF (@@FETCH_STATUS <> 0)
				BEGIN
					BREAK;
				END;

				SET @stmt += CHAR(13) + CHAR(10) + QUOTENAME(@columnName) + '' '' +
					CASE @columnType
						WHEN ''varbinary'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''

						WHEN ''char'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
						WHEN ''varchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''

						WHEN ''nchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''
						WHEN ''nvarchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''

						WHEN ''decimal'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''
						WHEN ''numeric'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''

						WHEN ''time'' THEN @columnType + ''('' + CAST(@columnScale AS nvarchar) + '')''

						-- SQL data types that can not be transferred directly using ADF
						WHEN ''geography'' THEN ''nvarchar(max)''
						WHEN ''hierarchyid'' THEN ''nvarchar(4000)''

						ELSE @columnType
					END +
					CASE @columnIsNullable WHEN 0 THEN '' NOT'' ELSE '''' END + '' NULL,'';
			END;

			CLOSE tCur;
			DEALLOCATE tCur;
		';
		SET @stmt += '

			SET @stmt += CHAR(13) + CHAR(10) + ''CONSTRAINT [PK_'' + @sinkSchema + ''_'' + @sinkObject + ''] PRIMARY KEY CLUSTERED('';

			DECLARE iCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
			SELECT c.name, ic.is_descending_key
			FROM sys.schemas AS sch
			INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
			INNER JOIN sys.indexes AS i ON (i.object_id = o.object_id)
			INNER JOIN sys.index_columns AS ic ON (ic.object_id = i.object_id) AND (ic.index_id = i.index_id)
			INNER JOIN sys.columns AS c ON (c.object_id = ic.object_id) AND (c.column_id = ic.column_id)
			WHERE (sch.name = @sourceSchema) AND (o.name = @sourceObject) AND (o.type = ''U'') AND (i.is_primary_key = 1)
			ORDER BY ic.key_ordinal;
		';
		SET @stmt += '

			OPEN iCur;

			WHILE (1 = 1)
			BEGIN
				FETCH NEXT FROM iCur
				INTO @columnName, @columnIsDescendingKey;

				IF (@@FETCH_STATUS <> 0)
				BEGIN
					BREAK;
				END;

				SET @stmt += CHAR(13) + CHAR(10) + QUOTENAME(@columnName) + '' '' + CASE @columnIsDescendingKey WHEN 0 THEN ''ASC'' ELSE ''DESC'' END + '','';
			END;

			CLOSE iCur;
			DEALLOCATE iCur;
		';
		SET @stmt += '

			SET @stmt = LEFT(@stmt, LEN(@stmt) - 1);
			SET @stmt += CHAR(13) + CHAR(10) + '')'';
			SET @stmt += CHAR(13) + CHAR(10) + '');'';

			-- Needed as a Lookup activity must return something
			SET @stmt += CHAR(13) + CHAR(10) + ''SELECT 1 AS Dummy;'';

			SELECT @stmt AS Statement;
		';
	END;

	SELECT @stmt AS Statement;
END;
GO
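
Since the logic lives in stored procedures, the same steps the pipeline performs can be walked through by hand in SSMS. A sketch of such a debugging session, assuming HumanResources.Department is registered in dbo.Tables:

```sql
-- Step 1, on the sink database: ask for the query statement.
EXEC dbo.GetCreateTableStmt @schema = N'HumanResources', @object = N'Department';

-- Step 2, on the source database: paste and run the text from the Statement column.
-- It returns one row whose Statement column holds the CREATE TABLE text.

-- Step 3, on the sink database: paste and run that CREATE TABLE text.
-- A second call should now return an empty string, because the DATA table exists.
EXEC dbo.GetCreateTableStmt @schema = N'HumanResources', @object = N'Department';
```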

dbo.GetExtractStmt

This stored procedure generates the extract query statement to be executed on the source database and the returned statement will then be used by the Copy Data activity.

Regarding data types, the stored procedure only handles those used by AdventureWorks2017, and it also includes exceptions for the data types that AdventureWorks2017 uses but that ADF does not support. See the section "The devil lies in the details".

CREATE OR ALTER PROC dbo.GetExtractStmt
	@schema nvarchar(128),
	@object nvarchar(128),
	@dropSinkTable int
AS
BEGIN
	SET NOCOUNT ON;

	DECLARE @sinkSchema nvarchar(128);
	DECLARE @sinkTable nvarchar(128);
	DECLARE @stmt nvarchar(max);
	DECLARE @trackingVersion bigint;

	SET @sinkSchema = 'LZ';
	SET @sinkTable = @schema + '_' + @object;

	SET @trackingVersion = COALESCE((
		SELECT TOP(1) ls.TrackingVersion
		FROM dbo.LoadStatus AS ls
		WHERE (ls.[Schema] = @schema) AND (ls.Object = @object)
		ORDER BY ls.Id DESC
	), -1);

	IF (@dropSinkTable = 1)
	BEGIN
		IF OBJECT_ID(QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@sinkTable)) IS NOT NULL
		BEGIN
			SET @stmt = 'DROP TABLE ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@sinkTable) + ';';
			EXEC (@stmt);
		END;
	END;

	SET @stmt = '
		DECLARE @columnDataType nvarchar(128);
		DECLARE @columnIsInPrimaryKey bit;
		DECLARE @columnMaxLength smallint;
		DECLARE @columnName nvarchar(128);
		DECLARE @columnPrecision tinyint;
		DECLARE @columnScale tinyint;
		DECLARE @columnType nvarchar(128);
		DECLARE @internalColumns nvarchar(max);
		DECLARE @joinConditions nvarchar(max);
		DECLARE @resultStmt nvarchar(max);
		DECLARE @srcColumnsCT nvarchar(max);
		DECLARE @srcColumnsFull nvarchar(max);
		DECLARE @sourceObject nvarchar(128);
		DECLARE @sourceSchema nvarchar(128);
		DECLARE @trackingVersion bigint;

		SET @sourceSchema = ''' + @schema + ''';
		SET @sourceObject = ''' + @object + ''';
		SET @trackingVersion = ' + CAST(@trackingVersion AS nvarchar) + ';
		';
		SET @stmt += '

		-- Create column blocks
		BEGIN
			DECLARE iCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
			SELECT
				c.name AS ColumnName
				,COALESCE(nativeT.name, t.name) AS ColumnType
				,c.max_length AS ColumnMaxLength
				,c.precision AS ColumnPrecision
				,c.scale AS ColumnScale
				,COALESCE((
					SELECT 1
					FROM sys.indexes AS pi
					INNER JOIN sys.index_columns AS pic ON (pic.object_id = pi.object_id) AND (pic.index_id = pi.index_id)
					INNER JOIN sys.columns AS pc ON (pc.object_id = pic.object_id) AND (pc.column_id = pic.column_id)
					WHERE (pi.object_id = o.object_id) AND (pi.is_primary_key = 1) AND (pc.name = c.name)
				), 0) AS ColumnIsInPrimaryKey
			FROM sys.schemas AS sch
			INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
			INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
			INNER JOIN sys.types AS t ON (t.system_type_id = c.system_type_id) AND (t.user_type_id = c.user_type_id)
			LEFT OUTER JOIN sys.types AS nativeT on (nativeT.user_type_id = t.system_type_id)
			WHERE (sch.name = @sourceSchema) AND (o.name = @sourceObject) AND (o.type = ''U'')
			ORDER BY ColumnIsInPrimaryKey DESC, c.column_id;

			SET @srcColumnsCT = '''';
			SET @srcColumnsFull = '''';
			SET @internalColumns = '''';
			SET @joinConditions = '''';
		';
		SET @stmt += '

			OPEN iCur;

			WHILE (1 = 1)
			BEGIN
				FETCH NEXT FROM iCur
				INTO @columnName, @columnType, @columnMaxLength, @columnPrecision, @columnScale, @columnIsInPrimaryKey;

				IF (@@FETCH_STATUS <> 0)
				BEGIN
					BREAK;
				END;

				SET @columnDataType =
					CASE @columnType
						WHEN ''varbinary'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''

						WHEN ''char'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''
						WHEN ''varchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) AS nvarchar), ''max'') + '')''

						WHEN ''nchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''
						WHEN ''nvarchar'' THEN @columnType + ''('' + COALESCE(CAST(NULLIF(@columnMaxLength, -1) / 2 AS nvarchar), ''max'') + '')''

						WHEN ''decimal'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''
						WHEN ''numeric'' THEN @columnType + ''('' + CAST(@columnPrecision AS nvarchar) + '','' + CAST(@columnScale AS nvarchar) + '')''

						WHEN ''time'' THEN @columnType + ''('' + CAST(@columnScale AS nvarchar) + '')''

						-- SQL data types that can not be transferred directly using ADF
						WHEN ''geography'' THEN ''nvarchar(max)''
						WHEN ''hierarchyid'' THEN ''nvarchar(4000)''

						ELSE @columnType
					END;

				IF (@columnType IN (''geography'', ''hierarchyid'', ''xml''))
				BEGIN
					SET @srcColumnsCT += '', CAST('' + CASE @columnIsInPrimaryKey WHEN 1 THEN ''ct'' ELSE ''t'' END + ''.'' + QUOTENAME(@columnName) + '' AS '' + @columnDataType + '') AS '' + QUOTENAME(@columnName);
					SET @srcColumnsFull += '', CAST(t.'' + QUOTENAME(@columnName) + '' AS '' + @columnDataType + '') AS '' + QUOTENAME(@columnName);
				END
				ELSE BEGIN
					SET @srcColumnsCT += '', '' + CASE @columnIsInPrimaryKey WHEN 1 THEN ''ct'' ELSE ''t'' END + ''.'' + QUOTENAME(@columnName);
					SET @srcColumnsFull += '', t.'' + QUOTENAME(@columnName);
				END;

				IF (@columnIsInPrimaryKey = 1)
				BEGIN
					IF (@joinConditions <> '''')
					BEGIN
						SET @joinConditions += '' AND '';
					END;

					SET @joinConditions += ''(ct.'' + QUOTENAME(@columnName) + '' = t.'' + QUOTENAME(@columnName) + '')'';
				END;

				SET @internalColumns += '', CAST(NULL AS '' + @columnDataType + '') AS '' + QUOTENAME(@columnName);
			END;

			CLOSE iCur;
			DEALLOCATE iCur;
		';
		SET @stmt += '
		END;

		SET @resultStmt = ''
		DECLARE @currentTrackingVersion bigint;
		DECLARE @trackingVersion bigint;

		SET @trackingVersion = '' + CAST(@trackingVersion AS nvarchar) + '';

		-- Obtain the version to use next time.
		SET @currentTrackingVersion = CHANGE_TRACKING_CURRENT_VERSION();

		-- Verify that version of the previous synchronization is valid.
		IF (@trackingVersion < CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(QUOTENAME('''''' + @sourceSchema + '''''') + ''''.'''' + QUOTENAME('''''' + @sourceObject + ''''''))))
		BEGIN
			-- Handle invalid version and do not enumerate changes.
			-- Client must be reinitialized.

			SELECT CAST(0 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, CAST(''''F'''' AS nchar(1)) AS _CTOperation_'' + @srcColumnsFull + ''
			FROM '' + QUOTENAME(@sourceSchema) + ''.'' + QUOTENAME(@sourceObject) + '' AS t
			UNION ALL
			SELECT CAST(1 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, CAST(''''F'''' AS nchar(1)) AS _CTOperation_'' + @internalColumns + ''
		END
		ELSE BEGIN
			-- Obtain changes.

			SELECT CAST(0 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, ct.SYS_CHANGE_OPERATION AS _CTOperation_'' + @srcColumnsCT + ''
			FROM '' + QUOTENAME(@sourceSchema) + ''.'' + QUOTENAME(@sourceObject) + '' AS t
			RIGHT OUTER JOIN CHANGETABLE(CHANGES '' + QUOTENAME(@sourceSchema) + ''.'' + QUOTENAME(@sourceObject) + '', @trackingVersion) AS ct ON '' + @joinConditions + ''
			UNION ALL
			SELECT CAST(1 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_, CAST(''''-'''' AS nchar(1)) AS _CTOperation_'' + @internalColumns + ''
		END;
		'';

		SELECT @resultStmt AS Statement;
	';

	SELECT
		@stmt AS Statement,
		@sinkSchema AS SinkSchema,
		@sinkTable AS SinkTable;
END;
GO
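
Two levels of generated SQL are hard to read, so it helps to see what the final delta statement roughly looks like for one table. The following is hand-written for illustration rather than captured output. It assumes HumanResources.Department has the columns DepartmentID (smallint, primary key), Name and GroupName (nvarchar(50)) and ModifiedDate (datetime), and the version number 42 is hypothetical.

```sql
-- Run on the source database under snapshot isolation.
DECLARE @currentTrackingVersion bigint = CHANGE_TRACKING_CURRENT_VERSION();
DECLARE @trackingVersion bigint = 42; -- hypothetical version stored by the previous load

-- Changed rows: key columns come from the change table so that deletes keep their key.
SELECT CAST(0 AS bit) AS _CTInternal_, @currentTrackingVersion AS _CTTrackingVersion_,
	ct.SYS_CHANGE_OPERATION AS _CTOperation_,
	ct.[DepartmentID], t.[Name], t.[GroupName], t.[ModifiedDate]
FROM [HumanResources].[Department] AS t
RIGHT OUTER JOIN CHANGETABLE(CHANGES [HumanResources].[Department], @trackingVersion) AS ct
	ON (ct.[DepartmentID] = t.[DepartmentID])
UNION ALL
-- Marker row: carries the tracking version even when nothing has changed.
SELECT CAST(1 AS bit), @currentTrackingVersion, CAST('-' AS nchar(1)),
	CAST(NULL AS smallint), CAST(NULL AS nvarchar(50)), CAST(NULL AS nvarchar(50)), CAST(NULL AS datetime);
```

The marker row is what lets dbo.LoadData read the new tracking version and the operation type from the LZ table, and it guarantees that the Copy Data activity always has at least one row to create the LZ table from.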

dbo.LoadData

This is the stored procedure responsible for merging data from the LZ tables into the DATA tables and afterwards registering the result in dbo.LoadStatus.

CREATE OR ALTER PROC dbo.LoadData
	@schema nvarchar(128),
	@object nvarchar(128),
	@duration int
AS
BEGIN
	SET NOCOUNT ON;

	DECLARE @columnIsInPrimaryKey bit;
	DECLARE @columnName nvarchar(128);
	DECLARE @fields nvarchar(max);
	DECLARE @joinConditions nvarchar(max);
	DECLARE @loadSchema nvarchar(128);
	DECLARE @sinkSchema nvarchar(128);
	DECLARE @srcFields nvarchar(max);
	DECLARE @stmt nvarchar(max);
	DECLARE @table nvarchar(128);
	DECLARE @updateStmt nvarchar(max);

	SET @sinkSchema = 'LZ';
	SET @loadSchema = 'DATA';
	SET @table = @schema + '_' + @object;

	DECLARE cCur CURSOR LOCAL READ_ONLY FAST_FORWARD FOR
	SELECT
		c.name AS ColumnName
		,COALESCE((
			SELECT 1
			FROM sys.indexes AS pi
			INNER JOIN sys.index_columns AS pic ON (pic.object_id = pi.object_id) AND (pic.index_id = pi.index_id)
			INNER JOIN sys.columns AS pc ON (pc.object_id = pic.object_id) AND (pc.column_id = pic.column_id)
			WHERE (pi.object_id = o.object_id) AND (pi.is_primary_key = 1) AND (pc.name = c.name)
		), 0) AS ColumnIsInPrimaryKey
	FROM sys.schemas AS sch
	INNER JOIN sys.objects AS o ON (o.schema_id = sch.schema_id)
	INNER JOIN sys.columns AS c ON (c.object_id = o.object_id)
	WHERE (sch.name = @loadSchema) AND (o.name = @table) AND (o.type = 'U')
	ORDER BY c.column_id;

	SET @fields = '';
	SET @srcFields = '';
	SET @joinConditions = '';
	SET @updateStmt = '';

	OPEN cCur;

	WHILE (1 = 1)
	BEGIN
		FETCH NEXT FROM cCur
		INTO @columnName, @columnIsInPrimaryKey;

		IF (@@FETCH_STATUS <> 0)
		BEGIN
			BREAK;
		END;

		IF (@srcFields <> '')
		BEGIN
			SET @srcFields += ', ';
		END;
		SET @srcFields += 'src.' + QUOTENAME(@columnName);

		IF (@fields <> '')
		BEGIN
			SET @fields += ', ';
		END;
		SET @fields += QUOTENAME(@columnName);

		IF (@columnIsInPrimaryKey = 1)
		BEGIN
			IF (@joinConditions <> '')
			BEGIN
				SET @joinConditions += ' AND ';
			END;
			SET @joinConditions += '(src.' + QUOTENAME(@columnName) + ' = tgt.' + QUOTENAME(@columnName) + ')';
		END
		ELSE BEGIN
			IF (@updateStmt <> '')
			BEGIN
				SET @updateStmt += ', ';
			END;
			SET @updateStmt += QUOTENAME(@columnName) + ' = src.' + QUOTENAME(@columnName);
		END;
	END;

	CLOSE cCur;
	DEALLOCATE cCur;

	SET @stmt = '
		DECLARE @deltaRows int;
		DECLARE @operation nchar(1);
		DECLARE @rows int;
		DECLARE @trackingVersion bigint;

		IF EXISTS (SELECT * FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTOperation_ = ''F''))
		BEGIN
			TRUNCATE TABLE ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ';

			INSERT INTO ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + '(' + @fields + ')
			SELECT ' + @srcFields + ' FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src
			WHERE (src._CTInternal_ = 0);
		END
		ELSE BEGIN
			MERGE ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ' AS tgt
			USING ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src
			ON ' + @joinConditions + '
			WHEN MATCHED AND (src._CTOperation_ = ''U'') THEN
				UPDATE SET ' + @updateStmt + '
			WHEN NOT MATCHED AND (src._CTOperation_ = ''I'') THEN
				INSERT(' + @fields + ')
				VALUES(' + @srcFields + ');

			DELETE tgt
			FROM ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ' AS tgt
			INNER JOIN ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src ON ' + @joinConditions + '
			WHERE (src._CTOperation_ = ''D'');
		END;

		SET @rows = (SELECT COUNT(*) FROM ' + QUOTENAME(@loadSchema) + '.' + QUOTENAME(@table) + ');
		SET @deltaRows = (SELECT COUNT(*) FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTInternal_ = 0));
		SET @operation = (SELECT src._CTOperation_ FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTInternal_ = 1));
		SET @trackingVersion = (SELECT src._CTTrackingVersion_ FROM ' + QUOTENAME(@sinkSchema) + '.' + QUOTENAME(@table) + ' AS src WHERE (src._CTInternal_ = 1));

		INSERT INTO dbo.LoadStatus([Schema], Object, Rows, DeltaRows, TrackingVersion, Operation, Duration)
		SELECT @schema, @object, @rows, @deltaRows, @trackingVersion, @operation, @duration;
	';
	EXEC sp_executesql
		@stmt
		,N'@schema nvarchar(128), @object nvarchar(128), @duration int'
		,@schema = @schema
		,@object = @object
		,@duration = @duration;
END;
GO
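
Like the other procedures, dbo.LoadData can be run by hand once the LZ table has been filled by the Copy Data activity. A minimal sketch, where the table name and the @duration value of 0 are placeholders (in the pipeline the duration is supplied by ADF):

```sql
-- Run on the sink database after LZ.HumanResources_Department has been loaded.
EXEC dbo.LoadData @schema = N'HumanResources', @object = N'Department', @duration = 0;

-- Check what was registered for this run.
SELECT TOP (1) ls.*
FROM dbo.LoadStatus AS ls
WHERE (ls.[Schema] = N'HumanResources') AND (ls.Object = N'Department')
ORDER BY ls.Id DESC;
```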

Transformation – the T in ETL

The solution does not perform any transformation, but it would be easy to add. Because all the logic is kept in the stored procedures, the ADF pipeline would not need to change.

One way to implement transformation could be creating a table e.g. dbo.Columns, which then contains the transformation expression to be executed for a particular column.

Besides the table, the stored procedure dbo.LoadData would have to be changed to use the transformation expressions in dbo.Columns.

dbo.Columns could also be used to enable or disable the loading of individual columns. Currently the solution loads everything, and depending on the business needs that could be far too much. If dbo.Columns were used in this way, all 3 stored procedures, dbo.GetCreateTableStmt, dbo.GetExtractStmt and dbo.LoadData, would have to be changed.
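
To make the idea more concrete, here is one possible shape for such a table. It is entirely hypothetical: the column names IsEnabled and Expression, and the convention that expressions refer to the LZ table through the alias src, are assumptions of this sketch and not part of the implemented solution.

```sql
-- Hypothetical configuration table for per-column transformations.
CREATE TABLE dbo.Columns(
	Id int IDENTITY NOT NULL,
	[Schema] nvarchar(128) NOT NULL,
	Object nvarchar(128) NOT NULL,
	[Column] nvarchar(128) NOT NULL,
	IsEnabled bit NOT NULL CONSTRAINT DEF_Columns_IsEnabled DEFAULT(1),
	Expression nvarchar(max) NULL, -- NULL means: load the column as-is
	CONSTRAINT PK_Columns PRIMARY KEY CLUSTERED(Id),
	CONSTRAINT UQ_Columns UNIQUE([Schema], Object, [Column])
);

-- Example row: upper-case the department name while merging.
INSERT INTO dbo.Columns([Schema], Object, [Column], Expression)
VALUES (N'HumanResources', N'Department', N'Name', N'UPPER(src.[Name])');
```

dbo.LoadData would then use Expression instead of src.[Name] when it builds its column lists, falling back to the plain column when no row or no expression exists.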

The devil lies in the details

When I started on this task, I was expecting challenges, but as usual they turned out somewhat differently than expected.

As written earlier there are data type issues.

AdventureWorks2017 uses 3 data types that the ADF Copy Data activity cannot handle natively:

  • geography
  • hierarchyid
  • xml using a schema collection

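These data types had to be cast. In the stored procedures, geography and hierarchyid columns are cast to nvarchar, and xml columns are cast to untyped xml.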

As also mentioned, only the data types that AdventureWorks2017 uses have been implemented. But it is easy to add others if needed by updating the stored procedures dbo.GetCreateTableStmt and dbo.GetExtractStmt.
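
The casts that dbo.GetExtractStmt generates can be tried directly against the source. The queries below are illustrative and use three AdventureWorks2017 columns of the affected types:

```sql
-- Run on the source database (AdventureWorks2017).
SELECT TOP (5)
	CAST(a.SpatialLocation AS nvarchar(max)) AS SpatialLocation -- geography
FROM Person.Address AS a;

SELECT TOP (5)
	CAST(e.OrganizationNode AS nvarchar(4000)) AS OrganizationNode -- hierarchyid
FROM HumanResources.Employee AS e;

SELECT TOP (5)
	CAST(jc.Resume AS xml) AS Resume -- typed xml cast to untyped xml
FROM HumanResources.JobCandidate AS jc;
```

The target types mirror the CASE expressions in the two stored procedures: nvarchar(max) for geography, nvarchar(4000) for hierarchyid and plain xml for xml bound to a schema collection.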

ADF pipeline – JSON

{
  "$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "factoryName": {
      "type": "string",
      "metadata": "Data Factory name"
    },
    "ADFETLSink": {
      "type": "string"
    },
    "AdventureWorks2017onSQL2019": {
      "type": "string"
    }
  },
  "variables": {
    "factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]"
  },
  "resources": [
    {
      "name": "[concat(parameters('factoryName'), '/ADFETL')]",
      "type": "Microsoft.DataFactory/factories/pipelines",
      "apiVersion": "2018-06-01",
      "properties": {
        "activities": [
          {
            "name": "GetListOfTables",
            "type": "Lookup",
            "dependsOn": [
              
            ],
            "policy": {
              "timeout": "7.00:00:00",
              "retry": 0,
              "retryIntervalInSeconds": 30,
              "secureOutput": false,
              "secureInput": false
            },
            "userProperties": [
              
            ],
            "typeProperties": {
              "source": {
                "type": "AzureSqlSource",
                "sqlReaderStoredProcedureName": "dbo.GetTables",
                "queryTimeout": "02:00:00"
              },
              "dataset": {
                "referenceName": "ADFETLSink",
                "type": "DatasetReference",
                "parameters": {
                  "Schema": "Dummy",
                  "Table": "Dummy"
                }
              },
              "firstRowOnly": false
            }
          },
          {
            "name": "IterateOverTables",
            "type": "ForEach",
            "dependsOn": [
              {
                "activity": "GetListOfTables",
                "dependencyConditions": [
                  "Succeeded"
                ]
              }
            ],
            "userProperties": [
              
            ],
            "typeProperties": {
              "items": {
                "value": "@activity('GetListOfTables').output.value",
                "type": "Expression"
              },
              "isSequential": true,
              "activities": [
                {
                  "name": "CheckForCreateTableStmt",
                  "description": "If returned statement is an empty string (True) we don't do anything.\nIf it is not an empty string it should be executed against the source",
                  "type": "IfCondition",
                  "dependsOn": [
                    {
                      "activity": "GetCreateTableQueryStmt",
                      "dependencyConditions": [
                        "Succeeded"
                      ]
                    }
                  ],
                  "userProperties": [
                    
                  ],
                  "typeProperties": {
                    "expression": {
                      "value": "@equals(activity('GetCreateTableQueryStmt').output.firstRow.Statement, '')",
                      "type": "Expression"
                    },
                    "ifFalseActivities": [
                      {
                        "name": "GetCreateDataTableStmt",
                        "type": "Lookup",
                        "dependsOn": [
                          
                        ],
                        "policy": {
                          "timeout": "7.00:00:00",
                          "retry": 0,
                          "retryIntervalInSeconds": 30,
                          "secureOutput": false,
                          "secureInput": false
                        },
                        "userProperties": [
                          
                        ],
                        "typeProperties": {
                          "source": {
                            "type": "SqlServerSource",
                            "sqlReaderQuery": {
                              "value": "@{activity('GetCreateTableQueryStmt').output.firstRow.Statement}",
                              "type": "Expression"
                            },
                            "queryTimeout": "02:00:00"
                          },
                          "dataset": {
                            "referenceName": "ADFETLSource",
                            "type": "DatasetReference",
                            "parameters": {
                              
                            }
                          }
                        }
                      },
                      {
                        "name": "ExecuteCreateDataTableStmt",
                        "type": "Lookup",
                        "dependsOn": [
                          {
                            "activity": "GetCreateDataTableStmt",
                            "dependencyConditions": [
                              "Succeeded"
                            ]
                          }
                        ],
                        "policy": {
                          "timeout": "7.00:00:00",
                          "retry": 0,
                          "retryIntervalInSeconds": 30,
                          "secureOutput": false,
                          "secureInput": false
                        },
                        "userProperties": [
                          
                        ],
                        "typeProperties": {
                          "source": {
                            "type": "AzureSqlSource",
                            "sqlReaderQuery": {
                              "value": "@{activity('GetCreateDataTableStmt').output.firstRow.Statement}",
                              "type": "Expression"
                            },
                            "queryTimeout": "02:00:00"
                          },
                          "dataset": {
                            "referenceName": "ADFETLSink",
                            "type": "DatasetReference",
                            "parameters": {
                              "Schema": "Dummy",
                              "Table": "Dummy"
                            }
                          }
                        }
                      }
                    ]
                  }
                },
                {
                  "name": "GetCreateTableQueryStmt",
                  "type": "Lookup",
                  "dependsOn": [
                    
                  ],
                  "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                  },
                  "userProperties": [
                    
                  ],
                  "typeProperties": {
                    "source": {
                      "type": "AzureSqlSource",
                      "sqlReaderStoredProcedureName": "[[dbo].[GetCreateTableStmt]",
                      "storedProcedureParameters": {
                        "object": {
                          "type": "String",
                          "value": {
                            "value": "@item().Object",
                            "type": "Expression"
                          }
                        },
                        "schema": {
                          "type": "String",
                          "value": {
                            "value": "@item().Schema",
                            "type": "Expression"
                          }
                        }
                      },
                      "queryTimeout": "02:00:00"
                    },
                    "dataset": {
                      "referenceName": "ADFETLSink",
                      "type": "DatasetReference",
                      "parameters": {
                        "Schema": "Dummy",
                        "Table": "Dummy"
                      }
                    }
                  }
                },
                {
                  "name": "GetExtractQueryStmt",
                  "type": "Lookup",
                  "dependsOn": [
                    {
                      "activity": "CheckForCreateTableStmt",
                      "dependencyConditions": [
                        "Succeeded"
                      ]
                    }
                  ],
                  "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                  },
                  "userProperties": [
                    
                  ],
                  "typeProperties": {
                    "source": {
                      "type": "AzureSqlSource",
                      "sqlReaderStoredProcedureName": "[[dbo].[GetExtractStmt]",
                      "storedProcedureParameters": {
                        "dropSinkTable": {
                          "type": "Int32",
                          "value": "1"
                        },
                        "object": {
                          "type": "String",
                          "value": "@item().Object"
                        },
                        "schema": {
                          "type": "String",
                          "value": "@item().Schema"
                        }
                      },
                      "queryTimeout": "02:00:00"
                    },
                    "dataset": {
                      "referenceName": "ADFETLSink",
                      "type": "DatasetReference",
                      "parameters": {
                        "Schema": "Dummy",
                        "Table": "Dummy"
                      }
                    }
                  }
                },
                {
                  "name": "ExecuteExtractStmt",
                  "type": "Copy",
                  "dependsOn": [
                    {
                      "activity": "GetExtractStmt",
                      "dependencyConditions": [
                        "Succeeded"
                      ]
                    }
                  ],
                  "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                  },
                  "userProperties": [
                    
                  ],
                  "typeProperties": {
                    "source": {
                      "type": "SqlServerSource",
                      "sqlReaderQuery": {
                        "value": "@{activity('GetExtractStmt').output.firstRow.Statement}",
                        "type": "Expression"
                      },
                      "queryTimeout": "02:00:00",
                      "isolationLevel": "Snapshot"
                    },
                    "sink": {
                      "type": "AzureSqlSink",
                      "tableOption": "autoCreate",
                      "disableMetricsCollection": false
                    },
                    "enableStaging": false,
                    "enableSkipIncompatibleRow": false
                  },
                  "inputs": [
                    {
                      "referenceName": "ADFETLSource",
                      "type": "DatasetReference",
                      "parameters": {
                        
                      }
                    }
                  ],
                  "outputs": [
                    {
                      "referenceName": "ADFETLSink",
                      "type": "DatasetReference",
                      "parameters": {
                        "Schema": "@{activity('GetExtractQueryStmt').output.firstRow.SinkSchema}",
                        "Table": "@{activity('GetExtractQueryStmt').output.firstRow.SinkTable}"
                      }
                    }
                  ]
                },
                {
                  "name": "GetExtractStmt",
                  "type": "Lookup",
                  "dependsOn": [
                    {
                      "activity": "GetExtractQueryStmt",
                      "dependencyConditions": [
                        "Succeeded"
                      ]
                    }
                  ],
                  "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                  },
                  "userProperties": [
                    
                  ],
                  "typeProperties": {
                    "source": {
                      "type": "SqlServerSource",
                      "sqlReaderQuery": {
                        "value": "@{activity('GetExtractQueryStmt').output.firstRow.Statement}",
                        "type": "Expression"
                      },
                      "queryTimeout": "02:00:00"
                    },
                    "dataset": {
                      "referenceName": "ADFETLSource",
                      "type": "DatasetReference",
                      "parameters": {
                        
                      }
                    }
                  }
                },
                {
                  "name": "LoadData",
                  "type": "SqlServerStoredProcedure",
                  "dependsOn": [
                    {
                      "activity": "ExecuteExtractStmt",
                      "dependencyConditions": [
                        "Succeeded"
                      ]
                    }
                  ],
                  "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                  },
                  "userProperties": [
                    
                  ],
                  "typeProperties": {
                    "storedProcedureName": "[[dbo].[LoadData]",
                    "storedProcedureParameters": {
                      "duration": {
                        "value": "@{activity('ExecuteExtractStmt').output.copyDuration}",
                        "type": "Int32"
                      },
                      "object": {
                        "value": "@item().Object",
                        "type": "String"
                      },
                      "schema": {
                        "value": "@item().Schema",
                        "type": "String"
                      }
                    }
                  },
                  "linkedServiceName": {
                    "referenceName": "[parameters('ADFETLSink')]",
                    "type": "LinkedServiceReference"
                  }
                }
              ]
            }
          }
        ],
        "annotations": [
          
        ]
      },
      "dependsOn": [
        "[concat(variables('factoryId'), '/datasets/ADFETLSink')]",
        "[concat(variables('factoryId'), '/datasets/ADFETLSource')]"
      ]
    },
    {
      "name": "[concat(parameters('factoryName'), '/ADFETLSink')]",
      "type": "Microsoft.DataFactory/factories/datasets",
      "apiVersion": "2018-06-01",
      "properties": {
        "linkedServiceName": {
          "referenceName": "[parameters('ADFETLSink')]",
          "type": "LinkedServiceReference"
        },
        "parameters": {
          "Schema": {
            "type": "string",
            "defaultValue": "Dummy"
          },
          "Table": {
            "type": "string",
            "defaultValue": "Dummy"
          }
        },
        "annotations": [
          
        ],
        "type": "AzureSqlTable",
        "schema": [
          
        ],
        "typeProperties": {
          "schema": {
            "value": "@dataset().Schema",
            "type": "Expression"
          },
          "table": {
            "value": "@dataset().Table",
            "type": "Expression"
          }
        }
      },
      "dependsOn": [
        
      ]
    },
    {
      "name": "[concat(parameters('factoryName'), '/ADFETLSource')]",
      "type": "Microsoft.DataFactory/factories/datasets",
      "apiVersion": "2018-06-01",
      "properties": {
        "linkedServiceName": {
          "referenceName": "[parameters('AdventureWorks2017onSQL2019')]",
          "type": "LinkedServiceReference"
        },
        "annotations": [
          
        ],
        "type": "SqlServerTable",
        "schema": [
          
        ],
        "typeProperties": {
          
        }
      },
      "dependsOn": [
        
      ]
    }
  ]
}
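
The activities inside the ForEach loop are not listed in the template in the order they run (for example, ExecuteExtractStmt appears before GetExtractStmt); the run order is determined only by the dependsOn entries. As a reading aid, here is a minimal Python sketch that derives the run order from those entries. The activity names and dependencies are copied by hand from the template above. One entry is an assumption: the definition of CheckForCreateTableStmt is not repeated here, so its dependency on GetCreateTableQueryStmt is assumed. The helper name execution_order is made up for this illustration and is not part of ADF.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def execution_order(activities):
    """Return activity names in an order that respects every dependsOn entry."""
    graph = {
        activity["name"]: {dep["activity"] for dep in activity.get("dependsOn", [])}
        for activity in activities
    }
    # TopologicalSorter expects {node: predecessors} and yields predecessors first.
    return list(TopologicalSorter(graph).static_order())


# Hand-copied from the ForEach activities in the template, in template order.
# ASSUMPTION: CheckForCreateTableStmt depends on GetCreateTableQueryStmt.
activities = [
    {"name": "CheckForCreateTableStmt",
     "dependsOn": [{"activity": "GetCreateTableQueryStmt"}]},
    {"name": "GetCreateTableQueryStmt", "dependsOn": []},
    {"name": "GetExtractQueryStmt",
     "dependsOn": [{"activity": "CheckForCreateTableStmt"}]},
    {"name": "ExecuteExtractStmt",
     "dependsOn": [{"activity": "GetExtractStmt"}]},
    {"name": "GetExtractStmt",
     "dependsOn": [{"activity": "GetExtractQueryStmt"}]},
    {"name": "LoadData",
     "dependsOn": [{"activity": "ExecuteExtractStmt"}]},
]

if __name__ == "__main__":
    for step, name in enumerate(execution_order(activities), start=1):
        print(step, name)
```

Under that assumption the dependencies form a single chain, so for each table the loop first gets and (if needed) executes the create-table statement, then gets the extract query, resolves it on the source, copies the data into the LZ table, and finally calls LoadData.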

References

This Microsoft article explains in detail how to use SQL Server Change Tracking and how to do it safely: https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/work-with-change-tracking-sql-server?view=sql-server-2017

This article describes how the ADF Copy Data activity works with SQL Server: https://docs.microsoft.com/en-us/azure/data-factory/connector-sql-server