I will be speaking at SQL Saturday St. Louis on Saturday, February 8th at 8:00 am and 10:20 am. The topics are:
8:00 am Moving Data to the Cloud (with Azure Data Factory)
You need to move data. A lot of data. To the cloud. You’ve got data in a variety of on-site and off-site data sources, and there are several ways to move it. Some of them can be implemented quite easily using Azure Data Factory. Learn how to use variables and looping in your Data Factory pipelines. Use the self-hosted Integration Runtime to pull directly from on-site sources. See how to upload files to blob storage and import them. Learn how to trigger Data Factory activities. And learn how to keep all those connection strings and passwords secret in Azure Key Vault. After this session, you will have tools that you can readily apply to your own data migrations.
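(If you want a taste before the session: pipeline runs can be kicked off programmatically through the Azure management REST API. Below is a minimal sketch in Python; the subscription, resource group, factory, and pipeline names are placeholders, and the bearer token is assumed to come from your own Azure AD authentication.)

import requests

# Placeholders -- substitute your own values
subscription = "<subscription-id>"
resourceGroup = "<resource-group>"
factory = "<data-factory-name>"
pipeline = "<pipeline-name>"
token = "<azure-ad-bearer-token>"

# Trigger a pipeline run via the Data Factory REST API
url = (f"https://management.azure.com/subscriptions/{subscription}"
       f"/resourceGroups/{resourceGroup}/providers/Microsoft.DataFactory"
       f"/factories/{factory}/pipelines/{pipeline}/createRun"
       "?api-version=2018-06-01")
response = requests.post(url, headers={"Authorization": f"Bearer {token}"}, json={})
print(response.json())  # contains the runId of the new pipeline run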
10:20 am Why Learn Python? A Microsoft DB/ETL/BI Developer’s Answer
You’re a Microsoft developer. C#, MSSQL, SSIS/SSRS, SSMS, and Azure are your tools of choice. Why would you want to learn Python? In this session, I will show you several take-home utilities that use Python. The first hunts through a folder structure full of SSIS packages looking for the one(s) that load(s) a specified table. The second executes the data sources in an SSRS report to identify performance problems and documents them in Excel. The third geocodes the city/country names from a SQL table, retrieving the latitude/longitude so you can use the data in maps. Familiarity with Python is not necessary to use the utilities, but we’re not going to do Hello World either. This is real Python for Microsoft database, ETL, and BI developers. This all-demo session shows you how to use Python with the .NET CLR, XML, ODBC, Excel, SQL Server, and Web API calls.
1. Which SSIS Package loads this table.py
import fnmatch
import os
import xml.etree.ElementTree as et

import pandas as pd
# CHANGE THESE TWO PARAMETERS
# pathToSearch should be set to the root directory of your source code repo
# and, in Windows, will look something like this:
# "\\Users\\fred\\source\\Workspaces\\SQL\\SSIS"
pathToSearch = "F:\\_GIT\\SqlSaturday\\Python\\SSIS Packages"
# tableToFind should be set to the name of the table you want to find
tableToFind = "Listing"
##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE
# ParseFile parses the DTSX file looking for occurrences of tableToFind
def ParseFile(file, tableToFind):
    tableFound = []
    treeOrig = et.parse(file)
    rootOrig = treeOrig.getroot()
    nsDTS = "www.microsoft.com/SqlServer/Dts"
    DTS = {"DTS": nsDTS}
    nsSQLTask = "www.microsoft.com/sqlserver/dts/tasks/sqltask"
    SQLTask = {"SQLTask": nsSQLTask}
    executablesRoot = rootOrig.find("DTS:Executables", namespaces=DTS)
    if executablesRoot is not None:
        executables = executablesRoot.findall("DTS:Executable", namespaces=DTS)
        for e in executables:
            od = e.find("DTS:ObjectData", namespaces=DTS)
            if od is None:
                continue
            # Data Flow tasks: look for OLE DB Destinations that write to the table
            pipelines = od.findall("pipeline")
            for pipeline in pipelines:
                componentsElement = pipeline.find("components")
                if componentsElement is None:
                    continue
                components = componentsElement.findall("component")
                for c in components:
                    if c.attrib["componentClassID"] == "Microsoft.OLEDBDestination":
                        props = c.find("properties")
                        table = ""
                        tablevar = ""
                        for p in props:
                            if p.attrib["name"] == "OpenRowset":
                                table = p.text
                            if p.attrib["name"] == "OpenRowsetVariable":
                                tablevar = p.text
                        if table == "" and tablevar != "":
                            table = GetTableFromVar(tablevar, rootOrig, file)
                        # GetTableFromVar can return None, so test truthiness
                        if table and tableToFind in table.upper():
                            cns = c.find("connections")
                            cn = cns.find("connection")
                            cnName = cn.attrib["connectionManagerRefId"]
                            tableFound.append(
                                {"file": file, "statementType": "OLEDB Destination",
                                 "connection": cnName})
            # Execute SQL tasks: inspect the statement text for the table name
            sqlTasks = od.findall("SQLTask:SqlTaskData", namespaces=SQLTask)
            for task in sqlTasks:
                sql = task.attrib[f"{{{nsSQLTask}}}SqlStatementSource"].strip().upper()
                if sql.startswith("MERGE"):
                    words = sql[0:300].split(" ")
                    table = words[1]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Merge", "sql": sql})
                elif sql.startswith("INSERT INTO"):
                    words = sql[0:300].split(" ")
                    table = words[2]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Insert Into", "sql": sql})
                elif sql.startswith("TRUNCATE TABLE"):
                    words = sql[0:300].split(" ")
                    table = words[2]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Truncate Table", "sql": sql})
                elif sql.startswith("EXEC"):
                    words = sql[0:300].split(" ")
                    sproc = words[1]
                    result = SeeIfSprocUpdatesTable(sproc, tableToFind, file)
                    if result:
                        tableFound.append(result)
                elif "MERGE " in sql:
                    index = sql.find("MERGE ")
                    words = sql[index:index + 300].split(" ")
                    table = words[1]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Merge", "sql": sql})
                elif "INSERT INTO " in sql:
                    index = sql.find("INSERT INTO ")
                    words = sql[index:index + 300].split(" ")
                    table = words[2]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Insert Into", "sql": sql})
                elif "TRUNCATE TABLE " in sql:
                    index = sql.find("TRUNCATE TABLE ")
                    words = sql[index:index + 300].split(" ")
                    table = words[2]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Truncate Table", "sql": sql})
                elif "INTO " in sql:
                    index = sql.find("INTO ")
                    words = sql[index:index + 300].split(" ")
                    table = words[1]
                    if tableToFind in table:
                        tableFound.append(
                            {"file": file, "statementType": "Select Into", "sql": sql})
    return tableFound
# GetTableFromVar is not implemented yet.
# The intention is to look up the value(s) of the variable and see if it matches tableToFind.
def GetTableFromVar(tablevar, rootOrig, file):
    if tablevar == "NOT IMPLEMENTED":
        return "tableName"
    else:
        return None

# SeeIfSprocUpdatesTable is not implemented yet.
# The intention is to look at the sproc source and see if it writes to tableToFind.
def SeeIfSprocUpdatesTable(sproc, tableToFind, file):
    if sproc == "NOT IMPLEMENTED":
        return {"file": file, "statementType": "EXEC", "proc": sproc, "sql": "???"}
    else:
        return None
# Code Execution starts here
tableToFind = tableToFind.upper()
print(f"Looking for: {tableToFind}")
# Find *.dtsx files in the supplied folder and its subfolders
allDtsxFiles = []
for dirpath, dirnames, filenames in os.walk(pathToSearch):
    if not filenames:
        continue
    dtsxFiles = fnmatch.filter(filenames, "*.dtsx")
    for file in dtsxFiles:
        allDtsxFiles.append(os.path.join(dirpath, file))
if len(allDtsxFiles) == 0:
    print('No *.dtsx files found')
else:
    print(f"First *.dtsx file found: {allDtsxFiles[0]}")
# Examine each file to see if tableToFind is in there
tableFound = []
for file in allDtsxFiles:
    print(f"Checking file {file}")
    tableFound.extend(ParseFile(file, tableToFind))
# Save results as an Excel file
if len(tableFound) == 0:
    print(f"{tableToFind} not found in any files")
else:
    df = pd.DataFrame(tableFound)
    excelFile = os.path.join(pathToSearch, tableToFind + '-found.xlsx')
    print(f"Saving {excelFile}")
    df.to_excel(excelFile)
print("Done")
2. Which SSRS Data Sources are Slow.py
import clr
# The clr module comes from the pythonnet package (pip install pythonnet).
# Adjust this path to wherever the ADOMD client is installed in your GAC.
clr.AddReference("C:\\Windows\\Microsoft.NET\\assembly\\GAC_MSIL\\Microsoft.AnalysisServices.AdomdClient\\v4.0_15.0.0.0__89845dcd8080cc91\\Microsoft.AnalysisServices.AdomdClient.dll")
clr.AddReference("System.Data")
import datetime
import os
import time
import xml.etree.ElementTree as et

import pandas as pd
import pyodbc
from Microsoft.AnalysisServices.AdomdClient import (
    AdomdConnection, AdomdDataAdapter)
from System.Data import DataSet

# CHANGE THESE SETTINGS IN THE systemconfig.py FILE IN THIS FOLDER
from systemconfig import pwd, sqlserver, uid
# CHANGE THESE PARAMETERS
# path should be set to the directory where the report .RDL file is
# and, in Windows, will look something like this:
# "\\Users\\fred\\source\\Workspaces\\SQL\\SSRS"
path = "F:\\_GIT\\SqlSaturday\\Python\\SSRS Reports\Report Project1"
# reportFile should be set to the name of the .RDL you want to analyze
reportFile = 'ReportWithALotOfDatasets.rdl'
# sqlServerName should be set to the name of the SQL Server the queries run against
sqlServerName = sqlserver
# ssasServerName should be set to the name of the SSAS Server the MDX queries run against
ssasServerName = "(local)"
# ssasDatabaseNames should be set to a List of the SSAS Databases the MDX queries run against
ssasDatabaseNames = ["TheCube", "TheOtherCube"]
##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE (maybe)
def parseQueries(file):
    treeOrig = et.parse(file)
    rootOrig = treeOrig.getroot()
    ns = "{http://schemas.microsoft.com/sqlserver/reporting/2016/01/reportdefinition}"
    dsnames = []
    dsns = []
    parms = []
    cmds = []
    fields = []
    i = 1
    datasets = rootOrig.find(f"{ns}DataSets")
    for dataset in datasets.findall(f"{ns}DataSet"):
        dsnames.append(dataset.attrib["Name"])
        query = dataset[0]
        dsn = query[0].text
        parm = ""
        cmd = ""
        if query[1].tag == f"{ns}QueryParameters":
            parm = query[1].text
            cmd = query[2].text
        else:
            cmd = query[1].text
        field = dataset[1].text
        dsns.append(dsn)
        parms.append(parm)
        cmds.append(cmd)
        fields.append(field)
        i += 1
        # This is a debugging technique. It limits the loop to one iteration.
        # if i > 1:
        #     break
    df = pd.DataFrame({"DataSets": dsnames, "DSNs": dsns, "Parms": parms,
                       "Commands": cmds, "Fields": fields})
    return df
def test_MDX(name, server, database, mdx):
    conn = AdomdConnection("Data Source=" + server + ";Catalog=" + database)
    conn.Open()
    cmd = conn.CreateCommand()
    # you might need to add parameters here
    cmd.CommandText = mdx
    try:
        print("Trying: " + name)
        start = time.time_ns()
        adp = AdomdDataAdapter(cmd)
        datasetParam = DataSet()
        adp.Fill(datasetParam)
        end = time.time_ns()
        result = {"Name": name, "DB": database,
                  "SQL": mdx, "Time": end - start, "Missing": False,
                  "Rows": datasetParam.Tables[0].Rows.Count}
    except:
        result = {"Name": name, "DB": database,
                  "SQL": mdx, "Time": -1, "Missing": True,
                  "Rows": 0}
    finally:
        conn.Close()
    return result
def test_SQL(name, server, database, sql):
    connstring = "Driver={ODBC Driver 17 for SQL Server};Server=" + \
        server + ";Database=" + database + ";UID=" + uid + ";PWD=" + pwd
    conn = pyodbc.connect(connstring)
    cursor = conn.cursor()
    result = ""
    try:
        print("Trying: " + name)
        start = time.time_ns()
        cursor.execute(sql)
        rows = cursor.fetchall()
        end = time.time_ns()
        result = {"Name": name, "DB": database,
                  "SQL": sql, "Time": end - start, "Missing": False,
                  "Rows": len(rows)}
    except:
        result = {"Name": name, "DB": database,
                  "SQL": sql, "Time": -1, "Missing": True, "Rows": 0}
    finally:
        conn.close()
    return result
# Code Execution starts here
file = os.path.join(path, reportFile)
df = parseQueries(file)
date1 = str(datetime.datetime.date(datetime.datetime.today()))
# Uncomment the next line to save a list of the Datasets and their SQL to an Excel file
# df.to_excel(os.path.join(path, date1 + f"_{reportFile}_ReportQueries.xlsx"))
print("Number of Datasets to Check: " + str(df["DataSets"].count()))
commands = []
for i in range(0, df["DataSets"].count()):
    row = df.iloc[i, :]
    name = row["DataSets"]
    database = row["DSNs"]
    sql = row["Commands"]
    if database in ssasDatabaseNames:
        server = ssasServerName
        commands.append(test_MDX(name, server, database, sql))
    else:
        server = sqlServerName
        commands.append(test_SQL(name, server, database, sql))
dfSql = pd.DataFrame(commands)
# Time is in nanoseconds; convert to seconds
dfSql["Seconds"] = dfSql["Time"] / 1000000000
excelFile = os.path.join(
    path, date1 + f"_{reportFile}_ReportQueryResults.xlsx")
print(f"Saving {excelFile}")
dfSql.to_excel(excelFile)
print("Done")
3. Geocoding Places.py
import pyodbc
import requests

# CHANGE THESE SETTINGS IN THE systemconfig.py FILE IN THIS FOLDER
from systemconfig import api_key, sqlserver, uid, pwd
# CHANGE THESE PARAMETERS
database = "AdventureWorks2016"
# The Places Table/View should have 6 columns:
# PrimaryKeyField, City, State, Country, Latitude and Longitude
placesTable = "dbo.Places"
# This is the Primary Key Field for the placesTable
placesPKField = "PlaceId"
# Set this parameter to a small number for debugging,
# and set it larger than the number of rows in your placesTable to do them all
maxGeocodingCalls = 25
##############################################################################
# DO NOT CHANGE CODE BELOW THIS LINE
connstring = "Driver={ODBC Driver 17 for SQL Server};Server=" + \
    sqlserver + ";Database=" + database + ";UID=" + uid + ";PWD=" + pwd
baseurl = f'https://maps.googleapis.com/maps/api/geocode/json?key={api_key}'
def LoadCities():
    sql = f"SELECT * FROM {placesTable} WHERE ISNULL(Latitude, 0) = 0"
    rows = LoadDataSet(sql)
    cities = []
    for row in rows:
        # [PK, City, State, Country, Lat, Lng, Updated]
        cities.append([row[0], row[1], row[2], row[3], 0, 0, False])
    return cities

def LoadDataSet(sql):
    conn = pyodbc.connect(connstring)
    cursor = conn.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows

def ExecuteSql(conn, sql):
    cursor = conn.cursor()
    cursor.execute(sql)
    cursor.close()

def GeocodeCities(cities):
    i = 1
    for c in cities:
        print(f"Geocoding: {c[1]}, {c[2]}, {c[3]}")
        url = baseurl + f'&address={c[1]},{c[2]},{c[3]}&components=country:{c[3]}'
        geo_data = requests.get(url).json()
        if geo_data["status"] == "OK":
            lat = geo_data["results"][0]["geometry"]["location"]["lat"]
            lng = geo_data["results"][0]["geometry"]["location"]["lng"]
            c[4] = lat
            c[5] = lng
        i += 1
        # this break statement aborts the loop so it only calls
        # the Google API maxGeocodingCalls number of times
        if i > maxGeocodingCalls:
            break

def UpdateCities(cities):
    conn = pyodbc.connect(connstring)
    conn.autocommit = True
    for c in cities:
        if (c[4] != 0 or c[5] != 0) and not c[6]:
            print(f"Updating: {c[1]}, {c[2]}, {c[3]}")
            placeId = c[0]
            sql = f"UPDATE {placesTable} " + \
                  f"SET Latitude = {c[4]}, Longitude = {c[5]} " + \
                  f"WHERE {placesPKField} = {placeId}"
            ExecuteSql(conn, sql)
            c[6] = True
    conn.close()
# Code Execution starts here
cities = LoadCities()
# for Debugging: print(cities)
# print(cities[0])
GeocodeCities(cities)
# for Debugging: print(cities)
# print(cities[0])
UpdateCities(cities)
# for Debugging: print(cities)
# print(cities[0])
print("Done")
SetupPlacesTables.sql
create table Places (
PlaceId int not null identity constraint PlacesPK primary key,
City nvarchar(50),
StateProvince nvarchar(50),
CountryRegion nvarchar(50),
Latitude numeric(18, 6),
Longitude numeric(18, 6)
)
GO
insert into Places (City, StateProvince, CountryRegion)
select distinct
    City,
    s.Name as StateProvince,
    c.Name as CountryRegion
from Person.Address a
inner join Person.StateProvince s on a.StateProvinceID = s.StateProvinceID
inner join Person.CountryRegion c on s.CountryRegionCode = c.CountryRegionCode
GO
systemconfig.py
api_key = ""
sqlserver = ""
uid = ""
pwd = ""