netty简介

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
 

netty作为alluxio中重要的通讯组件

在常见的客户端上传,下载中,都会有netty的参与

关于这个图,可以看上篇文章的介绍:

https://www.cnblogs.com/victor2302/p/10490253.html

 
 
netty作为alluxio交互重要的组件,扮演者重要的角色:
  • 解耦ufs和worker缓存的功能
  • 解耦 BlockHandler和 ShortCircuitBlockHandler
  • 解耦异步上传,同步上传
  • 高性能传输
 

netty客户端部分:

1.固定的处理器:alluxio.network.netty.NettyClient

final Bootstrap boot = new Bootstrap();

boot.group(WORKER_GROUP)
.channel(NettyUtils.getClientChannelClass(!(address instanceof InetSocketAddress)));
boot.option(ChannelOption.SO_KEEPALIVE, true);
boot.option(ChannelOption.TCP_NODELAY, true);
boot.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
if (NettyUtils.USER_CHANNEL_TYPE == ChannelType.EPOLL) {
boot.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} // After 10 missed heartbeat attempts and no write activity, the server will close the channel.
final long timeoutMs = Configuration.getMs(PropertyKey.NETWORK_NETTY_HEARTBEAT_TIMEOUT_MS);
final long heartbeatPeriodMs = Math.max(timeoutMs / 10, 1);
boot.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(RPCMessage.createFrameDecoder());
pipeline.addLast(ENCODER);
pipeline.addLast(DECODER);
pipeline.addLast(new IdleStateHandler(0, heartbeatPeriodMs, 0, TimeUnit.MILLISECONDS));
pipeline.addLast(new IdleWriteHandler());
}
});

2.临时的处理器:针对通用response注册回调(ShortCircuitBlockHandler 调用)

public static ProtoMessage call(final NettyRPCContext context, ProtoMessage request)
throws IOException {
Channel channel = Preconditions.checkNotNull(context.getChannel());
final Promise<ProtoMessage> promise = channel.eventLoop().newPromise();
channel.pipeline().addLast(new RPCHandler(promise));
channel.writeAndFlush(new RPCProtoMessage(request)).addListener((ChannelFuture future) -> {
if (future.cause() != null) {
future.channel().close();
promise.tryFailure(future.cause());
}
});
ProtoMessage message;
try {
message = promise.get(context.getTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
CommonUtils.closeChannel(channel);
throw new IOException(e);
} catch (InterruptedException e) {
CommonUtils.closeChannel(channel);
throw new RuntimeException(e);
} finally {
if (channel.isOpen()) {
channel.pipeline().removeLast();
}
}
if (message.isResponse()) {
CommonUtils.unwrapResponseFrom(message.asResponse(), context.getChannel());
}
return message;
}

3.临时的处理器:针对读写操作注册回调(BlockHandler)

  private NettyPacketReader(FileSystemContext context, WorkerNetAddress address,
Protocol.ReadRequest readRequest) throws IOException {
mContext = context;
mAddress = address;
mPosToRead = readRequest.getOffset();
mReadRequest = readRequest; mChannel = mContext.acquireNettyChannel(address);
mChannel.pipeline().addLast(new PacketReadHandler());
mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(mReadRequest)))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} private NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long id,
long length, long packetSize, Protocol.RequestType type, OutStreamOptions options,
Channel channel) {
mContext = context;
mAddress = address;
mLength = length;
Protocol.WriteRequest.Builder builder =
Protocol.WriteRequest.newBuilder().setId(id).setTier(options.getWriteTier()).setType(type);
if (type == Protocol.RequestType.UFS_FILE) {
Protocol.CreateUfsFileOptions ufsFileOptions =
Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath())
.setOwner(options.getOwner()).setGroup(options.getGroup())
.setMode(options.getMode().toShort()).setMountId(options.getMountId()).build();
builder.setCreateUfsFileOptions(ufsFileOptions);
}
mPartialRequest = builder.buildPartial();
mPacketSize = packetSize;
mChannel = channel;
mChannel.pipeline().addLast(new PacketWriteResponseHandler());
}

netty服务端:

注册处理器列表:

