Comunicação entre processos e filas

1. Introdução à Comunicação entre Processos (IPC)

Em Python, cada processo possui seu próprio espaço de memória isolado, o que significa que variáveis e objetos não são compartilhados entre processos por padrão. Essa característica, embora traga segurança e estabilidade, cria a necessidade de mecanismos de Comunicação entre Processos (IPC) para que processos possam trocar dados e coordenar suas ações.

A comunicação entre processos é essencial quando:
- Precisamos distribuir tarefas pesadas entre múltiplos núcleos da CPU
- Desejamos construir sistemas produtor-consumidor
- Requeremos processamento paralelo com troca de resultados intermediários

O módulo multiprocessing do Python oferece diversos mecanismos de IPC, sendo as filas (Queue) e os pipes (Pipe) os mais comuns. Diferente de threading, onde o GIL (Global Interpreter Lock) limita a execução paralela em CPU-bound tasks, o multiprocessamento contorna essa limitação criando processos independentes.

2. O Módulo multiprocessing: Pilares da Comunicação

O módulo multiprocessing fornece APIs similares ao threading, mas com processos ao invés de threads. Vamos explorar seus componentes principais:

from multiprocessing import Process, Value, Array, Manager

# Compartilhamento de estado simples com Value e Array
contador = Value('i', 0)  # Inteiro compartilhado
numeros = Array('d', [1.0, 2.0, 3.0])  # Array de doubles

# Gerenciador para objetos complexos
manager = Manager()
dicionario_compartilhado = manager.dict()
lista_compartilhada = manager.list()

3. Filas (Queues) para Troca de Mensagens

As filas são o mecanismo mais seguro e flexível para comunicação entre processos. O módulo multiprocessing oferece a classe Queue, que é thread-safe e process-safe.

Queue Básica

from multiprocessing import Process, Queue

def produtor(fila):
    for i in range(5):
        fila.put(f"Item {i}")
    fila.put(None)  # Sinal de término

def consumidor(fila):
    while True:
        item = fila.get()
        if item is None:
            break
        print(f"Processado: {item}")

