diff --git a/src/mm_connect.c b/src/mm_connect.c index 031a9ef1..1ccf601f 100644 --- a/src/mm_connect.c +++ b/src/mm_connect.c @@ -9,37 +9,73 @@ #include static void -mm_connect_timeout_cb(uv_timer_t *handle) +mm_connect_timer_cb(mm_timer_t *handle) { - mm_io_t *io = handle->data; + mm_io_t *io = handle->arg; + io->connect_status = ETIMEDOUT; io->connect_timedout = 1; - /* cancel connection request, - * connect callback will be called anyway */ - mm_io_close_handle(io, (uv_handle_t*)&io->handle); -} - -static void -mm_connect_cb(uv_connect_t *handle, int status) -{ - mm_io_t *io = handle->data; - if (mm_fiber_is_cancelled(io->connect_fiber)) - goto wakeup; - if (io->connect_timedout) - goto wakeup; - mm_io_timer_stop(&io->connect_timer); -wakeup: - io->connect_status = status; mm_scheduler_wakeup(io->connect_fiber); } static void mm_connect_cancel_cb(void *obj, void *arg) { - (void)obj; mm_io_t *io = arg; - io->write_timedout = 0; - mm_io_timer_stop(&io->connect_timer); - mm_io_close_handle(io, (uv_handle_t*)&io->handle); + (void)obj; + io->connect_status = ECANCELED; + mm_scheduler_wakeup(io->connect_fiber); +} + +static int +mm_connect_cb(mm_fd_t *handle, int mask) +{ + mm_io_t *io = handle->arg; + (void)mask; + int rc; + rc = mm_socket_error(handle->fd); + io->connect_status = rc; + mm_scheduler_wakeup(io->connect_fiber); + return 0; +} + +static int +mm_connect_socket(mm_io_t *io, struct sockaddr *sa) +{ + int rc; + rc = mm_socket(sa->sa_family, SOCK_STREAM, 0); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + io->fd = rc; + rc = mm_socket_set_nosigpipe(io->fd, 1); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + rc = mm_socket_set_nonblock(io->fd, 1); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + if (io->opt_nodelay) { + rc = mm_socket_set_nodelay(io->fd, 1); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + } + if (io->opt_keepalive) { + rc = mm_socket_set_keepalive(io->fd, 1, io->opt_keepalive_delay); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + } + io->handle.fd = io->fd; + io->handle.callback = NULL; + io->handle.arg = io; + return 0; } static int @@ -47,8 +83,8 @@ mm_connect(mm_io_t *io, struct sockaddr *sa, uint64_t time_ms) { mm_t *machine = machine = io->machine; mm_fiber_t *current = mm_scheduler_current(&machine->scheduler); - mm_io_set_errno(io, 0); + if (mm_fiber_is_cancelled(current)) { mm_io_set_errno(io, ECANCELED); return -1; @@ -57,37 +93,78 @@ mm_connect(mm_io_t *io, struct sockaddr *sa, uint64_t time_ms) mm_io_set_errno(io, EINPROGRESS); return -1; } + if (io->connected) { + mm_io_set_errno(io, EINPROGRESS); + return -1; + } io->connect_status = 0; io->connect_timedout = 0; io->connect_fiber = current; - /* start timer and connection */ - mm_io_timer_start(&io->connect_timer, mm_connect_timeout_cb, time_ms); + /* create socket */ int rc; - rc = uv_tcp_connect(&io->connect, &io->handle, - sa, mm_connect_cb); - if (rc < 0) { - mm_io_timer_stop(&io->connect_timer); - io->connect_fiber = NULL; - mm_io_set_errno_uv(io, rc); - return -1; + rc = mm_connect_socket(io, sa); + if (rc == -1) + goto error; + + /* start connection */ + rc = mm_socket_connect(io->fd, sa); + if (rc == 0) + goto done; + + assert(rc == -1); + if (errno != EINPROGRESS) { + mm_io_set_errno(io, errno); + goto error; + } + + /* subscribe for connection event */ + io->handle.callback = mm_connect_cb; + rc = mm_loop_add(&machine->loop, &io->handle, MM_W); + if (rc == -1) { + mm_io_set_errno(io, errno); + goto error; + } + + /* start timer */ + if (time_ms > 0) { + mm_timer_init(&io->connect_timer, mm_connect_timer_cb, io, time_ms); + mm_clock_timer_add(&machine->loop.clock, &io->connect_timer); } /* wait for completion */ mm_call_begin(¤t->call, mm_connect_cancel_cb, io); mm_scheduler_yield(&machine->scheduler); mm_call_end(¤t->call); - io->connect_fiber = NULL; + mm_timer_stop(&io->connect_timer); - /* result from timer or connect callback */ - rc = io->connect_status; - if (rc == 0) { - assert(! io->connect_timedout); - io->connected = 1; - return 0; + io->handle.callback = NULL; + rc = mm_loop_delete(&machine->loop, &io->handle); + if (rc == -1) { + mm_io_set_errno(io, errno); + goto error; } - mm_io_set_errno_uv(io, rc); + io->connect_fiber = NULL; + rc = io->connect_status; + if (rc != 0) { + mm_io_set_errno(io, rc); + goto error; + } + +done: + assert(! io->connect_timedout); + io->connected = 1; + return 0; + +error: + io->connect_fiber = NULL; + if (io->fd != -1) { + close(io->fd); + io->fd = -1; + } + io->handle.callback = NULL; + io->handle.fd = -1; return -1; } @@ -100,11 +177,13 @@ machine_connect(machine_io_t obj, struct sockaddr *sa, uint64_t time_ms) return -1; if (! io->tls_obj) return 0; +#if 0 rc = mm_tlsio_connect(&io->tls, io->tls_obj); if (rc == -1) { /* todo: close */ return -1; } +#endif return 0; } diff --git a/tests/makefile b/tests/makefile index 25b36c38..985fb4d8 100644 --- a/tests/makefile +++ b/tests/makefile @@ -1,17 +1,20 @@ CC = gcc RM = rm CFLAGS = -I. -Wall -g -O0 -I../src -LFLAGS_LIB = ../src/libmachinarium.a -LFLAGS_LIB_UV = -luv -pthread -lssl -lcrypto -LFLAGS = $(LFLAGS_LIB) $(LFLAGS_LIB_UV) +LFLAGS_LIB = ../src/libmachinarium.a -pthread -lssl -lcrypto +LFLAGS = $(LFLAGS_LIB) TESTS = test_new \ test_create \ test_sleep \ test_wait \ test_condition \ test_condition_2 \ + benchmark \ test_io_new \ - test_getaddrinfo \ + test_cancel_connect \ + test_cancel_connect_2 + +# test_getaddrinfo \ test_getaddrinfo_2 \ test_getaddrinfo_3 \ test_client_server \ @@ -24,7 +27,6 @@ TESTS = test_new \ test_cancel_connect_2 \ test_cancel_read \ test_tls_connect \ - benchmark \ echo all: validate clean $(TESTS) test_new: diff --git a/tests/test_cancel_connect.c b/tests/test_cancel_connect.c index 5c5043e1..05ee05f1 100644 --- a/tests/test_cancel_connect.c +++ b/tests/test_cancel_connect.c @@ -6,7 +6,7 @@ */ #include -#include +#include #include static void @@ -15,13 +15,20 @@ test_connect(void *arg) machine_t machine = arg; printf("child started\n"); machine_io_t client = machine_create_io(machine); + struct sockaddr_in sa; - uv_ip4_addr("8.8.8.8", 1324, &sa); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = inet_addr("8.8.8.8"); + sa.sin_port = htons(1234); + int rc; - rc = machine_connect(client, (struct sockaddr*)&sa, 0); + rc = machine_connect(client, (struct sockaddr *)&sa, 0); printf("child resumed\n"); assert(rc < 0); + machine_close(client); + machine_free_io(client); + if (machine_cancelled(machine)) printf("child marked as cancel\n"); printf("child end\n"); diff --git a/tests/test_cancel_connect_2.c b/tests/test_cancel_connect_2.c index 3e58049b..30fd53a2 100644 --- a/tests/test_cancel_connect_2.c +++ b/tests/test_cancel_connect_2.c @@ -16,13 +16,18 @@ test_connect(void *arg) assert(machine_cancelled(machine)); printf("child started\n"); machine_io_t client = machine_create_io(machine); + struct sockaddr_in sa; - uv_ip4_addr("8.8.8.8", 1324, &sa); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = inet_addr("8.8.8.8"); + sa.sin_port = htons(1234); + int rc; rc = machine_connect(client, (struct sockaddr*)&sa, 0); printf("child resumed\n"); assert(rc < 0); machine_close(client); + machine_free_io(client); if (machine_cancelled(machine)) printf("child marked as cancel\n"); printf("child end\n"); diff --git a/tests/test_io_new.c b/tests/test_io_new.c index 37331c7b..9abc51a6 100644 --- a/tests/test_io_new.c +++ b/tests/test_io_new.c @@ -12,7 +12,9 @@ main(int argc, char *argv[]) { machine_t machine = machine_create(); machine_io_t io = machine_create_io(machine); + machine_close(io); + machine_free_io(io); machine_free(machine); return 0; }