foolyc

linux message queue

最近尝试将围棋AI程序中蒙托卡罗搜索与GPU使用分开,使用多线程尽量压榨机器性能,那么必然涉及到生产-消费模型中通信的队列。

网上查阅到的两种方式,一种是线程条件变量,一种是利用eventfd实现事件通知 (源自推酷)。
简单的环形队列实现见附录1。

线程条件变量

相关函数介绍

pthread_cond_init:初始化一个线程条件变量。
pthread_cond_wait:等待条件触发。
pthread_cond_signal:通知一个线程,线程条件发生。
pthread_cond_timedwait:等待条件触发,可以设置超时时间。
pthread_cond_reltimedwait_np:和pthread_cond_timedwait使用基本相同,区别是使用的是相对时间间隔而不是绝对时间间隔。
pthread_cond_broadcast:通知所有等待线程,线程条件发生。
pthread_cond_destroy:销毁条件变量。

唤醒丢失问题

如果线程未持有与条件相关联的互斥锁,则调用 pthread_cond_signal() 或 pthread_cond_broadcast() 会产生唤醒丢失错误。满足以下所有条件时,即会出现唤醒丢失问题:

一个线程调用 pthread_cond_signal() 或 pthread_cond_broadcast()
另一个线程已经测试了该条件,但是尚未调用 pthread_cond_wait()
没有正在等待的线程

信号不起作用,因此将会丢失,仅当修改所测试的条件但未持有与之相关联的互斥锁时,才会出现此问题。只要仅在持有关联的互斥锁同时修改所测试的条件,即可调用 pthread_cond_signal() 和 pthread_cond_broadcast(),而无论这些函数是否持有关联的互斥锁。

线程条件变量使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
get_resources(int amount)
{
pthread_mutex_lock(&rsrc_lock);
while (resources < amount)
{
pthread_cond_wait(&rsrc_add, &rsrc_lock);
}
resources -= amount;
pthread_mutex_unlock(&rsrc_lock);
}
add_resources(int amount)
{
pthread_mutex_lock(&rsrc_lock);
resources += amount;
pthread_cond_broadcast(&rsrc_add);
pthread_mutex_unlock(&rsrc_lock);
}

线程变量实现的异步队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
typedef struct async_queue
{
pthread_mutex_t mutex;
pthread_cond_t cond;
int waiting_threads;
queue_t *_queue;
} async_queue_t;
async_queue_t *async_queue_create(int size)
{
async_queue_t *q = malloc(sizeof (async_queue_t));
q->_queue = queue_create(size);
q->waiting_threads = 0;
pthread_mutex_init(&(q->mutex), NULL);
pthread_cond_init(&(q->cond), NULL);
return q;
}
void async_queue_push_tail(async_queue_t *q, void *data)
{
if (!queue_is_full(q->_queue))
{
pthread_mutex_lock(&(q->mutex));
queue_push_tail(q->_queue, data);
if (q->waiting_threads > 0)
{
pthread_cond_signal(&(q->cond));
}
pthread_mutex_unlock(&(q->mutex));
}
}
void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
{
void *retval = NULL;
pthread_mutex_lock(&(q->mutex));
if (queue_is_empty(q->_queue))
{
q->waiting_threads++;
while (queue_is_empty(q->_queue))
{
pthread_cond_wait(&(q->cond), &(q->mutex));
}
q->waiting_threads--;
}
retval = queue_pop_head(q->_queue);
pthread_mutex_unlock(&(q->mutex));
return retval;
}
void async_queue_free(async_queue_t *q)
{
queue_free(q->_queue);
pthread_cond_destroy(&(q->cond));
pthread_mutex_destroy(&(q->mutex));
free(q);
}

Event事件

eventfd

int eventfd(unsigned int initval, int flags);

eventfd 是Linux提供内核态的事件等待/通知机制,内核维护了一个8字节的整型数,该整型数由 initval 来初始化, flags 参数可以由以下值位或而来:

