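This article walks through Netty's design and practice end to end.
1. Background
The previous article, "A Deep Dive into NIO Multiplexing" (深入理解NIO多路复用), showed that the kernel detects socket events through event notification plus interrupts, so user space can serve every socket request with a single thread, with O(1) time complexity. That I/O model looks complete, but once the number of connections, and especially of active connections, grows to 100,000 or more, a single thread struggles to keep up. Netty lets you configure thread pools to handle socket events, which spreads that load across threads; it also wraps the common plumbing so business code is much easier to write. This article analyzes how Netty's thread pools call into each other, that is, the reactor model, to get directly at Netty's core design idea. It also reworks the NIO server code into a minimal Netty server demo, and closes with common Netty questions.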
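2. Netty Basics
Generic Netty code:
Create two thread pools, one to handle ServerSocket events and one to handle Socket events, and specify that the methods of the custom class ServerHandler run when an event occurs on the ServerSocket or on a Socket:
[Figure: Netty server bootstrap code]
Netty business code:
ServerHandler defines the callbacks. When the server receives data from a client, channelRead is called to process it; when a socket or server socket is registered with a selector, channelRegistered is called:
[Figure: ServerHandler code]
The architecture of the code above is shown below:
[Figure: Netty architecture diagram]
NioEventLoopGroup and the pipeline are the most important concepts in the architecture diagram. The rest of this article follows Netty's workflow to examine how both are implemented.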
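3. Netty Workflow
3.1 Creating the bossGroup and workerGroup objects
As shown below, bossGroup is a NioEventLoopGroup that creates 1 NioEventLoop, and workerGroup creates 10. Each NioEventLoop owns its own multiplexer (Selector) and its own thread. The bossGroup selector is where the serverSocketChannel is registered; the workerGroup selectors are where socketChannels are registered. Each thread handles the events that occur on the sockets registered with its selector.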
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
NioEventLoopGroup is initialized from subclass to parent class in this order: NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup.
3.1.1 Creating the SelectorProvider
SelectorProvider is the factory for Selector multiplexers; it creates the concrete Selector implementation. NioEventLoopGroup obtains the SelectorProvider during initialization:
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
SelectorProvider falls back to the create method of sun.nio.ch.DefaultSelectorProvider, which ships in rt.jar, to create the SelectorProvider implementation:
public abstract class SelectorProvider {
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }
}
The DefaultSelectorProvider in rt.jar differs between JDK builds for different operating systems. On macOS, for example, create returns a KQueueSelectorProvider:
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() { }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}
In the rt.jar of a Linux JDK, create returns an EPollSelectorProvider:
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() { }

    public static SelectorProvider create() {
        String var0 = (String) AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider) (var0.equals("Linux")
                    ? createProvider("sun.nio.ch.EPollSelectorProvider")
                    : new PollSelectorProvider());
        }
    }
}
EPollSelectorProvider creates an EPollSelectorImpl through its openSelector method:
public class EPollSelectorProvider extends SelectorProviderImpl {
    public EPollSelectorProvider() { }

    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }
}
At the lowest level, EPollSelectorImpl wraps the epoll_create and epoll_ctl system calls, which provide the multiplexing.
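To see which implementation a given JDK actually picks, the factory can be queried directly. The following is a minimal sketch using only the public JDK API; the class name SelectorProviderDemo and its helper methods are made up for illustration, and the printed class names depend on the operating system and JDK version, so no output is shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class: prints the platform's provider and selector implementations.
class SelectorProviderDemo {
    /** Class name of the system-wide default SelectorProvider. */
    static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** Opens a selector through the provider and reports whether it was open. */
    static boolean openSelectorOnce() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector()) {
            System.out.println("selector impl: " + selector.getClass().getName());
            return selector.isOpen();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("provider impl: " + providerName());
        System.out.println("opened: " + openSelectorOnce());
    }
}
```

On a Linux JDK the names should contain EPoll and on macOS KQueue, matching the excerpts above.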
3.1.2 Creating the thread pool
With the SelectorProvider in place, the thread executor (Executor) can be created. How each thread is created is defined by DefaultThreadFactory, and the Executor, a ThreadPerTaskExecutor, asks that factory for a new thread for every task it is given:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the thread executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // omitted
    }

    // Create the thread factory
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }
}
The thread factory is initialized as follows:
public class DefaultThreadFactory implements ThreadFactory {
    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        ObjectUtil.checkNotNull(poolName, "poolName");
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
        // Use a common prefix for the thread names
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }

    // newThread can be called to create a thread directly
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
}
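The constructor defines the thread-name prefix:
[Figure: debugger view of the thread-name prefix]
Threads created later use it as the prefix of their names:
[Figure: debugger view of a thread created with that prefix]
When ThreadPerTaskExecutor#execute is called, it creates and starts a brand-new thread through the thread factory:
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}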
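The thread-per-task idea is easy to reproduce with plain JDK types. The sketch below is not Netty's code: NamedThreadFactory and PerTaskExecutor are hypothetical stand-ins for DefaultThreadFactory and ThreadPerTaskExecutor, and the prefix format is simplified to "poolName-n".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ThreadPerTaskDemo {
    /** Hypothetical factory: names threads "<poolName>-<n>". */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger nextId = new AtomicInteger();

        NamedThreadFactory(String poolName) {
            this.prefix = poolName + "-";
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + nextId.incrementAndGet());
        }
    }

    /** Hypothetical executor: starts one new thread per task, nothing is pooled. */
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory factory;

        PerTaskExecutor(ThreadFactory factory) {
            this.factory = factory;
        }

        @Override
        public void execute(Runnable command) {
            factory.newThread(command).start();
        }
    }

    /** Runs one task and returns the name of the thread that executed it. */
    static String runOnce(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        Executor executor = new PerTaskExecutor(new NamedThreadFactory("demoLoop"));
        System.out.println(runOnce(executor)); // demoLoop-1
        System.out.println(runOnce(executor)); // demoLoop-2
    }
}
```

Every execute call produces a fresh thread, so the "pool" behaviour in Netty comes from each NioEventLoop calling execute only once for its own long-lived loop thread, not from the executor itself.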
3.1.3 Wrapping the thread pool and the Selector
Once the two key dependencies, SelectorProvider and Executor, have been created, the NioEventLoops can be constructed:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the thread executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        children = new EventExecutor[nThreads];
        // Create the NioEventLoops: 1 for bossGroup, 10 for workerGroup
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                // Create one NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // omitted
            }
        }
        chooser = chooserFactory.newChooser(children);
        // omitted
    }

    // newChild, which creates the NioEventLoop, is implemented by NioEventLoopGroup
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
}
NioEventLoopGroup implements newChild and creates the NioEventLoop object:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}
Inside NioEventLoop, the openSelector() method creates the selector, which on Linux is an EPollSelectorImpl object.
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // Create the EPollSelectorImpl object
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // omitted
        return new SelectorTuple(unwrappedSelector);
    }
}
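3.2 NioEventLoopGroup summary
A NioEventLoopGroup contains multiple NioEventLoops. Each NioEventLoop owns its own multiplexer (Selector) and its own thread; the bossGroup selector is where the serverSocketChannel is registered, and the workerGroup selectors are where socketChannels are registered. For now the thread is represented by a ThreadPerTaskExecutor: calling ThreadPerTaskExecutor#execute creates the thread dedicated to that NioEventLoop.
3.3 Creating the ServerBootstrap launcher object
ServerBootstrap is the launcher class. The NioEventLoopGroups and the other parameters are passed to ServerBootstrap, which is responsible for starting the Netty server.
3.3.1 Specifying the ServerSocketChannel implementation class
NioServerSocketChannel is specified as Netty's ServerSocketChannel implementation: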
serverBootstrap.channel(NioServerSocketChannel.class);
The NioServerSocketChannel constructor creates the ServerSocketChannel object through EPollSelectorProvider:
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    // On Linux, DEFAULT_SELECTOR_PROVIDER is the EPollSelectorProvider object
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // openServerSocketChannel() of SelectorProviderImpl, the parent class of
            // EPollSelectorProvider, creates the ServerSocketChannel object.
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}
Through the constructor of its parent class AbstractNioChannel, NioServerSocketChannel puts the ServerSocketChannel into non-blocking mode:
public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
}
AbstractChannel, another parent class of NioServerSocketChannel, creates the Unsafe and the Pipeline that belong to the ServerSocketChannel; both are covered in more detail later:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected abstract AbstractUnsafe newUnsafe();

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}
3.3.2 Configuring the handlers
A handler describes what should be done when an event occurs on a socket.
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
        .handler(new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        })
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        });
AbstractBootstrap, the parent class of ServerBootstrap, stores the handler for the ServerSocketChannel:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
}
ServerBootstrap stores the childHandler for the SocketChannels:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }
}
3.4 Starting the Netty server
The Netty server is started through the ServerBootstrap#bind method:
ChannelFuture future = serverBootstrap.bind(8080).sync();
3.4.1 Creating the ServerSocketChannel
bind calls the doBind method of AbstractBootstrap, the parent class of ServerBootstrap, and AbstractBootstrap#initAndRegister starts creating the ServerSocketChannel:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // omitted
    }

    // Create the ServerSocketChannel
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // omitted
        }
        // omitted
    }
}
In AbstractBootstrap#initAndRegister above, channelFactory#newChannel invokes the NioServerSocketChannel constructor, and that constructor creates the ServerSocketChannel and sets it to non-blocking mode.
3.4.2 Initializing the pipeline
After the NioServerSocketChannel has been created, the init method puts the handler defined in the main program into the NioServerSocketChannel's pipeline:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}
3.4.3 Inserting elements into DefaultChannelPipeline
The pipeline is based on the chain-of-responsibility design pattern, which builds a chain of handler objects for a request and decouples the sender of the request from the code that processes it. The handlers on the chain process the request; the client only has to send the request into the chain and does not need to care how it is processed or passed along.
When a user request arrives, the server calls the inbound handlers one by one; when it responds, the server calls the outbound handlers one by one, as shown below:
[Figure: inbound and outbound handler call order in the pipeline]
When the ServerSocketChannel is created, the AbstractChannel constructor creates its DefaultChannelPipeline; that pipeline belongs exclusively to the ServerSocketChannel.
As the code below shows, DefaultChannelPipeline is a linked list. Each call to addLast wraps the handler in a DefaultChannelHandlerContext and appends it to the end of the list:
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);
            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // Run callHandlerAdded0
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
}
How the handlers in the pipeline call each other along the chain is covered later.
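The linked-list insertion in addLast0 can be tried in isolation. The following sketch is a simplified stand-in, not Netty's class: MiniPipeline and Ctx are hypothetical names, handlers are reduced to plain strings, and locking and handlerAdded callbacks are left out. It keeps the two sentinel nodes (head and tail) and shows that walking forward gives the inbound order while walking backward gives the outbound order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a handler pipeline: a doubly linked list with sentinels.
class MiniPipeline {
    /** One list node; stands in for a handler context. */
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts the new node just before the tail sentinel, like addLast0. */
    MiniPipeline addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    /** Handler names from head to tail (inbound direction), sentinels excluded. */
    List<String> inboundOrder() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            out.add(c.name);
        }
        return out;
    }

    /** Handler names from tail to head (outbound direction), sentinels excluded. */
    List<String> outboundOrder() {
        List<String> out = new ArrayList<>();
        for (Ctx c = tail.prev; c != head; c = c.prev) {
            out.add(c.name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline().addLast("decoder").addLast("serverHandler");
        System.out.println(p.inboundOrder());  // [decoder, serverHandler]
        System.out.println(p.outboundOrder()); // [serverHandler, decoder]
    }
}
```

The sentinels mean addLast never has to special-case an empty list, which is why addLast0 can be four pointer assignments.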
3.4.4 Adding handlers to the ServerSocketChannel's pipeline
As shown in section 3.4.2, the init method adds one handler to the ServerSocketChannel's pipeline through addLast:
void init(Channel channel) {
    // omitted
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
The handler above is one that Netty defines itself. It overrides ChannelInitializer's initChannel method: when the ServerSocketChannel's initialization event fires, initChannel is called and adds the user-defined ServerHandler to the pipeline. It then adds Netty's own ServerBootstrapAcceptor to the pipeline; ServerBootstrapAcceptor is responsible for registering the SocketChannels accepted by the ServerSocketChannel with a Selector:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    }
}
3.4.5 Preparing to register the ServerSocketChannel with the selector
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the ServerSocketChannel
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // Register the ServerSocketChannel with the selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
}
In the code above, config().group() returns bossGroup, and registration goes through the register method of MultithreadEventLoopGroup, the parent class of NioEventLoopGroup:
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    public ChannelFuture register(Channel channel) {
        // Register the ServerSocketChannel
        return next().register(channel);
    }
}
The next method picks one NioEventLoop out of the NioEventLoopGroup; a NioEventLoop holds a Selector and a thread. MultithreadEventExecutorGroup uses the DefaultEventExecutorChooserFactory to create a chooser object, which decides which NioEventLoop runs a given task. Note that bossGroup has only one NioEventLoop, so that single loop is always chosen. Only for workerGroup does the chooser rotate round-robin across the 10 NioEventLoops.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    public EventExecutor next() {
        return chooser.next();
    }
}
As shown below, when the number of threads in the NioEventLoopGroup is a power of two, PowerOfTwoEventExecutorChooser is used; otherwise GenericEventExecutorChooser is used. PowerOfTwoEventExecutorChooser computes the next NioEventLoop with a bitwise AND, while GenericEventExecutorChooser uses a remainder operation, so the power-of-two version is the cheaper of the two:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // Bitwise AND
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // Remainder arithmetic
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
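The two index calculations can be checked with a few lines of arithmetic. This is a small sketch that isolates the expressions from the choosers above into static methods; the class and method names (ChooserDemo, maskIndex, modIndex) are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ChooserDemo {
    /** For positive val, true when exactly one bit is set, i.e. val is a power of two. */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Round-robin slot via bit mask; only valid when length is a power of two. */
    static int maskIndex(int counter, int length) {
        return counter & (length - 1);
    }

    /** Round-robin slot via remainder; works for any positive length. */
    static int modIndex(long counter, int length) {
        return (int) Math.abs(counter % length);
    }

    public static void main(String[] args) {
        System.out.println(isPowerOfTwo(8));  // true
        System.out.println(isPowerOfTwo(10)); // false

        AtomicInteger idx = new AtomicInteger();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            sb.append(maskIndex(idx.getAndIncrement(), 8)).append(' ');
        }
        System.out.println(sb.toString().trim()); // 0 1 2 3 4 5 6 7 0 1

        // The mask stays in range even after the int counter overflows to negative values.
        System.out.println(maskIndex(-1, 8)); // 7
        System.out.println(modIndex(12, 10)); // 2
    }
}
```

With workerGroup sized at 10 as in this article, the generic chooser is the one in use; sizing the group at 8 or 16 would select the mask-based chooser.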
Once the NioEventLoop has been chosen, the register method of its parent class SingleThreadEventLoop is called:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}
Here promise.channel() is the NioServerSocketChannel. Calling the unsafe method of its parent class AbstractChannel returns the unsafe field:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final Unsafe unsafe;

    public Unsafe unsafe() {
        return unsafe;
    }
}
The unsafe field is created when the NioServerSocketChannel is initialized, by calling newUnsafe:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    // Abstract method, implemented by subclasses
    protected abstract AbstractUnsafe newUnsafe();
}
newUnsafe() is implemented by the subclass AbstractNioMessageChannel:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

    // NioMessageUnsafe overrides read(); read operations on the ServerSocketChannel run this method later
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                int size = readBuf.size();
                for (int i = 0; i < size; i++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                if (exception != null) {
                    closed = closeOnReadError(exception);
                    pipeline.fireExceptionCaught(exception);
                }
                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
}
NioMessageUnsafe extends AbstractNioUnsafe, which in turn extends AbstractChannel.AbstractUnsafe, so the register method that runs is the one defined in AbstractUnsafe. register makes sure the work happens on the EventLoop thread: if the caller is not on that thread, it submits a task, which starts the thread if it is not running yet, and register0 then runs on that thread. register0 performs the registration, adds the handlers, and so on; the details follow step by step:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            AbstractChannel.this.eventLoop = eventLoop;
            // If the caller is already on the eventLoop thread, run register0 directly
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // Otherwise submit a task; this starts the eventLoop thread if needed,
                    // and register0 then runs on that thread
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // Perform the registration
                doRegister();
                neverRegistered = false;
                registered = true;
                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // Add the handlers
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
}
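3.4.6 Starting the NioEventLoop thread
NioEventLoop threads start lazily: the thread is started only when there is a task to run. To make sure the registration of the ServerSocketChannel with the Selector runs on the event loop thread, register checks whether the caller is already on the NioEventLoop thread, as follows:
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        // The caller is not the eventLoop thread: submit a task, which starts the
        // thread if needed, and run register0 on that thread
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}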
SingleThreadEventExecutor, the parent class of NioEventLoop, holds a Thread field named thread. It is null by default, and the thread passed in here is the main thread, so inEventLoop initially returns false:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private volatile Thread thread;

    // The argument is Thread.currentThread()
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
}
So the else branch runs. It wraps register0 in a Runnable task and passes it to NioEventLoop#execute:
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});
NioEventLoop's execute is implemented by its parent class SingleThreadEventExecutor. It first adds the registration task to the task queue and then starts the thread:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        // First add the registration task to the task queue
        addTask(task);
        if (!inEventLoop) {
            // Start the thread
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before it's completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
}
The child thread is started through the Executor field, which is the ThreadPerTaskExecutor specified when the NioEventLoop was created. As analyzed above, each call to ThreadPerTaskExecutor#execute runs threadFactory.newThread(command).start(); to start a new thread. The body of that thread is:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // We are now on the child thread; store it in the thread field so that
                // later inEventLoop() calls can tell whether the caller is this thread
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Run SingleThreadEventExecutor#run
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // omitted
                }
            }
        });
    }
}
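The lazy-start behaviour described above (enqueue the task, start the thread on first use, record the thread so inEventLoop can recognise it) can be sketched with JDK types alone. MiniEventLoop below is a hypothetical miniature, not Netty's class: it has no selector, no shutdown handling and no rejection logic, and it uses a compare-and-set flag as an assumed stand-in for the state handling inside startThread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class LazyLoopDemo {
    /** Hypothetical miniature of a single-threaded, lazily started event executor. */
    static final class MiniEventLoop {
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        private final AtomicBoolean started = new AtomicBoolean();
        private volatile Thread thread; // stays null until the loop thread starts

        boolean isStarted() {
            return started.get();
        }

        /** True only when called from the loop's own thread. */
        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        void execute(Runnable task) {
            tasks.add(task); // enqueue first, then make sure the thread exists
            if (!inEventLoop() && started.compareAndSet(false, true)) {
                Thread t = new Thread(this::run, "mini-loop");
                t.setDaemon(true); // let the JVM exit when main returns
                t.start();
            }
        }

        private void run() {
            thread = Thread.currentThread(); // record the loop thread for inEventLoop()
            try {
                for (;;) {
                    tasks.take().run(); // block until a task arrives, then run it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns {started before execute, caller is in loop, task is in loop}. */
    static boolean[] demo() {
        MiniEventLoop loop = new MiniEventLoop();
        boolean startedBefore = loop.isStarted();
        boolean callerInLoop = loop.inEventLoop();
        boolean[] insideTask = new boolean[1];
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            insideTask[0] = loop.inEventLoop();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {startedBefore, callerInLoop, insideTask[0]};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("started before execute: " + r[0]); // false
        System.out.println("caller in event loop:   " + r[1]); // false
        System.out.println("task in event loop:     " + r[2]); // true
    }
}
```

The caller (main) sees inEventLoop() as false, which is the situation that sends register into its else branch; the task itself sees true because it runs on the recorded loop thread.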
The run method is implemented by NioEventLoop and handles the events that occur. It is an infinite loop with two important methods: processSelectedKeys handles the I/O events on the selected keys, and runAllTasks runs the queued tasks:
public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // If tasks are waiting, do not block in select; otherwise block until a socket event arrives
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here it's because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                // ioRatio is the share of loop time given to processing selected keys versus running tasks.
                // If ioRatio is 100, run all queued tasks with no time limit.
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // If ioRatio is below 100, tasks get a time budget of ioTime * (100 - ioRatio) / ioRatio;
                    // with ioRatio = 50 that equals the time just spent on the selected keys.
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // No keys were selected (strategy <= 0)
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
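The task time budget in the middle branch of run, runAllTasks(ioTime * (100 - ioRatio) / ioRatio), is easier to reason about with concrete numbers. The sketch below only reproduces that one expression; the class and method names are made up for illustration, and the argument check is an addition of this sketch (the excerpt handles ioRatio == 100 in a separate branch).

```java
class IoRatioDemo {
    /**
     * Time budget for running tasks, given the time just spent on I/O.
     * Mirrors the expression ioTime * (100 - ioRatio) / ioRatio from run().
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // 1 ms spent in processSelectedKeys
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 4000000: tasks get four times the I/O time
    }
}
```

A higher ioRatio therefore favours socket I/O, and a lower one favours queued tasks.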
Because the ServerSocketChannel has not yet been registered with the Selector at this point, the Selector cannot have any events, so the loop goes straight to running the register0 task.
3.4.7 Registering the ServerSocketChannel with the selector
As seen above, the NioEventLoop thread runs an infinite loop, and the first thing it runs is the register0 method:
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Register
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // omitted
    }
}
上述方法中,通过doRegister方法注册,很简单就可以看到这是将ServerSocketChannel注册到了Selector中:
public abstract class AbstractNioChannel extends AbstractChannel {
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
}
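Note that doRegister passes 0 as the interest set and the channel object itself as the attachment. Netty switches on the accept interest later, once reading begins. The same two-step idea can be tried with the JDK alone. The class and method names below are assumptions for illustration, and a plain Object stands in for the Netty channel that would be attached.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch: register with an empty interest set, enable OP_ACCEPT afterwards.
final class RegisterThenInterestSketch {

    // Returns {interestOps after register, interestOps after enabling accept, 1 if the attachment matches}.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);       // required before registering with a Selector
            Object attachment = new Object();      // stand-in for the owning channel object
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();        // nothing is selected for yet
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            int sameAttachment = key.attachment() == attachment ? 1 : 0;
            return new int[] { before, after, sameAttachment };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("interestOps after register: " + result[0]);
        System.out.println("interestOps after enabling accept: " + result[1]);
        System.out.println("attachment preserved: " + (result[2] == 1));
    }
}
```

Because the attachment travels with the SelectionKey, the event loop can later recover the owning channel object from any selected key.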
3.4.8 Running initChannel in the ServerSocketChannel's pipeline
When register0 reaches pipeline.invokeHandlerAddedIfNeeded(), execution eventually arrives at DefaultChannelPipeline.callHandlerAddedForAllHandlers(), which runs the handler callbacks queued in the pendingHandlerCallbackHead linked list:
public class DefaultChannelPipeline implements ChannelPipeline {
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            // This Channel itself was registered.
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GCed.
            this.pendingHandlerCallbackHead = null;
        }
        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }
}
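These handlers were queued by earlier code. When the NioServerSocketChannel is initialized, the init method is called. It adds a ChannelInitializer implemented by Netty, which is responsible for adding both the user-defined handler and the ServerBootstrapAcceptor handler to the pipeline: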
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});
Inside addLast, the callHandlerCallbackLater method then queues this ChannelInitializer on pendingHandlerCallbackHead:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            // Queue the ChannelInitializer on pendingHandlerCallbackHead
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
callHandlerCallbackLater appends the ChannelInitializer's callback to the tail of the linked list:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
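The deferral mechanism in addLast, callHandlerCallbackLater and callHandlerAddedForAllHandlers boils down to a singly linked list of callbacks that is drained once registration happens. The following is a minimal model of that idea under simplifying assumptions: PendingCallbackSketch, addHandler and register are hypothetical names, and threading is reduced to one lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: callbacks added before registration are queued, later ones run at once.
final class PendingCallbackSketch {

    // One node of the singly linked list of deferred callbacks.
    private static final class Pending {
        final Runnable callback;
        Pending next;

        Pending(Runnable callback) {
            this.callback = callback;
        }
    }

    private Pending head;
    private boolean registered;
    private final List<String> log = new ArrayList<>();

    void addHandler(String name) {
        Runnable handlerAdded = () -> log.add("handlerAdded:" + name);
        synchronized (this) {
            if (!registered) {
                Pending task = new Pending(handlerAdded);
                if (head == null) {
                    head = task;
                } else {
                    Pending pending = head;
                    while (pending.next != null) {   // walk to the tail of the list
                        pending = pending.next;
                    }
                    pending.next = task;
                }
                return;
            }
        }
        handlerAdded.run();                          // already registered: run right away
    }

    void register() {
        Pending task;
        synchronized (this) {
            registered = true;
            task = head;
            head = null;                             // let the drained list be collected
        }
        // Run the callbacks outside the lock, in insertion order.
        while (task != null) {
            task.callback.run();
            task = task.next;
        }
    }

    static List<String> demo() {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.addHandler("initializer");
        pipeline.addHandler("logger");
        if (!pipeline.log.isEmpty()) {
            throw new IllegalStateException("callbacks must not run before register()");
        }
        pipeline.register();
        pipeline.addHandler("late");
        return pipeline.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running the callbacks outside the lock mirrors the comment in the Netty listing: a callback may add further handlers, which would otherwise contend for the same lock.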
Finally, the handler's initChannel method is invoked from handlerAdded, as follows:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {
                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }
}
The initChannel method, in turn, adds handlers to the pipeline:
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
    // This is the handler configured in the main method
    pipeline.addLast(handler);
}
// Submit a task to the channel's event loop that adds a ServerBootstrapAcceptor to the pipeline
ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});
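Before Netty's ChannelInitializer has finished running, it has already added another user-defined handler, which is itself a ChannelInitializer configured in the main method: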
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
        .handler(new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        })
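The nesting of initializers can be hard to picture: one initializer adds another, each adds its real handlers, and each then removes itself. The toy pipeline below models only that behaviour. It assumes the channel is already registered, so handlerAdded runs immediately, and it adds the acceptor directly instead of through an event loop task. All type names except the two handler labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy pipeline showing one-shot initializers that remove themselves.
final class InitializerSketch {

    interface Handler {
        default void handlerAdded(Pipeline pipeline) {
            // ordinary handlers do nothing when added
        }
    }

    static final class Pipeline {
        private final List<Handler> handlers = new ArrayList<>();

        void addLast(Handler handler) {
            handlers.add(handler);
            handler.handlerAdded(this);      // assumption: the channel is already registered
        }

        void remove(Handler handler) {
            handlers.remove(handler);
        }

        List<String> names() {
            List<String> names = new ArrayList<>();
            for (Handler handler : handlers) {
                names.add(handler.toString());
            }
            return names;
        }
    }

    // Runs its init callback once, then takes itself out of the pipeline.
    static final class Initializer implements Handler {
        private final Consumer<Pipeline> init;

        Initializer(Consumer<Pipeline> init) {
            this.init = init;
        }

        @Override
        public void handlerAdded(Pipeline pipeline) {
            init.accept(pipeline);
            pipeline.remove(this);
        }

        @Override
        public String toString() {
            return "Initializer";
        }
    }

    static final class Named implements Handler {
        private final String name;

        Named(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    static List<String> demo() {
        Pipeline pipeline = new Pipeline();
        // Stand-in for the user's initializer, which adds ServerHandler.
        Initializer userInitializer = new Initializer(p -> p.addLast(new Named("ServerHandler")));
        // Stand-in for the framework's initializer, which adds the user's handler and the acceptor.
        pipeline.addLast(new Initializer(p -> {
            p.addLast(userInitializer);
            p.addLast(new Named("ServerBootstrapAcceptor"));
        }));
        return pipeline.names();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

After both initializers have run, only the two data-handling entries remain in the pipeline.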
The next handlerAdded call runs this initChannel method, which in turn adds ServerHandler.
Through the pipeline.invokeHandlerAddedIfNeeded() call, Netty's own ChannelInitializer and the ChannelInitializer defined in the main thread are executed one after the other. Each ChannelInitializer adds the handlers that process data, and those handlers end up in the pipeline.
3.4.9 Running channelRegistered on the handlers in the pipeline
After initChannel has been called, register0 finally calls pipeline.fireChannelRegistered() to run the channelRegistered method of every handler. fireChannelRegistered eventually reaches AbstractChannelHandlerContext#invokeChannelRegistered, which starts running the channelRegistered implementations of the handlers in the pipeline:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}
Because the user-defined ServerHandler class was added to the pipeline and it implements channelRegistered, that method is guaranteed to run:
public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }
}
When execution reaches super.channelRegistered(ctx), Netty tries to find the next handler and run it. As shown below, this leads back into invokeChannelRegistered:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }
}
The findContextInbound method is the key piece here. It is responsible for finding the next handler:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }
}
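The lookup above walks the linked contexts and skips those whose handler does not implement the event. A stripped-down model makes the propagation rule visible: an event only travels further if the current handler forwards it. The names and mask constants below are assumptions for illustration, and unlike Netty, which ends the chain with a tail context, this sketch ends it with null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of inbound event propagation along linked handler contexts.
final class InboundChainSketch {
    static final int MASK_REGISTERED = 1;
    static final int MASK_READ = 1 << 1;

    static final class Context {
        final String name;
        final int mask;            // which events this handler implements
        final boolean forwards;    // whether the handler calls the "super" method to pass the event on
        final List<String> log;
        Context next;

        Context(String name, int mask, boolean forwards, List<String> log) {
            this.name = name;
            this.mask = mask;
            this.forwards = forwards;
            this.log = log;
        }

        // Find the next context that handles the event, skipping the ones that do not.
        private Context findNext(int eventMask) {
            Context ctx = this;
            do {
                ctx = ctx.next;
            } while (ctx != null && (ctx.mask & eventMask) == 0);
            return ctx;
        }

        void fireChannelRegistered() {
            Context target = findNext(MASK_REGISTERED);
            if (target != null) {
                target.invokeChannelRegistered();
            }
        }

        void invokeChannelRegistered() {
            log.add(name + ".channelRegistered");
            if (forwards) {
                fireChannelRegistered();   // counterpart of super.channelRegistered(ctx)
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Context head = new Context("head", 0, true, log);
        Context a = new Context("A", MASK_REGISTERED, true, log);
        Context b = new Context("B", MASK_READ, true, log);         // skipped: no channelRegistered
        Context c = new Context("C", MASK_REGISTERED, false, log);  // does not forward the event
        Context d = new Context("D", MASK_REGISTERED, true, log);   // never reached
        head.next = a;
        a.next = b;
        b.next = c;
        c.next = d;
        head.fireChannelRegistered();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this run D never sees the event because C does not forward it, which is exactly what happens when a handler omits the super call.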
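In the end, adding a super.channelRegistered(ctx) call at the end of every channelRegistered method ensures that the channelRegistered method of every handler in the chain of responsibility is executed. The same applies to other methods such as channelActive and channelRead: adding the corresponding call runs all the related methods along the chain.
3.4.10 Binding the port
Once the ServerSocketChannel has been registered with the Selector, and the user-defined handler and the all-important ServerBootstrapAcceptor handler have been added to the pipeline, the ServerSocketChannel can be bound to a port and start serving: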
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // Bind the port
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        }
        // the branch for a registration that has not completed yet is omitted
    }
}
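The essential point of doBind is ordering: the bind must not start until registration has finished. A rough analogue with JDK futures is shown here, with a single-threaded executor playing the role of the event loop. The class name and the step labels are assumptions for illustration, and no real socket is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: chain "bind" onto the completion of "register" on one event loop thread.
final class BindAfterRegisterSketch {

    static List<String> demo() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> steps = new CopyOnWriteArrayList<>();
        try {
            // Registration runs as a task on the event loop thread.
            CompletableFuture<Void> regFuture =
                    CompletableFuture.runAsync(() -> steps.add("register"), eventLoop);
            // The bind is scheduled only after the registration future completes.
            CompletableFuture<Void> bindFuture =
                    regFuture.thenRunAsync(() -> steps.add("bind"), eventLoop);
            bindFuture.join();
            return new ArrayList<>(steps);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```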
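3.4.11 The ServerSocketChannel accepts client requests
Once the register task and the bind have completed, the NioEventLoop thread keeps running its loop. Whenever the task queue is empty, it blocks in select. When a socket event occurs, the loop calls processSelectedKeys to handle it. If select merely times out, the loop only runs its pending tasks: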
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO
                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // omitted
            }
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        // Handle socket events
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            // omitted
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
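The expression ioTime * (100 - ioRatio) / ioRatio in the loop above turns the time just spent on I/O into a time budget for queued tasks. A small worked example makes the proportions concrete. The helper name taskBudgetNanos and the sample values are assumptions for illustration. Only the formula comes from the listing.

```java
// Hypothetical helper reproducing the task time budget formula from the event loop.
final class IoRatioSketch {

    // ioRatio is the percentage of loop time meant for I/O. Valid here for 1..99,
    // because 100 takes a separate branch in the loop and 0 would divide by zero.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent in processSelectedKeys
        for (int ioRatio : new int[] { 20, 50, 80 }) {
            System.out.println("ioRatio=" + ioRatio
                    + " -> task budget (ns): " + taskBudgetNanos(ioTime, ioRatio));
        }
    }
}
```

With an ioRatio of 50, tasks get as much time as I/O just took. With 80, they get a quarter of it, and with 20, four times as much.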
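The select() method of the NioEventLoop class blocks in the select() method of EPollSelectorImpl (the Selector implementation on Linux) until a socket event occurs, with a timeout set on the blocking call: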
public final class NioEventLoop extends SingleThreadEventLoop private S