Como usar o Flink para processamento de eventos em tempo real

O Apache Flink é uma plataforma de processamento de streams distribuída e de código aberto, desenvolvida originalmente na Universidade Técnica de Berlim como parte do projeto Stratosphere. Lançado como projeto de nível superior da Apache Software Foundation em 2015, o Flink se destaca por oferecer processamento de dados em tempo real com baixa latência, alta throughput e garantias de consistência rigorosas.

No contexto de processamento de eventos, três conceitos são fundamentais:

  • Eventos: unidades atômicas de dados que representam algo que ocorreu em um sistema (uma transação, um clique, uma leitura de sensor).
  • Streams: sequências ordenadas e potencialmente infinitas de eventos.
  • Tempo de evento vs tempo de processamento: o tempo de evento é quando o fato realmente ocorreu (timestamp embutido no dado), enquanto o tempo de processamento é quando o Flink processa o evento. O Flink oferece suporte nativo a ambos, sendo o tempo de evento essencial para aplicações que exigem ordenação correta.

Comparado a outras ferramentas, o Flink oferece vantagens significativas: ao contrário do Spark Streaming (que opera em micro-batches), o Flink processa evento por evento, resultando em latências menores. Diferente do Kafka Streams (biblioteca embutida no Kafka), o Flink possui gerenciamento de estado mais sofisticado e suporte a windowing complexo. Em relação ao Storm, o Flink oferece garantias exactly-once mais robustas e API mais expressiva.

A arquitetura do Flink é composta por três componentes principais:

  • JobManager: coordena a execução distribuída, gerencia checkpoints e recuperação de falhas.
  • TaskManager: executa as tarefas (operators) em slots de tarefas (task slots).
  • Slots de tarefas: unidades de paralelismo que determinam quantos sub-tasks podem executar concorrentemente.

O modelo de paralelismo permite que operadores sejam distribuídos entre múltiplos TaskManagers, com cada operador podendo ter seu próprio grau de paralelismo. O Flink suporta três garantias de processamento: at-most-once, at-least-once e exactly-once, sendo esta última alcançada através de checkpointing baseado em barreiras distribuídas (algoritmo de Chandy-Lamport).

3. Configuração do ambiente e primeiros passos

Para iniciar, baixe o Flink em flink.apache.org e extraia o arquivo. Para modo standalone local:

# Iniciar cluster Flink local
./bin/start-cluster.sh

# Verificar se está rodando
curl http://localhost:8081

A estrutura básica de um job Flink inclui ambiente de execução, fonte de dados, transformações e sumidouro. Exemplo prático com socket TCP:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("localhost", 9999)

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Socket Word Count")
  }
}

Execute com nc -lk 9999 em outro terminal e envie palavras para testar.

4. Fontes de dados (Sources) e sumidouros (Sinks) em tempo real

O Flink oferece conectores integrados para diversas fontes:

Fontes comuns:
- Kafka: FlinkKafkaConsumer para ler tópicos em tempo real
- Kinesis: FlinkKinesisConsumer para AWS
- RabbitMQ: RMQSource para filas AMQP
- Sockets: socketTextStream para testes
- Arquivos: readTextFile para processamento de arquivos (modo streaming ou batch)

Sumidouros comuns:
- Kafka: FlinkKafkaProducer para escrever em tópicos
- JDBC: JDBCOutputFormat para bancos relacionais
- Elasticsearch: ElasticsearchSink para indexação
- File systems: StreamingFileSink para HDFS, S3, etc.

Exemplo de fonte Kafka com sumidouro Elasticsearch:

val kafkaSource = new FlinkKafkaConsumer[String](
  "eventos",
  new SimpleStringSchema(),
  properties
)

val esSink = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
      indexer.add(createIndexRequest(element))
    }
  }
).build()

env.addSource(kafkaSource)
   .addSink(esSink)

5. Processamento de streams: transformações e operações

Operações básicas:

val stream = env.fromElements(1, 2, 3, 4, 5)

// map: transforma cada elemento
val dobrados = stream.map(_ * 2)

// flatMap: zero ou mais saídas por entrada
val palavras = stream.flatMap { n => 
  if (n % 2 == 0) Seq(s"par_$n") else Seq() 
}

