Como usar o módulo threading e multiprocessing em Python com segurança
1. Fundamentos: Threads vs Processos no Contexto do GIL
O Global Interpreter Lock (GIL) é um mutex que protege o interpretador Python, garantindo que apenas uma thread execute bytecode por vez. Isso significa que threads em Python não executam código em paralelo real para operações CPU-bound. Para operações I/O-bound (como leitura de arquivos, requisições HTTP), o GIL é liberado durante operações de I/O, permitindo concorrência eficiente.
import threading
import time
def operacao_io_bound():
time.sleep(2) # Simula operação de I/O
return "Concluído"
def operacao_cpu_bound():
soma = 0
for i in range(10**7):
soma += i
return soma
# Threading é eficiente para I/O-bound
threads = [threading.Thread(target=operacao_io_bound) for _ in range(4)]
inicio = time.time()
for t in threads: t.start()
for t in threads: t.join()
print(f"Tempo I/O-bound com threads: {time.time() - inicio:.2f}s")
Para operações CPU-bound, multiprocessing cria processos com espaços de memória isolados, contornando o GIL e permitindo paralelismo real.
import multiprocessing as mp
def operacao_cpu_paralela():
soma = 0
for i in range(10**7):
soma += i
return soma
with mp.Pool(4) as pool:
resultados = pool.map(operacao_cpu_paralela, range(4))
2. Sincronização Segura com threading.Lock e threading.RLock
O Lock evita condições de corrida quando múltiplas threads acessam recursos compartilhados.
import threading
contador = 0
lock = threading.Lock()
def incrementar_seguro():
global contador
with lock: # Padrão seguro
contador += 1
def incrementar_inseguro():
global contador
contador += 1 # Condição de corrida!
threads = [threading.Thread(target=incrementar_seguro) for _ in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Contador seguro: {contador}") # Sempre 1000
RLock (Reentrant Lock) permite que a mesma thread adquira o lock múltiplas vezes sem deadlock, útil em chamadas aninhadas.
rlock = threading.RLock()
def funcao_aninhada():
with rlock:
# Já possui o lock, pode adquirir novamente
with rlock:
print("Reentrada segura")
# Com Lock comum, isso causaria deadlock
threading.Thread(target=funcao_aninhada).start()
3. Evitando Deadlocks e Starvation em Múltiplos Locks
A hierarquia de locks é a técnica mais eficaz contra deadlocks: sempre adquirir locks na mesma ordem.
lock_a = threading.Lock()
lock_b = threading.Lock()
def tarefa_segura():
with lock_a:
with lock_b: # Ordem consistente: A -> B
print("Operação segura")
def tarefa_perigosa():
with lock_b:
with lock_a: # Ordem invertida: B -> A (risco de deadlock)
print("Operação perigosa")
Timeouts ajudam a detectar bloqueios:
def tarefa_com_timeout():
if lock_a.acquire(timeout=5):
try:
# Operação crítica
pass
finally:
lock_a.release()
else:
print("Timeout ao adquirir lock_a")
4. Comunicação Segura entre Processos com multiprocessing.Queue e Pipe
Queue é thread-safe e process-safe, ideal para padrão produtor-consumidor.
import multiprocessing as mp
def produtor(queue):
for i in range(10):
queue.put(f"Item {i}")
def consumidor(queue):
while True:
item = queue.get()
if item is None: # Sinal de término
break
print(f"Processado: {item}")
queue = mp.Queue()
p1 = mp.Process(target=produtor, args=(queue,))
p2 = mp.Process(target=consumidor, args=(queue,))
p1.start(); p2.start()
p1.join()
queue.put(None) # Sinaliza término
p2.join()
Pipe oferece comunicação bidirecional, mas requer cuidados com fechamento de extremidades.
conexao_pai, conexao_filho = mp.Pipe()
def processo_filho(conn):
conn.send("Mensagem do filho")
conn.close()
p = mp.Process(target=processo_filho, args=(conexao_filho,))
p.start()
mensagem = conexao_pai.recv()
print(f"Recebido: {mensagem}")
conexao_pai.close()
p.join()
5. Gerenciamento de Estado Compartilhado com multiprocessing.Manager
Manager cria objetos compartilhados entre processos com sincronização implícita.
import multiprocessing as mp
def trabalhador(dicionario, chave, valor):
dicionario[chave] = valor
with mp.Manager() as manager:
dicionario_compartilhado = manager.dict()
processos = []
for i in range(5):
p = mp.Process(target=trabalhador, args=(dicionario_compartilhado, f"chave_{i}", i))
processos.append(p)
p.start()
for p in processos:
p.join()
print(dicionario_compartilhado)
Para maior performance, use multiprocessing.Value e multiprocessing.Array com locks embutidos.
contador_compartilhado = mp.Value('i', 0)
array_compartilhado = mp.Array('d', [0.0, 0.0, 0.0])
def incrementar(valor):
with valor.get_lock(): # Lock embutido
valor.value += 1
6. Pool de Trabalhadores e Tratamento de Exceções
multiprocessing.Pool e concurrent.futures.ThreadPoolExecutor simplificam o gerenciamento de trabalhadores.
from concurrent.futures import ThreadPoolExecutor, as_completed
def tarefa_com_erro(x):
if x == 5:
raise ValueError(f"Erro no item {x}")
return x * 2
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(tarefa_com_erro, i): i for i in range(10)}
for future in as_completed(futures):
try:
resultado = future.result()
print(f"Sucesso: {resultado}")
except Exception as e:
print(f"Exceção capturada: {e}")
Para processos, encerramento gracioso é essencial:
with mp.Pool(4) as pool:
try:
resultados = pool.map(tarefa_com_erro, range(10))
except Exception as e:
print(f"Erro no pool: {e}")
pool.terminate() # Encerra imediatamente
finally:
pool.join() # Aguarda término
7. Padrões Avançados de Segurança: Barrier, Semaphore e Event
Barrier sincroniza múltiplos pontos de encontro:
barreira = threading.Barrier(3)
def tarefa_sincronizada(nome):
print(f"{nome} chegou na barreira")
barreira.wait() # Aguarda todas as 3 threads
print(f"{nome} passou da barreira")
threads = [threading.Thread(target=tarefa_sincronizada, args=(f"Thread {i}",)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()
Semaphore controla acesso a recursos limitados:
semaforo = threading.Semaphore(3) # Máximo 3 conexões simultâneas
def conexao_banco(id_conexao):
with semaforo:
print(f"Conexão {id_conexao} ativa")
time.sleep(1)
print(f"Conexão {id_conexao} liberada")
Event para sinalização sem polling:
evento = threading.Event()
def trabalhador():
print("Aguardando sinal...")
evento.wait() # Bloqueia até evento ser setado
print("Sinal recebido!")
def sinalizador():
time.sleep(2)
evento.set() # Libera todos os workers
threading.Thread(target=trabalhador).start()
threading.Thread(target=sinalizador).start()
8. Testes e Debug de Concorrência em Python
Para reproduzir condições de corrida, use time.sleep() controlado:
import time
contador_teste = 0
lock_teste = threading.Lock()
def operacao_lenta():
global contador_teste
with lock_teste:
temp = contador_teste
time.sleep(0.001) # Força condição de corrida
contador_teste = temp + 1
Ferramentas úteis para debug:
import faulthandler
faulthandler.enable() # Dump de stacks em caso de deadlock
# Para detectar vazamento de threads
import threading
print(f"Threads ativas: {threading.active_count()}")
for thread in threading.enumerate():
print(f"Thread: {thread.name}")
Referências
- Documentação oficial do threading — Referência completa do módulo threading, incluindo locks, semaphores e events
- Documentação oficial do multiprocessing — Guia completo do módulo multiprocessing, com exemplos de Pool, Queue e Manager
- Python Concurrency: Threading vs Multiprocessing — Tutorial prático da Real Python sobre quando usar threading vs multiprocessing
- Understanding the Python GIL — Explicação detalhada do Global Interpreter Lock e seu impacto na concorrência
- Deadlock Prevention in Python — Técnicas de prevenção de deadlocks com exemplos práticos
- Python ThreadPoolExecutor Tutorial — Guia avançado sobre o ThreadPoolExecutor do módulo concurrent.futures
- Debugging Concurrency in Python — Técnicas para debug de problemas de concorrência, incluindo faulthandler e objgraph