Concorrência com o pacote io

1. Introdução à Concorrência com I/O em Go

O modelo de concorrência de Go, baseado em goroutines e canais, é particularmente poderoso quando combinado com operações de I/O. O pacote io fornece interfaces fundamentais como Reader e Writer que são naturalmente compatíveis com esse modelo.

Operações de I/O são inerentemente bloqueantes — ler de um arquivo ou socket exige espera por dados. Em linguagens tradicionais, isso significa bloquear uma thread do sistema operacional. Em Go, uma goroutine bloqueada em I/O não bloqueia a thread do SO, permitindo que milhares de operações concorrentes coexistam eficientemente.

A interface io.Reader define o método Read(p []byte) (n int, err error), enquanto io.Writer define Write(p []byte) (n int, err error). Essas interfaces são implementadas por arquivos, conexões de rede, buffers e muitos outros tipos, tornando-as ideais para padrões concorrentes.

2. Lendo Múltiplos Arquivos Concorrentemente

Um caso clássico é processar múltiplos arquivos em paralelo. Vamos ler vários arquivos de log simultaneamente:

package main

import (
    "fmt"
    "io"
    "os"
    "sync"
)

func readFile(filename string, wg *sync.WaitGroup, results chan<- string) {
    defer wg.Done()

    file, err := os.Open(filename)
    if err != nil {
        results <- fmt.Sprintf("Erro ao abrir %s: %v", filename, err)
        return
    }
    defer file.Close()

    data, err := io.ReadAll(file)
    if err != nil {
        results <- fmt.Sprintf("Erro ao ler %s: %v", filename, err)
        return
    }

    results <- fmt.Sprintf("Arquivo %s: %d bytes lidos", filename, len(data))
}

func main() {
    files := []string{"log1.txt", "log2.txt", "log3.txt", "log4.txt"}
    results := make(chan string, len(files))
    var wg sync.WaitGroup

    for _, file := range files {
        wg.Add(1)
        go readFile(file, &wg, results)
    }

    wg.Wait()
    close(results)

    for result := range results {
        fmt.Println(result)
    }
}

3. Escrevendo com Concorrência e Sincronização

Escrita concorrente requer cuidado. Usando sync.Mutex para proteger escritas concorrentes em um mesmo destino:

type SafeWriter struct {
    mu     sync.Mutex
    writer io.Writer
}

func (sw *SafeWriter) Write(p []byte) (n int, err error) {
    sw.mu.Lock()
    defer sw.mu.Unlock()
    return sw.writer.Write(p)
}

func writeConcurrent(data []string, writer io.Writer) {
    safeWriter := &SafeWriter{writer: writer}
    var wg sync.WaitGroup

    for _, line := range data {
        wg.Add(1)
        go func(l string) {
            defer wg.Done()
            safeWriter.Write([]byte(l + "\n"))
        }(line)
    }

    wg.Wait()
}

Para enviar dados para múltiplos destinos simultaneamente, io.MultiWriter é útil:

func broadcastToWriters(data []byte, writers ...io.Writer) {
    multiWriter := io.MultiWriter(writers...)
    multiWriter.Write(data)
}

4. Streams Concorrentes com io.Pipe

O pacote io.Pipe cria um pipeline síncrono entre goroutines. io.PipeReader e io.PipeWriter permitem conectar produtores e consumidores concorrentemente:

func compressAndEncrypt(data []byte) ([]byte, error) {
    pr, pw := io.Pipe()
    defer pr.Close()

    var result bytes.Buffer

    go func() {
        defer pw.Close()
        // Simula compressão e criptografia
        compressed := compressData(data)
        pw.Write(compressed)
    }()

    io.Copy(&result, pr)
    return result.Bytes(), nil
}

