深入浅出Redis-redis哨兵集群.doc_第1页
深入浅出Redis-redis哨兵集群.doc_第2页
深入浅出Redis-redis哨兵集群.doc_第3页
深入浅出Redis-redis哨兵集群.doc_第4页
深入浅出Redis-redis哨兵集群.doc_第5页
已阅读5页,还剩5页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

深入浅出Redis-redis哨兵集群源码设计接收器Acceptor/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * This Acceptor provides a NIO mode to accept client sockets. */public final class Acceptor extends Thread private static final Logger LOGGER = LoggerFactory .getLogger(Acceptor.class); private final int port; private final Selector selector; private final ServerSocketChannel serverChannel; private long acceptCount; private static final AcceptIdGenerator IdGenerator = new AcceptIdGenerator(); private ReactorPool reactorPool; public Acceptor(ReactorPool reactorPool, String name, String bindIp, int port) throws IOException super.setName(name); this.port = port; this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); this.serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); this.serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024); this.serverChannel.bind(new InetSocketAddress(bindIp, port), 100); this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); this.reactorPool = reactorPool; public int getPort() return port; public long getAcceptCount() return acceptCount; Override public void run() final Selector selector = this.selector; for (;) +acceptCount; try selector.select(1000L); Set keys = selector.selectedKeys(); try for (SelectionKey key : keys) if (key.isValid() & key.isAcceptable() accept(); else key.cancel(); finally keys.clear(); catch (Throwable e) LOGGER.warn(getName(), e); /* * Accept client sockets. */ private void accept() SocketChannel channel = null; try channel = serverChannel.accept(); channel.configureBlocking(false); channel.setOption(StandardSocketOptions.SO_REUSEADDR, true); channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); channel.setOption(StandardSocketOptions.SO_RCVBUF, 1024); channel.setOption(StandardSocketOptions.SO_SNDBUF, 1024); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); reactorPool.getNextReactor().postRegister( new FrontendConnection(channel, IdGenerator.getId(); catch (Throwable e) closeChannel(channel); LOGGER.warn(getName(), e); /* * Close a channel. * * param channel */ private static void closeChannel(SocketChannel channel) if (channel = null) return; Socket socket = channel.socket(); if (socket != null) try socket.close(); LOGGER.info(channel close.); catch (IOException e) LOGGER.warn(IOException happens when closing socket : , e); try channel.close(); catch (IOException e) LOGGER.warn(IOException happens when closing channel : , e); /* * ID Generator. */ private static class AcceptIdGenerator private static final long MAX_VALUE = 0xffffffffL; private long acceptId = 0L; private final Object lock = new Object(); private long getId() synchronized (lock) if (acceptId = MAX_VALUE) acceptId = 0L; return +acceptId; Reactor类/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * Reactor reacts all sockets. */public final class Reactor extends Thread private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class); private final String name; private final Selector selector; private final ConcurrentLinkedQueue queue; private long doCount; private Handler handler; public Reactor(String name, Handler handler) throws IOException = name; this.selector = Selector.open(); this.queue = new ConcurrentLinkedQueue(); this.handler = handler; final void postRegister(FrontendConnection frontendConnection) queue.offer(frontendConnection); this.selector.wakeup(); Override public void run() final Selector selector = this.selector; Set keys = null; for (;) +doCount; try selector.select(500L); register(selector); keys = selector.selectedKeys(); for (SelectionKey key : keys) FrontendConnection connection = null; Object attach = key.attachment(); if (attach != null & key.isValid() connection = (FrontendConnection) attach; if (key.isReadable() try connection.read(); handler.handle(connection); catch (IOException e) connection.close(); LOGGER.warn(IOException happens : , e); continue; catch (Throwable e) LOGGER.warn(Throwable happens : , e); continue; if (key.isValid() & key.isWritable() connection.write(); else key.cancel(); catch (Throwable e) LOGGER.warn(exception happens selecting : , e); finally if (keys != null) keys.clear(); private void register(Selector selector) FrontendConnection c = null; if (queue.isEmpty() return; while (c = queue.poll() != null) try c.register(selector); catch (Throwable e) LOGGER.warn(ClosedChannelException happens : , e); final Queue getRegisterQueue() return queue; final long getReactCount() return doCount; Reactor池/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * Reactor pool. Socket connections are polling to the reactor of this pool. */public class ReactorPool private final Reactor reactors; private volatile int nextReactor; private String name = reactor; public ReactorPool(int poolSize, Handler handler) throws IOException reactors = new ReactorpoolSize; for (int i = 0; i poolSize; i+) Reactor reactor = new Reactor(name + - + i,handler); reactorsi = reactor; reactor.start(); public Reactor getNextReactor() if (+nextReactor = reactors.length) nextReactor = 0; return reactorsnextReactor; 前端连接抽象/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * This is a abstraction of frontend. */public class FrontendConnection private static final Logger LOGGER = LoggerFactory .getLogger(FrontendConnection.class); private long id; private SocketChannel channel; private SelectionKey selectionKey; private ByteBuffer readBuffer; private static int BYFFERSIZE = 1024; protected ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue(); public FrontendConnection(SocketChannel channel, long id) this.id = id; this.channel = channel; public SocketChannel getChannel() return channel; public long getId() return id; public void read() throws IOException readBuffer = ByteBuffer.allocate(BYFFERSIZE); channel.read(readBuffer); public void close() throws IOException channel.close(); public void write() throws IOException ByteBuffer buffer; while (buffer = writeQueue.poll() != null) buffer.flip(); while (buffer.hasRemaining() int len = channel.write(buffer); if (len 0) throw new EOFException(); if (len = 0) selectionKerestOps(selectionKerestOps() | SelectionKey.OP_WRITE); selectionKey.selector().wakeup(); break; selectionKerestOps(selectionKerestOps() & SelectionKey.OP_WRITE); public ByteBuffer getReadBuffer() return readBuffer; public ConcurrentLinkedQueue getWriteQueue() return writeQueue; public void register(Selector selector) throws Throwable selectionKey = channel.register(selector, SelectionKey.OP_READ, this); 处理/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * This Handler will be call when there is a data having be ready. */public interface Handler public void handle(FrontendConnection connection);定义自己的处理/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * Demo. */public class MyHandler implements Handler private static final Logger LOGGER = LoggerFactory .getLogger(MyHandler.class); private long readSize; /* * The logic to deal with the received data. * * It means that reactor will trigger this function once the data is received. */ public void handle(FrontendConnection connection) Buffer buff = connection.getReadBuffer(); readSize = +Size + buff.position(); LOGGER.info(connection.getId() + connection has receive + readSize); if (readSize % 5 = 0) ByteBuffer sendBuffer = ByteBuffer.allocate(10); sendBuffer.wrap(hello.getBytes(); connection.getWriteQueue().add(sendBuffer); try connection.write(); catch (IOException e) LOGGER.warn(IOException, e); 启动/* * * author seaboat * date 2016-08-25 * version 1.0 * email: 849586227 * blog: /wangyangzhizhou * The reactor bootstrap. */public class Bootstrap private s

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论