243 lines
10 KiB
Java
243 lines
10 KiB
Java
/*
|
|
* Copyright 2014 Google Inc. All rights reserved.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
import MyGame.Example.Monster;
|
|
import MyGame.Example.MonsterStorageGrpc;
|
|
import MyGame.Example.Stat;
|
|
import com.google.flatbuffers.FlatBufferBuilder;
|
|
import io.grpc.ManagedChannel;
|
|
import io.grpc.ManagedChannelBuilder;
|
|
import io.grpc.Server;
|
|
import io.grpc.ServerBuilder;
|
|
import io.grpc.stub.StreamObserver;
|
|
import org.junit.Assert;
|
|
|
|
import java.io.IOException;
|
|
import java.lang.InterruptedException;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.Iterator;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
/**
|
|
* Demonstrates basic client-server interaction using grpc-java over netty.
|
|
*/
|
|
public class JavaGrpcTest {
|
|
static final String BIG_MONSTER_NAME = "Cyberdemon";
|
|
static final short nestedMonsterHp = 600;
|
|
static final short nestedMonsterMana = 1024;
|
|
static final int numStreamedMsgs = 10;
|
|
static final int timeoutMs = 3000;
|
|
static Server server;
|
|
static ManagedChannel channel;
|
|
static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub;
|
|
static MonsterStorageGrpc.MonsterStorageStub asyncStub;
|
|
|
|
static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase {
|
|
@Override
|
|
public void store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver) {
|
|
Assert.assertEquals(request.name(), BIG_MONSTER_NAME);
|
|
Assert.assertEquals(request.hp(), nestedMonsterHp);
|
|
Assert.assertEquals(request.mana(), nestedMonsterMana);
|
|
System.out.println("Received store request from " + request.name());
|
|
// Create a response from the incoming request name.
|
|
Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10);
|
|
responseObserver.onNext(stat);
|
|
responseObserver.onCompleted();
|
|
}
|
|
|
|
@Override
|
|
public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) {
|
|
// Create 10 monsters for streaming response.
|
|
for (int i=0; i<numStreamedMsgs; i++) {
|
|
Monster monster = GameFactory.createMonsterFromStat(request, i);
|
|
responseObserver.onNext(monster);
|
|
}
|
|
responseObserver.onCompleted();
|
|
}
|
|
|
|
@Override
|
|
public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) {
|
|
return computeMinMax(responseObserver, false);
|
|
}
|
|
|
|
@Override
|
|
public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) {
|
|
return computeMinMax(responseObserver, true);
|
|
}
|
|
|
|
private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) {
|
|
final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE);
|
|
final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>();
|
|
final AtomicInteger maxHpCount = new AtomicInteger();
|
|
|
|
final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE);
|
|
final AtomicReference<String> minHpMonsterName = new AtomicReference<String>();
|
|
final AtomicInteger minHpCount = new AtomicInteger();
|
|
|
|
return new StreamObserver<Monster>() {
|
|
public void onNext(Monster monster) {
|
|
if (monster.hp() > maxHp.get()) {
|
|
// Found a monster of higher hit points.
|
|
maxHp.set(monster.hp());
|
|
maxHpMonsterName.set(monster.name());
|
|
maxHpCount.set(1);
|
|
}
|
|
else if (monster.hp() == maxHp.get()) {
|
|
// Count how many times we saw a monster of current max hit points.
|
|
maxHpCount.getAndIncrement();
|
|
}
|
|
|
|
if (monster.hp() < minHp.get()) {
|
|
// Found a monster of a lower hit points.
|
|
minHp.set(monster.hp());
|
|
minHpMonsterName.set(monster.name());
|
|
minHpCount.set(1);
|
|
}
|
|
else if (monster.hp() == minHp.get()) {
|
|
// Count how many times we saw a monster of current min hit points.
|
|
minHpCount.getAndIncrement();
|
|
}
|
|
}
|
|
public void onCompleted() {
|
|
Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get());
|
|
// Send max hit points first.
|
|
responseObserver.onNext(maxHpStat);
|
|
if (includeMin) {
|
|
// Send min hit points.
|
|
Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get());
|
|
responseObserver.onNext(minHpStat);
|
|
}
|
|
responseObserver.onCompleted();
|
|
}
|
|
public void onError(Throwable t) {
|
|
// Not expected
|
|
Assert.fail();
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
@org.junit.BeforeClass
|
|
public static void startServer() throws IOException {
|
|
server = ServerBuilder.forPort(0).addService(new MyService()).build().start();
|
|
int port = server.getPort();
|
|
channel = ManagedChannelBuilder.forAddress("localhost", port)
|
|
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
|
|
// needing certificates.
|
|
.usePlaintext(true)
|
|
.directExecutor()
|
|
.build();
|
|
blockingStub = MonsterStorageGrpc.newBlockingStub(channel);
|
|
asyncStub = MonsterStorageGrpc.newStub(channel);
|
|
}
|
|
|
|
@org.junit.Test
|
|
public void testUnary() throws IOException {
|
|
Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
|
|
Stat stat = blockingStub.store(monsterRequest);
|
|
Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME);
|
|
System.out.println("Received stat response from service: " + stat.id());
|
|
}
|
|
|
|
@org.junit.Test
|
|
public void testServerStreaming() throws IOException {
|
|
Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
|
|
Stat stat = blockingStub.store(monsterRequest);
|
|
Iterator<Monster> iterator = blockingStub.retrieve(stat);
|
|
int counter = 0;
|
|
while(iterator.hasNext()) {
|
|
Monster m = iterator.next();
|
|
System.out.println("Received monster " + m.name());
|
|
counter ++;
|
|
}
|
|
Assert.assertEquals(counter, numStreamedMsgs);
|
|
System.out.println("FlatBuffers GRPC client/server test: completed successfully");
|
|
}
|
|
|
|
@org.junit.Test
|
|
public void testClientStreaming() throws IOException, InterruptedException {
|
|
final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
|
|
final CountDownLatch streamAlive = new CountDownLatch(1);
|
|
|
|
StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
|
|
public void onCompleted() {
|
|
streamAlive.countDown();
|
|
}
|
|
public void onError(Throwable ex) { }
|
|
public void onNext(Stat stat) {
|
|
maxHitStat.set(stat);
|
|
}
|
|
};
|
|
StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver);
|
|
short count = 10;
|
|
for (short i = 0;i < count; ++i) {
|
|
Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
|
|
monsterStream.onNext(monster);
|
|
}
|
|
monsterStream.onCompleted();
|
|
// Wait a little bit for the server to send the stats of the monster with the max hit-points.
|
|
streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
|
|
Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
|
|
Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
|
|
Assert.assertEquals(maxHitStat.get().count(), 1);
|
|
}
|
|
|
|
@org.junit.Test
|
|
public void testBiDiStreaming() throws IOException, InterruptedException {
|
|
final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
|
|
final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>();
|
|
final CountDownLatch streamAlive = new CountDownLatch(1);
|
|
|
|
StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
|
|
public void onCompleted() {
|
|
streamAlive.countDown();
|
|
}
|
|
public void onError(Throwable ex) { }
|
|
public void onNext(Stat stat) {
|
|
// We expect the server to send the max stat first and then the min stat.
|
|
if (maxHitStat.get() == null) {
|
|
maxHitStat.set(stat);
|
|
}
|
|
else {
|
|
minHitStat.set(stat);
|
|
}
|
|
}
|
|
};
|
|
StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver);
|
|
short count = 10;
|
|
for (short i = 0;i < count; ++i) {
|
|
Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
|
|
monsterStream.onNext(monster);
|
|
}
|
|
monsterStream.onCompleted();
|
|
|
|
// Wait a little bit for the server to send the stats of the monster with the max and min hit-points.
|
|
streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
|
|
|
|
Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
|
|
Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
|
|
Assert.assertEquals(maxHitStat.get().count(), 1);
|
|
|
|
Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0);
|
|
Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0);
|
|
Assert.assertEquals(minHitStat.get().count(), 1);
|
|
}
|
|
}
|