前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下
ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipleline = pipeline();
pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
pipleline.addLast("handler", handler);
return pipleline;
}
});
bootstrap.setOption("receiveBufferSize", 1048576 * 64);
bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
//tcp定期发送心跳包 比如IM里边定期探测对方是否下线
//只有tcp长连接下才有意义
// bootstrap.setOption("child.keepAlive", true);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));
Channel channel = future.awaitUninterruptibly().getChannel();
客户端事件处理顺序如下:
UpStream.ChannelState.OPEN(已经open)–>DownStream.ChannelState.BOUND(需要绑定)——>DownStream.CONNECTED(需要连接)—–>UpStream.ChannelState.BOUND(已经绑定)——->UpStream.CONNECTED(连接成功)
在connect的时候做了如下处理
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelPipeline pipeline;
try {
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// Set the options.先创建Channel
Channel ch = getFactory().newChannel(pipeline);
ch.getConfig().setOptions(getOptions());
// Bind.
if (localAddress != null) {
ch.bind(localAddress);
}
// Connect. 再进行连接
return ch.connect(remoteAddress);
}
首先要创建出Channel
NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) {
super(null, factory, pipeline, sink, newSocket(), worker);
fireChannelOpen(this);
}
紧接着会fire一个ChannelOpen事件,
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
这样会出发Upstream的ChannelState.OPEN事件。
接下来要继续connect了
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelFuture future = future(channel, true);
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.CONNECTED, remoteAddress));
return future;
这样就会出发Downstream的ChannelState.CONNECTED事件。
接下来就要由NioClientSocketPipelineSink来进行处理了
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的
所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行select
private static final class RegisterTask implements Runnable {
private final Boss boss;
private final NioClientSocketChannel channel;
RegisterTask(Boss boss, NioClientSocketChannel channel) {
this.boss = boss;
this.channel = channel;
}
public void run() {
try {
channel.socket.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
}
}
register方法
void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(
bossExecutor,
new ThreadRenamingRunnable(
this, "New I/O client boss #" + id + '-' + subId));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。
然后run方法处理感兴趣的事件
public void run() {
boolean shutdown = false;
Selector selector = this.selector;
long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
for (;;) {
wakenUp.set(false);
try {
int selectedKeyCount = selector.select(500);
.......
processRegisterTaskQueue();
if (selectedKeyCount > 0) {
processSelectedKeys(selector.selectedKeys());
}
在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
if (!k.isValid()) {
close(k);
continue;
}
if (k.isConnectable()) {
connect(k);
}
}
}
将连接上的Channel注册到worker中,交给worker去注册read和write
private void connect(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
if (ch.socket.finishConnect()) {
k.cancel();
ch.worker.register(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.worker.close(ch, succeededFuture(ch));
}
}
在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了。
分享到:
相关推荐
Netty 框架学习 —— 第一个 Netty 应用(csdn)————程序
实现Java服务端和C#客户端联通 Java使用Netty 开发环境为IDEA C#使用DotNetty 开发环境为VS2017 运行时先开启Java服务端 再开启客户端
java服务器端(Netty_Proto)和c++客户端tcp通讯
Android Studio 开发Netty网络访问框架,实现了客户端、服务端两种访问方式,支持发送心跳数据,使用Handler实现外部数据交互,有调用Demo,在实际项目中使用暂时没有问题
springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目,springboot整合netty,分客户端和服务端两个项目
详细的netty框架的简单案例,包括客户端服务端
Netty4长连接、断开重连、心跳检测、Msgpack编码解码 http://blog.csdn.net/giousa/article/details/72846303#t2
C# Netty 客户端,服务器端 自己研究一才研究出来的与大家分享一下
Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例示例代码
通过netty编写文件传输的客户端与服务端,以及协议说明, 通用的netty传输协议 通过该协议进行文件传输 文件传输客户端与服务端 可以根据文件的最后更新时间来增量传输文件 源码开放,通过eclipse或者idea导入代码...
SCANFISH-II 型声呐系统数据接口协议,对接tcp转发app,json封装
Netty服务器与客户端
java应用netty服务端和客户端示例,客户端和服务端的model对象目录必须一致
然后你就可以仔细JavaFX代码和Netty的代码,很简单的呢。声明一下,本人使用Mina开发游戏服务器,没有打算使用Netty开发应用。制作这个示例只是为给别人帮个忙而已,然后就可以带你入门,最后你自己飞^_^
Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码
注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。
spring boot 整合的netty 实现的socket的demo(包括服务端和客户端是分开的两个项目,导入idea,启动即可)。
netty是一个异步非阻塞高并发通信框架,基于聚合多路复用技术,编解码,google的protobuf序列化等技术的高性能通信框架。
netty服务器解析16进制数据
Netty权威指南2.0--源代码,Netty权威指南2.0--源代码