概述
netty,说人话就是封装NIO做出来的一个JAVA高性能通信框架。在JAVA领域,有高性能网络通信需求的时候,绝大多数都会选择netty作为通信框架。
netty底层就是封装的NIO。如果自己使用NIO的话至少会有以下的不便:
需要自己构建协议。
需要自己解决TCP传输问题,如粘包、半包。
API过于底层,不便于使用。
netty其实就是封装了一下NIO,使得NIO更便于使用。
简单使用
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost",8080))
//同步通信
.sync()
//代表连接对象
.channel()
//发送数据
.writeAndFlush("hello world");
}
}
public class HelloServer {
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
}
可见,在服务端成功输出hello world
EventLoop
eventLoop,事件循环对象,是一个单线程执行器,本质上就是一条线程+一个selector,用来单线程监听处理IO事件。实际使用上很少直接使用EventLoop,而是使用EventLoopGroup,EventLoopGroup的构造方法中可以指定其中的EventLoop数量。eventLoop除了继承Netty体系类的一些标准化接口外,还继承了JDK中的ScheduledExecutorService,使得其自身具备线程池一切的能力。既然是线程池,就可以用来执行任务。
上图是其中的源码,我们通过 DEFAULT_EVENT_LOOP_THREADS 这个常量可以知道默认线程池的数量 它是通过 SystemPropertyUtil.getInt 方法尝试从系统属性中获取名为 "io.netty.eventLoopThreads" 的整数值。 如果系统属性中没有设置该值,则使用默认值 NettyRuntime.availableProcessors() 2。 NettyRuntime.availableProcessors() 2: NettyRuntime.availableProcessors() 返回当前系统的可用处理器核心数。 默认情况下,EventLoop线程数设置为核心数的两倍,以充分利用CPU资源。
EventLoopGroup group =new NioEventLoopGroup(5);
group.next().submit(()->{
try {
Thread.sleep(10000);
System.out.println("success!");
} catch (Exception e) {
e.printStackTrace();
}
});
eventLoop执行IO任务:
一个EventGroupLoop其实就是一条线程,用来处理一条通信连接。
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
channel
channel,对NIO的channel的二次封装,内核段缓冲区的抽象。不管是服务端还是客户端,只要调用channel()方法都能获取当前工作的这条channel。channel无非要注意的点就是它的同步和异步。
在实际应用中我们要知道在读的时候同步和异步是没有意义的,不可能在读IO的时候还区分同步读或者异步读,只可能是准备好了就读。只有写IO的时候区分同步和异步才是意义。所以在netty体系里很少会去服务端操作channel的同步和异步,一般都是在客户端操作channel的同步和异步。 之后我们看一个案例
同步
服务端:
在服务端让建立连接的时候休眠3秒。
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread.sleep(3000);
System.out.println(msg);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
客户端:
客户端使用channel的sync来进行同步通信,同步模式下在connect建立连接的时候,主线程会同步等待,连接建立后再向下执行。
public class HelloCleint {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost",8080))
//同步通信
.sync()
//代表连接对象
.channel()
//发送数据
.writeAndFlush("hello world");
}
}
import java.util.Scanner;public class Main { public static void main(String[] args) { Scanner scanner = new Scanner(System.in); int n = scanner.nextInt(); int[] vec = new int[n]; int[] p = new int[n]; int presum = 0; for (int i = 0; i < n; i++) { vec[i] = scanner.nextInt(); presum += vec[i]; p[i] = presum; } while (scanner.hasNextInt()) { int a = scanner.nextInt(); int b = scanner.nextInt(); int sum; if (a == 0) { sum = p[b]; } else { sum = p[b] - p[a - 1]; } System.out.println(sum); } scanner.close(); }}java
channel默认处于异步通信模式,connect建立连接的时候,不会同步等待,而是会继续向下执行,由于服务器端延迟了3秒来建立连接,所以客户端发送这条“hello world”发送时,连接并未建立完成,最终效果就是丢包,服务器收不到这条数据。执行结果为空。
public class Client {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
//可以是BIO,也可以是NIO,也可以是epoll
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080));
//异步
channelFuture.channel().writeAndFlush("hello world");
}
}
当然,在异步通信上,netty支持了监听器,建立连接完成后,用事件回调的方式触发监听器。利用监听器,可以使得异步通信不丢包:
//异步
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
channelFuture.channel().writeAndFlush("hello world");
}
});
}
调试
EmbeddedChannel是Netty中提供的一种特殊类型的Channel实现,主要用于单元测试。它允许你在测试中模拟输入事件(例如读取数据、写入数据)并检查输出事件(例如读取到的数据)。使用EmbeddedChannel可以在不启动真实的网络连接的情况下测试你的ChannelHandler逻辑。
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class UpperCaseHandlerTest {
@Test
public void testUpperCaseHandler() {
// 创建EmbeddedChannel,并添加要测试的Handler
EmbeddedChannel channel = new EmbeddedChannel(new UpperCaseHandler());
// 写入一个字符串消息到Channel
channel.writeInbound("hello");
// 读取Channel的输出
String output = channel.readOutbound();
System.out.println(output);
// 验证处理后的消息是否符合预期
assertEquals("HELLO", output);
// 关闭Channel
channel.finish();
}
}
public class UpperCaseHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
String upperCaseMsg = msg.toUpperCase();
ctx.writeAndFlush(upperCaseMsg);
}
}
关闭
由于channel的close方法是异步的,所以在关闭资源时会存在风险。比如代码顺序为:
close掉channel
close掉其它资源
有可能在close掉其它资源的时候,channel并没有close掉,也就可能出现,channel中还有数据没处理完,其它资源被关掉了,导致数据处理失败的问题。所以更为稳妥的方式是用同步的机制来关闭channel。netty中封装了CloseFuture来同步关闭channel。
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
要注意的是channel停止后如果EventLoopGroup还有其它线程时,程序是不会中止的,想要中止程序,必须再close掉group,EventLoopGroup提供了优雅停机的API——shutdownGracefully,会先停止接收请求,驻留的请求处理完成后,关掉group。
为什么要用异步
我们可以看到channel里面大量的用到了异步,对一个channel的操作,connect是一条线程,write是一条线程,close也是一条线程......
用异步的方式来处理,不仅不会加快单个IO任务的速度,反而还会略微拉长一个IO的响应时间,但是异步能明显提高吞吐量。
举个例子,一个病人看病,分为挂号、看病、缴费。取药,同步的方式就是一个医生走完一个病人的所有流程:
而异步的方式就是医生分工合作,每个医生单独负责一个项目,这样一个时间段内虽然处理的任务综合是一样的,但是在峰值的吞吐量上,异步是同步的四倍:
future
JDK的future是表示一个任务,netty的future是对JDK的future做了二次封装。
同步
public static void main(String[] args) throws Exception {
NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
Future<String> future = nioEventLoopGroup.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(1000);
return "success!";
}
});
//future的get方法是同步的,同步等待线程返回返回值为止
System.out.println(future.get());
}
异步:
用监听器实现异步
public static void main(String[] args) throws Exception {
NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
Future<String> future = nioEventLoopGroup.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(1000);
return "success!";
}
});
//用监听器来实现异步
future.addListener(new GenericFutureListener<Future<? super String>>() {
public void operationComplete(Future<? super String> future) throws Exception {
System.out.println(future.get());
}
});
}
promise
光是有future是不够的,因为future必须处理完了,才能拿到结果,有些时候需要提前拿到结果开始处理,就需要在两个线程间进行通信,通信就需要一个存放数据的地方,也就有了promise,其可以理解为一个数据容器,可以向该容器中手动的存放数据、拿数据。
public static void main(String[] args) {
EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
EventLoop eventLoop = eventLoopGroup.next();
final DefaultPromise<String> promise=new DefaultPromise<String>(eventLoop);
eventLoop.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
promise.setSuccess("success!");
}
});
//默认是同步
try {
System.out.println(promise.get());
} catch (Exception e) {
e.printStackTrace();
}
//可以用监听器来实现异步
//promise.addListener(new GenericFutureListener<Future<? super String>>() {
//public void operationComplete(Future<? super String> future) throws Exception {
//System.out.println(promise.get());
//}
//});
}
pipeline
在 Netty 中,pipeline
是一种机制,它由一系列的 ChannelHandler
组成。pipeline
负责处理进入或离开 Channel
的数据,并且将事件(比如连接建立、数据读取等)转发给正确的 handler
进行处理。
handler 是 pipeline的节点,每个 handler
会接收来自前一个 handler
的处理结果,并进行自己的处理。然后,它将处理结果传递给下一个 handler
,直到最终达到 pipeline
的尾部。pipeline
的头部和尾部都是特殊的 handler
,头部负责处理 Inbound
操作,尾部则负责处理 Outbound
操作。
public class Client {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
.writeAndFlush("hello world");
}
}
public class Server {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//H1->H2->H3->h4->h5->h6
//入站处理器
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("H1");
//向下走
super.channelRead(ctx, msg);
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("H2");
//向下走
super.channelRead(ctx, msg);
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("H3");
//写操作,用来触发后面的出站处理器
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Servers......".getBytes()));
}
});
//出站处理器
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
System.out.println("h4");
super.write(ctx, o, channelPromise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
System.out.println("h5");
super.write(ctx, o, channelPromise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
System.out.println("h6");
super.write(ctx, o, channelPromise);
}
});
}
}).bind(8080);
}
}
入站处理器顺序执行,出站处理器逆序执行。
netty中使用了责任链来处理对channel的读写请求,链上每一个节点都是一个处理器,有两种处理器:
出站处理器,用来处理write操作。
入站处理器,用来处理read操作。
出站(Outbound):数据从应用程序流向网络的过程被称为“出站”,因为数据是从应用程序向外发送,穿越协议栈的各个层级,最终到达网络。
入站(Inbound):数据从网络流向应用程序的过程被称为“入站”,因为数据是从外部网络进入应用程序,穿越协议栈的各个层级,最终到达应用程序。
Pipeline 提供了一种灵活而高效的方式来处理传入和传出的数据流,而 Inbound 和 Outbound 作为两个关键组成部分,负责处理数据的接收和发送。
byteBuf
在Java NIO(New I/O)中,ByteBuffer
是一个用来处理字节数据的缓冲区类,是对NIO的byteBuffer的二次封装和扩展,可以直接理解为用户段内存的抽象。
开辟byteBuf:
public static void main(String[] args) {
//可以通过传参来指定大小
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
}
根据所开辟的内存空间的位置的不同,byteBuf分为两类:
直接缓冲区
非直接缓冲区
直接缓冲区:
直接创建在物理机的缓冲区中,创建和销毁的代价昂贵,但是读写性能高。要注意的是直接内存不受GC的管理,需要注意手动释放内存,避免内存泄露。
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
非直接缓冲区:
创建在JVM中的缓冲区,创建和销毁的代价相对没那么高,但是读写性能相对较低。
创建池化的非直接缓冲区:
ByteBuf directBuffer1 = ByteBufAllocator.DEFAULT.heapBuffer();
ByteBuf 的池化是指将 ByteBuf 实例预先分配并存储在内存池中,以便在需要时进行重复使用。池化 ByteBuf 的主要目的是减少内存分配和垃圾回收的开销,从而提高性能。Netty 提供了池化 ByteBuf 的功能,它内置了两种 ByteBuf 池化的实现:PooledByteBufAllocator 和 UnpooledByteBufAllocator。 1.PooledByteBufAllocator(池化的内存分配器):
PooledByteBufAllocator 是 Netty 提供的默认的 ByteBuf 池化实现。它通过预先分配一些 ByteBuf 实例,并将它们存储在池中。当需要创建新的 ByteBuf 实例时,它会从池中获取已有的实例,而不是每次都重新分配内存。
使用 PooledByteBufAllocator 可以减少频繁的内存分配和释放操作,避免了堆内存的碎片化,提高了性能。
// 使用 PooledByteBufAllocator 创建 ByteBuf
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf pooledBuffer = allocator.buffer(1024); // 创建1KB的池化 ByteBuf
// 使用 pooledBuffer...
// 释放 ByteBuf,将其返回到池中
pooledBuffer.release();
2.UnpooledByteBufAllocator(非池化的内存分配器):
UnpooledByteBufAllocator 是 Netty 提供的非池化的 ByteBuf 实现。它每次都会分配新的内存,不会重用已有的 ByteBuf 实例。虽然不会涉及到池的管理,但在一些短期存活或者需要手动管理内存的场景下使用非池化内存分配器可能更合适。
public static void main(String[] args) {
// 使用 UnpooledByteBufAllocator 创建 ByteBuf
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
ByteBuf unpooledBuffer = allocator.buffer(1024); // 创建1KB的非池化 ByteBuf
// 使用 unpooledBuffer...
// 释放 ByteBuf(注意:在非池化情况下,需要手动释放 ByteBuf)
unpooledBuffer.release();
}
bytebuff 的组成
bytebuf一开始有个初始化容量(capacity),可以手动指定,没有手动指定时也有个默认值。
bytebuf是自动扩容的,扩容的上限(max capacity)其实就是机器的物理内存。
读写指针一开始在0位,随着读写,读写指针向后移动。要注意,bytebuf的读写,只涉及指针的移动,不涉及内存的回收,也就是读过的区域(废弃字节)并不会被释放,除非调用特殊的API(discardReadBytes())。
netty的bytebuf相较于NIO的bytebuffer,有以下优势:
bytebuffer读写公用一个指针,所以,读之前要切换到读模式;写之前要切换到写模式。
bytebuf自动扩容,而bytebuffer不行。
写操作
//写入数字
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeInt(666);
// 写入字符串
String stringValue = "Hello, World!";
byte[] stringBytes = stringValue.getBytes(StandardCharsets.UTF_8);
buffer.writeBytes(stringBytes);
// 读取整数
int readIntValue = buffer.readInt();
// 读取字符串
int readableBytes = buffer.readableBytes();
byte[] stringBytes = new byte[readableBytes];
buffer.readBytes(stringBytes);
String readStringValue = new String(stringBytes, StandardCharsets.UTF_8);
需要注意的是,在读取数据之前,你需要确保 ByteBuf 中有足够的可读字节数。可以使用 readableBytes() 方法来检查 ByteBuf 中的可读字节数。
此外,ByteBuf 还提供了其他的读写操作,比如 readableBytes() 用于获取可读字节数,writerIndex() 和 readerIndex() 用于获取写入和读取的索引位置等。在使用 ByteBuf 时,请确保在读写时不越界,并且注意释放 ByteBuf 以避免内存泄漏。在Netty中,通常会使用 ReferenceCountUtil.release(buffer) 来释放 ByteBuf,确保资源得到正确释放。 bytebuf要特别注意资源的释放,以避免内存泄漏。Netty使用引用计数(Reference Counting)来管理 ByteBuf 的生命周期,确保在不再需要使用时及时释放资源。
在Netty中,release() 和 retain() 是用于管理 ByteBuf 引用计数的方法。
release() 方法用于将 ByteBuf 的引用计数减少1。当引用计数减至0时,Netty会释放 ByteBuf 的内存(如果使用了池化的 ByteBuf,则将它归还给池)。
ByteBuf buffer = //... 从某个地方获取ByteBuf实例 buffer.release(); // 引用计数减少1,如果引用计数为0,释放ByteBuf的内存 retain() 方法用于将 ByteBuf 的引用计数增加1。当你调用 retain() 方法时,你告诉Netty你对这个 ByteBuf 感兴趣,即使在你使用完后,其他代码也可能继续使用它。
双向通信
public class Server {
public static void main(String[] args) {
//ServerBootstrap,启动器,负责组装netty组件
new ServerBootstrap()
//1.怎样去接收IO?
//事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
.group(new NioEventLoopGroup())
//2.接收成什么?
//服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
//除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
//3.做什么处理?
//支持用责任链模式来对收到的IO进行链式处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后才会调用初始化方法
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定解码方式
nioSocketChannel.pipeline().addLast(new StringDecoder());
//ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
ByteBuf response = ctx.alloc().buffer();
response.writeBytes(msg.toString().getBytes());
ctx.writeAndFlush(response);
}
});
}
})
//4.绑定监听端口
.bind(8080);
}
}
public class Client {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
//用什么进行发送?
.channel(NioSocketChannel.class)
//处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//指定编码方式
nioSocketChannel.pipeline().addLast(new StringEncoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("hello".getBytes());
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg.toString());
}
});
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080));
}
}