Multiprocessing: paralelismo real

1. Introdução ao Multiprocessing em Python

Python é famoso por sua simplicidade, mas também por uma limitação histórica: o Global Interpreter Lock (GIL). O GIL impede que múltiplas threads executem bytecode Python simultaneamente, limitando o paralelismo real em programas CPU-bound. É aqui que o módulo multiprocessing entra em cena.

Concorrência vs Paralelismo: concorrência é sobre lidar com múltiplas tarefas ao mesmo tempo (intercalando sua execução), enquanto paralelismo é sobre executar múltiplas tarefas simultaneamente. Threads em Python oferecem concorrência, mas não paralelismo real para código CPU-bound. Processos, por outro lado, contornam o GIL porque cada processo tem seu próprio interpretador Python e espaço de memória.

O módulo multiprocessing permite criar processos filhos, gerenciar pools de workers e implementar comunicação entre processos (IPC) de forma segura e eficiente.

2. Criando e Gerenciando Processos

Process: spawn, start, join e daemon

import multiprocessing
import time

def worker(name, delay):
    print(f"Processo {name} iniciando")
    time.sleep(delay)
    print(f"Processo {name} finalizado")

if __name__ == "__main__":
    # Criando processos
    p1 = multiprocessing.Process(target=worker, args=("A", 2))
    p2 = multiprocessing.Process(target=worker, args=("B", 1))

    # Iniciando
    p1.start()
    p2.start()

    # Aguardando conclusão
    p1.join()
    p2.join()

    print("Todos os processos concluídos")

Pool: map, apply, starmap e map_async

from multiprocessing import Pool

def quadrado(x):
    return x ** 2

def soma(a, b):
    return a + b

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # map: aplica função a cada item
        resultados = pool.map(quadrado, range(10))
        print(f"Map: {resultados}")

        # apply: executa função com argumentos
        resultado = pool.apply(soma, args=(5, 3))
        print(f"Apply: {resultado}")

        # starmap: para múltiplos argumentos
        pares = [(1, 2), (3, 4), (5, 6)]
        resultados_star = pool.starmap(soma, pares)
        print(f"Starmap: {resultados_star}")

        # map_async: execução assíncrona
        async_result = pool.map_async(quadrado, range(10))
        async_result.wait()  # Aguarda conclusão
        print(f"Map async: {async_result.get()}")

Contextos de inicialização

import multiprocessing as mp

# Contextos disponíveis: spawn, fork, forkserver
ctx = mp.get_context('spawn')  # Compatível com Windows

def funcao():
    print(f"Processo filho: {mp.current_process().name}")

if __name__ == "__main__":
    p = ctx.Process(target=funcao)
    p.start()
    p.join()

3. Comunicação entre Processos (IPC)

Filas e Pipes

from multiprocessing import Process, Queue, Pipe

def produtor(queue, conn):
    for i in range(5):
        queue.put(f"Mensagem {i}")
    conn.send("Pipe: Dado enviado")
    conn.close()

def consumidor(queue, conn):
    while not queue.empty():
        print(f"Queue: {queue.get()}")
    print(f"Pipe: {conn.recv()}")

if __name__ == "__main__":
    queue = Queue()
    parent_conn, child_conn = Pipe()

    p1 = Process(target=produtor, args=(queue, parent_conn))
    p2 = Process(target=consumidor, args=(queue, child_conn))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Gerenciadores (Manager) e Variáveis Compartilhadas

from multiprocessing import Process, Manager, Value, Array

def modificador(manager_dict, valor_compartilhado, array_compartilhado):
    manager_dict['chave'] = 'valor modificado'
    valor_compartilhado.value += 10
    for i in range(len(array_compartilhado)):
        array_compartilhado[i] *= 2

