Real-time systems: previsibilidade e deadlines

1. Fundamentos de Sistemas de Tempo Real em C

Sistemas de tempo real são aqueles onde a correção da computação depende não apenas do resultado lógico, mas também do instante em que esse resultado é produzido. Em linguagem C, essa característica exige controle fino sobre o hardware e o escalonamento de tarefas.

Classificamos esses sistemas em três categorias principais:
- Hard real-time: a violação de um deadline causa falha catastrófica (ex.: airbags, controle de motor)
- Soft real-time: deadlines perdidas degradam qualidade, mas não causam desastre (ex.: streaming de áudio)
- Firm real-time: resultados úteis apenas até o deadline; após, são descartados (ex.: sistemas de radar)

Os conceitos fundamentais incluem:
- Deadline: tempo máximo para conclusão de uma tarefa
- Jitter: variação na latência de resposta
- Latência: atraso entre estímulo e resposta

O scheduler do sistema operacional interage diretamente com o código C, determinando quando cada thread ou processo executará. Em sistemas críticos, o escalonamento deve ser previsível e controlado pelo programador.

2. Previsibilidade Temporal: O Desafio da Linguagem C

A linguagem C oferece grande poder de controle, mas também introduz fontes de imprevisibilidade:

// Exemplo de alocação dinâmica imprevisível
void tarefa_critica() {
    int *buffer = (int*)malloc(1024 * sizeof(int)); // tempo variável
    // ... processamento
    free(buffer); // pode causar fragmentação
}

Para garantir determinismo, adotamos:
- Alocação estática: usar arrays de tamanho fixo em vez de malloc
- Memory pools: pré-alocar blocos de memória em tempo de inicialização
- Evitar chamadas de sistema: printf, scanf e E/S de arquivo introduzem latência imprevisível
- Evitar dynamic dispatch: funções virtuais em C++ não existem em C, mas ponteiros para função devem ser usados com cautela

// Exemplo de pool de memória determinístico
#define POOL_SIZE 10
typedef struct {
    int dados[256];
    int livre;
} Bloco;

Bloco pool[POOL_SIZE];

void* pool_aloca() {
    for (int i = 0; i < POOL_SIZE; i++) {
        if (pool[i].livre) {
            pool[i].livre = 0;
            return &pool[i].dados;
        }
    }
    return NULL; // pool esgotado
}

3. Escalonamento em Tempo Real: Implementação Prática

Rate Monotonic Scheduling (RMS)

No RMS, tarefas com período menor recebem prioridade maior. Implementação simplificada:

typedef struct {
    void (*funcao)(void);
    int periodo_ms;
    int deadline_ms;
    int tempo_restante;
} Tarefa;

Tarefa tarefas[] = {
    { &sensor_leitura, 10, 10, 0 },
    { &atuador_controle, 20, 20, 0 },
    { &display_atualiza, 50, 50, 0 }
};

void scheduler_rms() {
    int prioridade_max = -1;
    int indice_exec = -1;
    for (int i = 0; i < NUM_TAREFAS; i++) {
        if (tarefas[i].tempo_restante > 0 && 
            tarefas[i].periodo_ms > prioridade_max) {
            prioridade_max = tarefas[i].periodo_ms;
            indice_exec = i;
        }
    }
    if (indice_exec >= 0) {
        tarefas[indice_exec].funcao();
        tarefas[indice_exec].tempo_restante -= 1;
    }
}

Earliest Deadline First (EDF)

EDF executa a tarefa com deadline mais próxima:

void scheduler_edf() {
    int deadline_min = INT_MAX;
    int indice_exec = -1;
    for (int i = 0; i < NUM_TAREFAS; i++) {
        if (tarefas[i].tempo_restante > 0 && 
            tarefas[i].deadline_ms < deadline_min) {
            deadline_min = tarefas[i].deadline_ms;
            indice_exec = i;
        }
    }
    if (indice_exec >= 0) {
        tarefas[indice_exec].funcao();
        tarefas[indice_exec].tempo_restante--;
    }
}

Para uso real, esses schedulers devem ser acionados por timers de hardware (ex.: temporizador do microcontrolador).

4. Gerenciamento de Interrupções e Context Switch

Interrupt Service Routines (ISRs) devem ser extremamente eficientes:

// ISR eficiente - apenas sinaliza tarefa
volatile int flag_sensor = 0;

void ISR_sensor() __attribute__((interrupt)) {
    flag_sensor = 1; // sinaliza para a tarefa principal
    // NÃO fazer E/S ou alocação aqui
}

void tarefa_principal() {
    while (1) {
        if (flag_sensor) {
            flag_sensor = 0;
            processa_dados_sensor();
        }
    }
}

