华为云分布式消息服务DMS对接使用完全指南
引言:为什么需要分布式消息服务
在微服务架构和分布式系统日益普及的今天,服务之间的同步调用带来的耦合度高、性能瓶颈、故障传播等问题越来越突出。分布式消息服务作为一种异步通信中间件,能够有效解耦系统组件、削峰填谷、提升系统弹性和可扩展性。华为云分布式消息服务DMS提供了Kafka、RocketMQ、RabbitMQ三种业界主流的消息引擎,分别适用于大数据流处理、金融级事务消息、传统企业集成等不同场景。
本文将手把手教你如何开通DMS服务、创建实例、配置Topic,并提供Java、Python、Go等多语言的对接代码示例,帮助你在实际项目中快速落地。
需要先登录华为云控制台,点击:华为云控制台,还没有账号,点击:注册并关联,已有账号点击:登录后关联
一、DMS服务概览与产品选型
1.1 DMS支持的三种消息引擎
华为云分布式消息服务DMS当前提供三种消息中间件引擎,各自具有不同的技术特性和适用场景:
- DMS for Kafka:基于开源Apache Kafka构建,提供高吞吐、低延迟的消息队列服务,适用于日志采集、流数据处理、数据管道等大数据场景。Kafka实例采用物理隔离的方式部署,租户独占实例资源。
- DMS for RocketMQ:基于开源Apache RocketMQ构建,是一个低延迟、弹性高可靠、高吞吐、动态扩展的消息中间件服务,支持定时消息、顺序消息、事务消息等高级特性,适用于金融交易、订单处理等对消息可靠性要求极高的场景。
- DMS for RabbitMQ:基于开源RabbitMQ构建,支持AMQP协议,适合传统企业集成、任务队列、RPC调用等场景,提供灵活的路由和交换器机制。
1.2 如何选择适合自己的引擎
选型时可从以下几个维度考量:
- 吞吐量需求:Kafka吞吐量最高,适合海量数据流;RocketMQ次之;RabbitMQ适合中等吞吐量的业务。
- 消息可靠性:RocketMQ提供事务消息和定时消息,金融级可靠;Kafka通过副本机制保证持久化;RabbitMQ提供消息确认机制。
- 开发语言生态:三种引擎均支持主流开发语言,Kafka生态最为丰富。
- 运维复杂度:华为云DMS提供全托管服务,无需关心底层集群运维,三种引擎的运维成本差异不大。
二、开通DMS服务与权限配置
2.1 开通服务
登录华为云控制台后,在服务列表中选择\"分布式消息服务\",根据需求选择Kafka版、RocketMQ版或RabbitMQ版进入对应控制台,按提示开通服务即可。新用户通常可以享受一定额度的免费试用资源。
2.2 IAM权限管理
华为云DMS的权限管理基于统一身份认证服务IAM实现。通过IAM,可以给企业中不同职能部门的员工创建IAM用户,让每个员工拥有唯一安全凭证并使用DMS资源。DMS for Kafka支持的系统策略包括只读权限\"DMS ReadOnlyAccess\"和管理员权限等。
权限验证方法:在\"服务列表\"中选择分布式消息服务Kafka,进入Kafka实例主界面,尝试购买Kafka实例。如果无法购买(假设当前权限仅包含DMS ReadOnlyAccess),则表示只读权限已生效。
如果需要更细粒度的权限控制,可以创建自定义策略,在策略中精确指定允许或拒绝的操作。建议遵循最小权限原则,仅授予业务所需的最低权限。
三、创建DMS实例
3.1 创建Kafka实例
在DMS for Kafka控制台单击\"购买Kafka实例\",需要配置以下核心参数:
- 区域与可用区:选择与业务服务器相近的区域以降低网络延迟。
- 实例规格:根据预估的吞吐量和存储需求选择适当的规格,支持按需和包周期两种计费模式。
- 存储空间:Kafka消息持久化存储的磁盘容量,建议根据消息保留时长和日均消息量合理规划。
- VPC与子网:实例部署在虚拟私有云内,需要与客户端所在网络互通。
- 安全访问:可选择开启SASL认证和SSL加密传输。SASL_SSL提供了身份认证和数据加密的双重安全保障,但会对性能产生一定影响。
- Kafka自动创建Topic:开启后,生产或消费不存在的Topic时会自动创建,方便快速测试。
3.2 创建RocketMQ实例
在DMS for RocketMQ控制台单击\"购买RocketMQ实例\",配置项与Kafka类似,额外注意RocketMQ支持的消息类型选择(普通、定时、顺序、事务)。RocketMQ实例同样采用物理隔离方式部署,租户独占资源。
3.3 创建RabbitMQ实例
在DMS for RabbitMQ控制台创建实例,需要配置Vhost(虚拟主机)、Exchange(交换器)和Queue(队列)等资源。RabbitMQ支持复杂的路由规则,适用于需要灵活消息分发的场景。
四、配置Topic与队列
4.1 Kafka Topic配置
Topic是Kafka中消息存储的基本单元,生产者和消费者通过Topic进行消息的发布和订阅。创建Topic的步骤如下:
- 登录DMS for Kafka控制台,进入实例详情页面。
- 单击左侧导航栏\"Topic管理\",进入Topic管理页面。
- 单击\"创建Topic\",填写Topic名称、分区数、副本数等配置。
如果实例创建时开启了\"Kafka自动创建Topic\"功能,则在生产或消费消息时如果Topic不存在会自动创建,无需手动操作。
4.2 RocketMQ Topic配置
RocketMQ的Topic创建流程与Kafka类似,进入RocketMQ实例详情页后,在\"实例管理 > Topic管理\"中单击\"创建Topic\"。RocketMQ Topic名称有严格的命名规则:长度为3~64个字符,只能由英文字母、数字、百分号、竖线、中划线、下划线组成,且不能以\"rmq_sys_\"开头。创建Topic时还需指定消息类型,包括普通消息、定时消息、顺序消息和事务消息。
4.3 RabbitMQ Vhost与Queue配置
RabbitMQ通过Vhost实现多租户隔离,每个Vhost拥有独立的Exchange、Queue和权限配置。在实例中创建Vhost后,再创建Exchange和Queue并配置绑定关系,消息才能正确路由。
五、收集连接信息
在编写客户端代码之前,需要从DMS控制台收集以下连接信息:
- 实例连接地址与端口:在实例详情页的\"连接信息\"中查看,包括内网连接地址和公网连接地址(如果开启了公网访问)。
- Topic名称:从Topic管理页面获取已创建的Topic名称。
- SASL用户名与密码:如果实例开启了SASL认证,在\"用户管理\"页面查看用户名,密码在创建用户时设置。
- SSL证书:如果使用SASL_SSL安全协议,需要下载SSL证书文件(client.jks),默认密码为dms@kafka。
六、Kafka对接代码示例
6.1 Java客户端接入
华为云DMS for Kafka兼容开源Apache Kafka协议,可以使用开源Kafka客户端直接连接。以下是在Java中使用Kafka客户端生产消息的完整示例。
6.1.1 Maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>6.1.2 生产者配置文件(dms.sdk.producer.properties)
配置文件中的bootstrap.servers需要替换为实例的实际连接地址和端口:
# Kafka实例的broker信息
bootstrap.servers=100.xxx.xxx.87:9092,100.xxx.xxx.69:9092,100.xxx.xxx.155:9092
# 发送确认参数
acks=all
# 键的序列化方式
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# producer缓存内存大小
buffer.memory=33554432
# 重试次数
retries=10
# 重试间隔
retry.backoff.ms=1000
# ====== 以下为SASL_SSL认证配置(如未开启密文接入请注释) ======
# SASL认证机制
sasl.mechanism=PLAIN
# SASL用户名和密码
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username=\"your_username\" \
password=\"your_password\";
# 安全协议
security.protocol=SASL_SSL
# SSL证书路径
ssl.truststore.location=E:\\temp\\client.jks
# SSL证书密码
ssl.truststore.password=dms@kafka
# 证书域名校验(必须关闭)
ssl.endpoint.identification.algorithm=如果使用SASL_PLAINTEXT(仅认证不加密),则将security.protocol改为SASL_PLAINTEXT并注释SSL相关配置。
6.1.3 生产者Java代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class DmsKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
// 加载配置文件
try (java.io.InputStream input = DmsKafkaProducer.class
.getClassLoader().getResourceAsStream(\"dms.sdk.producer.properties\")) {
props.load(input);
} catch (Exception e) {
e.printStackTrace();
return;
}
String topicName = \"your_topic_name\";
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 100; i++) {
String key = \"key-\" + i;
String value = \"message-\" + i + \" at \" + System.currentTimeMillis();
ProducerRecord<String, String> record =
new ProducerRecord<>(topicName, key, value);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println(\"Sent message: \" + value +
\" to partition \" + metadata.partition() +
\" offset \" + metadata.offset());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}6.1.4 消费者配置文件(dms.sdk.consumer.properties)
# Kafka实例的broker信息
bootstrap.servers=100.xxx.xxx.87:9092,100.xxx.xxx.69:9092,100.xxx.xxx.155:9092
# 消费者组ID
group.id=your_consumer_group
# 键的反序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 自动提交偏移量
enable.auto.commit=true
# 自动提交间隔
auto.commit.interval.ms=1000
# 从最早的消息开始消费
auto.offset.reset=earliest
# ====== SASL_SSL认证配置(同生产者) ======
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username=\"your_username\" \
password=\"your_password\";
security.protocol=SASL_SSL
ssl.truststore.location=E:\\temp\\client.jks
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=6.1.5 消费者Java代码
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class DmsKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
try (java.io.InputStream input = DmsKafkaConsumer.class
.getClassLoader().getResourceAsStream(\"dms.sdk.consumer.properties\")) {
props.load(input);
} catch (Exception e) {
e.printStackTrace();
return;
}
String topicName = \"your_topic_name\";
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
System.out.println(\"Received message: \" + record.value() +
\" from partition \" + record.partition() +
\" offset \" + record.offset());
});
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}6.2 Python客户端接入
Python开发者可以使用kafka-python库连接DMS for Kafka。
6.2.1 安装依赖
pip install kafka-python6.2.2 生产者代码(producer.py)
from kafka import KafkaProducer
import json
import time
# 配置信息
bootstrap_servers = ['100.xxx.xxx.87:9092', '100.xxx.xxx.69:9092', '100.xxx.xxx.155:9092']
topic = 'your_topic_name'
# 如果开启SASL_SSL认证
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
ssl_check_hostname=False,
ssl_cafile='client.jks', # 证书路径
key_serializer=lambda k: k.encode('utf-8') if k else None,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 生产消息
for i in range(100):
message = {'id': i, 'content': f'message-{i}', 'timestamp': int(time.time())}
future = producer.send(topic, key=f'key-{i}', value=message)
result = future.get(timeout=10)
print(f'Sent: {message}, partition: {result.partition}, offset: {result.offset}')
producer.close()6.2.3 消费者代码(consumer.py)
from kafka import KafkaConsumer
import json
bootstrap_servers = ['100.xxx.xxx.87:9092', '100.xxx.xxx.69:9092', '100.xxx.xxx.155:9092']
topic = 'your_topic_name'
group_id = 'your_consumer_group'
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
ssl_check_hostname=False,
ssl_cafile='client.jks',
auto_offset_reset='earliest',
enable_auto_commit=True,
key_deserializer=lambda k: k.decode('utf-8') if k else None,
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
for message in consumer:
print(f'Received: {message.value}, partition: {message.partition}, offset: {message.offset}')6.3 Go客户端接入
Go语言可以使用Sarama库连接DMS for Kafka。
6.3.1 安装依赖
go get github.com/IBM/sarama6.3.2 生产者代码
package main
import (
\"fmt\"
\"log\"
\"time\"
\"github.com/IBM/sarama\"
)
func main() {
brokers := []string{\"100.xxx.xxx.87:9092\", \"100.xxx.xxx.69:9092\", \"100.xxx.xxx.155:9092\"}
topic := \"your_topic_name\"
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
// SASL_SSL认证配置
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = \"your_username\"
config.Net.SASL.Password = \"your_password\"
config.Net.TLS.Enable = true
config.Net.TLS.InsecureSkipVerify = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf(\"Failed to create producer: %v\", err)
}
defer producer.Close()
for i := 0; i < 100; i++ {
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(fmt.Sprintf(\"key-%d\", i)),
Value: sarama.StringEncoder(fmt.Sprintf(\"message-%d at %d\", i, time.Now().Unix())),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf(\"Failed to send message: %v\", err)
} else {
fmt.Printf(\"Sent message %d to partition %d offset %d\n\", i, partition, offset)
}
}
}七、RocketMQ对接代码示例
DMS for RocketMQ兼容开源RocketMQ协议,可以使用开源客户端连接。推荐使用Java 5.1.4版本的客户端。
7.1 Maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.1.4</version>
</dependency>7.2 普通消息生产与消费
7.2.1 生产者代码
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.SendResult;
public class RocketMQProducer {
public static void main(String[] args) throws Exception {
// 创建生产者,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(\"your_producer_group\");
// 设置NameServer地址(从控制台获取)
producer.setNamesrvAddr(\"100.xxx.xxx.87:9876\");
producer.start();
String topic = \"your_topic_name\";
for (int i = 0; i < 100; i++) {
String body = \"message-\" + i + \" at \" + System.currentTimeMillis();
Message msg = new Message(topic, \"TagA\", body.getBytes(\"UTF-8\"));
SendResult result = producer.send(msg);
System.out.println(\"Send result: \" + result);
}
producer.shutdown();
}
}7.2.2 消费者代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(\"your_consumer_group\");
consumer.setNamesrvAddr(\"100.xxx.xxx.87:9876\");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(\"your_topic_name\", \"*\");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
msgs.forEach(msg -> {
String body = new String(msg.getBody());
System.out.println(\"Received: \" + body);
});
return null;
});
consumer.start();
System.out.println(\"Consumer started.\");
Thread.sleep(Long.MAX_VALUE);
}
}7.3 定时消息示例
RocketMQ支持定时消息,生产者发送消息后可以指定延迟投递的时间点:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.time.Instant;
// 定时消息:消息10秒后投递
long deliverTimestamp = Instant.now().plusSeconds(10).toEpochMilli();
Message msg = new Message(topic, \"TagA\", \"KEY\", \"Scheduled message\".getBytes(\"UTF-8\"));
msg.setDelayTimeMs(deliverTimestamp);
producer.send(msg);八、RabbitMQ对接代码示例
8.1 Java客户端连接RabbitMQ
RabbitMQ使用AMQP协议,Java开发者可以使用官方客户端:
8.1.1 Maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>8.1.2 生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(\"your_instance_host\");
factory.setPort(5671); // SSL端口
factory.setVirtualHost(\"your_vhost\");
factory.setUsername(\"your_username\");
factory.setPassword(\"your_password\");
factory.useSslProtocol(); // 开启SSL
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
String queueName = \"your_queue_name\";
channel.queueDeclare(queueName, false, false, false, null);
String message = \"Hello DMS RabbitMQ!\";
channel.basicPublish(\"\", queueName, null, message.getBytes(\"UTF-8\"));
System.out.println(\"[x] Sent \" + message);
}
}
}8.1.3 消费者代码
import com.rabbitmq.client.*;
public class RabbitMQConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(\"your_instance_host\");
factory.setPort(5671);
factory.setVirtualHost(\"your_vhost\");
factory.setUsername(\"your_username\");
factory.setPassword(\"your_password\");
factory.useSslProtocol();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
String queueName = \"your_queue_name\";
channel.queueDeclare(queueName, false, false, false, null);
System.out.println(\"[*] Waiting for messages...\");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), \"UTF-8\");
System.out.println(\"[x] Received \" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
Thread.sleep(Long.MAX_VALUE);
}
}
}8.2 命令行快速体验
华为云提供了预编译的RabbitMQ示例工程,可以通过命令行快速验证:
# 下载示例工程
wget https://dms-demo.obs.cn-north-1.myhuaweicloud.com/RabbitMQ-Tutorial-SSL.zip
# 解压
unzip RabbitMQ-Tutorial-SSL.zip
# 进入目录
cd RabbitMQ-Tutorial-SSL
# 运行生产者
java -cp .:rabbitmq-tutorial-sll.jar Send {host} {port} {user} {password}
# 运行消费者
java -cp .:rabbitmq-tutorial-sll.jar Recv {host} {port} {user} {password}九、安全与最佳实践
9.1 网络安全
- 使用VPC内网访问:生产环境建议将DMS实例与业务服务器部署在同一VPC内,通过内网地址访问,既保证安全又降低延迟。
- 公网访问需谨慎:如需公网访问,务必开启SASL_SSL认证并配置白名单。
- SSL加密传输:建议开启SSL加密,防止数据在传输过程中被窃听。
9.2 认证与授权
- 使用IAM进行权限管理:通过IAM为用户分配最小权限,避免使用主账号密钥。
- Kafka ACL:Kafka实例支持ACL(访问控制列表),可以对Topic级别的读写权限进行细粒度控制。
- RocketMQ ACL:RocketMQ同样支持ACL用户管理,可在控制台配置用户权限。
9.3 高可用与容灾
- 多副本机制:创建Topic时设置合理的副本数(建议至少2副本),保障数据不丢失。
- 跨可用区部署:建议使用跨可用区复制构建数据容灾能力。
- 消费者组:使用消费者组实现负载均衡和故障转移。
9.4 监控与告警
华为云提供云监控服务CES,可以监控DMS实例的以下关键指标:
- 消息生产速率与消费速率
- 消息堆积量
- 实例CPU和内存使用率
- 磁盘使用率
建议为这些指标配置告警规则,在异常发生时及时收到通知。
十、常见问题排查
10.1 连接超时
可能原因:安全组未放行端口、VPC网络不通、实例连接地址错误。检查安全组规则是否允许客户端IP访问DMS实例端口(Kafka 9092/9093,RocketMQ 9876,RabbitMQ 5671/5672)。
10.2 认证失败
可能原因:SASL用户名或密码错误、SSL证书配置错误、SASL机制不匹配(PLAIN vs SCRAM-SHA-512)。确认用户名密码正确,并检查sasl.mechanism配置是否与实例一致。
10.3 消息生产失败
可能原因:Topic不存在(且未开启自动创建)、分区已满、生产者配置错误。检查Topic是否存在,确认磁盘空间充足。
10.4 操作鉴权失败
错误码DMS.00403005表示操作鉴权失败,可能原因是IAM权限不足或IAM-PDP交互异常。检查IAM用户是否具有对应DMS资源的操作权限。
结语
华为云分布式消息服务DMS提供了Kafka、RocketMQ、RabbitMQ三种业界主流消息引擎的全托管服务,大幅降低了消息中间件的运维复杂度。本文从服务开通、实例创建、Topic配置到多语言客户端接入,系统性地介绍了DMS的完整对接流程。无论你是大数据工程师、后端开发者还是架构师,都可以根据业务需求选择合适的消息引擎,参考本文的代码示例快速完成集成。
在实际生产环境中,建议结合业务特点合理规划Topic数量、分区数、副本数和消息保留策略,同时充分利用IAM权限管理和云监控告警功能,构建安全、稳定、高效的消息通信体系。
常见问题解答
问1:DMS for Kafka和自建Kafka有何区别?
答:DMS for Kafka是华为云提供的全托管服务,无需自行部署和维护Kafka集群,支持一键扩容、自动故障恢复、监控告警等能力。自建Kafka需要自行管理服务器、网络和软件升级,运维成本较高。
问2:DMS实例支持公网访问吗?
答:支持。在创建实例或后续配置中可以开启公网访问,但建议同时开启SASL_SSL认证以保证数据传输安全。生产环境仍推荐使用VPC内网访问。
问3:Kafka Topic的分区数如何设置?
答:分区数影响并行消费能力,一般建议设置为消费者实例数的整数倍。分区数越多吞吐量越高,但也会增加Broker的元数据管理开销,需要根据实际流量合理规划。
问4:RocketMQ的定时消息最大延迟时间是多久?
答:RocketMQ定时消息的延迟时间可自定义设置,无硬性上限。但需注意定时消息会占用服务端存储资源,不建议设置过长的延迟时间。
问5:如何确保消息不丢失?
答:生产者端设置acks=all确保消息写入所有副本后才返回成功;消费者端使用手动提交偏移量,在业务处理完成后再提交;服务端配置多副本机制。三管齐下可最大程度保证消息不丢失。
问6:DMS按什么计费?费用主要包括哪些?
答:DMS按实例规格(计算+存储)计费,支持按需和包周期两种模式。费用主要包括实例费用和存储费用,公网流量单独计费。建议根据业务量选择合适的规格,避免资源浪费。



