Skip to main content
These endpoints manage streaming ingestion and change-data-capture (CDC) pipelines: starting and stopping connectors, inspecting lag, and reviewing health metrics. Authenticate with a Bearer API key. Base URL: https://api.planasonix.com
Authorization: Bearer YOUR_API_KEY
Content-Type: application/json
If your workspace uses a versioned base path (for example /v1), prepend it before /api. See API reference.

List streams

Returns all streams (CDC databases, Kafka-style topics, webhook streams) in the workspace. GET /api/streams

Request parameters

status
string
Filter by lifecycle state: created, starting, running, paused, stopped, error.
source_type
string
Filter by source family: postgres_cdc, mysql_cdc, sqlserver_cdc, kafka, webhook, and so on.
pipeline_id
string
Filter streams feeding a specific pipeline.
limit
integer
default:"50"
Page size.
cursor
string
Pagination cursor.

Example response

{
  "data": [
    {
      "id": "str_01j9k9z0a1b2c3d4",
      "type": "stream",
      "attributes": {
        "name": "prod_pg_orders_cdc",
        "source_type": "postgres_cdc",
        "pipeline_id": "pl_01hq8orders_rt",
        "status": "running",
        "created_at": "2025-02-10T08:00:00Z",
        "updated_at": "2025-03-27T10:00:00Z"
      }
    }
  ],
  "meta": {
    "page": { "limit": 50, "cursor": null }
  }
}

Create stream

Registers a new stream from a CDC or event source and binds it to downstream processing. POST /api/streams

Request body

name
string
required
Unique name within the workspace.
sourceType
string
required
postgres_cdc, mysql_cdc, sqlserver_cdc, kafka, webhook, or deployment-specific values.
pipelineId
string
required
Pipeline that consumes stream output.
connectionId
string
Source connection ID (required for database CDC and most brokers).
config
object
Type-specific options: publication/slot for Postgres, topic names, include/exclude tables, starting offsets, webhook signing secret, and so on.

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "name": "prod_pg_orders_cdc",
      "source_type": "postgres_cdc",
      "pipeline_id": "pl_01hq8orders_rt",
      "status": "created",
      "config": {
        "publication": "planasonix_orders_pub",
        "slot_name": "planasonix_orders_slot",
        "tables": ["public.orders", "public.order_items"]
      },
      "created_at": "2025-03-27T18:00:00Z",
      "updated_at": "2025-03-27T18:00:00Z"
    }
  }
}

Streams summary

Returns aggregate counts and high-level health for dashboard use. GET /api/streams/summary

Request parameters

None.

Example response

{
  "data": {
    "type": "streams_summary",
    "attributes": {
      "total": 14,
      "by_status": {
        "running": 11,
        "paused": 1,
        "stopped": 2,
        "error": 0
      },
      "approx_events_per_minute": 18420,
      "computed_at": "2025-03-27T18:05:00Z"
    }
  }
}

Delete stream

Tears down the stream, releases CDC slots where applicable, and removes workspace metadata. DELETE /api/streams/{id}

Request parameters

id
string
required
Stream ID.
force
boolean
default:"false"
When true, best-effort teardown even if the connector reports active consumers (use with care).

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "deleted": true,
      "deleted_at": "2025-03-27T18:10:00Z"
    }
  }
}

Start streaming

Allocates workers and begins reading from the source (or resumes from last committed offset). POST /api/streams/{id}/start

Request parameters

id
string
required
Stream ID.

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "starting",
      "updated_at": "2025-03-27T18:11:00Z"
    }
  }
}

Stop streaming

Stops ingestion and commits offsets where the engine supports graceful shutdown. POST /api/streams/{id}/stop

Request parameters

id
string
required
Stream ID.

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "stopped",
      "updated_at": "2025-03-27T18:12:00Z"
    }
  }
}

Pause streaming

Temporarily halts reading while keeping slot or subscription state for quick resume. POST /api/streams/{id}/pause

Request parameters

id
string
required
Stream ID.

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "paused",
      "updated_at": "2025-03-27T18:13:00Z"
    }
  }
}

Resume streaming

Resumes a paused stream. POST /api/streams/{id}/resume

Request parameters

id
string
required
Stream ID.

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "running",
      "updated_at": "2025-03-27T18:14:00Z"
    }
  }
}

Stream metrics

Returns throughput, consumer lag, and error counters for observability. GET /api/streams/{id}/metrics

Request parameters

id
string
required
Stream ID.
window
string
default:"5m"
Rolling window for rates: 1m, 5m, 1h, 24h.

Example response

{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream_metrics",
    "attributes": {
      "stream_id": "str_01j9k9z0a1b2c3d4",
      "status": "running",
      "events_per_second": 312.4,
      "bytes_per_second": 1048576,
      "lag_seconds": 2.8,
      "error_rate_per_minute": 0.02,
      "last_event_at": "2025-03-27T18:14:58Z",
      "window": "5m",
      "computed_at": "2025-03-27T18:15:00Z"
    }
  }
}