Pipeline
The Pipeline class chains all preprocessing stages. Each stage is optional — use only the stages you need.
Pipeline
quprep.core.pipeline.Pipeline(ingester=None, preprocessor=None, cleaner=None, reducer=None, normalizer=None, encoder=None, exporter=None, schema=None, drift_detector=None)
Composable preprocessing pipeline for quantum data preparation.
Each stage is optional and works independently. You can use just the encoder, just the reducer, or any combination without touching the rest.
sklearn-compatible: supports fit(), transform(), get_params(),
and set_params() in addition to the native fit_transform().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ingester | optional | Data ingestion component. Auto-detected from source type if omitted. | None |
| preprocessor | optional | Preprocessing step applied after ingestion. Accepts a single transformer or a list of transformers applied in order (e.g. …). | None |
| cleaner | optional | Data cleaning component (Imputer, OutlierHandler, CategoricalEncoder). | None |
| reducer | optional | Dimensionality reduction component (PCA, LDA, etc.). | None |
| normalizer | optional | Normalization component. Auto-selected per encoding if omitted. | None |
| encoder | optional | Quantum encoding component. Returns a processed Dataset if omitted. | None |
| exporter | optional | Framework export component. Returns EncodedResult list if omitted. | None |
| schema | DataSchema | Input schema to validate at pipeline entry. Raises SchemaViolationError on mismatch. | None |
Examples:
```python
>>> from quprep import Pipeline, AngleEncoder, QASMExporter
>>> pipeline = Pipeline(
...     encoder=AngleEncoder(),
...     exporter=QASMExporter(),
... )
>>> result = pipeline.fit_transform(df)
>>> print(result.circuits[0])
```
Source code in quprep/core/pipeline.py
Functions
fingerprint()
Compute a reproducibility fingerprint for this pipeline.
Returns a FingerprintResult containing
a deterministic SHA-256 hash of the full pipeline configuration (stage
classes, parameters, and dependency versions). The hash is stable across
runs for the same configuration and suitable for paper methods sections.
Returns:
| Type | Description |
|---|---|
| FingerprintResult | |
fit(source, y=None)
Fit all pipeline stages on training data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str, Path, np.ndarray, pd.DataFrame, or Dataset | Training data. | required |
| y | ndarray or array-like | Target labels. Stored in … | None |
Returns:
| Type | Description |
|---|---|
| Pipeline | Returns … |
fit_transform(source, y=None)
Fit all stages and transform in a single pass.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str, Path, np.ndarray, pd.DataFrame, or Dataset | Input data. | required |
| y | ndarray or array-like | Target labels. Stored in … | None |
Returns:
| Type | Description |
|---|---|
| PipelineResult | Contains … |
get_params(deep=True)
Return pipeline parameters (sklearn convention).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deep | bool | Ignored; included for sklearn API compatibility. | True |
Returns:
| Type | Description |
|---|---|
| dict | |
load(path)
classmethod
Load a previously saved pipeline from a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str or Path | Path to a file created by … | required |
Returns:
| Type | Description |
|---|---|
| Pipeline | |
Raises:
| Type | Description |
|---|---|
| TypeError | If the file does not contain a Pipeline object. |
save(path)
Persist the pipeline (configuration and fitted state) to a file.
Uses Python's pickle protocol. The saved file can be reloaded
with Pipeline.load() and applied to new data without re-fitting.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str or Path | Destination file path (e.g. …). | required |
set_params(**params)
Set pipeline parameters (sklearn convention).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| **params | object | Parameter names and values. | {} |
Returns:
| Type | Description |
|---|---|
| Pipeline | Returns … |
Raises:
| Type | Description |
|---|---|
| ValueError | If an unknown parameter name is given. |
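The get_params() / set_params() contract described above can be illustrated with a small stand-in class. This is only a sketch of the sklearn convention, not QuPrep's implementation: `TinyPipeline` and its `_param_names` tuple are hypothetical names introduced for the example.

```python
class TinyPipeline:
    """Hypothetical stand-in for a pipeline; not QuPrep's implementation."""

    # Names that get_params() reports and set_params() accepts.
    _param_names = ("reducer", "normalizer", "encoder")

    def __init__(self, reducer=None, normalizer=None, encoder=None):
        self.reducer = reducer
        self.normalizer = normalizer
        self.encoder = encoder

    def get_params(self, deep=True):
        # `deep` is accepted for signature compatibility and otherwise unused.
        return {name: getattr(self, name) for name in self._param_names}

    def set_params(self, **params):
        # Validate every name first so a bad call leaves the object unchanged.
        unknown = sorted(set(params) - set(self._param_names))
        if unknown:
            raise ValueError(f"Unknown parameter(s): {unknown}")
        for name, value in params.items():
            setattr(self, name, value)
        return self  # sklearn convention: return the estimator itself


pipe = TinyPipeline(encoder="angle")
pipe.set_params(reducer="pca").set_params(normalizer="zscore")  # chained calls
params = pipe.get_params()
print(params)
```

Returning the object from set_params() is what makes the chained call work, and it is also what sklearn utilities such as parameter searches rely on.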
stream(source, chunksize=1000)
Apply a fitted pipeline to a large source in chunks without loading it fully into RAM.
The pipeline must be fitted first (via fit() or
fit_transform()). Normalizer statistics and all other fitted
parameters are reused for every chunk: only transform is called
per chunk, not fit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str, Path, or np.ndarray | | required |
| chunksize | int | Rows per chunk. | 1000 |
Yields:
| Type | Description |
|---|---|
| PipelineResult | One result per chunk. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the pipeline has not been fitted. |
Examples:
```python
>>> import numpy as np
>>> import quprep as qd
>>> X = np.random.default_rng(0).uniform(0, 1, (1000, 4))
>>> pipeline = qd.Pipeline(encoder=qd.AngleEncoder(), exporter=qd.QASMExporter())
>>> _ = pipeline.fit(X[:100])
>>> for result in pipeline.stream(X, chunksize=200):
...     print(len(result.circuits))
```
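To make the "fit once, transform per chunk" idea concrete, here is a library-free sketch. It assumes a simple min-max scaler as the only fitted stage; `fit_minmax`, `transform_rows`, and `stream_rows` are hypothetical helpers, not QuPrep functions, and the rows are held in a list purely for brevity (a real streaming source would be read lazily).

```python
from typing import Iterator, List, Sequence, Tuple

Row = List[float]
Stats = List[Tuple[float, float]]


def fit_minmax(rows: Sequence[Row]) -> Stats:
    """Learn per-column (min, max) once, from the training rows."""
    return [(min(col), max(col)) for col in zip(*rows)]


def transform_rows(rows: Sequence[Row], stats: Stats) -> List[Row]:
    """Scale each column with the fitted statistics; nothing is re-fitted."""
    scaled = []
    for row in rows:
        scaled.append([
            (value - lo) / (hi - lo) if hi != lo else 0.0
            for value, (lo, hi) in zip(row, stats)
        ])
    return scaled


def stream_rows(rows: Sequence[Row], stats: Stats,
                chunksize: int = 1000) -> Iterator[List[Row]]:
    """Yield transformed chunks of at most `chunksize` rows."""
    for start in range(0, len(rows), chunksize):
        yield transform_rows(rows[start:start + chunksize], stats)


train = [[0.0, 10.0], [2.0, 20.0]]
stats = fit_minmax(train)  # fitted once, reused for every chunk
data = [[float(i), 10.0 + i] for i in range(5)]
chunks = list(stream_rows(data, stats, chunksize=2))
chunk_sizes = [len(chunk) for chunk in chunks]
print(chunk_sizes)
```

Because the statistics come from the training rows only, values in later chunks can fall outside the fitted range; that is the expected consequence of not re-fitting per chunk.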
summary()
Return a human-readable snapshot of the pipeline configuration.
Shows which stages are configured, whether the pipeline has been fitted, the resolved normalizer, and the last cost estimate (if available).
Returns:
| Type | Description |
|---|---|
| str | |
transform(source)
Apply fitted pipeline stages to data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str, Path, np.ndarray, pd.DataFrame, or Dataset | Input data. | required |
Returns:
| Type | Description |
|---|---|
| PipelineResult | |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the pipeline has not been fitted yet. |
PipelineResult
quprep.core.pipeline.PipelineResult(dataset, encoded, circuits, cost=None, audit_log=None, drift_report=None, stages=None)
Output of Pipeline.fit_transform() and Pipeline.transform(). Pipeline.stream() yields one PipelineResult per chunk.
Attributes:
| Name | Type | Description |
|---|---|---|
| dataset | Dataset | The processed Dataset after all pipeline stages (post-normalization). |
| encoded | list[EncodedResult] or None | One EncodedResult per sample. None if no encoder was configured. |
| circuits | list or None | Exported circuit objects (framework-specific). None if no exporter was configured. |
| cost | CostEstimate or None | Gate-count and NISQ-safety estimate for the chosen encoder. None if no encoder was configured. |
| audit_log | list[dict] or None | One entry per preprocessing stage that ran, in order. Each dict has keys: … |
| stages | dict[str, Dataset] | Intermediate datasets keyed by stage: … |
Attributes
circuit
property
First item in the batch — convenience for single-sample use.
Returns the first exported circuit if an exporter was configured,
otherwise the first EncodedResult if only an encoder was configured,
otherwise None.
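The fallback order of the circuit property can be sketched with a small dataclass. `ResultSketch` is a hypothetical stand-in that keeps only the two fields involved; it is not the real PipelineResult.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class ResultSketch:
    """Hypothetical reduced result holding only `encoded` and `circuits`."""

    encoded: Optional[List[Any]] = None
    circuits: Optional[List[Any]] = None

    @property
    def circuit(self) -> Any:
        # Prefer the exported circuit, then the encoded result, then None.
        if self.circuits:
            return self.circuits[0]
        if self.encoded:
            return self.encoded[0]
        return None


with_exporter = ResultSketch(encoded=["enc0", "enc1"], circuits=["qasm0", "qasm1"])
encoder_only = ResultSketch(encoded=["enc0", "enc1"])
neither = ResultSketch()
print(with_exporter.circuit, encoder_only.circuit, neither.circuit)
```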
Functions
summary()
Return a human-readable report of the pipeline result.
Includes the audit log as a formatted table (if any preprocessing stages ran) and the cost estimate breakdown (if an encoder was used).
Returns:
| Type | Description |
|---|---|
| str | |
FingerprintResult
quprep.core.fingerprint.FingerprintResult(config, hash_hex)
Output of fingerprint_pipeline().
Attributes:
| Name | Type | Description |
|---|---|---|
| config | dict | Full pipeline configuration (stages + dependency versions). This is the dict that was hashed: no timestamp, fully deterministic. |
| hash | str | SHA-256 hex digest of the canonical JSON serialisation of … |
Functions
save(path, format='json')
Write the fingerprint to a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Destination file path. | required |
| format | {'json', 'yaml'} | Output format. | "json" |
to_dict()
Return the config augmented with the hash and a UTC timestamp.
to_json(indent=2)
Return a JSON string (hash + timestamp + config).
to_yaml()
Return a YAML string (requires pyyaml).
fingerprint_pipeline
quprep.core.fingerprint.fingerprint_pipeline(pipeline)
Compute a reproducibility fingerprint for pipeline.
The fingerprint captures the class name and constructor parameters of every configured stage (ingester, preprocessor, cleaner, reducer, normalizer, encoder, exporter, schema, drift_detector) plus the installed versions of key dependencies. The resulting SHA-256 hash is deterministic: the same configuration and dependency versions always produce the same hash, regardless of when or where the pipeline runs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pipeline | Pipeline | A … | required |
Returns:
| Type | Description |
|---|---|
| FingerprintResult | Contains … |
Examples
Minimal: encode only
```python
import quprep as qd

# `data` is any supported source: a path, ndarray, DataFrame, or Dataset.
pipeline = qd.Pipeline(encoder=qd.AngleEncoder())
result = pipeline.fit_transform(data)

result.encoded                # list[EncodedResult]
result.encoded[0].parameters  # rotation angles for first sample
result.encoded[0].metadata    # {"n_qubits": 4, "depth": 1, ...}
```
Full: clean + encode + export
```python
import quprep as qd

pipeline = qd.Pipeline(
    cleaner=qd.Imputer(strategy="knn"),
    encoder=qd.AngleEncoder(rotation="ry"),
    exporter=qd.QASMExporter(),
)
result = pipeline.fit_transform("data.csv")
result.circuits[0]  # QASM string for first sample
```
With schema validation
```python
import quprep as qd

schema = qd.DataSchema([
    qd.FeatureSpec("age", dtype="continuous", min_value=0, max_value=120),
    qd.FeatureSpec("income", dtype="continuous", min_value=0),
])
pipeline = qd.Pipeline(encoder=qd.AngleEncoder(), schema=schema)
result = pipeline.fit_transform("data.csv")
print(result.cost.nisq_safe)  # True / False
print(result.summary())       # audit table + cost breakdown (summary() returns a str)
```
sklearn-style fit / transform split
```python
import quprep as qd

pipeline = qd.Pipeline(
    reducer=qd.PCAReducer(n_components=4),
    encoder=qd.AngleEncoder(),
)
pipeline.fit(X_train)
r_train = pipeline.transform(X_train)
r_test = pipeline.transform(X_test)
```
Explicit normalizer
```python
import quprep as qd

pipeline = qd.Pipeline(
    encoder=qd.AngleEncoder(),
    normalizer=qd.Scaler("zscore"),  # override auto-selection
)
```
Saving and loading a fitted pipeline
```python
import quprep as qd

pipeline = qd.Pipeline(
    reducer=qd.PCAReducer(n_components=4),
    encoder=qd.AngleEncoder(),
)
pipeline.fit(X_train)
pipeline.save("pipeline.pkl")

# Later, in a different process or deployment
loaded = qd.Pipeline.load("pipeline.pkl")
result = loaded.transform(X_new)
```
The parent directory is created automatically. All fitted state (reducer, normalizer, encoder) is preserved.
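The save/load behaviour described here (pickle on disk, parent directory created on save, TypeError on load when the file holds the wrong kind of object) can be sketched with the standard library alone. `save_object` and `load_object` are hypothetical helpers written for illustration; they are not QuPrep functions, and a plain dict stands in for the fitted pipeline.

```python
import pickle
import tempfile
from pathlib import Path


def save_object(obj, path):
    """Pickle `obj` to `path`, creating missing parent directories first."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("wb") as fh:
        pickle.dump(obj, fh)


def load_object(path, expected_type):
    """Unpickle `path` and reject anything that is not `expected_type`."""
    with Path(path).open("rb") as fh:
        obj = pickle.load(fh)
    if not isinstance(obj, expected_type):
        raise TypeError(
            f"expected {expected_type.__name__}, got {type(obj).__name__}"
        )
    return obj


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "models" / "state.pkl"  # "models" does not exist yet
    save_object({"mean": 1.5, "std": 0.5}, path)
    restored = load_object(path, dict)
    try:
        load_object(path, list)  # wrong expected type
        wrong_type_rejected = False
    except TypeError:
        wrong_type_rejected = True

print(restored, wrong_type_rejected)
```

Since unpickling can execute arbitrary code, pickle files should only be loaded from sources you trust.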
With drift detection
```python
import quprep as qd

det = qd.DriftDetector(mean_threshold=3.0, std_threshold=2.0)
pipeline = qd.Pipeline(
    encoder=qd.AngleEncoder(),
    drift_detector=det,
)
pipeline.fit(X_train)
result = pipeline.transform(X_test)
print(result.drift_report.overall_drift)     # True / False
print(result.drift_report.drifted_features)  # list of feature names
```
Drift is checked automatically on every transform() call. A QuPrepWarning is issued when drift is detected. The drift detector state is preserved through save()/load().
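As a rough illustration of threshold-based drift checking, the sketch below compares new data against statistics recorded at fit time. The rule it uses is an assumption, not QuPrep's documented definition: a feature is flagged when its mean moves by more than `mean_threshold` training standard deviations, or when its standard deviation grows by more than a factor of `std_threshold`. `fit_reference` and `check_drift` are hypothetical helpers, and a plain UserWarning stands in for QuPrepWarning.

```python
import warnings
from statistics import fmean, pstdev


def fit_reference(columns):
    """Record (mean, std) per feature from the training data."""
    return {name: (fmean(vals), pstdev(vals)) for name, vals in columns.items()}


def check_drift(reference, columns, mean_threshold=3.0, std_threshold=2.0):
    """Flag features whose mean or spread moved beyond the thresholds."""
    drifted = []
    for name, vals in columns.items():
        ref_mean, ref_std = reference[name]
        scale = ref_std or 1.0  # avoid dividing by zero for constant features
        mean_shift = abs(fmean(vals) - ref_mean) / scale
        std_ratio = pstdev(vals) / scale
        if mean_shift > mean_threshold or std_ratio > std_threshold:
            drifted.append(name)
    if drifted:
        warnings.warn(f"drift detected in: {drifted}", UserWarning)
    return {"overall_drift": bool(drifted), "drifted_features": drifted}


train = {"age": [30, 32, 34, 36, 38], "income": [50, 52, 54, 56, 58]}
new = {"age": [31, 33, 35, 37, 39], "income": [150, 152, 154, 156, 158]}

reference = fit_reference(train)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    report = check_drift(reference, new)

print(report)
```

The returned keys mirror the `overall_drift` and `drifted_features` attribute names used in the example above, purely for readability.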
Time series pipeline (v0.7.0)
```python
import quprep as qd

pipeline = qd.Pipeline(
    ingester=qd.TimeSeriesIngester(time_column="date"),
    preprocessor=qd.WindowTransformer(window_size=8, step=1),
    encoder=qd.AngleEncoder(),
)
result = pipeline.fit_transform("sensor_data.csv")
print(len(result.encoded))                     # n_windows
print(result.encoded[0].metadata["n_qubits"])  # window_size × n_features
```
The preprocessor stage runs after ingestion and before cleaning/reduction. It is designed for shape-changing transforms like WindowTransformer.
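To show why windowing is a shape-changing transform, here is a minimal sliding-window sketch in plain Python. It assumes each window is flattened row by row into one sample, which matches the window_size × n_features qubit count noted above but is otherwise a guess at the layout; `sliding_windows` is a hypothetical helper, not the WindowTransformer implementation.

```python
from typing import List, Sequence


def sliding_windows(rows: Sequence[Sequence[float]],
                    window_size: int, step: int = 1) -> List[List[float]]:
    """Turn (n_rows, n_features) rows into flattened overlapping windows."""
    windows = []
    # The last valid start index is len(rows) - window_size.
    for start in range(0, len(rows) - window_size + 1, step):
        block = rows[start:start + window_size]
        windows.append([value for row in block for value in row])
    return windows


# 10 time steps with 2 features each.
rows = [[float(t), float(t) * 10.0] for t in range(10)]
windows = sliding_windows(rows, window_size=8, step=1)

n_windows = len(windows)        # (10 - 8) // 1 + 1
sample_width = len(windows[0])  # window_size * n_features
print(n_windows, sample_width)
```

Ten input rows become three samples of width 16, so both the number of samples and the number of features change, which is why this stage has to run before cleaning and reduction.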
Sparse data (v0.7.0)
```python
import scipy.sparse as sp
import quprep as qd

sparse_matrix = sp.csr_matrix(X)
result = qd.Pipeline(encoder=qd.AngleEncoder()).fit_transform(sparse_matrix)
```
scipy.sparse matrices are accepted anywhere a NumPy array is expected. They are converted to dense at ingestion.
Labels and multi-label (v0.7.0)
```python
import quprep as qd

# Attach labels at fit_transform time
result = qd.Pipeline(encoder=qd.AngleEncoder()).fit_transform(X, y=y)
print(result.dataset.labels)  # preserved through all stages

# Or embed labels in the Dataset via CSVIngester
from quprep.ingest.csv_ingester import CSVIngester

pipeline = qd.Pipeline(
    ingester=CSVIngester(target_columns="label"),
    encoder=qd.AngleEncoder(),
)
result = pipeline.fit_transform("data.csv")
print(result.dataset.labels.shape)  # (n_samples,)
```
For FeatureSelector(method="mutual_info"), labels in dataset.labels are used automatically — no separate labels= argument needed.
Inspecting intermediate stages (v0.10.0)
PipelineResult.stages gives access to the Dataset after each pipeline step:
```python
import quprep as qd

result = qd.Pipeline(
    cleaner=qd.OutlierHandler(),
    reducer=qd.PCAReducer(n_components=4),
    encoder=qd.AngleEncoder(),
).fit_transform(df)

print(result.stages["input"].data.shape)             # raw input
print(result.stages["after_cleaner"].data.shape)     # post outlier removal
print(result.stages["after_reducer"].data.shape)     # post PCA
print(result.stages["after_normalizer"].data.shape)  # pre-encoding
```
API consistency additions (v0.10.0)
Feature names after selection:
```python
import quprep as qd

selector = qd.FeatureSelector(method="variance", threshold=0.01)
selector.fit(dataset)
print(selector.get_feature_names_out())  # ['age', 'income', ...]
```
Outlier mask:
```python
import quprep as qd

handler = qd.OutlierHandler(method="iqr", action="remove")
handler.fit_transform(dataset)
print(handler.outlier_mask_)  # bool array, True = outlier row
```
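For intuition about what an IQR-based outlier mask contains, here is a single-column sketch using only the standard library. It assumes the common 1.5 × IQR fences and the default quartile method of statistics.quantiles; QuPrep's exact quartile definition and its row-wise handling of multiple columns are not specified here, and `iqr_outlier_mask` is a hypothetical helper.

```python
from statistics import quantiles
from typing import List, Sequence


def iqr_outlier_mask(values: Sequence[float], k: float = 1.5) -> List[bool]:
    """Return True for values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _median, q3 = quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [v < lower or v > upper for v in values]


values = [1.0, 2.0, 3.0, 4.0, 5.0, 100.0]
mask = iqr_outlier_mask(values)

# action="remove" corresponds to keeping only the unmasked entries.
kept = [v for v, is_outlier in zip(values, mask) if not is_outlier]
print(mask, kept)
```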
Reverse normalisation:
```python
import quprep as qd

scaler = qd.Scaler("zscore")
scaled = scaler.fit_transform(dataset)
original = scaler.inverse_transform(scaled)  # back to original scale
# Supported: minmax, minmax_pi, minmax_2pi, minmax_pm_pi, zscore
# Not supported: l2, binary, pm_one (not invertible)
```
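The difference between invertible and non-invertible scalings can be checked with a few lines of plain Python. The sketch below assumes a population standard deviation for the z-score (QuPrep's choice is not stated here); `ZScore` and `l2_normalize` are hypothetical illustrations, not the library's Scaler.

```python
import math
from statistics import fmean, pstdev


class ZScore:
    """Hypothetical single-column z-score scaler."""

    def fit(self, values):
        self.mean_ = fmean(values)
        self.std_ = pstdev(values) or 1.0  # guard against constant columns
        return self

    def transform(self, values):
        return [(v - self.mean_) / self.std_ for v in values]

    def inverse_transform(self, scaled):
        # The affine map is undone exactly because mean_ and std_ were stored.
        return [s * self.std_ + self.mean_ for s in scaled]


def l2_normalize(vec):
    """Scale a vector to unit length; the original length is discarded."""
    norm = math.hypot(*vec)
    return [v / norm for v in vec]


values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
scaler = ZScore().fit(values)
restored = scaler.inverse_transform(scaler.transform(values))

# Two different inputs collapse to the same unit vector, so no inverse exists.
unit_a = l2_normalize([3.0, 4.0])
unit_b = l2_normalize([6.0, 8.0])
print(restored, unit_a, unit_b)
```

Z-score and min-max scalings are affine maps with stored parameters, so they can be undone; L2 normalisation throws away the vector's length, which is why it has no inverse.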
Categorical cardinality control:
```python
import quprep as qd

# Warn when a column has > 20 unique categories
# Group categories appearing fewer than 5 times as "_other"
encoder = qd.CategoricalEncoder(
    strategy="onehot",
    cardinality_threshold=20,
    min_frequency=5,
)
encoder.fit_transform(dataset)
```
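The two cardinality controls can be pictured with a short standard-library sketch: one check for "too many distinct categories" and one pass that folds rare categories into "_other". `exceeds_cardinality` and `group_rare` are hypothetical helpers that follow the comments above; how CategoricalEncoder implements this internally is not shown here.

```python
from collections import Counter
from typing import Hashable, List, Sequence


def exceeds_cardinality(values: Sequence[Hashable], threshold: int = 20) -> bool:
    """True when the column has more than `threshold` unique categories."""
    return len(set(values)) > threshold


def group_rare(values: Sequence[Hashable], min_frequency: int = 5,
               other: str = "_other") -> List[Hashable]:
    """Replace categories seen fewer than `min_frequency` times with `other`."""
    counts = Counter(values)
    return [v if counts[v] >= min_frequency else other for v in values]


column = ["a"] * 6 + ["b"] * 5 + ["c"] * 2 + ["d"]
grouped = group_rare(column, min_frequency=5)
grouped_counts = Counter(grouped)
too_many = exceeds_cardinality(column, threshold=20)
print(dict(grouped_counts), too_many)
```

Grouping before one-hot encoding keeps the number of output columns, and therefore the number of qubits needed downstream, from growing with every rare category.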
Explained variance (LDA):
```python
import quprep as qd

pipeline = qd.Pipeline(reducer=qd.LDAReducer(n_components=3, labels=y))
pipeline.fit(dataset)
print(pipeline.reducer.explained_variance_ratio_)  # also available on PCAReducer
```
Reproducibility fingerprinting (v0.8.0)
```python
import quprep as qd

pipeline = qd.Pipeline(
    cleaner=qd.Imputer(strategy="knn"),
    reducer=qd.PCAReducer(n_components=4),
    encoder=qd.AngleEncoder(rotation="ry"),
    exporter=qd.QASMExporter(),
)
fp = pipeline.fingerprint()
print(fp.hash)  # SHA-256 hex digest, stable across runs for the same config
fp.save("experiment.json")                 # JSON (default)
fp.save("experiment.yaml", format="yaml")  # YAML (requires pyyaml)
print(fp.to_json())  # full JSON string including hash and UTC timestamp
```
The hash captures every stage class, all constructor parameters, and installed dependency versions. It is deterministic: the same configuration and dependency versions always produce the same hash, regardless of when or where the pipeline runs. Include it in a paper's methods section so readers can confirm they are running the same configuration.
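The idea of hashing a canonical JSON serialisation can be sketched with the standard library. The config layout below is invented for illustration and `config_hash` is a hypothetical helper; the exact canonicalisation QuPrep applies may differ.

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Return the SHA-256 hex digest of a canonical JSON form of `config`."""
    # sort_keys and fixed separators make the serialisation independent of
    # dict insertion order and whitespace, so equal configs hash equally.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical configuration dicts with the same content in a different order.
config_a = {"encoder": {"class": "AngleEncoder", "rotation": "ry"}, "reducer": None}
config_b = {"reducer": None, "encoder": {"rotation": "ry", "class": "AngleEncoder"}}
config_c = {"encoder": {"class": "AngleEncoder", "rotation": "rx"}, "reducer": None}

digest_a = config_hash(config_a)
digest_b = config_hash(config_b)
digest_c = config_hash(config_c)
print(digest_a == digest_b, digest_a == digest_c)
```

Leaving the timestamp out of the hashed dict, as the FingerprintResult description notes, is what keeps the digest stable from run to run.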