Keeper网络处理模型
TCPServer的创建和初始化
在keeper启动时,会根据配置文件中的listen_host标签个数创建对应的TCPServer以及HTTPServer等:
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
1.初始化/创建TCPServer以及HTTPServer(这里省略别的server的创建逻辑):
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper
const char * port_name = "keeper_server.tcp_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(Poco::Timespan{tcp_receive_timeout, 0});
socket.setSendTimeout(Poco::Timespan{tcp_send_timeout, 0});
servers->emplace_back(
listen_host,
port_name,
"Keeper (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(
config_getter, global_context->getKeeperDispatcher(),
tcp_receive_timeout, tcp_send_timeout, false), server_pool, socket));
});
......
}
初始TCP Keeper server的时候有这么几个重要的类:
(1)TCPServer:主要作用是监听网络连接(内部会使用epoll / poll / select)
(2)KeeperTCPHandlerFactory:用来处理连接
(3)socket:服务端监听的端口号
2.启动阶段:初始化线程池,之后这个线程池也是TCPServer中的线程池
Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024));
之后挨个启动创建好的网络服务器:
for (auto & server : *servers)
{
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
}
这里主要说明下TCPServer的启动流程。
其中每个server的类型为:ProtocolServerAdapter,它就是一个适配器,对于不同的server会有不同的实现,TCPServer的实现为以下类:
可以看到最终调的还是TCPServer的start函数。
Clickhouse中的TCPServer继承Poco库中的TCPServer:
class TCPServer : public Poco::Net::TCPServer
{
public:
explicit TCPServer(
TCPServerConnectionFactory::Ptr factory,
Poco::ThreadPool & thread_pool,
Poco::Net::ServerSocket & socket,
Poco::Net::TCPServerParams::Ptr params = new Poco::Net::TCPServerParams,
const TCPServerConnectionFilter::Ptr & filter = nullptr);
/// Close the socket and ask existing connections to stop serving queries
void stop()
{
......
}
bool isOpen() const { return is_open; }
UInt16 portNumber() const { return port_number; }
const Poco::Net::ServerSocket& getSocket() { return socket; }
private:
TCPServerConnectionFactory::Ptr factory;
Poco::Net::ServerSocket socket;
std::atomic<bool> is_open;
UInt16 port_number;
};
所以最终走的还是Poco库中的TCPServer的start接口。
TCPServer的启动流程
这里,我们将目光聚焦于Poco库中的TCPServer的start接口。
void TCPServer::start()
{
poco_assert (_stopped);
_stopped = false;
_thread.start(*this);
}
_thread.start(*this)的实现,请记住这个target就是TCPServer,因为之后会调到它的run接口:
void Thread::start(Runnable& target)
{
startImpl(new RunnableHolder(target));
}
startImpl(new RunnableHolder(target))的实现,Foundation\src\Thread_POSIX.cpp下的
void ThreadImpl::startImpl(SharedPtr<Runnable> pTarget)
{
......
{
FastMutex::ScopedLock l(_pData->mutex);
_pData->pRunnableTarget = pTarget;
int errorCode;
if ((errorCode = pthread_create(&_pData->thread, &attributes, runnableEntry, this)))
{
_pData->pRunnableTarget = 0;
pthread_attr_destroy(&attributes);
throw SystemException(Poco::format("cannot start thread (%s)",
Error::getMessage(errorCode)));
}
}
......
}
可以看到最终使用pthread_create创建一个线程执行runnableEntry,runnableEntry是一个函数,会做以下事情,也就是调用Poco::Net::TCPServer的run接口:
void* ThreadImpl::runnableEntry(void* pThread)
{
......
try
{
pData->pRunnableTarget->run();
}
......
}
pData->pRunnableTarget在startImpl设置的:
所以总结一下:start接口就是创建一个线程去执行run接口,后续的其他类的start接口与之逻辑类似。
此时理一下TCPServer::run() [Net\src\TCPServer.cpp] 做了什么。
可以看到主要就是监听连接,如果连接被接受,就执行_pDispatcher->enqueue(ss)。
我们在创建TCPServer的时候,没有指定filter,所以_pConnectionFilter为nullptr。
先看一下:SocketImpl::poll((const Poco::Timespan& timeout, int mode)
根据POCO_HAVE_FD_EPOLL,POCO_HAVE_FD_POLL的设置采用不同的监听手段:
(1)epoll
Net\include\Poco\Net\Net.h
#if (POCO_OS == POCO_OS_LINUX) || (POCO_OS == POCO_OS_WINDOWS_NT) || (POCO_OS == POCO_OS_ANDROID)
#define POCO_HAVE_FD_EPOLL 1
#endif
(2)poll
Net\include\Poco\Net\Net.h
#if defined(POCO_OS_FAMILY_BSD)
#ifndef POCO_HAVE_FD_POLL
#define POCO_HAVE_FD_POLL 1
#endif
#endif
(3)select
监听到连接后,接受连接:
SocketImpl* SocketImpl::acceptConnection(SocketAddress& clientAddr)
{
if (_sockfd == POCO_INVALID_SOCKET) throw InvalidSocketException();
sockaddr_storage buffer;
struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(&buffer);
poco_socklen_t saLen = sizeof(buffer);
poco_socket_t sd;
do
{
sd = ::accept(_sockfd, pSA, &saLen);
}
while (sd == POCO_INVALID_SOCKET && lastError() == POCO_EINTR);
if (sd != POCO_INVALID_SOCKET)
{
clientAddr = SocketAddress(pSA, saLen);
return new StreamSocketImpl(sd);
}
error(); // will throw
return nullptr;
}
最后便是处理连接,
处理连接主要在TCPServerDispatcher::enqueue(const StreamSocket& socket)中进行
简短调用栈为:
hreadPool::startWithPriority() ->
PooledThread::start(...) ->
| getThread() ->
| createThread();
| pThread->start();
| Thread::start(Runnable& target) ->
| startImpl() ->
| PooledThread::run()
| _targetReady.wait();
| pTarget->run();
| _started.wait();
|
PooledThread::start() ->
_targetReady.set()
在pTarget->run()之前之前会卡在_targetReady.wait(),直到_targetReady.set()。
而pTarget就是TCPServerDispatcher,所以pTarget->run()即为TCPServerDispatcher::run(),
TCPServerDispatcher::run()主要来处理连接:
_pConnectionFactory即为KeeperTCPHandlerFactory,
KeeperTCPHandlerFactory会为连接创建一个处理线程KeeperTCPHandler,
TCPServerDispatcher::run()调用pConnection->start()使用1个线程来处理连接,pConnection也就是KeeperTCPHandler,而KeeperTCPHandler继承于Poco::Net::TCPServerConnection,所以KeeperTCPHandler的start就是Poco::Net::TCPServerConnection的start。
Poco::Net::TCPServerConnection的start参考以前的逻辑,最终会调用到KeeperTCPHandler的run接口,简短KeeperTCPHandler::run的调用栈为:
KeeperTCPHandler::run() ->
KeeperTCPHandler::runImpl()
KeeperTCPHandler处理连接
调用栈:
KeeperHandler监听两种fd:(1)socketfd(来自当前连接的请求)(2)来自KeeperRspT通过pipe的响应请求:
Keeper端通过KeeperRspT线程异步处理请求,所以当KeeperRspT处理完请求之后通过pipe将响应请求写到pipe的writeFd,之后KeeperHandler监听pipe的ReadFd。
TODO:KeeperTCPHandler线程处理请求逻辑。