Fix #1680 #1682
Good stuff. First, document the magic constant; it also seems very low. Second, there seem to be several complex, separate tasks maintaining the hashmap. Do all of these need to be separate components?
sui/src/sui_commands.rs
Outdated
rx_sui_to_consensus,
rx_consensus_to_sui,
/* max_pending_transactions */ 10_000,
What is the implication of this? Make a const and document what it does?
adding a comment
sui_core/src/consensus_adapter.rs
Outdated
/// Receive consensus outputs.
rx_consensus_output: Receiver<ConsensusOutput>,
/// The maximum number of pending replies.
max_pending_transactions: usize,
/// Keep a map of all consensus inputs that are currently being sequenced.
pending: HashMap<ConsensusTransactionDigest, Vec<Replier>>,
Should we keep the full certificate instead of just the digest? We might want to re-submit or share it at some point. Should this be persisted to disk, or are we ok if it goes away upon crash?
It will be the job of the client (and gateway) to handle re-submission in case of timeouts or crashes. It is ok if this goes away upon crash; the client will simply have to re-submit the certificate.
        }
    }
}

/// Main loop receiving messages input to consensus and notifying the caller once the inputs
/// are sequenced (or if an error happened).
async fn run(&mut self) {
Key question: why is all this happening through message passing and separate tasks, instead of having the task that interacts with consensus keep the hashmap and update it when transactions go in and when they come out? Is there more than one thread injecting messages into, or consuming messages from, consensus?
It is not clear to me how to best embed this with the current Sui logic. There is one task (`ConsensusListener`) that receives consensus outputs. The `ConsensusAdapter` is not a task; it simply provides helpers to interact with consensus that the `AuthorityServer` uses.
We have many tasks injecting transactions to consensus (the same tasks running the `AuthorityServer`, one per client). We however have a single task handling the output of consensus (`ConsensusListener`).
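To make the layout described above concrete, here is a minimal sketch of many submitters registering with a single listener that owns the `pending` map. It uses `std::sync::mpsc` and OS threads rather than the async channels in the PR, and `Digest`, `Replier`, `ListenerMessage`, `spawn_listener` and `run_demo` are hypothetical stand-ins, not the actual Sui types.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for `ConsensusTransactionDigest`.
type Digest = u64;
/// Hypothetical stand-in for `Replier`: where the listener reports the outcome.
type Replier = Sender<Digest>;

/// Messages handled by the single listener (names are illustrative).
enum ListenerMessage {
    /// A submitter wants to be told when `Digest` is sequenced.
    New(Digest, Replier),
    /// Consensus output: the transaction with this digest was sequenced.
    Sequenced(Digest),
}

/// Spawns the single listener. It is the only owner of `pending`, so no lock
/// is needed. Returns the number of entries left when all senders are gone.
fn spawn_listener() -> (Sender<ListenerMessage>, thread::JoinHandle<usize>) {
    let (tx, rx) = channel::<ListenerMessage>();
    let handle = thread::spawn(move || {
        let mut pending: HashMap<Digest, Vec<Replier>> = HashMap::new();
        // The loop ends once every sender has been dropped.
        for message in rx {
            match message {
                ListenerMessage::New(digest, replier) => {
                    pending.entry(digest).or_default().push(replier);
                }
                ListenerMessage::Sequenced(digest) => {
                    for replier in pending.remove(&digest).unwrap_or_default() {
                        // A submitter that gave up has dropped its receiver.
                        let _ = replier.send(digest);
                    }
                }
            }
        }
        pending.len()
    });
    (tx, handle)
}

/// Runs `clients` submitter threads that register the same digest, feeds one
/// consensus output, and returns how many submitters were notified.
fn run_demo(clients: usize, digest: Digest) -> usize {
    let (tx, listener) = spawn_listener();
    let mut receivers = Vec::new();
    let mut submitters = Vec::new();
    for _ in 0..clients {
        let (reply_tx, reply_rx) = channel();
        receivers.push(reply_rx);
        let tx = tx.clone();
        submitters.push(thread::spawn(move || {
            tx.send(ListenerMessage::New(digest, reply_tx)).unwrap();
        }));
    }
    // Wait until every registration has been enqueued before the output.
    for submitter in submitters {
        submitter.join().unwrap();
    }
    tx.send(ListenerMessage::Sequenced(digest)).unwrap();
    drop(tx);
    listener.join().unwrap();
    receivers
        .iter()
        .filter(|rx| matches!(rx.recv(), Ok(d) if d == digest))
        .count()
}
```

Calling `run_demo(4, 7)` registers four repliers under digest `7` and notifies all of them once the single output arrives. Keeping the map inside one task is what lets many submitters share it through messages alone.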
@asonnino
It doesn't matter here; we don't need ordering of any kind.
@@ -838,9 +838,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
// sequence number (`OBJECT_START_VERSION`). Otherwise use the `scheduled` map
// to assign the next sequence number.
let version = v.unwrap_or_else(|| OBJECT_START_VERSION);
let next_version = v
    .map(|v| v.increment())
    .unwrap_or_else(|| SequenceNumber::from(2));
Could you `unwrap_or_else` a const at `OBJECT_START_VERSION + 1`?
We could, but this is even simpler: `let next_version = version.increment();`
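A small sketch of why the two forms agree. It assumes `OBJECT_START_VERSION` is 1 (implied by the hard-coded `2` in the removed lines, but not confirmed here) and uses a hypothetical stand-in for `SequenceNumber`; `next_version_old` and `next_version_new` are illustrative names.

```rust
/// Hypothetical stand-in for Sui's `SequenceNumber`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct SequenceNumber(u64);

impl SequenceNumber {
    fn increment(self) -> SequenceNumber {
        SequenceNumber(self.0 + 1)
    }
}

/// Assumption: the start version is 1, so its successor is the literal 2
/// that the removed code hard-coded.
const OBJECT_START_VERSION: SequenceNumber = SequenceNumber(1);

/// Shape of the removed code: a separate fallback literal for the `None` case.
fn next_version_old(v: Option<SequenceNumber>) -> SequenceNumber {
    v.map(|v| v.increment())
        .unwrap_or_else(|| SequenceNumber(2))
}

/// Shape of the replacement: resolve the current version once, then increment.
fn next_version_new(v: Option<SequenceNumber>) -> SequenceNumber {
    let version = v.unwrap_or(OBJECT_START_VERSION);
    version.increment()
}
```

Under that assumption both functions return the same value for every input. The second form has no literal that must be kept in sync with `OBJECT_START_VERSION`.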
sui_core/src/consensus_adapter.rs
Outdated
Ok(reply) => reply.expect("Channel with consensus listener dropped"),
Err(e) => {
    let message = ConsensusListenerMessage::Cleanup(serialized);
    self.tx_consensus_listener
        .send(message)
        .await
        .expect("Channel with consensus listener dropped");
- Nit: we're reusing the same message in two places. Local var?
- Could we also use `warn!` logging under a suitable span?
It is not the same: one is `New`, the other is `Cleanup`.
Sorry, I meant error message: "Channel with consensus listener dropped"
ah ok, made clearer error messages (they could be made more specific)
ConsensusListenerMessage::Cleanup(transaction) => {
    let digest = Self::hash(&transaction);
    let _ = self.pending.get_mut(&digest).and_then(|x| x.pop());
    if self.pending.get(&digest).map_or_else(|| false, |x| x.is_empty()) {
I can't wait for `inspect` to stabilize.
Is this really necessary, or could we just periodically call `self.pending.retain(|_, x| !x.is_empty())`?
We could but it's extra LOC
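For comparison, a rough sketch of the two options discussed here: removing an emptied entry right away on `Cleanup`, versus only popping and sweeping empty entries later with `HashMap::retain`. `Digest`, `Replier`, `cleanup_eager`, `cleanup_lazy` and `sweep` are hypothetical names used for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for `ConsensusTransactionDigest` and `Replier`.
type Digest = u64;
type Replier = &'static str;

/// Eager variant: pop one replier and drop the entry as soon as it is empty.
fn cleanup_eager(pending: &mut HashMap<Digest, Vec<Replier>>, digest: Digest) {
    if let Some(repliers) = pending.get_mut(&digest) {
        repliers.pop();
        if repliers.is_empty() {
            pending.remove(&digest);
        }
    }
}

/// Lazy variant: only pop; empty entries stay in the map until the next sweep.
fn cleanup_lazy(pending: &mut HashMap<Digest, Vec<Replier>>, digest: Digest) {
    if let Some(repliers) = pending.get_mut(&digest) {
        repliers.pop();
    }
}

/// Periodic sweep. Note that `HashMap::retain` passes both key and value.
fn sweep(pending: &mut HashMap<Digest, Vec<Replier>>) {
    pending.retain(|_, repliers| !repliers.is_empty());
}
```

Both variants end in the same state once a sweep has run. The lazy one lets empty entries count toward the size of `pending` in the meantime, which matters if the cap is checked against the map's length.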
/// than this cap, the transactions will be handled by consensus as usual but this module won't
/// be keeping track of when they are sequenced. Its only purpose is to ensure the field called
/// `pending` has a maximum size.
max_pending_transactions: usize,
So if this cap is exceeded, there can be clients waiting for an ack forever? Shouldn't we be rejecting instead?
The channel drops, so the client will know automatically. Although you're right that this can be made more explicit.
let me make a note for this
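One possible shape for the more explicit rejection, as a sketch only. It assumes the cap counts distinct digests in `pending` (the thread does not say exactly how it is counted), and `SubmitError`, `Listener`, `register` and `sequenced` are hypothetical names, not the PR's API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for `ConsensusTransactionDigest`.
type Digest = u64;
/// Hypothetical replier that carries an explicit outcome.
type Replier = Sender<Result<(), SubmitError>>;

/// Illustrative error type; not part of the PR.
#[derive(Debug, PartialEq, Eq)]
enum SubmitError {
    TooManyPending,
}

struct Listener {
    max_pending_transactions: usize,
    pending: HashMap<Digest, Vec<Replier>>,
}

impl Listener {
    fn new(max_pending_transactions: usize) -> Self {
        Listener {
            max_pending_transactions,
            pending: HashMap::new(),
        }
    }

    /// Registers a replier. Assumption: the cap counts distinct digests.
    /// At the cap, a new digest is rejected with an explicit error instead
    /// of silently dropping the replier.
    fn register(&mut self, digest: Digest, replier: Replier) {
        let at_cap = self.pending.len() >= self.max_pending_transactions;
        if at_cap && !self.pending.contains_key(&digest) {
            // The submitter may already be gone; ignore a failed send.
            let _ = replier.send(Err(SubmitError::TooManyPending));
            return;
        }
        self.pending.entry(digest).or_default().push(replier);
    }

    /// Notifies every replier waiting on a sequenced digest.
    fn sequenced(&mut self, digest: Digest) {
        for replier in self.pending.remove(&digest).unwrap_or_default() {
            let _ = replier.send(Ok(()));
        }
    }
}
```

With the implicit approach described above, the submitter only sees its receive call fail when the replier is dropped. Here it receives `Err(SubmitError::TooManyPending)` and can tell a full queue apart from a crashed listener.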
More info: #1680