With a 2m-2s deployment via docker compose plus the proxy component, some messages are consumed repeatedly without end after one of the master brokers goes down #8126

Open
3 tasks done
muzhi9018 opened this issue May 13, 2024 · 3 comments


muzhi9018 commented May 13, 2024

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

Host: Windows 11 23H2
Docker container base image: ubuntu:22.04

RocketMQ version

RocketMQ version: 5.1.4
RocketMQ client version: 5.0.6 (rocketmq-client-java)
Client Maven coordinates

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.6</version>
</dependency>

JDK Version

Zulu JRE 17.0.11

Describe the Bug

Deploy 2 master brokers and 2 slave brokers with Docker Compose. After one of the master brokers goes down, some messages are consumed repeatedly without end.

Steps to Reproduce

The Dockerfile is as follows

FROM java:17.0.11


ARG version=5.1.4


# User name variable
ARG USER_NAME="rocketmq"
# User ID variable
ARG USER_ID="1000"
# Group name variable
ARG GROUP_NAME="rocketmq"
# Group ID variable
ARG GROUP_ID="1000"

COPY benchmark /rocketmq/benchmark
COPY bin /rocketmq/bin
COPY conf /rocketmq/conf
COPY lib /rocketmq/lib
COPY LICENSE /rocketmq/LICENSE
COPY NOTICE /rocketmq/NOTICE


# Create the group and the user, then change file ownership and permissions
RUN chown -R $USER_ID:$GROUP_ID /rocketmq \
  && groupadd -g $GROUP_ID $GROUP_NAME \
  && useradd -d /rocketmq -u $USER_ID -g $GROUP_NAME -l -m -s /bin/bash $USER_NAME \
  && chown -R $USER_NAME:$GROUP_NAME /rocketmq \
  && chmod +x -R /rocketmq/bin

# RocketMQ home environment variable
ENV ROCKETMQ_HOME=/rocketmq
ENV PATH=$ROCKETMQ_HOME/bin:$PATH


# Switch user
USER $USER_NAME

# Set the working directory
WORKDIR /rocketmq/bin

ENV JAVA_OPT=""
ENV JAVA_OPT_EXT=""

# NameServer port
EXPOSE 9876

# Broker ports
EXPOSE 10909 10911 10912


CMD ["dummy"]

The docker-compose.yaml is as follows

services:
  rocket-nameserver:
    image: apache/rocketmq:5.1.4
    container_name: rocket-nameserver
    restart: no
    stop_grace_period: 5s
    networks:
      - muzhi-cloud-net
#    ports:
#      - "9876:9876"
    env_file:
      - "conf/nameserver.env"
    entrypoint: ["sh", "mqnamesrv"]
  broker-master-a:
    image: apache/rocketmq:5.1.4
    container_name: broker-master-a
    restart: no
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/store:/rocketmq/store"
      - "D:/WorkSpace/rocketmq/docker-home/broker-master-a/logs:/rocketmq/logs"
      - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-a.conf:/rocketmq/conf/broker.conf"
#    ports:
#      - "10909:10909"
#      - "10911:10911"
#      - "10912:10912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
  broker-slave-a01:
    image: apache/rocketmq:5.1.4
    container_name: broker-slave-a01
    restart: no
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/store:/rocketmq/store"
      - "D:/WorkSpace/rocketmq/docker-home/broker-slave-a01/logs:/rocketmq/logs"
      - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-a01.conf:/rocketmq/conf/broker.conf"
#    ports:
#      - "11909:11909"
#      - "11911:11911"
#      - "11912:11912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
  broker-master-b:
    image: apache/rocketmq:5.1.4
    container_name: broker-master-b
    restart: no
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/store:/rocketmq/store"
      - "D:/WorkSpace/rocketmq/docker-home/broker-master-b/logs:/rocketmq/logs"
      - "D:/WorkSpace/rocketmq/docker-home/conf/broker-master-b.conf:/rocketmq/conf/broker.conf"
#    ports:
#      - "12909:12909"
#      - "12911:12911"
#      - "12912:12912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
  broker-slave-b01:
    image: apache/rocketmq:5.1.4
    container_name: broker-slave-b01
    restart: no
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    volumes:
      - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/store:/rocketmq/store"
      - "D:/WorkSpace/rocketmq/docker-home/broker-slave-b01/logs:/rocketmq/logs"
      - "D:/WorkSpace/rocketmq/docker-home/conf/broker-slave-b01.conf:/rocketmq/conf/broker.conf"
#    ports:
#      - "13909:13909"
#      - "13911:13911"
#      - "13912:13912"
    env_file:
      - "conf/broker.env"
    entrypoint: ["sh", "mqbroker", "-c", "/rocketmq/conf/broker.conf"]
  rocket-proxy:
    image: apache/rocketmq:5.1.4
    container_name: rocket-proxy
    restart: no
    stop_grace_period: 5s
    depends_on:
      - broker-master-a
      - broker-master-b
    networks:
      - muzhi-cloud-net
    volumes:
      - "D:/WorkSpace/rocketmq/docker-home/rocket-proxy/logs:/rocketmq/logs"
      - "D:/WorkSpace/rocketmq/docker-home/conf/rmq-proxy.json:/rocketmq/conf/rmq-proxy.json"
    ports:
      - "9080:9080"
      - "9081:9081"
    entrypoint: [ "sh", "mqproxy", "-pc", "/rocketmq/conf/rmq-proxy.json" ]
  rocketmq-dashboard:
    image: rocketmq/dashboard:1.0.0
    container_name: rocketmq-dashboard
    restart: no
    stop_grace_period: 5s
    depends_on:
      - rocket-nameserver
    networks:
      - muzhi-cloud-net
    ports:
      - "8180:8180"
networks:
  muzhi-cloud-net:
    external: true

The configuration file for Master Broker a is as follows
broker-master-a.conf

namesrvAddr = rocket-nameserver:9876

listenPort = 10911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-a

brokerId = 0

brokerIP1 = broker-master-a

brokerIP2 = broker-master-a

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

storePathCommitLog = /rocketmq/store/commitlog/

storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

mapedFileSizeCommitLog = 1024 * 1024 * 1024

The configuration file for Slave Broker a01 is as follows
broker-slave-a01.conf

namesrvAddr = rocket-nameserver:9876

listenPort = 11911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-a

brokerId = 1

brokerIP1 = broker-slave-a01

brokerIP2 = broker-slave-a01

brokerRole = SLAVE

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

storePathCommitLog = /rocketmq/store/commitlog/

storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

mapedFileSizeCommitLog = 1024 * 1024 * 1024

The configuration file for Master Broker b is as follows
broker-master-b.conf

namesrvAddr = rocket-nameserver:9876

listenPort = 12911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-b

brokerId = 0

brokerIP1 = broker-master-b

brokerIP2 = broker-master-b

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

storePathCommitLog = /rocketmq/store/commitlog/

storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

mapedFileSizeCommitLog = 1024 * 1024 * 1024

The configuration file for Slave Broker b01 is as follows
broker-slave-b01.conf

namesrvAddr = rocket-nameserver:9876

listenPort = 13911

brokerClusterName = DefaultCluster

brokerName = muzhi-broker-b

brokerId = 1

brokerIP1 = broker-slave-b01

brokerIP2 = broker-slave-b01

brokerRole = SLAVE

flushDiskType = ASYNC_FLUSH

autoCreateTopicEnable = false

slaveReadEnable = false

storePathCommitLog = /rocketmq/store/commitlog/

storePathConsumerQueue = /rocketmq/store/consumequeue/

deleteWhen = 04

fileReservedTime = 72

mapedFileSizeCommitLog = 1024 * 1024 * 1024
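
In the four broker files above, a master and its slave share the same brokerName, with brokerId 0 on the master and brokerId 1 on the slave. The following is only a minimal sketch for sanity-checking that pairing before starting the containers. The class name `BrokerPairCheck` and its methods are hypothetical helpers, not part of RocketMQ, and the inline strings stand in for the real file contents.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not a RocketMQ API: checks master/slave pairing in broker configs.
final class BrokerPairCheck {

    /** Parses the text of one broker .conf file ("key = value" lines). */
    static Properties parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Collects the brokerId values declared under each brokerName. */
    static Map<String, List<Integer>> idsByBrokerName(List<Properties> configs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Properties p : configs) {
            String name = p.getProperty("brokerName").trim();
            int id = Integer.parseInt(p.getProperty("brokerId").trim());
            groups.computeIfAbsent(name, k -> new ArrayList<>()).add(id);
        }
        return groups;
    }

    /** True when every group has exactly one master (id 0) and at least one slave (id > 0). */
    static boolean everyGroupHasMasterAndSlave(Map<String, List<Integer>> groups) {
        for (List<Integer> ids : groups.values()) {
            long masters = ids.stream().filter(id -> id == 0).count();
            long slaves = ids.stream().filter(id -> id > 0).count();
            if (masters != 1 || slaves < 1) {
                return false;
            }
        }
        return !groups.isEmpty();
    }

    public static void main(String[] args) {
        // Only the two relevant keys from each of the four files are reproduced here.
        List<Properties> configs = List.of(
                parse("brokerName = muzhi-broker-a\nbrokerId = 0\n"),
                parse("brokerName = muzhi-broker-a\nbrokerId = 1\n"),
                parse("brokerName = muzhi-broker-b\nbrokerId = 0\n"),
                parse("brokerName = muzhi-broker-b\nbrokerId = 1\n"));
        Map<String, List<Integer>> groups = idsByBrokerName(configs);
        System.out.println(groups);
        System.out.println("paired correctly: " + everyGroupHasMasterAndSlave(groups));
    }
}
```

This only confirms that the files describe two master/slave groups. It says nothing about how the proxy or the client behaves once a master is down.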

The proxy component configuration file is as follows
rmq-proxy.json

{
    "namesrvAddr": "rocket-nameserver:9876",
    "rocketMQClusterName": "DefaultCluster",
    "remotingListenPort": 9080,
    "grpcServerPort": 9081
}

Start the message producer and the message consumer normally.
The message producer code is as follows

package com.muzhi.rocketmq;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.Scanner;

public class GrpcProducer {

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String endpoints = "192.168.2.150:9080";
        String topic = "muzhi-cloud-test";
        String tag = "test-message";

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .enableSsl(false)
                .build();

        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();


        Scanner scanner = new Scanner(System.in);
        Random random = new Random();
        DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");

        while (scanner.hasNext()) {
            String next = scanner.next();
            if (next.equals("shutdown")) {
                // Stop here: sending after close() or reading a closed Scanner would fail.
                producer.close();
                scanner.close();
                break;
            }

            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    .setTag(tag)
                    .setKeys(format.format(System.currentTimeMillis()) + (random.nextInt(900000) + 100000))
                    .setBody(next.getBytes(StandardCharsets.UTF_8))
                    .build();

            final SendReceipt sendReceipt = producer.send(message);
            System.out.printf("Message sent successfully, id: %s%n", sendReceipt.getMessageId());
        }
    }
}

The message consumer code is as follows

package com.muzhi.rocketmq;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class GrpcConsumer {

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String endpoints = "192.168.2.150:9080";
        String consumerGroup = "muzhi-cloud-group";
        String topic = "muzhi-cloud-test";
        String tag = "test-message";

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .enableSsl(false)
                .build();

        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    String body = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                    System.out.printf("MQ client received message id: %s; body: %s; %n", messageView.getKeys(), body);
                    return ConsumeResult.SUCCESS;
                })
                .build();

        Thread.sleep(Long.MAX_VALUE);
        pushConsumer.close();
    }
}

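Send several messages in a row and shut down broker-master-a partway through.
Some of the messages that were already sent are then consumed over and over.
The consumer console clearly shows the messages being consumed repeatedly.
[screenshot]

dashboard
[screenshot]

Once broker-master-a is started again, consumption returns to normal.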

What Did You Expect to See?

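slaveReadEnable is set to false in the configuration files.
After a master goes down, I expect consumption to keep working normally, without the consumer consuming the same messages over and over.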

What Did You See Instead?

Some messages are consumed repeatedly without end.

Additional Context

No response

caigy (Contributor) commented May 13, 2024

Could you also print message id in your consumer?

muzhi9018 (Author) replied, quoting caigy: "Could you also print message id in your consumer?"

The one on the left is the producer console, which prints the message id; the one on the right is the consumer console, which prints the message key.
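
Since the consumer above prints the message key rather than the message ID, one way to check whether the same message ID is really being redelivered is to count deliveries per ID. The sketch below is only an illustration under assumptions: `DeliveryTracker` is a hypothetical helper, and the IDs in `main` are made up. In the listener it could be fed with something like `messageView.getMessageId().toString()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical diagnostic helper, not part of the RocketMQ client.
final class DeliveryTracker {

    // Delivery count per message ID; safe to use from concurrent listener threads.
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    /** Records one delivery and returns how many times this ID has been seen so far. */
    int record(String messageId) {
        return counts.computeIfAbsent(messageId, id -> new AtomicInteger()).incrementAndGet();
    }

    /** Returns how many times the given ID has been recorded (0 if never). */
    int deliveries(String messageId) {
        AtomicInteger n = counts.get(messageId);
        return n == null ? 0 : n.get();
    }

    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        // Made-up IDs standing in for what the listener would receive.
        String[] received = {"id-1", "id-2", "id-1", "id-1"};
        for (String id : received) {
            int n = tracker.record(id);
            System.out.printf("message id: %s, delivery #%d%s%n",
                    id, n, n > 1 ? " (duplicate)" : "");
        }
    }
}
```

Logging the count next to the message ID would show whether the repeats are redeliveries of one stored message or distinct messages that happen to carry the same key.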

Willhow-Gao (Contributor) commented

Try using the latest version to see if it can be reproduced
