Inter-process communication: shared memory e message queues

1. Fundamentos da IPC em Sistemas UNIX

1.1. Visão geral dos mecanismos IPC

A comunicação entre processos (IPC) em sistemas UNIX oferece diversos mecanismos, cada um adequado a cenários específicos. Pipes e FIFOs permitem comunicação unidirecional baseada em fluxo de bytes, enquanto sockets oferecem comunicação bidirecional através da rede ou localmente. Os mecanismos System V (SysV) e POSIX fornecem abstrações mais poderosas: memória compartilhada, filas de mensagens e semáforos.

1.2. O papel da memória compartilhada e filas de mensagens

Memória compartilhada permite que múltiplos processos acessem a mesma região de memória, oferecendo a mais baixa latência entre mecanismos IPC. Filas de mensagens, por outro lado, fornecem comunicação estruturada baseada em mensagens com suporte a prioridades e sincronização embutida.

1.3. Estruturas de dados do kernel

O kernel mantém estruturas internas para gerenciar recursos IPC. Para memória compartilhada, shmid_ds armazena permissões, tamanho e timestamps. Para filas de mensagens, msqid_ds gerencia a fila de mensagens pendentes. Identificadores IPC são inteiros retornados por shmget e msgget.

2. Memória Compartilhada com SysV

2.1. Criação e configuração de segmentos

#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>

#define SHM_SIZE 1024

int main() {
    key_t key = ftok("/tmp", 'A');
    int shmid = shmget(key, SHM_SIZE, IPC_CREAT | 0666);
    if (shmid == -1) {
        perror("shmget");
        exit(1);
    }
    printf("Segmento criado com ID: %d\n", shmid);
    return 0;
}

2.2. Anexação e desanexação de segmentos

#include <sys/ipc.h>
#include <sys/shm.h>
#include <string.h>
#include <stdio.h>

int main() {
    key_t key = ftok("/tmp", 'A');
    int shmid = shmget(key, 1024, 0666);

    char *data = (char *)shmat(shmid, NULL, 0);
    if (data == (char *)-1) {
        perror("shmat");
        return 1;
    }

    strcpy(data, "Hello from process!");
    printf("Dados escritos: %s\n", data);

    shmdt(data);
    return 0;
}

2.3. Controle e remoção

#include <sys/ipc.h>
#include <sys/shm.h>

struct shmid_ds buf;
shmctl(shmid, IPC_STAT, &buf);
printf("Tamanho: %zu\n", buf.shm_segsz);

// Remover segmento
shmctl(shmid, IPC_RMID, NULL);

3. Memória Compartilhada com POSIX

3.1. Criação de objetos nomeados

#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>

