本系列文章均转自:http://blog.csdn.net/kobejayandy/article/details/20163431

在ZeroMQ中并没有绝对的服务端与客户端之分,所有的数据接收与发送都是以连接为单位的,只区分ZeroMQ定义的类型,例如Response与Request,Publisher与Subscriber,Push与Pull等。。。

例如在前面我们最开始的Response/Request模式,因为只有一个Response端,而有多个Request端,所以我们选择在Response端调用bind方法来建立监听,而在Request端调用connect方法与Response端建立连接。。。因此根据以前常用的概念,可以简单的将Response理解为服务端,将Request理解为客户端。。。。

这种状态下,整个系统大概用下面的图形来形容:

这里因此按照我们常规的区分方法,将建立监听的叫做服务端,发起连接的叫做客户端,但其实呢,在ZeroMQ这种按照监听的方式来区分是不成立的。。。将上图的网络构建变成如下这个样子:

上图的网络结构中,在Request端建立监听,而在Response发起与Request端的连接,这样,request同样可以发送请求到Response端。。。

其实说这么多无非就像强调:在ZeroMQ中,不要用常规的server/client模式来对组件进行分类,而应该采用ZeroMQ中定义的类型(Request,Response,Push,Pull)。

好了,接下来回到Push/Pull模式,这算是非常经典的了吧,Push产生消息,Pull角色来拿消息。。。甚至可以用生产者/消费者模型来对应。。。应用场景就非常的广泛了。。。最典型的应用场景就是任务分发。。。

在ZeroMQ中根据Push与Pull角色各自的数量又定义了一些比较有趣的名词:

(1)parallel pipeline,并行流水线,这种情况下是一个push,多个pull,可以理解为一个push不断的产生任务,并将这些任务分发给pull角色。。。如下图:

在这种网络结构中,manager不断的将任务分发给worker,要实现这种通信,在ZeroMQ中就直接用Push/Pull就可以了,代码也很简单,首先是Manager端(Push):

  1. package pushpull13;
  2. import org.zeromq.ZMQ;
  3. public class Push {
  4. public static void main(String args[]) {
  5. ZMQ.Context context = ZMQ.context();
  6. ZMQ.Socket push  = context.socket(ZMQ.PUSH);
  7. push.bind("ipc://fjs");
  8. for (int i = ; i < ; i++) {
  9. push.send("hello".getBytes());
  10. }
  11. push.close();
  12. context.term();
  13. }
  14. }

代码很简单吧,这里建立的是PUSH类型的socket,然后循环一千万次,给建立连接的worker发送数据,那么接下来来看看Worker(Pull)部分的代码:

  1. package pushpull13;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. import org.zeromq.ZMQ;
  4. public class Pull {
  5. public static void main(String args[]) {
  6. final AtomicInteger number = new AtomicInteger();
  7. for (int i = ; i < ; i++) {
  8. new Thread(new Runnable(){
  9. private int here = ;
  10. public void run() {
  11. // TODO Auto-generated method stub
  12. ZMQ.Context context = ZMQ.context();
  13. ZMQ.Socket pull = context.socket(ZMQ.PULL);
  14. pull.connect("ipc://fjs");
  15. //pull.connect("ipc://fjs");
  16. while (true) {
  17. String message = new String(pull.recv());
  18. int now = number.incrementAndGet();
  19. here++;
  20. if (now %  == ) {
  21. System.out.println(now + "  here is : " + here);
  22. }
  23. }
  24. }
  25. }).start();
  26. }
  27. }
  28. }

这里建立了5个worker,建立于与manager的连接,这里可能就会涉及到一个问题,manager是怎么将数据分发给这5个woker的呢,这里由于还没有看过实现代码,所以不知道这里具体是怎么个策略,不过后来测试数据之后发现各个worker之间收到的数量相差不大,可以猜测大概是轮询发送的。。。。

好了,到这里整个所谓的并行流水线的网络构建就算是差不多了。。。那么接下来来看另外一种,这里Push与Pull之间的对应关系是多个Push角色对应一个Pull角色,在ZeroMQ中,给这种结构取的名叫做公平队列,结构如下图:

这里也就是说将Pull角色理解为一个队列,各个Push角色不断的向这个队列中发送数据。。。这种结构应用场景也很广泛吧,例如分布式的数据统计啥的。。。

好了,来看看实现代码形式吧,先来看看Push:

  1. package pushpull31;
  2. import org.zeromq.ZMQ;
  3. public class Push {
  4. public static void main(String args[]) {
  5. for (int j = ; j < ; j++) {
  6. new Thread(new Runnable(){
  7. public void run() {
  8. // TODO Auto-generated method stub
  9. ZMQ.Context context = ZMQ.context();
  10. ZMQ.Socket push = context.socket(ZMQ.PUSH);
  11. push.connect("ipc://fjs");
  12. for (int i = ; i < ; i++) {
  13. push.send("hello".getBytes());
  14. System.out.println(i);
  15. }
  16. push.close();
  17. context.term();
  18. }
  19. }).start();;
  20. }
  21. }
  22. }

由于这部分push与pull的关系是多对一,所以选择在pull端建立监听,让push端来连接pull端。。。代码还是很简单的吧。。。

不过这里有一个比较有意思的现象,加入我们先运行push端,而这个时候pull端并没有运行的话,会发现send方法也会被执行,只不过执行一会以后就阻塞了,这里可以猜测,ZeroMQ的push端是先将数据写到了一个缓冲区,然后数据是从缓冲区中写到已经建立好连接的pull端的,当然这个只是猜测,具体是什么样子的以后看看源码的实现就知道了。。。。

