Asyncio: programação assíncrona com async/await

1. Fundamentos da Programação Assíncrona em Python

A programação assíncrona em Python resolve um problema clássico: operações de I/O (entrada/saída) que bloqueiam a execução do programa. Quando seu código precisa ler um arquivo, fazer uma requisição HTTP ou consultar um banco de dados, o tempo de espera pode ser significativo — e nesse período, o processador fica ocioso.

O problema do I/O bloqueante

Imagine um servidor web que precisa atender 100 requisições simultâneas. Com código síncrono tradicional, cada requisição bloquearia a thread até que a resposta fosse completamente processada. Isso resulta em baixa utilização de recursos e tempo de resposta elevado.

Concorrência vs Paralelismo vs Threading

  • Paralelismo (multiprocessing): Executa múltiplas tarefas simultaneamente em diferentes núcleos da CPU. Ideal para tarefas CPU-bound (processamento intensivo).
  • Threading: Alterna entre threads no mesmo processo. Útil para I/O, mas sofre com contenção de recursos e o GIL (Global Interpreter Lock).
  • Concorrência (asyncio): Usa um único thread com um loop de eventos que gerencia múltiplas tarefas de forma cooperativa. Cada tarefa cede o controle voluntariamente quando está esperando I/O.

O loop de eventos

O event loop é o coração do asyncio. Ele gerencia uma fila de tarefas (corrotinas), executa uma de cada vez, e quando uma tarefa encontra um await, ela "dorme" enquanto o loop passa para a próxima tarefa pronta para executar.

import asyncio

async def tarefa_demorada(nome, tempo):
    print(f"Iniciando {nome}")
    await asyncio.sleep(tempo)  # Cede o controle aqui
    print(f"Finalizando {nome}")
    return f"Resultado de {nome}"

async def main():
    # O loop gerencia ambas as tarefas concorrentemente
    resultado = await asyncio.gather(
        tarefa_demorada("A", 2),
        tarefa_demorada("B", 1)
    )
    print(resultado)

asyncio.run(main())

2. Sintaxe Essencial: async/await e Corrotinas

Definindo corrotinas

Uma corrotina é definida com async def. Diferente de funções comuns, ela não executa imediatamente — retorna um objeto corrotina.

async def minha_corrotina():
    return 42

# Isso NÃO executa a corrotina
corrotina_obj = minha_corrotina()
print(type(corrotina_obj))  # <class 'coroutine'>

# Para executar, precisamos de await ou asyncio.run()
resultado = await corrotina_obj  # Funciona apenas dentro de outra corrotina

O poder do await

await pausa a execução da corrotina atual sem bloquear o loop de eventos. Enquanto uma corrotina espera, outras podem executar.

async def buscar_dados():
    print("Buscando dados...")
    await asyncio.sleep(2)  # Simula I/O
    return {"status": "ok"}

async def processar():
    dados = await buscar_dados()  # Aguarda sem bloquear
    print(f"Dados recebidos: {dados}")

asyncio.run(processar())

asyncio.run() vs await

  • asyncio.run(): Cria um novo event loop, executa a corrotina e fecha o loop. Deve ser chamado apenas uma vez no ponto de entrada do programa.
  • await: Usado dentro de corrotinas para aguardar outras corrotinas.

3. Gerenciamento de Tarefas (Tasks) e Concorrência

Criando tasks com create_task()

Tasks permitem executar múltiplas corrotinas concorrentemente:

async def tarefa_1():
    await asyncio.sleep(2)
    return "Tarefa 1 concluída"

async def tarefa_2():
    await asyncio.sleep(1)
    return "Tarefa 2 concluída"

async def main():
    task1 = asyncio.create_task(tarefa_1())
    task2 = asyncio.create_task(tarefa_2())

    # Aguarda ambas as tasks
    resultado1 = await task1
    resultado2 = await task2
    print(resultado1, resultado2)

asyncio.run(main())

asyncio.gather() e asyncio.wait()

gather() agrupa múltiplas corrotinas e retorna uma lista com todos os resultados:

async def main():
    resultados = await asyncio.gather(
        tarefa_1(),
        tarefa_2(),
        return_exceptions=True  # Captura exceções sem interromper outras tasks
    )
    print(resultados)

Cancelamento e tratamento de exceções

async def tarefa_longa():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Tarefa cancelada!")
        raise

async def main():
    task = asyncio.create_task(tarefa_longa())
    await asyncio.sleep(1)
    task.cancel()  # Cancela a task
    try:
        await task
    except asyncio.CancelledError:
        print("Cancelamento tratado")

