Skip to content

Kafka 协议

本文以 Python Kafka 客户端为例,介绍如何使用 Kafka 客户端访问 FlowMQ 。

TIP

下文所有示例中的 your-flowmq-host 均为占位符,请根据实际情况进行替换。

1. 连接与依赖

1.1 安装依赖

bash
python3 -m pip install kafka-python

1.2 基本连接参数

  • bootstrap_servers: your-flowmq-host:9092

2. 创建 Topic

在生产或消费消息之前,先创建 Kafka Topic:

python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["your-flowmq-host:9092"])

topic = NewTopic(
    name="telemetry",
    num_partitions=3,
    replication_factor=1,
)

admin.create_topics(new_topics=[topic])
admin.close()

3. 生产消息(Producer)

2.1 基础生产

python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["your-flowmq-host:9092"],
    acks="all",
    retries=3,
)

topic = "telemetry"
producer.send(topic, key=b"device-001", value=b"{\"temp\":23.5}")
producer.flush()
producer.close()

2.2 携带 Headers

python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["your-flowmq-host:9092"])

producer.send(
    "alerts",
    key=b"device-001",
    value=b"overheat",
    headers=[
        ("source", b"rule-engine"),
        ("severity", b"high"),
    ],
)
producer.flush()
producer.close()

4. 消费消息(Consumer)

3.1 基础消费

python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "telemetry",
    bootstrap_servers=["your-flowmq-host:9092"],
    auto_offset_reset="latest",
    enable_auto_commit=True,
    group_id="telemetry-analytics",
)

for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.key, msg.value)

3.2 消费组与并行扩展

  • 同一 group_id 下启动多个消费者实例,可自动分配分区以提升吞吐。
  • 需要提升并行度时,通常通过增加 Topic 分区数实现。

3.3 手动 Offset 管理(精细控制)

适用于:严格控制"处理成功后再提交"与容错重试。

python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "telemetry",
    bootstrap_servers=["your-flowmq-host:9092"],
    group_id="telemetry-workers",
    enable_auto_commit=False,
    auto_offset_reset="earliest",
)

for msg in consumer:
    try:
        # 业务处理
        # process(msg.value)
        print("processing", msg.offset)

        # 处理成功后提交
        consumer.commit()
    except Exception as e:
        # 失败时不提交,可由重启/重平衡触发重放
        print("error", e)

TIP

对"至少一次"投递的业务,消费端应实现幂等(例如基于业务主键去重)。

5. 分区策略

  • 分区策略建议:
    • 需要同设备有序:使用 key=device-id,确保同 key 的消息落在同一分区
    • 需要高吞吐:增加分区数,并通过多消费者并行处理

6. Offset 管理与语义说明

  • 自动提交(enable_auto_commit=True):
    • 配置简单,但可能在处理失败时造成"已提交但未处理完成"的窗口。
  • 手动提交:
    • 更适合需要严格控制处理语义的任务型消费。
  • 重放:
    • 通过调整 offset(或使用新消费组)实现历史数据重放与回溯分析。

7. 错误处理与排障建议

  • 无法连接:
    • 检查 your-flowmq-host:9092 可达性、防火墙与监听端口
    • 检查集群是否启用 TLS/SASL,客户端是否配置匹配