Channels assíncronos com Tokio
1. Introdução aos Channels no ecossistema Tokio
Channels são mecanismos de comunicação entre tasks assíncronas que permitem transferir dados de forma segura sem compartilhamento de estado. Em Rust, onde a segurança de memória é garantida em tempo de compilação, os channels do Tokio oferecem uma abstração poderosa para coordenar trabalho concorrente.
O Tokio fornece quatro tipos principais de channels:
oneshot: canal de uso único, ideal para enviar exatamente uma mensagemmpsc: múltiplos produtores, único consumidor, com buffer limitadobroadcast: um produtor envia para múltiplos consumidoreswatch: estado compartilhado que notifica mudanças
A escolha do canal depende do padrão de comunicação: resposta única, fluxo de dados, eventos ou estado observável.
2. Oneshot Channel: comunicação única entre tasks
O canal oneshot é o mais simples: envia exatamente um valor de uma task para outra.
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<String>();
tokio::spawn(async move {
// Simula processamento
tx.send("Resultado processado".to_string()).unwrap();
});
match rx.await {
Ok(msg) => println!("Recebido: {}", msg),
Err(e) => println!("Canal fechado: {:?}", e),
}
}
O método send() retorna Err se o receptor foi descartado. Já recv() retorna Err se o remetente foi descartado sem enviar. Isso permite detectar cancelamentos e falhas.
3. MPSC Channel: múltiplos produtores, único consumidor
O mpsc é ideal para cenários onde várias tasks produzem dados que uma única task consome.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<u32>(32);
// Múltiplos produtores
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
for j in 0..5 {
tx.send(i * 100 + j).await.unwrap();
}
});
}
// Consumidor único
while let Some(msg) = rx.recv().await {
println!("Processando: {}", msg);
}
}
O buffer limitado cria backpressure natural: se o consumidor for lento, os produtores aguardam em send().await. Isso evita crescimento descontrolado de memória.
4. Broadcast Channel: um para muitos
Broadcast permite que uma mensagem seja recebida por múltiplos receptores.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel::<String>(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
loop {
match rx1.recv().await {
Ok(msg) => println!("Receptor 1: {}", msg),
Err(broadcast::error::RecvError::Lagged(n)) => {
println!("Perdeu {} mensagens", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
tokio::spawn(async move {
loop {
match rx2.recv().await {
Ok(msg) => println!("Receptor 2: {}", msg),
Err(broadcast::error::RecvError::Lagged(n)) => {
println!("Perdeu {} mensagens", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
tx.send("Evento importante".to_string()).unwrap();
tx.send("Outro evento".to_string()).unwrap();
drop(tx); // Fecha o canal
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
O RecvError::Lagged indica que o receptor era mais lento que o produtor e perdeu mensagens. Use resubscribe() para recomeçar do valor mais recente.
5. Watch Channel: estado compartilhado observável
Watch é diferente de broadcast: retém apenas o último valor e notifica mudanças.
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx, mut rx) = watch::channel::<u32>(0);
tokio::spawn(async move {
loop {
rx.changed().await.unwrap();
println!("Estado atual: {}", *rx.borrow());
}
});
for i in 1..=5 {
tx.send(i).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
Ao contrário do broadcast, quem se inscreve tarde recebe imediatamente o último valor via borrow(). É perfeito para configurações dinâmicas ou sinalização de término.
6. Padrões avançados e boas práticas
Combinando channels
use tokio::sync::{mpsc, oneshot};
struct Tarefa {
dados: String,
resposta: oneshot::Sender<String>,
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Tarefa>(100);
tokio::spawn(async move {
while let Some(tarefa) = rx.recv().await {
let resultado = format!("Processado: {}", tarefa.dados);
let _ = tarefa.resposta.send(resultado);
}
});
let (resposta_tx, resposta_rx) = oneshot::channel();
tx.send(Tarefa {
dados: "dado1".to_string(),
resposta: resposta_tx,
}).await.unwrap();
if let Ok(res) = resposta_rx.await {
println!("Resposta: {}", res);
}
}
Multiplexação com select!
use tokio::select;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel::<u32>(32);
let (tx2, mut rx2) = mpsc::channel::<u32>(32);
tokio::spawn(async move {
loop {
select! {
msg = rx1.recv() => {
if let Some(m) = msg {
println!("Canal 1: {}", m);
}
}
msg = rx2.recv() => {
if let Some(m) = msg {
println!("Canal 2: {}", m);
}
}
else => break,
}
}
});
tx1.send(10).await.unwrap();
tx2.send(20).await.unwrap();
drop(tx1);
drop(tx2);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
7. Exemplo prático: sistema de processamento de tarefas
use tokio::sync::{mpsc, oneshot, watch};
use std::time::Duration;
#[tokio::main]
async fn main() {
let (task_tx, task_rx) = mpsc::channel::<String>(100);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (result_tx, mut result_rx) = mpsc::channel::<String>(100);
// Workers
for id in 0..3 {
let task_rx = task_rx.clone();
let mut shutdown_rx = shutdown_rx.clone();
let result_tx = result_tx.clone();
tokio::spawn(async move {
loop {
select! {
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
println!("Worker {} finalizando", id);
break;
}
}
task = task_rx.recv() => {
match task {
Some(t) => {
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = result_tx.send(format!("Worker {}: {}", id, t)).await;
}
None => break,
}
}
}
}
});
}
// Produtor de tarefas
for i in 0..10 {
task_tx.send(format!("Tarefa {}", i)).await.unwrap();
}
drop(task_tx); // Fecha o canal para workers detectarem fim
// Coleta resultados
while let Some(result) = result_rx.recv().await {
println!("{}", result);
}
// Shutdown graceful
shutdown_tx.send(true).unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
}
8. Considerações de desempenho e alternativas
Cada tipo de canal tem overhead diferente:
oneshot: mínimo, apenas uma alocaçãompsc: moderado, buffer lock-free para produtoresbroadcast: moderado, cada receptor consome espaço no bufferwatch: leve, sem buffer e sem alocação por mensagem
Channels síncronos (std::sync::mpsc) bloqueiam a thread atual, o que é inadequado para código assíncrono. crossbeam-channel oferece canais mais rápidos que os do Tokio, mas sem integração com .await.
Evite channels quando:
- Tasks compartilham estado mutável frequentemente (prefira
Arc<Mutex<T>>) - A comunicação é unidirecional e simples (use
tokio::task::spawn_blockingpara CPU-bound) - O volume de mensagens é muito alto e o overhead do canal se torna gargalo
Channels assíncronos do Tokio são ferramentas poderosas para construir sistemas concorrentes seguros e eficientes. A escolha correta do tipo de canal, combinada com boas práticas de tratamento de erros e gerenciamento de recursos, permite criar arquiteturas robustas e escaláveis.
Referências
- Documentação oficial do Tokio sobre Channels — Referência completa de todos os tipos de channels no Tokio, com exemplos e detalhes de API
- Tokio Tutorial: Channels — Tutorial oficial do Tokio cobrindo conceitos fundamentais de comunicação entre tasks
- Rust Async Book: Channels — Capítulo do livro oficial sobre programação assíncrona em Rust, abordando canais
- Tokio MPSC Channel Deep Dive — Análise detalhada dos internals do MPSC channel do Tokio, incluindo implementação lock-free
- Effective Rust Async: Channel Patterns — Padrões avançados de uso de channels em Rust assíncrono, com exemplos práticos