好了,接下来来看看Pull端的代码实现吧:

  1. package pushpull31;
  2. import org.zeromq.ZMQ;
  3. public class Pull {
  4. public static void main(String args[]) {
  5. ZMQ.Context context = ZMQ.context();
  6. ZMQ.Socket pull = context.socket(ZMQ.PULL);
  7. pull.bind("ipc://fjs");
  8. int number = ;
  9. while (true) {
  10. String message = new String(pull.recv());
  11. number++;
  12. if (number %  == ) {
  13. System.out.println(number);
  14. }
  15. }
  16. }
  17. }

这里够简单吧,建立一个连接,然后循环里不断的接收数据就好了。。。

好了,到这里Push/Pull中的一对多与多对一的网络结构就算是讲完了吧,其实还有一种多对多的结构。。不过意思都差不多吧。。。。。就不具体弄出来了。。

ZeroMQ之Push与Pull (Java)的更多相关文章

  1. Push or Pull?

    采用Pull模型还是Push模型是很多中间件都会面临的一个问题.消息中间件.配置管理中心等都会需要考虑Client和Server之间的交互采用哪种模型: 服务端主动推送数据给客户端? 客户端主动从服务 ...

  2. Git的纯命令操作,Install,Clone , Commit,Push,Pull,版本回退,撤销更新,分支的创建/切换/更新/提交/合并,代码冲突

    Git的纯命令操作,Install,Clone , Commit,Push,Pull,版本回退,撤销更新,分支的创建/切换/更新/提交/合并,代码冲突 这篇是接着上篇分布式版本库--Windows下G ...

  3. Nginx-rtmp模块实现流媒体play、push、pull功能

    官方wiki:https://github.com/arut/nginx-rtmp-module#readme Nginx rtmp 功能特点 1.   支持音视频直播 2.   支持flv/mp4视 ...

  4. 【Bootstrapt】offset、push、pull

    实现方式的区别: col-md-offset-*,是利用margin-left实现的,col-md-push-*/col-md-pull-*是利用相对定位实现的. 效果的区别: col-md-offs ...

  5. 实战ZeroMQ的PUSH/PULL推拉模式

    原文地址: http://ju.outofmemory.cn/entry/235976

  6. oschina代码仓库远程push,pull免密实操总结

    刚做项目,用到开源中国(oschina)的git仓库,一个多月一直在痛苦的反复输密码的过程中度过.中间配置过几次免密登录,但总是时而登的上去,时而不行,大多数情况不行.近几日项目做完了,正好有空把这个 ...

  7. adb push和pull使用

    1.运行cmd> 进入adb.exe目录 2.>adb connect ip 3.>adb remount 4.>adb push 本地apk全路径 /system/app/ ...

  8. git向gitHub上push和pull数据.

    1.在gitHub上首先建立仓储.这个过程就不在啰嗦了. 2.注意上图中右下角的https,ssh等东西. 3.向git上传的工具特别多.我这里用的cygwin. 至于cygwin自己到网上去下载.安 ...

  9. 02_创建Git仓库,克隆仓库,git add,git commit,git push,git pull,同行冲突,不同行冲突的结局方案,git mergetool的使用

    1 创建Git资源库,残酷目录信息 创建git资源库的命令: git init –bare 仓库名称 (其中-bare表示的意思是空的库的意思) 进入E:\software\repository\gi ...

随机推荐

  1. likely &amp;&amp; unlikely in GCC

    在linux内核源码或一些比较成熟的c语言架构源码中,我们常会见到类似下面的代码: if (unlikely(!packet)) { return res_failed; } // OR if (li ...

  2. js验证姓名和身份证号

    js验证真实姓名,是用的unicode字符的来进行匹配,而中国人的姓名长度一般都是2-4,所以重复匹配{2,4}次 1.js验证真实姓名 1 var regName =/^[\u4e00-\u9fa5 ...

  3. Android中界面实现全屏显示的两种方式

    在开发android的应用当中,我们会遇到将一些界面设置为全屏显示的格式,有两种实现的方法.其一是在Java代码中实现,其二是在配置文件中实现. 1. 在Java代码中设置 super.onCreat ...

  4. Cannot create JDBC driver of class &#39;&#39; for connect URL &#39;jdbc:mysql://127.0.0.1:3306/test&#39;

    原来的配置如下: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http ...

  5. trie树模板(统计难题)

    统计难题 Time Limit: 4000/2000 MS (Java/Others)    Memory Limit: 131070/65535 K (Java/Others)Total Submi ...

  6. DevOps之二 Maven的安装与配置

    CentOS7 安装Maven 一.安装Maven mkdir -p /usr/local/maven3wget http://mirrors.hust.edu.cn/apache/maven/mav ...

  7. 一次dropzone体验

    对于前端,本人不是太擅长,对于当前的一些网上的样例,也许是习武悟性太差,不是太透,所以只能通过blog的方式记录下一些武功套路,便于以后查询使用 首先,我们需要知道这个武功适应的战场. 什么是drop ...

  8. python 叠加装饰器详解

    def out1(func1): #7.func1=in2的内存地址,就是in2 print('out1') def in1(): #8.调用函数index() 因为函数在in1里,所以首先运行in1 ...

  9. Pandas的index属性

    我们在统计数据的长度或者个数,不用统计去专门获取数值,而是用index这个数据获取即可,DataFrame的index直接就是最前面的索引号,如果要统计列的个数,使用DataFrame.colums获 ...

  10. Ian Goodfellow——对抗神经网络之父

    争议.流派,有关GAN的一切:Ian Goodfellow Q&A:https://baijiahao.baidu.com/s?id=1595081179447191755&wfr=s ...