
# Airflow integration

> Orchestrate Planasonix pipelines from Apache Airflow.

**Apache Airflow** can trigger Planasonix pipelines as tasks inside your existing DAGs. You keep Airflow as the **control plane** for cross-system workflows while Planasonix executes the ETL graph, surfaces **run history**, and applies **workspace permissions**.

## Integration overview

Typical flow:

1. Airflow schedules or sensors decide **when** work should run.
2. A **Planasonix operator** (or HTTP task) calls the Planasonix **API** with pipeline id, parameters, and environment.
3. Planasonix **enqueues** the run and returns a **run id**.
4. The operator **polls** until the run reaches a terminal state or, in **deferrable** mode, hands the wait to Airflow's triggerer so the worker slot is freed while the run executes.
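A minimal sketch of step 4's polling loop follows. It assumes a hypothetical `fetch_status(run_id)` callable that returns the run's current state, and hypothetical state names (`SUCCEEDED`, `FAILED`, `CANCELLED`). Your provider package and the Planasonix API may use different names. Raising on a failure state is what lets the Airflow task fail and follow its retry policy. Inside a real operator you would raise `AirflowException` instead of the local exception class.

```python
import time
from typing import Callable

# Hypothetical terminal state names; check the values your Planasonix API returns.
SUCCESS_STATES = {"SUCCEEDED"}
FAILURE_STATES = {"FAILED", "CANCELLED"}


class PipelineRunFailed(RuntimeError):
    """Raised when the run ends in a failure state."""


def wait_for_run(
    fetch_status: Callable[[str], str],
    run_id: str,
    poll_interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll fetch_status(run_id) until the run reaches a terminal state or times out."""
    deadline = clock() + timeout
    while True:
        status = fetch_status(run_id)
        if status in SUCCESS_STATES:
            return status
        if status in FAILURE_STATES:
            # Surfacing the failure makes the Airflow task fail too.
            raise PipelineRunFailed(f"run {run_id} ended with status {status}")
        if clock() >= deadline:
            raise TimeoutError(f"run {run_id} still {status} after {timeout}s")
        sleep(poll_interval)
```

The `sleep` and `clock` arguments are injectable so the loop can be exercised without real waiting.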

<Info>
  You can also invert the pattern: Planasonix **webhooks** or **chaining** trigger Airflow DAGs when loads complete. Choose one primary orchestrator to avoid circular dependencies.
</Info>

## Planasonix Airflow operator

Install the provider package your platform team distributes (PyPI name and version appear in your internal wiki). The operator usually accepts:

* `pipeline_id` or `pipeline_name` plus `environment`
* `parameters` / `variables` dict passed into the run
* `wait_for_completion` and **timeout**
* `api_key` or **connection** id referencing stored credentials
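If you use a plain HTTP task instead of the operator, it sends the same fields in a request. The sketch below only builds that request with the standard library. The URL path, the JSON body shape, and Bearer authentication are **assumptions** for illustration, not the documented Planasonix API. Check the API reference for the real endpoint.

```python
import json
import urllib.request
from typing import Any, Mapping


def build_run_request(
    host: str,
    api_key: str,
    pipeline_id: str,
    environment: str,
    parameters: Mapping[str, Any],
) -> urllib.request.Request:
    """Build (but do not send) a POST asking Planasonix to enqueue a run.

    The path and body field names are hypothetical placeholders.
    """
    body = json.dumps({"environment": environment, "parameters": dict(parameters)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/pipelines/{pipeline_id}/runs",  # hypothetical path
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",  # assumed auth scheme
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the request to `urllib.request.urlopen` would send it. Per step 3 above, the response should carry the run ID to poll.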

<Tabs>
  <Tab title="Sync operator">
    Blocks an Airflow worker slot until the Planasonix run finishes. Simple to reason about, but ties up that slot for the full duration of long jobs.
  </Tab>

  <Tab title="Deferrable operator">
    Releases its worker slot and hands polling to the Airflow triggerer, which checks Planasonix asynchronously. Preferred for hour-long warehouse loads.
  </Tab>
</Tabs>
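Deferrable operators rely on Airflow's triggerer, which runs many async trigger coroutines on one event loop instead of holding a worker for each wait. The sketch below is **not** an Airflow trigger (a real one subclasses `BaseTrigger`). It only illustrates the async polling pattern, using a hypothetical `fetch_status` coroutine and hypothetical state names, with fake status feeds in place of API calls.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical terminal states; match them to your Planasonix API.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


async def poll_until_terminal(
    fetch_status: Callable[[str], Awaitable[str]],
    run_id: str,
    poll_interval: float = 30.0,
) -> tuple[str, str]:
    """Poll without blocking: awaiting the sleep lets other polls share the event loop."""
    while True:
        status = await fetch_status(run_id)
        if status in TERMINAL_STATES:
            return run_id, status
        await asyncio.sleep(poll_interval)


async def main() -> list[tuple[str, str]]:
    # Fake status feeds standing in for Planasonix API calls.
    feeds = {
        "run_a": iter(["RUNNING", "RUNNING", "SUCCEEDED"]),
        "run_b": iter(["QUEUED", "FAILED"]),
    }

    async def fake_fetch(run_id: str) -> str:
        return next(feeds[run_id])

    # One event loop in one process watches both runs concurrently.
    return await asyncio.gather(
        *(poll_until_terminal(fake_fetch, rid, poll_interval=0.01) for rid in feeds)
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```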

## Connection configuration

Create an **Airflow HTTP** or **custom** connection:

| Field            | Value                                                                 |
| ---------------- | --------------------------------------------------------------------- |
| Host             | Planasonix API host for your region                                   |
| Login            | Service account identifier if required                                |
| Password / token | API key or OAuth secret from **API keys**                             |
| Extra JSON       | `organization_id`, default `environment`, optional `agent_pool` hints |
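Airflow (2.3 and later) can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a JSON object. The sketch below builds such a variable from the fields in the table. The host, login, and extra values are placeholders. How the Planasonix provider reads `extra` is an assumption. Inject the API key from your secrets store at deploy time rather than hard-coding it.

```python
import json
from typing import Any, Mapping


def connection_env_var(
    conn_id: str,
    host: str,
    login: str,
    api_key: str,
    extra: Mapping[str, Any],
) -> tuple[str, str]:
    """Return (name, value) for an AIRFLOW_CONN_* variable in Airflow's JSON format."""
    value = json.dumps(
        {
            "conn_type": "http",
            "host": host,
            "login": login,
            "password": api_key,  # supplied by a secrets store, never by DAG source
            "extra": dict(extra),  # organization_id, default environment, agent_pool hints
        }
    )
    return f"AIRFLOW_CONN_{conn_id.upper()}", value
```

Your deployment tooling would export the returned pair into the scheduler's and workers' environment, so `planasonix_conn_id="planasonix_default"` resolves to it.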

<Tip>
  Use a **workspace-scoped** service account with **run** permission on specific pipelines, not a human admin key.
</Tip>

## DAG examples

### Minimal synchronous task

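```python theme={null}
from datetime import datetime

from airflow import DAG
from planasonix_provider.operators.planasonix import PlanasonixRunPipelineOperator

with DAG(
    dag_id="nightly_sales_to_warehouse",
    start_date=datetime(2025, 1, 1),
    schedule="0 2 * * *",  # daily at 02:00 in the DAG's time zone (UTC unless configured)
    catchup=False,  # do not backfill runs missed before deployment
) as dag:
    run_planasonix = PlanasonixRunPipelineOperator(
        task_id="run_sales_pipeline",
        pipeline_id="pl_8f2a9c1d4e",
        environment="production",
        # {{ ds }} renders as the run's logical date (YYYY-MM-DD), so retries reuse the same value.
        parameters={"business_date": "{{ ds }}"},
        wait_for_completion=True,  # block until the Planasonix run reaches a terminal state
        planasonix_conn_id="planasonix_default",  # Airflow connection holding the API credentials
    )
```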

### Externally triggered load plus downstream warehouse task

```python theme={null}
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from planasonix_provider.operators.planasonix import PlanasonixRunPipelineOperator

with DAG(
    dag_id="after_planasonix_refresh",
    start_date=datetime(2025, 1, 1),
    schedule=None,  # no schedule: runs only when triggered (UI, API, sensor, or another DAG)
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")

    load = PlanasonixRunPipelineOperator(
        task_id="planasonix_incremental_load",
        pipeline_id="pl_inventory_hourly",
        environment="production",
        # Read the watermark from the trigger's conf; fall back to the logical date
        # so the parameter never renders as the string "None".
        parameters={"watermark": "{{ dag_run.conf.get('watermark', ds) }}"},
        wait_for_completion=True,
        planasonix_conn_id="planasonix_default",
    )

    # Stand-in for the downstream transformation task (for example, a dbt run).
    dbt_models = EmptyOperator(task_id="trigger_dbt_job_placeholder")

    start >> load >> dbt_models
```

Replace operator import paths with the package name your organization publishes.

<Warning>
  Never embed API keys in DAG source. Always use Airflow **connections** or **secrets backends**.
</Warning>

## Monitoring Airflow-triggered runs

In Planasonix **run history**, filter by **trigger type** (API / integration) and **correlation** metadata if your operator forwards `dag_id` and `task_id` as tags or parameters.
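If your operator does not forward correlation metadata itself, a helper like the sketch below can merge it into the run parameters. The `airflow_*` key names are hypothetical. In a DAG, the values are available from Airflow's template context as `{{ dag.dag_id }}`, `{{ task.task_id }}`, and `{{ run_id }}`. The helper refuses to overwrite existing parameters with the same names.

```python
from typing import Any, Mapping


def with_correlation(
    parameters: Mapping[str, Any],
    dag_id: str,
    task_id: str,
    run_id: str,
) -> dict[str, Any]:
    """Return a copy of parameters with Airflow correlation fields added."""
    correlation = {
        "airflow_dag_id": dag_id,
        "airflow_task_id": task_id,
        "airflow_run_id": run_id,
    }
    clashes = correlation.keys() & parameters.keys()
    if clashes:
        raise ValueError(f"parameter names reserved for correlation: {sorted(clashes)}")
    return {**parameters, **correlation}
```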

* Failed Planasonix runs should **fail the Airflow task** so retries and alerting follow your existing DAG policies.
* For long runs, align **Airflow task timeout** with **Planasonix run timeout** to avoid double-kill races.
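One way to align the two timeouts is to let Planasonix's own run timeout fire first, so Airflow only records the outcome. To do that, give the Airflow task's `execution_timeout` (a standard `BaseOperator` argument) extra headroom: one poll interval plus a safety margin. The sketch below computes that value. The five-minute default margin is an arbitrary example, not a Planasonix recommendation.

```python
from datetime import timedelta


def airflow_execution_timeout(
    planasonix_run_timeout: timedelta,
    poll_interval: timedelta,
    margin: timedelta = timedelta(minutes=5),
) -> timedelta:
    """Headroom so Airflow sees Planasonix's timeout before killing the task itself.

    The operator may need up to one extra poll interval to observe the terminal
    state, plus a margin for queueing and API latency.
    """
    if planasonix_run_timeout <= timedelta(0):
        raise ValueError("planasonix_run_timeout must be positive")
    return planasonix_run_timeout + poll_interval + margin
```

For a two-hour Planasonix timeout polled every 60 seconds, `execution_timeout=airflow_execution_timeout(timedelta(hours=2), timedelta(seconds=60))` gives 2 hours 6 minutes.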

<AccordionGroup>
  <Accordion title="Idempotency">
    Pass deterministic parameters (`ds`, `hour`, business keys) so retries do not duplicate loads unless your pipeline is designed for at-least-once semantics.
  </Accordion>

  <Accordion title="Cross-region latency">
    If Airflow lives in a different region than Planasonix, add slack to polling intervals to reduce API chatter.
  </Accordion>
</AccordionGroup>
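The two considerations above can be sketched as small helpers:

* `idempotency_key` hashes the pipeline, environment, and parameters into a stable value. You could pass it as a parameter if your pipeline deduplicates on it. Whether Planasonix accepts such a key natively is not covered here, so treat the key as hypothetical.
* `poll_intervals` yields growing, capped delays to feed a polling loop's sleep. This reduces API chatter across regions. The base, factor, and cap values are illustrative.

```python
import hashlib
import json
from typing import Any, Iterator, Mapping


def idempotency_key(pipeline_id: str, environment: str, parameters: Mapping[str, Any]) -> str:
    """Same pipeline, environment, and parameters give the same key on every retry."""
    canonical = json.dumps(
        {"pipeline_id": pipeline_id, "environment": environment, "parameters": parameters},
        sort_keys=True,  # key order in the dict must not change the hash
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def poll_intervals(base: float = 15.0, factor: float = 2.0, cap: float = 300.0) -> Iterator[float]:
    """Yield exponentially growing poll delays, never exceeding `cap` seconds."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor
```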

## Related topics

<CardGroup cols={2}>
  <Card title="API authentication" icon="lock" href="/api-reference/authentication">
    Token types and scopes for automation.
  </Card>

  <Card title="Webhooks" icon="webhook" href="/orchestration/webhooks">
    Event-driven triggers into or out of Planasonix.
  </Card>
</CardGroup>
