Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] RocketMQ FIFO Topic still cannot guarantee message order for simple consumers even in sequential production scenarios. #8154

Closed
3 tasks done
JanYork opened this issue May 16, 2024 · 17 comments

Comments

@JanYork
Copy link

JanYork commented May 16, 2024

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

The issue can be reproduced across multiple devices and various environments.

RocketMQ version

rocketmq-all-5.2.0-bin-release.

JDK Version

JDK17

Describe the Bug

In RocketMQ version 5.x, I created a FIFO type topic with both read and write queues set to one. Then, I sequentially produced messages using the client, with message content being numbers from 0 to 9. After that, I started a SimpleConsumer to consume messages. However, the order of numbers I received was: 1, 2, 3, 4, 5, 6, 7, 8, 9, 0. This is an issue because I didn't receive 0 first! This implies that the method of fetching messages by the SimpleConsumer is not atomic. Moreover, this phenomenon does not occur every time; it can only be occasionally reproduced.

I have tried on various platforms (MacOS and Windows) and experimented with different language clients (Java, Node.js, Go), all of which exhibit this issue. The fact that the atomic method for message retrieval fails to ensure message order consistently across these platforms and languages leads me to believe that this is a bug. Furthermore, I am using a single consumer, deploying RocketMQ on a single machine, and operating with a single producer. Additionally, I am not employing any asynchronous code; everything is executed synchronously.

Steps to Reproduce

You can also attempt to reproduce this issue in Java using the following code:

First, you can create a FIFO type topic and set both the read and write queues to one. Then, produce messages to the same group.

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;

/**
 * Each client will establish an independent connection to the server node within a process.
 *
 * <p>In most cases, the singleton mode can meet the requirements of higher concurrency.
 * If multiple connections are desired, consider increasing the number of clients appropriately.
 */
public class ProducerSingleton {
    private static volatile Producer PRODUCER;
    private static volatile Producer TRANSACTIONAL_PRODUCER;
    private static final String ACCESS_KEY = "yourAccessKey";
    private static final String SECRET_KEY = "yourSecretKey";
    private static final String ENDPOINTS = "localhost:8081";

    private ProducerSingleton() {
    }

    private static Producer buildProducer(TransactionChecker checker, String... topics) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(ENDPOINTS)
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        final ProducerBuilder builder = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topics);
        if (checker != null) {
            // Set the transaction checker.
            builder.setTransactionChecker(checker);
        }
        return builder.build();
    }

    public static Producer getInstance(String... topics) throws ClientException {
        if (null == PRODUCER) {
            synchronized (ProducerSingleton.class) {
                if (null == PRODUCER) {
                    PRODUCER = buildProducer(null, topics);
                }
            }
        }
        return PRODUCER;
    }

    public static Producer getTransactionalInstance(TransactionChecker checker,
                                                    String... topics) throws ClientException {
        if (null == TRANSACTIONAL_PRODUCER) {
            synchronized (ProducerSingleton.class) {
                if (null == TRANSACTIONAL_PRODUCER) {
                    TRANSACTIONAL_PRODUCER = buildProducer(checker, topics);
                }
            }
        }
        return TRANSACTIONAL_PRODUCER;
    }
}
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author muyouzhi
 */
public class MessageProducer {
    private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);

    private MessageProducer() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "checkout-topic-fifo";
        String group = "checkout-group";
        String tag = "checkout";

        final Producer producer = ProducerSingleton.getInstance(topic);

        for (int i = 0; i < 10; i++) {
            byte[] body = String.valueOf(i).getBytes(StandardCharsets.UTF_8);

            final Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    .setTag(tag)
                    .setMessageGroup(group)
                    .setKeys("message"+i)
                    .setBody(body)
                    .build();


            try {
                final SendReceipt sendReceipt = producer.send(message);
                log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            } catch (Throwable t) {
                log.error("Failed to send message", t);
            }
        }

         producer.close();
    }
}
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

/**
 * @author muyouzhi
 */
public class SimpleConsumer {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

    private SimpleConsumer() {
    }

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();

        String topic = "checkout-topic-fifo";
        String group = "checkout-group";
        String tag = "checkout";
        Duration awaitDuration = Duration.ofSeconds(30);

        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        org.apache.rocketmq.client.apis.consumer.SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(group)
                .setAwaitDuration(awaitDuration)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();

        int maxMessageNum = 1;
        Duration invisibleDuration = Duration.ofSeconds(15);

