Learning should always be fun

Making a simple notification service using Go and gRPC

Hello good people! It has been a while since I last wrote a tutorial. In this tutorial we will see how to implement a simple notification service in Go using gRPC. We will also see what gRPC is, what its use cases are, and how it handles scenarios that a simple HTTP/1 request fails to address.

 

Let’s get started …

 

Before we begin I would like you guys to know a little about RPC. RPC stands for Remote Procedure Call. In computer science we use the term Remote Procedure Call (RPC) when a program asks for a task or procedure to be run in a different address space, usually on another machine. These machines can be devices connected to your local network or something in your cloud. The point of RPC is to let a program call a procedure on a remote machine as if it were a local call, without the programmer explicitly coding the details of the remote interaction.
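
To make the idea concrete, here is a minimal sketch using Go's standard net/rpc package (not gRPC). The Calculator service, the Args type and the remoteAdd helper are hypothetical names made up for this illustration, and an in-memory pipe stands in for the network between two machines.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Args holds the two operands sent to the remote procedure.
type Args struct{ A, B int }

// Calculator is a hypothetical service; its exported methods become remotely callable.
type Calculator struct{}

// Add follows the net/rpc method shape: (args, *reply) error.
func (Calculator) Add(args Args, reply *int) error {
	*reply = args.A + args.B
	return nil
}

// remoteAdd starts a server on one end of an in-memory pipe and calls it from the other end.
func remoteAdd(a, b int) (int, error) {
	serverConn, clientConn := net.Pipe()

	srv := rpc.NewServer()
	if err := srv.Register(Calculator{}); err != nil {
		return 0, err
	}
	go srv.ServeConn(serverConn) // plays the role of the remote machine

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var sum int
	// Looks like a local call, but the arguments travel over the connection.
	err := client.Call("Calculator.Add", Args{A: a, B: b}, &sum)
	return sum, err
}

func main() {
	sum, err := remoteAdd(2, 3)
	if err != nil {
		fmt.Println("call failed:", err)
		return
	}
	fmt.Println("2 + 3 =", sum)
}
```

The caller never deals with sockets or encoding directly; it names a procedure, passes arguments and gets a reply.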

 

Introduction to gRPC

 

gRPC is an RPC system developed by Google. It supports bidirectional streaming, authentication, flow control, cancellation and timeouts. So with gRPC you can easily authenticate a client and exchange information bidirectionally. You also don’t need to worry much about the different bandwidths and connection speeds of different clients. It has its own flow control and buffering mechanisms to address such scenarios. These features make gRPC well suited to a microservice architecture.

 

Use cases of gRPC

 

  • Connect different microservices running in different locations or data centers.
  • Connect mobile applications to the backend services.
  • Stream data bidirectionally between different machines.

 

Implementing notification service

 

A notification service is one very important feature that plain HTTP/1 request/response does not provide out of the box. Developers use techniques like long polling in order to implement one. In HTTP/1 the client makes a TCP connection to the server and every exchange is a request from the client followed by one response; in HTTP/1.0 the connection is even closed each time the response has been delivered. Hence the client always needs to initiate a request in order to get anything back from the server. HTTP/2, on the other hand, multiplexes long-lived streams over a single connection, which allows bidirectional data exchange. The transport protocol of gRPC is HTTP/2. For this reason we can use gRPC to exchange information between client and server, in both directions, without closing the connection.
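
For comparison, long polling over plain HTTP can be sketched as follows. This is only an illustration built on the standard library's test server; longPollHandler, poll and pollTwice are hypothetical names. Note that the client has to send a fresh request for every single notification.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// longPollHandler holds each request open until a message is available or the wait expires.
func longPollHandler(messages <-chan string, wait time.Duration) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		select {
		case msg := <-messages:
			fmt.Fprint(w, msg)
		case <-time.After(wait):
			w.WriteHeader(http.StatusNoContent) // nothing new, the client must ask again
		case <-r.Context().Done(): // the client went away
		}
	}
}

// poll issues one request; the client has to call it again for every notification.
func poll(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// pollTwice publishes two messages and fetches them with two separate requests.
func pollTwice() ([]string, error) {
	messages := make(chan string, 2)
	srv := httptest.NewServer(longPollHandler(messages, time.Second))
	defer srv.Close()

	messages <- "first"
	messages <- "second"

	var got []string
	for i := 0; i < 2; i++ {
		msg, err := poll(srv.URL)
		if err != nil {
			return nil, err
		}
		got = append(got, msg)
	}
	return got, nil
}

func main() {
	got, err := pollTwice()
	fmt.Println(got, err)
}
```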

gRPC uses Protocol Buffers as its interface description language. It uses protocol buffers to define the services and the structure of the data exchanged between them. Let’s start by creating our new project in the $GOPATH/src directory. Let’s name the project folder ‘gonotification’. Under our project folder let’s create two more folders named ‘notification’ and ‘notification_proto’.

 

Under the ‘notification_proto’ folder let’s create a ‘notification.proto’ file with the following content:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "notification_proto";
option java_outer_classname = "NotificationProto";

package result;

// The Notification service definition.
service Notification {
  // ConnectToServer connects to server with the ClientDetail message
  // this will in response receive stream of NotificationMessage
  rpc ConnectToServer(ClientDetail) returns (stream NotificationMessage) {}
}


// ClientDetail message
message ClientDetail {
  string clientName = 1;
  int32 clientAge = 2;
  string address = 3;
  bool isNepali = 4;
}

// NotificationMessage is the message sent as notification from the server to the client
message NotificationMessage{
  string message = 1;
  int64 time = 2; 
}

Here we have defined the protocol buffer of the Notification service. You can see that ConnectToServer sends a ClientDetail message to the server at the time of connection. When the connection is successful, the client in turn receives a stream on which the notification messages will arrive. Now let’s implement these features in our code, but before that let’s generate the necessary server and client interfaces by running the following command from the project folder:
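
The shape of this server-streaming call, one request in and many messages out, can be sketched in plain Go. The types below are simplified, hand-written stand-ins for what protoc generates, and notificationStream and recordingStream are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// ClientDetail and NotificationMessage are simplified, hand-written stand-ins
// for the messages in notification.proto; protoc generates richer types.
type ClientDetail struct {
	ClientName string
	ClientAge  int32
	Address    string
	IsNepali   bool
}

type NotificationMessage struct {
	Message string
	Time    int64
}

// notificationStream is a hypothetical interface capturing the one thing
// a server-side stream offers the handler: sending messages to the client.
type notificationStream interface {
	Send(*NotificationMessage) error
}

// recordingStream is a fake stream that stores what was sent.
type recordingStream struct{ sent []*NotificationMessage }

func (r *recordingStream) Send(m *NotificationMessage) error {
	r.sent = append(r.sent, m)
	return nil
}

// connectToServer takes one request and answers with several messages on the stream.
func connectToServer(in *ClientDetail, stream notificationStream) error {
	for i := 1; i <= 3; i++ {
		msg := &NotificationMessage{
			Message: fmt.Sprintf("hello %s, notification %d", in.ClientName, i),
			Time:    int64(i),
		}
		if err := stream.Send(msg); err != nil {
			return err
		}
	}
	return nil // returning ends the stream
}

func main() {
	stream := &recordingStream{}
	if err := connectToServer(&ClientDetail{ClientName: "Ram"}, stream); err != nil {
		fmt.Println("stream failed:", err)
		return
	}
	for _, m := range stream.sent {
		fmt.Println(m.Time, m.Message)
	}
}
```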

protoc -I notification_proto/ notification_proto/notification.proto --go_out=plugins=grpc:notification_proto

Once done you should see a new file named notification.pb.go under the notification_proto directory. If you open the file you will see auto-generated Go code, which will be imported by our server and client.

Under ‘notification’ folder create two more folders named ‘client’ and ‘server’.

Server and Client Interface

Now under the ‘client’ folder create a client.go file as:

 

// Package main implements a client for Notification service
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"strconv"

	pb "gonotification/notification_proto"

	"google.golang.org/grpc"
)

const (
	address = "localhost:5001"
)

func main() {

	reader := bufio.NewReader(os.Stdin)

	clientName, _ := stdin(reader, "Enter client name : ")

	ageStr, _ := stdin(reader, "Enter client age : ")
	age, err := strconv.Atoi(ageStr)
	if err != nil {
		// Fatalf (not Fatal) so that the %v verb is formatted
		log.Fatalf("Enter valid age: %v", err)
	}

	address, _ := stdin(reader, "Enter client address : ")

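	// the generated Go fields follow the proto field names:
	// clientName becomes ClientName and clientAge becomes ClientAge
	clientDetails := &pb.ClientDetail{
		ClientName: clientName,
		ClientAge:  int32(age),
		Address:    address,
	}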

	connectToServer(clientDetails)
}

func connectToServer(clientDetails *pb.ClientDetail) {

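	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewNotificationClient(conn)

	// open the notification stream; stop if the server refuses the call
	stream, err := client.ConnectToServer(context.Background(), clientDetails)
	if err != nil {
		log.Fatalf("could not open notification stream: %v", err)
	}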

	for {

		// listen for streams
		notificationMessage, err := stream.Recv()

		if err == io.EOF { //no more stream to listen
			break
		}
		if err != nil { // some error occurred
			log.Fatalf("stream error: %v", err)
		}
		onNewNotification(notificationMessage)
	}
}

// called when the notification arrives
func onNewNotification(notificationMessage *pb.NotificationMessage) {
	fmt.Printf("%d: New message : %s", notificationMessage.GetTime()/1e6, notificationMessage.Message)
}

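// stdin prints the query and returns one line typed by the user
func stdin(reader *bufio.Reader, query string) (string, error) {
	fmt.Println(query)
	input, err := reader.ReadString('\n')
	if err != nil {
		// report the read error instead of hiding it
		return "", err
	}
	// drop the trailing newline
	return input[:len(input)-1], nil
}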
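
The Time field carries nanoseconds since the Unix epoch, and the client above prints it as a raw number of milliseconds. If a readable timestamp is preferred, the value can be converted back into a time.Time. A small sketch; formatNotificationTime is a hypothetical helper, not part of the original client.

```go
package main

import (
	"fmt"
	"time"
)

// formatNotificationTime converts nanoseconds since the Unix epoch
// into a readable UTC timestamp with millisecond precision.
func formatNotificationTime(unixNano int64) string {
	return time.Unix(0, unixNano).UTC().Format("2006-01-02 15:04:05.000")
}

func main() {
	sentAt := time.Now().UnixNano() // the kind of value the server stores in the Time field
	fmt.Printf("%s: New message : %s\n", formatNotificationTime(sentAt), "hello")
}
```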

Similarly under server folder create server.go as:

 

// Package main implements a server for Notification service
package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"os"
	"sync"
	"time"

	pb "gonotification/notification_proto"

	"google.golang.org/grpc"
)

const (
	address = "localhost:5001"
)

// Server implements the Notification service
// mu guards both maps, because every client is served on its own goroutine
// clients is the map of connected clients
// clientStreams is the map of connected clients' streams
type Server struct {
	mu            sync.Mutex
	clients       map[string]*pb.ClientDetail
	clientStreams map[string]pb.Notification_ConnectToServerServer
}

func (server *Server) init() {
	server.clients = make(map[string]*pb.ClientDetail)
	server.clientStreams = make(map[string]pb.Notification_ConnectToServerServer)
}

// ConnectToServer is called when a client makes a connection to the server
// this function adds the client to the server's clients map and stores the client stream
// the stream is closed as soon as this function returns, so we block here
// until the client disconnects or the server shuts down
func (server *Server) ConnectToServer(in *pb.ClientDetail, stream pb.Notification_ConnectToServerServer) error {
	server.addNewClient(in, stream)
	defer server.removeClient(in.ClientName)
	// wait without burning CPU; the context is cancelled when the stream ends
	<-stream.Context().Done()
	return nil
}

// adds new client to the maps
func (server *Server) addNewClient(in *pb.ClientDetail, stream pb.Notification_ConnectToServerServer) {
	log.Printf("adding new client %s", in.ClientName)
	server.mu.Lock()
	defer server.mu.Unlock()
	server.clientStreams[in.ClientName] = stream
	server.clients[in.ClientName] = in
}

// removes a disconnected client from the maps
func (server *Server) removeClient(clientID string) {
	server.mu.Lock()
	defer server.mu.Unlock()
	delete(server.clientStreams, clientID)
	delete(server.clients, clientID)
}

// send notification to specific client; the caller must hold server.mu
func (server *Server) sendNotification(clientID string, msg string) {
	client := server.clients[clientID]
	stream := server.clientStreams[clientID]

	notificationMessage := &pb.NotificationMessage{
		Message: fmt.Sprintf("%s(age : %d) currently living in %s :: %s", client.ClientName, client.ClientAge, client.Address, msg),
		Time:    time.Now().UnixNano(),
	}

	if err := stream.Send(notificationMessage); err != nil {
		log.Printf("could not notify %s: %v", clientID, err)
	}
}

func main() {

	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	server := &Server{}
	server.init()
	// allow received messages of up to 100 MB
	s := grpc.NewServer(grpc.MaxRecvMsgSize(100 * 1024 * 1024))

	pb.RegisterNotificationServer(s, server)
	// go routine to get server notification message from stdin
	go waitForMessage(server)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}

}

func waitForMessage(server *Server) {
	// create the reader once so buffered input is not lost between reads
	reader := bufio.NewReader(os.Stdin)
	for { // get the server notification message
		fmt.Println("Notification Msg : ")
		msg, err := reader.ReadString('\n')
		if err != nil { // stdin was closed, nothing more to read
			return
		}
		// send the message to all the clients
		server.mu.Lock()
		for clientID := range server.clients {
			server.sendNotification(clientID, msg)
		}
		server.mu.Unlock()
	}

}

If you look carefully you will see that Server implements an interface called NotificationServer, which is defined in the notification.pb.go file. Almost everything here is straightforward. I want you guys to look at the ConnectToServer function of the server. You will see that after registering the client it does nothing at all except wait on stream.Context().Done().

So why do we need to wait? Well, it turns out that once the function returns, the stream is closed. If this happens we can no longer send messages to the client. Hence we keep the handler blocked for as long as the client is connected. An empty for {} loop would also keep the stream alive, but it would keep a CPU core busy for every connected client and would never notice a disconnect, so we wait on the stream’s context instead. The mutex is there because gRPC runs every handler on its own goroutine while waitForMessage reads the same maps.
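
A handler can stay alive without spinning by blocking on a context until the client goes away. The pattern can be tried without gRPC: below is a minimal sketch using only the standard library, in which registry, handle, broadcast and demo are hypothetical names and a buffered channel stands in for the gRPC stream.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// registry is a hypothetical stand-in for the server's client map;
// each connected client gets a channel instead of a real gRPC stream.
type registry struct {
	mu      sync.Mutex
	clients map[string]chan string
}

func newRegistry() *registry {
	return &registry{clients: make(map[string]chan string)}
}

// handle plays the role of ConnectToServer: register, block, clean up.
func (r *registry) handle(ctx context.Context, name string, inbox chan string) {
	r.mu.Lock()
	r.clients[name] = inbox
	r.mu.Unlock()

	<-ctx.Done() // sleeps until the client goes away; no CPU is burned here

	r.mu.Lock()
	delete(r.clients, name)
	r.mu.Unlock()
}

// broadcast delivers msg to every registered client and reports how many got it.
func (r *registry) broadcast(msg string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, inbox := range r.clients {
		inbox <- msg
	}
	return len(r.clients)
}

// connected reports the number of registered clients.
func (r *registry) connected() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.clients)
}

// demo connects one client, notifies it, disconnects it, and returns the
// number of clients notified plus the number left afterwards.
func demo() (notified, remaining int) {
	r := newRegistry()
	ctx, cancel := context.WithCancel(context.Background())
	inbox := make(chan string, 1) // buffered so broadcast does not block
	done := make(chan struct{})
	go func() {
		r.handle(ctx, "ram", inbox)
		close(done)
	}()
	for r.connected() == 0 { // wait for the handler to register
		time.Sleep(time.Millisecond)
	}
	notified = r.broadcast("hello")
	cancel() // the client disconnects
	<-done   // the handler returns and removes the client
	return notified, r.connected()
}

func main() {
	notified, remaining := demo()
	fmt.Println("notified:", notified, "remaining:", remaining)
}
```

Because the handler removes its own entry when the context ends, a later broadcast never writes to a client that has already left.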

 

Building the application

 

Now let’s build the client and server. Enter the following command to build the server:

go build gonotification/notification/server 

Enter the following command to build client

go build gonotification/notification/client

 

In order to make things easier, let’s create a Makefile as:


SRC_DIR = notification_proto
DST_DIR = notification_proto
PROTO_FILE_NAME = notification.proto

# create auto generated codes for server and client interfaces
proto:
	protoc -I $(SRC_DIR)/ $(SRC_DIR)/$(PROTO_FILE_NAME) --go_out=plugins=grpc:$(DST_DIR)

# build the server code
build-server:
	go build gonotification/notification/server

# build the client code
build-client:
	go build gonotification/notification/client

# run the server
run-server:
	./server

# run the client
run-client:
	./client

Now run the app as :

1. make proto
2. make build-server
3. make build-client
4. make run-server
5. make run-client

Once the server and clients are connected, type a message in the server console and press enter. The message should pop up on the client console. You can connect as many clients as you want to the same server, as long as each one uses a different client name, since the server keys its clients by name. The notification goes to all of them. The messages are sent to the clients over the same open stream, without the connection being closed.

You can get the code from my github here.

Reference

https://en.wikipedia.org/wiki/Remote_procedure_call

https://grpc.io/about/

https://en.wikipedia.org/wiki/Interface_description_language

https://en.wikipedia.org/wiki/Protocol_Buffers

 

Credits:

Gopher image downloaded from https://github.com/egonelbre/gophers