前言
在libuv中,我想在线程A绑定端口,只负责监听,uv_accept接收新连接,将接收到的client, 放到线程B中使用另外一个loop。然而,发现accept一个client, 并没有办法将client放到另外的loop中。 于是开始寻找解决办法, 本文使用uv_write2间接实现了想要的效果。
一、问题描述
在github和stackoverflow社区上,2016年时候,已经有人遇到几乎完全相同的问题。
uv_accept in one thread and uv_read_start from other thread
How to use uv_accept in multithread environment?
libuv官方开发人员指出,没有直接的办法实现这个需求, 但是可以间接实现这个要求。
There is no way to accept a client on another loop. There are a number of things you can do:
- create the listener handle, then create X new loops, send the handle to them using uv_write2 and listen in all of them
- if you’re on Linux (> 3.9 IIRC), create X new loops and bind a handle to the same IP and port using SO_REUSEPORT
- have a single loop accept all incoming connections and send them to other loops using uv_write2.
看上去,只有第三个可以间接实现这个需求。使用一个loop监听accept所有链接,将收到的链接client,通过uv_write2间接发送给其他loop。
二、accept多线程替代方案
在官方例子中,multi-echo-server恰好实现了类似的需求。 不同的是,demo是将在父进程接收新的链接,将到来的接连client,通过uv_write2发送子进程, 子进程将接收到的client加入到自己的loop中。
但我们将接收到新的连接client,发到另外一个线程,而不是进程。为了满足这个需求,因此阅读了例子源码。
参考multi-echo-server, 这个例子内容挺多,挺复杂的,但是其主要原理大致如下:
首先使用socketpair创建一对已链接套接字,假设pfds, 用于父子进程之间通信。
父进程监听指定端口,将uv_accept收到的链接client,以pfds方式用uv_write2通知子进程。
- 内部uv_write2传递client到子进程,本质上是使用sendmsg传递真正的fd(内部包含真正fd), 即父进程sendmsg发送fd到子进程,内核会将fd所对应的文件结构拷贝一份到子进程中。
- 子进程使用pfds等待父进程消息。有消息到来,会使用uv_accept接收client, 然后将client加入到自己进程的loop中。 uv_accept接收client,由于pfds初始化指明接收这个消息是以ipc形式,因此uv_accept本质上使用recvmsg接收父进程的fd,将其封装为client。
这样就成功将父进程监听到的新连接client, 传递到子进程中,并且加入子进程自己的loop中。
通过分析,可以简单将其多进程改为多线程方式即可,需要用到socketpair,将创建一对连接用于线程之间的通信。需要注意点:
- 可以使用内部接口uv__make_socketpair, 间接调用socketpair创建一对已连接的socket. 用于线程通信。demo中包含uv_ex.h文文件, 其中声明uv_make_socektpair包含内部uv__make_socketpair接口。
- 用于线程通信的句柄handle,必须指定ipc,即初始化参数指定1, 确保内部会使用sendmsg和recvmsg传递fd.
- 线程之间传递fd同样是使用sendmsg和recvmsg, 因此,不能确保fd值相同,但是fd所对应的文件节点是相同的。
我的实现代码,参考multi-echo-server
1 |
|
三、libuv实例源码解析
实例multi-echo-server大致分析。
- 父进程使用setup_workers安装子进程任务,然后监听新的连接到来, 回调on_new_connection
1 | int main() { |
当有新的连接到来,调用uv_accept,将连接client通过uv_write2发送给子进程(通过socketpair建立的IO),
在uv_write2内部会将收到client的fd以sendmsg发送给子进程。
1 | void on_new_connection(uv_stream_t *server, int status) { |
- setup_workers方法安装启动多个子进程,每个子进程都使用socketpair创建IO机制跟父进程通信。
1 | workers = calloc(sizeof(struct child_worker), cpu_count); |
- 重点理解uv_spawn函数(内容很多,有点复杂),主要是内部fork出子进程使用execvp执行worker程序,使用socketpair创建IO机制,并且一端绑定父进程,一端绑定子进程,并且重定向子进程的IO, 使得父进程向worker->pipe写入数据, worker程序的从标准输入即可读取。源码过多,不展示。
- 子进程主要是监听父进程的信息。uv_accept收到消息,内部会使用recvmsg将其封装句柄client中,然后添加到自己的loop.
1 | void on_new_connection(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { |
四、总结
本次实现利用了libuv提供的接口uv_write2,将接收到的描述符fd,传递到另外地方,可以是进程,也可以是线程。然后接收端再调用accept封装成句柄添加到另外loop中。
顺便记录,几处代码阅读稍微卡住。
- 对于指针赋值,不够敏感。一直在寻找socketpair一端信息,如何绑定到父进程的pipe中。 检查之后,才发现原来child_stdio[0].data.stream已经获取了pipe地址。
- 对描述符属性UV_O_CLOEXEC,不够敏感。 使用UV__O_CLOEXEC创建了一对管道描述符,然后fork. 那么当子进程执行execvp系列函数, 对应的描述符会关闭。如果父进程有监听此描述符,那么会收到结束标志EOF, 即read正常返回0.