Modify the connection to the Mysql server

This commit is contained in:
antao 2018-11-27 13:56:12 +08:00
parent 6d5b1af726
commit 669c446656
3 changed files with 78 additions and 101 deletions

View File

@ -15,6 +15,7 @@
#include <drogon/utils/Utilities.h>
#include <regex>
#include <algorithm>
#include <poll.h>
using namespace drogon::orm;
@ -26,7 +27,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, const std::string &co
});
mysql_init(_mysqlPtr.get());
mysql_options(_mysqlPtr.get(), MYSQL_OPT_NONBLOCK, 0);
MYSQL *ret;
//Get the key and value
std::regex r(" *= *");
auto tmpStr = std::regex_replace(connInfo, r, "=");
@ -65,54 +66,44 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, const std::string &co
passwd = value;
}
}
_loop->queueInLoop([=]() {
MYSQL *ret;
_waitStatus = mysql_real_connect_start(&ret,
_mysqlPtr.get(),
host.empty() ? NULL : host.c_str(),
user.empty() ? NULL : user.c_str(),
passwd.empty() ? NULL : passwd.c_str(),
dbname.empty() ? NULL : dbname.c_str(),
port.empty() ? 3306 : atol(port.c_str()),
NULL,
0);
int status = mysql_real_connect_start(&ret,
_mysqlPtr.get(),
host.empty() ? NULL : host.c_str(),
user.empty() ? NULL : user.c_str(),
passwd.empty() ? NULL : passwd.c_str(),
dbname.empty() ? NULL : dbname.c_str(),
port.empty() ? 3306 : atol(port.c_str()),
NULL,
0);
auto fd = mysql_get_socket(_mysqlPtr.get());
_channelPtr = std::unique_ptr<trantor::Channel>(new trantor::Channel(loop, fd));
_channelPtr->setReadCallback([=]() {
handleRead();
auto fd = mysql_get_socket(_mysqlPtr.get());
_channelPtr = std::unique_ptr<trantor::Channel>(new trantor::Channel(loop, fd));
_channelPtr->setCloseCallback([=]() {
perror("sock close");
handleClosed();
});
_channelPtr->setEventCallback([=]() {
handleEvent();
});
setChannel();
});
_channelPtr->setWriteCallback([=]() {
handleWrite();
});
_channelPtr->setCloseCallback([=]() {
perror("sock close");
handleClosed();
});
_channelPtr->setErrorCallback([=]() {
perror("sock err");
handleClosed();
});
// LOG_TRACE << "channel index:" << _channelPtr->index();
// LOG_TRACE << "channel " << this << " fd:" << _channelPtr->fd();
_channelPtr->enableReading();
setChannel(status);
}
void MysqlConnection::setChannel(int status)
void MysqlConnection::setChannel()
{
_channelPtr->disableReading();
_channelPtr->disableWriting();
if (status & MYSQL_WAIT_READ)
_channelPtr->disableAll();
if ((_waitStatus & MYSQL_WAIT_READ) || (_waitStatus & MYSQL_WAIT_EXCEPT))
{
_channelPtr->enableReading();
}
if (status & MYSQL_WAIT_WRITE)
if (_waitStatus & MYSQL_WAIT_WRITE)
{
_channelPtr->enableWriting();
}
//(status & MYSQL_WAIT_EXCEPT) ///FIXME
if (status & MYSQL_WAIT_TIMEOUT)
if (_waitStatus & MYSQL_WAIT_TIMEOUT)
{
auto timeout = mysql_get_timeout_value(_mysqlPtr.get());
auto thisPtr = shared_from_this();
@ -121,58 +112,7 @@ void MysqlConnection::setChannel(int status)
});
}
}
void MysqlConnection::handleRead()
{
int status = 0;
status |= MYSQL_WAIT_READ;
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
status = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (status == 0)
{
if (!ret)
{
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
_status = ConnectStatus_Ok;
LOG_TRACE << "connected!!!";
return;
}
else
{
setChannel(status);
}
}
}
void MysqlConnection::handleWrite()
{
LOG_TRACE << "channel index:" << _channelPtr->index();
int status = 0;
status |= MYSQL_WAIT_WRITE;
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
status = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (status == 0)
{
if (!ret)
{
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
_status = ConnectStatus_Ok;
LOG_TRACE << "connected!!!";
return;
}
else
{
setChannel(status);
}
}
}
void MysqlConnection::handleClosed()
{
_status = ConnectStatus_Bad;
@ -191,22 +131,58 @@ void MysqlConnection::handleTimeout()
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
status = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (status == 0)
_waitStatus = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (_waitStatus == 0)
{
if (!ret)
{
handleClosed();
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
//I don't think the programe can run to here.
_status = ConnectStatus_Ok;
if (_okCb)
{
auto thisPtr = shared_from_this();
_okCb(thisPtr);
}
}
setChannel();
}
}
void MysqlConnection::handleEvent()
{
int status = 0;
auto revents = _channelPtr->revents();
if (revents & POLLIN)
status |= MYSQL_WAIT_READ;
if (revents & POLLOUT)
status |= MYSQL_WAIT_WRITE;
if (revents & POLLPRI)
status |= MYSQL_WAIT_EXCEPT;
status = (status & _waitStatus);
MYSQL *ret;
if (_status != ConnectStatus_Ok)
{
_waitStatus = mysql_real_connect_cont(&ret, _mysqlPtr.get(), status);
if (_waitStatus == 0)
{
if (!ret)
{
handleClosed();
perror("");
LOG_ERROR << "Failed to mysql_real_connect()";
return;
}
_status = ConnectStatus_Ok;
LOG_TRACE << "connected!!!";
return;
}
else
{
setChannel(status);
if (_okCb)
{
auto thisPtr = shared_from_this();
_okCb(thisPtr);
}
}
setChannel();
}
}
@ -219,5 +195,4 @@ void MysqlConnection::execSql(const std::string &sql,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb)
{
}

View File

@ -48,11 +48,13 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
private:
std::unique_ptr<trantor::Channel> _channelPtr;
std::shared_ptr<MYSQL> _mysqlPtr;
void handleRead();
void handleTimeout();
void handleWrite();
void handleClosed();
void setChannel(int status);
void handleEvent();
void setChannel();
int _waitStatus;
};
} // namespace orm

View File

@ -7,6 +7,6 @@ using namespace drogon::orm;
int main()
{
trantor::Logger::setLogLevel(trantor::Logger::TRACE);
auto clientPtr = DbClient::newMysqlClient("host= 127.0.0.1 port =3306 dbname= test user = root", 3);
auto clientPtr = DbClient::newMysqlClient("host= 127.0.0.1 port =3306 dbname= test user = root ", 5);
getchar();
}