Concorrência com o pacote io
1. Introdução à Concorrência com I/O em Go
O modelo de concorrência de Go, baseado em goroutines e canais, é particularmente poderoso quando combinado com operações de I/O. O pacote io fornece interfaces fundamentais como Reader e Writer que são naturalmente compatíveis com esse modelo.
Operações de I/O são inerentemente bloqueantes — ler de um arquivo ou socket exige espera por dados. Em linguagens tradicionais, isso significa bloquear uma thread do sistema operacional. Em Go, uma goroutine bloqueada em I/O não bloqueia a thread do SO, permitindo que milhares de operações concorrentes coexistam eficientemente.
A interface io.Reader define o método Read(p []byte) (n int, err error), enquanto io.Writer define Write(p []byte) (n int, err error). Essas interfaces são implementadas por arquivos, conexões de rede, buffers e muitos outros tipos, tornando-as ideais para padrões concorrentes.
2. Lendo Múltiplos Arquivos Concorrentemente
Um caso clássico é processar múltiplos arquivos em paralelo. Vamos ler vários arquivos de log simultaneamente:
package main
import (
"fmt"
"io"
"os"
"sync"
)
func readFile(filename string, wg *sync.WaitGroup, results chan<- string) {
defer wg.Done()
file, err := os.Open(filename)
if err != nil {
results <- fmt.Sprintf("Erro ao abrir %s: %v", filename, err)
return
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
results <- fmt.Sprintf("Erro ao ler %s: %v", filename, err)
return
}
results <- fmt.Sprintf("Arquivo %s: %d bytes lidos", filename, len(data))
}
func main() {
files := []string{"log1.txt", "log2.txt", "log3.txt", "log4.txt"}
results := make(chan string, len(files))
var wg sync.WaitGroup
for _, file := range files {
wg.Add(1)
go readFile(file, &wg, results)
}
wg.Wait()
close(results)
for result := range results {
fmt.Println(result)
}
}
3. Escrevendo com Concorrência e Sincronização
Escrita concorrente requer cuidado. Usando sync.Mutex para proteger escritas concorrentes em um mesmo destino:
type SafeWriter struct {
mu sync.Mutex
writer io.Writer
}
func (sw *SafeWriter) Write(p []byte) (n int, err error) {
sw.mu.Lock()
defer sw.mu.Unlock()
return sw.writer.Write(p)
}
func writeConcurrent(data []string, writer io.Writer) {
safeWriter := &SafeWriter{writer: writer}
var wg sync.WaitGroup
for _, line := range data {
wg.Add(1)
go func(l string) {
defer wg.Done()
safeWriter.Write([]byte(l + "\n"))
}(line)
}
wg.Wait()
}
Para enviar dados para múltiplos destinos simultaneamente, io.MultiWriter é útil:
func broadcastToWriters(data []byte, writers ...io.Writer) {
multiWriter := io.MultiWriter(writers...)
multiWriter.Write(data)
}
4. Streams Concorrentes com io.Pipe
O pacote io.Pipe cria um pipeline síncrono entre goroutines. io.PipeReader e io.PipeWriter permitem conectar produtores e consumidores concorrentemente:
func compressAndEncrypt(data []byte) ([]byte, error) {
pr, pw := io.Pipe()
defer pr.Close()
var result bytes.Buffer
go func() {
defer pw.Close()
// Simula compressão e criptografia
compressed := compressData(data)
pw.Write(compressed)
}()
io.Copy(&result, pr)
return result.Bytes(), nil
}
func processStreams() {
pr, pw := io.Pipe()
// Produtor
go func() {
defer pw.Close()
for i := 0; i < 10; i++ {
pw.Write([]byte(fmt.Sprintf("pacote %d\n", i)))
time.Sleep(100 * time.Millisecond)
}
}()
// Consumidor
scanner := bufio.NewScanner(pr)
for scanner.Scan() {
fmt.Println("Recebido:", scanner.Text())
}
}
5. Limitação de Concorrência com Workers
Controlar o número de operações I/O simultâneas evita sobrecarga de recursos:
type WorkerPool struct {
sem chan struct{}
wg sync.WaitGroup
}
func NewWorkerPool(maxWorkers int) *WorkerPool {
return &WorkerPool{
sem: make(chan struct{}, maxWorkers),
}
}
func (wp *WorkerPool) Process(files []string) {
for _, file := range files {
wp.sem <- struct{}{} // Adquire slot
wp.wg.Add(1)
go func(filename string) {
defer wp.wg.Done()
defer func() { <-wp.sem }() // Libera slot
// Processa arquivo
data, err := os.ReadFile(filename)
if err != nil {
log.Printf("Erro: %v", err)
return
}
processData(data)
}(file)
}
wp.wg.Wait()
}
func main() {
pool := NewWorkerPool(3) // Máximo 3 operações I/O simultâneas
files := []string{"file1.dat", "file2.dat", "file3.dat", "file4.dat"}
pool.Process(files)
}
6. Combinação com io.Copy e io.TeeReader em Paralelo
io.TeeReader permite duplicar um stream para processamento paralelo:
func processWithLogging(reader io.Reader) {
var logBuffer bytes.Buffer
var processBuffer bytes.Buffer
// TeeReader envia dados para ambos os buffers
teeReader := io.TeeReader(reader, &logBuffer)
var wg sync.WaitGroup
wg.Add(2)
// Goroutine 1: processamento
go func() {
defer wg.Done()
io.Copy(&processBuffer, teeReader)
}()
// Goroutine 2: logging (recebe os mesmos dados via TeeReader)
go func() {
defer wg.Done()
io.Copy(os.Stdout, &logBuffer)
}()
wg.Wait()
}
io.Copy concorrente para transferência simultânea:
func concurrentCopy(sources []io.Reader, dest io.Writer) {
var wg sync.WaitGroup
mu := &sync.Mutex{}
for _, src := range sources {
wg.Add(1)
go func(r io.Reader) {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
io.Copy(dest, r)
}(src)
}
wg.Wait()
}
7. Tratamento de Erros e Cancelamento em I/O Concorrente
Usando context.Context para cancelamento e timeouts:
func readWithTimeout(ctx context.Context, readers []io.Reader) error {
errChan := make(chan error, len(readers))
for i, reader := range readers {
go func(id int, r io.Reader) {
data, err := io.ReadAll(r)
if err != nil {
errChan <- fmt.Errorf("reader %d: %w", id, err)
return
}
processData(data)
errChan <- nil
}(i, reader)
}
for range readers {
select {
case err := <-errChan:
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// Uso com timeout de 5 segundos
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
readers := []io.Reader{/* ... */ }
if err := readWithTimeout(ctx, readers); err != nil {
log.Fatal(err)
}
}
8. Boas Práticas e Padrões Avançados
Evitando Race Conditions:
- Sempre sincronize acesso a io.Writer compartilhados com mutex
- Use canais para comunicação entre goroutines, não memória compartilhada
- Prefira io.Pipe para pipelines síncronos entre goroutines
Quando usar concorrência:
- I/O de rede: quase sempre beneficia-se de concorrência
- I/O de disco: depende do hardware (SSDs se beneficiam mais que HDs)
- Evite concorrência excessiva que cause thrashing de cache ou contenção de lock
Benchmarking:
func BenchmarkConcurrentRead(b *testing.B) {
for i := 0; i < b.N; i++ {
// Seu código concorrente aqui
}
}
Profiling:
go test -bench=. -cpuprofile=cpu.prof
go tool pprof -http=:8080 cpu.prof
A concorrência com o pacote io em Go oferece um modelo elegante e eficiente para operações de I/O. Com goroutines leves, canais para sincronização e as interfaces padronizadas do pacote io, você pode construir sistemas de processamento de dados altamente concorrentes e escaláveis. Lembre-se sempre de balancear concorrência com consumo de recursos e tratar erros adequadamente para criar aplicações robustas.
Referências
- Documentação oficial do pacote io — Referência completa das interfaces Reader, Writer e funções auxiliares como Pipe, MultiWriter e TeeReader
- Go Concurrency Patterns: Pipelines and cancellation — Artigo oficial da equipe Go sobre padrões de pipeline com canais e cancelamento
- Advanced Go Concurrency Patterns — Palestra de Sameer Ajmani sobre padrões avançados de concorrência em Go
- Understanding io.Pipe in Go — Tutorial prático explicando o uso de io.Pipe para criar pipelines entre goroutines
- Go: Concurrency and the io package — Artigo técnico sobre combinação de concorrência com operações de I/O em Go
- Context package documentation — Documentação oficial do pacote context para cancelamento e timeouts em operações concorrentes
- Go Race Detector — Guia oficial sobre detecção de race conditions em programas Go concorrentes