# Pipeline

A Pipeline is a wrapper that enables the execution of Units over a single sample or a dataset. Refer to the Pipeline Execution Lifecycle section for specific details.

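Run Single Sample:

    # Import paths other than `from verdict import Pipeline` are assumed from
    # Verdict's package layout; adjust them to your installed version.
    from verdict import Layer, Pipeline
    from verdict.common.judge import JudgeUnit
    from verdict.scale import BooleanScale
    from verdict.schema import Schema
    from verdict.transform import MeanPoolUnit

    # Plain string, not an f-string: `source` is not a Python variable here,
    # and {source.joke} is filled in from the input Schema at run time.
    pipeline = Pipeline() \
      >> Layer(
        JudgeUnit(BooleanScale()).prompt("""
          Is this funny?

          {source.joke}
        """)
      , 5) \
      >> MeanPoolUnit()

    response, leaf_node_prefixes = pipeline.run(
      Schema.of(joke="Why did the chicken cross the road? To get to the other side."))

Run Dataset:

    from datasets import load_dataset  # Hugging Face datasets
    from verdict.dataset import DatasetWrapper

    dataset = DatasetWrapper.from_hf(
        load_dataset("jokes-ai/jokes"),
        columns=["joke"]
    )

    # Reuses the pipeline defined in the single-sample example.
    response, leaf_node_prefixes = pipeline.run_from_dataset(
        dataset,
        max_workers=128,
        display=True,
        graceful=True
    )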

# Usage

Both run and run_from_dataset accept the following arguments:

  • max_workers: maximum number of worker threads (see footnote 1).
  • display: visualize the progress of the pipeline.
  • graceful: controls whether an error will cause the program to exit.
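
As a rough illustration of how max_workers and graceful could interact, the sketch below runs a function over samples in a thread pool. It is not Verdict's executor: run_all and flaky_judge are hypothetical names, and the reading of graceful (True records the error and continues, False re-raises it) is an assumption based on the description above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(fn, samples, max_workers=4, graceful=True):
    """Apply fn to each sample in a thread pool (illustrative only).

    Assumption: graceful=True records the exception and keeps going,
    while graceful=False re-raises the first error that is observed.
    """
    samples = list(samples)
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the index of the sample it processes.
        futures = {pool.submit(fn, s): i for i, s in enumerate(samples)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                results[index] = future.result()
            except Exception as exc:
                if not graceful:
                    raise
                results[index] = exc  # keep the error in place of a result
    # Return results in the original sample order.
    return [results[i] for i in range(len(samples))]


def flaky_judge(joke):
    # Hypothetical stand-in for a Unit: fails on empty input.
    if not joke:
        raise ValueError("empty joke")
    return len(joke) % 2 == 0
```

With graceful=True, a failing sample leaves its exception in the result list, so the remaining samples still complete.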

To refer to a particular Unit in the pipeline, we generate deterministic, human-readable prefixes for each Unit. Refer to the Prefix section for more details. After the pipeline has completed execution, you can inspect all intermediate and leaf node prefixes.

  • run: we return a dictionary mapping prefix to OutputSchema.
  • run_from_dataset: we return a pd.DataFrame where the columns are {prefix}_{field_name} for each field in the OutputSchema.
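
The {prefix}_{field_name} naming can be pictured with a small sketch. The prefixes and field names below are made up for illustration, and flatten_outputs is a hypothetical helper, not part of Verdict.

```python
def flatten_outputs(outputs):
    """Flatten {prefix: {field_name: value}} into {"{prefix}_{field_name}": value}."""
    row = {}
    for prefix, fields in outputs.items():
        for field_name, value in fields.items():
            row[f"{prefix}_{field_name}"] = value
    return row


# Hypothetical prefixes and fields, for illustration only.
outputs = {
    "judge_0": {"score": True},
    "mean_pool": {"score": 0.8},
}
row = flatten_outputs(outputs)
```

One such dictionary per dataset row, collected in a list and passed to pd.DataFrame, would give a table with one column per prefix and field.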

# Failure/Termination

Failures in threads are handled by the executor differently depending on the cause:

  • declaration-time errors (e.g., a Prompt contains an invalid field name) fail immediately.
  • runtime errors (e.g., inference provider downtime, ResponseSchema not obeyed by the Structured Output extractor) are retried until the Model Selection Policy is exhausted.
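
The two cases can be sketched as a retry loop. This is a minimal illustration, not Verdict's implementation: DeclarationError, call_with_policy, and the representation of a policy as a list of (model_name, max_attempts) tuples are all assumptions made for the example.

```python
class DeclarationError(Exception):
    """Hypothetical error for problems detectable before any model call."""


def call_with_policy(call, policy):
    """Try each (model, max_attempts) pair in order until one call succeeds.

    `policy` is a hypothetical stand-in for a Model Selection Policy.
    """
    last_error = None
    for model, max_attempts in policy:
        for _ in range(max_attempts):
            try:
                return call(model)
            except DeclarationError:
                raise  # not retryable: fail immediately
            except Exception as exc:
                last_error = exc  # retryable: try again or move to the next model
    raise RuntimeError("model selection policy exhausted") from last_error
```

Here a declaration-time error propagates on the first attempt, while any other exception consumes one attempt and falls through to the next model once a model's attempts run out.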

# Visualization

Pass display=True to visualize the progress of a pipeline in the current terminal (stdout). When running a single sample, the results of each Unit are displayed alongside the progress tree.

# Streaming

Mark a Unit with .stream() to enable streaming output. If visualization is enabled, five Units at a time are streamed to the right half of stdout.

# Experiment Config

Pass an optional ExperimentConfig to run_from_dataset via the experiment_config parameter. Refer to the Experiment section for more details.


  1. We adjust the file descriptor limit of your process accordingly (equivalent to ulimit -n max_workers).
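
For readers curious what such an adjustment can look like, here is a minimal sketch using Python's standard resource module (Unix only). ensure_fd_limit is a hypothetical helper, and the choice to only ever raise the soft limit, never lower it, is an assumption of this sketch rather than a description of Verdict's behavior.

```python
import resource  # Unix-only standard-library module


def ensure_fd_limit(needed):
    """Raise the soft open-file limit to at least `needed`, if permitted."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY or soft >= needed:
        return soft  # already high enough; never lower the limit
    # The soft limit cannot exceed the hard limit.
    target = needed if hard == resource.RLIM_INFINITY else min(needed, hard)
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return target
```

Calling ensure_fd_limit(128) before starting 128 workers would be the rough analogue of running ulimit -n 128 in the shell, subject to the hard limit.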