aboutsummaryrefslogtreecommitdiff
path: root/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java')
-rw-r--r--pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java870
1 files changed, 515 insertions, 355 deletions
diff --git a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
index 3a61b362a..898f0e9b0 100644
--- a/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
+++ b/pw_transfer/java/test/dev/pigweed/pw_transfer/TransferClientTest.java
@@ -92,11 +92,11 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY));
+ assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(1));
receiveReadServerError(Status.FAILED_PRECONDITION);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY));
+ assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(1));
receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
.setOffset(0)
@@ -112,7 +112,7 @@ public final class TransferClientTest {
TransferParameters params = TransferParameters.create(50, 50, 0);
ListenableFuture<byte[]> future = transferClient.read(1, params);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.LEGACY, params));
+ assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(1, params));
receiveReadChunks(legacyDataChunk(1, TEST_DATA_100B, 0, 50));
@@ -140,7 +140,7 @@ public final class TransferClientTest {
receiveReadServerError(Status.FAILED_PRECONDITION);
}
- Chunk initialChunk = initialReadChunk(1, ProtocolVersion.LEGACY);
+ Chunk initialChunk = initialLegacyReadChunk(1);
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
@@ -207,7 +207,7 @@ public final class TransferClientTest {
TransferParameters params = TransferParameters.create(3, 2, 1);
ListenableFuture<byte[]> future = transferClient.read(99, params);
- assertThat(lastChunks()).containsExactly(initialReadChunk(99, ProtocolVersion.LEGACY, params));
+ assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(99, params));
assertThat(future.cancel(true)).isTrue();
}
@@ -216,7 +216,7 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
- assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY));
+ assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(123));
receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123)
.setOffset(0)
@@ -291,7 +291,7 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
- assertThat(lastChunks()).containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY));
+ assertThat(lastChunks()).containsExactly(initialLegacyReadChunk(123));
receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50)));
@@ -448,9 +448,8 @@ public final class TransferClientTest {
// read should have retried sending the transfer parameters 2 times, for a total of 3
assertThat(lastChunks())
- .containsExactly(initialReadChunk(123, ProtocolVersion.LEGACY),
- initialReadChunk(123, ProtocolVersion.LEGACY),
- initialReadChunk(123, ProtocolVersion.LEGACY));
+ .containsExactly(
+ initialLegacyReadChunk(123), initialLegacyReadChunk(123), initialLegacyReadChunk(123));
}
@Test
@@ -491,13 +490,11 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(2, TEST_DATA_SHORT.size()));
receiveWriteServerError(Status.FAILED_PRECONDITION);
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(2, TEST_DATA_SHORT.size()));
receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
@@ -519,7 +516,7 @@ public final class TransferClientTest {
.setMaxChunkSizeBytes(50));
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ .containsExactly(initialLegacyWriteChunk(2, TEST_DATA_100B.size()),
legacyDataChunk(2, TEST_DATA_100B, 0, 50));
receiveWriteServerError(Status.FAILED_PRECONDITION);
@@ -538,7 +535,7 @@ public final class TransferClientTest {
receiveWriteServerError(Status.FAILED_PRECONDITION);
}
- Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size());
+ Chunk initialChunk = initialLegacyWriteChunk(2, TEST_DATA_SHORT.size());
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
@@ -559,7 +556,7 @@ public final class TransferClientTest {
receiveWriteChunks(legacyFinalChunk(2, Status.OK));
- assertThat(lastChunks()).containsExactly(initialWriteChunk(2, ProtocolVersion.LEGACY, 0));
+ assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(2, 0));
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@@ -569,8 +566,7 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()));
+ assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()));
receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
@@ -615,8 +611,7 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()));
+ assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()));
receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
@@ -664,8 +659,7 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.LEGACY);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()));
+ assertThat(lastChunks()).containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()));
receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
.setOffset(0)
@@ -734,7 +728,7 @@ public final class TransferClientTest {
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()),
newLegacyChunk(Chunk.Type.DATA, 123).setOffset(100).setRemainingBytes(0).build());
assertThat(future.isDone()).isFalse();
}
@@ -800,7 +794,7 @@ public final class TransferClientTest {
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_100B.size()),
legacyFinalChunk(123, Status.OUT_OF_RANGE));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
@@ -874,10 +868,9 @@ public final class TransferClientTest {
// Client should have resent the last chunk (the initial chunk in this case) for each timeout.
assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial
- initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // retry 1
- initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size())); // retry 2
+ .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_SHORT.size()), // initial
+ initialLegacyWriteChunk(123, TEST_DATA_SHORT.size()), // retry 1
+ initialLegacyWriteChunk(123, TEST_DATA_SHORT.size())); // retry 2
}
@Test
@@ -901,8 +894,7 @@ public final class TransferClientTest {
.setRemainingBytes(0)
.build();
assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_SHORT.size()), // initial
+ .containsExactly(initialLegacyWriteChunk(123, TEST_DATA_SHORT.size()), // initial
data, // data chunk
data, // retry 1
data); // retry 2
@@ -944,7 +936,7 @@ public final class TransferClientTest {
assertThat(lastChunks())
.containsExactly(
// initial
- initialWriteChunk(123, ProtocolVersion.LEGACY, TEST_DATA_100B.size()),
+ initialLegacyWriteChunk(123, TEST_DATA_100B.size()),
// after 2, receive parameters: 40 from 0 by 20
legacyDataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
legacyDataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
@@ -966,29 +958,33 @@ public final class TransferClientTest {
public void read_singleChunk_successful() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(3, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
assertThat(future.isDone()).isFalse();
- assertThat(lastChunks()).containsExactly(initialReadChunk(3, ProtocolVersion.VERSION_TWO));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
- receiveReadChunks(newChunk(Chunk.Type.START_ACK, 321)
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(3)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
- assertThat(lastChunks()).containsExactly(readStartAckConfirmation(321, TRANSFER_PARAMETERS));
+ assertThat(lastChunks())
+ .containsExactly(readStartAckConfirmation(transfer.getSessionId(), TRANSFER_PARAMETERS));
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 321).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
assertThat(lastChunks())
.containsExactly(Chunk.newBuilder()
.setType(Chunk.Type.COMPLETION)
- .setSessionId(321)
+ .setSessionId(transfer.getSessionId())
.setStatus(Status.OK.ordinal())
.build());
assertThat(future.isDone()).isFalse();
- receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, 321));
+ receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()));
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
@@ -997,9 +993,10 @@ public final class TransferClientTest {
public void read_requestV2ReceiveLegacy() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
assertThat(future.isDone()).isFalse();
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
receiveReadChunks(newLegacyChunk(Chunk.Type.DATA, 1)
.setOffset(0)
@@ -1016,19 +1013,21 @@ public final class TransferClientTest {
public void read_failedPreconditionError_retriesInitialPacket() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveReadServerError(Status.FAILED_PRECONDITION);
- assertThat(lastChunks()).containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
}
- receiveReadChunks(newChunk(Chunk.Type.START_ACK, 54321)
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(1)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
- assertThat(lastChunks()).containsExactly(readStartAckConfirmation(54321, TRANSFER_PARAMETERS));
+ assertThat(lastChunks())
+ .containsExactly(readStartAckConfirmation(transfer.getSessionId(), TRANSFER_PARAMETERS));
}
@Test
@@ -1036,11 +1035,11 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
TransferParameters params = TransferParameters.create(50, 50, 0);
ListenableFuture<byte[]> future = transferClient.read(1, params);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
- receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555)
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(1)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
@@ -1056,20 +1055,21 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
TransferParameters params = TransferParameters.create(50, 50, 0);
ListenableFuture<byte[]> future = transferClient.read(1, params);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialReadChunk(1, ProtocolVersion.VERSION_TWO, params));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
- receiveReadChunks(newChunk(Chunk.Type.START_ACK, 555)
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(1)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
- assertThat(lastChunks()).containsExactly(readStartAckConfirmation(555, params));
+ assertThat(lastChunks())
+ .containsExactly(readStartAckConfirmation(transfer.getSessionId(), params));
- receiveReadChunks(dataChunk(555, TEST_DATA_100B, 0, 50));
+ receiveReadChunks(dataChunk(transfer.getSessionId(), TEST_DATA_100B, 0, 50));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(50)
@@ -1086,12 +1086,13 @@ public final class TransferClientTest {
public void read_failedPreconditionErrorMaxRetriesTimes_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveReadServerError(Status.FAILED_PRECONDITION);
}
- Chunk initialChunk = initialReadChunk(1, ProtocolVersion.VERSION_TWO);
+ Chunk initialChunk = initialReadChunk(transfer);
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
@@ -1108,23 +1109,38 @@ public final class TransferClientTest {
public void read_singleChunk_ignoresUnknownIdOrWriteChunks() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(1);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
assertThat(future.isDone()).isFalse();
- performReadStartHandshake(1, 99);
+ performReadStartHandshake(transfer);
- receiveReadChunks(finalChunk(2, Status.OK),
- newChunk(Chunk.Type.DATA, 1).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
- newChunk(Chunk.Type.DATA, 3).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
- receiveWriteChunks(finalChunk(99, Status.INVALID_ARGUMENT),
- newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0),
- newChunk(Chunk.Type.DATA, 2).setOffset(0).setData(TEST_DATA_100B).setRemainingBytes(0));
+ receiveReadChunks(finalChunk(transfer.getSessionId() + 1, Status.OK),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId() + 2)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId() + 3)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0));
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId() + 1)
+ .setOffset(0)
+ .setData(TEST_DATA_100B)
+ .setRemainingBytes(0));
assertThat(future.isDone()).isFalse();
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(TEST_DATA_SHORT).setRemainingBytes(0));
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0));
- performReadCompletionHandshake(99, Status.OK);
+ performReadCompletionHandshake(transfer.getSessionId(), Status.OK);
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
@@ -1133,10 +1149,11 @@ public final class TransferClientTest {
public void read_empty() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(2);
- performReadStartHandshake(2, 5678);
- receiveReadChunks(newChunk(Chunk.Type.DATA, 5678).setRemainingBytes(0));
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
+ performReadStartHandshake(transfer);
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId()).setRemainingBytes(0));
- performReadCompletionHandshake(5678, Status.OK);
+ performReadCompletionHandshake(transfer.getSessionId(), Status.OK);
assertThat(future.get()).isEqualTo(new byte[] {});
}
@@ -1146,9 +1163,9 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
TransferParameters params = TransferParameters.create(3, 2, 1);
ListenableFuture<byte[]> future = transferClient.read(99, params);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialReadChunk(99, ProtocolVersion.VERSION_TWO, params));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
assertThat(future.cancel(true)).isTrue();
}
@@ -1156,33 +1173,39 @@ public final class TransferClientTest {
public void read_severalChunks() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(7, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(7, 123, TRANSFER_PARAMETERS);
+ performReadStartHandshake(transfer);
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)).setRemainingBytes(70),
- newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 40)));
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(range(0, 20))
+ .setRemainingBytes(70),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(20).setData(range(20, 40)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
.setOffset(40)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(90)
.build());
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(40).setData(range(40, 70)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(40).setData(range(40, 70)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
.setOffset(70)
.setMaxChunkSizeBytes(30)
.setWindowEndOffset(120)
.build());
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 100)).setRemainingBytes(0));
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(70)
+ .setData(range(70, 100))
+ .setRemainingBytes(0));
- performReadCompletionHandshake(123, Status.OK);
+ performReadCompletionHandshake(transfer.getSessionId(), Status.OK);
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
@@ -1192,113 +1215,115 @@ public final class TransferClientTest {
createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
TransferParameters params = TransferParameters.create(50, 10, 0);
+ final int id = transferClient.getNextSessionIdForTest();
+
// Handshake
enqueueReadChunks(2, // Wait for read RPC open & START packet
- newChunk(Chunk.Type.START_ACK, 99)
+ newChunk(Chunk.Type.START_ACK, id)
.setResourceId(7)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
enqueueReadChunks(1, // Ignore the first START_ACK_CONFIRMATION
- newChunk(Chunk.Type.START_ACK, 99)
+ newChunk(Chunk.Type.START_ACK, id)
.setResourceId(7)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
// Window 1: server waits for START_ACK_CONFIRMATION, drops 2nd packet
enqueueReadChunks(1,
- newChunk(Chunk.Type.DATA, 99).setOffset(0).setData(range(0, 10)),
- newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)),
- newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)),
- newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)));
+ newChunk(Chunk.Type.DATA, id).setOffset(0).setData(range(0, 10)),
+ newChunk(Chunk.Type.DATA, id).setOffset(20).setData(range(20, 30)),
+ newChunk(Chunk.Type.DATA, id).setOffset(30).setData(range(30, 40)),
+ newChunk(Chunk.Type.DATA, id).setOffset(40).setData(range(40, 50)));
// Window 2: server waits for retransmit, drops 1st packet
enqueueReadChunks(1,
- newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)),
- newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)),
- newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)),
- newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)));
+ newChunk(Chunk.Type.DATA, id).setOffset(20).setData(range(20, 30)),
+ newChunk(Chunk.Type.DATA, id).setOffset(30).setData(range(30, 40)),
+ newChunk(Chunk.Type.DATA, id).setOffset(40).setData(range(40, 50)),
+ newChunk(Chunk.Type.DATA, id).setOffset(50).setData(range(50, 60)));
// Window 3: server waits for retransmit, drops last packet
enqueueReadChunks(1,
- newChunk(Chunk.Type.DATA, 99).setOffset(10).setData(range(10, 20)),
- newChunk(Chunk.Type.DATA, 99).setOffset(20).setData(range(20, 30)),
- newChunk(Chunk.Type.DATA, 99).setOffset(30).setData(range(30, 40)),
- newChunk(Chunk.Type.DATA, 99).setOffset(40).setData(range(40, 50)));
+ newChunk(Chunk.Type.DATA, id).setOffset(10).setData(range(10, 20)),
+ newChunk(Chunk.Type.DATA, id).setOffset(20).setData(range(20, 30)),
+ newChunk(Chunk.Type.DATA, id).setOffset(30).setData(range(30, 40)),
+ newChunk(Chunk.Type.DATA, id).setOffset(40).setData(range(40, 50)));
// Window 4: server waits for continue and retransmit, normal window.
enqueueReadChunks(2,
- newChunk(Chunk.Type.DATA, 99).setOffset(50).setData(range(50, 60)),
- newChunk(Chunk.Type.DATA, 99).setOffset(60).setData(range(60, 70)),
- newChunk(Chunk.Type.DATA, 99).setOffset(70).setData(range(70, 80)),
- newChunk(Chunk.Type.DATA, 99).setOffset(80).setData(range(80, 90)),
- newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)));
+ newChunk(Chunk.Type.DATA, id).setOffset(50).setData(range(50, 60)),
+ newChunk(Chunk.Type.DATA, id).setOffset(60).setData(range(60, 70)),
+ newChunk(Chunk.Type.DATA, id).setOffset(70).setData(range(70, 80)),
+ newChunk(Chunk.Type.DATA, id).setOffset(80).setData(range(80, 90)),
+ newChunk(Chunk.Type.DATA, id).setOffset(90).setData(range(90, 100)));
enqueueReadChunks(2, // Ignore continue and retransmit chunks, retry last packet in window
- newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)),
- newChunk(Chunk.Type.DATA, 99).setOffset(90).setData(range(90, 100)));
+ newChunk(Chunk.Type.DATA, id).setOffset(90).setData(range(90, 100)),
+ newChunk(Chunk.Type.DATA, id).setOffset(90).setData(range(90, 100)));
// Window 5: Final packet
enqueueReadChunks(2, // Receive two retries, then send final packet
- newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0));
+ newChunk(Chunk.Type.DATA, id).setOffset(100).setData(range(100, 110)).setRemainingBytes(0));
enqueueReadChunks(1, // Ignore first COMPLETION packet
- newChunk(Chunk.Type.DATA, 99).setOffset(100).setData(range(100, 110)).setRemainingBytes(0));
- enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 99));
+ newChunk(Chunk.Type.DATA, id).setOffset(100).setData(range(100, 110)).setRemainingBytes(0));
+ enqueueReadChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, id));
ListenableFuture<byte[]> future = transferClient.read(7, params);
- // assertThat(future.get()).isEqualTo(range(0, 110).toByteArray());
- while (!future.isDone()) {
- }
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
+ assertThat(transfer.getSessionId()).isEqualTo(id);
+ assertThat(future.get()).isEqualTo(range(0, 110).toByteArray());
assertThat(lastChunks())
.containsExactly(
// Handshake
- initialReadChunk(7, ProtocolVersion.VERSION_TWO, params),
- readStartAckConfirmation(99, params),
- readStartAckConfirmation(99, params),
+ initialReadChunk(transfer),
+ readStartAckConfirmation(id, params),
+ readStartAckConfirmation(id, params),
// Window 1: send one transfer parameters update after the drop
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(10)
.setWindowEndOffset(60)
.setMaxChunkSizeBytes(10)
.build(),
// Window 2: send one transfer parameters update after the drop
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(10)
.setWindowEndOffset(60)
.setMaxChunkSizeBytes(10)
.build(),
// Window 3: send one transfer parameters update after the drop, then continue packet
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Not seen by server
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, id) // Not seen by server
.setOffset(40)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(10)
.build(),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after timeout
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent after timeout
.setOffset(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(10)
.build(),
// Window 4: send one transfer parameters update after the drop, then continue packet
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, 99) // Ignored by server
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, id) // Ignored by server
.setOffset(80)
.setWindowEndOffset(130)
.setMaxChunkSizeBytes(10)
.build(),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent after last packet
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent after last packet
.setOffset(100)
.setWindowEndOffset(150)
.setMaxChunkSizeBytes(10)
.build(),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent due to repeated packet
.setOffset(100)
.setWindowEndOffset(150)
.setMaxChunkSizeBytes(10)
.build(),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 99) // Sent due to repeated packet
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent due to repeated packet
.setOffset(100)
.setWindowEndOffset(150)
.setMaxChunkSizeBytes(10)
.build(),
// Window 5: final packet and closing handshake
- newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build(),
- newChunk(Chunk.Type.COMPLETION, 99).setStatus(Status.OK.ordinal()).build());
+ newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.ordinal()).build(),
+ newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.ordinal()).build());
}
@Test
@@ -1306,21 +1331,31 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future =
transferClient.read(123, TRANSFER_PARAMETERS, progressCallback);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(123, 123, TRANSFER_PARAMETERS);
+ performReadStartHandshake(transfer);
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)),
- newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)),
- newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 60)).setRemainingBytes(5),
- newChunk(Chunk.Type.DATA, 123).setOffset(60).setData(range(60, 70)),
- newChunk(Chunk.Type.DATA, 123).setOffset(70).setData(range(70, 80)).setRemainingBytes(20),
- newChunk(Chunk.Type.DATA, 123).setOffset(90).setData(range(90, 100)),
- newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 30)),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(30).setData(range(30, 50)),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(50)
+ .setData(range(50, 60))
+ .setRemainingBytes(5),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(60).setData(range(60, 70)),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(70)
+ .setData(range(70, 80))
+ .setRemainingBytes(20),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(90).setData(range(90, 100)),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 30)));
lastChunks(); // Discard chunks; no need to inspect for this test
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
- performReadCompletionHandshake(123, Status.OK);
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0));
+ performReadCompletionHandshake(transfer.getSessionId(), Status.OK);
verify(progressCallback, times(6)).accept(progress.capture());
assertThat(progress.getAllValues())
@@ -1338,51 +1373,59 @@ public final class TransferClientTest {
public void read_rewindWhenPacketsSkipped() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(123, 123, TRANSFER_PARAMETERS);
+ performReadStartHandshake(transfer);
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(30, 50)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(50).setData(range(30, 50)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setOffset(0)
.build());
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)),
- newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 30)),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(30).setData(range(30, 50)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
.setOffset(30)
.setWindowEndOffset(80)
.setMaxChunkSizeBytes(30)
.build());
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(50)
.setWindowEndOffset(100)
.setMaxChunkSizeBytes(30)
.build());
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(50).setData(range(50, 80)));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123)
+ .containsExactly(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
.setOffset(80)
.setWindowEndOffset(130)
.setMaxChunkSizeBytes(30)
.build());
- receiveReadChunks(
- newChunk(Chunk.Type.DATA, 123).setOffset(80).setData(range(80, 100)).setRemainingBytes(0));
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(80)
+ .setData(range(80, 100))
+ .setRemainingBytes(0));
- performReadCompletionHandshake(123, Status.OK);
+ performReadCompletionHandshake(transfer.getSessionId(), Status.OK);
assertThat(future.get()).isEqualTo(TEST_DATA_100B.toByteArray());
}
@@ -1392,15 +1435,16 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
for (int i = 0; i < 3; ++i) {
ListenableFuture<byte[]> future = transferClient.read(1);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(1, 100 + i);
+ performReadStartHandshake(transfer);
- receiveReadChunks(newChunk(Chunk.Type.DATA, 100 + i)
+ receiveReadChunks(newChunk(Chunk.Type.DATA, transfer.getSessionId())
.setOffset(0)
.setData(TEST_DATA_SHORT)
.setRemainingBytes(0));
- performReadCompletionHandshake(100 + i, Status.OK);
+ performReadCompletionHandshake(transfer.getSessionId(), Status.OK);
assertThat(future.get()).isEqualTo(TEST_DATA_SHORT.toByteArray());
}
@@ -1435,15 +1479,18 @@ public final class TransferClientTest {
public void read_sendErrorOnLaterPacket_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(1024, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(1024, 123, TRANSFER_PARAMETERS);
+ performReadStartHandshake(transfer);
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 20)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(0).setData(range(0, 20)));
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
- receiveReadChunks(newChunk(Chunk.Type.DATA, 123).setOffset(20).setData(range(20, 50)));
+ receiveReadChunks(
+ newChunk(Chunk.Type.DATA, transfer.getSessionId()).setOffset(20).setData(range(20, 50)));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -1454,21 +1501,22 @@ public final class TransferClientTest {
public void read_cancelFuture_abortsTransfer() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(1, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(1, 123, TRANSFER_PARAMETERS);
+ performReadStartHandshake(transfer);
assertThat(future.cancel(true)).isTrue();
- assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED));
+ assertThat(lastChunks()).contains(finalChunk(transfer.getSessionId(), Status.CANCELLED));
}
@Test
public void read_immediateTransferProtocolError_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(123);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- // Resource ID will be set since session ID hasn't been assigned yet.
- receiveReadChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID)
+ receiveReadChunks(newChunk(Chunk.Type.COMPLETION, transfer.getSessionId())
.setResourceId(123)
.setStatus(Status.ALREADY_EXISTS.ordinal()));
@@ -1482,10 +1530,11 @@ public final class TransferClientTest {
public void read_laterTransferProtocolError_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(123);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- performReadStartHandshake(123, 514);
+ performReadStartHandshake(transfer);
- receiveReadChunks(finalChunk(514, Status.ALREADY_EXISTS));
+ receiveReadChunks(finalChunk(transfer.getSessionId(), Status.ALREADY_EXISTS));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -1508,14 +1557,16 @@ public final class TransferClientTest {
public void read_serverRespondsWithUnknownVersion_invalidArgument() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(2, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialReadChunk(2, ProtocolVersion.VERSION_TWO, TRANSFER_PARAMETERS));
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
- receiveReadChunks(
- newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613));
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
+ .setResourceId(2)
+ .setProtocolVersion(600613));
- assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT));
+ assertThat(lastChunks())
+ .containsExactly(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT));
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
@@ -1525,6 +1576,7 @@ public final class TransferClientTest {
public void read_timeout() {
createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<byte[]> future = transferClient.read(123, TRANSFER_PARAMETERS);
+ ReadTransfer transfer = transferClient.getReadTransferForTest(future);
// Call future.get() without sending any server-side packets.
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
@@ -1532,45 +1584,65 @@ public final class TransferClientTest {
// read should have retried sending the transfer parameters 2 times, for a total of 3
assertThat(lastChunks())
- .containsExactly(initialReadChunk(123, ProtocolVersion.VERSION_TWO),
- initialReadChunk(123, ProtocolVersion.VERSION_TWO),
- initialReadChunk(123, ProtocolVersion.VERSION_TWO));
+ .containsExactly(
+ initialReadChunk(transfer), initialReadChunk(transfer), initialReadChunk(transfer));
+ }
+
+ @Test
+ public void read_generatesUniqueSessionIds() {
+ createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
+
+ int sessionId1 = transferClient.getNextSessionIdForTest();
+ ReadTransfer transfer1 = transferClient.getReadTransferForTest(transferClient.read(123));
+ assertThat(sessionId1).isEqualTo(transfer1.getSessionId());
+
+ int sessionId2 = transferClient.getNextSessionIdForTest();
+ ReadTransfer transfer2 = transferClient.getReadTransferForTest(transferClient.read(456));
+ assertThat(sessionId2).isEqualTo(transfer2.getSessionId());
+
+ int sessionId3 = transferClient.getNextSessionIdForTest();
+ ReadTransfer transfer3 = transferClient.getReadTransferForTest(transferClient.read(789));
+ assertThat(sessionId3).isEqualTo(transfer3.getSessionId());
+
+ assertThat(sessionId1).isNotEqualTo(sessionId2);
+ assertThat(sessionId1).isNotEqualTo(sessionId3);
}
@Test
public void write_singleChunk() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
// Do the start handshake (equivalent to performWriteStartHandshake()).
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()));
- receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(2)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId())
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(TEST_DATA_SHORT.size())
.build());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(1024)
.setMaxChunkSizeBytes(128));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 123)
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
.setOffset(0)
.setData(TEST_DATA_SHORT)
.setRemainingBytes(0)
.build());
- receiveWriteChunks(finalChunk(123, Status.OK));
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK));
- assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@@ -1579,9 +1651,9 @@ public final class TransferClientTest {
public void write_requestV2ReceiveLegacy() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()));
receiveWriteChunks(newLegacyChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 2)
.setOffset(0)
@@ -1596,10 +1668,11 @@ public final class TransferClientTest {
public void write_platformTransferDisabled_aborted() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
assertThat(future.isDone()).isFalse();
shouldAbortFlag = true;
- receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2));
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId()).setResourceId(2));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -1610,23 +1683,21 @@ public final class TransferClientTest {
public void write_failedPreconditionError_retriesInitialPacket() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()));
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveWriteServerError(Status.FAILED_PRECONDITION);
- assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()));
}
- receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 54321)
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(2)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, 54321)
+ .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId())
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(TEST_DATA_SHORT.size())
.build());
@@ -1636,11 +1707,11 @@ public final class TransferClientTest {
public void write_failedPreconditionError_abortsAfterInitialPacket() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, TEST_DATA_100B.size()));
- receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 4)
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
.setResourceId(2)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
@@ -1655,12 +1726,13 @@ public final class TransferClientTest {
public void write_failedPreconditionErrorMaxRetriesTimes_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
for (int i = 0; i < MAX_RETRIES; ++i) {
receiveWriteServerError(Status.FAILED_PRECONDITION);
}
- Chunk initialChunk = initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size());
+ Chunk initialChunk = initialWriteChunk(transfer, TEST_DATA_SHORT.size());
assertThat(lastChunks())
.containsExactlyElementsIn(Collections.nCopies(1 + MAX_RETRIES, initialChunk));
@@ -1677,12 +1749,14 @@ public final class TransferClientTest {
public void write_empty() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, new byte[] {});
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(2, 123, 0);
+ performWriteStartHandshake(transfer, 0);
- receiveWriteChunks(finalChunk(123, Status.OK));
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK));
- assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@@ -1691,34 +1765,47 @@ public final class TransferClientTest {
public void write_severalChunks() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(500, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(500, 123, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(),
- newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(range(0, 30))
+ .build(),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(30)
+ .setData(range(30, 50))
+ .build());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(50)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
- .containsExactly(
- newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 75)).build(),
- newChunk(Chunk.Type.DATA, 123).setOffset(75).setData(range(75, 90)).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(50)
+ .setData(range(50, 75))
+ .build(),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(75)
+ .setData(range(75, 90))
+ .build());
- receiveWriteChunks(
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(90).setWindowEndOffset(140));
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
+ .setOffset(90)
+ .setWindowEndOffset(140));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 123)
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
@@ -1726,9 +1813,10 @@ public final class TransferClientTest {
assertThat(future.isDone()).isFalse();
- receiveWriteChunks(finalChunk(123, Status.OK));
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK));
- assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@@ -1737,32 +1825,43 @@ public final class TransferClientTest {
public void write_parametersContinue() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(321, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(321, 123, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 123).setOffset(0).setData(range(0, 30)).build(),
- newChunk(Chunk.Type.DATA, 123).setOffset(30).setData(range(30, 50)).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(range(0, 30))
+ .build(),
+ newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(30)
+ .setData(range(30, 50))
+ .build());
- receiveWriteChunks(
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(30).setWindowEndOffset(80));
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
+ .setOffset(30)
+ .setWindowEndOffset(80));
// Transfer doesn't roll back to offset 30 but instead continues sending up to 80.
assertThat(lastChunks())
- .containsExactly(
- newChunk(Chunk.Type.DATA, 123).setOffset(50).setData(range(50, 80)).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(50)
+ .setData(range(50, 80))
+ .build());
- receiveWriteChunks(
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(80).setWindowEndOffset(130));
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
+ .setOffset(80)
+ .setWindowEndOffset(130));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 123)
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
.setOffset(80)
.setData(range(80, 100))
.setRemainingBytes(0)
@@ -1770,9 +1869,10 @@ public final class TransferClientTest {
assertThat(future.isDone()).isFalse();
- receiveWriteChunks(finalChunk(123, Status.OK));
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK));
- assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@@ -1781,33 +1881,42 @@ public final class TransferClientTest {
public void write_continuePacketWithWindowEndBeforeOffsetIsIgnored() throws Exception {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(123, 555, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 555)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(90)
.setMinDelayMicroseconds(1));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 555).setOffset(0).setData(range(0, 90)).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(0)
+ .setData(range(0, 90))
+ .build());
receiveWriteChunks(
// This stale packet with a window end before the offset should be ignored.
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(25).setWindowEndOffset(50),
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
+ .setOffset(25)
+ .setWindowEndOffset(50),
// Start from an arbitrary offset before the current, but extend the window to the end.
- newChunk(Chunk.Type.PARAMETERS_CONTINUE, 555).setOffset(80).setWindowEndOffset(100));
+ newChunk(Chunk.Type.PARAMETERS_CONTINUE, transfer.getSessionId())
+ .setOffset(80)
+ .setWindowEndOffset(100));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.DATA, 555)
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
.setOffset(90)
.setData(range(90, 100))
.setRemainingBytes(0)
.build());
- receiveWriteChunks(finalChunk(555, Status.OK));
- assertThat(lastChunks()).containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, 555).build());
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.OK));
+ assertThat(lastChunks())
+ .containsExactly(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build());
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
}
@@ -1817,15 +1926,18 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future =
transferClient.write(123, TEST_DATA_100B.toByteArray(), progressCallback);
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(123, 123, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(30),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(50).setWindowEndOffset(100),
- finalChunk(123, Status.OK));
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
+ .setOffset(50)
+ .setWindowEndOffset(100),
+ finalChunk(transfer.getSessionId(), Status.OK));
verify(progressCallback, times(6)).accept(progress.capture());
assertThat(progress.getAllValues())
@@ -1842,17 +1954,20 @@ public final class TransferClientTest {
public void write_asksForFinalOffset_sendsFinalPacket() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(123, 456, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 456)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(100)
.setWindowEndOffset(140)
.setMaxChunkSizeBytes(25));
assertThat(lastChunks())
- .containsExactly(
- newChunk(Chunk.Type.DATA, 456).setOffset(100).setRemainingBytes(0).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setOffset(100)
+ .setRemainingBytes(0)
+ .build());
}
@Test
@@ -1860,17 +1975,21 @@ public final class TransferClientTest {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
for (int i = 0; i < 3; ++i) {
ListenableFuture<Void> future = transferClient.write(6, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(6, 123, TEST_DATA_SHORT.size());
+ performWriteStartHandshake(transfer, TEST_DATA_SHORT.size());
- receiveWriteChunks(
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0).setWindowEndOffset(50),
- finalChunk(123, Status.OK));
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
+ .setOffset(0)
+ .setWindowEndOffset(50),
+ finalChunk(transfer.getSessionId(), Status.OK));
assertThat(lastChunks())
- .containsExactly(
- newChunk(Chunk.Type.DATA, 123).setData(TEST_DATA_SHORT).setRemainingBytes(0).build(),
- newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ .containsExactly(newChunk(Chunk.Type.DATA, transfer.getSessionId())
+ .setData(TEST_DATA_SHORT)
+ .setRemainingBytes(0)
+ .build(),
+ newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()).build());
future.get();
}
@@ -1905,13 +2024,16 @@ public final class TransferClientTest {
public void write_serviceRequestsNoData_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size());
+ performWriteStartHandshake(transfer, TEST_DATA_SHORT.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123).setOffset(0));
+ receiveWriteChunks(
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId()).setOffset(0));
- assertThat(lastChunks()).containsExactly(finalChunk(123, Status.INVALID_ARGUMENT));
- receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123));
+ assertThat(lastChunks())
+ .containsExactly(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT));
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
@@ -1921,16 +2043,18 @@ public final class TransferClientTest {
public void write_invalidOffset_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(7, 123, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(101)
.setWindowEndOffset(141)
.setMaxChunkSizeBytes(25));
- assertThat(lastChunks()).containsExactly(finalChunk(123, Status.OUT_OF_RANGE));
- receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123));
+ assertThat(lastChunks())
+ .containsExactly(finalChunk(transfer.getSessionId(), Status.OUT_OF_RANGE));
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) thrown.getCause()).status()).isEqualTo(Status.OUT_OF_RANGE);
@@ -1940,13 +2064,14 @@ public final class TransferClientTest {
public void write_sendErrorOnLaterPacket_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(7, 123, TEST_DATA_SHORT.size());
+ performWriteStartHandshake(transfer, TEST_DATA_SHORT.size());
ChannelOutputException exception = new ChannelOutputException("blah");
rpcClient.setChannelOutputException(exception);
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(30));
@@ -1960,27 +2085,29 @@ public final class TransferClientTest {
public void write_cancelFuture_abortsTransfer() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(7, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(7, 123, TEST_DATA_100B.size());
+ performWriteStartHandshake(transfer, TEST_DATA_100B.size());
assertThat(future.cancel(true)).isTrue();
assertThat(future.isCancelled()).isTrue();
- receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ receiveWriteChunks(newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transfer.getSessionId())
.setOffset(0)
.setWindowEndOffset(50)
.setMaxChunkSizeBytes(50));
- assertThat(lastChunks()).contains(finalChunk(123, Status.CANCELLED));
- receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, 123));
+ assertThat(lastChunks()).contains(finalChunk(transfer.getSessionId(), Status.CANCELLED));
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION_ACK, transfer.getSessionId()));
}
@Test
public void write_immediateTransferProtocolError_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, VersionedChunk.UNASSIGNED_SESSION_ID)
+ receiveWriteChunks(newChunk(Chunk.Type.COMPLETION, transfer.getSessionId())
.setResourceId(123)
.setStatus(Status.NOT_FOUND.ordinal()));
@@ -1994,10 +2121,11 @@ public final class TransferClientTest {
public void write_laterTransferProtocolError_aborts() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- performWriteStartHandshake(123, 123, TEST_DATA_SHORT.size());
+ performWriteStartHandshake(transfer, TEST_DATA_SHORT.size());
- receiveWriteChunks(finalChunk(123, Status.NOT_FOUND));
+ receiveWriteChunks(finalChunk(transfer.getSessionId(), Status.NOT_FOUND));
ExecutionException thrown = assertThrows(ExecutionException.class, future::get);
assertThat(thrown).hasCauseThat().isInstanceOf(TransferError.class);
@@ -2020,29 +2148,34 @@ public final class TransferClientTest {
public void write_unknownVersion_invalidArgument() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- receiveWriteChunks(newChunk(Chunk.Type.START_ACK, 3).setResourceId(2).setProtocolVersion(9));
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
+ .setResourceId(2)
+ .setProtocolVersion(9));
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()),
- finalChunk(3, Status.INVALID_ARGUMENT));
+ .containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()),
+ finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT));
}
@Test
public void write_serverRespondsWithUnknownVersion_invalidArgument() {
createTransferClientForTransferThatWillNotTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(2, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(2, ProtocolVersion.VERSION_TWO, 100));
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, 100));
- receiveWriteChunks(
- newChunk(Chunk.Type.START_ACK, 99).setResourceId(2).setProtocolVersion(600613));
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
+ .setResourceId(2)
+ .setProtocolVersion(600613));
- assertThat(lastChunks()).containsExactly(finalChunk(99, Status.INVALID_ARGUMENT));
+ assertThat(lastChunks())
+ .containsExactly(finalChunk(transfer.getSessionId(), Status.INVALID_ARGUMENT));
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.INVALID_ARGUMENT);
@@ -2052,6 +2185,7 @@ public final class TransferClientTest {
public void write_timeoutAfterInitialChunk() {
createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
ListenableFuture<Void> future = transferClient.write(123, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
// Call future.get() without sending any server-side packets.
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
@@ -2059,10 +2193,9 @@ public final class TransferClientTest {
// Client should have resent the last chunk (the initial chunk in this case) for each timeout.
assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial
- initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // retry 1
- initialWriteChunk(123, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size())); // retry 2
+ .containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()), // initial
+ initialWriteChunk(transfer, TEST_DATA_SHORT.size()), // retry 1
+ initialWriteChunk(transfer, TEST_DATA_SHORT.size())); // retry 2
}
@Test
@@ -2071,25 +2204,25 @@ public final class TransferClientTest {
// Wait for two outgoing packets (Write RPC request and first chunk), then do the handshake.
enqueueWriteChunks(2,
- newChunk(Chunk.Type.START_ACK, 123).setResourceId(9),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.START_ACK, transferClient.getNextSessionIdForTest()).setResourceId(9),
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, transferClient.getNextSessionIdForTest())
.setOffset(0)
.setWindowEndOffset(90)
.setMaxChunkSizeBytes(30));
ListenableFuture<Void> future = transferClient.write(9, TEST_DATA_SHORT.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
- Chunk data = newChunk(Chunk.Type.DATA, 123)
+ Chunk data = newChunk(Chunk.Type.DATA, transfer.getSessionId())
.setOffset(0)
.setData(TEST_DATA_SHORT)
.setRemainingBytes(0)
.build();
assertThat(lastChunks())
- .containsExactly(
- initialWriteChunk(9, ProtocolVersion.VERSION_TWO, TEST_DATA_SHORT.size()), // initial
- newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ .containsExactly(initialWriteChunk(transfer, TEST_DATA_SHORT.size()), // initial
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId())
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(TEST_DATA_SHORT.size())
.build(),
@@ -2103,47 +2236,50 @@ public final class TransferClientTest {
createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries
+ int id = transferClient.getNextSessionIdForTest();
+
// Wait for four outgoing packets (Write RPC request and START chunk + retry), then handshake.
enqueueWriteChunks(3,
- newChunk(Chunk.Type.START_ACK, 123)
+ newChunk(Chunk.Type.START_ACK, id)
.setResourceId(5)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
// Wait for start ack confirmation + 2 retries, then request three packets.
enqueueWriteChunks(3,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(0)
.setWindowEndOffset(60)
.setMaxChunkSizeBytes(20));
// After two packets, request the remainder of the packets.
enqueueWriteChunks(
- 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, 123).setOffset(20).setWindowEndOffset(200));
+ 2, newChunk(Chunk.Type.PARAMETERS_CONTINUE, id).setOffset(20).setWindowEndOffset(200));
// Wait for last 3 data packets, then 2 final packet retries.
enqueueWriteChunks(5,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(80)
.setWindowEndOffset(200)
.setMaxChunkSizeBytes(20),
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(80)
.setWindowEndOffset(200)
.setMaxChunkSizeBytes(20));
// After the retry, confirm completed multiple times; additional packets should be dropped
enqueueWriteChunks(1,
- newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()),
- newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()),
- newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()),
- newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()));
+ newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()),
+ newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()),
+ newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()),
+ newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()));
ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
final Chunk startAckConfirmation =
- newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, id)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(TEST_DATA_100B.size())
.build();
@@ -2151,25 +2287,25 @@ public final class TransferClientTest {
assertThat(lastChunks())
.containsExactly(
// initial handshake with retries
- initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
- initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
+ initialWriteChunk(transfer, TEST_DATA_100B.size()),
+ initialWriteChunk(transfer, TEST_DATA_100B.size()),
startAckConfirmation,
startAckConfirmation,
startAckConfirmation,
// send all data
- dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
- dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
- dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60
- dataChunk(123, TEST_DATA_100B, 60, 80), // data 60-80
- dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(id, TEST_DATA_100B, 0, 20), // data 0-20
+ dataChunk(id, TEST_DATA_100B, 20, 40), // data 20-40
+ dataChunk(id, TEST_DATA_100B, 40, 60), // data 40-60
+ dataChunk(id, TEST_DATA_100B, 60, 80), // data 60-80
+ dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final)
// retry last packet two times
- dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
- dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final)
// respond to two PARAMETERS_RETRANSMIT packets
- dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
- dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final)
+ dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100 (final)
// respond to OK packet
- newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ newChunk(Chunk.Type.COMPLETION_ACK, id).build());
}
@Test
@@ -2177,98 +2313,105 @@ public final class TransferClientTest {
createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO);
assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries
+ int id = transferClient.getNextSessionIdForTest();
+
// Wait for two outgoing packets (Write RPC request and START chunk), then do the handshake.
enqueueWriteChunks(2,
- newChunk(Chunk.Type.START_ACK, 123)
+ newChunk(Chunk.Type.START_ACK, id)
.setResourceId(5)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
// Request two packets.
enqueueWriteChunks(1,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(0)
.setWindowEndOffset(40)
.setMaxChunkSizeBytes(20));
// After the second retry, send more transfer parameters
enqueueWriteChunks(4,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(40)
.setWindowEndOffset(120)
.setMaxChunkSizeBytes(40));
// After the first retry, send more transfer parameters
enqueueWriteChunks(3,
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(80)
.setWindowEndOffset(160)
.setMaxChunkSizeBytes(10));
// After the second retry, confirm completed
- enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, 123).setStatus(Status.OK.code()));
- enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, 123));
+ enqueueWriteChunks(4, newChunk(Chunk.Type.COMPLETION, id).setStatus(Status.OK.code()));
+ enqueueWriteChunks(1, newChunk(Chunk.Type.COMPLETION_ACK, id));
ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
assertThat(future.get()).isNull(); // Ensure that no exceptions are thrown.
assertThat(lastChunks())
.containsExactly(
// initial handshake
- initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
- newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ initialWriteChunk(transfer, TEST_DATA_100B.size()),
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, id)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(TEST_DATA_100B.size())
.build(),
// after 2, receive parameters: 40 from 0 by 20
- dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
- dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
- dataChunk(123, TEST_DATA_100B, 20, 40), // retry 1
- dataChunk(123, TEST_DATA_100B, 20, 40), // retry 2
+ dataChunk(id, TEST_DATA_100B, 0, 20), // data 0-20
+ dataChunk(id, TEST_DATA_100B, 20, 40), // data 20-40
+ dataChunk(id, TEST_DATA_100B, 20, 40), // retry 1
+ dataChunk(id, TEST_DATA_100B, 20, 40), // retry 2
// after 4, receive parameters: 80 from 40 by 40
- dataChunk(123, TEST_DATA_100B, 40, 80), // data 40-80
- dataChunk(123, TEST_DATA_100B, 80, 100), // data 80-100
- dataChunk(123, TEST_DATA_100B, 80, 100), // retry 1
+ dataChunk(id, TEST_DATA_100B, 40, 80), // data 40-80
+ dataChunk(id, TEST_DATA_100B, 80, 100), // data 80-100
+ dataChunk(id, TEST_DATA_100B, 80, 100), // retry 1
// after 3, receive parameters: 80 from 80 by 10
- dataChunk(123, TEST_DATA_100B, 80, 90), // data 80-90
- dataChunk(123, TEST_DATA_100B, 90, 100), // data 90-100
- dataChunk(123, TEST_DATA_100B, 90, 100), // retry 1
- dataChunk(123, TEST_DATA_100B, 90, 100), // retry 2
+ dataChunk(id, TEST_DATA_100B, 80, 90), // data 80-90
+ dataChunk(id, TEST_DATA_100B, 90, 100), // data 90-100
+ dataChunk(id, TEST_DATA_100B, 90, 100), // retry 1
+ dataChunk(id, TEST_DATA_100B, 90, 100), // retry 2
// after 4, receive final OK
- newChunk(Chunk.Type.COMPLETION_ACK, 123).build());
+ newChunk(Chunk.Type.COMPLETION_ACK, id).build());
}
+
@Test
- public void write_maxLifetimeRetries() throws Exception {
+ public void write_maxLifetimeRetries() {
createTransferClientThatMayTimeOut(ProtocolVersion.VERSION_TWO, 5);
assertThat(MAX_RETRIES).isEqualTo(2); // This test assumes 2 retries
+ int id = transferClient.getNextSessionIdForTest();
+
// Wait for four outgoing packets (Write RPC request and START chunk + 2 retries)
enqueueWriteChunks(4, // 2 retries
- newChunk(Chunk.Type.START_ACK, 123)
+ newChunk(Chunk.Type.START_ACK, id)
.setResourceId(5)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
// Wait for start ack confirmation + 2 retries, then request three packets.
enqueueWriteChunks(3, // 2 retries
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123)
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id)
.setOffset(0)
.setWindowEndOffset(60)
.setMaxChunkSizeBytes(20));
// After 3 data packets, wait for two more retries, which should put this over the retry limit.
enqueueWriteChunks(5, // 2 retries
- newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, 123) // This packet should be ignored
+ newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // This packet should be ignored
.setOffset(80)
.setWindowEndOffset(200)
.setMaxChunkSizeBytes(20));
ListenableFuture<Void> future = transferClient.write(5, TEST_DATA_100B.toByteArray());
+ WriteTransfer transfer = transferClient.getWriteTransferForTest(future);
ExecutionException exception = assertThrows(ExecutionException.class, future::get);
assertThat(((TransferError) exception.getCause()).status()).isEqualTo(Status.DEADLINE_EXCEEDED);
final Chunk startAckConfirmation =
- newChunk(Chunk.Type.START_ACK_CONFIRMATION, 123)
+ newChunk(Chunk.Type.START_ACK_CONFIRMATION, id)
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(TEST_DATA_100B.size())
.build();
@@ -2276,19 +2419,19 @@ public final class TransferClientTest {
assertThat(lastChunks())
.containsExactly(
// initial chunk and 2 retries
- initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
- initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
- initialWriteChunk(5, ProtocolVersion.VERSION_TWO, TEST_DATA_100B.size()),
+ initialWriteChunk(transfer, TEST_DATA_100B.size()),
+ initialWriteChunk(transfer, TEST_DATA_100B.size()),
+ initialWriteChunk(transfer, TEST_DATA_100B.size()),
// START_ACK_CONFIRMATION and 2 retries
startAckConfirmation,
startAckConfirmation,
startAckConfirmation,
// send all data
- dataChunk(123, TEST_DATA_100B, 0, 20), // data 0-20
- dataChunk(123, TEST_DATA_100B, 20, 40), // data 20-40
- dataChunk(123, TEST_DATA_100B, 40, 60), // data 40-60
+ dataChunk(id, TEST_DATA_100B, 0, 20), // data 0-20
+ dataChunk(id, TEST_DATA_100B, 20, 40), // data 20-40
+ dataChunk(id, TEST_DATA_100B, 40, 60), // data 40-60
// last packet retry, then hit the lifetime retry limit and abort
- dataChunk(123, TEST_DATA_100B, 40, 60)); // data 40-60
+ dataChunk(id, TEST_DATA_100B, 40, 60)); // data 40-60
}
private static ByteString range(int startInclusive, int endExclusive) {
@@ -2310,21 +2453,35 @@ public final class TransferClientTest {
return Chunk.newBuilder().setType(type).setSessionId(sessionId);
}
- private static Chunk initialReadChunk(int resourceId, ProtocolVersion version) {
- return initialReadChunk(resourceId, version, TRANSFER_PARAMETERS);
+ private static Chunk initialReadChunk(ReadTransfer transfer) {
+ Chunk.Builder chunk =
+ newLegacyChunk(Chunk.Type.START, transfer.getResourceId())
+ .setResourceId(transfer.getResourceId())
+ .setPendingBytes(transfer.getParametersForTest().maxPendingBytes())
+ .setWindowEndOffset(transfer.getParametersForTest().maxPendingBytes())
+ .setMaxChunkSizeBytes(transfer.getParametersForTest().maxChunkSizeBytes())
+ .setOffset(0);
+ if (transfer.getDesiredProtocolVersionForTest() != ProtocolVersion.LEGACY) {
+ chunk.setProtocolVersion(transfer.getDesiredProtocolVersionForTest().ordinal());
+ chunk.setDesiredSessionId(transfer.getSessionId());
+ }
+ if (transfer.getParametersForTest().chunkDelayMicroseconds() > 0) {
+ chunk.setMinDelayMicroseconds(transfer.getParametersForTest().chunkDelayMicroseconds());
+ }
+ return chunk.build();
}
- private static Chunk initialReadChunk(
- int resourceId, ProtocolVersion version, TransferParameters params) {
+ private static Chunk initialLegacyReadChunk(int resourceId) {
+ return initialLegacyReadChunk(resourceId, TRANSFER_PARAMETERS);
+ }
+
+ private static Chunk initialLegacyReadChunk(int resourceId, TransferParameters params) {
Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId)
.setResourceId(resourceId)
.setPendingBytes(params.maxPendingBytes())
.setWindowEndOffset(params.maxPendingBytes())
.setMaxChunkSizeBytes(params.maxChunkSizeBytes())
.setOffset(0);
- if (version != ProtocolVersion.LEGACY) {
- chunk.setProtocolVersion(version.ordinal());
- }
if (params.chunkDelayMicroseconds() > 0) {
chunk.setMinDelayMicroseconds(params.chunkDelayMicroseconds());
}
@@ -2343,12 +2500,20 @@ public final class TransferClientTest {
return chunk.build();
}
- private static Chunk initialWriteChunk(int resourceId, ProtocolVersion version, int size) {
- Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, resourceId)
- .setResourceId(resourceId)
+ private static Chunk initialLegacyWriteChunk(int resourceId, int size) {
+ return newLegacyChunk(Chunk.Type.START, resourceId)
+ .setResourceId(resourceId)
+ .setRemainingBytes(size)
+ .build();
+ }
+
+ private static Chunk initialWriteChunk(WriteTransfer transfer, int size) {
+ Chunk.Builder chunk = newLegacyChunk(Chunk.Type.START, transfer.getResourceId())
+ .setResourceId(transfer.getResourceId())
.setRemainingBytes(size);
- if (version != ProtocolVersion.LEGACY) {
+ if (transfer.getDesiredProtocolVersionForTest() != ProtocolVersion.LEGACY) {
chunk.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal());
+ chunk.setDesiredSessionId(transfer.getSessionId());
}
return chunk.build();
}
@@ -2415,20 +2580,16 @@ public final class TransferClientTest {
}
}
- private void performReadStartHandshake(int resourceId, int sessionId) {
- performReadStartHandshake(
- resourceId, sessionId, TransferClient.DEFAULT_READ_TRANSFER_PARAMETERS);
- }
+ private void performReadStartHandshake(ReadTransfer transfer) {
+ assertThat(lastChunks()).containsExactly(initialReadChunk(transfer));
- private void performReadStartHandshake(int resourceId, int sessionId, TransferParameters params) {
- assertThat(lastChunks())
- .containsExactly(initialReadChunk(resourceId, ProtocolVersion.VERSION_TWO, params));
-
- receiveReadChunks(newChunk(Chunk.Type.START_ACK, sessionId)
- .setResourceId(resourceId)
+ receiveReadChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
+ .setResourceId(transfer.getResourceId())
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
- assertThat(lastChunks()).containsExactly(readStartAckConfirmation(sessionId, params));
+ assertThat(lastChunks())
+ .containsExactly(
+ readStartAckConfirmation(transfer.getSessionId(), transfer.getParametersForTest()));
}
private void performReadCompletionHandshake(int sessionId, Status status) {
@@ -2442,16 +2603,15 @@ public final class TransferClientTest {
receiveReadChunks(newChunk(Chunk.Type.COMPLETION_ACK, sessionId));
}
- private void performWriteStartHandshake(int resourceId, int sessionId, int dataSize) {
- assertThat(lastChunks())
- .containsExactly(initialWriteChunk(resourceId, ProtocolVersion.VERSION_TWO, dataSize));
+ private void performWriteStartHandshake(WriteTransfer transfer, int dataSize) {
+ assertThat(lastChunks()).containsExactly(initialWriteChunk(transfer, dataSize));
- receiveWriteChunks(newChunk(Chunk.Type.START_ACK, sessionId)
- .setResourceId(resourceId)
+ receiveWriteChunks(newChunk(Chunk.Type.START_ACK, transfer.getSessionId())
+ .setResourceId(transfer.getResourceId())
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal()));
assertThat(lastChunks())
- .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, sessionId)
+ .containsExactly(newChunk(Chunk.Type.START_ACK_CONFIRMATION, transfer.getSessionId())
.setProtocolVersion(ProtocolVersion.VERSION_TWO.ordinal())
.setRemainingBytes(dataSize)
.build());