1.概述

  在《Hadoop2源码分析-RPC机制初识》博客中,我们对RPC机制有了初步的认识和了解,下面我们对Hadoop V2的RPC机制做进一步探索,在研究Hadoop V2的RPC机制,我们需要掌握相关的Java基础知识,如:Java NIO、动态代理与反射等。本篇博客介绍的内容目录如下所示:

  • Java NIO简述
  • Java NIO实例演示
  • 动态代理与反射简述
  • 动态代理与反射实例演示
  • Hadoop V2 RPC框架使用实例

  下面开始今天的博客介绍。

2.Java NIO简述

  Java NIO又称Java New IO,它替代了Java IO API,提供了与标准IO不同的IO工作方式。Java NIO由一下核心组件组成:

  • Channels:连接通道,即能从通道读取数据,又能写数据到通道。可以异步读写,读写从Buffer开始。
  • Buffers:消息缓冲区,用于和NIO通道进行交互。所谓缓冲区,它是一块可以读写的内存,该内存被封装成NIO的Buffer对象,并提供相应的方法,以便于访问。
  • Selectors:通道管理器,它能检测到Java NIO中多个通道,单独的线程可以管理多个通道,间接的管理多个网络连接。

  下图为Java NIO的工作原理图,如下图所示:

3.Java NIO实例演示

  • NIOServer

  首先,我们来看NIOServer的代码块。代码内容如下所示:

package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import cn.hadoop.conf.ConfigureAPI; /**
* @Date May 8, 2015
*
* @Author dengjie
*
* @Note Defined nio server
*/
public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); // The channel manager
private Selector selector; /**
* Get ServerSocket channel and initialize
*
* 1.Get a ServerSocket channel
*
* 2.Set channel for non blocking
*
* 3.The channel corresponding to the ServerSocket binding to port port
*
* 4.Get a channel manager
*
* 5.The channel manager and the channel binding, and the channel registered
* SelectionKey.OP_ACCEPT event
*
* @param port
* @throws IOException
*/
public void init(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
this.selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} /**
* listen selector
*
* @throws IOException
*/
public void listen() throws IOException {
LOGGER.info("Server has start success");
while (true) {
selector.select();
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
ite.remove();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);// 非阻塞
channel.write(ByteBuffer.wrap(new String("Send test info to client").getBytes()));
channel.register(this.selector, SelectionKey.OP_READ);// 设置读的权限
} else if (key.isReadable()) {
read(key);
}
}
}
} /**
* Deal client send event
*/
public void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
byte[] data = buffer.array();
String info = new String(data).trim();
LOGGER.info("Server receive info : " + info);
ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
channel.write(outBuffer);// 将消息回送给客户端
} public static void main(String[] args) {
try {
NIOServer server = new NIOServer();
server.init(ConfigureAPI.ServerAddress.NIO_PORT);
server.listen();
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.error("NIOServer main run error,info is " + ex.getMessage());
}
}
}
  • NIOClient

  然后,我们在来看NIOClient的代码块,代码具体内容如下所示:

package cn.hadoop.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import cn.hadoop.conf.ConfigureAPI; /**
* @Date May 8, 2015
*
* @Author dengjie
*
* @Note Defined NIO client
*/
public class NIOClient { private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class); private Selector selector; /**
* Get ServerSocket channel and initialize
*/
public void init(String ip, int port) throws Exception {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
this.selector = Selector.open();
channel.connect(new InetSocketAddress(ip, port));
channel.register(selector, SelectionKey.OP_CONNECT);
} /**
* listen selector
*/
public void listen() throws Exception {
while (true) {
selector.select();
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
ite.remove();
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
}
channel.configureBlocking(false);// 非阻塞 channel.write(ByteBuffer.wrap(new String("Send test info to server").getBytes()));
channel.register(this.selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
read(key);
} } }
} /**
* Deal client send event
*/
public void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
byte[] data = buffer.array();
String info = new String(data).trim();
LOGGER.info("Client receive info : " + info);
ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
channel.write(outBuffer);
} public static void main(String[] args) {
try {
NIOClient client = new NIOClient();
client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);
client.listen();
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());
}
}
}
  • ConfigureAPI

  下面给出ConfigureAPI类的代码,内容如下所示:

