Kafka 协议
本文以 Python Kafka 客户端为例,介绍如何使用 Kafka 客户端访问 FlowMQ 。
TIP
下文所有示例中的 your-flowmq-host 均为占位符,请根据实际情况进行替换。
1. 连接与依赖
1.1 安装依赖
bash
python3 -m pip install kafka-python1.2 基本连接参数
bootstrap_servers:your-flowmq-host:9092
2. 创建 Topic
在生产或消费消息之前,先创建 Kafka Topic:
python
from kafka.admin import KafkaAdminClient, NewTopic
admin = KafkaAdminClient(bootstrap_servers=["your-flowmq-host:9092"])
topic = NewTopic(
name="telemetry",
num_partitions=3,
replication_factor=1,
)
admin.create_topics(new_topics=[topic])
admin.close()3. 生产消息(Producer)
2.1 基础生产
python
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=["your-flowmq-host:9092"],
acks="all",
retries=3,
)
topic = "telemetry"
producer.send(topic, key=b"device-001", value=b"{\"temp\":23.5}")
producer.flush()
producer.close()2.2 携带 Headers
python
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=["your-flowmq-host:9092"])
producer.send(
"alerts",
key=b"device-001",
value=b"overheat",
headers=[
("source", b"rule-engine"),
("severity", b"high"),
],
)
producer.flush()
producer.close()4. 消费消息(Consumer)
3.1 基础消费
python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"telemetry",
bootstrap_servers=["your-flowmq-host:9092"],
auto_offset_reset="latest",
enable_auto_commit=True,
group_id="telemetry-analytics",
)
for msg in consumer:
print(msg.topic, msg.partition, msg.offset, msg.key, msg.value)3.2 消费组与并行扩展
- 同一
group_id下启动多个消费者实例,可自动分配分区以提升吞吐。 - 需要提升并行度时,通常通过增加 Topic 分区数实现。
3.3 手动 Offset 管理(精细控制)
适用于:严格控制"处理成功后再提交"与容错重试。
python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"telemetry",
bootstrap_servers=["your-flowmq-host:9092"],
group_id="telemetry-workers",
enable_auto_commit=False,
auto_offset_reset="earliest",
)
for msg in consumer:
try:
# 业务处理
# process(msg.value)
print("processing", msg.offset)
# 处理成功后提交
consumer.commit()
except Exception as e:
# 失败时不提交,可由重启/重平衡触发重放
print("error", e)TIP
对"至少一次"投递的业务,消费端应实现幂等(例如基于业务主键去重)。
5. 分区策略
- 分区策略建议:
- 需要同设备有序:使用
key=device-id,确保同 key 的消息落在同一分区 - 需要高吞吐:增加分区数,并通过多消费者并行处理
- 需要同设备有序:使用
6. Offset 管理与语义说明
- 自动提交(
enable_auto_commit=True):- 配置简单,但可能在处理失败时造成"已提交但未处理完成"的窗口。
- 手动提交:
- 更适合需要严格控制处理语义的任务型消费。
- 重放:
- 通过调整 offset(或使用新消费组)实现历史数据重放与回溯分析。
7. 错误处理与排障建议
- 无法连接:
- 检查
your-flowmq-host:9092可达性、防火墙与监听端口 - 检查集群是否启用 TLS/SASL,客户端是否配置匹配
- 检查