diff --git a/src/mm_channel.h b/src/mm_channel.h index 5acade7a..b14fe089 100644 --- a/src/mm_channel.h +++ b/src/mm_channel.h @@ -7,7 +7,14 @@ * cooperative multitasking engine. */ -typedef struct mm_channel_t mm_channel_t; +typedef struct mm_channelreader_t mm_channelreader_t; +typedef struct mm_channel_t mm_channel_t; + +struct mm_channelreader_t { + mm_call_t call; + int signaled; + mm_list_t link; +}; struct mm_channel_t { mm_list_t incoming; @@ -47,10 +54,16 @@ mm_channel_write(mm_channel_t *channel, mm_msg_t *msg) mm_list_t *first; first = channel->readers.next; - mm_fiber_t *fiber; - fiber = mm_container_of(first, mm_fiber_t, link_channel); + mm_channelreader_t *reader; + reader = mm_container_of(first, mm_channelreader_t, link); + reader->signaled = 1; - mm_scheduler_wakeup(&mm_self->scheduler, fiber); + /* remove reader from the queue, to properly handle + * other waiting consumers */ + mm_list_unlink(&reader->link); + channel->readers_count--; + + mm_scheduler_wakeup(&mm_self->scheduler, reader->call.fiber); } static inline mm_msg_t* @@ -59,33 +72,30 @@ mm_channel_read(mm_channel_t *channel, int time_ms) if (channel->incoming_count > 0) goto fetch; - mm_fiber_t *fiber; - fiber = mm_scheduler_current(&mm_self->scheduler); - mm_list_append(&channel->readers, &fiber->link_channel); + mm_channelreader_t reader; + reader.signaled = 0; + mm_list_init(&reader.link); + + mm_list_append(&channel->readers, &reader.link); channel->readers_count++; - mm_call_t call; - mm_call(&call, time_ms); - - assert(channel->readers_count > 0); - channel->readers_count--; - mm_list_unlink(&fiber->link_channel); - - if (call.status != 0) { + mm_call(&reader.call, time_ms); + if (reader.call.status != 0) { /* timedout or cancel */ + if (! reader.signaled) { + assert(channel->readers_count > 0); + channel->readers_count--; + mm_list_unlink(&reader.link); + } return NULL; } + assert(reader.signaled); -fetch: - if (channel->incoming_count > 0) { - mm_list_t *first; - first = mm_list_pop(&channel->incoming); - channel->incoming_count--; - mm_msg_t *msg; - msg = mm_container_of(first, mm_msg_t, link); - return msg; - } - return NULL; +fetch:; + mm_list_t *first; + first = mm_list_pop(&channel->incoming); + channel->incoming_count--; + return mm_container_of(first, mm_msg_t, link); } #endif diff --git a/src/mm_fiber.c b/src/mm_fiber.c index effab196..79b3263f 100644 --- a/src/mm_fiber.c +++ b/src/mm_fiber.c @@ -17,7 +17,6 @@ void mm_fiber_init(mm_fiber_t *fiber) mm_list_init(&fiber->joiners); mm_list_init(&fiber->link); mm_list_init(&fiber->link_join); - mm_list_init(&fiber->link_channel); } mm_fiber_t* diff --git a/src/mm_fiber.h b/src/mm_fiber.h index 01be36c5..a969eb5d 100644 --- a/src/mm_fiber.h +++ b/src/mm_fiber.h @@ -29,7 +29,6 @@ struct mm_fiber_t { mm_fiber_t *resume; void *call_ptr; mm_list_t joiners; - mm_list_t link_channel; mm_list_t link_join; mm_list_t link; };