Skip to content

Subscriptions

channel.subscribe(pattern, callback) registers a callback for messages matching the pattern:

import { getChannel } from "postal";

const ch = getChannel("orders");

const unsub = ch.subscribe("item.*", envelope => {
    console.log(envelope.topic, envelope.payload);
});

The callback receives the full Envelope, which includes id, type, channel, topic, payload, and timestamp.

subscribe returns an unsubscribe function. Call it to remove the subscription:

const unsub = ch.subscribe("item.*", callback);

// Later...
unsub();

Calling the unsubscribe function multiple times is safe — subsequent calls are no-ops.

Multiple subscribers can match the same topic. All of them receive the envelope:

ch.subscribe("order.placed", logToConsole);
ch.subscribe("order.placed", updateDashboard);
ch.subscribe("order.*", auditAllOrders);

// All three fire when "order.placed" is published
ch.publish("order.placed", { id: "abc" });

If a subscriber throws, the remaining subscribers still execute. Errors are collected and re-thrown as a single AggregateError after all subscribers have run:

ch.subscribe("order.placed", () => {
    throw new Error("boom");
});
ch.subscribe("order.placed", env => {
    // This still runs
    console.log("Got order:", env.payload);
});

try {
    ch.publish("order.placed", { id: "abc" });
} catch (err) {
    // AggregateError with the "boom" error
}

channel.publish(topic, payload) creates an envelope and dispatches it to all matching subscribers:

ch.publish("order.placed", { sku: "WIDGET-42", qty: 3 });

Topics are exact strings — no wildcards in publish. Wildcards are only for subscription patterns.

postal includes built-in RPC via request() and handle().

const unhandle = ch.handle("quote.request", envelope => {
    const { sku, qty } = envelope.payload;
    return { total: qty * 9.99, currency: "USD" };
});

Only one handler can be registered per topic per channel. Attempting to register a second handler for the same topic throws immediately.

Handlers can be synchronous or async:

ch.handle("user.lookup", async envelope => {
    const user = await db.findUser(envelope.payload.id);
    return { name: user.name, email: user.email };
});
const quote = await ch.request("quote.request", { sku: "WIDGET-42", qty: 5 });
// quote is { total: 49.95, currency: "USD" }

request() returns a Promise that resolves with the handler’s return value.

Requests time out after 5 seconds by default. Customize with the timeout option:

const result = await ch.request("slow.operation", payload, {
    timeout: 15000, // 15 seconds
});

If no response arrives in time, the Promise rejects with PostalTimeoutError:

try {
    await ch.request("quote.request", data, { timeout: 1000 });
} catch (err) {
    if (err instanceof PostalTimeoutError) {
        console.log(err.channel, err.topic, err.timeout);
    }
}

If a handler throws, the error is caught and relayed to the requester as PostalRpcError:

ch.handle("risky.operation", () => {
    throw new Error("something went wrong");
});

try {
    await ch.request("risky.operation", {});
} catch (err) {
    // err is PostalRpcError with message "something went wrong"
}

Handlers can throw errors with a code property for machine-readable classification:

const err = new Error("Not found");
(err as any).code = "NOT_FOUND";
throw err;
// Requester receives PostalRpcError with .code === "NOT_FOUND"

Request envelopes flow through the normal subscriber list too — any subscriber matching the topic will see it. This is useful for logging or auditing RPC traffic:

ch.subscribe("#", env => {
    if (env.type === "request") {
        console.log("RPC request:", env.topic);
    }
});
  • Wire Taps — global observers across all channels
  • Concepts — channels, topics, wildcards, and envelopes