Embedding faucet as a Rust library
The faucet CLI is a thin wrapper over the same library you can use directly.
Embedding gives you typed configs, compile-time connector selection, and the
ability to build a Source or Sink from your own code.
Add the dependency
[dependencies]
faucet-stream = { version = "1.0", features = ["source-rest", "sink-bigquery"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
Build and run a pipeline
use faucet_stream::source::rest::{RestStream, RestStreamConfig, Auth, PaginationStyle};
use faucet_stream::sink::bigquery::{BigQuerySink, BigQuerySinkConfig};
use faucet_stream::Pipeline;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // REST source: base URL + path, with a bearer token read from the environment.
    let source = RestStream::new(RestStreamConfig {
        base_url: "https://api.example.com".into(),
        path: "/v1/events".into(),
        auth: Auth::Bearer { token: std::env::var("API_TOKEN")? },
        // Every other field keeps its default value.
        ..Default::default()
    })?;

    // Pass a BigQuerySinkConfig here; its fields are listed on docs.rs.
    let sink = BigQuerySink::new(/* BigQuerySinkConfig { .. } */).await?;

    // Run the pipeline to completion and report how many records were written.
    let result = Pipeline::new(&source, &sink).run().await?;
    println!("moved {} records", result.records_written);
    Ok(())
}
Exact field names and constructors are documented per crate on docs.rs (rendered with all features, so every connector’s API is visible). Treat the snippet above as the shape, not the literal field list.
Applying transforms
faucet_stream::TransformingSource is the library entry point for attaching
transforms to any source. It wraps a Box<dyn Source> and applies a flat list
of RecordTransform values to every record it emits, whether through the
fetch_* methods or stream_pages.
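To make the "flat list" semantics concrete, here is a std-only sketch of applying an ordered list of transforms to one page. It is not faucet's code. Record, Transform and apply_all are hypothetical names. Records are modelled as string maps rather than JSON values. Each transform is assumed to return zero or more records, so that a filter (zero out) and an explode (many out) fit the same shape as a 1:1 rewrite. faucet's own RecordTransform::custom closure returns a single record.

```rust
use std::collections::BTreeMap;

// Hypothetical record type: a flat map of string fields
// (faucet itself works with JSON values).
type Record = BTreeMap<String, String>;

// Hypothetical transform shape: one record in, zero or more records out.
type Transform = Box<dyn Fn(Record) -> Vec<Record>>;

/// Applies a flat list of transforms, in order, to every record of one page.
/// Each transform sees the output of the previous one.
fn apply_all(transforms: &[Transform], page: Vec<Record>) -> Vec<Record> {
    let mut current = page;
    for t in transforms {
        // flat_map lets a transform drop a record (empty Vec) or fan it out.
        current = current.into_iter().flat_map(|r| t(r)).collect();
    }
    current
}
```

Order matters here. A transform that lowercases keys, followed by one that keeps only records with an "id" key, will keep a record whose original key was "ID". Reversing the two would drop it.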
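use faucet_stream::{
    KeyCaseMode, Labels, RecordTransform, Source, TransformingSource,
};

// Runs inside a function that returns Result (for the trailing `?`);
// the custom transform below also needs serde_json as a dependency.
// `my_source` stands for any value that implements `Source`.
let inner: Box<dyn Source> = Box::new(my_source);
let source = TransformingSource::new(
    inner,
    vec![
        // Flatten nested objects, joining key paths with "__".
        RecordTransform::Flatten { separator: "__".into() },
        // Rewrite keys to snake_case.
        RecordTransform::KeysCase { mode: KeyCaseMode::Snake },
        // Arbitrary per-record logic: stamp an ingestion timestamp on objects.
        RecordTransform::custom(|mut record| {
            if let serde_json::Value::Object(ref mut map) = record {
                map.insert("_ingested_at".into(), serde_json::json!("2026-05-28T00:00:00Z"));
            }
            record
        }),
    ],
    Labels::for_named("my-source"),
)?;
// `source` is now a `Source` that streams the inner source's pages with
// transforms applied per page, so memory stays bounded by `batch_size` even
// on large result sets.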
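The per-page behavior described in the comment above works like an iterator adapter. It pulls one page from the inner source, runs the transform list over that page, and hands the result on. Here is a minimal std-only sketch, assuming pages arrive as an Iterator of Vec<Record>. TransformingPages and the Record/Transform aliases are hypothetical names, not faucet's TransformingSource.

```rust
use std::collections::BTreeMap;

// Hypothetical record and transform types (faucet works with JSON values).
type Record = BTreeMap<String, String>;
type Transform = Box<dyn Fn(Record) -> Vec<Record>>;

/// Hypothetical wrapper: transforms each page as it is pulled from the inner
/// page iterator, so only the current page is held in memory.
struct TransformingPages<I> {
    inner: I,
    transforms: Vec<Transform>,
}

impl<I: Iterator<Item = Vec<Record>>> Iterator for TransformingPages<I> {
    type Item = Vec<Record>;

    fn next(&mut self) -> Option<Self::Item> {
        // Pull exactly one page; `?` ends the stream when the inner one ends.
        let mut page = self.inner.next()?;
        // Run the flat transform list over this page only.
        for t in &self.transforms {
            page = page.into_iter().flat_map(|r| t(r)).collect();
        }
        Some(page)
    }
}
```

Because next() fetches lazily, the inner source is never read ahead. The wrapper holds at most one page, plus whatever extra records an explode-style transform produces for it.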
- Transforms compile eagerly inside new(): an invalid regex in RenameKeys
surfaces immediately as FaucetError::Transform, rather than when the first
record arrives.
- Labels::for_named(name) is the convenient constructor for library callers.
(The CLI builds its own Labels carrying the pipeline / row / run-id triple.)
- For every page, the wrapper emits faucet_transform_records_in_total,
faucet_transform_records_out_total, faucet_transform_duration_seconds, and
faucet_transform_errors_total through the standard observability stack. The
out/in ratio of the two record counters shows the fan-out of explode
transforms, and 1 - out/in shows the drop rate of filters.
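The ratio arithmetic from the metrics above fits in one small function. This is a sketch under one assumption: records_in and records_out are increases of faucet_transform_records_in_total and faucet_transform_records_out_total over the same time window, however your metrics backend computes them. The function name is hypothetical. When filters and explodes run in the same list, the ratio is the net effect of both.

```rust
/// Summarises one transform stage from its record counters.
/// Returns (out/in ratio, filter drop rate), or None if no records came in.
fn transform_ratios(records_in: u64, records_out: u64) -> Option<(f64, f64)> {
    if records_in == 0 {
        return None; // no traffic in the window: the ratio is undefined
    }
    let ratio = records_out as f64 / records_in as f64;
    // ratio > 1 means net fan-out (explode); ratio < 1 means records were dropped.
    let drop_rate = (1.0 - ratio).max(0.0);
    Some((ratio, drop_rate))
}
```

For example, 1000 records in and 750 out gives a ratio of 0.75 and a drop rate of 0.25. 100 in and 300 out gives a fan-out of 3.0 and a drop rate of 0.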
For configuration-driven users (the faucet binary), transforms are declared
in YAML — see the transforms cookbook for the
three-layer model and per-layer opt-out.
Durable state and streaming
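Wire in a state store to make runs resumable. The snippet below shows only the state store; when you want to control batching explicitly, use the streaming entry point (see the crate docs on docs.rs for its exact signature).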
use std::sync::Arc;
use faucet_stream::{Pipeline, FileStateStore};

// Runs inside an async function that returns Result.
// File-backed state store rooted at ./state.
let state = Arc::new(FileStateStore::new("./state")?);
let result = Pipeline::new(&source, &sink)
    .with_state_store(state)
    .run()
    .await?;
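The pipeline reads the bookmark before fetching and persists a new one only after the sink confirms each page. A crash therefore can never move the bookmark past data the sink has not written. On restart, the run resumes after the last confirmed page, and any page that was fetched but not confirmed is fetched again.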
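The bookmark-after-confirm ordering can be modelled in a few lines. This is a std-only sketch and not faucet's implementation. StateStore and Sink are hypothetical in-memory stand-ins for FileStateStore and a real sink, and a failed write simulates a crash. The sketch does not model a crash that lands between a confirmed write and the bookmark update. In that case the page would be replayed on restart.

```rust
/// Hypothetical in-memory state store: index of the last confirmed page.
struct StateStore {
    bookmark: Option<usize>,
}

/// Hypothetical sink that can be told to fail on one page.
struct Sink {
    written: Vec<u32>,
    fail_on_page: Option<usize>,
}

impl Sink {
    /// Writes a page; an Err simulates a crash before confirmation.
    fn write(&mut self, idx: usize, page: &[u32]) -> Result<(), String> {
        if self.fail_on_page == Some(idx) {
            return Err(format!("crash on page {idx}"));
        }
        self.written.extend_from_slice(page);
        Ok(())
    }
}

/// Resumes after the stored bookmark; the bookmark only advances once the
/// sink has confirmed the page.
fn run(pages: &[Vec<u32>], state: &mut StateStore, sink: &mut Sink) -> Result<(), String> {
    let start = state.bookmark.map_or(0, |b| b + 1);
    for (idx, page) in pages.iter().enumerate().skip(start) {
        sink.write(idx, page)?; // a failure here leaves the bookmark untouched
        state.bookmark = Some(idx); // persisted only after confirmation
    }
    Ok(())
}
```

Suppose the first run fails on page 1. The bookmark still points at page 0, so the second run starts at page 1 and no record is skipped.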
Why embed instead of shelling out to the CLI?
- Typed configs: config structs implement serde + JsonSchema, so you get compile-time checking and can generate UIs/forms from the schema.
- Custom connectors: implement the Source/Sink traits for systems we don't ship, and run them through the same Pipeline. See authoring a connector.
- One process: no subprocess, no temp config files; integrate pipelines into an existing service, job runner, or test harness.
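The point of the custom-connector item is that a generic driver can move data between any source and any sink. The sketch below shows that shape with deliberately simplified, synchronous, hypothetical traits. faucet's real Source and Sink traits are async and richer, so check docs.rs and the connector-authoring guide before implementing them. LineSource, VecSink and run_pipeline are illustrative names only.

```rust
/// Hypothetical, simplified connector traits (not faucet's real ones).
trait Source {
    /// Returns the next page of records, or None when the source is exhausted.
    fn next_page(&mut self) -> Option<Vec<String>>;
}

trait Sink {
    /// Writes one page and returns how many records were written.
    fn write_page(&mut self, page: Vec<String>) -> usize;
}

/// A custom source for a system with no shipped connector: serves
/// in-memory lines in pages of `page_size`.
struct LineSource {
    lines: Vec<String>,
    page_size: usize,
}

impl Source for LineSource {
    fn next_page(&mut self) -> Option<Vec<String>> {
        if self.lines.is_empty() {
            return None;
        }
        // max(1) guards against an endless loop of empty pages when page_size is 0.
        let n = self.page_size.max(1).min(self.lines.len());
        Some(self.lines.drain(..n).collect())
    }
}

/// A sink that collects records in memory (handy in a test harness).
struct VecSink {
    out: Vec<String>,
}

impl Sink for VecSink {
    fn write_page(&mut self, page: Vec<String>) -> usize {
        let n = page.len();
        self.out.extend(page);
        n
    }
}

/// Generic driver: any Source can feed any Sink, page by page.
fn run_pipeline(source: &mut dyn Source, sink: &mut dyn Sink) -> usize {
    let mut written = 0;
    while let Some(page) = source.next_page() {
        written += sink.write_page(page);
    }
    written
}
```

Using trait objects in run_pipeline keeps the driver independent of any concrete connector. That separation is what lets a hand-written connector run through the same code path as the shipped ones.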