# Step 3: Connect a Cluster

{% hint style="success" %}
**Prerequisites Checklist**

* [ ] The agent has been successfully deployed, and all pods are running in a healthy state.
* [ ] The required Kafka user has been successfully created with all required permissions.
{% endhint %}

### Step 1: Fill in cluster details

Each vendor uses a slightly different connection approach.

#### Confluent Cloud / AWS MSK

Cluster discovery starts automatically once an API key is provided.\
Metrics are collected through the vendor API.

#### Aiven

You'll need an API token, Kafka cluster connection details, and the project and service names.

#### Redpanda / Apache (Self-hosted)

Automatic cluster discovery is not available, so each cluster must be added manually.\
To enable metric collection in Superstream, a JMX connection must also be configured.

{% hint style="info" %}
Superstream will fetch metrics from the `/metrics` endpoint, regardless of whether they are exposed by **Prometheus** exporters or directly from **JMX** sources.
{% endhint %}
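
Before adding the cluster, it can help to confirm that the endpoint actually returns Kafka metrics. The sketch below is one way to do that, not an official Superstream check: `count_kafka_samples` is a hypothetical helper name, and `BROKER_HOST` and `EXPORTER_PORT` are placeholders for your own values. It assumes the metric names start with `kafka_`.

```bash
#!/usr/bin/env bash
# Count Kafka samples in Prometheus text-format output read from stdin.
# Comment lines (# HELP / # TYPE) start with '#', so they are not counted.
count_kafka_samples() {
  # grep -c exits non-zero when the count is 0; '|| true' keeps that from
  # aborting scripts that run with 'set -e'. The count is still printed.
  grep -c '^kafka_' || true
}

# Hypothetical usage; replace BROKER_HOST and EXPORTER_PORT with your values:
#   curl -s "http://BROKER_HOST:EXPORTER_PORT/metrics" | count_kafka_samples
```

A result of `0` suggests that the exporter is reachable but not exposing Kafka metrics yet, for example because no rules are configured.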

Use the following approaches to find the Apache Kafka JMX port and token information:

<details>

<summary>Getting JMX Port</summary>

**1. Check Kafka Server Configuration**

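* Look for JMX-related settings in the broker's startup configuration (Apache Kafka usually reads them from environment variables rather than from `server.properties`)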
* Common JMX port configurations:

```bash
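# Kafka has no built-in default JMX port; 9999 is a common choice
export JMX_PORT=9999
# Or set the port via KAFKA_JMX_OPTS. Setting this variable replaces Kafka's
# default JMX options, so add any authentication/SSL flags your setup needs
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999"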
```

**2. Check Environment Variables**

```bash
echo $JMX_PORT
env | grep JMX
```

**3. Check Running Processes**

```bash
# Find Kafka process and check JMX arguments
ps aux | grep kafka
# Or use netstat to see what ports are listening
netstat -tlnp | grep java
```
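
The process check above can be narrowed down to the port itself. The following is a minimal sketch, assuming the port was passed to the JVM as `-Dcom.sun.management.jmxremote.port=<n>`. `extract_jmx_port` is a hypothetical helper name, and the `pgrep` pattern in the usage comment assumes the broker's main class is `kafka.Kafka`.

```bash
#!/usr/bin/env bash
# Print the value of -Dcom.sun.management.jmxremote.port=<n> found in the
# text on stdin (for example, a process command line).
# Prints nothing if the option is absent.
extract_jmx_port() {
  sed -n 's/.*-Dcom\.sun\.management\.jmxremote\.port=\([0-9][0-9]*\).*/\1/p'
}

# Hypothetical usage on a live host (assumes the main class is kafka.Kafka):
#   ps -o args= -p "$(pgrep -f kafka.Kafka | head -n 1)" | extract_jmx_port
```

Empty output usually means JMX was not enabled through a JVM option on that process, so the environment variables and startup scripts are the next place to look.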

**4. Check Startup Scripts**

* Look in `kafka-server-start.sh` or similar startup scripts
* Check for `JMX_PORT` or `KAFKA_JMX_OPTS` variables

</details>

<details>

<summary>Testing JMX Connection</summary>

```bash
# Test connection with JConsole
jconsole localhost:9999

# Or use command line tools
jmxterm -l localhost:9999
```

#### Common Default Locations

* **Confluent Platform**: JMX typically on port 9581-9585
* **Standard Kafka**: Often port 9999
* **Docker/Kubernetes**: Check container environment variables

If JMX isn't enabled, you'll need to configure it by adding the appropriate JMX options to your Kafka startup configuration.
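
When neither JConsole nor jmxterm is installed on the host, a plain TCP check can at least show whether anything is listening on the expected port. This is a rough sketch, assuming `bash` and the coreutils `timeout` command are available. `jmx_port_open` is a hypothetical helper name. It confirms only that a listener exists, not that the listener speaks JMX.

```bash
#!/usr/bin/env bash
# Return 0 if something accepts TCP connections on host:port, non-zero otherwise.
# Uses bash's /dev/tcp pseudo-device; gives up after 3 seconds.
jmx_port_open() {
  local host="$1" port="$2"
  # Inside 'bash -c', $0 is the host and $1 is the port passed after the script.
  timeout 3 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Hypothetical usage:
#   if jmx_port_open localhost 9999; then echo "port open"; else echo "port closed"; fi
```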

</details>

<details>

<summary>Required JMX Rules/metrics</summary>

To collect detailed Kafka JMX metrics, add the following `rules` section to the JMX Exporter YAML configuration. These patterns match Kafka server, network, controller, log, and Java metrics, and convert them into Prometheus-compatible metrics.

Include this full rules list in the configuration to ensure comprehensive metric coverage:

{% code lineNumbers="true" %}

```yaml
rules:
  # Special cases and very specific rules
  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
    name: kafka_server_$1_$2
    type: GAUGE
    labels:
      clientId: "$3"
      topic: "$4"
      partition: "$5"

  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
    name: kafka_server_$1_$2
    type: GAUGE
    labels:
      clientId: "$3"
      broker: "$4:$5"

  - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
    name: kafka_server_$1_connections_tls_info
    type: GAUGE
    labels:
      cipher: "$2"
      protocol: "$3"
      listener: "$4"
      networkProcessor: "$5"

  - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
    name: kafka_server_$1_connections_software
    type: GAUGE
    labels:
      clientSoftwareName: "$2"
      clientSoftwareVersion: "$3"
      listener: "$4"
      networkProcessor: "$5"

  - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
    name: kafka_server_$1_$4
    type: GAUGE
    labels:
      listener: "$2"
      networkProcessor: "$3"

  - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
    name: kafka_server_$1_$4
    type: GAUGE
    labels:
      listener: "$2"
      networkProcessor: "$3"

  # Percent metrics
  - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
    name: kafka_$1_$2_$3_percent
    type: GAUGE

  - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
    name: kafka_$1_$2_$3_percent
    type: GAUGE

  - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
    name: kafka_$1_$2_$3_percent
    type: GAUGE
    labels:
      "$4": "$5"

  # Generic per-second counters
  - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
    name: kafka_$1_$2_$3_total
    type: COUNTER
    labels:
      "$4": "$5"
      "$6": "$7"

  - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
    name: kafka_$1_$2_$3_total
    type: COUNTER
    labels:
      "$4": "$5"

  - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
    name: kafka_$1_$2_$3_total
    type: COUNTER

  # Generic gauges with optional key-value pairs
  - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
    name: kafka_$1_$2_$3
    type: GAUGE
    labels:
      "$4": "$5"
      "$6": "$7"

  - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
    name: kafka_$1_$2_$3
    type: GAUGE
    labels:
      "$4": "$5"

  - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
    name: kafka_$1_$2_$3
    type: GAUGE

  # Histogram-like metrics (summary emulation)
  - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
    name: kafka_$1_$2_$3_count
    type: COUNTER
    labels:
      "$4": "$5"
      "$6": "$7"

  - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
    name: kafka_$1_$2_$3
    type: GAUGE
    labels:
      "$4": "$5"
      "$6": "$7"
      quantile: "0.$8"

  - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
    name: kafka_$1_$2_$3_count
    type: COUNTER
    labels:
      "$4": "$5"

  - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
    name: kafka_$1_$2_$3
    type: GAUGE
    labels:
      "$4": "$5"
      quantile: "0.$6"

  - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
    name: kafka_$1_$2_$3_count
    type: COUNTER

  - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
    name: kafka_$1_$2_$3
    type: GAUGE
    labels:
      quantile: "0.$4"

  # Controller metrics
  - pattern: kafka.controller<type=(ControllerChannelManager), name=(QueueSize), broker-id=(\d+)><>(Value)
    name: kafka_controller_$1_$2_$4
    labels:
      broker_id: "$3"

  - pattern: kafka.controller<type=(ControllerChannelManager), name=(TotalQueueSize)><>(Value)
    name: kafka_controller_$1_$2_$3

  - pattern: kafka.controller<type=(KafkaController), name=(.+)><>(Value)
    name: kafka_controller_$1_$2_$3

  - pattern: kafka.controller<type=(ControllerStats), name=(.+)><>(Count)
    name: kafka_controller_$1_$2_$3

  # Network metrics
  - pattern: kafka.network<type=(Processor), name=(IdlePercent), networkProcessor=(.+)><>(Value)
    name: kafka_network_$1_$2_$4
    labels:
      network_processor: "$3"

  - pattern: kafka.network<type=(RequestMetrics), name=(.+), request=(.+)><>(Count|Value)
    name: kafka_network_$1_$2_$4
    labels:
      request: "$3"

  - pattern: kafka.network<type=(SocketServer), name=(.+)><>(Count|Value)
    name: kafka_network_$1_$2_$3

  - pattern: kafka.network<type=(RequestChannel), name=(.+)><>(Count|Value)
    name: kafka_network_$1_$2_$3

  # Additional server metrics
  - pattern: kafka.server<type=(.+), name=(.+), topic=(.+)><>(Count|OneMinuteRate)
    name: kafka_server_$1_$2_$4
    labels:
      topic: "$3"

  - pattern: kafka.server<type=(ReplicaFetcherManager), name=(.+), clientId=(.+)><>(Value)
    name: kafka_server_$1_$2_$4
    labels:
      client_id: "$3"

  - pattern: kafka.server<type=(DelayedOperationPurgatory), name=(.+), delayedOperation=(.+)><>(Value)
    name: kafka_server_$1_$2_$3_$4

  - pattern: kafka.server<type=(.+), name=(.+)><>(Count|Value|OneMinuteRate)
    name: kafka_server_$1_total_$2_$3

  - pattern: kafka.server<type=(.+)><>(queue-size)
    name: kafka_server_$1_$2

  # Java memory and GC metrics
  - pattern: java.lang<type=(.+), name=(.+)><(.+)>(\w+)
    name: java_lang_$1_$4_$3_$2

  - pattern: java.lang<type=(.+), name=(.+)><>(\w+)
    name: java_lang_$1_$3_$2

  - pattern: java.lang<type=(.*)>

  # Kafka log metrics
  - pattern: kafka.log<type=(.+), name=(.+), topic=(.+), partition=(.+)><>Value
    name: kafka_log_$1_$2
    labels:
      topic: "$3"
      partition: "$4"
```

{% endcode %}

These rules should be added to the JMX Exporter YAML configuration to expose comprehensive metrics for the Kafka broker, controller, network, log, and JVM.
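
To get a feel for how a rule turns an MBean into a metric name, the sketch below approximates the plain `PerSec` counter rule with `sed`. It is an illustration only: the exporter uses Java regular expressions and also matches the attribute value, so the `sed` expression is a simplified stand-in. `preview_persec_rule` is a hypothetical helper name, and the output assumes the exporter is not configured to lowercase metric names.

```bash
#!/usr/bin/env bash
# Approximate one exporter rule with sed to preview the resulting metric name.
# Mirrors: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
#      ->  kafka_$1_$2_$3_total
preview_persec_rule() {
  sed -E 's/^kafka\.([[:alnum:]_]+)<type=(.+), name=(.+)PerSec[[:alnum:]_]*><>Count$/kafka_\1_\2_\3_total/'
}

echo 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' | preview_persec_rule
# prints: kafka_server_BrokerTopicMetrics_MessagesIn_total
```

Lines that do not match the expression pass through unchanged, which makes it easy to see which MBean names this particular rule would cover.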

</details>

### Step 3: Verify that all discovered or added clusters are in a healthy state

When clusters are added or discovered, the system may surface warnings about permissions or network connectivity. Resolve these warnings promptly so that the affected clusters work as expected.

<figure><img src="/files/d3oIe4VI9NQvPtCV30sF" alt=""><figcaption></figcaption></figure>

{% content-ref url="/pages/vUsvuIPQsP5K2aJsugQ8" %}
[Step 4: What's next](/getting-started/step-4-whats-next.md)
{% endcontent-ref %}

