Server-sent events (SSE) em Go

1. Fundamentos do SSE e sua Aplicação em Go

Server-Sent Events (SSE) é uma tecnologia padrão do W3C que permite que servidores enviem dados para clientes web de forma unidirecional através de uma conexão HTTP persistente. Diferentemente dos WebSockets, que oferecem comunicação bidirecional, o SSE é ideal para cenários onde apenas o servidor precisa enviar atualizações para o cliente.

Diferenças Principais

Característica SSE WebSockets Polling
Direção Servidor → Cliente Bidirecional Cliente → Servidor
Protocolo HTTP puro WS personalizado HTTP
Reconexão automática Nativa (Last-Event-ID) Manual Não
Suporte a navegadores Amplo (IE除外) Universal Universal

Casos de Uso Típicos

  • Notificações em tempo real (alertas, mensagens)
  • Feeds de logs de servidores
  • Dashboards com métricas atualizadas
  • Cotações financeiras e preços de ações
  • Status de jobs assíncronos

No ecossistema Go, SSE se destaca pela simplicidade de implementação usando apenas a biblioteca padrão net/http, sem dependências externas.

2. Implementação Básica de um Servidor SSE em Go

A base do SSE é o envio de headers HTTP específicos e o streaming contínuo de dados no formato text/event-stream.

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
    // Configurar headers obrigatórios
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    // Loop de envio de eventos
    for i := 0; i < 10; i++ {
        fmt.Fprintf(w, "data: Mensagem %d\n\n", i)
        flusher.Flush()
        time.Sleep(1 * time.Second)
    }
}

func main() {
    http.HandleFunc("/events", sseHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Formato do Payload SSE

O protocolo SSE define campos específicos no payload:

  • event: — Nome do evento (opcional)
  • data: — Dados do evento (obrigatório, pode ter múltiplas linhas)
  • id: — Identificador único (para reconexão)
  • retry: — Tempo de reconexão em milissegundos
// Exemplo de evento completo
fmt.Fprintf(w, "id: 123\n")
fmt.Fprintf(w, "event: atualizacao\n")
fmt.Fprintf(w, "data: {\"mensagem\": \"hello\"}\n")
fmt.Fprintf(w, "retry: 3000\n\n")

3. Gerenciamento de Conexões e Clientes

Para aplicações reais, precisamos gerenciar múltiplos clientes simultaneamente. Um padrão comum é usar um hub centralizado com canais.

type Client struct {
    send chan []byte
}

type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
}

func NewHub() *Hub {
    return &Hub{
        clients:    make(map[*Client]bool),
        broadcast:  make(chan []byte),
        register:   make(chan *Client),
        unregister: make(chan *Client),
    }
}

func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                close(client.send)
            }
        case message := <-h.broadcast:
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    close(client.send)
                    delete(h.clients, client)
                }
            }
        }
    }
}

4. Envio de Eventos com Dados Estruturados

Para enviar dados complexos, serializamos structs Go para JSON e os enviamos no campo data:.

type StockQuote struct {
    Symbol string  `json:"symbol"`
    Price  float64 `json:"price"`
    Change float64 `json:"change"`
}

func handleStockStream(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    flusher := w.(http.Flusher)
    encoder := json.NewEncoder(w)

    quotes := []StockQuote{
        {Symbol: "GOOGL", Price: 141.50, Change: 1.23},
        {Symbol: "AAPL", Price: 175.30, Change: -0.45},
    }

    for {
        // Enviar múltiplos eventos em lote
        for _, quote := range quotes {
            fmt.Fprintf(w, "event: stock_quote\n")
            fmt.Fprintf(w, "data: ")
            encoder.Encode(quote)
            fmt.Fprintf(w, "\n")
        }
        flusher.Flush()
        time.Sleep(5 * time.Second)
    }
}

5. Tratamento de Desconexões e Reconexão

A detecção de clientes desconectados é crucial para evitar vazamento de goroutines.

func handleSSEWithContext(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    flusher := w.(http.Flusher)
    ctx := r.Context()

    eventID := 0
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            log.Printf("Cliente desconectado (ID: %d)", eventID)
            return
        case <-ticker.C:
            eventID++
            // Enviar heartbeat com suporte a reconexão
            fmt.Fprintf(w, "id: %d\n", eventID)
            fmt.Fprintf(w, "event: heartbeat\n")
            fmt.Fprintf(w, "data: {\"status\": \"ok\"}\n")
            fmt.Fprintf(w, "retry: 5000\n\n")
            flusher.Flush()
        }
    }
}