if __name__ == "__main__":
    fila = Queue()
    p1 = Process(target=produtor, args=(fila,))
    p2 = Process(target=consumidor, args=(fila,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

JoinableQueue para Sincronização

A JoinableQueue estende a Queue adicionando métodos task_done() e join() para sincronização:

from multiprocessing import Process, JoinableQueue

def worker(fila):
    while True:
        tarefa = fila.get()
        if tarefa is None:
            fila.task_done()
            break
        resultado = tarefa ** 2
        print(f"Resultado: {resultado}")
        fila.task_done()

if __name__ == "__main__":
    fila = JoinableQueue()

    # Iniciar workers
    workers = [Process(target=worker, args=(fila,)) for _ in range(3)]
    for w in workers:
        w.start()

    # Alimentar tarefas
    for i in range(10):
        fila.put(i)

    # Sinais de término
    for _ in workers:
        fila.put(None)

    fila.join()  # Aguarda todas as tarefas serem processadas

Comparação entre Tipos de Filas

  • multiprocessing.Queue: Thread-safe e process-safe, suporta objetos serializáveis
  • queue.Queue: Apenas thread-safe, não funciona entre processos
  • SimpleQueue: Mais leve que Queue, sem funcionalidades extras

4. Pipes e Conexões Bidirecionais

Pipes oferecem comunicação direta entre dois processos, sendo mais rápidos que filas para comunicação ponto a ponto.

from multiprocessing import Process, Pipe

def processador(conn):
    """Processo filho que recebe e envia dados"""
    dados = conn.recv()
    print(f"Recebido: {dados}")
    conn.send(f"Processado: {dados * 2}")
    conn.close()

if __name__ == "__main__":
    # Pipe() retorna duas extremidades: (parent_conn, child_conn)
    parent_conn, child_conn = Pipe()

    p = Process(target=processador, args=(child_conn,))
    p.start()

    parent_conn.send(42)
    resultado = parent_conn.recv()
    print(f"Resultado final: {resultado}")

    p.join()

5. Sincronização e Controle de Concorrência

Para evitar condições de corrida e garantir acesso seguro a recursos compartilhados, o módulo multiprocessing oferece primitivas de sincronização:

from multiprocessing import Process, Lock, Semaphore, Event, Barrier

# Lock para exclusão mútua
lock = Lock()

def tarefa_segura(lock, recurso):
    with lock:
        # Acesso exclusivo ao recurso
        recurso['valor'] += 1

# Semáforo para controlar acesso a recursos limitados
semaforo = Semaphore(2)  # Permite 2 acessos simultâneos

# Evento para sincronização
evento = Event()

def esperar_evento(evento):
    print("Aguardando evento...")
    evento.wait()
    print("Evento recebido!")

# Barreira para sincronizar múltiplos processos
barreira = Barrier(3)  # 3 processos devem chegar antes de prosseguir

6. Padrões Práticos com Filas e Processos

Padrão Produtor-Consumidor Avançado

from multiprocessing import Process, Queue
import time
import random

def produtor(fila, id_produtor):
    for i in range(3):
        item = f"P{id_produtor}-Item{i}"
        fila.put(item)
        print(f"Produzido: {item}")
        time.sleep(random.random())

def consumidor(fila, id_consumidor):
    while True:
        item = fila.get()
        if item is "FIM":
            fila.put("FIM")  # Repassar para outros consumidores
            break
        print(f"Consumidor {id_consumidor} processou: {item}")
        time.sleep(random.random())

if __name__ == "__main__":
    fila = Queue(maxsize=5)

    produtores = [Process(target=produtor, args=(fila, i)) for i in range(2)]
    consumidores = [Process(target=consumidor, args=(fila, i)) for i in range(3)]

    for p in produtores:
        p.start()
    for c in consumidores:
        c.start()

    for p in produtores:
        p.join()

    fila.put("FIM")  # Sinal para consumidores

    for c in consumidores:
        c.join()

Pool de Workers com Fila de Resultados

from multiprocessing import Process, Queue
import time

def worker(tarefas, resultados):
    while True:
        tarefa = tarefas.get()
        if tarefa is None:
            break
        resultado = tarefa ** 2
        resultados.put(resultado)
        time.sleep(0.1)

if __name__ == "__main__":
    tarefas = Queue()
    resultados = Queue()

    # Criar workers
    workers = [Process(target=worker, args=(tarefas, resultados)) 
               for _ in range(4)]

    for w in workers:
        w.start()

    # Enviar tarefas
    for i in range(20):
        tarefas.put(i)

    # Sinal de término
    for _ in workers:
        tarefas.put(None)

    # Coletar resultados
    for _ in range(20):
        resultado = resultados.get()
        print(f"Resultado coletado: {resultado}")

    for w in workers:
        w.join()

7. Boas Práticas e Armadilhas Comuns

Deadlocks em Filas

# Evite deadlocks definindo timeout
try:
    item = fila.get(timeout=1)
except:
    print("Timeout ao aguardar item")

# Sempre defina maxsize para filas longas
fila = Queue(maxsize=100)

Gerenciamento de Recursos

from multiprocessing import Process, Queue
import atexit

def cleanup(fila, processos):
    """Limpeza segura de recursos"""
    for _ in processos:
        fila.put(None)  # Sinal para workers pararem
    for p in processos:
        p.join(timeout=2)
        if p.is_alive():
            p.terminate()

# Use context managers ou try/finally para garantir limpeza

Dicas Importantes

  • Serialização: Objetos enviados via filas/pipe são serializados com pickle; classes complexas precisam ser serializáveis
  • Memória: Filas armazenam objetos em memória; evite itens muito grandes
  • Debugging: Use multiprocessing.log_to_stderr() para logs detalhados
  • Testes: Isole o código IPC para facilitar testes unitários

Referências