我们在实际的应用中最常遇到的场景如下:

A向B发送请求,B向A返回结果。。。。

但是这种场景就会很容易变成这个样子:

很多A向B发送请求,所以B要不断的处理这些请求,所以就会很容易想到对B进行扩展,由多个B来处理这些请求,那么这里就出现了另外一个问题:

B对请求处理的速度可能不同,那么B之间他们的负载也是不同的,那么应该如何对请求进行分发就成了一个比较重要的问题。。。也就变成了负载均衡的问题了。。。

其实最好的负载均衡解决方案也很简单:

绝大多数的任务都是独立的,这里中间层可以将A发送过来的请求先缓存起来,然后B的行为就是主动的找中间层获取请求处理,然后返回,再获取。。。。也就是中间层只是做一个请求的缓存。。。由B自己来掌控合适来处理请求,也就是当B已经处理完了任务之后,自己去主动获取。。。而不是由中间层自己去主动分发。。。。

嗯,那么在ZeroMQ中应该如何实现这种模式呢,恩其实还挺简单的,如下图:

这里由两个Router来作为中间层,具体的数据流程如下:

(1)中间层启动,Worker连接Backend,向其发送Request请求(ready),这个时候中间层就能够知道哪一个worker现在是空闲的,将其保存起来(放到worker队列),可以处理请求

worker的执行流程就是send(发送ready)--->recv(获取请求),

(2)Client端向Fronted发送请求,中间层将请求缓存到一个任务队列

(3)中间层从任务队里里面取出一个任务,将其发送给worker队列中的一个worker,并将其从woker队列中移除

(4)worker处理完以后,发送执行结果,也就是send,中间层收到woker的数据 之后,将其发送给相应的client,然后在讲这个worker放到worker队列中,表示当前这个worker可用。。。。