6. Otimizações e Boas Práticas

Controle de Backpressure

type BufferedClient struct {
    send    chan []byte
    ctx     context.Context
    cancel  context.CancelFunc
}

func NewBufferedClient(bufferSize int) *BufferedClient {
    ctx, cancel := context.WithCancel(context.Background())
    return &BufferedClient{
        send:   make(chan []byte, bufferSize),
        ctx:    ctx,
        cancel: cancel,
    }
}

Reutilização de Buffers com sync.Pool

var bufferPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func sendEvent(w http.ResponseWriter, event string, data []byte) {
    buf := bufferPool.Get().(*bytes.Buffer)
    defer bufferPool.Put(buf)
    buf.Reset()

    buf.WriteString("event: ")
    buf.WriteString(event)
    buf.WriteString("\ndata: ")
    buf.Write(data)
    buf.WriteString("\n\n")

    w.Write(buf.Bytes())
}

Monitoramento de Conexões

type MetricsHub struct {
    Hub
    activeConnections int32
    totalMessages     int64
}

func (m *MetricsHub) Register(client *Client) {
    atomic.AddInt32(&m.activeConnections, 1)
    m.Hub.Register(client)
}

func (m *MetricsHub) Unregister(client *Client) {
    atomic.AddInt32(&m.activeConnections, -1)
    m.Hub.Unregister(client)
}

7. Exemplo Completo: Sistema de Notificações em Tempo Real

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
)

type Notification struct {
    Type    string `json:"type"`
    Message string `json:"message"`
    Time    string `json:"time"`
}

type SSEHandler struct {
    clients    map[chan []byte]bool
    register   chan chan []byte
    unregister chan chan []byte
    broadcast  chan []byte
}

func NewSSEHandler() *SSEHandler {
    return &SSEHandler{
        clients:    make(map[chan []byte]bool),
        register:   make(chan chan []byte),
        unregister: make(chan chan []byte),
        broadcast:  make(chan []byte),
    }
}

func (h *SSEHandler) Run() {
    for {
        select {
        case client := <-h.register:
            h.clients[client] = true
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                close(client)
            }
        case msg := <-h.broadcast:
            for client := range h.clients {
                select {
                case client <- msg:
                default:
                    close(client)
                    delete(h.clients, client)
                }
            }
        }
    }
}

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    messageChan := make(chan []byte, 10)
    h.register <- messageChan
    defer func() { h.unregister <- messageChan }()

    ctx := r.Context()
    eventID := 0

    for {
        select {
        case <-ctx.Done():
            return
        case msg := <-messageChan:
            eventID++
            fmt.Fprintf(w, "id: %d\n", eventID)
            w.Write(msg)
            fmt.Fprintf(w, "\n")
            flusher.Flush()
        case <-time.After(30 * time.Second):
            // Heartbeat para manter conexão
            fmt.Fprintf(w, ": heartbeat\n\n")
            flusher.Flush()
        }
    }
}

func main() {
    handler := NewSSEHandler()
    go handler.Run()

    http.Handle("/events", handler)

    // Endpoint para enviar notificações
    http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
        notif := Notification{
            Type:    "info",
            Message: "Sistema operacional",
            Time:    time.Now().Format(time.RFC3339),
        }

        data, _ := json.Marshal(notif)
        event := []byte(fmt.Sprintf("event: notification\ndata: %s", data))
        handler.broadcast <- event

        w.WriteHeader(http.StatusOK)
    })

    log.Println("Servidor SSE rodando em :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Cliente HTML/JS mínimo:

<!DOCTYPE html>
<html>
<body>
    <div id="events"></div>
    <script>
        const evtSource = new EventSource('/events');

        evtSource.addEventListener('notification', function(e) {
            const data = JSON.parse(e.data);
            const div = document.getElementById('events');
            div.innerHTML += `<p>[${data.type}] ${data.message} - ${data.time}</p>`;
        });

        evtSource.onerror = function(e) {
            console.log('Erro de conexão, reconectando...');
        };
    </script>
</body>
</html>

Referências