        do {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();

                ByteBuffer buffer = message.getBody();

                byte[] messageBytes = new byte[buffer.remaining()];
                buffer.get(messageBytes);

                String messageStr = new String(messageBytes, StandardCharsets.UTF_8);

                try {
                    consumer.ack(message);
                    log.info("Message is acknowledged, messageId={}, message={}", messageId, messageStr);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        } while (true);
//         consumer.close();
    }
}

Then, run the MessageProducer class to sequentially send messages. After sending is complete, promptly run the consumer. If the issue does not occur, close the consumer and repeat the previous steps. This can be reproduced; I have asked some friends in technical exchange groups to do the same, and they have successfully reproduced it. They also believe it to be a bug.

For Node.js, you can refer to the issue I raised previously: #8133

What Did You Expect to See?

Messages in the message queue are consumed sequentially.

What Did You See Instead?

the log:

14:59:05.735 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.756 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000001, message=1
14:59:05.763 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.768 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000002, message=2
14:59:05.776 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.782 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000003, message=3
14:59:05.792 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.799 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000004, message=4
14:59:05.811 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.821 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000005, message=5
14:59:05.837 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.847 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000006, message=6
14:59:05.855 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.866 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000007, message=7
14:59:05.875 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.884 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000008, message=8
14:59:05.895 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:05.902 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000009, message=9

after waiting for some time:

14:59:25.868 [main] INFO SimpleConsumer - Received 1 message(s)
14:59:25.872 [main] INFO SimpleConsumer - Message is acknowledged, messageId=012EFD84B22E14BBE7065748B200000000, message=0

Additional Context

image
@RongtongJin
Copy link
Contributor

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

@JanYork
Copy link
Author

JanYork commented May 17, 2024

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic.
Commands from the document are used.

@RongtongJin
Copy link
Contributor

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic. Commands from the document are used.

FIFO Consumer Group non-FIFO Consumer Group
FIFO Topic FIFO non-FIFO
non-FIFO Topic non-FIFO non-FIFO

So the consumer group is also FIFO?

@JanYork
Copy link
Author

JanYork commented May 17, 2024

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic. Commands from the document are used.

FIFO Consumer Group non-FIFO Consumer Group
FIFO Topic FIFO non-FIFO
non-FIFO Topic non-FIFO non-FIFO

So the consumer group is also FIFO?

yes,I used a simple consumer.And without multithreading concurrency, I only get one message at a time, and when I finish one message, I get the next one.

Consumers and producers are single threads, no asynchrony, no concurrency.

Get only one message at a time.

All messages are in one group and there is only one queue.

@JanYork
Copy link
Author

JanYork commented May 17, 2024

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic. Commands from the document are used.

FIFO Consumer Group non-FIFO Consumer Group
FIFO Topic FIFO non-FIFO
non-FIFO Topic non-FIFO non-FIFO

So the consumer group is also FIFO?

Let's also say that my conditions are as follows:

  1. i created a FIFO topic using the command stated in the official documentation.
  2. all the messages produced by my producers are put into the same group.
  3. the producer and the consumer are single, there is no concurrency or asynchrony, they are both executing the code synchronously.
  4. when I use SimpleConsumer, I fetch one message at a time and wait for the previous message to complete before fetching the next one.

Also this mostly only happens with the 1st batch of messages received after startup.

I'm using the official github example.

I can reproduce and record a video on my side if needed.

The following is in Chinese, I think you'll understand it better that way:

也就说说,我的条件如下:
​1.我使用了官方文档上所说的命令创建了一个FIFO topic。
​2.我的生产者生产的消息都放到了同一个组。
​3.生产者与消费者都是单一的,没有并发也没有异步,他们都是同步执行的代码。
​4.我使用SimpleConsumer时,一次只获取一个消息,等待上一个消息完成后,才会获取下一个消息。

​另外这种情况多半只发生在启动后第1次接收的那批消息。

​我使用的是官方github上的示例。

​如果有需要,我这边可以复现并录制视频。

@RongtongJin
Copy link
Contributor

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic. Commands from the document are used.

FIFO Consumer Group
non-FIFO Consumer Group

FIFO Topic
FIFO
non-FIFO

non-FIFO Topic
non-FIFO
non-FIFO

So the consumer group is also FIFO?