好了,前面就基本上介绍了整个结构用ZeroMQ应该是怎么实现的,那么接下来就直接来上代码吧:

  1. package balance;
  2. import java.util.LinkedList;
  3. import org.zeromq.ZFrame;
  4. import org.zeromq.ZMQ;
  5. import org.zeromq.ZMsg;
  6. public class Balance {
  7. public static class Client {
  8. public void start() {
  9. new Thread(new Runnable(){
  10. public void run() {
  11. // TODO Auto-generated method stub
  12. ZMQ.Context context = ZMQ.context();
  13. ZMQ.Socket socket = context.socket(ZMQ.REQ);
  14. socket.connect("ipc://front");  //连接router,想起发送请求
  15. for (int i = ; i < ; i++) {
  16. socket.send("hello".getBytes(), );  //发送hello请求
  17. String bb = new String(socket.recv());  //获取返回的数据
  18. System.out.println(bb);
  19. }
  20. socket.close();
  21. context.term();
  22. }
  23. }).start();
  24. }
  25. }
  26. public static class Worker {
  27. public void start() {
  28. new Thread(new Runnable(){
  29. public void run() {
  30. // TODO Auto-generated method stub
  31. ZMQ.Context context = ZMQ.context();
  32. ZMQ.Socket socket = context.socket(ZMQ.REQ);
  33. socket.connect("ipc://back");  //连接,用于获取要处理的请求,并发送回去处理结果
  34. socket.send("ready".getBytes());  //发送ready,表示当前可用
  35. while (!Thread.currentThread().isInterrupted()) {
  36. ZMsg msg = ZMsg.recvMsg(socket);  //获取需要处理的请求,其实这里msg最外面的标志frame是router对分配给client的标志frame
  37. ZFrame request = msg.removeLast();   //最后一个frame其实保存的就是实际的请求数据,这里将其移除,待会用新的frame代替
  38. ZFrame frame = new ZFrame("hello fjs".getBytes());
  39. msg.addLast(frame);  //将刚刚创建的frame放到msg的最后,worker将会收到
  40. msg.send(socket);  //将数据发送回去
  41. }
  42. socket.close();
  43. context.term();
  44. }
  45. }).start();
  46. }
  47. }
  48. public static class Middle {
  49. private LinkedList<ZFrame> workers;
  50. private LinkedList<ZMsg> requests;
  51. private ZMQ.Context context;
  52. private ZMQ.Poller poller;
  53. public Middle() {
  54. this.workers = new LinkedList<ZFrame>();
  55. this.requests = new LinkedList<ZMsg>();
  56. this.context = ZMQ.context();
  57. this.poller = new ZMQ.Poller();
  58. }
  59. public void start() {
  60. ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER);  //创建一个router,用于接收client发送过来的请求,以及向client发送处理结果
  61. ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER);  //创建一个router,用于向后面的worker发送数据,然后接收处理的结果
  62. fronted.bind("ipc://front");  //监听,等待client的连接
  63. backend.bind("ipc://back");  //监听,等待worker连接
  64. //创建pollItem
  65. ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);
  66. ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);
  67. this.poller.register(fitem);  //注册pollItem
  68. this.poller.register(bitem);
  69. while (!Thread.currentThread().isInterrupted()) {
  70. this.poller.poll();
  71. if (fitem.isReadable()) {  //表示前面有请求发过来了
  72. ZMsg msg = ZMsg.recvMsg(fitem.getSocket());  //获取client发送过来的请求,这里router会在实际请求上面套一个连接的标志frame
  73. this.requests.addLast(msg);   //将其挂到请求队列
  74. }
  75. if (bitem.isReadable()) {  //这里表示worker发送数据过来了
  76. ZMsg msg = ZMsg.recvMsg(bitem.getSocket());  //获取msg,这里也会在实际发送的数据前面包装一个连接的标志frame
  77. //这里需要注意,这里返回的是最外面的那个frame,另外它还会将后面的接着的空的标志frame都去掉
  78. ZFrame workerID = msg.unwrap();  //把外面那层包装取下来,也就是router对连接的标志frame
  79. this.workers.addLast(workerID);  //将当前的worker的标志frame放到worker队列里面,表示这个worker可以用了
  80. ZFrame readyOrAddress = msg.getFirst(); //这里获取标志frame后面的数据,如果worker刚刚启动,那么应该是发送过来的ready,
  81. if (new String(readyOrAddress.getData()).equals("ready")) {  //表示是worker刚刚启动,发过来的ready
  82. msg.destroy();
  83. } else {
  84. msg.send(fronted);  //表示是worker处理完的返回结果,那么返回给客户端
  85. }
  86. }
  87. while (this.workers.size() >  && this.requests.size() > ) {
  88. ZMsg request = this.requests.removeFirst();
  89. ZFrame worker = this.workers.removeFirst();
  90. request.wrap(worker);  //在request前面包装一层,把可以用的worker的标志frame包装上,这样router就会发给相应的worker的连接
  91. request.send(backend);  //将这个包装过的消息发送出去
  92. }
  93. }
  94. fronted.close();
  95. backend.close();
  96. this.context.term();
  97. }
  98. }
  99. public static void main(String args[]) {
  100. Worker worker = new Worker();
  101. worker.start();
  102. Client client = new Client();
  103. client.start();
  104. Middle middle = new Middle();
  105. middle.start();
  106. }
  107. }

其实根据前面已经提出来的实现原理来编写代码还是比较顺利的,中途也没有遇到什么问题。。。不过要理解这部分要比较了解ZeroMQ的数据格式才行

