From 009d58bd45497c300b1bc6dcad2e927f9c4dea55 Mon Sep 17 00:00:00 2001
From: t-saste
Date: Mon, 20 Jul 2020 13:54:52 -0700
Subject: [PATCH 1/3] job graph script

---
 ASJobGraphEvents/README.md  |  17 +++
 ASJobGraphEvents/rebuild.py | 241 +++++++++++++++++++++++++++++++++++
 2 files changed, 258 insertions(+)
 create mode 100644 ASJobGraphEvents/README.md
 create mode 100644 ASJobGraphEvents/rebuild.py

diff --git a/ASJobGraphEvents/README.md b/ASJobGraphEvents/README.md
new file mode 100644
index 0000000..f8a41a2
--- /dev/null
+++ b/ASJobGraphEvents/README.md
@@ -0,0 +1,17 @@
+# Job Graph Events in Power BI
+
+Job Graph events can be used to identify bottlenecks in data refreshes by highlighting the critical path. For instances of Analysis Services not running on-premises, the graph is broken into 16 KB chunks, each in its own event. The events can be reassembled with this script.
+
+## Usage
+
+1. Start a trace in SQL Server Profiler and select "Job Graph Events".
+2. Start a data refresh ("Process Full" in SQL Server Management Studio).
+3. Wait for all trace events to arrive in Profiler.
+4. `File > Save As > Trace XML File`
+5. Aim `rebuild.py` at this file like so:
+
+```bash
+python rebuild.py path\to\trace.xml output_folder
+```
+
+6. Inside `output_folder` there will be two .DGML files, which can be opened in Visual Studio.
diff --git a/ASJobGraphEvents/rebuild.py b/ASJobGraphEvents/rebuild.py
new file mode 100644
index 0000000..e06355f
--- /dev/null
+++ b/ASJobGraphEvents/rebuild.py
@@ -0,0 +1,241 @@
+"""
+Rebuilds a DGML file. Requires Python 3.8.
+"""
+
+from typing import Dict, List, Tuple, Set, NamedTuple, Optional
+import csv, re, os, operator, sys
+import xml.etree.ElementTree as ET
+
+
+maxsize = sys.maxsize
+while True:
+    try:
+        csv.field_size_limit(maxsize)
+        break
+    except OverflowError:
+        maxsize //= 2
+
+# TYPES
+
+
+class Row(NamedTuple):
+    guid: str
+    order_marker: int
+    textdata: str
+
+
+# PARSING
+
+
+def load_file(filename: str) -> List[Row]:
+    """
+    Returns a list of events, not sorted or filtered.
+    """
+    _, ext = os.path.splitext(filename)
+
+    if ext == ".csv":
+        with open(filename) as file:
+            dict_rows = csv.DictReader(file)
+            rows = [make_row_from_jarvis(row["MessageText"]) for row in dict_rows]
+
+            return [r for r in rows if r]
+
+    elif ext == ".xml":
+        tree = ET.parse(filename)
+        ns = {"": "http://tempuri.org/TracePersistence.xsd"}
+
+        xml_rows: List[Optional[Row]] = []
+
+        for event in tree.findall(".//Event", ns):
+            xml_rows.append(make_row_from_xml(event, ns))
+
+        return [r for r in xml_rows if r]
+    else:
+        return []
+
+
+def make_row_from_xml(event: ET.Element, ns: Dict[str, str]) -> Optional[Row]:
+    if event.attrib["id"] != "134":
+        return None
+
+    textdata = None
+    order_marker = None
+    guid = None
+    subclass = None
+
+    for col in event.findall("Column", ns):
+        if col.attrib["id"] == "46":
+            guid = col.text
+
+        if col.attrib["id"] == "1":
+            subclass = col.text
+
+        if col.attrib["id"] == "10" and col.text:
+            order_marker = int(col.text)
+
+        if col.attrib["id"] == "42":
+            textdata = col.text
+
+    if textdata and order_marker is not None and guid and subclass:
+        suffix = "annotated" if subclass == "2" else "plan"
+        return Row(f"{guid}-{suffix}", order_marker, textdata)
+
+    return None
+
+
+def make_row_from_jarvis(message_txt: str) -> Optional[Row]:
+    if "graphcorrelationid" in message_txt.lower():
+        print(
+            "This event is from an older version of the job graph feature (shouldn't have 'GraphCorrelationID' in it)"
+        )
+
+    match = re.match(r"TextData: (.*); GraphCorrelationID: (.*); IntegerData: (.\d*)", message_txt)
+    if match:
+        textdata, guid, order_marker_str = match.group(1, 2, 3)
+        order_marker = int(order_marker_str)
+        return Row(guid, order_marker, textdata)
+
+    return None
+
+
+def extract_metadata(header_row: Row) -> Optional[Tuple[int, int]]:
+    # should really extract things correctly here
+    m = re.match(
+        r".*Length=\"(\d*)\".*AdditionalEvents=\"(\d*)\".*", header_row.textdata
+    )
+
+    if not m:
+        return None
+
+    return int(m.group(1)), int(m.group(2))
+
+
+def remove_pii_tags(protected_data: str) -> str:
+    # strip the surrounding <pii>...</pii> markers, if present
+    if protected_data[:5] == "<pii>" and protected_data[-6:] == "</pii>":
+        return protected_data[5:-6]
+    return protected_data
+
+
+def get_all_guids(data: List[Row]) -> Set[str]:
+    return {row.guid for row in data}
+
+
+# GRAPH
+
+
+def get_graph(data: List[Row], guid: str) -> Tuple[str, str]:
+    rows = [row for row in data if row.guid == guid]
+
+    rows = sorted(rows, key=operator.attrgetter("order_marker"))
+
+    header, *graph_data = rows
+
+    metadata = extract_metadata(header)
+
+    if metadata:
+        size, additional_events = metadata
+        assert additional_events == len(
+            graph_data
+        ), f"metadata says there are {additional_events} rows; but there are {len(graph_data)}"
+
+    graph_str_builder = [remove_pii_tags(row.textdata) for row in graph_data]
+
+    return "".join(graph_str_builder), guid
+
+
+# INPUT/OUTPUT FILES
+
+
+def get_all_event_files() -> List[str]:
+    return [os.path.join("data", f) for f in os.listdir("data")]
+
+
+def get_output_file(input_file: str, guid: str, output_folder: str) -> str:
+    _, input_file = os.path.split(input_file)
+    name, ext = os.path.splitext(input_file)
+
+    os.makedirs(output_folder, exist_ok=True)
+
+    return os.path.join(output_folder, f"{name}-{guid}.DGML")
+
+
+def writefile(filename: str, data: str) -> None:
+    with open(filename, "w") as file:
+        file.write(data)
+
+
+def reassemble_file(filename: str) -> List[Tuple[str, str]]:
+    result: List[Tuple[str, str]] = []
+
+    try:
+        data = load_file(filename)
+        guids = get_all_guids(data)
+
+        for guid in guids:
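+            # get_graph returns a (dgml_text, guid) pair for each graph in the trace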
+            result.append(get_graph(data, guid))
+    except (IndexError, ValueError) as e:
+        print(f"error processing {filename}: {e}")
+
+    return result
+
+
+def all_files() -> None:
+    if not os.path.isdir("data"):
+        print("directory 'data' does not exist.")
+        return
+
+    for input_file in get_all_event_files():
+        try:
+            data = load_file(input_file)
+            guids = get_all_guids(data)
+
+            os.makedirs("output", exist_ok=True)
+
+            for guid in guids:
+                graph, _ = get_graph(data, guid)
+                output_file = get_output_file(input_file, guid, "output")
+                print(f'Saving "{output_file}"')
+                writefile(output_file, graph)
+
+        except (IndexError, ValueError) as e:
+            print(f"error processing {input_file}: {e}")
+
+
+# SCRIPT
+
+
+def print_help() -> None:
+    print(
+        """
+Guide for rebuild.py
+
+(requires Python 3.8 or later)
+
+Use:
+
+\tpython rebuild.py \tRebuilds all graphs in "./data" and writes them to "./output".
+
+\tpython rebuild.py <input file> <output folder> \tRebuilds <input file> and writes them to <output folder>
+"""
+    )
+
+
+def main() -> None:
+    if len(sys.argv) == 1:
+        print("Reassembling all graphs in ./data")
+        all_files()
+    if len(sys.argv) == 2:
+        print_help()
+    if len(sys.argv) == 3:
+        _, input_file, output_folder = sys.argv
+
+        for graph, guid in reassemble_file(input_file):
+            output_file = get_output_file(input_file, guid, output_folder)
+            print(f'Saving "{output_file}"')
+            writefile(output_file, graph)
+
+
+if __name__ == "__main__":
+    main()

From 4922a6ae76f97161828e2376e24d4f5e96c26da0 Mon Sep 17 00:00:00 2001
From: t-saste
Date: Mon, 20 Jul 2020 14:11:52 -0700
Subject: [PATCH 2/3] added link in readme.md

---
 ASJobGraphEvents/README.md | 5 +++++
 README.md                  | 3 +++
 2 files changed, 8 insertions(+)

diff --git a/ASJobGraphEvents/README.md b/ASJobGraphEvents/README.md
index f8a41a2..20b3fce 100644
--- a/ASJobGraphEvents/README.md
+++ b/ASJobGraphEvents/README.md
@@ -2,6 +2,11 @@
 
 Job Graph events can be used to identify bottlenecks in data refreshes by highlighting the critical path. For instances of Analysis Services not running on-premises, the graph is broken into 16 KB chunks, each in its own event. The events can be reassembled with this script.
 
+## Requirements
+
+* Python 3.8 or later
+* Visual Studio
+
 ## Usage
 
 1. Start a trace in SQL Server Profiler and select "Job Graph Events".
diff --git a/README.md b/README.md
index 25fe7e2..8e8838b 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,9 @@ BISM Normalizer is a schema diff tool for tabular models
 ## [UsqlScripts](https://github.com/Microsoft/Analysis-Services/tree/master/UsqlScripts)
 Sample U-SQL scripts that demonstrate how to process a TPC-DS data set in Azure Data Lake.
 
+## [ASJobGraph](https://github.com/Microsoft/Analysis-Services/tree/master/ASJobGraphEvents)
+Python script to reassemble job graph events from Analysis Services.
+
 ## Code of Conduct
 This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
From f9ddc12a9be5073ec46b63334efe086031699488 Mon Sep 17 00:00:00 2001
From: Sam
Date: Wed, 29 Jul 2020 10:10:49 -0700
Subject: [PATCH 3/3] Adds Gantt Charts (#63)

* added gantt charts
* removed .pycache
* updated for newer versions of the event

Co-authored-by: t-saste
---
 ASJobGraphEvents/README.md            |  26 +++++
 ASJobGraphEvents/gantt/__init__.py    |   0
 ASJobGraphEvents/gantt/dgml.py        |  59 ++++++++++
 ASJobGraphEvents/gantt/gantt_types.py |   6 ++
 ASJobGraphEvents/gantt/output.css     |  70 ++++++++++++
 ASJobGraphEvents/gantt/output.py      | 128 ++++++++++++++++++++
 ASJobGraphEvents/gantt/script.py      |  86 ++++++++++++++
 ASJobGraphEvents/gantt/structures.py  |  72 +++++++++++
 ASJobGraphEvents/gantt/utility.py     |  16 ++++
 ASJobGraphEvents/rebuild.py           |  43 ++++++---
 10 files changed, 494 insertions(+), 12 deletions(-)
 create mode 100644 ASJobGraphEvents/gantt/__init__.py
 create mode 100644 ASJobGraphEvents/gantt/dgml.py
 create mode 100644 ASJobGraphEvents/gantt/gantt_types.py
 create mode 100644 ASJobGraphEvents/gantt/output.css
 create mode 100644 ASJobGraphEvents/gantt/output.py
 create mode 100644 ASJobGraphEvents/gantt/script.py
 create mode 100644 ASJobGraphEvents/gantt/structures.py
 create mode 100644 ASJobGraphEvents/gantt/utility.py

diff --git a/ASJobGraphEvents/README.md b/ASJobGraphEvents/README.md
index 20b3fce..d8b5708 100644
--- a/ASJobGraphEvents/README.md
+++ b/ASJobGraphEvents/README.md
@@ -2,6 +2,8 @@
 
 Job Graph events can be used to identify bottlenecks in data refreshes by highlighting the critical path. For instances of Analysis Services not running on-premises, the graph is broken into 16 KB chunks, each in its own event. The events can be reassembled with this script.
 
+# Rebuilding the DGML file
+
 ## Requirements
 
 * Python 3.8 or later
@@ -20,3 +22,27 @@ python rebuild.py path\to\trace.xml output_folder
 ```
 
 6. Inside `output_folder` there will be two .DGML files, which can be opened in Visual Studio.
+
+# Creating a Gantt Chart
+
+## Requirements
+
+* Python 3.8 or later
+* A valid job graph DGML file (from above)
+
+## Usage
+
+1. Get a .DGML file with all the annotations (running duration, waiting duration, etc.)
+2. Run `gantt\script.py` like so:
+
+```bash
+python gantt\script.py path\to\file.dgml output_folder
+```
+
+3. Inside `output_folder` there will be an .html file that can be opened in a browser.
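+
+For example, pointing it at the annotated graph produced by `rebuild.py` (the file and folder names here are illustrative):
+
+```bash
+python gantt\script.py output_folder\trace-<guid>-annotated.DGML gantt_output
+```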
diff --git a/ASJobGraphEvents/gantt/__init__.py b/ASJobGraphEvents/gantt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ASJobGraphEvents/gantt/dgml.py b/ASJobGraphEvents/gantt/dgml.py new file mode 100644 index 0000000..4eba948 --- /dev/null +++ b/ASJobGraphEvents/gantt/dgml.py @@ -0,0 +1,59 @@ +from dataclasses import dataclass +from typing import List, Set, Iterable, Optional, cast +from datetime import datetime +import xml.etree.ElementTree as ET + +from structures import Job +from gantt_types import ThreadId + + +def read_jobs(filename: str) -> List[Job]: + jobs: List[Job] = [] + + doc = ET.parse(filename) + root = doc.getroot() + + try: + nodes = [child for child in root if "nodes" in child.tag.lower()][0] + except IndexError: + return jobs + + for node in nodes: + if job := parse_job_node(node): + jobs.append(job) + + return jobs + + +def parse_iso(time: str) -> datetime: + if time[-1].lower() == "z": + time = time[:-1] + + return datetime.fromisoformat(time) + + +def parse_thread_id(s: str) -> ThreadId: + return ThreadId(int(s)) + + +def strip_newlines(s: str) -> str: + + return "".join([c for c in s if ord(c) > 32]) + + +def parse_job_node(node: ET.Element) -> Optional[Job]: + for attr, value in node.attrib.items(): + if attr == "StartedAt": + start = parse_iso(value) + if attr == "FinishedAt": + end = parse_iso(value) + if attr == "Label": + name = value + if attr == "Slot" or attr == "Thread": + thread = value + + try: + return Job(start, end, strip_newlines(name), parse_thread_id(thread)) + except UnboundLocalError: + # most likely doesn't include "Thread" or "Slot" attribute. + return None diff --git a/ASJobGraphEvents/gantt/gantt_types.py b/ASJobGraphEvents/gantt/gantt_types.py new file mode 100644 index 0000000..76c5aea --- /dev/null +++ b/ASJobGraphEvents/gantt/gantt_types.py @@ -0,0 +1,6 @@ +from typing import NewType + +ThreadId = NewType("ThreadId", int) +Millisecond = NewType("Millisecond", float) +Second = NewType("Second", float) + diff --git a/ASJobGraphEvents/gantt/output.css b/ASJobGraphEvents/gantt/output.css new file mode 100644 index 0000000..78142ae --- /dev/null +++ b/ASJobGraphEvents/gantt/output.css @@ -0,0 +1,70 @@ +html { + overflow-x: scroll; +} + +* { + box-sizing: border-box; +} + +main { + padding: 5px; + font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif; +} + +div.gantt { + position: relative; + padding: 25px 0px; +} + +div.row { + white-space: nowrap; + position: relative; + height: 2em; +} + +div.row:nth-child(even) { + background-color: #f2f2f2; +} + +div.row>span.axis-tick { + border-left: 1px solid #777777; + position: absolute; + padding-left: 1ch; + display: inline-block; + height: 100%; +} + +div.row>span.axis-tick:nth-of-type(2) { + border-left: 0; +} + +div.row>span.job { + display: inline-block; + height: 100%; + border-radius: 3px; + border: 0.5px solid white; + position: absolute; +} + +div.row>span.legend { + display: inline-block; + height: 100%; + border-right: 1px solid #777777; +} + +span[data-descr]:hover::after, +span[data-descr]:focus::after { + content: attr(data-descr); + position: absolute; + left: 0px; + top: 1.8em; + min-width: 200px; + border: 1px #aaaaaa solid; + border-radius: 10px; + background-color: #ffff8d; + padding: 6px; + color: #000000; + font-size: 14px; + z-index: 1; + white-space: pre; +} \ No newline at end of file diff --git a/ASJobGraphEvents/gantt/output.py b/ASJobGraphEvents/gantt/output.py new file 
mode 100644
index 0000000..4aeb4d6
--- /dev/null
+++ b/ASJobGraphEvents/gantt/output.py
@@ -0,0 +1,128 @@
+from datetime import datetime
+from structures import Gantt, Row, Job
+from gantt_types import Second, Millisecond
+import structures
+import utility
+import operator
+
+COLORS = [
+    "#d50000",
+    "#00bfa5",
+    "#ff6f00",
+    "#aa00ff",
+    "#006064",
+    "#ffd600",
+    "#64dd17",
+]
+
+HEADER_COLUMN_WIDTH = 240
+
+
+def ms_to_px(ms: Millisecond) -> float:
+    return ms / 10
+
+
+def job_to_html(job: Job, start: datetime, color: str) -> str:
+    left = ms_to_px(utility.duration_ms(start, job.start)) + HEADER_COLUMN_WIDTH
+    width = ms_to_px(structures.job_duration_ms(job))
+
+    # each job is an absolutely-positioned block; data-descr feeds the hover tooltip
+    return f"""<span class="job" tabindex="0" style="left: {left}px; width: {width}px; background-color: {color};" data-descr="{job.name} ({utility.ms_to_s(structures.job_duration_ms(job)):.1f}s)"></span>"""
+
+
+def row_to_html(
+    row: Row, start: datetime, process_num: int, color: str, width: float
+) -> str:
+    legend_html = f"""<span class="legend" style="width: {HEADER_COLUMN_WIDTH}px;">Concurrency Slot {process_num} ({utility.ms_to_s(structures.row_computing_duration_ms(row)):.1f}s)</span>"""
+
+    jobs_html = "\n".join([job_to_html(job, start, color) for job in row.jobs])
+
+    return (
+        f"""<div class="row" style="width: {width}px;">{legend_html}{jobs_html}</div>"""
+    )
+
+
+def rownum_to_top(num: int) -> float:
+    return num * 2
+
+
+def make_axis_span(left: float, s: Second) -> str:
+    return f"""<span class="axis-tick" style="left: {left}px;">{s} sec</span>"""
+
+
+def make_axis_html(max_seconds: Second) -> str:
+    seconds = [Second(i * 2) for i in range(1000)]
+
+    seconds = [i for i in seconds if i < max_seconds]
+
+    axis_spans = "".join(
+        [
+            make_axis_span(ms_to_px(utility.s_to_ms(s)) + HEADER_COLUMN_WIDTH, s)
+            for s in seconds
+        ]
+    )
+
+    return f"""<div class="row">
+        <span class="legend" style="width: {HEADER_COLUMN_WIDTH}px;">Total Processing Time</span>
+        {axis_spans}
+    </div>"""
+
+
+def gantt_to_html(g: Gantt) -> str:
+    if not g:
+        return ""
+
+    start = min([row.jobs[0].start for row in g])
+
+    max_seconds = max([utility.ms_to_s(structures.row_duration_ms(row)) for row in g])
+
+    rows_html = "\n".join(
+        [
+            row_to_html(
+                row,
+                start,
+                num + 1,
+                COLORS[num % len(COLORS)],
+                ms_to_px(utility.s_to_ms(max_seconds)) + HEADER_COLUMN_WIDTH,
+            )
+            for num, row in enumerate(
+                sorted(
+                    g,
+                    reverse=True,
+                    key=lambda r: structures.row_computing_duration_ms(r),
+                )
+            )
+        ]
+    )
+
+    return f"""<div class="gantt">
+{make_axis_html(max_seconds)}{rows_html}
+</div>"""
+
+
+def style() -> str:
+    with open("./gantt/output.css") as css:
+        return f"""<style>{css.read()}</style>"""
+
+
+def html(g: Gantt) -> str:
+    html = f"""<!DOCTYPE html>
+<html>
+<body>
+<main>
+<h1>Gantt Chart</h1>
+<p>Max parallelism: {len(g)}</p>
+{gantt_to_html(g)}
+<h2>Explanation</h2>
+<ul>
+    <li>Each row represents a parallelism "slot"; if "maxParallelism" was 4, then there are 4 rows.</li>
+    <li>Each colored block is a job; hover with a mouse to show the name and how long it took.</li>
+    <li>Each row shows the total time spent doing jobs to highlight bottlenecks.</li>
+</ul>
+</main>
+{style()}
+</body>
+</html>
+"""
+
+    return html if g else ""
diff --git a/ASJobGraphEvents/gantt/script.py b/ASJobGraphEvents/gantt/script.py
new file mode 100644
index 0000000..20348cf
--- /dev/null
+++ b/ASJobGraphEvents/gantt/script.py
@@ -0,0 +1,86 @@
+from dataclasses import dataclass
+from typing import List, Set, Iterable, Optional, cast
+import os, sys
+
+from structures import Job, Gantt, Row, new_gantt
+
+import dgml, output
+
+
+def get_dir(folder: str) -> Set[str]:
+    return set(
+        [
+            os.path.join(folder, filename)
+            for filename in os.listdir(folder)
+            if os.path.isfile(os.path.join(folder, filename))
+        ]
+    )
+
+
+def write_document(content: str, filepath: str) -> None:
+    os.makedirs(os.path.dirname(filepath), exist_ok=True)
+
+    with open(filepath, "w") as file:
+        file.write(content)
+
+
+def output_file_path(file: str, out_folder: str) -> str:
+    base = os.path.basename(file)
+    base, ext = os.path.splitext(base)
+
+    return os.path.join(out_folder, base + ".html")
+
+
+def make_gantt(file: str, out_folder: str) -> None:
+    html = output.html(new_gantt(dgml.read_jobs(file)))
+
+    if not html:
+        print(f"No jobs found in {file}; maybe this is not the -annotated file?")
+    else:
+        write_document(html, output_file_path(file, out_folder))
+        print(f'Saving "{output_file_path(file, out_folder)}"')
+
+
+def make_gantt_dir(folder: str, out_folder: str) -> None:
+    for file in get_dir(folder):
+        make_gantt(file, out_folder)
+
+
+# SCRIPT
+
+
+def print_help() -> None:
+    print(
+        """
+Guide for gantt/script.py
+
+(requires Python 3.8 or later)
+
+Use:
+
+\tpython gantt/script.py <input file or folder> <output folder>
+\t\tCreates a gantt chart for each graph in <input file or folder> and writes them to <output folder>.
+
+\tpython gantt/script.py <input1> <input2> ... <output folder>
+\t\tCreates gantt charts for all the <input>s and writes them to <output folder>
+"""
+    )
+
+
+def main() -> None:
+    if len(sys.argv) < 3:
+        print_help()
+    else:
+        _, *inputs, output_folder = sys.argv
+
+        for i in inputs:
+            if os.path.isfile(i):
+                make_gantt(i, output_folder)
+            elif os.path.isdir(i):
+                make_gantt_dir(i, output_folder)
+            else:
+                print(f"{i} is not a file or directory.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/ASJobGraphEvents/gantt/structures.py b/ASJobGraphEvents/gantt/structures.py
new file mode 100644
index 0000000..ff87c9e
--- /dev/null
+++ b/ASJobGraphEvents/gantt/structures.py
@@ -0,0 +1,72 @@
+from typing import Tuple, List, NamedTuple, Optional, NewType
+from datetime import datetime
+
+import utility
+from gantt_types import ThreadId, Millisecond, Second
+
+import operator
+
+
+class Job(NamedTuple):
+    start: datetime
+    end: datetime
+    name: str
+    thread: ThreadId
+
+
+class Row(NamedTuple):
+    jobs: List[Job]
+    thread: ThreadId
+
+
+Gantt = List[Row]
+
+
+def add_job(row: Row, job: Job) -> None:
+    assert row.thread == job.thread, f"row: {row.thread}, job: {job.thread}"
+
+    if row.jobs:
+        assert (
+            row.jobs[-1].end <= job.start
+        ), f"{row.jobs[-1].end} is not less than {job.start} (thread id: {row.thread})"
+
+    row.jobs.append(job)
+
+
+def new_row(job: Job) -> Row:
+    return Row([job], job.thread)
+
+
+def row_duration_ms(row: Row) -> Millisecond:
+    return utility.duration_ms(row.jobs[0].start, row.jobs[-1].end)
+
+
+def row_computing_duration_ms(row: Row) -> Millisecond:
+    return Millisecond(sum([job_duration_ms(job) for job in row.jobs]))
+
+
+def row_with_thread(g: Gantt, thread: ThreadId) -> Optional[Row]:
+    for row in g:
+        if row.thread == thread:
+            return row
+    return None
+
+
+def add_row(g: Gantt, row: Row) -> None:
+    g.append(row)
+
+
+def new_gantt(jobs: List[Job]) -> Gantt:
+    g: Gantt = []
+
+    for job in sorted(jobs, key=operator.attrgetter("start")):
+        if row := row_with_thread(g, job.thread):
+            add_job(row, job)
+        else:
+            add_row(g, new_row(job))
+
+    return g
+
+
+def job_duration_ms(job: Job) -> Millisecond:
+    return utility.duration_ms(job.start, job.end)
diff --git a/ASJobGraphEvents/gantt/utility.py b/ASJobGraphEvents/gantt/utility.py
new file mode 100644
index 0000000..4cd972f
--- /dev/null
+++ b/ASJobGraphEvents/gantt/utility.py
@@ -0,0 +1,16 @@
+from datetime import datetime
+from gantt_types import Millisecond, Second
+
+
+def duration_ms(start_time: datetime, end_time: datetime) -> Millisecond:
+    duration = end_time - start_time
+
+    return Millisecond((duration.seconds * 1000000 + duration.microseconds) // 1000)
+
+
+def ms_to_s(m: Millisecond) -> Second:
+    return Second(m / 1000)
+
+
+def s_to_ms(s: Second) -> Millisecond:
+    return Millisecond(s * 1000)
diff --git a/ASJobGraphEvents/rebuild.py b/ASJobGraphEvents/rebuild.py
index e06355f..78c01d2 100644
--- a/ASJobGraphEvents/rebuild.py
+++ b/ASJobGraphEvents/rebuild.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 """
 Rebuilds a DGML file. Requires Python 3.8.
 """
@@ -36,7 +38,14 @@ def load_file(filename: str) -> List[Row]:
     if ext == ".csv":
         with open(filename) as file:
             dict_rows = csv.DictReader(file)
-            rows = [make_row_from_jarvis(row["MessageText"]) for row in dict_rows]
+            rows = [
+                make_row_from_jarvis(
+                    row["MessageText"],
+                    row["CurrentActivityId"],
+                    int(row["Engine_EventSubclass"]),
+                )
+                for row in dict_rows
+            ]
 
             return [r for r in rows if r]
 
@@ -64,7 +73,7 @@ def make_row_from_xml(event: ET.Element, ns: Dict[str, str]) -> Optional[Row]:
     subclass = None
 
     for col in event.findall("Column", ns):
-        if col.attrib["id"] == "46":
+        if col.attrib["id"] == "46" or col.attrib["id"] == "53":
             guid = col.text
 
         if col.attrib["id"] == "1":
@@ -83,19 +92,29 @@ def make_row_from_xml(event: ET.Element, ns: Dict[str, str]) -> Optional[Row]:
     return None
 
 
-def make_row_from_jarvis(message_txt: str) -> Optional[Row]:
-    if "graphcorrelationid" in message_txt.lower():
-        print(
-            "This event is from an older version of the job graph feature (shouldn't have 'GraphCorrelationID' in it)"
-        )
+def make_row_from_jarvis(
+    message_txt: str, activity_id: str, subclass: int
+) -> Optional[Row]:
+    guid = activity_id + str(subclass) + ("-annotated" if subclass == 2 else "-plan")
 
-    match = re.match(r"TextData: (.*); GraphCorrelationID: (.*); IntegerData: (.\d*)", message_txt)
-    if match:
-        textdata, guid, order_marker_str = match.group(1, 2, 3)
+    if "graphcorrelationid" in message_txt.lower():
+        match = re.match(
+            r"TextData: (.*); GraphCorrelationID: (.*); IntegerData: (.\d*)",
+            message_txt,
+        )
+        if match:
+            textdata, order_marker_str = match.group(1, 3)
+    else:
+        match = re.match(r"TextData: (.*); IntegerData: (.\d*)", message_txt)
+
+        if match:
+            textdata, order_marker_str = match.group(1, 2)
+
+    try:
         order_marker = int(order_marker_str)
         return Row(guid, order_marker, textdata)
-
-    return None
+    except UnboundLocalError:
+        return None
 
 
 def extract_metadata(header_row: Row) -> Optional[Tuple[int, int]]: