mirror of https://github.com/yandex/odyssey.git
machinarium: implement machine_accept()
This commit is contained in:
parent
8eda2208ef
commit
c932b593f7
|
@ -15,7 +15,8 @@ set(machine_src mm_loop.c
|
|||
mm_connect.c
|
||||
mm_bind.c
|
||||
mm_read.c
|
||||
mm_write.c) # mm_accept.c mm_dns.c mm_tls_io.c)
|
||||
mm_write.c
|
||||
mm_accept.c) # mm_dns.c mm_tls_io.c)
|
||||
|
||||
add_library(machine_library_shared SHARED ${machine_src})
|
||||
set_target_properties(machine_library_shared PROPERTIES OUTPUT_NAME ${machine_library})
|
||||
|
|
118
src/mm_accept.c
118
src/mm_accept.c
|
@ -9,38 +9,37 @@
|
|||
#include <machinarium.h>
|
||||
|
||||
static void
|
||||
mm_accept_cb(uv_stream_t *handle, int status)
|
||||
mm_accept_timer_cb(mm_timer_t *handle)
|
||||
{
|
||||
mm_io_t *io = handle->data;
|
||||
io->accept_status = status;
|
||||
mm_io_t *io = handle->arg;
|
||||
(void)io;
|
||||
io->accept_status = ETIMEDOUT;
|
||||
io->accept_timedout = 1;
|
||||
mm_scheduler_wakeup(io->accept_fiber);
|
||||
}
|
||||
|
||||
static inline machine_io_t
|
||||
mm_accept_client(mm_io_t *io)
|
||||
static void
|
||||
mm_accept_cancel_cb(void *obj, void *arg)
|
||||
{
|
||||
mm_io_t *client = machine_create_io(io->machine);
|
||||
if (client == NULL) {
|
||||
io->accept_status = -ENOMEM;
|
||||
return NULL;
|
||||
}
|
||||
io->accept_status =
|
||||
uv_accept((uv_stream_t*)&io->handle,
|
||||
(uv_stream_t*)&client->handle);
|
||||
if (io->accept_status < 0) {
|
||||
machine_close(client);
|
||||
return NULL;
|
||||
}
|
||||
client->accepted = 1;
|
||||
client->connected = 1;
|
||||
uv_fileno((uv_handle_t*)&client->handle,
|
||||
&client->fd);
|
||||
return client;
|
||||
mm_io_t *io = arg;
|
||||
(void)obj;
|
||||
io->accept_status = ECANCELED;
|
||||
mm_scheduler_wakeup(io->accept_fiber);
|
||||
}
|
||||
|
||||
static int
|
||||
mm_accept_on_read_cb(mm_fd_t *handle)
|
||||
{
|
||||
mm_io_t *io = handle->on_read_arg;
|
||||
io->connect_status = 0;
|
||||
mm_scheduler_wakeup(io->connect_fiber);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
mm_accept(mm_io_t *io, int backlog, machine_io_t *client)
|
||||
{
|
||||
mm_t *machine = machine = io->machine;
|
||||
mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler);
|
||||
mm_io_set_errno(io, 0);
|
||||
if (mm_fiber_is_cancelled(current)) {
|
||||
|
@ -51,27 +50,74 @@ mm_accept(mm_io_t *io, int backlog, machine_io_t *client)
|
|||
mm_io_set_errno(io, EINPROGRESS);
|
||||
return -1;
|
||||
}
|
||||
io->accept_status = 0;
|
||||
io->accept_fiber = current;
|
||||
if (io->connected) {
|
||||
mm_io_set_errno(io, EINPROGRESS);
|
||||
return -1;
|
||||
}
|
||||
if (io->fd == -1) {
|
||||
mm_io_set_errno(io, EBADF);
|
||||
return -1;
|
||||
}
|
||||
io->accept_status = 0;
|
||||
io->accept_fiber = NULL;
|
||||
io->accept_timedout = 0;
|
||||
|
||||
int rc;
|
||||
rc = uv_listen((uv_stream_t*)&io->handle, backlog, mm_accept_cb);
|
||||
if (rc < 0) {
|
||||
io->accept_fiber = NULL;
|
||||
mm_io_set_errno_uv(io, rc);
|
||||
if (! io->accept_listen) {
|
||||
rc = mm_socket_listen(io->fd, backlog);
|
||||
if (rc == -1) {
|
||||
mm_io_set_errno(io, errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* subscribe for accept event */
|
||||
rc = mm_loop_read(&machine->loop, &io->handle,
|
||||
mm_accept_on_read_cb,
|
||||
io, 1);
|
||||
if (rc == -1) {
|
||||
mm_io_set_errno(io, errno);
|
||||
return -1;
|
||||
}
|
||||
mm_scheduler_yield(&io->machine->scheduler);
|
||||
rc = io->accept_status;
|
||||
|
||||
/* wait for timedout, cancel or execution status */
|
||||
mm_timer_start(&machine->loop.clock, &io->accept_timer,
|
||||
mm_accept_timer_cb, io, 0);
|
||||
mm_call_begin(¤t->call, mm_accept_cancel_cb, io);
|
||||
io->accept_fiber = current;
|
||||
mm_scheduler_yield(&machine->scheduler);
|
||||
io->accept_fiber = NULL;
|
||||
if (rc < 0) {
|
||||
mm_io_set_errno_uv(io, rc);
|
||||
mm_call_end(¤t->call);
|
||||
mm_timer_stop(&io->accept_timer);
|
||||
|
||||
rc = mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0);
|
||||
if (rc == -1) {
|
||||
mm_io_set_errno(io, errno);
|
||||
return -1;
|
||||
}
|
||||
*client = mm_accept_client(io);
|
||||
if (*client == NULL) {
|
||||
mm_io_set_errno_uv(io, io->accept_status);
|
||||
|
||||
rc = io->accept_status;
|
||||
if (rc != 0) {
|
||||
mm_io_set_errno(io, rc);
|
||||
return -1;
|
||||
}
|
||||
|
||||
*client = machine_create_io(io->machine);
|
||||
if (client == NULL) {
|
||||
mm_io_set_errno(io, ENOMEM);
|
||||
return -1;
|
||||
}
|
||||
rc = mm_socket_accept(io->fd, NULL, NULL);
|
||||
if (rc == -1) {
|
||||
mm_io_set_errno(io, errno);
|
||||
machine_free_io(*client);
|
||||
*client = NULL;
|
||||
return -1;
|
||||
}
|
||||
mm_io_t *client_io = (mm_io_t*)*client;
|
||||
client_io->fd = rc;
|
||||
client_io->accepted = 1;
|
||||
client_io->connected = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -83,6 +129,7 @@ machine_accept(machine_io_t obj, int backlog, machine_io_t *client)
|
|||
rc = mm_accept(io, backlog, client);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
#if 0
|
||||
if (! io->tls_obj)
|
||||
return 0;
|
||||
mm_io_t *io_client = *client;
|
||||
|
@ -97,5 +144,6 @@ machine_accept(machine_io_t obj, int backlog, machine_io_t *client)
|
|||
/* todo: close */
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ MACHINE_API int
|
|||
machine_bind(machine_io_t obj, struct sockaddr *sa)
|
||||
{
|
||||
mm_io_t *io = obj;
|
||||
mm_t *machine = machine = io->machine;
|
||||
mm_io_set_errno(io, 0);
|
||||
if (io->connected) {
|
||||
mm_io_set_errno(io, EINPROGRESS);
|
||||
|
@ -31,6 +32,11 @@ machine_bind(machine_io_t obj, struct sockaddr *sa)
|
|||
mm_io_set_errno(io, errno);
|
||||
goto error;
|
||||
}
|
||||
rc = mm_loop_add(&machine->loop, &io->handle, 0);
|
||||
if (rc == -1) {
|
||||
mm_io_set_errno(io, errno);
|
||||
goto error;
|
||||
}
|
||||
return 0;
|
||||
error:
|
||||
if (io->fd != -1) {
|
||||
|
|
|
@ -40,11 +40,12 @@ struct mm_io_t {
|
|||
#endif
|
||||
|
||||
/* accept */
|
||||
#if 0
|
||||
mm_timer_t accept_timer;
|
||||
int accept_timedout;
|
||||
int accept_status;
|
||||
int accept_listen;
|
||||
mm_fiber_t *accept_fiber;
|
||||
int accepted;
|
||||
#endif
|
||||
|
||||
/* read */
|
||||
mm_timer_t read_timer;
|
||||
|
|
|
@ -135,6 +135,24 @@ int mm_socket_bind(int fd, struct sockaddr *sa)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int mm_socket_listen(int fd, int backlog)
|
||||
{
|
||||
int rc;
|
||||
rc = listen(fd, backlog);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mm_socket_accept(int fd, struct sockaddr *sa, socklen_t *slen)
|
||||
{
|
||||
int rc;
|
||||
rc = accept(fd, sa, slen);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mm_socket_write(int fd, void *buf, int size)
|
||||
{
|
||||
int rc;
|
||||
|
|
|
@ -16,6 +16,8 @@ int mm_socket_set_reuseaddr(int, int);
|
|||
int mm_socket_error(int);
|
||||
int mm_socket_connect(int, struct sockaddr*);
|
||||
int mm_socket_bind(int, struct sockaddr*);
|
||||
int mm_socket_listen(int, int);
|
||||
int mm_socket_accept(int, struct sockaddr*, socklen_t*);
|
||||
int mm_socket_write(int, void*, int);
|
||||
int mm_socket_read(int, void*, int);
|
||||
|
||||
|
|
Loading…
Reference in New Issue