Issue
I have implemented GRPC bidirectional streaming service using vertx. Once I got the responses from server side, I have to do another process based on the response. For this I have used requestStreamObserver.onCompleted(), But right after this called, I am getting
Stream is already completed, no further calls are allowed.
from server side.
So my question is how to read the response stream and do the process without terminating the server streaming
proto file
syntax = "prto3";
service userService{
rpc getData(stream Request) returns(stream Response);
}
message Request{
int32 id = 1;
}
message Response{
int32 responseCode = 1;
string username = 2;
}
Client Side implementation
public void bidi() {
List<String> responseList = new ArrayList<>();
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<Response> responseStreamObserver = new StreamObserver<Response>() {
@Override
public void onNext(Response response) {
System.out.println(response.getResponseCode() + " - " + response.getUsername());
responseList.add(response.getUsername());
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error " + throwable.getMessage());
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("completed stream");
finishLatch.countDown();
}
};
StreamObserver<Request> requestStreamObserver = nonBlockingStub.requestData(responseStreamObserver);
try {
for (int i = 1; i < 6; i++) {
Request request = Request.newBuilder().setId(i).build();
requestStreamObserver.onNext(request);
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (Exception e) {
e.printStackTrace();
}
requestStreamObserver.onCompleted();
try {
finishLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//These code blocks need to be executed after completing the response
for(String data:responseList){
System.out.println(data);
}
doStuff();
}
As I do not have access to Server side, I cannot post server side implementation, But request is accepting a Stream and returns a stream from server.
Thanks.
Solution
So this is the way I get sorted out this issue.
Used AtomicInteger and increased the count when making a request. Then in response observer have to decrement the counter once response received. If the counter get 0 then call CountdownLatch's countDown() and do other stuff.
public void bidi() {
List<String> responseList = new ArrayList<>();
AtomicInteger counter = new AtomicInteger();
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<Response> responseStreamObserver = new StreamObserver<Response>() {
@Override
public void onNext(Response response) {
System.out.println(response.getResponseCode() + " - " + response.getUsername());
responseList.add(response.getUsername());
counter.decrementAndGet(); //decrement the counter
if(counter.get() == 0){
finishLatch.countDown();
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error " + throwable.getMessage());
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("completed stream");
finishLatch.countDown();
}
};
StreamObserver<Request> requestStreamObserver = nonBlockingStub.requestData(responseStreamObserver);
try {
for (int i = 1; i < 6; i++) {
Request request = Request.newBuilder().setId(i).build();
requestStreamObserver.onNext(request);
counter.incrementAndGet();//increment the counter
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (Exception e) {
e.printStackTrace();
}
try {
finishLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(String data:responseList){
System.out.println(data);
}
doStuff();
}
Answered By - ThilinaMD
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.