package cn.hadoop.conf;

/**
* @Date May 7, 2015
*
* @Author dengjie
*
* @Note Defined rpc info
*/
public class ConfigureAPI { public interface VersionID {
public static final long RPC_VERSION = 7788L;
} public interface ServerAddress {
public static final int NIO_PORT = 8888;
public static final String NIO_IP = "127.0.0.1";
} }

4.动态代理和反射简述

  在Java中,动态代理主要用来做方法的增强,可以在不修改源码的情况下,增强一些方法。另外,还有一个作用就是做远程调用,比如现在有Java接口,该接口的实现部署在非本地服务器上,在编写客户端代码时,由于没法直接生成该对象,这个时候就需要考虑使用动态代理了。

  而反射,利用了Class类作为反射实例化对象的基本应用,对于一个实例化对象而言,它需要调用类中的构造方法,属性和一般方法,这些操作都可以通过反射机制来完成。下面我们用一个实例来理解这些理论。

5.动态代理和反射实例演示

5.1动态代理

  • JProxy
package cn.java.base;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy; /**
* @Date May 7, 2015
*
* @Author dengjie
*/
public class JProxy { public static void main(String[] args) {
JInvocationHandler ji = new JInvocationHandler();
Subject sub = (Subject) ji.bind(new RealSubject());
System.out.println(sub.say("dengjie", 25));
} } interface Subject {
public String say(String name, int age);
} class RealSubject implements Subject { @Override
public String say(String name, int age) {
return name + "," + age;
} } class JInvocationHandler implements InvocationHandler { private Object object = null; public Object bind(Object object) {
this.object = object;
return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);
} @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object tmp = method.invoke(this.object, args);
return tmp;
} }

5.2反射

  • JReflect
package cn.java.base;

/**
* @Date May 7, 2015
*
* @Author dengjie
*/
public class JReflect {
public static void main(String[] args) {
Fruit f = Factory.getInstance(Orange.class.getName());
if (f != null) {
f.eat();
}
}
} interface Fruit {
public abstract void eat();
} class Apple implements Fruit { @Override
public void eat() {
System.out.println("apple");
} } class Orange implements Fruit { @Override
public void eat() {
System.out.println("orange");
} } class Factory {
public static Fruit getInstance(String className) {
Fruit f = null;
try {
f = (Fruit) Class.forName(className).newInstance();
} catch (Exception e) {
e.printStackTrace();
}
return f;
}
}

6.Hadoop V2 RPC框架使用实例

  本实例主要演示通过Hadoop V2的RPC框架实现一个计算两个整数的Add和Sub,服务接口为 CaculateService ,继承于 VersionedProtocol ,具体代码如下所示:

  • CaculateService
package cn.hadoop.service;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol; import cn.hadoop.conf.ConfigureAPI; /**
* @Date May 7, 2015
*
* @Author dengjie
*
* @Note Data calculate service interface
*/
@ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION)
public interface CaculateService extends VersionedProtocol { // defined add function
public IntWritable add(IntWritable arg1, IntWritable arg2); // defined sub function
public IntWritable sub(IntWritable arg1, IntWritable arg2); }

  注意,本工程使用的是Hadoop-2.6.0版本,这里CaculateService接口需要加入注解,来声明版本号。

  CaculateServiceImpl类实现CaculateService接口。代码如下所示:

  • CaculateServiceImpl
