/* * machinarium. * * cooperative multitasking engine. */ #include #include void mm_channel_init(mm_channel_t *channel) { channel->type.is_shared = 1; pthread_mutex_init(&channel->lock, NULL); mm_list_init(&channel->msg_list); channel->msg_list_count = 0; mm_list_init(&channel->readers); channel->readers_count = 0; } void mm_channel_free(mm_channel_t *channel) { mm_list_t *i, *n; mm_list_foreach_safe(&channel->msg_list, i, n) { mm_msg_t *msg = mm_container_of(i, mm_msg_t, link); mm_msg_unref(&machinarium.msg_cache, msg); } pthread_mutex_destroy(&channel->lock); } void mm_channel_write(mm_channel_t *channel, mm_msg_t *msg) { pthread_mutex_lock(&channel->lock); if (channel->readers_count) { mm_channelrd_t *reader; reader = mm_container_of(channel->readers.next, mm_channelrd_t, link); reader->result = msg; mm_list_unlink(&reader->link); channel->readers_count--; int event_mgr_fd; event_mgr_fd = mm_eventmgr_signal(&reader->event); pthread_mutex_unlock(&channel->lock); if (event_mgr_fd > 0) mm_eventmgr_wakeup(event_mgr_fd); return; } mm_list_append(&channel->msg_list, &msg->link); channel->msg_list_count++; pthread_mutex_unlock(&channel->lock); } mm_msg_t* mm_channel_read(mm_channel_t *channel, uint32_t time_ms) { /* try to get first message, if no other readers are * waiting, otherwise put reader in the wait * channel */ pthread_mutex_lock(&channel->lock); mm_list_t *next; if (channel->msg_list_count > 0 && channel->readers_count == 0) { next = mm_list_pop(&channel->msg_list); channel->msg_list_count--; pthread_mutex_unlock(&channel->lock); return mm_container_of(next, mm_msg_t, link); } /* put reader into channel and register event */ mm_channelrd_t reader; reader.result = NULL; mm_list_init(&reader.link); mm_eventmgr_add(&mm_self->event_mgr, &reader.event); mm_list_append(&channel->readers, &reader.link); channel->readers_count++; pthread_mutex_unlock(&channel->lock); /* wait for cancel, timedout or writer event */ mm_eventmgr_wait(&mm_self->event_mgr, &reader.event, time_ms); pthread_mutex_lock(&channel->lock); if (! reader.result) { assert(channel->readers_count > 0); channel->readers_count--; mm_list_unlink(&reader.link); } pthread_mutex_unlock(&channel->lock); /* timedout or cancel */ if (reader.event.call.status != 0) { if (reader.result) mm_msg_unref(&machinarium.msg_cache, reader.result); return NULL; } return reader.result; }