在Node.js中使用SO_RESUEPORT

前言:今天下载了Node.js最新版代码,并为Node.js的TCP模块增加了SO_RESUEPORT的能力,本文介绍一下具体的实现,关于SO_RESUEPORT的知识可以参考之前的文章或者网上文章。

1 Libuv

SO_RESUEPORT是操作系统内核提供的能力,所以第一步首先修改Libuv。考虑到操作系统兼容性的问题,目前只支持Linux系统,旧版Mac OS也支持相关属性但是效果不符合预期,新版Mac OS倒是支持,考虑到Node.js在几乎都是部署到Linux,所以可以先关注Linux内核。首先修改deps/uv/include/uv.h。

 
 
 
 
  1. enum uv_tcp_flags { 
  2.   UV_TCP_IPV6ONLY = 1, 
  3.   // 支持SO_RESUEPORT flags 
  4.   UV_TCP_REUSEPORT = 2 
  5.  
  6. }; 

接着修改deps/uv/src/unix/tcp.c。

 
 
 
 
  1. #if defined(SO_REUSEPORT) && defined(__linux__)  
  2.   on = 1; 
  3.   if ((flags & UV_TCP_REUSEPORT) && setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on))) 
  4.     return UV__ERR(errno); 
  5. #endif 

这里判断一下是否有两个宏,有的话才能使用SO_RESUEPORT。如果支持则通过setsockopt设置socket的SO_REUSEPORT标记,这是最核心的逻辑。

2 修改C++层

修改完底层的Libuv后,继续修改C++层,因为这是一个可选的属性,所以我们需要增加相关的逻辑。修改src/tcp_wrap.cc。首先导出一个新的常量

 
 
 
 
  1. #if defined(SO_REUSEPORT) && defined(__linux__)  
  2.  NODE_DEFINE_CONSTANT(constants, UV_TCP_REUSEPORT); 
  3.  
  4. #endif 

