springboot的netty代码实操
参考:https://www.cnblogs.com/mc-74120/p/13622008.html
pom文件
<!-- Netty all-in-one artifact used by the server and the test client below.
     No <version> is declared here; presumably it is supplied by the parent POM /
     dependency management - TODO confirm against the real pom.xml. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
启动类
/**
 * Application entry point. Besides bootstrapping the Spring context it implements
 * {@link CommandLineRunner} so that the Netty listener is started from {@link #run(String...)}.
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableScheduling
@EnableAsync
public class ChimetaCoreApplication implements CommandLineRunner {

    /** Netty listener bean whose {@code start()} is invoked by {@link #run(String...)}. */
    @Autowired
    private NettyServerListener nettyServerListener;

    /**
     * Starts the Netty server.
     *
     * <p>{@code NettyServerListener.start()} waits on the server channel's close future, so this
     * call does not return while the server is running.
     *
     * @param args command-line arguments forwarded by Spring Boot (unused here)
     * @throws Exception declared by {@link CommandLineRunner#run(String...)}
     */
    @Override
    public void run(String... args) throws Exception {
        nettyServerListener.start();
    }

    /**
     * JVM entry point: launches the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(ChimetaCoreApplication.class, args);
    }
}
服务端代码的listener
package com.chimeta.netty;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.chimeta.netty.protobuf.ImProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
/**
 * Netty server lifecycle bean: configures the {@link ServerBootstrap}, binds the listening port
 * and releases the event-loop groups when the server stops.
 *
 * @author mwan
 */
@Component
@Slf4j
public class NettyServerListener {

    /** Bootstrap that is configured and bound in {@link #start()}. */
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    /** Boss group, passed to the bootstrap as the parent (acceptor) group. */
    EventLoopGroup boss = new NioEventLoopGroup();

    /** Worker group, passed to the bootstrap as the child group. */
    EventLoopGroup work = new NioEventLoopGroup();

    /** Business handler; the same instance is appended to every accepted channel's pipeline. */
    @Resource
    private ServerChannelHandlerAdapter channelHandlerAdapter;

    /** Listening port, read from {@code server.netty.port} (default 10001). */
    @Value("${server.netty.port:10001}")
    private int NETTY_PORT;

    /**
     * Read from {@code server.netty.maxthreads} (default 5000). Despite its name this value is
     * only used as the {@code SO_BACKLOG} option in {@link #start()}.
     */
    @Value("${server.netty.maxthreads:5000}")
    private int MAX_THREADS;

    /**
     * Shuts both event-loop groups down gracefully; invoked when the bean is destroyed.
     */
    @PreDestroy
    public void close() {
        log.info("关闭服务器....");
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }

    /**
     * Configures the pipeline, binds {@code NETTY_PORT} and then blocks the calling thread until
     * the server channel is closed.
     *
     * <p>The event-loop groups are released in a {@code finally} block, so they are also freed
     * when the bind itself fails (for example because the port is already in use); the failure is
     * still propagated to the caller. If the waiting thread is interrupted, its interrupt flag is
     * restored before returning.
     */
    public void start() {
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                // SO_BACKLOG is the requested length of the pending-connection (accept) queue,
                // not a cap on the number of connected clients.
                .option(ChannelOption.SO_BACKLOG, MAX_THREADS)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Raises an idle event after 18 seconds without inbound data
                        // (writer/all-idle detection disabled).
                        ch.pipeline().addLast(new IdleStateHandler(18, 0, 0));
                        // Inbound: split the byte stream into varint32 length-prefixed frames ...
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // ... and decode each frame into an ImMsg.
                        ch.pipeline().addLast(new ProtobufDecoder(ImProto.ImMsg.getDefaultInstance()));
                        // Outbound handlers run in reverse order of registration: the encoder
                        // below serialises the message first, then this prepender adds the length.
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Business logic.
                        ch.pipeline().addLast(channelHandlerAdapter);
                    }
                });
        try {
            ChannelFuture f = serverBootstrap.bind(NETTY_PORT).sync();
            // Logged only after the bind succeeded, so the message is never emitted for a port
            // that could not be opened.
            log.info("netty服务器在[{}]端口启动监听", NETTY_PORT);
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("[出现异常] 释放资源", e);
        } finally {
            // shutdownGracefully() may safely be called again later by close().
            boss.shutdownGracefully();
            work.shutdownGracefully();
            log.info("服务已关闭!");
        }
    }
}
ServerChannelHandlerAdapter处理类
package com.chimeta.netty;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.chimeta.netty.model.SessionCloseReason;
import com.chimeta.netty.protobuf.ImProto.ImMsg;
import com.chimeta.netty.util.ChannelUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 通信服务处理器
*/
@Component
@Sharable
@Slf4j
public class ServerChannelHandlerAdapter extends ChannelInboundHandlerAdapter {
/**
* 注入请求分排器
*/
@Autowired
private MessageDispatcher messageDispatcher;
@Autowired
private DeviceSessionManager sessionManager;
/** 用来记录当前在线连接数。应该把它设计成线程安全的。 */
//private AtomicInteger sessionCount = new AtomicInteger(0);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {
ctx.channel().close();
log.error("Duplicate session,IP=[{}]",ChannelUtils.getRemoteIp(ctx.channel()));
}
//String server_ip = NetworkUtils.getRealIp();//获得本机IP
// 缓存计数器加1
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
// 缓存计数器减1
//String server_ip = NetworkUtils.getRealIp();//获得本机IP
log.info(ctx.channel().id()+"离开了");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ImMsg gameMessage = (ImMsg)msg;
final Channel channel = ctx.channel();
IoSession session = ChannelUtils.getSessionBy(channel);
if(session.isHeartbeated()) {
session.setHeartbeated(false);
}
String deviceCode="";
if(session.getDevice() != null && StringUtils.isNotBlank(session.getDevice().getDeviceCode())) {
deviceCode = session.getDevice().getDeviceCode();
}
// if(!MessagingConst.TYPE_UPOS_REQUEST.equals(gameMessage.getMsg().getTypeUrl())) {
try {
log.info("Inbound message is :" + JsonFormat.printer().usingTypeRegistry(DeviceSessionManager.typeRegistry).print(gameMessage.toBuilder())
+ ", from device " + deviceCode);
} catch (InvalidProtocolBufferException e) {
log.info("Inbound message is :" + gameMessage.toString());
}
// }
messageDispatcher.dispatch(gameMessage, session);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
log.error("通信发生异常:", cause);
ctx.close();
}
/**
* 一段时间未进行读写操作 回调
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
/*心跳处理*/
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
/*读超时*/
log.info("READER_IDLE read overtime,close session");
final Channel channel = ctx.channel();
IoSession session = ChannelUtils.getSessionBy(channel);
/*
* if(messageDispatcher.sendHeartbeat(session) == false) { //如果心跳检测失败,则连接异常,主动断开
* session.setSessionCloseReason(SessionCloseReason.OVER_TIME); ctx.close(); };
*/
session.setSessionCloseReason(SessionCloseReason.OVER_TIME);
ctx.close();
} else if (event.state() == IdleState.WRITER_IDLE) {
/*写超时*/
log.info("WRITER_IDLE 写超时");
} else if (event.state() == IdleState.ALL_IDLE) {
/*总超时*/
log.info("ALL_IDLE 总超时");
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
sessionManager.unregisterUserContext(ctx.channel());
log.info(ctx.channel().id() + "已掉线!");
// 这里加入玩家的掉线处理
ctx.close();
}
}
MessageDispatcher分派各个处理器
package com.chimeta.netty;
import com.chimeta.netty.service.TerminalService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.chimeta.netty.constant.MessagingConst;
import com.chimeta.netty.model.SessionCloseReason;
import com.chimeta.netty.protobuf.ImProto.ImMsg;
import com.chimeta.netty.service.LoginService;
import com.chimeta.netty.util.MessageBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/**
 * Routes each inbound {@link ImMsg} to the service responsible for its payload type.
 */
@Component
@Slf4j
public class MessageDispatcher {

    @Autowired
    private LoginService loginService;

    @Resource
    private TerminalService terminalService;

