Streaming bidirecional com gRPC
1. Introdução ao Streaming Bidirecional no gRPC
O gRPC oferece quatro tipos de comunicação entre cliente e servidor: unário (requisição-resposta simples), server-side streaming (cliente envia uma requisição e recebe múltiplas respostas), client-side streaming (cliente envia múltiplas requisições e recebe uma resposta) e streaming bidirecional.
No streaming bidirecional, ambas as partes podem enviar e receber mensagens de forma independente e simultânea. Diferente dos outros tipos, não há sincronização obrigatória entre as mensagens — o servidor pode processar e responder enquanto o cliente continua enviando novos dados. Isso é possível porque cada stream utiliza duas goroutines independentes (uma para leitura, outra para escrita), operando sobre o mesmo canal de comunicação.
Casos de uso típicos incluem:
- Chat em tempo real: múltiplos usuários trocam mensagens simultaneamente
- Jogos multiplayer: atualizações contínuas de posição e ações dos jogadores
- Monitoramento contínuo: servidor envia alertas enquanto cliente envia comandos de configuração
- Processamento de dados em pipeline: cliente envia lotes de dados e servidor retorna resultados parciais
2. Definindo o Serviço com Protocol Buffers
Criamos um arquivo chat.proto que define um serviço de chat com streaming bidirecional:
syntax = "proto3";
package chat;
option go_package = "github.com/exemplo/chatpb";
message ChatMessage {
string user = 1;
string text = 2;
int64 timestamp = 3;
}
message JoinRequest {
string user = 1;
}
service ChatService {
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
A assinatura rpc Chat(stream ChatMessage) returns (stream ChatMessage) indica que tanto a requisição quanto a resposta são streams contínuos.
Para gerar o código Golang, execute:
protoc --go_out=. --go-grpc_out=. chat.proto
Isso gera os arquivos chat.pb.go (estruturas das mensagens) e chat_grpc.pb.go (interfaces do serviço e stubs do cliente).
3. Implementação do Servidor em Golang
O servidor deve gerenciar múltiplos clientes conectados simultaneamente. Cada conexão de stream bidirecional recebe sua própria goroutine.
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"sync"
"time"
"google.golang.org/grpc"
pb "github.com/exemplo/chatpb"
)
type chatServer struct {
pb.UnimplementedChatServiceServer
mu sync.RWMutex
streams map[string]pb.ChatService_ChatServer
}
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
// Identifica o usuário pela primeira mensagem
msg, err := stream.Recv()
if err != nil {
return err
}
user := msg.User
s.mu.Lock()
s.streams[user] = stream
s.mu.Unlock()
defer func() {
s.mu.Lock()
delete(s.streams, user)
s.mu.Unlock()
}()
// Goroutine para broadcast de mensagens recebidas
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("Erro ao receber de %s: %v", user, err)
return
}
// Broadcast para todos os clientes
s.mu.RLock()
for _, clientStream := range s.streams {
if err := clientStream.Send(in); err != nil {
log.Printf("Erro ao enviar para cliente: %v", err)
}
}
s.mu.RUnlock()
}
}()
// Mantém o stream ativo
<-stream.Context().Done()
return stream.Context().Err()
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Falha ao iniciar listener: %v", err)
}
s := grpc.NewServer()
pb.RegisterChatServiceServer(s, &chatServer{
streams: make(map[string]pb.ChatService_ChatServer),
})
log.Println("Servidor gRPC iniciado na porta :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("Falha ao servir: %v", err)
}
}
4. Implementação do Cliente em Golang
O cliente conecta-se ao servidor e gerencia dois fluxos simultâneos: envio de mensagens do usuário e recebimento de mensagens de outros clientes.
package main
import (
"bufio"
"context"
"log"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "github.com/exemplo/chatpb"
)
func main() {
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Falha ao conectar: %v", err)
}
defer conn.Close()
client := pb.NewChatServiceClient(conn)
stream, err := client.Chat(context.Background())
if err != nil {
log.Fatalf("Falha ao criar stream: %v", err)
}
var wg sync.WaitGroup
wg.Add(2)
// Goroutine para envio de mensagens
go func() {
defer wg.Done()
scanner := bufio.NewScanner(os.Stdin)
log.Print("Digite seu nome: ")
scanner.Scan()
user := scanner.Text()
// Envia primeira mensagem com o nome do usuário
stream.Send(&pb.ChatMessage{
User: user,
Text: "entrou no chat",
Timestamp: time.Now().Unix(),
})
for scanner.Scan() {
text := scanner.Text()
if text == "/sair" {
stream.Send(&pb.ChatMessage{
User: user,
Text: "saiu do chat",
Timestamp: time.Now().Unix(),
})
stream.CloseSend()
return
}
msg := &pb.ChatMessage{
User: user,
Text: text,
Timestamp: time.Now().Unix(),
}
if err := stream.Send(msg); err != nil {
log.Printf("Erro ao enviar: %v", err)
return
}
}
}()
// Goroutine para recebimento de mensagens
go func() {
defer wg.Done()
for {
msg, err := stream.Recv()
if err != nil {
log.Printf("Erro ao receber: %v", err)
return
}
log.Printf("[%s] %s: %s",
time.Unix(msg.Timestamp, 0).Format("15:04:05"),
msg.User,
msg.Text)
}
}()
wg.Wait()
}
5. Tratamento de Erros e Cancelamento
O gerenciamento correto de erros é crucial em streams bidirecionais:
io.EOF: indica que o cliente fechou o stream de envio (CloseSend)context.Canceled: ocorre quando o contexto é cancelado (ex.: timeout ou desconexão)- Erros de rede:
RecvMsgretorna erro quando a conexão é perdida
Para cancelamento controlado, utilize context.WithCancel:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Em caso de erro crítico, chame cancel()
if err != nil {
cancel()
return
}
6. Exemplo Prático: Chat Simples em Tempo Real
O código completo do servidor e cliente apresentados nas seções 3 e 4 formam um chat funcional. Para testar:
- Inicie o servidor:
go run server.go - Inicie múltiplos clientes em terminais diferentes:
go run client.go - Cada cliente digita seu nome e começa a conversar
O servidor faz broadcast de todas as mensagens para todos os clientes conectados, garantindo que todos vejam as mensagens em tempo real.
7. Considerações de Performance e Boas Práticas
- Concorrência limitada: Use
semaphore.Weightedpara limitar o número de streams simultâneos e evitar sobrecarga:
import "golang.org/x/sync/semaphore"
var sem = semaphore.NewWeighted(100) // máximo 100 streams concorrentes
sem.Acquire(context.Background(), 1)
defer sem.Release(1)
- Backpressure: Implemente buffers com capacidade limitada para evitar que streams lentos consumam memória excessiva:
msgChan := make(chan *pb.ChatMessage, 100) // buffer de 100 mensagens
- Métricas: Integre Prometheus ou OpenTelemetry para monitorar:
- Número de streams ativos
- Latência de entrega de mensagens
-
Taxa de erros por stream
-
Versionamento: Para compatibilidade retroativa, nunca remova campos de mensagens existentes. Adicione novos campos com números de campo não utilizados.
Referências
- Documentação oficial do gRPC em Golang — Guia completo de instalação, conceitos e exemplos de streaming
- Protocol Buffers em Go — Tutorial oficial de Protocol Buffers com exemplos de geração de código
- gRPC Bidirectional Streaming em Go - Blog do gRPC — Artigo técnico detalhado sobre padrões de streaming bidirecional
- Exemplo de Chat com gRPC em Go - GitHub oficial — Código-fonte oficial com exemplos práticos de todos os tipos de streaming
- Boas práticas de performance em gRPC — Guia oficial de otimização, incluindo recomendações para streams
- Context e cancelamento em Go — Documentação oficial da biblioteca
contextpara gerenciamento de deadlines e cancelamento