Multiprocessing: paralelismo real
1. Introdução ao Multiprocessing em Python
Python é famoso por sua simplicidade, mas também por uma limitação histórica: o Global Interpreter Lock (GIL). O GIL impede que múltiplas threads executem bytecode Python simultaneamente, limitando o paralelismo real em programas CPU-bound. É aqui que o módulo multiprocessing entra em cena.
Concorrência vs Paralelismo: concorrência é sobre lidar com múltiplas tarefas ao mesmo tempo (intercalando sua execução), enquanto paralelismo é sobre executar múltiplas tarefas simultaneamente. Threads em Python oferecem concorrência, mas não paralelismo real para código CPU-bound. Processos, por outro lado, contornam o GIL porque cada processo tem seu próprio interpretador Python e espaço de memória.
O módulo multiprocessing permite criar processos filhos, gerenciar pools de workers e implementar comunicação entre processos (IPC) de forma segura e eficiente.
2. Criando e Gerenciando Processos
Process: spawn, start, join e daemon
import multiprocessing
import time
def worker(name, delay):
print(f"Processo {name} iniciando")
time.sleep(delay)
print(f"Processo {name} finalizado")
if __name__ == "__main__":
# Criando processos
p1 = multiprocessing.Process(target=worker, args=("A", 2))
p2 = multiprocessing.Process(target=worker, args=("B", 1))
# Iniciando
p1.start()
p2.start()
# Aguardando conclusão
p1.join()
p2.join()
print("Todos os processos concluídos")
Pool: map, apply, starmap e map_async
from multiprocessing import Pool
def quadrado(x):
return x ** 2
def soma(a, b):
return a + b
if __name__ == "__main__":
with Pool(processes=4) as pool:
# map: aplica função a cada item
resultados = pool.map(quadrado, range(10))
print(f"Map: {resultados}")
# apply: executa função com argumentos
resultado = pool.apply(soma, args=(5, 3))
print(f"Apply: {resultado}")
# starmap: para múltiplos argumentos
pares = [(1, 2), (3, 4), (5, 6)]
resultados_star = pool.starmap(soma, pares)
print(f"Starmap: {resultados_star}")
# map_async: execução assíncrona
async_result = pool.map_async(quadrado, range(10))
async_result.wait() # Aguarda conclusão
print(f"Map async: {async_result.get()}")
Contextos de inicialização
import multiprocessing as mp
# Contextos disponíveis: spawn, fork, forkserver
ctx = mp.get_context('spawn') # Compatível com Windows
def funcao():
print(f"Processo filho: {mp.current_process().name}")
if __name__ == "__main__":
p = ctx.Process(target=funcao)
p.start()
p.join()
3. Comunicação entre Processos (IPC)
Filas e Pipes
from multiprocessing import Process, Queue, Pipe
def produtor(queue, conn):
for i in range(5):
queue.put(f"Mensagem {i}")
conn.send("Pipe: Dado enviado")
conn.close()
def consumidor(queue, conn):
while not queue.empty():
print(f"Queue: {queue.get()}")
print(f"Pipe: {conn.recv()}")
if __name__ == "__main__":
queue = Queue()
parent_conn, child_conn = Pipe()
p1 = Process(target=produtor, args=(queue, parent_conn))
p2 = Process(target=consumidor, args=(queue, child_conn))
p1.start()
p2.start()
p1.join()
p2.join()
Gerenciadores (Manager) e Variáveis Compartilhadas
from multiprocessing import Process, Manager, Value, Array
def modificador(manager_dict, valor_compartilhado, array_compartilhado):
manager_dict['chave'] = 'valor modificado'
valor_compartilhado.value += 10
for i in range(len(array_compartilhado)):
array_compartilhado[i] *= 2
if __name__ == "__main__":
with Manager() as manager:
dicionario = manager.dict()
dicionario['chave'] = 'valor original'
valor = Value('i', 0) # Inteiro
array = Array('d', [1.0, 2.0, 3.0]) # Array de doubles
p = Process(target=modificador, args=(dicionario, valor, array))
p.start()
p.join()
print(f"Dict: {dicionario}")
print(f"Value: {valor.value}")
print(f"Array: {list(array)}")
4. Sincronização e Concorrência Segura
from multiprocessing import Process, Lock, Semaphore, Event, Barrier
import time
def tarefa_lock(lock, id):
with lock:
print(f"Processo {id} na seção crítica")
time.sleep(1)
def tarefa_semaphore(sem, id):
with sem:
print(f"Processo {id} usando recurso limitado")
time.sleep(1)
def tarefa_barreira(barrier, id):
print(f"Processo {id} aguardando na barreira")
barrier.wait()
print(f"Processo {id} passou da barreira")
if __name__ == "__main__":
# Lock
lock = Lock()
processos = [Process(target=tarefa_lock, args=(lock, i)) for i in range(3)]
# Semaphore (permite 2 acessos simultâneos)
sem = Semaphore(2)
# Barrier (sincroniza 3 processos)
barrier = Barrier(3)
for p in processos:
p.start()
for p in processos:
p.join()
5. Pool de Processos e Paralelismo de Dados
from multiprocessing import Pool
import time
def processar_arquivo(nome_arquivo):
# Simula processamento pesado
time.sleep(1)
return f"{nome_arquivo} processado"
if __name__ == "__main__":
arquivos = [f"arquivo_{i}.txt" for i in range(20)]
# Pool com chunksize para distribuir carga
with Pool(processes=4) as pool:
# imap_unordered: resultados na ordem de conclusão
resultados = []
for resultado in pool.imap_unordered(processar_arquivo, arquivos, chunksize=3):
resultados.append(resultado)
print(resultado)
print(f"Total: {len(resultados)} arquivos processados")
6. Multiprocessing com Shared Memory e Serialização
from multiprocessing import shared_memory, Process
import numpy as np
def worker(shm_name, shape):
shm = shared_memory.SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
arr[:] *= 2 # Modifica dados in-place
shm.close()
if __name__ == "__main__":
shape = (1000, 1000)
arr = np.ones(shape, dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
arr_shared = np.ndarray(shape, dtype=np.float64, buffer=shm.buf)
arr_shared[:] = arr[:]
p = Process(target=worker, args=(shm.name, shape))
p.start()
p.join()
print(f"Média após processamento: {arr_shared.mean()}")
shm.close()
shm.unlink()
7. Comparação com Threading e Asyncio
import multiprocessing
import threading
import time
def cpu_intensivo(n):
return sum(i * i for i in range(n))
# Multiprocessing (verdadeiro paralelismo)
def test_multiprocessing():
with multiprocessing.Pool(4) as pool:
return pool.map(cpu_intensivo, [10**7] * 4)
# Threading (não paralelo para CPU-bound)
def test_threading():
with threading.ThreadPoolExecutor(4) as executor:
return list(executor.map(cpu_intensivo, [10**7] * 4))
if __name__ == "__main__":
inicio = time.time()
test_multiprocessing()
print(f"Multiprocessing: {time.time() - inicio:.2f}s")
inicio = time.time()
test_threading()
print(f"Threading: {time.time() - inicio:.2f}s")
8. Erros Comuns e Depuração
import multiprocessing
import logging
def tarefa_problematica():
# Erro comum: função lambda não picklable
return (lambda x: x * 2)(5)
def tarefa_segura():
return 42
if __name__ == "__main__":
# Logging para depuração
multiprocessing.log_to_stderr(logging.DEBUG)
# Evitando problemas de pickling
with multiprocessing.Pool(2) as pool:
# Isso falharia: pool.apply(tarefa_problematica)
resultado = pool.apply(tarefa_segura)
print(f"Resultado: {resultado}")
Referências
- Documentação oficial do módulo multiprocessing — Referência completa com todas as classes, funções e exemplos do módulo.
- Python Multiprocessing: The Complete Guide — Guia abrangente com exemplos práticos, dicas de performance e padrões de design.
- Understanding the Python GIL — Artigo detalhado sobre o GIL, suas implicações e como multiprocessing contorna essa limitação.
- Python Concurrency: Multiprocessing vs Threading vs Asyncio — Comparação prática entre as três abordagens de concorrência em Python.
- Effective Python: 90 Specific Ways to Write Better Python — Livro com capítulos dedicados a concorrência e paralelismo, incluindo padrões avançados de multiprocessing.
- Python Shared Memory Documentation — Documentação oficial sobre memória compartilhada para comunicação eficiente entre processos.
- Python Multiprocessing Pool: When to Use map, apply, starmap — Tutorial focado em escolher a função certa do Pool para cada cenário.