Let's also say that my conditions are as follows:

  1. i created a FIFO topic using the command stated in the official documentation.
  2. all the messages produced by my producers are put into the same group.
  3. the producer and the consumer are single, there is no concurrency or asynchrony, they are both executing the code synchronously.
  4. when I use SimpleConsumer, I fetch one message at a time and wait for the previous message to complete before fetching the next one.

Also this mostly only happens with the 1st batch of messages received after startup.

I'm using the official github example.

I can reproduce and record a video on my side if needed.

The following is in Chinese, I think you'll understand it better that way:

也就说说,我的条件如下:​1.我使用了官方文档上所说的命令创建了一个FIFO topic。​2.我的生产者生产的消息都放到了同一个组。​3.生产者与消费者都是单一的,没有并发也没有异步,他们都是同步执行的代码。​4.我使用SimpleConsumer时,一次只获取一个消息,等待上一个消息完成后,才会获取下一个消息。​​另外这种情况多半只发生在启动后第1次接收的那批消息。​​我使用的是官方github上的示例。​​如果有需要,我这边可以复现并录制视频。

我们复现下

@JanYork
Copy link
Author

JanYork commented May 17, 2024

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic. Commands from the document are used.

FIFO Consumer Group
non-FIFO Consumer Group

FIFO Topic
FIFO
non-FIFO

non-FIFO Topic
non-FIFO
non-FIFO

So the consumer group is also FIFO?

Let's also say that my conditions are as follows:

  1. i created a FIFO topic using the command stated in the official documentation.
  2. all the messages produced by my producers are put into the same group.
  3. the producer and the consumer are single, there is no concurrency or asynchrony, they are both executing the code synchronously.
  4. when I use SimpleConsumer, I fetch one message at a time and wait for the previous message to complete before fetching the next one.

Also this mostly only happens with the 1st batch of messages received after startup.

I'm using the official github example.

I can reproduce and record a video on my side if needed.

The following is in Chinese, I think you'll understand it better that way:

也就说说,我的条件如下:​1.我使用了官方文档上所说的命令创建了一个FIFO topic。​2.我的生产者生产的消息都放到了同一个组。​3.生产者与消费者都是单一的,没有并发也没有异步,他们都是同步执行的代码。​4.我使用SimpleConsumer时,一次只获取一个消息,等待上一个消息完成后,才会获取下一个消息。​​另外这种情况多半只发生在启动后第1次接收的那批消息。​​我使用的是官方github上的示例。​​如果有需要,我这边可以复现并录制视频。

我们复现下

这种现象不是每次都能复现,你可以先启动单个生产者,发送1-10个数字到topic同一个group,然后紧接着启动Simple consumer并串行切一次获取一个message,有时候会丢失掉数字1。

在Java中复现的概率更低,或许和启动时间有些关系,在nodejs复现概率极高(单个node脚本启动迅速)。

@JanYork
Copy link
Author

JanYork commented May 17, 2024

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command
sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

Yes, this is inevitable. I have explained above that this is a FIFO-type topic. Commands from the document are used.

FIFO Consumer Group
non-FIFO Consumer Group

FIFO Topic
FIFO
non-FIFO

non-FIFO Topic
non-FIFO
non-FIFO

So the consumer group is also FIFO?

Let's also say that my conditions are as follows:

  1. i created a FIFO topic using the command stated in the official documentation.
  2. all the messages produced by my producers are put into the same group.
  3. the producer and the consumer are single, there is no concurrency or asynchrony, they are both executing the code synchronously.
  4. when I use SimpleConsumer, I fetch one message at a time and wait for the previous message to complete before fetching the next one.

Also this mostly only happens with the 1st batch of messages received after startup.

I'm using the official github example.

I can reproduce and record a video on my side if needed.

The following is in Chinese, I think you'll understand it better that way:

也就说说,我的条件如下:​1.我使用了官方文档上所说的命令创建了一个FIFO topic。​2.我的生产者生产的消息都放到了同一个组。​3.生产者与消费者都是单一的,没有并发也没有异步,他们都是同步执行的代码。​4.我使用SimpleConsumer时,一次只获取一个消息,等待上一个消息完成后,才会获取下一个消息。​​另外这种情况多半只发生在启动后第1次接收的那批消息。​​我使用的是官方github上的示例。​​如果有需要,我这边可以复现并录制视频。

我们复现下

这种现象不是每次都能复现,你可以先启动单个生产者,发送1-10个数字到topic同一个group,然后紧接着启动Simple consumer并串行切一次获取一个message,有时候会丢失掉数字1。

