Server-sent events (SSE) em Go
1. Fundamentos do SSE e sua Aplicação em Go
Server-Sent Events (SSE) é uma tecnologia padrão do W3C que permite que servidores enviem dados para clientes web de forma unidirecional através de uma conexão HTTP persistente. Diferentemente dos WebSockets, que oferecem comunicação bidirecional, o SSE é ideal para cenários onde apenas o servidor precisa enviar atualizações para o cliente.
Diferenças Principais
| Característica | SSE | WebSockets | Polling |
|---|---|---|---|
| Direção | Servidor → Cliente | Bidirecional | Cliente → Servidor |
| Protocolo | HTTP puro | WS personalizado | HTTP |
| Reconexão automática | Nativa (Last-Event-ID) | Manual | Não |
| Suporte a navegadores | Amplo (IE除外) | Universal | Universal |
Casos de Uso Típicos
- Notificações em tempo real (alertas, mensagens)
- Feeds de logs de servidores
- Dashboards com métricas atualizadas
- Cotações financeiras e preços de ações
- Status de jobs assíncronos
No ecossistema Go, SSE se destaca pela simplicidade de implementação usando apenas a biblioteca padrão net/http, sem dependências externas.
2. Implementação Básica de um Servidor SSE em Go
A base do SSE é o envio de headers HTTP específicos e o streaming contínuo de dados no formato text/event-stream.
package main
import (
"fmt"
"log"
"net/http"
"time"
)
func sseHandler(w http.ResponseWriter, r *http.Request) {
// Configurar headers obrigatórios
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
// Loop de envio de eventos
for i := 0; i < 10; i++ {
fmt.Fprintf(w, "data: Mensagem %d\n\n", i)
flusher.Flush()
time.Sleep(1 * time.Second)
}
}
func main() {
http.HandleFunc("/events", sseHandler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
Formato do Payload SSE
O protocolo SSE define campos específicos no payload:
event:— Nome do evento (opcional)data:— Dados do evento (obrigatório, pode ter múltiplas linhas)id:— Identificador único (para reconexão)retry:— Tempo de reconexão em milissegundos
// Exemplo de evento completo
fmt.Fprintf(w, "id: 123\n")
fmt.Fprintf(w, "event: atualizacao\n")
fmt.Fprintf(w, "data: {\"mensagem\": \"hello\"}\n")
fmt.Fprintf(w, "retry: 3000\n\n")
3. Gerenciamento de Conexões e Clientes
Para aplicações reais, precisamos gerenciar múltiplos clientes simultaneamente. Um padrão comum é usar um hub centralizado com canais.
type Client struct {
send chan []byte
}
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
4. Envio de Eventos com Dados Estruturados
Para enviar dados complexos, serializamos structs Go para JSON e os enviamos no campo data:.
type StockQuote struct {
Symbol string `json:"symbol"`
Price float64 `json:"price"`
Change float64 `json:"change"`
}
func handleStockStream(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
flusher := w.(http.Flusher)
encoder := json.NewEncoder(w)
quotes := []StockQuote{
{Symbol: "GOOGL", Price: 141.50, Change: 1.23},
{Symbol: "AAPL", Price: 175.30, Change: -0.45},
}
for {
// Enviar múltiplos eventos em lote
for _, quote := range quotes {
fmt.Fprintf(w, "event: stock_quote\n")
fmt.Fprintf(w, "data: ")
encoder.Encode(quote)
fmt.Fprintf(w, "\n")
}
flusher.Flush()
time.Sleep(5 * time.Second)
}
}
5. Tratamento de Desconexões e Reconexão
A detecção de clientes desconectados é crucial para evitar vazamento de goroutines.
func handleSSEWithContext(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
flusher := w.(http.Flusher)
ctx := r.Context()
eventID := 0
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Printf("Cliente desconectado (ID: %d)", eventID)
return
case <-ticker.C:
eventID++
// Enviar heartbeat com suporte a reconexão
fmt.Fprintf(w, "id: %d\n", eventID)
fmt.Fprintf(w, "event: heartbeat\n")
fmt.Fprintf(w, "data: {\"status\": \"ok\"}\n")
fmt.Fprintf(w, "retry: 5000\n\n")
flusher.Flush()
}
}
}
6. Otimizações e Boas Práticas
Controle de Backpressure
type BufferedClient struct {
send chan []byte
ctx context.Context
cancel context.CancelFunc
}
func NewBufferedClient(bufferSize int) *BufferedClient {
ctx, cancel := context.WithCancel(context.Background())
return &BufferedClient{
send: make(chan []byte, bufferSize),
ctx: ctx,
cancel: cancel,
}
}
Reutilização de Buffers com sync.Pool
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func sendEvent(w http.ResponseWriter, event string, data []byte) {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
buf.Reset()
buf.WriteString("event: ")
buf.WriteString(event)
buf.WriteString("\ndata: ")
buf.Write(data)
buf.WriteString("\n\n")
w.Write(buf.Bytes())
}
Monitoramento de Conexões
type MetricsHub struct {
Hub
activeConnections int32
totalMessages int64
}
func (m *MetricsHub) Register(client *Client) {
atomic.AddInt32(&m.activeConnections, 1)
m.Hub.Register(client)
}
func (m *MetricsHub) Unregister(client *Client) {
atomic.AddInt32(&m.activeConnections, -1)
m.Hub.Unregister(client)
}
7. Exemplo Completo: Sistema de Notificações em Tempo Real
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
)
type Notification struct {
Type string `json:"type"`
Message string `json:"message"`
Time string `json:"time"`
}
type SSEHandler struct {
clients map[chan []byte]bool
register chan chan []byte
unregister chan chan []byte
broadcast chan []byte
}
func NewSSEHandler() *SSEHandler {
return &SSEHandler{
clients: make(map[chan []byte]bool),
register: make(chan chan []byte),
unregister: make(chan chan []byte),
broadcast: make(chan []byte),
}
}
func (h *SSEHandler) Run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client)
}
case msg := <-h.broadcast:
for client := range h.clients {
select {
case client <- msg:
default:
close(client)
delete(h.clients, client)
}
}
}
}
}
func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
messageChan := make(chan []byte, 10)
h.register <- messageChan
defer func() { h.unregister <- messageChan }()
ctx := r.Context()
eventID := 0
for {
select {
case <-ctx.Done():
return
case msg := <-messageChan:
eventID++
fmt.Fprintf(w, "id: %d\n", eventID)
w.Write(msg)
fmt.Fprintf(w, "\n")
flusher.Flush()
case <-time.After(30 * time.Second):
// Heartbeat para manter conexão
fmt.Fprintf(w, ": heartbeat\n\n")
flusher.Flush()
}
}
}
func main() {
handler := NewSSEHandler()
go handler.Run()
http.Handle("/events", handler)
// Endpoint para enviar notificações
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
notif := Notification{
Type: "info",
Message: "Sistema operacional",
Time: time.Now().Format(time.RFC3339),
}
data, _ := json.Marshal(notif)
event := []byte(fmt.Sprintf("event: notification\ndata: %s", data))
handler.broadcast <- event
w.WriteHeader(http.StatusOK)
})
log.Println("Servidor SSE rodando em :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Cliente HTML/JS mínimo:
<!DOCTYPE html>
<html>
<body>
<div id="events"></div>
<script>
const evtSource = new EventSource('/events');
evtSource.addEventListener('notification', function(e) {
const data = JSON.parse(e.data);
const div = document.getElementById('events');
div.innerHTML += `<p>[${data.type}] ${data.message} - ${data.time}</p>`;
});
evtSource.onerror = function(e) {
console.log('Erro de conexão, reconectando...');
};
</script>
</body>
</html>
Referências
- Documentação oficial do pacote net/http — Documentação completa da biblioteca padrão Go para HTTP, incluindo ResponseWriter e Flusher
- MDN: Server-sent events — Guia completo da Mozilla sobre SSE no lado do cliente
- W3C Server-Sent Events Specification — Especificação oficial do W3C para SSE
- Go by Example: Server-Sent Events — Exemplo prático de implementação SSE em Go
- Real-time Go: Building SSE Servers — Tutorial avançado de SSE em Go com gerenciamento de conexões
- Golang SSE Library: r3labs/sse — Biblioteca popular para SSE em Go com suporte a canais e broadcast
- HTML5 Rocks: Stream Updates with SSE — Tutorial introdutório sobre SSE no lado do cliente