Documentation

@businys/ops

Production MCP middleware. Auth, Observer Mode, stdio Bridge, OpenTelemetry, Agent Lineage, rate limiting, metering, audit logging, and agent reputation — extracted from the production stack at businys.app. MIT licensed. Zero external dependencies.

Installation

npm install @businys/ops
# or
pnpm add @businys/ops

Auth middleware

Validate API keys before any tool call reaches your handlers. Supports static key maps, per-agent scoping, wildcard scopes, and an async validator for database-backed keys.

import { createAuthMiddleware } from "@businys/ops"

const auth = createAuthMiddleware({
  keys: {
    "sk-prod-abc123": {
      agentId: "agent-prod",   // override ctx.agentId from key
      scopes: ["*"],           // allow all tools
    },
    "sk-readonly-xyz": {
      scopes: ["get_*", "list_clients"], // prefix wildcard + exact
    },
  },
  // optional: async lookup for DB-backed keys (db is your own database client)
  validate: async (token) => {
    const key = await db.apiKeys.findUnique({ where: { key: token } })
    if (!key) return null
    return { agentId: key.agentId, scopes: key.scopes }
  },
})

A missing or unrecognised token returns 401. A valid token calling an out-of-scope tool returns 403. Scope rules: * allows all tools, get_* matches any tool starting with get_, exact names match literally.
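
The scope rules above can be sketched as a small matcher. This is an illustration only: `scopeAllows` and `authorize` are hypothetical names, not exports of @businys/ops, and the library's real matching logic may differ (for example in how it treats a `*` that is not at the end of a scope).

```typescript
// Hypothetical helper: true if any scope grants access to the tool.
function scopeAllows(scopes: string[], toolName: string): boolean {
  return scopes.some((scope) => {
    if (scope === "*") return true; // "*" allows every tool
    if (scope.endsWith("*")) {
      // Prefix wildcard: "get_*" matches any tool starting with "get_"
      return toolName.startsWith(scope.slice(0, -1));
    }
    return scope === toolName; // exact names match literally
  });
}

// Hypothetical helper: maps the documented outcomes onto status codes.
// `scopes === null` stands for a missing or unrecognised token.
function authorize(scopes: string[] | null, toolName: string): 200 | 401 | 403 {
  if (scopes === null) return 401;
  return scopeAllows(scopes, toolName) ? 200 : 403;
}
```

With the read-only key from the example, `authorize(["get_*", "list_clients"], "get_invoice")` would be allowed under these assumptions, while `create_invoice` would be rejected with 403.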

The Bridge extracts the bearer token from the Authorization header and populates ctx.bearerToken automatically, so auth middleware works without any wiring.

Observer Mode

The fastest way to see what your MCP server is doing. One import, no account, no config. A local dashboard opens at localhost:3100 and streams every tool call in real time.

import { observe } from "@businys/ops"

const ops = await observe()
// → Dashboard running at http://localhost:3100

// Add ops.middleware to your MCP server pipeline
// Every tool call is now visible in the dashboard

Options

const ops = await observe({
  port: 3200,              // default: 3100
  hostname: "0.0.0.0",    // default: "localhost"
  maxCalls: 500,           // default: 200 (ring buffer)
  token: "secret",         // gates dashboard + API with Bearer auth
  storage: myAdapter,      // plug in FileAdapter or PostgresAdapter
})

console.log(ops.url)   // "http://0.0.0.0:3200"

// Programmatic access
const calls = await ops.storage.getCalls({ limit: 20 })
const stats = await ops.storage.getStats()

// Shut down
await ops.close()
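
To make the `maxCalls` ring buffer concrete, here is a minimal sketch of the idea: once the buffer is full, each new record evicts the oldest one. `RingBuffer` is a hypothetical class written for this page, and the most-recent-first ordering of `latest` is an assumption, not documented behaviour of `getCalls`.

```typescript
// Hypothetical fixed-capacity buffer: keeps only the newest `capacity` items.
class RingBuffer<T> {
  private readonly items: T[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Append an item, dropping the oldest one when over capacity.
  push(item: T): void {
    this.items.push(item);
    if (this.items.length > this.capacity) this.items.shift();
  }

  // Return up to `limit` items, newest first (assumed ordering).
  latest(limit: number): T[] {
    if (limit <= 0) return [];
    return this.items.slice(-limit).reverse();
  }

  get size(): number {
    return this.items.length;
  }
}
```

The practical consequence is that with the default in-memory storage, history older than the last `maxCalls` records is gone; a persistent adapter is the way to keep more.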

Dashboard API

The Observer dashboard exposes a small HTTP API you can query directly:

GET /api/calls          # paginated call history
GET /api/stats          # aggregated stats
GET /api/reputation/:id # agent reputation record
GET /events             # SSE stream of new calls
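
A client of this API needs two small pieces: the Bearer header (when `token` is set) and a way to read the SSE stream. The sketch below covers both under stated assumptions: `authHeaders` and `parseSseEvents` are hypothetical helpers, the header form `Authorization: Bearer <token>` is inferred from the `token` option above, and the event payload format is not specified here, so payloads are returned as raw strings.

```typescript
// Hypothetical helper: headers for a dashboard started with `token`.
function authHeaders(token?: string): Record<string, string> {
  return token ? { Authorization: `Bearer ${token}` } : {};
}

// Hypothetical helper: extract `data:` payloads from SSE text.
// Assumes `text` contains only complete events (blank-line terminated).
function parseSseEvents(text: string): string[] {
  const events: string[] = [];
  for (const block of text.split(/\r?\n\r?\n/)) {
    const data = block
      .split(/\r?\n/)
      .filter((line) => line.startsWith("data:"))
      // Strip the field name and the single optional space after the colon
      .map((line) => line.slice(5).replace(/^ /, ""));
    // Multiple data lines in one event are joined with newlines
    if (data.length > 0) events.push(data.join("\n"));
  }
  return events;
}
```

In practice you would pass `authHeaders(token)` to `fetch` for `/api/calls` or `/api/stats`, and feed chunks read from `/events` through a parser like the one above.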

CLI

The businys-ops CLI wraps Observer Mode and the stdio Bridge as standalone processes.

# Scaffold a config file in the current directory
npx @businys/ops init

# Observer dashboard (in-memory, ephemeral)
npx @businys/ops observe
npx @businys/ops observe --port 3200 --token mysecret

# Dev mode: file-backed storage + optional bridge, one command
npx @businys/ops dev                         # dashboard only
npx @businys/ops dev node ./my-server.js     # dashboard + bridge
npx @businys/ops dev --port 3200 --token sk  # with auth
# Ctrl+C cleanly closes bridge and flushes storage to disk

# Check stats from a running instance
npx @businys/ops status

# Wrap any stdio MCP server as managed HTTP
npx @businys/ops bridge node ./my-server.js
npx @businys/ops bridge python server.py --port 3200
npx @businys/ops bridge npx some-mcp-server --name my-tools

stdio Bridge

Wrap any stdio MCP server as a managed Streamable HTTP endpoint with the full middleware pipeline. Works with any server that speaks MCP over stdio, whether it is launched with node, python, or npx.

import { createBridge } from "@businys/ops"

const bridge = await createBridge(["node", "my-server.js"], {
  port: 3100,
  // proxy: createMCPProxy({ rateLimit: { globalMax: 100 } }),
})

console.log(bridge.url)  // http://localhost:3100
// GET  /health → { status: "ok", serverInfo: { ... } }
// POST /        → JSON-RPC with full middleware pipeline

await bridge.close()

OpenTelemetry

Pass any OTel-compatible Tracer — zero new dependencies. The middleware uses structural typing so it works with any OTel SDK version.

import { createMCPProxy } from "@businys/ops"
import { trace } from "@opentelemetry/api"

const proxy = createMCPProxy({
  telemetry: {
    tracer: trace.getTracer("my-mcp-server"),
    recordInput: true,    // attach input JSON as span attribute
    inputMaxLength: 500,  // truncate long inputs
    attributePrefix: "mcp", // default
  },
})

Span name format: serverName/toolName. Standard attributes include mcp.tool.name, mcp.agent.id, mcp.duration_ms, mcp.is_error, and mcp.tool.destructive.
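
"Structural typing" here means the middleware only needs an object with the right method shapes, not a specific OTel class. The sketch below illustrates that idea. `TracerLike`, `SpanLike`, `traceCall`, and `createRecordingTracer` are hypothetical names invented for this example, and the interfaces are an assumed minimal subset; the library's real internal types may differ. The handler is synchronous to keep the sketch short.

```typescript
type AttrValue = string | number | boolean;

// Assumed minimal shapes; an @opentelemetry/api Tracer also satisfies them.
interface SpanLike {
  setAttribute(key: string, value: AttrValue): unknown;
  end(): void;
}
interface TracerLike {
  startSpan(name: string): SpanLike;
}

// Wrap one call in a span named serverName/toolName.
function traceCall<T>(tracer: TracerLike, serverName: string, toolName: string, run: () => T): T {
  const span = tracer.startSpan(`${serverName}/${toolName}`);
  const startedAt = Date.now();
  span.setAttribute("mcp.tool.name", toolName);
  let isError = false;
  try {
    return run();
  } catch (err) {
    isError = true;
    throw err;
  } finally {
    span.setAttribute("mcp.duration_ms", Date.now() - startedAt);
    span.setAttribute("mcp.is_error", isError);
    span.end();
  }
}

interface RecordedSpan {
  name: string;
  attributes: Record<string, AttrValue>;
  ended: boolean;
}

// A tiny in-memory tracer, useful in tests where no OTel SDK is installed.
function createRecordingTracer(): { tracer: TracerLike; spans: RecordedSpan[] } {
  const spans: RecordedSpan[] = [];
  const tracer: TracerLike = {
    startSpan(name) {
      const record: RecordedSpan = { name, attributes: {}, ended: false };
      spans.push(record);
      return {
        setAttribute(key, value) {
          record.attributes[key] = value;
        },
        end() {
          record.ended = true;
        },
      };
    },
  };
  return { tracer, spans };
}
```

Because only the shape matters, the same call site accepts a real OTel tracer in production and the recording tracer in tests.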

Agent Lineage

A causal DAG that traces every tool call back to the originating human intent — through every agent delegation, with cryptographic integrity. Built for the EU AI Act, SOC 2 auditors, and multi-agent workflows.

import { createMCPProxy, MemoryLineageStore, verifyLineage } from "@businys/ops"

const store = new MemoryLineageStore()

const proxy = createMCPProxy({
  lineage: { store },
})

// After calls complete, verify the chain is intact.
// rootId is the id of the root lineage node (the originating human prompt).
const result = await verifyLineage(rootId, store)
console.log(result.valid)     // true iff all hashes check out
console.log(result.maxDepth)  // delegation depth reached
console.log(result.errors)    // [] if valid
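
To show what "all hashes check out" can mean, here is a minimal sketch of a SHA-256 hash chain over parent links. It is not the library's implementation: `ChainNode`, `makeNode`, and `verifyNodes` are hypothetical, and the exact fields that @businys/ops hashes are not documented here, so this example hashes the parent hash, node id, and a payload string.

```typescript
import { createHash } from "node:crypto";

// Hypothetical node shape for this sketch only.
interface ChainNode {
  id: string;
  parentId: string | null;
  payload: string;
  hash: string;
}

// Hash covers the parent's hash, so changing an ancestor invalidates descendants.
function digest(parentHash: string, id: string, payload: string): string {
  return createHash("sha256").update(JSON.stringify([parentHash, id, payload])).digest("hex");
}

function makeNode(id: string, parent: ChainNode | null, payload: string): ChainNode {
  const parentHash = parent ? parent.hash : "";
  return { id, parentId: parent ? parent.id : null, payload, hash: digest(parentHash, id, payload) };
}

// Recompute every hash from the stored data and report mismatches.
function verifyNodes(nodes: ChainNode[]): { valid: boolean; errors: string[] } {
  const byId = new Map(nodes.map((n) => [n.id, n] as const));
  const errors: string[] = [];
  for (const node of nodes) {
    const parent = node.parentId === null ? null : byId.get(node.parentId);
    if (parent === undefined) {
      errors.push(`missing parent for ${node.id}`);
      continue;
    }
    const expected = digest(parent ? parent.hash : "", node.id, node.payload);
    if (expected !== node.hash) errors.push(`hash mismatch at ${node.id}`);
  }
  return { valid: errors.length === 0, errors };
}
```

In this scheme, editing a stored node without updating its hash is caught at that node, and recomputing its hash to hide the edit breaks every child instead.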

Header propagation

When using the Bridge, lineage context flows automatically via HTTP headers. The calling agent receives X-Lineage-Id in the response and can pass it forward as X-Lineage-Parent on its own downstream calls to chain delegations.

// Inbound (set by calling agent)
X-Lineage-Root:   <rootId>   // originating human prompt
X-Lineage-Parent: <parentId> // immediate parent node
X-Lineage-Depth:  <number>   // delegation depth (0 = human)

// Outbound (returned by bridge)
X-Lineage-Id:     <nodeId>   // id of the node created for this call
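
As a rough sketch, an agent that delegates further could build its outbound headers like this. `LineageContext` and `nextHopHeaders` are hypothetical names, and incrementing the depth on the caller side is an assumption; the Bridge may compute depth itself.

```typescript
// Hypothetical view of what an agent knows after its own call returned.
interface LineageContext {
  rootId: string; // value it was called with in X-Lineage-Root
  nodeId: string; // value it received back in X-Lineage-Id
  depth: number;  // its own delegation depth (0 = human)
}

// Build the inbound headers for the next agent in the chain.
function nextHopHeaders(current: LineageContext): Record<string, string> {
  return {
    "X-Lineage-Root": current.rootId,
    "X-Lineage-Parent": current.nodeId,
    // Assumption: each delegation adds one level of depth
    "X-Lineage-Depth": String(current.depth + 1),
  };
}
```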

Custom LineageStore

Swap MemoryLineageStore for any persistent backend by implementing the LineageStore interface:

import type { LineageStore, LineageNode } from "@businys/ops"

const myStore: LineageStore = {
  async recordNode(node: LineageNode) { /* persist to DB */ },
  async getChain(rootId: string) { /* return nodes sorted by timestamp */ },
  async getNode(id: string) { /* lookup single node */ },
}

createMCPProxy

The batteries-included factory. Pipeline order: lineage → telemetry → reputation → rate limit → confirmation → metering → audit → custom.

import { createMCPProxy, MemoryLineageStore } from "@businys/ops"

const proxy = createMCPProxy({
  rateLimit: {
    globalMax: 1000,  // calls per window across all agents
    groupMax: 100,    // calls per window per agent
    windowMs: 3600000 // 1 hour
  },
  lineage: { store: new MemoryLineageStore() },
  // telemetry: { tracer },
  auditLog: (entry) => console.log(entry), // default: stderr
  disable: {
    confirmation: true, // disable specific layers
  },
  middleware: [myCustomMiddleware], // append custom layers
})

// proxy.middleware — array of assembled middleware
// proxy.storage    — the MemoryAdapter
// proxy.run(ctx, handler) — run pipeline directly
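
For intuition about how `globalMax`, `groupMax`, and `windowMs` interact, here is a sliding-window sketch that checks a global counter and a per-agent counter before admitting a call. It is not the library's limiter: `SlidingWindowCounter` and `createRateLimiter` are hypothetical, and it follows the comments in the example above by treating `groupMax` as a per-agent limit.

```typescript
// Hypothetical sliding-window log: stores timestamps of admitted calls per key.
class SlidingWindowCounter {
  private readonly hits = new Map<string, number[]>();
  private readonly windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  // Count calls for `key` inside the window ending at `now`, pruning old ones.
  count(key: string, now: number): number {
    const cutoff = now - this.windowMs;
    const recent = (this.hits.get(key) ?? []).filter((t) => t > cutoff);
    this.hits.set(key, recent);
    return recent.length;
  }

  record(key: string, now: number): void {
    const list = this.hits.get(key) ?? [];
    list.push(now);
    this.hits.set(key, list);
  }
}

function createRateLimiter(opts: { globalMax: number; groupMax: number; windowMs: number }) {
  const counter = new SlidingWindowCounter(opts.windowMs);
  return function allow(agentId: string, now: number = Date.now()): boolean {
    const globalKey = "*";
    const agentKey = `agent:${agentId}`;
    // Check both limits first so a rejected call consumes no budget.
    if (counter.count(globalKey, now) >= opts.globalMax) return false;
    if (counter.count(agentKey, now) >= opts.groupMax) return false;
    counter.record(globalKey, now);
    counter.record(agentKey, now);
    return true;
  };
}
```

Checking both limits before recording means one noisy agent hits its own ceiling without eating into the global allowance for everyone else.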

Writing middleware

import type { Middleware } from "@businys/ops"

const myMiddleware: Middleware = {
  name: "my-middleware",
  async execute(ctx, next) {
    console.log("Before:", ctx.toolName)
    const result = await next()
    console.log("After:", result.isError ? "error" : "ok")
    return result
  },
}
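
The `next()` call is what chains layers together. A minimal sketch of how such a pipeline can be run is shown below; `runPipeline`, `Ctx`, `Result`, and `MiddlewareLike` are simplified stand-ins invented for this example, not the library's actual types or its `proxy.run` implementation.

```typescript
// Simplified stand-ins for the real context and result types.
interface Ctx {
  toolName: string;
}
interface Result {
  isError: boolean;
  content: string;
}
interface MiddlewareLike {
  name: string;
  execute(ctx: Ctx, next: () => Promise<Result>): Promise<Result>;
}

// Run layers outermost-first; the handler runs once every layer has called next().
function runPipeline(
  layers: MiddlewareLike[],
  ctx: Ctx,
  handler: (ctx: Ctx) => Promise<Result>,
): Promise<Result> {
  const dispatch = (index: number): Promise<Result> => {
    const layer = layers[index];
    if (layer === undefined) return handler(ctx); // past the last layer
    return layer.execute(ctx, () => dispatch(index + 1));
  };
  return dispatch(0);
}
```

Under this model, code before `await next()` runs on the way in, code after it runs on the way out, and a layer that returns without calling `next()` short-circuits everything below it.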

MiddlewareContext

interface MiddlewareContext {
  toolName:    string
  toolGroup:   string
  toolTier:    "core" | "craft" | "kit"
  method:      string
  path:        string
  input:       Record<string, unknown>
  agentId:     string
  serverName:  string
  startedAt:   number    // Date.now()
  destructive: boolean   // true = confirmation required
  bearerToken?: string   // raw token extracted by Bridge
}

FileAdapter

Persist call history and reputation to a local JSON file with zero dependencies. State survives process restarts. Uses a debounced flush (default 2s) and a synchronous flush on process.exit, so buffered writes are not lost on a normal shutdown.

import { FileAdapter, observe } from "@businys/ops"

const storage = new FileAdapter({
  path: "./mcp-ops.json",
  maxCalls: 1000,         // ring buffer size (default: 200)
  flushDebounceMs: 2000,  // default: 2000
})

const ops = await observe({ storage })

// Flush immediately and release file handle
await storage.close()
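
The debounced flush can be pictured with the sketch below: many changes in quick succession produce a single write once things go quiet, and an explicit flush writes immediately. `DebouncedFlusher` is a hypothetical class, and resetting the timer on every change is an assumption about the debounce behaviour rather than something FileAdapter documents.

```typescript
// Hypothetical debouncer: coalesces many changes into one write.
class DebouncedFlusher {
  private timer: ReturnType<typeof setTimeout> | undefined;
  private dirty = false;
  private readonly write: () => void;
  private readonly delayMs: number;

  constructor(write: () => void, delayMs: number = 2000) {
    this.write = write;
    this.delayMs = delayMs;
  }

  // Call after every change; restarts the quiet-period timer.
  markDirty(): void {
    this.dirty = true;
    if (this.timer !== undefined) clearTimeout(this.timer);
    this.timer = setTimeout(() => this.flushNow(), this.delayMs);
  }

  // Write immediately if anything changed, and cancel the pending timer.
  flushNow(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (!this.dirty) return;
    this.dirty = false;
    this.write();
  }
}
```

A synchronous `flushNow()` is what makes an exit-time flush possible, since asynchronous work started in a Node `exit` handler does not get a chance to finish.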

PostgresAdapter

Durable PostgreSQL storage for production and multi-instance deployments. Schema is created automatically on first connect(). Requires the pg optional peer dependency.

// Requires the optional peer dependency: npm install pg
import { PostgresAdapter, observe } from "@businys/ops"

const storage = new PostgresAdapter({
  connectionString: process.env.DATABASE_URL,
  // pool: existingPgPool,  // or pass a pre-configured pool
  maxCalls: 10000,
})

await storage.connect()   // creates schema, must call before use
const ops = await observe({ storage })

await storage.close()     // releases pool

Schema: ops_calls, ops_reputations, ops_loop_buffers. All DDL uses CREATE TABLE IF NOT EXISTS — safe to call connect() multiple times. Reputation updates are atomic UPSERTs with no read-modify-write race.

MCP Revenue

Per-call billing instrumentation for MCP servers. Credit wallets, configurable pricing, budget enforcement, and a full ledger — without writing billing infrastructure from scratch.

import { createRevenueMiddleware, MemoryAdapter } from "@businys/ops"

const storage = new MemoryAdapter()

// Top up an agent's wallet
await storage.topUpCredits("agent-123", 100, "Initial grant")

const revenue = createRevenueMiddleware(storage, {
  pricing: {
    default: { perCall: 1 },               // 1 credit for most tools
    overrides: {
      create_invoice: { perCall: 10 },     // premium actions cost more
      get_clients:    { perCall: 0 },      // free reads
    },
  },
  enforceBudget: true,  // default: true — blocks calls when balance < cost
})

When enforceBudget is true (the default), calls are blocked with a 402 status and an insufficient_credits error message before reaching your handler. Credits are only deducted after a successful call — error responses are never charged.
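
The check-then-charge order described above can be sketched as follows. This is a simplified model, not the revenue middleware itself: `Wallet`, `Pricing`, `costOf`, and `chargeForCall` are hypothetical, and the wallet is a plain in-memory object rather than a StorageAdapter.

```typescript
interface Wallet {
  balance: number;
  totalSpent: number;
}
interface CallResult {
  isError: boolean;
  status?: number;
  message?: string;
}
interface Pricing {
  default: { perCall: number };
  overrides?: Record<string, { perCall: number }>;
}

// Per-tool override wins; `??` keeps an explicit price of 0 (free reads).
function costOf(pricing: Pricing, toolName: string): number {
  return pricing.overrides?.[toolName]?.perCall ?? pricing.default.perCall;
}

async function chargeForCall(
  wallet: Wallet,
  pricing: Pricing,
  toolName: string,
  enforceBudget: boolean,
  handler: () => Promise<CallResult>,
): Promise<CallResult> {
  const cost = costOf(pricing, toolName);
  // Block before the handler runs when the balance cannot cover the call.
  if (enforceBudget && wallet.balance < cost) {
    return { isError: true, status: 402, message: "insufficient_credits" };
  }
  const result = await handler();
  // Deduct only after a successful call; errors are never charged.
  if (!result.isError) {
    wallet.balance -= cost;
    wallet.totalSpent += cost;
  }
  return result;
}
```

With `enforceBudget` set to false the same function simply skips the check, which is how a balance can go negative in record-only mode.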

// Check an agent's wallet
const wallet = await storage.getWallet("agent-123")
// {
//   agentId: "agent-123",
//   balance: 89,
//   totalSpent: 11,
//   totalCharged: 100,
//   createdAt: 1700000000000,
//   updatedAt: 1700000001234,
// }

// Full ledger (most recent first)
const ledger = await storage.getLedger("agent-123", { limit: 50 })
// [{ type: "charge", amount: 10, toolName: "create_invoice", ... }, ...]

Wiring into createMCPProxy

import { createMCPProxy } from "@businys/ops"

const proxy = createMCPProxy({
  revenue: {
    pricing: { default: { perCall: 1 } },
    enforceBudget: true,
  },
  // Revenue runs after confirmation — destructive unconfirmed calls are never charged
})

Record-only mode

Set enforceBudget: false to track spend without blocking calls. Wallets can go negative — useful for post-hoc billing or metered plans where you invoice at the end of a period.

Custom storage

The default MemoryAdapter is a ring buffer. For other backends, implement StorageAdapter:

import type { StorageAdapter } from "@businys/ops"

const myAdapter: StorageAdapter = {
  async recordCall(record) { /* persist to DB */ },
  async getCalls(opts) { /* paginated query */ },
  async getStats() { /* aggregations */ },
  async getReputation(agentId) { /* fetch rep record */ },
  async updateReputation(agentId, signal) { /* update score */ },
  async checkLoop(agentId, hash) { /* ring buffer check */ },
  subscribe(listener) { /* SSE push, return unsub fn */ },

  // Wallet (required for MCP Revenue)
  async getWallet(agentId) { /* fetch wallet */ },
  async deductCredits(agentId, amount, meta) { /* atomic deduct */ },
  async topUpCredits(agentId, amount, description) { /* credit wallet */ },
  async getLedger(agentId, opts) { /* paginated ledger */ },
}

HostedAdapter

Stream call records, reputation events, and stats to the hosted dashboard at businys.dev. Persistent storage, anomaly detection, compliance export, and team access — without running your own database.

Get an API key by creating a project at businys.dev, then link your server with the CLI:

# Link your server to the hosted dashboard in one step
npx @businys/ops deploy --key YOUR_API_KEY

Or wire it manually:

import { HostedAdapter, observe } from "@businys/ops"

const storage = new HostedAdapter({
  apiKey: process.env.BDEV_API_KEY,  // from dashboard → Settings → API Keys
  projectId: process.env.BDEV_PROJECT_ID,
  // flushIntervalMs: 5000,  // default: 5000
  // maxBatchSize: 100,       // default: 100
})

const ops = await observe({ storage })
server.use(ops.middleware)  // server is your own MCP server instance
// Every tool call now appears in the hosted dashboard in real time

Environment variables

BDEV_API_KEY=bdev_live_...
BDEV_PROJECT_ID=...  # UUID from project settings

The adapter batches records and flushes on an interval (default 5s) and on process exit. Calls are buffered locally if the ingest endpoint is unreachable — no data is lost during brief network interruptions.
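
The batching and local buffering described above can be sketched like this: records queue up in memory, a flush sends them in chunks of at most `maxBatchSize`, and anything that fails to send stays queued for the next attempt. `BatchBuffer` is a hypothetical class, and `send` is a synchronous stand-in for what would be an asynchronous network call in the real adapter.

```typescript
// Hypothetical batching buffer; `send` returns false when the endpoint is unreachable.
class BatchBuffer<T> {
  private pending: T[] = [];
  private readonly maxBatchSize: number;
  private readonly send: (batch: T[]) => boolean;

  constructor(maxBatchSize: number, send: (batch: T[]) => boolean) {
    this.maxBatchSize = maxBatchSize;
    this.send = send;
  }

  add(record: T): void {
    this.pending.push(record);
  }

  // Send queued records in batches; returns how many were delivered.
  flush(): number {
    let sent = 0;
    while (this.pending.length > 0) {
      const batch = this.pending.slice(0, this.maxBatchSize);
      if (!this.send(batch)) break; // keep the rest for the next flush
      this.pending = this.pending.slice(batch.length);
      sent += batch.length;
    }
    return sent;
  }

  get size(): number {
    return this.pending.length;
  }
}
```

Note that a buffer like this is unbounded while the endpoint is down; a production adapter would presumably cap it, which is why the guarantee is worded around brief interruptions.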

Roadmap

Observer Mode — zero-config dashboard + SSE stream
Auth middleware — API key validation, scoping, async validator
Rate limiting — per-agent, per-group, sliding window
Metering — call counting, cost attribution
Audit log — structured JSON, append-only
Agent reputation — score-based throttling + loop detection
OpenTelemetry — zero-dep structural typing, bring your own Tracer
stdio Bridge — wrap any stdio MCP server as managed HTTP
Agent Lineage — causal DAG, SHA-256 hash chain, EU AI Act ready
FileAdapter — zero-dependency JSON file persistence
PostgresAdapter — durable SQL storage, atomic reputation UPSERTs
CLI init + dev — scaffold config, file-backed dev mode
MCP Revenue — per-call billing, credit wallets, budget enforcement, ledger