alluxio.worker.netty.PipelineHandler
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); final long timeoutMs = Configuration.getMs(PropertyKey.NETWORK_NETTY_HEARTBEAT_TIMEOUT_MS); // Decoders & Encoders
pipeline.addLast("frameDecoder", RPCMessage.createFrameDecoder());
pipeline.addLast("RPCMessageDecoder", new RPCMessageDecoder());
pipeline.addLast("RPCMessageEncoder", new RPCMessageEncoder()); // Idle Event Handlers
pipeline.addLast("idleEventHandler", new IdleStateHandler(timeoutMs, 0, 0,
TimeUnit.MILLISECONDS));
pipeline.addLast("idleReadHandler", new IdleReadHandler());
pipeline.addLast("heartbeatHandler", new HeartbeatHandler()); // Block Handlers
pipeline.addLast("blockReadHandler",
new BlockReadHandler(NettyExecutors.BLOCK_READER_EXECUTOR,
mWorkerProcess.getWorker(BlockWorker.class), mFileTransferType));
pipeline.addLast("blockWriteHandler", new BlockWriteHandler(
NettyExecutors.BLOCK_WRITER_EXECUTOR, mWorkerProcess.getWorker(BlockWorker.class),
mWorkerProcess.getUfsManager()));
pipeline.addLast("shortCircuitBlockReadHandler",
new ShortCircuitBlockReadHandler(NettyExecutors.RPC_EXECUTOR,
mWorkerProcess.getWorker(BlockWorker.class)));
pipeline.addLast("shortCircuitBlockWriteHandler",
new ShortCircuitBlockWriteHandler(NettyExecutors.RPC_EXECUTOR,
mWorkerProcess.getWorker(BlockWorker.class)));
pipeline.addLast("asyncCacheHandler", new AsyncCacheHandler(mRequestManager)); // UFS Handlers
pipeline.addLast("ufsFileWriteHandler", new UfsFileWriteHandler(
NettyExecutors.FILE_WRITER_EXECUTOR, mWorkerProcess.getUfsManager())); // Unsupported Message Handler
pipeline.addLast("unsupportedMessageHandler", new UnsupportedMessageHandler());
}

写入或者读取配置

alluxio.client.file.options.CreateFileOptions是FileSystem类createFile的第二个参数,可以选定不同的写入策略

例如:

  • MUST_CACHE(只写入Alluxio,必须存储在Alluxio中)
  • CACHE_THROUGH(尝试缓存,同步写入到UnderFS)
  • THROUGH(无缓存,同步写入到UnderFS)
  • ASYNC_THROUGH(异步写入到UnderFS,实现特性)
FileOutStream createFile(AlluxioURI path, CreateFileOptions options)
throws FileAlreadyExistsException, InvalidPathException, IOException, AlluxioException;

而这种写入选项,就是通过在传递netty message时,设置不同的标识,然后在netty中分派到不同的pipeline节点,处理各自的特性的

代码实例:

是否需要写入到ufs,则在UfsFileWriteHandler的acceptMessage方法中进行判断的

alluxio.worker.netty.UfsFileWriteHandler#acceptMessage

  protected boolean acceptMessage(Object object) {
if (!super.acceptMessage(object)) {
return false;
}
Protocol.WriteRequest request = ((RPCProtoMessage) object).getMessage().asWriteRequest();
return request.getType() == Protocol.RequestType.UFS_FILE;
}

