<dl id="opymh"></dl>

<div id="opymh"></div>
      <div id="opymh"><tr id="opymh"></tr></div>

        <em id="opymh"><ins id="opymh"><mark id="opymh"></mark></ins></em><sup id="opymh"><menu id="opymh"></menu></sup>

        <em id="opymh"></em>

        <em id="opymh"><ol id="opymh"></ol></em>

              频道栏目
              首页 > 考试 > 等级考试 > 二级 > JAVA > 正文

              ZeroMQ(java)的负载均衡

              2018-07-25 09:55:11         来源£º人工智能  
              收藏   我要投稿

              我们在实际的应用中最常遇到的场景如下£º

              \

              A向B发送请求£¬B向A返回结果¡£¡£¡£¡£

              但是这种场景就会很容易变成这个样子£º

              \

              很多A向B发送请求£¬所以B要不断的处理这些请求£¬所以就会很容易想到对B进行扩展£¬由多个B来处理这些请求£¬那么这里就出现了另外一个问题£º

              B对请求处理的速度可能不同£¬那么B之间他们的负载也是不同的£¬那么应该如何对请求进行分发就成了一个比?#29616;?#35201;的问题¡£¡£¡£也就变成了负载均衡的问题了¡£¡£¡£

              其实最好的负载均衡解决方案也很简单£º

              绝大多数的任务?#38469;?#29420;立的£¬这里中间层可以将A发送过来的请求先缓存起来£¬?#32531;óB的行为就是主动的找中间层获取请求处理£¬?#32531;?#36820;回£¬再获取¡£¡£¡£¡£也就是中间层只是做一个请求的缓存¡£¡£¡£由B自己来掌控合适来处理请求£¬也就是当B已经处理完了任务之后£¬自己去主动获取¡£¡£¡£而不是由中间层自己去主动分发¡£¡£¡£¡£

              嗯£¬那么在ZeroMQ中应该如何实现这种模式呢£¬恩其实还挺简单的£¬如下图£º

              \

              这里由两个Router来作为中间层£¬具体的数据流程如下£º

              £¨1£©中间层启动£¬Worker连接Backend£¬向其发送Request请求£¨ready£©£¬这个时候中间层就能够知道哪一个worker现在是空闲的£¬将其保存起来£¨放到worker队列£©£¬可?#28304;?#29702;请求

              worker的执行流程就是send£¨发送ready£©---recv£¨获取请求£©£¬

              £¨2£©Client端向Fronted发送请求£¬中间层将请求缓存到一个任务队列

              £¨3£©中间层从任务队里里面取出一个任务£¬将其发送给worker队列中的一个worker£¬并将其从woker队列中移除

              £¨4£©worker处理完?#38498;ó£?#21457;送执行结果£¬也就是send£¬中间层收到woker的数据 之后£¬将其发送给相应的client£¬?#32531;?#22312;讲这个worker放到worker队列中£¬表示当前这个worker可用¡£¡£¡£¡£

              好了£¬前面就基本上介绍了整个结构用ZeroMQ应该是怎么实现的£¬那么接下来就直接来上代码吧£º

              package balance;

              import java.util.LinkedList;

              import org.zeromq.ZFrame;

              import org.zeromq.ZMQ;

              import org.zeromq.ZMsg;

              public class Balance {

              public static class Client {

              public void start() {

              new Thread(new Runnable(){

              public void run() {

              // TODO Auto-generated method stub

              ZMQ.Context context = ZMQ.context(1);

              ZMQ.Socket socket = context.socket(ZMQ.REQ);

              socket.connect("ipc://front"); //连接router£¬想起发送请求

              for (int i = 0; i 1000; i++) {

              socket.send("hello".getBytes(), 0); //发送hello请求

              String bb = new String(socket.recv()); //获取返回的数据

              System.out.println(bb);

              }

              socket.close();

              context.term();

              }

              }).start();

              }

              }

              public static class Worker {

              public void start() {

              new Thread(new Runnable(){

              public void run() {

              // TODO Auto-generated method stub

              ZMQ.Context context = ZMQ.context(1);

              ZMQ.Socket socket = context.socket(ZMQ.REQ);

              socket.connect("ipc://back"); //连接£¬用于获取要处理的请求£¬并发送回去处理结果

              socket.send("ready".getBytes()); //发送ready£¬表示当前可用

              while (!Thread.currentThread().isInterrupted()) {

              ZMsg msg = ZMsg.recvMsg(socket); //获取需要处理的请求£¬其实这里msg最外面的标志frame是router对分配给client的标志frame

              ZFrame request = msg.removeLast(); //最后一个frame其实保存的就是实际的请求数据£¬这里将其移除£¬待会用新的frame代替

              ZFrame frame = new ZFrame("hello fjs".getBytes());

              msg.addLast(frame); //将刚刚创建的frame放到msg的最后£¬worker将会收到

              msg.send(socket); //将数据发送回去

              }

              socket.close();

              context.term();

              }

              }).start();

              }

              }

              public static class Middle {

              private LinkedList workers;

              private LinkedList requests;

              private ZMQ.Context context;

              private ZMQ.Poller poller;

              public Middle() {

              this.workers = new LinkedList();

              this.requests = new LinkedList();

              this.context = ZMQ.context(1);

              this.poller = new ZMQ.Poller(2);

              }

              public void start() {

              ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER); //创建一个router£¬用于接收client发送过来的请求£¬以及向client发送处理结果

              ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER); //创建一个router£¬用于向后面的worker发送数据£¬?#32531;?#25509;收处理的结果

              fronted.bind("ipc://front"); //监听£¬等待client的连接

              backend.bind("ipc://back"); //监听£¬等待worker连接

              //创建pollItem

              ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);

              ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);

              this.poller.register(fitem); //注册pollItem

              this.poller.register(bitem);

              while (!Thread.currentThread().isInterrupted()) {

              this.poller.poll();

              if (fitem.isReadable()) { //表示前面有请求发过来了

              ZMsg msg = ZMsg.recvMsg(fitem.getSocket()); //获取client发送过来的请求£¬这里router会在实际请求上面套一个连接的标志frame

              this.requests.addLast(msg); //将其挂到请求队列

              }

              if (bitem.isReadable()) { //这里表示worker发送数据过来了

              ZMsg msg = ZMsg.recvMsg(bitem.getSocket()); //获取msg£¬这里也会在实际发送的数据前面包装一个连接的标志frame

              //这里需要注意£¬这里返回的是最外面的那个frame£¬另外它还会将后面的接着的空的标志frame都去掉

              ZFrame workerID = msg.unwrap(); //把外面那层包装取下来£¬也就是router对连接的标志frame

              this.workers.addLast(workerID); //将当前的worker的标志frame放到worker队列里面£¬表示这个worker可以用了

              ZFrame readyOrAddress = msg.getFirst(); //这里获取标志frame后面的数据£¬如果worker刚?#25484;?#21160;£¬那么应该是发送过来的ready£¬

              if (new String(readyOrAddress.getData()).equals("ready")) { //表示是worker刚?#25484;?#21160;£¬发过来的ready

              msg.destroy();

              } else {

              msg.send(fronted); //表示是worker处理完的返回结果£¬那么返回给客户端

              }

              }

              while (this.workers.size() 0 && this.requests.size() 0) {

              ZMsg request = this.requests.removeFirst();

              ZFrame worker = this.workers.removeFirst();

              request.wrap(worker); //在request前面包装一层£¬把可以用的worker的标志frame包装上£¬这样router就会发给相应的worker的连接

              request.send(backend); //将这个包装过的消息发送出去

              }

              }

              fronted.close();

              backend.close();

              this.context.term();

              }

              }

              public static void main(String args[]) {

              Worker worker = new Worker();

              worker.start();

              Client client = new Client();

              client.start();

              Middle middle = new Middle();

              middle.start();

              }

              }

              其实根据前面已经提出来的实现原理来编写代码还是比较顺利的£¬中途也没有遇到什么问题¡£¡£¡£不过要理解这部分要比较了解ZeroMQ的数据格式才行

              上一篇£ºJAVA可变参数的学习?#22363;?/a>
              下一篇£º以阿里HBase的GC优化实践为例 -如何降低90%Java垃圾回收时间£¿
              相关文章
              图文推荐

              关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip?#38469;?#22521;训 | 举报中心

              版权所有: 红黑联盟--致力于做实用的IT?#38469;?#23398;习网站

              ¼«ËÙ·ÉͧºÃ¼Ù
              <dl id="opymh"></dl>

              <div id="opymh"></div>
                  <div id="opymh"><tr id="opymh"></tr></div>

                    <em id="opymh"><ins id="opymh"><mark id="opymh"></mark></ins></em><sup id="opymh"><menu id="opymh"></menu></sup>

                    <em id="opymh"></em>

                    <em id="opymh"><ol id="opymh"></ol></em>

                          <dl id="opymh"></dl>

                          <div id="opymh"></div>
                              <div id="opymh"><tr id="opymh"></tr></div>

                                <em id="opymh"><ins id="opymh"><mark id="opymh"></mark></ins></em><sup id="opymh"><menu id="opymh"></menu></sup>

                                <em id="opymh"></em>

                                <em id="opymh"><ol id="opymh"></ol></em>