Channels assíncronos com Tokio

1. Introdução aos Channels no ecossistema Tokio

Channels são mecanismos de comunicação entre tasks assíncronas que permitem transferir dados de forma segura sem compartilhamento de estado. Em Rust, onde a segurança de memória é garantida em tempo de compilação, os channels do Tokio oferecem uma abstração poderosa para coordenar trabalho concorrente.

O Tokio fornece quatro tipos principais de channels:

  • oneshot: canal de uso único, ideal para enviar exatamente uma mensagem
  • mpsc: múltiplos produtores, único consumidor, com buffer limitado
  • broadcast: um produtor envia para múltiplos consumidores
  • watch: estado compartilhado que notifica mudanças

A escolha do canal depende do padrão de comunicação: resposta única, fluxo de dados, eventos ou estado observável.

2. Oneshot Channel: comunicação única entre tasks

O canal oneshot é o mais simples: envia exatamente um valor de uma task para outra.

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        // Simula processamento
        tx.send("Resultado processado".to_string()).unwrap();
    });

    match rx.await {
        Ok(msg) => println!("Recebido: {}", msg),
        Err(e) => println!("Canal fechado: {:?}", e),
    }
}

O método send() retorna Err se o receptor foi descartado. Já recv() retorna Err se o remetente foi descartado sem enviar. Isso permite detectar cancelamentos e falhas.

3. MPSC Channel: múltiplos produtores, único consumidor

O mpsc é ideal para cenários onde várias tasks produzem dados que uma única task consome.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(32);

    // Múltiplos produtores
    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for j in 0..5 {
                tx.send(i * 100 + j).await.unwrap();
            }
        });
    }

    // Consumidor único
    while let Some(msg) = rx.recv().await {
        println!("Processando: {}", msg);
    }
}

O buffer limitado cria backpressure natural: se o consumidor for lento, os produtores aguardam em send().await. Isso evita crescimento descontrolado de memória.

4. Broadcast Channel: um para muitos

Broadcast permite que uma mensagem seja recebida por múltiplos receptores.

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<String>(16);
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        loop {
            match rx1.recv().await {
                Ok(msg) => println!("Receptor 1: {}", msg),
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    println!("Perdeu {} mensagens", n);
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    });

    tokio::spawn(async move {
        loop {
            match rx2.recv().await {
                Ok(msg) => println!("Receptor 2: {}", msg),
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    println!("Perdeu {} mensagens", n);
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    });

    tx.send("Evento importante".to_string()).unwrap();
    tx.send("Outro evento".to_string()).unwrap();
    drop(tx); // Fecha o canal
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

O RecvError::Lagged indica que o receptor era mais lento que o produtor e perdeu mensagens. Use resubscribe() para recomeçar do valor mais recente.

5. Watch Channel: estado compartilhado observável

Watch é diferente de broadcast: retém apenas o último valor e notifica mudanças.

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel::<u32>(0);

    tokio::spawn(async move {
        loop {
            rx.changed().await.unwrap();
            println!("Estado atual: {}", *rx.borrow());
        }
    });

    for i in 1..=5 {
        tx.send(i).unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}

Ao contrário do broadcast, quem se inscreve tarde recebe imediatamente o último valor via borrow(). É perfeito para configurações dinâmicas ou sinalização de término.

6. Padrões avançados e boas práticas

Combinando channels

use tokio::sync::{mpsc, oneshot};

struct Tarefa {
    dados: String,
    resposta: oneshot::Sender<String>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Tarefa>(100);

    tokio::spawn(async move {
        while let Some(tarefa) = rx.recv().await {
            let resultado = format!("Processado: {}", tarefa.dados);
            let _ = tarefa.resposta.send(resultado);
        }
    });

    let (resposta_tx, resposta_rx) = oneshot::channel();
    tx.send(Tarefa {
        dados: "dado1".to_string(),
        resposta: resposta_tx,
    }).await.unwrap();

    if let Ok(res) = resposta_rx.await {
        println!("Resposta: {}", res);
    }
}

Multiplexação com select!

use tokio::select;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<u32>(32);
    let (tx2, mut rx2) = mpsc::channel::<u32>(32);

    tokio::spawn(async move {
        loop {
            select! {
                msg = rx1.recv() => {
                    if let Some(m) = msg {
                        println!("Canal 1: {}", m);
                    }
                }
                msg = rx2.recv() => {
                    if let Some(m) = msg {
                        println!("Canal 2: {}", m);
                    }
                }
                else => break,
            }
        }
    });

    tx1.send(10).await.unwrap();
    tx2.send(20).await.unwrap();
    drop(tx1);
    drop(tx2);
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

7. Exemplo prático: sistema de processamento de tarefas

use tokio::sync::{mpsc, oneshot, watch};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (task_tx, task_rx) = mpsc::channel::<String>(100);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let (result_tx, mut result_rx) = mpsc::channel::<String>(100);

    // Workers
    for id in 0..3 {
        let task_rx = task_rx.clone();
        let mut shutdown_rx = shutdown_rx.clone();
        let result_tx = result_tx.clone();

        tokio::spawn(async move {
            loop {
                select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            println!("Worker {} finalizando", id);
                            break;
                        }
                    }
                    task = task_rx.recv() => {
                        match task {
                            Some(t) => {
                                tokio::time::sleep(Duration::from_millis(100)).await;
                                let _ = result_tx.send(format!("Worker {}: {}", id, t)).await;
                            }
                            None => break,
                        }
                    }
                }
            }
        });
    }

    // Produtor de tarefas
    for i in 0..10 {
        task_tx.send(format!("Tarefa {}", i)).await.unwrap();
    }
    drop(task_tx); // Fecha o canal para workers detectarem fim

    // Coleta resultados
    while let Some(result) = result_rx.recv().await {
        println!("{}", result);
    }

    // Shutdown graceful
    shutdown_tx.send(true).unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;
}

8. Considerações de desempenho e alternativas

Cada tipo de canal tem overhead diferente:

  • oneshot: mínimo, apenas uma alocação
  • mpsc: moderado, buffer lock-free para produtores
  • broadcast: moderado, cada receptor consome espaço no buffer
  • watch: leve, sem buffer e sem alocação por mensagem

Channels síncronos (std::sync::mpsc) bloqueiam a thread atual, o que é inadequado para código assíncrono. crossbeam-channel oferece canais mais rápidos que os do Tokio, mas sem integração com .await.

Evite channels quando:

  • Tasks compartilham estado mutável frequentemente (prefira Arc<Mutex<T>>)
  • A comunicação é unidirecional e simples (use tokio::task::spawn_blocking para CPU-bound)
  • O volume de mensagens é muito alto e o overhead do canal se torna gargalo

Channels assíncronos do Tokio são ferramentas poderosas para construir sistemas concorrentes seguros e eficientes. A escolha correta do tipo de canal, combinada com boas práticas de tratamento de erros e gerenciamento de recursos, permite criar arquiteturas robustas e escaláveis.

Referências