在Java中复现的概率更低,或许和启动时间有些关系,在nodejs复现概率极高(单个node脚本启动迅速)。

可以参考我上方的复现过程以及代码。

@JanYork
Copy link
Author

JanYork commented May 17, 2024

@RongtongJin Has it been successfully repeated? I need results as soon as possible because it may have an impact on my business.

@drpmma
Copy link
Contributor

drpmma commented May 17, 2024

I could not reproduce your issue.

The SimpleConsumer with fifo is basically using the pop orderly interface which is mostly server-side behavior.

It seems that it is possible that the group or topic is not been set properly. Could you please provide your group in subscriptionGroup.json and topic in topics.json which in ~/store/config to ensure that the related configuration is correct?

@JanYork
Copy link
Author

JanYork commented May 17, 2024

I could not reproduce your issue.

The SimpleConsumer with fifo is basically using the pop orderly interface which is mostly server-side behavior.

It seems that it is possible that the group or topic is not been set properly. Could you please provide your group in subscriptionGroup.json and topic in topics.json which in ~/store/config to ensure that the related configuration is correct?

Of course, I can do that. I'll show you below, or do you have a more immediate communication tool? Like QQ or WeChat?

{
	"dataVersion":{
		"counter":0,
		"stateVersion":0,
		"timestamp":1715929343644
	},
	"forbiddenTable":{},
	"subscriptionGroupTable":{
		"SELF_TEST_C_GROUP":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"SELF_TEST_C_GROUP",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"CID_ONSAPI_OWNER":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"CID_ONSAPI_OWNER",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"CID_ONSAPI_PERMISSION":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"CID_ONSAPI_PERMISSION",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"CID_RMQ_SYS_TRANS":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"CID_RMQ_SYS_TRANS",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"TOOLS_CONSUMER":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"TOOLS_CONSUMER",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"CID_ONS-HTTP-PROXY":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"CID_ONS-HTTP-PROXY",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"FILTERSRV_CONSUMER":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"FILTERSRV_CONSUMER",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"CID_ONSAPI_PULL":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"CID_ONSAPI_PULL",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		},
		"checkout-group":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"checkout-group",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		}
	}
}
{
	"dataVersion":{
		"counter":28,
		"stateVersion":0,
		"timestamp":1715934484981
	},
	"topicConfigTable":{
		"SCHEDULE_TOPIC_XXXX":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":18,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"SCHEDULE_TOPIC_XXXX",
			"topicSysFlag":0,
			"writeQueueNums":18
		},
		"JanYorkMacBook-Pro.local":{
			"attributes":{},
			"order":false,
			"perm":7,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"JanYorkMacBook-Pro.local",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"SELF_TEST_TOPIC":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"SELF_TEST_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"checkout-topic-trans":{
			"attributes":{
				"message.type":"TRANSACTION"
			},
			"order":false,
			"perm":6,
			"readQueueNums":8,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"checkout-topic-trans",
			"topicSysFlag":0,
			"writeQueueNums":8
		},
		"rmq_sys_SYNC_BROKER_MEMBER_JanYorkMacBook-Pro.local":{
			"attributes":{},
			"order":false,
			"perm":1,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"rmq_sys_SYNC_BROKER_MEMBER_JanYorkMacBook-Pro.local",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"checkout-topic-fifo":{
			"attributes":{
				"message.type":"FIFO"
			},
			"order":true,
			"perm":6,
			"readQueueNums":8,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"checkout-topic-fifo",
			"topicSysFlag":0,
			"writeQueueNums":8
		},
		"DefaultCluster":{
			"attributes":{},
			"order":false,
			"perm":7,
			"readQueueNums":16,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"DefaultCluster",
			"topicSysFlag":0,
			"writeQueueNums":16
		},
		"DefaultCluster_REPLY_TOPIC":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"DefaultCluster_REPLY_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"rmq_sys_REVIVE_LOG_DefaultCluster":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":8,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"rmq_sys_REVIVE_LOG_DefaultCluster",
			"topicSysFlag":0,
			"writeQueueNums":8
		},
		"checkout-topic":{
			"attributes":{
				"message.type":"NORMAL"
			},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"checkout-topic",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"RMQ_SYS_TRANS_HALF_TOPIC":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"RMQ_SYS_TRANS_HALF_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"rmq_sys_SYNC_BROKER_MEMBER_broker-a":{
			"attributes":{},
			"order":false,
			"perm":1,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"rmq_sys_SYNC_BROKER_MEMBER_broker-a",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"TRANS_CHECK_MAX_TIME_TOPIC":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"TRANS_CHECK_MAX_TIME_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"broker-a":{
			"attributes":{},
			"order":false,
			"perm":7,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"broker-a",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"RMQ_SYS_TRACE_TOPIC":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":4,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"RMQ_SYS_TRACE_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":4
		},
		"RMQ_SYS_TRANS_OP_HALF_TOPIC":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"RMQ_SYS_TRANS_OP_HALF_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"TBW102":{
			"attributes":{},
			"order":false,
			"perm":7,
			"readQueueNums":8,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"TBW102",
			"topicSysFlag":0,
			"writeQueueNums":8
		},
		"BenchmarkTest":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1024,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"BenchmarkTest",
			"topicSysFlag":0,
			"writeQueueNums":1024
		},
		"OFFSET_MOVED_EVENT":{
			"attributes":{},
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"OFFSET_MOVED_EVENT",
			"topicSysFlag":0,
			"writeQueueNums":1
		}
	}
}

