machinarium: implement machine_connect() and tests

This commit is contained in:
Dmitry Simonenko 2017-04-11 16:50:11 +03:00
parent 5900bb4d49
commit 74f9054ed8
5 changed files with 144 additions and 49 deletions

View File

@ -9,37 +9,73 @@
#include <machinarium.h>
static void
mm_connect_timeout_cb(uv_timer_t *handle)
mm_connect_timer_cb(mm_timer_t *handle)
{
mm_io_t *io = handle->data;
mm_io_t *io = handle->arg;
io->connect_status = ETIMEDOUT;
io->connect_timedout = 1;
/* cancel connection request,
* connect callback will be called anyway */
mm_io_close_handle(io, (uv_handle_t*)&io->handle);
}
static void
mm_connect_cb(uv_connect_t *handle, int status)
{
mm_io_t *io = handle->data;
if (mm_fiber_is_cancelled(io->connect_fiber))
goto wakeup;
if (io->connect_timedout)
goto wakeup;
mm_io_timer_stop(&io->connect_timer);
wakeup:
io->connect_status = status;
mm_scheduler_wakeup(io->connect_fiber);
}
static void
mm_connect_cancel_cb(void *obj, void *arg)
{
(void)obj;
mm_io_t *io = arg;
io->write_timedout = 0;
mm_io_timer_stop(&io->connect_timer);
mm_io_close_handle(io, (uv_handle_t*)&io->handle);
(void)obj;
io->connect_status = ECANCELED;
mm_scheduler_wakeup(io->connect_fiber);
}
static int
mm_connect_cb(mm_fd_t *handle, int mask)
{
mm_io_t *io = handle->arg;
(void)mask;
int rc;
rc = mm_socket_error(handle->fd);
io->connect_status = rc;
mm_scheduler_wakeup(io->connect_fiber);
return 0;
}
static int
mm_connect_socket(mm_io_t *io, struct sockaddr *sa)
{
int rc;
rc = mm_socket(sa->sa_family, SOCK_STREAM, 0);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
io->fd = rc;
rc = mm_socket_set_nosigpipe(io->fd, 1);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
rc = mm_socket_set_nonblock(io->fd, 1);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
if (io->opt_nodelay) {
rc = mm_socket_set_nodelay(io->fd, 1);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
}
if (io->opt_keepalive) {
rc = mm_socket_set_keepalive(io->fd, 1, io->opt_keepalive_delay);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
}
io->handle.fd = io->fd;
io->handle.callback = NULL;
io->handle.arg = io;
return 0;
}
static int
@ -47,8 +83,8 @@ mm_connect(mm_io_t *io, struct sockaddr *sa, uint64_t time_ms)
{
mm_t *machine = machine = io->machine;
mm_fiber_t *current = mm_scheduler_current(&machine->scheduler);
mm_io_set_errno(io, 0);
if (mm_fiber_is_cancelled(current)) {
mm_io_set_errno(io, ECANCELED);
return -1;
@ -57,37 +93,78 @@ mm_connect(mm_io_t *io, struct sockaddr *sa, uint64_t time_ms)
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
if (io->connected) {
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
io->connect_status = 0;
io->connect_timedout = 0;
io->connect_fiber = current;
/* start timer and connection */
mm_io_timer_start(&io->connect_timer, mm_connect_timeout_cb, time_ms);
/* create socket */
int rc;
rc = uv_tcp_connect(&io->connect, &io->handle,
sa, mm_connect_cb);
if (rc < 0) {
mm_io_timer_stop(&io->connect_timer);
io->connect_fiber = NULL;
mm_io_set_errno_uv(io, rc);
return -1;
rc = mm_connect_socket(io, sa);
if (rc == -1)
goto error;
/* start connection */
rc = mm_socket_connect(io->fd, sa);
if (rc == 0)
goto done;
assert(rc == -1);
if (errno != EINPROGRESS) {
mm_io_set_errno(io, errno);
goto error;
}
/* subscribe for connection event */
io->handle.callback = mm_connect_cb;
rc = mm_loop_add(&machine->loop, &io->handle, MM_W);
if (rc == -1) {
mm_io_set_errno(io, errno);
goto error;
}
/* start timer */
if (time_ms > 0) {
mm_timer_init(&io->connect_timer, mm_connect_timer_cb, io, time_ms);
mm_clock_timer_add(&machine->loop.clock, &io->connect_timer);
}
/* wait for completion */
mm_call_begin(&current->call, mm_connect_cancel_cb, io);
mm_scheduler_yield(&machine->scheduler);
mm_call_end(&current->call);
io->connect_fiber = NULL;
mm_timer_stop(&io->connect_timer);
/* result from timer or connect callback */
rc = io->connect_status;
if (rc == 0) {
assert(! io->connect_timedout);
io->connected = 1;
return 0;
io->handle.callback = NULL;
rc = mm_loop_delete(&machine->loop, &io->handle);
if (rc == -1) {
mm_io_set_errno(io, errno);
goto error;
}
mm_io_set_errno_uv(io, rc);
io->connect_fiber = NULL;
rc = io->connect_status;
if (rc != 0) {
mm_io_set_errno(io, rc);
goto error;
}
done:
assert(! io->connect_timedout);
io->connected = 1;
return 0;
error:
io->connect_fiber = NULL;
if (io->fd != -1) {
close(io->fd);
io->fd = -1;
}
io->handle.callback = NULL;
io->handle.fd = -1;
return -1;
}
@ -100,11 +177,13 @@ machine_connect(machine_io_t obj, struct sockaddr *sa, uint64_t time_ms)
return -1;
if (! io->tls_obj)
return 0;
#if 0
rc = mm_tlsio_connect(&io->tls, io->tls_obj);
if (rc == -1) {
/* todo: close */
return -1;
}
#endif
return 0;
}

View File

@ -1,17 +1,20 @@
CC = gcc
RM = rm
CFLAGS = -I. -Wall -g -O0 -I../src
LFLAGS_LIB = ../src/libmachinarium.a
LFLAGS_LIB_UV = -luv -pthread -lssl -lcrypto
LFLAGS = $(LFLAGS_LIB) $(LFLAGS_LIB_UV)
LFLAGS_LIB = ../src/libmachinarium.a -pthread -lssl -lcrypto
LFLAGS = $(LFLAGS_LIB)
TESTS = test_new \
test_create \
test_sleep \
test_wait \
test_condition \
test_condition_2 \
benchmark \
test_io_new \
test_getaddrinfo \
test_cancel_connect \
test_cancel_connect_2
# test_getaddrinfo \
test_getaddrinfo_2 \
test_getaddrinfo_3 \
test_client_server \
@ -24,7 +27,6 @@ TESTS = test_new \
test_cancel_connect_2 \
test_cancel_read \
test_tls_connect \
benchmark \
echo
all: validate clean $(TESTS)
test_new:

View File

@ -6,7 +6,7 @@
*/
#include <machinarium.h>
#include <uv.h>
#include <machinarium_private.h>
#include <assert.h>
static void
@ -15,13 +15,20 @@ test_connect(void *arg)
machine_t machine = arg;
printf("child started\n");
machine_io_t client = machine_create_io(machine);
struct sockaddr_in sa;
uv_ip4_addr("8.8.8.8", 1324, &sa);
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = inet_addr("8.8.8.8");
sa.sin_port = htons(1234);
int rc;
rc = machine_connect(client, (struct sockaddr*)&sa, 0);
rc = machine_connect(client, (struct sockaddr *)&sa, 0);
printf("child resumed\n");
assert(rc < 0);
machine_close(client);
machine_free_io(client);
if (machine_cancelled(machine))
printf("child marked as cancel\n");
printf("child end\n");

View File

@ -16,13 +16,18 @@ test_connect(void *arg)
assert(machine_cancelled(machine));
printf("child started\n");
machine_io_t client = machine_create_io(machine);
struct sockaddr_in sa;
uv_ip4_addr("8.8.8.8", 1324, &sa);
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = inet_addr("8.8.8.8");
sa.sin_port = htons(1234);
int rc;
rc = machine_connect(client, (struct sockaddr*)&sa, 0);
printf("child resumed\n");
assert(rc < 0);
machine_close(client);
machine_free_io(client);
if (machine_cancelled(machine))
printf("child marked as cancel\n");
printf("child end\n");

View File

@ -12,7 +12,9 @@ main(int argc, char *argv[])
{
machine_t machine = machine_create();
machine_io_t io = machine_create_io(machine);
machine_close(io);
machine_free_io(io);
machine_free(machine);
return 0;
}