How to use vertx-redis-client well? The version is 4.1.2 #304

Open
aloserman opened this issue Jul 27, 2021 · 3 comments

@aloserman
When I wrote the following code, I found that once a large amount of data comes in it stops working properly. The cause is: Future{cause=Connection pool reached max wait queue size of 200}, and all of that data is discarded. How can we make sure no data is lost and use the client correctly? I did not find a good way to use it in the official documentation.

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        vertx = Vertx.vertx();
        RedisOptions options = new RedisOptions();
        options.setType(RedisClientType.STANDALONE)
                .setMaxPoolSize(30)
                .setMaxWaitingHandlers(1024)
                .setPoolRecycleTimeout(15_000)
                .setMaxPoolWaiting(200)
                .setPassword("password");

        List<String> clusters = new ArrayList<>();
        clusters.add("redis://host:port/db");
        options.setEndpoints(clusters);
        Redis client = Redis.createClient(vertx, options);
        api = RedisAPI.api(client);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        api.hset(value, new Handler<AsyncResult<Response>>() {
            @Override
            public void handle(AsyncResult<Response> responseAsyncResult) {
                if (!responseAsyncResult.succeeded()) {
                    System.out.println("-----------failed");
                    System.out.println(responseAsyncResult.cause());
                }
            }
        });
    }

Future{cause=Connection pool reached max wait queue size of 2000}

@pmlopes (Member) commented Jul 27, 2021

@aloserman you're using the pooled mode, which means that for each command a connection is acquired from the pool, the command is executed, and the connection is returned to the pool.

Given your error, it seems that you're calling the invoke method at a very high frequency, so high that the bottleneck becomes acquiring and returning connections.

To solve this, you should reuse the connection:

Redis client = Redis.createClient(vertx, options);
client.connect()
  .onSuccess(conn -> {
        api = RedisAPI.api(conn);
  });

This way only a single connection will be used and all the queuing is controlled by setMaxWaitingHandlers; making it a large number is usually good enough.

Remember that it is now also up to your application to decide when to release the connection:

conn.close();
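
For completeness, a minimal sketch of this single-connection pattern, assuming conn and api are instance fields and reusing the options already shown in open(); the names are illustrative, not taken from the code above:

Redis client = Redis.createClient(vertx, options);

client.connect()
    .onSuccess(connection -> {
        conn = connection;          // keep a reference so the connection can be closed later
        api = RedisAPI.api(conn);   // one RedisAPI bound to this single connection
    })
    .onFailure(Throwable::printStackTrace);

// every invoke(...) then reuses the same api instance:
api.hset(value, res -> {
    if (res.failed()) {
        System.out.println(res.cause());
    }
});

// and only when the application shuts down:
conn.close();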

@aloserman (Author) commented Jul 28, 2021

Following your guidance, the code now looks like this, but I get an exception about an object that cannot be serialized. How can I solve it?
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        vertx = Vertx.vertx();
        options.setType(RedisClientType.STANDALONE)
                .setMaxPoolSize(30)
                .setMaxWaitingHandlers(1024)
                .setPoolRecycleTimeout(15_000)
                .setMaxPoolWaiting(-1)
                .setPassword("password");

        List<String> clusters = new ArrayList<>();
        clusters.add("redis://host:port/db");
        options.setEndpoints(clusters);
    }

    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        Redis client = Redis.createClient(vertx, options);
        client.connect().onSuccess(new Handler<RedisConnection>() {
            @Override
            public void handle(RedisConnection conn) {
                RedisAPI api = RedisAPI.api(conn);
                api.hset(value, new Handler<AsyncResult<Response>>() {
                    @Override
                    public void handle(AsyncResult<Response> event) {
                        if (!event.succeeded()) {
                            System.out.println("-----------failed");
                            System.out.println(event.cause());
                        }
                        api.close();
                        if (conn != null) {
                            conn.close();
                        }
                    }
                });
            }
        });
    }

The exception is as follows:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: io.vertx.core.net.NetClientOptions@13805618 is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
    at com.test.findbug.TestRedisAsyncInsert.main(TestRedisAsyncInsert.java:44)
Caused by: java.io.NotSerializableException: io.vertx.core.net.NetClientOptions
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
    ... 7 more
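
For context, the stack trace comes from Flink's ClosureCleaner serializing the sink function itself, which still holds the Vert.x objects as fields (RedisOptions wraps a non-serializable NetClientOptions). A common workaround, sketched below under the assumption that the sink is a RichSinkFunction, is to mark all Vert.x state transient and create it inside open(); the class name RedisHsetSink and the field layout are illustrative, not the original code:

import io.vertx.core.Vertx;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisClientType;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.Collections;
import java.util.List;

public class RedisHsetSink extends RichSinkFunction<List<String>> {

    // transient: never shipped through Flink's closure/checkpoint serialization
    private transient Vertx vertx;
    private transient RedisConnection conn;
    private transient RedisAPI api;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        vertx = Vertx.vertx();

        // built locally on the task manager, never stored as a field
        RedisOptions options = new RedisOptions()
                .setType(RedisClientType.STANDALONE)
                .setMaxWaitingHandlers(1024)
                .setPassword("password");
        options.setEndpoints(Collections.singletonList("redis://host:port/db"));

        Redis.createClient(vertx, options)
                .connect()
                .onSuccess(c -> {
                    conn = c;
                    api = RedisAPI.api(c);   // one connection reused for all commands
                })
                .onFailure(Throwable::printStackTrace);
    }

    @Override
    public void invoke(List<String> value, Context context) {
        if (api == null) {
            return;   // connection not ready yet; real code would buffer or fail here
        }
        api.hset(value, res -> {
            if (res.failed()) {
                res.cause().printStackTrace();
            }
        });
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
        if (vertx != null) {
            vertx.close();
        }
        super.close();
    }
}

With this layout nothing from Vert.x is part of the serialized closure, which is what the NotSerializableException above complains about.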

@aloserman (Author)

When data is emitted very quickly, the client gets blocked and no data is sent out. How can we ensure that, when processing reaches the bottleneck, data is not discarded and is still sent normally?
