> ## Documentation Index
> Fetch the complete documentation index at: https://docs.planasonix.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Streams

> Manage real-time data streams and CDC sources.

These endpoints manage **streaming ingestion** and **change-data-capture (CDC)** pipelines: starting and stopping connectors, inspecting lag, and reviewing health metrics. Authenticate with a [Bearer API key](/api-reference/authentication).

**Base URL:** `https://api.planasonix.com`

```http theme={null}
Authorization: Bearer YOUR_API_KEY
Content-Type: application/json
```

<Info>
  If your workspace uses a versioned base path (for example `/v1`), prepend it before `/api`. See [API reference](/api-reference/introduction).
</Info>

***

## List streams

Returns all streams (CDC databases, Kafka-style topics, webhook streams) in the workspace.

**`GET /api/streams`**

### Request parameters

<ParamField query="status" type="string">
  Filter by lifecycle state: `created`, `starting`, `running`, `paused`, `stopped`, `error`.
</ParamField>

<ParamField query="source_type" type="string">
  Filter by source family: `postgres_cdc`, `mysql_cdc`, `sqlserver_cdc`, `kafka`, `webhook`, and so on.
</ParamField>

<ParamField query="pipeline_id" type="string">
  Filter streams feeding a specific pipeline.
</ParamField>

<ParamField query="limit" type="integer" default="50">
  Page size.
</ParamField>

<ParamField query="cursor" type="string">
  Pagination cursor.
</ParamField>

### Example response

```json theme={null}
{
  "data": [
    {
      "id": "str_01j9k9z0a1b2c3d4",
      "type": "stream",
      "attributes": {
        "name": "prod_pg_orders_cdc",
        "source_type": "postgres_cdc",
        "pipeline_id": "pl_01hq8orders_rt",
        "status": "running",
        "created_at": "2025-02-10T08:00:00Z",
        "updated_at": "2025-03-27T10:00:00Z"
      }
    }
  ],
  "meta": {
    "page": { "limit": 50, "cursor": null }
  }
}
```

***

## Create stream

Registers a new stream from a CDC or event source and binds it to downstream processing.

**`POST /api/streams`**

### Request body

<ParamField body="name" type="string" required>
  Unique name within the workspace.
</ParamField>

<ParamField body="sourceType" type="string" required>
  `postgres_cdc`, `mysql_cdc`, `sqlserver_cdc`, `kafka`, `webhook`, or deployment-specific values.
</ParamField>

<ParamField body="pipelineId" type="string" required>
  Pipeline that consumes stream output.
</ParamField>

<ParamField body="connectionId" type="string">
  Source connection ID (required for database CDC and most brokers).
</ParamField>

<ParamField body="config" type="object">
  Type-specific options: publication/slot for Postgres, topic names, include/exclude tables, starting offsets, webhook signing secret, and so on.
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "name": "prod_pg_orders_cdc",
      "source_type": "postgres_cdc",
      "pipeline_id": "pl_01hq8orders_rt",
      "status": "created",
      "config": {
        "publication": "planasonix_orders_pub",
        "slot_name": "planasonix_orders_slot",
        "tables": ["public.orders", "public.order_items"]
      },
      "created_at": "2025-03-27T18:00:00Z",
      "updated_at": "2025-03-27T18:00:00Z"
    }
  }
}
```

***

## Streams summary

Returns aggregate counts and high-level health for dashboard use.

**`GET /api/streams/summary`**

### Request parameters

None.

### Example response

```json theme={null}
{
  "data": {
    "type": "streams_summary",
    "attributes": {
      "total": 14,
      "by_status": {
        "running": 11,
        "paused": 1,
        "stopped": 2,
        "error": 0
      },
      "approx_events_per_minute": 18420,
      "computed_at": "2025-03-27T18:05:00Z"
    }
  }
}
```

***

## Delete stream

Tears down the stream, releases CDC slots where applicable, and removes workspace metadata.

**`DELETE /api/streams/{id}`**

### Request parameters

<ParamField path="id" type="string" required>
  Stream ID.
</ParamField>

<ParamField query="force" type="boolean" default="false">
  When true, best-effort teardown even if the connector reports active consumers (use with care).
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "deleted": true,
      "deleted_at": "2025-03-27T18:10:00Z"
    }
  }
}
```

***

## Start streaming

Allocates workers and begins reading from the source (or resumes from last committed offset).

**`POST /api/streams/{id}/start`**

### Request parameters

<ParamField path="id" type="string" required>
  Stream ID.
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "starting",
      "updated_at": "2025-03-27T18:11:00Z"
    }
  }
}
```

***

## Stop streaming

Stops ingestion and commits offsets where the engine supports graceful shutdown.

**`POST /api/streams/{id}/stop`**

### Request parameters

<ParamField path="id" type="string" required>
  Stream ID.
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "stopped",
      "updated_at": "2025-03-27T18:12:00Z"
    }
  }
}
```

***

## Pause streaming

Temporarily halts reading while keeping slot or subscription state for quick resume.

**`POST /api/streams/{id}/pause`**

### Request parameters

<ParamField path="id" type="string" required>
  Stream ID.
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "paused",
      "updated_at": "2025-03-27T18:13:00Z"
    }
  }
}
```

***

## Resume streaming

Resumes a paused stream.

**`POST /api/streams/{id}/resume`**

### Request parameters

<ParamField path="id" type="string" required>
  Stream ID.
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream",
    "attributes": {
      "status": "running",
      "updated_at": "2025-03-27T18:14:00Z"
    }
  }
}
```

***

## Stream metrics

Returns throughput, consumer lag, and error counters for observability.

**`GET /api/streams/{id}/metrics`**

### Request parameters

<ParamField path="id" type="string" required>
  Stream ID.
</ParamField>

<ParamField query="window" type="string" default="5m">
  Rolling window for rates: `1m`, `5m`, `1h`, `24h`.
</ParamField>

### Example response

```json theme={null}
{
  "data": {
    "id": "str_01j9k9z0a1b2c3d4",
    "type": "stream_metrics",
    "attributes": {
      "stream_id": "str_01j9k9z0a1b2c3d4",
      "status": "running",
      "events_per_second": 312.4,
      "bytes_per_second": 1048576,
      "lag_seconds": 2.8,
      "error_rate_per_minute": 0.02,
      "last_event_at": "2025-03-27T18:14:58Z",
      "window": "5m",
      "computed_at": "2025-03-27T18:15:00Z"
    }
  }
}
```
