266 lines
9.0 KiB
C++
266 lines
9.0 KiB
C++
#define DROGON_TEST_MAIN
|
|
#include <drogon/drogon_test.h>
|
|
#include <drogon/HttpAppFramework.h>
|
|
#include <drogon/config.h>
|
|
|
|
using namespace drogon;
|
|
using namespace trantor;
|
|
|
|
#if LIBPQ_SUPPORTS_BATCH_MODE
|
|
|
|
static int step = 0;
|
|
static std::vector<std::function<void()>> testSteps;
|
|
|
|
static auto runNextStep = [] {
|
|
LOG_INFO << "Step = " << step;
|
|
if (step < testSteps.size())
|
|
{
|
|
app().getIOLoop(0)->runAfter(0.5, testSteps[step++]);
|
|
}
|
|
else
|
|
{
|
|
testSteps.clear();
|
|
}
|
|
};
|
|
|
|
// Integration test for the PostgreSQL client created with auto-batch enabled.
//
// The test is split into stages that are chained through runNextStep():
//   init        - (re)create the table, seed rows and the helper function
//   allSuccess  - a batch in which every statement succeeds
//   testAbort   - a batch in which a later statement fails
//   afterAbort  - verifies the data after the failed batch
//   cleanup     - drop the table and the helper function
// Every stage only queues its statements and then immediately schedules the
// next stage; results are checked in the asynchronous callbacks.
DROGON_TEST(PgPipelineTest)
{
    auto testStep_init = [TEST_CTX]() {
        auto clientPtr = app().getFastDbClient();
        clientPtr->execSqlAsync(
            "drop table if exists drogon_test_pipeline;",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Drop table drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_init(0) what():" +
                      std::string(e.base().what()));
            });
        clientPtr->execSqlAsync(
            "drop function if exists fn_drogon_test_pipeline;",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Drop function fn_drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_init(1) what():" +
                      std::string(e.base().what()));
            });

        clientPtr->execSqlAsync(
            "create table drogon_test_pipeline"
            "("
            "    id int primary key,"
            "    name text unique"
            ");",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Create table drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_init(2) what():" +
                      std::string(e.base().what()));
            });

        clientPtr->execSqlAsync(
            "insert into"
            "    drogon_test_pipeline "
            "values"
            "    (1, 'trantor'),"
            "    (2, 'drogon');",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Insert data into drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_init(3) what():" +
                      std::string(e.base().what()));
            });

        clientPtr->execSqlAsync(
            "create function fn_drogon_test_pipeline(id int, name text) "
            "returns void "
            "language plpgsql "
            "as $$ "
            "begin "
            "    update drogon_test_pipeline t set name = $2 where t.id = $1;"
            "end $$;",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Create function fn_drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_init(4) what():" +
                      std::string(e.base().what()));
            });

        runNextStep();
    };

    auto testStep_allSuccess = [TEST_CTX]() {
        auto clientPtr = app().getFastDbClient();
        clientPtr->execSqlAsync(
            "select 1",
            [TEST_CTX](const drogon::orm::Result &r) {
                MANDATE(r.size() == 1);
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_allSuccess(0) what():" +
                      std::string(e.base().what()));
            });
        clientPtr->execSqlAsync(
            "select * from drogon_test_pipeline",
            [TEST_CTX](const drogon::orm::Result &r) {
                // Two rows were inserted by the init stage.
                MANDATE(r.size() == 2);
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_allSuccess(1) what():" +
                      std::string(e.base().what()));
            });

        runNextStep();
    };

    auto testStep_testAbort = [TEST_CTX]() {
        auto clientPtr = app().getFastDbClient();
        clientPtr->execSqlAsync(
            "select name from drogon_test_pipeline where id = $1",
            [TEST_CTX](const drogon::orm::Result &r) {
                MANDATE(r.size() == 1);
                MANDATE(r[0][0].as<std::string>() == "trantor");
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_testAbort(0) what():" +
                      std::string(e.base().what()));
            },
            1);

        // Update name
        // NOTICE: update in a function to bypass auto sync
        // DO NOT USE like this in production
        clientPtr->execSqlAsync(
            "select * from fn_drogon_test_pipeline($1, $2)",
            [TEST_CTX](const drogon::orm::Result &r) { SUCCESS(); },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_testAbort(1) what():" +
                      std::string(e.base().what()));
            },
            1,
            "terminus");

        // Check name
        clientPtr->execSqlAsync(
            "select name from drogon_test_pipeline where id = $1",
            [TEST_CTX](const drogon::orm::Result &r) {
                MANDATE(r.size() == 1);
                MANDATE(r[0][0].as<std::string>() == "terminus");
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_testAbort(2) what():" +
                      std::string(e.base().what()));
            },
            1);

        // Cause an error
        clientPtr->execSqlAsync(
            "select 'abc'::int",
            [TEST_CTX](const drogon::orm::Result &r) {
                FAULT("PgPipelineTest_testAbort(3) should not pass");
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) { SUCCESS(); },
            1);

        // This statement is expected to fail as well; the test treats its
        // exception callback as the successful outcome.
        clientPtr->execSqlAsync(
            "select 1",
            [TEST_CTX](const drogon::orm::Result &r) {
                FAULT("PgPipelineTest_testAbort(4) should not pass");
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) { SUCCESS(); },
            1);

        runNextStep();
    };

    auto testStep_afterAbort = [TEST_CTX]() {
        auto clientPtr = app().getFastDbClient();
        // Should remain unchanged, because last sync point was rollback.
        clientPtr->execSqlAsync(
            "select name from drogon_test_pipeline where id = $1",
            [TEST_CTX](const drogon::orm::Result &r) {
                MANDATE(r.size() == 1);
                MANDATE(r[0][0].as<std::string>() == "trantor");
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_afterAbort(0) what():" +
                      std::string(e.base().what()));
            },
            1);

        runNextStep();
    };

    auto testStep_cleanup = [TEST_CTX]() {
        auto clientPtr = app().getFastDbClient();
        clientPtr->execSqlAsync(
            "drop table drogon_test_pipeline;",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Drop table drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_cleanup(0) what():" +
                      std::string(e.base().what()));
            });

        clientPtr->execSqlAsync(
            "drop function fn_drogon_test_pipeline;",
            [TEST_CTX](const drogon::orm::Result &r) {
                LOG_INFO << "Drop function fn_drogon_test_pipeline.";
            },
            [TEST_CTX](const drogon::orm::DrogonDbException &e) {
                FAULT("PgPipelineTest_cleanup(1) what():" +
                      std::string(e.base().what()));
            });
        runNextStep();
    };

    // Register the stages in execution order and kick off the first one.
    testSteps = {std::move(testStep_init),
                 std::move(testStep_allSuccess),
                 std::move(testStep_testAbort),
                 std::move(testStep_afterAbort),
                 std::move(testStep_cleanup)};

    runNextStep();
}
|
|
|
|
#endif
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
std::promise<void> p1;
|
|
std::future<void> f1 = p1.get_future();
|
|
app().setThreadNum(1);
|
|
#if LIBPQ_SUPPORTS_BATCH_MODE
|
|
app().createDbClient( //
|
|
"postgresql", // dbType
|
|
"127.0.0.1", // host
|
|
5432, // port
|
|
"postgres", // dbname
|
|
"postgres", // username
|
|
"12345", // password
|
|
1, // connectionNum
|
|
"", // filename
|
|
"default", // name
|
|
true, // isFast
|
|
"", // charset
|
|
10, // timeout
|
|
true // autobatch
|
|
);
|
|
#endif
|
|
std::thread thr([&]() {
|
|
app().getLoop()->queueInLoop([&]() { p1.set_value(); });
|
|
app().run();
|
|
});
|
|
|
|
f1.get();
|
|
int testStatus = test::run(argc, argv);
|
|
app().getLoop()->queueInLoop([]() { app().quit(); });
|
|
thr.join();
|
|
return testStatus;
|
|
}
|