From 5a86e7529e0836ea268831634ee7034fb60d9459 Mon Sep 17 00:00:00 2001 From: Charlie Fenton Date: Wed, 5 Nov 2008 06:03:13 +0000 Subject: [PATCH] MGR: Async RPCs: RPC thread is now a joinable thread which does one RPC and exits. Main thread creates a new RPC thread for each RPC request. svn path=/trunk/boinc/; revision=16423 --- checkin_notes | 13 ++- clientgui/AsyncRPC.cpp | 195 ++++++++++++++++--------------------- clientgui/MainDocument.cpp | 32 +----- clientgui/MainDocument.h | 7 +- 4 files changed, 102 insertions(+), 145 deletions(-) diff --git a/checkin_notes b/checkin_notes index 3f3c0bbf63..3b891e0d74 100644 --- a/checkin_notes +++ b/checkin_notes @@ -9184,4 +9184,15 @@ Rom 4 Nov 2008 clientgui/ BOINCGUIApp.cpp - \ No newline at end of file + +Charlie 4 Nov 2008 + - MGR: Async RPCs: RPC thread is now a joinable thread which does + one RPC and exits. Main thread creates a new RPC thread for + each RPC request after waiting for any previous RPC thread to + exit. This simplifies the logic, eliminates Yield(), Sleep() + and nanosleep() calls from RPC threadm and will hopefully + eliminate exess CPU usage on Fedora. + + clientgui/ + AsyncRPC.cpp + MainDocument.cpp, .h diff --git a/clientgui/AsyncRPC.cpp b/clientgui/AsyncRPC.cpp index 24c60cdbe9..f3b1a739aa 100755 --- a/clientgui/AsyncRPC.cpp +++ b/clientgui/AsyncRPC.cpp @@ -97,76 +97,46 @@ int AsyncRPC::RPC_Wait(RPC_SELECTOR which_rpc, void *arg1, void *arg2, } -RPCThread::RPCThread(CMainDocument *pDoc) : wxThread() { +RPCThread::RPCThread(CMainDocument *pDoc) : wxThread(wxTHREAD_JOINABLE) { m_pDoc = pDoc; } void RPCThread::OnExit() { - // Tell CMainDocument that thread has gracefully ended - m_pDoc->m_RPCThread = NULL; } -// We don't need critical sections because: -// 1. CMainDocument never modifies mDoc->current_rpc_request while the -// async RPC thread is using it. -// 2. The async RPC thread never modifies either mDoc->current_rpc_request -// or the vector of requests mDoc->RPC_requests. - void *RPCThread::Entry() { - int retval; + int retval = 0; CRPCFinishedEvent RPC_done_event( wxEVT_RPC_FINISHED ); + ASYNC_RPC_REQUEST *current_request = m_pDoc->GetCurrentRPCRequest(); - // check if we were asked to exit - while(!TestDestroy()) { - - if (! m_pDoc->IsConnected()) { -#ifdef __WXMSW__ - Sleep(1); -#else - timespec ts = {0, 1000000}; /// 1 imllisecond - nanosleep(&ts, NULL); /// 1 imllisecond or less -#endif + // Do nothing if no active RPC request (should never happen) + if (m_pDoc->GetCurrentRPCRequest()->isActive) { + // check if we were asked to exit + if(! TestDestroy()) { + retval = ProcessRPCRequest(); } - - // Wait until CMainDocument issues next RPC request - if (!m_pDoc->GetCurrentRPCRequest()->isActive) { -#ifdef __WXMSW__ // Until we can suspend the thread without Deadlock on Windows - Sleep(1); -#elif defined(__WXMAC__) - Yield(); -#else - // Some Linux systems may not support POSIX sched_yield(), - // in which case wxThread::Yield() returns immediately. - timespec ts = {0, 1}; /// 1 nanosecond - nanosleep(&ts, NULL); /// 1 nanosecond or less -#endif - continue; - } - - retval = ProcessRPCRequest(); - wxPostEvent( wxTheApp, RPC_done_event ); } - -#ifndef __WXMSW__ // Deadlocks on Windows - - // Use a critical section to prevent a crash during - // manager shutdown due to a rare race condition - m_pDoc->m_critsect.Enter(); - m_pDoc->m_critsect.Leave(); - -#endif // !!__WXMSW__ // Deadlocks on Windows - + + // We don't need critical sections because the RPC thread is + // joinable and the main thread calls Wait(). +// wxCriticalSectionLocker(m_pDoc->m_critsect); // Deactivation is an atomic operation + current_request->retval = retval; + current_request->isActive = false; + wxPostEvent( wxTheApp, RPC_done_event ); + + // Tell CMainDocument that thread has gracefully ended + m_pDoc->m_RPCThread = NULL; + Exit(); return NULL; } int RPCThread::ProcessRPCRequest() { int retval = 0; - ASYNC_RPC_REQUEST *current_request; + ASYNC_RPC_REQUEST *current_request = m_pDoc->GetCurrentRPCRequest(); - current_request = m_pDoc->GetCurrentRPCRequest(); switch (current_request->which_rpc) { // RPC_SELECTORS with no arguments case RPC_RUN_BENCHMARKS: @@ -391,20 +361,10 @@ int RPCThread::ProcessRPCRequest() { break; } - // Deactivation is an atomic operation - current_request->retval = retval; - current_request->isActive = false; - return retval; } -// We don't need critical sections (except when exiting Manager) because: -// 1. CMainDocument never modifies mDoc->current_rpc_request while the -// async RPC thread is using it. -// 2. The async RPC thread never modifies either mDoc->current_rpc_request -// or the vector of requests mDoc->RPC_requests. - // TODO: combine RPC requests for different buffers, then just copy the buffer. int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) { @@ -445,16 +405,29 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) { // Start this RPC if no other RPC is already in progress. if (RPC_requests.size() == 1) { - // Make sure activation is an atomic operation + if (m_RPCThread) { + m_RPCThread->Wait(); + } + // We don't need critical sections because the RPC thread is + // joinable and the main thread calls Wait(). +// m_critsect.Enter(); // Make sure activation is an atomic operation + request.isActive = false; current_rpc_request = request; current_rpc_request.isActive = true; + + wxASSERT(m_RPCThread == NULL); + m_RPCThread = new RPCThread(this); + wxASSERT(m_RPCThread); + + retval2 = m_RPCThread->Create(); + wxASSERT(!retval2); + + retval2 = m_RPCThread->Run(); + wxASSERT(!retval2); + +// m_critsect.Leave(); // Make sure activation is an atomic operation } -#ifndef __WXMSW__ // Deadlocks on Windows - if (current_rpc_request.isActive && m_RPCThread->IsPaused()) { - m_RPCThread->Resume(); - } -#endif // !!__WXMSW__ // Deadlocks on Windows // If this is a user-initiated event wait for completion but show // a dialog allowing the user to cancel. @@ -494,13 +467,6 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) { // Allow RPC thread to run while we wait for it. if (!current_rpc_request.isActive) { HandleCompletedRPC(); -#ifndef __WXMSW__ // Deadlocks on Windows - } else { - // for safety - if (m_RPCThread->IsPaused()) { - m_RPCThread->Resume(); - } -#endif // __WXMSW__ // Deadlocks on Windows } } else { // RPC_WAIT_DLG_DELAY has expired; check if Manager is minimized. @@ -512,6 +478,8 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) { // NOTE: CBOINCGUIApp::FilterEvent() blocks those events // which might cause posting of more RPC requests while // we are in this loop, to prevent undesirable recursion. + // It does allow wxEVT_RPC_FINISHED so we don't need to + // explicity call HandleCompletedRPC() here. // keepLooping = true; if (wxGetApp().Pending()) { @@ -547,29 +515,11 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) { // This is ugly but necessary. We must then reconnect and // start a new RPC thread. if (current_rpc_request.isActive) { - current_rpc_request.isActive = false; - m_RPCThread->Pause(); // Needed on Windows - rpcClient.close(); - m_RPCThread->Kill(); -#ifdef __WXMSW__ - m_RPCThread->Delete(); // Needed on Windows, crashes on Mac/Linux -#endif - m_RPCThread = NULL; - RPC_requests.clear(); - current_rpc_request.clear(); - m_bNeedRefresh = false; - m_bNeedTaskBarRefresh = false; + KillRPCThread(); // We will be reconnected to the same client (if possible) by // CBOINCDialUpManager::OnPoll() and CNetworkConnection::Poll(). m_pNetworkConnection->SetStateDisconnected(); - m_RPCThread = new RPCThread(this); - wxASSERT(m_RPCThread); - retval2 = m_RPCThread->Create(); - wxASSERT(!retval2); - retval2 = m_RPCThread->Run(); - wxASSERT(!retval2); -// m_RPCThread->Pause(); } } if (m_RPCWaitDlg) { @@ -583,32 +533,45 @@ int CMainDocument::RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority) { } +void CMainDocument::KillRPCThread() { + // We don't need critical sections because the RPC thread is + // joinable and the main thread calls Wait(). +// wxCriticalSectionLocker(this->m_critsect); + + if (!m_RPCThread) { + return; + } + + current_rpc_request.isActive = false; + m_RPCThread->Pause(); // May be needed on Windows ?? + rpcClient.close(); + m_RPCThread->Kill(); +#ifdef __WXMSW__ + m_RPCThread->Delete(); // My be needed on Windows, but may crash on Mac/Linux ?? +#endif + m_RPCThread = NULL; + RPC_requests.clear(); + current_rpc_request.clear(); + m_bNeedRefresh = false; + m_bNeedTaskBarRefresh = false; +} + + void CMainDocument::OnRPCComplete(CRPCFinishedEvent&) { HandleCompletedRPC(); } void CMainDocument::HandleCompletedRPC() { - int retval; + int retval = 0, retval2 = 0; int i, n, requestIndex = -1; bool stillWaitingForPendingRequests = false; - if(current_rpc_request.isActive) return; + if (current_rpc_request.isActive) return; // We can get here either via a CRPCFinishedEvent event posted // by the RPC thread or by a call from RequestRPC. If we were // called from RequestRPC, the CRPCFinishedEvent will still be // on the event queue, so we get called twice. Check for this here. if (current_rpc_request.which_rpc == 0) return; // already handled by a call from RequestRPC -#ifndef __WXMSW__ // Deadlocks on Windows - m_RPCThread->Pause(); - // Pause() does not take effect until the RPC thread calls TestDestroy() - // We must wait for that to happen to avoid a possible race condition. - do { - // TODO: is there a way for main UNIX thread to yield wih no minimum delay? - timespec ts = {0, 1}; /// 1 nanosecond - nanosleep(&ts, NULL); /// 1 nanosecond or less - } while (!m_RPCThread->IsPaused()); - -#endif // Find our completed request in the queue n = RPC_requests.size(); @@ -826,15 +789,25 @@ void CMainDocument::HandleCompletedRPC() { // We can't start this until finished processing the previous RPC's // event because the two requests may write into the same buffer. if (RPC_requests.size() > 0) { - // Make sure activation is an atomic operation + if (m_RPCThread) { + m_RPCThread->Wait(); + } + // We don't need critical sections because the RPC thread is + // joinable and the main thread calls Wait(). +// wxCriticalSectionLocker(this->m_critsect); // Make sure activation is an atomic operation RPC_requests[0].isActive = false; current_rpc_request = RPC_requests[0]; current_rpc_request.isActive = true; -#ifndef __WXMSW__ // Deadlocks on Windows - if (m_RPCThread->IsPaused()) { - m_RPCThread->Resume(); - } -#endif // ! __WXMSW__ // Deadlocks on Windows + + wxASSERT(m_RPCThread == NULL); + m_RPCThread = new RPCThread(this); + wxASSERT(m_RPCThread); + + retval2 = m_RPCThread->Create(); + wxASSERT(!retval2); + + retval2 = m_RPCThread->Run(); + wxASSERT(!retval2); } } diff --git a/clientgui/MainDocument.cpp b/clientgui/MainDocument.cpp index 352730328a..04421136c7 100644 --- a/clientgui/MainDocument.cpp +++ b/clientgui/MainDocument.cpp @@ -431,23 +431,13 @@ int CMainDocument::OnInit() { m_pClientManager = new CBOINCClientManager(); wxASSERT(m_pClientManager); + m_RPCThread = NULL; m_RPCWaitDlg = NULL; m_bWaitingForRPC = false; m_bNeedRefresh = false; m_bNeedTaskBarRefresh = false; current_rpc_request.clear(); - m_RPCThread = new RPCThread(this); - wxASSERT(m_RPCThread); - - iRetVal = m_RPCThread->Create(); - wxASSERT(!iRetVal); - - m_RPCThread->Run(); -#ifndef __WXMSW__ - m_RPCThread->Pause(); -#endif - return iRetVal; } @@ -466,25 +456,7 @@ int CMainDocument::OnExit() { } if (m_RPCThread) { - // Use a critical section to prevent a crash during - // manager shutdown due to a rare race condition -#ifndef __WXMSW__ - m_critsect.Enter(); - m_RPCThread->Delete(); - // On some platforms, Delete() takes effect only when thread calls TestDestroy() - m_RPCThread->Resume(); - m_critsect.Leave(); -#endif - wxStopWatch ThreadDeleteTimer = wxStopWatch(); - // RPC thread sets m_RPCThread to NULL when it exits - while (m_RPCThread) { - // Allow 5 seconds for RPC thread to exit gracefully - if (ThreadDeleteTimer.Time() > 5000) { - m_RPCThread->Pause(); // Needed on Windows - m_RPCThread->Kill(); - break; - } - } + KillRPCThread(); m_RPCThread = NULL; } diff --git a/clientgui/MainDocument.h b/clientgui/MainDocument.h index 9d28b3d53c..b17678e2ce 100644 --- a/clientgui/MainDocument.h +++ b/clientgui/MainDocument.h @@ -177,22 +177,23 @@ public: public: int RequestRPC(ASYNC_RPC_REQUEST& request, bool hasPriority = false); void OnRPCComplete(CRPCFinishedEvent& event); - void HandleCompletedRPC(); ASYNC_RPC_REQUEST* GetCurrentRPCRequest() { return ¤t_rpc_request; } bool WaitingForRPC() { return m_bWaitingForRPC; } wxDialog* GetRPCWaitDialog() { return m_RPCWaitDlg; } // void TestAsyncRPC(); // For testing Async RPCs RPCThread* m_RPCThread; wxCriticalSection m_critsect; - bool m_bNeedRefresh; - bool m_bNeedTaskBarRefresh; private: + void HandleCompletedRPC(); + void KillRPCThread(); int CopyProjectsToStateFile(PROJECTS& p, CC_STATE& state); ASYNC_RPC_REQUEST current_rpc_request; AsyncRPCDlg* m_RPCWaitDlg; std::vector RPC_requests; bool m_bWaitingForRPC; + bool m_bNeedRefresh; + bool m_bNeedTaskBarRefresh; // // Project Tab