EFD_CLOEXEC:设置该描述符的 O_CLOEXEC 标志。
EFD_NONBLOCK:设置描述符为非阻塞模式。
EFD_SEMAPHORE:设置描述符为信号量工作模式,在此模式下, read 模式会使整型数减1并返回数值1。

当内核维护的8字节整型数为0时, read 操作会阻塞,如果为fd设置为非阻塞模式,则返回 EAGAIN 错误。

eventfd实现的异步队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
typedef struct async_queue
{
int efd; //event fd
fd_set rdfds; //for select
queue_t *_queue;
} async_queue_t;
async_queue_t *async_queue_create(int size)
{
async_queue_t *q = malloc(sizeof (async_queue_t));
q->efd = eventfd(0, EFD_SEMAPHORE|EFD_NONBLOCK);
q->_queue = queue_create(size);
FD_ZERO(&(q->rdfds));
FD_SET(q->efd, &(q->rdfds));
return q;
}
void async_queue_push_tail(async_queue_t *q, void *data)
{
unsigned long long i = 1;
if (!queue_is_full(q->_queue))
{
queue_push_tail(q->_queue, data);
write(q->efd, &i, sizeof (i));
}
}
void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
{
unsigned long long i = 0;
void *data = NULL;
if (select(q->efd + 1, &(q->rdfds), NULL, NULL, tv) == 0)
{
return data;
}
else
{
read(q->efd, &i, sizeof (i));
return queue_pop_head(q->_queue);
}
}
void async_queue_free(async_queue_t *q)
{
queue_free(q->_queue);
close(q->efd);
free(q);
}

atomic原子变量

这种方式和第一种比较接近,只不过采用原子操作全局标志变量,没有mutex上锁,只是粗糙地满足通信队列的需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
typedef struct async_queue
{
int busy_flag;
queue_t *_queue;
} async_queue_t;
void async_queue_push_tail(async_queue_t *q, void *data)
{
while(__sync_lock_test_and_set(&q->busy_flag, 1)){}
if(!queue_is_full(q->_queue))
queue_push_tail(q->_queue, data);
__sync_lock_test_and_set(&q->busy_flag, 0);
return;
}
void *async_queue_pop_head(async_queue_t *q, struct timeval *tv)
{
void *retval = NULL;
while(__sync_lock_test_and_set(&q->busy_flag, 1)){}
if(queue_is_empty(q->_queue))
{
retval = queue_pop_head(q->_queue);
}
tv = retval;
__sync_lock_test_and_set(&q->busy_flag, 0);
return;
}

附录1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#define default_size 1024
typedef struct queue
{
int header;
int tail;
int size;
int capcity;
void **_buf;
} queue_t;
queue_t *queue_create(int size)
{
queue_t *q = malloc(sizeof (queue_t));
if (q != NULL)
{
if (size > 0)
{
q->_buf = malloc(size);
q->capcity = size;
}
else
{
q->_buf = malloc(default_size * sizeof (void *));
q->capcity = default_size;
}
q->header = q->tail = q->size = 0;
}
return q;
}
int queue_is_full(queue_t *q)
{
return q->size == q->capcity;
}
int queue_is_empty(queue_t *q)
{
return q->size == 0;
}
void queue_push_tail(queue_t *q, void *data)
{
if (!queue_is_full(q))
{
q->_buf[q->tail] = data;
q->tail = (q->tail + 1) % q->capcity;
q->size++;
}
}
void *queue_pop_head(queue_t *q)
{
void *data = NULL;
if (!queue_is_empty(q))
{
data = q->_buf[(q->header)];
q->header = (q->header + 1) % q->capcity;
q->size--;
}
return data;
}
int *queue_free(queue_t *q)
{
free(q->_buf);
free(q);
}
本文由foolyc创作和发表,采用BY-NC-SA国际许可协议进行许可
转载请注明作者及出处,本文作者为foolyc
本文标题为linux message queue
本文链接为http://foolyc.com//2017/03/29/linux-message-queue/.