You’ve heard about Azure Data Lake and Azure Data Warehouse, now called Azure Synapse Analytics. You’ve also heard about Azure Data Factory and Azure Databricks. You might even have heard about Python, Spark, and Azure Machine Learning. In this fast-paced, all-demo session, we will walk through the process of ingesting data into the Data Lake, analyzing it with Spark and Machine Learning, outputting it to the Data Warehouse, and reporting on it in Power BI. You will walk away with working code and an overall understanding of how all these tools can help you develop advanced analytics solutions in the modern data landscape.
I’ve had to solve an interesting problem this week. I started the week with an SSIS package that ran in 2 minutes. It extracts a million transaction records from DB2 to a file, uploads the file to Azure Blob Storage, and BULK INSERTs the file into an Azure SQL Database staging table. All in 2 minutes. This was acceptable performance and faster than any of the other options we were considering.
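For reference, the Blob Storage load looks roughly like the sketch below. This is not the package’s actual code: the external data source name (AzureBlobStage), the file path, and the delimiters are assumptions, and the data source must already exist with a database scoped credential behind it. The staging table is the one called ETLSourceStagingTable later in this post.
-- Sketch: load the staging table from a file in Azure Blob Storage.
-- 'AzureBlobStage' is a hypothetical external data source (TYPE = BLOB_STORAGE);
-- the file name and delimiters are assumptions.
BULK INSERT dbo.ETLSourceStagingTable
FROM 'incoming/transactions.csv'
WITH (
    DATA_SOURCE = 'AzureBlobStage',
    FIELDTERMINATOR = '|',
    ROWTERMINATOR = '\n',
    TABLOCK
);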
Each of the DB2 records represents an Insert, Update or Delete to the base table in DB2. I get the Transaction Type (M – Merge (Insert/Update) or D – Delete), and a time stamp in addition to the columns from the row.
So a typical set of rows might look like this:
| Id (Identity) | Field1 | Field2 | Field3 | Field4 | TrxType | RowCreatedTimeStamp |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 13940001 | 1 | 24785779 | 41 | M | 11:37:31.650 |
| 2 | 13940001 | 2 | 24785779 | 20 | M | 11:37:32.070 |
| 3 | 13940001 | 3 | 24785779 | 26 | M | 11:37:32.185 |
| 4 | 13940001 | 2 | 24785779 | 21 | M | 11:37:32.205 |
| 5 | 13940001 | 3 | 24785779 | 4 | M | 11:37:32.265 |
| 6 | 13940001 | 2 | 24785779 | 29 | D | 11:37:32.391 |
| 7 | 13940001 | 2 | 24785779 | 18 | M | 11:37:33.392 |
In the example above, the rows are all for the same document (see Field1, value 13940001). Rows with Field2 = 1, 2, 3 were added. Then rows 2 and 3 were changed (Ids 4 and 5). Then row 2 was deleted (Id 6) and a new row 2 was inserted (Id 7). The net effect for this document is three rows: Field2 = 1 (Field4 = 41), Field2 = 2 (Field4 = 18), and Field2 = 3 (Field4 = 4).
Here is the definition of the source table in the Azure SQL Database:
CREATE TABLE ETLSource(
Field1 [numeric](12, 0) NULL,
Field2 [smallint] NULL,
Field3 [int] NULL,
Field4 [smallint] NULL,
Field5 [nvarchar](1) NULL,
[UpdateType] [nchar](1) NOT NULL,
[RowCreated] [datetime2](7) NOT NULL,
[Id] BIGINT IDENTITY NOT NULL
) WITH (DATA_COMPRESSION = PAGE)
GO
CREATE UNIQUE CLUSTERED INDEX ETLSourcePK ON ETLSource (Id) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED INDEX ETLSourceIdx1 ON ETLSource (UpdateType, RowCreated, Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED INDEX ETLSourceIdx2 ON ETLSource (Field1, Field2, UpdateType, RowCreated) WITH (DATA_COMPRESSION = PAGE);
GO
And here is the definition of the target table in the Azure SQL Database:
CREATE TABLE ETLTarget(
Field1 [numeric](12, 0) NULL,
Field2 [smallint] NULL,
Field3 [int] NULL,
Field4 [smallint] NULL,
Field5 [nvarchar](1) NULL,
[BatchDate] [datetime2](7) NULL
) WITH (DATA_COMPRESSION = PAGE)
GO
CREATE CLUSTERED INDEX ETLTargetPK ON ETLTarget (Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
GO
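If you want to experiment with the example, the seven transaction rows from the table above can be loaded into the source table like this (a sketch: the date portion of RowCreated is arbitrary since the example only shows times, and Field5 is left NULL because it does not appear in the example):
-- Load the seven example transactions (dates are arbitrary; only the times come from the example).
INSERT INTO ETLSource (Field1, Field2, Field3, Field4, Field5, UpdateType, RowCreated)
VALUES
  (13940001, 1, 24785779, 41, NULL, N'M', '2017-01-01 11:37:31.650'),
  (13940001, 2, 24785779, 20, NULL, N'M', '2017-01-01 11:37:32.070'),
  (13940001, 3, 24785779, 26, NULL, N'M', '2017-01-01 11:37:32.185'),
  (13940001, 2, 24785779, 21, NULL, N'M', '2017-01-01 11:37:32.205'),
  (13940001, 3, 24785779,  4, NULL, N'M', '2017-01-01 11:37:32.265'),
  (13940001, 2, 24785779, 29, NULL, N'D', '2017-01-01 11:37:32.391'),
  (13940001, 2, 24785779, 18, NULL, N'M', '2017-01-01 11:37:33.392');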
At first, I tried a cursor. I know how to write them and it was easy enough to create a cursor that looped through the rows and used either a DELETE statement or a MERGE statement to deal with each one. Here’s what that looked like:
DECLARE @BatchDate DATETIME2(7) = SYSUTCDATETIME();
DECLARE @Field1 NUMERIC(12, 0)
DECLARE @Field2 SMALLINT
DECLARE @Field3 INT
DECLARE @Field4 SMALLINT
DECLARE @Field5 NVARCHAR(1)
DECLARE @UpdateType CHAR(1)
DECLARE @RowCreated DATETIME2(7)
DECLARE cur CURSOR LOCAL FAST_FORWARD FOR SELECT
Field1
,Field2
,Field3
,Field4
,Field5
,UpdateType
,RowCreated
FROM ETLSource
ORDER BY id
OPEN cur
FETCH NEXT FROM cur INTO
@Field1
, @Field2
, @Field3
, @Field4
, @Field5
, @UpdateType
, @RowCreated
WHILE @@fetch_status = 0
BEGIN
IF @UpdateType = 'D'
BEGIN
DELETE FROM dbo.ETLTarget
WHERE Field1 = @Field1
AND Field2 = @Field2;
END
IF @UpdateType = 'M'
BEGIN
--Merge the changes that are left
MERGE ETLTarget AS target
USING (
VALUES(
@Field1
, @Field2
, @Field3
, @Field4
, @Field5
)
) AS source (
Field1
, Field2
, Field3
, Field4
, Field5 )
ON (target.Field1 = source.Field1
AND target.Field2 = source.Field2)
WHEN MATCHED
THEN UPDATE
SET target.Field3 = source.Field3
,target.Field4 = source.Field4
,target.Field5 = source.Field5
,target.BatchDate = @BatchDate
WHEN NOT MATCHED BY target
THEN INSERT (
Field1
, Field2
, Field3
, Field4
, Field5
, BatchDate)
VALUES (@Field1
, @Field2
, @Field3
, @Field4
, @Field5
, @BatchDate);
END;
FETCH NEXT FROM cur INTO
@Field1
, @Field2
, @Field3
, @Field4
, @Field5
, @UpdateType
, @RowCreated
END
CLOSE cur
DEALLOCATE cur
Unfortunately, this solution was TERRIBLY slow. Cursors are notorious for being slow. This one worked fine for 1,000 transaction rows, but after running for an hour and only processing a small portion of the million rows, I killed it and went looking for a set-based alternative.
Next, I tried a set-based MERGE statement. This was problematic because it kept complaining that multiple source records were trying to change the same target record. The complaint made sense once I realized that a row might be inserted and updated in the same day, so it would have two source transactions. I needed to get rid of the extras. It turns out I really only care about the latest change for each key: if it’s an insert or update, MERGE will insert or update the target row appropriately; if it’s a delete, MERGE can handle that too. But how do I select only the most recent row for each key? The standard de-duplication CTE query served as a model. Here is the final statement that worked:
WITH sourceRows AS (
SELECT *, RN = ROW_NUMBER() OVER (PARTITION BY
Field1, Field2
ORDER BY Field1, Field2, RowCreated DESC)
FROM ETLSourceStagingTable)
INSERT INTO ETLSource (
Field1
, Field2
, Field3
, Field4
, Field5
, UpdateType
, RowCreated)
SELECT
Field1
, Field2
, Field3
, Field4
, Field5
, UpdateType
, RowCreated
FROM sourceRows
WHERE RN = 1
ORDER BY RowCreated;
Note the introduction of a Staging Table. The SSIS package now uses BULK INSERT to load the Staging Table from the file in Blob Storage, and the query above loads only the relevant rows (the most recent per key) into the ETLSource table. (The ORDER BY RowCreated on the INSERT means the Id identity values are assigned in chronological order.) The Staging Table has the same structure as the ETLSource table, minus the Id column, and it has an index on it like this:
CREATE INDEX ETLSourceStagingTableSort ON ETLSourceStagingTable
(Field1, Field2, RowCreated DESC) WITH (DATA_COMPRESSION = PAGE)
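The staging table itself isn’t shown above, but based on that description (the same columns as ETLSource, without the Id column) it would look something like this sketch:
CREATE TABLE ETLSourceStagingTable(
    Field1 [numeric](12, 0) NULL,
    Field2 [smallint] NULL,
    Field3 [int] NULL,
    Field4 [smallint] NULL,
    Field5 [nvarchar](1) NULL,
    [UpdateType] [nchar](1) NOT NULL,
    [RowCreated] [datetime2](7) NOT NULL
) WITH (DATA_COMPRESSION = PAGE);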
With the Staging Table and the CTE query in place, only three of the original seven rows from the example are relevant:
| Id (Identity) | Field1 | Field2 | Field3 | Field4 | TrxType | RowCreatedTimeStamp | Relevant |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 13940001 | 1 | 24785779 | 41 | M | 11:37:31.650 | YES |
| 2 | 13940001 | 2 | 24785779 | 20 | M | 11:37:32.070 | |
| 3 | 13940001 | 3 | 24785779 | 26 | M | 11:37:32.185 | |
| 4 | 13940001 | 2 | 24785779 | 21 | M | 11:37:32.205 | |
| 5 | 13940001 | 3 | 24785779 | 4 | M | 11:37:32.265 | YES |
| 6 | 13940001 | 2 | 24785779 | 29 | D | 11:37:32.391 | |
| 7 | 13940001 | 2 | 24785779 | 18 | M | 11:37:33.392 | YES |
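To see which rows survive, you can run the ranking part of the CTE as a plain SELECT against the staging table; the rows with RN = 1 are the ones marked relevant above (a quick sketch):
SELECT Field1
     , Field2
     , Field4
     , UpdateType
     , RowCreated
     , RN = ROW_NUMBER() OVER (PARTITION BY Field1, Field2
                               ORDER BY RowCreated DESC)
FROM ETLSourceStagingTable
ORDER BY RowCreated;
-- Rows where RN = 1 are the latest change for each (Field1, Field2) key.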
Now I just needed to craft the MERGE statement properly. When I did, this is what I had:
MERGE ETLTarget AS target USING (
SELECT
Field1
, Field2
, Field3
, Field4
, Field5
, UpdateType
FROM ETLSource
) AS source (Field1
, Field2
, Field3
, Field4
, Field5
, UpdateType)
ON (target.Field1 = source.Field1
AND target.Field2 = source.Field2)
WHEN MATCHED AND source.UpdateType = 'M'
THEN UPDATE
SET target.Field3 = source.Field3
,target.Field4 = source.Field4
,target.Field5 = source.Field5
,target.BatchDate = @BatchDate
WHEN MATCHED AND source.UpdateType = 'D'
THEN DELETE
WHEN NOT MATCHED BY TARGET AND source.UpdateType = 'M'
THEN INSERT (Field1
, Field2
, Field3
, Field4
, Field5
, BatchDate)
VALUES (source.Field1
, source.Field2
, source.Field3
, source.Field4
, source.Field5
, @BatchDate);
This was fine for a small data set but crawled on a big one, so I added batching so the MERGE only had to deal with a small set of rows at a time. Since the clustered index is on an identity column, and since I truncate ETLSource before loading it, I am guaranteed that the Id column will hold the values 1…n, where n is the total number of rows. So I initialize an @rows variable right after inserting the rows into ETLSource:
DECLARE @rows INT = @@ROWCOUNT;
Next, I create a while loop for each batch:
DECLARE @batchSize INT = 10000;
DECLARE @start INT = 1;
DECLARE @end INT = @batchSize;
WHILE (@start <= @rows)
BEGIN
MERGE ETLTarget AS target USING (
...;
SET @start = @start + @batchSize;
SET @end = @end + @batchSize;
END
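For clarity, the loop walks the identity range in fixed windows, ending after the window that contains the last row:
-- With @batchSize = 10000:
--   iteration 1: Id BETWEEN 1     AND 10000
--   iteration 2: Id BETWEEN 10001 AND 20000
--   iteration 3: Id BETWEEN 20001 AND 30000
--   ...and so on, until @start exceeds @rows.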
Then I add the @start and @end to the MERGE statement source:
MERGE ETLTarget AS target USING (
SELECT
Field1
, Field2
, Field3
, Field4
, Field5
, UpdateType
FROM ETLSource
WHERE id BETWEEN @start AND @end
) AS source
And this worked!!! I was able to process a million rows in 1 minute. Yay!
Then I tried 10 million rows. Ugh. Now the MERGE was only processing 10,000 rows per minute. Crap. What changed? Same code, same data, just more of it. A look at the query plans explained it all: with 1 million rows in the source, the plan did an index seek against the target; with 10 million rows, it switched to a scan. The ETLTarget table has 80 million rows in it, and with 10 million rows in the ETLSource table, the query optimizer decided it would be easier to do an index SCAN instead of a SEEK. In this case, however, the SEEK would have been A LOT faster.
So how do we force it to use a seek? It turns out the optimizer has no idea how many rows we’re processing in each batch, so it bases its plan on the entire table. We could use a LOOP JOIN hint at the end of the MERGE statement:
MERGE ... , @BatchDate)
OPTION (LOOP JOIN);
But most folks like to avoid these kinds of hints, so we needed another way. Someone suggested putting a TOP clause in the source SELECT statement. That worked. Here’s how the MERGE looks now:
MERGE ETLTarget AS target USING (
SELECT TOP 10000
Field1
, Field2
, Field3
, Field4
, Field5
, UpdateType
FROM ETLSource
WHERE id BETWEEN @start AND @end
ORDER BY id
) AS source
With this in place, I was able to process the 10 million rows in 10 minutes. Woohoo! Just to be sure, I re-ran the process with a thousand, a million and 10 million rows and it was consistent. I was able to process a million rows a minute.
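One small refinement worth considering, which is not part of the approach above: TOP accepts a parenthesized variable, so the batch size can be defined once and reused in the source SELECT instead of hard-coding 10000:
-- Hypothetical variant: tie the TOP to @batchSize so the two can't drift apart.
SELECT TOP (@batchSize)
    Field1
  , Field2
  , Field3
  , Field4
  , Field5
  , UpdateType
FROM ETLSource
WHERE id BETWEEN @start AND @end
ORDER BY id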
I just ran a few simple tests in Azure SQL DB to see how different ways of handling the indexes during a large insert would perform.
I have a target table with an identity column that is the clustered primary key, and two other indexes that are the same except for the field order. (Whether having both is useful is a question for another day.) Here’s the DDL for the target table:
CREATE TABLE [ETL1](
Field1 [numeric](12, 0) NULL,
Field2 [smallint] NULL,
Field3 [int] NULL,
Field4 [smallint] NULL,
Field5 [nvarchar](1) NULL,
Field6 [nvarchar](2) NULL,
Field7 [numeric](4, 0) NULL,
Field8 [numeric](2, 0) NULL,
Field9 [nvarchar](2) NULL,
Field10 [nvarchar](8) NULL,
Field11 [datetime2](7) NULL,
Field12 [nvarchar](8) NULL,
Field13 [datetime2](7) NULL,
[UpdateType] [nchar](1) NOT NULL,
[RowCreated] [datetime2](7) NOT NULL,
Id BIGINT IDENTITY NOT NULL
) ON [PRIMARY]
GO
CREATE UNIQUE CLUSTERED INDEX iscdpk ON ETL1 (Id) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED INDEX iscd1 ON ETL1 (UpdateType, RowCreated, Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED INDEX iscd2 ON ETL1 (Field1, Field2, UpdateType, RowCreated) WITH (DATA_COMPRESSION = PAGE);
GO
Test 1: Truncate the Target, Drop the Indexes, Insert the Data, Recreate the Indexes.
Test 2: Drop the Indexes, Truncate the Target, Insert the Data, Recreate the Indexes.
Test 3: Just Truncate the Target and Insert the Data.
Test 4: Truncate the Target, Drop the non-clustered Indexes (leaving the clustered index on the identity column), Insert the Data, Recreate the non-clustered Indexes.
Here are the results. All timings are in milliseconds unless noted otherwise. These were run on a PRS1 instance of Azure SQL Database.
| Step | Test 1: Trunc then Drop Idxs | Test 2: Drop before Trunc | Test 3: No Drop/Create | Test 4: Trunc but don't drop clustered index |
| --- | --- | --- | --- | --- |
| Truncate | 4 | 2 | 0 | 4 |
| Drop PK | 8 | 4 | n/a | n/a |
| Drop Index 1 | 5 | 23,630 | n/a | 2 |
| Drop Index 2 | 6 | 2 | n/a | 2 |
| Insert 1.84 M rows | 83,033 | 82,315 | 161,706 | 83,205 |
| Create PK | 20,454 | 21,205 | n/a | n/a |
| Create Index 1 | 12,149 | 12,264 | n/a | 12,265 |
| Create Index 2 | 11,142 | 11,313 | n/a | 11,247 |
| Total Time (ms) | 126,801 | 150,735 | 161,706 | 106,725 |
| Total Time (mins) | 2.11 | 2.51 | 2.70 | 1.78 |
| Delta (ms) | 0 | 23,934 | 34,905 | (20,076) |
Test 4 was the clear winner because it avoided the cost of recreating the clustered index, which makes sense: the clustered index was being filled in identity order as the rows were added. Test 1 came in second, so if your clustered index is not on an identity column, or you have no clustered index, you are still better off dropping and recreating the indexes.
Conclusion: When inserting larger data sets into an empty table, drop the indexes before inserting the data, unless the index is clustered on an identity column.
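To make the winning pattern concrete, here is a rough sketch of Test 4 using the ETL1 table above. The source table (ETL1Staging) is a hypothetical stand-in for whatever feeds your load:
-- Test 4 pattern: keep the clustered index on the identity column;
-- drop and recreate only the nonclustered indexes around the load.
TRUNCATE TABLE ETL1;

DROP INDEX iscd1 ON ETL1;
DROP INDEX iscd2 ON ETL1;

-- Load the data (ETL1Staging is a hypothetical source for the 1.84 M rows).
INSERT INTO ETL1 (Field1, Field2, Field3, Field4, Field5, Field6, Field7, Field8,
                  Field9, Field10, Field11, Field12, Field13, UpdateType, RowCreated)
SELECT Field1, Field2, Field3, Field4, Field5, Field6, Field7, Field8,
       Field9, Field10, Field11, Field12, Field13, UpdateType, RowCreated
FROM ETL1Staging;

-- Recreate the nonclustered indexes after the load.
CREATE UNIQUE NONCLUSTERED INDEX iscd1 ON ETL1 (UpdateType, RowCreated, Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED INDEX iscd2 ON ETL1 (Field1, Field2, UpdateType, RowCreated) WITH (DATA_COMPRESSION = PAGE);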
I am excited to announce I am finally ready to present my advanced ETL Framework with Biml seminar at SQL Saturday Charlotte on Oct 14. And I have a 10AM slot!!! Wooo!
Implementing a SSIS Framework and enforcing SSIS Patterns (with Biml).
(Using Biml to Automate the Implementation of a SSIS Framework)
Let’s use Biml to automate the implementation of a standard SSIS framework, i.e., logging, error handling, etc. Business Intelligence Markup Language (Biml) is great at automating the creation of SSIS packages. With Biml we can generate a template package that implements a standard SSIS framework. In this fast-paced session, we will create the tables and load some metadata, write the Biml to implement logging and error handling, and generate some packages that implement our standard framework. You don’t need to know Biml, but some familiarity with XML, T-SQL, or C# will help. By the end of this session, you will know how to use Biml to automatically generate packages that implement a simple SSIS Framework with Audit Logging and Error Handling.
I am excited to announce that I will be giving my Biml seminar in Atlanta in July!!! Here’s the abstract for my session:
How to move a ton of data from the Mainframe to the Cloud (with Biml).
So, you need to move data from 75 tables on the mainframe to new tables in SQL Azure. Do you: a) hand code one package to load all 75 tables, b) hand code 75 packages that move a table each, or c) wish there was a better way?
There is! Business Intelligence Markup Language (Biml) can automate the creation of packages so that they all follow the same script. In this session, we will create some simple metadata to be able to generate multiple packages and their associated connection managers. You will see Biml in action. You will see the XML that Biml uses to define packages and connections. You will see the C# code that Biml uses to fetch metadata and dynamically generate packages in SSIS. And you will see packages and connection managers generated from Biml before your very eyes.
Yay!!! I’ll be speaking about Biml at SQL Saturday in Chattanooga in June!
I’m very excited. In typical fashion, I will probably cram too much into the hour, but I’ll try not to. Here’s the abstract for my session.
Using Biml to Automate the Generation of SSIS Packages
So, you need to move data from 75 tables on the mainframe to new tables in SQL Azure. Do you: a) hand code one package to load all 75 tables, b) hand code 75 packages that move a table each, or c) wish there was a better way?
There is! Business Intelligence Markup Language (Biml) can automate the creation of packages so that they all follow the same script. In this session, we will create some simple metadata to be able to generate multiple packages and their associated connection managers. You will see Biml in action. You will see the XML that Biml uses to define packages and connections. You will see the C# code that Biml uses to fetch metadata and dynamically generate packages in SSIS. And you will see packages and connection managers generated from Biml before your very eyes.