ZeroMQ(java)之负载均衡的更多相关文章

  1. Java 客户端负载均衡

    客户端侧负载均衡 在下图中,负载均衡能力算法是由内容中心提供,内容中心相对于用户中心来说,是用户中心的客户端,所以又被称为客户端侧负载均衡 自定义实现Client Random负载均衡 获取所有的服务 ...

  2. 搭建nginx+tomcat+Java的负载均衡环境

    转载  未测 供参考 另外这篇文章也不错.http://blog.csdn.net/wang379275614/article/details/47778201 一.简介: Tomcat在高并发环境下 ...

  3. 【转】搭建nginx+tomcat+Java的负载均衡环境

    一.简介: Tomcat在高并发环境下处理动态请求时性能很低,而在处理静态页面更加脆弱.虽然Tomcat的最新版本支持epoll,但是通过Nginx来处理静态页面要比通过Tomcat处理在性能方面好很 ...

  4. Centos6.5生成环境配置--nginx1.9 + PHP+可多个tomcat(多个端口)+多域名+java web 负载均衡

    安装n p 参考: CentOS6.5搭建LNMP http://www.cnblogs.com/xiaoit/p/3991037.html http://blog.csdn.net/keyunq/a ...

  5. 15套java互联网架构师、高并发、集群、负载均衡、高可用、数据库设计、缓存、性能优化、大型分布式 项目实战视频教程

    * { font-family: "Microsoft YaHei" !important } h1 { color: #FF0 } 15套java架构师.集群.高可用.高可扩 展 ...

  6. java架构师负载均衡、高并发、nginx优化、tomcat集群、异步性能优化、Dubbo分布式、Redis持久化、ActiveMQ中间件、Netty互联网、spring大型分布式项目实战视频教程百度网盘

    15套Java架构师详情 * { font-family: "Microsoft YaHei" !important } h1 { background-color: #006; ...

  7. 高级java高并发,高性能,分布式,高可用,负载均衡,系统架构实战

    java架构师.集群.高可用.高可扩 展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布 式项目实战 视频课程包含: ...

  8. 几种简单的负载均衡算法及其Java代码实现

    什么是负载均衡 负载均衡,英文名称为Load Balance,指由多台服务器以对称的方式组成一个服务器集合,每台服务器都具有等价的地位,都可以单独对外提供服务而无须其他服务器的辅助.通过某种负载分担技 ...

  9. 从零开始学 Java - 利用 Nginx 负载均衡实现 Web 服务器更新不影响访问

    还记得那些美妙的夜晚吗 你洗洗打算看一个小电影就睡了,这个时候突然想起来今天晚上是服务器更新的日子,你要在凌晨时分去把最新的代码更新到服务器,以保证明天大家一觉醒来打开网站,发现昨天的 Bug 都不见 ...

随机推荐

  1. WebConfig节点详解

    <!-- Web.config配置文件详解(新手必看) 花了点时间整理了一下ASP.NET Web.config配置文件的基本使用方法. 很适合新手参看,由于Web.config在使用很灵活,可 ...

  2. 【BZOJ 1019】【SHOI2008】汉诺塔(待定系数法递推)

    1019: [SHOI2008]汉诺塔 Time Limit: 1 Sec  Memory Limit: 162 MBSubmit: 559  Solved: 341[Submit][Status] ...

  3. 机器学习中的矩阵方法(附录A): 病态矩阵与条件数

    1. 病态系统 现在有线性系统: Ax = b, 解方程 很容易得到解为: x1 = -100, x2 = -200. 如果在样本采集时存在一个微小的误差,比如,将 A 矩阵的系数 400 改变成 4 ...

  4. [Everyday Mathematics]20150104

    设 $a>0$, $$\bex x_1=1,\quad x_{n+1}=x_n+an\prod_{i=1}^n x_i^{-\frac{1}{n}}. \eex$$ 试证: $$\bex \vl ...

  5. WCF,WebAPI,WCFREST和WebService的区别

    Web ServiceIt is based on SOAP and return data in XML form.It support only HTTP protocol.It is not o ...

  6. (转)VmWare下安装CentOS7图文安装教程

    场景:克服安装Linux的恐惧,想装就装.在一篇博客中看到的,很有借鉴意义   欢迎转载,但请保留文章原始出处→_→ 生命壹号:http://www.cnblogs.com/smyhvae/ 文章来源 ...

  7. php提供的sapi有哪些?CGI、FastCGI、php-fpm、php-cgi解释

    一.前言 一直对PHP的sapi是什么东西好奇,在网上一查都是各种说fpm cgi fastcgi php-cgi 直到看了鸟哥的这篇文章介绍戳这里,看到源码下的sapi目录才有所了解. 二.sapi ...

  8. 3,列表的 深 浅 copy

    如果列表只有一层,深浅copy是一样一样的,没有什么区别,你修改了copy后的列表,copy前的列表并不会随之改变. 如果列表中嵌套这列表,这是你修改了copy后第二层列表里面的元素,copy前第二层 ...

  9. CF1131E String Multiplication(???)

    这题难度2200,应该值了. 题目链接:CF原网 题目大意:定义两个字符串 $s$ 和 $t$($s$ 的长度为 $m$)的乘积为 $t+s_1+t+s_2+\dots+t+s_m+t$.定义一个字符 ...

  10. micrometer自定义metrics

    micrometer提供了基于Java的monitor facade,其与springboot应用和prometheus的集成方式如下图展示 上图中展示的很清楚,应用通过micrometer采集和暴露 ...