gRPC Bi Directional Streaming

Pendahuluan

Pada Bi Directional Streaming, client maupun server dapat mengirim request atau response lebih dari satu kali. Jumlah request dan response tidak harus sama.

Bi Directional streaming cocok digunakan ketika baik client maupun server perlu mengirimkan banyak data secara asynchronous.

Pada Bi Directional streaming, rpc menggunakan keyword stream baik untuk request maupun response. Berikut syntax dasar dari Bi Directional Streaming

// Message type the client streams to the server.
message namaRequestMessage {
  //field goes here
}

// Message type the server streams back to the client.
message namaResponseMessage {
  //field goes here
}

service namaService {
  // `stream` appears on both the request and the response type, which is
  // what makes this rpc bi-directional.
  rpc namaFungsi (stream namaRequestMessage) returns (stream namaResponseMessage) {};
}

Implementasi Bi Directional Streaming

Buka file protos/greet.proto, kemudian tambahkan code untuk bi directional streaming.

syntax = "proto3";

package greet;

// Name payload embedded in every request message of this file.
message Greeting {
    string first_name = 1;
    string last_name = 2;
}

// Request of the unary Greet rpc; wraps a single Greeting.
message GreetRequest{
    Greeting greeting = 1;
}

// Response of the unary Greet rpc; carries the resulting text.
message GreetResponse{
    string result = 1;
}

// Request of the server-streaming GreetManyTimes rpc (sent once by the client).
message GreetManyRequest{
    Greeting greeting = 1;
}

// Response of the server-streaming GreetManyTimes rpc (streamed by the server).
message GreetManyResponse{
    string result = 1;
}

// Request of the client-streaming LongGreet rpc (streamed by the client).
// Note: the Greeting field is named `greet` here, not `greeting` as in
// GreetRequest and GreetManyRequest.
message LongGreetRequest {
    Greeting greet = 1;
}
// Single response of the client-streaming LongGreet rpc.
message LongGreetResponse {
    string result = 1;
}

// Request of the bi-directional GreetEveryone rpc (streamed by the client).
message GreetEveryoneRequest {
    Greeting greet = 1;
}
// Response of the bi-directional GreetEveryone rpc (streamed by the server).
message GreetEveryoneResponse {
    string result = 1;
}

// Greeting service with one rpc per call style named in the comments below.
// The position of the `stream` keyword (request, response, or both) selects
// the call style.
service GreetService {
    //unary
    rpc Greet (GreetRequest) returns (GreetResponse){};

    //server streaming
    rpc GreetManyTimes (GreetManyRequest) returns (stream GreetManyResponse){};

    //client streaming
    rpc LongGreet (stream LongGreetRequest) returns (LongGreetResponse){};

    //bidi streaming
    rpc GreetEveryone (stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse){};    
}

Kemudian kita compile file proto di atas, sehingga file greet_pb.js dan greet_grpc_pb.js akan diupdate.

Kita buka file server/index.js, lalu tambahkan code untuk service bi directional streaming.

Pada contoh kode ini, server mengirim response secara berkala, terlepas dari message yang diterima; tidak ada keharusan untuk merespons setiap message. Bagaimana cara Anda merespons tergantung dari business logic yang akan diterapkan.

var grpc = require('grpc');

var greets = require('./greet_pb');
var service = require('./greet_grpc_pb');

/**
 * Unary handler: answers one request with one GreetResponse.
 *
 * @param {Object} call - Unary call; the names are read from
 *   `call.request.getGreeting()`.
 * @param {Function} callback - Invoked as `callback(null, response)`.
 */
function greet (call, callback){
    const reply = new greets.GreetResponse();
    const readGreeting = () => call.request.getGreeting();

    const firstName = readGreeting().getFirstName();
    const lastName = readGreeting().getLastName();
    reply.setResult(`Hello, ${firstName} ${lastName}`);

    callback(null, reply);
}

/**
 * Server-streaming handler: writes the requester's first name back to the
 * client once per second, ten times, then ends the stream.
 *
 * @param {Object} call - Writable server call; the first name is read from
 *   `call.request.getGreeting()`.
 * @param {Function} callback - Unused; present only to match the handler shape.
 */
