fix(copy_data): add retries for copy_data command (#916)
Conversation
```diff
@@ -106,22 +146,19 @@ impl ParallelSyncManager {
         self.permit.available_permits() / self.replicas.len(),
     );

-    let mut replicas_iter = self.replicas.iter();
-    // Loop through replicas, one at a time.
-    // This works around Rust iterators not having a "rewind" function.
-    let replica = loop {
-        if let Some(replica) = replicas_iter.next() {
-            break replica;
-        } else {
-            replicas_iter = self.replicas.iter();
-        }
-    };
+    // cycle() is the idiomatic "rewind": it restarts the iterator from the
+    // beginning once exhausted, giving round-robin distribution across replicas.
+    let mut replicas_iter = self.replicas.iter().cycle();
```
@levkk could you please check this? Did I get the original intention right?
```rust
tokio::time::sleep(backoff).await;
```

```rust
if let Err(trunc_err) = self.table.truncate_destination(&self.dest).await {
```
I don't think we should do this, for two reasons:

1. `COPY` is atomic and transactional: if it fails, none of the rows will be saved in the table, so it will already be empty when we retry.
2. `TRUNCATE` is a scary command to run, for now. We should add a bunch more tests and conditions that prevent it from accidentally being called on the source DB. For now, let's have the user truncate manually if this retry logic fails for some reason.

There is a chance of a race condition where the table copy completes and we then get an error somewhere below, e.g. while running `COMMIT`, but the odds of that are slim. We should definitely account for this (and truncate), but only after we implement a few "this is definitely the destination" checks. I'll write up a separate issue for this.
Yes, dropped the truncate and left a comment for the future.
```rust
    Ok((0, 0))
}

async fn flush(&mut self) -> Result<(usize, usize), Error> {
```
`send_one` is actually not free! The `ParallelConnection` gave us, I think, a 30-40% copy-speed boost. Have you benchmarked this? Also curious what made you refactor this one.
Moved to a separate PR to verify and test it: #920
This reverts commit 5560276.
```rust
impl ShardMonitor {
    async fn spawn(&self) {
        if self.shard.comms().lsn_check_interval == Duration::MAX {
```
We test this code somewhere in CI, right? Just double checking. This is the replica/primary promoter, so we need to be sure it's tested before tweaking it. At the very least, test it locally (with `role = "auto"`) to make sure it still works. I think this change is fine, though it's probably a no-op, since the code below won't take any action if the loop controlled by `lsn_check_interval` never provides data.
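A minimal sketch of the `Duration::MAX` sentinel pattern the snippet above appears to use (assumed semantics: `MAX` means the check loop is disabled; the function name is illustrative):

```rust
use std::time::Duration;

// Duration::MAX as an "off switch": a loop that would otherwise tick at
// this interval simply never runs.
fn lsn_checks_enabled(interval: Duration) -> bool {
    interval != Duration::MAX
}

fn main() {
    assert!(!lsn_checks_enabled(Duration::MAX));
    assert!(lsn_checks_enabled(Duration::from_secs(5)));
    println!("ok");
}
```

Using `Option<Duration>` would make the "disabled" state explicit in the type, but a sentinel avoids threading `Option` through config parsing.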
```rust
/// Prevents accumulated counts from a discarded attempt inflating totals
/// and throughput calculations across retries.
pub(crate) fn reset(&self) {
    if let Some(mut state) = TableCopies::get().get_mut(self) {
```
Not a bad idea to track retries, but we can add that as a follow-up.
|
The mirror test is flaky. The rest looks good to me!
fixes #897