func processStreams() {
    pr, pw := io.Pipe()

    // Produtor
    go func() {
        defer pw.Close()
        for i := 0; i < 10; i++ {
            pw.Write([]byte(fmt.Sprintf("pacote %d\n", i)))
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Consumidor
    scanner := bufio.NewScanner(pr)
    for scanner.Scan() {
        fmt.Println("Recebido:", scanner.Text())
    }
}

5. Limitação de Concorrência com Workers

Controlar o número de operações I/O simultâneas evita sobrecarga de recursos:

type WorkerPool struct {
    sem chan struct{}
    wg  sync.WaitGroup
}

func NewWorkerPool(maxWorkers int) *WorkerPool {
    return &WorkerPool{
        sem: make(chan struct{}, maxWorkers),
    }
}

func (wp *WorkerPool) Process(files []string) {
    for _, file := range files {
        wp.sem <- struct{}{} // Adquire slot
        wp.wg.Add(1)

        go func(filename string) {
            defer wp.wg.Done()
            defer func() { <-wp.sem }() // Libera slot

            // Processa arquivo
            data, err := os.ReadFile(filename)
            if err != nil {
                log.Printf("Erro: %v", err)
                return
            }
            processData(data)
        }(file)
    }

    wp.wg.Wait()
}

func main() {
    pool := NewWorkerPool(3) // Máximo 3 operações I/O simultâneas
    files := []string{"file1.dat", "file2.dat", "file3.dat", "file4.dat"}
    pool.Process(files)
}

6. Combinação com io.Copy e io.TeeReader em Paralelo

io.TeeReader permite duplicar um stream para processamento paralelo:

func processWithLogging(reader io.Reader) {
    var logBuffer bytes.Buffer
    var processBuffer bytes.Buffer

    // TeeReader envia dados para ambos os buffers
    teeReader := io.TeeReader(reader, &logBuffer)

    var wg sync.WaitGroup
    wg.Add(2)

    // Goroutine 1: processamento
    go func() {
        defer wg.Done()
        io.Copy(&processBuffer, teeReader)
    }()

    // Goroutine 2: logging (recebe os mesmos dados via TeeReader)
    go func() {
        defer wg.Done()
        io.Copy(os.Stdout, &logBuffer)
    }()

    wg.Wait()
}

io.Copy concorrente para transferência simultânea:

func concurrentCopy(sources []io.Reader, dest io.Writer) {
    var wg sync.WaitGroup
    mu := &sync.Mutex{}

    for _, src := range sources {
        wg.Add(1)
        go func(r io.Reader) {
            defer wg.Done()
            mu.Lock()
            defer mu.Unlock()
            io.Copy(dest, r)
        }(src)
    }

    wg.Wait()
}

7. Tratamento de Erros e Cancelamento em I/O Concorrente

Usando context.Context para cancelamento e timeouts:

func readWithTimeout(ctx context.Context, readers []io.Reader) error {
    errChan := make(chan error, len(readers))

    for i, reader := range readers {
        go func(id int, r io.Reader) {
            data, err := io.ReadAll(r)
            if err != nil {
                errChan <- fmt.Errorf("reader %d: %w", id, err)
                return
            }
            processData(data)
            errChan <- nil
        }(i, reader)
    }

    for range readers {
        select {
        case err := <-errChan:
            if err != nil {
                return err
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}

// Uso com timeout de 5 segundos
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    readers := []io.Reader{/* ... */ }
    if err := readWithTimeout(ctx, readers); err != nil {
        log.Fatal(err)
    }
}

8. Boas Práticas e Padrões Avançados

Evitando Race Conditions:
- Sempre sincronize acesso a io.Writer compartilhados com mutex
- Use canais para comunicação entre goroutines, não memória compartilhada
- Prefira io.Pipe para pipelines síncronos entre goroutines

Quando usar concorrência:
- I/O de rede: quase sempre beneficia-se de concorrência
- I/O de disco: depende do hardware (SSDs se beneficiam mais que HDs)
- Evite concorrência excessiva que cause thrashing de cache ou contenção de lock

Benchmarking:

func BenchmarkConcurrentRead(b *testing.B) {
    for i := 0; i < b.N; i++ {
        // Seu código concorrente aqui
    }
}

Profiling:

go test -bench=. -cpuprofile=cpu.prof
go tool pprof -http=:8080 cpu.prof

A concorrência com o pacote io em Go oferece um modelo elegante e eficiente para operações de I/O. Com goroutines leves, canais para sincronização e as interfaces padronizadas do pacote io, você pode construir sistemas de processamento de dados altamente concorrentes e escaláveis. Lembre-se sempre de balancear concorrência com consumo de recursos e tratar erros adequadamente para criar aplicações robustas.

Referências