---
title: Kafka 协议
---

# Kafka 协议

本文以 Python Kafka 客户端为例，介绍如何使用 Kafka 客户端访问 FlowMQ 。

::: tip
下文所有示例中的 `your-flowmq-host` 均为占位符，请根据实际情况进行替换。
:::

## 1. 连接与依赖

### 1.1 安装依赖

```bash
python3 -m pip install kafka-python
```

### 1.2 基本连接参数

- `bootstrap_servers`: `your-flowmq-host:9092`

## 2. 创建 Topic

在生产或消费消息之前，先创建 Kafka Topic：

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["your-flowmq-host:9092"])

topic = NewTopic(
    name="telemetry",
    num_partitions=3,
    replication_factor=1,
)

admin.create_topics(new_topics=[topic])
admin.close()
```

## 3. 生产消息（Producer）

### 2.1 基础生产

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["your-flowmq-host:9092"],
    acks="all",
    retries=3,
)

topic = "telemetry"
producer.send(topic, key=b"device-001", value=b"{\"temp\":23.5}")
producer.flush()
producer.close()
```

### 2.2 携带 Headers

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["your-flowmq-host:9092"])

producer.send(
    "alerts",
    key=b"device-001",
    value=b"overheat",
    headers=[
        ("source", b"rule-engine"),
        ("severity", b"high"),
    ],
)
producer.flush()
producer.close()
```

## 4. 消费消息（Consumer）

### 3.1 基础消费

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "telemetry",
    bootstrap_servers=["your-flowmq-host:9092"],
    auto_offset_reset="latest",
    enable_auto_commit=True,
    group_id="telemetry-analytics",
)

for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.key, msg.value)
```

### 3.2 消费组与并行扩展

- 同一 `group_id` 下启动多个消费者实例，可自动分配分区以提升吞吐。
- 需要提升并行度时，通常通过增加 Topic 分区数实现。

### 3.3 手动 Offset 管理（精细控制）

适用于：严格控制"处理成功后再提交"与容错重试。

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "telemetry",
    bootstrap_servers=["your-flowmq-host:9092"],
    group_id="telemetry-workers",
    enable_auto_commit=False,
    auto_offset_reset="earliest",
)

for msg in consumer:
    try:
        # 业务处理
        # process(msg.value)
        print("processing", msg.offset)

        # 处理成功后提交
        consumer.commit()
    except Exception as e:
        # 失败时不提交，可由重启/重平衡触发重放
        print("error", e)
```

::: tip
对"至少一次"投递的业务，消费端应实现幂等（例如基于业务主键去重）。
:::

## 5. 分区策略

- 分区策略建议：
  - 需要同设备有序：使用 `key=device-id`，确保同 key 的消息落在同一分区
  - 需要高吞吐：增加分区数，并通过多消费者并行处理

## 6. Offset 管理与语义说明

- 自动提交（`enable_auto_commit=True`）：
  - 配置简单，但可能在处理失败时造成"已提交但未处理完成"的窗口。
- 手动提交：
  - 更适合需要严格控制处理语义的任务型消费。
- 重放：
  - 通过调整 offset（或使用新消费组）实现历史数据重放与回溯分析。

## 7. 错误处理与排障建议

- 无法连接：
  - 检查 `your-flowmq-host:9092` 可达性、防火墙与监听端口
  - 检查集群是否启用 TLS/SASL，客户端是否配置匹配
