/* Local variables: */ /* c-file-style: "linux" */ /* End: */ #include #include #include #include #include #include #include #include #include "dc.h" #include "condor_master.h" #include "condor_common.h" #include "condor_defs.h" #include "condor_wu.h" #include "condor_log.h" #include "condor_utils.h" #include "condor_event.h" /********************************************************************* INIT */ static GHashTable *_DC_wu_table= NULL; char *_DC_project_uuid_str= NULL; char *_DC_config_file= NULL; DC_ResultCallback _DC_result_callback= NULL; DC_SubresultCallback _DC_subresult_callback= NULL; DC_MessageCallback _DC_message_callback= NULL; /* Initializes the DC-API. */ int DC_initMaster(const char *configFile) { int ret; char *cfgval= NULL; GString *gs; if (!configFile) configFile= DC_CONFIG_FILE; if (configFile[0] != '/') { gs= g_string_new(g_get_current_dir()); g_string_append(gs, "/"); g_string_append(gs, configFile); } else gs= g_string_new(configFile); _DC_config_file= gs->str; g_string_free(gs, FALSE); ret= _DC_parseCfg(configFile); if (ret) { DC_log(LOG_ERR, "DC-API config file (%s) parse error", configFile); return(ret); } DC_log(LOG_DEBUG, "DC_initMaster(%s)", configFile); _DC_init_utils(); _DC_init_common(); if (!_DC_wu_table) _DC_wu_table= g_hash_table_new_full(g_str_hash, g_str_equal, NULL, NULL); cfgval= DC_getCfgStr(CFG_INSTANCEUUID); if (!cfgval) { DC_log(LOG_ERR, "Setting of %s is missing from config file %s", CFG_INSTANCEUUID, configFile); return(DC_ERR_CONFIG); } /* ret= uuid_parse((char *)cfgval, _DC_project_uuid); if (ret) { DC_log(LOG_ERR, "Invalid project UUID"); g_free(cfgval); return(DC_ERR_CONFIG); } */ if (_DC_project_uuid_str) g_free(_DC_project_uuid_str); _DC_project_uuid_str= g_strdup(cfgval); g_free(cfgval); /* Enforce a canonical string representation of the UUID */ /*uuid_unparse_lower(_DC_project_uuid, _DC_project_uuid_str);*/ return(DC_OK); } /****************************************************** Manage WU structure */ /* Creates one work unit. */ DC_Workunit * DC_createWU(const char *clientName, const char *arguments[], int subresults, const char *tag) { DC_Workunit *wu; /*char uuid_str[37];*/ /*char *cfgval;*/ /*GString *str;*/ int ret; wu= g_new0(DC_Workunit, 1); _DC_wu_changed(wu); DC_log(LOG_DEBUG, "DC_createWU(%s, %p, %d, %s)=%p", clientName, arguments, subresults, tag, wu); _DC_wu_set_client_name(wu, clientName); wu->argv= g_strdupv((char **)arguments); for (_DC_wu_set_argc(wu, 0); arguments && arguments[wu->data.argc]; _DC_wu_set_argc(wu, wu->data.argc+1)) ; _DC_wu_set_subresults(wu, subresults); _DC_wu_set_tag(wu, g_strdup(tag)); /* uuid_generate(wu->uuid); uuid_unparse_lower(wu->uuid, uuid_str); wu->uuid_str= g_strdup(uuid_str); */ { char *wd= DC_getCfgStr(CFG_WORKDIR); GString *s= g_string_new(""); /*GString *hd= g_string_new("");*/ GString *d= g_string_new(""); gpointer *t= ((gpointer)wu)-1; char *h; gboolean ok; do { t++; g_string_printf(s, "%p", t); if (strlen(s->str) > 2) h= (s->str)+strlen(s->str)-2; else h= s->str; /*g_string_printf(hd, "%s/.dcapi-%s/%s", wd, _DC_project_uuid_str, h);*/ g_string_printf(d, "%s/.dcapi-%s/%s/%s", wd, _DC_project_uuid_str, h, s->str); ok= /*!g_file_test(hd->str, G_FILE_TEST_IS_DIR) &&*/ !g_file_test(d->str, G_FILE_TEST_IS_DIR); } while (!ok); _DC_wu_set_uuid_str(wu, s->str); g_string_free(s, FALSE); /*g_string_free(hd, TRUE);*/ _DC_wu_set_workdir(wu, d->str); g_string_free(d, FALSE); free(wd); } if (tag) _DC_wu_set_name(wu, g_strdup_printf("%s_%s_%s", _DC_project_uuid_str, wu->data.uuid_str, tag)); else _DC_wu_set_name(wu, g_strdup_printf("%s_%s", _DC_project_uuid_str, wu->data.uuid_str)); DC_log(LOG_DEBUG, "wu name=\"%s\"", wu->data.name); /* Calculate & create the working directory. The working directory * has the form: * /.dcapi-// * Where is the first 2 hex digits of the uuid */ /* cfgval= DC_getCfgStr(CFG_WORKDIR); str= g_string_new(cfgval); free(cfgval); g_string_append_c(str, G_DIR_SEPARATOR); g_string_append(str, ".dcapi-"); g_string_append(str, _DC_project_uuid_str); g_string_append_c(str, G_DIR_SEPARATOR); g_string_append_printf(str, "%02x", wu->uuid[0]); g_string_append_c(str, G_DIR_SEPARATOR); g_string_append(str, uuid_str); wu->workdir= str->str; g_string_free(str, FALSE); */ wu->condor_events= g_array_new(FALSE, FALSE, sizeof(struct _DC_condor_event)); if (!_DC_wu_table) DC_initMaster(NULL); g_hash_table_insert(_DC_wu_table, wu->data.name, wu); ret= _DC_mkdir_with_parents(wu->data.workdir, 0700); if (ret) { DC_log(LOG_ERR, "Failed to create WU working directory %s: %s", wu->data.workdir, strerror(errno)); DC_destroyWU(wu); return(NULL); } _DC_wu_make_client_config(wu); _DC_wu_make_client_executables(wu); _DC_wu_set_state(wu, DC_WU_READY); return(wu); } /* Releases internal resources allocated to a work unit. */ void DC_destroyWU(DC_Workunit *wu) { int i; GString *s; if (!_DC_wu_check(wu)) return; DC_log(LOG_DEBUG, "DC_destroyWU(%p-\"%s\")", wu, wu->data.name); if (_DC_wu_table) g_hash_table_remove(_DC_wu_table, wu->data.name); s= g_string_new(""); g_string_printf(s, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_master_message_box)); if ((i= _DC_nuof_messages(s->str, _DCAPI_MSG_MESSAGE)) > 0) DC_log(LOG_NOTICE, "%d master messages unhandled by " "destroying wu: %s", i, wu->data.name); g_string_printf(s, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_client_message_box)); if ((i= _DC_nuof_messages(s->str, _DCAPI_MSG_MESSAGE)) > 0) DC_log(LOG_NOTICE, "%d client messages unhandled of wu: %s", i, wu->data.name); g_string_printf(s, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_subresults_box)); if ((i= _DC_nuof_messages(s->str, _DCAPI_MSG_LOGICAL)) > 0) DC_log(LOG_NOTICE, "%d client subresults unhandled of wu: %s", i, wu->data.name); g_string_free(s, TRUE); if (wu->data.state == DC_WU_READY || wu->data.state == DC_WU_UNKNOWN) DC_log(LOG_NOTICE, "Destroying an unstarted wu: %s", wu->data.name); if (wu->data.state == DC_WU_RUNNING || wu->data.state == DC_WU_SUSPENDED) { DC_log(LOG_NOTICE, "Destroying a started but not yet " "finished wu: %s", wu->data.name); DC_log(LOG_INFO, "WU has been started but not finished " "not removing its workdir %s", wu->data.workdir); } else if (wu->data.workdir) { const char *name; char *exec; GDir *dir; int ret, i; GString *fn; int leave; leave= strtol(_DC_wu_cfg(wu, cfg_leave_files), 0, 0); if (leave) DC_log(LOG_INFO, "WU's files not deleted"); /* Removing generated files */ fn= g_string_new(wu->data.workdir); if (!leave) { fn= g_string_append(fn, "/"); fn= g_string_append(fn, _DC_wu_cfg(wu, cfg_submit_file)); unlink(fn->str); g_string_printf(fn, "%s/%s", wu->data.workdir, CLIENT_CONFIG_NAME); unlink(fn->str); g_string_printf(fn, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_executable)); unlink(fn->str); g_string_printf(fn, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_condor_log)); unlink(fn->str); g_string_printf(fn, "%s/%s", wu->data.workdir, DC_LABEL_STDOUT); unlink(fn->str); g_string_printf(fn, "%s/%s", wu->data.workdir, DC_LABEL_STDERR); unlink(fn->str); exec= _DC_wu_cfg(wu, cfg_executable); if (!exec) exec= wu->data.client_name; g_string_printf(fn, "%s/%s", wu->data.workdir, exec); unlink(fn->str); g_string_printf(fn, "%s/%s", wu->data.workdir, DC_getClientCfgStr(wu->data.client_name, "LogFile", TRUE)); unlink(fn->str); } if (!leave) { g_string_printf(fn, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_client_message_box)); i= _DC_rm(fn->str); if (i > 0) DC_log(LOG_NOTICE, "%d unhandled client messages " "remained", i); g_string_printf(fn, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_master_message_box)); i= _DC_rm(fn->str); if (i > 0) DC_log(LOG_NOTICE, "%d unhandled master messages " "remained", i); g_string_printf(fn, "%s/%s", wu->data.workdir, _DC_wu_cfg(wu, cfg_subresults_box)); i= _DC_rm(fn->str); if (i > 0) DC_log(LOG_NOTICE, "%d unhandled client subresults " "remained", i); } g_string_free(fn, TRUE); /* The work directory should not contain any extra files, but * just in case */ if (!leave) { dir= g_dir_open(wu->data.workdir, 0, NULL); while (dir && (name= g_dir_read_name(dir))) { GString *str= g_string_new(wu->data.workdir); g_string_append_c(str, G_DIR_SEPARATOR); g_string_append(str, name); DC_log(LOG_INFO, "Removing unknown file %s", str->str); unlink(str->str); g_string_free(str, TRUE); } if (dir) g_dir_close(dir); ret= rmdir(wu->data.workdir); if (ret) DC_log(LOG_WARNING, "Failed to remove WU working " "directory %s: %s", wu->data.workdir, strerror(errno)); } g_free(wu->data.workdir); } _DC_wu_set_client_name(wu, NULL); g_free(wu->data.uuid_str); g_strfreev(wu->argv); g_free(wu->data.tag); g_free(wu->data.name); g_array_free(wu->condor_events, TRUE); g_free(wu); } /* Sets an input file for the work unit. */ int DC_addWUInput(DC_Workunit *wu, const char *logicalFileName, const char *URL, DC_FileMode fileMode) { DC_PhysicalFile *file; char *workpath; int ret; if (!_DC_wu_check(wu)) return(DC_ERR_UNKNOWN_WU); DC_log(LOG_DEBUG, "DC_addWUInput(%p-\"%s\", %s, %s, %d)", wu, wu->data.name, logicalFileName, URL, fileMode); if (wu->data.state != DC_WU_READY) { DC_log(LOG_INFO, "Modifying started wu %s", wu->data.name); return(DC_ERR_BADPARAM); } /* Sanity checks */ ret= _DC_wu_check_logical_name(wu, logicalFileName); if (ret) return(ret); workpath= _DC_wu_get_workdir_path(wu, logicalFileName, FILE_IN); file= _DC_createPhysicalFile(logicalFileName, workpath); g_free(workpath); if (!file) return(DC_ERR_INTERNAL); switch (fileMode) { case DC_FILE_REGULAR: DC_log(LOG_DEBUG, "Copying regular file %s to %s", URL, file->path); ret= _DC_copyFile(URL, file->path); if (ret) { _DC_destroyPhysicalFile(file); return(ret/*DC_ERR_BADPARAM*/); /* XXX */ } break; case DC_FILE_PERSISTENT: DC_log(LOG_DEBUG, "Copying persistent file %s to %s", URL, file->path); ret= _DC_copyFile(URL, file->path); if (ret) { DC_log(LOG_ERR, "Failed to link %s to %s: %s", URL, file->path, strerror(errno)); _DC_destroyPhysicalFile(file); return(DC_ERR_BADPARAM); /* XXX */ } /* Remember the file mode */ file->mode= DC_FILE_PERSISTENT; break; case DC_FILE_VOLATILE: DC_log(LOG_DEBUG, "Renaming %s to %s", URL, file->path); ret= rename(URL, file->path); if (ret) { DC_log(LOG_ERR, "Failed to rename %s to %s: %s", URL, file->path, strerror(errno)); _DC_destroyPhysicalFile(file); return(DC_ERR_BADPARAM); } break; } wu->input_files= g_list_append(wu->input_files, file); /*wu->num_inputs++; */ return(0); } /* Defines an output file for the work unit. */ int DC_addWUOutput(DC_Workunit *wu, const char *logicalFileName) { int ret; if (!_DC_wu_check(wu)) return(DC_ERR_UNKNOWN_WU); DC_log(LOG_DEBUG, "DC_addWUOutput(%p-\"%s\", %s)", wu, wu->data.name, logicalFileName); if (wu->data.state != DC_WU_READY) { DC_log(LOG_INFO, "Modifying started wu %s", wu->data.name); return(DC_ERR_BADPARAM); } /* Sanity checks */ ret= _DC_wu_check_logical_name(wu, logicalFileName); if (ret) return ret; DC_log(LOG_DEBUG, "Adding out file %s", logicalFileName); wu->output_files= g_list_append(wu->output_files, g_strdup(logicalFileName)); /*wu->num_outputs++;*/ return(0); } /* Sets the priority for the work unit. */ int DC_setWUPriority(DC_Workunit *wu, int priority) { if (!_DC_wu_check(wu)) return(DC_ERR_UNKNOWN_WU); DC_log(LOG_DEBUG, "DC_setWUPriority(%p-\"%s\", %d)", wu, wu->data.name, priority); if (wu->data.state != DC_WU_READY) { DC_log(LOG_INFO, "Modifying started wu %s", wu->data.name); return(DC_ERR_BADPARAM); } return(DC_OK); } /* Sets the callback functions that will be called when a particular event. */ void DC_setMasterCb(DC_ResultCallback resultcb, DC_SubresultCallback subresultcb, DC_MessageCallback msgcb) { DC_log(LOG_DEBUG, "DC_setMasterCb(%p, %p, %p)", resultcb, subresultcb, msgcb); _DC_result_callback= resultcb; _DC_subresult_callback= subresultcb; _DC_message_callback= msgcb; } /* Queries the state of a work unit. */ DC_WUState DC_getWUState(DC_Workunit *wu) { if (!_DC_wu_check(wu)) return(DC_WU_UNKNOWN); return(wu->data.state); } /* Queries the low-level ID of the work unit. */ char * DC_getWUId(const DC_Workunit *wu) { GString *s; struct _DC_condor_event *ce; if (!_DC_wu_check(wu)) return(NULL); _DC_wu_update_condor_events((DC_Workunit *)wu); if (wu->condor_events->len == 0) return(NULL); ce= &g_array_index(wu->condor_events, struct _DC_condor_event, wu->condor_events->len-1); s= g_string_new(""); g_string_printf(s, "%d.%d", ce->cluster, ce->proc); return(g_string_free(s, FALSE)); } /* Queries the tag of a work unit. */ char * DC_getWUTag(const DC_Workunit *wu) { if (!_DC_wu_check(wu)) return(NULL); return((wu->data.tag)?strdup(wu->data.tag):NULL); } /* Serializes a work unit description. */ char * DC_serializeWU(DC_Workunit *wu) { if (!_DC_wu_check(wu)) return(NULL); return(0); } /* Restores a serialized work unit. */ DC_Workunit * DC_deserializeWU(const char *buf) { return(0); } /* iterator for DC_getWUNumber() */ static DC_WUState _DC_dd_look_for_state; static void _DC_dd_check_state(void *key, void *value, void *ptr) { DC_Workunit *wu=(DC_Workunit *)value; int *count= (int *)ptr; if (wu->data.state == _DC_dd_look_for_state) ++(*count); } /* Queries the number of WUs known to the API in the given state. */ int DC_getWUNumber(DC_WUState state) { int val= 0; _DC_dd_look_for_state= state; g_hash_table_foreach(_DC_wu_table, (GHFunc)_DC_dd_check_state, &val); return(val); } /************************************************************** Main cycles */ static void _DC_process_event(DC_MasterEvent *event) { if (!event) return; DC_log(LOG_DEBUG, "Processing an event %d, calling callback...", event->type); switch (event->type) { case DC_MASTER_RESULT: { if (_DC_result_callback) (*_DC_result_callback)(event->wu, event->result); break; } case DC_MASTER_SUBRESULT: { if (_DC_subresult_callback) (*_DC_subresult_callback)(event->wu, event->subresult->label, event->subresult->path); break; } case DC_MASTER_MESSAGE: { if (_DC_message_callback) (*_DC_message_callback)(event->wu, event->message); break; } } DC_destroyMasterEvent(event); } /* Waits for events and processes them. */ int DC_processMasterEvents(int timeout) { /* call callback and destry event */ time_t start, now; DC_MasterEvent *event; DC_log(LOG_DEBUG, "DC_processMasterEvents(%d)", timeout); if ((event= DC_waitMasterEvent(NULL, 0)) != NULL) _DC_process_event(event); if (timeout==0) return(DC_OK); start= time(NULL); now= time(NULL); while (now-start <= timeout && g_hash_table_size(_DC_wu_table) > 0) { sleep(1); if ((event= DC_waitMasterEvent(NULL, 0)) != NULL) _DC_process_event(event); now= time(NULL); } return(DC_OK); } static DC_MasterEvent *_DC_filtered_event; static gboolean _DC_check_filtered_wu_event(gpointer wu_name, gpointer w, gpointer ptr) { DC_Workunit *wu= (DC_Workunit *)w; char *filter= (char *)ptr; if (filter && ((strcmp(DC_getWUTag(wu), filter) != 0))) return(FALSE); if ((_DC_filtered_event= DC_waitWUEvent(wu, 0)) != NULL) DC_log(LOG_DEBUG, "DC_waitMasterEvent found an event for wu %s", wu->data.name); return(!_DC_filtered_event); } /* Checks for events and return them. */ DC_MasterEvent * DC_waitMasterEvent(const char *wuFilter, int timeout) { /* no callback called! */ DC_Workunit *wu; time_t start, now; if (timeout) DC_log(LOG_DEBUG, "DC_waitMasterEvent(%s, %d)", wuFilter, timeout); _DC_filtered_event= NULL; wu= (DC_Workunit *)g_hash_table_find(_DC_wu_table, _DC_check_filtered_wu_event, (gpointer)wuFilter); if (_DC_filtered_event || timeout==0) return(_DC_filtered_event); start= time(NULL); now= time(NULL); while (now-start <= timeout) { sleep(1); _DC_filtered_event= NULL; wu= (DC_Workunit *) g_hash_table_find(_DC_wu_table, _DC_check_filtered_wu_event, (gpointer)wuFilter); if (_DC_filtered_event) return(_DC_filtered_event); } return(NULL); } /* Checks for events for a particular WU. */ DC_MasterEvent * DC_waitWUEvent(DC_Workunit *wu, int timeout) { /* no callback called! */ time_t start, now; DC_MasterEvent *me= NULL; if (!_DC_wu_check(wu)) return(NULL); if (timeout) DC_log(LOG_DEBUG, "DC_waitWUEvent(%p-\"%s\", %d)", wu, wu->data.name, timeout); _DC_wu_update_condor_events(wu); if ((me= _DC_wu_check_client_messages(wu)) != NULL) DC_log(LOG_DEBUG, "DC_waitWUEvent found a message"); if (!me) { if ((me= _DC_wu_condor2api_event(wu)) != NULL) DC_log(LOG_DEBUG, "DC_waitWUEvent found a condor event"); } if (me || timeout==0) return(me); start= time(NULL); now= start; while (now-start <= timeout) { sleep(1); _DC_wu_update_condor_events(wu); if ((me= _DC_wu_check_client_messages(wu)) != NULL) DC_log(LOG_DEBUG, "DC_waitWUEvent found a message"); if (!me) { if ((me= _DC_wu_condor2api_event(wu)) != NULL) DC_log(LOG_DEBUG, "DC_waitWUEvent found a " "condor event"); } if (me) return(me); now= time(NULL); } return(NULL); } /* Destroys an event. */ void DC_destroyMasterEvent(DC_MasterEvent *event) { DC_log(LOG_DEBUG, "DC_destroyMasterEvent(%p)", event); _DC_event_destroy(event); } /**************************************************************** Messaging */ /* Sends a message to a running work unit. */ int DC_sendWUMessage(DC_Workunit *wu, const char *message) { GString *dn; int ret; if (!_DC_wu_check(wu)) return(DC_ERR_UNKNOWN_WU); DC_log(LOG_DEBUG, "DC_sendWUMessage(%p-\"%s\", %s)", wu, wu->data.name, message); dn= g_string_new(wu->data.workdir); g_string_append(dn, "/"); g_string_append(dn, _DC_wu_cfg(wu, cfg_master_message_box)); ret= _DC_create_message(dn->str, _DCAPI_MSG_MESSAGE, message, NULL); g_string_free(dn, TRUE); return(ret); } /********************************************************** Result handling */ /* Queries what optional fields are present in the result. */ unsigned DC_getResultCapabilities(const DC_Result *result) { int cap; cap= DC_GC_STDOUT | DC_GC_STDERR; cap|= DC_GC_MESSAGING | DC_GC_LOG | DC_GC_SUBRESULT; cap|= DC_GC_EXITCODE; return(cap); } /* Returns the WU that generated this result. */ DC_Workunit * DC_getResultWU(DC_Result *result) { if (!result) return(NULL); return(result->wu); } /* Returns the exit code of the client application. */ int DC_getResultExit(const DC_Result *result) { int i; if (!result) return(0); if (!_DC_wu_check(result->wu)) return(0); if (_DC_wu_exit_code(result->wu, &i) != DC_OK) return(0); return(i); } /* Returns the local name of an output file. */ char * DC_getResultOutput(const DC_Result *result, const char *logicalFileName) { GString *fn; if (!result) return(NULL); if (!_DC_wu_check(result->wu)) return(NULL); fn= g_string_new(result->wu->data.workdir); fn= g_string_append(fn, "/"); fn= g_string_append(fn, logicalFileName); return(g_string_free(fn, FALSE)); } /* Application level configuration parameter */ char * _DC_acfg(enum _DC_e_param what) { char *v; v= DC_getCfgStr(_DC_params[what].name); if (v && *v) return(v); return(_DC_params[what].def); } /* End of condor/condor_master.c */