在JS层可以通过判断是否导出了这个常量来判断系统是否支持SO_RESUEPORT。接着修改bind函数,因为我们再bind的时候可以设置SO_RESUEPORT。

 
 
 
 
  1. template  
  2.  
  3. void TCPWrap::Bind( 
  4.  
  5.     const FunctionCallbackInfo& args, 
  6.     int family, 
  7.     std::function uv_ip_addr) { 
  8.   TCPWrap* wrap; 
  9.   ASSIGN_OR_RETURN_UNWRAP(&wrap, 
  10.                           args.Holder(), 
  11.                           args.GetReturnValue().Set(UV_EBADF)); 
  12.   Environment* env = wrap->env(); 
  13.   node::Utf8Value ip_address(env->isolate(), args[0]); 
  14.   int port; 
  15.   unsigned int flags = 0; 
  16.   if (!args[1]->Int32Value(env->context()).To(&port)) return; 
  17.   // ipv6支持ipv6Only和SO_RESUEPORT 
  18.   if (family == AF_INET6 && 
  19.       !args[2]->Uint32Value(env->context()).To(&flags)) { 
  20.     return; 
  21.   // ipv4之前是不支持任何标记的,这里需要加上这个逻辑,因为我们需要支持SO_RESUEPORT 
  22.   } else if (family == AF_INET4 && 
  23.       !args[2]->Uint32Value(env->context()).To(&flags)) { 
  24.     return; 
  25.   } 
  26.  
  27.   T addr; 
  28.   int err = uv_ip_addr(*ip_address, port, &addr); 
  29.  
  30.   if (err == 0) { 
  31.     err = uv_tcp_bind(&wrap->handle_, 
  32.                       reinterpret_cast(&addr), 
  33.                       flags); 
  34.   } 
  35.   args.GetReturnValue().Set(err); 
  36.  

C++主要是完成透传flags的逻辑。

3 修改JS层

修改JS层是最复杂的地方,主要是为了应用层的兼容性问题。也就是说如果Node.js真的支持了SO_RESUEPORT,在某些平台不支持SO_RESUEPORT的情况下,我们如何能保证我们的代码能在各个平台上跑。简单来说,如果我们平台支持SO_RESUEPORT,我们可以开启多个子进程,然后分别执行以下代码。

 
 
 
 
  1. const http = require('http'); 
  2. http.createServer((req, res) => { 
  3.     res.end('hello'); 
  4.  
  5. }) 
  6.  
  7. .listen({port: 8000, reuseport: true}); 

这时候,只需要修改一下Node.js的net.js,把reuseport标记传到C++层再传到Libuv就行,但是问题是,如果我们这样写代码,就无法在不支持SO_RESUEPORT的平台跑了,因为会导致重复监听端口的错误。所以为了兼容性,我想的方案是利用Cluster模块,目前Cluster模块支持轮询和共享两种模式,那么我们再加一种reuseport模式就好了,这样的好处是一旦我们平台不支持SO_RESUEPORT,我们可以降级到Node.js现在到模式。我们知道Cluster模块的原理有两种,一种是主进程监听,分发连接给子进程,另一种是主进程创建socket,通过文件描述符传递的方式传给子进程,所有的进程都是共享一个socket的。下面我们看看怎么做。首先修改lib/internal/cluster/primary.js。

 
 
 
 
  1. // 增加这if的逻辑 
  2. if ((message.addressType === 4 ||  
  3.  message.addressType === 6) &&  
  4.  (message.flags & TCPConstants.UV_TCP_REUSEPORT)) { 
  5.  handle = new ReusePort(key, address, message); 
  6. } else if (schedulingPolicy !== SCHED_RR || 
  7.     message.addressType === 'udp4' || 
  8.     message.addressType === 'udp6') { 
  9.   handle = new SharedHandle(key, address, message); 
  10. } else { 
  11.   handle = new RoundRobinHandle(key, address, message); 

我们在queryServer函数里增加了一个if的逻辑。如果addressType是4或6说明是TCP协议,并且设置了UV_TCP_REUSEPORT(listen的时候传入),就会走到reuseport的逻辑,剩下的两个else是目前Node.js的逻辑。我们看看ReusePort.js做了什么。

 
 
 
 
  1. 'use strict'; 
  2.  
  3. const assert = require('internal/assert'); 
  4.  
  5. const net = require('net'); 
  6.  
  7. const { constants: TCPConstants } = internalBinding('tcp_wrap'); 
  8.  
  9.  
  10. module.exports = ReusePort; 
  11.  
  12. function ReusePort(key, address, {port, addressType, fd, flags}) { 
  13.   this.key = key; 
  14.   this.workers = []; 
  15.   this.handles = []; 
  16.   this.list = [address, port, addressType, fd, flags]; 
  17.  
  18.  
  19.  
  20. ReusePort.prototype.add = function(worker, send) { 
  21.   assert(!this.workers.includes(worker)); 
  22.   const rval = net._createServerHandle(...this.list); 
  23.   let errno; 
  24.   let handle; 
  25.   if (typeof rval === 'number') 
  26.     errno = rval; 
  27.   else 
  28.     handle = rval; 
  29.   this.workers.push(worker); 
  30.   this.handles.push(handle); 
  31.   send(errno, null, handle); 
  32.  
  33. }; 
  34.  
  35.  
  36. ReusePort.prototype.remove = function(worker) { 
  37.   const index = this.workers.indexOf(worker); 
  38.  
  39.   if (index === -1) 
  40.     return false; // The worker wasn't sharing this handle. 
  41.  
  42.   this.workers.splice(index, 1); 
  43.   this.handles[index].close(); 
  44.   this.handles.splice(index, 1); 
  45.   return true; 
  46.  
  47. }; 

上面的代码我们只需要关注net._createServerHandle。在不能多个进程同时监听同一个端口的情况下,Node.js只会调net._createServerHandle创建一个socket,然后多个进程共享。而我们这里会给每个进程创建一个socket。这个socket就是在子进程调用queryServer的时候返回给子进程的。剩下的逻辑我们暂时不用关注。最后看一下_createServerHandle的逻辑。

 
 
 
 
  1. const handle = new TCP(TCPConstants.SERVER); 
  2.  
  3. if (addressType === 6) { 
  4.  
  5.   err = handle.bind6(address, port, flags);}  
  6.  
  7. else { 
  8.  
  9.   err = handle.bind(address, port, flags || 0); 
  10.  

_createServerHandle的逻辑是创建一个socket并且给socket绑定IP和端口,我们看到这里会给C++层传入flags,C++层就会传到LIbuv了,这样我们就完成了整个过程,整体的流程如下。

1 子进程执行listen的时候,传入reuseport为true

2 子进程通过进程间通信请求主进程

3 主进程返回一个新的socket并绑定到对应的地址

4 子进程执行listen启动服务器。

4 使用

接下来我们看看如何使用,首先创建一个server.js。

 
 
 
 
  1. const cluster = require('cluster'); 
  2.  
  3. const os = require('os'); 
  4.  
  5. const http = require('http'); 
  6.  
  7. const cpus = os.cpus().length; 
  8.  
  9.  
  10.  
  11.  if (cluster.isPrimary) { 
  12.  
  13.   const map = {}; 
  14.   for (let i = 0; i < cpus; i++) { 
  15.     const worker = cluster.fork(); 
  16.     map[worker.process.pid] = 0; 
  17.     worker.on('message', (pid) => { 
  18.         map[pid]++; 
  19.     }); 
  20.   } 
  21.  
  22.   process.on('SIGINT', () => { 
  23.     console.log(map); 
  24.   }); 
  25.  
  26. } else { 
  27.  
  28.   http.createServer((req, res) => { 
  29.       process.send(process.pid); 
  30.       res.end('hello'); 
  31.   }) 
  32.  
  33.   .listen({reuseport: true, port: 8000}); 
  34.  

再创建一个客户端client.js

 
 
 
 
  1. const http = require('http'); 
  2. function connect() { 
  3.     setTimeout(() => { 
  4.         http.get('http://localhost:8000/', (res) => { 
  5.             console.log(res.statusCode); 
  6.             connect(); 
  7.         }); 
  8.     }, 50); 
  9.  
  10.  
  11. connect(); 

客户端串行访问服务器,我们看到使用方式和目前Node.js的Cluster使用一样。即使我们把reuseport改成false或者其他平台跑也没问题,效果如下

我们看到在reuseport的情况下,负载还是挺均衡的。

后记:目前是通过listen的时候传入参数去控制是否开启SO_RESUEPORT的,后续可以增加通过设置cluster.schedulingPolicy的方式,和目前共享、轮询模式对齐,考虑到Cluster模块不是必须,因为我们可以直接用子进程模块监听同一个端口。所以通过listen函数去控制是非常必要的。目前通过修改Node.js内核大概体验了一下SO_RESUEPORT,后续review和改进一下代码。

文章标题:在Node.js中使用SO_RESUEPORT
标题链接:http://www.mswzjz.cn/qtweb/news40/385490.html

温江区贝锐智能技术服务部_成都网站建设公司,为您提供响应式网站关键词优化网站导航域名注册网站制作网站收录

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能