alluxio源码解析-netty部分(2)的更多相关文章

  1. Netty 源码解析: Netty 的 ChannelPipeline

    ChannelPipeline和Inbound.Outbound         我想很多读者应该或多或少都有 Netty 中 pipeline 的概念.前面我们说了,使用 Netty 的时候,我们通 ...

  2. alluxio源码解析-rpc调用概述-client和worker之间的block模块的通讯架构(netty版本)(3)

    (1.8版本)client和worker之间的block模块的通讯架构 block作为alluxio文件读取或者存储的最小基本单位,都是通过BlockOutStream和BlockInputtream ...

  3. alluxio源码解析-层次化存储(4)

    层次化存储-特性介绍: https://www.alluxio.org/docs/1.6/cn/Tiered-Storage-on-Alluxio.html 引入分层存储后,Alluxio管理的数据块 ...

  4. alluxio源码解析-rpc调用概述(1)

    alluxio中几种角色以及角色之间的rpc调用: 作为分布式架构的文件缓存系统,rpc调用必不可少 client作为客户端 master提供thrift rpc的服务,管理以下信息: block信息 ...

  5. Netty 4源码解析:请求处理

    Netty 4源码解析:请求处理 通过之前<Netty 4源码解析:服务端启动>的分析,我们知道在最前端"扛压力"的是NioEventLoop.run()方法.我们指定 ...

  6. Netty 4源码解析:服务端启动

    Netty 4源码解析:服务端启动 1.基础知识 1.1 Netty 4示例 因为Netty 5还处于测试版,所以选择了目前比较稳定的Netty 4作为学习对象.而且5.0的变化也不像4.0这么大,好 ...

  7. netty服务端启动--ServerBootstrap源码解析

    netty服务端启动--ServerBootstrap源码解析 前面的第一篇文章中,我以spark中的netty客户端的创建为切入点,分析了netty的客户端引导类Bootstrap的参数设置以及启动 ...

  8. Netty源码解析—客户端启动

    Netty源码解析-客户端启动 Bootstrap示例 public final class EchoClient { static final boolean SSL = System.getPro ...

  9. Netty源码解析---服务端启动

    Netty源码解析---服务端启动 一个简单的服务端代码: public class SimpleServer { public static void main(String[] args) { N ...

随机推荐

  1. HTTP权威协议笔记-6.代理

    6.1 Web的中间实体 Http的代理服务器即是客户端的服务器又是服务器的客户端. 它介于服务器与客户端之间,当客户端发送请求报文经过它时,它会像服务器一样正确的处理请求和返回响应,同时,代理服务器 ...

  2. Win10 UWP应用发布流程

    简介 Win10 UWP应用作为和Win8.1 UAP应用不同的一种新应用形式,其上传至Windows应用商店的流程也有了一些改变. 这篇博文记录了我们发布一款Win10 UWP应用的基本流程,希望为 ...

  3. linux基础-第十五单元 软件包的管理

    使用RPM安装及移除软件 什么是RPM rpm的文件名 rpm软件安装与移除工作中经常使用的选项 查看RPM软件包中的信息 查询已安装的软件包信息 RPM包的属性依赖性问题 什么是RPM包的属性依赖性 ...

  4. 如何使用mybatis《一》

    mybatis作为ORM轻量级框架一出现就吸引了无数人的眼球,比hibernate要简单且入门较容易,下面开始我的第一个mybatis程序. 一.下载mybatis的包 我们知道任何一个框架都会有其包 ...

  5. oracle疑难杂症问题

    在虚拟机中安装了oracle10g,由于虚拟机的空间有限,看到磁盘空间快没了,就手贱把oracle目录中的空文件夹(E:\oracle\product\10.2.0\flash_recovery_ar ...

  6. python分割sql文件

    之前用joomla帮一学校做了个网站,然后要部署到他们到服务器上,他们只提供了sftp和phpmyadmin的账号,上传网站文件倒是挺顺利的,但后来用phpmyadmin导入mysql数据就遇到问题了 ...

  7. sublime插件emmet的配置、使用及快捷键Ctrl+E修改成Tab键操作

    一.emmet在sublime中的配置与使用: 1.点击sublime text 3的图标,打开编辑器: 2.按键“ctrl+shift+p”,或者单击菜单->工具->命令面板: 3.打开 ...

  8. 人生苦短,我用 Python

    从2015开始国内就开始慢慢接触Python了,从16年开始Python就已经在国内的热度更高了,目前也可以算的上"全民Python"了.众所周知小学生的教材里面已经有Python ...

  9. 爬虫基础之requests模块

    1. 爬虫简介 1.1 概述 网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本. 1.2 爬虫的价值 在互 ...

  10. Spark开发环境搭建(IDEA、Scala、SVN、SBT)

    软件版本 软件信息 软件名称 版本 下载地址 备注 Java 1.8 https://www.oracle.com/technetwork/java/javase/downloads/jdk8-dow ...