impl of DC_waitMasterEvent

redesign of DC_processMasterEvents
check wu state before modifying it

git-svn-id: svn+ssh://cvs.lpds.sztaki.hu/var/lib/svn/szdg/dcapi/trunk@668 a7169a2c-3604-0410-bc95-c702d8d87f7a
This commit is contained in:
drdani 2006-06-09 11:44:03 +00:00 committed by Adam Visegradi
parent 263242de0a
commit 905cca8a01
1 changed files with 134 additions and 48 deletions

View File

@ -172,7 +172,13 @@ DC_destroyWU(DC_Workunit *wu)
if (_DC_wu_table)
g_hash_table_remove(_DC_wu_table, wu->name);
if (wu->workdir)
if (wu->state == DC_WU_RUNNING ||
wu->state == DC_WU_SUSPENDED)
{
DC_log(LOG_INFO, "WU has been started but not finished "
"do not remove its workdir %s", wu->workdir);
}
else if (wu->workdir)
{
const char *name;
GDir *dir;
@ -236,6 +242,12 @@ DC_addWUInput(DC_Workunit *wu,
DC_log(LOG_DEBUG, "DC_addWUInput(%p-\"%s\", %s, %s, %d)",
wu, wu->name, logicalFileName, URL, fileMode);
if (wu->state != DC_WU_READY)
{
DC_log(LOG_INFO, "Modifying started wu %s", wu->name);
return(DC_ERR_BADPARAM);
}
/* Sanity checks */
ret= _DC_wu_check_logical_name(wu, logicalFileName);
if (ret)
@ -304,6 +316,12 @@ DC_addWUOutput(DC_Workunit *wu, const char *logicalFileName)
DC_log(LOG_DEBUG, "DC_addWUOutput(%p-\"%s\", %s)",
wu, wu->name, logicalFileName);
if (wu->state != DC_WU_READY)
{
DC_log(LOG_INFO, "Modifying started wu %s", wu->name);
return(DC_ERR_BADPARAM);
}
/* Sanity checks */
ret= _DC_wu_check_logical_name(wu, logicalFileName);
if (ret)
@ -326,7 +344,13 @@ DC_setWUPriority(DC_Workunit *wu, int priority)
DC_log(LOG_DEBUG, "DC_setWUPriority(%p-\"%s\", %d)",
wu, wu->name, priority);
return(0);
if (wu->state != DC_WU_READY)
{
DC_log(LOG_INFO, "Modifying started wu %s", wu->name);
return(DC_ERR_BADPARAM);
}
return(DC_OK);
}
@ -359,9 +383,20 @@ DC_getWUState(DC_Workunit *wu)
char *
DC_getWUId(const DC_Workunit *wu)
{
GString *s;
struct _DC_condor_event *ce;
if (!_DC_wu_check(wu))
return(NULL);
return(NULL);
_DC_wu_update_condor_events((DC_Workunit *)wu);
if (wu->condor_events->len == 0)
return(NULL);
ce= &g_array_index(wu->condor_events,
struct _DC_condor_event,
wu->condor_events->len-1);
s= g_string_new("");
g_string_printf(s, "%d.%d", ce->cluster, ce->proc);
return(g_string_free(s, FALSE));
}
@ -407,7 +442,7 @@ static void _DC_dd_check_state(void *key, void *value, void *ptr)
int
DC_getWUNumber(DC_WUState state)
{
int val;
int val= 0;
_DC_dd_look_for_state= state;
g_hash_table_foreach(_DC_wu_table, (GHFunc)_DC_dd_check_state, &val);
@ -417,34 +452,31 @@ DC_getWUNumber(DC_WUState state)
/************************************************************** Main cycles */
static void _DC_check_all_wus_for_event(void *key, void *value, void *ptr)
static void
_DC_process_event(DC_MasterEvent *event)
{
DC_Workunit *wu= (DC_Workunit *)value;
DC_MasterEvent *event;
int *processed= (int *)ptr;
event= DC_waitWUEvent(wu, 0);
if (event)
if (!event)
return;
switch (event->type)
{
switch (event->type)
{
case DC_MASTER_RESULT:
{
if (_DC_result_callback)
(*_DC_result_callback)(wu, event->result);
break;
}
case DC_MASTER_SUBRESULT:
{
break;
}
case DC_MASTER_MESSAGE:
{
break;
}
}
DC_destroyMasterEvent(event);
(*processed)++;
case DC_MASTER_RESULT:
{
if (_DC_result_callback)
(*_DC_result_callback)(event->wu,
event->result);
break;
}
case DC_MASTER_SUBRESULT:
{
break;
}
case DC_MASTER_MESSAGE:
{
break;
}
}
DC_destroyMasterEvent(event);
}
/* Waits for events and processes them. */
@ -452,43 +484,78 @@ int
DC_processMasterEvents(int timeout)
{
/* call callback and destry event */
int processed;
time_t start, now;
DC_MasterEvent *event;
DC_log(LOG_DEBUG, "DC_processMasterEvents(%d)",
timeout);
processed= 0;
g_hash_table_foreach(_DC_wu_table,
(GHFunc)_DC_check_all_wus_for_event,
&processed);
if ((!processed &&
timeout == 0) ||
g_hash_table_size(_DC_wu_table) == 0)
return(0);
if ((event= DC_waitMasterEvent(NULL, 0)) != NULL)
_DC_process_event(event);
if (timeout==0)
return(DC_OK);
start= time(NULL);
now= time(NULL);
while (now-start <= timeout &&
g_hash_table_size(_DC_wu_table) > 0)
{
/*printf("%ds wus=%d\n", (int)(now-start),
g_hash_table_size(_DC_wu_table));*/
sleep(1);
processed= 0;
g_hash_table_foreach(_DC_wu_table,
(GHFunc)_DC_check_all_wus_for_event,
&processed);
if ((event= DC_waitMasterEvent(NULL, 0)) != NULL)
_DC_process_event(event);
now= time(NULL);
}
return(0);
return(DC_OK);
}
static DC_MasterEvent *_DC_filtered_event;
static gboolean
_DC_check_filtered_wu_event(gpointer wu_name, gpointer w, gpointer ptr)
{
DC_Workunit *wu= (DC_Workunit *)w;
char *filter= (char *)ptr;
if (filter &&
((strcmp(DC_getWUTag(wu), filter) != 0)))
return(FALSE);
_DC_filtered_event= DC_waitWUEvent(wu, 0);
return(!_DC_filtered_event);
}
/* Checks for events and return them. */
DC_MasterEvent *
DC_waitMasterEvent(const char *wuFilter, int timeout)
{
/* no callback called! */
return(0);
DC_Workunit *wu;
time_t start, now;
DC_log(LOG_DEBUG, "DC_waitMasterEvent(%s, %d)",
wuFilter, timeout);
_DC_filtered_event= NULL;
wu= (DC_Workunit *)g_hash_table_find(_DC_wu_table,
_DC_check_filtered_wu_event,
(gpointer)wuFilter);
if (_DC_filtered_event ||
timeout==0)
return(_DC_filtered_event);
start= time(NULL);
now= time(NULL);
while (now-start <= timeout)
{
sleep(1);
_DC_filtered_event= NULL;
wu= (DC_Workunit *)
g_hash_table_find(_DC_wu_table,
_DC_check_filtered_wu_event,
(gpointer)wuFilter);
if (_DC_filtered_event)
return(_DC_filtered_event);
}
return(NULL);
}
@ -506,8 +573,23 @@ DC_waitWUEvent(DC_Workunit *wu, int timeout)
DC_log(LOG_DEBUG, "DC_waitWUEvent(%p-\"%s\", %d)",
wu, wu->name, timeout);
events= wu->condor_events->len;
_DC_wu_update_condor_events(wu);
me= _DC_wu_condor2api_event(wu);
if (me || timeout==0)
return(me);
start= time(NULL);
now= start;
while (now-start <= timeout)
{
sleep(1);
_DC_wu_update_condor_events(wu);
me= _DC_wu_condor2api_event(wu);
if (me)
return(me);
now= time(NULL);
}
return(0);
e= wu->condor_events->len;
if (e != events)
{
@ -579,7 +661,11 @@ DC_sendWUMessage(DC_Workunit *wu, const char *message)
unsigned
DC_getResultCapabilities(const DC_Result *result)
{
return(DC_ERR_NOTIMPL);
int cap;
cap= DC_GC_STDOUT | DC_GC_STDERR;
return(cap);
}