faucet-stream

The fast, config-driven way to move data in Rust.

faucet-stream wires 21 source and 17 sink connectors together with a single faucet binary that runs pipelines declaratively from a YAML/JSON file — no Rust code required. Or skip the binary and embed the same engine in your own service through the typed Source / Sink traits.

cargo install faucet-cli
faucet init my_pipeline --source postgres --sink bigquery
faucet validate pipeline.yaml
faucet run pipeline.yaml

What you get

  • Fast and reliable by default — native streaming with bounded memory, connection pooling, multi-row inserts, bulk APIs, and parallel I/O.
  • Config-driven or embeddable — run faucet run pipeline.yaml, or call Pipeline::new(&source, &sink).run().await? from Rust.
  • A runtime, not just connectors — incremental + resumable replication, PostgreSQL change-data-capture, dead-letter queues, automatic retries, and built-in Prometheus metrics + tracing spans, all with zero per-connector code.
  • Pay only for what you use — every connector is a Cargo feature.

How this book is organized

  • Getting Started — install, run your first pipeline in five minutes, and learn the core concepts.
  • Tutorials — end-to-end walkthroughs of real pipelines (incremental REST → BigQuery, Postgres CDC, DAGs, embedding).
  • Cookbook — short, task-oriented recipes for pagination, auth, state, dead-letter queues, and compression.
  • Reference — the connector catalog, CLI commands, and config-file grammar.
  • Operations — deploying, observability, performance tuning, and troubleshooting.
  • Extending — author and publish your own faucet-source-* / faucet-sink-* crate.

Where else to look

Installation

The faucet CLI

The CLI is the fastest way to start. Install it from crates.io:

cargo install faucet-cli

This gives you a faucet binary with every first-party connector compiled in, so it can run any of the published example configs out of the box.

Slim builds

Every connector is a Cargo feature, so you can build a smaller binary with only what you need:

cargo install faucet-cli --no-default-features \
  --features "source-rest,sink-jsonl,sink-stdout,transforms"

Run faucet list to see which sources and sinks are compiled into your binary.

The library

To embed pipelines in your own Rust program, depend on the umbrella crate and enable the connectors you need:

[dependencies]
# Default features include the REST source only.
faucet-stream = "1.0"

# Or enable specific connectors:
faucet-stream = { version = "1.0", features = ["source-rest", "sink-postgres", "sink-s3"] }

# Or everything:
faucet-stream = { version = "1.0", features = ["full"] }

Feature groups: source (all sources), sink (all sinks), state (all state-store backends), full (everything), and compression (gzip/zstd on the file-shaped connectors you’ve enabled).

You can also depend on individual connector crates directly (faucet-source-rest, faucet-sink-bigquery, …) — each depends only on faucet-core.

Requirements

  • A recent stable Rust toolchain (see the repo’s rust-toolchain.toml for the current MSRV).
  • Some connectors link native libraries — the Kafka connectors build librdkafka and need cmake and a C toolchain available at compile time.

Next: run your first pipeline.

Your first pipeline

This walkthrough moves a local CSV file to JSON Lines — no external services required, so it works immediately after cargo install faucet-cli.

1. Create some input

mkdir -p data out
cat > data/input.csv <<'CSV'
id,name,city
1,Ada,London
2,Grace,New York
3,Linus,Helsinki
CSV

2. Write a config

Create pipeline.yaml:

version: 1
name: csv_to_jsonl

pipeline:
  source:
    type: csv
    config:
      path: ./data/input.csv
  sink:
    type: jsonl
    config:
      path: ./out/records.jsonl

faucet run auto-discovers a faucet.yaml / faucet.yml / faucet.json in the current directory (and a sibling .env), so you can also name the file faucet.yaml and just run faucet run.

3. Validate, then run

faucet validate pipeline.yaml
faucet run pipeline.yaml
$ cat out/records.jsonl
{"id":"1","name":"Ada","city":"London"}
{"id":"2","name":"Grace","city":"New York"}
{"id":"3","name":"Linus","city":"Helsinki"}

4. Preview without writing

To see what a source emits without touching a sink, use preview — it runs the source and prints records to stdout:

faucet preview pipeline.yaml --limit 5

5. Scaffold from a connector’s schema

faucet init generates a commented config skeleton from any connector’s JSON schema, marking required fields and commenting out optional ones:

faucet init my_pipeline --source rest --sink postgres

Add a transform

Insert a transforms: list between source and sink to reshape records. For example, normalize keys to snake_case:

pipeline:
  source: { type: csv, config: { path: ./data/input.csv } }
  transforms:
    - type: snake_case
  sink: { type: jsonl, config: { path: ./out/records.jsonl } }

Built-in config transforms are flatten, rename_keys, and snake_case.
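Here is a minimal, dependency-free sketch of a snake_case key normalizer. It assumes one simple rule: lowercase everything, turn spaces and hyphens into underscores, and insert an underscore where a lowercase letter or digit is followed by an uppercase letter. faucet's actual transform may handle acronyms and other edge cases differently, and the function name is hypothetical.

```rust
/// Hypothetical sketch of a snake_case key normalizer.
/// Assumed rule: lowercase everything, map spaces/hyphens to `_`, and insert
/// `_` at a lowercase-or-digit -> uppercase boundary ("userId" -> "user_id").
fn to_snake_case(key: &str) -> String {
    let mut out = String::with_capacity(key.len() + 4);
    let mut prev: Option<char> = None;
    for c in key.chars() {
        if c == ' ' || c == '-' {
            out.push('_');
        } else if c.is_uppercase() {
            // Word boundary such as the one between "user" and "Id".
            if matches!(prev, Some(p) if p.is_lowercase() || p.is_ascii_digit()) {
                out.push('_');
            }
            out.extend(c.to_lowercase());
        } else {
            out.push(c);
        }
        prev = Some(c);
    }
    out
}

fn main() {
    for key in ["userId", "First Name", "zip-code", "city"] {
        println!("{key} -> {}", to_snake_case(key));
    }
}
```

Under this rule an acronym run such as HTTPServer collapses to httpserver. Edge cases like that are worth checking against the real transform with faucet preview.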

Next: core concepts.

Core concepts

faucet-stream is built from a handful of small pieces. Understanding them makes both the YAML config and the Rust API obvious.

Source

A source fetches records from an external system (a REST API, a database, a Kafka topic, an object store, …) and yields them as JSON values. Sources stream in batches via stream_pages, so memory stays bounded no matter how much data flows through.

Sink

A sink writes records to an external system. Sinks accept batches and most expose a batch_size knob that controls the natural unit of work (a multi-row INSERT, a _bulk body, an insertAll request, and so on).
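As an illustration of batch_size as the unit of work, the sketch below splits a page of rows into batch_size chunks and builds one parameterized multi-row INSERT per chunk. The table and column names are hypothetical, the `$n` placeholder syntax is PostgreSQL's, and the values themselves would be bound separately. This is not the Postgres sink's real code.

```rust
/// Build one multi-row INSERT per `batch_size` chunk of `row_count` rows.
/// Placeholders use PostgreSQL's `$n` syntax and restart at $1 per statement.
fn batched_inserts(table: &str, columns: &[&str], row_count: usize, batch_size: usize) -> Vec<String> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut statements = Vec::new();
    let mut start = 0;
    while start < row_count {
        let rows_in_batch = batch_size.min(row_count - start);
        let mut tuples = Vec::with_capacity(rows_in_batch);
        let mut param = 1;
        for _ in 0..rows_in_batch {
            // One "($1, $2)" tuple per row, numbering parameters sequentially.
            let placeholders: Vec<String> =
                (0..columns.len()).map(|i| format!("${}", param + i)).collect();
            param += columns.len();
            tuples.push(format!("({})", placeholders.join(", ")));
        }
        statements.push(format!(
            "INSERT INTO {table} ({}) VALUES {}",
            columns.join(", "),
            tuples.join(", ")
        ));
        start += rows_in_batch;
    }
    statements
}

fn main() {
    // 5 rows with batch_size 2 -> three statements (2 + 2 + 1 rows).
    for stmt in batched_inserts("users", &["id", "name"], 5, 2) {
        println!("{stmt}");
    }
}
```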

Transform

An optional transform reshapes each record between source and sink. The config-exposed transforms are flatten, rename_keys, and snake_case; additional custom transforms are available from Rust.

Pipeline

The pipeline connects a source to a sink. It drives the source’s stream_pages, applies transforms, and writes each page to the sink as it arrives — then flushes and records progress. Memory is bounded at one batch_size page on both sides regardless of total volume.

let result = Pipeline::new(&source, &sink).run().await?;

State store & bookmarks

For incremental and resumable runs, a state store persists a bookmark after each page the sink confirms. On the next run the source resumes from that bookmark. Built-in backends are memory and file (in faucet-core); redis and postgres backends live in their own crates.

This is what makes change-data-capture safe: the PostgreSQL CDC source only tells Postgres it can recycle write-ahead log up to a bookmark that has actually been persisted.

Dead-letter queue (DLQ)

A pipeline can attach a DLQ sink. When a sink reports per-row failures, the failing rows are wrapped in a fixed-shape envelope and routed to the DLQ before the page’s bookmark advances — so a few bad records don’t abort the whole run. The on_batch_error policy (propagate vs dlq_all) decides what happens when a sink can’t report per-row results.

Matrix & DAGs

A single config can fan out into many invocations with a matrix: block — either independent rows or a parent/child DAG where a child runs once per record produced by its parent. See the matrix DAG tutorial.

Observability

Every source, sink, transform, and state operation is automatically wrapped to emit tracing spans and metrics counters/histograms — no per-connector code. See Observability.

REST API → BigQuery (incremental)

This tutorial pulls records from a paginated REST API and streams them into a BigQuery table, then converts it to an incremental pipeline that only fetches new rows on each run.

Full-table version

version: 1
name: rest_to_bigquery

pipeline:
  source:
    type: rest
    config:
      base_url: https://api.example.com
      path: /v1/events
      method: GET
      name: events
      auth:
        type: basic
        config:
          username: ${env:API_USER}
          password: ${env:API_PASS}
      records_path: $.events[*]
      pagination:
        type: PageNumber
        param_name: page
        start_page: 1
        page_size: 500
        page_size_param: per_page
      max_pages: 200
      timeout: 45
      max_retries: 5
      retry_backoff: 2
      tolerated_http_errors: [404]
      replication_method:
        type: FullTable
      primary_keys: [event_id]
      schema_sample_size: 100

  sink:
    type: bigquery
    config:
      project_id: my-gcp-project
      dataset_id: analytics
      table_id: events
      auth:
        type: service_account_key_path
        config:
          path: service-account.json
      batch_size: 1000

Secrets come from the environment via ${env:VAR} — keep credentials out of the config file. Put them in a sibling .env or export them before running.

export API_USER=… API_PASS=…
faucet run rest_to_bigquery.yaml

The records_path is a JSONPath that selects the array of records inside each response body; pagination keeps requesting pages until one comes back empty or max_pages is reached. See the pagination cookbook for the other styles.

Make it incremental

Switch replication_method from FullTable to a key-based incremental method and attach a state store so progress survives between runs:

pipeline:
  source:
    type: rest
    config:
      # … as above …
      replication_method:
        type: Incremental
        cursor_field: updated_at
      primary_keys: [event_id]
  sink:
    # … as above …
  state:
    type: file
    config:
      path: ./state

Now each run records the maximum updated_at it saw; the next run resumes from that bookmark. Swap the file state store for redis or postgres for shared, durable state across machines — see state.

Tip: run faucet schema source rest and faucet schema sink bigquery to see every available config field with its type and default.

PostgreSQL CDC → JSONL

Change data capture (CDC) streams every INSERT/UPDATE/DELETE from a PostgreSQL table by reading its write-ahead log via logical replication — no polling, no updated_at column required.

Prepare Postgres

CDC needs logical replication enabled (wal_level = logical) and a publication for the tables you want to follow:

CREATE TABLE IF NOT EXISTS users (id int4 PRIMARY KEY, name text);
CREATE PUBLICATION faucet_pub FOR TABLE users;

The bundled examples/docker-compose.yml starts a Postgres already configured for logical replication.

Config

version: 1
pipeline:
  source:
    type: postgres-cdc
    config:
      connection_url: postgres://faucet:faucet@localhost:5432/appdb
      slot_name: faucet_slot
      publication_name: faucet_pub
      create_slot_if_missing: true
      idle_timeout: 30
  sink:
    type: jsonl
    config:
      path: ./out/changes.jsonl
      append: true
  state:
    type: file
    config:
      path: ./state

faucet run postgres_cdc_to_jsonl.yaml

Open a psql session and INSERT/UPDATE/DELETE some rows — the connector drains them every fetch cycle until idle_timeout fires.

Why the state store matters here

The CDC source advances Postgres’s confirmed_flush_lsn (the point up to which Postgres may recycle WAL) only from a durable bookmark — i.e. after the pipeline has persisted the position. It never confirms WAL for changes that haven’t been written to the sink. That means a crash mid-run cannot lose data: on restart the source resumes from the last persisted bookmark. The tradeoff is that WAL is retained until the next run advances the bookmark, so don’t point a CDC slot at a table and then never run it.

The state key is postgres-cdc:<slot>. Use a durable backend (redis / postgres) in production so the bookmark survives the loss of the local disk.

Slot lifecycle

  • slot_type: temporary drops the slot when the connection closes — good for experiments. permanent (the default) keeps it, which retains WAL until you drop it.
  • Free an abandoned slot’s WAL with PostgresCdcSource::drop_slot() (library) or by dropping the replication slot in Postgres.
  • tls: disable | require | verify_ca | verify_full configures the replication connection (default disable = plaintext; use verify_full over untrusted networks).

Multi-pipeline DAGs with matrix

A single config can drive many pipeline invocations. The matrix: block lists rows that are each deep-merged onto the base pipeline:. Rows can be independent (fan-out) or form a parent/child DAG where a child runs once per record the parent produced.

Independent fan-out

Each row overrides part of the pipeline and runs independently, bounded by execution.max_concurrent:

version: 1
name: multi_region
pipeline:
  source: { type: rest, config: { base_url: https://api.example.com, method: GET } }
  sink:   { type: jsonl, config: {} }
execution:
  max_concurrent: 4
  on_error: continue   # or `stop`
matrix:
  - id: us
    source: { config: { path: /v1/us/events } }
    sink:   { config: { path: us.jsonl } }
  - id: eu
    source: { config: { path: /v1/eu/events } }
    sink:   { config: { path: eu.jsonl } }

Parent/child DAG

A row with parent: runs once per record produced by the parent. Tokens like ${parent_id.dotted.path} are resolved per parent record at runtime:

version: 1
name: dag_users_posts
pipeline:
  source: { type: rest, config: { base_url: https://api.example.com, method: GET, records_path: $.data[*] } }
  sink:   { type: jsonl, config: { append: false } }
matrix:
  # Root: fetch the users list once.
  - id: users
    source: { config: { path: /v1/users, name: users } }
    sink:   { config: { path: users.jsonl } }
  # Child: for each user record, fetch that user's posts.
  - id: posts
    parent: users
    parent_key: id
    source: { config: { path: /v1/users/${users.id}/posts, name: posts } }
    sink:   { config: { path: posts-${users.id}.jsonl } }

The child’s state key is suffixed with the parent record’s key, so each per-user fetch resumes independently.
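Per-record token resolution can be pictured as a dotted-path lookup followed by string substitution. The sketch below is a hypothetical, stdlib-only model. It represents a parent record as nested string maps (the Node type is invented for the example) and replaces each `${row_id.path}` token. The real resolver works on JSON records and may handle a missing path differently; here a missing path is an error.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a JSON record: a string leaf or a nested object.
enum Node {
    Leaf(String),
    Object(BTreeMap<String, Node>),
}

/// Follow a dotted path such as "id" or "profile.slug" into a record.
fn lookup<'a>(record: &'a Node, path: &str) -> Option<&'a str> {
    let mut cur = record;
    for part in path.split('.') {
        match cur {
            Node::Object(map) => cur = map.get(part)?,
            Node::Leaf(_) => return None,
        }
    }
    match cur {
        Node::Leaf(s) => Some(s.as_str()),
        Node::Object(_) => None,
    }
}

/// Replace every `${parent_id.path}` token in `template` using `record`.
/// Tokens that name a different row id are left untouched.
fn resolve(template: &str, parent_id: &str, record: &Node) -> Result<String, String> {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("${") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        let end = after.find('}').ok_or("unterminated token")?;
        let token = &after[..end];
        match token.strip_prefix(parent_id).and_then(|t| t.strip_prefix('.')) {
            Some(path) => {
                let value = lookup(record, path).ok_or(format!("missing path: {path}"))?;
                out.push_str(value);
            }
            None => {
                out.push_str("${");
                out.push_str(token);
                out.push('}');
            }
        }
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    let mut user = BTreeMap::new();
    user.insert("id".to_string(), Node::Leaf("42".to_string()));
    let record = Node::Object(user);
    println!("{}", resolve("/v1/users/${users.id}/posts", "users", &record).unwrap());
}
```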

Merge semantics

A row is deep-merged onto the base pipeline: scalars replace, objects merge recursively, and arrays replace wholesale. That single rule defines all override behavior.
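The merge rule is small enough to express directly. This sketch uses a hypothetical minimal Value type instead of a real JSON library so it stays dependency-free. It demonstrates only the stated rule (scalars replace, objects merge recursively, arrays replace wholesale), not faucet's actual merge code.

```rust
use std::collections::BTreeMap;

/// Hypothetical minimal JSON-like value, just enough to show the merge rule.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    Num(f64),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

/// Deep-merge `row` onto `base` in place.
fn deep_merge(base: &mut Value, row: &Value) {
    match (base, row) {
        // Objects merge recursively, key by key.
        (Value::Object(b), Value::Object(r)) => {
            for (key, rv) in r {
                b.entry(key.clone())
                    .and_modify(|bv| deep_merge(bv, rv))
                    .or_insert_with(|| rv.clone());
            }
        }
        // Scalars, arrays, and type mismatches: the row value replaces wholesale.
        (b, r) => *b = r.clone(),
    }
}

/// Convenience constructor for object literals.
fn obj(pairs: Vec<(&str, Value)>) -> Value {
    Value::Object(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}

fn main() {
    let mut base = obj(vec![
        ("config", obj(vec![("method", Value::Str("GET".into()))])),
        ("max_pages", Value::Num(100.0)),
        ("tags", Value::Array(vec![Value::Str("a".into()), Value::Str("b".into())])),
    ]);
    let row = obj(vec![
        ("config", obj(vec![("path", Value::Str("/v1/us/events".into()))])),
        ("max_pages", Value::Num(200.0)),
        ("tags", Value::Array(vec![Value::Str("c".into())])),
    ]);
    deep_merge(&mut base, &row);
    // config keeps "method" and gains "path"; max_pages and tags are replaced.
    println!("{base:?}");
}
```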

Named templates (DRY)

For many heterogeneous rows, define reusable source/sink templates under pipeline.sources / pipeline.sinks and a top-level vars: block, then select them per row with ref:. See cli/README.md for the full grammar.

Error handling

execution.on_error: continue lets sibling subtrees finish when one fails (the failed subtree is skipped); stop aborts pending and in-flight work on the first failure. stop cancels in-flight tasks at their next await, which can leave partial sink state — acceptable for idempotent sinks, something to know for others.

Embedding faucet as a Rust library

The faucet CLI is a thin wrapper over the same library you can use directly. Embedding gives you typed configs, compile-time connector selection, and the ability to build a Source or Sink from your own code.

Add the dependency

[dependencies]
faucet-stream = { version = "1.0", features = ["source-rest", "sink-bigquery"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Build and run a pipeline

use faucet_stream::source::rest::{Auth, RestStream, RestStreamConfig};
use faucet_stream::sink::bigquery::{BigQuerySink, BigQuerySinkConfig};
use faucet_stream::Pipeline;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // REST source; fields not listed fall back to RestStreamConfig::default().
    let source = RestStream::new(RestStreamConfig {
        base_url: "https://api.example.com".into(),
        path: "/v1/events".into(),
        auth: Auth::Bearer { token: std::env::var("API_TOKEN")? },
        ..Default::default()
    })?;

    // BigQuery sink: supply project, dataset, table, and auth in the config.
    let sink = BigQuerySink::new(/* BigQuerySinkConfig { .. } */).await?;

    // Stream every page from source to sink, then report the total.
    let result = Pipeline::new(&source, &sink).run().await?;
    println!("moved {} records", result.records_written);
    Ok(())
}

Exact field names and constructors are documented per crate on docs.rs (rendered with all features, so every connector’s API is visible). Treat the snippet above as the shape, not the literal field list.

Applying transforms

faucet_stream::TransformingSource is the library entry point for attaching transforms to any source. It wraps a Box<dyn Source> with a flat list of RecordTransforms applied to every record emitted via fetch_* and stream_pages.

use faucet_stream::{
    KeyCaseMode, Labels, RecordTransform, Source, TransformingSource,
};

let inner: Box<dyn Source> = Box::new(my_source);
let source = TransformingSource::new(
    inner,
    vec![
        RecordTransform::Flatten { separator: "__".into() },
        RecordTransform::KeysCase { mode: KeyCaseMode::Snake },
        RecordTransform::custom(|mut record| {
            if let serde_json::Value::Object(ref mut map) = record {
                map.insert("_ingested_at".into(), serde_json::json!("2026-05-28T00:00:00Z"));
            }
            record
        }),
    ],
    Labels::for_named("my-source"),
)?;
// `source` is now a `Source` that streams the inner source's pages with
// transforms applied per page — memory stays bounded by `batch_size` even on
// large result sets.

Transforms compile eagerly inside new(): an invalid regex in RenameKeys surfaces immediately as FaucetError::Transform rather than at the first record.

Labels::for_named(name) is the convenient constructor for library callers (the CLI uses its own Labels carrying the pipeline / row / run-id triple). The wrapper emits faucet_transform_records_in_total / faucet_transform_records_out_total (use the out/in ratio for filter drop rate or explode fan-out), faucet_transform_duration_seconds, and faucet_transform_errors_total per page through the standard observability stack.
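For intuition about what RecordTransform::Flatten { separator: "__".into() } does to a nested record, here is a dependency-free sketch that joins nested object keys with the separator. The Value type is a hypothetical stand-in for serde_json::Value that models only strings and objects. How the real transform treats arrays and key collisions is not covered here.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for serde_json::Value (strings and objects only).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    Object(BTreeMap<String, Value>),
}

/// Flatten nested objects: {"a": {"b": x}} becomes {"a__b": x} with separator "__".
fn flatten(record: &BTreeMap<String, Value>, separator: &str) -> BTreeMap<String, Value> {
    let mut out = BTreeMap::new();
    flatten_into(record, "", separator, &mut out);
    out
}

fn flatten_into(
    map: &BTreeMap<String, Value>,
    prefix: &str,
    separator: &str,
    out: &mut BTreeMap<String, Value>,
) {
    for (key, value) in map {
        let full_key = if prefix.is_empty() {
            key.clone()
        } else {
            format!("{prefix}{separator}{key}")
        };
        match value {
            // Recurse into nested objects, extending the key path.
            Value::Object(inner) => flatten_into(inner, &full_key, separator, out),
            // Leaves are copied under their joined key.
            leaf => {
                out.insert(full_key, leaf.clone());
            }
        }
    }
}

fn main() {
    let mut address = BTreeMap::new();
    address.insert("city".to_string(), Value::Str("London".into()));
    let mut record = BTreeMap::new();
    record.insert("id".to_string(), Value::Str("1".into()));
    record.insert("address".to_string(), Value::Object(address));
    println!("{:?}", flatten(&record, "__"));
}
```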

For configuration-driven users (the faucet binary), transforms are declared in YAML — see the transforms cookbook for the three-layer model and per-layer opt-out.

Durable state and streaming

Wire a state store for resumable runs, and use the streaming entry point when you want to control batching explicitly:

use std::sync::Arc;
use faucet_stream::{Pipeline, FileStateStore};

let state = Arc::new(FileStateStore::new("./state")?);
let result = Pipeline::new(&source, &sink)
    .with_state_store(state)
    .run()
    .await?;

The pipeline reads the bookmark before fetching and persists a new one only after the sink confirms each page — so a crash never loses unwritten data.

Why embed instead of shelling out to the CLI?

  • Typed configs — config structs implement serde + JsonSchema, so you get compile-time checking and can generate UIs/forms from the schema.
  • Custom connectors — implement the Source / Sink traits for systems we don’t ship, and run them through the same Pipeline. See authoring a connector.
  • One process — no subprocess, no temp config files; integrate pipelines into an existing service, job runner, or test harness.

Pagination styles (REST source)

The REST source walks multi-page responses automatically. Set pagination.type to one of the styles below. max_pages is a hard cap across all of them, and every style has a loop/termination guard so a misbehaving API can’t loop forever.

Style          | Stops when
None           | after the first page
Cursor         | the next-token JSONPath is null/absent (or repeats)
PageNumber     | a page returns zero records (or an identical body repeats)
Offset         | the offset reaches total (via total_path) or a short page arrives
LinkHeader     | there’s no rel="next" in the Link response header
NextLinkInBody | the next-page URL in the body is absent, null, or empty
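The PageNumber stop conditions, plus the max_pages cap, can be sketched as one loop. This is a hypothetical model of that single style: fetch_page is a stand-in closure returning a raw body and its record count, not a faucet API, and the real source also handles retries and the other styles.

```rust
/// Why the loop stopped.
#[derive(Debug, PartialEq)]
enum Stop {
    EmptyPage,
    RepeatedBody,
    MaxPages,
}

/// Walk pages from `start_page` until a stop condition holds.
/// `fetch_page(n)` returns (raw body, number of records on that page).
/// Returns how many pages were requested and why the walk ended.
fn walk_pages<F>(start_page: u32, max_pages: u32, mut fetch_page: F) -> (u32, Stop)
where
    F: FnMut(u32) -> (String, usize),
{
    let mut previous_body: Option<String> = None;
    let mut fetched = 0;
    let mut page = start_page;
    while fetched < max_pages {
        let (body, records) = fetch_page(page);
        fetched += 1;
        if records == 0 {
            return (fetched, Stop::EmptyPage);
        }
        // Loop guard: an API that ignores the page param returns the same body forever.
        if previous_body.as_deref() == Some(body.as_str()) {
            return (fetched, Stop::RepeatedBody);
        }
        previous_body = Some(body);
        page += 1;
    }
    (fetched, Stop::MaxPages)
}

fn main() {
    // A fake API with three non-empty pages, then an empty one.
    let (pages, why) = walk_pages(1, 200, |n| {
        if n <= 3 { (format!("body of page {n}"), 500) } else { (String::from("[]"), 0) }
    });
    println!("requested {pages} pages, stopped because {why:?}");
}
```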

Cursor

pagination:
  type: Cursor
  next_token_path: $.meta.next_cursor  # JSONPath to the next-page token
  param_name: starting_after           # query param to send it back as

Page number

pagination:
  type: PageNumber
  param_name: page
  start_page: 1
  page_size: 500
  page_size_param: per_page

Offset / limit

pagination:
  type: Offset
  limit: 1000
  limit_param: limit
  offset_param: offset
  total_path: $.meta.total             # optional; enables an exact stop

Link header

pagination:
  type: LinkHeader      # follows the RFC 8288 (formerly RFC 5988) `Link: <…>; rel="next"` header

Next link in body

pagination:
  type: NextLinkInBody
  next_link_path: $.links.next         # JSONPath to the absolute next-page URL
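The LinkHeader style depends on finding the rel="next" target in the Link response header. The sketch below is a deliberately simple, hypothetical parser for the common `<url>; rel="next"` form. It does not implement the full RFC 8288 grammar (which obsoletes RFC 5988): it splits on commas, so it would mis-handle URLs or quoted parameters that contain commas, and it does not handle space-separated multi-value rel attributes.

```rust
/// Return the URL of the link whose `rel` parameter is "next", if any.
/// Handles headers like: <https://api.example.com/items?page=2>; rel="next", <...>; rel="last"
fn next_link(header: &str) -> Option<&str> {
    for link in header.split(',') {
        let mut parts = link.split(';');
        let target = parts.next().unwrap_or("").trim();
        // Skip segments that are not of the form <url>.
        let Some(url) = target.strip_prefix('<').and_then(|t| t.strip_suffix('>')) else {
            continue;
        };
        let is_next = parts.any(|p| {
            let p = p.trim();
            p.eq_ignore_ascii_case("rel=\"next\"") || p.eq_ignore_ascii_case("rel=next")
        });
        if is_next {
            return Some(url);
        }
    }
    None
}

fn main() {
    let header = r#"<https://api.example.com/items?page=2>; rel="next", <https://api.example.com/items?page=9>; rel="last""#;
    println!("{:?}", next_link(header)); // Some("https://api.example.com/items?page=2")
}
```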

Use faucet schema source rest to see the exact fields and defaults for each style in your installed version.

Authentication

Every connector’s auth: block uses one consistent shape — a type: discriminator plus a nested config: map:

auth:
  type: <method>
  config:
    <method-specific fields>

Always pull secrets from the environment with ${env:VAR} (or ${file:PATH} / ${secret:VAR}) rather than hard-coding them.

API key / header

auth:
  type: api_key
  config:
    header: Authorization
    value: "Bearer ${env:API_TOKEN}"

Bearer token

auth:
  type: bearer
  config:
    token: ${env:API_TOKEN}

Basic auth

auth:
  type: basic
  config:
    username: ${env:API_USER}
    password: ${env:API_PASS}

OAuth2 client credentials

The source fetches and refreshes the token automatically (before expiry):

auth:
  type: oauth2
  config:
    token_url: https://auth.example.com/oauth/token
    client_id: ${env:CLIENT_ID}
    client_secret: ${env:CLIENT_SECRET}
    scopes: ["read:events"]

Custom token endpoint

For non-standard token endpoints, token_endpoint lets you describe the request and point at the access-token and expiry fields in the response. See faucet schema source rest for the full field list.

Shared auth providers (auth: { ref })

When several connectors authenticate against the same system (for example, four matrix rows reading four endpoints of one API, or four Snowflake tables), define the credential once in the top-level auth: catalog and reference it with auth: { ref: <name> }. faucet builds a single provider and shares it across every row, so there is one token fetch and one refresh cycle (single-flight) instead of every row racing to refresh a token that allows only one active session or rotates on each refresh:

auth:
  api:
    type: oauth2_refresh        # rotating refresh token captured centrally
    config:
      token_url: ${env:API_TOKEN_URL}
      client_id: ${secret:API_CLIENT_ID}
      client_secret: ${secret:API_CLIENT_SECRET}
      refresh_token: ${secret:API_REFRESH_TOKEN}
pipeline:
  sources:
    ep:
      type: rest
      config:
        base_url: ${env:API_BASE_URL}
        auth: { ref: api }      # every row sharing this template shares ONE token
  sink: { type: stdout, config: {} }
matrix:
  - { id: customers, source: { ref: ep, config: { path: /customers } } }
  - { id: orders,    source: { ref: ep, config: { path: /orders } } }

Provider type: values (catalog only): static, oauth2 (client-credentials), oauth2_refresh (with rotation), token_endpoint. A connector’s auth: is either an inline definition or a { ref } — never both. See cli/examples/shared_auth_rest.yaml for a full four-row example.

Shared providers are supported by the bearer/header-based connectors (rest, graphql, xml, grpc, websocket, http sink, elasticsearch, snowflake-OAuth).

Library use: build one faucet_auth provider, wrap it in an Arc, and pass it to each source/sink with .with_auth_provider(provider.clone()).
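The single-flight idea can be modeled with standard-library primitives: one provider behind an Arc, a Mutex around the cached token, and a refresh that runs only when the cache is empty or expired. This is a hypothetical sketch (SharedToken and the fetch closure are invented names), not the faucet_auth API. Holding the lock during the fetch is what makes concurrent callers wait for one refresh instead of racing.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

struct Cached {
    token: String,
    expires_at: Instant,
}

/// One provider shared by every row; refreshes happen one at a time.
struct SharedToken<F: Fn() -> (String, Duration)> {
    fetch: F,
    cache: Mutex<Option<Cached>>,
}

impl<F: Fn() -> (String, Duration)> SharedToken<F> {
    fn new(fetch: F) -> Self {
        Self { fetch, cache: Mutex::new(None) }
    }

    fn get(&self) -> String {
        // Holding the lock across the fetch means concurrent callers wait for
        // the single in-flight refresh and then reuse its result.
        let mut cache = self.cache.lock().unwrap();
        let fresh = matches!(&*cache, Some(c) if c.expires_at > Instant::now());
        if !fresh {
            let (token, ttl) = (self.fetch)();
            *cache = Some(Cached { token, expires_at: Instant::now() + ttl });
        }
        cache.as_ref().unwrap().token.clone()
    }
}

fn main() {
    let fetches = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&fetches);
    let provider = Arc::new(SharedToken::new(move || {
        let n = counter.fetch_add(1, Ordering::SeqCst) + 1;
        (format!("token-{n}"), Duration::from_secs(3600))
    }));
    // Four "matrix rows" asking for a token at the same time.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let p = Arc::clone(&provider);
            thread::spawn(move || p.get())
        })
        .collect();
    for h in handles {
        println!("{}", h.join().unwrap());
    }
    println!("token fetches: {}", fetches.load(Ordering::SeqCst));
}
```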

Connector-specific inline auth

Each connector also has its own inline auth methods, all under the auth: key and all in { type, config } form:

  • BigQuery: service_account_key_path, service_account_key (inline JSON), or application_default.
  • Snowflake: key_pair (JWT) or oauth.
  • Kafka: sasl_plain / sasl_scram / ssl / sasl_ssl.
  • Elasticsearch: basic, api_key, bearer, or none.
  • GCS: service_account_json_file, service_account_json_inline, application_default, or anonymous.

Inspect any connector’s auth shape with faucet schema source <name> / faucet schema sink <name>.

Secret interpolation

${env:VAR} and ${file:PATH} are resolved at config-load time, so secrets never need to appear in the file. A sibling .env is loaded automatically (use --no-env-file to disable, or --env-file PATH to point elsewhere).
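A minimal sketch of `${env:VAR}` resolution, assuming a simple left-to-right scan. The lookup is passed in as a closure (std::env::var in real use) so the example is testable, and an unset variable is reported as an error. Whether faucet errors, substitutes an empty string, or supports defaults for a missing variable isn't stated here, so treat that behavior as an assumption. `${file:PATH}` and `${secret:VAR}` are not handled.

```rust
/// Replace each `${env:NAME}` in `input` with `lookup(NAME)`.
fn interpolate_env<L>(input: &str, lookup: L) -> Result<String, String>
where
    L: Fn(&str) -> Option<String>,
{
    const OPEN: &str = "${env:";
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(start) = rest.find(OPEN) {
        out.push_str(&rest[..start]);
        let after = &rest[start + OPEN.len()..];
        let end = after.find('}').ok_or_else(|| "unterminated ${env:...}".to_string())?;
        let name = &after[..end];
        let value = lookup(name).ok_or_else(|| format!("environment variable {name} is not set"))?;
        out.push_str(&value);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    let line = "password: ${env:API_PASS}";
    match interpolate_env(line, |name| std::env::var(name).ok()) {
        Ok(resolved) => println!("{resolved}"),
        Err(e) => eprintln!("config error: {e}"),
    }
}
```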

Incremental replication & state

For pipelines that run repeatedly, you usually want to fetch only what’s new. That requires two things: an incremental replication method on the source and a state store to persist the bookmark between runs.

Replication methods

  • FullTable — fetch everything every run.
  • Incremental — track a high-water mark on a cursor_field (e.g. updated_at, an auto-increment id) and only emit records past the last seen value.

source:
  type: rest
  config:
    # …
    replication_method:
      type: Incremental
      cursor_field: updated_at
    primary_keys: [id]
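The Incremental behavior (emit only records past the bookmark and remember the largest cursor_field value seen) can be sketched as follows. It assumes the cursor values are ISO-8601 UTC timestamps with identical formatting, so plain string comparison orders them correctly. The Record struct is hypothetical, and whether the real source compares with > or >= (to catch rows that share the boundary timestamp) isn't stated here; this sketch uses strict >.

```rust
#[derive(Debug, Clone)]
struct Record {
    id: u64,
    updated_at: String, // ISO-8601 UTC, e.g. "2026-05-01T12:00:00Z"
}

/// Keep records strictly newer than `bookmark` and compute the next bookmark.
fn incremental_page(records: &[Record], bookmark: Option<&str>) -> (Vec<Record>, Option<String>) {
    let fresh: Vec<Record> = records
        .iter()
        .filter(|r| bookmark.map_or(true, |b| r.updated_at.as_str() > b))
        .cloned()
        .collect();
    // New high-water mark = max cursor seen; keep the old one if nothing is new.
    let next = fresh
        .iter()
        .map(|r| r.updated_at.clone())
        .max()
        .or_else(|| bookmark.map(str::to_string));
    (fresh, next)
}

fn main() {
    let page = vec![
        Record { id: 1, updated_at: "2026-05-01T00:00:00Z".into() },
        Record { id: 2, updated_at: "2026-05-03T00:00:00Z".into() },
    ];
    let (fresh, next) = incremental_page(&page, Some("2026-05-01T00:00:00Z"));
    let ids: Vec<u64> = fresh.iter().map(|r| r.id).collect();
    println!("emit {ids:?}, next bookmark {next:?}");
}
```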

State stores

Attach a state: block so the bookmark survives between runs:

state:
  type: file          # built into faucet-core
  config:
    path: ./state

Available backends:

Backend  | Crate                 | Use when
memory   | faucet-core           | tests, one-shot runs (not persistent)
file     | faucet-core           | single host; one JSON file per key, atomic writes
redis    | faucet-state-redis    | shared/ephemeral state across hosts
postgres | faucet-state-postgres | shared, durable, transactional state

# Redis
state:
  type: redis
  config:
    connection_url: redis://localhost:6379
    namespace: faucet

# Postgres
state:
  type: postgres
  config:
    connection_url: postgres://user:pass@localhost/faucet

How bookmarks advance

The pipeline reads the bookmark before fetching, and persists a new one only after the sink confirms the page. Most sources emit a bookmark on the final page; CDC-style sources emit one per committed transaction and get per-transaction durability automatically. Either way, a crash can never advance the bookmark past data that wasn’t written — the next run re-fetches from the last confirmed point.
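The ordering rule (write the page, then persist its bookmark, never the reverse) can be shown with in-memory stand-ins for the sink and state store. Page, MemorySink, MemoryState, and run are invented for this hypothetical sketch. A sink failure stops the loop before that page's bookmark is saved, so the next run starts from the last confirmed page.

```rust
use std::collections::HashMap;

struct Page {
    records: Vec<String>,
    bookmark: Option<String>, // final page only for most sources; per transaction for CDC
}

struct MemorySink {
    written: Vec<String>,
    fail_on_page: Option<usize>, // simulate a failure while writing this page index
}

impl MemorySink {
    fn write(&mut self, index: usize, page: &Page) -> Result<(), String> {
        if self.fail_on_page == Some(index) {
            return Err(format!("sink failed on page {index}"));
        }
        self.written.extend(page.records.iter().cloned());
        Ok(())
    }
}

#[derive(Default)]
struct MemoryState(HashMap<String, String>);

fn run(key: &str, pages: &[Page], sink: &mut MemorySink, state: &mut MemoryState) -> Result<(), String> {
    let _resume_from = state.0.get(key).cloned(); // a real source would resume from here
    for (i, page) in pages.iter().enumerate() {
        sink.write(i, page)?; // 1. the sink confirms the page ...
        if let Some(b) = &page.bookmark {
            state.0.insert(key.to_string(), b.clone()); // 2. ... only then persist
        }
    }
    Ok(())
}

fn main() {
    // CDC-style pages: each carries its own bookmark.
    let pages: Vec<Page> = (1..=3)
        .map(|n| Page { records: vec![format!("row-{n}")], bookmark: Some(format!("lsn-{n}")) })
        .collect();
    let mut sink = MemorySink { written: Vec::new(), fail_on_page: Some(2) };
    let mut state = MemoryState::default();
    let outcome = run("postgres-cdc:faucet_slot", &pages, &mut sink, &mut state);
    println!("{outcome:?}; bookmark = {:?}", state.0.get("postgres-cdc:faucet_slot"));
}
```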

State keys

Each invocation has a state key so concurrent matrix rows don’t collide: {name}::{row_id} for roots and {name}::{row_id}::{parent_record_key} for DAG children. The CDC source uses postgres-cdc:<slot>.
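The key scheme is plain string composition. A short sketch (the function name is hypothetical) makes the root versus DAG-child distinction explicit; the example assumes a child with parent_key: id whose parent record has id 42.

```rust
/// Build the state key for a matrix invocation:
/// roots use `{name}::{row_id}`, DAG children append the parent record's key.
fn state_key(name: &str, row_id: &str, parent_record_key: Option<&str>) -> String {
    match parent_record_key {
        None => format!("{name}::{row_id}"),
        Some(parent) => format!("{name}::{row_id}::{parent}"),
    }
}

fn main() {
    println!("{}", state_key("dag_users_posts", "users", None));
    println!("{}", state_key("dag_users_posts", "posts", Some("42")));
}
```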

Dead-letter queues

A dead-letter queue (DLQ) keeps a pipeline running when a handful of records fail to write, instead of aborting the whole run. Failing rows are wrapped in a fixed-shape envelope and routed to a separate DLQ sink before the page’s bookmark advances.

When it helps

Sinks whose underlying API reports per-row results — BigQuery insertAll, Elasticsearch _bulk — can tell exactly which records failed. The DLQ captures just those, while the good rows commit normally.

Configure a DLQ

Add a dlq: block naming a sink to receive the bad rows and the policy for sinks that can’t report per-row outcomes:

pipeline:
  source: { type: rest, config: { … } }       # REST config elided
  sink:   { type: bigquery, config: { … } }   # BigQuery config elided
  dlq:
    on_batch_error: dlq_all      # or `propagate`
    sink:
      type: jsonl
      config:
        path: ./dead-letters.jsonl

The envelope

Each dead-lettered record is wrapped with metadata — the original record, the reason it failed, and context — so you can inspect, fix, and replay it later.

on_batch_error policy

For a sink that can only succeed or fail a whole batch (no per-row detail):

  • propagate — a batch failure aborts the run (the default, fail-fast behavior).
  • dlq_all — route every row in the failed batch to the DLQ and keep going.

Sinks that do report per-row results (BigQuery, Elasticsearch, and the HTTP sink in Individual mode) override the partial-write path so only the genuinely failed rows are dead-lettered — the already-delivered rows are not duplicated into the DLQ.
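The routing rules in this section fit in one small function. OnBatchError, WriteOutcome, and route are hypothetical names; the sketch encodes only what is described above: per-row failures go to the DLQ, and a whole-batch failure either propagates or dead-letters every row depending on on_batch_error.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnBatchError {
    Propagate,
    DlqAll,
}

/// What the sink reported for one page.
enum WriteOutcome {
    Ok,
    /// Per-row detail: indices of the rows that failed.
    PartialFailure(Vec<usize>),
    /// The whole batch failed with no per-row detail.
    BatchFailure(String),
}

/// Return the row indices to dead-letter, or an error that aborts the run.
fn route(outcome: WriteOutcome, page_len: usize, policy: OnBatchError) -> Result<Vec<usize>, String> {
    match outcome {
        WriteOutcome::Ok => Ok(Vec::new()),
        // Only the genuinely failed rows; delivered rows are not duplicated.
        WriteOutcome::PartialFailure(failed) => Ok(failed),
        WriteOutcome::BatchFailure(err) => match policy {
            OnBatchError::Propagate => Err(err),
            OnBatchError::DlqAll => Ok((0..page_len).collect()),
        },
    }
}

fn main() {
    let routed = route(WriteOutcome::BatchFailure("HTTP 503".into()), 3, OnBatchError::DlqAll);
    println!("{routed:?}"); // Ok([0, 1, 2])
}
```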

Failure budgets

A DLQ keeps a run going through occasional bad rows, but a flood of failures usually means something is broken upstream. Two optional budgets turn the DLQ into a circuit breaker:

  dlq:
    sink: { type: jsonl, config: { path: ./dead-letters.jsonl } }
    max_failures_per_page: 50    # abort if a single page dead-letters > 50 rows
    max_failures_total: 500      # abort once the run has dead-lettered > 500 rows

When a budget trips, the run aborts — but only after the page that crossed the threshold is fully committed: its surviving rows are written to the main sink, its failed rows are routed to the DLQ, and (if the page carried one) the bookmark advances. So the committed survivors are not re-delivered when you fix the upstream problem and re-run, and the failed rows are preserved in the DLQ for replay rather than dropped. The run still stops, so you get alerted.
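The budget check can be sketched as follows, assuming the counts are compared after each page commits, which matches the behavior described above. FailureBudget and after_commit are hypothetical names; both limits are optional, mirroring max_failures_per_page and max_failures_total, and a limit is exceeded only when the count goes strictly above it.

```rust
struct FailureBudget {
    max_per_page: Option<usize>,
    max_total: Option<usize>,
    total: usize,
}

impl FailureBudget {
    fn new(max_per_page: Option<usize>, max_total: Option<usize>) -> Self {
        Self { max_per_page, max_total, total: 0 }
    }

    /// Call after a page is fully committed (survivors written, failures
    /// dead-lettered, bookmark saved). `Err` means: stop the run now.
    fn after_commit(&mut self, dead_lettered: usize) -> Result<(), String> {
        self.total += dead_lettered;
        if let Some(max) = self.max_per_page {
            if dead_lettered > max {
                return Err(format!("page dead-lettered {dead_lettered} rows (> {max})"));
            }
        }
        if let Some(max) = self.max_total {
            if self.total > max {
                return Err(format!("run dead-lettered {} rows (> {max})", self.total));
            }
        }
        Ok(())
    }
}

fn main() {
    let mut budget = FailureBudget::new(Some(50), Some(500));
    for (page, failures) in [3, 0, 12, 61].iter().copied().enumerate() {
        if let Err(reason) = budget.after_commit(failures) {
            println!("aborting after committing page {page}: {reason}");
            break;
        }
    }
}
```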

The full design is in docs/superpowers/specs/2026-05-24-dlq-design.md and the faucet_core::dlq module on docs.rs.

Data-quality checks

Add a quality: block under pipeline: to assert invariants on every page of records as they flow through the pipeline. The quality pass runs after transforms and before the sink write:

  1. Per-record checks partition the page into survivors and quarantined rows (first-failure-wins per record).
  2. Per-batch checks run over the survivors.
  3. Quarantined rows are routed to the DLQ sink; survivors flow to the main sink.
  4. The page bookmark advances only after the sink confirms — an abort never commits partial progress.
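Steps 1 and 3 amount to a partition with first-failure-wins semantics. The sketch below is hypothetical: records are plain string maps, and each check is a function pointer returning whether the record passes. It shows that a record stops being evaluated at its first failing check and that an abort check short-circuits the whole page.

```rust
use std::collections::HashMap;

type Record = HashMap<String, String>;
type Partitioned = (Vec<Record>, Vec<(Record, &'static str)>);

enum OnFailure {
    Quarantine,
    Abort,
}

struct Check {
    name: &'static str,
    on_failure: OnFailure,
    passes: fn(&Record) -> bool,
}

/// Split a page into (survivors, quarantined-with-check-name), or abort.
fn partition(page: Vec<Record>, checks: &[Check]) -> Result<Partitioned, String> {
    let mut survivors = Vec::new();
    let mut quarantined = Vec::new();
    'records: for record in page {
        for check in checks {
            if !(check.passes)(&record) {
                match check.on_failure {
                    OnFailure::Abort => return Err(format!("quality check {} failed", check.name)),
                    OnFailure::Quarantine => {
                        quarantined.push((record, check.name)); // first failure wins
                        continue 'records;
                    }
                }
            }
        }
        survivors.push(record);
    }
    Ok((survivors, quarantined))
}

/// Helper to build a record from key/value pairs.
fn record(pairs: &[(&str, &str)]) -> Record {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

fn main() {
    let checks = [
        Check { name: "not_null(id)", on_failure: OnFailure::Abort, passes: |r| r.contains_key("id") },
        Check { name: "not_null(email)", on_failure: OnFailure::Quarantine, passes: |r| r.contains_key("email") },
    ];
    let page = vec![record(&[("id", "1"), ("email", "ada@example.com")]), record(&[("id", "2")])];
    match partition(page, &checks) {
        Ok((ok, bad)) => println!("{} survivors, {} quarantined", ok.len(), bad.len()),
        Err(e) => println!("run aborted: {e}"),
    }
}
```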

Quality checks require the quality Cargo feature (included in full and in faucet-cli’s default build). The json_schema check additionally requires quality-jsonschema.

Full example

The following config fetches users from a REST API, normalizes keys to snake_case, and enforces several quality invariants before writing survivors to PostgreSQL. Quarantined rows land in a local JSONL file.

# rest_to_postgres_with_quality.yaml
version: 1
name: users_api_to_postgres_with_quality

pipeline:
  source:
    type: rest
    config:
      base_url: https://api.example.com/v1
      path: /users
      method: GET
      auth:
        type: bearer
        config:
          token: ${env:API_TOKEN}
      query_params:
        per_page: "100"
      pagination:
        type: Cursor
        next_token_path: $.meta.next_cursor
        param_name: cursor
      max_retries: 3
      retry_backoff: 2
      tolerated_http_errors: []
      replication_method:
        type: Incremental
      replication_key: updated_at
      primary_keys: ["id"]
      partitions: []
      schema_sample_size: 100
      state_key: users_api:users

  transforms:
    - type: keys_case
      config: { mode: snake }

  quality:
    record:
      - type: not_null
        field: id
        on_failure: abort             # abort: a null id is a catastrophic upstream bug
      - type: not_null
        field: email
        on_failure: quarantine        # quarantine: route bad rows to the DLQ
      - type: regex_match
        field: email
        pattern: '^[^@\s]+@[^@\s]+\.[^@\s]+$'
        on_failure: quarantine
      - type: value_in_set
        field: status
        values: ["active", "inactive", "pending", "suspended"]
        on_failure: quarantine
      - type: compare
        field: age
        op: gte
        value: 0
        on_failure: quarantine
    batch:
      - type: row_count
        min: 1
        on_failure: abort             # empty pages indicate a misconfigured source
      - type: unique
        fields: [id]
        on_failure: quarantine        # route duplicate ids to the DLQ

  dlq:
    sink:
      type: jsonl
      config:
        path: ./dlq/users_quality_failures.jsonl
    on_batch_error: propagate
    max_failures_per_page: 50
    max_failures_total: 500

  sink:
    type: postgres
    config:
      connection_url: ${env:PG_URL}
      table_name: users
      column_mapping:
        type: jsonb
        column: data
      batch_size: 500
      max_connections: 5

  state:
    type: file
    config:
      path: ./.faucet-state

Check catalog

Per-record checks

Evaluated in declared order; first failure wins for a given record. on_failure may be quarantine (route the row to the DLQ) or abort (raise FaucetError::QualityFailure and stop the run immediately).

Check         | Key fields                                  | Passes when                                             | Missing field
not_null      | field, treat_missing_as_null (default true) | value present and non-null                              | fail (pass iff treat_missing_as_null: false)
not_empty     | field                                       | value is a non-empty string after trimming whitespace   | fail
regex_match   | field, pattern                              | value is a string matching pattern                      | fail
value_in_set  | field, values: [...]                        | value is in the allowed set (exact JSON equality)       | fail
not_in_set    | field, values: [...]                        | value is NOT in the forbidden set                       | pass (trivially not in set)
compare       | field, op, value                            | ordering or equality holds (see below)                  | fail
type_is       | field, expected                             | JSON type of the value matches expected                 | fail
string_length | field, min?, max?                           | char count in [min, max] (at least one bound required)  | fail
json_schema   | schema                                      | whole record validates against a JSON Schema document   | (whole-record check)

compare operators: gt, gte, lt, lte require both the field value and the configured value to be JSON numbers (compared as f64); eq and ne do exact JSON equality with no type coercion.
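Here is a minimal sketch of these compare semantics. It makes two assumptions. First, `Json` is a hypothetical, simplified stand-in for a JSON value that stores every number as `f64`, so unlike real JSON equality it cannot tell `1` from `1.0`. Second, an ordering operator applied to a non-number is assumed to fail the check rather than raise a config error. This is not faucet's implementation.

```rust
/// Hypothetical, simplified JSON value: every number is stored as f64.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
}

#[derive(Debug, Clone, Copy)]
enum Op {
    Gt,
    Gte,
    Lt,
    Lte,
    Eq,
    Ne,
}

/// Returns true when the record passes `compare`; `field` is None when the field is missing.
fn compare(field: Option<&Json>, op: Op, configured: &Json) -> bool {
    // A missing field fails the check.
    let Some(actual) = field else { return false };
    match op {
        // eq / ne: exact equality with no type coercion ("5" never equals 5).
        Op::Eq => actual == configured,
        Op::Ne => actual != configured,
        // Ordering operators need a number on both sides (assumed: otherwise fail).
        Op::Gt | Op::Gte | Op::Lt | Op::Lte => match (actual, configured) {
            (Json::Num(a), Json::Num(b)) => match op {
                Op::Gt => a > b,
                Op::Gte => a >= b,
                Op::Lt => a < b,
                _ => a <= b, // Op::Lte is the only remaining case here
            },
            _ => false,
        },
    }
}
```

With the `age` check from the example config (`op: gte`, `value: 0`), a record carrying `age: -1` fails and is quarantined.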

json_schema requires the quality-jsonschema Cargo feature. It is the most expressive check; its cost scales with schema complexity — for very large or deeply nested schemas on hot paths, prefer the granular checks above and benchmark your case.

Per-batch checks

Evaluated per page over the survivors (records that passed all per-record checks). Aggregate checks (row_count, null_rate, distinct_count) are not row-attributable, so they offer quarantine_batch (route all survivors to the DLQ, write nothing this page) or abort. unique is row-attributable and accepts quarantine (route the duplicate rows) or abort.

| Check | Key fields | Passes when |
|---|---|---|
| row_count | min?, max? (at least one required) | survivor count in [min, max] |
| null_rate | field, max (0.0–1.0) | null-or-missing rate ≤ max; zero survivors → 0.0 → pass |
| unique | fields: [...] (composite key) | every survivor’s composite key is unique within the page |
| distinct_count | field, min?, max? | distinct values of field in [min, max] |
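To make the null_rate row concrete, this sketch computes the rate over survivors only. A missing field counts the same as a null, and an empty page yields 0.0, so the check passes. It assumes a hypothetical `Record` type in which a field maps to `Option<String>`: `None` models a JSON null, and an absent key models a missing field.

```rust
use std::collections::HashMap;

/// Hypothetical record: `None` is a JSON null, an absent key is a missing field.
type Record = HashMap<String, Option<String>>;

/// Fraction of survivors whose `field` is null or missing; 0.0 for an empty page.
fn null_rate(survivors: &[Record], field: &str) -> f64 {
    if survivors.is_empty() {
        return 0.0;
    }
    let nulls = survivors
        .iter()
        .filter(|r| !matches!(r.get(field), Some(Some(_))))
        .count();
    nulls as f64 / survivors.len() as f64
}

/// The check passes when the rate does not exceed the configured `max`.
fn null_rate_passes(survivors: &[Record], field: &str, max: f64) -> bool {
    null_rate(survivors, field) <= max
}
```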

Failure policies

| Policy | Meaning | Allowed on |
|---|---|---|
| quarantine | Route the specific offending row(s) to the DLQ; keep the rest as survivors | per-record checks; unique |
| quarantine_batch | Route all current survivors of the page to the DLQ; nothing written this page | aggregate batch checks (row_count, null_rate, distinct_count) |
| abort | Raise FaucetError::QualityFailure and stop the run | every check |

DLQ requirement

Any check that uses quarantine or quarantine_batch requires a dlq: block. Omitting it fails validation with an error explaining that a dlq: block is required (faucet validate catches this before the run starts; the core guards it again at run start).

See the Dead-letter queues cookbook page for dlq: options.
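The policy table and the DLQ requirement together amount to a small validation rule. The sketch below encodes them with hypothetical `Check` and `Policy` enums. It illustrates the documented rules only; it is not the code behind faucet validate, and the error wording is invented.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Check {
    NotNull, NotEmpty, RegexMatch, ValueInSet, NotInSet, Compare, TypeIs,
    StringLength, JsonSchema, RowCount, NullRate, Unique, DistinctCount,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    Quarantine,
    QuarantineBatch,
    Abort,
}

/// Which on_failure values each check accepts, per the failure-policy table.
fn policy_allowed(check: Check, policy: Policy) -> bool {
    // Aggregate checks are not row-attributable.
    let aggregate = matches!(check, Check::RowCount | Check::NullRate | Check::DistinctCount);
    match policy {
        Policy::Abort => true,
        Policy::QuarantineBatch => aggregate,
        // Per-record checks and `unique` can point at specific rows.
        Policy::Quarantine => !aggregate,
    }
}

/// Reject disallowed policies, then require a dlq: block if anything quarantines.
fn validate(checks: &[(Check, Policy)], has_dlq: bool) -> Result<(), String> {
    for &(check, policy) in checks {
        if !policy_allowed(check, policy) {
            return Err(format!("{policy:?} is not allowed on {check:?}"));
        }
    }
    let quarantines = checks.iter().any(|&(_, p)| p != Policy::Abort);
    if quarantines && !has_dlq {
        return Err("a dlq: block is required when a check quarantines".to_string());
    }
    Ok(())
}
```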

Observability

The quality pass emits faucet_quality_* metrics automatically:

  • faucet_quality_checks_total{pipeline,row,check,outcome=pass|fail}
  • faucet_quality_records_quarantined_total{pipeline,row,check,field}
  • faucet_quality_aborts_total{pipeline,row,check}
  • faucet_quality_check_duration_seconds{check}

These are available alongside the standard faucet_source_*, faucet_sink_*, and faucet_transform_* metrics. See Observability for the full metrics reference.

Validate the config

faucet validate rest_to_postgres_with_quality.yaml
# ok: 'users_api_to_postgres_with_quality' rows=1 (roots=1, children=0)

faucet schema quality prints the full JSON Schema for the quality: block, and faucet list shows all available checks with descriptions.

Compression

The file-shaped connectors can read and write gzip / zstd transparently. Enable the compression feature, then set a compression: field on the connector.

Enable the feature

# CLI
cargo install faucet-cli --features compression

# Library (umbrella) — activates compression on whichever file connectors you've enabled
faucet-stream = { version = "1.0", features = ["sink-jsonl", "source-csv", "compression"] }

The compression aggregate feature forwards to whichever of the supported connectors you’ve already opted into; it doesn’t pull in connectors by itself. full includes compression.

Connectors that support it

source-csv, source-s3, source-gcs, sink-jsonl, sink-csv, sink-s3, sink-gcs.

Config

sink:
  type: jsonl
  config:
    path: ./out/records.jsonl.gz
    compression: auto      # none | gzip | zstd | auto (default)

  • auto chooses from the filename suffix: .gz → gzip, .zst → zstd, anything else → none.
  • Explicit gzip / zstd / none override the suffix.

Auto-detection runs per file at I/O time, so one matrix run can read a mix of .jsonl, .jsonl.gz, and .jsonl.zst objects.
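The suffix rule is simple enough to state as code. This sketch uses a hypothetical `Compression` enum that mirrors the four documented values. Because it is evaluated per path, a mixed set of objects resolves independently.

```rust
/// Hypothetical mirror of the `compression:` setting.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Compression {
    None,
    Gzip,
    Zstd,
    Auto,
}

/// Resolve the codec for one file: explicit values win, `auto` looks at the suffix.
fn effective_codec(configured: Compression, path: &str) -> Compression {
    match configured {
        Compression::Auto => {
            if path.ends_with(".gz") {
                Compression::Gzip
            } else if path.ends_with(".zst") {
                Compression::Zstd
            } else {
                Compression::None
            }
        }
        explicit => explicit,
    }
}
```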

Notes

  • File sinks finalize the encoder on flush(); later writes reopen in append mode, producing a multi-member compressed file that gzip/zstd decoders read back transparently.
  • S3 and GCS sinks do not set a Content-Encoding header — consumers must decompress explicitly.
  • Parquet, Kafka, HTTP, stdout, and the database sinks are intentionally out of scope: Parquet has internal columnar compression and the others have native protocol-level options.

Record transforms

A pipeline’s transforms: list is a sequence of pure Fn(Value) -> Value steps run on every record between source and sink. Each transform is a small, declarative reshape — pick the ones you need, list them in the order you want them to run, and the CLI wires them up for you.

This page is a tour of the standard transforms exposed in YAML. All of them are listed in faucet list and dispatchable as type: values.

At a glance

| Kind | Purpose | Shape |
|---|---|---|
| flatten | Collapse nested objects to a flat record | separator |
| rename_keys | Regex rename of every key, recursively | pattern, replacement |
| keys_case | Re-case every key (snake / camel / pascal / kebab / screaming_snake) | mode |
| spell_symbols | Spell out symbols in keys (% → percent, # → number, …) | extra, separator |
| select | Keep only listed top-level fields | fields: [..] |
| drop | Remove listed top-level fields | fields: [..] |
| set | Add or overwrite top-level fields with constants | values: {k: v, ..} |
| rename_field | Exact-name rename (vs. regex) | fields: {from: to, ..} |
| cast | Coerce per-field types | fields: {name: type}, on_error |
| redact | Replace listed field values with a mask | fields: [..], mask |
| value_case | Lowercase / uppercase / trim string values | fields: [..], mode |

The field-targeting transforms (select, drop, set, rename_field, cast, redact, value_case) act on top-level fields only — dotted paths into nested objects are intentionally out of scope. If you need to reach a nested field, run flatten first, then operate on the flattened key.

Missing fields are silently skipped. None of the field-selection transforms introduce a null for a name that wasn’t already on the record.

A full example

The runnable file is at cli/examples/rest_to_stdout_transforms.yaml:

pipeline:
  source:
    type: rest
    config: { ... }

  transforms:
    - type: flatten
      config: { separator: "__" }
    - type: select
      config:
        fields: [id, name, email, address__city, company__name]
    - type: rename_field
      config:
        fields:
          address__city: city
          company__name: company
    - type: value_case
      config:
        fields: [email]
        mode: lower
    - type: cast
      config:
        fields: { id: string }
        on_error: error
    - type: redact
      config:
        fields: [phone]
        mask: "[redacted]"
    - type: set
      config:
        values:
          _source: jsonplaceholder
          _ingested_at: "2026-01-01T00:00:00Z"

  sink:
    type: stdout
    config: { format: json_lines }

Run it:

faucet run cli/examples/rest_to_stdout_transforms.yaml | jq .

The order matters: flatten runs first so that select can reference address__city; rename_field runs after select so it only has to rename keys that survived; cast runs before set so the stamped _source field is left untouched.

Declaration layers

Transforms can be declared at three layers in a config. The executor resolves them per matrix row by concatenating contributions in lifecycle order — pipeline first, then source template, then row:

final = T_pipeline ++ T_source ++ T_row

| Layer | Lives at | Intent |
|---|---|---|
| Pipeline | pipeline.transforms | cross-cutting policy (PII redaction, provenance stamp) |
| Source template | pipeline.sources.<name>.transforms | cleanup tied to the source’s natural emission shape |
| Matrix row | matrix[i].transforms | row-specific extras or one-off shaping |

Each layer is optional. Empty layers contribute nothing.

pipeline:
  transforms:                                  # T_pipeline (runs first)
    - { type: set, config: { values: { _ingested_at: "${env:NOW}" } } }
  sources:
    users_api:
      type: rest
      transforms:                              # T_source
        - { type: flatten, config: { separator: "__" } }
        - { type: keys_case, config: { mode: snake } }
matrix:
  - id: users_pii
    source: { ref: users_api }
    transforms:                                # T_row (runs last)
      - { type: redact, config: { fields: [email], mask: "[pii]" } }
    # final = [set, flatten, keys_case, redact]

Opting out: inherit_transforms: false

Each layer that introduces transforms (source template, matrix row) carries a sibling boolean field, inherit_transforms, which defaults to true. When set to false, it drops every layer declared above it.

| source.inherit_transforms | row.inherit_transforms | Final list |
|---|---|---|
| true (default) | true (default) | T_pipeline ++ T_source ++ T_row |
| false | true | T_source ++ T_row |
| true | false | T_row |
| false | false | T_row |

Use this for debug rows that need raw records, or for a source whose natural shape is already canonical and shouldn’t be touched by global policy:

matrix:
  - id: forensic_row
    source: { ref: users_api }
    inherit_transforms: false              # ← drops T_pipeline AND T_source
    transforms:
      - { type: select, config: { fields: [id, raw_payload] } }
    # final = [select]

Sinks reject both transforms: and inherit_transforms:. Destination shaping belongs at the pipeline or row layer.
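The concatenation rule and the inherit_transforms table can be reproduced with a few lines of code. In this sketch, transform names stand in for full transform specs, and the function name `resolve_transforms` is hypothetical.

```rust
/// Hypothetical resolver: transform names stand in for full transform specs.
fn resolve_transforms(
    t_pipeline: &[&str],
    t_source: &[&str],
    t_row: &[&str],
    source_inherit: bool,
    row_inherit: bool,
) -> Vec<String> {
    let mut out = Vec::new();
    // row.inherit_transforms: false drops every layer above the row.
    if row_inherit {
        // source.inherit_transforms: false drops only the pipeline layer.
        if source_inherit {
            out.extend(t_pipeline.iter().map(|t| t.to_string()));
        }
        out.extend(t_source.iter().map(|t| t.to_string()));
    }
    out.extend(t_row.iter().map(|t| t.to_string()));
    out
}
```

With the users_pii row from the layering example, `resolve_transforms(&["set"], &["flatten", "keys_case"], &["redact"], true, true)` yields `[set, flatten, keys_case, redact]`.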

Reusing transform lists across sources

Use YAML anchors:

pipeline:
  sources:
    users_api:
      type: rest
      transforms: &user_cleanup
        - { type: flatten, config: { separator: "__" } }
        - { type: keys_case, config: { mode: snake } }
    archived_users_api:
      type: rest
      transforms: *user_cleanup

No grammar extension needed — the YAML parser expands anchors before the config reaches faucet.

keys_case — pick the output convention

- type: keys_case
  config:
    mode: snake   # | camel | pascal | kebab | screaming_snake

The tokeniser splits each key on whitespace, _, -, dropped punctuation, and lower→upper transitions (so "firstName" and "first_name" and "first-name" all tokenise the same), then re-joins in the requested style:

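| Input | snake | camel | pascal | kebab | screaming_snake |
|---|---|---|---|---|---|
| "First Name" | first_name | firstName | FirstName | first-name | FIRST_NAME |
| "last-name" | last_name | lastName | LastName | last-name | LAST_NAME |
| "camelCase" | camel_case | camelCase | CamelCase | camel-case | CAMEL_CASE |
| "ID" | id | id | Id | id | ID |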

Two distinct keys that re-case to the same name error rather than silently overwriting (same collision rule as flatten and spell_symbols). An all-symbol key ("!@#") tokenises to nothing and is kept as-is to avoid producing a blank key.

Multi-char uppercase runs are left as one token: "XMLParser" → ["XMLParser"] → xmlparser (snake). If you need them split, normalise with rename_keys first.
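The tokenise-then-rejoin behaviour described above can be sketched as follows. It assumes that any non-alphanumeric character acts as a separator. The `Mode` enum and function names are hypothetical, and collision detection across keys is left out.

```rust
#[derive(Debug, Clone, Copy)]
enum Mode {
    Snake,
    Camel,
    Pascal,
    Kebab,
    ScreamingSnake,
}

/// Split on non-alphanumeric characters and on lower→upper transitions.
/// Runs of capitals ("XML") stay together, as described above.
fn tokenize(key: &str) -> Vec<String> {
    let mut tokens = Vec::new();
    let mut current = String::new();
    let mut prev_lower = false;
    for ch in key.chars() {
        if !ch.is_alphanumeric() {
            // Whitespace, '_', '-' and other punctuation end the current token.
            if !current.is_empty() {
                tokens.push(std::mem::take(&mut current));
            }
            prev_lower = false;
            continue;
        }
        if ch.is_uppercase() && prev_lower {
            tokens.push(std::mem::take(&mut current));
        }
        current.push(ch);
        prev_lower = ch.is_lowercase();
    }
    if !current.is_empty() {
        tokens.push(current);
    }
    tokens
}

/// "first" → "First", "ID" → "Id".
fn capitalize(token: &str) -> String {
    let lower = token.to_lowercase();
    let mut chars = lower.chars();
    match chars.next() {
        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
        None => String::new(),
    }
}

fn recase(key: &str, mode: Mode) -> String {
    let tokens = tokenize(key);
    if tokens.is_empty() {
        // An all-symbol key such as "!@#" is kept as-is.
        return key.to_string();
    }
    let lower: Vec<String> = tokens.iter().map(|t| t.to_lowercase()).collect();
    match mode {
        Mode::Snake => lower.join("_"),
        Mode::Kebab => lower.join("-"),
        Mode::ScreamingSnake => lower.join("_").to_uppercase(),
        Mode::Pascal => tokens.iter().map(|t| capitalize(t)).collect(),
        Mode::Camel => {
            let rest: String = tokens[1..].iter().map(|t| capitalize(t)).collect();
            format!("{}{}", lower[0], rest)
        }
    }
}
```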

spell_symbols — symbols → words in keys

- type: spell_symbols
  config:
    extra:
      "©": copyright
      "<=": lte
    separator: " "   # default

The default map covers the common ASCII symbols:

% → percent, # → number, $ → dollar, & → and, @ → at, + → plus, * → star, = → equals, < → lt, > → gt, / → slash, \ → backslash, | → pipe, ^ → caret, ~ → tilde.

User entries in extra are merged on top of the defaults (an override with the same key wins). Replacements are sorted longest-first, so "<=" beats "<" when both are present.

Each replacement is surrounded by separator (default " ") so a chained keys_case cleanly picks up the word boundary:

transforms:
  - type: spell_symbols
  - type: keys_case
    config: { mode: snake }

turns "% sold" → " percent sold" → "percent_sold".
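The merge and longest-first rules can be sketched as a left-to-right scan. This version tries the longest symbol at each position, so `<=` beats `<`. The default table is an abbreviated subset of the documented map, and `build_table` and `spell_symbols` are hypothetical names. The exact whitespace the real transform produces may differ from this sketch.

```rust
use std::collections::HashMap;

/// Merge `extra` over an abbreviated subset of the default map; same key → `extra` wins.
fn build_table(extra: &[(&str, &str)]) -> Vec<(String, String)> {
    let defaults = [("%", "percent"), ("#", "number"), ("$", "dollar"), ("&", "and"),
                    ("@", "at"), ("<", "lt"), (">", "gt")];
    let mut map: HashMap<String, String> = HashMap::new();
    for (symbol, word) in defaults.iter().chain(extra.iter()) {
        map.insert(symbol.to_string(), word.to_string());
    }
    let mut table: Vec<(String, String)> = map.into_iter().collect();
    // Longest symbols first, so "<=" is tried before "<".
    table.sort_by(|a, b| b.0.chars().count().cmp(&a.0.chars().count()).then_with(|| a.0.cmp(&b.0)));
    table
}

/// Replace each symbol with `separator + word + separator`, scanning left to right.
fn spell_symbols(key: &str, table: &[(String, String)], separator: &str) -> String {
    let mut out = String::new();
    let mut rest = key;
    while let Some(ch) = rest.chars().next() {
        if let Some((symbol, word)) = table.iter().find(|(symbol, _)| rest.starts_with(symbol.as_str())) {
            out.push_str(separator);
            out.push_str(word);
            out.push_str(separator);
            rest = &rest[symbol.len()..];
        } else {
            out.push(ch);
            rest = &rest[ch.len_utf8()..];
        }
    }
    out
}
```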

select vs. drop

- type: select
  config:
    fields: [id, email]

Listed fields are kept; everything else is dropped.

- type: drop
  config:
    fields: [password, ssn]

Listed fields are removed; everything else is kept. Use select when the schema is fixed and you want to defend against the source adding new fields you don’t want; use drop for targeted PII / secret removal.

set — constant stamps

- type: set
  config:
    values:
      _source: my-api
      _ingested_at: "2026-05-28T00:00:00Z"
      version: 2
      tags: [pii-free]

Any JSON value is accepted (string, number, bool, null, array, object). Existing fields with the same name are overwritten: set is the intentional “I want this value” transform.

rename_field vs. rename_keys

Both transforms rename keys, but they’re aimed at different jobs:

| rename_keys | rename_field |
|---|---|
| Single regex substitution applied to every key, recursively (including keys inside nested objects and arrays). | Exact-name match on top-level keys only. |
| Best for systematic patterns: ^_sdc_ → "", ([a-z])([A-Z]) → $1_$2. | Best for a handful of explicit renames: address__city → city. |

rename_field errors if a target name already exists on the record (same collision rule as flatten and keys_case) — to avoid silently overwriting a real value.
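Here is a sketch of that exact-name rename with the collision rule, over a hypothetical flat record of string values. A missing source field is skipped. A target that already exists is an error, and the record is left unchanged for that entry. Skipping identity renames (`a: a`) is this sketch's own assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical rename_field: exact top-level names, error on collision.
fn rename_fields(record: &mut BTreeMap<String, String>, renames: &[(&str, &str)]) -> Result<(), String> {
    for &(from, to) in renames {
        // Missing source fields are skipped; identity renames are a no-op (assumption).
        if from == to || !record.contains_key(from) {
            continue;
        }
        if record.contains_key(to) {
            return Err(format!("rename_field: target '{to}' already exists"));
        }
        if let Some(value) = record.remove(from) {
            record.insert(to.to_string(), value);
        }
    }
    Ok(())
}
```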

cast — type coercion

- type: cast
  config:
    fields:
      age: int
      price: float
      active: bool
      id: string
      created_at: timestamp
    on_error: error

Target types: int (i64), float (f64), bool, string, timestamp (RFC 3339). bool from a string accepts true|false|1|0|yes|no case-insensitively. timestamp parses RFC 3339 / ISO 8601 and normalises the output (so +00:00 becomes Z). Casting a float to int only succeeds for a whole number within i64 range — a fractional value (e.g. 3.9) or one beyond ±9.2e18 is treated as uncastable (governed by on_error) rather than being silently truncated or saturated.

Failure behaviour is controlled by on_error:

| on_error | What happens on an uncastable value |
|---|---|
| error (default) | The transform errors with FaucetError::Transform. The pipeline either aborts or routes the record to the DLQ, depending on your DLQ config. |
| null | The value is replaced with null. Use when the schema must hold and a downstream nullable column is acceptable. |
| skip | The value is left as-is (original type). Use when downstream code already handles mixed types. |

Missing fields are always a no-op — cast will never insert a null for a field that wasn’t already on the record.

Casting epoch seconds / millis to a timestamp is out of scope for the initial release; file a follow-up issue if you need it.
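The float-to-int and string-to-bool rules, and the three on_error outcomes, can be sketched as follows. The `OnError` and `Casted` types are hypothetical. Whether the real parser trims whitespace before reading a bool is not stated here, so this sketch does not trim.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnError {
    Error,
    Null,
    Skip,
}

/// Hypothetical cast outcome for a float field targeted at `int`.
#[derive(Debug, PartialEq)]
enum Casted {
    Int(i64),
    Null,
    Unchanged(f64),
}

/// Whole numbers inside i64 range only; no truncation, no saturation.
fn float_to_int(x: f64) -> Option<i64> {
    // 2^63 is exactly representable as f64, so `x < LIMIT` excludes anything past i64::MAX.
    const LIMIT: f64 = 9_223_372_036_854_775_808.0;
    if x.is_finite() && x.fract() == 0.0 && x >= -LIMIT && x < LIMIT {
        Some(x as i64)
    } else {
        None
    }
}

fn cast_float_to_int(x: f64, on_error: OnError) -> Result<Casted, String> {
    match (float_to_int(x), on_error) {
        (Some(i), _) => Ok(Casted::Int(i)),
        (None, OnError::Error) => Err(format!("cannot cast {x} to int")),
        (None, OnError::Null) => Ok(Casted::Null),
        (None, OnError::Skip) => Ok(Casted::Unchanged(x)),
    }
}

/// true|false|1|0|yes|no, case-insensitively.
fn parse_bool(s: &str) -> Option<bool> {
    match s.to_ascii_lowercase().as_str() {
        "true" | "1" | "yes" => Some(true),
        "false" | "0" | "no" => Some(false),
        _ => None,
    }
}
```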

redact

- type: redact
  config:
    fields: [password, ssn, credit_card]
    mask: "***"

mask is any JSON value (default "***" if omitted). Missing fields are skipped — redact will not add "***" to a record that didn’t have the field.

value_case

- type: value_case
  config:
    fields: [email, username]
    mode: lower   # | upper | trim

Only string field values are touched; non-string values (numbers, bools, nulls, nested objects) pass through unchanged.

Ordering rules of thumb

Transforms run in the order you list them, so think about dependencies:

  • flatten, spell_symbols, and keys_case change key names — list field-targeting transforms (select, drop, cast, redact, value_case, rename_field) after them, referencing the post-rename keys.
  • cast runs before downstream consumers see the record, so put it after any rename steps but before set if you want set’s stamped values left untouched.
  • set overwrites by name — put it last when you want it to win.

The “clean keys for a downstream warehouse” pipeline is canonical:

transforms:
  - type: spell_symbols     # %sold → percent sold
  - type: keys_case
    config: { mode: snake } # percent sold → percent_sold
  - type: rename_field
    config:
      fields: { legacy_id: id }

Out of scope

  • Dotted-path field selection on the field-list transforms (select, drop, cast, redact, value_case, rename_field) — they still operate on bare top-level keys. Run flatten first if you need nested access. filter and explode are the exceptions and support the JSONPath subset documented in their sections.
  • A general expression / scripting transform (jq, CEL, …) — separate, larger discussion.

Filter and explode

Filter — keep records matching a predicate

transforms:
  - { type: filter, config: { path: deleted, op: ne, value: true } }

Operators: eq, ne, exists, in, not_in.

  • path: — JSONPath subset: bare key (status), dot path ($.user.status), or bracketed string key ($['order-id']). Bare keys are automatically prefixed with $. (so status becomes $.status). Keys that literally contain . require the $-rooted bracket form ("$['foo.bar']").
  • value: — required for eq / ne / in / not_in. For in / not_in, must be an array. Forbidden for exists.
  • Type semantics: strict JSON equality. "5" eq 5 is false. Chain cast upstream to coerce.
  • ne and not_in keep records with a missing path (the predicate is satisfied by absence). All other operators drop missing-path records.
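The operator and missing-path rules reduce to one match expression. This sketch takes the value already resolved from `path:`, or `None` when the path is missing. `Val` and `FilterOp` are hypothetical simplified types. The sketch also assumes that `exists` keeps a record whose path holds an explicit null.

```rust
/// Hypothetical, simplified JSON scalar.
#[derive(Debug, Clone, PartialEq)]
enum Val {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
}

enum FilterOp {
    Eq(Val),
    Ne(Val),
    Exists,
    In(Vec<Val>),
    NotIn(Vec<Val>),
}

/// Returns true when the record is kept.
fn keep(value_at_path: Option<&Val>, op: &FilterOp) -> bool {
    match (value_at_path, op) {
        // ne / not_in are satisfied by absence.
        (None, FilterOp::Ne(_)) | (None, FilterOp::NotIn(_)) => true,
        // Every other operator drops records with a missing path.
        (None, _) => false,
        // Strict equality: Str("5") never equals Num(5.0).
        (Some(v), FilterOp::Eq(x)) => v == x,
        (Some(v), FilterOp::Ne(x)) => v != x,
        (Some(_), FilterOp::Exists) => true,
        (Some(v), FilterOp::In(xs)) => xs.contains(v),
        (Some(v), FilterOp::NotIn(xs)) => !xs.contains(v),
    }
}
```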

Explode — expand an array into one record per element

transforms:
  - { type: explode, config: { path: items, prefix: item } }

  • path: — same JSONPath subset as filter.
  • prefix: — prepended to each element field when the element is an object. Defaults to the last segment of path (so path: items → prefix: items). Empty string opts out of prefixing (pure LATERAL FLATTEN).
  • separator: — between prefix and element field key. Default "_".
  • on_missing: — what to do when the path doesn’t yield a non-empty array. passthrough (default — record flows through unchanged), drop (SQL UNNEST semantics), or error.

Merge rule (object elements): the array node at path is removed from its parent container and each element field is added as a sibling, prefixed.

| Input | Stage | Output |
|---|---|---|
| {id: 1, items: [{sku: A, qty: 2}]} | explode { path: items } | {id: 1, items_sku: A, items_qty: 2} |
| {id: 1, items: [{sku: A}, {sku: B}]} | explode { path: items, prefix: item } | {id: 1, item_sku: A}, {id: 1, item_sku: B} |
| {id: 1, items: [{sku: A}]} | explode { path: items, prefix: "" } | {id: 1, sku: A} |
| {id: 1, tags: ["rust", "etl"]} | explode { path: tags } | {id: 1, tags: rust}, {id: 1, tags: etl} |
| {id: 1, user: {name: A, items: [{x: 1}]}} | explode { path: $.user.items } | {id: 1, user: {name: A, items_x: 1}} |

Collisions (a prefixed element key would overwrite a sibling) fail loudly with FaucetError::Transform("explode produced duplicate key 'X'") — mirroring flatten / keys_case.
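Here is a sketch of the merge rule for a top-level path, including the duplicate-key failure. It makes several simplifying assumptions. `Json` and `Record` are hypothetical types. Only `on_missing: passthrough` is modelled. Scalar elements are written back under the path's own key, which matches the tags row above, where prefix and path coincide. Nested paths such as `$.user.items` are not handled.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified JSON value.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Str(String),
    Arr(Vec<Json>),
    Obj(BTreeMap<String, Json>),
}

type Record = BTreeMap<String, Json>;

fn explode(rec: &Record, path: &str, prefix: Option<&str>, sep: &str) -> Result<Vec<Record>, String> {
    let prefix = prefix.unwrap_or(path); // default: last (here: only) path segment
    let items = match rec.get(path) {
        Some(Json::Arr(items)) if !items.is_empty() => items,
        _ => return Ok(vec![rec.clone()]), // on_missing: passthrough
    };
    // The array node is removed; each element's fields become siblings.
    let mut base = rec.clone();
    base.remove(path);
    let mut out = Vec::new();
    for item in items {
        let mut row = base.clone();
        match item {
            Json::Obj(fields) => {
                for (k, v) in fields {
                    let key = if prefix.is_empty() { k.clone() } else { format!("{prefix}{sep}{k}") };
                    if row.insert(key.clone(), v.clone()).is_some() {
                        return Err(format!("explode produced duplicate key '{key}'"));
                    }
                }
            }
            scalar => {
                row.insert(path.to_string(), scalar.clone());
            }
        }
        out.push(row);
    }
    Ok(out)
}
```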

Ordering: explode early, filter late (usually)

The recommended order is explode → transform → filter: each child of the explode gets transforms applied uniformly, and the final filter acts on cleaned shape. Two legitimate deviations:

  • filter before explode: drop soft-deleted parents before exploding, saving the work of expanding children of dead rows.
  • filter both sides: drop dead parents, explode, then drop archived children.

transforms:
  - { type: filter, config: { path: deleted, op: ne, value: true } }
  - { type: explode, config: { path: items, prefix: item } }
  - { type: filter, config: { path: item_status, op: in, value: [active, pending] } }
  - { type: keys_case, config: { mode: snake } }

Secrets-manager interpolation

faucet can pull secret values directly from HashiCorp Vault, AWS Secrets Manager, GCP Secret Manager, and Azure Key Vault — using ${scheme:reference} directives right inside your config file. Resolution happens at config-load time: values are fetched concurrently, de-duplicated, substituted into the config tree, and never written to disk or logs.

These directives join the existing load-time set: ${env:VAR}, ${file:PATH}, and ${secret:VAR} (alias for ${env:}).

Build features

None of the four backends are compiled in by default. Opt in per backend or take all four with the aggregate feature:

# All four backends
cargo install faucet-cli --features secrets

# Individual backends
cargo install faucet-cli --features secrets-vault
cargo install faucet-cli --features secrets-aws-sm
cargo install faucet-cli --features secrets-gcp-sm
cargo install faucet-cli --features secrets-azure-kv

Using faucet-cli from source or as a library dependency:

cargo build -p faucet-cli --features secrets

The full aggregate feature includes all four backends.

HashiCorp Vault (KV v2)

Directive: ${vault:<path>[#field]}

Auth: set VAULT_ADDR and VAULT_TOKEN in the environment. VAULT_NAMESPACE is optional (for HCP Vault or enterprise namespaces).

The #field selector parses the secret body as a JSON object and extracts one key. Omit it to receive the entire secret body as a string.

# Requires: VAULT_ADDR + VAULT_TOKEN, and a KV v2 secret at
# secret/data/faucet/api with a `token` field.
# Build with: --features secrets-vault
version: 1
name: rest-to-jsonl-with-vault
pipeline:
  source:
    type: rest
    config:
      base_url: https://api.example.com
      path: /v1/items
      auth:
        type: bearer
        config:
          token: "${vault:secret/data/faucet/api#token}"
  sink:
    type: jsonl
    config:
      path: ./out/items.jsonl

AWS Secrets Manager

Directive: ${aws-sm:<name-or-ARN>[#field]}

Auth: the standard aws-config default credential chain — environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_SESSION_TOKEN), ~/.aws/credentials profile, EC2/ECS instance credentials, web identity token, or IAM role attached to the compute environment. No manual config needed beyond what the AWS SDK picks up automatically.

The #field selector works the same as for Vault: it parses the secret as JSON and extracts one key.

# Build with: --features secrets-aws-sm
version: 1
name: postgres-to-bigquery-secure
pipeline:
  source:
    type: postgres
    config:
      connection_url: "${aws-sm:prod/postgres#connection_url}"
      query: "SELECT * FROM events WHERE created_at > now() - interval '1 day'"
  sink:
    type: bigquery
    config:
      project_id: my-gcp-project
      dataset_id: analytics
      table_id: events
      credentials:
        type: application_default

GCP Secret Manager

Directive: ${gcp-sm:projects/<project>/secrets/<secret>/versions/<version>}

Use versions/latest to always fetch the current active version.

Auth: Application Default Credentials — run gcloud auth application-default login for local development, or rely on the service account attached to GCE/Cloud Run in production. No extra environment variables needed.

# Build with: --features secrets-gcp-sm
version: 1
name: rest-to-gcs-secure
pipeline:
  source:
    type: rest
    config:
      base_url: https://api.partner.com
      path: /v2/export
      auth:
        type: bearer
        config:
          token: "${gcp-sm:projects/my-project/secrets/partner-api-token/versions/latest}"
  sink:
    type: gcs
    config:
      bucket: my-export-bucket
      prefix: exports/
      credentials:
        type: application_default

Azure Key Vault

Directive: ${azure-kv:<vault>/<secret>[/<version>]}

Omit the version segment to fetch the current (enabled) version.

Auth: the azure_identity default chain — AZURE_TENANT_ID / AZURE_CLIENT_ID / AZURE_CLIENT_SECRET environment variables (service principal), managed identity (when running in Azure), or az login (developer tools). These are tried in that order; the first that succeeds is used.

# Build with: --features secrets-azure-kv
version: 1
name: rest-to-snowflake-secure
pipeline:
  source:
    type: rest
    config:
      base_url: https://api.example.com
      path: /v1/records
      auth:
        type: bearer
        config:
          token: "${azure-kv:my-vault/api-token}"
  sink:
    type: snowflake
    config:
      account: myaccount.us-east-1
      warehouse: LOAD_WH
      database: RAW
      schema: PUBLIC
      table: records
      auth:
        type: oauth
        config:
          access_token: "${azure-kv:my-vault/snowflake-token}"

The #field JSON extractor

Both Vault and AWS Secrets Manager support storing multiple values as a JSON object inside one secret. The #field selector lets you extract a single key:

# Secret at prod/db contains: {"host": "db.example.com", "password": "s3cr3t"}
connection_url: "postgresql://app:${aws-sm:prod/db#password}@${aws-sm:prod/db#host}/mydb"

References are de-duplicated before fetching: the same (scheme, path) pair is fetched exactly once, even if it appears in multiple config fields.

If the field is absent, faucet surfaces a clear error listing the available keys. If the secret body isn’t valid JSON when #field is used, faucet errors rather than returning raw bytes.
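The (scheme, path) de-duplication can be illustrated with a small scanner. It assumes that ${env:} directives have already been substituted, as described under Resolution order, so it does not handle nested `${...}`. `SecretRef`, `find_secret_refs`, and `unique_fetches` are hypothetical names, not faucet APIs.

```rust
use std::collections::BTreeSet;

/// Hypothetical parsed directive: ${scheme:path[#field]}.
#[derive(Debug, PartialEq)]
struct SecretRef {
    scheme: String,
    path: String,
    field: Option<String>,
}

const SCHEMES: [&str; 4] = ["vault", "aws-sm", "gcp-sm", "azure-kv"];

/// Collect secrets-manager directives; ${env:}, ${file:} etc. are ignored here.
fn find_secret_refs(text: &str) -> Vec<SecretRef> {
    let mut refs = Vec::new();
    let mut rest = text;
    while let Some(start) = rest.find("${") {
        let after = &rest[start + 2..];
        let Some(end) = after.find('}') else { break };
        let body = &after[..end];
        if let Some((scheme, reference)) = body.split_once(':') {
            if SCHEMES.contains(&scheme) {
                let (path, field) = match reference.split_once('#') {
                    Some((p, f)) => (p, Some(f.to_string())),
                    None => (reference, None),
                };
                refs.push(SecretRef { scheme: scheme.into(), path: path.into(), field });
            }
        }
        rest = &after[end + 1..];
    }
    refs
}

/// One fetch per (scheme, path), however many #field selectors share it.
fn unique_fetches(refs: &[SecretRef]) -> BTreeSet<(String, String)> {
    refs.iter().map(|r| (r.scheme.clone(), r.path.clone())).collect()
}
```

For the connection_url example above, the scanner finds two references that differ only in #field, and both collapse into a single aws-sm fetch of prod/db.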

Validating configs with secrets

With resolution (real preflight): faucet validate resolves all secrets as part of config validation and prints one line per reference to confirm which secrets were reached (never the values):

secret: vault:secret/data/faucet/api#token → resolved
ok: 'rest-to-jsonl-with-vault' rows=1 (roots=1, children=0) execution=(defaults)
  - default [root] source=rest sink=jsonl

Offline (no network / credentials): faucet validate --no-secrets validates grammar and structure only, skipping all secret fetches. Use this in CI steps that don’t have credentials, or in local development before you have Vault access:

faucet validate --no-secrets pipeline.yaml

Grammar reference: faucet schema secrets prints the full directive syntax and auth requirements for all four schemes in machine-readable JSON:

faucet schema secrets

Resolution order

Secret directives resolve as the final load-time stage, after ${env:} / ${file:} / ${vars.X} / ${sources.X} are all settled. This means you can use env vars to compose a secret path:

pipeline:
  source:
    type: rest
    config:
      auth:
        type: bearer
        config:
          token: "${vault:secret/data/${env:APP_ENV}/api#token}"

Substitution order: ${env:APP_ENV} resolves first (during the raw text pass); the resulting path secret/data/prod/api#token is then fetched from Vault.

Redaction guarantee and its boundary

faucet scrubs every resolved secret value from its own tracing / log / error output. Every byte written through the CLI’s tracing subscriber passes through a RedactingWriter that replaces any registered secret value with ***. Errors that contain deserialized config fields go through the same scrubber before they reach stderr.

Every resolution path registers its result for redaction: the secrets-manager directives (${vault:…}, ${aws-sm:…}, …) and the load-time ${env:…} / ${secret:…} / ${file:…} forms. A credential supplied via the common ${env:TOKEN} form is therefore scrubbed exactly like a ${vault:…} one (values shorter than 4 characters are not registered). The faucet serve bearer auth token (--auth-token / FAUCET_SERVE_AUTH_TOKEN) is registered the same way.

The scrubber withholds a short trailing window between writes, so a secret split across two separate log writes is still masked.

Independently, faucet_core::Credential and the built-in auth providers hand-write their Debug implementations to print secrets as ***, so a {:?} of a credential or shared provider never reveals the token.

This boundary covers faucet’s own output only. A third-party connector that debug-logs its own deserialized config fields — or any library that logs a reqwest::Request, a database row, or a JSON object — operates outside this boundary. In particular:

  • Do not enable RUST_LOG=debug or FAUCET_LOG=debug when running a pipeline whose connector configs hold resolved secrets. The connector libraries may log intermediate objects that contain the resolved value before faucet’s scrubber can see it.
  • Prometheus metric labels and span attributes set by connectors are also outside this boundary.
  • The scrubber does not redact values shorter than 4 characters.
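The trailing-window idea can be sketched in isolation. The scrubber below is a hypothetical `Scrubber` type, not faucet's RedactingWriter, and it buffers text rather than bytes on a writer. It masks registered values of 4 or more characters. It holds back (longest secret length minus 1) bytes after every write, so a secret whose first half has already arrived is never emitted in the clear.

```rust
/// Hypothetical streaming scrubber; not faucet's RedactingWriter.
struct Scrubber {
    secrets: Vec<String>,
    pending: String,
}

impl Scrubber {
    fn new(secrets: &[&str]) -> Self {
        // Values shorter than 4 characters are never registered.
        let mut secrets: Vec<String> = secrets
            .iter()
            .filter(|s| s.chars().count() >= 4)
            .map(|s| s.to_string())
            .collect();
        // Longest first, so a secret that contains another is masked whole.
        secrets.sort_by(|a, b| b.len().cmp(&a.len()));
        Scrubber { secrets, pending: String::new() }
    }

    fn mask(&self, text: &str) -> String {
        let mut out = text.to_string();
        for secret in &self.secrets {
            out = out.replace(secret.as_str(), "***");
        }
        out
    }

    /// Accept one log write and return the part that is safe to emit now.
    fn write(&mut self, chunk: &str) -> String {
        self.pending.push_str(chunk);
        let masked = self.mask(&self.pending);
        // Hold back enough bytes to hide any secret prefix still awaiting its tail.
        let window = self.secrets.iter().map(|s| s.len()).max().unwrap_or(1) - 1;
        let mut cut = masked.len().saturating_sub(window);
        while !masked.is_char_boundary(cut) {
            cut -= 1;
        }
        let (ready, held) = masked.split_at(cut);
        let ready = ready.to_string();
        self.pending = held.to_string();
        ready
    }

    /// Emit whatever is still held back (e.g. at shutdown).
    fn flush(&mut self) -> String {
        let rest = self.mask(&self.pending);
        self.pending.clear();
        rest
    }
}
```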

Secrets in the auth: catalog and vars: block

Secret directives are resolved everywhere config interpolation runs: connector configs, transforms, state, dlq, matrix rows, the top-level auth: shared-provider catalog, and the top-level vars: block.

Putting a secret in the shared auth: catalog is often the cleanest option — a single bearer token resolved once and shared across every matrix row that references it via auth: { ref } (one token cache, single-flight refresh):

# A secret in the shared catalog, resolved once and shared by reference.
auth:
  api:
    type: static
    config:
      token: "${vault:secret/data/app#token}"

pipeline:
  sources:
    orders:  { type: rest, config: { base_url: https://api.example.com/orders,  auth: { ref: api } } }
    refunds: { type: rest, config: { base_url: https://api.example.com/refunds, auth: { ref: api } } }
  sink: { type: jsonl, config: { path: ./out.jsonl } }

A secret in the vars: block works the same way and can be reused through ${vars.X}:

vars:
  db_password: "${aws-sm:prod/db#password}"

pipeline:
  source:
    type: postgres
    config:
      connection_url: "postgres://app:${vars.db_password}@db.internal:5432/app"
  sink: { type: jsonl, config: { path: ./rows.jsonl } }

The shared auth: catalog is a first-class config location in every respect: its provider specs can also reference ${vars.X} and ${sources.X.PATH}, not just secret directives.

Inline auth: blocks on individual connectors resolve secrets too — use the shared catalog when several connectors share one credential, and inline auth when a credential belongs to a single connector.

Adaptive batch sizing

Adaptive batch sizing lets faucet automatically tune how many records it sends to the sink in each write, instead of using a fixed batch_size. The built-in AIMD controller (Additive Increase / Multiplicative Decrease) starts at the source page size, grows the batch additively when writes are clean and fast, and shrinks it multiplicatively when errors appear or write latency rises above a target.

When to use it

Useful when the optimal write batch size changes over time or varies by data shape:

  • Spiky data volumes — smaller batches during large-row bursts; bigger ones for narrow rows.
  • Sink rate limits / quotas — back off automatically when the API starts returning errors or timing out.
  • Latency-sensitive pipelines — keep each write inside a target window (e.g. target_latency_ms: 1000) rather than guessing a fixed size.

Adaptive batch sizing is pure write-side tuning: the source page size is unchanged, and the controller simply reslices each page into sub-batches of the current effective size.
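Here is a sketch of that reslicing step, under stated assumptions. The `next_size` closure is a hypothetical stand-in for asking the controller for its current effective size before each sub-batch. Clamping to the remaining slice keeps every sub-batch inside the page, so no records are buffered across pages.

```rust
/// Split one source page into consecutive sub-batches whose sizes come from
/// `next_size` (e.g. the controller's current effective size).
fn reslice<'a, T>(page: &'a [T], mut next_size: impl FnMut() -> usize) -> Vec<&'a [T]> {
    let mut batches = Vec::new();
    let mut rest = page;
    while !rest.is_empty() {
        // Never cross the page boundary, and always make progress.
        let n = next_size().clamp(1, rest.len());
        let (head, tail) = rest.split_at(n);
        batches.push(head);
        rest = tail;
    }
    batches
}
```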

Configuration

Add an execution.adaptive_batch_size: block to your config file:

execution:
  adaptive_batch_size:
    enabled: true
    min: 500
    max: 10000
    increase_step: 500
    decrease_factor: 0.5
    cooldown_batches: 5
    target_latency_ms: 1000
    latency_window: 10
    error_threshold: 0.01
    respect_source_max: true
    log_every: 50

Full example

The postgres_to_bigquery_adaptive.yaml example pairs a PostgreSQL source with a BigQuery sink and a JSONL DLQ:

# postgres_to_bigquery_adaptive.yaml  (abbreviated)
version: 1
name: postgres_to_bigquery_adaptive

pipeline:
  source:
    type: postgres
    config:
      connection_url: ${env:PG_URL}
      query: SELECT id, created_at, payload FROM orders WHERE created_at > $1
      params: ["2026-01-01T00:00:00Z"]
      batch_size: 5000   # source page size — also the effective upper ceiling
      max_connections: 8

  sink:
    type: bigquery
    config:
      project_id: my-gcp-project
      dataset_id: warehouse
      table_id: orders
      auth:
        type: service_account_key
        config:
          json: ${env:GCP_KEY_JSON}
      batch_size: 1000   # starting write size; the controller tunes this

  dlq:
    sink:
      type: jsonl
      config:
        path: ./dlq/orders_failed.jsonl
    on_batch_error: dlq_all

execution:
  adaptive_batch_size:
    enabled: true
    min: 500
    max: 10000
    increase_step: 500
    decrease_factor: 0.5
    cooldown_batches: 5
    target_latency_ms: 1000
    error_threshold: 0.01

Config field reference

All fields are optional except enabled. Unset fields take the defaults shown below.

| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Master switch. Set to true to activate the controller. |
| controller | string | "aimd" | Algorithm. Only "aimd" is supported in v1. |
| min | integer | 100 | Lower bound on effective batch size. Must be ≥ 1. |
| max | integer | 50000 | Upper bound. Must be ≤ 1,000,000. Values above the source page size are inert (see Caveats). |
| increase_step | integer | 250 | Rows added per clean, fast batch (additive growth). Must be ≥ 1 and ≤ 1,000,000. |
| decrease_factor | float | 0.5 | Multiplicative shrink factor on error or high latency. Must be in (0, 1). |
| cooldown_batches | integer | 5 | Batches to skip after a shrink before allowing growth again. |
| target_latency_ms | integer or null | null | Optional target write latency (ms). null means react to errors only. |
| latency_window | integer | 10 | Rolling window size (batches) for the p50 latency estimate. Must be ≥ 1. |
| error_threshold | float | 0.01 | Per-batch error rate (0.0–1.0) above which the controller shrinks. |
| respect_source_max | bool | true | Cap effective batch size at the source page size. Must be true; false is rejected (cross-page buffering would break the O(batch_size) memory guarantee). |
| log_every | integer | 50 | Emit a tracing::info summary every N adjustments (0 = never). |
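The documented constraints can be read as a validation pass. This sketch rests on some assumptions: the AdaptiveBatchConfig struct and its validate method are hypothetical mirrors of the table, not faucet's actual config types. It checks only the rules the table states, and the defaults are copied from the table.

```rust
/// Hypothetical mirror of the `execution.adaptive_batch_size` block.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct AdaptiveBatchConfig {
    enabled: bool,
    controller: String,
    min: u64,
    max: u64,
    increase_step: u64,
    decrease_factor: f64,
    cooldown_batches: u64,
    target_latency_ms: Option<u64>,
    latency_window: u64,
    error_threshold: f64,
    respect_source_max: bool,
    log_every: u64,
}

impl Default for AdaptiveBatchConfig {
    // Defaults taken from the config field reference table.
    fn default() -> Self {
        Self {
            enabled: false,
            controller: "aimd".to_string(),
            min: 100,
            max: 50_000,
            increase_step: 250,
            decrease_factor: 0.5,
            cooldown_batches: 5,
            target_latency_ms: None,
            latency_window: 10,
            error_threshold: 0.01,
            respect_source_max: true,
            log_every: 50,
        }
    }
}

impl AdaptiveBatchConfig {
    /// Returns the first documented constraint the config violates, if any.
    fn validate(&self) -> Result<(), String> {
        if self.controller != "aimd" {
            return Err("only the \"aimd\" controller is supported".into());
        }
        if self.min < 1 {
            return Err("min must be >= 1".into());
        }
        if self.max > 1_000_000 {
            return Err("max must be <= 1,000,000".into());
        }
        if !(1..=1_000_000).contains(&self.increase_step) {
            return Err("increase_step must be in 1..=1,000,000".into());
        }
        // Written as a negated range check so NaN is rejected too.
        if !(self.decrease_factor > 0.0 && self.decrease_factor < 1.0) {
            return Err("decrease_factor must be in (0, 1)".into());
        }
        if self.latency_window < 1 {
            return Err("latency_window must be >= 1".into());
        }
        if !self.respect_source_max {
            return Err("respect_source_max: false is rejected".into());
        }
        Ok(())
    }
}
```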

AIMD behavior

The controller follows a strict priority order for each sub-batch observation:

  1. Error shrink (always fires, even during cooldown) — if the per-batch error rate exceeds error_threshold, the current size is multiplied by decrease_factor (floor-rounded, clamped to min), and cooldown_batches is armed.
  2. Cooldown gate — if cooldown is active, decrement the counter and skip growth. A new error during cooldown fires rule 1 again and re-arms the counter.
  3. Latency target (when target_latency_ms is set) — evaluate the rolling p50 latency:
    • p50 > 1.2 × target_latency_ms → shrink.
    • p50 < 0.5 × target_latency_ms → grow.
    • Otherwise, stay (dead-band prevents oscillation).
  4. Success growth — add increase_step to the current size (clamped to max).

Cold start

The controller initialises to the first source page length, clamped into [min, max]. If the first page is smaller than min, the effective size starts at min.

Example trajectory

With min=500, max=5000, increase_step=500, decrease_factor=0.5, cooldown_batches=2:

batch 1: size=1000, ok, fast  → grow  → 1500
batch 2: size=1500, ok, fast  → grow  → 2000
batch 3: size=2000, 3% errors → shrink→ 1000, cooldown armed (2)
batch 4: size=1000, cooldown  → skip  → 1000
batch 5: size=1000, cooldown  → skip  → 1000
batch 6: size=1000, ok, fast  → grow  → 1500
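A small controller can replay the trajectory above. This Rust sketch is illustrative only. Knobs and Aimd are hypothetical names. The sketch covers rules 1, 2, and 4 (error shrink, cooldown gate, success growth) and omits the latency rule. Because the example does not set error_threshold, the sketch assumes the default of 0.01.

```rust
/// Hypothetical controller knobs (names follow the config table).
#[derive(Clone, Copy)]
struct Knobs {
    min: usize,
    max: usize,
    increase_step: usize,
    decrease_factor: f64,
    cooldown_batches: u32,
    error_threshold: f64,
}

/// Minimal AIMD controller without the latency rule.
struct Aimd {
    k: Knobs,
    size: usize,
    cooldown: u32,
}

impl Aimd {
    /// Cold start: the first page length, clamped into [min, max].
    fn new(k: Knobs, first_page_len: usize) -> Self {
        Aimd { k, size: first_page_len.clamp(k.min, k.max), cooldown: 0 }
    }

    /// Feed one sub-batch result; returns the next effective size.
    fn observe(&mut self, rows: usize, failed_rows: usize) -> usize {
        let error_rate = if rows == 0 { 0.0 } else { failed_rows as f64 / rows as f64 };
        if error_rate > self.k.error_threshold {
            // Rule 1: multiplicative decrease, floor-rounded and clamped to min.
            // Fires even during cooldown and (re-)arms the cooldown counter.
            let shrunk = (self.size as f64 * self.k.decrease_factor).floor() as usize;
            self.size = shrunk.max(self.k.min);
            self.cooldown = self.k.cooldown_batches;
        } else if self.cooldown > 0 {
            // Rule 2: count the cooldown down and hold the size.
            self.cooldown -= 1;
        } else {
            // Rule 4: additive increase, clamped to max.
            self.size = (self.size + self.k.increase_step).min(self.k.max);
        }
        self.size
    }
}
```

Feeding it the six batches from the example (60 failed rows out of 2,000 in batch 3, a 3% error rate) produces 1500, 2000, 1000, 1000, 1000, 1500.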

Metrics

Four metrics (three gauges and one counter) are emitted automatically:

| Metric | Type | Description |
|---|---|---|
| faucet_pipeline_adaptive_batch_size | gauge | Current effective batch size. |
| faucet_pipeline_adaptive_batch_adjustments_total | counter | Total adjustments, labeled direction=up\|down and reason=success\|error\|latency. |
| faucet_pipeline_adaptive_batch_cooldown_active | gauge | 1 while cooldown is active, 0 otherwise. |
| faucet_pipeline_adaptive_batch_p50_latency_ms | gauge | Rolling p50 write latency (ms); absent until the window fills. |

All four carry the standard pipeline and row labels.

Example PromQL to alert when the controller is stuck shrinking:

# Shrink rate over the last 5 minutes
rate(faucet_pipeline_adaptive_batch_adjustments_total{direction="down"}[5m])
  > 0.5

Caveats

Error-driven shrink requires a DLQ

The error signal comes from per-row outcomes reported via the DLQ path (Sink::write_batch_partial). If no dlq: block is present, the controller sees zero errors regardless of the sink response — only target_latency_ms can drive shrinks. Add a dlq: block with on_batch_error: dlq_all if you want the controller to react to sink-side write errors.
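The sketch below shows why the DLQ matters by deriving the per-batch error rate from per-row outcomes. RowOutcome and batch_error_rate are hypothetical stand-ins for what Sink::write_batch_partial reports. Passing None models a pipeline with no dlq: block, where the controller always sees 0.0.

```rust
/// Hypothetical per-row outcome as the DLQ path might report it.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RowOutcome {
    Written,
    Failed,
}

/// Per-batch error rate fed to the controller. `None` models a pipeline
/// without a `dlq:` block: no per-row outcomes exist, so the rate is 0.0
/// whatever the sink actually did.
fn batch_error_rate(outcomes: Option<&[RowOutcome]>) -> f64 {
    match outcomes {
        Some(rows) if !rows.is_empty() => {
            let failed = rows.iter().filter(|o| **o == RowOutcome::Failed).count();
            failed as f64 / rows.len() as f64
        }
        _ => 0.0,
    }
}
```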

Within-page ceiling: max is capped at the source page size

In v1 the controller reslices pages it has already received from the source; it cannot buffer records across pages. The effective upper bound is therefore min(max, source_page_size). If you set max: 50000 but the source emits pages of 1,000 records, the controller will never write more than 1,000 rows per call.

To allow bigger write batches, raise the source’s batch_size (e.g. batch_size: 20000 on the postgres source config). Setting max higher than the source page size is harmless but inert.

Setting respect_source_max: false to cross page boundaries is rejected at config load. Buffering across pages would mean holding records from several source pages at once, which breaks the pipeline’s O(batch_size) memory guarantee. Raise the source batch_size instead.

No-op for per-record sinks

jsonl, csv, and stdout write one record at a time regardless of batch_size. Adaptive sizing is active but harmless for these sinks — the controller adjusts its internal state normally, but the actual write granularity is unchanged. A one-time tracing::info message notes this when the pipeline starts.

Scheduling pipelines with faucet schedule

faucet schedule runs a pipeline on a cron schedule in a long-running foreground process. It is designed for server-side deployment: drop it into systemd, Kubernetes, or any supervisor that can restart it on failure, and the pipeline fires at each scheduled tick while the process is up.

faucet schedule pipeline.yaml           # foreground; Ctrl-C or SIGTERM to stop
faucet schedule pipeline.yaml --once    # run exactly once now, then exit

The config must include a schedule: block alongside the usual pipeline:. Configs without one are rejected with a hint to use faucet run instead.

A runnable example

The following config runs a CSV→JSONL pipeline every night at 02:00 America/Los_Angeles. Save it as nightly.yaml and start it with faucet schedule nightly.yaml:

# nightly.yaml — run at 02:00 Pacific every night
version: 1
name: nightly-rollup

schedule:
  cron: "0 2 * * *"
  timezone: "America/Los_Angeles"
  overlap_policy: skip            # don't pile up if a run runs long
  max_consecutive_failures: 5     # exit non-zero after 5 straight failures (supervisor restarts)
  on_failure: continue
  shutdown_grace_secs: 30

pipeline:
  source:
    type: csv
    config:
      path: ./events.csv
  sink:
    type: jsonl
    config:
      path: ./events.jsonl

See cli/examples/scheduled_nightly.yaml for the canonical copy.

Cron syntax

faucet uses a standard Unix cron expression, validated at config-load time. A bad expression or an expression that can never fire produces a clear error before the process starts.

5-field form (MIN HOUR DOM MON DOW):

| Expression | Meaning |
|---|---|
| 0 2 * * * | Every night at 02:00 |
| */15 * * * * | Every 15 minutes |
| 0 9 * * 1-5 | Weekdays at 09:00 |
| 0 0 1 * * | First of every month at midnight |
| 0 */6 * * * | Every 6 hours |

6-field form (SEC MIN HOUR DOM MON DOW) — add a leading seconds field for sub-minute intervals:

| Expression | Meaning |
|---|---|
| */30 * * * * * | Every 30 seconds |
| 0 */5 * * * * | Every 5 minutes (explicit seconds=0) |

Field ranges follow standard cron semantics: * (every), */N (every N), a-b (range), a,b,c (list). Month and day-of-week names (JAN, MON, etc.) are accepted. Special strings like @daily and @hourly are not supported — use the numeric form.
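To build intuition for the field grammar, here is a hypothetical matcher for the numeric forms above. It is a sketch, not faucet's parser. It does not handle names like MON or JAN, and it does not cover the 6-field form with seconds. matches_5 simply requires all five fields to match, so it ignores classic cron's OR rule for the case where day-of-month and day-of-week are both restricted.

```rust
/// Hypothetical matcher for one cron field: `*`, `*/N`, `a`, `a-b`, and
/// comma lists of those. `field_min` is the field's lowest legal value
/// (0 for minute/hour/day-of-week, 1 for day-of-month/month); a `*/N`
/// step counts from it.
fn field_matches(expr: &str, value: u32, field_min: u32) -> Result<bool, String> {
    let mut matched = false;
    for item in expr.split(',') {
        // Evaluate every item (no early exit) so a malformed one is always reported.
        matched |= item_matches(item, value, field_min)?;
    }
    Ok(matched)
}

fn parse_num(s: &str) -> Result<u32, String> {
    s.parse::<u32>().map_err(|_| format!("not a number: {s:?}"))
}

fn item_matches(item: &str, value: u32, field_min: u32) -> Result<bool, String> {
    if item == "*" {
        return Ok(true);
    }
    if let Some(step) = item.strip_prefix("*/") {
        let n = parse_num(step)?;
        if n == 0 {
            return Err("step must be >= 1".to_string());
        }
        return Ok(value >= field_min && (value - field_min) % n == 0);
    }
    if let Some((lo, hi)) = item.split_once('-') {
        let (lo, hi) = (parse_num(lo)?, parse_num(hi)?);
        return Ok(lo <= value && value <= hi);
    }
    Ok(parse_num(item)? == value)
}

/// Match a 5-field expression against t = [minute, hour, dom, month, dow].
fn matches_5(expr: &str, t: [u32; 5]) -> Result<bool, String> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != 5 {
        return Err(format!("expected 5 fields, got {}", fields.len()));
    }
    let mins = [0, 0, 1, 1, 0]; // lowest legal value per field
    let mut all = true;
    for i in 0..5 {
        all &= field_matches(fields[i], t[i], mins[i])?;
    }
    Ok(all)
}
```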

Timezone and DST

Set timezone to any IANA timezone name (e.g. America/Los_Angeles, Europe/Berlin, Asia/Tokyo). The default is UTC.

All tick times are computed on UTC monotonic instants with timezone-correct wall-clock interpretation, so DST transitions behave correctly:

  • Fall-back (clocks go back): a repeated wall-clock hour fires once.
  • Spring-forward (clocks skip ahead): a wall-clock time in the skipped hour is treated as if it were in the hour immediately after the gap — the next valid tick.

The scheduler loop re-checks the wall clock at least every 30 seconds, so NTP steps, VM freeze/thaw, and DST shifts can never drift a scheduled fire by more than ~30 seconds.
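The 30-second bound amounts to capping every sleep of the loop. Here is a minimal sketch in which MAX_SLEEP and next_sleep are hypothetical names. It uses SystemTime (the wall clock) rather than Instant, because the goal is to notice wall-clock jumps.

```rust
use std::time::{Duration, SystemTime};

/// Upper bound on one sleep of the scheduler loop (hypothetical constant).
const MAX_SLEEP: Duration = Duration::from_secs(30);

/// Sleep until the next tick, but never longer than MAX_SLEEP, so the loop
/// re-reads the wall clock at least every 30 s. A tick that is already in
/// the past yields a zero sleep, meaning "fire now".
fn next_sleep(now: SystemTime, next_tick: SystemTime) -> Duration {
    next_tick
        .duration_since(now)
        .unwrap_or(Duration::ZERO)
        .min(MAX_SLEEP)
}
```

A loop built on this helper would sleep for next_sleep(SystemTime::now(), next_tick), wake, and recompute, so a clock step is picked up at the next wake.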

Missed-tick behavior

The scheduler advances from the scheduled tick, not the wall clock, so a single occurrence is not skipped just because dispatch latency pushed the clock a little past it — it fires promptly (slightly late) and the schedule resumes. But if many ticks elapsed (the process was down, or a run took longer than several cron periods), the backlog is collapsed to a single catch-up: the scheduler fires once at the next due time and moves on. There is no catch-up storm and no flood of backfilled runs.
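The collapse rule can be modeled with a fixed-period schedule. This is a simplification, since real cron ticks are not evenly spaced, and due_run is a hypothetical helper. The sketch also assumes the single catch-up run is attributed to the most recent elapsed tick, which the text above does not pin down.

```rust
/// Hypothetical fixed-period model of missed-tick handling (Unix seconds).
/// `last_tick` is the scheduled time of the previous run. Returns
/// Some((scheduled_for, next_tick)) when a run is due, else None.
fn due_run(last_tick: u64, period: u64, now: u64) -> Option<(u64, u64)> {
    if now < last_tick + period {
        return None; // nothing due yet
    }
    // However many whole periods elapsed, they collapse into one run.
    let elapsed_ticks = (now - last_tick) / period;
    let scheduled_for = last_tick + elapsed_ticks * period; // most recent due tick, <= now
    let next_tick = scheduled_for + period;                 // first tick after now
    Some((scheduled_for, next_tick))
}
```

With a 60 s period and a last tick at 0, waking at 65 fires one run for tick 60 and schedules 120 next. That run is 5 s late, the value faucet_schedule_run_lateness_seconds would record. Waking at 605 still fires exactly one run, for tick 600, and schedules 660.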

To find out how late a run fired, scrape faucet_schedule_run_lateness_seconds (histogram: actual_start − scheduled_for).

Overlap policy

The overlap policy controls what happens when a tick fires while a run is already executing.

| Policy | When to use |
|---|---|
| skip (default) | The tick is dropped and a faucet_schedule_overlaps_total{policy=skip} counter is incremented. Use when it is acceptable to miss a cycle if the previous one ran long. Most pipelines. |
| queue | One missed tick is buffered and fires immediately when the current run finishes. Further misses during that same run collapse into the single queued tick (in-memory only; lost on restart). Use when missing a cycle is unacceptable but strict concurrency still must be preserved. |
| forbid | The process exits non-zero the moment an overlap would occur. Use when overlapping runs would produce corrupt output or you want a hard guarantee that no two instances run simultaneously; pair with a supervisor that alerts or pages on non-zero exit. |

Choosing between skip and queue: if your pipeline is idempotent and catching up after a long run matters (e.g. incremental replication with state), use queue. If occasional missed cycles are harmless and you prefer simplicity, use skip.
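A tiny state machine shows how the three policies differ. Scheduler, TickAction, and the method names are hypothetical. The sketch models only what happens to a tick that arrives while a run is in flight.

```rust
/// Hypothetical model of the three overlap policies.
#[derive(Debug, Clone, Copy, PartialEq)]
enum OverlapPolicy {
    Skip,
    Queue,
    Forbid,
}

#[derive(Debug, PartialEq)]
enum TickAction {
    Start,       // nothing running: start the run now
    Dropped,     // skip: tick dropped, overlaps counter incremented
    Queued,      // queue: remember one pending tick
    Collapsed,   // queue: a tick is already pending, fold this one into it
    ExitNonZero, // forbid: terminate the process
}

struct Scheduler {
    policy: OverlapPolicy,
    running: bool,
    pending: bool, // in-memory only, so it is lost on restart
}

impl Scheduler {
    fn on_tick(&mut self) -> TickAction {
        if !self.running {
            self.running = true;
            return TickAction::Start;
        }
        match self.policy {
            OverlapPolicy::Skip => TickAction::Dropped,
            OverlapPolicy::Forbid => TickAction::ExitNonZero,
            OverlapPolicy::Queue if self.pending => TickAction::Collapsed,
            OverlapPolicy::Queue => {
                self.pending = true;
                TickAction::Queued
            }
        }
    }

    /// Called when the in-flight run ends; true if a queued tick starts now.
    fn on_run_finished(&mut self) -> bool {
        if self.pending {
            self.pending = false; // `running` stays true: the queued run starts at once
            true
        } else {
            self.running = false;
            false
        }
    }
}
```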

Failure model and supervisor integration

Two independent knobs govern what happens when a run fails:

| on_failure | max_consecutive_failures | Behaviour |
|---|---|---|
| continue (default) | null | Tolerates all failures indefinitely. Alert via the faucet_schedule_consecutive_failures gauge. |
| continue | N | Tolerates up to N−1 straight failures; exits non-zero when the Nth consecutive failure occurs. A successful run resets the counter to 0. |
| stop | any | Exits non-zero immediately on the first failure. |

The recommended production pattern is on_failure: continue with max_consecutive_failures: N (5–10 depending on how quickly you want a supervisor restart):

schedule:
  cron: "*/5 * * * *"
  on_failure: continue
  max_consecutive_failures: 5   # restart after 5 straight failures
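The failure table reduces to a counter plus one comparison. In this hypothetical sketch, FailureTracker::record returns true when the process should exit non-zero. The consecutive field plays the role of the faucet_schedule_consecutive_failures gauge.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnFailure {
    Continue,
    Stop,
}

/// Hypothetical tracker implementing the on_failure / max_consecutive_failures table.
struct FailureTracker {
    on_failure: OnFailure,
    max_consecutive_failures: Option<u32>,
    consecutive: u32,
}

impl FailureTracker {
    /// Record one run outcome; returns true when the process should exit non-zero.
    fn record(&mut self, ok: bool) -> bool {
        if ok {
            self.consecutive = 0; // a success resets the counter
            return false;
        }
        self.consecutive += 1;
        match (self.on_failure, self.max_consecutive_failures) {
            (OnFailure::Stop, _) => true,
            (OnFailure::Continue, Some(n)) => self.consecutive >= n,
            (OnFailure::Continue, None) => false,
        }
    }
}
```

With the settings above (continue, N = 5), four failures in a row are tolerated, a success resets the count, and only the fifth straight failure triggers the exit the supervisor restarts on.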

Systemd unit example

# /etc/systemd/system/nightly-rollup.service
[Unit]
Description=faucet nightly rollup
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/bin/faucet schedule /opt/pipelines/nightly.yaml
Restart=on-failure
RestartSec=30s
# Env vars for the pipeline
EnvironmentFile=/opt/pipelines/nightly.env

[Install]
WantedBy=multi-user.target

Restart=on-failure means systemd restarts the process whenever it exits with a non-zero code, which is exactly the condition max_consecutive_failures produces. RestartSec=30s adds a brief cooldown between restarts to avoid hammering a broken upstream.

Kubernetes CronJob vs long-running Deployment

faucet schedule is designed for a Deployment (or long-running Pod): one process, always running, fires on cron. This keeps token caches warm and avoids cold-start latency on every tick.

If you need Kubernetes to manage the schedule itself, use a Kubernetes CronJob with faucet run instead. Each invocation is an ephemeral Pod, and the CronJob controller handles missed or overlapping runs at the platform level (through its concurrencyPolicy and startingDeadlineSeconds settings).

Graceful shutdown and SIGTERM

On SIGTERM or Ctrl-C:

  1. faucet stops accepting new ticks.
  2. If a run is in flight, it waits up to shutdown_grace_secs (default 30) for it to finish.
  3. If the run finishes within the grace period, the process exits 0.
  4. If the run is still running after the grace period, it is aborted. The per-page StateStore bookmark means the next start resumes from the last confirmed write — no data is lost, but the partial page since the last bookmark is re-fetched on the next run. Whether that causes duplicates depends on your sink’s idempotency.
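Steps 2 to 4 boil down to a bounded wait. The sketch below assumes the in-flight run signals completion on a std mpsc channel, and it uses Receiver::recv_timeout for the grace period. drain and ShutdownOutcome are hypothetical names. The actual abort and the bookmark-based resume are not modeled.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ShutdownOutcome {
    NoRunInFlight, // nothing to wait for: exit 0
    Finished,      // run completed within the grace period: exit 0
    Aborted,       // grace period elapsed: abort; the next start resumes from the bookmark
}

/// `done` receives a message when the in-flight run completes (if one is running).
fn drain(done: Option<&Receiver<()>>, grace: Duration) -> ShutdownOutcome {
    match done {
        None => ShutdownOutcome::NoRunInFlight,
        Some(rx) => match rx.recv_timeout(grace) {
            Ok(()) => ShutdownOutcome::Finished,
            // Timed out, or the run went away without reporting completion.
            Err(_) => ShutdownOutcome::Aborted,
        },
    }
}
```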

Increase shutdown_grace_secs for long-running pages (e.g. a BigQuery batch that takes several minutes to flush):

schedule:
  cron: "0 * * * *"
  shutdown_grace_secs: 120

Dated outputs with ${now.*}

${now.*} tokens let you inject the run’s wall time into source and sink config values — so a scheduled pipeline can write to a different file or object-storage prefix on every tick without any manual bookkeeping.

The headline use case is a dated partition path:

# nightly_partitioned.yaml — write to a new dated partition every night
version: 1
name: nightly-events

schedule:
  cron: "0 2 * * *"
  timezone: "America/Los_Angeles"
  overlap_policy: skip
  max_consecutive_failures: 5

pipeline:
  source:
    type: rest
    config:
      base_url: https://api.example.com
      path: /v1/events
  sink:
    type: jsonl
    config:
      # ${now.date} reflects the schedule's timezone (America/Los_Angeles),
      # so the partition label matches the business date of the run.
      path: "./warehouse/dt=${now.date}/events.jsonl"

When the cron fires at 02:00 on 2026-03-09 Pacific time, ${now.date} resolves to 2026-03-09 and faucet writes to ./warehouse/dt=2026-03-09/events.jsonl. The parent directory is created automatically — local file sinks (JSONL, CSV) create missing parent directories so dated subdirectory paths work without pre-creating the tree.

The full token set:

| Token | Example | Use case |
|---|---|---|
| ${now.date} | 2026-03-08 | Daily partition key |
| ${now.year} / ${now.month} / ${now.day} | 2026 / 03 / 08 | Hive-style year=…/month=…/day=… paths |
| ${now.hour} | 14 | Hourly partitions |
| ${now.unix} | 1772978709 | Unique epoch-based filenames |
| ${now.strftime.<fmt>} | 2026/03/08/14 | Arbitrary layout, e.g. ${now.strftime.%Y/%m/%d/%H} |
| ${now.datetime} / ${now.iso} | 2026-03-08T14:05:09+00:00 | RFC 3339 timestamp in a filename or object key |
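The sketch below shows how the simple tokens map onto a timestamp by rendering a subset of them (date, year, month, day, hour, unix). It makes a few assumptions:

- A fixed UTC offset stands in for the schedule's IANA timezone. Real zones with DST need a timezone database.
- Any leftover ${now. is treated as an unknown token.
- strftime, datetime, and iso are not implemented.

render_now is a hypothetical name. The calendar conversion is Howard Hinnant's civil_from_days algorithm.

```rust
/// Hypothetical renderer for a subset of ${now.*}. `offset_secs` is a fixed
/// UTC offset standing in for the schedule's timezone.
fn render_now(template: &str, unix: i64, offset_secs: i64) -> Result<String, String> {
    let local = unix + offset_secs;
    let (y, m, d) = civil_from_days(local.div_euclid(86_400));
    let hour = local.rem_euclid(86_400) / 3_600;

    let out = template
        .replace("${now.date}", &format!("{y:04}-{m:02}-{d:02}"))
        .replace("${now.year}", &format!("{y:04}"))
        .replace("${now.month}", &format!("{m:02}"))
        .replace("${now.day}", &format!("{d:02}"))
        .replace("${now.hour}", &format!("{hour:02}"))
        .replace("${now.unix}", &unix.to_string()); // epoch seconds ignore the offset
    // Anything left is a token this sketch does not know: treat it as a config error.
    if let Some(pos) = out.find("${now.") {
        return Err(format!("unknown ${{now.*}} token at byte {pos}"));
    }
    Ok(out)
}

/// Days since 1970-01-01 -> (year, month, day), proleptic Gregorian calendar.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year from March 1
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let m = if mp < 10 { (mp + 3) as u32 } else { (mp - 9) as u32 };
    let y = yoe + era * 400 + i64::from(m <= 2); // Jan/Feb belong to the next year
    (y, m, d)
}
```

For example, Unix time 1772978709 is 2026-03-08T14:05:09Z. Rendering "dt=${now.date}/h=${now.hour}" at offset 0 gives "dt=2026-03-08/h=14".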

Clock semantics

faucet schedule uses the tick’s scheduled time rendered in the schedule’s timezone — not the actual wall clock when the run started. This means ${now.date} is deterministic: re-running the same tick (e.g. after a restart) produces the same path.

faucet schedule --once uses the current wall clock in the schedule’s timezone.

Backfilling with faucet run --clock

To backfill a range of dates, use faucet run with the --clock flag instead of faucet schedule. --clock overrides the process start time used by ${now.*}:

# Backfill three nightly partitions
faucet run --clock 2026-03-01 nightly_partitioned.yaml
faucet run --clock 2026-03-02 nightly_partitioned.yaml
faucet run --clock 2026-03-03 nightly_partitioned.yaml

A bare date (2026-03-01) is treated as midnight UTC. An RFC 3339 timestamp (2026-03-01T02:00:00-08:00) sets the clock precisely. Unknown ${now.*} tokens are config errors; the token set is validated at run start before any I/O begins.
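The bare-date rule is plain calendar arithmetic: 2026-03-01 becomes 2026-03-01T00:00:00Z, which is Unix time 1772323200. Below is a hypothetical parser for just that form. RFC 3339 input is out of scope, and the day is only range-checked (1 to 31), not validated against the month. bare_date_to_unix is an illustrative name, and days_from_civil is Howard Hinnant's algorithm.

```rust
/// Hypothetical parser for the bare-date form of --clock: "YYYY-MM-DD" is
/// read as midnight UTC and returned as Unix seconds.
fn bare_date_to_unix(s: &str) -> Result<i64, String> {
    let parts: Vec<&str> = s.split('-').collect();
    if parts.len() != 3 {
        return Err(format!("expected YYYY-MM-DD, got {s:?}"));
    }
    let y: i64 = parts[0].parse().map_err(|_| "bad year")?;
    let m: u32 = parts[1].parse().map_err(|_| "bad month")?;
    let d: u32 = parts[2].parse().map_err(|_| "bad day")?;
    if !(1..=12).contains(&m) || !(1..=31).contains(&d) {
        return Err(format!("out-of-range date {s:?}"));
    }
    Ok(days_from_civil(y, m, d) * 86_400)
}

/// (year, month, day) -> days since 1970-01-01, proleptic Gregorian calendar.
fn days_from_civil(y: i64, m: u32, d: u32) -> i64 {
    let y = if m <= 2 { y - 1 } else { y }; // years start on March 1 here
    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400); // [0, 399]
    let m = i64::from(m);
    let mp = if m > 2 { m - 3 } else { m + 9 }; // March = 0
    let doy = (153 * mp + 2) / 5 + i64::from(d) - 1; // [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // [0, 146096]
    era * 146_097 + doe - 719_468
}
```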

Health metrics to scrape

Register a Prometheus listener via the observability: block:

observability:
  prometheus:
    listen: "127.0.0.1:9464"

Key metrics for a scheduling health dashboard:

| Metric | What to alert on |
|---|---|
| faucet_schedule_heartbeat_unix_seconds | time() - value > 90 → scheduler loop is stuck or process crashed |
| faucet_schedule_consecutive_failures | > 0 → at least one recent failure; >= max_consecutive_failures → imminent exit |
| faucet_schedule_next_tick_unix_seconds | value - time() > 2 * expected_interval → scheduler is not advancing |
| faucet_schedule_runs_total{outcome="err"} | Increasing counter → runs are failing |
| faucet_schedule_overlaps_total | Repeated increments → runs are taking longer than the cron period |
| faucet_schedule_run_lateness_seconds | p99 > threshold → runs are starting significantly late |

Full metric reference:

| Metric | Type | Description |
|---|---|---|
| faucet_schedule_runs_total{pipeline,outcome} | Counter | outcome ∈ {ok, err, skipped} |
| faucet_schedule_overlaps_total{pipeline,policy} | Counter | Overlap events by policy |
| faucet_schedule_next_tick_unix_seconds{pipeline} | Gauge | Unix timestamp of the next scheduled tick |
| faucet_schedule_runs_in_flight{pipeline} | Gauge | 0 or 1 |
| faucet_schedule_consecutive_failures{pipeline} | Gauge | Resets to 0 on success |
| faucet_schedule_heartbeat_unix_seconds{pipeline} | Gauge | Updated every loop wake (≤30 s) |
| faucet_schedule_last_run_started_unix_seconds{pipeline} | Gauge | |
| faucet_schedule_last_run_completed_unix_seconds{pipeline} | Gauge | |
| faucet_schedule_last_run_duration_seconds{pipeline} | Gauge | |
| faucet_schedule_run_lateness_seconds{pipeline} | Histogram | actual_start − scheduled_for |

Each run also emits a faucet.schedule.run tracing span (attributes: run_ordinal, scheduled_for_unix_seconds, tick_unix_seconds) that wraps the inner pipeline spans, so distributed tracing carries the scheduling context through the full pipeline.

Running faucet as a service (faucet serve)

faucet serve turns faucet from a one-shot CLI into a long-running HTTP control plane: an orchestrator (Airflow, Temporal, Dagster, Argo) submits pipeline configs over HTTP, polls status, cancels runs, and streams logs — while faucet amortizes startup (TLS handshakes, connection pools, schema introspection) across many runs in one process. It is the second supported runtime mode alongside one-shot faucet run and the cron faucet schedule.

The full endpoint/schema reference is in HTTP API reference; this page is the guided tour. serve requires the serve Cargo feature (cargo install faucet-cli --features serve, or --features full).

Quickstart

# Start the server (loopback by default). Auth is mandatory — see below.
FAUCET_SERVE_AUTH_TOKEN=s3cret faucet serve --listen 127.0.0.1:8080
# Submit a run.
curl -XPOST http://127.0.0.1:8080/v1/runs \
  -H "Authorization: Bearer s3cret" -H 'content-type: application/json' \
  -d '{"config":"version: 1\npipeline:\n  source: {type: csv, config: {path: in.csv}}\n  sink: {type: jsonl, config: {path: out.jsonl}}\n","name":"adhoc"}'
# → {"run_id":"0192…","status":"queued","submitted_at":"…"}

# Poll it to completion.
curl -H "Authorization: Bearer s3cret" http://127.0.0.1:8080/v1/runs/0192…

# Tail its logs (SSE).
curl -N -H "Authorization: Bearer s3cret" http://127.0.0.1:8080/v1/runs/0192…/logs

⚠️ Security model — read before exposing

serve executes arbitrary client-supplied pipeline configs with the server’s identity. That is a real privilege surface:

  • Full interpolation: submitted configs resolve ${env:…}, ${file:…}, ${secret:…}, and ${vault:…}/${aws-sm:…}/… against the server’s environment, filesystem, and credentials — exactly like faucet run. An authenticated caller can read any secret the server can reach.
  • SSRF / egress: a submitted REST/HTTP source can be pointed at 169.254.169.254 or internal services and will be fetched with the server’s network identity.

Mitigations are deployment-level and mandatory:

  • Never run with --no-auth on a non-loopback bind. The no-auth gate is explicit: without --auth-token/FAUCET_SERVE_AUTH_TOKEN and without --no-auth, startup fails.
  • Run single-tenant, behind authentication, behind egress controls / network policy. The default loopback bind (127.0.0.1) is deliberate — exposing externally is an explicit choice.
  • Terminate TLS at a proxy/ingress (serve speaks plain HTTP).
  • Prefer FAUCET_SERVE_AUTH_TOKEN over --auth-token (the latter leaks through ps and /proc).
  • Never run a serve process at FAUCET_LOG=debug when submitted configs hold resolved secrets — only faucet’s own log output is redacted, not third-party connector debug logging.

Bounded concurrency & backpressure

--max-concurrent-runs (default min(16, cpu_count())) bounds how many runs execute at once; --max-queued-runs (default 8×) bounds the queue. A submit past the queue cap returns 429 with Retry-After. Note that total concurrent pipeline work ≈ max-concurrent-runs × each config's execution.max_concurrent.
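Here is a minimal sketch of the two documented rules, with hypothetical function names. First, the default concurrency cap is min(16, cpu_count()), where std::thread::available_parallelism serves as the CPU count. Second, a submit is turned away once the queue is at its cap. The sketch does not model the Retry-After value faucet sends.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Documented default for --max-concurrent-runs: min(16, cpu_count()).
fn default_max_concurrent_runs(cpus: usize) -> usize {
    cpus.min(16)
}

/// CPU count as the standard library reports it (falls back to 1).
fn cpu_count() -> usize {
    thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
}

#[derive(Debug, PartialEq)]
enum Admission {
    Accepted,
    TooManyRequests, // answered with HTTP 429 and a Retry-After header
}

/// Hypothetical admission check: reject once the queue is at its cap.
fn admit(queued_runs: usize, max_queued_runs: usize) -> Admission {
    if queued_runs >= max_queued_runs {
        Admission::TooManyRequests
    } else {
        Admission::Accepted
    }
}
```

As a worked number for the note above, 16 concurrent runs whose configs each set execution.max_concurrent to 4 can mean roughly 64 concurrent pipeline tasks.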

Idempotency

Supply idempotency_key to make retries safe (Stripe-style):

  • First submit with a key → runs normally.
  • Re-submit the same key + same request within --idempotency-retention-secs (default 24h) → returns the original run_id (replayed, no new run).
  • Same key + a different request → 409 Conflict.
  • After the retention window, the key is re-usable for a fresh run.
  • Deleting a run also frees its idempotency key immediately — a later submit with that key starts a fresh run rather than 404-ing on the deleted record.

The “request” identity covers the merged config and the run-affecting request fields — clock, timeout_secs, and labels. In particular, a retry that reuses the key but changes the backfill clock is a 409, not a replay of the original window (so you can’t silently get the original clock’s results).

The claim is atomic, so concurrent retries can’t both start a run.
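The rules above fit in a small in-memory store. This is a hypothetical sketch, not faucet's implementation. RequestIdentity stands for the merged config plus clock, timeout_secs, and labels. Taking &mut self stands in for the atomic claim that a shared database would need.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Run-affecting request fields that form the "request" identity (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
struct RequestIdentity {
    merged_config: String,
    clock: Option<String>,
    timeout_secs: Option<u64>,
    labels: Vec<(String, String)>,
}

#[derive(Debug, PartialEq)]
enum Submit {
    NewRun(u64),   // first use of the key, or the key is free again
    Replayed(u64), // same key + same request: original run_id, no new run
    Conflict,      // same key + different request: HTTP 409
}

struct IdempotencyStore {
    retention: Duration,
    last_run_id: u64,
    claims: HashMap<String, (RequestIdentity, u64, Instant)>,
}

impl IdempotencyStore {
    fn new(retention: Duration) -> Self {
        IdempotencyStore { retention, last_run_id: 0, claims: HashMap::new() }
    }

    fn submit(&mut self, key: &str, req: RequestIdentity, now: Instant) -> Submit {
        if let Some((prev, run_id, claimed_at)) = self.claims.get(key) {
            if now.duration_since(*claimed_at) < self.retention {
                return if *prev == req { Submit::Replayed(*run_id) } else { Submit::Conflict };
            }
            // Retention window is over: fall through and claim the key afresh.
        }
        self.last_run_id += 1;
        self.claims.insert(key.to_string(), (req, self.last_run_id, now));
        Submit::NewRun(self.last_run_id)
    }

    /// Deleting a run frees its idempotency key immediately.
    fn delete_run(&mut self, run_id: u64) {
        self.claims.retain(|_, (_, id, _)| *id != run_id);
    }
}
```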

Degraded mode: while the persistent history backend is degraded (see Run history), the in-memory fallback can’t see claims the database made before the outage. Rather than risk a duplicate run, submissions carrying an idempotency key are rejected with 503 until the backend recovers — retry then, or resubmit without a key if at-least-once is acceptable. Submissions without a key are unaffected.

Cancellation

POST /v1/runs/{id}/cancel cooperatively cancels an in-flight run (202); on an already-terminal run it’s a 200 no-op. The same cooperative path handles a run that hits its timeout_secs and the server-shutdown drain.

Cancellation is flush-completing: the pipeline stops at its next page boundary and flushes the sink, so a buffered sink (e.g. Parquet, whose footer is only written on flush) commits the rows written so far rather than orphaning the whole file (#146 H16). The run is then marked cancelled — there is no cross-process resume, so re-submit to continue. A run still stuck mid-write after a bounded flush grace is hard-dropped (its buffered output may be lost), so a hung run can’t wedge shutdown.
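Flush-completing cancellation means the flag is checked only at page boundaries and the sink is flushed on the way out. The sketch uses a hypothetical BufferedSink trait, which is deliberately not faucet's real Sink trait. It also includes a demo sink that requests cancellation after its first page. The bounded flush grace and the hard drop are not modeled.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical buffered-sink interface for this sketch only.
trait BufferedSink {
    fn write_page(&mut self, page: &[String]);
    fn flush(&mut self); // e.g. where a Parquet writer would emit its footer
}

#[derive(Debug, PartialEq)]
enum RunStatus {
    Succeeded,
    Cancelled,
}

/// Check the cancel flag only between pages; flush on both paths so the
/// rows written so far are committed.
fn run_pages<S: BufferedSink>(pages: &[Vec<String>], sink: &mut S, cancel: &AtomicBool) -> RunStatus {
    let mut status = RunStatus::Succeeded;
    for page in pages {
        if cancel.load(Ordering::SeqCst) {
            status = RunStatus::Cancelled;
            break;
        }
        sink.write_page(page);
    }
    sink.flush();
    status
}

/// Demo sink: counts pages and, to simulate a cancel arriving mid-run,
/// sets the flag after `cancel_after` pages.
struct Recorder<'a> {
    pages_written: usize,
    flushed: bool,
    cancel_after: Option<usize>,
    cancel: &'a AtomicBool,
}

impl BufferedSink for Recorder<'_> {
    fn write_page(&mut self, _page: &[String]) {
        self.pages_written += 1;
        if self.cancel_after == Some(self.pages_written) {
            self.cancel.store(true, Ordering::SeqCst);
        }
    }
    fn flush(&mut self) {
        self.flushed = true;
    }
}
```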

--default-config (workspace defaults)

Pass --default-config <file> to merge shared settings under every submitted run (submitted values win; objects merge, scalars/arrays replace). Pin state:, execution:, and the auth: catalog once instead of repeating them per request. See cli/examples/serve_minimal.yaml.
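The merge rule (submitted values win; objects merge; scalars and arrays replace) is a short recursive function. Value is a hypothetical, minimal stand-in for a parsed YAML/JSON document, not faucet's config type.

```rust
use std::collections::BTreeMap;

/// Minimal config value model (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Scalar(String),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

/// Merge `submitted` over `defaults`: objects merge key by key, recursively;
/// for scalars, arrays, and type mismatches the submitted value replaces the default.
fn merge(defaults: &Value, submitted: &Value) -> Value {
    match (defaults, submitted) {
        (Value::Object(base), Value::Object(over)) => {
            let mut out = base.clone();
            for (k, v) in over {
                let merged = match base.get(k) {
                    Some(b) => merge(b, v),
                    None => v.clone(),
                };
                out.insert(k.clone(), merged);
            }
            Value::Object(out)
        }
        (_, s) => s.clone(),
    }
}
```

Suppose the defaults contain state: {type: file, path: /var/faucet/state} and the submitted config contains state: {path: /tmp/state}. The merged state keeps type: file and takes the submitted path. A submitted array, by contrast, replaces the default array wholesale.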

Cardinality: a config’s name: field drives the metric pipeline label and the state-key prefix. Use a stable name: per logical pipeline — never an ad-hoc per-run string — or Prometheus cardinality blows up. The request-level name/labels are run-record metadata only, never metric labels.

Run history & persistence

By default run records live in memory and are lost on restart. For durable history across restarts, point --history at a database (requires the matching build feature):

# Postgres (feature: serve-history-postgres)
faucet serve --history 'postgres://user:pw@db/faucet'
# SQLite (feature: serve-history-sqlite)
faucet serve --history 'sqlite:/var/lib/faucet/runs.db'

Both create their schema on first connect. If the backend is unreachable at startup, or fails at runtime, serve degrades to the in-memory store so it stays up: it logs once, sets the faucet_serve_history_degraded gauge, and /readyz returns 503. Persisted records are not migrated into the fallback — degraded mode is a stay-alive, not a replica. Terminal records are retained for --retain-terminal-runs-secs (default 7 days).

Multi-instance orphan recovery (run-ownership leases)

A persistent backend can be shared by several faucet serve instances (an HA pair, a rolling/blue-green deploy). Each instance gets a fresh id at startup and owns the runs it executes. While a run is in flight, its owner heartbeats a lease on the run record (at ~⅓ of --lease-ttl-secs, default 30s). A run is only recovered (marked failed with the reason “owning serve instance's lease expired”) once its lease has expired, meaning its owner stopped heartbeating because it crashed or was shut down. Recovery runs both at startup and periodically, so a surviving instance reclaims a dead peer’s orphans without waiting for a restart.

This means a starting or running instance never fails another live instance’s in-flight runs — the gap that an unscoped “fail every non-terminal run at startup” sweep would open on a shared database. Tune --lease-ttl-secs above your worst-case GC/IO stall so a healthy-but-slow instance is never falsely reclaimed (a longer TTL is safer but slows how quickly a crashed instance’s runs are cleaned up). The in-memory backend is single-process and unshared, so leases don’t apply to it. There is still no cross-process resume: a recovered run is marked failed, not continued — re-submit to retry.
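The recovery condition can be sketched as a filter over lease rows. Lease and expired_runs are hypothetical names. The sketch assumes heartbeats are stored as Unix seconds, and it treats a lease as expired only once strictly more than the TTL has passed since the last heartbeat.

```rust
use std::time::Duration;

/// Hypothetical lease row for an in-flight run in the shared history DB.
struct Lease {
    run_id: u64,
    last_heartbeat_secs: u64, // Unix seconds of the owner's last heartbeat
}

/// The owner renews its lease at roughly a third of the TTL.
fn heartbeat_interval(ttl: Duration) -> Duration {
    ttl / 3
}

/// Recovery sweep: return only runs whose lease has expired (these would be
/// marked failed). Live owners keep renewing, so their runs never qualify.
fn expired_runs(leases: &[Lease], now_secs: u64, ttl: Duration) -> Vec<u64> {
    leases
        .iter()
        .filter(|l| now_secs.saturating_sub(l.last_heartbeat_secs) > ttl.as_secs())
        .map(|l| l.run_id)
        .collect()
}
```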

Graceful shutdown

SIGTERM/SIGINT stops accepting new connections, drains in-flight runs up to --shutdown-grace-secs (default 60), then cancels the remainder (marked failed).

Health & observability

  • /healthz — liveness (always 200 while serving).
  • /readyz — 503 when history is degraded or the queue is full.
  • /metrics — Prometheus, including faucet_serve_* series. /metrics is unauthenticated; restrict it at the network layer if its labels are sensitive.

Troubleshooting with faucet doctor

faucet doctor answers “why won’t my pipeline run?” before you run it. It probes every connector in a config — auth, network, permissions, reachability — and prints a green/red checklist, exiting non-zero if anything fails. It is non-mutating: no data is written, no rows inserted, no objects uploaded.

faucet doctor pipeline.yaml
✓ Config parses and interpolates                                 8 ms
✓ Matrix expands to 2 invocations                    0 skipped (children)

▸ Invocation default::us-east  (source=postgres, sink=bigquery)
  ✓ source [postgres] read                                      42 ms
  ✓ sink   [bigquery] auth                                     280 ms
  ✓ state  [redis] sentinel                                     14 ms

▸ Invocation default::eu-west  (source=postgres, sink=bigquery)
  ✓ source [postgres] read                                      39 ms
  ✗ sink   [bigquery] auth (dataset eu_west not found)         410 ms
        hint: check bigquery credentials and that the dataset exists

Summary: 5 passed, 1 failed, 0 skipped       total elapsed 0.5s

The exit code is the number of failed probes (clamped to 255), so doctor drops straight into a CI gate or a deploy script:

faucet doctor pipeline.yaml || { echo "preflight failed"; exit 1; }
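The following hypothetical sketch shows how the summary line and the exit code relate; Status and summarize are illustrative names. The exit code is the failed-probe count clamped to 255, so any failure is non-zero and a clean run exits 0. A real binary would pass that code to std::process::exit.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pass,
    Fail,
    Skip,
}

/// Tally probe results into (passed, failed, skipped) plus the exit code.
fn summarize(statuses: &[Status]) -> ((usize, usize, usize), i32) {
    let count = |s: Status| statuses.iter().filter(|x| **x == s).count();
    let (passed, failed, skipped) = (count(Status::Pass), count(Status::Fail), count(Status::Skip));
    let exit_code = failed.min(255) as i32; // clamp: exit codes are a single byte
    ((passed, failed, skipped), exit_code)
}
```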

What gets probed

| Role | Probe |
|---|---|
| Source (most) | Pulls a single page via the real read path (DNS + TLS + auth + first request) and stops; it never reads the full dataset. |
| webhook source | The configured port is bindable. |
| websocket source | TCP connect to the host (no WebSocket handshake). |
| postgres-cdc source | The replication slot is reachable (missing slot → skip, since run can create it). |
| kafka source / sink | A cluster metadata request (validates brokers + auth without consuming/producing). |
| SQL sinks (postgres/mysql/sqlite) | SELECT 1 on the pool. |
| s3 / gcs sinks | Bucket head / metadata list. |
| bigquery / snowflake sinks | Token mint + a read-only metadata call / SELECT 1. |
| redis / mongodb / elasticsearch / http sinks | PING / ping / cluster health / a HEAD request. |
| File sinks (jsonl/csv/parquet/stdout) | Target directory is writable (stdout always passes). |
| State stores (redis/postgres/file/memory) | A sentinel put/get/delete that leaves no residue. |

Reading the result

  • ✓ pass — the probe succeeded.
  • ✗ fail — unreachable / unauthenticated / misconfigured. The parenthesized reason and the hint: line tell you what to fix.
  • • skip — not applicable: an optional target is absent (e.g. a CDC slot not yet created), a connector ships no probe, or an object-store path can’t be cheaply checked.

Flags

| Flag | Purpose |
|---|---|
| --timeout-secs <N> | Per-probe timeout in seconds (default 10). Lower it to fail fast against dead hosts. |
| --json | Emit a { config, invocations, summary } JSON document for tooling. |
| --env-file <path> / --no-env-file | Same .env handling as run. |

The --json shape:

{
  "config": "pipeline.yaml",
  "invocations": [
    {
      "id": "default::eu-west",
      "probes": [
        { "role": "source", "connector": "postgres", "name": "read", "status": "pass", "elapsed_ms": 39 },
        { "role": "sink", "connector": "bigquery", "name": "auth", "status": "fail",
          "reason": "dataset eu_west not found", "elapsed_ms": 410,
          "hint": "check bigquery credentials and that the dataset exists" }
      ]
    }
  ],
  "summary": { "passed": 5, "failed": 1, "skipped": 0, "elapsed_ms": 500 }
}

Limitations

  • Child invocations in a parent/child matrix are listed but not probed: their configs depend on parent records that only exist at run time (same limitation as faucet preview).
  • doctor needs real credentials — it resolves secrets like run does. Use faucet validate --no-secrets for an offline grammar-only check.
  • Probe reason/hint text is scrubbed for resolved secrets, but don’t run with FAUCET_LOG=debug against a config holding live secrets (third-party connector logging is outside faucet’s redaction boundary).

Connector catalog

faucet-stream ships 21 sources and 17 sinks. Each is a Cargo feature (source-<name> / sink-<name>) and an independently published crate. Full API docs are on docs.rs.

Run faucet list to see what’s compiled into your binary, and faucet schema source <name> / faucet schema sink <name> for a connector’s exact config fields. Not sure which to pick? See Choosing a connector.

Legend: ✓ supported · ✗ not applicable.

Sources

ConnectorFeatureStreams¹Resumable²CompressionUnderlying primitive
RESTsource-restHTTP + 6 pagination styles, JSONPath extraction
GraphQLsource-graphqlcursor pagination, variable injection
XML / SOAPsource-xmlstreaming XML→JSON, dot-path extraction
gRPCsource-grpc✓³dynamic protobuf; unary + server-streaming
PostgreSQLsource-postgresSQL query, rows as JSON
PostgreSQL CDCsource-postgres-cdclogical replication (pgoutput), LSN bookmarks
MySQLsource-mysqlSQL query, rows as JSON
Microsoft SQL Serversource-mssql✓⁷SQL query (tiberius), rows as JSON
SQLitesource-sqliteSQL query, rows as JSON
AWS S3source-s3✓⁴object reader: JSONL, JSON array, raw text
Google Cloud Storagesource-gcs✓⁴object reader: JSONL, JSON array, raw text
MongoDBsource-mongodbfind() with filter/projection/sort
Redissource-redisstreams, lists, key patterns
Webhooksource-webhook✗⁵temporary HTTP server collecting POSTs
WebSocketsource-websocketlive push feed; subscribe frames, reconnect, ping keepalive
CSVsource-csvCSV files as JSON
Elasticsearchsource-elasticsearchsearch/scroll API
Apache Kafkasource-kafkaconsumer; idle/max-messages termination, offset bookmarks
Apache Parquetsource-parquetlocal/glob/S3, vectorized Arrow reader, projection
BigQuerysource-bigqueryjobs.query + pageToken pagination
Snowflakesource-snowflakeSQL REST API, server-side partitions

¹ Streams = yields records in bounded-memory batches rather than buffering the whole result.
² Resumable = persists a bookmark to a state store so re-runs continue where they left off (incremental replication / CDC / Kafka offsets).
³ gRPC streams natively in server-streaming mode; unary buffers the single response.
⁴ S3/GCS stream in JSONL and raw-text modes; JSON-array mode buffers one object.
⁵ Webhook is buffer-shaped by nature (it collects POSTs over a window).
⁷ MSSQL is resumable only in replication: incremental mode (it persists a tracking-column bookmark); in full mode it is not.

Sinks

Every sink exposes a batch_size knob for write-side re-chunking. For the file/append sinks (jsonl, csv, stdout) it’s a no-op — they write per record.

ConnectorFeaturebatch_sizeCompressionWrite unit
BigQuerysink-bigquerytabledata.insertAll (per-row DLQ)
PostgreSQLsink-postgresmulti-row INSERT (JSONB or mapped cols)
JSON Linessink-jsonlno-opbuffered file append
Snowflakesink-snowflakeSQL REST API
MySQLsink-mysqlmulti-row INSERT
Microsoft SQL Serversink-mssqlmulti-row INSERT (2100-param auto-split, per-row DLQ)
SQLitesink-sqlitetransaction-wrapped batch
AWS S3sink-s3JSONL objects, parallel uploads
Google Cloud Storagesink-gcsJSONL objects
MongoDBsink-mongodbinsert_many
Redissink-redisstreams, lists, key-value (pipelined)
CSVsink-csvno-opbuffered file rows
Elasticsearchsink-elasticsearch_bulk NDJSON (per-row DLQ)
HTTPsink-httpPOST, concurrent under a semaphore
Stdoutsink-stdoutno-opJSON Lines / pretty JSON / TSV
Apache Kafkasink-kafkaproducer, batched sends, multi-topic routing
Apache Parquetsink-parquet✗⁶local/S3, schema inference, row/byte rollover

⁶ Parquet has internal columnar compression, so the file-level compression feature doesn’t apply.

Authentication at a glance

| Family | Auth options |
|---|---|
| REST / GraphQL / XML | Bearer, Basic, ApiKey (header), ApiKeyQuery, OAuth2 (client-credentials), TokenEndpoint, Custom headers (see Auth cookbook) |
| BigQuery | service-account key (path or inline JSON), application-default credentials |
| Snowflake | JWT key-pair, OAuth |
| Kafka | SASL (PLAIN/SCRAM) + TLS |
| WebSocket | none, Bearer token, Custom headers |
| Elasticsearch | basic, API key, bearer, none |
| S3 / GCS | cloud SDK credential chains (env, profile, metadata) |
| SQL databases | connection URL (with embedded credentials / TLS params) |

Inspect any connector’s exact auth shape with faucet schema source <name> / faucet schema sink <name>.

Batching

Default batch_size is 1000; max is 1,000,000. batch_size: 0 means “no batching” — the source emits the whole result set in one page and the sink writes it in one request (good for small lookup tables or load-job-style sinks). See Performance tuning.
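The batch_size rules above can be written as a pure re-chunking function. This is a minimal sketch under one assumption: out-of-range values are rejected rather than clamped, and the page does not say which faucet does. `rechunk` and `BatchError` are hypothetical names, not faucet APIs.

```rust
const DEFAULT_BATCH_SIZE: usize = 1000;
const MAX_BATCH_SIZE: usize = 1_000_000;

#[derive(Debug, PartialEq)]
enum BatchError {
    TooLarge(usize),
}

/// Re-chunk records for the write side.
/// `None` falls back to the default of 1000; `Some(0)` means "no batching",
/// so every record goes out in a single batch.
fn rechunk<T: Clone>(records: &[T], batch_size: Option<usize>) -> Result<Vec<Vec<T>>, BatchError> {
    let size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
    if size > MAX_BATCH_SIZE {
        return Err(BatchError::TooLarge(size));
    }
    if records.is_empty() {
        return Ok(Vec::new());
    }
    if size == 0 {
        return Ok(vec![records.to_vec()]);
    }
    Ok(records.chunks(size).map(|c| c.to_vec()).collect())
}
```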

Choosing a connector

Several connectors overlap. This page resolves the common “which one?” questions. For the full feature grid see the connector catalog.

PostgreSQL: query source vs. CDC

  • source-postgres runs a SQL query and returns the rows. Use it for one-shot extracts, snapshots, or when you control an updated_at column and parameterize the query yourself. Simple, no special Postgres config.
  • source-postgres-cdc streams every INSERT/UPDATE/DELETE from the write-ahead log via logical replication. Use it when you need every change (including deletes), low-latency capture, or resumability without a cursor column. Requires wal_level = logical and a publication, and retains WAL between runs. See the CDC tutorial.

Rule of thumb: periodic snapshot → query source; continuous change feed → CDC.

Object storage: S3/GCS source vs. Parquet source

  • source-s3 / source-gcs read objects as JSONL, a JSON array, or raw text. Use them for line-delimited JSON, logs, or text dumps.
  • source-parquet reads columnar Parquet (local, glob, or S3) with a vectorized Arrow reader and column projection. Use it for analytical datasets — it’s far faster and can skip columns you don’t need.

Rule of thumb: the file is .parquet → Parquet source; it’s JSON/text → S3/GCS source. (The Parquet source reads from S3 directly, so you don’t need the S3 source in front of it.)

Live feeds: WebSocket vs. Webhook vs. Kafka/Redis

  • source-websocket — connects out to a live push endpoint (ws:// or wss://), optionally sends subscription frames, and streams each incoming message as a record. Use it for market data, chat feeds, telemetry, or any server that pushes over WebSocket. Live-only: no replay, no durable offset.
  • source-webhook — opens a temporary HTTP server and receives inbound HTTP POSTs from external systems over a time window. Use it when the remote system pushes to you over HTTP rather than WebSocket.
  • source-kafka / source-redis — broker-backed streaming with durable, replayable offsets and resumable bookmarks. Use these when you need guaranteed delivery and the ability to continue from where a previous run left off.

Rule of thumb: connecting out to a live WebSocket feed → source-websocket; receiving inbound HTTP POST payloads → source-webhook; durable, replayable event stream → source-kafka or source-redis.

Streaming: Redis vs. Kafka

  • source-redis reads streams, lists, or key patterns. Great when Redis is already in your stack and volumes are modest.
  • source-kafka is a real consumer with consumer-group offsets and resumable bookmarks. Use it for high-throughput event pipelines and durable, replayable streams.

Rule of thumb: durable, high-volume event stream → Kafka; lightweight queue/cache already on hand → Redis.

HTTP APIs: REST vs. GraphQL vs. XML vs. gRPC

  • source-rest — JSON REST APIs. The most full-featured source: six pagination styles, seven auth strategies, incremental replication, partitions.
  • source-graphql — GraphQL endpoints with cursor pagination and variable injection.
  • source-xml — XML/SOAP APIs; converts XML to JSON with dot-path extraction.
  • source-grpc — gRPC services via dynamic protobuf (prost-reflect), unary or server-streaming.

Rule of thumb: match the protocol the API speaks. For incremental/resumable ingestion, REST has the richest support.

Warehouses: when to read with BigQuery / Snowflake sources

Use source-bigquery / source-snowflake to read out of a warehouse (e.g. to move a query result elsewhere). To load into one, use the matching sink. To transform data already inside the warehouse, reach for dbt — that’s not faucet’s job.

Sinks: column-mapped vs. JSON blob (SQL databases)

The Postgres/MySQL/SQLite/SQL Server sinks can write either:

  • a single JSON/JSONB column (column_mapping: { type: jsonb, column: data }) — schemaless, no DDL coupling, easiest to start with; or
  • auto-mapped columns — one column per top-level field, for queryable relational tables.

Rule of thumb: exploratory / evolving schema → JSON column; stable schema you query with SQL → mapped columns.

File sinks: JSONL vs. CSV vs. Parquet vs. stdout

  • sink-stdout — debugging and piping output into other Unix tools (faucet preview uses it).
  • sink-jsonl — line-delimited JSON; lossless, streaming-friendly, gzip/zstd-capable.
  • sink-csv — flat tabular output for spreadsheets/BI; nested fields flatten.
  • sink-parquet — columnar analytical output with built-in compression and schema inference; best for large datasets consumed by analytics engines.

Rule of thumb: machine-to-machine JSON → JSONL; tabular for humans → CSV; analytics at scale → Parquet.

Still unsure?

Run faucet list to see what’s installed, faucet schema source <name> to inspect a connector’s config, and faucet preview <config> --limit 10 to try a source without writing anywhere.

CLI commands

The faucet binary exposes these commands. Pass --log-level <level> (or set FAUCET_LOG) to control logging.

Command | What it does
faucet run [config] | Run the pipeline(s) in a config file.
faucet validate [config] | Parse, expand, and validate a config without running it.
faucet preview [config] | Run only the source side and print records to stdout.
faucet schema <target> | Print the JSON Schema for a connector, transform, or the DLQ.
faucet list | List every compiled-in source, sink, and transform with a one-line description.
faucet init [name] | Scaffold a commented config skeleton from connector schemas.
faucet doctor [config] | Probe every connector (auth/network/permissions) and print a checklist.
faucet schedule [config] | Run a pipeline on a cron schedule (long-running foreground process).
faucet serve | Run a long-running HTTP control plane: submit / poll / cancel pipeline runs over REST.

[config] is optional for run / validate / preview / doctor / schedule: if omitted, faucet auto-discovers faucet.yaml, faucet.yml, or faucet.json in the current directory.
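Auto-discovery can be sketched as a first-match lookup. The sketch assumes the candidates are checked in the order faucet.yaml, faucet.yml, faucet.json and that the first hit wins. The real CLI may use a different order, or may reject a directory that contains more than one candidate. `discover_config` is a hypothetical helper.

```rust
use std::path::{Path, PathBuf};

/// Candidate config names, checked in this (assumed) order.
const CANDIDATES: [&str; 3] = ["faucet.yaml", "faucet.yml", "faucet.json"];

/// Return the first candidate config file that exists in `dir`, if any.
fn discover_config(dir: &Path) -> Option<PathBuf> {
    CANDIDATES
        .iter()
        .map(|name| dir.join(name))
        .find(|path| path.is_file())
}
```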

run

faucet run pipeline.yaml
faucet run                              # auto-discover faucet.yaml in cwd
faucet run --from-env                   # build the pipeline entirely from FAUCET_* env vars
faucet run pipeline.yaml --env-file prod.env
faucet run pipeline.yaml --no-env-file
faucet run pipeline.yaml --clock 2026-03-01          # backfill: set ${now.*} clock to midnight UTC
faucet run pipeline.yaml --clock 2026-03-01T02:00:00-08:00  # backfill: precise RFC 3339 timestamp

Flags:

Flag | Purpose
--clock <value> | Override the clock used by ${now.*} tokens. Accepts an RFC 3339 timestamp (2026-03-01T00:00:00Z) or a bare date (2026-03-01, treated as midnight UTC). Default: process start time in UTC. Use this for backfills: run the same config with a different date without changing the file.
--env-file <path> / --no-env-file | Same .env handling as validate / preview.
--from-env | Build the pipeline entirely from FAUCET_* environment variables; mutually exclusive with a positional config path.

validate

Reports one line per expanded matrix row. Use it in CI to catch config errors before deploying.

faucet validate pipeline.yaml

When the config contains secrets-manager directives (${vault:…}, ${aws-sm:…}, etc.), faucet validate resolves them as a real preflight and prints one confirmation line per reference (never the value):

secret: vault:secret/data/faucet/api#token → resolved
ok: 'my-pipeline' rows=1 (roots=1, children=0) execution=(defaults)
  - default [root] source=rest sink=jsonl

Pass --no-secrets to validate grammar and structure only, skipping all secret fetches. This is useful in CI environments that lack credentials, or in local development before vault access is available:

faucet validate --no-secrets pipeline.yaml

preview

Runs the first root row’s source and prints records (via the stdout sink). Children aren’t previewed because they need parent records to resolve ${parent.path} tokens.

faucet preview pipeline.yaml --limit 10

schema

faucet schema source rest
faucet schema sink bigquery
faucet schema transform keys_case
faucet schema dlq
faucet schema secrets

faucet schema transform <name> prints the inline config schema for a transform (e.g. keys_case lists the valid mode: values). Run faucet list to see which transforms are compiled into your binary.

faucet schema secrets prints the directive grammar and auth requirements for all four secrets-manager backends in machine-readable JSON — useful for tooling that needs to understand the interpolation syntax without reading the docs.

init

faucet init my_pipeline --source postgres --sink bigquery

Required fields are surfaced with a typed placeholder and a # REQUIRED marker; optional fields are commented out so connector defaults apply. The interactive mode (--interactive) is gated behind the cli-interactive feature.

doctor

faucet doctor pipeline.yaml                  # checklist; exit code = # of failed probes
faucet doctor pipeline.yaml --timeout-secs 5 # per-probe timeout (default 10)
faucet doctor pipeline.yaml --json           # machine-readable, for CI gating

Runs a fast, non-mutating preflight against every connector in the config so misconfiguration surfaces before a real run. For each root invocation it probes the source, sink, and state store and prints a green/red checklist with elapsed times; the exit code equals the number of failed probes (clamped to 255).

  • Sources reuse the real read path — the probe pulls a single page and stops (never the full dataset). Sources whose first page would block or mutate use a targeted probe instead: webhook (port bindable), websocket (TCP connect), postgres-cdc (slot reachable), kafka (cluster metadata).
  • Sinks run a read-only connect/auth/metadata call — SELECT 1, HeadBucket, PING, tables.get, cluster health, fetch_metadata, or a directory-writable check for file sinks. Never a real write.
  • State stores do a sentinel put/get/delete that leaves no residue.

Child invocations (parent/child matrix rows) are listed but not probed — their configs depend on parent records that only exist at run time. Probe messages are scrubbed for resolved secrets before printing.
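The exit-code rule is easy to gate on in CI. The sketch counts failed probes and clamps the total to 255, because a POSIX exit status carries only 8 bits. `Probe` and both functions are hypothetical. The checklist line format is invented for illustration; the real output, and the --json shape, may differ.

```rust
/// One probe result (hypothetical shape).
struct Probe {
    target: &'static str, // e.g. "source", "sink", "state"
    ok: bool,
    elapsed_ms: u64,
}

/// Exit code = number of failed probes, clamped to 255.
fn doctor_exit_code(probes: &[Probe]) -> i32 {
    let failed = probes.iter().filter(|p| !p.ok).count();
    failed.min(255) as i32
}

/// Hypothetical one-line rendering of a probe for a checklist.
fn checklist_line(p: &Probe) -> String {
    let mark = if p.ok { "ok" } else { "FAIL" };
    format!("[{mark}] {} ({} ms)", p.target, p.elapsed_ms)
}
```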

See the Troubleshooting cookbook page for reading the output and common failures.

schedule

faucet schedule pipeline.yaml                  # run on cron schedule, foreground; Ctrl-C to stop
faucet schedule pipeline.yaml --once           # run exactly once now, then exit
faucet schedule pipeline.yaml --env-file prod.env
faucet schedule pipeline.yaml --no-env-file

Runs a pipeline on a recurring cron schedule in a long-running foreground process. The config must contain a top-level schedule: block (without one, faucet errors and suggests faucet run). Requires the schedule Cargo feature (included in full).

  • Stop with Ctrl-C or SIGTERM; the in-flight run drains for up to shutdown_grace_secs (default 30) before the process exits.
  • --once ignores cron timing and runs the pipeline exactly once immediately — handy for testing a scheduled config or for one-shot container invocations.
  • Missed ticks are skipped, not backfilled. A run that starts late emits faucet_schedule_run_lateness_seconds for monitoring.

Flags:

Flag | Purpose
--once | Run exactly once now, then exit. Ignores cron timing.
--env-file <path> / --no-env-file | Same .env handling as run / validate.

See the scheduling cookbook for worked examples, the overlap-policy decision tree, the resilience/supervisor model, and the full metric set to scrape.

serve

FAUCET_SERVE_AUTH_TOKEN=s3cret faucet serve --listen 0.0.0.0:8080
faucet serve --no-auth                             # explicit opt-in; required if no token
faucet serve --history sqlite:/var/lib/faucet/runs.db --default-config defaults.yaml

Runs a long-running HTTP control plane that accepts pipeline configs over REST, executes them under bounded concurrency (reusing the same executor as faucet run), and exposes status / cancel / list / SSE-logs endpoints plus /healthz, /readyz, and /metrics. Requires the serve Cargo feature (included in full).

Unlike the other commands, serve takes no config file — configs arrive per request. Auth is mandatory: pass --auth-token/FAUCET_SERVE_AUTH_TOKEN, or --no-auth to explicitly disable it (absent both, startup fails).

Selected flags (faucet serve --help for the full list):

Flag | Purpose
--listen <addr> | Bind address (default 127.0.0.1:8080; env FAUCET_SERVE_LISTEN).
--auth-token <t> / --no-auth | Bearer token (prefer the env var) or explicit no-auth opt-in.
--max-concurrent-runs <n> / --max-queued-runs <n> | Concurrency + queue caps (429 past the queue).
--history <url> | postgres://… / sqlite:… for durable run history (feature-gated; default in-memory).
--default-config <path> | Workspace defaults merged under every submitted run.
--cors-origin <origin> | Allow-list a browser origin (repeatable; CORS off by default).
--lease-ttl-secs <n> | Run-ownership lease TTL (default 30) for multi-instance orphan fencing on a shared persistent backend; set it above worst-case stalls. See the serve cookbook.
--body-limit-bytes / --shutdown-grace-secs / --retain-terminal-runs-secs / --idempotency-retention-secs | Tuning knobs.

⚠️ serve executes arbitrary client-supplied configs with the server’s identity (secrets, files, network egress). Run single-tenant, authenticated, behind egress controls. See the serve cookbook for the security model and the HTTP API reference for endpoints.

Environment-only mode

faucet run --from-env assembles a pipeline from a FAUCET_* snapshot (FAUCET_SOURCE_*, FAUCET_SINK_*, FAUCET_STATE_*, FAUCET_TRANSFORM_<N>_*), which is handy for containerized deployments where everything comes from the environment. Nested/tagged-enum fields use a *_JSON suffix.

The complete config grammar (matrix, templates, vars, execution) lives in cli/README.md.

Configuration file format

A faucet config is a YAML or JSON document with this top-level shape:

version: 1                 # required, must be 1
name: my_pipeline          # optional; used in state keys and metrics
vars: {}                   # optional; reusable values referenced as ${vars.X}
auth: {}                   # optional; named shared auth providers (see below)
schedule: {}               # optional; cron schedule for faucet schedule (see below)
pipeline:                  # required
  source: { type: …, config: { … } }
  transforms: []           # optional list
  sink:   { type: …, config: { … } }
  state:  { type: …, config: { … } }   # optional
  dlq:    { … }            # optional dead-letter queue
matrix: []                 # optional per-row overrides / DAG
execution:                 # optional
  max_concurrent: 4
  on_error: continue       # continue | stop

Unknown keys are rejected. The structural blocks (pipeline, each source/sink/transform/state spec, matrix rows, execution) reject unrecognized fields, so a typo like transorms: or parnet: is a load-time error rather than a silently-ignored field. A connector’s own config: { … } object is still passed through verbatim to that connector.

pipeline

source and sink each take a type (the connector name) and a config object whose fields are that connector’s schema — see faucet schema source <name>. transforms is an ordered list applied to every record. state attaches a state store; dlq attaches a dead-letter queue.

Transforms layering

Transforms can be declared at three layers and are resolved additively per matrix row in lifecycle order:

final = T_pipeline ++ T_source ++ T_row
  • pipeline.transforms — cross-cutting policy, runs first on every row.
  • pipeline.sources.<name>.transforms — bound to a source template; runs for every row that resolves to this source.
  • matrix[i].transforms — row-specific extras, runs last.

Each declaring layer (source template, matrix row) carries an inherit_transforms: bool (default true); setting it false drops every upstream layer for that scope.

Sinks reject both transforms: and inherit_transforms: at expand time — destination shaping belongs at the pipeline or row layer. See the transforms cookbook for the full model and worked examples.
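The additive layering rule can be sketched as a small resolver. It encodes final = T_pipeline ++ T_source ++ T_row, where inherit_transforms: false on a layer discards everything declared above it. Transform names stand in for full transform specs, and `Layer` / `resolve_transforms` are hypothetical names.

```rust
/// One declaring layer: its own transforms plus whether it keeps upstream ones.
struct Layer<'a> {
    transforms: &'a [&'a str],
    inherit_transforms: bool, // defaults to true in the config
}

/// final = T_pipeline ++ T_source ++ T_row; a layer with
/// inherit_transforms: false drops every layer above it.
fn resolve_transforms(pipeline: &[&str], source: &Layer<'_>, row: &Layer<'_>) -> Vec<String> {
    let mut out: Vec<String> = pipeline.iter().map(|s| s.to_string()).collect();
    if !source.inherit_transforms {
        out.clear();
    }
    out.extend(source.transforms.iter().map(|s| s.to_string()));
    if !row.inherit_transforms {
        out.clear();
    }
    out.extend(row.transforms.iter().map(|s| s.to_string()));
    out
}
```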

Available transforms

The full catalogue (with shapes and worked examples) lives in the transforms cookbook; faucet list prints the same set, and faucet schema transform <name> returns the JSON schema for each. Highlights:

  • filter — keep records where a JSONPath predicate is true. See the cookbook for the operator set and path syntax.
  • explode — expand an array field into one record per element. See the cookbook for the merge rule and on_missing semantics.

Interpolation

Three stages resolve placeholders:

  • Load time: ${env:VAR}, ${file:PATH}, ${secret:VAR} are resolved when the file is read. ${vars.X} resolves against the top-level vars: block; ${sources.NAME.PATH} / ${sinks.NAME.PATH} resolve against named templates. Secret-manager directives (see below) run as the final load-time stage.
  • Runtime: ${row_id.dotted.path} tokens are resolved per parent record in DAG runs. ${now.*} tokens are resolved per invocation at run time (see below).

Reference cycles surface as a clear InterpolationCycle error.
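One way a resolver can detect such a cycle is to expand ${vars.NAME} references depth-first. It keeps the chain of names currently being expanded and reports that chain when a name repeats. This sketch handles ${vars.X} only. `resolve_var` and `InterpolationError` are hypothetical names; faucet's resolver also handles the other schemes listed above.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum InterpolationError {
    Cycle(Vec<String>),
    Unknown(String),
    Unterminated,
}

const PREFIX: &str = "${vars.";

/// Resolve `${vars.NAME}` references recursively. `stack` holds the names
/// currently being expanded, so seeing one again means a reference cycle.
fn resolve_var(
    name: &str,
    vars: &HashMap<&str, &str>,
    stack: &mut Vec<String>,
) -> Result<String, InterpolationError> {
    if let Some(pos) = stack.iter().position(|n| n == name) {
        let mut chain = stack[pos..].to_vec();
        chain.push(name.to_string());
        return Err(InterpolationError::Cycle(chain));
    }
    let raw = *vars
        .get(name)
        .ok_or_else(|| InterpolationError::Unknown(name.to_string()))?;
    stack.push(name.to_string());
    let mut out = String::new();
    let mut rest = raw;
    while let Some(start) = rest.find(PREFIX) {
        out.push_str(&rest[..start]);
        let after = &rest[start + PREFIX.len()..];
        let end = after.find('}').ok_or(InterpolationError::Unterminated)?;
        out.push_str(&resolve_var(&after[..end], vars, stack)?);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    stack.pop();
    Ok(out)
}
```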

${now.*} — run-clock interpolation

${now.*} tokens inject the current wall time into source and sink config values. Each invocation evaluates them once at run time:

Token | Example output | Notes
${now.date} | 2026-03-08 | YYYY-MM-DD
${now.datetime} | 2026-03-08T14:05:09+00:00 | RFC 3339; alias: ${now.iso}
${now.iso} | 2026-03-08T14:05:09+00:00 | Alias for ${now.datetime}
${now.year} | 2026 | Zero-padded 4-digit year
${now.month} | 03 | Zero-padded month (01–12)
${now.day} | 08 | Zero-padded day (01–31)
${now.hour} | 14 | Zero-padded hour (00–23)
${now.minute} | 05 | Zero-padded minute (00–59)
${now.second} | 09 | Zero-padded second (00–59)
${now.unix} | 1772978709 | Unix epoch seconds
${now.strftime.<fmt>} | 2026/03/08/14 | Arbitrary chrono strftime, e.g. ${now.strftime.%Y/%m/%d/%H}

An unknown token (e.g. ${now.foo}) is a config error at run time. An invalid strftime format produces a clean config error rather than a panic.
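To make the token table concrete, here is a std-only sketch that renders the fixed tokens from a UTC Unix timestamp. For example, 2026-03-08T14:05:09Z is Unix 1772978709. It substitutes Howard Hinnant's civil_from_days algorithm for chrono, which faucet uses for strftime, and it skips ${now.strftime.*}. `render_now` is a hypothetical name.

```rust
/// Convert days since 1970-01-01 to a (year, month, day) civil date
/// (Howard Hinnant's `civil_from_days` algorithm).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era, 0..=146096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let year = yoe + era * 400 + (if month <= 2 { 1 } else { 0 });
    (year, month, day)
}

/// Render one `${now.<token>}` for a UTC instant given as Unix seconds.
fn render_now(token: &str, unix: u64) -> Result<String, String> {
    let (y, mo, d) = civil_from_days((unix / 86_400) as i64);
    let secs = unix % 86_400;
    let (h, mi, s) = (secs / 3_600, (secs % 3_600) / 60, secs % 60);
    Ok(match token {
        "date" => format!("{y:04}-{mo:02}-{d:02}"),
        "datetime" | "iso" => format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}+00:00"),
        "year" => format!("{y:04}"),
        "month" => format!("{mo:02}"),
        "day" => format!("{d:02}"),
        "hour" => format!("{h:02}"),
        "minute" => format!("{mi:02}"),
        "second" => format!("{s:02}"),
        "unix" => unix.to_string(),
        other => return Err(format!("unknown token: now.{other}")),
    })
}
```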

Clock source:

  • faucet run — the process start time in UTC. Override with --clock <value> for backfills: an RFC 3339 timestamp (2026-03-01T00:00:00Z) or a bare date (2026-03-01, treated as midnight UTC). See the run command reference.
  • faucet schedule — the tick’s scheduled time, rendered in the schedule’s timezone. ${now.date} therefore reflects the date in the timezone the cron fires in (e.g. America/Los_Angeles), not UTC. Queued runs use their original scheduled time; --once uses the current wall clock.

Scope: ${now.*} tokens (and ${row_id.path} parent-record references) are resolved only in source and sink config values. Using one in a state:, dlq:, or transforms: config is a config error at validate/expand time — it is rejected rather than silently passed to the connector as a literal ${…} string. (${env:…} / ${vars.X} / ${sources.X} still resolve everywhere.)

Reserved id: now is a reserved matrix row id — a matrix row cannot be named now.

SQL caveat: ${now.*} substitutes as plain text into config values — the same semantics as ${row_id.path} tokens. For SQL sources that interpolate ${now.*} into a query string, prefer the connector’s bind-parameter path (substitute_context_bind_params) over raw text substitution to avoid injection risk.

Secrets-manager directives

Four additional load-time schemes pull values from external secrets managers. Each requires the matching build feature (--features secrets-vault, etc.; --features secrets enables all four). Values are fetched concurrently and de-duplicated; they are never written to disk.

Directive | Backend | Auth
${vault:<path>[#field]} | HashiCorp Vault KV v2 | VAULT_ADDR + VAULT_TOKEN (+ optional VAULT_NAMESPACE)
${aws-sm:<name-or-ARN>[#field]} | AWS Secrets Manager | aws-config default chain (env / profile / instance / web-identity)
${gcp-sm:projects/<p>/secrets/<s>/versions/<v>} | GCP Secret Manager (versions/latest ok) | Application Default Credentials
${azure-kv:<vault>/<secret>[/<version>]} | Azure Key Vault | AZURE_* env / managed identity / az login

The #field selector (Vault and AWS only) parses the secret body as a JSON object and extracts a single key. Use faucet schema secrets for the machine-readable grammar reference and faucet validate --no-secrets to check grammar offline.
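The collect, de-duplicate, and fetch-concurrently flow can be sketched as follows, treating the config as one flat string. The scheme list and the Vault/AWS-only #field rule come from the table. `SecretRef`, `collect_refs`, and `fetch_all` are hypothetical, and the `fetch` closure stands in for real Vault/AWS/GCP/Azure clients.

```rust
use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct SecretRef {
    scheme: String,
    path: String,
    field: Option<String>,
}

const SCHEMES: [&str; 4] = ["vault", "aws-sm", "gcp-sm", "azure-kv"];

/// Parse the inside of `${...}`; non-secret schemes (env, file, vars) yield None.
fn parse_directive(inner: &str) -> Option<SecretRef> {
    let (scheme, rest) = inner.split_once(':')?;
    if !SCHEMES.contains(&scheme) {
        return None;
    }
    // Only Vault and AWS accept a #field selector.
    let (path, field) = match rest.split_once('#') {
        Some((p, f)) if scheme == "vault" || scheme == "aws-sm" => (p, Some(f.to_string())),
        _ => (rest, None),
    };
    Some(SecretRef { scheme: scheme.to_string(), path: path.to_string(), field })
}

/// Collect every secret reference once (a set de-duplicates repeats).
fn collect_refs(config: &str) -> BTreeSet<SecretRef> {
    let mut out = BTreeSet::new();
    let mut rest = config;
    while let Some(start) = rest.find("${") {
        let after = &rest[start + 2..];
        let end = match after.find('}') {
            Some(e) => e,
            None => break,
        };
        if let Some(r) = parse_directive(&after[..end]) {
            out.insert(r);
        }
        rest = &after[end + 1..];
    }
    out
}

/// Fetch all unique references concurrently, one scoped thread each.
fn fetch_all<F>(refs: &BTreeSet<SecretRef>, fetch: F) -> BTreeMap<SecretRef, String>
where
    F: Fn(&SecretRef) -> String + Sync,
{
    let fetch = &fetch;
    std::thread::scope(|s| {
        let handles: Vec<_> = refs
            .iter()
            .map(|r| (r.clone(), s.spawn(move || fetch(r))))
            .collect();
        handles
            .into_iter()
            .map(|(r, h)| (r, h.join().expect("fetch thread panicked")))
            .collect::<BTreeMap<_, _>>()
    })
}
```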

See the secrets cookbook for full examples, the redaction guarantee, and the known limitation around the auth: catalog.

matrix

Each row is deep-merged onto pipeline (scalars replace, objects merge, arrays replace). A row with parent: runs once per parent record. See the matrix DAG tutorial. For DRY configs with many rows, define named templates under pipeline.sources / pipeline.sinks and select them per row with ref:.
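Here is the merge rule (scalars replace, objects merge, arrays replace) in code. It uses a tiny hypothetical `Value` enum in place of a real JSON/YAML type and illustrates the stated semantics; it is not faucet's implementation.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a JSON/YAML value (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Bool(bool),
    Int(i64),
    Str(String),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

/// Merge `overlay` (a matrix row) onto `base` (the pipeline block):
/// objects merge key by key; scalars and arrays replace wholesale.
fn deep_merge(base: &mut Value, overlay: Value) {
    match (base, overlay) {
        (Value::Object(b), Value::Object(o)) => {
            for (key, value) in o {
                match b.get_mut(&key) {
                    Some(existing) => deep_merge(existing, value),
                    None => {
                        b.insert(key, value);
                    }
                }
            }
        }
        (slot, replacement) => *slot = replacement,
    }
}

/// Build an object from (key, value) pairs.
fn obj(pairs: Vec<(&str, Value)>) -> Value {
    Value::Object(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}
```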

auth

A map of named auth providers, each { type, config }, where type is one of static, oauth2, oauth2_refresh, or token_endpoint. A connector references one with auth: { ref: <name> } instead of inline auth; faucet builds each provider once and shares it across every connector that references it (one token, single-flight refresh). See the authentication cookbook.

auth:
  api:
    type: oauth2_refresh
    config:
      token_url: ${env:API_TOKEN_URL}
      client_id: ${secret:API_CLIENT_ID}
      client_secret: ${secret:API_CLIENT_SECRET}
      refresh_token: ${secret:API_REFRESH_TOKEN}
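What "one token, single-flight refresh" means can be sketched with a mutex. The refresh runs while the lock is held, so concurrent callers wait and then reuse the new token instead of each calling token_url. `SharedToken` is hypothetical and the thread-based design is only an illustration; an async runtime would use an async lock instead.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// One named auth provider shared by every connector that references it.
struct SharedToken {
    cached: Mutex<Option<(String, Instant)>>, // token + expiry
    refreshes: AtomicUsize,                   // how often the token endpoint was called
    ttl: Duration,
}

impl SharedToken {
    fn new(ttl: Duration) -> Self {
        SharedToken { cached: Mutex::new(None), refreshes: AtomicUsize::new(0), ttl }
    }

    /// Return a valid token, refreshing at most once per expiry.
    fn get(&self) -> String {
        let mut guard = self.cached.lock().unwrap();
        if let Some((token, expires_at)) = &*guard {
            if Instant::now() < *expires_at {
                return token.clone();
            }
        }
        // Stand-in for the real token-endpoint call, made while holding the lock.
        let n = self.refreshes.fetch_add(1, Ordering::SeqCst) + 1;
        let token = format!("token-{n}");
        *guard = Some((token.clone(), Instant::now() + self.ttl));
        token
    }
}
```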

execution

  • max_concurrent — one shared concurrency budget across roots and child fan-outs.
  • on_error: continue (siblings finish; failed subtree skipped) or stop (abort pending and in-flight work on first failure).

Adaptive batch sizing

The optional adaptive_batch_size: sub-block enables the AIMD controller that auto-tunes the effective write batch size from observed sink latency and error rate. Default enabled: false (opt-in).

execution:
  adaptive_batch_size:
    enabled: true          # master switch
    controller: aimd       # only "aimd" is supported in v1
    min: 100               # lower bound (rows)
    max: 50000             # upper bound; inert above the source page size
    increase_step: 250     # additive growth per clean batch
    decrease_factor: 0.5   # multiplicative shrink on error/high latency  (0, 1)
    cooldown_batches: 5    # batches to skip after a shrink
    target_latency_ms: null  # optional write-latency target (ms)
    latency_window: 10     # rolling window size for p50 latency
    error_threshold: 0.01  # per-batch error rate that triggers a shrink
    respect_source_max: true  # cap at source page size (see Caveats)
    log_every: 50          # tracing::info every N adjustments

Key caveats:

  • Error-driven shrink requires a dlq: block. Without one the controller sees no per-row errors; only target_latency_ms can drive shrinks.
  • Effective ceiling = source page size. In v1 the controller reslices pages in-memory — it cannot buffer across pages. Setting max higher than the source batch_size is harmless but inert. Raise the source batch_size to allow bigger write batches.
  • No-op for per-record sinks. jsonl, csv, and stdout write one record at a time; the controller adjusts normally but the write granularity is unchanged.
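A minimal AIMD sketch using the defaults from the YAML above. It makes three simplifying assumptions:

  • it judges latency per batch rather than as a p50 over latency_window;
  • it treats an error rate strictly above error_threshold as a shrink signal;
  • it omits log_every.

`Aimd` is a hypothetical type, not faucet's controller.

```rust
/// Hypothetical AIMD controller mirroring the knobs in the YAML above.
struct Aimd {
    min: usize,
    max: usize,
    increase_step: usize,
    decrease_factor: f64,
    cooldown_batches: u32,
    error_threshold: f64,
    target_latency_ms: Option<u64>,
    current: usize,
    cooldown_left: u32,
}

impl Aimd {
    fn new(start: usize) -> Self {
        Aimd {
            min: 100,
            max: 50_000,
            increase_step: 250,
            decrease_factor: 0.5,
            cooldown_batches: 5,
            error_threshold: 0.01,
            target_latency_ms: None,
            current: start.clamp(100, 50_000),
            cooldown_left: 0,
        }
    }

    /// Feed one batch's outcome; returns the next target batch size.
    fn observe(&mut self, rows: usize, failed_rows: usize, latency_ms: u64) -> usize {
        let error_rate = if rows == 0 { 0.0 } else { failed_rows as f64 / rows as f64 };
        let too_slow = self.target_latency_ms.map_or(false, |t| latency_ms > t);
        if error_rate > self.error_threshold || too_slow {
            // Multiplicative decrease, never below `min`, then start the cooldown.
            let shrunk = (self.current as f64 * self.decrease_factor) as usize;
            self.current = shrunk.max(self.min);
            self.cooldown_left = self.cooldown_batches;
        } else if self.cooldown_left > 0 {
            // Clean batch during cooldown: hold the size steady.
            self.cooldown_left -= 1;
        } else {
            // Additive increase, never above `max`.
            self.current = (self.current + self.increase_step).min(self.max);
        }
        self.current
    }

    /// With respect_source_max, a write batch never exceeds the source page.
    fn effective(&self, source_page_size: usize) -> usize {
        self.current.min(source_page_size)
    }
}
```

Starting at 1000 rows, one clean batch grows the target to 1250. A batch with a 4% error rate then halves it to 625, and growth resumes only after five clean cooldown batches.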

See the Adaptive batching cookbook for a full worked example, the AIMD trajectory, and the four Prometheus metrics (faucet_pipeline_adaptive_batch_*).

schedule

Present only when you run faucet schedule. Absent configs are rejected by that command with a hint to use faucet run instead. All fields except cron are optional.

schedule:
  cron: "0 2 * * *"               # REQUIRED. Standard 5-field cron, or 6-field with leading seconds.
  timezone: "UTC"                 # IANA timezone name. Default UTC.
  overlap_policy: skip            # skip | queue | forbid. Default skip.
  max_runs: null                  # null = run forever; N = exit 0 after N successful runs.
  max_consecutive_failures: null  # null = never exit on failure; N = exit non-zero after N straight failures.
  on_failure: continue            # continue | stop. Default continue.
  start_immediately: false        # Run once on startup before waiting for the first tick. Default false.
  run_timeout_secs: null          # Per-run wall-clock kill switch (seconds). Timed-out runs count as failed.
  shutdown_grace_secs: 30         # SIGTERM: wait this long for the in-flight run before aborting. Default 30.

Field | Type | Default | Description
cron | string | required | 5-field standard Unix cron (MIN HOUR DOM MON DOW) or 6-field with a leading seconds field (SEC MIN HOUR DOM MON DOW). Validated at load time.
timezone | string | "UTC" | IANA timezone name (e.g. "America/Los_Angeles", "Europe/Berlin"). Affects how the cron expression is interpreted.
overlap_policy | skip, queue, or forbid | skip | What to do when a tick fires while a run is already in flight. skip drops the tick; queue buffers one missed tick (in-memory only, lost on restart); forbid exits non-zero.
max_runs | integer or null | null | Stop the scheduler cleanly (exit 0) after this many successful runs. null means run forever. 0 is rejected as a config error.
max_consecutive_failures | integer or null | null | Exit non-zero after this many consecutive failed runs without a success in between. A successful run resets the counter. null means never exit on failures alone.
on_failure | continue or stop | continue | stop exits non-zero immediately after the first failed run. continue keeps scheduling; use max_consecutive_failures to bound sustained outages.
start_immediately | bool | false | When true, the first run fires right on startup before the cron clock reaches its first tick.
run_timeout_secs | integer or null | null | Per-run time limit in seconds. A run that exceeds this is killed and counts as a failure. null means no timeout.
shutdown_grace_secs | integer | 30 | On SIGTERM/SIGINT, wait this many seconds for the in-flight run to finish before forcibly aborting it.
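The overlap and failure rules in the table can be sketched as two pure decision functions. All names are hypothetical. The sketch assumes that a tick arriving while one tick is already queued is dropped; the table only says that queue "buffers one missed tick".

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum OverlapPolicy { Skip, Queue, Forbid }

#[derive(Debug, PartialEq)]
enum TickAction { Start, Drop, Enqueue, ExitNonZero }

/// Decide what a cron tick does. `queued` says whether a tick is already buffered.
fn on_tick(policy: OverlapPolicy, in_flight: bool, queued: bool) -> TickAction {
    if !in_flight {
        return TickAction::Start;
    }
    match policy {
        OverlapPolicy::Skip => TickAction::Drop,
        // queue holds at most one missed tick; further ticks are dropped (assumption).
        OverlapPolicy::Queue if queued => TickAction::Drop,
        OverlapPolicy::Queue => TickAction::Enqueue,
        OverlapPolicy::Forbid => TickAction::ExitNonZero,
    }
}

#[derive(Debug, PartialEq)]
enum AfterRun { KeepScheduling, ExitOk, ExitNonZero }

struct Limits {
    max_runs: Option<u64>,
    max_consecutive_failures: Option<u64>,
    stop_on_failure: bool, // on_failure: stop
}

#[derive(Default)]
struct Counters { successes: u64, consecutive_failures: u64 }

/// Apply max_runs, max_consecutive_failures, and on_failure after each run.
fn after_run(limits: &Limits, c: &mut Counters, succeeded: bool) -> AfterRun {
    if succeeded {
        c.successes += 1;
        c.consecutive_failures = 0; // a success resets the failure streak
        if limits.max_runs.map_or(false, |n| c.successes >= n) {
            return AfterRun::ExitOk;
        }
        return AfterRun::KeepScheduling;
    }
    c.consecutive_failures += 1;
    let streak_exceeded = limits
        .max_consecutive_failures
        .map_or(false, |n| c.consecutive_failures >= n);
    if limits.stop_on_failure || streak_exceeded {
        return AfterRun::ExitNonZero;
    }
    AfterRun::KeepScheduling
}
```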

Validation: faucet validate pipeline.yaml checks the schedule: block at parse time — bad cron syntax, unknown timezone names, max_runs: 0, and a cron expression that can never fire all produce a clear config error: schedule: … message before any run starts.

See the scheduling cookbook for worked examples, the DST/timezone details, the overlap-policy decision tree, and the full Prometheus metric set.

Discovery & env files

run / validate / preview / schedule auto-discover faucet.yaml, faucet.yml, or faucet.json in the current directory, and load a sibling .env unless --no-env-file is given (--env-file PATH points elsewhere).

The authoritative, exhaustive grammar — including every matrix and template edge case — is in cli/README.md.

HTTP API reference (faucet serve)

faucet serve exposes a JSON REST control plane for submitting, polling, listing, cancelling, and streaming the logs of pipeline runs, plus unauthenticated health and Prometheus endpoints. A machine-readable docs/openapi.yaml spec ships alongside this page and is kept in sync with the router by a CI test.

See the serve cookbook for a guided quickstart, the security model, and operational guidance. This page is the endpoint reference.

Authentication

All /v1/* endpoints require Authorization: Bearer <token> unless the server was started with --no-auth. The token is compared in constant time; the Authorization header is the only accepted credential (no query-string auth). /healthz, /readyz, and /metrics are always unauthenticated (probes / scrapers). OPTIONS preflight bypasses auth so browsers behind a CORS policy work.
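A std-only sketch of the bearer check. It compares every byte instead of stopping at the first mismatch. Two caveats apply: the token length still leaks through the up-front length check, and nothing formally stops an optimizer from short-circuiting the loop. Production code would normally use a vetted crate such as subtle. `authorized` and `constant_time_eq` are hypothetical names, not faucet's implementation.

```rust
/// Compare two byte strings without short-circuiting on the first mismatch.
/// Lengths are compared up front, so the token length is not hidden.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Accept only `Authorization: Bearer <token>`; the scheme name is
/// matched case-insensitively, as HTTP auth scheme names are.
fn authorized(authorization: Option<&str>, expected: &str) -> bool {
    match authorization.and_then(|h| h.split_once(' ')) {
        Some((scheme, token)) => {
            scheme.eq_ignore_ascii_case("bearer")
                && constant_time_eq(token.trim().as_bytes(), expected.as_bytes())
        }
        None => false,
    }
}
```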

Endpoints

Method | Path | Success | Notes
POST | /v1/runs | 202 | Submit a run; config validated synchronously
GET | /v1/runs | 200 | List runs (filters below)
GET | /v1/runs/{id} | 200 | Get one run record
DELETE | /v1/runs/{id} | 204 | Remove a terminal run from history
POST | /v1/runs/{id}/cancel | 202 / 200 | Request cancel (202) or no-op if terminal (200)
GET | /v1/runs/{id}/logs | 200 | Stream the run’s logs as text/event-stream
GET | /healthz | 200 | Liveness (unauthenticated)
GET | /readyz | 200 / 503 | Readiness (unauthenticated)
GET | /metrics | 200 | Prometheus exposition (unauthenticated)

POST /v1/runs

Request body:

{
  "config": "version: 1\npipeline:\n  source: {...}\n  sink: {...}\n",
  "config_format": "yaml",
  "name": "nightly-rollup",
  "labels": {"requester": "airflow"},
  "timeout_secs": 3600,
  "doctor_first": true,
  "idempotency_key": "airflow-task-123-attempt-2",
  "clock": "2026-05-29T00:00:00Z"
}
  • config (required) — the YAML or JSON pipeline body.
  • config_format: yaml (default) or json.
  • name — metadata; also drives the state-key and metric identity (see the cookbook’s cardinality note). Two submissions sharing a name share replication bookmarks.
  • labels — arbitrary string metadata, stored on the run record only.
  • timeout_secs — wall-clock cap; on expiry the run is marked failed.
  • doctor_first — run preflight probes before executing; on any failure the submit returns 422 with the doctor report in error.details.
  • idempotency_key — replay protection (see cookbook).
  • clock — overrides the ${now.*} clock for backfills (default: submit time).

Response (202):

{ "run_id": "0192…", "status": "queued", "submitted_at": "2026-05-29T12:00:00Z" }

A --default-config (if the server was started with one) is merged under the submitted config (submitted values win).

GET /v1/runs

Query parameters: status, name, since, until (RFC3339), limit (default 50, max 500), cursor. Ordering is (submitted_at DESC, run_id DESC); cursor is the last run_id from the previous page.

{ "runs": [ { "run_id": "…", "status": "completed", … } ], "next_cursor": "0192…" }
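Because ordering is (submitted_at DESC, run_id DESC), the cursor acts as a keyset. The next page is every run whose (submitted_at, run_id) pair sorts strictly after the cursor row.

This in-memory sketch uses a hypothetical `Run` type. The limit handling mirrors the documented default (50) and maximum (500), with one assumed addition: 0 is clamped up to 1. Treating an unknown cursor as "start from the top" is also an assumption; a real server may reject it instead.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Run {
    run_id: String,
    submitted_at: u64, // e.g. Unix seconds
}

/// Return one page plus the next cursor, ordered by
/// (submitted_at DESC, run_id DESC).
fn list_runs(runs: &[Run], cursor: Option<&str>, limit: Option<usize>) -> (Vec<Run>, Option<String>) {
    let limit = limit.unwrap_or(50).clamp(1, 500);
    let mut sorted = runs.to_vec();
    sorted.sort_by(|a, b| (b.submitted_at, &b.run_id).cmp(&(a.submitted_at, &a.run_id)));

    // Keyset: keep only rows that sort strictly after the cursor row.
    let after: Vec<Run> = match cursor.and_then(|c| runs.iter().find(|r| r.run_id == c)) {
        Some(cur) => sorted
            .into_iter()
            .filter(|r| (r.submitted_at, &r.run_id) < (cur.submitted_at, &cur.run_id))
            .collect(),
        None => sorted,
    };
    let has_more = after.len() > limit;
    let page: Vec<Run> = after.into_iter().take(limit).collect();
    let next_cursor = if has_more { page.last().map(|r| r.run_id.clone()) } else { None };
    (page, next_cursor)
}
```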

GET /v1/runs/{id} → RunRecord

{
  "run_id": "0192…",
  "name": "nightly-rollup",
  "labels": {"requester": "airflow"},
  "status": "completed",
  "submitted_at": "…", "started_at": "…", "finished_at": "…",
  "elapsed_secs": 12.4,
  "records_written": 4096,
  "invocations": [
    {"row_id": "default", "parent_record_key": null, "records_written": 4096, "error": null}
  ],
  "error": null,
  "idempotency_key": "airflow-task-123-attempt-2",
  "doctor_report": null
}

status is one of queued, running, completed, failed, cancelled. elapsed_secs is filled live for running runs.

Bookmarks: run records carry record counts + per-row outcomes, not replication bookmarks. Bookmark state is per-row/per-state-key and lives in the configured state backend, not in the run record.

GET /v1/runs/{id}/logs (SSE)

text/event-stream. The server replays the run’s bounded ring buffer, then streams the live tail. Event types:

  • event: log — one captured log line (subject to the server’s FAUCET_LOG level; secrets are redacted).
  • event: truncated — the reader fell behind and lines were dropped; rely on the centralized log sink for the full history.
  • event: end — the run reached a terminal state; the stream closes.

Log buffers are ephemeral: they survive a short drain window after the run finishes (independent of run-record retention), then are dropped. A known run whose buffer has expired yields a single end.

curl -N -H "Authorization: Bearer $TOKEN" \
  http://127.0.0.1:8080/v1/runs/0192…/logs
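The replay-then-tail behavior can be sketched with a bounded ring buffer that tags each line with a sequence number. A reader that asks for lines older than the oldest retained one gets a truncated event first. `LogRing` and `to_sse` are hypothetical. The data: payload of the truncated event (a dropped-line count) is invented for illustration. Real SSE framing also has to split multi-line log lines across several data: fields, which this sketch skips.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum LogEvent {
    Log(String),
    Truncated { dropped: u64 },
}

/// Bounded per-run log buffer; each line gets an increasing sequence number.
struct LogRing {
    capacity: usize,
    next_seq: u64,
    lines: VecDeque<(u64, String)>,
}

impl LogRing {
    fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        LogRing { capacity, next_seq: 0, lines: VecDeque::with_capacity(capacity) }
    }

    /// Append a line, evicting the oldest one when the buffer is full.
    fn push(&mut self, line: &str) {
        if self.lines.len() == self.capacity {
            self.lines.pop_front();
        }
        self.lines.push_back((self.next_seq, line.to_string()));
        self.next_seq += 1;
    }

    /// Events for a reader that has seen everything before `from`,
    /// plus the sequence number it should resume from next time.
    fn read_from(&self, from: u64) -> (Vec<LogEvent>, u64) {
        let oldest = self.lines.front().map_or(self.next_seq, |(seq, _)| *seq);
        let mut events = Vec::new();
        if from < oldest {
            events.push(LogEvent::Truncated { dropped: oldest - from });
        }
        for (seq, line) in &self.lines {
            if *seq >= from {
                events.push(LogEvent::Log(line.clone()));
            }
        }
        (events, self.next_seq)
    }
}

/// Encode one event in text/event-stream framing.
fn to_sse(event: &LogEvent) -> String {
    match event {
        LogEvent::Log(line) => format!("event: log\ndata: {line}\n\n"),
        LogEvent::Truncated { dropped } => format!("event: truncated\ndata: {dropped}\n\n"),
    }
}
```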

Error envelope

Every error is a JSON ApiError:

{ "error": { "code": "unprocessable", "message": "…", "details": { } } }

Status | When
400 | Malformed body / parse / interpolation failure; a schedule: block in the config
401 | Missing/invalid bearer token
404 | Unknown run_id
409 | DELETE on a running run; idempotency key reused with a different payload
413 | Body exceeds --body-limit-bytes
422 | Expand/validation failure; doctor_first failed (report in details)
429 | Run queue full (carries Retry-After)
500 | Internal error
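The 409 rule for idempotency keys can be sketched as a map from key to (payload, run_id). Reusing a key with the same payload replays the original run; reusing it with a different payload is a conflict. `Idempotency` and `Submit` are hypothetical. Payloads are compared verbatim, and expiry (--idempotency-retention-secs) is not modeled.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Submit {
    Created(String),  // new run id
    Replayed(String), // same key + same payload: the original run
    Conflict,         // same key, different payload: HTTP 409
}

/// Hypothetical in-memory idempotency index: key -> (payload, run_id).
#[derive(Default)]
struct Idempotency {
    seen: HashMap<String, (String, String)>,
    next_id: u64,
}

impl Idempotency {
    fn submit(&mut self, key: Option<&str>, payload: &str) -> Submit {
        if let Some(k) = key {
            if let Some((stored_payload, run_id)) = self.seen.get(k) {
                return if stored_payload == payload {
                    Submit::Replayed(run_id.clone())
                } else {
                    Submit::Conflict
                };
            }
        }
        self.next_id += 1;
        let run_id = format!("run-{}", self.next_id);
        if let Some(k) = key {
            self.seen.insert(k.to_string(), (payload.to_string(), run_id.clone()));
        }
        Submit::Created(run_id)
    }
}
```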

Metrics

/metrics serves the standard faucet_* pipeline metrics plus serve-specific series: faucet_serve_requests_total{method,path,status}, faucet_serve_request_duration_seconds{method,path}, faucet_serve_runs_queued, faucet_serve_runs_in_flight, faucet_serve_runs_total{status,reason}, faucet_serve_idempotency_hits_total, and faucet_serve_history_degraded. See Observability.

Deploying faucet

faucet run executes a pipeline to completion and exits, so it does not need to stay up as a long-running daemon. That makes deployment simple: schedule the binary, point it at a config, and let it exit. Durable state (bookmarks) lets the next run pick up where the last one left off.

Patterns

Cron / scheduled jobs

This is the most common deployment: run on an interval, and incremental replication plus a durable state store mean each run only fetches what’s new.

# crontab: every 15 minutes
*/15 * * * * faucet run /etc/faucet/events.yaml >> /var/log/faucet.log 2>&1

Containers

Build a slim image with only the connectors you need, and supply config via a mounted file or entirely from the environment:

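FROM rust:slim AS build
RUN cargo install faucet-cli --no-default-features \
    --features "source-rest,sink-bigquery,state-postgres,observability"

FROM debian:stable-slim
# CA certificates for outbound TLS (REST APIs, BigQuery); harmless if the
# binary ships its own root store.
RUN apt-get update \
    && apt-get install -y --no-install-recommends ca-certificates \
    && rm -rf /var/lib/apt/lists/*
COPY --from=build /usr/local/cargo/bin/faucet /usr/local/bin/faucet
# Pass the config path (or --from-env) as the container's arguments.
ENTRYPOINT ["faucet", "run"]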

With faucet run --from-env you can drive the whole pipeline from FAUCET_* environment variables — no config file in the image. See the CLI reference.

Kubernetes CronJob

Wrap the container above in a CronJob. Use the postgres or redis state backend so bookmarks survive pod restarts, and scrape the metrics endpoint (see Observability).

Secrets

Never commit secrets. Use ${env:VAR} / ${file:PATH} in the config and inject real values through your platform’s secret mechanism (Kubernetes secrets, Docker secrets, a mounted .env, etc.).
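The following is a minimal sketch of how ${env:VAR} and ${file:PATH} placeholders can be resolved. It is not faucet's interpolation code. The env and file lookups are injected so the sketch is testable (in a real program they would wrap std::env::var and std::fs::read_to_string). Trimming a trailing newline from file contents is an assumption, chosen because mounted secret files often end with one. Other placeholders, such as template variables, are left untouched for a later pass.

```rust
/// Hypothetical resolver for `${env:VAR}` and `${file:PATH}` placeholders.
pub fn interpolate(
    input: &str,
    env: &dyn Fn(&str) -> Option<String>,
    read_file: &dyn Fn(&str) -> Option<String>,
) -> Result<String, String> {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(start) = rest.find("${") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        let end = after
            .find('}')
            .ok_or_else(|| format!("unterminated placeholder in {input:?}"))?;
        let token = &after[..end];
        let value = match token.split_once(':') {
            Some(("env", name)) => {
                env(name).ok_or_else(|| format!("environment variable {name} is not set"))?
            }
            Some(("file", path)) => read_file(path)
                .map(|s| s.trim_end().to_string()) // assumption: strip trailing newline
                .ok_or_else(|| format!("cannot read file {path}"))?,
            // Anything else is copied through unchanged.
            _ => format!("${{{token}}}"),
        };
        out.push_str(&value);
        rest = &after[end + 1..];
    }
    out.push_str(rest);
    Ok(out)
}
```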

Exit codes & retries

faucet run exits non-zero when a pipeline fails (subject to the execution.on_error policy and any DLQ). Let your scheduler’s retry/alert mechanism react to a non-zero exit; because bookmarks only advance after the sink confirms, a retried run resumes safely.
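The "bookmark advances only after the sink confirms" rule is what makes a retried run safe. The sketch below illustrates it with a hypothetical in-memory StateStore and an integer cursor column. A real deployment uses a durable backend such as postgres or redis, and faucet's own state API is not shown here.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a durable state backend.
#[derive(Default)]
pub struct StateStore {
    bookmarks: HashMap<String, u64>,
}

/// One incremental run: fetch rows newer than the bookmark, write them,
/// and advance the bookmark only if the write succeeded.
pub fn run_once(
    state: &mut StateStore,
    key: &str,
    source: &[(u64, &str)], // (cursor value, payload)
    write: &mut dyn FnMut(&[(u64, &str)]) -> Result<(), String>,
) -> Result<usize, String> {
    let bookmark = state.bookmarks.get(key).copied().unwrap_or(0);
    let batch: Vec<(u64, &str)> = source
        .iter()
        .copied()
        .filter(|(c, _)| *c > bookmark)
        .collect();
    if batch.is_empty() {
        return Ok(0);
    }
    // A failed write returns early, so the bookmark stays put and the
    // next (retried) run re-fetches the same rows.
    write(&batch)?;
    let new_mark = batch.iter().map(|(c, _)| *c).max().unwrap_or(bookmark);
    state.bookmarks.insert(key.to_string(), new_mark);
    Ok(batch.len())
}
```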

Observability

Every source, sink, transform, and state-store operation is automatically wrapped to emit tracing spans and metrics counters/histograms. Connector authors write no observability code — they only override connector_name() for a friendly label.
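The automatic wrapping is essentially a decorator. The sketch below shows the idea with a simplified synchronous Fetch trait and atomic counters, standing in for faucet's async traits and its Prometheus recorder. Fetch and Instrumented are hypothetical names. The wrapped connector only supplies connector_name().

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Simplified, synchronous stand-in for a source trait (hypothetical).
pub trait Fetch {
    fn connector_name(&self) -> &'static str {
        "unknown"
    }
    fn fetch(&self) -> Result<Vec<String>, String>;
}

/// Wraps any `Fetch` and records counters and timings on its behalf.
pub struct Instrumented<S> {
    inner: S,
    pub records_total: AtomicU64,
    pub errors_total: AtomicU64,
    pub last_duration_nanos: AtomicU64,
}

impl<S: Fetch> Instrumented<S> {
    pub fn new(inner: S) -> Self {
        Self {
            inner,
            records_total: AtomicU64::new(0),
            errors_total: AtomicU64::new(0),
            last_duration_nanos: AtomicU64::new(0),
        }
    }
}

impl<S: Fetch> Fetch for Instrumented<S> {
    fn connector_name(&self) -> &'static str {
        self.inner.connector_name() // used as the `connector` label
    }

    fn fetch(&self) -> Result<Vec<String>, String> {
        let start = Instant::now();
        let result = self.inner.fetch();
        let elapsed = start.elapsed().as_nanos() as u64;
        self.last_duration_nanos.store(elapsed, Ordering::Relaxed);
        match &result {
            Ok(rows) => {
                self.records_total.fetch_add(rows.len() as u64, Ordering::Relaxed);
            }
            Err(_) => {
                self.errors_total.fetch_add(1, Ordering::Relaxed);
            }
        }
        result
    }
}
```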

Enabling the Prometheus endpoint

The CLI’s observability feature (on by default in the full build) installs a Prometheus exporter. Configure it from the pipeline config or environment; once running, scrape the listen address with Prometheus.

Common labels

Metrics carry three common labels: pipeline, row (the matrix row id; empty for non-matrix runs), and connector (from connector_name()). run_id is a span attribute only; it’s high cardinality and never a Prometheus label.
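To make the label set concrete, the following renders one sample in the Prometheus text exposition format using the labels listed above. It is illustrative only, since the label order and formatting of faucet's exporter are not documented here. render_sample is a hypothetical helper. Note that run_id is deliberately not a parameter.

```rust
/// Renders one sample with the common labels. `run_id` is intentionally
/// absent: it belongs on spans, not metric labels.
pub fn render_sample(metric: &str, pipeline: &str, row: &str, connector: &str, value: u64) -> String {
    format!(
        "{}{{pipeline=\"{}\",row=\"{}\",connector=\"{}\"}} {}",
        metric,
        escape(pipeline),
        escape(row),
        escape(connector),
        value
    )
}

/// Escapes a label value per the exposition format: backslash, quote, newline.
fn escape(v: &str) -> String {
    v.replace('\\', r"\\").replace('"', "\\\"").replace('\n', "\\n")
}
```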

Key metrics

  • Source: faucet_source_records_total, faucet_source_errors_total{kind}, faucet_source_page_duration_seconds, faucet_source_in_flight.
  • Sink: faucet_sink_records_total, faucet_sink_writes_total, faucet_sink_errors_total, faucet_sink_write_duration_seconds, faucet_sink_flush_duration_seconds, faucet_sink_in_flight.
  • Transform: faucet_transform_records_in_total, faucet_transform_records_out_total (use the out/in ratio for filter drop rate or explode fan-out), faucet_transform_errors_total{kind}, faucet_transform_duration_seconds.
  • State: faucet_state_{get,put,delete}_total (get carries outcome=hit|miss), faucet_state_errors_total{op,kind}, plus duration histograms.
  • Pipeline: faucet_pipeline_runs_total{status=ok|err,kind}, faucet_pipeline_run_duration_seconds, faucet_pipeline_in_flight, faucet_pipeline_seconds_since_last_bookmark, faucet_pipeline_last_bookmark_unix_seconds.
  • Build: faucet_build_info{version} is set to 1; join it onto other metrics with group_left to annotate dashboards with the running version.
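The transform in/out ratio works out as follows. A filter that receives 1,000 records and emits 250 has dropped 75%. An explode that turns 200 records into 1,000 has a fan-out of 5. These hypothetical helpers show the arithmetic. In a dashboard you would apply the same ratio to rate()d counters rather than raw totals.

```rust
/// Fraction of records a filter dropped, from the in/out counters.
/// Returns `None` when nothing went in (ratio undefined).
pub fn drop_rate(records_in: u64, records_out: u64) -> Option<f64> {
    if records_in == 0 {
        return None;
    }
    Some(1.0 - records_out as f64 / records_in as f64)
}

/// Average fan-out of an explode transform (records out per record in).
pub fn fan_out(records_in: u64, records_out: u64) -> Option<f64> {
    if records_in == 0 {
        return None;
    }
    Some(records_out as f64 / records_in as f64)
}
```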

Reliability properties

  • Drop-guard timers sample durations even when a task is cancelled.
  • Panic isolation — a panicking connector surfaces as a Panic error kind rather than crashing the process.
  • Idempotent install — installing the recorder/subscriber twice warns rather than panics.
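The first two properties rest on standard Rust mechanisms. A guard's Drop runs when a cancelled future is dropped (and during unwinding), and std::panic::catch_unwind turns a panic into a value. The following is a minimal sketch of both. DropTimer and isolate are hypothetical names, not faucet's internals.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Records the elapsed time when dropped, so a sample is taken even if the
/// timed work is abandoned early.
pub struct DropTimer {
    start: Instant,
    samples: Arc<Mutex<Vec<Duration>>>,
}

impl DropTimer {
    pub fn start(samples: Arc<Mutex<Vec<Duration>>>) -> Self {
        Self { start: Instant::now(), samples }
    }
}

impl Drop for DropTimer {
    fn drop(&mut self) {
        if let Ok(mut s) = self.samples.lock() {
            s.push(self.start.elapsed());
        }
    }
}

/// Runs `f`, converting a panic into an error value instead of unwinding further.
pub fn isolate<T>(f: impl FnOnce() -> T) -> Result<T, String> {
    panic::catch_unwind(AssertUnwindSafe(f)).map_err(|payload| {
        payload
            .downcast_ref::<&str>()
            .map(|s| s.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_else(|| "unknown panic".to_string())
    })
}
```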

Cardinality rules

Never use high-cardinality values (record ids, URLs, query strings) as metric labels. parent_record_key in a DAG is a span attribute only. Connector authors must return a non-empty &'static str from connector_name().

Tracing

Spans carry run_id, pipeline, row, and per-operation timing. Point a tracing subscriber at your logging/trace backend; control verbosity with --log-level or FAUCET_LOG.

Full design: docs/superpowers/specs/2026-05-23-observability-otel-prometheus-design.md.

Performance tuning

faucet is built to be fast by default, but a few knobs let you trade memory for throughput on a given pipeline.

batch_size

The single most important knob. It bounds how many records are buffered and sets each sink’s natural write unit (multi-row INSERT, _bulk body, insertAll request, Redis pipeline, …).

  • Default: 1000. Max: 1,000,000.
  • Larger = fewer, bigger requests = more throughput, more memory per batch.
  • batch_size: 0 = “no batching”: the source emits the whole result set in one page and the sink writes it in one request. Use it for small lookup tables, or for sinks that prefer one large request (load-job-style ingestion).

Set it on the sink (authoritative) and/or source. Streaming keeps memory at O(batch_size) on both sides regardless of total volume.
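The page arithmetic is simple: N records at batch_size B produce ceil(N / B) sink writes, so 2,500 rows at the default 1000 become three writes of 1000, 1000, and 500. The hypothetical helper below shows only those page boundaries, including the batch_size: 0 special case. A streaming source never materializes the whole slice the way this sketch does.

```rust
/// Splits a result set into pages as `batch_size` describes:
/// `batch_size == 0` means one page holding everything.
pub fn pages<T>(records: &[T], batch_size: usize) -> Vec<&[T]> {
    if records.is_empty() {
        return Vec::new();
    }
    if batch_size == 0 {
        return vec![records]; // "no batching": one page, one write
    }
    // `chunks(0)` would panic, which is why 0 is handled above.
    records.chunks(batch_size).collect()
}
```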

Connection pooling

Database connectors use configurable pools — max_connections defaults to 10 for sources and 5 for sinks. Raise it for highly concurrent workloads; keep it under your database’s connection limit.

Concurrency

  • The REST source can process partitions concurrently (partition_concurrency).
  • S3/GCS sources and sinks read/write objects in parallel (buffer_unordered-style concurrency).
  • The HTTP sink sends per-record requests concurrently under a semaphore.
  • The Kafka sink sends batches through FuturesUnordered and retries when the producer queue is full (QueueFull).
  • At the pipeline level, execution.max_concurrent bounds how many matrix rows run at once.
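All of these bound concurrency the same way: at most K units of work in flight, whatever the total. The sketch below is thread-based and shows that idea in the spirit of execution.max_concurrent. A fixed pool of K workers pulls jobs from a shared queue, and the function reports the peak number of jobs that ran simultaneously. faucet itself is async and uses the mechanisms named above. run_bounded is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs every job with at most `max_concurrent` running at once and returns
/// the peak number of jobs observed running simultaneously.
pub fn run_bounded<F>(jobs: Vec<F>, max_concurrent: usize) -> usize
where
    F: FnOnce() + Send + 'static,
{
    let queue = Arc::new(Mutex::new(jobs.into_iter()));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..max_concurrent.max(1))
        .map(|_| {
            let (queue, running, peak) =
                (Arc::clone(&queue), Arc::clone(&running), Arc::clone(&peak));
            thread::spawn(move || loop {
                // The lock is released at the end of this statement, before the job runs.
                let job = match queue.lock().unwrap().next() {
                    Some(job) => job,
                    None => break,
                };
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                job();
                running.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for w in workers {
        w.join().expect("worker panicked");
    }
    peak.load(Ordering::SeqCst)
}
```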

Retries

HTTP-based sources retry with exponential backoff + jitter on retriable failures. The backoff is capped at 60s and its jitter is decorrelated across concurrent retries (so a fleet of matrix rows doesn’t re-align into a thundering herd). The REST source additionally honors 429 / Retry-After (delta-seconds or an RFC 7231 HTTP-date). Tune max_retries and retry_backoff per connector. A permanently throttled endpoint surfaces a RateLimited error rather than hanging.
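faucet's exact backoff formula is not spelled out here. As a sketch, the widely used "decorrelated jitter" scheme draws each delay between a base and three times the previous delay, then caps it (60s here, matching the cap above). The base value and the tiny xorshift RNG are assumptions that keep the example crate-free. The Retry-After helper handles only delta-seconds, and an HTTP-date needs a real date parser.

```rust
use std::time::Duration;

/// Tiny xorshift64 PRNG so the sketch needs no crates (not for cryptography).
pub struct XorShift(u64);

impl XorShift {
    pub fn new(seed: u64) -> Self {
        Self(seed.max(1)) // state must be non-zero
    }

    /// Value in `lo..=hi`.
    fn between(&mut self, lo: u64, hi: u64) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        lo + x % (hi - lo).saturating_add(1)
    }
}

/// Decorrelated jitter: next delay is random in [base, 3 * prev], then capped.
pub fn next_backoff(prev: Duration, base: Duration, cap: Duration, rng: &mut XorShift) -> Duration {
    let lo = base.as_millis() as u64;
    let hi = (prev.as_millis() as u64).saturating_mul(3).max(lo);
    Duration::from_millis(rng.between(lo, hi)).min(cap)
}

/// Parses a `Retry-After` given as delta-seconds; HTTP-dates return `None` here.
pub fn retry_after_secs(header: &str) -> Option<Duration> {
    header.trim().parse::<u64>().ok().map(Duration::from_secs)
}
```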

Choosing values

  • Many small rows, row-oriented sink (Postgres/MySQL): larger batch_size (5k–50k) for fat multi-row INSERTs.
  • Large objects (Parquet/S3): moderate batch_size; lean on parallel I/O.
  • Tiny lookup tables / COPY-style loads: batch_size: 0.
  • Memory-constrained host: smaller batch_size to cap per-batch footprint.

Measure with the metrics: faucet_sink_write_duration_seconds and faucet_source_page_duration_seconds tell you where time goes.

Benchmarks

faucet-core ships a criterion benchmark of the observability hot path, and CI guards it against a 5% regression on every PR. Run it locally with:

cargo bench -p faucet-core --bench observability

Numbers are hardware-dependent, so run the benchmark on your target machine rather than relying on published figures.

Troubleshooting & FAQ

My config won’t parse / validate

Run faucet validate <config> — it reports one line per expanded row. Common causes:

  • version missing or not 1 — the top-level version: 1 is required.
  • Old top-level source: / sink: — these must live under pipeline:. faucet rejects the pre-pipeline: shape with a hint.
  • Unknown connector type — run faucet list to see what’s compiled in; you may have a slim build without that feature.
  • InterpolationCycle — a ${vars.X} / template reference forms a loop.
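An InterpolationCycle is a loop in the "variable references variable" graph. The sketch below finds such a loop with a standard depth-first search that uses "visiting/done" marks. It is not faucet's implementation. find_cycle is a hypothetical helper that takes a map from each variable to the variables its value references and returns the loop as a path.

```rust
use std::collections::HashMap;

enum Mark {
    Visiting,
    Done,
}

fn visit<'a>(
    node: &'a str,
    refs: &HashMap<&'a str, Vec<&'a str>>,
    marks: &mut HashMap<&'a str, Mark>,
    path: &mut Vec<&'a str>,
) -> Option<Vec<String>> {
    match marks.get(node) {
        Some(Mark::Done) => return None,
        Some(Mark::Visiting) => {
            // Back edge: the cycle runs from `node`'s first occurrence on the path.
            let start = path.iter().position(|n| *n == node).unwrap_or(0);
            let mut cycle: Vec<String> = path[start..].iter().map(|s| s.to_string()).collect();
            cycle.push(node.to_string());
            return Some(cycle);
        }
        None => {}
    }
    marks.insert(node, Mark::Visiting);
    path.push(node);
    for &next in refs.get(node).map(|v| v.as_slice()).unwrap_or(&[]) {
        if let Some(cycle) = visit(next, refs, marks, path) {
            return Some(cycle);
        }
    }
    path.pop();
    marks.insert(node, Mark::Done);
    None
}

/// Returns a reference cycle such as `a -> b -> a`, or `None` if acyclic.
pub fn find_cycle<'a>(refs: &HashMap<&'a str, Vec<&'a str>>) -> Option<Vec<String>> {
    let mut marks = HashMap::new();
    let mut path = Vec::new();
    let mut names: Vec<&'a str> = refs.keys().copied().collect();
    names.sort(); // deterministic reporting order
    for name in names {
        if let Some(cycle) = visit(name, refs, &mut marks, &mut path) {
            return Some(cycle);
        }
    }
    None
}
```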

A ${env:VAR} isn’t being substituted

Load-time interpolation reads the environment and a sibling .env. If the value is empty, the var isn’t set (or --no-env-file disabled the .env). Use --env-file PATH to point at a specific file.

“feature not enabled” / connector missing

Your binary was built without that connector. Reinstall with the feature: cargo install faucet-cli --features "source-foo,sink-bar", or use the full build (the default cargo install faucet-cli).

docs.rs shows fewer APIs than I expected

It shouldn’t anymore — every crate is configured to build with all features. If you’re looking at an old version, check the latest release.

Kafka connector fails to build

The Kafka crates build librdkafka, which needs cmake and a C toolchain. Make sure those are installed in your build environment (CI installs libsasl2-dev libssl-dev libcurl4-openssl-dev cmake build-essential).

Postgres CDC retains a lot of WAL

A CDC replication slot retains WAL until a run advances the bookmark. If you created a permanent slot and stopped running the pipeline, Postgres keeps WAL forever. Either run the pipeline regularly, drop the slot (PostgresCdcSource::drop_slot() or SELECT pg_drop_replication_slot(...)), or use slot_type: temporary for experiments. See the CDC tutorial.

Some records failed but I don’t want the run to abort

Attach a dead-letter queue so failing rows are captured and the rest commit.
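Conceptually, a DLQ partitions a batch into committed records and dead letters instead of failing the whole run. The sketch below shows that partitioning. The per-record write and the (record, error) pairing are assumptions for illustration, since how faucet's sinks isolate a bad row inside a bulk request is not described here. write_with_dlq and BatchOutcome are hypothetical names.

```rust
/// Outcome of writing a batch with a dead-letter queue attached.
#[derive(Debug, PartialEq)]
pub struct BatchOutcome<T> {
    pub committed: Vec<T>,
    /// Failed records paired with the error that rejected them.
    pub dead_letters: Vec<(T, String)>,
}

/// Writes records one by one; failures go to the DLQ instead of aborting,
/// so the remaining records still commit.
pub fn write_with_dlq<T: Clone>(
    records: &[T],
    mut write_one: impl FnMut(&T) -> Result<(), String>,
) -> BatchOutcome<T> {
    let mut out = BatchOutcome { committed: Vec::new(), dead_letters: Vec::new() };
    for r in records {
        match write_one(r) {
            Ok(()) => out.committed.push(r.clone()),
            Err(e) => out.dead_letters.push((r.clone(), e)),
        }
    }
    out
}
```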

Run is slower / using more memory than expected

Tune batch_size and concurrency — see Performance tuning. Use the metrics to find the bottleneck.

Where do I report a bug or request a connector?

Open an issue at github.com/PawanSikawat/faucet-stream/issues.

Authoring a connector

faucet-stream is designed as an ecosystem: third parties can publish their own faucet-source-* / faucet-sink-* crates with minimal friction. faucet-core is the only required dependency — it re-exports everything a connector author needs (async_trait, serde_json, schemars).

The traits

Implement Source or Sink. Both are object-safe (Box<dyn Source> works) and all newer methods have defaults, so a minimal connector is small.

use faucet_core::{async_trait, Source, Sink, FaucetError, Value};

struct MySource { /* reusable client/pool created in new() */ }

#[async_trait]
impl Source for MySource {
    // Primary entry point. (`fetch_all()` is a provided convenience.)
    async fn fetch_with_context(&self) -> Result<Vec<Value>, FaucetError> {
        todo!("fetch records from your system")
    }
}

struct MySink { /* reusable client/pool */ }

#[async_trait]
impl Sink for MySink {
    async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
        todo!("write records to your system")
    }
}

Your connector now works with the Pipeline and every other connector: Pipeline::new(&MySource { .. }, &MySink { .. }).run().await?.

Crate layout

Follow the same module layout as the built-in connectors:

  • lib.rs — re-export the config + the Source/Sink type. First line: #![cfg_attr(docsrs, feature(doc_cfg))] (see below).
  • config.rs — the config struct + sub-enums, deriving Serialize + Deserialize + JsonSchema. No I/O here.
  • stream.rs (source) / sink.rs (sink) — the one place that performs I/O. Create reusable clients/pools in new() and store them; never reconnect per call.

Make it fast

Performance is the project’s first principle. Reuse clients and connections, pool database connections, use multi-row inserts and bulk APIs, and prefer parallel I/O. Where it makes sense, override stream_pages to stream natively from your source’s paging primitive so memory stays bounded.
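"Stream natively from the paging primitive" means holding one page at a time and requesting the next only when the consumer asks for it. The sketch below shows the idea with a synchronous Iterator standing in for an async stream. It does not reproduce stream_pages' real signature. Page, PageStream, and the cursor-paginated API are hypothetical.

```rust
/// A page returned by a hypothetical cursor-paginated API.
pub struct Page {
    pub rows: Vec<u32>,
    pub next_cursor: Option<u32>,
}

/// Lazily walks pages: one page in memory at a time, next request on demand.
pub struct PageStream<F: FnMut(Option<u32>) -> Page> {
    fetch: F,
    cursor: Option<u32>,
    done: bool,
}

impl<F: FnMut(Option<u32>) -> Page> PageStream<F> {
    pub fn new(fetch: F) -> Self {
        Self { fetch, cursor: None, done: false }
    }
}

impl<F: FnMut(Option<u32>) -> Page> Iterator for PageStream<F> {
    type Item = Vec<u32>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let page = (self.fetch)(self.cursor);
        self.cursor = page.next_cursor;
        self.done = page.next_cursor.is_none(); // no cursor: last page
        Some(page.rows)
    }
}
```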

Config schema introspection

Implement config_schema() so faucet schema and faucet init work:

fn config_schema(&self) -> Value {
    faucet_core::schema_for!(MyConfig).into()
}

Derive JsonSchema on the config struct and all sub-types, and add #[schemars(with = "String")] for any custom-serde fields.

Errors

Map every failure to a FaucetError variant. Third-party error types wrap into FaucetError::Custom(Box<dyn Error + Send + Sync>) without losing the chain. Never .unwrap() on anything that can fail at runtime.
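Keeping the chain intact comes down to boxing the third-party error and returning it from Error::source(). The sketch below shows that pattern. ConnectorError is a hypothetical stand-in modeled on the FaucetError::Custom shape above, because FaucetError's full variant list and Display output are not documented here.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in with a `Custom` variant that boxes any error.
#[derive(Debug)]
pub enum ConnectorError {
    Config(String),
    Custom(Box<dyn Error + Send + Sync>),
}

impl fmt::Display for ConnectorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectorError::Config(msg) => write!(f, "invalid config: {msg}"),
            ConnectorError::Custom(_) => write!(f, "connector error"),
        }
    }
}

impl Error for ConnectorError {
    // Exposing the boxed error as `source()` keeps the chain walkable.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ConnectorError::Custom(inner) => Some(&**inner),
            ConnectorError::Config(_) => None,
        }
    }
}

/// Parses a port, mapping the std error instead of calling `.unwrap()`.
pub fn parse_port(raw: &str) -> Result<u16, ConnectorError> {
    raw.parse::<u16>().map_err(|e| ConnectorError::Custom(Box::new(e)))
}

/// Collects every message in the error chain, outermost first.
pub fn chain(err: &dyn Error) -> Vec<String> {
    let mut out = vec![err.to_string()];
    let mut cur = err.source();
    while let Some(e) = cur {
        out.push(e.to_string());
        cur = e.source();
    }
    out
}
```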

docs.rs setup

So docs.rs renders your full API with per-feature badges, add to Cargo.toml:

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

and make the first line of lib.rs #![cfg_attr(docsrs, feature(doc_cfg))].

Naming & publishing

Name crates faucet-source-<name> / faucet-sink-<name>. If you ship both a source and a sink for the same system, put shared types (auth, formats) in a faucet-common-<name> crate that both depend on and re-export.

See any built-in connector (e.g. faucet-source-rest) for a reference implementation.