faucet-stream
The fast, config-driven way to move data in Rust.
faucet-stream wires 21 source and 17 sink connectors together with a single
faucet binary that runs pipelines declaratively from a YAML/JSON file — no Rust
code required. Or skip the binary and embed the same engine in your own service
through the typed Source / Sink traits.
cargo install faucet-cli
faucet init my_pipeline --source postgres --sink bigquery
faucet validate pipeline.yaml
faucet run pipeline.yaml
What you get
- Fast and reliable by default: native streaming with bounded memory, connection pooling, multi-row inserts, bulk APIs, and parallel I/O.
- Config-driven or embeddable: run faucet run pipeline.yaml, or call Pipeline::new(&source, &sink).run().await? from Rust.
- A runtime, not just connectors: incremental and resumable replication, PostgreSQL change-data-capture, dead-letter queues, automatic retries, and built-in Prometheus metrics plus tracing spans, all with zero per-connector code.
- Pay only for what you use: every connector is a Cargo feature.
How this book is organized
- Getting Started: install, run your first pipeline in five minutes, and learn the core concepts.
- Tutorials: end-to-end walkthroughs of real pipelines (incremental REST → BigQuery, Postgres CDC, DAGs, embedding).
- Cookbook: short, task-oriented recipes for pagination, auth, state, dead-letter queues, and compression.
- Reference: the connector catalog, CLI commands, and config-file grammar.
- Operations: deploying, observability, performance tuning, and troubleshooting.
- Extending: author and publish your own faucet-source-* / faucet-sink-* crate.
Where else to look
- API docs: every crate is on docs.rs, rendered with all features so optional connectors are visible.
- Source & issues: github.com/PawanSikawat/faucet-stream.
- Runnable examples: the cli/examples/ directory ships a config for nearly every connector pair, and examples/ has a docker-compose stack so they run locally.
Installation
The faucet CLI
The CLI is the fastest way to start. Install it from crates.io:
cargo install faucet-cli
This gives you a faucet binary with every first-party connector compiled in,
so it can run any of the published example configs out of the box.
Slim builds
Every connector is a Cargo feature, so you can build a smaller binary with only what you need:
cargo install faucet-cli --no-default-features \
--features "source-rest,sink-jsonl,sink-stdout,transforms"
Run faucet list to see which sources and sinks are compiled into your binary.
The library
To embed pipelines in your own Rust program, depend on the umbrella crate and enable the connectors you need:
[dependencies]
# Default features include the REST source only.
faucet-stream = "1.0"
# Or enable specific connectors (keep only one faucet-stream line):
# faucet-stream = { version = "1.0", features = ["source-rest", "sink-postgres", "sink-s3"] }
# Or everything:
# faucet-stream = { version = "1.0", features = ["full"] }
Feature groups: source (all sources), sink (all sinks), state (all
state-store backends), full (everything), and compression (gzip/zstd on the
file-shaped connectors you’ve enabled).
You can also depend on individual connector crates directly
(faucet-source-rest, faucet-sink-bigquery, …) — each depends only on
faucet-core.
Requirements
- A recent stable Rust toolchain (see the repo’s rust-toolchain.toml for the current MSRV).
- Some connectors link native libraries: the Kafka connectors build librdkafka and need cmake and a C toolchain available at compile time.
Next: run your first pipeline.
Your first pipeline
This walkthrough moves a local CSV file to JSON Lines — no external services
required, so it works immediately after cargo install faucet-cli.
1. Create some input
mkdir -p data out
cat > data/input.csv <<'CSV'
id,name,city
1,Ada,London
2,Grace,New York
3,Linus,Helsinki
CSV
2. Write a config
Create pipeline.yaml:
version: 1
name: csv_to_jsonl
pipeline:
  source:
    type: csv
    config:
      path: ./data/input.csv
  sink:
    type: jsonl
    config:
      path: ./out/records.jsonl
faucet run auto-discovers a faucet.yaml / faucet.yml / faucet.json in the
current directory (and a sibling .env), so you can also name the file
faucet.yaml and just run faucet run.
3. Validate, then run
faucet validate pipeline.yaml
faucet run pipeline.yaml
$ cat out/records.jsonl
{"id":"1","name":"Ada","city":"London"}
{"id":"2","name":"Grace","city":"New York"}
{"id":"3","name":"Linus","city":"Helsinki"}
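To see why every value in the output is a string, here is a deliberately naive Rust sketch of the same CSV-to-JSON-Lines conversion. It is not faucet's csv source: it splits on commas with no quoting support, and it assumes (as the output above suggests) that CSV fields are emitted as JSON strings rather than type-inferred numbers.

```rust
/// Naive CSV -> JSON Lines conversion (hypothetical, illustration only).
/// Splits on commas (no quoting/escaping support) and emits every field as
/// a JSON string, which is why `id` comes out as "1" rather than 1.
fn csv_to_jsonl(csv: &str) -> Vec<String> {
    let mut lines = csv.lines().filter(|l| !l.trim().is_empty());
    let header: Vec<&str> = match lines.next() {
        Some(h) => h.split(',').collect(),
        None => return Vec::new(),
    };
    lines
        .map(|row| {
            let fields = header
                .iter()
                .zip(row.split(','))
                .map(|(k, v)| format!("{}:{}", json_string(k), json_string(v)))
                .collect::<Vec<_>>()
                .join(",");
            format!("{{{}}}", fields)
        })
        .collect()
}

/// Minimal JSON string escaping for quotes, backslashes, and control characters.
fn json_string(s: &str) -> String {
    let mut out = String::from("\"");
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}
```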
4. Preview without writing
To see what a source emits without touching a sink, use preview — it runs the
source and prints records to stdout:
faucet preview pipeline.yaml --limit 5
5. Scaffold from a connector’s schema
faucet init generates a commented config skeleton from any connector’s JSON
schema, marking required fields and commenting out optional ones:
faucet init my_pipeline --source rest --sink postgres
Add a transform
Insert a transforms: list between source and sink to reshape records. For
example, normalize keys to snake_case:
pipeline:
  source: { type: csv, config: { path: ./data/input.csv } }
  transforms:
    - type: snake_case
  sink: { type: jsonl, config: { path: ./out/records.jsonl } }
Built-in config transforms are flatten, rename_keys, and snake_case.
Next: core concepts.
Core concepts
faucet-stream is built from a handful of small pieces. Understanding them makes both the YAML config and the Rust API obvious.
Source
A source fetches records from an external system (a REST API, a database, a
Kafka topic, an object store, …) and yields them as JSON values. Sources stream
in batches via stream_pages, so memory stays bounded no matter how much data
flows through.
Sink
A sink writes records to an external system. Sinks accept batches and most
expose a batch_size knob that controls the natural unit of work (a multi-row
INSERT, a _bulk body, an insertAll request, and so on).
Transform
An optional transform reshapes each record between source and sink. The
config-exposed transforms are flatten, rename_keys, and snake_case;
additional custom transforms are available from Rust.
Pipeline
The pipeline connects a source to a sink. It drives the source’s
stream_pages, applies transforms, and writes each page to the sink as it
arrives — then flushes and records progress. Memory is bounded at one
batch_size page on both sides regardless of total volume.
let result = Pipeline::new(&source, &sink).run().await?;
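The control flow behind that one call can be sketched synchronously. PagedSource, CountingSink, and run are hypothetical stand-ins, not faucet's async Source / Sink traits; the point is only that the loop holds one page at a time, so the sink never receives more than batch_size records at once.

```rust
/// Hypothetical stand-in for a source that yields pages of `batch_size` records.
struct PagedSource {
    records: Vec<u32>, // pretend upstream data
    batch_size: usize, // must be > 0
}

impl PagedSource {
    /// Yields one page at a time; only one page is materialized per step.
    fn pages(&self) -> impl Iterator<Item = Vec<u32>> + '_ {
        self.records.chunks(self.batch_size).map(|c| c.to_vec())
    }
}

/// Hypothetical sink that counts what it receives and the largest page seen.
#[derive(Default)]
struct CountingSink {
    written: usize,
    largest_page: usize,
}

impl CountingSink {
    fn write_page(&mut self, page: &[u32]) {
        self.written += page.len();
        self.largest_page = self.largest_page.max(page.len());
    }
}

/// Drive the source page by page: transform, write, move on.
fn run(source: &PagedSource, sink: &mut CountingSink, transform: impl Fn(u32) -> u32) -> usize {
    for page in source.pages() {
        let page: Vec<u32> = page.into_iter().map(&transform).collect();
        sink.write_page(&page);
        // A real pipeline would flush and record progress here.
    }
    sink.written
}
```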
State store & bookmarks
For incremental and resumable runs, a state store persists a bookmark after
each page the sink confirms. On the next run the source resumes from that
bookmark. Built-in backends are memory and file (in faucet-core); redis
and postgres backends live in their own crates.
This is what makes change-data-capture safe: the PostgreSQL CDC source only tells Postgres it can recycle write-ahead log up to a bookmark that has actually been persisted.
Dead-letter queue (DLQ)
A pipeline can attach a DLQ sink. When a sink reports per-row failures, the
failing rows are wrapped in a fixed-shape envelope and routed to the DLQ before
the page’s bookmark advances — so a few bad records don’t abort the whole run.
The on_batch_error policy (propagate vs dlq_all) decides what happens when
a sink can’t report per-row results.
Matrix & DAGs
A single config can fan out into many invocations with a matrix: block — either
independent rows or a parent/child DAG where a child runs once per record
produced by its parent. See the matrix DAG tutorial.
Observability
Every source, sink, transform, and state operation is automatically wrapped to
emit tracing spans and metrics counters/histograms — no per-connector code.
See Observability.
REST API → BigQuery (incremental)
This tutorial pulls records from a paginated REST API and streams them into a BigQuery table, then converts the pipeline to an incremental one that only fetches new rows on each run.
Full-table version
version: 1
name: rest_to_bigquery
pipeline:
source:
type: rest
config:
base_url: https://api.example.com
path: /v1/events
method: GET
name: events
auth:
type: basic
config:
username: ${env:API_USER}
password: ${env:API_PASS}
records_path: $.events[*]
pagination:
type: PageNumber
param_name: page
start_page: 1
page_size: 500
page_size_param: per_page
max_pages: 200
timeout: 45
max_retries: 5
retry_backoff: 2
tolerated_http_errors: [404]
replication_method:
type: FullTable
primary_keys: [event_id]
schema_sample_size: 100
sink:
type: bigquery
config:
project_id: my-gcp-project
dataset_id: analytics
table_id: events
auth:
type: service_account_key_path
config:
path: service-account.json
batch_size: 1000
Secrets come from the environment via ${env:VAR} — keep credentials out of the
config file. Put them in a sibling .env or export them before running.
export API_USER=… API_PASS=…
faucet run rest_to_bigquery.yaml
The records_path is a JSONPath that selects the array of records inside each
response body; pagination walks pages until an empty page or max_pages. See
the pagination cookbook for the other styles.
Make it incremental
Switch replication_method from FullTable to a key-based incremental method
and attach a state store so progress survives between runs:
pipeline:
source:
type: rest
config:
# … as above …
replication_method:
type: Incremental
cursor_field: updated_at
primary_keys: [event_id]
sink:
# … as above …
state:
type: file
config:
path: ./state
Now each run records the maximum updated_at it saw; the next run resumes from
that bookmark. Swap the file state store for redis or postgres for shared,
durable state across machines — see state.
Tip: run faucet schema source rest and faucet schema sink bigquery to see every available config field with its type and default.
PostgreSQL CDC → JSONL
Change data capture (CDC) streams every INSERT/UPDATE/DELETE from a
PostgreSQL table by reading its write-ahead log via logical replication — no
polling, no updated_at column required.
Prepare Postgres
CDC needs logical replication enabled (wal_level = logical) and a publication
for the tables you want to follow:
CREATE TABLE IF NOT EXISTS users (id int4 PRIMARY KEY, name text);
CREATE PUBLICATION faucet_pub FOR TABLE users;
The bundled examples/docker-compose.yml
starts a Postgres already configured for logical replication.
Config
version: 1
pipeline:
source:
type: postgres-cdc
config:
connection_url: postgres://faucet:faucet@localhost:5432/appdb
slot_name: faucet_slot
publication_name: faucet_pub
create_slot_if_missing: true
idle_timeout: 30
sink:
type: jsonl
config:
path: ./out/changes.jsonl
append: true
state:
type: file
config:
path: ./state
faucet run postgres_cdc_to_jsonl.yaml
Open a psql session and INSERT/UPDATE/DELETE some rows — the connector
drains them every fetch cycle until idle_timeout fires.
Why the state store matters here
The CDC source advances Postgres’s confirmed_flush_lsn (the point up to which
Postgres may recycle WAL) only from a durable bookmark — i.e. after the
pipeline has persisted the position. It never confirms WAL for changes that
haven’t been written to the sink. That means a crash mid-run cannot lose data:
on restart the source resumes from the last persisted bookmark. The tradeoff is
that WAL is retained until the next run advances the bookmark, so don’t point a
CDC slot at a table and then never run it.
The state key is postgres-cdc:<slot>. Use a durable backend (redis /
postgres) in production so the bookmark survives the loss of the local disk.
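A small model makes the rule concrete. CdcSlot is hypothetical (faucet's CDC code is not shown in this book), and LSNs are represented as plain u64 values: the slot tracks how far it has read and how far the state store has persisted, and it confirms only the latter.

```rust
/// Hypothetical model: the confirmed position only moves to an LSN that the
/// state store has durably persisted, never to one merely read from the WAL.
#[derive(Default)]
struct CdcSlot {
    received_lsn: u64,        // furthest WAL position read from Postgres
    persisted_lsn: u64,       // furthest bookmark the state store has saved
    confirmed_flush_lsn: u64, // what we tell Postgres it may recycle up to
}

impl CdcSlot {
    fn receive(&mut self, lsn: u64) {
        self.received_lsn = self.received_lsn.max(lsn);
    }

    /// Called after the sink confirmed the page and the bookmark was saved.
    fn bookmark_persisted(&mut self, lsn: u64) {
        // Only ever confirm what was both received and persisted.
        let durable = lsn.min(self.received_lsn);
        self.persisted_lsn = self.persisted_lsn.max(durable);
        self.confirmed_flush_lsn = self.persisted_lsn;
    }
}
```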
Slot lifecycle
- slot_type: temporary drops the slot when the connection closes, which is good for experiments; permanent (the default) keeps it, which retains WAL until you drop it.
- Free an abandoned slot’s WAL with PostgresCdcSource::drop_slot() (library) or by dropping the replication slot in Postgres.
- tls: disable | require | verify_ca | verify_full configures the replication connection (the default, disable, is plaintext; use verify_full over untrusted networks).
Multi-pipeline DAGs with matrix
A single config can drive many pipeline invocations. The matrix: block lists
rows that are each deep-merged onto the base pipeline:. Rows can be independent
(fan-out) or form a parent/child DAG where a child runs once per record the
parent produced.
Independent fan-out
Each row overrides part of the pipeline and runs independently, bounded by
execution.max_concurrent:
version: 1
name: multi_region
pipeline:
  source: { type: rest, config: { base_url: https://api.example.com, method: GET } }
  sink: { type: jsonl, config: {} }
execution:
  max_concurrent: 4
  on_error: continue # or `stop`
matrix:
  - id: us
    source: { config: { path: /v1/us/events } }
    sink: { config: { path: us.jsonl } }
  - id: eu
    source: { config: { path: /v1/eu/events } }
    sink: { config: { path: eu.jsonl } }
Parent/child DAG
A row with parent: runs once per record produced by the parent. Tokens like
${parent_id.dotted.path} are resolved per parent record at runtime:
version: 1
name: dag_users_posts
pipeline:
  source: { type: rest, config: { base_url: https://api.example.com, method: GET, records_path: "$.data[*]" } }
  sink: { type: jsonl, config: { append: false } }
matrix:
  # Root: fetch the users list once.
  - id: users
    source: { config: { path: /v1/users, name: users } }
    sink: { config: { path: users.jsonl } }
  # Child: for each user record, fetch that user's posts.
  # Values containing { } or [ ] are quoted so they parse inside flow mappings.
  - id: posts
    parent: users
    parent_key: id
    source: { config: { path: "/v1/users/${users.id}/posts", name: posts } }
    sink: { config: { path: "posts-${users.id}.jsonl" } }
The child’s state key is suffixed with the parent record’s key, so each per-user fetch resumes independently.
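Token resolution can be sketched as plain string substitution. resolve_tokens is a hypothetical helper: it assumes the parent record is available as a flat map from dotted path to string value (real records are nested JSON), and it leaves unknown tokens untouched, which may differ from faucet's resolver.

```rust
use std::collections::HashMap;

/// Hypothetical: replace `${parent_id.dotted.path}` tokens with values from
/// one parent record, modeled as a flat map from dotted path to value.
fn resolve_tokens(template: &str, parent_id: &str, record: &HashMap<&str, &str>) -> String {
    let prefix = format!("{}.", parent_id);
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("${") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find('}') {
            Some(end) => {
                let token = &after[..end];
                match token.strip_prefix(prefix.as_str()).and_then(|path| record.get(path)) {
                    Some(value) => out.push_str(value),
                    // Unknown token: copy `${...}` through unchanged.
                    None => out.push_str(&rest[start..start + 2 + end + 1]),
                }
                rest = &after[end + 1..];
            }
            None => {
                // Unterminated token: copy the remainder verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```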
Merge semantics
A row is deep-merged onto the base pipeline: scalars replace, objects merge
recursively, and arrays replace wholesale. That single rule defines all override
behavior.
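The rule is small enough to write out. This sketch defines its own minimal Value enum instead of depending on serde_json, and deep_merge is a hypothetical helper rather than faucet's implementation: objects merge key by key, and anything else in the row replaces the base value.

```rust
use std::collections::BTreeMap;

/// A tiny JSON-like value type so the sketch stays dependency-free.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    Num(f64),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

/// Deep-merge `overlay` (a matrix row) onto `base` (the pipeline):
/// objects merge recursively; scalars and arrays replace wholesale.
fn deep_merge(base: &mut Value, overlay: Value) {
    match (base, overlay) {
        (Value::Object(base_map), Value::Object(overlay_map)) => {
            for (key, value) in overlay_map {
                match base_map.get_mut(&key) {
                    Some(existing) => deep_merge(existing, value),
                    None => {
                        base_map.insert(key, value);
                    }
                }
            }
        }
        // Scalars, arrays, or a type mismatch: the overlay wins.
        (base, overlay) => *base = overlay,
    }
}
```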
Named templates (DRY)
For many heterogeneous rows, define reusable source/sink templates under
pipeline.sources / pipeline.sinks and a top-level vars: block, then select
them per row with ref:. See cli/README.md
for the full grammar.
Error handling
execution.on_error: continue lets sibling subtrees finish when one fails (the
failed subtree is skipped); stop aborts pending and in-flight work on the first
failure. stop cancels in-flight tasks at their next await, which can leave
partial sink state — acceptable for idempotent sinks, something to know for
others.
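Under on_error: continue, the rows that get skipped are exactly the descendants of a failed row. skipped_rows is a hypothetical helper that computes that set from (id, parent) pairs; it models only the continue policy, not the cancellation behavior of stop.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical: given matrix rows as (id, parent) pairs and the rows that
/// failed, return every descendant of a failed row. Siblings and unrelated
/// subtrees are not included, so they still run.
fn skipped_rows<'a>(rows: &[(&'a str, Option<&'a str>)], failed: &HashSet<&'a str>) -> HashSet<&'a str> {
    // Build parent -> children adjacency.
    let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
    for (id, parent) in rows {
        if let Some(p) = parent {
            children.entry(*p).or_default().push(*id);
        }
    }
    // Depth-first walk from every failed row.
    let mut skipped = HashSet::new();
    let mut stack: Vec<&str> = failed.iter().copied().collect();
    while let Some(row) = stack.pop() {
        for child in children.get(row).into_iter().flatten() {
            if skipped.insert(*child) {
                stack.push(*child);
            }
        }
    }
    skipped
}
```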
Embedding faucet as a Rust library
The faucet CLI is a thin wrapper over the same library you can use directly.
Embedding gives you typed configs, compile-time connector selection, and the
ability to build a Source or Sink from your own code.
Add the dependency
[dependencies]
faucet-stream = { version = "1.0", features = ["source-rest", "sink-bigquery"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
Build and run a pipeline
use faucet_stream::source::rest::{RestStream, RestStreamConfig, Auth, PaginationStyle};
use faucet_stream::sink::bigquery::{BigQuerySink, BigQuerySinkConfig};
use faucet_stream::Pipeline;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let source = RestStream::new(RestStreamConfig {
        base_url: "https://api.example.com".into(),
        path: "/v1/events".into(),
        auth: Auth::Bearer { token: std::env::var("API_TOKEN")? },
        ..Default::default()
    })?;
    let sink = BigQuerySink::new(/* BigQuerySinkConfig { .. } */).await?;
    let result = Pipeline::new(&source, &sink).run().await?;
    println!("moved {} records", result.records_written);
    Ok(())
}
Exact field names and constructors are documented per crate on docs.rs (rendered with all features, so every connector’s API is visible). Treat the snippet above as the shape, not the literal field list.
Applying transforms
faucet_stream::TransformingSource is the library entry point for attaching
transforms to any source. It wraps a Box<dyn Source> with a flat list of
RecordTransforms applied to every record emitted via fetch_* and
stream_pages.
use faucet_stream::{
    KeyCaseMode, Labels, RecordTransform, Source, TransformingSource,
};

let inner: Box<dyn Source> = Box::new(my_source);
let source = TransformingSource::new(
    inner,
    vec![
        RecordTransform::Flatten { separator: "__".into() },
        RecordTransform::KeysCase { mode: KeyCaseMode::Snake },
        RecordTransform::custom(|mut record| {
            if let serde_json::Value::Object(ref mut map) = record {
                map.insert("_ingested_at".into(), serde_json::json!("2026-05-28T00:00:00Z"));
            }
            record
        }),
    ],
    Labels::for_named("my-source"),
)?;
// `source` is now a `Source` that streams the inner source's pages with
// transforms applied per page, so memory stays bounded by `batch_size` even on
// large result sets.
Transforms compile eagerly inside new() — an invalid regex in RenameKeys
surfaces immediately as FaucetError::Transform, not at first record.
Labels::for_named(name) is the convenient constructor for library callers
(the CLI uses its own Labels carrying the pipeline / row / run-id triple).
The wrapper emits faucet_transform_records_in_total /
faucet_transform_records_out_total (use the out/in ratio for filter drop
rate or explode fan-out), faucet_transform_duration_seconds, and
faucet_transform_errors_total per page through the standard observability
stack.
For configuration-driven users (the faucet binary), transforms are declared
in YAML — see the transforms cookbook for the
three-layer model and per-layer opt-out.
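For intuition about RecordTransform::Flatten { separator }, here is a dependency-free sketch of flattening nested objects into separator-joined keys. It is an assumption-laden reimplementation, not the transform itself: it uses a minimal Value enum, leaves arrays out of scope, and silently drops empty nested objects.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value, to keep the sketch dependency-free.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    Num(f64),
    Object(BTreeMap<String, Value>),
}

/// Hypothetical flatten: {"user": {"id": 1}} with "__" becomes {"user__id": 1}.
/// Non-object values are kept as leaves.
fn flatten(value: Value, separator: &str) -> BTreeMap<String, Value> {
    fn walk(prefix: Option<String>, value: Value, sep: &str, out: &mut BTreeMap<String, Value>) {
        match value {
            Value::Object(map) => {
                for (key, child) in map {
                    let path = match &prefix {
                        Some(p) => format!("{p}{sep}{key}"),
                        None => key,
                    };
                    walk(Some(path), child, sep, out);
                }
            }
            leaf => {
                // A top-level scalar has no key; store it under "" as a fallback.
                out.insert(prefix.unwrap_or_default(), leaf);
            }
        }
    }
    let mut out = BTreeMap::new();
    walk(None, value, separator, &mut out);
    out
}
```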
Durable state and streaming
Wire a state store for resumable runs, and use the streaming entry point when you want to control batching explicitly:
use std::sync::Arc;
use faucet_stream::{Pipeline, FileStateStore};

let state = Arc::new(FileStateStore::new("./state")?);
let result = Pipeline::new(&source, &sink)
    .with_state_store(state)
    .run()
    .await?;
The pipeline reads the bookmark before fetching and persists a new one only after the sink confirms each page — so a crash never loses unwritten data.
Why embed instead of shelling out to the CLI?
- Typed configs: config structs implement serde + JsonSchema, so you get compile-time checking and can generate UIs/forms from the schema.
- Custom connectors: implement the Source / Sink traits for systems we don’t ship, and run them through the same Pipeline. See authoring a connector.
- One process: no subprocess, no temp config files; integrate pipelines into an existing service, job runner, or test harness.
Pagination styles (REST source)
The REST source walks multi-page responses automatically. Set pagination.type
to one of the styles below. max_pages is a hard cap across all of them, and
every style has a loop/termination guard so a misbehaving API can’t loop forever.
| Style | Stops when |
|---|---|
| None | after the first page |
| Cursor | the next-token JSONPath is null/absent (or repeats) |
| PageNumber | a page returns zero records (or an identical body repeats) |
| Offset | the offset reaches total (via total_path) or a short page arrives |
| LinkHeader | there’s no rel="next" in the Link response header |
| NextLinkInBody | the next-page URL in the body is absent, null, or empty |
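The PageNumber row can be read as a loop with three exits. This is a hypothetical sketch: fetch stands in for one HTTP request plus the records_path extraction, and comparing against the previous body is one plausible way to implement the repeat guard, not necessarily faucet's.

```rust
/// Hypothetical PageNumber driver. `fetch(page)` returns the raw body and the
/// records extracted from it. Stops on an empty page, on a body identical to
/// the previous one (loop guard), or after `max_pages` requests.
fn paginate_page_number<F>(start_page: u32, max_pages: u32, mut fetch: F) -> Vec<String>
where
    F: FnMut(u32) -> (String, Vec<String>),
{
    let mut all = Vec::new();
    let mut previous_body: Option<String> = None;
    for i in 0..max_pages {
        let (body, records) = fetch(start_page + i);
        if records.is_empty() || previous_body.as_deref() == Some(body.as_str()) {
            break;
        }
        all.extend(records);
        previous_body = Some(body);
    }
    all
}
```

The max_pages bound on the for loop is what guarantees termination even when neither of the other two exits ever fires.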
Cursor
pagination:
  type: Cursor
  next_token_path: $.meta.next_cursor # JSONPath to the next-page token
  param_name: starting_after # query param to send it back as
Page number
pagination:
  type: PageNumber
  param_name: page
  start_page: 1
  page_size: 500
  page_size_param: per_page
Offset / limit
pagination:
  type: Offset
  limit: 1000
  limit_param: limit
  offset_param: offset
  total_path: $.meta.total # optional; enables an exact stop
Link header
pagination:
  type: LinkHeader # follows the RFC 5988 `Link: <…>; rel="next"` header
Next link in body
pagination:
  type: NextLinkInBody
  next_link_path: $.links.next # JSONPath to the absolute next-page URL
Use faucet schema source rest to see the exact fields and defaults for each style in your installed version.
Authentication
Every connector’s auth: block uses one consistent shape — a type:
discriminator plus a nested config: map:
auth:
  type: <method>
  config:
    <method-specific fields>
Always pull secrets from the environment with ${env:VAR} (or ${file:PATH} /
${secret:VAR}) rather than hard-coding them.
API key / header
auth:
  type: api_key
  config:
    header: Authorization
    value: "Bearer ${env:API_TOKEN}"
Bearer token
auth:
  type: bearer
  config:
    token: ${env:API_TOKEN}
Basic auth
auth:
  type: basic
  config:
    username: ${env:API_USER}
    password: ${env:API_PASS}
OAuth2 client credentials
The source fetches and refreshes the token automatically (before expiry):
auth:
  type: oauth2
  config:
    token_url: https://auth.example.com/oauth/token
    client_id: ${env:CLIENT_ID}
    client_secret: ${env:CLIENT_SECRET}
    scopes: ["read:events"]
Custom token endpoint
For non-standard token endpoints, token_endpoint lets you describe the request
and point at the access-token and expiry fields in the response. See
faucet schema source rest for the full field list.
Shared auth providers (auth: { ref })
When several connectors authenticate against the same system — e.g. four
matrix rows reading four endpoints of one API, or four Snowflake tables — define
the credential once in the top-level auth: catalog and reference it with
auth: { ref: <name> }. faucet builds a single provider and shares it across
every row, so there is one token fetch and one refresh cycle
(single-flight) instead of each row racing to refresh a single-active / rotating
token:
auth:
  api:
    type: oauth2_refresh # rotating refresh token captured centrally
    config:
      token_url: ${env:API_TOKEN_URL}
      client_id: ${secret:API_CLIENT_ID}
      client_secret: ${secret:API_CLIENT_SECRET}
      refresh_token: ${secret:API_REFRESH_TOKEN}
pipeline:
  sources:
    ep:
      type: rest
      config:
        base_url: ${env:API_BASE_URL}
        auth: { ref: api } # every row sharing this template shares ONE token
  sink: { type: stdout, config: {} }
matrix:
  - { id: customers, source: { ref: ep, config: { path: /customers } } }
  - { id: orders, source: { ref: ep, config: { path: /orders } } }
Provider type: values (catalog only): static, oauth2 (client-credentials),
oauth2_refresh (with rotation), token_endpoint. A connector’s auth: is
either an inline definition or a { ref } — never both. See
cli/examples/shared_auth_rest.yaml for a full four-row example.
Shared providers are supported by the bearer/header-based connectors (rest, graphql, xml, grpc, websocket, http sink, elasticsearch, snowflake-OAuth).
Library use: build one faucet_auth provider, wrap it in an Arc, and pass
it to each source/sink with .with_auth_provider(provider.clone()).
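The single-flight behavior can be illustrated with std alone. SharedToken is a hypothetical stand-in, not the faucet_auth API: one cached token behind a Mutex, refreshed when it is missing or within skew of expiry, and shared through an Arc so concurrent callers trigger exactly one fetch. The one-hour token lifetime is an arbitrary placeholder.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical shared token provider (not the faucet_auth API).
/// Callers serialize on the lock, so only the first one refreshes.
struct SharedToken {
    state: Mutex<Option<(String, Instant)>>, // (token, expires_at)
    refreshes: Mutex<u32>,                   // counts token fetches, for the demo
    skew: Duration,                          // refresh this long before expiry
}

impl SharedToken {
    fn new(skew: Duration) -> Arc<Self> {
        Arc::new(Self { state: Mutex::new(None), refreshes: Mutex::new(0), skew })
    }

    fn token(&self) -> String {
        let mut state = self.state.lock().unwrap();
        let needs_refresh = match &*state {
            Some((_, expires_at)) => Instant::now() + self.skew >= *expires_at,
            None => true,
        };
        if needs_refresh {
            // Stand-in for the HTTP call to the token endpoint.
            let mut n = self.refreshes.lock().unwrap();
            *n += 1;
            let token = format!("token-{}", *n);
            *state = Some((token, Instant::now() + Duration::from_secs(3600)));
        }
        state.as_ref().unwrap().0.clone()
    }
}
```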
Connector-specific inline auth
Each connector also has its own inline auth methods, all under the auth: key
and all in { type, config } form:
- BigQuery: service_account_key_path, service_account_key (inline JSON), or application_default.
- Snowflake: key_pair (JWT) or oauth.
- Kafka: sasl_plain / sasl_scram / ssl / sasl_ssl.
- Elasticsearch: basic, api_key, bearer, or none.
- GCS: service_account_json_file, service_account_json_inline, application_default, or anonymous.
Inspect any connector’s auth shape with faucet schema source <name> /
faucet schema sink <name>.
Secret interpolation
${env:VAR} and ${file:PATH} are resolved at config-load time, so secrets
never need to appear in the file. A sibling .env is loaded automatically (use
--no-env-file to disable, or --env-file PATH to point elsewhere).
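Load-time interpolation amounts to a string scan. interpolate_env is a hypothetical helper covering only ${env:VAR}; it takes the variable lookup as a closure so it can be exercised without touching the process environment (pass |name| std::env::var(name).ok() in real code). Failing on an unset variable is this sketch's choice, not documented faucet behavior.

```rust
/// Hypothetical: resolve `${env:VAR}` placeholders via `lookup`.
/// `${file:PATH}`, `${secret:VAR}`, and escaping are not handled here.
fn interpolate_env(input: &str, lookup: impl Fn(&str) -> Option<String>) -> Result<String, String> {
    const OPEN: &str = "${env:";
    let mut out = String::new();
    let mut rest = input;
    while let Some(start) = rest.find(OPEN) {
        out.push_str(&rest[..start]);
        let after = &rest[start + OPEN.len()..];
        let end = after
            .find('}')
            .ok_or_else(|| format!("unterminated placeholder in {input:?}"))?;
        let name = &after[..end];
        let value = lookup(name).ok_or_else(|| format!("environment variable {name} is not set"))?;
        out.push_str(&value);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}
```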
Incremental replication & state
For pipelines that run repeatedly, you usually want to fetch only what’s new. That requires two things: an incremental replication method on the source and a state store to persist the bookmark between runs.
Replication methods
- FullTable: fetch everything every run.
- Incremental: track a high-water mark on a cursor_field (e.g. updated_at, an auto-increment id) and only emit records past the last seen value.
source:
type: rest
config:
# …
replication_method:
type: Incremental
cursor_field: updated_at
primary_keys: [id]
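The Incremental method boils down to a high-water-mark filter. incremental_filter is hypothetical: it keeps records whose cursor is strictly greater than the bookmark and returns the maximum cursor seen as the next bookmark. It compares cursor values as strings, which is only correct for same-format, same-offset ISO 8601 timestamps; whether faucet compares with > or >= is not stated here.

```rust
/// Hypothetical high-water-mark filter over (id, updated_at) records.
fn incremental_filter<'a>(
    records: &'a [(u32, &'a str)],
    bookmark: Option<&str>,
) -> (Vec<&'a (u32, &'a str)>, Option<String>) {
    // Keep only records past the stored bookmark (all of them on a first run).
    let fresh: Vec<&'a (u32, &'a str)> = records
        .iter()
        .filter(|(_, cursor)| bookmark.map_or(true, |b| *cursor > b))
        .collect();
    // The next bookmark is the largest cursor seen, or the old one if nothing is new.
    let new_bookmark = fresh
        .iter()
        .map(|(_, cursor)| *cursor)
        .max()
        .map(str::to_string)
        .or_else(|| bookmark.map(str::to_string));
    (fresh, new_bookmark)
}
```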
State stores
Attach a state: block so the bookmark survives between runs:
state:
  type: file # built into faucet-core
  config:
    path: ./state
Available backends:
| Backend | Crate | Use when |
|---|---|---|
| memory | faucet-core | tests, one-shot runs (not persistent) |
| file | faucet-core | single host; one JSON file per key, atomic writes |
| redis | faucet-state-redis | shared/ephemeral state across hosts |
| postgres | faucet-state-postgres | shared, durable, transactional state |
# Redis
state:
  type: redis
  config:
    connection_url: redis://localhost:6379
    namespace: faucet
# Postgres
state:
  type: postgres
  config:
    connection_url: postgres://user:pass@localhost/faucet
How bookmarks advance
The pipeline reads the bookmark before fetching, and persists a new one only after the sink confirms the page. Most sources emit a bookmark on the final page; CDC-style sources emit one per committed transaction and get per-transaction durability automatically. Either way, a crash can never advance the bookmark past data that wasn’t written — the next run re-fetches from the last confirmed point.
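The commit order can be sketched in a few lines. Page and run_pages are hypothetical, and the state store is reduced to an Option<u64>; the only thing the sketch demonstrates is that a write error returns before the bookmark is touched.

```rust
/// Hypothetical page: records plus an optional bookmark (e.g. emitted only on
/// the final page, or once per committed transaction for CDC-style sources).
struct Page {
    records: Vec<String>,
    bookmark: Option<u64>,
}

/// Write each page, and persist its bookmark only after the write succeeds.
fn run_pages(
    pages: Vec<Page>,
    mut write: impl FnMut(&[String]) -> Result<(), String>,
    stored_bookmark: &mut Option<u64>,
) -> Result<usize, String> {
    let mut written = 0;
    for page in pages {
        write(&page.records)?; // on error, return before touching the bookmark
        written += page.records.len();
        if let Some(b) = page.bookmark {
            *stored_bookmark = Some(b); // "persist" only after confirmation
        }
    }
    Ok(written)
}
```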
State keys
Each invocation has a state key so concurrent matrix rows don’t collide:
{name}::{row_id} for roots and {name}::{row_id}::{parent_record_key} for DAG
children. The CDC source uses postgres-cdc:<slot>.
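The key layouts, written out as code. These helpers are hypothetical and only restate the formats above; passing the parent record's parent_key value (for example a user id) as parent_record_key is an assumption.

```rust
/// `{name}::{row_id}` for roots, `{name}::{row_id}::{parent_record_key}` for DAG children.
fn state_key(name: &str, row_id: &str, parent_record_key: Option<&str>) -> String {
    match parent_record_key {
        Some(parent) => format!("{name}::{row_id}::{parent}"),
        None => format!("{name}::{row_id}"),
    }
}

/// The CDC source's key: `postgres-cdc:<slot>` (single colon).
fn cdc_state_key(slot: &str) -> String {
    format!("postgres-cdc:{slot}")
}
```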
Dead-letter queues
A dead-letter queue (DLQ) keeps a pipeline running when a handful of records fail to write, instead of aborting the whole run. Failing rows are wrapped in a fixed-shape envelope and routed to a separate DLQ sink before the page’s bookmark advances.
When it helps
Sinks whose underlying API reports per-row results — BigQuery insertAll,
Elasticsearch _bulk — can tell exactly which records failed. The DLQ captures
just those, while the good rows commit normally.
Configure a DLQ
Add a dlq: block naming a sink to receive the bad rows and the policy for
sinks that can’t report per-row outcomes:
pipeline:
  source: { type: rest, config: { /* … */ } }
  sink: { type: bigquery, config: { /* … */ } }
  dlq:
    on_batch_error: dlq_all # or `propagate`
    sink:
      type: jsonl
      config:
        path: ./dead-letters.jsonl
The envelope
Each dead-lettered record is wrapped with metadata — the original record, the reason it failed, and context — so you can inspect, fix, and replay it later.
on_batch_error policy
For a sink that can only succeed or fail a whole batch (no per-row detail):
- propagate: a batch failure aborts the run (the default, fail-fast behavior).
- dlq_all: route every row in the failed batch to the DLQ and keep going.
Sinks that do report per-row results (BigQuery, Elasticsearch, and the HTTP
sink in Individual mode) override the partial-write path so only the genuinely
failed rows are dead-lettered — the already-delivered rows are not duplicated
into the DLQ.
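The routing decision per page can be modeled as below. WriteOutcome, OnBatchError, and route_to_dlq are hypothetical names, not faucet_core::dlq types, and envelope wrapping is omitted: per-row failures dead-letter only the failed rows, while a whole-batch failure either aborts (propagate) or dead-letters every row (dlq_all).

```rust
/// Hypothetical outcome of writing one page.
#[derive(Debug, PartialEq)]
enum WriteOutcome {
    /// Sink reported per-row results: indices of rows that failed, with reasons.
    PerRow(Vec<(usize, String)>),
    /// Sink could only fail the whole batch.
    BatchFailed(String),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum OnBatchError {
    Propagate,
    DlqAll,
}

/// Returns the (row, reason) pairs to dead-letter, or an error that aborts the run.
/// Panics if a per-row index is out of range (acceptable for a sketch).
fn route_to_dlq<'a>(
    page: &[&'a str],
    outcome: WriteOutcome,
    policy: OnBatchError,
) -> Result<Vec<(&'a str, String)>, String> {
    match outcome {
        // Only the genuinely failed rows; delivered rows are not duplicated.
        WriteOutcome::PerRow(failures) => Ok(failures
            .into_iter()
            .map(|(i, reason)| (page[i], reason))
            .collect()),
        WriteOutcome::BatchFailed(err) => match policy {
            OnBatchError::Propagate => Err(err),
            OnBatchError::DlqAll => Ok(page.iter().map(|row| (*row, err.clone())).collect()),
        },
    }
}
```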
Failure budgets
A DLQ keeps a run going through occasional bad rows, but a flood of failures usually means something is broken upstream. Two optional budgets turn the DLQ into a circuit breaker:
dlq:
  sink: { type: jsonl, config: { path: ./dead-letters.jsonl } }
  max_failures_per_page: 50 # abort if a single page dead-letters > 50 rows
  max_failures_total: 500 # abort once the run has dead-lettered > 500 rows
When a budget trips, the run aborts — but only after the page that crossed the threshold is fully committed: its surviving rows are written to the main sink, its failed rows are routed to the DLQ, and (if the page carried one) the bookmark advances. So the committed survivors are not re-delivered when you fix the upstream problem and re-run, and the failed rows are preserved in the DLQ for replay rather than dropped. The run still stops, so you get alerted.
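A budget check that respects this ordering might look like the following. FailureBudget is hypothetical; the caller is assumed to commit the page (main sink, DLQ, bookmark) before calling after_page_committed, and both limits are strict, matching the > in the config comments above.

```rust
/// Hypothetical tracker mirroring `max_failures_per_page` / `max_failures_total`.
struct FailureBudget {
    max_per_page: Option<usize>,
    max_total: Option<usize>,
    total: usize,
}

impl FailureBudget {
    /// Record one already-committed page's dead-letter count; `Err` means abort the run.
    fn after_page_committed(&mut self, dead_lettered: usize) -> Result<(), String> {
        self.total += dead_lettered;
        if let Some(max) = self.max_per_page {
            if dead_lettered > max {
                return Err(format!("page dead-lettered {dead_lettered} rows (> {max})"));
            }
        }
        if let Some(max) = self.max_total {
            if self.total > max {
                return Err(format!("run dead-lettered {} rows (> {max})", self.total));
            }
        }
        Ok(())
    }
}
```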
The full design is in docs/superpowers/specs/2026-05-24-dlq-design.md and the faucet_core::dlq module on docs.rs.
Data-quality checks
Add a quality: block under pipeline: to assert invariants on every page of
records as they flow through the pipeline. The quality pass runs after
transforms and before the sink write:
- Per-record checks partition the page into survivors and quarantined rows (first-failure-wins per record).
- Per-batch checks run over the survivors.
- Quarantined rows are routed to the DLQ sink; survivors flow to the main sink.
- The page bookmark advances only after the sink confirms; an abort never commits partial progress.
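The per-record partition step can be sketched as below. Record, Check, and partition are hypothetical, records are reduced to an (id, email) tuple, the @ test is a crude stand-in for regex_match, and the abort policy is not modeled; the sketch only shows first-failure-wins partitioning into survivors and quarantined rows.

```rust
/// Hypothetical record shape: (id, email).
type Record = (Option<u32>, Option<String>);

/// Hypothetical per-record check: a name plus a predicate.
struct Check {
    name: &'static str,
    passes: fn(&Record) -> bool,
}

/// Checks run in declared order; the first failing check is the one recorded.
fn partition(page: Vec<Record>, checks: &[Check]) -> (Vec<Record>, Vec<(Record, &'static str)>) {
    let mut survivors = Vec::new();
    let mut quarantined = Vec::new();
    for record in page {
        match checks.iter().find(|c| !(c.passes)(&record)) {
            Some(failed) => quarantined.push((record, failed.name)),
            None => survivors.push(record),
        }
    }
    (survivors, quarantined)
}
```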
Quality checks require the quality Cargo feature (included in full and in
faucet-cli’s default build). The json_schema check additionally requires
quality-jsonschema.
Full example
The following config fetches users from a REST API, normalizes keys to snake_case, and enforces several quality invariants before writing survivors to PostgreSQL. Quarantined rows land in a local JSONL file.
# rest_to_postgres_with_quality.yaml
version: 1
name: users_api_to_postgres_with_quality
pipeline:
source:
type: rest
config:
base_url: https://api.example.com/v1
path: /users
method: GET
auth:
type: bearer
config:
token: ${env:API_TOKEN}
query_params:
per_page: "100"
pagination:
type: Cursor
next_token_path: $.meta.next_cursor
param_name: cursor
max_retries: 3
retry_backoff: 2
tolerated_http_errors: []
replication_method:
type: Incremental
replication_key: updated_at
primary_keys: ["id"]
partitions: []
schema_sample_size: 100
state_key: users_api:users
transforms:
- type: keys_case
config: { mode: snake }
quality:
record:
- type: not_null
field: id
on_failure: abort # abort: a null id is a catastrophic upstream bug
- type: not_null
field: email
on_failure: quarantine # quarantine: route bad rows to the DLQ
- type: regex_match
field: email
pattern: '^[^@\s]+@[^@\s]+\.[^@\s]+$'
on_failure: quarantine
- type: value_in_set
field: status
values: ["active", "inactive", "pending", "suspended"]
on_failure: quarantine
- type: compare
field: age
op: gte
value: 0
on_failure: quarantine
batch:
- type: row_count
min: 1
on_failure: abort # empty pages indicate a misconfigured source
- type: unique
fields: [id]
on_failure: quarantine # route duplicate ids to the DLQ
dlq:
sink:
type: jsonl
config:
path: ./dlq/users_quality_failures.jsonl
on_batch_error: propagate
max_failures_per_page: 50
max_failures_total: 500
sink:
type: postgres
config:
connection_url: ${env:PG_URL}
table_name: users
column_mapping:
type: jsonb
column: data
batch_size: 500
max_connections: 5
state:
type: file
config:
path: ./.faucet-state
Check catalog
Per-record checks
Evaluated in declared order; first failure wins for a given record.
on_failure may be quarantine (route the row to the DLQ) or abort (raise
FaucetError::QualityFailure and stop the run immediately).
| Check | Key fields | Passes when | Missing field |
|---|---|---|---|
| not_null | field, treat_missing_as_null (default true) | value present and non-null | fail (pass iff treat_missing_as_null: false) |
| not_empty | field | value is a non-empty string after trimming whitespace | fail |
| regex_match | field, pattern | value is a string matching pattern | fail |
| value_in_set | field, values: [...] | value is in the allowed set (exact JSON equality) | fail |
| not_in_set | field, values: [...] | value is NOT in the forbidden set | pass (trivially not in set) |
| compare | field, op, value | ordering or equality holds (see below) | fail |
| type_is | field, expected | JSON type of the value matches expected | fail |
| string_length | field, min?, max? | char count in [min, max] (at least one bound required) | fail |
| json_schema | schema | whole record validates against a JSON Schema document | (whole-record check) |
compare operators: gt, gte, lt, lte require both the field value and
the configured value to be JSON numbers (compared as f64); eq and ne do
exact JSON equality with no type coercion.
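Those operator rules, written out. The Value enum and compare function are hypothetical; note that this model stores every number as f64, so unlike exact JSON equality it cannot tell 1 from 1.0.

```rust
/// Minimal JSON-like value for the sketch.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
}

#[derive(Clone, Copy)]
enum Op { Gt, Gte, Lt, Lte, Eq, Ne }

/// Ordering operators need both sides to be numbers (compared as f64);
/// eq/ne use exact equality with no coercion. A missing field fails.
fn compare(field: Option<&Value>, op: Op, expected: &Value) -> bool {
    let Some(actual) = field else { return false };
    match op {
        Op::Eq => actual == expected,
        Op::Ne => actual != expected,
        _ => match (actual, expected) {
            (Value::Num(a), Value::Num(b)) => match op {
                Op::Gt => a > b,
                Op::Gte => a >= b,
                Op::Lt => a < b,
                Op::Lte => a <= b,
                Op::Eq | Op::Ne => unreachable!(),
            },
            _ => false, // non-numeric operand for an ordering operator
        },
    }
}
```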
json_schema requires the quality-jsonschema Cargo feature. It is the most
expressive check; its cost scales with schema complexity — for very large or deeply
nested schemas on hot paths, prefer the granular checks above and benchmark your
case.
Per-batch checks
Evaluated per page over the survivors (records that passed all per-record
checks). Aggregate checks (row_count, null_rate, distinct_count) are not
row-attributable, so they offer quarantine_batch (route all survivors to the DLQ,
write nothing this page) or abort. unique is row-attributable and accepts
quarantine (route the duplicate rows) or abort.
| Check | Key fields | Passes when |
|---|---|---|
| row_count | min?, max? (at least one required) | survivor count in [min, max] |
| null_rate | field, max (0.0–1.0) | null-or-missing rate ≤ max; zero survivors → 0.0 → pass |
| unique | fields: [...] (composite key) | every survivor’s composite key is unique within the page |
| distinct_count | field, min?, max? | distinct values of field in [min, max] |
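To show why null_rate is aggregate while unique is row-attributable, here is a small Rust sketch over one page of survivors. None models a null-or-missing value. The function names are hypothetical. Which occurrence of a duplicated key counts as "the duplicate" is not specified above, so flagging every repeat after the first is an assumption.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// null_rate over a page's survivors; None models null-or-missing.
/// Zero survivors yields 0.0, so the check passes for any max.
pub fn null_rate<T>(values: &[Option<T>]) -> f64 {
    if values.is_empty() {
        return 0.0;
    }
    let nulls = values.iter().filter(|v| v.is_none()).count();
    nulls as f64 / values.len() as f64
}

pub fn null_rate_passes<T>(values: &[Option<T>], max: f64) -> bool {
    null_rate(values) <= max
}

/// unique: indices of rows whose composite key already appeared earlier in the page.
/// Assumption: the first occurrence is kept and each later repeat is flagged.
pub fn duplicate_rows<K: Eq + Hash>(keys: &[K]) -> Vec<usize> {
    let mut seen = HashSet::new();
    keys.iter()
        .enumerate()
        .filter(|(_, key)| !seen.insert(*key))
        .map(|(i, _)| i)
        .collect()
}
```

null_rate yields a single number for the whole page, so no specific row can be blamed (hence quarantine_batch). duplicate_rows yields row indices, which is what makes quarantine possible for unique.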
Failure policies
| Policy | Meaning | Allowed on |
|---|---|---|
| quarantine | Route the specific offending row(s) to the DLQ; keep the rest as survivors | per-record checks; unique |
| quarantine_batch | Route all current survivors of the page to the DLQ; nothing written this page | aggregate batch checks (row_count, null_rate, distinct_count) |
| abort | Raise FaucetError::QualityFailure and stop the run | every check |
DLQ requirement
Any check that uses quarantine or quarantine_batch requires a dlq: block.
Omitting it fails validation with an error explaining that a dlq: block is
required (faucet validate catches this before the run starts; the core
guards it again at run start).
See the Dead-letter queues cookbook page for dlq: options.
Observability
The quality pass emits faucet_quality_* metrics automatically:
- faucet_quality_checks_total{pipeline,row,check,outcome=pass|fail}
- faucet_quality_records_quarantined_total{pipeline,row,check,field}
- faucet_quality_aborts_total{pipeline,row,check}
- faucet_quality_check_duration_seconds{check}
These are available alongside the standard faucet_source_*, faucet_sink_*, and
faucet_transform_* metrics. See Observability
for the full metrics reference.
Validate the config
faucet validate rest_to_postgres_with_quality.yaml
# ok: 'users_api_to_postgres_with_quality' rows=1 (roots=1, children=0)
faucet schema quality prints the full JSON Schema for the quality: block, and
faucet list shows all available checks with descriptions.
Compression
The file-shaped connectors can read and write gzip / zstd transparently. Enable
the compression feature, then set a compression: field on the connector.
Enable the feature
# CLI
cargo install faucet-cli --features compression
# Library (umbrella) — activates compression on whichever file connectors you've enabled
faucet-stream = { version = "1.0", features = ["sink-jsonl", "source-csv", "compression"] }
The compression aggregate feature forwards to whichever of the supported
connectors you’ve already opted into; it doesn’t pull in connectors by itself.
full includes compression.
Connectors that support it
source-csv, source-s3, source-gcs, sink-jsonl, sink-csv, sink-s3,
sink-gcs.
Config
sink:
type: jsonl
config:
path: ./out/records.jsonl.gz
compression: auto # none | gzip | zstd | auto (default)
- auto chooses from the filename suffix: .gz → gzip, .zst → zstd, anything else → none.
- Explicit gzip / zstd / none override the suffix.
Auto-detection runs per file at I/O time, so one matrix run can read a mix of
.jsonl, .jsonl.gz, and .jsonl.zst objects.
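The suffix rule is small enough to state as a pure function. Codec, Setting, and resolve_codec are hypothetical names used only for illustration, not faucet's API. The sketch matches suffixes case-sensitively, which is an assumption.

```rust
/// Codec actually used for one file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Codec {
    None,
    Gzip,
    Zstd,
}

/// The value of the compression: field.
#[derive(Debug, Clone, Copy)]
pub enum Setting {
    None,
    Gzip,
    Zstd,
    Auto,
}

/// Explicit settings win; auto looks at the file name (per file, at I/O time).
pub fn resolve_codec(setting: Setting, path: &str) -> Codec {
    match setting {
        Setting::None => Codec::None,
        Setting::Gzip => Codec::Gzip,
        Setting::Zstd => Codec::Zstd,
        Setting::Auto if path.ends_with(".gz") => Codec::Gzip,
        Setting::Auto if path.ends_with(".zst") => Codec::Zstd,
        Setting::Auto => Codec::None,
    }
}
```

Resolving per path rather than per connector is what allows one run to mix plain, gzip, and zstd objects.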
Notes
- File sinks finalize the encoder on flush(); later writes reopen in append mode, producing a multi-member compressed file that gzip/zstd decoders read back transparently.
- S3 and GCS sinks do not set a Content-Encoding header; consumers must decompress explicitly.
- Parquet, Kafka, HTTP, stdout, and the database sinks are intentionally out of scope: Parquet has internal columnar compression and the others have native protocol-level options.
Record transforms
A pipeline’s transforms: list is a sequence of pure Fn(Value) -> Value
steps run on every record between source and sink. Each transform is a
small, declarative reshape — pick the ones you need, list them in the
order you want them to run, and the CLI wires them up for you.
This page is a tour of the standard transforms exposed in YAML. All of
them are listed in faucet list and dispatchable as type: values.
At a glance
| Kind | Purpose | Shape |
|---|---|---|
| flatten | Collapse nested objects to a flat record | separator |
| rename_keys | Regex rename of every key, recursively | pattern, replacement |
| keys_case | Re-case every key (snake / camel / pascal / kebab / screaming_snake) | mode |
| spell_symbols | Spell out symbols in keys (% → percent, # → number, …) | extra, separator |
| select | Keep only listed top-level fields | fields: [..] |
| drop | Remove listed top-level fields | fields: [..] |
| set | Add or overwrite top-level fields with constants | values: {k: v, ..} |
| rename_field | Exact-name rename (vs. regex) | fields: {from: to, ..} |
| cast | Coerce per-field types | fields: {name: type}, on_error |
| redact | Replace listed field values with a mask | fields: [..], mask |
| value_case | Lowercase / uppercase / trim string values | fields: [..], mode |
The field-targeting transforms (select, drop, set, rename_field,
cast, redact, value_case) act on top-level fields only —
dotted paths into nested objects are intentionally out of scope. If you
need to reach a nested field, run flatten first, then operate on the
flattened key.
Missing fields are silently skipped. None of the field-selection
transforms introduce a null for a name that wasn’t already on the
record.
A full example
The runnable file is at cli/examples/rest_to_stdout_transforms.yaml:
pipeline:
source:
type: rest
config: { ... }
transforms:
- type: flatten
config: { separator: "__" }
- type: select
config:
fields: [id, name, email, address__city, company__name]
- type: rename_field
config:
fields:
address__city: city
company__name: company
- type: value_case
config:
fields: [email]
mode: lower
- type: cast
config:
fields: { id: string }
on_error: error
- type: redact
config:
fields: [phone]
mask: "[redacted]"
- type: set
config:
values:
_source: jsonplaceholder
_ingested_at: "2026-01-01T00:00:00Z"
sink:
type: stdout
config: { format: json_lines }
Run it:
faucet run cli/examples/rest_to_stdout_transforms.yaml | jq .
The order matters: flatten runs first so that select can reference
address__city; rename_field runs after select so it only has to
rename keys that survived; cast runs before set so the stamped
_source field is left untouched.
Declaration layers
Transforms can be declared at three layers in a config. The executor resolves them per matrix row by concatenating contributions in lifecycle order — pipeline first, then source template, then row:
final = T_pipeline ++ T_source ++ T_row
| Layer | Lives at | Intent |
|---|---|---|
| Pipeline | pipeline.transforms | cross-cutting policy (PII redaction, provenance stamp) |
| Source template | pipeline.sources.<name>.transforms | cleanup tied to the source’s natural emission shape |
| Matrix row | matrix[i].transforms | row-specific extras or one-off shaping |
Each layer is optional. Empty layers contribute nothing.
pipeline:
transforms: # T_pipeline (runs first)
- { type: set, config: { values: { _ingested_at: "${env:NOW}" } } }
sources:
users_api:
type: rest
transforms: # T_source
- { type: flatten, config: { separator: "__" } }
- { type: keys_case, config: { mode: snake } }
matrix:
- id: users_pii
source: { ref: users_api }
transforms: # T_row (runs last)
- { type: redact, config: { fields: [email], mask: "[pii]" } }
# final = [set, flatten, keys_case, redact]
Opting out: inherit_transforms: false
Each layer that introduces transforms (source template, matrix row) carries
a sibling boolean field inherit_transforms, default true. Set to false,
it drops every layer declared above it.
| source.inherit_transforms | row.inherit_transforms | Final list |
|---|---|---|
| true (default) | true (default) | T_pipeline ++ T_source ++ T_row |
| false | true | T_source ++ T_row |
| true | false | T_row |
| false | false | T_row |
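The table reduces to two nested conditions. Here is a generic Rust sketch in which resolve_transforms is a hypothetical helper and T is whatever represents one transform spec:

```rust
/// Resolve the final transform list for one matrix row.
/// Hypothetical helper mirroring the inherit_transforms table.
pub fn resolve_transforms<T: Clone>(
    t_pipeline: &[T],
    t_source: &[T],
    source_inherit: bool,
    t_row: &[T],
    row_inherit: bool,
) -> Vec<T> {
    let mut out = Vec::new();
    // row.inherit_transforms: false drops every layer above the row.
    if row_inherit {
        // source.inherit_transforms: false drops only the pipeline layer.
        if source_inherit {
            out.extend_from_slice(t_pipeline);
        }
        out.extend_from_slice(t_source);
    }
    out.extend_from_slice(t_row);
    out
}
```

Called with ["set"], ["flatten", "keys_case"], and ["redact"] and both flags true, it reproduces the [set, flatten, keys_case, redact] order from the layered example.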
Use this for debug rows that need raw records, or for a source whose natural shape is already canonical and shouldn’t be touched by global policy:
matrix:
- id: forensic_row
source: { ref: users_api }
inherit_transforms: false # ← drops T_pipeline AND T_source
transforms:
- { type: select, config: { fields: [id, raw_payload] } }
# final = [select]
Sinks reject both transforms: and inherit_transforms:. Destination shaping
belongs at the pipeline or row layer.
Reusing transform lists across sources
Use YAML anchors:
pipeline:
sources:
users_api:
type: rest
transforms: &user_cleanup
- { type: flatten, config: { separator: "__" } }
- { type: keys_case, config: { mode: snake } }
archived_users_api:
type: rest
transforms: *user_cleanup
No grammar extension needed — the YAML parser expands anchors before the
config reaches faucet.
keys_case — pick the output convention
- type: keys_case
config:
mode: snake # | camel | pascal | kebab | screaming_snake
The tokeniser splits each key on whitespace, _, -, dropped
punctuation, and lower→upper transitions (so "firstName" and
"first_name" and "first-name" all tokenise the same), then re-joins
in the requested style:
| Input | snake | camel | pascal | kebab | screaming_snake |
|---|---|---|---|---|---|
| "First Name" | first_name | firstName | FirstName | first-name | FIRST_NAME |
| "last-name" | last_name | lastName | LastName | last-name | LAST_NAME |
| "camelCase" | camel_case | camelCase | CamelCase | camel-case | CAMEL_CASE |
| "ID" | id | id | Id | id | ID |
Two distinct keys that re-case to the same name error rather than
silently overwriting (same collision rule as flatten and
spell_symbols). An all-symbol key ("!@#") tokenises to nothing and
is kept as-is to avoid producing a blank key.
Multi-char uppercase runs are left as one token: "XMLParser" →
["XMLParser"] → xmlparser (snake). If you need them split, normalise
with rename_keys first.
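The tokenise-then-rejoin approach can be sketched in Rust as follows. It reproduces the table above, the "XMLParser" behaviour, the all-symbol rule, and the collision rule. It is an illustration, not faucet's implementation; the function names and the error text are hypothetical, and digit handling is left unspecified.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy)]
pub enum Mode { Snake, Camel, Pascal, Kebab, ScreamingSnake }

/// Split on non-alphanumeric chars and on lower→upper transitions.
/// Uppercase runs ("XML") stay inside one token.
pub fn tokenise(key: &str) -> Vec<String> {
    let mut tokens = Vec::new();
    let mut cur = String::new();
    let mut prev_lower = false;
    for c in key.chars() {
        if !c.is_alphanumeric() {
            // whitespace, '_', '-', and other punctuation end the current token
            if !cur.is_empty() {
                tokens.push(std::mem::take(&mut cur));
            }
            prev_lower = false;
            continue;
        }
        if c.is_uppercase() && prev_lower {
            tokens.push(std::mem::take(&mut cur)); // "firstName" → first | Name
        }
        cur.push(c);
        prev_lower = c.is_lowercase();
    }
    if !cur.is_empty() {
        tokens.push(cur);
    }
    tokens
}

/// First char upper, rest lower ("ID" → "Id").
fn capitalise(token: &str) -> String {
    let lower = token.to_lowercase();
    let mut chars = lower.chars();
    match chars.next() {
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}

pub fn recase(key: &str, mode: Mode) -> String {
    let tokens = tokenise(key);
    if tokens.is_empty() {
        return key.to_string(); // all-symbol key: keep as-is
    }
    let lower: Vec<String> = tokens.iter().map(|t| t.to_lowercase()).collect();
    match mode {
        Mode::Snake => lower.join("_"),
        Mode::Kebab => lower.join("-"),
        Mode::ScreamingSnake => lower.join("_").to_uppercase(),
        Mode::Camel => tokens
            .iter()
            .enumerate()
            .map(|(i, t)| if i == 0 { t.to_lowercase() } else { capitalise(t) })
            .collect(),
        Mode::Pascal => tokens.iter().map(|t| capitalise(t)).collect(),
    }
}

/// Re-case a record's keys, erroring when two distinct keys collide.
pub fn recase_keys(keys: &[&str], mode: Mode) -> Result<Vec<String>, String> {
    let mut seen = HashSet::new();
    let mut out = Vec::with_capacity(keys.len());
    for key in keys {
        let new_key = recase(key, mode);
        if !seen.insert(new_key.clone()) {
            return Err(format!("keys_case produced duplicate key '{new_key}'"));
        }
        out.push(new_key);
    }
    Ok(out)
}
```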
spell_symbols — symbols → words in keys
- type: spell_symbols
config:
extra:
"©": copyright
"<=": lte
separator: " " # default
The default map covers the common ASCII symbols:
% → percent, # → number, $ → dollar, & → and, @ → at, + → plus, * → star, = → equals, < → lt, > → gt, / → slash, \ → backslash, | → pipe, ^ → caret, ~ → tilde.
User entries in extra are merged on top of the defaults (an override
with the same key wins). Replacements are sorted longest-first, so
"<=" beats "<" when both are present.
Each replacement is surrounded by separator (default " ") so a
chained keys_case cleanly picks up the word boundary:
transforms:
- type: spell_symbols
- type: keys_case
config: { mode: snake }
turns "% sold" → " percent sold" → "percent_sold".
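The merge-then-longest-first rule can be sketched as a left-to-right scan that tries longer symbols before shorter ones at each position. spell_symbols here is a hypothetical standalone function over one key, not faucet's code. Length is measured in chars, which is an assumption.

```rust
/// Default symbol map from the list above.
const DEFAULTS: [(&str, &str); 15] = [
    ("%", "percent"), ("#", "number"), ("$", "dollar"), ("&", "and"), ("@", "at"),
    ("+", "plus"), ("*", "star"), ("=", "equals"), ("<", "lt"), (">", "gt"),
    ("/", "slash"), ("\\", "backslash"), ("|", "pipe"), ("^", "caret"), ("~", "tilde"),
];

pub fn spell_symbols(key: &str, extra: &[(&str, &str)], sep: &str) -> String {
    // Merge user entries on top of the defaults; a same-symbol entry overrides.
    let mut map: Vec<(String, String)> = DEFAULTS
        .iter()
        .map(|(s, w)| (s.to_string(), w.to_string()))
        .collect();
    for (sym, word) in extra {
        match map.iter_mut().find(|entry| entry.0 == *sym) {
            Some(entry) => entry.1 = word.to_string(),
            None => map.push((sym.to_string(), word.to_string())),
        }
    }
    // Longest symbol first, so "<=" wins over "<".
    map.sort_by(|a, b| b.0.chars().count().cmp(&a.0.chars().count()));

    let mut out = String::new();
    let mut rest = key;
    'scan: while !rest.is_empty() {
        for (sym, word) in &map {
            if rest.starts_with(sym.as_str()) {
                // Surround the word with the separator to mark a word boundary.
                out.push_str(sep);
                out.push_str(word);
                out.push_str(sep);
                rest = &rest[sym.len()..];
                continue 'scan;
            }
        }
        // No symbol at this position: copy one character through unchanged.
        let c = rest.chars().next().unwrap();
        out.push(c);
        rest = &rest[c.len_utf8()..];
    }
    out
}
```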
select vs. drop
- type: select
config:
fields: [id, email]
Listed fields are kept; everything else is dropped.
- type: drop
config:
fields: [password, ssn]
Listed fields are removed; everything else is kept. Use select when
the schema is fixed and you want to defend against the source adding
new fields you don’t want; use drop for targeted PII / secret
removal.
set — constant stamps
- type: set
config:
values:
_source: my-api
_ingested_at: "2026-05-28T00:00:00Z"
version: 2
tags: [pii-free]
Any JSON value is accepted (string, number, bool, null, array, object).
Existing fields with the same name are overwritten — set is the
intentional “I want this value” transform.
rename_field vs. rename_keys
Both transforms rename keys, but they’re aimed at different jobs:
| rename_keys | rename_field |
|---|---|
| Single regex substitution applied to every key, recursively (including keys inside nested objects and arrays). | Exact-name match on top-level keys only. |
| Best for systematic patterns: ^_sdc_ → "", ([a-z])([A-Z]) → $1_$2. | Best for a handful of explicit renames: address__city → city. |
rename_field errors if a target name already exists on the record
(same collision rule as flatten and keys_case) — to avoid silently
overwriting a real value.
cast — type coercion
- type: cast
config:
fields:
age: int
price: float
active: bool
id: string
created_at: timestamp
on_error: error
Target types: int (i64), float (f64), bool, string, timestamp
(RFC 3339). bool from a string accepts true|false|1|0|yes|no
case-insensitively. timestamp parses RFC 3339 / ISO 8601 and
normalises the output (so +00:00 becomes Z). Casting a float to
int only succeeds for a whole number within i64 range — a fractional
value (e.g. 3.9) or one beyond ±9.2e18 is treated as uncastable (governed
by on_error) rather than being silently truncated or saturated.
Failure behaviour is controlled by on_error:
| on_error | What happens on an uncastable value |
|---|---|
| error (default) | The transform errors with FaucetError::Transform. The pipeline either aborts or routes the record to the DLQ, depending on your DLQ config. |
| null | The value is replaced with null. Use when the schema must hold and a downstream nullable column is acceptable. |
| skip | The value is left as-is (original type). Use when downstream code already handles mixed types. |
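The float-to-int and string-to-bool rules, together with the three on_error modes, can be sketched in Rust like this. Val, OnError, and the function names are hypothetical. Letting an existing null pass through untouched is an assumption, since that case is not specified above.

```rust
#[derive(Debug, Clone, Copy)]
pub enum OnError { Error, Null, Skip }

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Val { Int(i64), Float(f64), Null }

/// 2^63 as f64. i64::MAX is not exactly representable, so the upper bound is exclusive.
const TWO_POW_63: f64 = 9_223_372_036_854_775_808.0;

/// Whole number within i64 range, else None (never truncates or saturates).
pub fn float_to_int(f: f64) -> Option<i64> {
    if f.is_finite() && f.fract() == 0.0 && f >= -TWO_POW_63 && f < TWO_POW_63 {
        Some(f as i64)
    } else {
        None
    }
}

/// true|false|1|0|yes|no, case-insensitively.
pub fn str_to_bool(s: &str) -> Option<bool> {
    match s.to_ascii_lowercase().as_str() {
        "true" | "1" | "yes" => Some(true),
        "false" | "0" | "no" => Some(false),
        _ => None,
    }
}

pub fn cast_to_int(v: Val, on_error: OnError) -> Result<Val, String> {
    match v {
        // Already an int; assumption: an existing null passes through.
        Val::Int(_) | Val::Null => Ok(v),
        Val::Float(f) => match float_to_int(f) {
            Some(i) => Ok(Val::Int(i)),
            None => match on_error {
                OnError::Error => Err(format!("cannot cast {f} to int")),
                OnError::Null => Ok(Val::Null),
                OnError::Skip => Ok(Val::Float(f)),
            },
        },
    }
}
```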
Missing fields are always a no-op — cast will never insert a null for
a field that wasn’t already on the record.
Casting epoch seconds / millis to a timestamp is out of scope for the initial release; file a follow-up issue if you need it.
redact
- type: redact
config:
fields: [password, ssn, credit_card]
mask: "***"
mask is any JSON value (default "***" if omitted). Missing fields
are skipped — redact will not add "***" to a record that didn’t
have the field.
value_case
- type: value_case
config:
fields: [email, username]
mode: lower # | upper | trim
Only string field values are touched; non-string values (numbers, bools, nulls, nested objects) pass through unchanged.
Ordering rules of thumb
Transforms run in the order you list them, so think about dependencies:
- flatten, spell_symbols, and keys_case change key names: list field-targeting transforms (select, drop, cast, redact, value_case, rename_field) after them, referencing the post-rename keys.
- cast runs before downstream consumers see the record, so put it after any rename steps but before set if you want set’s stamped values left untouched.
- set overwrites by name: put it last when you want it to win.
The “clean keys for a downstream warehouse” pipeline is canonical:
transforms:
- type: spell_symbols # %sold → percent sold
- type: keys_case
config: { mode: snake } # percent sold → percent_sold
- type: rename_field
config:
fields: { legacy_id: id }
Out of scope
- Dotted-path field selection on the field-list transforms (select, drop, cast, redact, value_case, rename_field): they still operate on bare top-level keys. Run flatten first if you need nested access. filter and explode are the exceptions and support the JSONPath subset documented in their sections.
- A general expression / scripting transform (jq, CEL, …): a separate, larger discussion.
Filter and explode
Filter — keep records matching a predicate
transforms:
- { type: filter, config: { path: deleted, op: ne, value: true } }
Operators: eq, ne, exists, in, not_in.
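A Rust sketch of how the five operators decide whether a record is kept, following the strict-equality and missing-path rules described in this section. Val and Op are hypothetical stand-ins for JSON values and the filter config. Treating an explicit null as present for exists is an assumption.

```rust
/// Hypothetical JSON-like value; variants of different types never compare equal.
#[derive(Debug, Clone, PartialEq)]
pub enum Val { Str(String), Num(f64), Bool(bool), Null }

pub enum Op { Eq(Val), Ne(Val), Exists, In(Vec<Val>), NotIn(Vec<Val>) }

/// `field` is the value at `path`, or None when the path is missing.
pub fn keep(field: Option<&Val>, op: &Op) -> bool {
    match (field, op) {
        // ne / not_in are satisfied by absence.
        (None, Op::Ne(_) | Op::NotIn(_)) => true,
        // Every other operator drops missing-path records.
        (None, _) => false,
        // Assumption: an explicit null still counts as present.
        (Some(_), Op::Exists) => true,
        (Some(v), Op::Eq(x)) => v == x,
        (Some(v), Op::Ne(x)) => v != x,
        (Some(v), Op::In(xs)) => xs.contains(v),
        (Some(v), Op::NotIn(xs)) => !xs.contains(v),
    }
}
```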
- path: JSONPath subset: bare key (status), dot path ($.user.status), or bracketed string key ($['order-id']). Bare keys are auto-prefixed with $. (status becomes $.status). Keys that literally contain . require the $-rooted bracket form ("$['foo.bar']").
- value: required for eq / ne / in / not_in. For in / not_in, it must be an array. Forbidden for exists.
- Type semantics: strict JSON equality. "5" eq 5 is false. Chain cast upstream to coerce.
- ne and not_in keep records with a missing path (the predicate is satisfied by absence). All other operators drop missing-path records.
Explode — expand an array into one record per element
transforms:
- { type: explode, config: { path: items, prefix: item } }
- path: the same JSONPath subset as filter.
- prefix: prepended to each element field when the element is an object. Defaults to the last segment of path (so path: items ⇒ prefix: items). An empty string opts out of prefixing (pure LATERAL FLATTEN).
- separator: placed between the prefix and the element field key. Default "_".
- on_missing: what to do when the path doesn’t yield a non-empty array: passthrough (default; the record flows through unchanged), drop (SQL UNNEST semantics), or error.
Merge rule (object elements): the array node at path is removed from its parent container and each element field is added as a sibling, prefixed.
| Input | Stage | Output |
|---|---|---|
| {id: 1, items: [{sku: A, qty: 2}]} | explode { path: items } | {id: 1, items_sku: A, items_qty: 2} |
| {id: 1, items: [{sku: A}, {sku: B}]} | explode { path: items, prefix: item } | {id: 1, item_sku: A}, {id: 1, item_sku: B} |
| {id: 1, items: [{sku: A}]} | explode { path: items, prefix: "" } | {id: 1, sku: A} |
| {id: 1, tags: ["rust", "etl"]} | explode { path: tags } | {id: 1, tags: rust}, {id: 1, tags: etl} |
| {id: 1, user: {name: A, items: [{x: 1}]}} | explode { path: $.user.items } | {id: 1, user: {name: A, items_x: 1}} |
Collisions (a prefixed element key would overwrite a sibling) fail loudly with FaucetError::Transform("explode produced duplicate key 'X'") — mirroring flatten / keys_case.
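The merge rule for a top-level path can be sketched in Rust with a BTreeMap record. The sketch is simplified: it supports only a top-level key (no JSONPath) and only on_missing: passthrough. V, Record, and explode are hypothetical names. Storing a scalar element under the prefix, or under the key when the prefix is empty, is an assumption consistent with the tags row above.

```rust
use std::collections::BTreeMap;

/// Hypothetical JSON-like value for the sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum V {
    Str(String),
    Num(f64),
    Arr(Vec<V>),
    Obj(BTreeMap<String, V>),
}

pub type Record = BTreeMap<String, V>;

/// Explode the top-level array under `key`; `prefix: None` defaults to `key`.
/// A missing or empty array passes the record through unchanged.
pub fn explode(record: &Record, key: &str, prefix: Option<&str>, sep: &str) -> Result<Vec<Record>, String> {
    let items = match record.get(key) {
        Some(V::Arr(items)) if !items.is_empty() => items,
        _ => return Ok(vec![record.clone()]),
    };
    let prefix = prefix.unwrap_or(key);
    // The array node is removed; element fields become its siblings.
    let mut base = record.clone();
    base.remove(key);

    let mut out = Vec::with_capacity(items.len());
    for item in items {
        let mut row = base.clone();
        match item {
            V::Obj(fields) => {
                for (k, v) in fields {
                    let name = if prefix.is_empty() { k.clone() } else { format!("{prefix}{sep}{k}") };
                    if row.insert(name.clone(), v.clone()).is_some() {
                        return Err(format!("explode produced duplicate key '{name}'"));
                    }
                }
            }
            // Assumption: a scalar element lands under the prefix (or key if prefix is empty).
            scalar => {
                let name = if prefix.is_empty() { key } else { prefix };
                if row.insert(name.to_string(), scalar.clone()).is_some() {
                    return Err(format!("explode produced duplicate key '{name}'"));
                }
            }
        }
        out.push(row);
    }
    Ok(out)
}
```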
Ordering: explode early, filter late (usually)
The recommended order is explode → transform → filter: each child of the explode gets transforms applied uniformly, and the final filter acts on the cleaned shape. Two legitimate deviations:
- filter before explode: drop soft-deleted parents before exploding, saving the work of expanding children of dead rows.
- filter both sides: drop dead parents, explode, then drop archived children.
transforms:
- { type: filter, config: { path: deleted, op: ne, value: true } }
- { type: explode, config: { path: items, prefix: item } }
- { type: filter, config: { path: item_status, op: in, value: [active, pending] } }
- { type: keys_case, config: { mode: snake } }
Secrets-manager interpolation
faucet can pull secret values directly from HashiCorp Vault, AWS Secrets Manager,
GCP Secret Manager, and Azure Key Vault — using ${scheme:reference} directives
right inside your config file. Resolution happens at config-load time: values are
fetched concurrently, de-duplicated, substituted into the config tree, and
never written to disk or logs.
These directives join the existing load-time set: ${env:VAR}, ${file:PATH},
and ${secret:VAR} (alias for ${env:}).
Build features
None of the four backends are compiled in by default. Opt in per backend or take all four with the aggregate feature:
# All four backends
cargo install faucet-cli --features secrets
# Individual backends
cargo install faucet-cli --features secrets-vault
cargo install faucet-cli --features secrets-aws-sm
cargo install faucet-cli --features secrets-gcp-sm
cargo install faucet-cli --features secrets-azure-kv
Using faucet-cli from source or as a library dependency:
cargo build -p faucet-cli --features secrets
The full aggregate feature includes all four backends.
HashiCorp Vault (KV v2)
Directive: ${vault:<path>[#field]}
Auth: set VAULT_ADDR and VAULT_TOKEN in the environment. VAULT_NAMESPACE
is optional (for HCP Vault or enterprise namespaces).
The #field selector parses the secret body as a JSON object and extracts one
key. Omit it to receive the entire secret body as a string.
# Requires: VAULT_ADDR + VAULT_TOKEN, and a KV v2 secret at
# secret/data/faucet/api with a `token` field.
# Build with: --features secrets-vault
version: 1
name: rest-to-jsonl-with-vault
pipeline:
source:
type: rest
config:
base_url: https://api.example.com
path: /v1/items
auth:
type: bearer
config:
token: "${vault:secret/data/faucet/api#token}"
sink:
type: jsonl
config:
path: ./out/items.jsonl
AWS Secrets Manager
Directive: ${aws-sm:<name-or-ARN>[#field]}
Auth: the standard aws-config default credential chain — environment
variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_SESSION_TOKEN),
~/.aws/credentials profile, EC2/ECS instance credentials, web identity token,
or IAM role attached to the compute environment. No manual config needed beyond
what the AWS SDK picks up automatically.
The #field selector works the same as for Vault: it parses the secret as JSON
and extracts one key.
# Build with: --features secrets-aws-sm
version: 1
name: postgres-to-bigquery-secure
pipeline:
source:
type: postgres
config:
connection_url: "${aws-sm:prod/postgres#connection_url}"
query: "SELECT * FROM events WHERE created_at > now() - interval '1 day'"
sink:
type: bigquery
config:
project_id: my-gcp-project
dataset_id: analytics
table_id: events
credentials:
type: application_default
GCP Secret Manager
Directive: ${gcp-sm:projects/<project>/secrets/<secret>/versions/<version>}
Use versions/latest to always fetch the current active version.
Auth: Application Default Credentials — run gcloud auth application-default login
for local development, or rely on the service account attached to GCE/Cloud Run
in production. No extra environment variables needed.
# Build with: --features secrets-gcp-sm
version: 1
name: rest-to-gcs-secure
pipeline:
source:
type: rest
config:
base_url: https://api.partner.com
path: /v2/export
auth:
type: bearer
config:
token: "${gcp-sm:projects/my-project/secrets/partner-api-token/versions/latest}"
sink:
type: gcs
config:
bucket: my-export-bucket
prefix: exports/
credentials:
type: application_default
Azure Key Vault
Directive: ${azure-kv:<vault>/<secret>[/<version>]}
Omit the version segment to fetch the current (enabled) version.
Auth: the azure_identity default chain — AZURE_TENANT_ID /
AZURE_CLIENT_ID / AZURE_CLIENT_SECRET environment variables (service
principal), managed identity (when running in Azure), or az login (developer
tools). These are tried in that order; the first that succeeds is used.
# Build with: --features secrets-azure-kv
version: 1
name: rest-to-snowflake-secure
pipeline:
source:
type: rest
config:
base_url: https://api.example.com
path: /v1/records
auth:
type: bearer
config:
token: "${azure-kv:my-vault/api-token}"
sink:
type: snowflake
config:
account: myaccount.us-east-1
warehouse: LOAD_WH
database: RAW
schema: PUBLIC
table: records
auth:
type: oauth
config:
access_token: "${azure-kv:my-vault/snowflake-token}"
The #field JSON extractor
Both Vault and AWS Secrets Manager support storing multiple values as a JSON
object inside one secret. The #field selector lets you extract a single key:
# Secret at prod/db contains: {"host": "db.example.com", "password": "s3cr3t"}
connection_url: "postgresql://app:${aws-sm:prod/db#password}@${aws-sm:prod/db#host}/mydb"
Each reference is fetched and de-duplicated — the same (scheme, path) pair is
fetched exactly once even if it appears in multiple config fields.
If the field is absent, faucet surfaces a clear error listing the available keys.
If the secret body isn’t valid JSON when #field is used, faucet errors rather
than returning raw bytes.
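The de-duplication rule keys fetches on (scheme, path) and ignores the #field selector. Here is a Rust sketch of that idea: a naive scanner finds ${scheme:path[#field]} directives for the four secret schemes and collects the unique pairs. Directive, parse_directives, and unique_fetches are hypothetical, not faucet's loader. Because ${env:} is resolved earlier, the sketch assumes directives are never nested.

```rust
use std::collections::BTreeSet;

/// One parsed secrets-manager reference: ${scheme:path[#field]}.
#[derive(Debug, PartialEq)]
pub struct Directive<'a> {
    pub scheme: &'a str,
    pub path: &'a str,
    pub field: Option<&'a str>,
}

const SECRET_SCHEMES: [&str; 4] = ["vault", "aws-sm", "gcp-sm", "azure-kv"];

/// Collect secrets-manager directives; other schemes (env, file, ...) are skipped.
pub fn parse_directives(text: &str) -> Vec<Directive<'_>> {
    let mut out = Vec::new();
    let mut rest = text;
    while let Some(start) = rest.find("${") {
        let after = &rest[start + 2..];
        let Some(end) = after.find('}') else { break };
        let inner = &after[..end];
        if let Some((scheme, reference)) = inner.split_once(':') {
            if SECRET_SCHEMES.contains(&scheme) {
                let (path, field) = match reference.split_once('#') {
                    Some((path, field)) => (path, Some(field)),
                    None => (reference, None),
                };
                out.push(Directive { scheme, path, field });
            }
        }
        rest = &after[end + 1..];
    }
    out
}

/// One fetch per (scheme, path), however many #field selectors point into it.
pub fn unique_fetches<'a>(directives: &[Directive<'a>]) -> BTreeSet<(&'a str, &'a str)> {
    directives.iter().map(|d| (d.scheme, d.path)).collect()
}
```

For the connection_url example above, this finds two directives but only one (aws-sm, prod/db) fetch.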
Validating configs with secrets
With resolution (real preflight): faucet validate resolves all secrets
as part of config validation and prints one line per reference to confirm
which secrets were reached (never the values):
secret: vault:secret/data/faucet/api#token → resolved
ok: 'rest-to-jsonl-with-vault' rows=1 (roots=1, children=0) execution=(defaults)
- default [root] source=rest sink=jsonl
Offline (no network / credentials): faucet validate --no-secrets validates
grammar and structure only, skipping all secret fetches. Use this in CI steps
that don’t have credentials, or in local development before you have vault access:
faucet validate --no-secrets pipeline.yaml
Grammar reference: faucet schema secrets prints the full directive syntax
and auth requirements for all four schemes in machine-readable JSON:
faucet schema secrets
Resolution order
Secret directives resolve as the final load-time stage, after ${env:} /
${file:} / ${vars.X} / ${sources.X} are all settled. This means you can
use env vars to compose a secret path:
pipeline:
source:
type: rest
config:
auth:
type: bearer
config:
token: "${vault:secret/data/${env:APP_ENV}/api#token}"
Substitution order: ${env:APP_ENV} resolves first (during the raw text pass). With APP_ENV=prod, the resulting reference secret/data/prod/api#token is then fetched from Vault.
Redaction guarantee and its boundary
faucet scrubs every resolved secret value from its own tracing / log / error
output. Every byte written through the CLI’s tracing subscriber passes through a
RedactingWriter that replaces any registered secret value with ***. Errors
that contain deserialized config fields go through the same scrubber before they
reach stderr.
Every resolution path registers its result for redaction — the secrets-manager
directives (${vault:…}, ${aws-sm:…}, …) and the load-time
${env:…} / ${secret:…} / ${file:…} forms. A credential supplied via the
common ${env:TOKEN} form is therefore scrubbed exactly like a ${vault:…} one
(values shorter than 4 characters are not registered). The faucet serve bearer
auth token (--auth-token / FAUCET_SERVE_AUTH_TOKEN) is registered the same way.
The scrubber withholds a short trailing window between writes, so a secret split
across two separate log writes is still masked. Independently, faucet_core::Credential
and the built-in auth providers hand-write their Debug to print secrets as ***,
so a {:?} of a credential or shared provider never reveals the token.
This boundary covers faucet’s own output only. A third-party connector that
debug-logs its own deserialized config fields — or any library that logs a
reqwest::Request, a database row, or a JSON object — operates outside this
boundary. In particular:
- Do not enable RUST_LOG=debug or FAUCET_LOG=debug when running a pipeline whose connector configs hold resolved secrets. The connector libraries may log intermediate objects that contain the resolved value before faucet’s scrubber can see it.
- Prometheus metric labels and span attributes set by connectors are also outside this boundary.
- The scrubber does not redact values shorter than 4 characters.
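The trailing-window idea behind the scrubber can be illustrated with a small buffer. On each write, the buffer scrubs, emits everything except the last (longest secret length − 1) characters, and carries those over to the next write, so a secret split across writes is still caught. Redactor and its methods are hypothetical, and this is a simplification of what RedactingWriter does. The sketch replaces substrings naively and skips values shorter than 4 characters, matching the rule above.

```rust
/// Sketch of a scrubbing buffer that withholds a trailing window between writes.
pub struct Redactor {
    secrets: Vec<String>,
    pending: String,
    window: usize, // chars withheld: longest secret length - 1
}

impl Redactor {
    pub fn new(secrets: &[&str]) -> Self {
        // Values shorter than 4 characters are never registered.
        let secrets: Vec<String> = secrets
            .iter()
            .filter(|s| s.chars().count() >= 4)
            .map(|s| s.to_string())
            .collect();
        let window = secrets.iter().map(|s| s.chars().count() - 1).max().unwrap_or(0);
        Redactor { secrets, pending: String::new(), window }
    }

    fn scrub(&self, text: &str) -> String {
        let mut out = text.to_string();
        for secret in &self.secrets {
            out = out.replace(secret.as_str(), "***");
        }
        out
    }

    /// Accept one write; return the part that is safe to emit now.
    pub fn write(&mut self, chunk: &str) -> String {
        self.pending.push_str(chunk);
        let scrubbed = self.scrub(&self.pending);
        let len = scrubbed.chars().count();
        if len <= self.window {
            self.pending = scrubbed;
            return String::new();
        }
        // Hold back the last `window` chars: they may begin a secret split across writes.
        let cut = scrubbed
            .char_indices()
            .nth(len - self.window)
            .map(|(i, _)| i)
            .unwrap_or(scrubbed.len());
        self.pending = scrubbed[cut..].to_string();
        scrubbed[..cut].to_string()
    }

    /// Emit whatever is still withheld (e.g. at shutdown).
    pub fn flush(&mut self) -> String {
        let out = self.scrub(&self.pending);
        self.pending.clear();
        out
    }
}
```

Writing "token=hunt" and then "er22 done" with the secret hunter22 emits "token=*** done" in total. Neither write on its own contains the full secret.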
Secrets in the auth: catalog and vars: block
Secret directives are resolved everywhere config interpolation runs:
connector configs, transforms, state, dlq, matrix rows, the top-level
auth: shared-provider catalog, and the top-level vars: block.
Putting a secret in the shared auth: catalog is often the cleanest option — a
single bearer token resolved once and shared across every matrix row that
references it via auth: { ref } (one token cache, single-flight refresh):
# A secret in the shared catalog, resolved once and shared by reference.
auth:
api:
type: static
config:
token: "${vault:secret/data/app#token}"
pipeline:
sources:
orders: { type: rest, config: { base_url: https://api.example.com/orders, auth: { ref: api } } }
refunds: { type: rest, config: { base_url: https://api.example.com/refunds, auth: { ref: api } } }
sink: { type: jsonl, config: { path: ./out.jsonl } }
A secret in the vars: block works the same way and can be reused through
${vars.X}:
vars:
db_password: "${aws-sm:prod/db#password}"
pipeline:
source:
type: postgres
config:
connection_url: "postgres://app:${vars.db_password}@db.internal:5432/app"
sink: { type: jsonl, config: { path: ./rows.jsonl } }
The shared auth: catalog is a first-class config location in every respect:
its provider specs can also reference ${vars.X} and ${sources.X.PATH}, not
just secret directives.
Inline auth: blocks on individual connectors resolve secrets too. Use the shared catalog when several connectors share one credential, and inline auth when a credential belongs to a single connector.
Adaptive batch sizing
Adaptive batch sizing lets faucet automatically tune how many records it sends to
the sink in each write, instead of using a fixed batch_size. The built-in
AIMD controller (Additive Increase / Multiplicative Decrease) starts at the
source page size, grows the batch additively when writes are clean and fast, and
shrinks it multiplicatively when errors appear or write latency rises above a
target.
When to use it
Useful when the optimal write batch size changes over time or varies by data shape:
- Spiky data volumes: smaller batches during large-row bursts; bigger ones for narrow rows.
- Sink rate limits / quotas: back off automatically when the API starts returning errors or timing out.
- Latency-sensitive pipelines: keep each write inside a target window (e.g. target_latency_ms: 1000) rather than guessing a fixed size.
Adaptive batch sizing is pure write-side tuning: the source page size is unchanged, and the controller simply reslices each page into sub-batches of the current effective size.
Configuration
Add an execution.adaptive_batch_size: block to your config file:
execution:
adaptive_batch_size:
enabled: true
min: 500
max: 10000
increase_step: 500
decrease_factor: 0.5
cooldown_batches: 5
target_latency_ms: 1000
latency_window: 10
error_threshold: 0.01
respect_source_max: true
log_every: 50
Full example
The postgres_to_bigquery_adaptive.yaml
example pairs a PostgreSQL source with a BigQuery sink and a JSONL DLQ:
# postgres_to_bigquery_adaptive.yaml (abbreviated)
version: 1
name: postgres_to_bigquery_adaptive
pipeline:
source:
type: postgres
config:
connection_url: ${env:PG_URL}
query: SELECT id, created_at, payload FROM orders WHERE created_at > $1
params: ["2026-01-01T00:00:00Z"]
batch_size: 5000 # source page size — also the effective upper ceiling
max_connections: 8
sink:
type: bigquery
config:
project_id: my-gcp-project
dataset_id: warehouse
table_id: orders
auth:
type: service_account_key
config:
json: ${env:GCP_KEY_JSON}
batch_size: 1000 # starting write size; the controller tunes this
dlq:
sink:
type: jsonl
config:
path: ./dlq/orders_failed.jsonl
on_batch_error: dlq_all
execution:
adaptive_batch_size:
enabled: true
min: 500
max: 10000
increase_step: 500
decrease_factor: 0.5
cooldown_batches: 5
target_latency_ms: 1000
error_threshold: 0.01
Config field reference
All fields are optional except enabled. Unset fields take the defaults shown
below.
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Master switch. Set to true to activate the controller. |
| controller | string | "aimd" | Algorithm. Only "aimd" is supported in v1. |
| min | integer | 100 | Lower bound on effective batch size. Must be ≥ 1. |
| max | integer | 50000 | Upper bound. Must be ≤ 1,000,000. Values above the source page size are inert (see Caveats). |
| increase_step | integer | 250 | Rows added per clean, fast batch (additive growth). Must be ≥ 1 and ≤ 1,000,000. |
| decrease_factor | float | 0.5 | Multiplicative shrink factor on error or high latency. Must be in (0, 1). |
| cooldown_batches | integer | 5 | Batches to skip after a shrink before allowing growth again. |
| target_latency_ms | integer or null | null | Optional target write latency (ms). null means react to errors only. |
| latency_window | integer | 10 | Rolling window size (batches) for the p50 latency estimate. Must be ≥ 1. |
| error_threshold | float | 0.01 | Per-batch error rate (0.0–1.0) above which the controller shrinks. |
| respect_source_max | bool | true | Cap effective batch size at the source page size. Must be true; false is rejected (cross-page buffering would break the O(batch_size) memory guarantee). |
| log_every | integer | 50 | Emit a tracing::info summary every N adjustments (0 = never). |
AIMD behavior
The controller follows a strict priority order for each sub-batch observation:
1. Error shrink (always fires, even during cooldown): if the per-batch error rate exceeds error_threshold, the current size is multiplied by decrease_factor (floor-rounded, clamped to min), and cooldown_batches is armed.
2. Cooldown gate: if cooldown is active, decrement the counter and skip growth. A new error during cooldown fires rule 1 again and re-arms the counter.
3. Latency target (when target_latency_ms is set): evaluate the rolling p50 latency:
   - p50 > 1.2 × target_latency_ms → shrink.
   - p50 < 0.5 × target_latency_ms → grow.
   - Otherwise, stay (the dead-band prevents oscillation).
4. Success growth: add increase_step to the current size (clamped to max).
Cold start
The controller initialises to the first source page length, clamped into
[min, max]. If the first page is smaller than min, the effective size starts
at min.
Example trajectory
With min=500, max=5000, increase_step=500, decrease_factor=0.5,
cooldown_batches=2:
batch 1: size=1000, ok, fast → grow → 1500
batch 2: size=1500, ok, fast → grow → 2000
batch 3: size=2000, 3% errors → shrink→ 1000, cooldown armed (2)
batch 4: size=1000, cooldown → skip → 1000
batch 5: size=1000, cooldown → skip → 1000
batch 6: size=1000, ok, fast → grow → 1500
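The four priority rules, the cold start, and this trajectory can be reproduced with a compact Rust sketch. Config, Aimd, and observe are hypothetical names, not faucet's internals. Two details are assumptions: a latency-driven shrink also arms cooldown, and plain growth applies until the latency window has filled (p50 is None). The within-page ceiling from Caveats is not modelled.

```rust
/// Hypothetical config mirroring part of execution.adaptive_batch_size.
#[derive(Debug, Clone)]
pub struct Config {
    pub min: usize,
    pub max: usize,
    pub increase_step: usize,
    pub decrease_factor: f64,
    pub cooldown_batches: u32,
    pub error_threshold: f64,
    pub target_latency_ms: Option<f64>,
}

pub struct Aimd {
    cfg: Config,
    size: usize,
    cooldown: u32,
}

impl Aimd {
    /// Cold start: the first source page length, clamped into [min, max].
    pub fn new(cfg: Config, first_page_len: usize) -> Self {
        let size = first_page_len.clamp(cfg.min, cfg.max);
        Aimd { cfg, size, cooldown: 0 }
    }

    pub fn size(&self) -> usize {
        self.size
    }

    fn shrink(&mut self) {
        let shrunk = (self.size as f64 * self.cfg.decrease_factor).floor() as usize;
        self.size = shrunk.max(self.cfg.min);
        self.cooldown = self.cfg.cooldown_batches;
    }

    fn grow(&mut self) {
        self.size = (self.size + self.cfg.increase_step).min(self.cfg.max);
    }

    /// Feed one sub-batch observation; returns the size for the next sub-batch.
    pub fn observe(&mut self, error_rate: f64, p50_ms: Option<f64>) -> usize {
        if error_rate > self.cfg.error_threshold {
            // Rule 1: error shrink fires even during cooldown and re-arms it.
            self.shrink();
        } else if self.cooldown > 0 {
            // Rule 2: cooldown gate, count down without growing.
            self.cooldown -= 1;
        } else if let (Some(target), Some(p50)) = (self.cfg.target_latency_ms, p50_ms) {
            // Rule 3: latency target with a dead-band between 0.5x and 1.2x.
            if p50 > 1.2 * target {
                self.shrink(); // assumption: also arms cooldown
            } else if p50 < 0.5 * target {
                self.grow();
            }
        } else {
            // Rule 4: plain additive growth.
            self.grow();
        }
        self.size
    }
}
```

Feeding error rates 0, 0, 0.03, 0, 0, 0 into a controller built with the trajectory's settings and a first page of 1000 rows yields 1500, 2000, 1000, 1000, 1000, 1500.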
Metrics
Four per-pipeline-row gauges / counters are emitted automatically:
| Metric | Type | Description |
|---|---|---|
| faucet_pipeline_adaptive_batch_size | gauge | Current effective batch size. |
| faucet_pipeline_adaptive_batch_adjustments_total | counter | Total adjustments, labeled direction (up or down) and reason (success, error, or latency). |
| faucet_pipeline_adaptive_batch_cooldown_active | gauge | 1 while cooldown is active, 0 otherwise. |
| faucet_pipeline_adaptive_batch_p50_latency_ms | gauge | Rolling p50 write latency (ms); absent until the window fills. |
All four carry the standard pipeline and row labels.
Example PromQL to alert when the controller is stuck shrinking:
# Shrink rate over the last 5 minutes
rate(faucet_pipeline_adaptive_batch_adjustments_total{direction="down"}[5m])
> 0.5
Caveats
Error-driven shrink requires a DLQ
The error signal comes from per-row outcomes reported via the DLQ path
(Sink::write_batch_partial). If no dlq: block is present, the controller
sees zero errors regardless of the sink response — only target_latency_ms
can drive shrinks. Add a dlq: block with on_batch_error: dlq_all if you
want the controller to react to sink-side write errors.
Within-page ceiling: max is capped at the source page size
In v1 the controller reslices pages it already received from the source — it
cannot buffer records across pages. The effective upper bound is therefore
min(max, source_page_size). If you set max: 50000 but the source emits
pages of 1 000 records, the controller will never write more than 1 000 rows
per call.
To allow bigger write batches, raise the source’s batch_size (e.g.
batch_size: 20000 on the postgres source config). Setting max higher than
the source page size is harmless but inert.
Setting respect_source_max: false to let the controller cross page boundaries
is rejected at config load: buffering records across source pages would break
the pipeline’s O(batch_size) memory guarantee. Raise the source batch_size
instead.
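To make the within-page ceiling concrete, here is a sketch of reslicing a page that has already arrived. The helpers effective_batch_size and reslice are hypothetical, not faucet's API. The point is only that the write size is bounded by min(controller size, max, page length) and that nothing is held over to the next page.

```rust
// Hypothetical helpers illustrating the within-page ceiling; not faucet's API.

/// Effective write size for one source page; never 0 for a non-empty page.
fn effective_batch_size(controller_size: usize, max: usize, page_len: usize) -> usize {
    controller_size.min(max).min(page_len).max(1)
}

/// Split an already-received page into write sub-batches. Nothing is carried
/// over to the next page, so memory stays bounded by the page size.
fn reslice<T>(page: &[T], controller_size: usize, max: usize) -> Vec<&[T]> {
    if page.is_empty() {
        return Vec::new();
    }
    let n = effective_batch_size(controller_size, max, page.len());
    page.chunks(n).collect()
}

fn main() {
    let page: Vec<u32> = (0..1_000).collect(); // a 1 000-record source page
    // max: 50000 with the controller at its ceiling still writes one 1 000-row batch.
    let writes = reslice(&page, 50_000, 50_000);
    println!("{} write(s) of {} rows", writes.len(), writes[0].len());
    // A smaller controller size splits the same page into several writes.
    let sizes: Vec<usize> = reslice(&page, 300, 50_000).iter().map(|c| c.len()).collect();
    println!("{sizes:?}");
}
```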
No-op for per-record sinks
jsonl, csv, and stdout write one record at a time regardless of
batch_size. Adaptive sizing is active but harmless for these sinks — the
controller adjusts its internal state normally, but the actual write granularity
is unchanged. A one-time tracing::info message notes this when the pipeline
starts.
Scheduling pipelines with faucet schedule
faucet schedule runs a pipeline on a cron schedule in a long-running
foreground process. It is designed for server-side deployment: drop it into
systemd, Kubernetes, or any supervisor that can restart it on failure, and the
pipeline fires on time every time.
faucet schedule pipeline.yaml # foreground; Ctrl-C or SIGTERM to stop
faucet schedule pipeline.yaml --once # run exactly once now, then exit
The config must include a schedule: block alongside the usual pipeline:.
Configs without one are rejected with a hint to use faucet run instead.
A runnable example
The following config runs a CSV→JSONL pipeline every night at 02:00
America/Los_Angeles. Save it as nightly.yaml and start it with
faucet schedule nightly.yaml:
# nightly.yaml — run at 02:00 Pacific every night
version: 1
name: nightly-rollup
schedule:
cron: "0 2 * * *"
timezone: "America/Los_Angeles"
overlap_policy: skip # don't pile up if a run runs long
max_consecutive_failures: 5 # exit non-zero after 5 straight failures (supervisor restarts)
on_failure: continue
shutdown_grace_secs: 30
pipeline:
source:
type: csv
config:
path: ./events.csv
sink:
type: jsonl
config:
path: ./events.jsonl
See cli/examples/scheduled_nightly.yaml
for the canonical copy.
Cron syntax
faucet uses a standard Unix cron expression, validated at config-load time. A bad expression or an expression that can never fire produces a clear error before the process starts.
5-field form (MIN HOUR DOM MON DOW):
| Expression | Meaning |
|---|---|
0 2 * * * | Every night at 02:00 |
*/15 * * * * | Every 15 minutes |
0 9 * * 1-5 | Weekdays at 09:00 |
0 0 1 * * | First of every month at midnight |
0 */6 * * * | Every 6 hours |
6-field form (SEC MIN HOUR DOM MON DOW) — add a leading seconds field
for sub-minute intervals:
| Expression | Meaning |
|---|---|
*/30 * * * * * | Every 30 seconds |
0 */5 * * * * | Every 5 minutes (explicit seconds=0) |
Field ranges follow standard cron semantics: * (every), */N (every N),
a-b (range), a,b,c (list). Month and day-of-week names (JAN, MON,
etc.) are accepted. Special strings like @daily and @hourly are not
supported — use the numeric form.
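The field syntax can be illustrated with a sketch that expands one numeric cron field into the values it matches. expand_field is a hypothetical helper, not the parser faucet uses. It covers *, */N, a-b, a-b/N, and comma lists, and it does not handle month or weekday names, which faucet does accept.

```rust
use std::collections::BTreeSet;

// Hypothetical sketch of standard cron field semantics; not faucet's parser.
// Supports *, */N, a-b, a-b/N, and a,b,c lists; names like JAN/MON are not handled.
fn expand_field(field: &str, lo: u32, hi: u32) -> Result<BTreeSet<u32>, String> {
    let mut out = BTreeSet::new();
    for part in field.split(',') {
        // Optional "/N" step suffix.
        let (range, step) = match part.split_once('/') {
            Some((r, s)) => (r, parse_num(s)?),
            None => (part, 1),
        };
        if step == 0 {
            return Err(format!("{part:?}: step must be at least 1"));
        }
        let (start, end) = if range == "*" {
            (lo, hi)
        } else if let Some((a, b)) = range.split_once('-') {
            (parse_num(a)?, parse_num(b)?)
        } else if step == 1 {
            let v = parse_num(range)?;
            (v, v)
        } else {
            return Err(format!("{part:?}: a step needs * or a range"));
        };
        if start < lo || end > hi || start > end {
            return Err(format!("{part:?}: outside {lo}-{hi}"));
        }
        out.extend((start..=end).step_by(step as usize));
    }
    Ok(out)
}

fn parse_num(s: &str) -> Result<u32, String> {
    s.parse::<u32>().map_err(|e| format!("{s:?}: {e}"))
}

fn main() {
    // Minute field (0-59) of "*/15 * * * *".
    println!("{:?}", expand_field("*/15", 0, 59));
    // Day-of-week field (0-6) of "0 9 * * 1-5".
    println!("{:?}", expand_field("1-5", 0, 6));
    // Out-of-range values are rejected, mirroring config-load validation.
    println!("{:?}", expand_field("61", 0, 59));
}
```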
Timezone and DST
Set timezone to any IANA timezone name
(e.g. America/Los_Angeles, Europe/Berlin, Asia/Tokyo). The default is
UTC.
Tick times are computed as UTC instants and matched against wall-clock time in the configured timezone, so DST transitions behave correctly:
- Fall-back (clocks go back): a repeated wall-clock hour fires once.
- Spring-forward (clocks skip ahead): a wall-clock time in the skipped hour is treated as if it were in the hour immediately after the gap — the next valid tick.
The scheduler loop re-checks the wall clock at least every 30 seconds, so NTP steps, VM freeze/thaw, and DST shifts cannot delay a scheduled fire by more than about 30 seconds.
Missed-tick behavior
The scheduler advances from the scheduled tick, not the wall clock, so a single occurrence is not skipped just because dispatch latency pushed the clock a little past it — it fires promptly (slightly late) and the schedule resumes. But if many ticks elapsed (the process was down, or a run took longer than several cron periods), the backlog is collapsed to a single catch-up: the scheduler fires once at the next due time and moves on. There is no catch-up storm and no flood of backfilled runs.
To find out how late a run fired, scrape
faucet_schedule_run_lateness_seconds (histogram: actual_start − scheduled_for).
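The collapsing rule can be sketched for the simple case of a fixed-period schedule such as */15. This assumes evenly spaced ticks (real cron ticks need not be) and uses hypothetical names. It only shows how advancing from the scheduled tick yields one slightly late run for a small delay and a single catch-up run for a long gap, with lateness measured as actual_start − scheduled_for.

```rust
// Hypothetical sketch of missed-tick collapsing for a fixed-period schedule.
// Times are Unix seconds; faucet's real scheduler handles arbitrary cron ticks.

#[derive(Debug, PartialEq)]
struct Wake {
    fire: bool,         // start exactly one run now?
    collapsed: u64,     // extra due ticks folded into this single run
    lateness_secs: u64, // actual_start - scheduled_for
    next_tick: u64,     // first scheduled tick strictly after `now`
}

fn on_wake(next_tick: u64, period_secs: u64, now: u64) -> Wake {
    if now < next_tick {
        // Nothing due yet: keep waiting for the same tick.
        return Wake { fire: false, collapsed: 0, lateness_secs: 0, next_tick };
    }
    // Advance from the scheduled tick, not the wall clock: count every tick
    // due at or before `now`, then fire once for all of them.
    let due = (now - next_tick) / period_secs + 1;
    Wake {
        fire: true,
        collapsed: due - 1,
        lateness_secs: now - next_tick,
        next_tick: next_tick + due * period_secs,
    }
}

fn main() {
    // Dispatch latency: 2 s past the tick still fires (slightly late).
    println!("{:?}", on_wake(1_000, 900, 1_002));
    // Down for an hour: ticks at 1000, 1900, 2800, 3700, 4600 collapse into one run.
    println!("{:?}", on_wake(1_000, 900, 4_600));
}
```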
Overlap policy
The overlap policy controls what happens when a tick fires while a run is already executing.
| Policy | When to use |
|---|---|
skip (default) | The tick is dropped and a faucet_schedule_overlaps_total{policy=skip} counter is incremented. Use when it is acceptable to miss a cycle if the previous one ran long. Most pipelines. |
queue | One missed tick is buffered and fires immediately when the current run finishes. Further misses during that same run collapse into the single queued tick (in-memory only — lost on restart). Use when missing a cycle is unacceptable but strict concurrency still must be preserved. |
forbid | The process exits non-zero the moment an overlap would occur. Use when overlapping runs would produce corrupt output or you want a hard guarantee that no two instances run simultaneously — pair with a supervisor that alerts or pages on non-zero exit. |
Choosing between skip and queue: if your pipeline is idempotent and
catching up after a long run matters (e.g. incremental replication with
state), use queue. If occasional missed cycles are harmless and you prefer
simplicity, use skip.
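The three policies amount to a small state machine around a single in-flight run. The sketch below is illustrative only (Dispatcher, TickAction, and the method names are hypothetical). It counts every overlap the way faucet_schedule_overlaps_total is described, and it keeps at most one queued tick in memory, as the queue row specifies.

```rust
// Hypothetical sketch of the overlap policies; not faucet's internals.

#[derive(Clone, Copy, Debug, PartialEq)]
enum OverlapPolicy { Skip, Queue, Forbid }

#[derive(Debug, PartialEq)]
enum TickAction { Start, Dropped, Queued, ExitNonZero }

struct Dispatcher {
    policy: OverlapPolicy,
    running: bool,
    queued: bool,  // at most one buffered tick, in memory only
    overlaps: u64, // mirrors faucet_schedule_overlaps_total
}

impl Dispatcher {
    fn new(policy: OverlapPolicy) -> Self {
        Self { policy, running: false, queued: false, overlaps: 0 }
    }

    fn on_tick(&mut self) -> TickAction {
        if !self.running {
            self.running = true;
            return TickAction::Start;
        }
        self.overlaps += 1;
        match self.policy {
            OverlapPolicy::Skip => TickAction::Dropped,
            // Repeated misses during one run collapse into the single queued tick.
            OverlapPolicy::Queue => {
                self.queued = true;
                TickAction::Queued
            }
            OverlapPolicy::Forbid => TickAction::ExitNonZero,
        }
    }

    /// Called when a run finishes; returns true if a queued tick starts now.
    fn on_run_finished(&mut self) -> bool {
        if self.queued {
            self.queued = false; // `running` stays true: the queued run begins
            true
        } else {
            self.running = false;
            false
        }
    }
}

fn main() {
    let mut d = Dispatcher::new(OverlapPolicy::Queue);
    println!("{:?}", d.on_tick()); // first tick starts a run
    println!("{:?}", d.on_tick()); // overlap: buffered
    println!("{:?}", d.on_tick()); // another overlap: still one queued tick
    println!("queued run starts: {}", d.on_run_finished());
    println!("queued run starts: {}", d.on_run_finished());
    println!("overlaps counted: {}", d.overlaps);
}
```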
Failure model and supervisor integration
Two independent knobs govern what happens when a run fails:
on_failure | max_consecutive_failures | Behaviour |
|---|---|---|
continue (default) | null | Tolerates all failures indefinitely. Alert via faucet_schedule_consecutive_failures gauge. |
continue | N | Tolerates up to N−1 straight failures; exits non-zero when the Nth consecutive failure occurs. A successful run resets the counter to 0. |
stop | any | Exits non-zero immediately on the first failure. |
The recommended production pattern is on_failure: continue with
max_consecutive_failures: N (5–10 depending on how quickly you want a
supervisor restart):
schedule:
cron: "*/5 * * * *"
on_failure: continue
max_consecutive_failures: 5 # restart after 5 straight failures
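The decision table can be expressed as one function called after every run. FailurePolicy and after_run are hypothetical names used for illustration, and None stands in for max_consecutive_failures: null. With the recommended continue + 5, four straight failures are tolerated, a success resets the counter, and the fifth consecutive failure exits non-zero so the supervisor restarts the process.

```rust
// Hypothetical sketch of the on_failure / max_consecutive_failures table.

#[derive(Clone, Copy, Debug, PartialEq)]
enum OnFailure { Continue, Stop }

#[derive(Debug, PartialEq)]
enum Next { KeepScheduling, ExitNonZero }

struct FailurePolicy {
    on_failure: OnFailure,
    max_consecutive_failures: Option<u32>, // None mirrors `null`
    consecutive: u32,                      // mirrors faucet_schedule_consecutive_failures
}

impl FailurePolicy {
    fn after_run(&mut self, ok: bool) -> Next {
        if ok {
            self.consecutive = 0; // a successful run resets the counter
            return Next::KeepScheduling;
        }
        self.consecutive += 1;
        match (self.on_failure, self.max_consecutive_failures) {
            (OnFailure::Stop, _) => Next::ExitNonZero,
            (OnFailure::Continue, Some(n)) if self.consecutive >= n => Next::ExitNonZero,
            (OnFailure::Continue, _) => Next::KeepScheduling,
        }
    }
}

fn main() {
    let mut p = FailurePolicy {
        on_failure: OnFailure::Continue,
        max_consecutive_failures: Some(5),
        consecutive: 0,
    };
    // Four failures, a success (reset), then five straight failures.
    for ok in [false, false, false, false, true, false, false, false, false, false] {
        let next = p.after_run(ok);
        println!("ok={ok} -> {next:?} (consecutive={})", p.consecutive);
    }
}
```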
Systemd unit example
# /etc/systemd/system/nightly-rollup.service
[Unit]
Description=faucet nightly rollup
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/bin/faucet schedule /opt/pipelines/nightly.yaml
Restart=on-failure
RestartSec=30s
# Env vars for the pipeline
EnvironmentFile=/opt/pipelines/nightly.env
[Install]
WantedBy=multi-user.target
Restart=on-failure means systemd restarts the process whenever it exits
with a non-zero code, which is exactly the condition max_consecutive_failures
produces. RestartSec=30s adds a brief cooldown between restarts to avoid
hammering a broken upstream.
Kubernetes CronJob vs long-running Deployment
faucet schedule is designed for a Deployment (or long-running Pod):
one process, always running, fires on cron. This keeps token caches warm
and avoids cold-start latency on every tick.
If you need Kubernetes to manage the schedule itself, use a Kubernetes
CronJob with faucet run instead: each invocation is ephemeral, and the CronJob
controller handles missed and overlapping runs at the platform level (see its
startingDeadlineSeconds and concurrencyPolicy settings).
Graceful shutdown and SIGTERM
On SIGTERM or Ctrl-C:
- faucet stops accepting new ticks.
- If a run is in flight, it waits up to shutdown_grace_secs (default 30) for it to finish.
- If the run finishes within the grace period, the process exits 0.
- If the run is still running after the grace period, it is aborted. The per-page StateStore bookmark means the next start resumes from the last confirmed write: no data is lost, but the partial page since the last bookmark is re-fetched on the next run. Whether that causes duplicates depends on your sink’s idempotency.
Increase shutdown_grace_secs for long-running pages (e.g. a BigQuery batch
that takes several minutes to flush):
schedule:
cron: "0 * * * *"
shutdown_grace_secs: 120
Dated outputs with ${now.*}
${now.*} tokens let you inject the run’s wall time into source and sink config
values — so a scheduled pipeline can write to a different file or object-storage
prefix on every tick without any manual bookkeeping.
The headline use case is a dated partition path:
# nightly_partitioned.yaml — write to a new dated partition every night
version: 1
name: nightly-events
schedule:
cron: "0 2 * * *"
timezone: "America/Los_Angeles"
overlap_policy: skip
max_consecutive_failures: 5
pipeline:
source:
type: rest
config:
base_url: https://api.example.com
path: /v1/events
sink:
type: jsonl
config:
# ${now.date} reflects the schedule's timezone (America/Los_Angeles),
# so the partition label matches the business date of the run.
path: "./warehouse/dt=${now.date}/events.jsonl"
When the cron fires at 02:00 on 2026-03-09 Pacific time, ${now.date} resolves
to 2026-03-09 and faucet writes to ./warehouse/dt=2026-03-09/events.jsonl.
The parent directory is created automatically — local file sinks (JSONL, CSV)
create missing parent directories so dated subdirectory paths work without
pre-creating the tree.
The full token set:
| Token | Example | Use case |
|---|---|---|
${now.date} | 2026-03-08 | Daily partition key |
${now.year} / ${now.month} / ${now.day} | 2026 / 03 / 08 | Hive-style year=…/month=…/day=… paths |
${now.hour} | 14 | Hourly partitions |
${now.unix} | 1772978709 | Unique epoch-based filenames |
${now.strftime.<fmt>} | 2026/03/08/14 | Arbitrary layout — e.g. ${now.strftime.%Y/%m/%d/%H} |
${now.datetime} / ${now.iso} | 2026-03-08T14:05:09+00:00 | RFC 3339 timestamp in a filename or object key |
Clock semantics
faucet schedule uses the tick’s scheduled time rendered in the schedule’s
timezone — not the actual wall clock when the run started. This means
${now.date} is deterministic: re-running the same tick (e.g. after a restart)
produces the same path.
faucet schedule --once uses the current wall clock in the schedule’s timezone.
Backfilling with faucet run --clock
To backfill a range of dates, use faucet run with the --clock flag instead
of faucet schedule. --clock overrides the process start time used by
${now.*}:
# Backfill three nightly partitions
faucet run --clock 2026-03-01 nightly_partitioned.yaml
faucet run --clock 2026-03-02 nightly_partitioned.yaml
faucet run --clock 2026-03-03 nightly_partitioned.yaml
A bare date (2026-03-01) is treated as midnight UTC. An RFC 3339 timestamp
(2026-03-01T02:00:00-08:00) sets the clock precisely. Unknown ${now.*}
tokens are config errors; the token set is validated at run start before any
I/O begins.
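To illustrate how ${now.*} tokens expand for a given clock, here is a sketch that renders a few of them from a Unix timestamp. It is hypothetical and deliberately narrow: UTC only (faucet schedule renders in the schedule’s timezone), no ${now.strftime.<fmt>} or ${now.iso}, and a hand-rolled calendar conversion (Howard Hinnant’s civil_from_days algorithm) instead of a date library. Like faucet, it rejects unknown ${now.*} tokens. It leaves other ${…} interpolations untouched.

```rust
// Hypothetical sketch of ${now.*} rendering for a UTC instant; not faucet's code.

/// Days since 1970-01-01 -> (year, month, day), proleptic Gregorian calendar.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // March-based day of year
    let mp = (5 * doy + 2) / 153; // March = 0 ... February = 11
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

/// Replace ${now.*} tokens in `template`; unknown now-tokens are an error.
fn render(template: &str, unix: i64) -> Result<String, String> {
    let (y, m, d) = civil_from_days(unix.div_euclid(86_400));
    let hour = unix.rem_euclid(86_400) / 3_600;
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("${now.") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..]; // drop the "${"
        let end = after
            .find('}')
            .ok_or_else(|| format!("unterminated token in {template:?}"))?;
        let value = match &after[..end] {
            "now.date" => format!("{y:04}-{m:02}-{d:02}"),
            "now.year" => format!("{y:04}"),
            "now.month" => format!("{m:02}"),
            "now.day" => format!("{d:02}"),
            "now.hour" => format!("{hour:02}"),
            "now.unix" => unix.to_string(),
            other => return Err(format!("unknown token ${{{other}}}")),
        };
        out.push_str(&value);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    // --clock 2026-03-01: a bare date is midnight UTC.
    let clock = 1_772_323_200;
    println!("{:?}", render("./warehouse/dt=${now.date}/events.jsonl", clock));
    println!("{:?}", render("${now.bogus}", clock));
}
```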
Health metrics to scrape
Register a Prometheus listener via the observability: block:
observability:
prometheus:
listen: "127.0.0.1:9464"
Key metrics for a scheduling health dashboard:
| Metric | What to alert on |
|---|---|
faucet_schedule_heartbeat_unix_seconds | time() - value > 90 → scheduler loop is stuck or process crashed |
faucet_schedule_consecutive_failures | > 0 → at least one recent failure; >= max_consecutive_failures − 1 → one more failure will exit the process |
faucet_schedule_next_tick_unix_seconds | time() - value > expected_interval → the next tick is overdue; the scheduler is not advancing |
faucet_schedule_runs_total{outcome="err"} | Increasing counter → runs are failing |
faucet_schedule_overlaps_total | Repeated increments → runs are taking longer than the cron period |
faucet_schedule_run_lateness_seconds | p99 > threshold → runs are starting significantly late |
Full metric reference:
| Metric | Type | Description |
|---|---|---|
faucet_schedule_runs_total{pipeline,outcome} | Counter | outcome ∈ {ok, err, skipped} |
faucet_schedule_overlaps_total{pipeline,policy} | Counter | Overlap events by policy |
faucet_schedule_next_tick_unix_seconds{pipeline} | Gauge | Unix timestamp of the next scheduled tick |
faucet_schedule_runs_in_flight{pipeline} | Gauge | 0 or 1 |
faucet_schedule_consecutive_failures{pipeline} | Gauge | Resets to 0 on success |
faucet_schedule_heartbeat_unix_seconds{pipeline} | Gauge | Updated every loop wake (≤30 s) |
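faucet_schedule_last_run_started_unix_seconds{pipeline} | Gauge | Unix timestamp when the most recent run started |
faucet_schedule_last_run_completed_unix_seconds{pipeline} | Gauge | Unix timestamp when the most recent run completed |
faucet_schedule_last_run_duration_seconds{pipeline} | Gauge | Duration of the most recent run |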
faucet_schedule_run_lateness_seconds{pipeline} | Histogram | actual_start − scheduled_for |
Each run also emits a faucet.schedule.run tracing span (attributes:
run_ordinal, scheduled_for_unix_seconds, tick_unix_seconds) that wraps
the inner pipeline spans, so distributed tracing carries the scheduling
context through the full pipeline.
Running faucet as a service (faucet serve)
faucet serve turns faucet from a one-shot CLI into a long-running HTTP control
plane: an orchestrator (Airflow, Temporal, Dagster, Argo) submits pipeline
configs over HTTP, polls status, cancels runs, and streams logs — while faucet
amortizes startup (TLS handshakes, connection pools, schema introspection)
across many runs in one process. It complements the other two runtime modes:
one-shot faucet run and cron-driven faucet schedule.
The full endpoint/schema reference is in HTTP API reference;
this page is the guided tour. serve requires the serve Cargo feature
(cargo install faucet-cli --features serve, or --features full).
Quickstart
# Start the server (loopback by default). Auth is mandatory — see below.
FAUCET_SERVE_AUTH_TOKEN=s3cret faucet serve --listen 127.0.0.1:8080
# Submit a run.
curl -XPOST http://127.0.0.1:8080/v1/runs \
-H "Authorization: Bearer s3cret" -H 'content-type: application/json' \
-d '{"config":"version: 1\npipeline:\n source: {type: csv, config: {path: in.csv}}\n sink: {type: jsonl, config: {path: out.jsonl}}\n","name":"adhoc"}'
# → {"run_id":"0192…","status":"queued","submitted_at":"…"}
# Poll it to completion.
curl -H "Authorization: Bearer s3cret" http://127.0.0.1:8080/v1/runs/0192…
# Tail its logs (SSE).
curl -N -H "Authorization: Bearer s3cret" http://127.0.0.1:8080/v1/runs/0192…/logs
⚠️ Security model — read before exposing
serve executes arbitrary client-supplied pipeline configs with the server’s
identity. That is a real privilege surface:
- Full interpolation: submitted configs resolve ${env:…}, ${file:…}, ${secret:…}, and ${vault:…} / ${aws-sm:…} / … against the server’s environment, filesystem, and credentials, exactly like faucet run. An authenticated caller can read any secret the server can reach.
- SSRF / egress: a submitted REST/HTTP source can be pointed at 169.254.169.254 or internal services and will be fetched with the server’s network identity.
Mitigations are deployment-level and mandatory:
- Never run with --no-auth on a non-loopback bind. The no-auth gate is explicit: without --auth-token / FAUCET_SERVE_AUTH_TOKEN and without --no-auth, startup fails.
- Run single-tenant, behind authentication, and behind egress controls / network policy. The default loopback bind (127.0.0.1) is deliberate; exposing the server externally is an explicit choice.
- Terminate TLS at a proxy/ingress (serve speaks plain HTTP).
- Prefer FAUCET_SERVE_AUTH_TOKEN over --auth-token (the latter leaks through ps and /proc).
- Never run a serve process at FAUCET_LOG=debug when submitted configs hold resolved secrets: only faucet’s own log output is redacted, not third-party connector debug logging.
Bounded concurrency & backpressure
--max-concurrent-runs (default min(16, cpu_count())) bounds how many runs
execute at once; --max-queued-runs (default 8 × --max-concurrent-runs) bounds
the queue. A submit past the queue cap returns 429 with Retry-After. Note that
total concurrent pipeline work is roughly max-concurrent-runs × each config's
execution.max_concurrent.
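Admission under these two limits can be sketched as a three-way decision. The Limits type, the admit method, and the 5-second Retry-After value are hypothetical, and the sketch assumes the 8× queue default is relative to --max-concurrent-runs. std::thread::available_parallelism stands in for cpu_count().

```rust
use std::thread;

// Hypothetical sketch of bounded admission for a serve-style queue.

#[derive(Debug, PartialEq)]
enum Admission {
    Run,
    Queue,
    Reject { status: u16, retry_after_secs: u64 },
}

struct Limits {
    max_concurrent_runs: usize,
    max_queued_runs: usize,
}

impl Limits {
    /// Defaults as described: min(16, cpu count) running, 8x that queued.
    fn defaults() -> Self {
        let cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        let max_concurrent_runs = cpus.min(16);
        Self { max_concurrent_runs, max_queued_runs: 8 * max_concurrent_runs }
    }

    fn admit(&self, running: usize, queued: usize) -> Admission {
        if running < self.max_concurrent_runs {
            Admission::Run
        } else if queued < self.max_queued_runs {
            Admission::Queue
        } else {
            // Past the queue cap: 429 Too Many Requests with a Retry-After hint
            // (the 5 s value is made up for this sketch).
            Admission::Reject { status: 429, retry_after_secs: 5 }
        }
    }
}

fn main() {
    let limits = Limits::defaults();
    println!("running cap {}, queue cap {}", limits.max_concurrent_runs, limits.max_queued_runs);
    let small = Limits { max_concurrent_runs: 2, max_queued_runs: 16 };
    println!("{:?}", small.admit(1, 0));
    println!("{:?}", small.admit(2, 15));
    println!("{:?}", small.admit(2, 16));
}
```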
Idempotency
Supply idempotency_key to make retries safe (Stripe-style):
- First submit with a key → runs normally.
- Re-submit the same key + same request within --idempotency-retention-secs (default 24h) → returns the original run_id (replayed, no new run).
- Same key + a different request → 409 Conflict.
- After the retention window, the key is re-usable for a fresh run.
- Deleting a run also frees its idempotency key immediately: a later submit with that key starts a fresh run rather than 404-ing on the deleted record.
The “request” identity covers the merged config and the run-affecting
request fields — clock, timeout_secs, and labels. In particular, a retry
that reuses the key but changes the backfill clock is a 409, not a replay of
the original window (so you can’t silently get the original clock’s results).
The claim is atomic, so concurrent retries can’t both start a run.
Degraded mode: while the persistent history backend is degraded (see Run history), the in-memory fallback can’t see claims the database made before the outage. Rather than risk a duplicate run, submissions carrying an idempotency key are rejected with 503 until the backend recovers; retry then, or resubmit without a key if at-least-once is acceptable. Submissions without a key are unaffected.
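The key semantics can be modelled with a map from key to the original request and run id. This is an illustrative, single-threaded sketch with hypothetical names: a real server must make the claim atomic (for example under a lock or a unique database constraint), and run ids here are plain counters rather than the server’s ids. Equality on RunRequest plays the role of "the same request", covering the merged config plus clock, timeout_secs, and labels.

```rust
use std::collections::HashMap;

// Hypothetical, single-threaded sketch of idempotency keys; not faucet's store.

/// Fields that define "the same request": merged config + run-affecting fields.
#[derive(Clone, Debug, PartialEq)]
struct RunRequest {
    config: String,
    clock: Option<String>,
    timeout_secs: Option<u64>,
    labels: Vec<(String, String)>,
}

#[derive(Debug, PartialEq)]
enum Submit {
    Created(u64),
    Replayed(u64),
    Conflict, // HTTP 409
}

struct IdempotencyStore {
    retention_secs: u64,
    next_run_id: u64,
    claims: HashMap<String, (RunRequest, u64, u64)>, // key -> (request, run_id, claimed_at)
}

impl IdempotencyStore {
    fn new(retention_secs: u64) -> Self {
        Self { retention_secs, next_run_id: 0, claims: HashMap::new() }
    }

    fn submit(&mut self, key: &str, req: &RunRequest, now: u64) -> Submit {
        if let Some((prev, run_id, claimed_at)) = self.claims.get(key) {
            if now.saturating_sub(*claimed_at) < self.retention_secs {
                return if prev == req { Submit::Replayed(*run_id) } else { Submit::Conflict };
            }
            // Retention window elapsed: the key is free for a fresh run.
        }
        self.next_run_id += 1;
        self.claims.insert(key.to_string(), (req.clone(), self.next_run_id, now));
        Submit::Created(self.next_run_id)
    }

    /// Deleting a run frees its key immediately.
    fn delete_run(&mut self, run_id: u64) {
        self.claims.retain(|_, claim| claim.1 != run_id);
    }
}

fn main() {
    let mut store = IdempotencyStore::new(24 * 3600);
    let req = RunRequest { config: "version: 1".into(), clock: None, timeout_secs: None, labels: vec![] };
    // Same key but a different backfill clock: a conflict, not a replay.
    let backfill = RunRequest { clock: Some("2026-03-01".into()), ..req.clone() };
    println!("{:?}", store.submit("k1", &req, 0));
    println!("{:?}", store.submit("k1", &req, 60));
    println!("{:?}", store.submit("k1", &backfill, 120));
}
```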
Cancellation
POST /v1/runs/{id}/cancel cooperatively cancels an in-flight run (202); on an
already-terminal run it’s a 200 no-op. The same cooperative path handles a run
that hits its timeout_secs and the server-shutdown drain.
Cancellation is flush-completing: the pipeline stops at its next page
boundary and flushes the sink, so a buffered sink (e.g. Parquet, whose footer is
only written on flush) commits the rows written so far rather than orphaning the
whole file (#146 H16). The run is then marked cancelled — there is no
cross-process resume, so re-submit to continue. A run still stuck mid-write
after a bounded flush grace is hard-dropped (its buffered output may be lost),
so a hung run can’t wedge shutdown.
--default-config (workspace defaults)
Pass --default-config <file> to merge shared settings under every submitted
run (submitted values win; objects merge, scalars/arrays replace). Pin state:,
execution:, and the auth: catalog once instead of repeating them per request.
See cli/examples/serve_minimal.yaml.
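The merge rule (submitted values win; objects merge, scalars and arrays replace) can be sketched over a minimal stand-in for parsed YAML/JSON. Value, merge, obj, and scalar are hypothetical; faucet’s actual merge operates on its own parsed config. In the example, state.type survives from the defaults while the submitted state.url wins.

```rust
use std::collections::BTreeMap;

// Hypothetical sketch of the --default-config merge rule over a toy value type.

#[derive(Clone, Debug, PartialEq)]
enum Value {
    Scalar(String),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

fn merge(defaults: &Value, submitted: &Value) -> Value {
    match (defaults, submitted) {
        // Both sides are objects: recurse per key, keeping default-only keys.
        (Value::Object(d), Value::Object(sub)) => {
            let mut out = d.clone();
            for (key, sv) in sub {
                let merged = match d.get(key) {
                    Some(dv) => merge(dv, sv),
                    None => sv.clone(),
                };
                out.insert(key.clone(), merged);
            }
            Value::Object(out)
        }
        // Scalars, arrays, or mismatched types: the submitted value replaces.
        (_, sub) => sub.clone(),
    }
}

fn obj(pairs: &[(&str, Value)]) -> Value {
    Value::Object(pairs.iter().map(|(k, v)| (k.to_string(), v.clone())).collect())
}

fn scalar(x: &str) -> Value {
    Value::Scalar(x.to_string())
}

fn main() {
    let defaults = obj(&[
        ("state", obj(&[("type", scalar("redis")), ("url", scalar("redis://cache:6379"))])),
        ("execution", obj(&[("max_concurrent", scalar("4"))])),
    ]);
    let submitted = obj(&[("state", obj(&[("url", scalar("redis://other:6379"))]))]);
    println!("{:#?}", merge(&defaults, &submitted));
}
```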
Cardinality: a config’s name: field drives the metric pipeline label and the state-key prefix. Use a stable name: per logical pipeline, never an ad-hoc per-run string, or Prometheus cardinality blows up. The request-level name / labels are run-record metadata only, never metric labels.
Run history & persistence
By default run records live in memory and are lost on restart. For durable
history across restarts, point --history at a database (requires the matching
build feature):
# Postgres (feature: serve-history-postgres)
faucet serve --history 'postgres://user:pw@db/faucet'
# SQLite (feature: serve-history-sqlite)
faucet serve --history 'sqlite:/var/lib/faucet/runs.db'
Both create their schema on first connect. If the backend is unreachable at
startup, or fails at runtime, serve degrades to the in-memory store so it
stays up: it logs once, sets the faucet_serve_history_degraded gauge, and
/readyz returns 503. Persisted records are not migrated into the fallback —
degraded mode is a stay-alive, not a replica. Terminal records are retained for
--retain-terminal-runs-secs (default 7 days).
Multi-instance orphan recovery (run-ownership leases)
A persistent backend can be shared by several faucet serve instances (an
HA pair, a rolling/blue-green deploy). Each instance gets a fresh id at startup
and owns the runs it executes; while a run is in flight its owner heartbeats a
lease on the run record (at ~⅓ of --lease-ttl-secs, default 30s). A run is
only recovered (marked failed with the reason "owning serve instance's lease expired")
once its lease has expired, i.e. its owner stopped heartbeating (crashed or was
shut down). Recovery runs both at startup and periodically, so a surviving
instance reclaims a dead peer’s orphans without waiting for a restart.
This means a starting or running instance never fails another live
instance’s in-flight runs — the gap that an unscoped “fail every non-terminal
run at startup” sweep would open on a shared database. Tune --lease-ttl-secs
above your worst-case GC/IO stall so a healthy-but-slow instance is never
falsely reclaimed (a longer TTL is safer but slows how quickly a crashed
instance’s runs are cleaned up). The in-memory backend is single-process and
unshared, so leases don’t apply to it. There is still no cross-process resume:
a recovered run is marked failed, not continued — re-submit to retry.
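The lease rules can be sketched with plain timestamps. RunRecord, renew, and recover are hypothetical names, times are milliseconds on a clock all instances are assumed to share, and a lease is treated as expired once now reaches its expiry. A live owner renewing every ttl/3 is never reclaimed; a crashed owner’s runs are failed by the next sweep.

```rust
// Hypothetical sketch of run-ownership leases; not faucet's history backend.

#[derive(Debug, PartialEq)]
enum Status { Running, Succeeded, Failed(String) }

struct RunRecord {
    id: u32,
    owner: String,
    lease_expires_at_ms: u64,
    status: Status,
}

/// The owner heartbeats at roughly a third of the lease TTL.
fn heartbeat_interval_ms(lease_ttl_ms: u64) -> u64 {
    lease_ttl_ms / 3
}

/// Called by the owning instance on every heartbeat.
fn renew(run: &mut RunRecord, now_ms: u64, lease_ttl_ms: u64) {
    run.lease_expires_at_ms = now_ms + lease_ttl_ms;
}

/// Startup/periodic sweep: fail only non-terminal runs whose lease expired.
fn recover(runs: &mut [RunRecord], now_ms: u64) -> Vec<u32> {
    let mut recovered = Vec::new();
    for run in runs.iter_mut() {
        if run.status == Status::Running && run.lease_expires_at_ms <= now_ms {
            run.status = Status::Failed("owning serve instance's lease expired".into());
            recovered.push(run.id);
        }
    }
    recovered
}

fn main() {
    let ttl = 30_000;
    let mut runs = vec![
        RunRecord { id: 1, owner: "instance-a".into(), lease_expires_at_ms: 0, status: Status::Running },
        RunRecord { id: 2, owner: "instance-b".into(), lease_expires_at_ms: 0, status: Status::Running },
    ];
    renew(&mut runs[0], 0, ttl);
    renew(&mut runs[1], 0, ttl);
    // instance-a keeps heartbeating every ttl/3; instance-b has crashed.
    let mut now = 0;
    while now < 40_000 {
        now += heartbeat_interval_ms(ttl);
        renew(&mut runs[0], now, ttl);
    }
    println!("recovered: {:?}", recover(&mut runs, now));
    println!("run 1 ({}) is {:?}", runs[0].owner, runs[0].status);
}
```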
Graceful shutdown
SIGTERM/SIGINT stops accepting new connections, drains in-flight runs up to
--shutdown-grace-secs (default 60), then cancels the remainder (marked failed).
Health & observability
- /healthz: liveness (always 200 while serving).
- /readyz: 503 when history is degraded or the queue is full.
- /metrics: Prometheus, including faucet_serve_* series. /metrics is unauthenticated; restrict it at the network layer if its labels are sensitive.
Troubleshooting with faucet doctor
faucet doctor answers “why won’t my pipeline run?” before you run it. It
probes every connector in a config — auth, network, permissions, reachability —
and prints a green/red checklist, exiting non-zero if anything fails. It is
non-mutating: no data is written, no rows inserted, no objects uploaded.
faucet doctor pipeline.yaml
✓ Config parses and interpolates 8 ms
✓ Matrix expands to 2 invocations 0 skipped (children)
▸ Invocation default::us-east (source=postgres, sink=bigquery)
✓ source [postgres] read 42 ms
✓ sink [bigquery] auth 280 ms
✓ state [redis] sentinel 14 ms
▸ Invocation default::eu-west (source=postgres, sink=bigquery)
✓ source [postgres] read 39 ms
✗ sink [bigquery] auth (dataset eu_west not found) 410 ms
hint: check bigquery credentials and that the dataset exists
Summary: 5 passed, 1 failed, 0 skipped total elapsed 0.5s
The exit code is the number of failed probes (clamped to 255), so doctor
drops straight into a CI gate or a deploy script:
faucet doctor pipeline.yaml || { echo "preflight failed"; exit 1; }
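The relationship between the summary line and the exit code can be sketched in a few lines. ProbeStatus, Summary, summarize, and exit_code are hypothetical names; the only documented behavior mirrored here is that the exit code equals the number of failed probes, clamped to 255 (on Unix a process exit status only carries 8 bits).

```rust
// Hypothetical sketch of doctor-style summarizing; not faucet's implementation.

#[derive(Clone, Copy, Debug, PartialEq)]
enum ProbeStatus { Pass, Fail, Skip }

#[derive(Debug, Default, PartialEq)]
struct Summary { passed: usize, failed: usize, skipped: usize }

fn summarize(statuses: &[ProbeStatus]) -> Summary {
    let mut s = Summary::default();
    for st in statuses {
        match st {
            ProbeStatus::Pass => s.passed += 1,
            ProbeStatus::Fail => s.failed += 1,
            ProbeStatus::Skip => s.skipped += 1,
        }
    }
    s
}

/// Process exit code: the number of failed probes, clamped into 0..=255.
fn exit_code(summary: &Summary) -> i32 {
    summary.failed.min(255) as i32
}

fn main() {
    use ProbeStatus::*;
    let summary = summarize(&[Pass, Pass, Pass, Pass, Pass, Fail]);
    println!("{summary:?} -> exit {}", exit_code(&summary));
}
```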
What gets probed
| Role | Probe |
|---|---|
| Source (most) | Pulls a single page via the real read path (DNS + TLS + auth + first request) and stops — never the full dataset. |
webhook source | The configured port is bindable. |
websocket source | TCP connect to the host (no WebSocket handshake). |
postgres-cdc source | The replication slot is reachable (missing slot → skip, since run can create it). |
kafka source / sink | A cluster metadata request (validates brokers + auth without consuming/producing). |
SQL sinks (postgres/mysql/sqlite) | SELECT 1 on the pool. |
s3 / gcs sinks | Bucket head / metadata list. |
bigquery / snowflake sinks | Token mint + a read-only metadata call / SELECT 1. |
redis / mongodb / elasticsearch / http sinks | PING / ping / cluster health / a HEAD request. |
File sinks (jsonl/csv/parquet/stdout) | Target directory is writable (stdout always passes). |
State stores (redis/postgres/file/memory) | A sentinel put/get/delete that leaves no residue. |
Reading the result
- ✓ pass: the probe succeeded.
- ✗ fail: unreachable / unauthenticated / misconfigured. The parenthesized reason and the hint: line tell you what to fix.
- • skip: not applicable. An optional target is absent (e.g. a CDC slot not yet created), a connector ships no probe, or an object-store path can’t be cheaply checked.
Flags
| Flag | Purpose |
|---|---|
--timeout-secs <N> | Per-probe timeout in seconds (default 10). Lower it to fail fast against dead hosts. |
--json | Emit a { config, invocations, summary } JSON document for tooling. |
--env-file <path> / --no-env-file | Same .env handling as run. |
The --json shape:
{
"config": "pipeline.yaml",
"invocations": [
{
"id": "default::eu-west",
"probes": [
{ "role": "source", "connector": "postgres", "name": "read", "status": "pass", "elapsed_ms": 39 },
{ "role": "sink", "connector": "bigquery", "name": "auth", "status": "fail",
"reason": "dataset eu_west not found", "elapsed_ms": 410,
"hint": "check bigquery credentials and that the dataset exists" }
]
}
],
"summary": { "passed": 5, "failed": 1, "skipped": 0, "elapsed_ms": 500 }
}
Limitations
- Child invocations in a parent/child matrix are listed but not probed: their configs depend on parent records that only exist at run time (same limitation as faucet preview).
- doctor needs real credentials: it resolves secrets like run does. Use faucet validate --no-secrets for an offline grammar-only check.
- Probe reason / hint text is scrubbed for resolved secrets, but don’t run with FAUCET_LOG=debug against a config holding live secrets (third-party connector logging is outside faucet’s redaction boundary).
Connector catalog
faucet-stream ships 21 sources and 17 sinks. Each is a Cargo feature
(source-<name> / sink-<name>) and an independently published crate. Full API
docs are on docs.rs.
Run faucet list to see what’s compiled into your binary, and
faucet schema source <name> / faucet schema sink <name> for a connector’s
exact config fields. Not sure which to pick? See
Choosing a connector.
Legend: ✓ supported · ✗ not applicable.
Sources
| Connector | Feature | Streams¹ | Resumable² | Compression | Underlying primitive |
|---|---|---|---|---|---|
| REST | source-rest | ✓ | ✓ | ✗ | HTTP + 6 pagination styles, JSONPath extraction |
| GraphQL | source-graphql | ✓ | ✗ | ✗ | cursor pagination, variable injection |
| XML / SOAP | source-xml | ✓ | ✗ | ✗ | streaming XML→JSON, dot-path extraction |
| gRPC | source-grpc | ✓³ | ✗ | ✗ | dynamic protobuf; unary + server-streaming |
| PostgreSQL | source-postgres | ✓ | ✗ | ✗ | SQL query, rows as JSON |
| PostgreSQL CDC | source-postgres-cdc | ✓ | ✓ | ✗ | logical replication (pgoutput), LSN bookmarks |
| MySQL | source-mysql | ✓ | ✗ | ✗ | SQL query, rows as JSON |
| Microsoft SQL Server | source-mssql | ✓ | ✓⁷ | ✗ | SQL query (tiberius), rows as JSON |
| SQLite | source-sqlite | ✓ | ✗ | ✗ | SQL query, rows as JSON |
| AWS S3 | source-s3 | ✓⁴ | ✗ | ✓ | object reader: JSONL, JSON array, raw text |
| Google Cloud Storage | source-gcs | ✓⁴ | ✗ | ✓ | object reader: JSONL, JSON array, raw text |
| MongoDB | source-mongodb | ✓ | ✗ | ✗ | find() with filter/projection/sort |
| Redis | source-redis | ✓ | ✗ | ✗ | streams, lists, key patterns |
| Webhook | source-webhook | ✗⁵ | ✗ | ✗ | temporary HTTP server collecting POSTs |
| WebSocket | source-websocket | ✓ | ✗ | ✗ | live push feed; subscribe frames, reconnect, ping keepalive |
| CSV | source-csv | ✓ | ✗ | ✓ | CSV files as JSON |
| Elasticsearch | source-elasticsearch | ✓ | ✗ | ✗ | search/scroll API |
| Apache Kafka | source-kafka | ✓ | ✓ | ✗ | consumer; idle/max-messages termination, offset bookmarks |
| Apache Parquet | source-parquet | ✓ | ✗ | ✗ | local/glob/S3, vectorized Arrow reader, projection |
| BigQuery | source-bigquery | ✓ | ✗ | ✗ | jobs.query + pageToken pagination |
| Snowflake | source-snowflake | ✓ | ✗ | ✗ | SQL REST API, server-side partitions |
¹ Streams = yields records in bounded-memory batches rather than buffering the
whole result. ² Resumable = persists a bookmark to a state store
so re-runs continue where they left off (incremental replication / CDC / Kafka
offsets). ³ gRPC streams natively in server-streaming mode; unary buffers the
single response. ⁴ S3/GCS stream in JSONL and raw-text modes; JSON-array mode
buffers one object. ⁵ Webhook is buffer-shaped by nature (it collects POSTs over
a window). ⁷ MSSQL is resumable only in replication: incremental mode (it
persists a tracking-column bookmark); in full mode it is not.
Sinks
Every sink exposes a batch_size knob for write-side re-chunking. For the
file/append sinks (jsonl, csv, stdout) it’s a no-op — they write per record.
| Connector | Feature | batch_size | Compression | Write unit |
|---|---|---|---|---|
| BigQuery | sink-bigquery | ✓ | ✗ | tabledata.insertAll (per-row DLQ) |
| PostgreSQL | sink-postgres | ✓ | ✗ | multi-row INSERT (JSONB or mapped cols) |
| JSON Lines | sink-jsonl | no-op | ✓ | buffered file append |
| Snowflake | sink-snowflake | ✓ | ✗ | SQL REST API |
| MySQL | sink-mysql | ✓ | ✗ | multi-row INSERT |
| Microsoft SQL Server | sink-mssql | ✓ | ✗ | multi-row INSERT (2100-param auto-split, per-row DLQ) |
| SQLite | sink-sqlite | ✓ | ✗ | transaction-wrapped batch |
| AWS S3 | sink-s3 | ✓ | ✓ | JSONL objects, parallel uploads |
| Google Cloud Storage | sink-gcs | ✓ | ✓ | JSONL objects |
| MongoDB | sink-mongodb | ✓ | ✗ | insert_many |
| Redis | sink-redis | ✓ | ✗ | streams, lists, key-value (pipelined) |
| CSV | sink-csv | no-op | ✓ | buffered file rows |
| Elasticsearch | sink-elasticsearch | ✓ | ✗ | _bulk NDJSON (per-row DLQ) |
| HTTP | sink-http | ✓ | ✗ | POST, concurrent under a semaphore |
| Stdout | sink-stdout | no-op | ✗ | JSON Lines / pretty JSON / TSV |
| Apache Kafka | sink-kafka | ✓ | ✗ | producer, batched sends, multi-topic routing |
| Apache Parquet | sink-parquet | ✓ | ✗⁶ | local/S3, schema inference, row/byte rollover |
⁶ Parquet has internal columnar compression, so the file-level compression
feature doesn’t apply.
Authentication at a glance
| Family | Auth options |
|---|---|
| REST / GraphQL / XML | Bearer, Basic, ApiKey (header), ApiKeyQuery, OAuth2 (client-credentials), TokenEndpoint, Custom headers — see Auth cookbook |
| BigQuery | service-account key (path or inline JSON), application-default credentials |
| Snowflake | JWT key-pair, OAuth |
| Kafka | SASL (PLAIN/SCRAM) + TLS |
| WebSocket | none, Bearer token, Custom headers |
| Elasticsearch | basic, API key, bearer, none |
| S3 / GCS | cloud SDK credential chains (env, profile, metadata) |
| SQL databases | connection URL (with embedded credentials / TLS params) |
Inspect any connector’s exact auth shape with faucet schema source <name> /
faucet schema sink <name>.
Batching
Default batch_size is 1000; max is 1,000,000. batch_size: 0 means “no
batching” — the source emits the whole result set in one page and the sink writes
it in one request (good for small lookup tables or load-job-style sinks). See
Performance tuning.
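The batch_size rules can be sketched as a paging helper. pages and the two constants are hypothetical names, but the values follow this page: 1000 by default, at most 1,000,000, and 0 meaning the whole result set arrives as one page.

```rust
// Hypothetical sketch of batch_size paging semantics; not faucet's code.

const DEFAULT_BATCH_SIZE: usize = 1_000;
const MAX_BATCH_SIZE: usize = 1_000_000;

fn pages<T>(records: &[T], batch_size: Option<usize>) -> Result<Vec<&[T]>, String> {
    let size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
    if size > MAX_BATCH_SIZE {
        return Err(format!("batch_size {size} exceeds {MAX_BATCH_SIZE}"));
    }
    if records.is_empty() {
        return Ok(Vec::new());
    }
    Ok(if size == 0 {
        vec![records] // no batching: the whole result set in one page
    } else {
        records.chunks(size).collect()
    })
}

fn main() {
    let rows: Vec<u32> = (0..2_500).collect();
    for batch_size in [None, Some(0), Some(2_000_000)] {
        match pages(&rows, batch_size) {
            Ok(p) => println!("{batch_size:?}: {:?}", p.iter().map(|c| c.len()).collect::<Vec<_>>()),
            Err(e) => println!("{batch_size:?}: error: {e}"),
        }
    }
}
```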
Choosing a connector
Several connectors overlap. This page resolves the common “which one?” questions. For the full feature grid see the connector catalog.
PostgreSQL: query source vs. CDC
- source-postgres runs a SQL query and returns the rows. Use it for one-shot extracts, snapshots, or when you control an updated_at column and parameterize the query yourself. Simple, no special Postgres config.
- source-postgres-cdc streams every INSERT / UPDATE / DELETE from the write-ahead log via logical replication. Use it when you need every change (including deletes), low-latency capture, or resumability without a cursor column. Requires wal_level = logical and a publication, and its replication slot retains WAL between runs. See the CDC tutorial.
Rule of thumb: periodic snapshot → query source; continuous change feed → CDC.
Object storage: S3/GCS source vs. Parquet source
- source-s3 / source-gcs read objects as JSONL, a JSON array, or raw text. Use them for line-delimited JSON, logs, or text dumps.
- source-parquet reads columnar Parquet (local, glob, or S3) with a vectorized Arrow reader and column projection. Use it for analytical datasets: it’s far faster and can skip columns you don’t need.
Rule of thumb: the file is .parquet → Parquet source; it’s JSON/text →
S3/GCS source. (The Parquet source reads from S3 directly, so you don’t need the
S3 source in front of it.)
Live feeds: WebSocket vs. Webhook vs. Kafka/Redis
- source-websocket connects out to a live push endpoint (ws:// or wss://), optionally sends subscription frames, and streams each incoming message as a record. Use it for market data, chat feeds, telemetry, or any server that pushes over WebSocket. It is live-only: no replay, no durable offset.
- source-webhook opens a temporary HTTP server and receives inbound HTTP POSTs from external systems over a time window. Use it when the remote system pushes to you over HTTP rather than WebSocket.
- source-kafka / source-redis provide broker-backed streaming with durable, replayable offsets and resumable bookmarks. Use these when you need guaranteed delivery and the ability to continue from where a previous run left off.
Rule of thumb: connecting out to a live WebSocket feed → source-websocket; receiving
inbound HTTP POST payloads → source-webhook; durable, replayable event stream →
source-kafka or source-redis.
Streaming: Redis vs. Kafka
- source-redis reads streams, lists, or key patterns. It works well when Redis is already in your stack and volumes are modest.
- source-kafka is a real consumer with consumer-group offsets and resumable bookmarks. Use it for high-throughput event pipelines and durable, replayable streams.
Rule of thumb: durable, high-volume event stream → Kafka; lightweight queue/cache already on hand → Redis.
HTTP APIs: REST vs. GraphQL vs. XML vs. gRPC
- source-rest: JSON REST APIs. The most full-featured source: six pagination styles, seven auth strategies, incremental replication, partitions.
- source-graphql: GraphQL endpoints with cursor pagination and variable injection.
- source-xml: XML/SOAP APIs; converts XML to JSON with dot-path extraction.
- source-grpc: gRPC services via dynamic protobuf (prost-reflect), unary or server-streaming.
Rule of thumb: match the protocol the API speaks. For incremental/resumable ingestion, REST has the richest support.
Warehouses: when to read with BigQuery / Snowflake sources
Use source-bigquery / source-snowflake to read out of a warehouse
(e.g. to move a query result elsewhere). To load into one, use the matching
sink. To transform data already inside the warehouse, reach for
dbt — that’s not faucet’s job.
Sinks: column-mapped vs. JSON blob (SQL databases)
The Postgres/MySQL/SQLite/SQL Server sinks can write either:
- a single JSON/JSONB column (column_mapping: { type: jsonb, column: data }): schemaless, no DDL coupling, easiest to start with; or
- auto-mapped columns: one column per top-level field, for queryable relational tables.
Rule of thumb: exploratory / evolving schema → JSON column; stable schema you query with SQL → mapped columns.
File sinks: JSONL vs. CSV vs. Parquet vs. stdout
- sink-stdout: debugging and piping into shell tools (faucet preview uses it).
- sink-jsonl: line-delimited JSON; lossless, streaming-friendly, gzip/zstd-capable.
- sink-csv: flat tabular output for spreadsheets/BI; nested fields are flattened.
- sink-parquet: columnar analytical output with built-in compression and schema inference; best for large datasets consumed by analytics engines.
Rule of thumb: machine-to-machine JSON → JSONL; tabular for humans → CSV; analytics at scale → Parquet.
Still unsure?
Run faucet list to see what’s installed, faucet schema source <name> to
inspect a connector’s config, and faucet preview <config> --limit 10 to try a
source without writing anywhere.
CLI commands
The faucet binary exposes these commands. Pass --log-level <level> (or set
FAUCET_LOG) to control logging.
| Command | What it does |
|---|---|
| faucet run [config] | Run the pipeline(s) in a config file. |
| faucet validate [config] | Parse, expand, and validate a config without running it. |
| faucet preview [config] | Run only the source side and print records to stdout. |
| faucet schema <target> | Print the JSON Schema for a connector, transform, or the DLQ. |
| faucet list | List every compiled-in source, sink, and transform with a one-line description. |
| faucet init [name] | Scaffold a commented config skeleton from connector schemas. |
| faucet doctor [config] | Probe every connector (auth/network/permissions) and print a checklist. |
| faucet schedule [config] | Run a pipeline on a cron schedule (long-running foreground process). |
| faucet serve | Run a long-running HTTP control plane: submit / poll / cancel pipeline runs over REST. |
[config] is optional for run / validate / preview / doctor / schedule: if omitted,
faucet auto-discovers faucet.yaml → .yml → .json in the current directory.
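The lookup order works as a first-match search over three candidate filenames. This is a hypothetical sketch: discover_config is not a faucet function. The exists callback stands in for a filesystem check, which keeps the sketch testable.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: return the first config file that exists in `dir`,
/// checking faucet.yaml, then faucet.yml, then faucet.json.
fn discover_config(dir: &Path, exists: impl Fn(&Path) -> bool) -> Option<PathBuf> {
    ["faucet.yaml", "faucet.yml", "faucet.json"]
        .into_iter()
        .map(|name| dir.join(name))
        .find(|candidate| exists(candidate.as_path()))
}
```

Against the real filesystem you would pass a check such as |p| p.is_file().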
run
faucet run pipeline.yaml
faucet run # auto-discover faucet.yaml in cwd
faucet run --from-env # build the pipeline entirely from FAUCET_* env vars
faucet run pipeline.yaml --env-file prod.env
faucet run pipeline.yaml --no-env-file
faucet run pipeline.yaml --clock 2026-03-01 # backfill: set ${now.*} clock to midnight UTC
faucet run pipeline.yaml --clock 2026-03-01T02:00:00-08:00 # backfill: precise RFC 3339 timestamp
Flags:
| Flag | Purpose |
|---|---|
| --clock <value> | Override the clock used by ${now.*} tokens. Accepts an RFC 3339 timestamp (2026-03-01T00:00:00Z) or a bare date (2026-03-01, treated as midnight UTC). Default: process start time in UTC. Use this for backfills: run the same config with a different date without changing the file. |
| --env-file <path> / --no-env-file | Same .env handling as validate / preview. |
| --from-env | Build the pipeline entirely from FAUCET_* environment variables; mutually exclusive with a positional config path. |
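The bare-date form of --clock maps to midnight UTC of that day. Below is a minimal sketch of that conversion with the following limits:

- It handles only the YYYY-MM-DD form; RFC 3339 parsing is omitted.
- It uses Howard Hinnant's public-domain days_from_civil algorithm.
- parse_bare_date is a hypothetical helper, and faucet's own parser may differ.
- It only range-checks month and day, so an impossible date such as 2026-02-31 is not rejected.

```rust
/// Days since 1970-01-01 for a proleptic Gregorian date
/// (Howard Hinnant's days_from_civil algorithm).
fn days_from_civil(year: i64, month: u32, day: u32) -> i64 {
    let y = if month <= 2 { year - 1 } else { year };
    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400); // year of era, 0..=399
    let m = month as i64;
    let mp = if m > 2 { m - 3 } else { m + 9 }; // March-based month, 0..=11
    let doy = (153 * mp + 2) / 5 + day as i64 - 1; // day of year, 0..=365
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era
    era * 146_097 + doe - 719_468
}

/// Hypothetical sketch: "YYYY-MM-DD" -> Unix seconds at 00:00:00 UTC.
fn parse_bare_date(s: &str) -> Option<i64> {
    let mut parts = s.split('-');
    let year: i64 = parts.next()?.parse().ok()?;
    let month: u32 = parts.next()?.parse().ok()?;
    let day: u32 = parts.next()?.parse().ok()?;
    if parts.next().is_some() || !(1..=12).contains(&month) || !(1..=31).contains(&day) {
        return None;
    }
    Some(days_from_civil(year, month, day) * 86_400)
}
```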
validate
Reports one line per expanded matrix row. Use it in CI to catch config errors before deploying.
faucet validate pipeline.yaml
When the config contains secrets-manager directives (${vault:…}, ${aws-sm:…},
etc.), faucet validate resolves them as a real preflight and prints one
confirmation line per reference (never the value):
secret: vault:secret/data/faucet/api#token → resolved
ok: 'my-pipeline' rows=1 (roots=1, children=0) execution=(defaults)
- default [root] source=rest sink=jsonl
Pass --no-secrets to validate grammar and structure only, skipping all secret
fetches. This is useful in CI environments that lack credentials, or in local
development before vault access is available:
faucet validate --no-secrets pipeline.yaml
preview
Runs the first root row’s source and prints records (via the stdout sink).
Children aren’t previewed because they need parent records to resolve
${parent.path} tokens.
faucet preview pipeline.yaml --limit 10
schema
faucet schema source rest
faucet schema sink bigquery
faucet schema transform keys_case
faucet schema dlq
faucet schema secrets
faucet schema transform <name> prints the inline config schema for a
transform (e.g. keys_case lists the valid mode: values). Run
faucet list to see which transforms are compiled into your binary.
faucet schema secrets prints the directive grammar and auth requirements for
all four secrets-manager backends in machine-readable JSON — useful for tooling
that needs to understand the interpolation syntax without reading the docs.
init
faucet init my_pipeline --source postgres --sink bigquery
Required fields are surfaced with a typed placeholder and a # REQUIRED marker;
optional fields are commented out so connector defaults apply. The interactive
mode (--interactive) is gated behind the cli-interactive feature.
doctor
faucet doctor pipeline.yaml # checklist; exit code = # of failed probes
faucet doctor pipeline.yaml --timeout-secs 5 # per-probe timeout (default 10)
faucet doctor pipeline.yaml --json # machine-readable, for CI gating
Runs a fast, non-mutating preflight against every connector in the config so misconfiguration surfaces before a real run. For each root invocation it probes the source, sink, and state store and prints a green/red checklist with elapsed times; the exit code equals the number of failed probes (clamped to 255).
- Sources reuse the real read path: the probe pulls a single page and stops (never the full dataset). Sources whose first page would block or mutate use a targeted probe instead: webhook (port bindable), websocket (TCP connect), postgres-cdc (slot reachable), kafka (cluster metadata).
- Sinks run a read-only connect/auth/metadata call (SELECT 1, HeadBucket, PING, tables.get, cluster health, fetch_metadata, or a directory-writable check for file sinks). Never a real write.
- State stores do a sentinel put/get/delete that leaves no residue.
Child invocations (parent/child matrix rows) are listed but not probed — their configs depend on parent records that only exist at run time. Probe messages are scrubbed for resolved secrets before printing.
See the Troubleshooting cookbook page for reading the output and common failures.
schedule
faucet schedule pipeline.yaml # run on cron schedule, foreground; Ctrl-C to stop
faucet schedule pipeline.yaml --once # run exactly once now, then exit
faucet schedule pipeline.yaml --env-file prod.env
faucet schedule pipeline.yaml --no-env-file
Runs a pipeline on a recurring cron schedule in a long-running foreground process. The config
must contain a top-level schedule: block (without one, faucet errors and suggests faucet run).
Requires the schedule Cargo feature (included in full).
- Stop with Ctrl-C or SIGTERM; the in-flight run drains for up to shutdown_grace_secs (default 30) before the process exits.
- --once ignores cron timing and runs the pipeline exactly once, immediately. This is handy for testing a scheduled config or for one-shot container invocations.
- Missed ticks are skipped, not backfilled. A run that starts late emits faucet_schedule_run_lateness_seconds for monitoring.
Flags:
| Flag | Purpose |
|---|---|
| --once | Run exactly once now, then exit. Ignores cron timing. |
| --env-file <path> / --no-env-file | Same .env handling as run / validate. |
See the scheduling cookbook for worked examples, the overlap-policy decision tree, the resilience/supervisor model, and the full metric set to scrape.
serve
FAUCET_SERVE_AUTH_TOKEN=s3cret faucet serve --listen 0.0.0.0:8080
faucet serve --no-auth # explicit opt-in; required if no token
faucet serve --history sqlite:/var/lib/faucet/runs.db --default-config defaults.yaml
Runs a long-running HTTP control plane that accepts pipeline configs over REST, executes them
under bounded concurrency (reusing the same executor as faucet run), and exposes status / cancel /
list / SSE-logs endpoints plus /healthz, /readyz, and /metrics. Requires the serve Cargo
feature (included in full).
Unlike the other commands, serve takes no config file — configs arrive per request. Auth is
mandatory: pass --auth-token/FAUCET_SERVE_AUTH_TOKEN, or --no-auth to explicitly disable it
(absent both, startup fails).
Selected flags (faucet serve --help for the full list):
| Flag | Purpose |
|---|---|
| --listen <addr> | Bind address (default 127.0.0.1:8080; env FAUCET_SERVE_LISTEN). |
| --auth-token <t> / --no-auth | Bearer token (prefer the env var) or explicit no-auth opt-in. |
| --max-concurrent-runs <n> / --max-queued-runs <n> | Concurrency + queue caps (429 past the queue). |
| --history <url> | postgres://… / sqlite:… for durable run history (feature-gated; default in-memory). |
| --default-config <path> | Workspace defaults merged under every submitted run. |
| --cors-origin <origin> | Allow-list a browser origin (repeatable; CORS off by default). |
| --lease-ttl-secs <n> | Run-ownership lease TTL (default 30) for multi-instance orphan fencing on a shared persistent backend; set it above worst-case stalls. See the serve cookbook. |
| --body-limit-bytes / --shutdown-grace-secs / --retain-terminal-runs-secs / --idempotency-retention-secs | Tuning knobs. |
⚠️ serve executes arbitrary client-supplied configs with the server’s identity (secrets, files, network egress). Run it single-tenant, authenticated, and behind egress controls. See the serve cookbook for the security model and the HTTP API reference for endpoints.
Environment-only mode
faucet run --from-env assembles a pipeline from a FAUCET_* snapshot
(FAUCET_SOURCE_*, FAUCET_SINK_*, FAUCET_STATE_*, FAUCET_TRANSFORM_<N>_*),
which is handy for containerized deployments where everything comes from the
environment. Nested/tagged-enum fields use a *_JSON suffix.
The complete config grammar (matrix, templates, vars, execution) lives in
cli/README.md.
Configuration file format
A faucet config is a YAML or JSON document with this top-level shape:
version: 1                          # required, must be 1
name: my_pipeline                   # optional; used in state keys and metrics
vars: {}                            # optional; reusable values referenced as ${vars.X}
auth: {}                            # optional; named shared auth providers (see below)
schedule: {}                        # optional; cron schedule for faucet schedule (see below)
pipeline:                           # required
  source: { type: …, config: { … } }
  transforms: []                    # optional list
  sink: { type: …, config: { … } }
  state: { type: …, config: { … } } # optional
  dlq: { … }                        # optional dead-letter queue
matrix: []                          # optional per-row overrides / DAG
execution:                          # optional
  max_concurrent: 4
  on_error: continue                # continue | stop
Unknown keys are rejected. The structural blocks (pipeline, each source/sink/transform/state spec, matrix rows, execution) reject unrecognized fields, so a typo like transorms: or parnet: is a load-time error rather than a silently ignored field. A connector’s own config: { … } object is still passed through verbatim to that connector.
pipeline
source and sink each take a type (the connector name) and a config
object whose fields are that connector’s schema — see faucet schema source <name>. transforms is an ordered list applied to every record. state
attaches a state store; dlq attaches a
dead-letter queue.
Transforms layering
Transforms can be declared at three layers and are resolved additively per matrix row in lifecycle order:
final = T_pipeline ++ T_source ++ T_row
- pipeline.transforms: cross-cutting policy; runs first on every row.
- pipeline.sources.<name>.transforms: bound to a source template; runs for every row that resolves to this source.
- matrix[i].transforms: row-specific extras; runs last.
Each declaring layer (source template, matrix row) carries an
inherit_transforms: bool (default true); setting it false drops every
upstream layer for that scope.
Sinks reject both transforms: and inherit_transforms: at expand time —
destination shaping belongs at the pipeline or row layer. See the
transforms cookbook for the full model and
worked examples.
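The layering rule fits in a few lines. Layers are concatenated in lifecycle order, and inherit_transforms: false discards everything upstream of the declaring layer. This sketch represents transforms as plain names. Layer and resolve_transforms are hypothetical, not faucet types.

```rust
/// Hypothetical model of a declaring layer (source template or matrix row).
struct Layer<'a> {
    transforms: &'a [&'a str],
    inherit_transforms: bool, // documented default: true
}

/// final = T_pipeline ++ T_source ++ T_row, where a layer with
/// inherit_transforms: false discards everything accumulated before it.
fn resolve_transforms(pipeline: &[&str], source: &Layer<'_>, row: &Layer<'_>) -> Vec<String> {
    let mut resolved: Vec<String> = pipeline.iter().map(|t| t.to_string()).collect();
    for layer in [source, row] {
        if !layer.inherit_transforms {
            resolved.clear(); // drop every upstream layer for this scope
        }
        resolved.extend(layer.transforms.iter().map(|t| t.to_string()));
    }
    resolved
}
```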
Available transforms
The full catalogue (with shapes and worked examples) lives in the
transforms cookbook; faucet list prints the
same set, and faucet schema transform <name> returns the JSON schema for
each. Highlights:
- filter: keep records where a JSONPath predicate is true. See the cookbook for the operator set and path syntax.
- explode: expand an array field into one record per element. See the cookbook for the merge rule and on_missing semantics.
Interpolation
Three stages resolve placeholders:
- Load time: ${env:VAR}, ${file:PATH}, and ${secret:VAR} are resolved when the file is read. ${vars.X} resolves against the top-level vars: block; ${sources.NAME.PATH} / ${sinks.NAME.PATH} resolve against named templates. Secret-manager directives (see below) run as the final load-time stage.
- Runtime: ${row_id.dotted.path} tokens are resolved per parent record in DAG runs. ${now.*} tokens are resolved per invocation at run time (see below).
Reference cycles surface as a clear InterpolationCycle error.
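To see how a reference cycle can be caught, here is a depth-first resolver for ${vars.X} references only. It is a simplified sketch:

- resolve_var is hypothetical.
- The error is a plain string that only mimics the InterpolationCycle name.
- The other load-time schemes (${env:…}, ${file:…}, templates, secret managers) are not handled.

```rust
use std::collections::HashMap;

/// Resolve `${vars.NAME}` references recursively. `stack` holds the names
/// currently being resolved; meeting one of them again means a cycle.
fn resolve_var(name: &str, vars: &HashMap<&str, &str>, stack: &mut Vec<String>) -> Result<String, String> {
    if stack.iter().any(|s| s == name) {
        stack.push(name.to_string());
        return Err(format!("InterpolationCycle: {}", stack.join(" -> ")));
    }
    let raw: &str = vars.get(name).ok_or_else(|| format!("unknown var `{name}`"))?;
    stack.push(name.to_string());
    let mut out = String::new();
    let mut rest = raw;
    while let Some(start) = rest.find("${vars.") {
        out.push_str(&rest[..start]); // literal text before the reference
        let after = &rest[start + "${vars.".len()..];
        let end = after
            .find('}')
            .ok_or_else(|| format!("unterminated reference in `{name}`"))?;
        out.push_str(&resolve_var(&after[..end], vars, stack)?);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    stack.pop(); // finished resolving `name`
    Ok(out)
}
```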
${now.*} — run-clock interpolation
${now.*} tokens inject the current wall time into source and sink config
values. Each invocation evaluates them once at run time:
| Token | Example output | Notes |
|---|---|---|
| ${now.date} | 2026-03-08 | YYYY-MM-DD |
| ${now.datetime} | 2026-03-08T14:05:09+00:00 | RFC 3339; alias: ${now.iso} |
| ${now.iso} | 2026-03-08T14:05:09+00:00 | Alias for ${now.datetime} |
| ${now.year} | 2026 | Zero-padded 4-digit year |
| ${now.month} | 03 | Zero-padded month (01–12) |
| ${now.day} | 08 | Zero-padded day (01–31) |
| ${now.hour} | 14 | Zero-padded hour (00–23) |
| ${now.minute} | 05 | Zero-padded minute (00–59) |
| ${now.second} | 09 | Zero-padded second (00–59) |
| ${now.unix} | 1772978709 | Unix epoch seconds |
| ${now.strftime.<fmt>} | 2026/03/08/14 | Arbitrary chrono strftime, e.g. ${now.strftime.%Y/%m/%d/%H} |
An unknown token (e.g. ${now.foo}) is a config error at run time. An invalid
strftime format produces a clean config error rather than a panic.
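The token values can be derived from a Unix timestamp with integer calendar arithmetic. The sketch below renders a subset of the tokens in UTC only; strftime formats and schedule timezones are omitted. civil_from_days is Howard Hinnant's public-domain algorithm, and render_now is a hypothetical helper, not faucet's implementation. For example, the timestamp 1772978709 renders as 2026-03-08T14:05:09+00:00.

```rust
/// Days since 1970-01-01 -> (year, month, day), proleptic Gregorian.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, 0..=146096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // 0..=365
    let mp = (5 * doy + 2) / 153; // March-based month, 0..=11
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let year = yoe + era * 400 + i64::from(month <= 2);
    (year, month, day)
}

/// Hypothetical renderer for `${now.<token>}` in UTC.
fn render_now(token: &str, unix: i64) -> Result<String, String> {
    let (y, mo, d) = civil_from_days(unix.div_euclid(86_400));
    let secs = unix.rem_euclid(86_400);
    let (h, mi, s) = (secs / 3_600, secs % 3_600 / 60, secs % 60);
    Ok(match token {
        "date" => format!("{y:04}-{mo:02}-{d:02}"),
        "datetime" | "iso" => format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}+00:00"),
        "year" => format!("{y:04}"),
        "month" => format!("{mo:02}"),
        "day" => format!("{d:02}"),
        "hour" => format!("{h:02}"),
        "minute" => format!("{mi:02}"),
        "second" => format!("{s:02}"),
        "unix" => unix.to_string(),
        // Unknown tokens are a config error, never a silent literal.
        other => return Err(format!("unknown token ${{now.{other}}}")),
    })
}
```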
Clock source:
- faucet run: the process start time in UTC. Override it with --clock <value> for backfills: an RFC 3339 timestamp (2026-03-01T00:00:00Z) or a bare date (2026-03-01, treated as midnight UTC). See the run command reference.
- faucet schedule: the tick’s scheduled time, rendered in the schedule’s timezone. ${now.date} therefore reflects the date in the timezone the cron fires in (e.g. America/Los_Angeles), not UTC. Queued runs use their original scheduled time; --once uses the current wall clock.
Scope: ${now.*} tokens (and ${row_id.path} parent-record references) are
resolved only in source and sink config values. Using one in a state:,
dlq:, or transforms: config is a config error at validate/expand time —
it is rejected rather than silently passed to the connector as a literal
${…} string. (${env:…} / ${vars.X} / ${sources.X} still resolve
everywhere.)
Reserved id: now is a reserved matrix row id — a matrix row cannot be
named now.
SQL caveat: ${now.*} substitutes as plain text into config values — the
same semantics as ${row_id.path} tokens. For SQL sources that interpolate
${now.*} into a query string, prefer the connector’s bind-parameter path
(substitute_context_bind_params) over raw text substitution to avoid
injection risk.
Secrets-manager directives
Four additional load-time schemes pull values from external secrets managers.
Each requires the matching build feature (--features secrets-vault, etc.;
--features secrets enables all four). Values are fetched concurrently and
de-duplicated; they are never written to disk.
| Directive | Backend | Auth |
|---|---|---|
| ${vault:<path>[#field]} | HashiCorp Vault KV v2 | VAULT_ADDR + VAULT_TOKEN (+ optional VAULT_NAMESPACE) |
| ${aws-sm:<name-or-ARN>[#field]} | AWS Secrets Manager | aws-config default chain (env / profile / instance / web-identity) |
| ${gcp-sm:projects/<p>/secrets/<s>/versions/<v>} | GCP Secret Manager (versions/latest ok) | Application Default Credentials |
| ${azure-kv:<vault>/<secret>[/<version>]} | Azure Key Vault | AZURE_* env / managed identity / az login |
The #field selector (Vault and AWS only) parses the secret body as a JSON
object and extracts a single key. Use faucet schema secrets for the machine-readable
grammar reference and faucet validate --no-secrets to check grammar offline.
See the secrets cookbook for full examples, the
redaction guarantee, and the known limitation around the auth: catalog.
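A directive's body splits into a backend scheme, a path, and an optional #field selector. The sketch below is a hypothetical parser for the text inside ${…}. Rejecting #field for gcp-sm and azure-kv is an assumption drawn from the "Vault and AWS only" rule. The authoritative grammar is whatever faucet schema secrets reports.

```rust
#[derive(Debug, PartialEq)]
struct SecretRef<'a> {
    scheme: &'a str,
    path: &'a str,
    field: Option<&'a str>,
}

/// Hypothetical parser for the inside of a secrets-manager directive.
fn parse_secret_directive(inner: &str) -> Result<SecretRef<'_>, String> {
    // Split on the first ':' only, so AWS ARNs (which contain ':') stay intact.
    let (scheme, rest) = inner
        .split_once(':')
        .ok_or_else(|| format!("missing scheme in `{inner}`"))?;
    let (path, field) = match scheme {
        "vault" | "aws-sm" => match rest.split_once('#') {
            Some((p, f)) => (p, Some(f)),
            None => (rest, None),
        },
        "gcp-sm" | "azure-kv" if rest.contains('#') => {
            return Err(format!("#field is not supported for {scheme}"));
        }
        "gcp-sm" | "azure-kv" => (rest, None),
        other => return Err(format!("unknown secrets backend `{other}`")),
    };
    if path.is_empty() || field == Some("") {
        return Err(format!("empty path or field in `{inner}`"));
    }
    Ok(SecretRef { scheme, path, field })
}
```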
matrix
Each row is deep-merged onto pipeline (scalars replace, objects merge, arrays
replace). A row with parent: runs once per parent record. See the
matrix DAG tutorial. For DRY configs with many
rows, define named templates under pipeline.sources / pipeline.sinks and
select them per row with ref:.
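The merge rule is easiest to see on a small value tree: scalars replace, objects merge, and arrays replace. This sketch uses a hypothetical Value enum in place of a real YAML/JSON type, and it is not faucet's merge code.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Debug, PartialEq)]
enum Value {
    Str(String),
    Int(i64),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

/// Build an object from key/value pairs (convenience for examples).
fn obj(pairs: Vec<(&str, Value)>) -> Value {
    Value::Object(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}

/// Deep-merge `overlay` (a matrix row) onto `base` (the pipeline).
fn deep_merge(base: &Value, overlay: &Value) -> Value {
    match (base, overlay) {
        // Objects merge key by key, recursing where both sides have the key.
        (Value::Object(b), Value::Object(o)) => {
            let mut merged = b.clone();
            for (key, value) in o {
                let next = match b.get(key) {
                    Some(existing) => deep_merge(existing, value),
                    None => value.clone(),
                };
                merged.insert(key.clone(), next);
            }
            Value::Object(merged)
        }
        // Scalars and arrays (or mismatched kinds): the row value replaces.
        _ => overlay.clone(),
    }
}
```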
auth
A map of named auth providers, each { type, config } (type ∈ static /
oauth2 / oauth2_refresh / token_endpoint). A connector references one with
auth: { ref: <name> } instead of inline auth; faucet builds each provider once
and shares it across every connector that references it (one token, single-flight
refresh). See the authentication cookbook.
auth:
  api:
    type: oauth2_refresh
    config:
      token_url: ${env:API_TOKEN_URL}
      client_id: ${secret:API_CLIENT_ID}
      client_secret: ${secret:API_CLIENT_SECRET}
      refresh_token: ${secret:API_REFRESH_TOKEN}
execution
- max_concurrent: one shared concurrency budget across roots and child fan-outs.
- on_error: continue (siblings finish; the failed subtree is skipped) or stop (abort pending and in-flight work on the first failure).
Adaptive batch sizing
The optional adaptive_batch_size: sub-block enables the AIMD controller that
auto-tunes the effective write batch size from observed sink latency and error
rate. Default enabled: false (opt-in).
execution:
  adaptive_batch_size:
    enabled: true             # master switch
    controller: aimd          # only "aimd" is supported in v1
    min: 100                  # lower bound (rows)
    max: 50000                # upper bound; inert above the source page size
    increase_step: 250        # additive growth per clean batch
    decrease_factor: 0.5      # multiplicative shrink on error/high latency; must be in (0, 1)
    cooldown_batches: 5       # batches to skip after a shrink
    target_latency_ms: null   # optional write-latency target (ms)
    latency_window: 10        # rolling window size for p50 latency
    error_threshold: 0.01     # per-batch error rate that triggers a shrink
    respect_source_max: true  # cap at source page size (see Caveats)
    log_every: 50             # tracing::info every N adjustments
Key caveats:
- Error-driven shrink requires a dlq: block. Without one the controller sees no per-row errors, and only target_latency_ms can drive shrinks.
- Effective ceiling = source page size. In v1 the controller reslices pages in memory; it cannot buffer across pages. Setting max higher than the source batch_size is harmless but inert. Raise the source batch_size to allow bigger write batches.
- No-op for per-record sinks. jsonl, csv, and stdout write one record at a time; the controller adjusts normally but the write granularity is unchanged.
See the Adaptive batching cookbook for a
full worked example, the AIMD trajectory, and the four Prometheus metrics
(faucet_pipeline_adaptive_batch_*).
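Below is a minimal AIMD sketch. It rests on three assumptions:

- Only the error-rate path is modeled; target_latency_ms is omitted.
- An error rate strictly above error_threshold triggers a shrink.
- Cooldown means holding steady rather than growing.

The Aimd struct is hypothetical, and the numbers in the test mirror the YAML example.

```rust
/// Hypothetical AIMD controller: additive increase on clean batches,
/// multiplicative decrease on errors, then a cooldown before growing again.
struct Aimd {
    min: usize,
    max: usize,
    increase_step: usize,
    decrease_factor: f64, // must lie in (0, 1)
    cooldown_batches: u32,
    error_threshold: f64,
    current: usize,
    cooldown_left: u32,
}

impl Aimd {
    /// Feed one batch's error rate; returns the next batch size.
    fn observe(&mut self, error_rate: f64) -> usize {
        if error_rate > self.error_threshold {
            let shrunk = (self.current as f64 * self.decrease_factor) as usize;
            self.current = shrunk.max(self.min);
            self.cooldown_left = self.cooldown_batches;
        } else if self.cooldown_left > 0 {
            self.cooldown_left -= 1; // hold steady after a shrink
        } else {
            self.current = (self.current + self.increase_step).min(self.max);
        }
        self.current
    }

    /// Pages are only resliced, never merged, so the page size caps the batch.
    fn effective(&self, source_page_size: usize) -> usize {
        self.current.min(source_page_size)
    }
}
```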
schedule
Present only when you run faucet schedule. Absent configs are rejected by that
command with a hint to use faucet run instead. All fields except cron are
optional.
schedule:
  cron: "0 2 * * *"               # REQUIRED. Standard 5-field cron, or 6-field with leading seconds.
  timezone: "UTC"                 # IANA timezone name. Default UTC.
  overlap_policy: skip            # skip | queue | forbid. Default skip.
  max_runs: null                  # null = run forever; N = exit 0 after N successful runs.
  max_consecutive_failures: null  # null = never exit on failure; N = exit non-zero after N straight failures.
  on_failure: continue            # continue | stop. Default continue.
  start_immediately: false        # Run once on startup before waiting for the first tick. Default false.
  run_timeout_secs: null          # Per-run wall-clock kill switch (seconds). Timed-out runs count as failed.
  shutdown_grace_secs: 30         # SIGTERM: wait this long for the in-flight run before aborting. Default 30.
| Field | Type | Default | Description |
|---|---|---|---|
| cron | string | required | 5-field standard Unix cron (MIN HOUR DOM MON DOW) or 6-field with a leading seconds field (SEC MIN HOUR DOM MON DOW). Validated at load time. |
| timezone | string | "UTC" | IANA timezone name (e.g. "America/Los_Angeles", "Europe/Berlin"). Affects how the cron expression is interpreted. |
| overlap_policy | skip / queue / forbid | skip | What to do when a tick fires while a run is already in flight. skip drops the tick; queue buffers one missed tick (in-memory only, lost on restart); forbid exits non-zero. |
| max_runs | integer or null | null | Stop the scheduler cleanly (exit 0) after this many successful runs. null means run forever. 0 is rejected as a config error. |
| max_consecutive_failures | integer or null | null | Exit non-zero after this many consecutive failed runs without a success in between. A successful run resets the counter. null means never exit on failures alone. |
| on_failure | continue / stop | continue | stop exits non-zero immediately after the first failed run. continue keeps scheduling; use max_consecutive_failures to bound sustained outages. |
| start_immediately | bool | false | When true, the first run fires right on startup before the cron clock reaches its first tick. |
| run_timeout_secs | integer or null | null | Per-run time limit in seconds. A run that exceeds this is killed and counts as a failure. null means no timeout. |
| shutdown_grace_secs | integer | 30 | On SIGTERM/SIGINT, wait this many seconds for the in-flight run to finish before forcibly aborting it. |
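The three overlap_policy values reduce to a small decision on each tick. This sketch is illustrative only: on_tick and TickAction are hypothetical. Dropping a second overlapping tick while one is already queued is an assumption drawn from "buffers one missed tick".

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum OverlapPolicy {
    Skip,
    Queue,
    Forbid,
}

#[derive(Debug, PartialEq)]
enum TickAction {
    StartRun,
    Dropped,
    Queued,
    ExitNonZero,
}

/// Decide what a cron tick does; `queued` is the single in-memory queue slot.
fn on_tick(policy: OverlapPolicy, run_in_flight: bool, queued: &mut bool) -> TickAction {
    if !run_in_flight {
        return TickAction::StartRun;
    }
    match policy {
        OverlapPolicy::Skip => TickAction::Dropped,
        OverlapPolicy::Queue if *queued => TickAction::Dropped, // slot already taken
        OverlapPolicy::Queue => {
            *queued = true;
            TickAction::Queued
        }
        OverlapPolicy::Forbid => TickAction::ExitNonZero,
    }
}
```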
Validation: faucet validate pipeline.yaml checks the schedule: block at parse time — bad cron
syntax, unknown timezone names, max_runs: 0, and a cron expression that can never fire all produce
a clear config error: schedule: … message before any run starts.
See the scheduling cookbook for worked examples, the DST/timezone details, the overlap-policy decision tree, and the full Prometheus metric set.
Discovery & env files
run / validate / preview / schedule auto-discover faucet.yaml → .yml → .json in
the current directory, and load a sibling .env unless --no-env-file is given
(--env-file PATH points elsewhere).
The authoritative, exhaustive grammar — including every matrix and template edge case — is in
cli/README.md.
HTTP API reference (faucet serve)
faucet serve exposes a JSON REST control plane for submitting, polling,
listing, cancelling, and streaming the logs of pipeline runs, plus
unauthenticated health and Prometheus endpoints. A machine-readable
docs/openapi.yaml
spec ships alongside this page and is kept in sync with the router by a CI test.
See the serve cookbook for a guided quickstart, the security model, and operational guidance. This page is the endpoint reference.
Authentication
All /v1/* endpoints require Authorization: Bearer <token> unless the server
was started with --no-auth. The token is compared in constant time; the
Authorization header is the only accepted credential (no query-string auth).
/healthz, /readyz, and /metrics are always unauthenticated (probes /
scrapers). OPTIONS preflight bypasses auth so browsers behind a CORS policy
work.
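A constant-time token comparison never exits early on the first mismatching byte, so response timing does not reveal how much of a guess was right. The sketch below shows the idea with the standard library only, under these caveats:

- authorized and constant_time_eq are hypothetical names.
- Requiring the exact-case "Bearer " prefix is a simplification, since HTTP auth schemes are case-insensitive.
- The up-front length check still leaks the token length.
- Production code would normally rely on a vetted crate such as subtle.

```rust
/// Compare two byte strings without an early exit on the first mismatch.
/// The length check up front still leaks the length itself.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y; // accumulate differences instead of returning early
    }
    diff == 0
}

/// Hypothetical check of an Authorization header value against the server token.
fn authorized(header: Option<&str>, expected_token: &str) -> bool {
    header
        .and_then(|value| value.strip_prefix("Bearer "))
        .map_or(false, |presented| {
            constant_time_eq(presented.as_bytes(), expected_token.as_bytes())
        })
}
```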
Endpoints
| Method | Path | Success | Notes |
|---|---|---|---|
| POST | /v1/runs | 202 | Submit a run; config validated synchronously |
| GET | /v1/runs | 200 | List runs (filters below) |
| GET | /v1/runs/{id} | 200 | Get one run record |
| DELETE | /v1/runs/{id} | 204 | Remove a terminal run from history |
| POST | /v1/runs/{id}/cancel | 202 / 200 | Request cancel (202) or no-op if terminal (200) |
| GET | /v1/runs/{id}/logs | 200 | Stream the run’s logs as text/event-stream |
| GET | /healthz | 200 | Liveness (unauthenticated) |
| GET | /readyz | 200/503 | Readiness (unauthenticated) |
| GET | /metrics | 200 | Prometheus exposition (unauthenticated) |
POST /v1/runs
Request body:
{
  "config": "version: 1\npipeline:\n source: {...}\n sink: {...}\n",
  "config_format": "yaml",
  "name": "nightly-rollup",
  "labels": {"requester": "airflow"},
  "timeout_secs": 3600,
  "doctor_first": true,
  "idempotency_key": "airflow-task-123-attempt-2",
  "clock": "2026-05-29T00:00:00Z"
}
- config (required): the YAML or JSON pipeline body.
- config_format: yaml (default) or json.
- name: metadata; also drives the state-key and metric identity (see the cookbook’s cardinality note). Two submissions sharing a name share replication bookmarks.
- labels: arbitrary string metadata, stored on the run record only.
- timeout_secs: wall-clock cap; on expiry the run is marked failed.
- doctor_first: run preflight probes before executing; on any failure the submit returns 422 with the doctor report in error.details.
- idempotency_key: replay protection (see cookbook).
- clock: overrides the ${now.*} clock for backfills (default: submit time).
Response (202):
{ "run_id": "0192…", "status": "queued", "submitted_at": "2026-05-29T12:00:00Z" }
A --default-config (if the server was started with one) is merged under the
submitted config (submitted values win).
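Idempotency keys can be modeled as a map from each key to the first payload and run id seen with it. In this hypothetical sketch, a repeat with the same payload returns the original run, and a different payload maps to the documented 409. Whether faucet's replay response matches this exactly is an assumption; check the serve cookbook.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum SubmitOutcome {
    Accepted(String), // new run created (202)
    Replayed(String), // same key + same payload: existing run returned
    Conflict,         // same key + different payload (409)
}

#[derive(Default)]
struct IdempotencyStore {
    by_key: HashMap<String, (String, String)>, // key -> (payload, run_id)
    counter: u64,
}

impl IdempotencyStore {
    fn fresh_run_id(&mut self) -> String {
        self.counter += 1;
        format!("run-{}", self.counter)
    }

    fn submit(&mut self, key: Option<&str>, payload: &str) -> SubmitOutcome {
        let Some(key) = key else {
            return SubmitOutcome::Accepted(self.fresh_run_id()); // no key: always a new run
        };
        if let Some((stored, run_id)) = self.by_key.get(key) {
            return if stored == payload {
                SubmitOutcome::Replayed(run_id.clone())
            } else {
                SubmitOutcome::Conflict
            };
        }
        let run_id = self.fresh_run_id();
        self.by_key.insert(key.to_string(), (payload.to_string(), run_id.clone()));
        SubmitOutcome::Accepted(run_id)
    }
}
```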
GET /v1/runs
Query parameters: status, name, since, until (RFC3339), limit (default
50, max 500), cursor. Ordering is (submitted_at DESC, run_id DESC); cursor
is the last run_id from the previous page.
{ "runs": [ { "run_id": "…", "status": "completed", … } ], "next_cursor": "0192…" }
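The ordering and cursor rule amount to keyset pagination. The sketch below applies them to an in-memory list. RunSummary and list_runs are hypothetical, and submitted_at is simplified to Unix seconds. Returning an empty page for an unknown cursor and clamping limit to at least 1 are assumptions.

```rust
#[derive(Clone, Debug)]
struct RunSummary {
    run_id: String,
    submitted_at: i64, // Unix seconds in this sketch
}

/// Keyset pagination over (submitted_at DESC, run_id DESC); the cursor is the
/// last run_id of the previous page.
fn list_runs(
    runs: &[RunSummary],
    cursor: Option<&str>,
    limit: Option<usize>,
) -> (Vec<RunSummary>, Option<String>) {
    let limit = limit.unwrap_or(50).clamp(1, 500); // documented default 50, max 500
    let mut sorted = runs.to_vec();
    sorted.sort_by(|a, b| {
        b.submitted_at
            .cmp(&a.submitted_at)
            .then_with(|| b.run_id.cmp(&a.run_id))
    });
    let start = match cursor {
        // Resume strictly after the cursor row.
        Some(c) => sorted
            .iter()
            .position(|r| r.run_id == c)
            .map_or(sorted.len(), |i| i + 1),
        None => 0,
    };
    let page: Vec<RunSummary> = sorted[start..].iter().take(limit).cloned().collect();
    let next_cursor = if start + page.len() < sorted.len() {
        page.last().map(|r| r.run_id.clone())
    } else {
        None
    };
    (page, next_cursor)
}
```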
GET /v1/runs/{id} → RunRecord
{
  "run_id": "0192…",
  "name": "nightly-rollup",
  "labels": {"requester": "airflow"},
  "status": "completed",
  "submitted_at": "…", "started_at": "…", "finished_at": "…",
  "elapsed_secs": 12.4,
  "records_written": 4096,
  "invocations": [
    {"row_id": "default", "parent_record_key": null, "records_written": 4096, "error": null}
  ],
  "error": null,
  "idempotency_key": "airflow-task-123-attempt-2",
  "doctor_report": null
}
status is one of queued, running, completed, failed, cancelled.
elapsed_secs is filled live for running runs.
Bookmarks: run records carry record counts + per-row outcomes, not replication bookmarks. Bookmark state is per-row/per-state-key and lives in the configured state backend, not in the run record.
GET /v1/runs/{id}/logs (SSE)
text/event-stream. The server replays the run’s bounded ring buffer, then
streams the live tail. Event types:
- event: log carries one captured log line (subject to the server’s FAUCET_LOG level; secrets are redacted).
- event: truncated means the reader fell behind and lines were dropped; rely on the centralized log sink for the full history.
- event: end means the run reached a terminal state; the stream closes.
Log buffers are ephemeral: they survive a short drain window after the run
finishes (independent of run-record retention), then are dropped. A known run
whose buffer has expired yields a single end.
curl -N -H "Authorization: Bearer $TOKEN" \
http://127.0.0.1:8080/v1/runs/0192…/logs
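If you consume the stream from code rather than curl, a text/event-stream parser only needs to track the event: and data: fields and dispatch on blank lines. This is a minimal, hypothetical parser. It follows the general SSE rules (for example, an event with no data: line is not dispatched) and makes no assumption about the payload format of faucet's log lines.

```rust
#[derive(Debug, PartialEq)]
struct SseEvent {
    event: String,
    data: String,
}

/// Minimal text/event-stream parser: handles `event:` and `data:` fields and
/// dispatches on blank lines. `id:`, `retry:`, and `:` comments are ignored.
fn parse_sse(body: &str) -> Vec<SseEvent> {
    let mut events = Vec::new();
    let mut event = String::new();
    let mut data: Vec<&str> = Vec::new();
    for line in body.lines() {
        if line.is_empty() {
            // Blank line: dispatch only if at least one data line was seen.
            if !data.is_empty() {
                let name = if event.is_empty() {
                    "message".to_string()
                } else {
                    std::mem::take(&mut event)
                };
                events.push(SseEvent { event: name, data: data.join("\n") });
                data.clear();
            }
            event.clear();
        } else if let Some(v) = line.strip_prefix("event:") {
            event = v.strip_prefix(' ').unwrap_or(v).to_string();
        } else if let Some(v) = line.strip_prefix("data:") {
            data.push(v.strip_prefix(' ').unwrap_or(v));
        }
    }
    events
}
```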
Error envelope
Every error is a JSON ApiError:
{ "error": { "code": "unprocessable", "message": "…", "details": { } } }
| Status | When |
|---|---|
| 400 | Malformed body / parse / interpolation failure; a schedule: block in the config |
| 401 | Missing/invalid bearer token |
| 404 | Unknown run_id |
| 409 | DELETE on a running run; idempotency key reused with a different payload |
| 413 | Body exceeds --body-limit-bytes |
| 422 | Expand/validation failure; doctor_first failed (report in details) |
| 429 | Run queue full (carries Retry-After) |
| 500 | Internal error |
Metrics
/metrics serves the standard faucet_* pipeline metrics plus serve-specific
series: faucet_serve_requests_total{method,path,status},
faucet_serve_request_duration_seconds{method,path}, faucet_serve_runs_queued,
faucet_serve_runs_in_flight, faucet_serve_runs_total{status,reason},
faucet_serve_idempotency_hits_total, and faucet_serve_history_degraded. See
Observability.
Deploying faucet
faucet run executes pipelines to completion; it is not a long-running daemon (faucet schedule and faucet serve are the long-running modes). That makes deployment simple: schedule the binary, point it at a config, and let it exit. Durable state (bookmarks) lets the next run pick up where the last one left off.
Patterns
Cron / scheduled jobs
The most common deployment. Run on an interval; incremental replication + a durable state store mean each run only fetches what’s new.
# crontab: every 15 minutes
*/15 * * * * faucet run /etc/faucet/events.yaml >> /var/log/faucet.log 2>&1
Containers
Build a slim image with only the connectors you need, and supply config via a mounted file or entirely from the environment:
FROM rust:slim AS build
RUN cargo install faucet-cli --no-default-features \
--features "source-rest,sink-bigquery,state-postgres,observability"
FROM debian:stable-slim
COPY --from=build /usr/local/cargo/bin/faucet /usr/local/bin/faucet
ENTRYPOINT ["faucet", "run"]
With faucet run --from-env you can drive the whole pipeline from FAUCET_*
environment variables — no config file in the image. See the
CLI reference.
Kubernetes CronJob
Wrap the container above in a CronJob. Use the postgres or redis state
backend so bookmarks survive pod restarts, and scrape the metrics endpoint (see
Observability).
Secrets
Never commit secrets. Use ${env:VAR} / ${file:PATH} in the config and inject
real values through your platform’s secret mechanism (Kubernetes secrets, Docker
secrets, a mounted .env, etc.).
Exit codes & retries
faucet run exits non-zero when a pipeline fails (subject to the
execution.on_error policy and any DLQ). Let your scheduler’s retry/alert
mechanism react to a non-zero exit; because bookmarks only advance after the sink
confirms, a retried run resumes safely.
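The rule that bookmarks advance only after the sink confirms is what makes retries safe. Below is a minimal sketch, assuming each page carries a monotonically increasing cursor. run_pages is hypothetical, and the closure stands in for a sink write.

```rust
/// Hypothetical sketch: write each page, then advance the bookmark only after
/// the sink has confirmed the write. Pages at or below the bookmark are skipped.
fn run_pages(
    pages: &[(u64, Vec<&str>)],
    bookmark: &mut Option<u64>,
    mut write: impl FnMut(&[&str]) -> Result<(), String>,
) -> Result<(), String> {
    for (cursor, records) in pages {
        if bookmark.map_or(false, |done| *cursor <= done) {
            continue; // already committed by an earlier run
        }
        write(records.as_slice())?; // a failed write returns before the bookmark moves
        *bookmark = Some(*cursor);
    }
    Ok(())
}
```

A failed first run leaves the bookmark at the last confirmed page, so the retry writes only what is left.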
Observability
Every source, sink, transform, and state-store operation is automatically wrapped
to emit tracing spans and metrics counters/histograms. Connector authors
write no observability code — they only override connector_name() for a
friendly label.
Enabling the Prometheus endpoint
The CLI’s observability feature (on by default in the full build) installs a
Prometheus exporter. Configure it from the pipeline config or environment; once
running, scrape the listen address with Prometheus.
Common labels
Metrics carry the labels pipeline, row (the matrix row id; empty for non-matrix runs), and connector
(from connector_name()). run_id is a span attribute only: it is high-cardinality,
so it is never a Prometheus label.
Key metrics
- Source: faucet_source_records_total, faucet_source_errors_total{kind}, faucet_source_page_duration_seconds, faucet_source_in_flight.
- Sink: faucet_sink_records_total, faucet_sink_writes_total, faucet_sink_errors_total, faucet_sink_write_duration_seconds, faucet_sink_flush_duration_seconds, faucet_sink_in_flight.
- Transform: faucet_transform_records_in_total, faucet_transform_records_out_total (use the out/in ratio for filter drop rate or explode fan-out), faucet_transform_errors_total{kind}, faucet_transform_duration_seconds.
- State: faucet_state_{get,put,delete}_total (get carries outcome=hit|miss), faucet_state_errors_total{op,kind}, plus duration histograms.
- Pipeline: faucet_pipeline_runs_total{status=ok|err,kind}, faucet_pipeline_run_duration_seconds, faucet_pipeline_in_flight, faucet_pipeline_seconds_since_last_bookmark, faucet_pipeline_last_bookmark_unix_seconds.
- Build: faucet_build_info{version} is set to 1; group_left it onto other metrics to annotate dashboards with the running version.
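As a worked example of the transform ratio, the sketch below turns the two counter values into either a drop rate or a fan-out factor. The classify function and TransformShape enum are hypothetical helpers; on a live dashboard you would compute the same ratio from rates over a time window rather than from raw counter totals.

```rust
/// What the out/in ratio of a transform says about it.
#[derive(Debug, PartialEq)]
pub enum TransformShape {
    /// No input recorded yet, so no ratio can be computed.
    Idle,
    /// out < in: fraction of input records a filter dropped.
    Dropping { drop_rate: f64 },
    /// out >= in: average output records per input record (explode fan-out).
    FanOut { factor: f64 },
}

/// `records_in` / `records_out` correspond to
/// faucet_transform_records_in_total / faucet_transform_records_out_total.
pub fn classify(records_in: u64, records_out: u64) -> TransformShape {
    if records_in == 0 {
        return TransformShape::Idle;
    }
    let ratio = records_out as f64 / records_in as f64;
    if ratio < 1.0 {
        TransformShape::Dropping { drop_rate: 1.0 - ratio }
    } else {
        TransformShape::FanOut { factor: ratio }
    }
}
```

For example, 1000 records in and 750 out is a 25% drop rate, while 200 in and 600 out is a fan-out of 3.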
Reliability properties
- Drop-guard timers sample durations even when a task is cancelled.
- Panic isolation: a panicking connector surfaces as a Panic error kind rather than crashing the process.
- Idempotent install: installing the recorder/subscriber twice logs a warning rather than panicking.
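The drop-guard idea can be shown with the standard library alone: a timer records its elapsed time in Drop, so an early return, a panic, or a dropped (cancelled) future still produces a sample. DropTimer and timed_work are hypothetical names, and the shared Vec stands in for a real histogram.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::time::{Duration, Instant};

/// Records how long it lived into `samples` when it is dropped.
pub struct DropTimer {
    start: Instant,
    samples: Rc<RefCell<Vec<Duration>>>,
}

impl DropTimer {
    pub fn start(samples: Rc<RefCell<Vec<Duration>>>) -> Self {
        DropTimer { start: Instant::now(), samples }
    }
}

impl Drop for DropTimer {
    fn drop(&mut self) {
        // Runs on normal scope exit, early return, `?`, panic unwinding,
        // and when an async task holding the guard is dropped (cancelled).
        self.samples.borrow_mut().push(self.start.elapsed());
    }
}

/// Hypothetical unit of work that bails out early when `cancel` is true.
pub fn timed_work(samples: &Rc<RefCell<Vec<Duration>>>, cancel: bool) -> Option<u32> {
    // Bind to `_timer`, not `_`, so the guard lives until the end of scope.
    let _timer = DropTimer::start(Rc::clone(samples));
    if cancel {
        return None; // the guard still records a sample here
    }
    Some(42)
}
```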
Cardinality rules
Never use high-cardinality values (record ids, URLs, query strings) as metric
labels. parent_record_key in a DAG is a span attribute only. Connector authors
must return a non-empty &'static str from connector_name().
Tracing
Spans carry run_id, pipeline, row, and per-operation timing. Point a
tracing subscriber at your logging/trace backend; control verbosity with
--log-level or FAUCET_LOG.
Full design:
docs/superpowers/specs/2026-05-23-observability-otel-prometheus-design.md.
Performance tuning
faucet is built to be fast by default, but a few knobs let you trade memory for throughput on a given pipeline.
batch_size
The single most important knob. It bounds how many records are buffered and sets
each sink’s natural write unit (multi-row INSERT, _bulk body, insertAll
request, Redis pipeline, …).
- Default: 1000. Max: 1,000,000.
- Larger values mean fewer, bigger requests: more throughput, but more memory per batch.
- batch_size: 0 means “no batching”: the source emits the whole result set in one page and the sink writes it in one request. Use it for small lookup tables, or for sinks that prefer one large request (load-job-style ingestion).
Set it on the sink (authoritative) and/or the source. With a non-zero batch_size, streaming keeps memory at O(batch_size) on both sides regardless of total volume.
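A minimal sketch of the batching rule, assuming a synchronous iterator in place of faucet's async streams: a positive batch_size holds at most that many records at once, and 0 collapses the whole stream into one batch. Batches and batches() are hypothetical illustrations, not faucet's implementation.

```rust
/// Groups a record stream into batches of at most `batch_size` records.
/// `batch_size == 0` means "no batching": the whole stream is one batch,
/// so memory is no longer bounded by the batch size.
pub struct Batches<I: Iterator> {
    inner: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        // Pull lazily from the source: only one batch is buffered at a time.
        let batch: Vec<I::Item> = if self.batch_size == 0 {
            self.inner.by_ref().collect()
        } else {
            self.inner.by_ref().take(self.batch_size).collect()
        };
        if batch.is_empty() { None } else { Some(batch) }
    }
}

pub fn batches<I: IntoIterator>(records: I, batch_size: usize) -> Batches<I::IntoIter> {
    Batches { inner: records.into_iter(), batch_size }
}
```

With the default of 1000, 2,500 records become three writes (1000, 1000, 500); with batch_size: 0 they become a single write of 2,500.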
Connection pooling
Database connectors use configurable pools — max_connections defaults to 10 for
sources and 5 for sinks. Raise it for highly concurrent workloads; keep it under
your database’s connection limit.
Concurrency
- The REST source can process partitions concurrently (partition_concurrency).
- S3/GCS sources and sinks read/write objects in parallel (buffer_unordered-style concurrency).
- The HTTP sink sends per-record requests concurrently under a semaphore.
- The Kafka sink uses FuturesUnordered batched sends with QueueFull retry.
- At the pipeline level, execution.max_concurrent bounds how many matrix rows run at once.
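The semaphore pattern behind the HTTP sink (and the matrix-row bound of execution.max_concurrent) can be sketched with blocking std primitives. faucet's connectors are async, so treat this Semaphore and run_bounded as a hypothetical illustration of the invariant, not the real code: no matter how many jobs are queued, at most limit run at once.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal blocking counting semaphore built from std primitives.
pub struct Semaphore {
    permits: Mutex<usize>,
    freed: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        assert!(permits > 0, "a zero-permit semaphore would block forever");
        Semaphore { permits: Mutex::new(permits), freed: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    pub fn acquire(&self) {
        let mut permits = self.permits.lock().expect("semaphore mutex poisoned");
        while *permits == 0 {
            permits = self.freed.wait(permits).expect("semaphore mutex poisoned");
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiter.
    pub fn release(&self) {
        *self.permits.lock().expect("semaphore mutex poisoned") += 1;
        self.freed.notify_one();
    }
}

/// Runs `jobs` simulated requests with at most `limit` in flight and
/// returns the peak number that were ever running at the same time.
pub fn run_bounded(jobs: usize, limit: usize) -> usize {
    let sem = Semaphore::new(limit);
    let stats = Mutex::new((0usize, 0usize)); // (in flight now, peak seen)
    thread::scope(|s| {
        for _ in 0..jobs {
            s.spawn(|| {
                sem.acquire();
                {
                    let mut st = stats.lock().expect("stats mutex poisoned");
                    st.0 += 1;
                    st.1 = st.1.max(st.0);
                }
                thread::sleep(Duration::from_millis(5)); // simulated I/O
                stats.lock().expect("stats mutex poisoned").0 -= 1;
                sem.release();
            });
        }
    });
    stats.into_inner().expect("stats mutex poisoned").1
}
```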
Retries
HTTP-based sources retry with exponential backoff + jitter on retriable failures.
The backoff is capped at 60s and its jitter is decorrelated across concurrent
retries (so a fleet of matrix rows doesn’t re-align into a thundering herd). The
REST source additionally honors 429 / Retry-After (delta-seconds or an
RFC 7231 HTTP-date). Tune max_retries and retry_backoff per connector. A
permanently throttled endpoint surfaces a RateLimited error rather than hanging.
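One well-known way to get capped, decorrelated jitter is the formula next = min(cap, random(base, prev * 3)) described on the AWS Architecture Blog. The sketch below implements that formula with a tiny dependency-free PRNG; faucet's exact formula may differ, and XorShift, next_backoff, and retry_after_seconds are hypothetical names. Only the delta-seconds form of Retry-After is parsed here.

```rust
use std::time::Duration;

/// Tiny xorshift64 PRNG so the sketch needs no external crates.
/// Not suitable for anything security-related.
pub struct XorShift(u64);

impl XorShift {
    pub fn new(seed: u64) -> Self {
        XorShift(seed.max(1)) // xorshift state must be non-zero
    }

    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Value in [lo, hi]; modulo bias is acceptable for a sketch.
    fn between(&mut self, lo: u64, hi: u64) -> u64 {
        lo + self.next_u64() % (hi - lo + 1)
    }
}

/// Decorrelated jitter: each delay depends on this retrier's own previous
/// delay plus randomness, so concurrent retriers drift apart over time.
pub fn next_backoff(prev: Duration, base: Duration, cap: Duration, rng: &mut XorShift) -> Duration {
    let base_ms = base.as_millis() as u64;
    let upper = (prev.as_millis() as u64).saturating_mul(3).max(base_ms);
    Duration::from_millis(rng.between(base_ms, upper)).min(cap)
}

/// Parses the delta-seconds form of Retry-After ("120"). The HTTP-date
/// form is not handled in this sketch and yields None.
pub fn retry_after_seconds(header: &str) -> Option<Duration> {
    header.trim().parse::<u64>().ok().map(Duration::from_secs)
}
```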
Choosing values
- Many small rows, row-oriented sink (Postgres/MySQL): larger batch_size (5k–50k) for fat multi-row INSERTs.
- Large objects (Parquet/S3): moderate batch_size; lean on parallel I/O.
- Tiny lookup tables / COPY-style loads: batch_size: 0.
- Memory-constrained host: smaller batch_size to cap the per-batch footprint.
Measure with the metrics — faucet_sink_write_duration_seconds
and faucet_source_page_duration_seconds tell you where time goes.
Benchmarks
faucet-core ships a criterion
benchmark of the observability hot path, and CI guards it against a 5% regression
on every PR. Run it locally with:
cargo bench -p faucet-core --bench observability
Numbers are hardware-dependent, so run the benchmark on your target machine rather than relying on published figures.
Troubleshooting & FAQ
My config won’t parse / validate
Run faucet validate <config> — it reports one line per expanded row. Common
causes:
- version missing or not 1: the top-level version: 1 is required.
- Old top-level source:/sink: keys: these must live under pipeline:. faucet rejects the pre-pipeline: shape with a hint.
- Unknown connector type: run faucet list to see what’s compiled in; you may have a slim build without that feature.
- InterpolationCycle: a ${vars.X} / template reference forms a loop.
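faucet validate already reports InterpolationCycle, but it can help to see what a cycle looks like. This sketch, assuming vars are plain strings that reference each other with ${vars.NAME}, finds a loop with a depth-first search that tracks the current path; refs and find_cycle are hypothetical helpers, not faucet's parser.

```rust
use std::collections::{HashMap, HashSet};

/// Returns the NAMEs of every `${vars.NAME}` reference inside `value`.
fn refs(value: &str) -> Vec<&str> {
    const OPEN: &str = "${vars.";
    let mut out = Vec::new();
    let mut rest = value;
    while let Some(start) = rest.find(OPEN) {
        let after = &rest[start + OPEN.len()..];
        match after.find('}') {
            Some(end) => {
                out.push(&after[..end]);
                rest = &after[end + 1..];
            }
            None => break, // unterminated reference: stop scanning
        }
    }
    out
}

/// Returns the first cycle found as a path like ["a", "b", "a"],
/// or None if the var references form a DAG.
pub fn find_cycle(vars: &HashMap<String, String>) -> Option<Vec<String>> {
    fn visit(
        name: &str,
        vars: &HashMap<String, String>,
        stack: &mut Vec<String>,   // vars on the current DFS path
        done: &mut HashSet<String>, // vars already fully explored
    ) -> Option<Vec<String>> {
        if let Some(pos) = stack.iter().position(|n| n == name) {
            // `name` is already on the path: we walked back into it.
            let mut cycle = stack[pos..].to_vec();
            cycle.push(name.to_string());
            return Some(cycle);
        }
        if done.contains(name) {
            return None;
        }
        stack.push(name.to_string());
        if let Some(value) = vars.get(name) {
            for r in refs(value) {
                if let Some(cycle) = visit(r, vars, stack, done) {
                    return Some(cycle);
                }
            }
        }
        stack.pop();
        done.insert(name.to_string());
        None
    }

    let mut names: Vec<&String> = vars.keys().collect();
    names.sort(); // deterministic traversal order
    let mut stack = Vec::new();
    let mut done = HashSet::new();
    for name in names {
        if let Some(cycle) = visit(name, vars, &mut stack, &mut done) {
            return Some(cycle);
        }
    }
    None
}
```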
A ${env:VAR} isn’t being substituted
Load-time interpolation reads the environment and a sibling .env. If the value
is empty, the var isn’t set (or --no-env-file disabled the .env). Use
--env-file PATH to point at a specific file.
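The substitution rule described above (a ${env:VAR} whose variable is unset becomes an empty string) can be sketched as a small string scan. The interpolate_env function is hypothetical and takes an explicit map so it is easy to test; a real loader would build that map from the process environment plus the .env file.

```rust
use std::collections::HashMap;

/// Replaces each `${env:NAME}` in `input` with `env[NAME]`, substituting an
/// empty string when NAME is unset. An unterminated `${env:` is kept as-is.
pub fn interpolate_env(input: &str, env: &HashMap<&str, &str>) -> String {
    const OPEN: &str = "${env:";
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(start) = rest.find(OPEN) {
        out.push_str(&rest[..start]); // literal text before the reference
        let after = &rest[start + OPEN.len()..];
        match after.find('}') {
            Some(end) => {
                let name = &after[..end];
                out.push_str(env.get(name).copied().unwrap_or(""));
                rest = &after[end + 1..];
            }
            None => {
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

This is why an empty value in the loaded config usually means the variable was never set, not that it was set to an empty string.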
“feature not enabled” / connector missing
Your binary was built without that connector. Reinstall with the feature:
cargo install faucet-cli --features "source-foo,sink-bar", or use the full
build (the default cargo install faucet-cli).
docs.rs shows fewer APIs than I expected
It shouldn’t anymore — every crate is configured to build with all features. If you’re looking at an old version, check the latest release.
Kafka connector fails to build
The Kafka crates build librdkafka, which needs cmake and a C toolchain. Make
sure those are installed in your build environment (CI installs
libsasl2-dev libssl-dev libcurl4-openssl-dev cmake build-essential).
Postgres CDC retains a lot of WAL
A CDC replication slot retains WAL until a run advances the bookmark. If you
created a permanent slot and stopped running the pipeline, Postgres keeps WAL
forever. Either run the pipeline regularly, drop the slot
(PostgresCdcSource::drop_slot() or SELECT pg_drop_replication_slot(...)), or
use slot_type: temporary for experiments. See the
CDC tutorial.
Some records failed but I don’t want the run to abort
Attach a dead-letter queue so failing rows are captured and the rest commit.
Run is slower / using more memory than expected
Tune batch_size and concurrency — see Performance tuning. Use
the metrics to find the bottleneck.
Where do I report a bug or request a connector?
Open an issue at github.com/PawanSikawat/faucet-stream/issues.
Authoring a connector
faucet-stream is designed as an ecosystem: third parties can publish their own
faucet-source-* / faucet-sink-* crates with minimal friction. faucet-core
is the only required dependency — it re-exports everything a connector author
needs (async_trait, serde_json, schemars).
The traits
Implement Source or Sink. Both are object-safe (Box<dyn Source> works) and
all newer methods have defaults, so a minimal connector is small.
use faucet_core::{async_trait, Source, Sink, FaucetError, Value};
struct MySource { /* reusable client/pool created in new() */ }
#[async_trait]
impl Source for MySource {
// Primary entry point. (`fetch_all()` is a provided convenience.)
async fn fetch_with_context(&self) -> Result<Vec<Value>, FaucetError> {
todo!("fetch records from your system")
}
}
struct MySink { /* reusable client/pool */ }
#[async_trait]
impl Sink for MySink {
async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
todo!("write records to your system")
}
}
Your connector now works with the Pipeline and every other connector:
Pipeline::new(&MySource { .. }, &MySink { .. }).run().await?.
Crate layout
Follow the same module layout as the built-in connectors:
- lib.rs: re-export the config and the Source/Sink type. First line: #![cfg_attr(docsrs, feature(doc_cfg))].
- config.rs: the config struct and sub-enums, deriving Serialize + Deserialize + JsonSchema. No I/O here.
- stream.rs (source) / sink.rs (sink): the one place that performs I/O. Create reusable clients/pools in new() and store them; never reconnect per call.
Make it fast
Performance is the project’s first principle. Reuse clients and connections,
pool database connections, use multi-row inserts and bulk APIs, and prefer
parallel I/O. Where it makes sense, override stream_pages to stream natively
from your source’s paging primitive so memory stays bounded.
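A sketch of native page streaming, assuming a cursor-based paging primitive: only the current page is in memory, so peak memory tracks page size rather than total volume. faucet's stream_pages is async and its signature is not shown here, so this uses a synchronous Iterator; fetch_page and PageStream are hypothetical.

```rust
/// Hypothetical paging primitive: up to `limit` records starting at
/// `cursor`, plus the next cursor (None when exhausted). An in-memory
/// range stands in for a remote API.
fn fetch_page(total: u32, cursor: u32, limit: u32) -> (Vec<u32>, Option<u32>) {
    let end = (cursor + limit).min(total);
    let page: Vec<u32> = (cursor..end).collect();
    let next = if end < total { Some(end) } else { None };
    (page, next)
}

/// Yields one page per call; earlier pages are dropped by the consumer,
/// so peak memory is O(page size) however many records exist.
pub struct PageStream {
    total: u32,
    limit: u32,
    next_cursor: Option<u32>,
}

impl PageStream {
    pub fn new(total: u32, limit: u32) -> Self {
        PageStream { total, limit, next_cursor: Some(0) }
    }
}

impl Iterator for PageStream {
    type Item = Vec<u32>;

    fn next(&mut self) -> Option<Vec<u32>> {
        let cursor = self.next_cursor?; // stop once the source is exhausted
        let (page, next) = fetch_page(self.total, cursor, self.limit);
        self.next_cursor = next;
        if page.is_empty() { None } else { Some(page) }
    }
}
```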
Config schema introspection
Implement config_schema() so faucet schema and faucet init work:
fn config_schema(&self) -> Value {
faucet_core::schema_for!(MyConfig).into()
}
Derive JsonSchema on the config struct and all sub-types, and add
#[schemars(with = "String")] for any custom-serde fields.
Errors
Map every failure to a FaucetError variant. Third-party error types wrap into
FaucetError::Custom(Box<dyn Error + Send + Sync>) without losing the chain.
Never .unwrap() on anything that can fail at runtime.
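A minimal sketch of wrapping a third-party error without losing its chain. ConnectorError is a hypothetical stand-in for FaucetError reduced to its Custom variant; the From impl lets ? do the wrapping, and source() keeps the original error reachable.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for FaucetError, reduced to one variant.
#[derive(Debug)]
pub enum ConnectorError {
    Custom(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectorError::Custom(e) => write!(f, "connector error: {e}"),
        }
    }
}

impl Error for ConnectorError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            // Expose the wrapped error so the cause chain stays walkable.
            ConnectorError::Custom(e) => {
                let inner: &(dyn Error + 'static) = &**e;
                Some(inner)
            }
        }
    }
}

/// Lets `?` wrap a third-party error (here std's ParseIntError).
impl From<std::num::ParseIntError> for ConnectorError {
    fn from(e: std::num::ParseIntError) -> Self {
        ConnectorError::Custom(Box::new(e))
    }
}

/// Hypothetical connector step: parse a port instead of unwrapping.
pub fn parse_port(raw: &str) -> Result<u16, ConnectorError> {
    Ok(raw.trim().parse::<u16>()?)
}
```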
docs.rs setup
So docs.rs renders your full API with per-feature badges, add to Cargo.toml:
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
and make the first line of lib.rs #![cfg_attr(docsrs, feature(doc_cfg))].
Naming & publishing
Name crates faucet-source-<name> / faucet-sink-<name>. If you ship both a
source and a sink for the same system, put shared types (auth, formats) in a
faucet-common-<name> crate that both depend on and re-export.
See any built-in connector (e.g.
faucet-source-rest) for a reference implementation.