`

Mina 之 入门篇(1)

    博客分类:
  • Java
阅读更多

最近自己在工作之余做一个金融类的项目(类似股票),采用开源框架 Mina 实现服务端和客户端之间的交互,于是开始学习 Mina。转入正题...

(1)首先需要引入以下几个包:

 

mina-core-2.0.x.jar,slf4j-api-1.6.x.jar,slf4j-log4j12-1.6.x.jar

这里需要注意:slf4j-api-1.6.x.jar 和 slf4j-log4j12-1.6.x.jar 的主、次版本号必须一致(例如都是 1.6.x)。如果一个是 1.5 版本、另一个是 1.6 版本,运行时会报错:java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format,原因是两个 jar 包版本不匹配导致冲突。

(2)创建一个主服务类:TradeSocket

public class TradeSocket extends SocketBase implements ISocket {
.....
/**
 * Prepares this trade socket from the supplied configuration.
 *
 * <p>Copies the IP and port out of the configuration, installs a fresh
 * {@code TradeSocketHandler} and a fresh {@code NioSocketAcceptor}, and logs
 * every step along the way.
 *
 * @param config configuration object; it is cast to {@code TradeSocketConfig}
 * @return always {@code true}
 */
@Override
    public boolean initSocket(Config config) {
        logger.info("正在初始化 tradeSocket");
        final TradeSocketConfig settings = (TradeSocketConfig) config;

        // Address the acceptor will later be bound to.
        setSocket_IP(settings.getIp());
        logger.info("正在初始化 tradeSocket的Ip");
        setSocket_port(settings.getPost());
        logger.info("正在初始化 tradeSocket的端口");

        // Business handler and the acceptor that will dispatch events to it.
        setHandlerAdapter(new TradeSocketHandler());
        logger.info("正在初始化 tradeSocket的处理器");
        setAcceptor(new NioSocketAcceptor());
        logger.info("正在初始化 tradeSocket的接收器");

        // Summary of the resulting state.
        logger.info("成功初始化 tradeSocket");
        logger.info("tradeSocket服务IP:" + getSocket_IP());
        logger.info("tradeSocket服务端口号:" + getSocket_port());
        return true;
    }

    /**
     * Initializes and starts the trade acceptor unless it is already running.
     *
     * <p>The acceptor gets an object-serialization codec filter, a 2048-byte
     * read buffer, keep-alive, a 10 second both-directions idle time and the
     * configured handler, and is then bound to the configured IP and port.
     *
     * <p>If binding fails, the acceptor created by {@code initSocket} is
     * disposed so that a failed start does not leave its resources behind.
     *
     * @return {@code true} if this call started the socket; {@code false} if
     *         it was already started, initialization reported failure, or the
     *         bind failed
     */
    @Override
    public boolean startSocket() {
        if (this.isStart()) {
            // Already running: this call did not start anything.
            return false;
        }
        if (!initSocket(this.tradeSocketConfig)) {
            // Do not touch the acceptor when initialization reports failure.
            return false;
        }

        // Codec filter: ObjectSerializationCodecFactory transfers messages
        // between server and client as serialized Java objects.
        this.getAcceptor().getFilterChain().addLast(DGConstants.SERVER_SOCKET_FILTER_CHIN, new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        // Size of the read buffer.
        this.getAcceptor().getSessionConfig().setReadBufferSize(2048);
        // Keep connections alive (long-lived connections).
        this.getAcceptor().getSessionConfig().setKeepAlive(true);
        // A session becomes idle after 10 seconds without reads or writes.
        this.getAcceptor().getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        this.getAcceptor().setHandler(this.getHandlerAdapter());

        try {
            this.getAcceptor().bind(new InetSocketAddress(this.getSocket_IP(), this.getSocket_port()));
            this.setStart(true);
            logger.info("成功启动 tradeSocket");
            return true;
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            // The next start attempt creates a new acceptor, so release this one now.
            this.getAcceptor().dispose();
            return false;
        }
    }

    /**
     * Disposes the acceptor (when one exists) and marks this socket as stopped.
     *
     * <p>The acceptor is null-checked so that closing a socket that was never
     * started does not fail; presumably the acceptor is unset until
     * {@code initSocket} has run - confirm against {@code SocketBase}.
     *
     * <p>NOTE(review): this method returns {@code false} and logs at error
     * level even after a clean shutdown. Both are kept unchanged here because
     * callers may rely on them - confirm the intended contract.
     *
     * @return always {@code false}
     */
    @Override
    public boolean closeSocket() {
        if (this.getAcceptor() != null) {
            this.getAcceptor().dispose();
        }
        this.setStart(false);
        logger.error("关闭了交易Socket");
        return false;
    }

 (3)创建一个handler来管理事件,是业务处理关注点

 

 

public class TradeSocketHandler extends IoHandlerAdapter {
public static Logger logger = Logger.getLogger(TradeSocketHandler.class);

    /**
     * Session-created callback: delegates to the superclass implementation and
     * then logs that a connection between server and client was created.
     *
     * @param session the session passed in by the framework
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        logger.info("服务端与客户端创建连接...");
    }

    /**
     * Session-opened callback: logs the session id and the remote address of
     * the client.
     *
     * <p>Only logging happens here. The remote address is logged as returned
     * by the session, without casting it to a concrete address type, so this
     * callback cannot fail on a missing or non-inet remote address.
     *
     * @param session the session that was opened
     * @throws Exception declared by the overridden method; not thrown explicitly here
     */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        logger.info("服务器打开Session ID=" + session.getId());
        logger.info( "服务端与客户端["+session.getRemoteAddress()+"]连接打开...");

        // Disabled: explicit connect acknowledgement sent back to the client.
//        ConnectResponseMessage connectResponseMessage = new ConnectResponseMessage();
//        connectResponseMessage.setConnectResponse(ConnectResponseMessage.ConnectSuccess);
//        session.write(connectResponseMessage);

    }

    /**
     * Session-closed callback: delegates to the superclass implementation and
     * then logs that server and client have disconnected.
     *
     * @param session the session that was closed
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        logger.info("服务端与客户端断开连接");
    }

    /**
     * Session-idle callback: delegates to the superclass implementation and
     * then logs that the server side entered the idle state.
     *
     * @param session the idle session
     * @param status  the idle status reported for the session (not inspected here)
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        super.sessionIdle(session, status);
        logger.info("服务端进入空闲状态...");
    }

    /**
     * Exception callback: logs the throwable (message plus stack trace) at
     * error level, then delegates to the superclass implementation.
     *
     * @param session the session the exception is associated with
     * @param cause   the throwable that was caught
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.error(cause.getMessage(), cause);
        super.exceptionCaught(session, cause);
    }

    /**
     * Message callback for the server side.
     *
     * <p>Anything that is not a {@code BaseMessage} is ignored. A
     * {@code BaseMessage} is logged; if its type is the login type, the sender
     * is registered in the {@code ServerCache} and the message is forwarded to
     * {@code ServerReceivedMessageProcess}. Other message types are only logged.
     *
     * @param session the session the message arrived on
     * @param message the decoded message object
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        if (!(message instanceof BaseMessage)) {
            return;
        }
        final BaseMessage received = (BaseMessage) message;
        logger.info("服务器收到的消息为:" + received);

        if (received.getMsgType() != MessageType.LoginMessage_Type) {
            return;
        }
        final LoginMessage login = (LoginMessage) received;

        // Tie the session to a connection record for this user.
        final UserConnect connection = new UserConnect();
        connection.setTradeSession(session);

        // The user id is suffixed with the session's hash code before it is used as cache key.
        login.setUserid(login.getUserid() + session.hashCode());
        ServerCache.getInstance().addUserConnect(login.getUserid(), connection);

        // Remember the user and session identifiers on the session itself.
        connection.getTradeSession().setAttribute("userID", login.getUserid());
        connection.getTradeSession().setAttribute("sessionID", session.hashCode());

        // Hand the message over to the business processing layer.
        ServerReceivedMessageProcess.getInstance().processMessage(login);
    }

    /**
     * Message-sent callback: only delegates to the superclass implementation.
     *
     * @param session the session the message was written to
     * @param message the message that was sent
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
    }

 ServerReceivedMessageProcess.getInstance().processMessage(.....) 方法会根据客户端传过来的不同 message,实例化出不同的处理类。

(4)创建客户端主类

 

/**
 * Client-side entry point that owns the connector used to talk to the trade
 * server. A single shared instance is handed out by {@link #getInstance()}.
 */
public class ClientTradeSocket extends ClientSocketBase {

    private static final Logger logger = Logger.getLogger(ClientTradeSocket.class);
    private static ClientTradeSocket instance;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * <p>NOTE(review): the lazy creation is not synchronized - confirm that
     * the first call cannot happen from several threads at once.
     *
     * @return the shared {@code ClientTradeSocket}
     */
    public static ClientTradeSocket getInstance() {
        if (instance == null) {
            instance = new ClientTradeSocket();
        }
        return instance;
    }

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private ClientTradeSocket() {
    }

    /**
     * Disposes the connector (when one exists) and marks the client as stopped.
     *
     * @return always {@code true}
     */
    @Override
    public boolean closedSocket() {
        releaseConnector();
        this.setStart(false);
        logger.info("客户关闭了与服务器的连接");
        return true;
    }

    /**
     * Installs a fresh {@code ClientTradeHandler} and a fresh
     * {@code NioSocketConnector}.
     */
    @Override
    public void initSocket() {
        this.setHandlerAdapter(new ClientTradeHandler());
        this.setConnector(new NioSocketConnector());
        logger.info("客户初始化了与服务器的连接");
    }

    /**
     * Connects to the trade server unless the client is already started.
     *
     * <p>The handler and connector are (re)created only when a connection
     * attempt is actually made, so calling this method on a running client
     * leaves the live connector untouched. When the attempt fails, the
     * connector created for it is disposed again.
     *
     * <p>On success the session is stored in {@code ConnectSession}; on
     * failure the stored session is cleared.
     */
    @Override
    public void startSocket() {
        if (this.isStart()) {
            return;
        }
        this.initSocket();
        try {
            // Codec filter for object-serialized messages, plus the business handler.
            DefaultIoFilterChainBuilder chain = this.getConnector().getFilterChain();
            chain.addLast(DGConstants.SERVER_SOCKET_FILTER_CHIN, new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
            this.getConnector().setHandler(this.getHandlerAdapter());

            // Connect timeout. The original author notes it is one minute and
            // must not be set too small; the value lives in ClientSocketBase.
            this.getConnector().setConnectTimeoutMillis(ClientSocketBase.timeOutMillis);
            logger.info("ip/port:" + this.getServer_IP_Port() + "/" + this.getServer_Socket_Port());
            InetSocketAddress socketAddress = new InetSocketAddress(this.getServer_IP_Port(), this.getServer_Socket_Port());

            // connect() is asynchronous: block until the attempt has finished,
            // otherwise isConnected() below would still report false.
            // (A listener on the future would be the non-blocking alternative.)
            this.connectFuture = this.getConnector().connect(socketAddress);
            this.connectFuture.awaitUninterruptibly();

            if (this.connectFuture.isConnected()) {
                logger.info("已经建立用户连接");
                this.setStart(true);
                logger.info("客户开启了与服务器的连接");
                // Keep the session so other components can write to it.
                ConnectSession session = ConnectSession.getInstance();
                session.setTradeSession(this.connectFuture.getSession());
            } else {
                logger.info("不能建立用户连接");
                this.setStart(false);
                ConnectSession session = ConnectSession.getInstance();
                session.setTradeSession(null);
                // A later retry builds a new connector, so release this one.
                releaseConnector();
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            this.setStart(false);
            releaseConnector();
        }
    }

    /** Disposes the current connector if one has been set. */
    private void releaseConnector() {
        if (this.getConnector() != null) {
            this.getConnector().dispose();
        }
    }
}

 (5)客户端业务处理类handler

 

 

/**
 * Client-side handler: reacts to session events and dispatches messages
 * received from the server.
 */
public class ClientTradeHandler extends IoHandlerAdapter {

    private static final Logger logger = Logger.getLogger(ClientTradeHandler.class);
    private String user;


    /**
     * Sets the user associated with this handler.
     *
     * @param user the user value to store
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * Returns the user associated with this handler.
     *
     * @return the stored user value, possibly {@code null} if never set
     */
    public String getUser() {
        return this.user;
    }

    /**
     * Exception callback: logs the throwable (message plus stack trace) at
     * error level so that failures are not silently dropped.
     *
     * @param ioSession the session the exception is associated with
     * @param throwable the throwable that was caught
     */
    @Override
    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {
        logger.error(throwable.getMessage(), throwable);
    }

    /**
     * Message callback.
     *
     * <p>Objects that are not a {@code BaseMessage} are logged and ignored.
     * A successful {@code ConnectResponseMessage} stores the session in
     * {@code ConnectSession}; every other {@code BaseMessage} is handed to a
     * new {@code ClientReceivedMessageProcess}.
     *
     * @param ioSession the session the message arrived on
     * @param msg       the decoded message object
     */
    @Override
    public void messageReceived(IoSession ioSession, Object msg) throws Exception {
        if (!(msg instanceof BaseMessage)) {
            // Covers null as well as unexpected payload types.
            logger.warn("收到非BaseMessage类型的消息,已忽略:" + msg);
            return;
        }
        BaseMessage message = (BaseMessage) msg;
        logger.info("接受到得消息为:" + message);

        if (message instanceof ConnectResponseMessage) {
            ConnectResponseMessage connectResponseMessage = (ConnectResponseMessage) message;
            if (connectResponseMessage.getConnectResponse() == ConnectResponseMessage.ConnectSuccess) {
                ConnectSession clientConnect = ConnectSession.getInstance();
                clientConnect.setTradeSession(ioSession);
            }
        } else {
            // Dispatch to the processing class that matches the server's message.
            ClientReceivedMessageProcess messageProcess = new ClientReceivedMessageProcess();
            messageProcess.processMessage(message);
        }
    }

    /**
     * Message-sent callback; intentionally does nothing.
     */
    @Override
    public void messageSent(IoSession ioSession, Object msg) throws Exception {
    }

    /**
     * Session-closed callback: logs that the connection to the server is gone.
     */
    @Override
    public void sessionClosed(IoSession ioSession) throws Exception {
        logger.info("与服务器断开连接!!");
        // Disabled: switch the tray icon to its offline state.
//        EIMTrayIcon trayIcon = EIMTrayIcon.getInStance();
//        trayIcon.showIcon(EIMClientConfig.OffLineTryIcon_Type);
    }

    /**
     * Session-created callback; intentionally does nothing.
     */
    @Override
    public void sessionCreated(IoSession ioSession) throws Exception {
    }

    /**
     * Session-idle callback; intentionally does nothing.
     */
    @Override
    public void sessionIdle(IoSession ioSession, IdleStatus msg) throws Exception {
    }

    /**
     * Session-opened callback: logs that the session has been opened.
     */
    @Override
    public void sessionOpened(IoSession ioSession) throws Exception {
        logger.info("会话已经打开");
    }
}

 

最后,可以进行运行测试了。。。。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics