diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cb49296f..8564723d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -15,7 +15,8 @@ set(machine_src mm_loop.c mm_connect.c mm_bind.c mm_read.c - mm_write.c) # mm_accept.c mm_dns.c mm_tls_io.c) + mm_write.c + mm_accept.c) # mm_dns.c mm_tls_io.c) add_library(machine_library_shared SHARED ${machine_src}) set_target_properties(machine_library_shared PROPERTIES OUTPUT_NAME ${machine_library}) diff --git a/src/mm_accept.c b/src/mm_accept.c index 4a59e9a6..21143ca5 100644 --- a/src/mm_accept.c +++ b/src/mm_accept.c @@ -9,38 +9,37 @@ #include static void -mm_accept_cb(uv_stream_t *handle, int status) +mm_accept_timer_cb(mm_timer_t *handle) { - mm_io_t *io = handle->data; - io->accept_status = status; + mm_io_t *io = handle->arg; + (void)io; + io->accept_status = ETIMEDOUT; + io->accept_timedout = 1; mm_scheduler_wakeup(io->accept_fiber); } -static inline machine_io_t -mm_accept_client(mm_io_t *io) +static void +mm_accept_cancel_cb(void *obj, void *arg) { - mm_io_t *client = machine_create_io(io->machine); - if (client == NULL) { - io->accept_status = -ENOMEM; - return NULL; - } - io->accept_status = - uv_accept((uv_stream_t*)&io->handle, - (uv_stream_t*)&client->handle); - if (io->accept_status < 0) { - machine_close(client); - return NULL; - } - client->accepted = 1; - client->connected = 1; - uv_fileno((uv_handle_t*)&client->handle, - &client->fd); - return client; + mm_io_t *io = arg; + (void)obj; + io->accept_status = ECANCELED; + mm_scheduler_wakeup(io->accept_fiber); +} + +static int +mm_accept_on_read_cb(mm_fd_t *handle) +{ + mm_io_t *io = handle->on_read_arg; + io->connect_status = 0; + mm_scheduler_wakeup(io->connect_fiber); + return 0; } static int mm_accept(mm_io_t *io, int backlog, machine_io_t *client) { + mm_t *machine = machine = io->machine; mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler); mm_io_set_errno(io, 0); if (mm_fiber_is_cancelled(current)) { @@ -51,27 +50,74 @@ mm_accept(mm_io_t *io, int backlog, machine_io_t *client) mm_io_set_errno(io, EINPROGRESS); return -1; } - io->accept_status = 0; - io->accept_fiber = current; + if (io->connected) { + mm_io_set_errno(io, EINPROGRESS); + return -1; + } + if (io->fd == -1) { + mm_io_set_errno(io, EBADF); + return -1; + } + io->accept_status = 0; + io->accept_fiber = NULL; + io->accept_timedout = 0; + int rc; - rc = uv_listen((uv_stream_t*)&io->handle, backlog, mm_accept_cb); - if (rc < 0) { - io->accept_fiber = NULL; - mm_io_set_errno_uv(io, rc); + if (! io->accept_listen) { + rc = mm_socket_listen(io->fd, backlog); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + } + + /* subscribe for accept event */ + rc = mm_loop_read(&machine->loop, &io->handle, + mm_accept_on_read_cb, + io, 1); + if (rc == -1) { + mm_io_set_errno(io, errno); return -1; } - mm_scheduler_yield(&io->machine->scheduler); - rc = io->accept_status; + + /* wait for timedout, cancel or execution status */ + mm_timer_start(&machine->loop.clock, &io->accept_timer, + mm_accept_timer_cb, io, 0); + mm_call_begin(¤t->call, mm_accept_cancel_cb, io); + io->accept_fiber = current; + mm_scheduler_yield(&machine->scheduler); io->accept_fiber = NULL; - if (rc < 0) { - mm_io_set_errno_uv(io, rc); + mm_call_end(¤t->call); + mm_timer_stop(&io->accept_timer); + + rc = mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0); + if (rc == -1) { + mm_io_set_errno(io, errno); return -1; } - *client = mm_accept_client(io); - if (*client == NULL) { - mm_io_set_errno_uv(io, io->accept_status); + + rc = io->accept_status; + if (rc != 0) { + mm_io_set_errno(io, rc); return -1; } + + *client = machine_create_io(io->machine); + if (client == NULL) { + mm_io_set_errno(io, ENOMEM); + return -1; + } + rc = mm_socket_accept(io->fd, NULL, NULL); + if (rc == -1) { + mm_io_set_errno(io, errno); + machine_free_io(*client); + *client = NULL; + return -1; + } + mm_io_t *client_io = (mm_io_t*)*client; + client_io->fd = rc; + client_io->accepted = 1; + client_io->connected = 1; return 0; } @@ -83,6 +129,7 @@ machine_accept(machine_io_t obj, int backlog, machine_io_t *client) rc = mm_accept(io, backlog, client); if (rc == -1) return -1; +#if 0 if (! io->tls_obj) return 0; mm_io_t *io_client = *client; @@ -97,5 +144,6 @@ machine_accept(machine_io_t obj, int backlog, machine_io_t *client) /* todo: close */ return -1; } +#endif return 0; } diff --git a/src/mm_bind.c b/src/mm_bind.c index 43baba27..188e86b5 100644 --- a/src/mm_bind.c +++ b/src/mm_bind.c @@ -12,6 +12,7 @@ MACHINE_API int machine_bind(machine_io_t obj, struct sockaddr *sa) { mm_io_t *io = obj; + mm_t *machine = machine = io->machine; mm_io_set_errno(io, 0); if (io->connected) { mm_io_set_errno(io, EINPROGRESS); @@ -31,6 +32,11 @@ machine_bind(machine_io_t obj, struct sockaddr *sa) mm_io_set_errno(io, errno); goto error; } + rc = mm_loop_add(&machine->loop, &io->handle, 0); + if (rc == -1) { + mm_io_set_errno(io, errno); + goto error; + } return 0; error: if (io->fd != -1) { diff --git a/src/mm_io.h b/src/mm_io.h index 6772aee5..815d1779 100644 --- a/src/mm_io.h +++ b/src/mm_io.h @@ -40,11 +40,12 @@ struct mm_io_t { #endif /* accept */ -#if 0 + mm_timer_t accept_timer; + int accept_timedout; int accept_status; + int accept_listen; mm_fiber_t *accept_fiber; int accepted; -#endif /* read */ mm_timer_t read_timer; diff --git a/src/mm_socket.c b/src/mm_socket.c index 42c28e6c..9a45935f 100644 --- a/src/mm_socket.c +++ b/src/mm_socket.c @@ -135,6 +135,24 @@ int mm_socket_bind(int fd, struct sockaddr *sa) return 0; } +int mm_socket_listen(int fd, int backlog) +{ + int rc; + rc = listen(fd, backlog); + if (rc == -1) + return -1; + return 0; +} + +int mm_socket_accept(int fd, struct sockaddr *sa, socklen_t *slen) +{ + int rc; + rc = accept(fd, sa, slen); + if (rc == -1) + return -1; + return 0; +} + int mm_socket_write(int fd, void *buf, int size) { int rc; diff --git a/src/mm_socket.h b/src/mm_socket.h index 3e362e7b..4ffd2184 100644 --- a/src/mm_socket.h +++ b/src/mm_socket.h @@ -16,6 +16,8 @@ int mm_socket_set_reuseaddr(int, int); int mm_socket_error(int); int mm_socket_connect(int, struct sockaddr*); int mm_socket_bind(int, struct sockaddr*); +int mm_socket_listen(int, int); +int mm_socket_accept(int, struct sockaddr*, socklen_t*); int mm_socket_write(int, void*, int); int mm_socket_read(int, void*, int);