package cn.hadoop.service.impl;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.ProtocolSignature; import cn.hadoop.conf.ConfigureAPI;
import cn.hadoop.service.CaculateService; /**
* @Date May 7, 2015
*
* @Author dengjie
*
* @Note Implements CaculateService class
*/
public class CaculateServiceImpl implements CaculateService { public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {
return this.getProtocolSignature(arg0, arg1, arg2);
} /**
* Check the corresponding version
*/
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return ConfigureAPI.VersionID.RPC_VERSION;
} /**
* Add nums
*/
public IntWritable add(IntWritable arg1, IntWritable arg2) {
return new IntWritable(arg1.get() + arg2.get());
} /**
* Sub nums
*/
public IntWritable sub(IntWritable arg1, IntWritable arg2) {
return new IntWritable(arg1.get() - arg2.get());
} }

  CaculateServer服务类,对外提供服务,具体代码如下所示:

  • CaculateServer
package cn.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger; import cn.hadoop.service.CaculateService;
import cn.hadoop.service.impl.CaculateServiceImpl; /**
* @Date May 7, 2015
*
* @Author dengjie
*
* @Note Server Main
*/
public class CaculateServer { private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class); public static final int IPC_PORT = 9090; public static void main(String[] args) {
try {
Server server = new RPC.Builder(new Configuration()).setProtocol(CaculateService.class)
.setBindAddress("127.0.0.1").setPort(IPC_PORT).setInstance(new CaculateServiceImpl()).build();
server.start();
LOGGER.info("CaculateServer has started");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.error("CaculateServer server error,message is " + ex.getMessage());
}
} }

  注意,在Hadoop V2版本中,获取RPC下的Server对象不能在使用RPC.getServer()方法了,该方法已被移除,取而代之的是使用Builder方法来构建新的Server对象。

  RPCClient客户端类,用于访问Server端,具体代码实现如下所示:

  • RPCClient
package cn.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import cn.hadoop.service.CaculateService; /**
* @Date May 7, 2015
*
* @Author dengjie
*
* @Note RPC Client Main
*/
public class RPCClient { private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class); public static void main(String[] args) {
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);
try {
RPC.getProtocolVersion(CaculateService.class);
CaculateService service = (CaculateService) RPC.getProxy(CaculateService.class,
RPC.getProtocolVersion(CaculateService.class), addr, new Configuration());
int add = service.add(new IntWritable(2), new IntWritable(3)).get();
int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();
LOGGER.info("2+3=" + add);
LOGGER.info("5-2=" + sub);
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.error("Client has error,info is " + ex.getMessage());
}
} }

  Hadoop V2 RPC服务端截图预览,如下所示:

  Hadoop V2 RPC客户端截图预览,如下所示:

7.总结

  Hadoop V2 RPC框架对Socket通信进行了封装,定义了自己的基类接口VersionProtocol。该框架需要通过网络以序列化的方式传输对象,关于Hadoop V2的序列化可以参考《Hadoop2源码分析-序列化篇》,传统序列化对象较大。框架内部实现了基于Hadoop自己的服务端对象和客户端对象。服务端对象通过new RPC.Builder().builder()的方式来获取,客户端对象通过RPC.getProxy()的方式来获取。并且都需要接受Configuration对象,该对象实现了Hadoop相关文件的配置。

8.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