// filter: seleciona elementos
val pares = stream.filter(_ % 2 == 0)

// keyBy + reduce: agregação por chave
val streamChaveado = stream.keyBy(_ % 2).reduce(_ + _)

Operações baseadas em tempo:

Windowing é crucial para processamento de eventos. Três tipos principais:

  • Tumbling window: janelas fixas sem sobreposição
  • Sliding window: janelas com sobreposição
  • Session window: janelas baseadas em gaps de inatividade
// Tumbling window de 1 minuto
stream
  .keyBy(event => event.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new CountAggregate())

// Watermarking para eventos atrasados
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
  .withTimestampAssigner((event, _) => event.timestamp)

stream
  .assignTimestampsAndWatermarks(watermarkStrategy)
  .keyBy(_.userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .allowedLateness(Time.minutes(1))
  .sideOutputLateData(lateOutputTag)

6. Gerenciamento de estado e tolerância a falhas

O Flink gerencia estado de forma eficiente com tipos especializados:

  • ValueState: estado simples de valor único
  • ListState: lista de valores
  • MapState: mapa chave-valor
  • State TTL: expiração automática de estado
class MeuProcessFunction extends KeyedProcessFunction[String, Event, Result] {
  private var estado: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    val desc = new ValueStateDescriptor[Long](
      "contador",
      TypeInformation.of(classOf[Long])
    )
    desc.enableTimeToLive(StateTtlConfig
      .newBuilder(Time.hours(1))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .build())
    estado = getRuntimeContext.getState(desc)
  }

  override def processElement(
    value: Event,
    ctx: KeyedProcessFunction[String, Event, Result]#Context,
    out: Collector[Result]
  ): Unit = {
    val atual = Option(estado.value()).getOrElse(0L) + 1
    estado.update(atual)
    out.collect(Result(value.key, atual))
  }
}

Checkpointing é configurado no ambiente:

env.enableCheckpointing(5000) // checkpoint a cada 5 segundos
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

7. Otimização de desempenho e boas práticas

Ajustes de configuração:

  • Paralelismo: env.setParallelism(n) ou por operador
  • Buffer timeout: env.setBufferTimeout(100) (milissegundos)
  • Rede: ajustar taskmanager.network.memory para evitar backpressure

Monitoramento: a Flink Web UI (porta 8081) mostra métricas como throughput, latência, backpressure e uso de memória. Para métricas customizadas:

val contador = getRuntimeContext.getMetricGroup.counter("eventos_processados")
contador.inc()

Boas práticas:
- Use ValueState em vez de variáveis mutáveis para consistência
- Configure watermarks adequadamente para evitar perda de dados
- Teste com env.fromElements() antes de conectar fontes reais
- Utilize savepoints para atualizações de jobs sem perda de estado

8. Casos de uso reais e integração com ecossistema

Detecção de fraudes em tempo real:

val transacoes = env.addSource(kafkaConsumer)

transacoes
  .keyBy(_.contaId)
  .process(new FraudDetector())
  .addSink(alertaSink)

class FraudDetector extends KeyedProcessFunction[String, Transacao, Alerta] {
  private var ultimasTransacoes: ListState[Transacao] = _

  override def processElement(
    transacao: Transacao,
    ctx: Context,
    out: Collector[Alerta]
  ): Unit = {
    val historico = ultimasTransacoes.get().asScala.toList
    if (historico.size >= 3 && 
        historico.last.valor > 10000 && 
        transacao.valor > 50000) {
      out.collect(Alerta(transacao.contaId, "Movimentação suspeita"))
    }
    ultimasTransacoes.add(transacao)
  }
}

Monitoramento de infraestrutura: pipelines que consomem métricas de CPU, memória e disco de servidores, aplicam janelas para calcular médias móveis e disparam alertas no Grafana via Elasticsearch sink.

Integrações comuns:
- Kafka para ingestão e saída de eventos
- Hadoop/S3 para armazenamento de checkpoints
- Grafana + Elasticsearch para visualização de métricas
- Prometheus para monitoramento do próprio Flink


Referências