machinarium: make channel properly handle waiters queue

This commit is contained in:
Dmitry Simonenko 2017-05-22 15:23:15 +03:00
parent 7f275249f2
commit 3d0f5210b0
3 changed files with 35 additions and 27 deletions

View File

@ -7,7 +7,14 @@
* cooperative multitasking engine.
*/
typedef struct mm_channel_t mm_channel_t;
typedef struct mm_channelreader_t mm_channelreader_t;
typedef struct mm_channel_t mm_channel_t;
struct mm_channelreader_t {
mm_call_t call;
int signaled;
mm_list_t link;
};
struct mm_channel_t {
mm_list_t incoming;
@ -47,10 +54,16 @@ mm_channel_write(mm_channel_t *channel, mm_msg_t *msg)
mm_list_t *first;
first = channel->readers.next;
mm_fiber_t *fiber;
fiber = mm_container_of(first, mm_fiber_t, link_channel);
mm_channelreader_t *reader;
reader = mm_container_of(first, mm_channelreader_t, link);
reader->signaled = 1;
mm_scheduler_wakeup(&mm_self->scheduler, fiber);
/* remove reader from the queue, to properly handle
* other waiting consumers */
mm_list_unlink(&reader->link);
channel->readers_count--;
mm_scheduler_wakeup(&mm_self->scheduler, reader->call.fiber);
}
static inline mm_msg_t*
@ -59,33 +72,30 @@ mm_channel_read(mm_channel_t *channel, int time_ms)
if (channel->incoming_count > 0)
goto fetch;
mm_fiber_t *fiber;
fiber = mm_scheduler_current(&mm_self->scheduler);
mm_list_append(&channel->readers, &fiber->link_channel);
mm_channelreader_t reader;
reader.signaled = 0;
mm_list_init(&reader.link);
mm_list_append(&channel->readers, &reader.link);
channel->readers_count++;
mm_call_t call;
mm_call(&call, time_ms);
assert(channel->readers_count > 0);
channel->readers_count--;
mm_list_unlink(&fiber->link_channel);
if (call.status != 0) {
mm_call(&reader.call, time_ms);
if (reader.call.status != 0) {
/* timedout or cancel */
if (! reader.signaled) {
assert(channel->readers_count > 0);
channel->readers_count--;
mm_list_unlink(&reader.link);
}
return NULL;
}
assert(reader.signaled);
fetch:
if (channel->incoming_count > 0) {
mm_list_t *first;
first = mm_list_pop(&channel->incoming);
channel->incoming_count--;
mm_msg_t *msg;
msg = mm_container_of(first, mm_msg_t, link);
return msg;
}
return NULL;
fetch:;
mm_list_t *first;
first = mm_list_pop(&channel->incoming);
channel->incoming_count--;
return mm_container_of(first, mm_msg_t, link);
}
#endif

View File

@ -17,7 +17,6 @@ void mm_fiber_init(mm_fiber_t *fiber)
mm_list_init(&fiber->joiners);
mm_list_init(&fiber->link);
mm_list_init(&fiber->link_join);
mm_list_init(&fiber->link_channel);
}
mm_fiber_t*

View File

@ -29,7 +29,6 @@ struct mm_fiber_t {
mm_fiber_t *resume;
void *call_ptr;
mm_list_t joiners;
mm_list_t link_channel;
mm_list_t link_join;
mm_list_t link;
};