Hadoop2源码分析-RPC探索实战的更多相关文章

  1. Hadoop2源码分析-YARN RPC 示例介绍

    1.概述 之前在<Hadoop2源码分析-RPC探索实战>一文当中介绍了Hadoop的RPC机制,今天给大家分享关于YARN的RPC的机制.下面是今天的分享目录: YARN的RPC介绍 Y ...

  2. Hadoop2源码分析-RPC机制初识

    1.概述 上一篇博客,讲述Hadoop V2的序列化机制,这为我们学习Hadoop V2的RPC机制奠定了基础.RPC的内容涵盖的信息有点多,包含Hadoop的序列化机制,RPC,代理,NIO等.若对 ...

  3. Hadoop2源码分析-HDFS核心模块分析

    1.概述 这篇博客接着<Hadoop2源码分析-RPC机制初识>来讲述,前面我们对MapReduce.序列化.RPC进行了分析和探索,对Hadoop V2的这些模块都有了大致的了解,通过对 ...

  4. Hadoop2源码分析-YARN 的服务库和事件库

    1.概述 在<Hadoop2源码分析-YARN RPC 示例介绍>一文当中,给大家介绍了YARN 的 RPC 机制,以及相关代码的演示,今天我们继续去学习 YARN 的服务库和事件库,分享 ...

  5. Hadoop2源码分析-MapReduce篇

    1.概述 前面我们已经对Hadoop有了一个初步认识,接下来我们开始学习Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天为大家分享的是mapred ...

  6. Hadoop2源码分析-准备篇

    1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Had ...

  7. Hadoop2源码分析-Hadoop V2初识

    1.概述 在完成分析Hadoop2源码的准备工作后,我们进入到后续的源码学习阶段.本篇博客给大家分享,让大家对Hadoop V2有个初步认识,博客的目录内容如下所示: Hadoop的渊源 Hadoop ...

  8. Hadoop2源码分析-序列化篇

    1.概述 上一篇我们了解了MapReduce的相关流程,包含MapReduce V2的重构思路,新的设计架构,与MapReduce V1的区别等内容,今天我们在来学习下在Hadoop V2中的序列化的 ...

  9. Openstack Nova 源码分析 — RPC 远程调用过程

    目录 目录 Nova Project Services Project 的程序入口 setuppy Nova中RPC远程过程调用 nova-compute RPC API的实现 novacompute ...

随机推荐

  1. jq绑定事件的4种方式

    jQuery提供了多种绑定事件的方式,每种方式各有其特点,明白了它们之间的异同点,有助于我们在写代码的时候进行正确的选择,从而写出优雅而容易维护的代码.下面我们来看下jQuery中绑定事件的方式都有哪 ...

  2. 【GO】GO语言学习笔记二

    基本类型: 布尔型:boolean 整型:int8,byte,int16,int,uint,uintptr等 浮点型:float32,float64 复数类型:complex64,complex128 ...

  3. java中String类型变量的赋值问题

    第一节 String类型的方法参数 运行下面这段代码,其结果是什么? package com.test; public class Example { String str = new String( ...

  4. IAR修改工程名称Tab键设置模板建立

    IAR 修改工程名称 很多时候用IAR开发都是基于已有工程模板开发的,但是工程模板的名称经常让人头疼:以下是修改办法: 从一个实例工程复制后缀名为"dep,ewd,ewp,eww" ...

  5. dom4j修改,获取,增加xml中某个元素的属性值

    XML文件: <?xml version="1.0" encoding="UTF-8"?> <vrvscript> <item I ...

  6. [学姿势]实验室搬砖+node学习

    这周开始进行收尾工作,我当然没有进行核心技术的开发,主要负责的是对web端进行展示上的修修补补,主要包括添加VLC播放器.rtsp视频流以及一些js细节. 1.VLC 全称为Video Lan Cli ...

  7. [NOIP2002]自由落体

    NOIp2002提高组 题目描述 在高为 H 的天花板上有 n 个小球,体积不计,位置分别为 0,1,2,….n-1.在地面上有一个小车(长为 L,高为 K,距原点距离为 S1).已知小球下落距离计算 ...

  8. iOS 使用矢量图

    iOS 使用矢量图 iOS 图标通常用 PNG 格式的图片.PNG 图片放大到超过自身的大小就会模糊.可以使用 PDF 格式的矢量图,优点是任意改变图片大小并且保持清晰度. 简单使用 与 PNG 格式 ...

  9. RabbitMQ 实现远程过程调用RPC

    RPC调用的顺序1. 在客户端初始化的时候,也就是SimpleRpcClient类初始化的时候,它会随机的创建一个callback队列,用于存放服务的返回值,这个队列是exclusive的.连接断开就 ...

  10. 【51nod1847】 奇怪的数学题

    就当我是 A 了此题吧... 首先预备知识有点多(因为题目要处理的东西都挺毒瘤): 杜教筛运用(当然你可以用其他筛?) 第二类斯特林数相关定理 下降阶乘幂相关定理 min25 筛运用 好了可以关掉本题 ...