otelcol.exporter.kafka
otelcol.exporter.kafka
accepts logs, metrics, and traces telemetry data from other otelcol
components and sends it to Kafka.
It’s important to use otelcol.exporter.kafka
together with otelcol.processor.batch
to make sure otelcol.exporter.kafka
doesn’t slow down due to sending Kafka a huge number of small payloads.
Note
otelcol.exporter.kafka
is a wrapper over the upstream OpenTelemetry Collector kafka
exporter from the otelcol-contrib
distribution. Bug reports or feature requests will be redirected to the upstream repository, if necessary.
Multiple otelcol.exporter.kafka
components can be specified by giving them
different labels.
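As a minimal sketch of the labeling described above, the following defines two independent exporters. The labels and broker addresses are hypothetical and only serve as an illustration.

```alloy
// Hypothetical labels and broker addresses, for illustration only.
otelcol.exporter.kafka "primary" {
  brokers          = ["kafka-a.example.com:9092"]
  protocol_version = "2.0.0"
}

otelcol.exporter.kafka "secondary" {
  brokers          = ["kafka-b.example.com:9092"]
  protocol_version = "2.0.0"
}
```

Other components would then reference each exporter by its label, for example otelcol.exporter.kafka.primary.input.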
Usage
otelcol.exporter.kafka "LABEL" {
  protocol_version = "PROTOCOL_VERSION"
}
Arguments
You can use the following arguments with otelcol.exporter.kafka
:
Name | Type | Description | Default | Required |
---|---|---|---|---|
protocol_version | string | Kafka protocol version to use. | | yes |
brokers | list(string) | Kafka brokers to connect to. | ["localhost:9092"] | no |
client_id | string | Client ID to use. The ID is used for all produce requests. | "sarama" | no |
encoding | string | (Deprecated) Encoding of the payload sent to Kafka. | "otlp_proto" | no |
partition_metrics_by_resource_attributes | bool | Whether to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to Kafka. | false | no |
partition_traces_by_id | bool | Whether to include the trace ID as the message key in trace messages sent to Kafka. | false | no |
resolve_canonical_bootstrap_servers_only | bool | Whether to resolve then reverse-lookup broker IPs during startup. | false | no |
timeout | duration | The timeout for every attempt to send data to the backend. | "5s" | no |
topic_from_attribute | string | A resource attribute whose value should be used as the message’s topic. | "" | no |
topic | string | (Deprecated) Kafka topic to send to. | See below | no |
Warning
The topic and encoding arguments are deprecated in favor of the logs, metrics, and traces blocks.
When topic_from_attribute is set, it takes precedence over the topic arguments in the logs, metrics, and traces blocks.
partition_traces_by_id
doesn’t have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
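The following sketch shows how these two arguments might be set together. The resource attribute name kafka.topic is a hypothetical choice, not a name defined by the component.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  // "kafka.topic" is a hypothetical resource attribute name.
  // Its value is used as the topic of each message.
  topic_from_attribute = "kafka.topic"

  // Use the trace ID as the message key for trace messages.
  partition_traces_by_id = true
}
```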
Blocks
You can use the following blocks with otelcol.exporter.kafka
:
Block | Description | Required |
---|---|---|
authentication | Configures authentication for connecting to Kafka brokers. | no |
authentication >
kerberos | Authenticates against Kafka brokers with Kerberos. | no |
authentication >
plaintext | Authenticates against Kafka brokers with plaintext. | no |
authentication >
sasl | Authenticates against Kafka brokers with SASL. | no |
authentication > sasl >
aws_msk | Additional SASL parameters when using AWS_MSK_IAM. | no |
authentication >
tls | Configures TLS for connecting to the Kafka brokers. | no |
debug_metrics | Configures the metrics which this component generates to monitor its state. | no |
logs | Configures how to send logs to Kafka brokers. | no |
metadata | Configures how to retrieve metadata from Kafka brokers. | no |
metadata >
retry | Configures how to retry metadata retrieval. | no |
metrics | Configures how to send metrics to Kafka brokers. | no |
producer | Kafka producer configuration. | no |
retry_on_failure | Configures retry mechanism for failed requests. | no |
sending_queue | Configures batching of data before sending. | no |
tls | Configures TLS for connecting to the Kafka brokers. | no |
traces | Configures how to send traces to Kafka brokers. | no |
The > symbol indicates deeper levels of nesting.
For example, authentication
> tls
refers to a tls
block defined inside an authentication
block.
logs
The logs
block configures how to send logs to Kafka brokers.
Name | Type | Description | Default | Required |
---|---|---|---|---|
encoding | string | The encoding for logs. Refer to Supported encodings. | "otlp_proto" | no |
topic | string | The name of the Kafka topic to which logs will be exported. | "otlp_logs" | no |
metrics
The metrics
block configures how to send metrics to Kafka brokers.
Name | Type | Description | Default | Required |
---|---|---|---|---|
encoding | string | The encoding for metrics. Refer to Supported encodings. | "otlp_proto" | no |
topic | string | The name of the Kafka topic to which metrics will be exported. | "otlp_metrics" | no |
traces
The traces
block configures how to send traces to Kafka brokers.
Name | Type | Description | Default | Required |
---|---|---|---|---|
encoding | string | The encoding for traces. Refer to Supported encodings. | "otlp_proto" | no |
topic | string | The name of the Kafka topic to which traces will be exported. | "otlp_spans" | no |
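As an illustration of the logs, metrics, and traces blocks, this sketch sends each signal to its own topic. The topic names are hypothetical. Arguments that aren't set keep the defaults listed in the tables above.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  logs {
    // Hypothetical topic name. Logs are encoded as OTLP JSON.
    topic    = "app_logs"
    encoding = "otlp_json"
  }

  metrics {
    // Hypothetical topic name. The encoding defaults to "otlp_proto".
    topic = "app_metrics"
  }

  traces {
    // Hypothetical topic name. The encoding defaults to "otlp_proto".
    topic = "app_spans"
  }
}
```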
authentication
The authentication
block holds the definition of different authentication mechanisms to use when connecting to Kafka brokers.
It doesn’t support any arguments and is configured fully through inner blocks.
kerberos
The kerberos
block configures Kerberos authentication against the Kafka broker.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
config_file | string | Path to the Kerberos configuration file, for example, /etc/krb5.conf. | | no |
disable_fast_negotiation | bool | Disable PA-FX-FAST negotiation. | false | no |
keytab_file | string | Path to keytab file, for example, /etc/security/kafka.keytab. | | no |
password | secret | Kerberos password to authenticate with. | | no |
realm | string | Kerberos realm. | | no |
service_name | string | Kerberos service name. | | no |
use_keytab | bool | Enables using keytab instead of password. | | no |
username | string | Kerberos username to authenticate as. | | yes |
When use_keytab
is false
, the password
argument is required.
When use_keytab
is true
, the file pointed to by the keytab_file
argument is used for authentication instead.
Provide at most one of password or keytab_file.
disable_fast_negotiation
is useful for Kerberos implementations which don’t support PA-FX-FAST (Pre-Authentication Framework - Fast) negotiation.
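A minimal sketch of keytab-based Kerberos authentication, using only the arguments from the table above. The username, realm, and service name values are placeholders, and the file paths are the examples from the table.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  authentication {
    kerberos {
      // Placeholder identity values, replace with your own.
      username     = "KERBEROS_USERNAME"
      realm        = "EXAMPLE.COM"
      service_name = "kafka"

      config_file = "/etc/krb5.conf"

      // Authenticate with the keytab file instead of a password.
      use_keytab  = true
      keytab_file = "/etc/security/kafka.keytab"
    }
  }
}
```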
plaintext
Caution
The plaintext block has been deprecated. Use sasl with mechanism set to PLAIN instead.
The plaintext
block configures plain text authentication against Kafka brokers.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
password | secret | Password to use for plain text authentication. | | yes |
username | string | Username to use for plain text authentication. | | yes |
sasl
The sasl
block configures SASL authentication against Kafka brokers.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
mechanism | string | SASL mechanism to use when authenticating. | | yes |
password | secret | Password to use for SASL authentication. | | yes |
username | string | Username to use for SASL authentication. | | yes |
version | number | Version of the SASL Protocol to use when authenticating. | 0 | no |
You can set the mechanism
argument to one of the following strings:
"PLAIN"
"AWS_MSK_IAM"
"SCRAM-SHA-256"
"SCRAM-SHA-512"
"AWS_MSK_IAM_OAUTHBEARER"
When mechanism
is set to "AWS_MSK_IAM"
, the aws_msk
child block must also be provided.
You can set the version
argument to either 0
or 1
.
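For illustration, a sasl block using one of the SCRAM mechanisms might look like the following sketch. The username and password values are placeholders.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  authentication {
    sasl {
      mechanism = "SCRAM-SHA-512"

      // Placeholder credentials, replace with your own.
      username = "KAFKA_USERNAME"
      password = "KAFKA_PASSWORD"
    }
  }
}
```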
aws_msk
The aws_msk
block configures extra parameters for SASL authentication when using the AWS_MSK_IAM
or AWS_MSK_IAM_OAUTHBEARER
mechanisms.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
broker_addr | string | MSK address to connect to for authentication. | | yes |
region | string | AWS region the MSK cluster is based in. | | yes |
tls
The tls
block configures TLS settings used for connecting to the Kafka brokers.
If the tls
block isn’t provided, TLS won’t be used for communication.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
ca_file | string | Path to the CA file. | | no |
ca_pem | string | CA PEM-encoded text to validate the server with. | | no |
cert_file | string | Path to the TLS certificate. | | no |
cert_pem | string | Certificate PEM-encoded text for client authentication. | | no |
cipher_suites | list(string) | A list of TLS cipher suites that the TLS transport can use. | [] | no |
curve_preferences | list(string) | Set of elliptic curves to use in a handshake. | [] | no |
include_system_ca_certs_pool | boolean | Whether to load the system certificate authorities pool alongside the certificate authority. | false | no |
insecure_skip_verify | boolean | Ignores insecure server TLS certificates. | | no |
insecure | boolean | Disables TLS when connecting to the configured server. | | no |
key_file | string | Path to the TLS certificate key. | | no |
key_pem | secret | Key PEM-encoded text for client authentication. | | no |
max_version | string | Maximum acceptable TLS version for connections. | "TLS 1.3" | no |
min_version | string | Minimum acceptable TLS version for connections. | "TLS 1.2" | no |
reload_interval | duration | The duration after which the certificate is reloaded. | "0s" | no |
server_name | string | Verifies the hostname of server certificates when set. | | no |
If the server doesn’t support TLS, you must set the insecure
argument to true
.
To disable tls
for connections to the server, set the insecure
argument to true
.
If you set reload_interval to "0s", the certificate is never reloaded.
The following pairs of arguments are mutually exclusive and can’t both be set simultaneously:
- ca_pem and ca_file
- cert_pem and cert_file
- key_pem and key_file
If cipher_suites
is left blank, a safe default list is used.
Refer to the
Go TLS documentation for a list of supported cipher suites.
The curve_preferences
argument determines the set of
elliptic curves to prefer during a handshake in preference order.
If not provided, a default list is used.
The available elliptic curves are X25519, P521, P256, and P384.
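The sketch below shows a tls block that uses file-based certificates, so none of the mutually exclusive pairs listed above are set together. The broker address and file paths are hypothetical.

```alloy
otelcol.exporter.kafka "default" {
  // Hypothetical broker address.
  brokers          = ["kafka.example.com:9093"]
  protocol_version = "2.0.0"

  tls {
    // Hypothetical paths. Only the *_file variants are set,
    // never the matching *_pem arguments.
    ca_file   = "/etc/alloy/certs/ca.pem"
    cert_file = "/etc/alloy/certs/client.pem"
    key_file  = "/etc/alloy/certs/client-key.pem"

    // Raise the minimum TLS version from the default "TLS 1.2".
    min_version = "TLS 1.3"

    // Reload the certificate every hour instead of never ("0s").
    reload_interval = "1h"
  }
}
```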
debug_metrics
The debug_metrics
block configures the metrics that this component generates to monitor its state.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
disable_high_cardinality_metrics | boolean | Whether to disable certain high cardinality metrics. | true | no |
disable_high_cardinality_metrics
is the Alloy equivalent to the telemetry.disableHighCardinalityMetrics
feature gate in the OpenTelemetry Collector.
It removes attributes that could cause high cardinality metrics.
For example, attributes with IP addresses and port numbers in metrics about HTTP and gRPC connections are removed.
Note
If configured, disable_high_cardinality_metrics only applies to otelcol.exporter.* and otelcol.receiver.* components.
metadata
The metadata
block configures how to retrieve and store metadata from the Kafka broker.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
full | bool | Whether to maintain a full set of metadata. | true | no |
refresh_interval | duration | The frequency at which cluster metadata is refreshed. | "10m" | no |
When full is set to false, the client doesn't make the initial request to the broker at startup.
Retrieving metadata may fail if the Kafka broker is starting up at the same time as the Alloy component.
The retry
child block can be provided to customize retry behavior.
retry
The retry
block configures how to retry retrieving metadata when retrieval fails.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
backoff | duration | Time to wait between retries. | "250ms" | no |
max_retries | number | How many times to reattempt retrieving metadata. | 3 | no |
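As a sketch of how the metadata and retry blocks nest, the following configuration retries metadata retrieval more patiently than the defaults. The values are illustrative rather than recommendations.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  metadata {
    full             = true
    refresh_interval = "5m"

    retry {
      // Illustrative values: up to 10 attempts, 1 second apart,
      // instead of the default 3 attempts, 250ms apart.
      max_retries = 10
      backoff     = "1s"
    }
  }
}
```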
producer
The producer
block configures how the Kafka producer sends messages to brokers.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
compression | string | The compression codec to use for messages. | "none" | no |
flush_max_messages | number | The maximum number of messages the producer sends in a single broker request. | 0 | no |
max_message_bytes | number | The maximum permitted size of a message in bytes. | 1000000 | no |
required_acks | number | Controls when a message is regarded as transmitted. | 1 | no |
Refer to the
Go sarama documentation for more information on required_acks
.
You can set compression to none, gzip, snappy, lz4, or zstd.
Refer to the
Go sarama documentation for more information.
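A producer block that enables compression and raises the message size limit might look like this sketch. The values are illustrative assumptions.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  producer {
    // One of the compression values listed above.
    compression = "zstd"

    // Illustrative value: double the default of 1000000 bytes.
    max_message_bytes = 2000000

    // Keep the default acknowledgment setting.
    required_acks = 1
  }
}
```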
retry_on_failure
The retry_on_failure
block configures how failed requests to Kafka are retried.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
enabled | boolean | Enables retrying failed requests. | true | no |
initial_interval | duration | Initial time to wait before retrying a failed request. | "5s" | no |
max_elapsed_time | duration | Maximum time to wait before discarding a failed batch. | "5m" | no |
max_interval | duration | Maximum time to wait between retries. | "30s" | no |
multiplier | number | Factor to grow wait time before retrying. | 1.5 | no |
randomization_factor | number | Factor to randomize wait time before retrying. | 0.5 | no |
When enabled
is true
, failed batches are retried after a given interval.
The initial_interval
argument specifies how long to wait before the first retry attempt.
If requests continue to fail, the time to wait before retrying increases by the factor specified by the multiplier
argument, which must be greater than 1.0
.
The max_interval
argument specifies the upper bound of how long to wait between retries.
The randomization_factor
argument is useful for adding jitter between retrying Alloy instances.
If randomization_factor is greater than 0, the wait time before each retry is chosen at random from the range [I - randomization_factor * I, I + randomization_factor * I], where I is the current interval.
If a batch hasn’t been sent successfully, it’s discarded after the time specified by max_elapsed_time
elapses.
If max_elapsed_time
is set to "0s"
, failed requests are retried forever until they succeed.
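To make the interaction of these arguments concrete, the sketch below uses illustrative values. Before jitter, the wait times grow as 1s, 2s, 4s, 8s, and are then capped at 10s until max_elapsed_time is reached.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  retry_on_failure {
    enabled = true

    // Base intervals before jitter: 1s, 2s, 4s, 8s, then capped at 10s.
    initial_interval = "1s"
    multiplier       = 2
    max_interval     = "10s"

    // Discard a batch that still fails after 2 minutes.
    max_elapsed_time = "2m"
  }
}
```

Because randomization_factor is left at its default of 0.5, each actual wait falls somewhere between half and one and a half times the base interval.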
sending_queue
The sending_queue
block configures an in-memory buffer of batches before data is sent to Kafka.
The following arguments are supported:
Name | Type | Description | Default | Required |
---|---|---|---|---|
block_on_overflow | boolean | The behavior when the component’s TotalSize limit is reached. | false | no |
blocking | boolean | (Deprecated) If true , blocks until the queue has room for a new request. | false | no |
enabled | boolean | Enables a buffer before sending data to the client. | true | no |
num_consumers | number | Number of readers to send batches written to the queue in parallel. | 10 | no |
queue_size | number | Maximum number of unwritten batches allowed in the queue at the same time. | 1000 | no |
sizer | string | How the queue and batching is measured. | "requests" | no |
storage | capsule(otelcol.Handler) | Handler from an otelcol.storage component to use to enable a persistent queue mechanism. | | no |
The blocking
argument is deprecated in favor of the block_on_overflow
argument.
When block_on_overflow
is true
, the component will wait for space. Otherwise, operations will immediately return a retryable error.
When enabled
is true
, data is first written to an in-memory buffer before sending it to the configured server.
Batches sent to the component’s input
exported field are added to the buffer as long as the number of unsent batches doesn’t exceed the configured queue_size
.
queue_size
determines how long an endpoint outage is tolerated.
Assuming 100 requests/second, the default queue size 1000
provides about 10 seconds of outage tolerance.
To calculate the correct value for queue_size
, multiply the average number of outgoing requests per second by the time in seconds that outages are tolerated. A very high value can cause Out Of Memory (OOM) kills.
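As a worked example of this calculation, assume 100 outgoing requests per second and a tolerated outage of 60 seconds. Both figures are assumptions chosen for illustration.

```alloy
otelcol.exporter.kafka "default" {
  protocol_version = "2.0.0"

  sending_queue {
    // 100 requests/second * 60 seconds of tolerated outage = 6000 batches.
    queue_size = 6000
  }
}
```

A larger queue holds more unsent batches in memory, so size it against the memory available to Alloy.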
The sizer argument can be set to:
- requests: number of incoming batches of metrics, logs, traces (the most performant option).
- items: number of the smallest parts of each signal (spans, metric data points, log records).
- bytes: the size of serialized data in bytes (the least performant option).
The num_consumers
argument controls how many readers read from the buffer and send data in parallel.
Larger values of num_consumers
allow data to be sent more quickly at the expense of increased network traffic.
If an otelcol.storage.*
component is configured and provided in the queue’s storage
argument, the queue uses the
provided storage extension to provide a persistent queue and the queue is no longer stored in memory.
Any data persisted will be processed on startup if Alloy is killed or restarted.
Refer to the
exporterhelper documentation in the OpenTelemetry Collector repository for more details.
Exported fields
The following fields are exported and can be referenced by other components:
Name | Type | Description |
---|---|---|
input | otelcol.Consumer | A value that other components can use to send telemetry data to. |
input
accepts otelcol.Consumer
data for any telemetry signal (metrics, logs, or traces).
Supported encodings
otelcol.exporter.kafka
supports encoding extensions, as well as the following built-in encodings.
Available for all signals:
- otlp_proto: Data is encoded as OTLP Protobuf.
- otlp_json: Data is encoded as OTLP JSON.
Available only for traces:
- jaeger_proto: The payload is serialized to a single Jaeger proto Span, and keyed by TraceID.
- jaeger_json: The payload is serialized to a single Jaeger JSON Span using jsonpb, and keyed by TraceID.
- zipkin_proto: The payload is serialized to Zipkin v2 proto Span.
- zipkin_json: The payload is serialized to Zipkin v2 JSON Span.
Available only for logs:
- raw: If the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
Component health
otelcol.exporter.kafka
is only reported as unhealthy if given an invalid configuration.
Debug information
otelcol.exporter.kafka
doesn’t expose any component-specific debug information.
Example
This example forwards telemetry data through a batch processor before finally sending it to Kafka:
otelcol.receiver.otlp "default" {
  http {}
  grpc {}

  output {
    metrics = [otelcol.processor.batch.default.input]
    logs    = [otelcol.processor.batch.default.input]
    traces  = [otelcol.processor.batch.default.input]
  }
}

otelcol.processor.batch "default" {
  output {
    metrics = [otelcol.exporter.kafka.default.input]
    logs    = [otelcol.exporter.kafka.default.input]
    traces  = [otelcol.exporter.kafka.default.input]
  }
}

otelcol.exporter.kafka "default" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
}
Compatible components
otelcol.exporter.kafka
has exports that can be consumed by the following components:
- Components that consume
OpenTelemetry
otelcol.Consumer
Note
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. Refer to the linked documentation for more details.