diff --git a/dcapi/boinc/assimilator.C b/dcapi/boinc/assimilator.C index 39a1c2a0a3..a0db48720e 100644 --- a/dcapi/boinc/assimilator.C +++ b/dcapi/boinc/assimilator.C @@ -2,50 +2,50 @@ #include #endif -#include -#include -#include #include -#include -#include #include #include "dc_boinc.h" #include -#include -#include #include #include int DC_processEvents(int timeout) { DC_Result *result; - char *wu_query; DB_WORKUNIT wu; - int ret = 0; + char *query; + + if (!_dc_resultcb || !_dc_subresultcb || !_dc_messagecb) + { + DC_log(LOG_ERR, "DC_processEvents: callbacks are not set up"); + return DC_ERR_CONFIG; + } /* XXX Check LIMIT value */ - wu_query = g_strdup_printf("WHERE name LIKE '%s_%%' " + query = g_strdup_printf("WHERE name LIKE '%s_%%' " "AND assimilate_state = %d LIMIT 100", project_uuid_str, ASSIMILATE_READY); - while (!wu.enumerate(wu_query)) + while (!wu.enumerate(query)) { DB_RESULT canonical_result; - char *result_query; /* We are only interested in the canonical result */ if (!wu.canonical_resultid) { - DC_log(LOG_DEBUG, "No canonical result for work unit %d", - wu.id); + /* This should never happen */ + DC_log(LOG_ERR, "No canonical result for work " + "unit %d - bug in validator?", wu.id); continue; } - result_query = g_strdup_printf("WHERE id = %d", - wu.canonical_resultid); - canonical_result.lookup(result_query); - g_free(result_query); + if (canonical_result.lookup_id(wu.canonical_resultid)) + { + DC_log(LOG_ERR, "Result #%d is not in the database", + wu.canonical_resultid); + continue; + } /* Call the callback function */ result = _DC_createResult(wu.name, canonical_result.xml_doc_in); @@ -60,6 +60,100 @@ int DC_processEvents(int timeout) wu.update(); } - g_free(wu_query); - return ret; + g_free(query); + return 0; +} + +/* Look for a single result that matches the filter */ +static DC_Event *look_for_results(const char *wuFilter, const char *wuName, + int timeout) +{ + DB_RESULT result; + DC_Event *event; + DB_WORKUNIT wu; + char *query; + + if (wuFilter) + query = g_strdup_printf("WHERE name LIKE '%s_%%_%s' " + "AND assimilate_state = %d LIMIT 1", project_uuid_str, + wuFilter, ASSIMILATE_READY); + else if (wuName) + query = g_strdup_printf("WHERE name LIKE '%s_%s%%' " + "AND assimilate_state = %d LIMIT 1", project_uuid_str, + wuName, ASSIMILATE_READY); + else + query = g_strdup_printf("WHERE name LIKE '%s_%%' " + "AND assimilate_state = %d LIMIT 1", project_uuid_str, + ASSIMILATE_READY); + + if (wu.enumerate(query)) + { + g_free(query); + return NULL; + } + g_free(query); + + if (result.lookup_id(wu.canonical_resultid)) + { + DC_log(LOG_ERR, "Result #%d is not in the database", + wu.canonical_resultid); + return NULL; + } + + event = g_new(DC_Event, 1); + event->type = DC_EVENT_RESULT; + event->result = _DC_createResult(wu.name, result.xml_doc_in); + if (!event->result) + { + g_free(event); + return NULL; + } + + return event; +} + +DC_Event *DC_waitEvent(const char *wuFilter, int timeout) +{ + DC_Event *event; + + event = look_for_results(wuFilter, NULL, timeout); +/* + if (!event) + event = look_for_notifications(...) +*/ + return event; +} + +DC_Event *DC_waitWUEvent(DC_Workunit *wu, int timeout) +{ + DC_Event *event; + + event = look_for_results(NULL, wu->name, timeout); +/* + if (!event) + event = look_for_notifications(...) +*/ + return event; +} + +void DC_DestroyEvent(DC_Event *event) +{ + if (!event) + return; + + switch (event->type) + { + case DC_EVENT_RESULT: + _DC_destroyResult(event->result); + break; + case DC_EVENT_SUBRESULT: + _DC_destroyPhysicalFile(event->subresult); + break; + case DC_EVENT_MESSAGE: + g_free(event->message); + break; + default: + DC_log(LOG_ERR, "Unknown event type %d", event->type); + break; + } }