From 3759718634217a40699318cc187bb5bff873f262 Mon Sep 17 00:00:00 2001 From: drotos Date: Sun, 4 Feb 2007 17:23:35 +0000 Subject: [PATCH] implement slave side of message passing git-svn-id: svn+ssh://cvs.lpds.sztaki.hu/var/lib/svn/szdg/dcapi/trunk@1030 a7169a2c-3604-0410-bc95-c702d8d87f7a --- dcapi/local/local_client.c | 166 ++++++++++++++++++++++++++++++++++++- 1 file changed, 164 insertions(+), 2 deletions(-) diff --git a/dcapi/local/local_client.c b/dcapi/local/local_client.c index e363bbe022..7cc722177c 100644 --- a/dcapi/local/local_client.c +++ b/dcapi/local/local_client.c @@ -22,6 +22,8 @@ #include #include "local_common.h" +/*#include "local_slave.h"*/ +#include "local_utils.h" static int _DC_checkpoint_file_requested= 0; @@ -63,13 +65,35 @@ unsigned DC_getGridCapabilities(void) /******************************************************************** * Client API functions */ - +/*done*/ int DC_initClient(void) { + char *message; + int ret= _DC_parseCfg(CLIENT_CONFIG_NAME); + + if (ret) + fprintf(stderr, "Error parsing configfile %s", + CLIENT_CONFIG_NAME); + + _DC_init_utils(); _DC_init_common(); - return 0; + + DC_log(LOG_DEBUG, "Slave dcapi initialized"); + + message= _DC_read_message(_DC_cfg(cfg_management_box), + _DCAPI_MSG_COMMAND, /*FALSE*/0); + if (message && + strcmp(message, _DCAPI_CMD_RESUME) == 0) + { + DC_log(LOG_INFO, "Resume, restarting..."); + _DC_read_message(_DC_cfg(cfg_management_box), + _DCAPI_MSG_COMMAND, /*TRUE*/1); + } + + return ret; } +/*done*/ char *DC_resolveFileName(DC_FileType type, const char *logicalFileName) { /* init_log calls this fn, so it is not possible to call DC_log @@ -144,32 +168,170 @@ char *DC_resolveFileName(DC_FileType type, const char *logicalFileName) */ } +/*done*/ int DC_sendResult(const char *logicalFileName, const char *path, DC_FileMode fileMode) { + char *fn; + int ret; + + DC_log(LOG_DEBUG, "DC_sendResult(%s,%s,%d)", + logicalFileName, + path, + fileMode); + fn= malloc(strlen(logicalFileName)+100); + strcpy(fn, _DC_cfg(cfg_subresults_box)); + strcat(fn, "/real_files"); + if ((ret= _DC_mkdir_with_parents(fn, S_IRWXU| + S_IRGRP|S_IXGRP| + S_IROTH|S_IXOTH)) != DC_OK) + { + DC_log(LOG_ERR, "Failed to create dir for subresult (%s): %s", + fn, strerror(errno)); + free(fn); + return(ret); + } + strcat(fn, "/"); + strcat(fn, logicalFileName); + if ((ret= _DC_copyFile(path, fn)) != DC_OK) + { + DC_log(LOG_ERR, "Failed to copy subresult file %s to " + "%s: %s", path, fn, strerror(errno)); + free(fn); + return(ret); + } + ret= _DC_create_message(_DC_cfg(cfg_subresults_box), + _DCAPI_MSG_LOGICAL, logicalFileName, NULL); + free(fn); + return(ret); + // not implemented yet! return DC_ERR_NOTIMPL; } +/*done*/ int DC_sendMessage(const char *message) { + DC_log(LOG_DEBUG, "DC_sendMessage(%s)", message); + return _DC_create_message(_DC_cfg(cfg_client_message_box), + _DCAPI_MSG_MESSAGE, message, NULL); // not implemented yet! return DC_ERR_NOTIMPL; } +/*done*/ DC_ClientEvent *DC_checkClientEvent(void) { + char *message; + DC_ClientEvent *e= NULL; + + message= _DC_read_message(_DC_cfg(cfg_master_message_box), + _DCAPI_MSG_MESSAGE, /*TRUE*/1); + if (message) + { + if ((e= calloc(1, sizeof(DC_ClientEvent)))) + { + DC_log(LOG_DEBUG, "API event created: %p", e); + e->type= DC_CLIENT_MESSAGE; + e->message= message; + DC_log(LOG_DEBUG, "Message of the event: %s", + e->message); + } + else + { + free(message); + DC_log(LOG_ERR, "Failed to create " + "API event, memory allocation " + "error"); + } + return(e); + } + + message= _DC_read_message(_DC_cfg(cfg_management_box), + _DCAPI_MSG_COMMAND, /*TRUE*/1); + if (message && + strcmp(message, _DCAPI_CMD_SUSPEND) == 0) + { + char *fn; + int f; + + DC_log(LOG_INFO, "Master asked me to suspend"); + _DC_create_message(_DC_cfg(cfg_management_box), + _DCAPI_MSG_ACK, + _DCAPI_ACK_SUSPEND, + NULL); + + fn= DC_resolveFileName(DC_FILE_IN, DC_CHECKPOINT_FILE); + if (fn != NULL) + free(fn); + else + { + DC_log(LOG_WARNING, "Slave suspending but " + "no checkpoint made yet"); + free(fn); + } + + fflush(NULL); + fn= DC_resolveFileName(DC_FILE_OUT, DC_LABEL_STDOUT); + if (fn) + { + if ((f= open(fn, O_APPEND)) > 0) + { + fsync(f); + close(f); + } + free(fn); + } + _DC_create_message(_DC_cfg(cfg_output_cache), + DC_LABEL_STDOUT, + NULL, + DC_LABEL_STDOUT); + fn= DC_resolveFileName(DC_FILE_OUT, DC_LABEL_STDERR); + if (fn) + { + if ((f= open(fn, O_APPEND)) > 0) + { + fsync(f); + close(f); + } + free(fn); + } + _DC_create_message(_DC_cfg(cfg_output_cache), + DC_LABEL_STDERR, + NULL, + DC_LABEL_STDERR); + + DC_finishClient(0); + } // not implemented yet! return NULL; } +/*done*/ void DC_destroyClientEvent(DC_ClientEvent *event) { + DC_log(LOG_DEBUG, "DC_destroyClientEvent(%p)", event); + if (event) + { + switch (event->type) + { + case DC_CLIENT_MESSAGE: + { + if (event->message) + free(event->message); + break; + } + default: + break; + } + free(event); + } // not implemented yet! } +/*done*/ void DC_checkpointMade(const char *fileName) { DC_log(LOG_DEBUG, "DC_checkpointMade(%s)", fileName);