// The contents of this file are subject to the Mozilla Public License // Version 1.0 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // http://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is the Berkeley Open Infrastructure for Network Computing. // // The Initial Developer of the Original Code is the SETI@home project. // Portions created by the SETI@home project are Copyright (C) 2002 // University of California at Berkeley. All Rights Reserved. // // Contributor(s): // // This file contains high-level logic for communicating with // scheduling servers: // - what project to ask for work // - how much work to ask for // - merging the result of a scheduler RPC into the client state // Note: code for actually doing a scheduler RPC is elsewhere, // namely scheduler_op.C #include #include #include #ifdef _USING_FCGI_ #undef _USING_FCGI_ #endif #include "error_numbers.h" #include "file_names.h" #include "parse.h" #include "log_flags.h" #include "message.h" #include "scheduler_op.h" #include "client_state.h" // quantities like avg CPU time decay by a factor of e every week #define EXP_DECAY_RATE (1./(3600*24*7)) //estimates amount of time a workunit will take to complete // double CLIENT_STATE::estimate_duration(WORKUNIT* wup) { return wup->rsc_fpops/host_info.p_fpops + wup->rsc_iops/host_info.p_iops; } //estimates the number of days of work remaining // double CLIENT_STATE::current_water_days() { unsigned int i; double seconds_remaining=0; for (i=0; iis_compute_done) continue; if (rp->cpu_time > 0) seconds_remaining += (estimate_duration(rp->wup) - rp->cpu_time); else seconds_remaining += estimate_duration(rp->wup); } return (seconds_remaining * 86400); } bool CLIENT_STATE::need_work() { return (current_water_days() <= prefs->low_water_days); } void CLIENT_STATE::update_avg_cpu(PROJECT* p) { int now = time(0); double deltat = now - p->exp_avg_mod_time; if (deltat > 0) { if (p->exp_avg_cpu != 0) { p->exp_avg_cpu *= exp(deltat*EXP_DECAY_RATE); } p->exp_avg_mod_time = now; } } // choose a project to ask for work // PROJECT* CLIENT_STATE::choose_project() { PROJECT* p, *bestp; unsigned int i; double best_ratio, ratio; // update the average CPU times of all projects // for (i=0; inext_request_time) continue; if (p->exp_avg_cpu == 0) return p; ratio = p->resource_share/p->exp_avg_cpu; if (ratio >= best_ratio) { best_ratio = ratio; bestp = p; } } return bestp; } int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) { FILE* f = fopen(SCHED_OP_REQUEST_FILE, "wb"); unsigned int i; RESULT* rp; if (!f) return ERR_FOPEN; fprintf(f, "\n" " %s\n" " %d\n" " %d\n" " %s\n" " %d\n" " %d\n", p->authenticator, p->hostid, p->rpc_seqno, platform_name, version, work_req ); FILE* fprefs = fopen(PREFS_FILE_NAME, "r"); copy_stream(fprefs, f); fclose(fprefs); time_stats.write(f, true); net_stats.write(f, true); host_info.write(f); for (i=0; iproject == p && !rp->is_server_ack) { if (rp->is_upload_done()) { rp->write(f, true); } } } fprintf(f, "\n"); fclose(f); return 0; } // manage the task of maintaining an adequate supply of work. // bool CLIENT_STATE::get_work() { PROJECT* project; int retval, work_secs; bool action=false; if (need_work()) { switch(scheduler_op->state) { case SCHEDULER_OP_STATE_IDLE: // if no scheduler request pending, start one // project = choose_project(); if (!project) { if (log_flags.sched_op_debug) { printf("all projects temporarily backed off\n"); } return false; } work_secs = (int) (prefs->high_water_days - current_water_days())*86400; retval = make_scheduler_request(project, work_secs); if (retval) { fprintf(stderr, "make_scheduler_request: %d\n", retval); break; } scheduler_op->start_op(project); action = true; break; default: scheduler_op->poll(); if (scheduler_op->state == SCHEDULER_OP_STATE_DONE) { scheduler_op->state = SCHEDULER_OP_STATE_IDLE; action = true; if (scheduler_op->scheduler_op_retval) { fprintf(stderr, "scheduler RPC to %s failed: %d\n", scheduler_op->project->master_url, scheduler_op->scheduler_op_retval ); if (log_flags.sched_ops) { printf("HTTP work request failed: %d\n", retval); } break; } else { handle_scheduler_reply(*scheduler_op); client_state_dirty = true; } } break; } } return action; } // see whether a new preferences set, obtained from // the given project, looks "reasonable". // Currently this is primitive: just make sure there's at least 1 project // bool PREFS::looks_reasonable(PROJECT& project) { if (projects.size() > 0) return true; return false; } void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) { SCHEDULER_REPLY sr; FILE* f; int retval; unsigned int i; char prefs_backup[256]; PROJECT *project, *pp, *sp; PREFS* new_prefs; project = sched_op.project; contacted_sched_server = true; if (log_flags.sched_ops) { printf("Got reply from scheduler %s\n", sched_op.scheduler_url); } if (log_flags.sched_op_debug) { f = fopen(SCHED_OP_RESULT_FILE, "r"); printf("------------- SCHEDULER REPLY ----------\n"); copy_stream(f, stdout); fclose(f); printf("------------- END ----------\n"); } f = fopen(SCHED_OP_RESULT_FILE, "r"); retval = sr.parse(f); if (strlen(sr.message)) { show_message(sr.message, sr.message_priority); } // copy new entities to client state // if (sr.request_delay) { project->next_request_time = time(0) + sr.request_delay; } if (sr.hostid) { project->hostid = sr.hostid; project->rpc_seqno = 0; } // if the scheduler reply includes preferences // that are newer than what we have on disk, then // - verify that the new prefs look reasonable; // they should include at least one project. // - rename the current prefs file // - copy new preferences to prefs.xml // - copy any new projects into the CLIENT_STATE structure // - update the preferences info of projects already in CLIENT_STATE // // There may be projects in CLIENT_STATE that are not in the new prefs; // i.e. the user has dropped out of these projects. // We'll continue with any ongoing work for these projects, // but they'll be dropped the next time the client starts up. // if (sr.prefs_mod_time > prefs->mod_time) { f = fopen(PREFS_TEMP_FILE_NAME, "w"); fprintf(f, "\n" " %d\n" " %s\n" " %s\n", sr.prefs_mod_time, project->master_url, sched_op.scheduler_url ); fputs(sr.prefs_xml, f); fprintf(f, "\n" ); fclose(f); f = fopen(PREFS_TEMP_FILE_NAME, "r"); new_prefs = new PREFS; new_prefs->parse(f); fclose(f); if (new_prefs->looks_reasonable(*project)) { make_prefs_backup_name(*prefs, prefs_backup); rename(PREFS_FILE_NAME, prefs_backup); rename(PREFS_TEMP_FILE_NAME, PREFS_FILE_NAME); for (i=0; iprojects.size(); i++) { pp = new_prefs->projects[i]; sp = lookup_project(pp->master_url); if (sp) { sp->copy_prefs_fields(*pp); } else { projects.push_back(pp); } } delete prefs; prefs = new_prefs; } else { fprintf(stderr, "New preferences don't look reasonable, ignoring\n"); } } for (i=0; iis_server_ack = true; } else { fprintf(stderr, "ERROR: got ack for result %s, can't find\n", sr.result_acks[i].name ); } } if (log_flags.state_debug) { printf("State after handle_scheduler_reply():\n"); print_counts(); } }