From 009d58bd45497c300b1bc6dcad2e927f9c4dea55 Mon Sep 17 00:00:00 2001 From: t-saste Date: Mon, 20 Jul 2020 13:54:52 -0700 Subject: [PATCH] job graph script --- ASJobGraphEvents/README.md | 17 +++ ASJobGraphEvents/rebuild.py | 239 ++++++++++++++++++++++++++++++++++++ 2 files changed, 256 insertions(+) create mode 100644 ASJobGraphEvents/README.md create mode 100644 ASJobGraphEvents/rebuild.py diff --git a/ASJobGraphEvents/README.md b/ASJobGraphEvents/README.md new file mode 100644 index 0000000..f8a41a2 --- /dev/null +++ b/ASJobGraphEvents/README.md @@ -0,0 +1,17 @@ +# Job Graph Events in Power BI + +Job Graph events can be used to identify bottlenecks in data refreshes by highlighting the critical path. For instances of Analysis Services not running on-premise, the graph is broken into 16 Kb chunks, each in their own event. The events can be reassembled with this script. + +## Usage + +1. Start a trace in SQL Server Profiler and select "Job Graph Events". +2. Start a data refresh ("Process Full" in SQL Server Management Studio). +3. Wait for all trace events to arrive in Profiler. +4. `File > Save As > Trace XML File` +5. Aim `rebuild.py` at this file like so: + +```bash +python rebuild.py path\to\trace.xml output_folder +``` + +6. Inside `output_folder` there will be two .DGML files, which can be opened in Visual Studio. diff --git a/ASJobGraphEvents/rebuild.py b/ASJobGraphEvents/rebuild.py new file mode 100644 index 0000000..e06355f --- /dev/null +++ b/ASJobGraphEvents/rebuild.py @@ -0,0 +1,239 @@ +""" +Rebuilds a DGML file. Requires Python 3.8. +""" + +from typing import Dict, List, Tuple, Set, NamedTuple, Optional +import csv, re, os, operator, sys +import xml.etree.ElementTree as ET + + +maxsize = sys.maxsize +while True: + try: + csv.field_size_limit(maxsize) + break + except OverflowError: + maxsize //= 2 + +# TYPES + + +class Row(NamedTuple): + guid: str + order_marker: int + textdata: str + + +# PARSING + + +def load_file(filename: str) -> List[Row]: + """ + Returns a list of events, not sorted or filtered. + """ + _, ext = os.path.splitext(filename) + + if ext == ".csv": + with open(filename) as file: + dict_rows = csv.DictReader(file) + rows = [make_row_from_jarvis(row["MessageText"]) for row in dict_rows] + + return [r for r in rows if r] + + elif ext == ".xml": + tree = ET.parse(filename) + ns = {"": "http://tempuri.org/TracePersistence.xsd"} + + xml_rows: List[Optional[Row]] = [] + + for event in tree.findall(".//Event", ns): + xml_rows.append(make_row_from_xml(event, ns)) + + return [r for r in xml_rows if r] + else: + return [] + + +def make_row_from_xml(event: ET.Element, ns: Dict[str, str]) -> Optional[Row]: + if event.attrib["id"] != "134": + return None + + textdata = None + order_marker = None + guid = None + subclass = None + + for col in event.findall("Column", ns): + if col.attrib["id"] == "46": + guid = col.text + + if col.attrib["id"] == "1": + subclass = col.text + + if col.attrib["id"] == "10" and col.text: + order_marker = int(col.text) + + if col.attrib["id"] == "42": + textdata = col.text + + if textdata and order_marker is not None and guid and subclass: + suffix = "annotated" if subclass == "2" else "plan" + return Row(f"{guid}-{suffix}", order_marker, textdata) + + return None + + +def make_row_from_jarvis(message_txt: str) -> Optional[Row]: + if "graphcorrelationid" in message_txt.lower(): + print( + "This event is from an older version of the job graph feature (shouldn't have 'GraphCorrelationID' in it)" + ) + + match = re.match(r"TextData: (.*); IntegerData: (.\d*)", message_txt) + if match: + textdata, guid, order_marker_str = match.group(1, 2, 3) + order_marker = int(order_marker_str) + return Row(guid, order_marker, textdata) + + return None + + +def extract_metadata(header_row: Row) -> Optional[Tuple[int, int]]: + # should really extract things correctly here + m = re.match( + r".*Length=\"(\d*)\".*AdditionalEvents=\"(\d*)\".*", header_row.textdata + ) + + if not m: + return None + + return int(m.group(1)), int(m.group(2)) + + +def remove_pii_tags(protected_data: str) -> str: + if protected_data[:5] == "" and protected_data[-6:] == "": + return protected_data[5:-6] + return protected_data + + +def get_all_guids(data: List[Row]) -> Set[str]: + return {row.guid for row in data} + + +# GRAPH + + +def get_graph(data: List[Row], guid: str) -> Tuple[str, str]: + rows = [row for row in data if row.guid == guid] + + rows = sorted(rows, key=operator.attrgetter("order_marker")) + + header, *graph_data = rows + + metadata = extract_metadata(header) + + if metadata: + size, additional_events = metadata + assert additional_events == len( + graph_data + ), f"metadata says there are {additional_events} rows; but there are {len(graph_data)}" + + graph_str_builder = [remove_pii_tags(row.textdata) for row in graph_data] + + return "".join(graph_str_builder), guid + + +# INPUT/OUTPUT FILES + + +def get_all_event_files() -> List[str]: + return [os.path.join("data", f) for f in os.listdir("data")] + + +def get_output_file(input_file: str, guid: str, output_folder: str) -> str: + _, input_file = os.path.split(input_file) + name, ext = os.path.splitext(input_file) + + os.makedirs(output_folder, exist_ok=True) + + return os.path.join(output_folder, f"{name}-{guid}.DGML") + + +def writefile(filename: str, data: str) -> None: + with open(filename, "w") as file: + file.write(data) + + +def reassemble_file(filename: str) -> List[Tuple[str, str]]: + result: List[Tuple[str, str]] = [] + + try: + data = load_file(filename) + guids = get_all_guids(data) + + for guid in guids: + result.append(get_graph(data, guid)) + except (IndexError, ValueError) as e: + print(f"error processing {filename}: {e}") + + return result + + +def all_files() -> None: + if not os.path.isdir("data"): + print("directory 'data' does not exist.") + return + + for input_file in get_all_event_files(): + try: + data = load_file(input_file) + guids = get_all_guids(data) + + os.makedirs("output", exist_ok=True) + + for guid in guids: + graph, _ = get_graph(data, guid) + output_file = get_output_file(input_file, guid, "output") + print(f'Saving "{output_file}"') + writefile(output_file, graph) + + except (IndexError, ValueError) as e: + print(f"error processing {input_file}: {e}") + + +# SCRIPT + + +def print_help() -> None: + print( + """ +Guide for rebuild.py + +(requires Python 3.8 or later) + +Use: + +\tpython rebuild.py \tRebuilds all graphs in "./data" and writes them to "./output". + +\tpython rebuild.py \tRebuilds and writes them to +""" + ) + + +def main() -> None: + if len(sys.argv) == 1: + print("Reassembling all graphs in ./data") + all_files() + if len(sys.argv) == 2: + print_help() + if len(sys.argv) == 3: + _, input_file, output_folder = sys.argv + + for graph, guid in reassemble_file(input_file): + output_file = get_output_file(input_file, guid, output_folder) + print(f'Saving "{output_file}"') + writefile(get_output_file(input_file, guid, output_folder), graph) + + +if __name__ == "__main__": + main()