function greetManyTimes (call, callback){
    const TOTAL_MESSAGES = 10;
    const name = call.request.getGreeting().getFirstName();
    let sent = 0;

    // Simulate streaming with a one-second timer.
    const timer = setInterval(() => {
        const chunk = new greets.GreetManyResponse();
        chunk.setResult(name);

        call.write(chunk);
        sent += 1;

        if (sent < TOTAL_MESSAGES) {
            return;
        }

        // The server is done sending messages.
        clearInterval(timer);
        call.end();
    }, 1000);
}

/**
 * Client-streaming handler: logs every incoming greeting and replies once,
 * after the client has finished sending.
 *
 * @param {Object} call - Readable server call emitting 'data', 'error' and 'end'.
 * @param {Function} callback - Invoked as `callback(null, response)` on 'end'.
 */
function longGreet (call, callback){
    const logGreeting = (request) => {
        const first = request.getGreet().getFirstName();
        const last = request.getGreet().getLastName();
        console.log('hello, ' + [first, last].join(' '));
    };

    const logFailure = (error) => {
        console.error(error);
    };

    // The single response is only sent once the client closes its stream.
    const sendReply = () => {
        const reply = new greets.LongGreetResponse();
        reply.setResult('Long greet client streaming..');
        callback(null, reply);
    };

    call.on('data', logGreeting);
    call.on('error', logFailure);
    call.on('end', sendReply);
}

// bidi streaming

/**
 * Pauses an async flow for a while.
 *
 * @param {number} interval - Delay in milliseconds, passed to setTimeout.
 * @returns {Promise<undefined>} Settles with no value once the timer fires.
 */
async function sleep(interval){
    return new Promise((wake) => {
        setTimeout(wake, interval);
    });
}

/**
 * Bi-directional streaming handler for GreetEveryone.
 *
 * Incoming messages are only logged. Independently of what arrives, the
 * server writes ten responses, pausing one second after each, and then
 * ends its side of the stream.
 *
 * @param {Object} call - Duplex server call: emits 'data', 'error' and 'end',
 *   and accepts `write()` / `end()`.
 * @param {Function} callback - Unused; present only to match the handler shape.
 * @returns {Promise<void>} Settles after the last response has been written
 *   and `call.end()` has been called.
 */
async function greetEveryone(call, callback){
    call.on('data', request =>{
        const fullname = request.getGreet().getFirstName() + " " + request.getGreet().getLastName();
        console.log('Data from: ' + fullname);
    });

    call.on('error', error =>{
        // Fix: this used to call `log.error`, but no `log` object exists in
        // this file, so a stream error threw a ReferenceError instead of
        // being reported.
        console.error(error);
    });

    call.on('end', ()=>{
        console.log('End bidi streaming...');
    });

    // Simulate streaming with a for loop.
    for (let i = 0; i < 10; i++) {
        const response = new greets.GreetEveryoneResponse();
        response.setResult('Data ke: ' + i);

        call.write(response);

        await sleep(1000);
    }

    call.end();
}

/**
 * Creates the gRPC server, registers the four GreetService handlers and
 * binds it to 127.0.0.1:50051 without transport security.
 *
 * The server is started, and "Server running.." is printed, only after the
 * bind has succeeded. A bind failure is reported with console.error and the
 * server is not started.
 */
function main(){
    const server = new grpc.Server();

    server.addService(service.GreetServiceService, {
        greet: greet,
        greetManyTimes: greetManyTimes,
        longGreet: longGreet,
        greetEveryone: greetEveryone, // bidi streaming handler
    });

    server.bindAsync("127.0.0.1:50051", grpc.ServerCredentials.createInsecure(), (error) => {
        // Fix: the bind error used to be ignored and start() was called
        // even when the port could not be bound.
        if (error) {
            console.error(error);
            return;
        }

        server.start();
        console.log("Server running..");
    });
}

// Entry point: set up and launch the gRPC server when this script runs.
main()

Kemudian kita buka file client/client.js, tambahkan code untuk bidi streaming.

var grpc = require('grpc');

var greets = require('../server/greet_pb');
var service = require('../server/greet_grpc_pb');

// function unary(){
//     var client = new service.GreetServiceClient('localhost:50051', grpc.credentials.createInsecure());

//     var request = new greets.GreetRequest();
//     var greeting = new greets.Greeting();
//     greeting.setFirstName("Test1");
//     greeting.setLastName("Test2");

//     request.setGreeting(greeting);

//     client.greet(request, (error, response)=>{
//         if (!error){
//             console.log("Response: ", response.getResult());
//         }else{
//             console.error(error);
//         }
//     });

// }

