From 07e2a3e5117251d37ddd5aa376902d4fab5eedb9 Mon Sep 17 00:00:00 2001 From: crs Date: Wed, 29 Sep 2004 21:59:26 +0000 Subject: [PATCH] Removed recursive mutexes. Simplified stream filters as a side effect. Removed -D_BSD_SOURCE and -D_XOPEN_SOURCE=500 from compile since they're not longer necessary. --- cmd/synergyc/CClientTaskBarReceiver.cpp | 5 +- cmd/synergyc/CClientTaskBarReceiver.h | 2 - cmd/synergys/CServerTaskBarReceiver.cpp | 5 +- cmd/synergys/CServerTaskBarReceiver.h | 2 - configure.in | 10 -- lib/arch/CArchMultithreadPosix.cpp | 2 - lib/base/CEventQueue.cpp | 123 +++++++++--------------- lib/base/CEventQueue.h | 23 +---- lib/base/IEventQueue.h | 45 ++------- lib/io/CStreamFilter.cpp | 36 ++++--- lib/io/CStreamFilter.h | 16 ++- lib/io/IStream.h | 20 +--- lib/net/CTCPSocket.cpp | 71 +++++--------- lib/net/CTCPSocket.h | 6 +- lib/net/IDataSocket.h | 2 - lib/server/CClientListener.cpp | 7 +- lib/synergy/CPacketStreamFilter.cpp | 63 +++--------- lib/synergy/CPacketStreamFilter.h | 12 +-- 18 files changed, 148 insertions(+), 302 deletions(-) diff --git a/cmd/synergyc/CClientTaskBarReceiver.cpp b/cmd/synergyc/CClientTaskBarReceiver.cpp index e1cbcfac..ddd8ec5a 100644 --- a/cmd/synergyc/CClientTaskBarReceiver.cpp +++ b/cmd/synergyc/CClientTaskBarReceiver.cpp @@ -38,7 +38,6 @@ CClientTaskBarReceiver::updateStatus(CClient* client, const CString& errorMsg) { { // update our status - CLock lock(&m_mutex); m_errorMessage = errorMsg; if (client == NULL) { if (m_errorMessage.empty()) { @@ -95,13 +94,13 @@ CClientTaskBarReceiver::onStatusChanged(CClient*) void CClientTaskBarReceiver::lock() const { - m_mutex.lock(); + // do nothing } void CClientTaskBarReceiver::unlock() const { - m_mutex.unlock(); + // do nothing } std::string diff --git a/cmd/synergyc/CClientTaskBarReceiver.h b/cmd/synergyc/CClientTaskBarReceiver.h index ab9c371c..a9fe50c2 100644 --- a/cmd/synergyc/CClientTaskBarReceiver.h +++ b/cmd/synergyc/CClientTaskBarReceiver.h @@ -15,7 +15,6 @@ #ifndef CCLIENTTASKBARRECEIVER_H #define CCLIENTTASKBARRECEIVER_H -#include "CMutex.h" #include "CString.h" #include "IArchTaskBarReceiver.h" @@ -76,7 +75,6 @@ protected: virtual void onStatusChanged(CClient* client); private: - CMutex m_mutex; EState m_state; CString m_errorMessage; }; diff --git a/cmd/synergys/CServerTaskBarReceiver.cpp b/cmd/synergys/CServerTaskBarReceiver.cpp index b3858827..ee04d79e 100644 --- a/cmd/synergys/CServerTaskBarReceiver.cpp +++ b/cmd/synergys/CServerTaskBarReceiver.cpp @@ -38,7 +38,6 @@ CServerTaskBarReceiver::updateStatus(CServer* server, const CString& errorMsg) { { // update our status - CLock lock(&m_mutex); m_errorMessage = errorMsg; if (server == NULL) { if (m_errorMessage.empty()) { @@ -100,13 +99,13 @@ CServerTaskBarReceiver::onStatusChanged(CServer*) void CServerTaskBarReceiver::lock() const { - m_mutex.lock(); + // do nothing } void CServerTaskBarReceiver::unlock() const { - m_mutex.unlock(); + // do nothing } std::string diff --git a/cmd/synergys/CServerTaskBarReceiver.h b/cmd/synergys/CServerTaskBarReceiver.h index 372d73f2..d6ec8571 100644 --- a/cmd/synergys/CServerTaskBarReceiver.h +++ b/cmd/synergys/CServerTaskBarReceiver.h @@ -15,7 +15,6 @@ #ifndef CSERVERTASKBARRECEIVER_H #define CSERVERTASKBARRECEIVER_H -#include "CMutex.h" #include "CString.h" #include "IArchTaskBarReceiver.h" #include "stdvector.h" @@ -81,7 +80,6 @@ protected: virtual void onStatusChanged(CServer* server); private: - CMutex m_mutex; EState m_state; CString m_errorMessage; CClients m_clients; diff --git a/configure.in b/configure.in index b180baba..a066a4d1 100644 --- a/configure.in +++ b/configure.in @@ -74,16 +74,6 @@ fi dnl check compiler ACX_CHECK_CXX -dnl different platforms have somewhat incompatible requirements for -dnl BSD and Posix macros. -case $host in -*-*-openbsd* | *-*-freebsd*) - ;; -*) - CXXFLAGS="$CXXFLAGS -D_BSD_SOURCE -D_XOPEN_SOURCE=500" - ;; -esac - dnl checks for libraries if test x"$acx_host_arch" = xUNIX; then ACX_PTHREAD(,AC_MSG_ERROR(You must have pthreads to compile synergy)) diff --git a/lib/arch/CArchMultithreadPosix.cpp b/lib/arch/CArchMultithreadPosix.cpp index b3b2eeba..64baf6af 100644 --- a/lib/arch/CArchMultithreadPosix.cpp +++ b/lib/arch/CArchMultithreadPosix.cpp @@ -301,8 +301,6 @@ CArchMultithreadPosix::newMutex() pthread_mutexattr_t attr; int status = pthread_mutexattr_init(&attr); assert(status == 0); - status = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - assert(status == 0); CArchMutexImpl* mutex = new CArchMutexImpl; status = pthread_mutex_init(&mutex->m_mutex, &attr); assert(status == 0); diff --git a/lib/base/CEventQueue.cpp b/lib/base/CEventQueue.cpp index 8e309ce3..16a0d52b 100644 --- a/lib/base/CEventQueue.cpp +++ b/lib/base/CEventQueue.cpp @@ -183,6 +183,9 @@ CEventQueue::dispatchEvent(const CEvent& event) { void* target = event.getTarget(); IEventJob* job = getHandler(event.getType(), target); + if (job == NULL) { + job = getHandler(CEvent::kUnknown, target); + } if (job != NULL) { job->run(event); return true; @@ -273,65 +276,56 @@ CEventQueue::deleteTimer(CEventQueueTimer* timer) m_buffer->deleteTimer(timer); } -void -CEventQueue::adoptHandler(void* target, IEventJob* handler) -{ - doAdoptHandler(CEvent::kUnknown, target, handler); -} - void CEventQueue::adoptHandler(CEvent::Type type, void* target, IEventJob* handler) { - assert(type != CEvent::kUnknown); - doAdoptHandler(type, target, handler); -} - -IEventJob* -CEventQueue::orphanHandler(void* target) -{ - return doOrphanHandler(CEvent::kUnknown, target); -} - -IEventJob* -CEventQueue::orphanHandler(CEvent::Type type, void* target) -{ - assert(type != CEvent::kUnknown); - return doOrphanHandler(type, target); -} - -void -CEventQueue::removeHandler(void* target) -{ - delete orphanHandler(target); + CArchMutexLock lock(m_mutex); + IEventJob*& job = m_handlers[target][type]; + delete job; + job = handler; } void CEventQueue::removeHandler(CEvent::Type type, void* target) { - delete orphanHandler(type, target); + IEventJob* handler = NULL; + { + CArchMutexLock lock(m_mutex); + CHandlerTable::iterator index = m_handlers.find(target); + if (index != m_handlers.end()) { + CTypeHandlerTable& typeHandlers = index->second; + CTypeHandlerTable::iterator index2 = typeHandlers.find(type); + if (index2 != typeHandlers.end()) { + handler = index2->second; + typeHandlers.erase(index2); + } + } + } + delete handler; } void -CEventQueue::doAdoptHandler(CEvent::Type type, void* target, IEventJob* handler) +CEventQueue::removeHandlers(void* target) { - CArchMutexLock lock(m_mutex); - IEventJob*& job = m_handlers[CTypeTarget(type, target)]; - delete job; - job = handler; -} - -IEventJob* -CEventQueue::doOrphanHandler(CEvent::Type type, void* target) -{ - CArchMutexLock lock(m_mutex); - CHandlerTable::iterator index = m_handlers.find(CTypeTarget(type, target)); - if (index != m_handlers.end()) { - IEventJob* handler = index->second; - m_handlers.erase(index); - return handler; + std::vector handlers; + { + CArchMutexLock lock(m_mutex); + CHandlerTable::iterator index = m_handlers.find(target); + if (index != m_handlers.end()) { + // copy to handlers array and clear table for target + CTypeHandlerTable& typeHandlers = index->second; + for (CTypeHandlerTable::iterator index2 = typeHandlers.begin(); + index2 != typeHandlers.end(); ++index2) { + handlers.push_back(index2->second); + } + typeHandlers.clear(); + } } - else { - return NULL; + + // delete handlers + for (std::vector::iterator index = handlers.begin(); + index != handlers.end(); ++index) { + delete *index; } } @@ -345,14 +339,13 @@ IEventJob* CEventQueue::getHandler(CEvent::Type type, void* target) const { CArchMutexLock lock(m_mutex); - CHandlerTable::const_iterator index = - m_handlers.find(CTypeTarget(type, target)); + CHandlerTable::const_iterator index = m_handlers.find(target); if (index != m_handlers.end()) { - return index->second; - } - index = m_handlers.find(CTypeTarget(CEvent::kUnknown, target)); - if (index != m_handlers.end()) { - return index->second; + const CTypeHandlerTable& typeHandlers = index->second; + CTypeHandlerTable::const_iterator index2 = typeHandlers.find(type); + if (index2 != typeHandlers.end()) { + return index2->second; + } } return NULL; } @@ -454,30 +447,6 @@ CEventQueue::getNextTimerTimeout() const } -// -// CEventQueue::CTypeTarget -// - -CEventQueue::CTypeTarget::CTypeTarget(CEvent::Type type, void* target) : - m_type(type), - m_target(target) -{ - // do nothing -} - -CEventQueue::CTypeTarget::~CTypeTarget() -{ - // do nothing -} - -bool -CEventQueue::CTypeTarget::operator<(const CTypeTarget& tt) const -{ - return (m_type < tt.m_type || - (m_type == tt.m_type && m_target < tt.m_target)); -} - - // // CEventQueue::CTimer // diff --git a/lib/base/CEventQueue.h b/lib/base/CEventQueue.h index b61eb1e3..a63c7b16 100644 --- a/lib/base/CEventQueue.h +++ b/lib/base/CEventQueue.h @@ -43,13 +43,10 @@ public: virtual CEventQueueTimer* newOneShotTimer(double duration, void* target); virtual void deleteTimer(CEventQueueTimer*); - virtual void adoptHandler(void* target, IEventJob* dispatcher); virtual void adoptHandler(CEvent::Type type, void* target, IEventJob* handler); - virtual IEventJob* orphanHandler(void* target); - virtual IEventJob* orphanHandler(CEvent::Type type, void* target); - virtual void removeHandler(void* target); virtual void removeHandler(CEvent::Type type, void* target); + virtual void removeHandlers(void* target); virtual CEvent::Type registerType(const char* name); virtual CEvent::Type @@ -59,27 +56,12 @@ public: virtual const char* getTypeName(CEvent::Type type); private: - void doAdoptHandler(CEvent::Type type, - void* target, IEventJob* handler); - IEventJob* doOrphanHandler(CEvent::Type type, void* target); - UInt32 saveEvent(const CEvent& event); CEvent removeEvent(UInt32 eventID); bool hasTimerExpired(CEvent& event); double getNextTimerTimeout() const; private: - class CTypeTarget { - public: - CTypeTarget(CEvent::Type type, void* target); - ~CTypeTarget(); - - bool operator<(const CTypeTarget&) const; - - private: - CEvent::Type m_type; - void* m_target; - }; class CTimer { public: CTimer(CEventQueueTimer*, double timeout, double initialTime, @@ -111,8 +93,9 @@ private: typedef CPriorityQueue CTimerQueue; typedef std::map CEventTable; typedef std::vector CEventIDList; - typedef std::map CHandlerTable; typedef std::map CTypeMap; + typedef std::map CTypeHandlerTable; + typedef std::map CHandlerTable; CArchMutex m_mutex; diff --git a/lib/base/IEventQueue.h b/lib/base/IEventQueue.h index 765d4143..6f48f25c 100644 --- a/lib/base/IEventQueue.h +++ b/lib/base/IEventQueue.h @@ -114,47 +114,17 @@ public: */ virtual void deleteTimer(CEventQueueTimer*) = 0; - //! Register an event handler - /*! - Registers an event handler for \p target. The \p handler is - adopted. Any existing handler for the target is deleted. - \c dispatchEvent() will invoke \p handler for any event for - \p target that doesn't have a type specific handler. - */ - virtual void adoptHandler(void* target, IEventJob* handler) = 0; - //! Register an event handler for an event type /*! Registers an event handler for \p type and \p target. The \p handler is adopted. Any existing handler for the type,target pair is deleted. \c dispatchEvent() will invoke \p handler for any event for \p target - of type \p type. + of type \p type. If no such handler exists it will use the handler + for \p target and type \p kUnknown if it exists. */ virtual void adoptHandler(CEvent::Type type, void* target, IEventJob* handler) = 0; - //! Unregister an event handler - /*! - Unregisters an event handler for \p target and returns it. - Returns NULL if there was no such handler. The client becomes - responsible for deleting the returned handler. - */ - virtual IEventJob* orphanHandler(void* target) = 0; - - //! Unregister an event handler for an event type - /*! - Unregisters an event handler for the \p type, \p target pair and - returns it. Returns NULL if there was no such handler. The - client becomes responsible for deleting the returned handler. - */ - virtual IEventJob* orphanHandler(CEvent::Type type, void* target) = 0; - - //! Unregister an event handler - /*! - Unregisters an event handler for \p target and deletes it. - */ - virtual void removeHandler(void* target) = 0; - //! Unregister an event handler for an event type /*! Unregisters an event handler for the \p type, \p target pair and @@ -162,6 +132,12 @@ public: */ virtual void removeHandler(CEvent::Type type, void* target) = 0; + //! Unregister all event handlers for an event target + /*! + Unregisters all event handlers for the \p target and deletes them. + */ + virtual void removeHandlers(void* target) = 0; + //! Creates a new event type /*! Returns a unique event type id. @@ -192,9 +168,8 @@ public: //! Get an event handler /*! - Finds and returns the event handler for the \p type, \p target pair. - If there is no such handler, returns the handler for \p target. If - that doesn't exist, returns NULL. + Finds and returns the event handler for the \p type, \p target pair + if it exists, otherwise it returns NULL. */ virtual IEventJob* getHandler(CEvent::Type type, void* target) const = 0; diff --git a/lib/io/CStreamFilter.cpp b/lib/io/CStreamFilter.cpp index d5a2a350..312b0dfd 100644 --- a/lib/io/CStreamFilter.cpp +++ b/lib/io/CStreamFilter.cpp @@ -13,6 +13,8 @@ */ #include "CStreamFilter.h" +#include "IEventQueue.h" +#include "TMethodEventJob.h" // // CStreamFilter @@ -22,11 +24,16 @@ CStreamFilter::CStreamFilter(IStream* stream, bool adoptStream) : m_stream(stream), m_adopted(adoptStream) { - // do nothing + // replace handlers for m_stream + EVENTQUEUE->removeHandlers(m_stream->getEventTarget()); + EVENTQUEUE->adoptHandler(CEvent::kUnknown, m_stream->getEventTarget(), + new TMethodEventJob(this, + &CStreamFilter::handleUpstreamEvent)); } CStreamFilter::~CStreamFilter() { + EVENTQUEUE->removeHandler(CEvent::kUnknown, m_stream->getEventTarget()); if (m_adopted) { delete m_stream; } @@ -68,16 +75,10 @@ CStreamFilter::shutdownOutput() getStream()->shutdownOutput(); } -void -CStreamFilter::setEventFilter(IEventJob* filter) -{ - getStream()->setEventFilter(filter); -} - void* CStreamFilter::getEventTarget() const { - return getStream()->getEventTarget(); + return const_cast(reinterpret_cast(this)); } bool @@ -92,14 +93,21 @@ CStreamFilter::getSize() const return getStream()->getSize(); } -IEventJob* -CStreamFilter::getEventFilter() const -{ - return getStream()->getEventFilter(); -} - IStream* CStreamFilter::getStream() const { return m_stream; } + +void +CStreamFilter::filterEvent(const CEvent& event) +{ + EVENTQUEUE->dispatchEvent(CEvent(event.getType(), + getEventTarget(), event.getData())); +} + +void +CStreamFilter::handleUpstreamEvent(const CEvent& event, void*) +{ + filterEvent(event); +} diff --git a/lib/io/CStreamFilter.h b/lib/io/CStreamFilter.h index 5706cf4a..4dc87094 100644 --- a/lib/io/CStreamFilter.h +++ b/lib/io/CStreamFilter.h @@ -33,19 +33,17 @@ public: ~CStreamFilter(); // IStream overrides - // These all just forward to the underlying stream. Override as - // necessary. + // These all just forward to the underlying stream except getEventTarget. + // Override as necessary. getEventTarget returns a pointer to this. virtual void close(); virtual UInt32 read(void* buffer, UInt32 n); virtual void write(const void* buffer, UInt32 n); virtual void flush(); virtual void shutdownInput(); virtual void shutdownOutput(); - virtual void setEventFilter(IEventJob* filter); virtual void* getEventTarget() const; virtual bool isReady() const; virtual UInt32 getSize() const; - virtual IEventJob* getEventFilter() const; protected: //! Get the stream @@ -54,6 +52,16 @@ protected: */ IStream* getStream() const; + //! Handle events from source stream + /*! + Does the event filtering. The default simply dispatches an event + identical except using this object as the event target. + */ + virtual void filterEvent(const CEvent&); + +private: + void handleUpstreamEvent(const CEvent&, void*); + private: IStream* m_stream; bool m_adopted; diff --git a/lib/io/IStream.h b/lib/io/IStream.h index 37d8514f..cb5b54c9 100644 --- a/lib/io/IStream.h +++ b/lib/io/IStream.h @@ -18,8 +18,6 @@ #include "IInterface.h" #include "CEvent.h" -class IEventJob; - //! Bidirectional stream interface /*! Defines the interface for all streams. @@ -77,22 +75,14 @@ public: */ virtual void shutdownOutput() = 0; - //! Set the event filter - /*! - If not NULL, the \p filter is passed any event that would've been - added to the queue. The filter can discard the event, modify it - and add it to the queue, and add other events. The default filter - is NULL. The caller retains ownership of the filter. - */ - virtual void setEventFilter(IEventJob* filter) = 0; - //@} //! @name accessors //@{ //! Get event target /*! - Returns the event target for events generated by this stream. + Returns the event target for events generated by this stream. It + should be the source stream in a chain of stream filters. */ virtual void* getEventTarget() const = 0; @@ -113,12 +103,6 @@ public: */ virtual UInt32 getSize() const = 0; - //! Get the event filter - /*! - Returns the current event filter. - */ - virtual IEventJob* getEventFilter() const = 0; - //! Get input ready event type /*! Returns the input ready event type. A stream sends this event diff --git a/lib/net/CTCPSocket.cpp b/lib/net/CTCPSocket.cpp index 61ff3b8c..c6c87789 100644 --- a/lib/net/CTCPSocket.cpp +++ b/lib/net/CTCPSocket.cpp @@ -31,8 +31,7 @@ CTCPSocket::CTCPSocket() : m_mutex(), - m_flushed(&m_mutex, true), - m_eventFilter(NULL) + m_flushed(&m_mutex, true) { try { m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM); @@ -47,8 +46,7 @@ CTCPSocket::CTCPSocket() : CTCPSocket::CTCPSocket(CArchSocket socket) : m_mutex(), m_socket(socket), - m_flushed(&m_mutex, true), - m_eventFilter(NULL) + m_flushed(&m_mutex, true) { assert(m_socket != NULL); @@ -92,7 +90,7 @@ CTCPSocket::close() // clear buffers and enter disconnected state if (m_connected) { - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); } onDisconnected(); @@ -132,7 +130,7 @@ CTCPSocket::read(void* buffer, UInt32 n) // if no more data and we cannot read or write then send disconnected if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) { - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); m_connected = false; } @@ -148,7 +146,7 @@ CTCPSocket::write(const void* buffer, UInt32 n) // must not have shutdown output if (!m_writable) { - sendStreamEvent(getOutputErrorEvent()); + sendEvent(getOutputErrorEvent()); return; } @@ -197,7 +195,7 @@ CTCPSocket::shutdownInput() // shutdown buffer for reading if (m_readable) { - sendStreamEvent(getInputShutdownEvent()); + sendEvent(getInputShutdownEvent()); onInputShutdown(); useNewJob = true; } @@ -224,7 +222,7 @@ CTCPSocket::shutdownOutput() // shutdown buffer for writing if (m_writable) { - sendStreamEvent(getOutputShutdownEvent()); + sendEvent(getOutputShutdownEvent()); onOutputShutdown(); useNewJob = true; } @@ -234,13 +232,6 @@ CTCPSocket::shutdownOutput() } } -void -CTCPSocket::setEventFilter(IEventJob* filter) -{ - CLock lock(&m_mutex); - m_eventFilter = filter; -} - bool CTCPSocket::isReady() const { @@ -255,13 +246,6 @@ CTCPSocket::getSize() const return m_inputBuffer.getSize(); } -IEventJob* -CTCPSocket::getEventFilter() const -{ - CLock lock(&m_mutex); - return m_eventFilter; -} - void CTCPSocket::connect(const CNetworkAddress& addr) { @@ -276,7 +260,7 @@ CTCPSocket::connect(const CNetworkAddress& addr) try { if (ARCH->connectSocket(m_socket, addr.getAddress())) { - sendSocketEvent(getConnectedEvent()); + sendEvent(getConnectedEvent()); onConnected(); } else { @@ -357,12 +341,6 @@ CTCPSocket::newJob() } } -void -CTCPSocket::sendSocketEvent(CEvent::Type type) -{ - EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL)); -} - void CTCPSocket::sendConnectionFailedEvent(const char* msg) { @@ -374,14 +352,9 @@ CTCPSocket::sendConnectionFailedEvent(const char* msg) } void -CTCPSocket::sendStreamEvent(CEvent::Type type) +CTCPSocket::sendEvent(CEvent::Type type) { - if (m_eventFilter != NULL) { - m_eventFilter->run(CEvent(type, getEventTarget(), NULL)); - } - else { - EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL)); - } + EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL)); } void @@ -455,7 +428,7 @@ CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job, } if (write) { - sendSocketEvent(getConnectedEvent()); + sendEvent(getConnectedEvent()); onConnected(); return newJob(); } @@ -470,7 +443,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job, CLock lock(&m_mutex); if (error) { - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); onDisconnected(); return newJob(); } @@ -488,7 +461,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job, if (n > 0) { m_outputBuffer.pop(n); if (m_outputBuffer.getSize() == 0) { - sendStreamEvent(getOutputFlushedEvent()); + sendEvent(getOutputFlushedEvent()); m_flushed = true; m_flushed.broadcast(); needNewJob = true; @@ -499,9 +472,9 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job, // remote read end of stream hungup. our output side // has therefore shutdown. onOutputShutdown(); - sendStreamEvent(getOutputShutdownEvent()); + sendEvent(getOutputShutdownEvent()); if (!m_readable && m_inputBuffer.getSize() == 0) { - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); m_connected = false; } needNewJob = true; @@ -509,14 +482,14 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job, catch (XArchNetworkDisconnected&) { // stream hungup onDisconnected(); - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); needNewJob = true; } catch (XArchNetwork&) { // other write error onDisconnected(); - sendStreamEvent(getOutputErrorEvent()); - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getOutputErrorEvent()); + sendEvent(getDisconnectedEvent()); needNewJob = true; } } @@ -536,16 +509,16 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job, // send input ready if input buffer was empty if (wasEmpty) { - sendStreamEvent(getInputReadyEvent()); + sendEvent(getInputReadyEvent()); } } else { // remote write end of stream hungup. our input side // has therefore shutdown but don't flush our buffer // since there's still data to be read. - sendStreamEvent(getInputShutdownEvent()); + sendEvent(getInputShutdownEvent()); if (!m_writable && m_inputBuffer.getSize() == 0) { - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); m_connected = false; } m_readable = false; @@ -554,7 +527,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job, } catch (XArchNetworkDisconnected&) { // stream hungup - sendSocketEvent(getDisconnectedEvent()); + sendEvent(getDisconnectedEvent()); onDisconnected(); needNewJob = true; } diff --git a/lib/net/CTCPSocket.h b/lib/net/CTCPSocket.h index 8b60d5b6..aa1df8c1 100644 --- a/lib/net/CTCPSocket.h +++ b/lib/net/CTCPSocket.h @@ -46,10 +46,8 @@ public: virtual void flush(); virtual void shutdownInput(); virtual void shutdownOutput(); - virtual void setEventFilter(IEventJob* filter); virtual bool isReady() const; virtual UInt32 getSize() const; - virtual IEventJob* getEventFilter() const; // IDataSocket overrides virtual void connect(const CNetworkAddress&); @@ -59,9 +57,8 @@ private: void setJob(ISocketMultiplexerJob*); ISocketMultiplexerJob* newJob(); - void sendSocketEvent(CEvent::Type); void sendConnectionFailedEvent(const char*); - void sendStreamEvent(CEvent::Type); + void sendEvent(CEvent::Type); void onConnected(); void onInputShutdown(); @@ -84,7 +81,6 @@ private: bool m_connected; bool m_readable; bool m_writable; - IEventJob* m_eventFilter; }; #endif diff --git a/lib/net/IDataSocket.h b/lib/net/IDataSocket.h index 85b85480..d760d4ab 100644 --- a/lib/net/IDataSocket.h +++ b/lib/net/IDataSocket.h @@ -79,10 +79,8 @@ public: virtual void flush() = 0; virtual void shutdownInput() = 0; virtual void shutdownOutput() = 0; - virtual void setEventFilter(IEventJob* filter) = 0; virtual bool isReady() const = 0; virtual UInt32 getSize() const = 0; - virtual IEventJob* getEventFilter() const = 0; private: static CEvent::Type s_connectedEvent; diff --git a/lib/server/CClientListener.cpp b/lib/server/CClientListener.cpp index 45f3a713..803f44a0 100644 --- a/lib/server/CClientListener.cpp +++ b/lib/server/CClientListener.cpp @@ -75,7 +75,12 @@ CClientListener::~CClientListener() for (CNewClients::iterator index = m_newClients.begin(); index != m_newClients.end(); ++index) { CClientProxyUnknown* client = *index; - EVENTQUEUE->removeHandler(client); + EVENTQUEUE->removeHandler( + CClientProxyUnknown::getSuccessEvent(), client); + EVENTQUEUE->removeHandler( + CClientProxyUnknown::getFailureEvent(), client); + EVENTQUEUE->removeHandler( + CClientProxy::getDisconnectedEvent(), client); delete client; } diff --git a/lib/synergy/CPacketStreamFilter.cpp b/lib/synergy/CPacketStreamFilter.cpp index d59d2fa1..4aad9c02 100644 --- a/lib/synergy/CPacketStreamFilter.cpp +++ b/lib/synergy/CPacketStreamFilter.cpp @@ -24,19 +24,14 @@ CPacketStreamFilter::CPacketStreamFilter(IStream* stream, bool adoptStream) : CStreamFilter(stream, adoptStream), m_size(0), - m_eventFilter(NULL), m_inputShutdown(false) { - // install event filter - getStream()->setEventFilter(new TMethodEventJob( - this, &CPacketStreamFilter::filterEvent, NULL)); + // do nothing } CPacketStreamFilter::~CPacketStreamFilter() { - IEventJob* job = getStream()->getEventFilter(); - getStream()->setEventFilter(NULL); - delete job; + // do nothing } void @@ -79,7 +74,8 @@ CPacketStreamFilter::read(void* buffer, UInt32 n) readPacketSize(); if (m_inputShutdown && m_size == 0) { - sendEvent(CEvent(getInputShutdownEvent(), getEventTarget(), NULL)); + EVENTQUEUE->addEvent(CEvent(getInputShutdownEvent(), + getEventTarget(), NULL)); } return n; @@ -109,13 +105,6 @@ CPacketStreamFilter::shutdownInput() CStreamFilter::shutdownInput(); } -void -CPacketStreamFilter::setEventFilter(IEventJob* filter) -{ - CLock lock(&m_mutex); - m_eventFilter = filter; -} - bool CPacketStreamFilter::isReady() const { @@ -130,13 +119,6 @@ CPacketStreamFilter::getSize() const return isReadyNoLock() ? m_size : 0; } -IEventJob* -CPacketStreamFilter::getEventFilter() const -{ - CLock lock(&m_mutex); - return m_eventFilter; -} - bool CPacketStreamFilter::isReadyNoLock() const { @@ -159,11 +141,9 @@ CPacketStreamFilter::readPacketSize() } } -void +bool CPacketStreamFilter::readMore() { - // note -- m_mutex must be locked on entry - // note if we have whole packet bool wasReady = isReadyNoLock(); @@ -184,40 +164,27 @@ CPacketStreamFilter::readMore() // if we weren't ready before but now we are then send a // input ready event apparently from the filtered stream. - if (wasReady != isReady) { - sendEvent(CEvent(getInputReadyEvent(), getEventTarget(), NULL)); - } + return (wasReady != isReady); } void -CPacketStreamFilter::sendEvent(const CEvent& event) +CPacketStreamFilter::filterEvent(const CEvent& event) { - if (m_eventFilter != NULL) { - m_eventFilter->run(event); - } - else { - EVENTQUEUE->addEvent(event); - } -} - -void -CPacketStreamFilter::filterEvent(const CEvent& event, void*) -{ - CLock lock(&m_mutex); - if (event.getType() == getInputReadyEvent()) { - readMore(); - return; + CLock lock(&m_mutex); + if (!readMore()) { + return; + } } else if (event.getType() == getInputShutdownEvent()) { // discard this if we have buffered data + CLock lock(&m_mutex); m_inputShutdown = true; - if (m_size == 0) { - sendEvent(CEvent(getInputShutdownEvent(), getEventTarget(), NULL)); + if (m_size != 0) { + return; } - return; } // pass event - sendEvent(event); + CStreamFilter::filterEvent(event); } diff --git a/lib/synergy/CPacketStreamFilter.h b/lib/synergy/CPacketStreamFilter.h index 3156f40d..93ddd8fa 100644 --- a/lib/synergy/CPacketStreamFilter.h +++ b/lib/synergy/CPacketStreamFilter.h @@ -33,24 +33,22 @@ public: virtual UInt32 read(void* buffer, UInt32 n); virtual void write(const void* buffer, UInt32 n); virtual void shutdownInput(); - virtual void setEventFilter(IEventJob* filter); virtual bool isReady() const; virtual UInt32 getSize() const; - virtual IEventJob* getEventFilter() const; + +protected: + // CStreamFilter overrides + virtual void filterEvent(const CEvent&); private: bool isReadyNoLock() const; void readPacketSize(); - - void readMore(); - void sendEvent(const CEvent&); - void filterEvent(const CEvent&, void*); + bool readMore(); private: CMutex m_mutex; UInt32 m_size; CStreamBuffer m_buffer; - IEventJob* m_eventFilter; bool m_inputShutdown; };