Projeto final: CLI de alta performance com Tokio e tratamento de erros robusto

1. Visão geral e arquitetura do projeto

Este projeto final consiste em uma CLI (Command Line Interface) em Rust que processa arquivos de forma assíncrona, utilizando concorrência controlada para máxima performance. O objetivo é construir uma ferramenta de linha de comando que leia múltiplos arquivos, execute transformações e gere saídas formatadas — tudo com tratamento de erros robusto e feedback visual ao usuário.

A arquitetura é modular, dividida em:
- Módulo de entrada: parsing de argumentos com clap
- Módulo de processamento: tasks assíncronas com tokio
- Módulo de erros: tipos customizados com thiserror e contexto com anyhow
- Módulo de saída: logs com tracing e progresso com indicatif

Fluxo: entrada do usuário → parsing → execução concorrente → saída formatada.

2. Configuração do ambiente e parsing de argumentos com Clap

Primeiro, adicionamos as dependências no Cargo.toml:

[dependencies]
clap = { version = "4.5", features = ["derive"] }
tokio = { version = "1.40", features = ["full"] }
thiserror = "1.0"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
indicatif = "0.17"
colored = "2.1"
tokio-stream = "0.1"

Definimos a estrutura da CLI com clap::Parser:

use clap::{Parser, Subcommand, ValueEnum};

#[derive(Parser)]
#[command(name = "file-processor")]
#[command(about = "Processa arquivos de forma assíncrona e concorrente")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Processa arquivos em lote
    Process {
        /// Caminhos dos arquivos a processar
        #[arg(required = true, num_args = 1..)]
        files: Vec<String>,

        /// Número máximo de tarefas concorrentes
        #[arg(short = 'c', long = "concurrency", default_value_t = 4)]
        concurrency: usize,

        /// Modo de saída
        #[arg(short = 'o', long = "output", default_value = "stdout")]
        output: OutputMode,
    },
}

#[derive(ValueEnum, Clone)]
enum OutputMode {
    Stdout,
    File,
}

Validamos os argumentos com um custom validator:

fn validate_concurrency(val: &str) -> Result<usize, String> {
    let n: usize = val.parse().map_err(|_| "Deve ser um número".to_string())?;
    if n < 1 || n > 100 {
        return Err("Concorrência deve estar entre 1 e 100".to_string());
    }
    Ok(n)
}

3. Implementação do runtime Tokio e tarefas assíncronas

A inicialização do runtime é feita com #[tokio::main]:

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter("info")
        .init();

    let cli = Cli::parse();

    match cli.command {
        Commands::Process { files, concurrency, output } => {
            run_processing(files, concurrency, output).await?;
        }
    }

    Ok(())
}

Usamos um Semaphore para limitar a concorrência:

use tokio::sync::Semaphore;
use std::sync::Arc;

async fn run_processing(
    files: Vec<String>,
    concurrency: usize,
    output: OutputMode,
) -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(concurrency));
    let mut handles = vec![];

    for file in files {
        let permit = semaphore.clone().acquire_owned().await?;
        let handle = tokio::spawn(async move {
            let _permit = permit;
            process_file(&file).await
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await??;
    }

    Ok(())
}

4. Tratamento de erros robusto com thiserror e anyhow

Definimos erros customizados com thiserror:

use thiserror::Error;

#[derive(Error, Debug)]
pub enum ProcessingError {
    #[error("Erro de I/O no arquivo {path}: {source}")]
    IoError {
        path: String,
        #[source]
        source: std::io::Error,
    },
    #[error("Timeout ao processar {path} após {seconds}s")]
    Timeout {
        path: String,
        seconds: u64,
    },
    #[error("Formato inválido no arquivo {path}: {detail}")]
    ParseError {
        path: String,
        detail: String,
    },
}

Usamos anyhow::Context para enriquecer erros:

async fn process_file(path: &str) -> anyhow::Result<String> {
    let content = tokio::fs::read_to_string(path)
        .await
        .with_context(|| format!("Falha ao ler arquivo: {}", path))?;

    // Processamento com timeout
    let result = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        transform_content(&content),
    )
    .await
    .map_err(|_| anyhow::anyhow!("Timeout ao processar {}", path))??;

    Ok(result)
}

Propagação de erros entre tasks:

let handle = tokio::spawn(async move {
    process_file(&file).await
});