// function callGreetMany(){
//     var client = new service.GreetServiceClient(
//         'localhost:50051', grpc.credentials.createInsecure()
//     );

//     var request = new greets.GreetManyRequest();
//     var greeting = new greets.Greeting();
//     greeting.setFirstName("Streaming");
//     greeting.setLastName("API");

//     request.setGreeting(greeting);

//     var call = client.greetManyTimes(request, ()=>{});

//     call.on("data", (response)=>{
//         console.log('Client streaming response: ', response.getResult());
//     });

//     call.on("status", (status)=>{
//         console.log(status.details);
//     });

//     call.on("error", (error)=>{
//         console.error(error.details);
//     });

//     call.on("end", ()=>{
//         console.log("streaming end..");
//     });


// }


// function callLongGreeting(){
//     var client = new service.GreetServiceClient(
//         'localhost:50051', grpc.credentials.createInsecure()
//     );

//     var request = new greets.LongGreetRequest();

//     var call = client.longGreet(request, (error, response)=>{
//         if(!error){
//             console.log('server response: ', response.getResult());
//         }else{
//             console.error(error);
//         }
//     });

//     let count = 0, intervalID = setInterval(function(){
//         console.log('Sending message ' + count);

//         //request 1
//         var request = new greets.LongGreetRequest();
//         var greeting = new greets.Greeting();
//         greeting.setFirstName('Client');
//         greeting.setLastName('Streaming');

//         request.setGreet(greeting);

//         //request 2
//         var request2 = new greets.LongGreetRequest();
//         var greeting2 = new greets.Greeting();
//         greeting2.setFirstName('Client2');
//         greeting2.setLastName('Streaming2');

//         request2.setGreet(greeting2);
        

//         call.write(request);
//         call.write(request2);

//         if(++count > 3){
//             clearInterval(intervalID);
//             call.end();
//         }

//     }, 1000);
// }

// bidi streaming

/**
 * Pauses an async flow for a while.
 *
 * @param {number} interval - Delay in milliseconds, passed to setTimeout.
 * @returns {Promise<undefined>} Settles with no value once the timer fires.
 */
async function sleep(interval){
    return new Promise((wake) => {
        setTimeout(wake, interval);
    });
}

/**
 * Bi-directional streaming client for GreetEveryone.
 *
 * Opens the stream, logs every response the server sends, and writes ten
 * requests ("Bidi Streaming"), pausing 1.5 seconds after each, before
 * closing the client side of the stream.
 *
 * @returns {Promise<void>} Settles after the last request has been written
 *   and `call.end()` has been called.
 */
async function callBiDirect(){
    const client = new service.GreetServiceClient(
        'localhost:50051', grpc.credentials.createInsecure()
    );

    // Fix: a bidi streaming stub takes neither a request message nor a
    // response callback. The original passed `request` (a hoisted `var`
    // that was still undefined here) plus a callback that is never invoked.
    // Requests go through call.write() and responses arrive as 'data' events.
    const call = client.greetEveryone();

    call.on('data', response=>{
        console.log('Response from server:  ' + response.getResult());
    });

    call.on('error', error =>{
        console.error(error);
    });

    call.on('end', ()=>{
        console.log('End bidi...');
    });

    // Simulate streaming data with a for loop.
    for (let i = 0; i < 10; i++) {
        const greeting = new greets.Greeting();
        greeting.setFirstName('Bidi');
        greeting.setLastName('Streaming');

        const request = new greets.GreetEveryoneRequest();
        request.setGreet(greeting);

        call.write(request);
        await sleep(1500);
    }

    call.end();

}
// Run the bi-directional streaming demo; the other demos below stay disabled.
callBiDirect();
// callLongGreeting();
//callGreetMany();
//unary()

Berikutnya kita jalankan service, buka command prompt, masuk ke direktori server

$ node index.js

Server running..
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
Data from: Bidi Streaming
End bidi streaming...

Buka command prompt kedua, jalankan client.js

$ node client.js

Response from server:  Data ke: 0
Response from server:  Data ke: 1
Response from server:  Data ke: 2
Response from server:  Data ke: 3
Response from server:  Data ke: 4
Response from server:  Data ke: 5
Response from server:  Data ke: 6
Response from server:  Data ke: 7
Response from server:  Data ke: 8
Response from server:  Data ke: 9
End bidi...

Sampai di sini kita sudah berhasil melakukan simulasi bi-directional streaming.

Sharing is caring:

Leave a Comment