if __name__ == "__main__":
    with Manager() as manager:
        dicionario = manager.dict()
        dicionario['chave'] = 'valor original'

        valor = Value('i', 0)  # Inteiro
        array = Array('d', [1.0, 2.0, 3.0])  # Array de doubles

        p = Process(target=modificador, args=(dicionario, valor, array))
        p.start()
        p.join()

        print(f"Dict: {dicionario}")
        print(f"Value: {valor.value}")
        print(f"Array: {list(array)}")

4. Sincronização e Concorrência Segura

from multiprocessing import Process, Lock, Semaphore, Event, Barrier
import time

def tarefa_lock(lock, id):
    with lock:
        print(f"Processo {id} na seção crítica")
        time.sleep(1)

def tarefa_semaphore(sem, id):
    with sem:
        print(f"Processo {id} usando recurso limitado")
        time.sleep(1)

def tarefa_barreira(barrier, id):
    print(f"Processo {id} aguardando na barreira")
    barrier.wait()
    print(f"Processo {id} passou da barreira")

if __name__ == "__main__":
    # Lock
    lock = Lock()
    processos = [Process(target=tarefa_lock, args=(lock, i)) for i in range(3)]

    # Semaphore (permite 2 acessos simultâneos)
    sem = Semaphore(2)

    # Barrier (sincroniza 3 processos)
    barrier = Barrier(3)

    for p in processos:
        p.start()
    for p in processos:
        p.join()

5. Pool de Processos e Paralelismo de Dados

from multiprocessing import Pool
import time

def processar_arquivo(nome_arquivo):
    # Simula processamento pesado
    time.sleep(1)
    return f"{nome_arquivo} processado"

if __name__ == "__main__":
    arquivos = [f"arquivo_{i}.txt" for i in range(20)]

    # Pool com chunksize para distribuir carga
    with Pool(processes=4) as pool:
        # imap_unordered: resultados na ordem de conclusão
        resultados = []
        for resultado in pool.imap_unordered(processar_arquivo, arquivos, chunksize=3):
            resultados.append(resultado)
            print(resultado)

        print(f"Total: {len(resultados)} arquivos processados")

6. Multiprocessing com Shared Memory e Serialização

from multiprocessing import shared_memory, Process
import numpy as np

def worker(shm_name, shape):
    shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
    arr[:] *= 2  # Modifica dados in-place
    shm.close()

if __name__ == "__main__":
    shape = (1000, 1000)
    arr = np.ones(shape, dtype=np.float64)

    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    arr_shared = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
    arr_shared[:] = arr[:]

    p = Process(target=worker, args=(shm.name, shape))
    p.start()
    p.join()

    print(f"Média após processamento: {arr_shared.mean()}")
    shm.close()
    shm.unlink()

7. Comparação com Threading e Asyncio

import multiprocessing
import threading
import time

def cpu_intensivo(n):
    return sum(i * i for i in range(n))

# Multiprocessing (verdadeiro paralelismo)
def test_multiprocessing():
    with multiprocessing.Pool(4) as pool:
        return pool.map(cpu_intensivo, [10**7] * 4)

# Threading (não paralelo para CPU-bound)
def test_threading():
    with threading.ThreadPoolExecutor(4) as executor:
        return list(executor.map(cpu_intensivo, [10**7] * 4))

if __name__ == "__main__":
    inicio = time.time()
    test_multiprocessing()
    print(f"Multiprocessing: {time.time() - inicio:.2f}s")

    inicio = time.time()
    test_threading()
    print(f"Threading: {time.time() - inicio:.2f}s")

8. Erros Comuns e Depuração

import multiprocessing
import logging

def tarefa_problematica():
    # Erro comum: função lambda não picklable
    return (lambda x: x * 2)(5)

def tarefa_segura():
    return 42

if __name__ == "__main__":
    # Logging para depuração
    multiprocessing.log_to_stderr(logging.DEBUG)

    # Evitando problemas de pickling
    with multiprocessing.Pool(2) as pool:
        # Isso falharia: pool.apply(tarefa_problematica)
        resultado = pool.apply(tarefa_segura)
        print(f"Resultado: {resultado}")

Referências