diff --git a/CMakeLists.txt b/CMakeLists.txt index aab915d7..c0c8bacb 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -382,7 +382,8 @@ set(DROGON_HEADERS lib/inc/drogon/WebSocketController.h lib/inc/drogon/drogon.h lib/inc/drogon/version.h - lib/inc/drogon/drogon_callbacks.h) + lib/inc/drogon/drogon_callbacks.h + lib/inc/drogon/PubSubService.h) install(FILES ${DROGON_HEADERS} DESTINATION ${INSTALL_INCLUDE_DIR}/drogon) set(ORM_HEADERS diff --git a/lib/inc/drogon/PubSubService.h b/lib/inc/drogon/PubSubService.h new file mode 100644 index 00000000..6e834383 --- /dev/null +++ b/lib/inc/drogon/PubSubService.h @@ -0,0 +1,272 @@ +/** + * + * PubSubService.h + * An Tao + * + * Copyright 2018, An Tao. All rights reserved. + * https://github.com/an-tao/drogon + * Use of this source code is governed by a MIT license + * that can be found in the License file. + * + * Drogon + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace drogon +{ +using SubscriberID = uint64_t; + +/** + * @brief This class template presents an unnamed topic. + * + * @tparam MessageType + */ +template +class Topic : public trantor::NonCopyable +{ + public: + using MessageHandler = std::function; +#if __cplusplus >= 201703L | defined _WIN32 + using SharedMutex = std::shared_mutex; +#else + using SharedMutex = std::shared_timed_mutex; +#endif + /** + * @brief Publish a message, every subscriber in the topic will receive the + * message. + * + * @param message + */ + void publish(const MessageType &message) const + { + std::shared_lock lock(mutex_); + for (auto &pair : handlersMap_) + { + pair.second(message); + } + } + + /** + * @brief Subcribe to the topic. + * + * @param handler is invoked when a message arrives. + * @return SubscriberID + */ + SubscriberID subscribe(const MessageHandler &handler) + { + std::unique_lock lock(mutex_); + handlersMap_[++id_] = handler; + return id_; + } + + /** + * @brief Subcribe to the topic. + * + * @param handler is invoked when a message arrives. + * @return SubscriberID + */ + SubscriberID subscribe(MessageHandler &&handler) + { + std::unique_lock lock(mutex_); + handlersMap_[++id_] = std::move(handler); + return id_; + } + + /** + * @brief Unsubscribe from the topic. + */ + void unsubscribe(SubscriberID id) + { + std::unique_lock lock(mutex_); + handlersMap_.erase(id); + } + + /** + * @brief Check if the topic is empty. + * + * @return true means there are no subscribers. + * @return false means there are subscribers in the topic. + */ + bool empty() const + { + std::shared_lock lock(mutex_); + return handlersMap_.empty(); + } + /** + * @brief Remove all subscribers from the topic. + * + */ + void clear() + { + std::unique_lock lock(mutex_); + handlersMap_.clear(); + } + + private: + std::unordered_map handlersMap_; + mutable SharedMutex mutex_; + SubscriberID id_; +}; + +/** + * @brief This class template implements a publish-subscribe pattern with + * multiple named topics. + * + * @tparam MessageType The message type. + */ +template +class PubSubService : public trantor::NonCopyable +{ + public: + using MessageHandler = + std::function; +#if __cplusplus >= 201703L | defined _WIN32 + using SharedMutex = std::shared_mutex; +#else + using SharedMutex = std::shared_timed_mutex; +#endif + + /** + * @brief Publish a message to a topic. The message will be broadcasted to + * every subscriber. + */ + void publish(const std::string &topicName, const MessageType &message) const + { + std::shared_ptr> topicPtr; + { + std::shared_lock lock(mutex_); + auto iter = topicMap_.find(topicName); + if (iter != topicMap_.end()) + { + topicPtr = iter->second; + } + else + { + return; + } + } + topicPtr->publish(message); + } + + /** + * @brief Subscribe to a topic. When a message is published to the topic, + * the handler is invoked by passing the topic and message as parameters. + */ + SubscriberID subscribe(const std::string &topicName, + const MessageHandler &handler) + { + auto topicHandler = [topicName, handler](const MessageType &message) { + handler(topicName, message); + }; + return subscribeToTopic(topicName, std::move(topicHandler)); + } + + /** + * @brief Subscribe to a topic. When a message is published to the topic, + * the handler is invoked by passing the topic and message as parameters. + */ + SubscriberID subscribe(const std::string &topicName, + MessageHandler &&handler) + { + auto topicHandler = [topicName, handler = std::move(handler)]( + const MessageType &message) { + handler(topicName, message); + }; + return subscribeToTopic(topicName, std::move(topicHandler)); + } + + /** + * @brief Unsubscribe from a topic. + * + * @param topic + * @param id The subscriber ID returned from the subscribe method. + */ + void unsubscribe(const std::string &topicName, SubscriberID id) + { + { + std::shared_lock lock(mutex_); + auto iter = topicMap_.find(topicName); + if (iter == topicMap_.end()) + { + return; + } + iter->second->unsubscribe(id); + if (!iter->second->empty()) + return; + } + std::unique_lock lock(mutex_); + auto iter = topicMap_.find(topicName); + if (iter == topicMap_.end()) + { + return; + } + if (iter->second->empty()) + topicMap_.erase(iter); + } + + /** + * @brief return the number of topics. + */ + size_t size() const + { + std::shared_lock lock(mutex_); + return topicMap_.size(); + } + + /** + * @brief remove all topics. + */ + void clear() + { + std::unique_lock lock(mutex_); + topicMap_.clear(); + } + + /** + * @brief Remove a topic + * + */ + void removeTopic(const std::string &topicName) + { + std::unique_lock lock(mutex_); + topicMap_.erase(topicName); + } + + private: + std::unordered_map>> + topicMap_; + mutable SharedMutex mutex_; + SubscriberID subID_ = 0; + SubscriberID subscribeToTopic( + const std::string &topicName, + typename Topic::MessageHandler &&handler) + { + { + std::shared_lock lock(mutex_); + auto iter = topicMap_.find(topicName); + if (iter != topicMap_.end()) + { + return iter->second->subscribe(std::move(handler)); + } + } + std::unique_lock lock(mutex_); + auto iter = topicMap_.find(topicName); + if (iter != topicMap_.end()) + { + return iter->second->subscribe(std::move(handler)); + } + auto topicPtr = std::make_shared>(); + auto id = topicPtr->subscribe(std::move(handler)); + topicMap_[topicName] = std::move(topicPtr); + return id; + } +}; +} // namespace drogon diff --git a/lib/src/Utilities.cc b/lib/src/Utilities.cc index cee30590..d17b499f 100644 --- a/lib/src/Utilities.cc +++ b/lib/src/Utilities.cc @@ -337,11 +337,11 @@ std::string getUuid() return ret; #elif defined __FreeBSD__ uuid_t *uuid = new uuid_t; - char* binstr = (char *) malloc(16); + char *binstr = (char *)malloc(16); uuidgen(uuid, 1); #if _BYTE_ORDER == _LITTLE_ENDIAN uuid_enc_le(binstr, uuid); -#else /* _BYTE_ORDER != _LITTLE_ENDIAN */ +#else /* _BYTE_ORDER != _LITTLE_ENDIAN */ uuid_enc_be(binstr, uuid); #endif /* _BYTE_ORDER == _LITTLE_ENDIAN */ delete uuid; diff --git a/unittest/CMakeLists.txt b/unittest/CMakeLists.txt index e8ce0659..ef4d8fda 100644 --- a/unittest/CMakeLists.txt +++ b/unittest/CMakeLists.txt @@ -5,6 +5,7 @@ add_executable(md5_unittest MD5Unittest.cpp ../lib/src/ssl_funcs/Md5.cc) add_executable(sha1_unittest SHA1Unittest.cpp ../lib/src/ssl_funcs/Sha1.cc) add_executable(ostringstream_unittest OStringStreamUnitttest.cpp) add_executable(base64_unittest Base64Unittest.cpp) +add_executable(pubsubservice_unittest PubSubServiceUnittest.cpp) if(Brotli_FOUND) add_executable(brotli_unittest BrotliUnittest.cpp) endif() @@ -16,7 +17,8 @@ set(UNITTEST_TARGETS md5_unittest sha1_unittest ostringstream_unittest - base64_unittest) + base64_unittest + pubsubservice_unittest) if(Brotli_FOUND) set(UNITTEST_TARGETS ${UNITTEST_TARGETS} brotli_unittest) endif() diff --git a/unittest/PubSubServiceUnittest.cpp b/unittest/PubSubServiceUnittest.cpp new file mode 100644 index 00000000..d24756f8 --- /dev/null +++ b/unittest/PubSubServiceUnittest.cpp @@ -0,0 +1,25 @@ +#include +#include +#include +#include + +TEST(PubSubServiceTest, normal) +{ + drogon::PubSubService service; + auto id=service.subscribe("topic1", + [](const std::string &topic, const std::string &message) { + EXPECT_STREQ(topic.c_str(), "topic1"); + EXPECT_STREQ(message.c_str(), "hello world"); + }); + service.publish("topic1", "hello world"); + service.publish("topic2", "hello world"); + EXPECT_EQ(service.size(), 1); + service.unsubscribe("topic1", id); + EXPECT_EQ(service.size(), 0); +} + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file