Create a class template for publish subscribe pattern (#443)
This commit is contained in:
parent
84e503a948
commit
49472a3cc4
|
@ -382,7 +382,8 @@ set(DROGON_HEADERS
|
|||
lib/inc/drogon/WebSocketController.h
|
||||
lib/inc/drogon/drogon.h
|
||||
lib/inc/drogon/version.h
|
||||
lib/inc/drogon/drogon_callbacks.h)
|
||||
lib/inc/drogon/drogon_callbacks.h
|
||||
lib/inc/drogon/PubSubService.h)
|
||||
install(FILES ${DROGON_HEADERS} DESTINATION ${INSTALL_INCLUDE_DIR}/drogon)
|
||||
|
||||
set(ORM_HEADERS
|
||||
|
|
|
@ -0,0 +1,272 @@
|
|||
/**
|
||||
*
|
||||
* PubSubService.h
|
||||
* An Tao
|
||||
*
|
||||
* Copyright 2018, An Tao. All rights reserved.
|
||||
* https://github.com/an-tao/drogon
|
||||
* Use of this source code is governed by a MIT license
|
||||
* that can be found in the License file.
|
||||
*
|
||||
* Drogon
|
||||
*
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <trantor/utils/NonCopyable.h>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <functional>
|
||||
#include <shared_mutex>
|
||||
|
||||
namespace drogon
|
||||
{
|
||||
using SubscriberID = uint64_t;
|
||||
|
||||
/**
|
||||
* @brief This class template presents an unnamed topic.
|
||||
*
|
||||
* @tparam MessageType
|
||||
*/
|
||||
template <typename MessageType>
|
||||
class Topic : public trantor::NonCopyable
|
||||
{
|
||||
public:
|
||||
using MessageHandler = std::function<void(const MessageType &)>;
|
||||
#if __cplusplus >= 201703L | defined _WIN32
|
||||
using SharedMutex = std::shared_mutex;
|
||||
#else
|
||||
using SharedMutex = std::shared_timed_mutex;
|
||||
#endif
|
||||
/**
|
||||
* @brief Publish a message, every subscriber in the topic will receive the
|
||||
* message.
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
void publish(const MessageType &message) const
|
||||
{
|
||||
std::shared_lock<SharedMutex> lock(mutex_);
|
||||
for (auto &pair : handlersMap_)
|
||||
{
|
||||
pair.second(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Subcribe to the topic.
|
||||
*
|
||||
* @param handler is invoked when a message arrives.
|
||||
* @return SubscriberID
|
||||
*/
|
||||
SubscriberID subscribe(const MessageHandler &handler)
|
||||
{
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
handlersMap_[++id_] = handler;
|
||||
return id_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Subcribe to the topic.
|
||||
*
|
||||
* @param handler is invoked when a message arrives.
|
||||
* @return SubscriberID
|
||||
*/
|
||||
SubscriberID subscribe(MessageHandler &&handler)
|
||||
{
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
handlersMap_[++id_] = std::move(handler);
|
||||
return id_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Unsubscribe from the topic.
|
||||
*/
|
||||
void unsubscribe(SubscriberID id)
|
||||
{
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
handlersMap_.erase(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if the topic is empty.
|
||||
*
|
||||
* @return true means there are no subscribers.
|
||||
* @return false means there are subscribers in the topic.
|
||||
*/
|
||||
bool empty() const
|
||||
{
|
||||
std::shared_lock<SharedMutex> lock(mutex_);
|
||||
return handlersMap_.empty();
|
||||
}
|
||||
/**
|
||||
* @brief Remove all subscribers from the topic.
|
||||
*
|
||||
*/
|
||||
void clear()
|
||||
{
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
handlersMap_.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
std::unordered_map<SubscriberID, MessageHandler> handlersMap_;
|
||||
mutable SharedMutex mutex_;
|
||||
SubscriberID id_;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief This class template implements a publish-subscribe pattern with
|
||||
* multiple named topics.
|
||||
*
|
||||
* @tparam MessageType The message type.
|
||||
*/
|
||||
template <typename MessageType>
|
||||
class PubSubService : public trantor::NonCopyable
|
||||
{
|
||||
public:
|
||||
using MessageHandler =
|
||||
std::function<void(const std::string &, const MessageType &)>;
|
||||
#if __cplusplus >= 201703L | defined _WIN32
|
||||
using SharedMutex = std::shared_mutex;
|
||||
#else
|
||||
using SharedMutex = std::shared_timed_mutex;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Publish a message to a topic. The message will be broadcasted to
|
||||
* every subscriber.
|
||||
*/
|
||||
void publish(const std::string &topicName, const MessageType &message) const
|
||||
{
|
||||
std::shared_ptr<Topic<MessageType>> topicPtr;
|
||||
{
|
||||
std::shared_lock<SharedMutex> lock(mutex_);
|
||||
auto iter = topicMap_.find(topicName);
|
||||
if (iter != topicMap_.end())
|
||||
{
|
||||
topicPtr = iter->second;
|
||||
}
|
||||
else
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
topicPtr->publish(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Subscribe to a topic. When a message is published to the topic,
|
||||
* the handler is invoked by passing the topic and message as parameters.
|
||||
*/
|
||||
SubscriberID subscribe(const std::string &topicName,
|
||||
const MessageHandler &handler)
|
||||
{
|
||||
auto topicHandler = [topicName, handler](const MessageType &message) {
|
||||
handler(topicName, message);
|
||||
};
|
||||
return subscribeToTopic(topicName, std::move(topicHandler));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Subscribe to a topic. When a message is published to the topic,
|
||||
* the handler is invoked by passing the topic and message as parameters.
|
||||
*/
|
||||
SubscriberID subscribe(const std::string &topicName,
|
||||
MessageHandler &&handler)
|
||||
{
|
||||
auto topicHandler = [topicName, handler = std::move(handler)](
|
||||
const MessageType &message) {
|
||||
handler(topicName, message);
|
||||
};
|
||||
return subscribeToTopic(topicName, std::move(topicHandler));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Unsubscribe from a topic.
|
||||
*
|
||||
* @param topic
|
||||
* @param id The subscriber ID returned from the subscribe method.
|
||||
*/
|
||||
void unsubscribe(const std::string &topicName, SubscriberID id)
|
||||
{
|
||||
{
|
||||
std::shared_lock<SharedMutex> lock(mutex_);
|
||||
auto iter = topicMap_.find(topicName);
|
||||
if (iter == topicMap_.end())
|
||||
{
|
||||
return;
|
||||
}
|
||||
iter->second->unsubscribe(id);
|
||||
if (!iter->second->empty())
|
||||
return;
|
||||
}
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
auto iter = topicMap_.find(topicName);
|
||||
if (iter == topicMap_.end())
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (iter->second->empty())
|
||||
topicMap_.erase(iter);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief return the number of topics.
|
||||
*/
|
||||
size_t size() const
|
||||
{
|
||||
std::shared_lock<SharedMutex> lock(mutex_);
|
||||
return topicMap_.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief remove all topics.
|
||||
*/
|
||||
void clear()
|
||||
{
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
topicMap_.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Remove a topic
|
||||
*
|
||||
*/
|
||||
void removeTopic(const std::string &topicName)
|
||||
{
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
topicMap_.erase(topicName);
|
||||
}
|
||||
|
||||
private:
|
||||
std::unordered_map<std::string, std::shared_ptr<Topic<MessageType>>>
|
||||
topicMap_;
|
||||
mutable SharedMutex mutex_;
|
||||
SubscriberID subID_ = 0;
|
||||
SubscriberID subscribeToTopic(
|
||||
const std::string &topicName,
|
||||
typename Topic<MessageType>::MessageHandler &&handler)
|
||||
{
|
||||
{
|
||||
std::shared_lock<SharedMutex> lock(mutex_);
|
||||
auto iter = topicMap_.find(topicName);
|
||||
if (iter != topicMap_.end())
|
||||
{
|
||||
return iter->second->subscribe(std::move(handler));
|
||||
}
|
||||
}
|
||||
std::unique_lock<SharedMutex> lock(mutex_);
|
||||
auto iter = topicMap_.find(topicName);
|
||||
if (iter != topicMap_.end())
|
||||
{
|
||||
return iter->second->subscribe(std::move(handler));
|
||||
}
|
||||
auto topicPtr = std::make_shared<Topic<MessageType>>();
|
||||
auto id = topicPtr->subscribe(std::move(handler));
|
||||
topicMap_[topicName] = std::move(topicPtr);
|
||||
return id;
|
||||
}
|
||||
};
|
||||
} // namespace drogon
|
|
@ -337,11 +337,11 @@ std::string getUuid()
|
|||
return ret;
|
||||
#elif defined __FreeBSD__
|
||||
uuid_t *uuid = new uuid_t;
|
||||
char* binstr = (char *) malloc(16);
|
||||
char *binstr = (char *)malloc(16);
|
||||
uuidgen(uuid, 1);
|
||||
#if _BYTE_ORDER == _LITTLE_ENDIAN
|
||||
uuid_enc_le(binstr, uuid);
|
||||
#else /* _BYTE_ORDER != _LITTLE_ENDIAN */
|
||||
#else /* _BYTE_ORDER != _LITTLE_ENDIAN */
|
||||
uuid_enc_be(binstr, uuid);
|
||||
#endif /* _BYTE_ORDER == _LITTLE_ENDIAN */
|
||||
delete uuid;
|
||||
|
|
|
@ -5,6 +5,7 @@ add_executable(md5_unittest MD5Unittest.cpp ../lib/src/ssl_funcs/Md5.cc)
|
|||
add_executable(sha1_unittest SHA1Unittest.cpp ../lib/src/ssl_funcs/Sha1.cc)
|
||||
add_executable(ostringstream_unittest OStringStreamUnitttest.cpp)
|
||||
add_executable(base64_unittest Base64Unittest.cpp)
|
||||
add_executable(pubsubservice_unittest PubSubServiceUnittest.cpp)
|
||||
if(Brotli_FOUND)
|
||||
add_executable(brotli_unittest BrotliUnittest.cpp)
|
||||
endif()
|
||||
|
@ -16,7 +17,8 @@ set(UNITTEST_TARGETS
|
|||
md5_unittest
|
||||
sha1_unittest
|
||||
ostringstream_unittest
|
||||
base64_unittest)
|
||||
base64_unittest
|
||||
pubsubservice_unittest)
|
||||
if(Brotli_FOUND)
|
||||
set(UNITTEST_TARGETS ${UNITTEST_TARGETS} brotli_unittest)
|
||||
endif()
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <drogon/PubSubService.h>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
|
||||
TEST(PubSubServiceTest, normal)
|
||||
{
|
||||
drogon::PubSubService<std::string> service;
|
||||
auto id=service.subscribe("topic1",
|
||||
[](const std::string &topic, const std::string &message) {
|
||||
EXPECT_STREQ(topic.c_str(), "topic1");
|
||||
EXPECT_STREQ(message.c_str(), "hello world");
|
||||
});
|
||||
service.publish("topic1", "hello world");
|
||||
service.publish("topic2", "hello world");
|
||||
EXPECT_EQ(service.size(), 1);
|
||||
service.unsubscribe("topic1", id);
|
||||
EXPECT_EQ(service.size(), 0);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
Loading…
Reference in New Issue