# Pipeline

A `Pipeline` is a wrapper that enables the execution of `Unit`s over a single sample or a dataset. Refer to the Pipeline Execution Lifecycle section for specific details.
```python
from verdict import Pipeline, Layer
from verdict.common.judge import JudgeUnit
from verdict.scale import BooleanScale
from verdict.schema import Schema
from verdict.transform import MeanPoolUnit

# Ask 5 judges whether the joke is funny, then average their boolean verdicts.
pipeline = Pipeline() \
    >> Layer(
        JudgeUnit(BooleanScale()).prompt("""
            Is this funny?
            {source.joke}
        """)
    , 5) \
    >> MeanPoolUnit()

response, leaf_node_prefixes = pipeline.run(
    Schema.of(joke="Why did the chicken cross the road? To get to the other side."))
```

```python
from datasets import load_dataset
from verdict.dataset import DatasetWrapper

dataset = DatasetWrapper.from_hf(
    load_dataset("jokes-ai/jokes"),
    columns=["joke"]
)

response, leaf_node_prefixes = pipeline.run_from_dataset(
    dataset,
    max_workers=128,
    display=True,
    graceful=True
)
```

## Usage
Both `run` and `run_from_dataset` accept the following arguments (see the sketch after this list):

- `max_workers`: the maximum number of worker threads[^1]
- `display`: visualize the progress of the pipeline
- `graceful`: controls whether an error will cause the program to exit
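
For example, the same keyword arguments can be passed to `run` on a single sample. A minimal sketch, reusing the `pipeline` and `Schema` from above:

```python
# Minimal sketch: the same keyword arguments apply to a single-sample run.
response, leaf_node_prefixes = pipeline.run(
    Schema.of(joke="Why did the chicken cross the road? To get to the other side."),
    max_workers=32,   # cap on worker threads
    display=True,     # show the live progress tree
    graceful=True,    # don't exit the program on error
)
```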
We use a separate `ThreadPoolExecutor` for CPU-bound tasks (e.g., `MapUnit`s).
```python
from verdict import config

config.LIGHTWEIGHT_EXECUTOR_WORKER_COUNT = 128  # default is 32
```
To refer to a particular `Unit` in the pipeline, we generate deterministic, human-readable prefixes for each `Unit`. Refer to the Prefix section for more details. After the pipeline has completed execution, you can inspect all intermediate and leaf node prefixes (see the sketch after this list).

- `run`: returns a dictionary mapping each prefix to its `OutputSchema`
- `run_from_dataset`: returns a `pd.DataFrame` whose columns are `{prefix}_{field_name}` for each field in the `OutputSchema`
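
For example, a minimal sketch of inspecting both return shapes, reusing the `pipeline` and `dataset` from above (the prefix strings are whatever verdict generated for your pipeline, not values you choose):

```python
# Single-sample run: response maps each prefix to its populated OutputSchema.
response, leaf_node_prefixes = pipeline.run(
    Schema.of(joke="Why did the chicken cross the road? To get to the other side."))
for prefix in leaf_node_prefixes:
    print(prefix, response[prefix])

# Dataset run: one DataFrame column per {prefix}_{field_name}.
df, leaf_node_prefixes = pipeline.run_from_dataset(dataset, max_workers=128)
print(df.columns.tolist())
```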

## Failure/Termination
Failures in threads are handled by the executor differently depending on the cause (a sketch of the caller-side behavior follows the list):
- declaration-time errors (e.g., a `Prompt` contains an invalid field name) fail immediately
- runtime errors (e.g., inference provider downtime, or a `ResponseSchema` not obeyed by the Structured Output extractor) are retried until the Model Selection Policy is exhausted
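
A sketch of what this means in practice, assuming that with `graceful=True` a sample whose retries are exhausted is dropped from the results rather than terminating the process (the flag itself is described under Usage above):

```python
# Sketch (recovery behavior on exhausted retries is an assumption, not verified):
# graceful=True keeps the run alive when individual samples fail.
df, leaf_node_prefixes = pipeline.run_from_dataset(
    dataset,
    max_workers=128,
    graceful=True,
)
```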

## Visualization
Pass `display=True` to visualize the progress of a pipeline within the current `stdout`.

If running a single sample, the results of each `Unit` will be displayed alongside the progress tree.

## Streaming
Mark a `Unit` with `.stream()` to enable streaming output. If visualization is enabled, five `Unit`s at a time will be streamed to the right half of the `stdout`.
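
For instance, a minimal sketch, reusing the judge from the first example, that streams a single unit's output:

```python
# Sketch: .stream() marks the unit for streaming output; with display=True,
# its output is rendered in the right half of the terminal as it arrives.
pipeline = Pipeline() \
    >> JudgeUnit(BooleanScale()).prompt("""
        Is this funny?
        {source.joke}
    """).stream()

response, leaf_node_prefixes = pipeline.run(
    Schema.of(joke="Why did the chicken cross the road? To get to the other side."),
    display=True)
```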

## Experiment Config
Pass an optional `ExperimentConfig` to `run_from_dataset` via the `experiment_config` parameter. Refer to the Experiment section for more details.
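
A minimal sketch of where the object is passed; the `ExperimentConfig` constructor arguments and its import path are assumptions here, with the real fields covered in the Experiment section:

```python
from verdict.experiment import ExperimentConfig  # import path assumed, not verified

# ExperimentConfig fields are described in the Experiment section; shown here
# only to illustrate where the object is passed.
df, leaf_node_prefixes = pipeline.run_from_dataset(
    dataset,
    experiment_config=ExperimentConfig(),
)
```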

[^1]: We will adjust your system's process file descriptor limit (equivalent to `ulimit -n max_workers`) accordingly.