本系列Netty源码解析文章基于 4.1.56.Final版本
10多年的随州网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。营销型网站建设的优势是能够根据用户设备显示端的尺寸不同,自动调整随州建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。创新互联从事“随州网站设计”,“随州网站推广”以来,每个客户项目都认真落实执行。
在上篇文章《聊聊Netty那些事儿之从内核角度看IO模型》中我们花了大量的篇幅来从内核角度详细讲述了五种IO模型的演进过程以及ReactorIO线程模型的底层基石IO多路复用技术在内核中的实现原理。
最后我们引出了netty中使用的主从Reactor IO线程模型。
通过上篇文章的介绍,我们已经清楚了在IO调用的过程中内核帮我们搞了哪些事情,那么俗话说的好内核领进门,修行在netty,netty在用户空间又帮我们搞了哪些事情?
那么从本文开始,笔者将从源码角度来带大家看下上图中的Reactor IO线程模型在Netty中是如何实现的。
本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创建出来的。
在上篇文章中我们提到Netty采用的是主从Reactor多线程的模型,但是它在实现上又与Doug Lea在Scalable IO in Java论文中提到的经典主从Reactor多线程模型有所差异。
Netty中的Reactor是以Group的形式出现的,主从Reactor在Netty中就是主从Reactor组,每个Reactor Group中会有多个Reactor用来执行具体的IO任务。当然在netty中Reactor不只用来执行IO任务,这个我们后面再说。
一个客户端SocketChannel只能分配给一个固定的Sub Reactor。一个Sub Reactor负责处理多个客户端SocketChannel,这样可以将服务端承载的全量客户端连接分摊到多个Sub Reactor中处理,同时也能保证客户端SocketChannel上的IO处理的线程安全性。
由于文章篇幅的关系,作为Reactor在netty中实现的第一篇我们主要来介绍主从Reactor Group的创建流程,骨架脉络先搭好。
下面我们来看一段Netty服务端代码的编写模板,从代码模板的流程中我们来解析下主从Reactor的创建流程以及在这个过程中所涉及到的Netty核心类。
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
首先我们要创建Netty最核心的部分 -> 创建主从Reactor Group,在Netty中EventLoopGroup就是Reactor Group的实现类。对应的EventLoop就是Reactor的实现类。
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
创建用于IO处理的ChannelHandler,实现相应IO事件的回调函数,编写对应的IO处理逻辑。注意这里只是简单示例哈,详细的IO事件处理,笔者会单独开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
................省略IO处理逻辑................
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
创建ServerBootstrapNetty服务端启动类,并在启动类中配置启动Netty服务端所需要的一些必备信息。
在上篇文章介绍Socket内核结构小节中我们提到,在编写服务端网络程序时,我们首先要创建一个Socket用于listen和bind端口地址,我们把这个叫做监听Socket,这里对应的就是NioServerSocketChannel.class。当客户端连接完成三次握手,系统调用accept函数会基于监听Socket创建出来一个新的Socket专门用于与客户端之间的网络通信我们称为客户端连接Socket,这里对应的就是NioSocketChannel.class。
netty有两种Channel类型:一种是服务端用于监听绑定端口地址的NioServerSocketChannel,一种是用于客户端通信的NioSocketChannel。每种Channel类型实例都会对应一个PipeLine用于编排对应channel实例上的IO事件处理逻辑。PipeLine中组织的就是ChannelHandler用于编写特定的IO处理逻辑。
注意:serverBootstrap.handler设置的是服务端NioServerSocketChannel PipeLine中的ChannelHandler。
ChannelFuture f = serverBootstrap.bind(PORT).sync()这一步会是下篇文章要重点分析的主题Main Reactor Group的启动,绑定端口地址,开始监听客户端连接事件(OP_ACCEPT)。本文我们只关注创建流程。
f.channel().closeFuture().sync()等待服务端NioServerSocketChannel关闭。Netty服务端到这里正式启动,并准备好接受客户端连接的准备。
shutdownGracefully优雅关闭主从Reactor线程组里的所有Reactor线程。
在上篇文章中我们介绍了五种IO模型,Netty中支持BIO,NIO,AIO以及多种操作系统下的IO多路复用技术实现。
在Netty中切换这几种IO模型也是非常的方便,下面我们来看下Netty如何对这几种IO模型进行支持的。
首先我们介绍下几个与IO模型相关的重要接口:
EventLoop就是Netty中的Reactor,可以说它就是Netty的引擎,负责Channel上IO就绪事件的监听,IO就绪事件的处理,异步任务的执行驱动着整个Netty的运转。
不同IO模型下,EventLoop有着不同的实现,我们只需要切换不同的实现类就可以完成对NettyIO模型的切换。
在NIO模型下Netty会自动根据操作系统以及版本的不同选择对应的IO多路复用技术实现。比如Linux 2.6版本以上用的是Epoll,2.6版本以下用的是Poll,Mac下采用的是Kqueue。
其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。
Netty中的Reactor是以Group的形式出现的,EventLoopGroup正是Reactor组的接口定义,负责管理Reactor,Netty中的Channel就是通过EventLoopGroup注册到具体的Reactor上的。
Netty的IO线程模型是主从Reactor多线程模型,主从Reactor线程组在Netty源码中对应的其实就是两个EventLoopGroup实例。
不同的IO模型也有对应的实现:
用于Netty服务端使用的ServerSocketChannel,对应于上篇文章提到的监听Socket,负责绑定监听端口地址,接收客户端连接并创建用于与客户端通信的SocketChannel。
不同的IO模型下的实现:
用于与客户端通信的SocketChannel,对应于上篇文章提到的客户端连接Socket,当客户端完成三次握手后,由系统调用accept函数根据监听Socket创建。
不同的IO模型下的实现:
我们看到在不同IO模型的实现中,Netty这些围绕IO模型的核心类只是前缀的不同:
我们只需要将IO模型的这些核心接口对应的实现类前缀改为对应IO模型的前缀,就可以轻松在Netty中完成对IO模型的切换。
我们通常在使用NIO模型的时候会使用Common列下的这些IO模型核心类,Common类也会根据操作系统的不同自动选择JDK在对应平台下的IO多路复用技术的实现。
而Netty自身也根据操作系统的不同提供了自己对IO多路复用技术的实现,比JDK的实现性能更优。比如:
我们编写Netty服务端程序的时候也可以根据操作系统的不同,采用Netty自身的实现来进一步优化程序。做法也很简单,直接将上图中红框里的实现类替换成Netty的自身实现类即可完成切换。
经过以上对Netty服务端代码编写模板以及IO模型相关核心类的简单介绍,我们对Netty的创建流程有了一个简单粗略的总体认识,下面我们来深入剖析下创建流程过程中的每一个步骤以及这个过程中涉及到的核心类实现。
以下源码解析部分我们均采用Common列下NIO相关的实现进行解析。
在Netty服务端程序编写模板的开始,我们首先会创建两个Reactor线程组:
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor线程组的实现类为NioEventLoopGroup,在创建bossGroup和workerGroup的时候用到了NioEventLoopGroup的两个构造函数:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......................省略...........................
}
nThreads参数表示当前要创建的Reactor线程组内包含多少个Reactor线程。不指定nThreads参数的话采用默认的Reactor线程个数,用0表示。
最终会调用到构造函数。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
下面简单介绍下构造函数中这几个参数的作用,后面我们在讲解本文主线的过程中还会提及这几个参数,到时在详细介绍,这里只是让大家有个初步印象,不必做过多的纠缠。
Reactor线程组NioEventLoopGroup负责创建Reactor线程,在创建的时候会将executor传入。
最终会将这些参数交给NioEventLoopGroup的父类构造器,下面我们来看下NioEventLoopGroup类的继承结构:
NioEventLoopGroup类的继承结构乍一看比较复杂,大家不要慌,笔者会随着主线的深入慢慢地介绍这些父类接口,我们现在重点关注Mutithread前缀的类。
我们知道NioEventLoopGroup是Netty中的Reactor线程组的实现,既然是线程组那么肯定是负责管理和创建多个Reactor线程的,所以Mutithread前缀的类定义的行为自然是对Reactor线程组内多个Reactor线程的创建和管理工作。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
//默认Reactor个数
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...................省略.....................
}
MultithreadEventLoopGroup类主要的功能就是用来确定Reactor线程组内Reactor的个数。
默认的Reactor的个数存放于字段DEFAULT_EVENT_LOOP_THREADS中。
从static {}静态代码块中我们可以看出默认Reactor的个数的获取逻辑:
当nThread参数设置为0采用默认设置时,Reactor线程组内的Reactor个数则设置为DEFAULT_EVENT_LOOP_THREADS。
MultithreadEventExecutorGroup这里就是本小节的核心,主要用来定义创建和管理Reactor的行为。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//Reactor线程组中的Reactor集合
private final EventExecutor[] children;
private final SetreadonlyChildren;
//从Reactor group中选择一个特定的Reactor的选择策略 用于channel注册绑定到一个固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
............................省略................................
}
首先介绍一个新的构造器参数EventExecutorChooserFactory chooserFactory。当客户端连接完成三次握手后,Main Reactor会创建客户端连接NioSocketChannel,并将其绑定到Sub Reactor Group中的一个固定Reactor,那么具体要绑定到哪个具体的Sub Reactor上呢?这个绑定策略就是由chooserFactory来创建的。默认为DefaultEventExecutorChooserFactory。
下面就是本小节的主题Reactor线程组的创建过程:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//用于创建Reactor线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
//循环创建reaactor group中的Reactor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//创建reactor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
................省略................
}
}
}
//创建channel到Reactor的绑定策略
chooser = chooserFactory.newChooser(children);
................省略................
SetchildrenSet = new LinkedHashSet (children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
在Netty Reactor Group中的单个Reactor的IO线程模型为上篇文章提到的单Reactor单线程模型,一个Reactor线程负责轮询注册其上的所有Channel中的IO就绪事件,处理IO事件,执行Netty中的异步任务等工作。正是这个Reactor线程驱动着整个Netty的运转,可谓是Netty的核心引擎。
而这里的executor就是负责启动Reactor线程的,从创建源码中我们可以看到executor的类型为ThreadPerTaskExecutor。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我们看到ThreadPerTaskExecutor做的事情很简单,从它的命名前缀ThreadPerTask我们就可以猜出它的工作方式,就是来一个任务就创建一个线程执行。而创建的这个线程正是netty的核心引擎Reactor线程。
在Reactor线程启动的时候,Netty会将Reactor线程要做的事情封装成Runnable,丢给exexutor启动。
而Reactor线程的核心就是一个死循环不停的轮询IO就绪事件,处理IO事件,执行异步任务。一刻也不停歇,堪称996典范。
这里向大家先卖个关子,"Reactor线程是何时启动的呢??"
线程组NioEventLoopGroup包含多个Reactor,存放于private final EventExecutor[] children数组中。
所以下面的事情就是创建nThread个Reactor,并存放于EventExecutor[] children字段中,
我们来看下用于创建Reactor的newChild(executor, args)方法:
newChild方法是MultithreadEventExecutorGroup中的一个抽象方法,提供给具体子类实现。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
这里我们解析的是NioEventLoopGroup,我们来看下newChild在该类中的实现:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
前边提到的众多构造器参数,这里会通过可变参数Object... args传入到Reactor类NioEventLoop的构造器中。
这里介绍下新的参数EventLoopTaskQueueFactory queueFactory,前边提到Netty中的Reactor主要工作是轮询注册其上的所有Channel上的IO就绪事件,处理IO就绪事件。除了这些主要的工作外,Netty为了极致的压榨Reactor的性能,还会让它做一些异步任务的执行工作。既然要执行异步任务,那么Reactor中就需要一个队列来保存任务。
这里的EventLoopTaskQueueFactory就是用来创建这样的一个队列来保存Reactor中待执行的异步任务。
可以把Reactor理解成为一个单线程的线程池,类似于JDK中的SingleThreadExecutor,仅用一个线程来执行轮询IO就绪事件,处理IO就绪事件,执行异步任务。同时待执行的异步任务保存在Reactor里的taskQueue中。
public final class NioEventLoop extends SingleThreadEventLoop {
//用于创建JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector轮询策略 决定什么时候轮询,什么时候处理IO事件,什么时候执行异步任务
private final SelectStrategy selectStrategy;
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
}
这里就正式开始了Reactor的创建过程,我们知道Reactor的核心是采用的IO多路复用模型来对客户端连接上的IO事件进行监听,所以最重要的事情是创建Selector(JDK NIO 中IO多路复用技术的实现)。
可以把Selector理解为我们上篇文章介绍的Select,poll,epoll,它是JDK NIO对操作系统内核提供的这些IO多路复用技术的封装。
openSelector是NioEventLoop类中用于创建IO多路复用的Selector,并对创建出来的JDK NIO 原生的Selector进行性能优化。
首先会通过SelectorProvider#openSelector创建JDK NIO原生的Selector。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//通过JDK NIO SelectorProvider创建Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
..................省略.............
}
SelectorProvider会根据操作系统的不同选择JDK在不同操作系统版本下的对应Selector的实现。Linux下会选择Epoll,Mac下会选择Kqueue。
下面我们就来看下SelectorProvider是如何做到自动适配不同操作系统下IO多路复用实现的。
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}
SelectorProvider是在前面介绍的NioEventLoopGroup类构造函数中通过调用SelectorProvider.provider()被加载,并通过NioEventLoopGroup#newChild方法中的可变长参数Object... args传递到NioEventLoop中的private final SelectorProvider provider字段中。
SelectorProvider的加载过程:
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
从SelectorProvider加载源码中我们可以看出,SelectorProvider的加载方式有三种,优先级如下:
通过系统变量-D java.nio.channels.spi.SelectorProvider指定SelectorProvider的自定义实现类全限定名。通过应用程序类加载器(Application Classloader)加载。
通过SPI方式加载。在工程目录META-INF/services下定义名为java.nio.channels.spi.SelectorProvider的SPI文件,文件中第一个定义的SelectorProvider实现类全限定名就会被加载。
private static boolean loadProviderAsService() {
ServiceLoadersl =
ServiceLoader.load(网页标题:聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)
链接URL:http://www.mswzjz.cn/qtweb/news21/67871.html攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能