SQL Saturday St. Louis – February 2020

I will be speaking at SQL Saturday St. Louis on Saturday, February 8th at 8:00 am and 10:20 am. The topics are:

8:00 am Moving Data to the Cloud (with Azure Data Factory)

You need to move data. A lot of data. To the cloud. You’ve got data in a variety of on- and off-site data sources. There are several ways to do it, and some of them are quite easy to implement using Azure Data Factory. Learn how to use variables and looping in your Data Factory pipelines. Use the Integration Runtime to pull directly from on-site sources. See how to upload files to blob storage and import them. Learn how to trigger Data Factory activities. And, learn how to keep all those connection strings and passwords secret in Azure Key Vault. After this session, you will have tools that you can readily implement in your own data migrations.
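To give a taste of what the pipeline JSON behind the designer looks like, here is a rough sketch of a pipeline with a parameter and a ForEach loop, expressed as a Python dict you could serialize to JSON. The names (CopyTablesPipeline, tableList, ForEachTable) are hypothetical, and real Data Factory definitions carry more required properties; this only shows the shape of parameterized looping.

```python
import json

# A hypothetical, trimmed-down Data Factory pipeline definition:
# one pipeline parameter (a list of table names) and a ForEach
# activity that loops over it. Shape only, not a deployable pipeline.
pipeline = {
    "name": "CopyTablesPipeline",
    "properties": {
        "parameters": {
            "tableList": {"type": "Array"}
        },
        "activities": [
            {
                "name": "ForEachTable",
                "type": "ForEach",
                "typeProperties": {
                    # Data Factory expression naming the list to iterate
                    "items": {
                        "value": "@pipeline().parameters.tableList",
                        "type": "Expression"
                    },
                    # one Copy activity per table would go here
                    "activities": []
                }
            }
        ]
    }
}

print(json.dumps(pipeline, indent=2))
```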

10:20 am Why Learn Python? A Microsoft DB/ETL/BI Developer’s Answer

You’re a Microsoft Developer. C#, MSSQL, SSIS/SSRS, SSMS, and Azure are your tools of choice. Why would you want to learn Python? In this session, I will show you several take-home utilities that use Python. The first hunts through a folder structure full of SSIS packages looking for the one(s) that load(s) a specified table. The second executes the data sources in an SSRS report to identify performance problems and document them in Excel. The third GeoCodes the City/Country names from a SQL table, getting the Lat/Lng so you can use the data in maps. Familiarity with Python is not necessary to use the utilities, but we’re not going to do Hello World either. This is real Python for Microsoft Database, ETL and BI Developers. This all-demo session shows you how to use Python with the .Net CLR, XML, ODBC, Excel, SQL Server, and Web API calls.

1. Which SSIS Package loads this table.py
import fnmatch
import os
import xml.etree.ElementTree as et

import pandas as pd

# CHANGE THESE TWO PARAMETERS
# pathToSearch should be set to the root directory of your source code repo
# and, in Windows, will look something like this:
# "\\Users\\fred\\source\\Workspaces\\SQL\\SSIS"
pathToSearch = "F:\\_GIT\\SqlSaturday\\Python\\SSIS Packages"
# tableToFind should be set to the name of the table you want to find
tableToFind = "Listing"


##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE

# ParseFile parses the DTSX file looking for the occurrence of tableToFind
def ParseFile(file, tableToFind):
    tableFound = []
    treeOrig = et.parse(file)
    rootOrig = treeOrig.getroot()
    nsDTS = "www.microsoft.com/SqlServer/Dts"
    DTS = {"DTS": nsDTS}
    nsSQLTask = "www.microsoft.com/sqlserver/dts/tasks/sqltask"
    SQLTask = {"SQLTask": nsSQLTask}
    executablesRoot = rootOrig.find("DTS:Executables", namespaces=DTS)
    # Use "is not None": an Element with no children is falsy, so a bare
    # truthiness test can silently skip real matches.
    if executablesRoot is not None:
        executables = executablesRoot.findall("DTS:Executable", namespaces=DTS)
        for e in executables:
            od = e.find("DTS:ObjectData", namespaces=DTS)
            if od is not None:
                pipelines = od.findall("pipeline")
                for pipeline in pipelines:
                    componentsElement = pipeline.find("components")
                    if componentsElement is not None:
                        components = componentsElement.findall("component")
                        for c in components:
                            if c.attrib["componentClassID"] == "Microsoft.OLEDBDestination":
                                props = c.find("properties")
                                table = ""
                                tablevar = ""
                                for p in props:
                                    if p.attrib["name"] == "OpenRowset":
                                        table = p.text
                                    if p.attrib["name"] == "OpenRowsetVariable":
                                        tablevar = p.text
                                if table == "" and tablevar != "":
                                    table = GetTableFromVar(
                                        tablevar, rootOrig, file)
                                if table != "":
                                    if tableToFind in table.upper():
                                        cns = c.find("connections")
                                        cn = cns.find("connection")
                                        cnName = cn.attrib["connectionManagerRefId"]
                                        tableFound.append(
                                            {"file": file,  "statementType": "OLEDB Destination", "connection": cnName})
                sqlTasks = od.findall(
                    "SQLTask:SqlTaskData", namespaces=SQLTask)
                for task in sqlTasks:
                    sql = task.attrib[f"{{{nsSQLTask}}}SqlStatementSource"].strip().upper()
                    if sql.startswith("MERGE"):
                        words = sql[0:300].split(" ")
                        table = words[1]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Merge", "sql": sql})
                    elif sql.startswith("INSERT INTO"):
                        words = sql[0:300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Insert Into", "sql": sql})
                    elif sql.startswith("TRUNCATE TABLE"):
                        words = sql[0:300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Truncate Table", "sql": sql})
                    elif sql.startswith("EXEC"):
                        words = sql[0:300].split(" ")
                        sproc = words[1]
                        result = SeeIfSprocUpdatesTable(
                            sproc, tableToFind, file)
                        if result:
                            tableFound.append(result)
                    elif "MERGE " in sql:
                        index = sql.find("MERGE ")
                        words = sql[index:index+300].split(" ")
                        table = words[1]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Merge", "sql": sql})
                    elif "INSERT INTO " in sql:
                        index = sql.find("INSERT INTO ")
                        words = sql[index:index+300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Insert Into", "sql": sql})
                    elif "TRUNCATE TABLE " in sql:
                        index = sql.find("TRUNCATE TABLE ")
                        words = sql[index:index+300].split(" ")
                        table = words[2]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Truncate Table", "sql": sql})
                    elif "INTO " in sql:
                        index = sql.find("INTO ")
                        words = sql[index:index+300].split(" ")
                        table = words[1]
                        if tableToFind in table:
                            tableFound.append(
                                {"file": file,  "statementType": "Select Into", "sql": sql})
    return tableFound

# GetTableFromVar is not implemented yet
# The intention is to look up the value(s) of the variable and see if it's == tableToFind


def GetTableFromVar(tablevar, rootOrig, file):
    if tablevar == "NOT IMPLEMENTED":
        return "tableName"
    else:
        return None

# SeeIfSprocUpdatesTable is not implemented yet
# The intention is to look at the sproc source and see if it's == tableToFind


def SeeIfSprocUpdatesTable(sproc, tableToFind, file):
    if sproc == "NOT IMPLEMENTED":
        return {"file": file,  "statementType": "EXEC", "proc": sproc, "sql": "???"}
    else:
        return None


# Code Execution starts here
tableToFind = tableToFind.upper()
print(f"Looking for: {tableToFind}")

# Finding *.dtsx files in the supplied folder and its subfolders
allDtsxFiles = []
for dirpath, dirnames, filenames in os.walk(pathToSearch):
    if not filenames:
        continue
    dtsxFiles = fnmatch.filter(filenames, "*.dtsx")
    if dtsxFiles:
        for file in dtsxFiles:
            allDtsxFiles.append(f"{dirpath}\\{file}")
if len(allDtsxFiles) == 0:
    print('No *.dtsx files found')
else:
    print(f"First *.dtsx file found: {allDtsxFiles[0]}")

# Examining each file to see if the tableToFind is in there
tableFound = []
for file in allDtsxFiles:
    print(f"Checking file {file}")
    tableFound.extend(ParseFile(file, tableToFind))

# Save results as Excel file
if len(tableFound) == 0:
    print(f"{tableToFind} not found in any files")
else:
    df = pd.DataFrame(tableFound)
    excelFile = os.path.join(pathToSearch, tableToFind + '-found.xlsx')
    print(f"Saving {excelFile}")
    df.to_excel(excelFile)

print("Done")
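The trickiest part of the script above is querying the DTSX XML, which is namespace-heavy. Here is the same find/findall pattern isolated against a tiny made-up DTSX-like snippet (the element names mirror the real ones, but this XML is invented for the demo), so you can experiment without a package file:

```python
import xml.etree.ElementTree as et

# A tiny, invented document using the DTS namespace, just to show the
# namespaces= lookup pattern used in ParseFile above.
xmlText = """<DTS:Executable xmlns:DTS="www.microsoft.com/SqlServer/Dts">
  <DTS:Executables>
    <DTS:Executable DTS:refId="Package\\Load Listing"/>
  </DTS:Executables>
</DTS:Executable>"""

root = et.fromstring(xmlText)
DTS = {"DTS": "www.microsoft.com/SqlServer/Dts"}

executablesRoot = root.find("DTS:Executables", namespaces=DTS)
# "is not None" matters: an Element with no children is falsy,
# so "if executablesRoot:" can skip a real match.
if executablesRoot is not None:
    for e in executablesRoot.findall("DTS:Executable", namespaces=DTS):
        # Namespaced attributes come back in {uri}name form
        print(e.attrib["{www.microsoft.com/SqlServer/Dts}refId"])
```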

2. Which SSRS Data Sources are Slow.py

import clr
clr.AddReference("C:\\Windows\\Microsoft.NET\\assembly\\GAC_MSIL\\Microsoft.AnalysisServices.AdomdClient\\v4.0_15.0.0.0__89845dcd8080cc91\\Microsoft.AnalysisServices.AdomdClient.dll")
clr.AddReference("System.Data")
from Microsoft.AnalysisServices.AdomdClient import (
    AdomdConnection, AdomdDataAdapter)
import datetime
import os
import time
import xml.etree.ElementTree as et

import pandas as pd
import pyodbc
from System.Data import DataSet

# CHANGE THESE SETTINGS IN THE systemconfig.py FILE IN THIS FOLDER
from systemconfig import pwd, sqlserver, uid

# CHANGE THESE PARAMETERS
# path should be set to the directory where the report .RDL file is
# and, in Windows, will look something like this:
# "\\Users\\fred\\source\\Workspaces\\SQL\\SSRS"
path = "F:\\_GIT\\SqlSaturday\\Python\\SSRS Reports\\Report Project1"
# reportFile should be set to the name of the .RDL you want to analyze
reportFile = 'ReportWithALotOfDatasets.rdl'
# sqlServerName should be set to the name of the SQL Server the queries run against
sqlServerName = sqlserver
# ssasServerName should be set to the name of the SSAS Server the MDX queries run against
ssasServerName = "(local)"
# ssasDatabaseNames should be set to a List of the SSAS Databases the MDX queries run against
ssasDatabaseNames = ["TheCube", "TheOtherCube"]

##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE (maybe)


def parseQueries(file):
    treeOrig = et.parse(file)
    rootOrig = treeOrig.getroot()

    dsnames = []
    dsns = []
    parms = []
    cmds = []
    fields = []

    i = 1
    datasets = rootOrig.find(
        "{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}DataSets")
    for dataset in datasets.findall(
            "{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}DataSet"):
        dsnames.append(dataset.attrib["Name"])
        query = dataset[0]
        dsn = query[0].text
        parm = ""
        cmd = ""
        if query[1].tag == '{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}QueryParameters':
            parm = query[1].text
            cmd = query[2].text
        else:
            cmd = query[1].text
        field = dataset[1].text

        dsns.append(dsn)
        parms.append(parm)
        cmds.append(cmd)
        fields.append(field)

        i += 1
        # This is a debugging technique.  It limits the loop to one iteration.
        # if i > 1:
        #     break
    df = pd.DataFrame({"DataSets": dsnames, "DSNs": dsns, "Parms": parms,
                       "Commands": cmds, "Fields": fields})
    return df


def test_MDX(name, server, database, mdx):
    conn = AdomdConnection("Data Source=" + server + ";Catalog=" + database)
    conn.Open()
    cmd = conn.CreateCommand()
    # you might need to add parameters here
    cmd.CommandText = mdx

    try:
        print("Trying: " + name)
        start = time.time_ns()
        adp = AdomdDataAdapter(cmd)
        datasetParam = DataSet()
        adp.Fill(datasetParam)
        end = time.time_ns()
        result = {"Name": name, "DB": database,
                  "SQL": mdx, "Time": end - start, "Missing": False,
                  "Rows": datasetParam.Tables[0].Rows.Count}
    except Exception:
        result = {"Name": name, "DB": database,
                  "SQL": mdx, "Time": -1, "Missing": True,
                  "Rows": 0}
    finally:
        conn.Close()
    return result


def test_SQL(name, server, database, sql):
    connstring = "Driver={ODBC Driver 17 for SQL Server};Server=" + \
        server + ";Database=" + database + ";UID=" + uid + ";PWD=" + pwd
    conn = pyodbc.connect(connstring)
    cursor = conn.cursor()
    result = ""
    try:
        print("Trying: " + name)
        start = time.time_ns()
        cursor.execute(sql)
        rows = cursor.fetchall()
        end = time.time_ns()
        result = {"Name": name, "DB": database,
                  "SQL": sql, "Time": end - start, "Missing": False,
                  "Rows": len(rows)}
    except Exception:
        result = {"Name": name, "DB": database,
                  "SQL": sql, "Time": -1, "Missing": True, "Rows": 0}
    finally:
        conn.close()
    return result


# Code Execution starts here
file = os.path.join(path, reportFile)
df = parseQueries(file)
date1 = str(datetime.datetime.date(datetime.datetime.today()))
# Uncomment the next line to save a list of the Datasets and their SQL to an Excel file
# df.to_excel(os.path.join(path, date1 + f"_{reportFile}_ReportQueries.xlsx"))
print("Number of Datasets to Check: " + str(df["DataSets"].count()))

commands = []
for i in range(0, df["DataSets"].count()):
    row = df.iloc[i, :]
    name = row["DataSets"]
    database = row["DSNs"]
    sql = row["Commands"]

    if database in ssasDatabaseNames:
        server = ssasServerName
        commands.append(test_MDX(name, server, database, sql))
    else:
        server = sqlServerName
        commands.append(test_SQL(name, server, database, sql))

dfSql = pd.DataFrame(commands)
dfSql["Seconds"] = dfSql["Time"] / 1_000_000_000
excelFile = os.path.join(
    path, date1 + f"_{reportFile}_ReportQueryResults.xlsx")
print(f"Saving {excelFile}")
dfSql.to_excel(excelFile)
print("Done")
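The timing pattern in test_SQL and test_MDX (a time.time_ns() bracket around the query, converted to seconds afterward) works for any data access call. Here is the same idea isolated, with sqlite3 standing in for SQL Server so it runs without a database; the query is invented for the demo:

```python
import sqlite3
import time


def time_query(conn, sql):
    """Run a query and return (rows, elapsed_seconds) using the same
    time.time_ns() bracket as test_SQL above."""
    cursor = conn.cursor()
    start = time.time_ns()
    cursor.execute(sql)
    rows = cursor.fetchall()
    end = time.time_ns()
    # time_ns() returns nanoseconds; divide to get seconds
    return rows, (end - start) / 1_000_000_000


conn = sqlite3.connect(":memory:")
rows, seconds = time_query(conn, "SELECT 1 UNION ALL SELECT 2")
print(f"{len(rows)} rows in {seconds:.6f} seconds")
conn.close()
```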

3. Geocoding Places.py

import pyodbc
import requests

# CHANGE THESE SETTINGS IN THE systemconfig.py FILE IN THIS FOLDER
from systemconfig import api_key, sqlserver, uid, pwd

# CHANGE THESE PARAMETERS
database = "AdventureWorks2016"
# The Places Table/View should have 6 columns:
# a primary key, City, StateProvince, CountryRegion, Latitude and Longitude
placesTable = "dbo.Places"
# This is the Primary Key Field for the placesTable
placesPKField = "PlaceId"
# Set this parameter to a small number for debugging,
# and set it larger than the number of rows in your placesTable to do them all
maxGeocodingCalls = 25


##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE

connstring = "Driver={ODBC Driver 17 for SQL Server};Server=" + \
    sqlserver + ";Database=" + database + ";UID=" + uid + ";PWD=" + pwd
baseurl = f'https://maps.googleapis.com/maps/api/geocode/json?key={api_key}'

def LoadCities():
    sql = f"SELECT * FROM {placesTable} WHERE ISNULL(Latitude, 0) = 0"
    rows = LoadDataSet(sql)
    cities = []
    for row in rows:
        cities.append([row[0], row[1], row[2], row[3], 0, 0, False])
    return cities

def LoadDataSet(sql):
    conn = pyodbc.connect(connstring)
    cursor = conn.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

def ExecuteSql(conn, sql):
    cursor = conn.cursor()
    cursor.execute(sql)
    cursor.close()

def GeocodeCities(cities):
    i = 1
    for c in cities:
        print(f"Geocoding: {c[1]}, {c[2]}, {c[3]}")
        url = baseurl + f'&address={c[1]},{c[2]},{c[3]}&components=country:{c[3]}'
        geo_data = requests.get(url).json()
        if geo_data["status"] == "OK":
            lat = geo_data["results"][0]["geometry"]["location"]["lat"]
            lng = geo_data["results"][0]["geometry"]["location"]["lng"]
            c[4] = lat
            c[5] = lng
        # count every API call (not just successes) so the loop really
        # stops after maxGeocodingCalls calls to the Google API
        i += 1
        if i > maxGeocodingCalls:
            break


def UpdateCities(cities):
    conn = pyodbc.connect(connstring)
    conn.autocommit = True
    for c in cities:
        if (c[4] != 0 or c[5] != 0) and not c[6]:
            print(f"Updating: {c[1]}, {c[2]}, {c[3]}")
            placeId = c[0]
            sql = f"UPDATE {placesTable} " + \
                f"SET Latitude = {c[4]}, Longitude = {c[5]} " + \
                f"WHERE {placesPKField} = {placeId}"
            ExecuteSql(conn, sql)
            c[6] = True
    conn.close()


# Code Execution starts here
cities = LoadCities()
# for Debugging:  print(cities)
# print(cities[0])

GeocodeCities(cities)
# for Debugging:  print(cities)
# print(cities[0])

UpdateCities(cities)
# for Debugging:  print(cities)
# print(cities[0])

print("Done")
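The only fragile part of GeocodeCities is digging the coordinates out of the Geocoding API's JSON. Here is that extraction isolated against a canned response (the coordinates below are made up for the demo), so you can see the shape of the payload without spending an API call:

```python
# A canned response with the same shape the Google Geocoding API
# returns; the lat/lng values are invented for this demo.
geo_data = {
    "status": "OK",
    "results": [
        {"geometry": {"location": {"lat": 38.627, "lng": -90.199}}}
    ],
}

if geo_data["status"] == "OK":
    # Same navigation as GeocodeCities: first result, geometry, location
    location = geo_data["results"][0]["geometry"]["location"]
    lat, lng = location["lat"], location["lng"]
else:
    # Non-OK statuses (e.g. ZERO_RESULTS, OVER_QUERY_LIMIT) carry no results
    lat, lng = 0, 0

print(lat, lng)
```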

SetupPlacesTables.sql

create table Places (
  PlaceId int not null identity constraint PlacesPK primary key,
  City nvarchar(50),
  StateProvince nvarchar(50),
  CountryRegion nvarchar(50),
  Latitude numeric(18, 6),
  Longitude numeric(18, 6)
)
GO
insert into Places (City, StateProvince, CountryRegion)
select
  distinct City,
  s.Name as StateProvince,
  c.Name as CountryRegion
from Person.Address a
inner join Person.StateProvince s on a.StateProvinceID = s.StateProvinceID
inner join Person.CountryRegion c on s.CountryRegionCode = c.CountryRegionCode
GO

systemconfig.py

api_key = ""
sqlserver = ""
uid = ""
pwd = ""

Azure Data Factory Publishing Error

I am using Azure Data Factory v2. It is bound to an Azure DevOps GIT repository. I made some changes in my branch, including deleting some obsolete items. Then I merged my changes back to the ‘master’ branch.

Now, I’m trying to publish my changes from the ‘master’ branch to the Azure Data Factory. When I do, it says it’s deployed 33 of 33 changes, and then it fails, saying:

“Publishing Error

The document cannot be deleted since it is referenced by <some Pipeline>.”

I searched high and low looking for some evidence that the offending Pipeline existed anywhere in the GIT repo. It did not.

Then, I discovered that you can change from the Azure DevOps GIT version of the Data Factory to the actual Data Factory version by selecting the latter from the dropdown in the top left corner of the Data Factory editor.

This saved the day. I was able to locate and delete the offending Pipeline(s) directly from the actual Data Factory. Then, when I switched back to Azure DevOps GIT mode, it allowed me to publish the changes from the ‘master’ branch. Woohoo!

Connecting to IBM DB2 z/OS from Azure Data Factory v2

Connecting to IBM DB2 z/OS from Azure Data Factory v1 was a matter of setting up the Azure Data Gateway on an on-prem server that had the IBM DB2 client installed and creating an ODBC connection to DB2 (I called it DB2Test). Then, in the Data Factory v1 Copy Wizard, select the ODBC source, pick the Gateway, and enter DSN=DB2Test as the Connection String. This worked for us.

Azure Data Factory v2

First, the Azure Data Gateway is now called the “Self-Hosted Integration Runtime”. So download and install the IR client on your on-prem gateway machine. On my machine, it auto-configured to use the existing Data Factory Gateway configuration, which is NOT what I wanted. After uninstalling and reinstalling the IR client a couple of times, it stopped auto-configuring and asked me for a key. To get the key, I had our Azure Dev configuration guy run the following PowerShell:

Import-Module AzureRM
$dataFactoryName = "myDataFactoryv2NoSSIS"
$resourceGroupName = "myResourceGroup"
$selfHostedIntegrationRuntimeName = "mySelfHostedIntegrationRuntime"
Login-AzureRmAccount
Set-AzureRmDataFactoryV2IntegrationRuntime -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Name $selfHostedIntegrationRuntimeName -Type SelfHosted -Description "selfhosted IR description"
Get-AzureRmDataFactoryV2IntegrationRuntimeKey -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Name $selfHostedIntegrationRuntimeName

I then pasted the Key into the Integration Runtime Configuration screen, and it connected properly to myDataFactoryv2NoSSIS.  Tada:

[screenshot: the self-hosted Integration Runtime showing a successful connection]

Next is to test the connection to DB2. I went to the Diagnostics tab and entered the DSN and credentials, just as I did for Data Factory v1:

Failed to connect to the database. Error message: ERROR [HY010] [IBM][CLI Driver] CLI0125E Function sequence error. SQLSTATE=HY010

Dang! Much googling later, I found this obscure note.

I added the phrase “Autocommit=Off” to the DSN in the connection string, and voilà, the connection worked. So my final diagnostic looked like this:

[screenshot: the Diagnostics connection test succeeding with Autocommit=Off in the connection string]

YAY!!
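If you hit the same CLI0125E error talking to DB2 over ODBC from Python rather than from Data Factory, the same fix should apply: append Autocommit=Off to the DSN-based connection string. A minimal sketch, assuming the DB2Test DSN from above; it only builds the string, since it can't connect without a real DB2 client and DSN configured:

```python
# Build the DSN-based ODBC connection string. "DB2Test" is the DSN from
# the post above; the Autocommit=Off keyword is what works around the
# CLI0125E function sequence error.
dsn = "DB2Test"
connstring = f"DSN={dsn};Autocommit=Off"
print(connstring)

# On a machine with the DB2 client and this DSN configured, you would
# then connect with pyodbc: conn = pyodbc.connect(connstring)
```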