4. Operações Assíncronas de I/O na Prática

Leitura/escrita assíncrona com aiofiles

import aiofiles

async def ler_arquivo():
    async with aiofiles.open('dados.txt', mode='r') as f:
        conteudo = await f.read()
        return conteudo

async def escrever_arquivo():
    async with aiofiles.open('saida.txt', mode='w') as f:
        await f.write("Dados assíncronos")

Requisições HTTP com aiohttp

import aiohttp

async def buscar_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://api.exemplo.com/dados1",
        "https://api.exemplo.com/dados2"
    ]
    async with aiohttp.ClientSession() as session:
        tarefas = [buscar_url(session, url) for url in urls]
        resultados = await asyncio.gather(*tarefas)
        for resultado in resultados:
            print(len(resultado))

asyncio.run(main())

Timeouts e delays

async def operacao_lenta():
    await asyncio.sleep(5)
    return "Concluído"

async def main():
    try:
        resultado = await asyncio.wait_for(
            operacao_lenta(), 
            timeout=3.0  # Levanta TimeoutError após 3 segundos
        )
    except asyncio.TimeoutError:
        print("Operação excedeu o tempo limite")

5. Sincronização e Compartilhamento de Estado

Locks assíncronos

lock = asyncio.Lock()
recurso_compartilhado = 0

async def incrementar():
    global recurso_compartilhado
    async with lock:  # Garante acesso exclusivo
        valor_atual = recurso_compartilhado
        await asyncio.sleep(0.1)  # Simula processamento
        recurso_compartilhado = valor_atual + 1

Semáforos e eventos

# Limita a 3 conexões simultâneas
semaphore = asyncio.Semaphore(3)

async def acessar_api(id):
    async with semaphore:
        print(f"Requisição {id} iniciada")
        await asyncio.sleep(1)
        print(f"Requisição {id} finalizada")

# Evento para sincronização
evento = asyncio.Event()

async def produtor():
    await asyncio.sleep(2)
    evento.set()  # Sinaliza que o evento ocorreu

async def consumidor():
    await evento.wait()  # Aguarda até o evento ser setado
    print("Evento recebido!")

Filas assíncronas

async def produtor(fila):
    for i in range(5):
        await fila.put(f"Item {i}")
        await asyncio.sleep(0.5)

async def consumidor(fila):
    while True:
        item = await fila.get()
        print(f"Processando {item}")
        fila.task_done()

async def main():
    fila = asyncio.Queue(maxsize=10)
    await asyncio.gather(
        produtor(fila),
        consumidor(fila)
    )

6. Padrões Avançados e Boas Práticas

Context managers assíncronos

class ConexaoAsync:
    async def __aenter__(self):
        print("Abrindo conexão...")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Fechando conexão...")
        await asyncio.sleep(0.5)

async def main():
    async with ConexaoAsync() as conn:
        print("Usando conexão")

Iteração assíncrona

class AsyncRange:
    def __init__(self, start, end):
        self.current = start
        self.end = end

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.current += 1
        return self.current - 1

async def main():
    async for numero in AsyncRange(1, 5):
        print(numero)

Evitando código bloqueante

import concurrent.futures

def operacao_cpu_intensiva():
    # Código síncrono pesado
    return sum(i * i for i in range(10**7))

async def main():
    loop = asyncio.get_event_loop()
    # Executa em thread separada para não bloquear o event loop
    resultado = await loop.run_in_executor(
        None,  # Usa ThreadPoolExecutor padrão
        operacao_cpu_intensiva
    )
    print(resultado)

7. Comparação com Alternativas e Limitações do asyncio

Quando usar cada abordagem

Cenário Abordagem Recomendada
Muitas conexões de rede simultâneas asyncio
Processamento CPU intensivo multiprocessing
I/O com bibliotecas que não suportam async threading
Operações de I/O mistas com CPU asyncio + run_in_executor

Limitações do asyncio

  • Tasks cooperativas: Se uma task demorar muito sem usar await, todas as outras ficam bloqueadas
  • CPU-bound: Para processamento intensivo, use multiprocessing ou run_in_executor
  • GIL: Embora asyncio contorne o GIL para I/O, operações CPU-bound ainda sofrem com ele
  • Curva de aprendizado: Requer mudança de mentalidade em relação ao código síncrono

Ferramentas complementares

# uvloop: substitui o event loop padrão por uma implementação mais rápida
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Modo debug para identificar problemas
asyncio.run(main(), debug=True)

Referências