We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
I found a bug, not just asking a question, which should be created in GitHub Discussions.
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
宿主机 Win 11 23H2 Docker 容器基础镜像 ubuntu:22.04
Rocket MQ 版本 5.1.4 Rocket MQ 客户端版本 5.0.6 (rocketmq-client-java) 客户端 Maven 坐标
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.6</version> </dependency>
Zulu JRE 17.0.11
使用 Docker Compose 部署 2 个 Master Broker; 2 个 Slave Broker; 当其中一个 Master Broker 宕机之后会有部分消息一直重复消费
Dockerfile 如下
FROM java:17.0.11 ARG version=5.1.4 # 定义用户名变量 ARG USER_NAME="rocketmq" # 定义用户id变量 ARG USER_ID="1000" # 定义用户组名称变量 ARG GROUP_NAME="rocketmq" # 定义用户组id变量 ARG GROUP_ID="1000" COPY benchmark /rocketmq/benchmark COPY bin /rocketmq/bin COPY conf /rocketmq/conf COPY lib /rocketmq/lib COPY LICENSE /rocketmq/LICENSE COPY NOTICE /rocketmq/NOTICE # 创建用户组和创建用户; 并且更改文件权限 RUN chown -R $USER_ID:$GROUP_ID /rocketmq \ && groupadd -g $GROUP_ID $GROUP_NAME \ && useradd -d /rocketmq -u $USER_ID -g $GROUP_NAME -l -m -s /bin/bash $USER_NAME \ && chown -R $USER_NAME:$GROUP_NAME /rocketmq \ && chmod +x -R /rocketmq/bin # rocketmq home 环境变量 ENV ROCKETMQ_HOME=/rocketmq ENV PATH=$ROCKETMQ_HOME/bin:$PATH # 切换用户 USER $USER_NAME # 设置工作目录 WORKDIR /rocketmq/bin ENV JAVA_OPT="" ENV JAVA_OPT_EXT="" # nameserver 端口 EXPOSE 9876 # broker 端口 EXPOSE 10909 10911 10912 CMD ["dummy"]
docker-compose.yaml 如下
services: rocket-nameserver: image: apache/rocketmq:5.1.4 container_name: rocket-nameserver restart: no stop_grace_period: 5s networks: - muzhi-cloud-net # ports: # - "9876:9876" env_file: - "conf/nameserver.env" entrypoint: ["sh", "mqnamesrv"] broker-master-a: image: apache/rocketmq:5.1.4 container_name: broker-master-a restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-a.conf:/rocketmq/conf/broker.conf" # ports: # - "10909:10909" # - "10911:10911" # - "10912:10912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] broker-slave-a01: image: apache/rocketmq:5.1.4 container_name: broker-slave-a01 restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-a01.conf:/rocketmq/conf/broker.conf" # ports: # - "11909:11909" # - "11911:11911" # - "11912:11912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] broker-master-b: image: apache/rocketmq:5.1.4 container_name: broker-master-b restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-b.conf:/rocketmq/conf/broker.conf" # ports: # - "12909:12909" # - "12911:12911" # - "12912:12912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] broker-slave-b01: image: apache/rocketmq:5.1.4 container_name: broker-slave-b01 restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/store:/rocketmq/store" - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-b01.conf:/rocketmq/conf/broker.conf" # ports: # - "13909:13909" # - "13911:13911" # - "13912:13912" env_file: - "conf/broker.env" entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"] rocket-proxy: image: apache/rocketmq:5.1.4 container_name: rocket-proxy restart: no stop_grace_period: 5s depends_on: - broker-master-a - broker-master-b networks: - muzhi-cloud-net volumes: - "D:/WorkSpace/rocketmq/docker-home/rocket-proxy/logs:/rocketmq/logs" - "D:/WorkSpace/rocketmq/docker-home/conf/rmq-proxy.json:/rocketmq/conf/rmq-proxy.json" ports: - "9080:9080" - "9081:9081" entrypoint: [ "sh", "mqproxy", "-pc", "/rocketmq/conf/rmq-proxy.json" ] rocketmq-dashboard: image: rocketmq/dashboard:1.0.0 container_name: rocketmq-dashboard restart: no stop_grace_period: 5s depends_on: - rocket-nameserver networks: - muzhi-cloud-net ports: - "8180:8180" networks: muzhi-cloud-net: external: true
Master Borker a 配置文件如下 broker-master-a.conf
namesrvAddr = rocket-nameserver:9876 listenPort = 10911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-a brokerId = 0 brokerIP1 = broker-master-a brokerIP2 = broker-master-a brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024a
Slave Borker a01 配置文件如下 broker-slave-a01.conf
namesrvAddr = rocket-nameserver:9876 listenPort = 11911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-a brokerId = 1 brokerIP1 = broker-slave-a01 brokerIP2 = broker-slave-a01 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024
Master Borker b 配置文件如下 broker-master-b.conf
namesrvAddr = rocket-nameserver:9876 listenPort = 12911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-b brokerId = 0 brokerIP1 = broker-master-b brokerIP2 = broker-master-b brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024
Slave Borker b01 配置文件如下 broker-slave-b01.conf
namesrvAddr = rocket-nameserver:9876 listenPort = 13911 brokerClusterName = DefaultCluster brokerName = muzhi-broker-b brokerId = 1 brokerIP1 = broker-slave-b01 brokerIP2 = broker-slave-b01 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH autoCreateTopicEnable = false slaveReadEnable = false storePathCommitLog = /rocketmq/store/commitlog/ storePathConsumerQueue = /rocketmq/store/consumequeue/ deleteWhen = 04 fileReservedTime = 72 mapedFileSizeCommitLog = 1024 * 1024 * 1024
代理组件配置文件如下 rmq-proxy.json
{ "namesrvAddr": "rocket-nameserver:9876", "rocketMQClusterName": "DefaultCluster", "remotingListenPort": 9080, "grpcServerPort": 9081 }
正常启动 消息生产者,和消息消费者 消息生产者代码如下
package com.muzhi.rocketmq; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Random; import java.util.Scanner; public class GrpcProducer { public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = "192.168.2.150:9080"; String topic = "muzhi-cloud-test"; String tag = "test-message"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .enableSsl(false) .build(); Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); Scanner scanner = new Scanner(System.in); Random random = new Random(); DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); while (scanner.hasNext()) { String next; next = scanner.next(); if (next.equals("shutdown")) { producer.close(); scanner.close(); } Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys(format.format(System.currentTimeMillis()) + (random.nextInt(900000) + 100000)) .setBody(next.getBytes(StandardCharsets.UTF_8)) .build(); final SendReceipt sendReceipt = producer.send(message); System.out.printf("消息发送成功 id: %s%n",sendReceipt.getMessageId()); } } }
消息消费者代码如下
package com.muzhi.rocketmq; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; public class GrpcConsumer { public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = "192.168.2.150:9080"; String consumerGroup = "muzhi-cloud-group"; String topic = "muzhi-cloud-test"; String tag = "test-message"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .enableSsl(false) .build(); FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString(); System.out.printf("MQ 客户端收到消息 id: %s; body: %s; %n", messageView.getKeys(), body); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); pushConsumer.close(); } }
连续发送多条消息 中途关闭 broker-master-a 已经发送的部分消息一直重复消费 消费者控制台可以清除的看到 消息被重复消费
dashboard
后续 broker-master-a 启动,则回复正常
配置文件 slaveReadEnable 设置为 false 希望 Master 宕机之后能正常 消费者不要重复一直消费
部分消息一直重复消费
No response
The text was updated successfully, but these errors were encountered:
Could you also print message id in your consumer?
Sorry, something went wrong.
The one on the left is the producer console, which prints the message id; the one on the right is the consumer console, which prints the message key.
Try using the latest version to see if it can be reproduced
No branches or pull requests
Before Creating the Bug Report
I found a bug, not just asking a question, which should be created in GitHub Discussions.
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
宿主机 Win 11 23H2
Docker 容器基础镜像 ubuntu:22.04
RocketMQ version
Rocket MQ 版本 5.1.4
Rocket MQ 客户端版本 5.0.6 (rocketmq-client-java)
客户端 Maven 坐标
JDK Version
Zulu JRE 17.0.11
Describe the Bug
使用 Docker Compose 部署 2 个 Master Broker; 2 个 Slave Broker; 当其中一个 Master Broker 宕机之后会有部分消息一直重复消费
Steps to Reproduce
Dockerfile 如下
docker-compose.yaml 如下
Master Borker a 配置文件如下
broker-master-a.conf
Slave Borker a01 配置文件如下
broker-slave-a01.conf
Master Borker b 配置文件如下
broker-master-b.conf
Slave Borker b01 配置文件如下
broker-slave-b01.conf
代理组件配置文件如下
rmq-proxy.json
正常启动 消息生产者,和消息消费者
消息生产者代码如下
消息消费者代码如下
连续发送多条消息 中途关闭 broker-master-a
已经发送的部分消息一直重复消费
消费者控制台可以清除的看到 消息被重复消费
dashboard
后续 broker-master-a 启动,则回复正常
What Did You Expect to See?
配置文件 slaveReadEnable 设置为 false
希望 Master 宕机之后能正常 消费者不要重复一直消费
What Did You See Instead?
部分消息一直重复消费
Additional Context
No response
The text was updated successfully, but these errors were encountered: