基于Mina的配置中心(四)
继续编写Server
端代码,接下来是handler
消息处理器 MinaServerHandler
在IoHandlerAdapter
中有以下方法。
看名字就可以看出,有处理异常、接收消息,发送消息、连接打开,连接关闭、进入空闲状态等方法。
我们可以根据自己的实际情况复写父类中的方法。在MinaServerHandler
中,我们复写了以下方法:
最好复写exceptionCaught
这个方法,不然出现异常,连接关闭的时候,你可能无从下手。
作为服务器,在messageReceived
中处理客户端发出的请求。当客户端请求一次后,会把客户端连接信息保存下来,用来推送数据。
可以调用messageSent
这个方法,向客户端发送信息。
在sessionIdle
这个方法中,处理连接超时的情况,比如在一定时间内没有发送心跳包,关闭客户端连接。
package com.lww.mina.handler;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lww.mina.dao.MessageDao;
import com.lww.mina.domain.Message; import com.lww.mina.protocol.MessagePack; import com.lww.mina.session.SessionManager; import com.lww.mina.util.Const; import com.lww.mina.util.SpringBeanFactoryUtils; import java.net.InetSocketAddress; import java.net.SocketAddress; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; /** * 处理客户端发送的消息 * * @author lww * @date 2020-07-06 22:53 */ @Slf4j public class MinaServerHandler extends IoHandlerAdapter { @Override public void sessionCreated(IoSession session) { InetSocketAddress isa = (InetSocketAddress) session.getRemoteAddress(); String ip = isa.getAddress().getHostAddress(); session.setAttribute("ip", ip); log.info("来自" + ip + " 的终端上线,sessionId:" + session.getId()); } @Override public void sessionClosed(IoSession session) { log.info(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + "sessionClosed "); // 移除 属性 session.removeAttribute(Const.SESSION_KEY); // 移除超时属性 session.removeAttribute(Const.TIME_OUT_KEY); String key = (String) session.getAttribute(Const.SESSION_KEY); if (key != null) { SessionManager.removeSession(key); } session.closeNow(); } @Override public void sessionIdle(IoSession session, IdleStatus status) { if (session.getAttribute(Const.TIME_OUT_KEY) == null) { session.closeNow(); log.error(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + " time_out_key null"); return; } try { int isTimeoutNum = (int) session.getAttribute(Const.TIME_OUT_KEY); isTimeoutNum++; // 没有超过最大次数,超时次数加1 if (isTimeoutNum < Const.TIME_OUT_NUM) { session.setAttribute(Const.TIME_OUT_KEY, isTimeoutNum); } else { // 超过最大次数,关闭会话连接 String key = (String) session.getAttribute(Const.SESSION_KEY); // 移除device属性 session.removeAttribute(Const.SESSION_KEY); // 移除超时属性 session.removeAttribute(Const.TIME_OUT_KEY); SessionManager.removeSession(key); session.closeOnFlush(); log.info("client user: " + key + " more than " + Const.TIME_OUT_NUM + " times have no response, connection closed!"); } } catch (Exception e) { log.error(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + e.getMessage()); session.closeNow(); } } @Override public void exceptionCaught(IoSession session, Throwable cause) { log.error("终端用户:{} 连接发生异常,即将关闭连接,原因:{}", session.getAttribute(Const.SESSION_KEY), cause); } @Override public void messageReceived(IoSession session, Object message) { SocketAddress remoteAddress = session.getRemoteAddress(); log.info("server received MinaServerHandler_messageReceived_remoteAddress:{}", remoteAddress); MessagePack pack = (MessagePack) message; MessagePack response; String body = pack.getBody(); if (StringUtils.isBlank(body)) { log.error("ServerHandler_messageReceived_body:{}", body); response = new MessagePack(Const.BASE, "body empty"); session.write(response); session.close(false); return; } Message msg = JSONObject.parseObject(body, Message.class); if (msg == null) { log.error("ServerHandler_messageReceived_body:{}", body); response = new MessagePack(Const.BASE, "message empty"); session.write(response); session.close(false); return; } if (Const.CONF.equalsIgnoreCase(msg.getPropertyValue()) && pack.getModule() == Const.BASE) { log.info("ServerHandler_messageReceived_Susccess:{}", msg.getPropertyValue()); response = new MessagePack(pack.getModule(), body); session.write(response); return; } final String key = remoteAddress.toString(); //存储的key session.setAttribute(Const.SESSION_KEY, key); // 超时次数设为0 session.setAttribute(Const.TIME_OUT_KEY, 0); synchronized (this) { IoSession oldSession = SessionManager.getSession(key); if (oldSession != null && !oldSession.equals(session)) { // 移除key属性 oldSession.removeAttribute(Const.SESSION_KEY); // 移除超时时间 oldSession.removeAttribute(Const.TIME_OUT_KEY); // 替换oldSession SessionManager.replaceSession(key, session); session.closeOnFlush(); log.info("oldSession close!"); } if (oldSession == null) { SessionManager.addSession(key, session); } log.info("bind success: " + session.getRemoteAddress()); } MessageDao minaMessageDao = SpringBeanFactoryUtils.getApplicationContext().getBean(MessageDao.class); log.info("ServerHandler_messageReceived_projectName:{}, propertityValue:{}, envValue:{}", msg.getProjectName(), msg.getPropertyValue(), msg.getEnvValue()); Message configMessage = minaMessageDao.selectOne(new QueryWrapper<Message>().lambda() .eq(Message::getProjectName, msg.getProjectName()) .eq(Message::getPropertyValue, msg.getPropertyValue()) .eq(Message::getEnvValue, msg.getEnvValue())); if (configMessage == null && !msg.getPropertyValue().equalsIgnoreCase(Const.CONF)) { log.error(session.toString() + "select null"); response = new MessagePack(Const.BASE, "select error"); session.write(response); session.closeOnFlush(); } else { // 设置session key if (configMessage != null) { configMessage.setRemoteAddress(key); // AR模式 boolean updateSessionKey = configMessage.updateById(); log.info("ServerHandler_messageReceived_updateSessionKey:{}", updateSessionKey); } log.info(session.toString() + " succeed!"); response = new MessagePack(pack.getModule(), JSONObject.toJSONString(configMessage)); session.write(response); } } @Override public void messageSent(IoSession session, Object message) { if (message instanceof Message) { Message minaMessage = (Message) message; session.write(new MessagePack(Const.CONFIG_MANAGE, JSONObject.toJSONString(minaMessage))); } session.setAttribute(Const.TIME_OUT_KEY, 0); log.info("发送消息成功"); } }
Session管理 SessionManager
上面的代码里出现了这个类SessionManager
,这是一个管理Session
的工具类,我尝试过把Session
转为Json存储,可是会报异常,无法转为Json,序列化也不可行。
但是看文档,又是可以持久化的。
org.apache.mina.core.service.AbstractIoService#initSession
org.apache.mina.core.session.IoSessionDataStructureFactory
看代码也是定义了一个Map:org.apache.mina.core.session.IoSessionAttributeMap
, 使用这个类管理Session。
不过因为我们是配置中心,所以可以使用Map来存储到内存中,因为客户端数量不会很多。我用了一个线程安全的ConcurrentHashMap
来存储Session
对象,key就是客户端的连接信息。
这个样子:/127.0.0.1:55618
。
package com.lww.mina.session;
import com.lww.mina.util.Const;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.session.IoSession; /** * @author lww * @date 2020-07-06 23:21 */ public class SessionManager { /** * 存放session的线程安全的map集合 */ private static ConcurrentHashMap<String, IoSession> sessions = new ConcurrentHashMap<>(); /** * 线程安全的自增类,用于统计连接数 */ private static final AtomicInteger CONNECTIONS_COUNTER = new AtomicInteger(0); /** * 添加session */ public static void addSession(String account, IoSession session) { sessions.put(account, session); CONNECTIONS_COUNTER.incrementAndGet(); } /** * 获取session */ public static IoSession getSession(String key) { return sessions.get(key); } /** * 替换session,通过key */ public static void replaceSession(String key, IoSession session) { sessions.put(key, session); } /** * 移除session通过key */ public static void removeSession(String key) { sessions.remove(key); CONNECTIONS_COUNTER.decrementAndGet(); } /** * 移除session通过session */ public static void removeSession(IoSession session) { String key = (String) session.getAttribute(Const.SESSION_KEY); removeSession(key); } public static ConcurrentHashMap<String, IoSession> getSessions() { return sessions; } }
客户端每次连接服务器,都会在message
表中更新连接信息,当连接不断,IP和端口是不会改变的,服务器也可以拿着这个Session
和客户端通信,而且客户端断开重连,客户端的端口每次都可能是不一样的,存在Map
中也可以方便的管理。
配置更新 主动推送 MessageChangeListener
可能有些人还记得我之前写过的 SpringBoot事件发布与订阅 ,在框架中,这个确实很常用,SpringBoot的源码中到处用到了事件的发布与订阅。
先说一下配置更新推送的原理,我是在更新的时候发布了一个事件
SpringBeanFactoryUtils.getApplicationContext().publishEvent(new MessageEvent(afterMessage));
事件类
package com.lww.mina.event;
import com.lww.mina.domain.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
/** * 消息更新事件 * * @author lww * @date 2020-07-07 00:28 */ @Slf4j public class MessageEvent extends ApplicationEvent { private Message message; public MessageEvent(Message message) { super(message); log.info("发布消息 MessageEvent_MessageEvent_message:{}", message); this.message = message; } public Message getMessage() { return message; } }
然后用MessageChangeListener
监听这个事件,从message
中取出客户端连接信息,然后作为key
从map
中取到对应的Session
,通过Session
发送消息给客户端。
MessageChangeListener
package com.lww.mina.listener;
import com.lww.mina.domain.Message;
import com.lww.mina.event.MessageEvent;
import com.lww.mina.handler.MinaServerHandler;
import com.lww.mina.session.SessionManager; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.session.IoSession; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert; /** * @author lww * @date 2020-07-07 00:30 */ @Slf4j @Component public class MessageChangeListener { @EventListener public void onApplicationEvent(MessageEvent event) { log.info("接收事件 MessageChangeListener_onApplicationEvent_event:{}", event); //推送配置 Message message = event.getMessage(); Assert.isTrue(StringUtils.isNotBlank(message.getRemoteAddress()), "初始配置无法发送配置信息,需要客户端连接一次后,获取客户端地址端口等信息!"); try { IoSession session = SessionManager.getSession(message.getRemoteAddress()); if (session != null) { MinaServerHandler handler = new MinaServerHandler(); handler.messageSent(session, message); } } catch (Exception e) { log.info("MessageChangeListener_onApplicationEvent_e:{}", e); } } }
总结
到这里,Server
端已经完成的差不多了,至于Controller
和Service
里的业务代码,就不粘了,都是些普通的CRUD,有一些地方用到了Mybatis-Plus
的AR模式,确实很好用。
不过Controller
和Service
的代码我都会提交到GitHub
的,感兴趣的可以去GitHub
看一下。
Server
端完成了,接下来就是Client
了。先说一下,在Client
里,如果是和Server
重复的或者类似的,我会简单说一下或者一笔带过。毕竟Client
里要讲的东西太多了。很多黑科技哦,敬请期待!
还有一点,我们要把项目打包,发布到maven
仓库,在client
中引入这个mina-base
模块,当然可以申请发布到maven
中央仓库,不过为了节省时间,我自己搭建了一个私人maven
仓库,在pom.xml
中配置仓库,就可以引入我的mina-base
包了。当然也可以把项目打成Jar
包,然后作为第三方包引入。
这个服务器是我买的最低配版的服务器,大家就只在这里用一下好了
<repositories>
<repository>
<id>public</id>
<name>Team Maven Repository</name>
<url>http://148.70.249.148:8081/nexus/content/groups/public/</url>
<releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
为了方便调试,还要把代码一起提交到仓库。在pom.xml
中添加下面的配置
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<configuration>
<attach>true</attach>
</configuration> <executions> <execution> <phase>compile</phase> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin>
使用这个命令就可以把代码发布到仓库了。
mvn clean deploy -Dmaven.test.skip=true -Dmaven.javadoc.skip=true
Server端现在还有一些问题,会在第五章解决。
欢迎大家关注我的公众号,共同学习,一起进步。加油
本文使用 tech.souyunku.com 排版