    /**
     * Dispatches one message according to the type URL of its payload.
     *
     * <p>Messages with a negative id are answered with a 400 error response. Payload types that
     * match none of the known request types are answered with an empty message, provided a
     * session is present.
     *
     * @param imMsg       the decoded inbound message
     * @param currSession the session the message arrived on
     * @throws InvalidProtocolBufferException declared for the invoked services
     */
    @Async
    public void dispatch(ImMsg imMsg, IoSession currSession) throws InvalidProtocolBufferException {
        if (imMsg.getId() < 0) {
            currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "Invalid message!"));
            return;
        }

        final String typeUrl = imMsg.getMsg().getTypeUrl();
        if (typeUrl.equals(MessagingConst.TYPE_ONLINE_REQUEST)) {
            // Device coming online.
            loginService.doLogin(imMsg, currSession);
        } else if (typeUrl.equals(MessagingConst.TYPE_USER_LOGON_REQUEST)) {
            // User log-on.
            loginService.doUserLogon(imMsg, currSession);
        } else if (typeUrl.equals(MessagingConst.TYPE_USER_LOGOFF_REQUEST)) {
            // User log-off.
            loginService.doUserLogoff(imMsg, currSession);
        } else if (typeUrl.equals(MessagingConst.TYPE_TERMINAL_STATE_REQUEST)) {
            // Terminal state report.
            terminalService.multiInsert(imMsg, currSession);
        } else if (currSession != null) {
            // Anything else is treated as a client heartbeat and echoed back.
            responseHeartbeat(imMsg, currSession);
        }
    }

    /**
     * Sends a server-initiated heartbeat (an empty message) unless one is already outstanding.
     *
     * @param currSession the session to probe
     * @return {@code true} if a heartbeat was sent; {@code false} if the session was already
     *         flagged as heartbeated or sending failed (in which case the session is closed
     *         with {@link SessionCloseReason#EXCEPTION})
     */
    public boolean sendHeartbeat(IoSession currSession) {
        try {
            if (!currSession.isHeartbeated()) {
                ImMsg heartbeat = ImMsg.newBuilder().build();
                currSession.sendMessage(heartbeat);
                currSession.setHeartbeated(true);
                return true;
            }
            return false;
        } catch (Exception e) {
            log.error("主动发送心跳包时发生异常:", e);
            currSession.close(SessionCloseReason.EXCEPTION);
            return false;
        }
    }

    /**
     * Answers a client heartbeat with an empty message.
     *
     * @param imMsg       the heartbeat received from the client (not inspected)
     * @param currSession the session to reply on
     */
    private void responseHeartbeat(ImMsg imMsg, IoSession currSession) {
        ImMsg reply = ImMsg.newBuilder().build();
        currSession.sendMessage(reply);
    }
}
最后到service业务处理TerminalService
package com.chimeta.netty.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.chimeta.common.entity.terminal.TerminalStateMonitorDO;
import com.chimeta.netty.IoSession;
import com.chimeta.netty.constant.MessagingConst;
import com.chimeta.netty.model.DeviceInfo;
import com.chimeta.netty.protobuf.ImProto;
import com.chimeta.netty.util.MessageBuilder;
import com.chimeta.terminal.mapper.TerminalStateMonitorMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
/**
* 盒子设备相关的实现类
*/
@Service
@Slf4j
public class TerminalService extends ServiceImpl<TerminalStateMonitorMapper, TerminalStateMonitorDO> {
@Transactional(rollbackFor = Exception.class)
public void multiInsert(ImProto.ImMsg imMsg, IoSession currSession){
DeviceInfo deviceInfo = currSession.getDevice();
if(deviceInfo == null) {
currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "device not online!"));
return;
}
try {
ImProto.TerminalStateList terminalStateList = imMsg.getMsg().unpack(ImProto.TerminalStateList.class);
log.info("TerminalService multiInsert TerminalStateList:{}", terminalStateList);
List<ImProto.TerminalState> requestTerminalStateList = terminalStateList.getTerminalStateList();
if (!CollectionUtils.isEmpty(requestTerminalStateList)){
List<TerminalStateMonitorDO> tmplist = new ArrayList<>();
for (ImProto.TerminalState requestTerminalState : requestTerminalStateList){
TerminalStateMonitorDO terminalStateMonitorDO = new TerminalStateMonitorDO();
terminalStateMonitorDO.setBatteryLevel(requestTerminalState.getBatteryLevel());
terminalStateMonitorDO.setChargingState(requestTerminalState.getChargingState());
terminalStateMonitorDO.setTemperature(BigDecimal.valueOf(requestTerminalState.getTemperature()));
terminalStateMonitorDO.setUniqueCode(deviceInfo.getDeviceCode());
terminalStateMonitorDO.setStateTime(requestTerminalState.getStateTime());
tmplist.add(terminalStateMonitorDO);
}
this.saveBatch(tmplist);
}
} catch (Exception e) {
log.error("TerminalService multiInsert error:{}", e);
}
}
}
至此,服务端的处理逻辑已经写完。比较费时间的是自己编写client端的请求,前后花了两三天时间才总结好,最后写成了一个test类,如下:
package com.chimeta.core;
import com.chimeta.netty.protobuf.ImProto;
import com.google.protobuf.Any;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
class NettyTerminalTest {
@Test
public void tryTest() throws InterruptedException {
ImProto.TerminalStateList terminalstateList = ImProto.TerminalStateList.newBuilder().build();
for (int i = 0; i < 3; i++) {
ImProto.TerminalState build = ImProto.TerminalState.newBuilder()
.setBatteryLevel(i)
.setChargingState(i * 11)
.setTemperature(i * 11.1)
.setStateTime(i * 111)
.build();
terminalstateList = terminalstateList.toBuilder().addTerminalState(build).build();
}
ImProto.ImMsg imMsg = ImProto.ImMsg.newBuilder().setId(66).setMsg(Any.pack(terminalstateList)).build();
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("初始化连接...");
ch.pipeline().addLast("encode", new ProtobufEncoder())
.addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufVarint32LengthFieldPrepender());
}
})
.channel(NioSocketChannel.class).connect("192.168.123.170", 10001)
.sync()
.channel();
// channel.pipeline().addLast(new StringEncoder()).writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(imMsg.toByteArray()));
channel.pipeline().writeAndFlush(Unpooled.copiedBuffer(imMsg.toByteArray()));
System.out.println("over!");
}
}
好了,记录下,以后就不会忘记了