diff --git a/src/lib/ipc/CIpcClientProxy.cpp b/src/lib/ipc/CIpcClientProxy.cpp index 6d8184a8..c48e4413 100644 --- a/src/lib/ipc/CIpcClientProxy.cpp +++ b/src/lib/ipc/CIpcClientProxy.cpp @@ -16,8 +16,10 @@ */ #include "CIpcClientProxy.h" +#include "IStream.h" -CIpcClientProxy::CIpcClientProxy() +CIpcClientProxy::CIpcClientProxy(IStream& stream) : +m_stream(stream) { } diff --git a/src/lib/ipc/CIpcClientProxy.h b/src/lib/ipc/CIpcClientProxy.h index 6e95e526..6722cecc 100644 --- a/src/lib/ipc/CIpcClientProxy.h +++ b/src/lib/ipc/CIpcClientProxy.h @@ -17,8 +17,13 @@ #pragma once +class IStream; + class CIpcClientProxy { public: - CIpcClientProxy(); + CIpcClientProxy(IStream& stream); virtual ~CIpcClientProxy(); + +private: + IStream& m_stream; }; diff --git a/src/lib/ipc/CIpcServer.cpp b/src/lib/ipc/CIpcServer.cpp index 1e16c17c..9ab9241b 100644 --- a/src/lib/ipc/CIpcServer.cpp +++ b/src/lib/ipc/CIpcServer.cpp @@ -17,15 +17,30 @@ #include "CIpcServer.h" #include "Ipc.h" +#include "IEventQueue.h" +#include "TMethodEventJob.h" +#include "CEvent.h" +#include "CLog.h" +#include "CIpcClientProxy.h" +#include "IStream.h" +#include "IDataSocket.h" + +CEvent::Type CIpcServer::s_clientConnectedEvent = CEvent::kUnknown; CIpcServer::CIpcServer() : m_address(CNetworkAddress(IPC_HOST, IPC_PORT)) { m_address.resolve(); + + EVENTQUEUE->adoptHandler( + IListenSocket::getConnectingEvent(), &m_socket, + new TMethodEventJob( + this, &CIpcServer::handleClientConnecting)); } CIpcServer::~CIpcServer() { + EVENTQUEUE->removeHandler(IListenSocket::getConnectingEvent(), &m_socket); } void @@ -33,3 +48,26 @@ CIpcServer::listen() { m_socket.bind(m_address); } + +void +CIpcServer::handleClientConnecting(const CEvent&, void*) +{ + IStream* stream = m_socket.accept(); + if (stream == NULL) { + return; + } + LOG((CLOG_NOTE "accepted ipc client connection")); + + // TODO: delete on disconnect + CIpcClientProxy* proxy = new CIpcClientProxy(*stream); + m_clients.insert(proxy); + + EVENTQUEUE->addEvent(CEvent(getClientConnectedEvent(), this, proxy)); +} + +CEvent::Type +CIpcServer::getClientConnectedEvent() +{ + return EVENTQUEUE->registerTypeOnce( + s_clientConnectedEvent, "CIpcServer::clientConnected"); +} diff --git a/src/lib/ipc/CIpcServer.h b/src/lib/ipc/CIpcServer.h index f4ab5395..608bdd7f 100644 --- a/src/lib/ipc/CIpcServer.h +++ b/src/lib/ipc/CIpcServer.h @@ -19,23 +19,47 @@ #include "CTCPListenSocket.h" #include "CNetworkAddress.h" +#include + +class CEvent; +class CIpcClientProxy; //! IPC server for communication between daemon and GUI. /*! - * The IPC server listens on localhost. The IPC client runs on both the - * client/server process or the GUI. The IPC server runs on the daemon process. - * This allows the GUI to send config changes to the daemon and client/server, - * and allows the daemon and client/server to send log data to the GUI. - */ +The IPC server listens on localhost. The IPC client runs on both the +client/server process or the GUI. The IPC server runs on the daemon process. +This allows the GUI to send config changes to the daemon and client/server, +and allows the daemon and client/server to send log data to the GUI. +*/ class CIpcServer { public: CIpcServer(); virtual ~CIpcServer(); - //! Opens a TCP socket only allowing local connections - void listen(); + //! @name manipulators + //@{ + + //! Opens a TCP socket only allowing local connections. + void listen(); + + //@} + //! @name accessors + //@{ + + //! This event is raised when we have created the client proxy. + static CEvent::Type getClientConnectedEvent(); + + //@} private: - CTCPListenSocket m_socket; - CNetworkAddress m_address; + void handleClientConnecting(const CEvent&, void*); + +private: + typedef std::set CClientSet; + + CTCPListenSocket m_socket; + CNetworkAddress m_address; + CClientSet m_clients; + + static CEvent::Type s_clientConnectedEvent; }; diff --git a/src/test/integtests/CIpcTests.cpp b/src/test/integtests/CIpcTests.cpp index 8bbcd700..6ae4b99f 100644 --- a/src/test/integtests/CIpcTests.cpp +++ b/src/test/integtests/CIpcTests.cpp @@ -20,17 +20,105 @@ #include "CIpcClient.h" #include "CSocketMultiplexer.h" #include "CEventQueue.h" +#include "TMethodEventJob.h" +#include "CThread.h" +#include "TMethodJob.h" +#include "CArch.h" +#include "CLog.h" -TEST(CIpcTests, connectToServer) +class CIpcTests : public ::testing::Test { +public: + CIpcTests(); + virtual ~CIpcTests(); + void handleClientConnected(const CEvent&, void* vclient); + void raiseQuitEvent(); + +private: + void timeoutThread(void*); + +public: + bool m_quitOnClientConnect; + bool m_clientConnected; + bool m_timeoutCheck; + double m_timeout; + +private: + CThread* m_timeoutThread; +}; + +TEST_F(CIpcTests, connectToServer) +{ + m_quitOnClientConnect = true; + CSocketMultiplexer multiplexer; - CEventQueue eventQueue; + CEventQueue events; CIpcServer server; server.listen(); + events.adoptHandler( + CIpcServer::getClientConnectedEvent(), &server, + new TMethodEventJob( + this, &CIpcTests::handleClientConnected)); + CIpcClient client; client.connect(); - eventQueue.loop(); + m_timeoutCheck = true; + m_timeout = ARCH->time() + 5; // 5 sec timeout. + events.loop(); + + EXPECT_EQ(true, m_clientConnected); } + +CIpcTests::CIpcTests() : +m_timeoutThread(nullptr), +m_quitOnClientConnect(false), +m_clientConnected(false), +m_timeoutCheck(false), +m_timeout(0) +{ + m_timeoutThread = new CThread( + new TMethodJob( + this, &CIpcTests::timeoutThread, nullptr)); +} + +CIpcTests::~CIpcTests() +{ + delete m_timeoutThread; +} + + +void +CIpcTests::handleClientConnected(const CEvent&, void* vclient) +{ + m_clientConnected = true; + + if (m_quitOnClientConnect) { + raiseQuitEvent(); + } +} + +void +CIpcTests::raiseQuitEvent() +{ + EVENTQUEUE->addEvent(CEvent(CEvent::kQuit, nullptr)); +} + +void +CIpcTests::timeoutThread(void*) +{ + while (true) { + if (!m_timeoutCheck) { + ARCH->sleep(1); + continue; + } + + if (ARCH->time() > m_timeout) { + LOG((CLOG_ERR "timeout")); + raiseQuitEvent(); + m_timeoutCheck = false; + } + } +} \ No newline at end of file