match handle.await {
    Ok(Ok(result)) => println!("Sucesso: {}", result),
    Ok(Err(e)) => eprintln!("Erro: {:?}", e),
    Err(join_err) => eprintln!("Task panicked: {}", join_err),
}

5. Operações de I/O assíncronas e streaming de dados

Leitura eficiente com AsyncBufRead:

use tokio::io::AsyncBufReadExt;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::LinesStream;

async fn process_large_file(path: &str) -> anyhow::Result<Vec<String>> {
    let file = tokio::fs::File::open(path).await?;
    let reader = tokio::io::BufReader::new(file);
    let mut lines = LinesStream::new(reader.lines());

    let mut results = Vec::new();
    while let Some(line) = lines.next().await {
        let line = line?;
        let processed = transform_line(&line)?;
        results.push(processed);
    }

    Ok(results)
}

Timeout em operações longas:

async fn safe_process(path: &str) -> anyhow::Result<String> {
    let future = process_file(path);
    match tokio::time::timeout(Duration::from_secs(60), future).await {
        Ok(result) => result,
        Err(_) => Err(anyhow::anyhow!("Operação cancelada por timeout")),
    }
}

6. Logging e feedback ao usuário

Configuração de logs com tracing:

use tracing::{info, warn, error};

async fn run_processing(/* ... */) -> anyhow::Result<()> {
    info!("Iniciando processamento de {} arquivos", files.len());

    for (i, file) in files.iter().enumerate() {
        info!("Processando {}/{}: {}", i + 1, files.len(), file);
        match process_file(file).await {
            Ok(result) => {
                info!("Arquivo {} processado com sucesso", file);
                println!("{}", result.green());
            }
            Err(e) => {
                warn!("Erro ao processar {}: {:?}", file, e);
                eprintln!("{} {}", "Erro:".red(), e);
            }
        }
    }
}

Barra de progresso com indicatif:

use indicatif::{ProgressBar, ProgressStyle};

fn create_progress_bar(total: u64) -> ProgressBar {
    let pb = ProgressBar::new(total);
    pb.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")
            .unwrap()
            .progress_chars("#>-"),
    );
    pb
}

7. Testes e integração contínua

Testes unitários assíncronos com #[tokio::test]:

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_process_file_success() {
        let result = process_file("test_data/sample.txt").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_timeout_handling() {
        let result = tokio::time::timeout(
            Duration::from_millis(1),
            process_file("test_data/large.txt"),
        ).await;
        assert!(result.is_err());
    }
}

Testes de integração simulando entrada/saída:

#[tokio::test]
async fn test_cli_integration() {
    let output = std::process::Command::new("cargo")
        .args(&["run", "--", "process", "test.txt"])
        .output()
        .expect("Falha ao executar CLI");
    assert!(output.status.success());
}

Configuração de CI (.github/workflows/ci.yml):

name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions-rs/toolchain@v1
        with:
          toolchain: stable
      - run: cargo test
      - run: cargo clippy -- -D warnings
      - run: cargo audit

8. Otimizações finais e boas práticas

Ajuste de buffer size e pool de threads:

#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() {
    // ...
}

async fn read_with_buffer(path: &str) -> anyhow::Result<String> {
    let file = tokio::fs::File::open(path).await?;
    let mut reader = tokio::io::BufReader::with_capacity(64 * 1024, file); // 64KB buffer
    let mut content = String::new();
    reader.read_to_string(&mut content).await?;
    Ok(content)
}

Tratamento de sinais do sistema:

use tokio::signal;

async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("Falha ao instalar handler de Ctrl+C");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Falha ao instalar handler de SIGTERM")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    println!("Desligamento solicitado, finalizando tarefas...");
}

Documentação completa do CLI:

$ cargo run -- --help
file-processor 0.1.0
Processa arquivos de forma assíncrona e concorrente

USAGE:
    file-processor <COMMAND>

COMMANDS:
    process    Processa arquivos em lote
    help       Print this message or the help of the given subcommand(s)

$ cargo run -- process --help
Processa arquivos em lote

USAGE:
    file-processor process [OPTIONS] <FILES>...

ARGS:
    <FILES>...    Caminhos dos arquivos a processar

OPTIONS:
    -c, --concurrency <CONCURRENCY>    Número máximo de tarefas concorrentes [default: 4]
    -o, --output <OUTPUT>              Modo de saída [default: stdout] [possible values: stdout, file]
    -h, --help                         Print help

Referências