@businys/ops
Production MCP middleware. Auth, Observer Mode, stdio Bridge, OpenTelemetry, Agent Lineage, rate limiting, metering, audit logging, and agent reputation — extracted from the production stack at businys.app. MIT licensed. Zero external dependencies.
Installation
npm install @businys/ops
# or
pnpm add @businys/ops

Auth middleware
Validate API keys before any tool call reaches your handlers. Supports static key maps, per-agent scoping, wildcard scopes, and an async validator for database-backed keys.
import { createAuthMiddleware } from "@businys/ops"
const auth = createAuthMiddleware({
  keys: {
    "sk-prod-abc123": {
      agentId: "agent-prod", // override ctx.agentId from key
      scopes: ["*"], // allow all tools
    },
    "sk-readonly-xyz": {
      scopes: ["get_*", "list_clients"], // prefix wildcard + exact
    },
  },
  // optional: async lookup for DB-backed keys
  validate: async (token) => {
    const key = await db.apiKeys.findUnique({ where: { key: token } })
    if (!key) return null
    return { agentId: key.agentId, scopes: key.scopes }
  },
})

A missing or unrecognised token returns 401. A valid token calling an out-of-scope tool returns 403. Scope rules: * allows all tools, get_* matches any tool starting with get_, and exact names match literally.
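The scope rules above can be sketched as a small matcher. This is illustrative only; the library's actual implementation may differ:

```typescript
// Illustrative sketch of the documented scope rules — not the
// library's actual implementation.
function scopeAllows(scopes: string[], toolName: string): boolean {
  return scopes.some((scope) => {
    if (scope === "*") return true // allow all tools
    if (scope.endsWith("*")) return toolName.startsWith(scope.slice(0, -1)) // prefix wildcard
    return scope === toolName // exact match
  })
}

console.log(scopeAllows(["get_*", "list_clients"], "get_invoices")) // true
console.log(scopeAllows(["get_*", "list_clients"], "delete_client")) // false
```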
The Bridge extracts the bearer token from the Authorization header and populates ctx.bearerToken automatically, so auth middleware works without any wiring.
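For reference, extraction from a standard `Authorization: Bearer <token>` header looks roughly like this. It is a sketch of the behavior described above, not the Bridge's actual code:

```typescript
// Sketch of bearer-token extraction from an Authorization header
// (assumed behavior, based on the description above).
function extractBearerToken(headers: Record<string, string | undefined>): string | undefined {
  const raw = headers["authorization"] ?? headers["Authorization"]
  if (!raw || !raw.startsWith("Bearer ")) return undefined
  return raw.slice("Bearer ".length)
}

console.log(extractBearerToken({ Authorization: "Bearer sk-prod-abc123" })) // "sk-prod-abc123"
console.log(extractBearerToken({})) // undefined
```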
Observer Mode
The fastest way to see what your MCP server is doing. One import, no account, no config. A local dashboard opens at localhost:3100 and streams every tool call in real time.
import { observe } from "@businys/ops"
const ops = await observe()
// → Dashboard running at http://localhost:3100
// Add ops.middleware to your MCP server pipeline
// Every tool call is now visible in the dashboard

Options
const ops = await observe({
  port: 3200, // default: 3100
  hostname: "0.0.0.0", // default: "localhost"
  maxCalls: 500, // default: 200 (ring buffer)
  token: "secret", // gates dashboard + API with Bearer auth
  storage: myAdapter, // plug in FileAdapter or PostgresAdapter
})

console.log(ops.url) // "http://0.0.0.0:3200"

// Programmatic access
const calls = await ops.storage.getCalls({ limit: 20 })
const stats = await ops.storage.getStats()

// Shut down
await ops.close()

Dashboard API
The Observer dashboard exposes a small HTTP API you can query directly:
GET /api/calls           # paginated call history
GET /api/stats           # aggregated stats
GET /api/reputation/:id  # agent reputation record
GET /events              # SSE stream of new calls

CLI
The businys-ops CLI wraps Observer Mode and the stdio Bridge as standalone processes.
# Scaffold a config file in the current directory
npx @businys/ops init
# Observer dashboard (in-memory, ephemeral)
npx @businys/ops observe
npx @businys/ops observe --port 3200 --token mysecret
# Dev mode: file-backed storage + optional bridge, one command
npx @businys/ops dev # dashboard only
npx @businys/ops dev node ./my-server.js # dashboard + bridge
npx @businys/ops dev --port 3200 --token sk # with auth
# Ctrl+C cleanly closes bridge and flushes storage to disk
# Check stats from a running instance
npx @businys/ops status
# Wrap any stdio MCP server as managed HTTP
npx @businys/ops bridge node ./my-server.js
npx @businys/ops bridge python server.py --port 3200
npx @businys/ops bridge npx some-mcp-server --name my-tools

stdio Bridge
Wrap any stdio MCP server as a managed Streamable HTTP endpoint with the full middleware pipeline. Works with any server — Node, Python, npx.
import { createBridge } from "@businys/ops"
const bridge = await createBridge(["node", "my-server.js"], {
  port: 3100,
  // proxy: createMCPProxy({ rateLimit: { globalMax: 100 } }),
})

console.log(bridge.url) // http://localhost:3100
// GET /health → { status: "ok", serverInfo: { ... } }
// POST /      → JSON-RPC with full middleware pipeline

await bridge.close()

OpenTelemetry
Pass any OTel-compatible Tracer — zero new dependencies. The middleware uses structural typing so it works with any OTel SDK version.
import { createMCPProxy } from "@businys/ops"
import { trace } from "@opentelemetry/api"
const proxy = createMCPProxy({
  telemetry: {
    tracer: trace.getTracer("my-mcp-server"),
    recordInput: true, // attach input JSON as span attribute
    inputMaxLength: 500, // truncate long inputs
    attributePrefix: "mcp", // default
  },
})

Span name format: serverName/toolName. Standard attributes include mcp.tool.name, mcp.agent.id, mcp.duration_ms, mcp.is_error, and mcp.tool.destructive.
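Because only the structural shape matters, a hand-rolled tracer stub is enough for local debugging. Which methods the middleware actually calls is an assumption here (startActiveSpan is the usual OTel entry point), so treat this as a sketch:

```typescript
// A minimal tracer stub satisfying a structural Tracer shape.
// Assumption: the middleware drives spans via startActiveSpan and the
// usual span methods (setAttribute / recordException / setStatus / end).
const spans: Array<{ name: string; attrs: Record<string, unknown> }> = []

const debugTracer = {
  startActiveSpan<T>(name: string, fn: (span: any) => T): T {
    const record = { name, attrs: {} as Record<string, unknown> }
    spans.push(record)
    const span = {
      setAttribute: (key: string, value: unknown) => { record.attrs[key] = value },
      recordException: (_err: unknown) => {},
      setStatus: (_status: unknown) => {},
      end: () => {},
    }
    return fn(span)
  },
}

debugTracer.startActiveSpan("my-server/get_clients", (span) => {
  span.setAttribute("mcp.tool.name", "get_clients")
  span.end()
})
console.log(spans[0]) // { name: "my-server/get_clients", attrs: { "mcp.tool.name": "get_clients" } }
```

Pass a stub like this as `telemetry.tracer` to inspect span names and attributes without wiring up an OTel SDK.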
Agent Lineage
A causal DAG that traces every tool call back to the originating human intent — through every agent delegation, with cryptographic integrity. Built for the EU AI Act, SOC 2 auditors, and multi-agent workflows.
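As a toy illustration of what hash chaining buys you (tampering with any ancestor invalidates every descendant), here is a minimal sketch. The node format and hashing scheme are invented for the example; they are not the library's actual scheme:

```typescript
import { createHash } from "node:crypto"

// Toy hash-chained lineage, invented for illustration. Each node's hash
// covers its payload plus the parent's hash, so editing any ancestor
// breaks verification for every descendant.
interface ToyNode { id: string; parentHash: string; payload: string; hash: string }

function makeNode(id: string, payload: string, parent?: ToyNode): ToyNode {
  const parentHash = parent?.hash ?? ""
  const hash = createHash("sha256").update(parentHash + id + payload).digest("hex")
  return { id, parentHash, payload, hash }
}

function verifyChain(chain: ToyNode[]): boolean {
  let prev = ""
  for (const node of chain) {
    const expected = createHash("sha256").update(prev + node.id + node.payload).digest("hex")
    if (node.parentHash !== prev || node.hash !== expected) return false
    prev = node.hash
  }
  return true
}

const root = makeNode("human-1", "refund the client")
const child = makeNode("agent-1", "create_credit_note", root)
console.log(verifyChain([root, child])) // true
child.payload = "delete_client" // tamper with a recorded call
console.log(verifyChain([root, child])) // false
```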
import { createMCPProxy, MemoryLineageStore, verifyLineage } from "@businys/ops"
const store = new MemoryLineageStore()
const proxy = createMCPProxy({
  lineage: { store },
})
// After calls complete, verify the chain is intact
const result = await verifyLineage(rootId, store)
console.log(result.valid) // true iff all hashes check out
console.log(result.maxDepth) // delegation depth reached
console.log(result.errors) // [] if valid

Header propagation
When using the Bridge, lineage context flows automatically via HTTP headers. Downstream agents receive X-Lineage-Id in the response and can pass it forward as X-Lineage-Parent to chain delegations.
// Inbound (set by calling agent)
X-Lineage-Root: <rootId>     // originating human prompt
X-Lineage-Parent: <parentId> // immediate parent node
X-Lineage-Depth: <number>    // delegation depth (0 = human)

// Outbound (returned by bridge)
X-Lineage-Id: <nodeId>       // id of the node created for this call

Custom LineageStore
Swap MemoryLineageStore for any persistent backend by implementing the LineageStore interface:
import type { LineageStore, LineageNode } from "@businys/ops"
const myStore: LineageStore = {
  async recordNode(node: LineageNode) { /* persist to DB */ },
  async getChain(rootId: string) { /* return nodes sorted by timestamp */ },
  async getNode(id: string) { /* lookup single node */ },
}

createMCPProxy
The batteries-included factory. Pipeline order: lineage → telemetry → reputation → rate limit → confirmation → metering → audit → custom.
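Pipelines like this typically compose as an onion: the first layer runs first on the way in and last on the way out. A simplified synchronous sketch of that composition (illustrative only, not the library's internals, which are async):

```typescript
// Simplified synchronous middleware composition (illustrative only).
type Ctx = { toolName: string; log: string[] }
type Mw = (ctx: Ctx, next: () => string) => string

function compose(layers: Mw[], handler: (ctx: Ctx) => string) {
  return (ctx: Ctx): string =>
    layers.reduceRight<() => string>(
      (next, mw) => () => mw(ctx, next), // each layer wraps the next
      () => handler(ctx),
    )()
}

const logNamed = (name: string): Mw => (ctx, next) => {
  ctx.log.push(`enter ${name}`)
  const out = next()
  ctx.log.push(`exit ${name}`)
  return out
}

const ctx: Ctx = { toolName: "get_clients", log: [] }
const out = compose([logNamed("auth"), logNamed("rateLimit")], (c) => `ran ${c.toolName}`)(ctx)
console.log(out) // "ran get_clients"
console.log(ctx.log) // ["enter auth", "enter rateLimit", "exit rateLimit", "exit auth"]
```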
import { createMCPProxy, MemoryLineageStore } from "@businys/ops"
const proxy = createMCPProxy({
  rateLimit: {
    globalMax: 1000, // calls per window across all agents
    groupMax: 100,   // calls per window per agent
    windowMs: 3600000, // 1 hour
  },
  lineage: { store: new MemoryLineageStore() },
  // telemetry: { tracer },
  auditLog: (entry) => console.log(entry), // default: stderr
  disable: {
    confirmation: true, // disable specific layers
  },
  middleware: [myCustomMiddleware], // append custom layers
})

// proxy.middleware — array of assembled middleware
// proxy.storage — the MemoryAdapter
// proxy.run(ctx, handler) — run pipeline directly

Writing middleware
import type { Middleware } from "@businys/ops"
const myMiddleware: Middleware = {
  name: "my-middleware",
  async execute(ctx, next) {
    console.log("Before:", ctx.toolName)
    const result = await next()
    console.log("After:", result.isError ? "error" : "ok")
    return result
  },
}

MiddlewareContext
interface MiddlewareContext {
  toolName: string
  toolGroup: string
  toolTier: "core" | "craft" | "kit"
  method: string
  path: string
  input: Record<string, unknown>
  agentId: string
  serverName: string
  startedAt: number // Date.now()
  destructive: boolean // true = confirmation required
  bearerToken?: string // raw token extracted by Bridge
}

FileAdapter
Persist call history and reputation to a local JSON file with zero dependencies. State survives process restarts. Uses a debounced flush (default 2s) and a synchronous flush on process.exit so no data is lost.
import { FileAdapter, observe } from "@businys/ops"
const storage = new FileAdapter({
  path: "./mcp-ops.json",
  maxCalls: 1000, // ring buffer size (default: 200)
  flushDebounceMs: 2000, // default: 2000
})

const ops = await observe({ storage })

// Flush immediately and release file handle
await storage.close()

PostgresAdapter
Durable PostgreSQL storage for production and multi-instance deployments. Schema is created automatically on first connect(). Requires the pg optional peer dependency.
# npm install pg

import { PostgresAdapter, observe } from "@businys/ops"

const storage = new PostgresAdapter({
  connectionString: process.env.DATABASE_URL,
  // pool: existingPgPool, // or pass a pre-configured pool
  maxCalls: 10000,
})

await storage.connect() // creates schema, must call before use
const ops = await observe({ storage })
await storage.close() // releases pool

Schema: ops_calls, ops_reputations, ops_loop_buffers. All DDL uses CREATE TABLE IF NOT EXISTS — safe to call connect() multiple times. Reputation updates are atomic UPSERTs with no read-modify-write race.
MCP Revenue
Per-call billing instrumentation for MCP servers. Credit wallets, configurable pricing, budget enforcement, and a full ledger — without writing billing infrastructure from scratch.
import { createRevenueMiddleware, MemoryAdapter } from "@businys/ops"
const storage = new MemoryAdapter()

// Top up an agent's wallet
await storage.topUpCredits("agent-123", 100, "Initial grant")

const revenue = createRevenueMiddleware(storage, {
  pricing: {
    default: { perCall: 1 }, // 1 credit for most tools
    overrides: {
      create_invoice: { perCall: 10 }, // premium actions cost more
      get_clients: { perCall: 0 }, // free reads
    },
  },
  enforceBudget: true, // default: true — blocks calls when balance < cost
})

When enforceBudget is true (the default), calls are blocked with a 402 status and an insufficient_credits error message before reaching your handler. Credits are only deducted after a successful call — error responses are never charged.
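Pricing resolution presumably follows the obvious rule: an exact tool-name override wins, otherwise the default applies. A sketch of that lookup (assumed semantics, not the library's code):

```typescript
// Assumed pricing-resolution semantics: override by exact tool name,
// else fall back to the default per-call price.
type Pricing = {
  default: { perCall: number }
  overrides?: Record<string, { perCall: number }>
}

function costOf(pricing: Pricing, toolName: string): number {
  return pricing.overrides?.[toolName]?.perCall ?? pricing.default.perCall
}

const pricing: Pricing = {
  default: { perCall: 1 },
  overrides: { create_invoice: { perCall: 10 }, get_clients: { perCall: 0 } },
}

console.log(costOf(pricing, "create_invoice")) // 10
console.log(costOf(pricing, "get_clients")) // 0 (?? keeps explicit zero prices)
console.log(costOf(pricing, "list_tasks")) // 1
```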
// Check an agent's wallet
const wallet = await storage.getWallet("agent-123")
// {
// agentId: "agent-123",
// balance: 89,
// totalSpent: 11,
// totalCharged: 100,
// createdAt: 1700000000000,
// updatedAt: 1700000001234,
// }
// Full ledger (most recent first)
const ledger = await storage.getLedger("agent-123", { limit: 50 })
// [{ type: "charge", amount: 10, toolName: "create_invoice", ... }, ...]

Wiring into createMCPProxy
import { createMCPProxy } from "@businys/ops"
const proxy = createMCPProxy({
  revenue: {
    pricing: { default: { perCall: 1 } },
    enforceBudget: true,
  },
  // Revenue runs after confirmation — destructive unconfirmed calls are never charged
})

Record-only mode
Set enforceBudget: false to track spend without blocking calls. Wallets can go negative — useful for post-hoc billing or metered plans where you invoice at the end of a period.
Custom storage
The default MemoryAdapter is a ring buffer. For other backends, implement StorageAdapter:
import type { StorageAdapter } from "@businys/ops"
const myAdapter: StorageAdapter = {
  async recordCall(record) { /* persist to DB */ },
  async getCalls(opts) { /* paginated query */ },
  async getStats() { /* aggregations */ },
  async getReputation(agentId) { /* fetch rep record */ },
  async updateReputation(agentId, signal) { /* update score */ },
  async checkLoop(agentId, hash) { /* ring buffer check */ },
  subscribe(listener) { /* SSE push, return unsub fn */ },

  // Wallet (required for MCP Revenue)
  async getWallet(agentId) { /* fetch wallet */ },
  async deductCredits(agentId, amount, meta) { /* atomic deduct */ },
  async topUpCredits(agentId, amount, description) { /* credit wallet */ },
  async getLedger(agentId, opts) { /* paginated ledger */ },
}

HostedAdapter
Stream call records, reputation events, and stats to the hosted dashboard at businys.dev. Persistent storage, anomaly detection, compliance export, and team access — without running your own database.
Get an API key by creating a project at businys.dev, or use the CLI:
# Link your server to the hosted dashboard in one step
npx @businys/ops deploy --key YOUR_API_KEY

Or wire it manually:
import { HostedAdapter, observe } from "@businys/ops"
const storage = new HostedAdapter({
  apiKey: process.env.BDEV_API_KEY, // from dashboard → Settings → API Keys
  projectId: process.env.BDEV_PROJECT_ID,
  // flushIntervalMs: 5000, // default: 5000
  // maxBatchSize: 100, // default: 100
})

const ops = await observe({ storage })
server.use(ops.middleware)
// Every tool call now appears in the hosted dashboard in real time

Environment variables
BDEV_API_KEY=bdev_live_...
BDEV_PROJECT_ID=...      # UUID from project settings

The adapter batches records and flushes on an interval (default 5s) and on process exit. Calls are buffered locally if the ingest endpoint is unreachable — no data is lost during brief network interruptions.
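The buffer-and-flush behavior can be pictured like this. The sketch uses a synchronous send callback for brevity (the real adapter sends over HTTP asynchronously) and invents its own names; it is not the HostedAdapter's internals:

```typescript
// Illustrative buffer-and-flush sketch. send() returns false on network
// failure; failed batches stay buffered and are retried on the next flush.
class BatchBuffer<T> {
  private buf: T[] = []
  constructor(private send: (batch: T[]) => boolean, private maxBatchSize = 100) {}
  add(item: T) { this.buf.push(item) }
  pending(): number { return this.buf.length }
  flush() {
    while (this.buf.length > 0) {
      const batch = this.buf.slice(0, this.maxBatchSize)
      if (!this.send(batch)) return // endpoint unreachable: keep buffered, retry later
      this.buf.splice(0, batch.length) // drop only after a successful send
    }
  }
}

let online = false
const sent: number[][] = []
const buf = new BatchBuffer<number>((b) => online && (sent.push([...b]), true), 2)
buf.add(1); buf.add(2); buf.add(3)
buf.flush() // offline: nothing sent, all 3 records still buffered
online = true
buf.flush() // online: drains in batches of at most 2
console.log(sent) // [[1, 2], [3]]
console.log(buf.pending()) // 0
```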