int main() {
    int fd = shm_open("/meu_shm", O_CREAT | O_RDWR, 0666);
    if (fd == -1) {
        perror("shm_open");
        return 1;
    }

    ftruncate(fd, 4096);

    void *ptr = mmap(NULL, 4096, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (ptr == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    sprintf((char *)ptr, "Dados compartilhados");

    munmap(ptr, 4096);
    close(fd);
    shm_unlink("/meu_shm");
    return 0;
}

4. Sincronização de Acesso à Memória Compartilhada

4.1. Semáforos POSIX em memória compartilhada

#include <semaphore.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>

typedef struct {
    sem_t mutex;
    char buffer[256];
} shared_data;

int main() {
    int fd = shm_open("/shm_sync", O_CREAT | O_RDWR, 0666);
    ftruncate(fd, sizeof(shared_data));

    shared_data *data = mmap(NULL, sizeof(shared_data), 
                             PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    sem_init(&data->mutex, 1, 1);  // 1 = shared between processes

    sem_wait(&data->mutex);
    sprintf(data->buffer, "Acesso seguro!");
    sem_post(&data->mutex);

    munmap(data, sizeof(shared_data));
    close(fd);
    shm_unlink("/shm_sync");
    return 0;
}

4.2. Mutexes entre processos

pthread_mutexattr_t attr;
pthread_mutex_t *mutex;

pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(mutex, &attr);

4.3. Armadilhas comuns

Data races ocorrem quando múltiplos processos acessam dados compartilhados sem sincronização. Deadlocks surgem quando dois processos aguardam recursos um do outro. Barreiras de memória (__sync_synchronize()) garantem visibilidade de escritas entre processos.

5. Filas de Mensagens com SysV

5.1. Criação e obtenção de filas

#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>

int main() {
    key_t key = ftok("/tmp", 'B');
    int msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        perror("msgget");
        return 1;
    }
    printf("Fila criada com ID: %d\n", msgid);
    return 0;
}

5.2. Envio e recebimento de mensagens

#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <stdio.h>

struct msgbuf {
    long mtype;
    char mtext[256];
};

// Envio
struct msgbuf message;
message.mtype = 1;
strcpy(message.mtext, "Mensagem de teste");
msgsnd(msgid, &message, sizeof(message.mtext), 0);

// Recebimento
msgrcv(msgid, &message, sizeof(message.mtext), 1, 0);
printf("Recebido: %s\n", message.mtext);

5.3. Controle de filas

struct msqid_ds buf;
msgctl(msgid, IPC_STAT, &buf);
printf("Mensagens na fila: %lu\n", buf.msg_qnum);

// Remover fila
msgctl(msgid, IPC_RMID, NULL);

6. Filas de Mensagens com POSIX

6.1. Criação e configuração

#include <mqueue.h>
#include <fcntl.h>
#include <stdio.h>

int main() {
    struct mq_attr attr;
    attr.mq_flags = 0;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = 256;
    attr.mq_curmsgs = 0;

    mqd_t mq = mq_open("/minha_fila", O_CREAT | O_RDWR, 0666, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    char buffer[256] = "Mensagem POSIX";
    mq_send(mq, buffer, strlen(buffer) + 1, 0);

    char recv[256];
    unsigned int priority;
    mq_receive(mq, recv, sizeof(recv), &priority);
    printf("Recebido: %s (prioridade: %u)\n", recv, priority);

    mq_close(mq);
    mq_unlink("/minha_fila");
    return 0;
}

6.2. Operações assíncronas

struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = &handler;
mq_notify(mq, &sev);

7. Comparação e Seleção de Mecanismos IPC

7.1. SysV vs. POSIX

POSIX oferece interface mais limpa, melhor portabilidade e recursos como notificação assíncrona. SysV ainda é amplamente usado em código legado e tem suporte universal em UNIX.

7.2. Memória compartilhada vs. filas de mensagens

Memória compartilhada oferece menor latência (acesso direto à memória) mas requer sincronização explícita. Filas de mensagens fornecem comunicação estruturada com sincronização embutida, mas têm overhead maior.

7.3. Integração com signal handling

Em daemons, é comum usar memória compartilhada para estado compartilhado e filas para comandos, combinando com tratamento de sinais para limpeza graceful.

8. Exemplo Prático Completo em C

8.1. Buffer circular com semáforo

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <semaphore.h>
#include <signal.h>

#define BUFFER_SIZE 10

typedef struct {
    int buffer[BUFFER_SIZE];
    int in, out;
    sem_t empty, full, mutex;
} circular_buffer;

circular_buffer *buf;
int running = 1;

void cleanup(int sig) {
    running = 0;
}

int main() {
    signal(SIGINT, cleanup);
    signal(SIGTERM, cleanup);

    int fd = shm_open("/circular_buf", O_CREAT | O_RDWR, 0666);
    ftruncate(fd, sizeof(circular_buffer));

    buf = mmap(NULL, sizeof(circular_buffer), 
               PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    sem_init(&buf->empty, 1, BUFFER_SIZE);
    sem_init(&buf->full, 1, 0);
    sem_init(&buf->mutex, 1, 1);
    buf->in = 0;
    buf->out = 0;

    // Producer
    if (fork() == 0) {
        int item = 0;
        while (running) {
            sem_wait(&buf->empty);
            sem_wait(&buf->mutex);
            buf->buffer[buf->in] = item++;
            buf->in = (buf->in + 1) % BUFFER_SIZE;
            sem_post(&buf->mutex);
            sem_post(&buf->full);
            sleep(1);
        }
        exit(0);
    }

    // Consumer
    while (running) {
        sem_wait(&buf->full);
        sem_wait(&buf->mutex);
        int item = buf->buffer[buf->out];
        buf->out = (buf->out + 1) % BUFFER_SIZE;
        sem_post(&buf->mutex);
        sem_post(&buf->empty);
        printf("Consumido: %d\n", item);
        sleep(2);
    }

    // Cleanup
    sem_destroy(&buf->empty);
    sem_destroy(&buf->full);
    sem_destroy(&buf->mutex);
    munmap(buf, sizeof(circular_buffer));
    close(fd);
    shm_unlink("/circular_buf");

    return 0;
}

8.2. Servidor/Cliente com fila POSIX

#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

mqd_t server_queue;
int running = 1;

void handler(int sig) {
    running = 0;
}

int main() {
    signal(SIGINT, handler);

    struct mq_attr attr = {0, 10, 256, 0};
    server_queue = mq_open("/server_queue", O_CREAT | O_RDWR, 0666, &attr);

    // Client
    if (fork() == 0) {
        mqd_t client_q = mq_open("/client_queue", O_CREAT | O_RDWR, 0666, &attr);
        char msg[256];
        sprintf(msg, "PID=%d: Hello server!", getpid());
        mq_send(server_queue, msg, strlen(msg) + 1, 0);

        char response[256];
        unsigned int prio;
        mq_receive(client_q, response, sizeof(response), &prio);
        printf("Cliente recebeu: %s\n", response);

        mq_close(client_q);
        mq_unlink("/client_queue");
        exit(0);
    }

    // Server
    char buffer[256];
    unsigned int priority;
    while (running) {
        if (mq_receive(server_queue, buffer, sizeof(buffer), &priority) != -1) {
            printf("Servidor recebeu: %s\n", buffer);

            // Extract client queue name from message
            mqd_t client_q = mq_open("/client_queue", O_WRONLY);
            char response[256] = "Processado com sucesso!";
            mq_send(client_q, response, strlen(response) + 1, 0);
            mq_close(client_q);
        }
    }

    mq_close(server_queue);
    mq_unlink("/server_queue");
    return 0;
}

Referências