SQL Saturday St. Louis – February 2020

I will be speaking at SQL Saturday St. Louis on Saturday, February 8th at 8:00 am and 10:20 am. The topics are:

8:00 am Moving Data to the Cloud (with Azure Data Factory)

You need to move data. A lot of data. To the cloud. You’ve got data in a variety of on- and off-site data sources. There are several ways to do it, and some of them can be implemented quite easily using Azure Data Factory. Learn how to use variables and looping in your Data Factory pipelines. Use the Integration Runtime to pull directly from on-site sources. See how to upload files to blob storage and import them. Learn how to trigger Data Factory activities. And learn how to keep all those connection strings and passwords secret in Azure Key Vault. After this session, you will have tools that you can readily implement in your own data migrations.

10:20 am Why Learn Python? A Microsoft DB/ETL/BI Developer’s Answer

You’re a Microsoft Developer. C#, MSSQL, SSIS/SSRS, SSMS, and Azure are your tools of choice. Why would you want to learn Python? In this session, I will show you several take-home utilities that use Python. The first hunts through a folder structure full of SSIS packages looking for the one(s) that load(s) a specified table. The second executes the data sources in an SSRS report to identify performance problems and document them in Excel. The third geocodes the City/Country names from a SQL table, getting the latitude/longitude so you can use the data in maps. Familiarity with Python is not necessary to use the utilities, but we’re not going to do Hello World either. This is real Python for Microsoft Database, ETL and BI Developers. This all-demo session shows you how to use Python with the .Net CLR, XML, ODBC, Excel, SQL Server, and Web API calls.

  1. Which SSIS Package loads this table.py
import os
import shutil
import xml.etree.ElementTree as et
from collections import OrderedDict
import pandas as pd
import fnmatch

# CHANGE THESE TWO PARAMETERS
# pathToSearch should be set to the root directory of your source code repo
# and, in Windows, will look something like this:
# "\\Users\\fred\\source\\Workspaces\\SQL\\SSIS"
pathToSearch = "F:\\_GIT\\SqlSaturday\\Python\\SSIS Packages"
# tableToFind should be set to the name of the table you want to find
tableToFind = "Listing"


##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE

# ParseFile parses the DTSX file looking for the occurrence of tableToFind
def ParseFile(file, tableToFind):
    tableFound = []
    treeOrig = et.parse(file)
    rootOrig = treeOrig.getroot()
    nsDTS = "www.microsoft.com/SqlServer/Dts"
    DTS = {"DTS": nsDTS}
    nsSQLTask = "www.microsoft.com/sqlserver/dts/tasks/sqltask"
    SQLTask = {"SQLTask": nsSQLTask}
    executablesRoot = rootOrig.find("DTS:Executables", namespaces=DTS)
    if executablesRoot is not None:
        executables = executablesRoot.findall("DTS:Executable", namespaces=DTS)
        # "SqlTaskData", namespaces={"SQLTask": nsSQLTask})
        for e in executables:
            od = e.find("DTS:ObjectData", namespaces=DTS)
            if od is not None:
                pipelines = od.findall("pipeline")
                for pipeline in pipelines:
                    componentsElement = pipeline.find("components")
                    if componentsElement is not None:
                        components = componentsElement.findall("component")
                        for c in components:
                            if c.attrib["componentClassID"] == "Microsoft.OLEDBDestination":
                                props = c.find("properties")
                                table = ""
                                tablevar = ""
                                for p in props:
                                    if p.attrib["name"] == "OpenRowset":
                                        table = p.text
                                    if p.attrib["name"] == "OpenRowsetVariable":
                                        tablevar = p.text
                                if table == "" and tablevar != "":
                                    table = GetTableFromVar(
                                        tablevar, rootOrig, file)
                                if table != "":
                                    if tableToFind in table.upper():
                                        cns = c.find("connections")
                                        cn = cns.find("connection")
                                        cnName = cn.attrib["connectionManagerRefId"]
                                        tableFound.append(
                                            {"file": file,  "statementType": "OLEDB Destination", "connection": cnName})
                sqlTasks = od.findall(
                    "SQLTask:SqlTaskData", namespaces=SQLTask)
                for task in sqlTasks:
                    sql = task.attrib[f"{{{nsSQLTask}}}SqlStatementSource"].strip(
                    ).upper()
                    if sql.startswith("MERGE"):
                        words = sql[0:300].split(" ")
                        table = words[1]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Merge", "sql": sql})
                    elif sql.startswith("INSERT INTO"):
                        words = sql[0:300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Insert Into", "sql": sql})
                    elif sql.startswith("TRUNCATE TABLE"):
                        words = sql[0:300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Truncate Table", "sql": sql})
                    elif sql.startswith("EXEC"):
                        words = sql[0:300].split(" ")
                        sproc = words[1]
                        result = SeeIfSprocUpdatesTable(
                            sproc, tableToFind, file)
                        if result:
                            tableFound.append(result)
                    elif "MERGE " in sql:
                        index = sql.find("MERGE ")
                        words = sql[index:index+300].split(" ")
                        table = words[1]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Merge", "sql": sql})
                    elif "INSERT INTO " in sql:
                        index = sql.find("INSERT INTO ")
                        words = sql[index:index+300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Insert Into", "sql": sql})
                    elif "TRUNCATE TABLE " in sql:
                        index = sql.find("TRUNCATE TABLE ")
                        words = sql[index:index+300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Truncate Table", "sql": sql})
                    elif "INTO " in sql:
                        index = sql.find("INTO ")
                        words = sql[index:index+300].split(" ")
                        table = words[1]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Select Into", "sql": sql})
    return tableFound

# GetTableFromVar is not implemented yet
# The intention is to look up the value(s) of the variable and see if it matches tableToFind


def GetTableFromVar(tablevar, rootOrig, file):
    if tablevar == "NOT IMPLEMENTED":
        return "tableName"
    else:
        return None

# SeeIfSprocUpdatesTable is not implemented yet
# The intention is to look at the sproc source and see if it updates tableToFind


def SeeIfSprocUpdatesTable(sproc, tableToFind, file):
    if sproc == "NOT IMPLEMENTED":
        return {"file": file,  "statementType": "EXEC", "proc": sproc, "sql": "???"}
    else:
        return None


# Code Execution starts here
tableToFind = tableToFind.upper()
print(f"Looking for: {tableToFind}")

# Finding *.dtsx files in the supplied folder and its subfolders
allDtsxFiles = []
for dirpath, dirnames, filenames in os.walk(pathToSearch):
    if not filenames:
        continue
    dtsxFiles = fnmatch.filter(filenames, "*.dtsx")
    if dtsxFiles:
        for file in dtsxFiles:
            allDtsxFiles.append(f"{dirpath}\\{file}")
if len(allDtsxFiles) == 0:
    print('No *.dtsx files found')
else:
    print(f"First *.dtsx file found: {allDtsxFiles[0]}")

# Examining each file to see if the tableToFind is in there
tableFound = []
for file in allDtsxFiles:
    print(f"Checking file {file}")
    tableFound.extend(ParseFile(file, tableToFind))

# Save results as Excel file
if len(tableFound) == 0:
    print(f"{tableToFind} not found in any files")
else:
    df = pd.DataFrame(tableFound)
    excelFile = os.path.join(pathToSearch, tableToFind + '-found.xlsx')
    print(f"Saving {excelFile}")
    df.to_excel(excelFile)

print("Done")
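
A side note on the two “not implemented” stubs above: if you do flesh out SeeIfSprocUpdatesTable, the lookup it needs can be done on the server side by searching the procedure definitions in sys.sql_modules. Here is a rough T-SQL sketch of that query (the table name is just the tableToFind value from the script; you could run it from the script via pyodbc, or interactively in SSMS):

-- Find stored procedures whose definition mentions the table you are hunting for
DECLARE @tableToFind sysname = 'Listing';

SELECT OBJECT_SCHEMA_NAME(m.object_id) AS SchemaName,
       OBJECT_NAME(m.object_id)        AS ProcName
FROM sys.sql_modules AS m
INNER JOIN sys.objects AS o ON o.object_id = m.object_id
WHERE o.type = 'P'
  AND m.definition LIKE '%' + @tableToFind + '%';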

  2. Which SSRS Data Sources are Slow.py

import clr
clr.AddReference("C:\\Windows\\Microsoft.NET\\assembly\\GAC_MSIL\\Microsoft.AnalysisServices.AdomdClient\\v4.0_15.0.0.0__89845dcd8080cc91\\Microsoft.AnalysisServices.AdomdClient.dll")
clr.AddReference("System.Data")
from Microsoft.AnalysisServices.AdomdClient import (
    AdomdConnection, AdomdDataAdapter)
import datetime
import os
import shutil
import time
import xml.etree.ElementTree as et
from collections import OrderedDict
import numpy as np
import pandas as pd
import pyodbc
from System.Data import DataSet

# CHANGE THESE SETTINGS IN THE systemconfig.py FILE IN THIS FOLDER
from systemconfig import pwd, sqlserver, uid

# CHANGE THESE PARAMETERS
# path should be set to the directory where the report .RDL file is
# and, in Windows, will look something like this:
# "\\Users\\fred\\source\\Workspaces\\SQL\\SSRS"
path = "F:\\_GIT\\SqlSaturday\\Python\\SSRS Reports\\Report Project1"
# reportFile should be set to the name of the .RDL you want to analyze
reportFile = 'ReportWithALotOfDatasets.rdl'
# sqlServerName should be set to the name of the SQL Server the queries run against
sqlServerName = sqlserver
# ssasServerName should be set to the name of the SSAS Server the MDX queries run against
ssasServerName = "(local)"
# ssasDatabaseNames should be set to a List of the SSAS Databases the MDX queries run against
ssasDatabaseNames = ["TheCube", "TheOtherCube"]

##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE (maybe)


def parseQueries(file):
    treeOrig = et.parse(file)
    rootOrig = treeOrig.getroot()

    dsnames = []
    dsns = []
    parms = []
    cmds = []
    fields = []

    i = 1
    datasets = rootOrig.find(
        "{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}DataSets")
    for dataset in datasets.findall(
            "{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}DataSet"):
        dsnames.append(dataset.attrib["Name"])
        query = dataset[0]
        dsn = query[0].text
        parm = ""
        cmd = ""
        if query[1].tag == '{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}QueryParameters':
            parm = query[1].text
            cmd = query[2].text
        else:
            cmd = query[1].text
        field = dataset[1].text

        dsns.append(dsn)
        parms.append(parm)
        cmds.append(cmd)
        fields.append(field)

        i += 1
        # This is a debugging technique.  It limits the loop to one iteration.
        # if i > 1:
        # break
    df = pd.DataFrame({"DataSets": dsnames, "DSNs": dsns, "Parms": parms,
                       "Commands": cmds, "Fields": fields})
    return df


def test_MDX(name, server, database, mdx):
    conn = AdomdConnection("Data Source=" + server + ";Catalog=" + database)
    conn.Open()
    cmd = conn.CreateCommand()
    # you might need to add parameters here
    cmd.CommandText = mdx

    try:
        print("Trying: " + name)
        start = time.time_ns()
        adp = AdomdDataAdapter(cmd)
        datasetParam = DataSet()
        adp.Fill(datasetParam)
        end = time.time_ns()
        result = {"Name": name, "DB": database,
                  "SQL": mdx, "Time": end - start, "Missing": False,
                  "Rows": datasetParam.Tables[0].Rows.Count}
    except Exception:
        result = {"Name": name, "DB": database,
                  "SQL": mdx, "Time": -1, "Missing": True,
                  "Rows": 0}
    finally:
        conn.Close()
    return result


def test_SQL(name, server, database, sql):
    connstring = "Driver={ODBC Driver 17 for SQL Server};Server=" + \
        server + ";Database=" + database + ";UID=" + uid + ";PWD=" + pwd
    conn = pyodbc.connect(connstring)
    cursor = conn.cursor()
    result = ""
    try:
        print("Trying: " + name)
        start = time.time_ns()
        cursor.execute(sql)
        rows = cursor.fetchall()
        end = time.time_ns()
        result = {"Name": name, "DB": database,
                  "SQL": sql, "Time": end - start, "Missing": False,
                  "Rows": len(rows)}
    except Exception:
        result = {"Name": name, "DB": database,
                  "SQL": sql, "Time": -1, "Missing": True, "Rows": 0}
    finally:
        conn.close()
    return result


# Code Execution starts here
file = os.path.join(path, reportFile)
df = parseQueries(file)
date1 = str(datetime.datetime.date(datetime.datetime.today()))
# Uncomment the next line to save a list of the Datasets and their SQL to an Excel file
# df.to_excel(os.path.join(path, date1 + f"_{reportFile}_ReportQueries.xlsx"))
print("Number of Datasets to Check: " + str(df["DataSets"].count()))

commands = []
for i in range(0, df["DataSets"].count()):
    row = df.iloc[i, :]
    name = row["DataSets"]
    database = row["DSNs"]
    sql = row["Commands"]

    if database in ssasDatabaseNames:
        server = ssasServerName
        commands.append(test_MDX(name, server, database, sql))
    else:
        server = sqlServerName
        commands.append(test_SQL(name, server, database, sql))

dfSql = pd.DataFrame(commands)
dfSql["Seconds"] = dfSql['Time'] / 1000000000
excelFile = os.path.join(
    path, date1 + f"_{reportFile}_ReportQueryResults.xlsx")
print(f"Saving {excelFile}")
dfSql.to_excel(excelFile)
print("Done")

3. Geocoding Places.py

import clr
clr.AddReference("C:\\Windows\\Microsoft.NET\\assembly\\GAC_MSIL\\Microsoft.AnalysisServices.AdomdClient\\v4.0_15.0.0.0__89845dcd8080cc91\\Microsoft.AnalysisServices.AdomdClient.dll")
clr.AddReference("System.Data")
import os
import shutil
import xml.etree.ElementTree as et
from collections import OrderedDict
import numpy as np
import pandas as pd
import datetime
import pyodbc
import time
import json
import requests
from Microsoft.AnalysisServices.AdomdClient import AdomdConnection, AdomdDataAdapter
from System.Data import DataSet

# CHANGE THESE SETTINGS IN THE systemconfig.py FILE IN THIS FOLDER
from systemconfig import api_key, sqlserver, uid, pwd

# CHANGE THESE PARAMETERS
database = "AdventureWorks2016"
# The Places Table/View should have 6 columns:
# PrimaryKeyField, City, State, Country, Latitude and Longitude
placesTable = "dbo.Places"
# This is the Primary Key Field for the placesTable
placesPKField = "PlaceId"
# Set this parameter to a small number for debugging,
# and set it larger than the number of rows in your placesTable to do them all
maxGeocodingCalls = 25


##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE

connstring = "Driver={ODBC Driver 17 for SQL Server};Server=" + \
    sqlserver + ";Database=" + database + ";UID=" + uid + ";PWD=" + pwd
baseurl = f'https://maps.googleapis.com/maps/api/geocode/json?key={api_key}'

def LoadCities():
    sql = f"SELECT * FROM {placesTable} WHERE ISNULL(Latitude, 0) = 0"
    rows = LoadDataSet(sql)
    cities = []
    for row in rows:
        cities.append([row[0], row[1], row[2], row[3], 0, 0, False])
    return cities

def LoadDataSet(sql):
    conn = pyodbc.connect(connstring)
    cursor = conn.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

def ExecuteSql(conn, sql):
    cursor = conn.cursor()
    cursor.execute(sql)
    cursor.close()

def GeocodeCities(cities):
    i = 1
    for c in cities:
        print(f"Geocoding: {c[1]}, {c[2]}, {c[3]}")
        url = baseurl + f'&address={c[1]},{c[2]},{c[3]}&components=country:{c[3]}'
        geo_data = requests.get(url).json()
        if geo_data["status"] == "OK":
            lat = geo_data["results"][0]["geometry"]["location"]["lat"]
            lng = geo_data["results"][0]["geometry"]["location"]["lng"]
            c[4] = lat
            c[5] = lng
        i += 1
        # this break statement aborts the loop so it only calls
        # the Google API maxGeocodingCalls number of times
        if i > maxGeocodingCalls:
            break


def UpdateCities(cities):
    conn = pyodbc.connect(connstring)
    conn.autocommit = True
    for c in cities:
        if (c[4] != 0 or c[5] != 0) and c[6] == False:
            print(f"Updating: {c[1]}, {c[2]}, {c[3]}")
            placeId = c[0]
            sql = f"UPDATE {placesTable} " + \
                f"SET Latitude = {c[4]}, Longitude = {c[5]} " + \
                f"WHERE {placesPKField} = {placeId}"
            ExecuteSql(conn, sql)
            c[6] = True
    conn.close()


# Code Execution starts here
cities = LoadCities()
# for Debugging:  print(cities)
# print(cities[0])

GeocodeCities(cities)
# for Debugging:  print(cities)
# print(cities[0])

UpdateCities(cities)
# for Debugging:  print(cities)
# print(cities[0])

print("Done")

SetupPlacesTables.sql

create table Places (
  PlaceId int not null identity constraint PlacesPK primary key,
  City nvarchar(50),
  StateProvince nvarchar(50),
  CountryRegion nvarchar(50),
  Latitude numeric(18, 6),
  Longitude numeric(18, 6)
)
GO
insert into Places (City, StateProvince, CountryRegion)
select
  distinct City,
  s.Name as StateProvince,
  c.Name as CountryRegion
from Person.Address a
inner join Person.StateProvince s on a.StateProvinceID = s.StateProvinceID
inner join Person.CountryRegion c on s.CountryRegionCode = c.CountryRegionCode
GO

systemconfig.py

api_key = ""
sqlserver = ""
uid = ""
pwd = ""

SQL Saturday Nashville – January 2020

I will be speaking at SQL Saturday Nashville on Saturday, January 18th at 8:30 am. The topic is:

Databricks, Spark, Machine Learning and Azure Synapse Analytics

An end-to-end example of Data in the cloud.

You’ve heard about Azure Data Lake and Azure SQL Data Warehouse, now called Azure Synapse Analytics. You’ve also heard about Azure Data Factory and Azure Databricks. You might even have heard about Python, Spark, and Azure Machine Learning. In this fast-paced, all-demo session, we will walk through the process of ingesting data into the Data Lake, analyzing it with Spark and Machine Learning, outputting it to the Data Warehouse and reporting on it in Power BI. You will walk away with working code and an overall understanding of how all these tools can help you develop advanced analytics solutions in the modern data landscape.

Files:

The sample data file created in Exercise 1 and used in the remaining Exercises:

Databricks, Spark, Machine Learning and Azure Synapse Analytics Slide Deck and Step-by-step instructions:

Quickly Processing One Million Transactions in Azure SQL Database

I’ve had to solve an interesting problem this week. I started the week with an SSIS package that ran in 2 minutes. It extracts a million transaction records from DB2 to a file, uploads the file to Azure Blob Storage and BULK INSERTs the file into an Azure SQL Database staging table. All in 2 minutes. This was acceptable performance and faster than any of the other options we were considering.
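
For context, the blob-to-database hop is a plain BULK INSERT against an external data source. A minimal sketch looks something like the following; the storage account, container, file name, SAS token, and field terminator are placeholders for whatever your own package and file format use, the staging table name is the one that shows up later in this post, and the database needs a master key before the credential can be created.

CREATE DATABASE SCOPED CREDENTIAL BlobCredential
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
     SECRET = '<SAS token without the leading ?>';

CREATE EXTERNAL DATA SOURCE TransactionBlobStorage
WITH (TYPE = BLOB_STORAGE,
      LOCATION = 'https://<storageaccount>.blob.core.windows.net/<container>',
      CREDENTIAL = BlobCredential);

BULK INSERT ETLSourceStagingTable
FROM 'transactions.csv'
WITH (DATA_SOURCE = 'TransactionBlobStorage',
      FIELDTERMINATOR = '|',
      ROWTERMINATOR = '\n',
      TABLOCK);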

Each of the DB2 records represents an Insert, Update or Delete to the base table in DB2. I get the Transaction Type (M – Merge (Insert/Update) or D – Delete), and a time stamp in addition to the columns from the row.

So a typical set of rows might look like this:

Id (Identity) | Field1   | Field2 | Field3   | Field4 | TrxType | RowCreatedTimeStamp
1             | 13940001 | 1      | 24785779 | 41     | M       | 11:37:31.650
2             | 13940001 | 2      | 24785779 | 20     | M       | 11:37:32.070
3             | 13940001 | 3      | 24785779 | 26     | M       | 11:37:32.185
4             | 13940001 | 2      | 24785779 | 21     | M       | 11:37:32.205
5             | 13940001 | 3      | 24785779 | 4      | M       | 11:37:32.265
6             | 13940001 | 2      | 24785779 | 29     | D       | 11:37:32.391
7             | 13940001 | 2      | 24785779 | 18     | M       | 11:37:33.392

In the example above, the rows are all for the same document (see Field1: value 13940001).  Rows with Field2 = 1, 2, 3 were added.  Then rows 2 and 3 were changed (Ids 4 and 5).  Then row 2 was deleted (Id 6) and a new row 2 was inserted (Id 7).

Here is the definition of the source table in the Azure SQL Database:

CREATE TABLE ETLSource(
       Field1 [numeric](12, 0) NULL,
       Field2 [smallint] NULL,
       Field3 [int] NULL,
       Field4 [smallint] NULL,
       Field5 [nvarchar](1) NULL,
       [UpdateType] [nchar](1) NOT NULL,
       [RowCreated] [datetime2](7) NOT NULL,
       [Id] BIGINT IDENTITY NOT NULL
) WITH (DATA_COMPRESSION = PAGE)
GO

CREATE UNIQUE CLUSTERED INDEX ETLSourcePK ON ETLSource (Id) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED  INDEX ETLSourceIdx1 ON ETLSource (UpdateType, RowCreated, Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED  INDEX ETLSourceIdx2 ON ETLSource (Field1, Field2, UpdateType, RowCreated) WITH (DATA_COMPRESSION = PAGE);
GO

And here is the definition of the target table in the Azure SQL Database:

CREATE TABLE ETLTarget(
       Field1 [numeric](12, 0) NULL,
       Field2 [smallint] NULL,
       Field3 [int] NULL,
       Field4 [smallint] NULL,
       Field5 [nvarchar](1) NULL,
       [BatchDate] [datetime2](7) NULL
) WITH (DATA_COMPRESSION = PAGE)
GO

CREATE CLUSTERED INDEX ETLTargetPK ON ETLTarget (Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
GO

At first, I tried a cursor. I know how to write them and it was easy enough to create a cursor that looped through the rows and used either a DELETE statement or a MERGE statement to deal with each one. Here’s what that looked like:

DECLARE @BatchDate DATETIME2(7) = SYSUTCDATETIME();

DECLARE @Field1 NUMERIC(12, 0)
DECLARE @Field2 SMALLINT
DECLARE @Field3 INT
DECLARE @Field4 SMALLINT
DECLARE @Field5 NVARCHAR(1)
DECLARE @UpdateType	CHAR(1)
DECLARE @RowCreated	DATETIME2(7)

DECLARE cur CURSOR LOCAL FAST_FORWARD FOR SELECT
	   Field1 
	  ,Field2 
	  ,Field3 
	  ,Field4 
	  ,Field5 
	  ,UpdateType
	  ,RowCreated
    FROM ETLSource
    ORDER BY id

OPEN cur

FETCH NEXT FROM cur INTO 
	 @Field1 
    , @Field2 
    , @Field3 
    , @Field4 
    , @Field5 
    , @UpdateType
    , @RowCreated

WHILE @@fetch_status = 0
BEGIN

    IF @UpdateType = 'D'
    BEGIN
	   DELETE FROM dbo.ETLTarget
	   WHERE Field1 = @Field1
		  AND Field2 = @Field2;
    END
    IF @UpdateType = 'M'
    BEGIN
	   --Merge the changes that are left
	   MERGE ETLTarget AS target 
	   USING (
		  VALUES(
		    @Field1 
		  , @Field2 
		  , @Field3 
		  , @Field4 
		  , @Field5 
		  )
	   ) AS source (
		Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5 )
	   ON (target.Field1 = source.Field1
		  AND target.Field2 = source.Field2)
	   WHEN MATCHED
		  THEN UPDATE
			 SET target.Field3 = source.Field3
			    ,target.Field4 = source.Field4
			    ,target.Field5 = source.Field5
			    ,target.BatchDate = @BatchDate
	   WHEN NOT MATCHED BY target
		  THEN INSERT (
			   Field1 
			 , Field2 
			 , Field3 
			 , Field4 
			 , Field5
			 , BatchDate)
		  VALUES (@Field1 
			 , @Field2 
			 , @Field3 
			 , @Field4 
			 , @Field5 
			 , @BatchDate);
    END;

    FETCH NEXT FROM cur INTO 
		@Field1 
	   , @Field2 
	   , @Field3 
	   , @Field4 
	   , @Field5 
	   , @UpdateType
	   , @RowCreated
END

CLOSE cur
DEALLOCATE cur

Unfortunately, this solution was TERRIBLY slow. Cursors are notorious for being slow. This one worked fine for 1,000 transaction rows, but, after running for an hour and only processing a small portion of the million rows, I killed it and went looking for a set-based alternative.

Next, I tried a set-based MERGE statement. This was problematic because it kept complaining that multiple source records were trying to change the same target record. This complaint made sense when I realized that a row might be inserted and updated in the same day, so it would have two source transactions. So I needed to get rid of the extras. It turns out that I really only care about the latest change. If it’s an insert or update, MERGE will insert or update the target row appropriately; if it’s a delete, MERGE can handle that too. But how do you select only the most recent row for each key? The standard de-duplication CTE query served as a model. Here is the final statement that worked:

WITH sourceRows AS (
    SELECT *, RN  = ROW_NUMBER() OVER (PARTITION BY
	   Field1, Field2
	   ORDER BY Field1, Field2, RowCreated DESC)
    FROM ETLSourceStagingTable)

INSERT INTO ETLSource (
      Field1 
    , Field2 
    , Field3 
    , Field4 
    , Field5
    , UpdateType
    , RowCreated)
SELECT       
      Field1 
    , Field2 
    , Field3 
    , Field4 
    , Field5
    , UpdateType
    , RowCreated 
FROM sourceRows
WHERE RN = 1
ORDER BY RowCreated;

Note the introduction of a Staging Table. The SSIS package now uses BULK INSERT to load the Staging Table from the file in Blob Storage. The query above is used to load only the relevant rows (the most recent) into the ETLSource table. The Staging Table has the same structure as the ETLSource table, without the Id column, and has an index on it like this:

CREATE INDEX ETLSourceStagingTableSort ON ETLSourceStagingTable
(Field1, Field2, RowCreated DESC) WITH (DATA_COMPRESSION = PAGE)
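
For reference, the staging table itself is just ETLSource without the Id identity column, so a sketch of its definition looks like this:

CREATE TABLE ETLSourceStagingTable(
       Field1 [numeric](12, 0) NULL,
       Field2 [smallint] NULL,
       Field3 [int] NULL,
       Field4 [smallint] NULL,
       Field5 [nvarchar](1) NULL,
       [UpdateType] [nchar](1) NOT NULL,
       [RowCreated] [datetime2](7) NOT NULL
) WITH (DATA_COMPRESSION = PAGE)
GO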

The use of the Staging Table and the CTE query above meant that of my original 7 rows in the example above, only three are relevant:

Id (Identity) | Field1   | Field2 | Field3   | Field4 | TrxType | RowCreatedTimeStamp | Relevant
1             | 13940001 | 1      | 24785779 | 41     | M       | 11:37:31.650        | YES
2             | 13940001 | 2      | 24785779 | 20     | M       | 11:37:32.070        |
3             | 13940001 | 3      | 24785779 | 26     | M       | 11:37:32.185        |
4             | 13940001 | 2      | 24785779 | 21     | M       | 11:37:32.205        |
5             | 13940001 | 3      | 24785779 | 4      | M       | 11:37:32.265        | YES
6             | 13940001 | 2      | 24785779 | 29     | D       | 11:37:32.391        |
7             | 13940001 | 2      | 24785779 | 18     | M       | 11:37:33.392        | YES

Now I just needed to craft the MERGE statement properly. When I did, this is what I had:

MERGE ETLTarget AS target USING (
    SELECT 
	     Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5
	   , UpdateType
    FROM ETLSource
    ) AS source (Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5
	   , UpdateType)
ON (target.Field1 = source.Field1
    AND target.Field2 = source.Field2)
WHEN MATCHED AND source.UpdateType = 'M'
    THEN UPDATE
	   SET target.Field3 = source.Field3
		  ,target.Field4 = source.Field4
		  ,target.Field5 = source.Field5
		  ,target.BatchDate = @BatchDate
WHEN MATCHED AND source.UpdateType = 'D'
    THEN DELETE  
WHEN NOT MATCHED BY TARGET AND source.UpdateType = 'M'
    THEN INSERT (Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5
	   , BatchDate)
    VALUES (Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5
	   , @BatchDate);

That was fine for a small data set but crawled on a big one, so I added batching so the MERGE only had to deal with a small set of rows at a time. Since the clustered PK is an identity column, and since I truncate ETLSource before loading it, I am guaranteed that the Id column will hold the values 1…n, where n is the total number of rows. So, I initialize an @rows variable right after inserting the rows into ETLSource:

SET @rows = @@rowcount;

Next, I create a WHILE loop that processes one batch at a time:

DECLARE @batchSize INT = 10000;
DECLARE @start INT = 1;
DECLARE @end INT = @batchSize;

WHILE (@start < @rows)
BEGIN

    MERGE ETLTarget AS target USING (
    ...;

    SET @start = @start + @batchSize;
    SET @end = @end + @batchSize;
END

Then I add the @start and @end to the MERGE statement source:

MERGE ETLTarget AS target USING (
    SELECT 
	     Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5
	   , UpdateType
    FROM ETLSource
    WHERE id BETWEEN @start AND @end
    ) AS source

And this worked!!! I was able to process a million rows in 1 minute. Yay!

Then I tried 10 million rows. Ugh. Now the MERGE was only processing 10,000 rows per minute. Crap. What changed? Same code. Same data, just more of it. A look at the query plan explained it all. Here it is with 1 million rows:

And here it is with 10 million rows:

The ETLTarget table has 80 million rows in it. When I had 10 million rows in the ETLSource table, the query optimizer decided that it would be easier to do an Index SCAN instead of a SEEK. In this case, however, the SEEK would have been A LOT faster.

So how do we force it to use a Seek? It turns out the optimizer has no idea how many rows we’re processing in a batch, so it bases its optimization on the entire table. We could use the LOOP JOIN hint at the end of the MERGE statement:

	MERGE	...	 , @BatchDate)
	   OPTION (LOOP JOIN);

But most folks like to avoid these kinds of hints, so we needed another way. Someone suggested putting a TOP clause in the source SELECT statement. That worked. Here’s how the MERGE looks now:

MERGE ETLTarget AS target USING (
    SELECT TOP 10000
	     Field1 
	   , Field2 
	   , Field3 
	   , Field4 
	   , Field5
	   , UpdateType
    FROM ETLSource
    WHERE id BETWEEN @start AND @end
    ORDER BY id
    ) AS source

With this in place, I was able to process the 10 million rows in 10 minutes. Woohoo! Just to be sure, I re-ran the process with a thousand, a million and 10 million rows and it was consistent. I was able to process a million rows a minute.
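
Putting the pieces together, the whole batched load ends up looking roughly like this (condensed from the snippets above into one script; treat it as a sketch rather than the exact production code):

DECLARE @BatchDate DATETIME2(7) = SYSUTCDATETIME();
DECLARE @rows INT;
DECLARE @batchSize INT = 10000;
DECLARE @start INT = 1;
DECLARE @end INT = @batchSize;

-- Start clean, then keep only the most recent row per key from the staging table
TRUNCATE TABLE ETLSource;

WITH sourceRows AS (
    SELECT *, RN = ROW_NUMBER() OVER (PARTITION BY Field1, Field2
        ORDER BY Field1, Field2, RowCreated DESC)
    FROM ETLSourceStagingTable)
INSERT INTO ETLSource (Field1, Field2, Field3, Field4, Field5, UpdateType, RowCreated)
SELECT Field1, Field2, Field3, Field4, Field5, UpdateType, RowCreated
FROM sourceRows
WHERE RN = 1
ORDER BY RowCreated;

SET @rows = @@ROWCOUNT;

-- Apply the changes to the target in batches of @batchSize rows
WHILE (@start < @rows)
BEGIN
    MERGE ETLTarget AS target USING (
        SELECT TOP 10000
               Field1, Field2, Field3, Field4, Field5, UpdateType
        FROM ETLSource
        WHERE id BETWEEN @start AND @end
        ORDER BY id
        ) AS source (Field1, Field2, Field3, Field4, Field5, UpdateType)
    ON (target.Field1 = source.Field1
        AND target.Field2 = source.Field2)
    WHEN MATCHED AND source.UpdateType = 'M'
        THEN UPDATE
             SET target.Field3 = source.Field3
                ,target.Field4 = source.Field4
                ,target.Field5 = source.Field5
                ,target.BatchDate = @BatchDate
    WHEN MATCHED AND source.UpdateType = 'D'
        THEN DELETE
    WHEN NOT MATCHED BY TARGET AND source.UpdateType = 'M'
        THEN INSERT (Field1, Field2, Field3, Field4, Field5, BatchDate)
        VALUES (Field1, Field2, Field3, Field4, Field5, @BatchDate);

    SET @start = @start + @batchSize;
    SET @end = @end + @batchSize;
END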

When Loading Data, Should I Drop Indexes or Not?

I just ran a few simple tests in Azure SQL DB to see how each approach would perform.

I have a target table with an identity column that is the clustered primary key, and two other indexes that are the same except for the field order. (Whether having both is useful is a question for another day.) Here’s the DDL for the target table:

CREATE TABLE [ETL1](
	Field1 [numeric](12, 0) NULL,
	Field2 [smallint] NULL,
	Field3 [int] NULL,
	Field4 [smallint] NULL,
	Field5 [nvarchar](1) NULL,
	Field6 [nvarchar](2) NULL,
	Field7 [numeric](4, 0) NULL,
	Field8 [numeric](2, 0) NULL,
	Field9 [nvarchar](2) NULL,
	Field10 [nvarchar](8) NULL,
	Field11 [datetime2](7) NULL,
	Field12 [nvarchar](8) NULL,
	Field13 [datetime2](7) NULL,
	[UpdateType] [nchar](1) NOT NULL,
	[RowCreated] [datetime2](7) NOT NULL,
	Id BIGINT IDENTITY NOT NULL
) ON [PRIMARY]
GO

CREATE UNIQUE CLUSTERED INDEX iscdpk ON ETL1 (Id) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED  INDEX iscd1 ON ETL1 (UpdateType, RowCreated, Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED  INDEX iscd2 ON ETL1 (Field1, Field2, UpdateType, RowCreated) WITH (DATA_COMPRESSION = PAGE);
GO

Test 1: Truncate the Target, Drop the Indexes, Insert the Data, Recreate the Indexes.

Test 2: Drop the Indexes, Truncate the Target, Insert the Data, Recreate the Indexes

Test 3: Just Truncate the Target and Insert the Data

Test 4: Truncate the Target, Drop the non-clustered Indexes (leaving the clustered index on the identity column), Insert the Data, Recreate the non-clustered Indexes.
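
As a concrete example, Test 4 boils down to something like this (ETL1_Staging stands in for wherever your 1.84 million rows come from; it is not part of the scripts above):

TRUNCATE TABLE ETL1;

-- Drop only the non-clustered indexes; keep the clustered index on the identity column
DROP INDEX iscd1 ON ETL1;
DROP INDEX iscd2 ON ETL1;

-- Load the data
INSERT INTO ETL1 (Field1, Field2, Field3, Field4, Field5, Field6, Field7, Field8,
                  Field9, Field10, Field11, Field12, Field13, UpdateType, RowCreated)
SELECT Field1, Field2, Field3, Field4, Field5, Field6, Field7, Field8,
       Field9, Field10, Field11, Field12, Field13, UpdateType, RowCreated
FROM ETL1_Staging;

-- Recreate the non-clustered indexes
CREATE UNIQUE NONCLUSTERED INDEX iscd1 ON ETL1 (UpdateType, RowCreated, Field1, Field2) WITH (DATA_COMPRESSION = PAGE);
CREATE UNIQUE NONCLUSTERED INDEX iscd2 ON ETL1 (Field1, Field2, UpdateType, RowCreated) WITH (DATA_COMPRESSION = PAGE);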

Here are the results. All timings are in milliseconds. These were run on a PRS1 instance of Azure SQL Database.

Step               | Test 1: Trunc then Drop Idxs | Test 2: Drop before Trunc | Test 3: No Drop/Create | Test 4: Trunc but don’t drop clustered index
Truncate           | 4                            | 2                         | 0                      | 4
Drop PK            | 8                            | 4                         | n/a                    | n/a
Drop Index 1       | 5                            | 23,630                    | n/a                    | 2
Drop Index 2       | 6                            | 2                         | n/a                    | 2
Insert 1.84 M rows | 83,033                       | 82,315                    | 161,706                | 83,205
Create PK          | 20,454                       | 21,205                    | n/a                    | n/a
Create Index 1     | 12,149                       | 12,264                    | n/a                    | 12,265
Create Index 2     | 11,142                       | 11,313                    | n/a                    | 11,247
Total Time (ms)    | 126,801                      | 150,735                   | 161,706                | 106,725
Total Time (mins)  | 2.11                         | 2.51                      | 2.70                   | 1.78
Delta (ms)         | 0                            | 23,934                    | 34,905                 | (20,076)

Test 4 was the clear winner, as it avoided the cost of recreating the clustered index. That makes sense: the clustered index was being filled in order by the identity column as rows were added. Test 1 came in second, so if your clustered index is not on an identity column, or you have no clustered index, you are still better off dropping and recreating the indexes.

Conclusion: When inserting larger data sets into an empty table, drop the indexes before inserting the data, unless the index is clustered on an identity column.

Azure Data Factory Publishing Error

I am using Azure Data Factory v2. It is bound to an Azure DevOps GIT repository. I made some changes in my branch, including deleting some obsolete items. Then I merged my changes back to the ‘master’ branch.

Now, I’m trying to publish my changes from the ‘master’ branch to the Azure Data Factory. When I do, it says it’s deployed 33 of 33 changes, and then it fails, saying:

“Publishing Error

The document cannot be deleted since it is referenced by <some Pipeline>.”

I searched high and low looking for some evidence that the offending Pipeline existed anywhere in the GIT repo. It did not.

Then, I discovered that you can change from the Azure DevOps GIT version of the Data Factory to the actual Data Factory version by selecting the latter from the dropdown in the top left corner of the Data Factory editor:

This saved the day. I was able to locate and delete the offending Pipeline(s) directly from the actual Data Factory. Then, when I switched back to Azure DevOps GIT mode, it allowed me to publish the changes from the ‘master’ branch. Woohoo!

Creating an Alexa Skill in C# – Step 1

Recently, I decided to make an Alexa skill that I could play on my boss’s Alexa.  At first, I was doing it as a gag, but I figured that wouldn’t work as he has to actively install my skill onto his Alexa.  Now it reads some stats from one of our Azure databases and publishes those in a conversation.  Here’s how I built it.

Step 1:  Creating the Alexa Skill Model

I don’t know any of the other languages that you can write a skill in, so I chose to write it in C#.  That means getting a bunch of bits and loading them into Visual Studio.  But first, you’ll need to start the process of creating a skill.  At the time of this writing, these are the steps I took.

  1. Start Here.
  2. Click the Start a Skill button.
  3. If you don’t have an Amazon Developer account, create one.
  4. Eventually, you’ll get to the Alexa Skills Developers Console where you can click the Create Skill button.
  5. Give your skill a name.  I’m calling mine:  Simon’s Example Skill.
  6. On the Choose a Model screen, select Custom.
  7. Then click Create Skill.
  8. You should now be on the Build tab for your Skill.  Notice the tree view on the left and the checklist on the right.

Invocation Name

The Invocation Name is the phrase that an Alexa user will use to launch/run/start your Alexa Skill.  It could be “joe’s hot dog recipes” or some such.  It needs to be lower case, there are some restrictions on the characters you can use, and any abbreviations you use need periods to separate the letters so Alexa knows to read the letters and not pronounce the word.  Read the Invocation Name Requirements for more details.

Click the Invocation Name link in the tree view on the left or the first option in the checklist on the right.  Then give your skill an Invocation Name.  I named mine:  “simon’s example”.

Intents

The Intents are the various functions your skill can perform.  For example, stating today’s date, looking up some data, figuring something out, etc.  Let’s do all three.

First, my skill is going to provide today’s date, so I’m going to name my first Intent, “GetTodaysDateIntent”.  My skill is also going to look up some data in an Azure SQL Database, so I’m going to name my second Intent, “DataLookupIntent”.  Lastly, I want to figure something out, like the average temperature in a major US city, so my third Intent will be “GetAverageTemperatureIntent”.

Utterances

The Utterances are the phrases an Alexa user might say to trigger your Intent (function).  You should put in several Utterances using synonyms and different phrasing so Alexa has a better chance of triggering your Intent instead of responding with “I don’t know how to do that.”

For the GetTodaysDateIntent, I added the following utterances:


Slots

Within an Utterance, you can have Slots (placeholders) that represent the possible values for that part of the phrase.  For example, if my table has the count of animals per household per neighborhood per county in it, I might want to create slots for Animal Type, Household Size, Neighborhood Name, and County Name.  You can do this by typing a left brace { in the Utterance box.
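
To make that concrete, the kind of table I have in mind on the Azure SQL side might look like the following; the names here are purely illustrative, not taken from the actual database.

CREATE TABLE dbo.AnimalCounts (
    CountyName    nvarchar(50),
    Neighborhood  nvarchar(50),
    HouseholdSize int,
    AnimalType    nvarchar(50),
    AnimalCount   int
);

-- The DataLookupIntent might then run a query shaped like this,
-- with the slot values plugged in for the literals
SELECT SUM(AnimalCount)
FROM dbo.AnimalCounts
WHERE AnimalType = 'dog'
  AND HouseholdSize = 1
  AND Neighborhood = 'Downtown'
  AND CountyName = 'Davidson';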


Here are three of my sample utterances for the DataLookupIntent:


Once you have created a slot, you need to populate it with options.  You do this in the bottom half of the Utterance screen.


You can easily select one of the pre-defined Slot Types in the drop-down.  In my case, Amazon has a list of Animals, so I’ll pick AMAZON.Animal in the first slot.

I need to manually add a few Counties for the second slot though.  And at this time, you don’t want to click Edit Dialog (though it’s tempting).  Instead, you want to define your own Slot Type by clicking the Add (Plus) button next to Slot Type in the tree view on the left:


For example, here is the custom Slot Type for County Name:


Notice the synonyms column.  This is important when multiple phrases mean the same thing; for example, a 1 person household and a single person household are synonymous.  So be certain to add any synonyms you can think of.  Here is my custom Slot Type for Household Size; notice the synonyms off to the right:


Now that you’ve defined some custom Slot Types, you can click on the Slot names under the Intent in the tree view on the left and select the newly created Slot Type for each Slot.


For the GetAverageTemperatureIntent, I added one Utterance:


And configured the {City} slot as follows:


Finally, you can Save your Model and Build it by clicking the buttons at the top of the screen:


Hopefully, the model will build and we are ready to move on to Step 2.  If the model doesn’t build, check the bottom right of the screen for a list of the errors:


Fix the errors until the model builds:


Then go to Step 2.