// Berkeley Open Infrastructure for Network Computing // http://boinc.berkeley.edu // Copyright (C) 2005 University of California // // This is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; // either version 2.1 of the License, or (at your option) any later version. // // This software is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // To view the GNU Lesser General Public License visit // http://www.gnu.org/copyleft/lesser.html // or write to the Free Software Foundation, Inc., // 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA // scheduler code related to sending work #include "config.h" #include "main.h" #include "server_types.h" #include "sched_shmem.h" #include "sched_hr.h" #include "sched_config.h" #include "sched_util.h" #include "sched_msgs.h" #include "sched_send.h" #include "sched_array.h" #ifdef _USING_FCGI_ #include "fcgi_stdio.h" #else #define FCGI_ToFILE(x) (x) #endif // Make a pass through the wu/results array, sending work. // If "infeasible_only" is true, send only results that were // previously infeasible for some host // void scan_work_array( SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, SCHED_SHMEM& ss ) { int i, j, retval, n, rnd_off; WORKUNIT wu; DB_RESULT result; char buf[256]; APP* app; APP_VERSION* avp; bool found; if (config.homogeneous_redundancy) { if (hr_unknown_platform(sreq)) { reply.wreq.hr_reject_perm = true; return; } } lock_sema(); rnd_off = rand() % ss.nwu_results; for (j=0; j0) { log_messages.printf( SCHED_MSG_LOG::MSG_DEBUG, "send_work: user %d already has %d result(s) for WU %d\n", reply.user.id, n, wu_result.workunit.id ); goto dont_send; } } } // if desired, make sure redundancy is homogeneous // if (config.homogeneous_redundancy || app->homogeneous_redundancy) { if (already_sent_to_different_platform( sreq, wu_result.workunit, reply.wreq )) { log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n", reply.host.id, wu.id, wu.name ); // Mark the workunit as infeasible. // This ensures that work already assigned to a platform // is processed first. // wu_result.infeasible_count++; goto dont_send; } } result.id = wu_result.resultid; // mark slot as empty AFTER we've copied out of it // (since otherwise feeder might overwrite it) // wu_result.state = WR_STATE_EMPTY; // reread result from DB, make sure it's still unsent // TODO: from here to add_result_to_reply() // (which updates the DB record) should be a transaction // retval = result.lookup_id(result.id); if (retval) { log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "[RESULT#%d] result.lookup_id() failed %d\n", result.id, retval ); goto done; } if (result.server_state != RESULT_SERVER_STATE_UNSENT) { log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "[RESULT#%d] expected to be unsent; instead, state is %d\n", result.id, result.server_state ); goto done; } if (result.workunitid != wu.id) { log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "[RESULT#%d] wrong WU ID: wanted %d, got %d\n", result.id, wu.id, result.workunitid ); goto done; } retval = add_result_to_reply( result, wu, sreq, reply, platform, app, avp ); // add_result_to_reply() fails only in fairly pathological cases - // e.g. we couldn't update the DB record or modify XML fields. // If this happens, don't replace the record in the array // (we can't anyway, since we marked the entry as "empty"). // The feeder will eventually pick it up again, // and hopefully the problem won't happen twice. // goto done; dont_send: // here we couldn't send the result for some reason -- // set its state back to PRESENT // wu_result.state = WR_STATE_PRESENT; done: lock_sema(); } unlock_sema(); } const char *BOINC_RCSID_d9f764fd14="$Id$";