Fabric notebook orchestration

This page provides a complete notebook implementation for running Ploosh in Microsoft Fabric and tracking results over time.

Notebook structure

The notebook is composed of 4 cells:

  1. Parameters — Input variables for flexible execution
  2. Imports — Load the required libraries
  3. Ploosh execution — Run the test cases
  4. Results history — Persist results into a Delta table

Cell 1: Parameters

# Parameters (can be overridden by a Fabric pipeline)
cases_sub_folder = "/"
cases_filter = "*.yaml"

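These parameters enable flexible execution:

  • cases_sub_folder: Target a specific subfolder within ploosh_cases/. The value is appended directly to the path, so it should start with a slash (for example "/sales/").
  • cases_filter: Filter which YAML files to process

For a pipeline to override these values, mark this cell as the notebook's parameter cell (Toggle parameter cell).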
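Before wiring the notebook into a pipeline, it can help to preview which case files a given filter would select. The sketch below walks a cases folder and matches file names with Python's fnmatch. It assumes that Ploosh applies cases_filter as a shell-style pattern to file names and searches subfolders recursively; neither behaviour is confirmed here, and preview_cases is a hypothetical helper, not part of Ploosh.

```python
from fnmatch import fnmatch
from pathlib import Path


def preview_cases(cases_folder_path: str, cases_filter: str = "*.yaml") -> list[str]:
    """Return case files (relative, POSIX-style paths) whose names match cases_filter.

    Hypothetical helper: assumes shell-style matching on file names and a
    recursive search, which may differ from how Ploosh actually selects cases.
    """
    root = Path(cases_folder_path)
    if not root.is_dir():
        return []  # nothing to preview if the folder does not exist
    return sorted(
        path.relative_to(root).as_posix()
        for path in root.rglob("*")
        if path.is_file() and fnmatch(path.name, cases_filter)
    )


# Example (inside a Fabric notebook, where the default lakehouse is mounted):
# preview_cases("/lakehouse/default/Files/ploosh_cases/sales/", "*.yaml")
```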

Cell 2: Imports

from ploosh import execute_cases
from pyspark.sql.functions import col, lit, to_date
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType,
    LongType, TimestampType
)

Cell 3: Ploosh execution

output_folder = "ploosh_outputs"

# Paths in the default lakehouse, mounted under /lakehouse/default
cases_folder_path = f"/lakehouse/default/Files/ploosh_cases{cases_sub_folder}"
connections_file = "/lakehouse/default/Files/ploosh_connections.yaml"
output_path = f"/lakehouse/default/Files/{output_folder}"

execute_cases(
    cases=cases_folder_path,
    connections=connections_file,
    spark_session=spark,
    filter=cases_filter,
    path_output=output_path,
)

The spark variable is automatically available in Fabric notebooks.
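The path concatenation in this cell assumes that cases_sub_folder starts with a slash; a pipeline passing "sales" would produce /lakehouse/default/Files/ploosh_casessales. The sketch below shows one way to normalise the parameter before building the three paths. The function name build_paths is hypothetical; the folder and file names are the ones used in this notebook.

```python
def build_paths(cases_sub_folder: str = "/", output_folder: str = "ploosh_outputs") -> dict[str, str]:
    """Build the Ploosh input/output paths used by the notebook (hypothetical helper)."""
    base = "/lakehouse/default/Files"
    # Normalise so that "sales", "/sales" and "/sales/" all give "/sales/", and "/" stays "/"
    sub = "/" + cases_sub_folder.strip("/")
    if sub != "/":
        sub += "/"
    return {
        "cases": f"{base}/ploosh_cases{sub}",
        "connections": f"{base}/ploosh_connections.yaml",
        "path_output": f"{base}/{output_folder}",
    }
```

In Cell 3, the returned values could then be passed as cases=paths["cases"], connections=paths["connections"] and path_output=paths["path_output"].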

Cell 4: Results history

This cell reads the JSON output generated by Ploosh and appends it to a Delta table for historical tracking.

spark_output_path = f"Files/{output_folder}/json/test_results.json"

schema = StructType([
    StructField("execution_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("state", StringType(), True),
    StructField("source", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
        StructField("duration", DoubleType(), True),
        StructField("count", LongType(), True),
        StructField("executed_action", StringType(), True),
    ]), True),
    StructField("expected", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
        StructField("duration", DoubleType(), True),
        StructField("count", LongType(), True),
        StructField("executed_action", StringType(), True),
    ]), True),
    StructField("compare", StructType([
        StructField("start", TimestampType(), True),
        StructField("end", TimestampType(), True),
        StructField("duration", DoubleType(), True),
        StructField("success_rate", DoubleType(), True),
    ]), True),
    StructField("error", StructType([
        StructField("type", StringType(), True),
        StructField("message", StringType(), True),
        StructField("detail_file_path", StringType(), True),
    ]), True),
])

df = spark.read.schema(schema).option("multiline", "true").json(spark_output_path)

# Flatten nested structs into <section>_<field> columns
flatten_cols = [
    col("execution_id"), col("name"), col("state"),
    to_date(col("source.start")).alias("execution_date"),

    col("source.start").alias("source_start"),
    col("source.end").alias("source_end"),
    col("source.duration").alias("source_duration"),
    col("source.count").cast("integer").alias("source_count"),
    col("source.executed_action").alias("source_executed_action"),

    col("expected.start").alias("expected_start"),
    col("expected.end").alias("expected_end"),
    col("expected.duration").alias("expected_duration"),
    col("expected.count").cast("integer").alias("expected_count"),
    col("expected.executed_action").alias("expected_executed_action"),

    col("compare.start").alias("compare_start"),
    col("compare.end").alias("compare_end"),
    col("compare.duration").alias("compare_duration"),
    col("compare.success_rate").alias("compare_success_rate"),

    col("error.type").alias("error_type"),
    col("error.message").alias("error_message"),
    col("error.detail_file_path").alias("error_detail_file_path"),
]

# Append this run to the history table; mergeSchema allows new columns to be added
df_flat = df.select(*flatten_cols)
df_flat.write.mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("ploosh_results")
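The select in this cell follows a simple rule: each nested field section.field becomes a column section_field, and execution_date is derived from source.start. The plain-Python sketch below applies the same rule to a single record, which can help when checking flat column names against the results table schema. It is an illustration only (flatten_result and SECTION_FIELDS are hypothetical names), and the example record is invented.

```python
from datetime import datetime

# Nested fields per section, as declared in the Spark schema of this cell
SECTION_FIELDS = {
    "source": ("start", "end", "duration", "count", "executed_action"),
    "expected": ("start", "end", "duration", "count", "executed_action"),
    "compare": ("start", "end", "duration", "success_rate"),
    "error": ("type", "message", "detail_file_path"),
}


def flatten_result(record: dict) -> dict:
    """Flatten one result record using the <section>_<field> naming rule.

    Plain-Python illustration only: missing values become None (like Spark nulls),
    but the integer casts applied in Spark are not reproduced.
    """
    flat = {key: record.get(key) for key in ("execution_id", "name", "state")}
    source_start = (record.get("source") or {}).get("start")
    # Equivalent of to_date(col("source.start")): keep only the calendar date
    flat["execution_date"] = source_start.date() if source_start else None
    for section, fields in SECTION_FIELDS.items():
        values = record.get(section) or {}
        for field in fields:
            flat[f"{section}_{field}"] = values.get(field)
    return flat


# Hypothetical record; the values are invented for illustration
example = {
    "execution_id": "run-001",
    "name": "orders_count",
    "state": "passed",
    "source": {"start": datetime(2024, 1, 2, 3, 4, 5), "count": 10},
    "compare": {"success_rate": 1.0},
}
print(flatten_result(example)["execution_date"])  # prints 2024-01-02
```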

Results table schema

The ploosh_results Delta table contains:

Column                    Type       Description
execution_id              string     Unique identifier for the test run
name                      string     Test case name
state                     string     Result: passed, failed, error
execution_date            date       Date of execution (derived from source_start)
source_start              timestamp  Source data loading start time
source_end                timestamp  Source data loading end time
source_duration           double     Source loading duration in seconds
source_count              integer    Number of rows in source dataset
source_executed_action    string     Query or path executed for source
expected_start            timestamp  Expected data loading start time
expected_end              timestamp  Expected data loading end time
expected_duration         double     Expected loading duration in seconds
expected_count            integer    Number of rows in expected dataset
expected_executed_action  string     Query or path executed for expected
compare_start             timestamp  Comparison start time
compare_end               timestamp  Comparison end time
compare_duration          double     Comparison duration in seconds
compare_success_rate      double     Fraction of matching rows (0.0 to 1.0)
error_type                string     Error category (headers, count, data, compare)
error_message             string     Error description
error_detail_file_path    string     Path to XLSX gap analysis file
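Because every run appends its rows under a single execution_id, the table can be used to track the share of passing test cases per run. The plain-Python sketch below shows that calculation over rows shaped like the table, assuming state holds the lowercase values listed above; pass_rate_by_run is a hypothetical name.

```python
from collections import defaultdict


def pass_rate_by_run(rows: list[dict]) -> dict[str, float]:
    """Return the fraction of test cases with state == "passed" for each execution_id."""
    totals: dict[str, int] = defaultdict(int)
    passed: dict[str, int] = defaultdict(int)
    for row in rows:
        run = row["execution_id"]
        totals[run] += 1
        if row["state"] == "passed":
            passed[run] += 1
    # Every key in totals has at least one row, so the division is safe
    return {run: passed[run] / totals[run] for run in totals}
```

In the notebook itself, an equivalent Spark query should be spark.table("ploosh_results").groupBy("execution_id").agg(avg((col("state") == "passed").cast("double")).alias("pass_rate")), with avg imported from pyspark.sql.functions.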

Pipeline integration

This notebook can be called from a Fabric Pipeline using a Notebook activity:

  1. Create a new Pipeline
  2. Add a Notebook activity
  3. Point it to the orchestration notebook
  4. Override parameters (cases_sub_folder, cases_filter) as needed
  5. Schedule the pipeline or trigger it after upstream pipelines complete