refactor client tracking, fix atomicity, squashing and multi/exec #2970
base: main
src/server/transaction.cc
Outdated
@@ -838,13 +838,23 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {

// Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) {
  auto tracking_wrap = [cb, this](Transaction* t, EngineShard* shard) -> RunnableResult {
@dranikpg this, together with the changes in InvokeCmd, seemed to be the least intrusive way (to comply with the requirements of the state machine)
- I would like to understand why tracking requires transaction semantics (an example will be fine)
- Why is cid_ not enough, so that we need invoke_cid_?
I would suggest doing it the way we handle blocking commands: once it has finished, and manually from RunSquashedCb
dragonfly/src/server/transaction.cc
Lines 638 to 640 in a95419b
if (auto* bcontroller = shard->blocking_controller(); bcontroller) {
  if (awaked_prerun || was_suspended) {
    bcontroller->FinalizeWatched(GetShardArgs(idx), this);
So it becomes if (concluding || (multi && multi_->concluding)) Track(this)
Now you don't need invoke_cid, etc there as well
I would like to understand why tracking requires transaction semantics (an example will be fine)
Because invalidation messages must be sent before the transaction concludes. Otherwise, we might accidentally skip them. An example would be:
>> CLIENT TRACKING ON
>> GET FOO
>> SET FOO BAR
>> GET FOO
>> SET FOO BAR
>> GET FOO ---------> might miss Invalidation message
In a valid execution, once we call the first SET we send an invalidation message as a separate transaction. But before that transaction even starts or concludes, the GET that follows can get executed first, and it will itself issue a separate transaction to send an invalidation message. The problem is that once we send an invalidation message, we remove the key from the tracking map (since we send an invalidation message only once until the key is reread). The second invalidation transaction then won't work, because the key no longer exists in the map, and we will never get that second invalidation message.
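To make the ordering problem concrete, here is a minimal, self-contained sketch. TrackingMap, InvalidationsInOrder and InvalidationsDeferred are hypothetical names invented for illustration and are not Dragonfly code. The only behavior taken from the comment above is that sending an invalidation removes the key from the tracking map.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the server-side tracking table:
// key -> clients that read the key and were not invalidated since.
class TrackingMap {
 public:
  // A tracked read (e.g. GET) registers the client for the key.
  void Track(const std::string& key, int client_id) {
    map_[key].insert(client_id);
  }

  // A write (e.g. SET) notifies every registered client once and then
  // forgets the key. Returns how many invalidation messages were sent.
  std::size_t Invalidate(const std::string& key) {
    auto it = map_.find(key);
    if (it == map_.end())
      return 0;
    std::size_t sent = it->second.size();
    map_.erase(it);
    return sent;
  }

 private:
  std::unordered_map<std::string, std::set<int>> map_;
};

// GET, SET, GET, SET applied strictly in command order.
inline std::size_t InvalidationsInOrder() {
  TrackingMap m;
  std::size_t sent = 0;
  m.Track("FOO", 1);            // GET FOO
  sent += m.Invalidate("FOO");  // SET FOO BAR
  m.Track("FOO", 1);            // GET FOO
  sent += m.Invalidate("FOO");  // SET FOO BAR
  return sent;
}

// Same commands, but the first invalidation is deferred to its own
// transaction and runs only after the second GET has registered the key.
inline std::size_t InvalidationsDeferred() {
  TrackingMap m;
  std::size_t sent = 0;
  m.Track("FOO", 1);            // GET FOO
  m.Track("FOO", 1);            // GET FOO (overtakes the pending invalidation)
  sent += m.Invalidate("FOO");  // deferred invalidation of the first SET
  sent += m.Invalidate("FOO");  // second SET: key already gone, nothing sent
  return sent;
}
```

Applied in command order, the client gets two invalidation messages. With the first invalidation deferred past the second GET, it gets only one, which matches the missed message in the example above.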
src/server/conn_context.cc
Outdated
@@ -119,6 +119,13 @@ void ConnectionContext::ChangeMonitor(bool start) {
  EnableMonitoring(start);
}

ConnectionState::ClientTracking& ConnectionContext::ClientTrackingInfo() {
  if (parent_cntx_) {
    return parent_cntx_->conn_state.tracking_info_;
That's for squashing :)
If you access conn_state, don't make it a function on conn_cntx; then you can just use conn_state. You can make it mutable or add a new member like conn
dragonfly/src/server/main_service.cc
Lines 214 to 215 in a95419b
if (cntx->conn_state.squashing_info)
  cntx = cntx->conn_state.squashing_info->owner;
}
auto& client_set = it->second;
Consider the client_tracking_map_.extract(key) function, which can combine find and delete in one call.
It doesn't satisfy the extract requirements (for the value_type) and generates a compiler error.
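For reference, this is roughly what the extract suggestion looks like on a standard container. It is a sketch only: it assumes C++17 and a std::unordered_map with a plain std::set value, which is not the container or value_type used in the PR (where extract reportedly fails to compile). ClientSet, TrackingMap and TakeClients are hypothetical names.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>

using ClientSet = std::set<int>;  // hypothetical: set of client ids
using TrackingMap = std::unordered_map<std::string, ClientSet>;

// Removes `key` from the map and returns the clients that were tracking it.
// extract() unlinks the node with a single lookup; an empty node handle
// means the key was not present.
inline ClientSet TakeClients(TrackingMap& map, const std::string& key) {
  auto node = map.extract(key);
  if (node.empty())
    return {};
  return std::move(node.mapped());
}
```

The node handle owns the removed element, so the client set can be moved out without a separate find followed by erase.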
src/server/main_service.cc
Outdated
@@ -1206,6 +1186,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (stored_cmd.Cid()->IsWriteOnly()) {
  dfly_cntx->conn_state.exec_info.is_write = true;
}
dfly_cntx->conn_state.tracking_info_.UpdatePrevAndLastCommand();
why do you need to call it here?
src/server/conn_context.cc
Outdated
}

void ConnectionState::ClientTracking::UpdatePrevAndLastCommand() {
  if (prev_command_ && multi_) {
seems that what you really want is to know if you are in the middle of EXEC execution and not multi.
We store so much fragile info that needs to be updated everywhere... seqnums would solve all this
src/server/conn_context.h
Outdated
// Enable tracking on the client
void TrackClientCaching();

void UpdatePrevAndLastCommand();
nit: the name UpdatePrevAndLastCommand describes the implementation of this function. What it does is advance the state, so I think it's better to call it Tick, Advance, or Update
src/server/conn_context.h
Outdated
// true if the previous command invoked is CLIENT CACHING TRUE
bool prev_command_ = false;
// true if the currently executing command is CLIENT CACHING TRUE
bool executing_command_ = false;
rename: executing_command_ to track_next_cmd_
But track_next_cmd_ seems misleading, since it implies it's the next command. executing_command_ is the command we currently execute in the InvokeCmd flow, and prev_command_ is the command before it. So:
>> GET FOO ----> prev_command
>> GET BAR ----> current_command
src/server/conn_context.h
Outdated
bool optin_ = false;
// remember if CLIENT CACHING TRUE was the last command
// true if the previous command invoked is CLIENT CACHING TRUE
bool prev_command_ = false;
rename prev_command_ to track_current_cmd_
It looks somewhat over-engineered to me 😅 We have similar semantics in blocking commands, only that there we subscribe with specific commands and not with any.
- Let's use squashing_info instead of adding a parent field to ConnectionContext, or let's use that field for everything. There should be one way of doing things, with proper comments, so nobody adds yet a third
- I'd still suggest adding numbers to commands, because UpdatePrevAndLastCommand() appears in many places and we update three whole fields: prev, executing, multi. The track command can just store its number and we don't have to update much more
- Track() should be called when we conclude or finish the current multi command; currently we call it for every hop. Not that there are multi-hop read commands, but I think it belongs with all the other management code. invoke_cid_ should also not be needed with that
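One possible shape of the sequence-number idea, as a sketch only. TrackingState and all of its members are hypothetical names. It assumes tracking is already enabled on the connection and it ignores MULTI/EXEC and squashing, which the real code has to handle.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-connection tracking state based on sequence numbers.
class TrackingState {
 public:
  void SetOptin(bool optin) { optin_ = optin; }

  // Called once per dispatched command, before the command runs.
  void Advance() { ++seq_num_; }

  // Called by the CLIENT CACHING handler: it only stores its own number.
  void MarkCaching() { caching_seq_num_ = seq_num_; }

  // Without OPTIN every read is tracked. With OPTIN only the command
  // that directly follows CLIENT CACHING is tracked.
  bool ShouldTrackKeys() const {
    if (!optin_)
      return true;
    return caching_seq_num_ != 0 && seq_num_ == caching_seq_num_ + 1;
  }

 private:
  bool optin_ = false;
  std::uint64_t seq_num_ = 0;          // number of the current command, first is 1
  std::uint64_t caching_seq_num_ = 0;  // 0 means CLIENT CACHING was never seen
};
```

With this layout there is a single counter to advance per command, and the decision is a comparison rather than a set of flags that must be shifted in several places.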
src/server/conn_context.h
Outdated
ConnectionContext* parent_cntx_ = nullptr;

ConnectionState::ClientTracking& ClientTrackingInfo();
See previous comment on whether we can keep this in conn_state
there is a deeper problem with suppose you have
Now, if you squash the last 3, you lose the order - because
let's reject
in fact, should we even allow
Only
Some nits remaining, LGTM
src/server/conn_context.cc
Outdated
if ((cid->opt_mask() & CO::READONLY) && cid->IsTransactional() && info.ShouldTrackKeys()) {
  auto conn = cntx->parent_cntx_ ? cntx->parent_cntx_->conn()->Borrow() : cntx->conn()->Borrow();
  auto cb = [&, conn](unsigned i, auto* pb) {
    if (shards.find(i) != shards.end()) {
nit: There is IsActive() so you don't need GetActiveShards()
src/server/transaction.h
Outdated
void SetConnectionContextAndInvokeCid(ConnectionContext* cntx) {
  cntx_ = cntx;
}
nit: please rename it then
ConnectionContext* cntx_{nullptr};
I'll invent something in the future to get rid of this 😆
No need to wait for the future. I think transactions should not be aware of ConnectionContext. It's a design choice and it is possible to preserve it. Let's introduce an on_track_cb that will be passed to the transaction if tracking is needed, i.e. if the condition ((cid->opt_mask() & CO::READONLY) && cid->IsTransactional() && info.ShouldTrackKeys()) holds. The callback will call OpTrackKeys. This way ClientTracking::Track will disappear altogether
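A rough sketch of the callback approach, under stated assumptions. MiniTransaction, SetTrackCb, Conclude and PrepareTracking are hypothetical names, not the Dragonfly Transaction API. The `should_track` flag stands in for the condition quoted above, and the lambda body stands in for the call to OpTrackKeys.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, heavily simplified transaction: it knows nothing about
// connections, only about an optional callback to run when it concludes.
class MiniTransaction {
 public:
  using TrackCb = std::function<void(const std::vector<std::string>& keys)>;

  explicit MiniTransaction(std::vector<std::string> keys)
      : keys_(std::move(keys)) {}

  void SetTrackCb(TrackCb cb) { on_track_cb_ = std::move(cb); }

  // Runs the tracking callback, if one was installed, exactly once here.
  void Conclude() {
    if (on_track_cb_)
      on_track_cb_(keys_);
  }

 private:
  std::vector<std::string> keys_;
  TrackCb on_track_cb_;
};

// Coordinator side: install the callback only when tracking is needed.
// `tracked` is a stand-in sink for whatever OpTrackKeys would record.
inline void PrepareTracking(MiniTransaction& tx, bool should_track,
                            std::vector<std::string>* tracked) {
  if (!should_track)
    return;
  tx.SetTrackCb([tracked](const std::vector<std::string>& keys) {
    tracked->insert(tracked->end(), keys.begin(), keys.end());
  });
}
```

The transaction only holds a std::function, so the connection-specific state stays captured in the closure created by the caller.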
<< " with thread ID: " << conn_ref.Thread();

auto& db_slice = slice_args.shard->db_slice();
// TODO: There is a bug here that we track all arguments instead of tracking only keys.
ah, found it, yes that's also left 🙂
I think it's correct now. Note there are some unresolved nits by Roman
@@ -1119,7 +1118,8 @@ void Connection::HandleMigrateRequest() {
  this->Migrate(dest);
}

DCHECK(dispatch_q_.empty());
// This triggers on rueidis SingleIntegrationTest
I think this comment can be improved. Explain how it happens that DCHECK(dispatch_q_.empty()); fails after the migration; "rueidis SingleIntegrationTest" is irrelevant here
  return OpStatus::OK;
}

void ConnectionState::ClientTracking::Track(ConnectionContext* cntx, const CommandId* cid) {
This function is now called within the shard. We have a convention of naming such functions xxxOnShard to stress it.
auto& info = cntx->conn_state.tracking_info_;
if ((cid->opt_mask() & CO::READONLY) && cid->IsTransactional() && info.ShouldTrackKeys()) {
I believe the code can be moved to the coordinator thread, i.e. to where we prepare the transaction. The result can be kept as a boolean within the transaction. Nothing here is relevant to the shard or the transaction state. We have the Transaction::coordinator_state_ mask that can be used for this.
actually, see my other comment.
CLIENT CACHING TRUE (only to be used with TRACKING OPTIN)
Resolves #2969, #2971, #2997, #2998
P.S. All tests in rueidis TestSingleClientIntegration pass except pub/sub, because we don't support it yet; see #3001