And I also attempted the following configuration later:

		"checkout-topic-fifo":{
			"attributes":{
				"message.type":"FIFO"
			},
			"order":true,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"checkout-topic-fifo",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
"checkout-group":{
			"attributes":{},
			"brokerId":0,
			"consumeBroadcastEnable":true,
			"consumeEnable":true,
			"consumeFromMinEnable":true,
			"consumeMessageOrderly":false,
			"consumeTimeoutMinute":15,
			"groupName":"checkout-group",
			"groupRetryPolicy":{
				"type":"CUSTOMIZED"
			},
			"groupSysFlag":0,
			"notifyConsumerIdsChangedEnable":true,
			"retryMaxTimes":16,
			"retryQueueNums":1,
			"whichBrokerWhenConsumeSlowly":1
		}

@JanYork
Copy link
Author

JanYork commented May 17, 2024

I could not reproduce your issue.

The SimpleConsumer with fifo is basically using the pop orderly interface which is mostly server-side behavior.

It seems that it is possible that the group or topic is not been set properly. Could you please provide your group in subscriptionGroup.json and topic in topics.json which in ~/store/config to ensure that the related configuration is correct?

Then I started the producer to produce 10 messages. After that, I started the consumer and obtained the following logs:

image image

@drpmma
Copy link
Contributor

drpmma commented May 17, 2024

"checkout-group":{
	"attributes":{},
	"brokerId":0,
	"consumeBroadcastEnable":true,
	"consumeEnable":true,
	"consumeFromMinEnable":true,
	"consumeMessageOrderly":false,
	"consumeTimeoutMinute":15,
	"groupName":"checkout-group",
	"groupRetryPolicy":{
		"type":"CUSTOMIZED"
	},
	"groupSysFlag":0,
	"notifyConsumerIdsChangedEnable":true,
	"retryMaxTimes":16,
	"retryQueueNums":1,
	"whichBrokerWhenConsumeSlowly":1
}

The "consumeMessageOrderly" is false which is not been set properly.

@drpmma
Copy link
Contributor

drpmma commented May 17, 2024

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

You should set the option -o true to set consumeMessageOrderly to true when updateSubGroup.

If you had read this reply carefully, then you would find out that the correct answer is already provided.

@JanYork
Copy link
Author

JanYork commented May 17, 2024

I copied this command exactly as it is, and it seems there are no errors.

When creating the consumer group, did you set it as a FIFO-ConsumerGroup? For reference, use the command sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

You should set the option -o true to set consumeMessageOrderly to true when updateSubGroup.

If you had read this reply carefully, then you would find out that the correct answer is already provided.

Oh!!! It was my carelessness. I overlooked the name of this group! I feel so foolish. Thank you for pointing that out.

@JanYork JanYork closed this as completed May 17, 2024
@JanYork
Copy link
Author

JanYork commented May 17, 2024

Thank you again. I submitted a very foolish issue.

@JanYork JanYork reopened this May 17, 2024
@JanYork JanYork closed this as completed May 17, 2024
@drpmma
Copy link
Contributor

drpmma commented May 17, 2024

You should use updateSubGroup command to set ConsumerMessageOrderly field of group as sh mqadmin updateSubGroup -n <nameserver_address> -g <consumer_group_name> -c <cluster_name> -o true.

And the document should be updated to add the updateSubGroup command.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants