このトピックでは、Java、Go、Node.js、Python を使用して gRPC 通信モデルを実装する方法について説明します。モデルには、単項リモートプロシージャコール (RPC)、サーバー ストリーミング RPC、クライアント ストリーミング RPC、双方向ストリーミング RPC が含まれます。
サンプルプロジェクト
gRPC のサンプルプロジェクトの詳細については、「hello-servicemesh-grpc」をご参照ください。このトピックのディレクトリは、hello-servicemesh-grpc のディレクトリです。
ステップ 1: コードの変換
- 次のコマンドを実行して gRPC と Protocol Buffers をインストールします。この例では、gRPC と Protocol Buffers は macOS オペレーティングシステムにインストールされます。
brew install grpc protobuf - Protocol Buffers の定義を使用するプログラミング言語のコードに変換します。このトピックでは、Java、Go、Node.js、Python が使用されます:説明 サンプルプロジェクトでは、各言語のコードディレクトリに、landing.proto ファイルを格納する proto ディレクトリが含まれています。landing.proto ファイルは、サンプルプロジェクトのルートディレクトリにある proto/landing.proto ファイルへのシンボリックリンクです。これにより、Protocol Buffers の定義を統一された方法で更新できます。
- Java: Maven は Java のビルド自動化ツールです。Maven は、コードを自動的に変換するための protobuf-maven-plugin プラグインを提供します。
mvn packageコマンドを実行して protoc-gen-grpc-java を使用し、gRPC テンプレートコードを生成できます。詳細については、「hello-grpc-java/pom.xml」をご参照ください。 - Go:
go get github.com/golang/protobuf/protoc-gen-goコマンドを実行して protoc-gen-go をインストールします。次に、protoc コマンドを実行して gRPC コードを生成します。詳細については、「hello-grpc-go/proto2go.sh」をご参照ください。 - Node.js:
npm install -g grpc-toolsコマンドを実行して grpc_tools_node_protoc をインストールします。次に、protoc コマンドを実行して gRPC コードを生成します。詳細については、「hello-grpc-nodejs/proto2js.sh」をご参照ください。 - Python:
pip install grpcio-toolsコマンドを実行して grpcio-tools をインストールします。次に、protoc コマンドを実行して gRPC コードを生成します。詳細については、「hello-grpc-python/proto2py.sh」をご参照ください。
- Java: Maven は Java のビルド自動化ツールです。Maven は、コードを自動的に変換するための protobuf-maven-plugin プラグインを提供します。
ステップ 2: 通信モデルの設定
- hello 配列を設定します。
- Java:
private final List<String> HELLO_LIST = Arrays.asList("Hello", "Bonjour", "Hola", "こんにちは", "Ciao", ""); kv.put("data", HELLO_LIST.get(index)); - Go:
var helloList = []string{"Hello", "Bonjour", "Hola", "こんにちは", "Ciao", ""} kv["data"] = helloList[index] - Node.js:
let hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", ""] kv.set("data", hellos[index]) - Python:
hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", ""] result.kv["data"] = hellos[index]
- Java:
- 通信モデルを設定します。
- 単項 RPC モデルを設定します。
- Java:
// ブロッキングスタブを使用してサーバーにリクエストを送信します。 public TalkResponse talk(TalkRequest talkRequest) { return blockingStub.talk(talkRequest); } // サーバーがリクエストを処理した後、StreamObserver インスタンスの onNext および onCompleted イベントがトリガーされます。 public void talk(TalkRequest request, StreamObserver<TalkResponse> responseObserver) { ... responseObserver.onNext(response); responseObserver.onCompleted(); } - Go:
func talk(client pb.LandingServiceClient, request *pb.TalkRequest) { r, err := client.Talk(context.Background(), request) } func (s *ProtoServer) Talk(ctx context.Context, request *pb.TalkRequest) (*pb.TalkResponse, error) { return &pb.TalkResponse{ Status: 200, Results: []*pb.TalkResult{s.buildResult(request.Data)}, }, nil } - Node.js:
function talk(client, request) { client.talk(request, function (err, response) { ... }) } function talk(call, callback) { const talkResult = buildResult(call.request.getData()) ... callback(null, response) } - Python:
def talk(stub): response = stub.talk(request) def talk(self, request, context): result = build_result(request.data) ... return response
- Java:
- サーバー ストリーミング RPC モデルを設定します。
- Java:
public List<TalkResponse> talkOneAnswerMore(TalkRequest request) { Iterator<TalkResponse> talkResponses = blockingStub.talkOneAnswerMore(request); talkResponses.forEachRemaining(talkResponseList::add); return talkResponseList; } public void talkOneAnswerMore(TalkRequest request, StreamObserver<TalkResponse> responseObserver) { String[] datas = request.getData().split(","); for (String data : datas) {...} talkResponses.forEach(responseObserver::onNext); responseObserver.onCompleted(); } - Go:
func talkOneAnswerMore(client pb.LandingServiceClient, request *pb.TalkRequest) { stream, err := client.TalkOneAnswerMore(context.Background(), request) for { r, err := stream.Recv() if err == io.EOF { break } ... } } func (s *ProtoServer) TalkOneAnswerMore(request *pb.TalkRequest, stream pb.Landing..Server) error { datas := strings.Split(request.Data, ",") for _, d := range datas { stream.Send(&pb.TalkResponse{...}) } - Node.js:
function talkOneAnswerMore(client, request) { let call = client.talkOneAnswerMore(request) call.on('data', function (response) { ... }) } function talkOneAnswerMore(call) { let datas = call.request.getData().split(",") for (const data in datas) { ... call.write(response) } call.end() } - Python:
def talk_one_answer_more(stub): responses = stub.talkOneAnswerMore(request) for response in responses: logger.info(response) def talkOneAnswerMore(self, request, context): datas = request.data.split(",") for data in datas: yield response
- Java:
- クライアント ストリーミング RPC モデルを設定します。
- Java:
public void talkMoreAnswerOne(List<TalkRequest> requests) throws InterruptedException { final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() { @Override public void onNext(TalkResponse talkResponse) { log.info("Response=\n{}", talkResponse); } @Override public void onCompleted() { finishLatch.countDown(); } }; final StreamObserver<TalkRequest> requestObserver = asyncStub.talkMoreAnswerOne(responseObserver); try { requests.forEach(request -> { if (finishLatch.getCount() > 0) { requestObserver.onNext(request); }); requestObserver.onCompleted(); } public StreamObserver<TalkRequest> talkMoreAnswerOne(StreamObserver<TalkResponse> responseObserver) { return new StreamObserver<TalkRequest>() { @Override public void onNext(TalkRequest request) { talkRequests.add(request); } @Override public void onCompleted() { responseObserver.onNext(buildResponse(talkRequests)); responseObserver.onCompleted(); } }; } - Go:
func talkMoreAnswerOne(client pb.LandingServiceClient, requests []*pb.TalkRequest) { stream, err := client.TalkMoreAnswerOne(context.Background()) for _, request := range requests { stream.Send(request) } r, err := stream.CloseAndRecv() } func (s *ProtoServer) TalkMoreAnswerOne(stream pb.LandingService_TalkMoreAnswerOneServer) error { for { in, err := stream.Recv() if err == io.EOF { talkResponse := &pb.TalkResponse{ Status: 200, Results: rs, } stream.SendAndClose(talkResponse) return nil } rs = append(rs, s.buildResult(in.Data)) } } - Node.js:
function talkMoreAnswerOne(client, requests) { let call = client.talkMoreAnswerOne(function (err, response) { ... }) requests.forEach(request => { call.write(request) }) call.end() } function talkMoreAnswerOne(call, callback) { let talkResults = [] call.on('data', function (request) { talkResults.push(buildResult(request.getData())) }) call.on('end', function () { let response = new messages.TalkResponse() response.setStatus(200) response.setResultsList(talkResults) callback(null, response) }) } - Python:
def talk_more_answer_one(stub): response_summary = stub.talkMoreAnswerOne(request_iterator) def generate_request(): for _ in range(0, 3): yield request def talkMoreAnswerOne(self, request_iterator, context): for request in request_iterator: response.results.append(build_result(request.data)) return response
- Java:
- 双方向ストリーミング RPC モデルを設定します。
- Java:
public void talkBidirectional(List<TalkRequest> requests) throws InterruptedException { final CountDownLatch finishLatch = new CountDownLatch(1); StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() { @Override public void onNext(TalkResponse talkResponse) { log.info("Response=\n{}", talkResponse); } @Override public void onCompleted() { finishLatch.countDown(); } }; final StreamObserver<TalkRequest> requestObserver = asyncStub.talkBidirectional(responseObserver); try { requests.forEach(request -> { if (finishLatch.getCount() > 0) { requestObserver.onNext(request); ... requestObserver.onCompleted(); } public StreamObserver<TalkRequest> talkBidirectional(StreamObserver<TalkResponse> responseObserver) { return new StreamObserver<TalkRequest>() { @Override public void onNext(TalkRequest request) { responseObserver.onNext(TalkResponse.newBuilder() .setStatus(200) .addResults(buildResult(request.getData())).build()); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; } - Go:
func talkBidirectional(client pb.LandingServiceClient, requests []*pb.TalkRequest) { stream, err := client.TalkBidirectional(context.Background()) waitc := make(chan struct{}) go func() { for { r, err := stream.Recv() if err == io.EOF { // 読み取り完了。 close(waitc) return } } }() for _, request := range requests { stream.Send(request) } stream.CloseSend() <-waitc } func (s *ProtoServer) TalkBidirectional(stream pb.LandingService_TalkBidirectionalServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } stream.Send(talkResponse) } } - Node.js:
function talkBidirectional(client, requests) { let call = client.talkBidirectional() call.on('data', function (response) { ... }) requests.forEach(request => { call.write(request) }) call.end() } function talkBidirectional(call) { call.on('data', function (request) { call.write(response) }) call.on('end', function () { call.end() }) } - Python:
def talk_bidirectional(stub): responses = stub.talkBidirectional(request_iterator) for response in responses: logger.info(response) def talkBidirectional(self, request_iterator, context): for request in request_iterator: yield response
- Java:
- 単項 RPC モデルを設定します。
ステップ 3: 関数の実装
- 環境変数関数を実装します。
- Java:
private static String getGrcServer() { String server = System.getenv("GRPC_SERVER"); if (server == null) { return "localhost"; } return server; } - Go:
func grpcServer() string { server := os.Getenv("GRPC_SERVER") if len(server) == 0 { return "localhost" } else { return server } } - Node.js:
function grpcServer() { let server = process.env.GRPC_SERVER; if (typeof server !== 'undefined' && server !== null) { return server } else { return "localhost" } } - Python:
def grpc_server(): server = os.getenv("GRPC_SERVER") if server: return server else: return "localhost"
- Java:
- 乱数関数を実装します。
- Java:
public static String getRandomId() { return String.valueOf(random.nextInt(5)); } - Go:
func randomId(max int) string { return strconv.Itoa(rand.Intn(max)) } - Node.js:
function randomId(max) { return Math.floor(Math.random() * Math.floor(max)).toString() } - Python:
def random_id(end): return str(random.randint(0, end))
- Java:
- タイムスタンプ関数を実装します。
- Java:
TalkResult.newBuilder().setId(System.nanoTime()) - Go:
result.Id = time.Now().UnixNano() - Node.js:
result.setId(Math.round(Date.now() / 1000)) - Python:
result.id = int((time.time()))
- Java:
- UUID 関数を実装します。
- Java:
kv.put("id", UUID.randomUUID().toString()); - Go:
import ( "github.com/google/uuid" ) kv["id"] = uuid.New().String() - Node.js:
kv.set("id", uuid.v1()) - Python:
result.kv["id"] = str(uuid.uuid1())
- Java:
- スリープ関数を実装します。
- Java:
TimeUnit.SECONDS.sleep(1); - Go:
time.Sleep(2 * time.Millisecond) - Node.js:
let sleep = require('sleep') sleep.msleep(2) - Python:
time.sleep(random.uniform(0.5, 1.5))
- Java:
結果の検証
機能
次のコマンドを実行して、あるターミナルで gRPC サーバーを起動し、別のターミナルで gRPC クライアントを起動します。gRPC クライアントとサーバーを起動すると、gRPC クライアントは 4 つの通信モデルの API 操作にリクエストを送信します。
- Java:
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient" - Go:
go run server.gogo run client/proto_client.go - Node.js:
node proto_server.jsnode proto_client.js - Python:
python server/protoServer.pypython client/protoClient.py
通信エラーが発生しない場合、gRPC クライアントとサーバーが起動します。
クロス通信
クロス通信は、gRPC クライアントとサーバーが使用する言語に関係なく、gRPC クライアントとサーバーが同じ方法で相互に通信することを保証します。これにより、リクエストの応答が言語バージョンによって変わることはありません。
- gRPC サーバー (たとえば、Java gRPC サーバー) を起動します:
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer" - 次のコマンドを実行して、Java、Go、Node.js、Python で gRPC クライアントを起動します:
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"go run client/proto_client.gonode proto_client.jspython client/protoClient.py通信エラーが発生しない場合、クロス通信は成功です。
次のステップ
gRPC クライアントとサーバーが期待どおりに通信できることを確認したら、クライアントとサーバーのイメージをビルドできます。
ステップ 1: プロジェクトのビルド
4 つのプログラミング言語を使用して、gRPC クライアントとサーバーのプロジェクトをビルドします。
- JavagRPC クライアントとサーバーの JAR パッケージを作成します。次に、パッケージを Docker ディレクトリにコピーします。
mvn clean install -DskipTests -f server_pom cp target/hello-grpc-java.jar ../docker/ mvn clean install -DskipTests -f client_pom cp target/hello-grpc-java.jar ../docker/ - GoGo を使用してコンパイルされたバイナリファイルには、オペレーティングシステムに関する構成が含まれており、Linux にデプロイする必要があります。したがって、次の内容をバイナリファイルに追加します。次に、バイナリファイルを Docker ディレクトリにコピーします。
env GOOS=linux GOARCH=amd64 go build -o proto_server server.go mv proto_server ../docker/ env GOOS=linux GOARCH=amd64 go build -o proto_client client/proto_client.go mv proto_client ../docker/ - NodeJSNode.js プロジェクトは、ランタイムに必要なあらゆる種類の C++ 依存関係をサポートするために、Docker イメージで作成する必要があります。したがって、ファイルを Docker ディレクトリにコピーします。
cp ../hello-grpc-nodejs/proto_server.js node cp ../hello-grpc-nodejs/package.json node cp -R ../hello-grpc-nodejs/common node cp -R ../proto node cp ../hello-grpc-nodejs/*_client.js node - PythonPython ファイルをコンパイルせずに Docker ディレクトリにコピーします。
cp -R ../hello-grpc-python/server py cp ../hello-grpc-python/start_server.sh py cp -R ../proto py cp ../hello-grpc-python/proto2py.sh py cp -R ../hello-grpc-python/client py cp ../hello-grpc-python/start_client.sh py
ステップ 2: GRPC サーバーとクライアントのイメージのビルド
プロジェクトをビルドすると、Dockerfile で必要なすべてのファイルが Docker ディレクトリに保存されます。このセクションでは、Dockerfile に関する主な情報について説明します。
- サイズが最小であるため、基本イメージとして alpine を選択します。この例では、Python の基本イメージは python v2.7 です。必要に応じてイメージのバージョンを変更できます。
- Node.js では、C++ とコンパイラ Make のインストールが必要です。Npm パッケージは grpc-tools とともにインストールする必要があります。
この例では、Node.js サーバーのイメージをビルドする方法を示します。
- grpc-server-node.dockerfile ファイルを作成します。
FROM node:14.11-alpine RUN apk add --update \ python \ make \ g++ \ && rm -rf /var/cache/apk/* RUN npm config set registry http://registry.npmmirror.com && npm install -g node-pre-gyp grpc-tools --unsafe-perm COPY node/package.json . RUN npm install --unsafe-perm COPY node . ENTRYPOINT ["node","proto_server.js"] - イメージをビルドします。
docker build -f grpc-server-node.dockerfile -t registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0 .合計 8 つのイメージがビルドされます。 - Push コマンドを実行して、イメージを Container Registry に配布します。
docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_java:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_java:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_go:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_go:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_node:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_python:1.0.0docker push registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_python:1.0.0