Técnicas para minimizar context switch:
- Usar cooperative multitasking em vez de preemptivo
- Manter ISRs curtas (apenas sinalização)
- Desabilitar interrupções seletivamente em seções críticas

// Seção crítica com desabilitação seletiva
__disable_interrupts();
variavel_compartilhada = novo_valor;
__enable_interrupts();

5. Sincronização e Compartilhamento de Dados

Priority Inversion e Herança de Prioridade

A inversão de prioridade ocorre quando uma tarefa de baixa prioridade segura um recurso necessário por uma tarefa de alta prioridade. Solução com herança de prioridade:

typedef struct {
    int locked;
    int prioridade_original;
    int prioridade_herdada;
} MutexRT;

void mutex_lock(MutexRT *m, int prioridade_atual) {
    while (m->locked) {
        if (prioridade_atual > m->prioridade_herdada) {
            m->prioridade_herdada = prioridade_atual;
        }
        // yield ou spin
    }
    m->locked = 1;
    m->prioridade_original = prioridade_atual;
    m->prioridade_herdada = prioridade_atual;
}

Lock-free Data Structures

Para evitar bloqueios, usamos estruturas lock-free:

// Fila circular lock-free (single producer, single consumer)
#define FILA_TAM 64
typedef struct {
    int buffer[FILA_TAM];
    volatile int head;
    volatile int tail;
} FilaLF;

int fila_push(FilaLF *f, int valor) {
    int next = (f->head + 1) % FILA_TAM;
    if (next == f->tail) return -1; // cheia
    f->buffer[f->head] = valor;
    f->head = next;
    return 0;
}

int fila_pop(FilaLF *f, int *valor) {
    if (f->tail == f->head) return -1; // vazia
    *valor = f->buffer[f->tail];
    f->tail = (f->tail + 1) % FILA_TAM;
    return 0;
}

6. Análise de Deadlines e Worst-Case Execution Time (WCET)

O WCET é o tempo máximo que uma tarefa pode levar para executar. Para medi-lo:

// Medição de WCET com ciclos de CPU
unsigned long start, end, wcet = 0;

void tarefa_medida() {
    start = ler_contador_ciclos();
    // código da tarefa
    end = ler_contador_ciclos();
    if ((end - start) > wcet) wcet = end - start;
}

O teste de escalonabilidade para RMS utiliza a fórmula:

// Teste de Liu & Layland para RMS
float fator_utilizacao(int n, int *tempos_exec, int *periodos) {
    float U = 0;
    for (int i = 0; i < n; i++) {
        U += (float)tempos_exec[i] / periodos[i];
    }
    float limite = n * (pow(2.0, 1.0/n) - 1);
    return (U <= limite) ? 1 : 0; // 1 = escalonável
}

7. Boas Práticas e Padrões para C em Tempo Real

Watchdog Timers

void watchdog_init() {
    // Configura watchdog para 20ms
    WDT_CTRL = WDT_PERIOD_20MS;
}

void tarefa_principal() {
    while (1) {
        if (flag_deadline_perdida) {
            // Falha detectada - reinicia sistema
            while(1); // watchdog irá resetar
        }
        WDT_RESET(); // alimenta watchdog
        processa_tarefas();
    }
}

Exemplo Completo: Controle de Motor com Deadline de 10ms

#define PERIODO_MS 10
volatile int tick = 0;

void timer_isr() __attribute__((interrupt)) {
    tick = 1;
}

void controle_motor() {
    static int angulo_atual = 0;
    int angulo_desejado = le_sensor_posicao();
    int erro = angulo_desejado - angulo_atual;

    // Controle PID simplificado
    int saida_pwm = erro * 2;
    if (saida_pwm > 255) saida_pwm = 255;
    if (saida_pwm < 0) saida_pwm = 0;

    escreve_pwm(saida_pwm);
    angulo_atual += saida_pwm / 10;
}

int main() {
    timer_config(PERIODO_MS);
    while (1) {
        if (tick) {
            tick = 0;
            controle_motor();
        }
    }
}

8. Ferramentas e Depuração para Sistemas de Tempo Real

Para verificar jitter e cumprimento de deadlines:
- Logic analyzers: medem sinais de GPIO para rastrear tempo real
- Trace analyzers: como Tracealyzer ou Percepio para visualizar escalonamento
- Cycle-accurate simulators: como QEMU ou simuladores específicos de microcontroladores

// Medição de jitter com pino de depuração
#define PINO_DEBUG 5

void inicio_tarefa() {
    GPIO_SET(PINO_DEBUG);
    // ... execução da tarefa
    GPIO_CLEAR(PINO_DEBUG);
}

Testes de estresse devem incluir:
- Sobrecarga de 150% da CPU para verificar comportamento
- Injeção de falhas em recursos compartilhados
- Boundary value analysis nos tempos de execução

Referências