java解析JT808协议的实现代码

网友投稿 269 2023-05-08

java解析JT808协议的实现代码

本篇文章将介绍JT808协议的解析思路。

另请大神绕路,不喜勿喷!

先写个大致的思路,有疑问可以联系本人,联系方式:

emial: hylexus@163.com

1 JT808协议扫盲

1.1 数据类型

数据类型

描述及要求

BYTE

无符号单字节整形(字节, 8 位)

WORD

无符号双字节整形(字, 16 位)

DWORD

无符号四字节整形(双字, 32 位)

BYTE[n]

n 字节

BCD[n]

8421 码, n 字节

STRING

GBK 编码,若无数据,置空

1.2 消息结构

标识位

消息头

消息体

校验码

标识位

1byte(0x7e)

16byte

1byte

1byte(0x7e)

1.3 消息头

消息ID(0-1)   消息体属性(2-3)  终端手机号(4-9)  消息流水号(10-11)    消息包封装项(12-15)

byte[0-1]   消息ID word(16)

byte[2-3]   消息体属性 word(16)

        bit[0-9]    消息体长度

        bit[10-12]  数据加密方式

                        此三位都为 0,表示消息体不加密

                        第 10 位为 1,表示消息体经过 RSA 算法加密

                        其它保留

        bit[13]     分包

                        1:消息体卫长消息,进行分包发送处理,具体分包信息由消息包封装项决定

                        0:则消息头中无消息包封装项字段

        bit[14-15]  保留

byte[4-9]   终端手机号或设备ID bcd[6KHFwZ]

        根据安装后终端自身的手机号转换

        手机号不足12 位,则在前面补 0

byte[10-11]     消息流水号 word(16)

        按发送顺序从 0 开始循环累加

byte[12-15]     消息包封装项

        byte[0-1]   消息包总数(word(16))

                        该消息分包后得总包数

        byte[2-3]   包序号(word(16))

                        从 1 开始

        如果消息体属性中相关标识位确定消息分包处理,则该项有内容

        否则无该项

2 解析

整个消息体结构中最复杂的就是消息头了。

2.1 消息体实体类

以下是对整个消息体抽象出来的一个java实体类。

import java.nio.channels.Channel;

public class PackageData {

/**

* 16byte 消息头

*/

protected MsgHeader msgHeader;

// 消息体字节数组

protected byte[] msgBodyBytes;

/**

* 校验码 1byte

*/

protected int checkSum;

//记录每个客户端的channel,以便下发信息给客户端

protected Channel channel;

public MsgHeader getMsgHeader() {

return msgHeader;

}

//TODO set 和 get 方法在此处省略

//消息头

public static class MsgHeader {

// 消息ID

protected int msgId;

/////// ========消息体属性

// byte[2-3]

protected int msgBodyPropsField;

// 消息体长度

protected int msgBodyLength;

// 数据加密方式

protected int encryptionType;

// 是否分包,true==>有消息包封装项

protected boolean hasSubPackage;

// 保留位[14-15]

protected String reservedBit;

/////// ========消息体属性

// 终端手机号

protected String terminalPhone;

// 流水号

protected int flowId;

//////// =====消息包封装项

// byte[12-15]

protected int packageInfoField;

// 消息包总数(word(16))

protected long totalSubPackage;

// 包序号(word(16))这次发送的这个消息包是分包中的第几个消息包, 从 1 开始

protected long subPackageSeq;

//////// =====消息包封装项

//TODO set 和 get 方法在此处省略

}

}

2.2 字节数组到消息体实体类的转换

2.2.1 消息转换器

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import cn.hylexus.jt808.util.BCD8421Operater;

import cn.hylexus.jt808.util.BitOperator;

import cn.hylexus.jt808.vo.PackageData;

import cn.hylexus.jt808.vo.PackageData.MsgHeader;

public class MsgDecoder {

private static final Logger log = LoggerFactory.getLogger(MsgDecoder.class);

private BitOperator bitOperator;

private BCD8421Operater bcd8421Operater;

public MsgDecoder() {

this.bitOperator = new BitOperator();

this.bcd8421Operater = new BCD8421Operater();

}

//字节数组到消息体实体类

public PackageData queueElement2PackageData(byte[] data) {

PackageData ret = new PackageData();

// 1. 16byte 或 12byte 消息头

MsgHeader msgHeader = this.parseMsgHeaderFromBytes(data);

ret.setMsgHeader(msgHeader);

int msgBodyByteStartIndex = 12;

// 2. 消息体

// 有子包信息,消息体起始字节后移四个字节:消息包总数(word(16))+包序号(word(16))

if (msgHeader.isHasSubPackage()) {

msgBodyByteStartIndex = 16;

}

byte[] tmp = new byte[msgHeader.getMsgBodyLength()];

System.arraycopy(data, msgBodyByteStartIndex, tmp, 0, tmp.length);

ret.setMsgBodyBytes(tmp);

// 3. 去掉分隔符之后,最后一位就是校验码

// int checkSumInPkg =

// this.bitOperator.oneByteToInteger(data[data.length - 1]);

int checkSumInPkg = data[data.length - 1];

int calculatedCheckSum = this.bitOperator.getCheckSum4JT808(data, 0, data.length - 1);

ret.setCheckSum(checkSumInPkg);

if (checkSumInPkg != calculatedCheckSum) {

log.warn("检验码不一致,msgid:{},pkg:{},calculated:{}", msgHeader.getMsgId(), checkSumInPkg, calculatedCheckSum);

}

return ret;

}

private MsgHeader parseMsgHeaderFromBytes(byte[] data) {

MsgHeader msgHeader = new MsgHeader();

// 1. 消息ID word(16)

// byte[] tmp = new byte[2];

// System.arraycopy(data, 0, tmp, 0, 2);

// msgHeader.setMsgId(this.bitOperator.twoBytesToInteger(tmp));

msgHeader.setMsgId(this.parseIntFromBytes(data, 0, 2));

// 2. 消息体属性 word(16)=================>

// System.arraycopy(data, 2, tmp, 0, 2);

// int msgBodyProps = this.bitOperator.twoBytesToInteger(tmp);

int msgBodyProps = this.parseIntFromBytes(data, 2, 2);

msgHeader.setMsgBodyPropsField(msgBodyProps);

// [ 0-9 ] 0000,0011,1111,1111(3FF)(消息体长度)

msgHeader.setMsgBodyLength(msgBodyProps & 0x1ff);

// [10-12] 0001,1100,0000,0000(1C00)(加密类型)

msgHeader.setEncryptionType((msgBodyProps & 0xe00) >> 10);

// [ 13_ ] 0010,0000,0000,0000(2000)(是否有子包)

msgHeader.setHasSubPackage(((msgBodyProps & 0x2000) >> 13) == 1);

// [14-15] 1100,0000,0000,0000(C000)(保留位)

msgHeader.setReservedBit(((msgBodyProps & 0xc000) >> 14) + "");

// 消息体属性 word(16)<=================

// 3. 终端手机号 bcd[6]

// tmp = new byte[6];

// System.arraycopy(data, 4, tmp, 0, 6);

// msgHeader.setTerminalPhone(this.bcd8421Operater.bcd2String(tmp));

msgHeader.setTerminalPhone(this.parseBcdStringFromBytes(data, 4, 6));

// 4. 消息流水号 word(16) 按发送顺序从 0 开始循环累加

// tmp = new byte[2];

// System.arraycopy(data, 10, tmp, 0, 2);

// msgHeader.setFlowId(this.bitOperator.twoBytesToInteger(tmp));

msgHeader.setFlowId(this.parseIntFromBytes(data, 10, 2));

// 5. 消息包封装项

// 有子包信息

if (msgHeader.isHasSubPackage()) {

// 消息包封装项字段

msgHeader.setPackageInfoField(this.parseIntFromBytes(data, 12, 4));

// byte[0-1] 消息包总数(word(16))

// tmp = new byte[2];

// System.arraycopy(data, 12, tmp, 0, 2);

// msgHeader.setTotalSubPackage(this.bitOperator.twoBytesToInteger(tmp));

msgHeader.setTotalSubPackage(this.parseIntFromBytes(data, 12, 2));

// byte[2-3] 包序号(word(16)) 从 1 开始

// tmp = new byte[2];

// System.arraycopy(data, 14, tmp, 0, 2);

// msgHeader.setSubPackageSeq(this.bitOperator.twoBytesToInteger(tmp));

msgHeader.setSubPackageSeq(this.parseIntFromBytes(data, 12, 2));

}

return msgHeader;

}

protected String parseStringFromBytes(byte[] data, int startIndex, int lenth) {

return this.parseStringFromBytes(data, startIndex, lenth, null);

}

private String parseStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) {

try {

byte[] tmp = new byte[lenth];

System.arraycopy(data, startIndex, tmp, 0, lenth);

return new String(tmp, "UTF-8");

} catch (Exception e) {

log.error("解析字符串出错:{}", e.getMessage());

e.printStackTrace();

return defaultVal;

}

}

private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth) {

return this.parseBcdStringFromBytes(data, startIndex, lenth, null);

}

private String parseBcdStringFromBytes(byte[] data, int startIndex, int lenth, String defaultVal) {

try {

byte[] tmp = new byte[lenth];

System.arraycopy(data, startIndex, tmp, 0, lenth);

return this.bcd8421Operater.bcd2String(tmp);

} catch (Exception e) {

log.error("解析BCD(8421码)出错:{}", e.getMessage());

e.printStackTrace();

return defaultVal;

}

}

private int parseIntFromBytes(byte[] data, int startIndex, int length) {

return this.parseIntFromBytes(data, startIndex, length, 0);

}

private int parseIntFromBytes(byte[] data, int startIndex, int length, int defaultVal) {

try {

// 字节数大于4,从起始索引开始向后处理4个字节,其余超出部分丢弃

final int len = length > 4 ? 4 : length;

byte[] tmp = new byte[len];

System.arraycopy(data, startIndex, tmp, 0, len);

return bitOperator.byteToInteger(tmp);

} catch (Exception e) {

log.error("解析整数出错:{}", e.getMessage());

e.printStackTrace();

return defaultVal;

}

}

}

2.2.2 用到的工具类

2.2.2.1 BCD操作工具类

package cn.hylexus.jt808.util;

public class BCD8421Operater {

/**

* BCD字节数组===>String

*

* @param bytes

* @return 十进制字符串

*/

public String bcd2String(byte[] bytes) {

StringBuilder temp = new StringBuilder(bytes.length * 2);

for (int i = 0; i < bytes.length; i++) {

// 高四位

temp.append((bytes[i] & 0xf0) >>> 4);

// 低四位

temp.append(bytes[i] & 0x0f);

}

return temp.toString().substring(0, 1).equalsIgnoreCase("0") ? temp.toString().substring(1) : temp.toString();

}

/**

* 字符串==>BCD字节数组

*

* @param str

* @return BCD字节数组

*/

public byte[] string2Bcd(String str) {

// 奇数,前补零

if ((str.length() & 0x1) == 1) {

str = "0" + str;

}

byte ret[] = new byte[str.length() / 2];

byte bs[] = str.getBytes();

for (int i = 0; i < ret.length; i++) {

byte high = ascII2Bcd(bs[2 * i]);

byte low = ascII2Bcd(bs[2 * i + 1]);

// TODO 只遮罩BCD低四位?

ret[i] = (byte) ((high << 4) | low);

}

return ret;

}

private byte ascII2Bcd(byte asc) {

if ((asc >= '0') && (asc <= '9'))

return (byte) (asc - '0');

else if ((asc >= 'A') && (asc <= 'F'))

return (byte) (asc - 'A' + KHFwZ10);

else if ((asc >= 'a') && (asc <= 'f'))

return (byte) (asc - 'a' + 10);

else

return (byte) (asc - 48);

}

}

2.2.2.2 位操作工具类

package cn.hylexus.jt808.util;

import java.util.Arrays;

import java.util.List;

public class BitOperator {

/**

* 把一个整形该为byte

*

* @param value

* @return

* @throws Exception

*/

public byte integerTo1Byte(int value) {

return (byte) (value & 0xFF);

}

/**

* 把一个整形该为1位的byte数组

*

* @param value

* @return

* @throws Exception

*/

public byte[] integerTo1Bytes(int value) {

byte[] result = new byte[1];

result[0] = (byte) (value & 0xFF);

return result;

}

/**

* 把一个整形改为2位的byte数组

*

* @param value

* @return

* @throws Exception

*/

public byte[] integerTo2Bytes(int value) {

byte[] result = new byte[2];

result[0] = (byte) ((value >>> 8) & 0xFF);

result[1] = (byte) (value & 0xFF);

return result;

}

/**

* 把一个整形改为3位的byte数组

*

* @param value

* @return

* @throws Exception

*/

public byte[] integerTo3Bytes(int value) {

byte[] result = new byte[3];

result[0] = (byte) ((value >>> 16) & 0xFF);

result[1] = (byte) ((value >>> 8) & 0xFF);

result[2] = (byte) (value & 0xFF);

return result;

}

/**

* 把一个整形改为4位的byte数组

*

* @param value

* @return

* @throws Exception

*/

public byte[] integerTo4Bytes(int value){

byte[] result = new byte[4];

result[0] = (byte) ((value >>> 24) & 0xFF);

result[1] = (byte) ((value >>> 16) & 0xFF);

result[2] = (byte) ((value >>> 8) & 0xFF);

result[3] = (byte) (value & 0xFF);

return result;

}

/**

* 把byte[]转化位整形,通常为指令用

*

* @param value

* @return

* @throws Exception

*/

public int byteToInteger(byte[] value) {

int result;

if (value.length == 1) {

result = oneByteToInteger(value[0]);

} else if (value.length == 2) {

result = twoBytesToInteger(value);

} else if (value.length == 3) {

result = threeBytesToInteger(value);

} else if (value.length == 4) {

result = fourBytesToInteger(value);

} else {

result = fourBytesToInteger(value);

}

return result;

}

/**

* 把一个byte转化位整形,通常为指令用

*

* @param value

* @return

* @throws Exception

*/

public int oneByteToInteger(byte value) {

return (int) value & 0xFF;

}

/**

* 把一个2位的数组转化位整形

*

* @param value

* @return

* @throws Exception

*/

public int twoBytesToInteger(byte[] value) {

// if (value.length < 2) {

// throw new Exception("Byte array too short!");

// }

int temp0 = value[0] & 0xFF;

int temp1 = value[1] & 0xFF;

return ((temp0 << 8) + temp1);

}

/**

* 把一个3位的数组转化位整形

*

* @param value

* @return

* @throws Exception

*/

public int threeBytesToInteger(byte[] value) {

int temp0 = value[0] & 0xFF;

int temp1 = value[1] & 0xFF;

int temp2 = value[2] & 0xFF;

return ((temp0 << 16) + (temp1 << 8) + temp2);

}

/**

* 把一个4位的数组转化位整形,通常为指令用

*

* @param value

* @return

* @throws Exception

*/

public int fourBytesToInteger(byte[] value) {

// if (value.length < 4) {

// throw new Exception("Byte array too short!");

// }

int temp0 = value[0] & 0xFF;

int temp1 = value[1] & 0xFF;

int temp2 = value[2] & 0xFF;

int temp3 = value[3] & 0xFF;

return ((temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3);

}

/**

* 把一个4位的数组转化位整形

*

* @param value

* @return

* @throws Exception

*/

public long fourBytesToLong(byte[] value) throws Exception {

// if (value.length < 4) {

// throw new Exception("Byte array too short!");

// }

int temp0 = value[0] & 0xFF;

int temp1 = value[1] & 0xFF;

int temp2 = value[2] & 0xFF;

int temp3 = value[3] & 0xFF;

return (((long) temp0 << 24) + (temp1 << 16) + (temp2 << 8) + temp3);

}

/**

* 把一个数组转化长整形

*

* @param value

* @return

* @throws Exception

*/

public long bytes2Long(byte[] value) {

long result = 0;

int len = value.length;

int temp;

for (int i = 0; i < len; i++) {

temp = (len - 1 - i) * 8;

if (temp == 0) {

result += (value[i] & 0x0ff);

} else {

result += (value[i] & 0x0ff) << temp;

}

}

return result;

}

/**

* 把一个长整形改为byte数组

*

* @param value

* @return

* @throws Exception

*/

public byte[] longToBytes(long value){

return longToBytes(value, 8);

}

/**

* 把一个长整形改为byte数组

*

* @param value

* @return

* @throws Exception

*/

public byte[] longToBytes(long value, int len) {

byte[] result = new byte[len];

int temp;

for (int i = 0; i < len; i++) {

temp = (len - 1 - i) * 8;

if (temp == 0) {

result[i] += (value & 0x0ff);

} else {

result[i] += (value >>> temp) & 0x0ff;

}

}

return result;

}

/**

* 得到一个消息ID

*

* @return

* @throws Exception

*/

public byte[] generateTransactionID() throws Exception {

byte[] id = new byte[16];

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 0, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 2, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 4, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 6, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 8, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 10, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 12, 2);

System.arraycopy(integerTo2Bytes((int) (Math.random() * 65536)), 0, id, 14, 2);

return id;

}

/**

* 把IP拆分位int数组

*

* @param ip

* @return

* @throws Exception

*/

public int[] getIntIPValue(String ip) throws Exception {

String[] sip = ip.split("[.]");

// if (sip.length != 4) {

// throw new Exception("error IPAddress");

// }

int[] intIP = { Integer.parseInt(sip[0]), Integer.parseInt(sip[1]), Integer.parseInt(sip[2]),

Integer.parseInt(sip[3]) };

return intIP;

}

/**

* 把byte类型IP地址转化位字符串

*

* @param address

* @return

* @throws Exception

*/

public String getStringIPValue(byte[] address) throws Exception {

int first = this.oneByteToInteger(address[0]);

int second = this.oneByteToInteger(address[1]);

int third = this.oneByteToInteger(address[2]);

int fourth = thihttp://s.oneByteToInteger(address[3]);

return first + "." + second + "." + third + "." + fourth;

}

/**

* 合并字节数组

*

* @param first

* @param rest

* @return

*/

public byte[] concatAll(byte[] first, byte[]... rest) {

int totalLength = first.length;

for (byte[] array : rest) {

if (array != null) {

totalLength += array.length;

}

}

byte[] result = Arrays.copyOf(first, totalLength);

int offset = first.length;

for (byte[] array : rest) {

if (array != null) {

System.arraycopy(array, 0, result, offset, array.length);

offset += array.length;

}

}

return result;

}

/**

* 合并字节数组

*

* @param rest

* @return

*/

public byte[] concatAll(List rest) {

int totalLength = 0;

for (byte[] array : rest) {

if (array != null) {

totalLength += array.length;

}

}

byte[] result = new byte[totalLength];

int offset = 0;

for (byte[] array : rest) {

if (array != null) {

System.arraycopy(array, 0, result, offset, array.length);

offset += array.length;

}

}

return result;

}

public float byte2Float(byte[] bs) {

return Float.intBitsToFloat(

(((bs[3] & 0xFF) << 24) + ((bs[2] & 0xFF) << 16) + ((bs[1] & 0xFF) << 8) + (bs[0] & 0xFF)));

}

public float byteBE2Float(byte[] bytes) {

int l;

l = bytes[0];

l &= 0xff;

l |= ((long) bytes[1] << 8);

l &= 0xffff;

l |= ((long) bytes[2] << 16);

l &= 0xffffff;

l |= ((long) bytes[3] << 24);

return Float.intBitsToFloat(l);

}

public int getCheckSum4JT808(byte[] bs, int start, int end) {

if (start < 0 || end > bs.length)

throw new ArrayIndexOutOfBoundsException("getCheckSum4JT808 error : index out of bounds(start=" + start

+ ",end=" + end + ",bytes length=" + bs.length + ")");

int cs = 0;

for (int i = start; i < end; i++) {

cs ^= bs[i];

}

return cs;

}

public int getBitRange(int number, int start, int end) {

if (start < 0)

throw new IndexOutOfBoundsException("min index is 0,but start = " + start);

if (end >= Integer.SIZE)

throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but end = " + end);

return (number << Integer.SIZE - (end + 1)) >>> Integer.SIZE - (end - start + 1);

}

public int getBitAt(int number, int index) {

if (index < 0)

throw new IndexOutOfBoundsException("min index is 0,but " + index);

if (index >= Integer.SIZE)

throw new IndexOutOfBoundsException("max index is " + (Integer.SIZE - 1) + ",but " + index);

return ((1 << index) & number) >> index;

}

public int getBitAtS(int number, int index) {

String s = Integer.toBinaryString(number);

return Integer.parseInt(s.charAt(index) + "");

}

@Deprecated

public int getBitRangeS(int number, int start, int end) {

String s = Integer.toBinaryString(number);

StringBuilder sb = new StringBuilder(s);

while (sb.length() < Integer.SIZE) {

sb.insert(0, "0");

}

String tmp = sb.reverse().substring(start, end + 1);

sb = new StringBuilder(tmp);

return Integer.parseInt(sb.reverse().toString(), 2);

}

}

2.3 和netty结合

2.3.1 netty处理器链

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import cn.kkbc.tpms.tcp.service.TCPServerHandler;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import io.netty.handler.timeout.IdleStateHandler;

import io.netty.util.concurrent.Future;

public class TCPServer2 {

private Logger log = LoggerFactory.getLogger(getClass());

private volatile boolean isRunning = false;

private EventLoopGroup bossGroup = null;

private EventLoopGroup workerGroup = null;

private int port;

public TCPServer2() {

}

public TCPServer2(int port) {

this();

this.port = port;

}

private void bind() throws Exception {

this.bossGroup = new NioEventLoopGroup();

this.workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup)//

.channel(NioServerSocketChannel.class) //

.childHandler(new ChannelInitializer() { //

@Override

public void initChannel(SocketChannel ch) throws Exception {

//超过15分钟未收到客户端消息则自动断开客户端连接

ch.pipeline().addLast("idleStateHandler",

new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));

//ch.pipeline().addLast(new Decoder4LoggingOnly());

// 1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常

ch.pipeline().addLast(

new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),

Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e })));

ch.pipeline().addLast(new TCPServerHandler());

}

}).option(ChannelOption.SO_BACKLOG, 128) //

.childOption(ChannelOption.SO_KEEPALIVE, true);

this.log.info("TCP服务启动完毕,port={}", this.port);

ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

channelFuture.channel().closeFuture().sync();

}

public synchronized void startServer() {

if (this.isRunning) {

throw new IllegalStateException(this.getName() + " is already started .");

}

this.isRunning = true;

new Thread(() -> {

try {

this.bind();

} catch (Exception e) {

this.log.info("TCP服务启动出错:{}", e.getMessage());

e.printStackTrace();

}

}, this.getName()).start();

}

public synchronized void stopServer() {

if (!this.isRunning) {

throw new IllegalStateException(this.getName() + " is not yet started .");

}

this.isRunning = false;

try {

Future> future = this.workerGroup.shutdownGracefully().await();

if (!future.isSuccess()) {

log.error("workerGroup 无法正常停止:{}", future.cause());

}

future = this.bossGroup.shutdownGracefully().await();

if (!future.isSuccess()) {

log.error("bossGroup 无法正常停止:{}", future.cause());

}

} catch (InterruptedException e) {

e.printStackTrace();

}

this.log.info("TCP服务已经停止...");

}

private String getName() {

return "TCP-Server";

}

public static void main(String[] args) throws Exception {

TCPServer2 server = new TCPServer2(20048);

server.startServer();

// Thread.sleep(3000);

// server.stopServer();

}

}

2.3.2 netty针对于JT808的消息处理器

package cn.hylexus.jt808.service;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import cn.hylexus.jt808.server.SessionManager;

import cn.hylexus.jt808.service.codec.MsgDecoder;

import cn.hylexus.jt808.vo.PackageData;

import cn.hylexus.jt808.vo.Session;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

import io.netty.util.ReferenceCountUtil;

public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)

private final Logger logger = LoggerFactory.getLogger(getClass());

// 一个维护客户端连接的类

private final SessionManager sessionManager;

private MsgDecoder decoder = new MsgDecoder();

public TCPServerHandler() {

this.sessionManager = SessionManager.getInstance();

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)

try {

ByteBuf buf = (ByteBuf) msg;

if (buf.readableBytes() <= 0) {

// ReferenceCountUtil.safeRelease(msg);

return;

}

byte[] bs = new byte[buf.readableBytes()];

buf.readBytes(bs);

PackageData jt808Msg = this.decoder.queueElement2PackageData(bs);

// 处理客户端消息

this.processClientMsg(jt808Msg);

} finally {

release(msg);

}

}

private void processClientMsg(PackageData jt808Msg) {

// TODO 更加消息ID的不同,分别实现自己的业务逻辑

if (jt808Msg.getMsgHeader().getMsgId() == 0x900) {

// TODO ...

} else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) {

// TODO ...

}

// else if(){}

// else if(){}

// else if(){}

// else if(){}

// ...

else {

logger.error("位置消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId());

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)

logger.error("发生异常:{}", cause.getMessage());

cause.printStackTrace();

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

Session session = Session.buildSession(ctx.channel());

sessionManager.put(session.getId(), session);

logger.debug("终端连接:{}", session);

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

final String sessionId = ctx.channel().id().asLongText();

Session session = sessionManager.findBySessionId(sessionId);

this.sessionManager.removeBySessionId(sessionId);

logger.debug("终端断开连接:{}", session);

ctx.channel().close();

// ctx.close();

}

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {

IdleStateEvent event = (IdleStateEvent) evt;

if (event.state() == IdleState.READER_IDLE) {

Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));

logger.error("服务器主动断开连接:{}", session);

ctx.close();

}

}

}

private void release(Object msg) {

try {

ReferenceCountUtil.release(msg);

} catch (Exception e) {

e.printStackTrace();

}

}

}

2.3.3 用到的其他类

package cn.hylexus.jt808.server;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;

import java.util.function.BiConsumer;

import java.util.stream.Collectors;

import cn.hylexus.jt808.vo.Session;

public class SessionManager {

private static volatile SessionManager instance = null;

// netty生成的sessionID和Session的对应关系

private Map sessionIdMap;

// 终端手机号和netty生成的sessionID的对应关系

private Map phoneMap;

public static SessionManager getInstance() {

if (instance == null) {

synchronized (SessionManager.class) {

if (instance == null) {

instance = new SessionManager();

}

}

}

return instance;

}

public SessionManager() {

this.sessionIdMap = new ConcurrentHashMap<>();

this.phoneMap = new ConcurrentHashMap<>();

}

public boolean containsKey(String sessionId) {

return sessionIdMap.containsKey(sessionId);

}

public boolean containsSession(Session session) {

return sessionIdMap.containsValue(session);

}

public Session findBySessionId(String id) {

return sessionIdMap.get(id);

}

public Session findByTerminalPhone(String phone) {

String sessionId = this.phoneMap.get(phone);

if (sessionId == null)

return null;

return this.findBySessionId(sessionId);

}

public synchronized Session put(String key, Session value) {

if (value.getTerminalPhone() != null && !"".equals(value.getTerminalPhone().trim())) {

this.phoneMap.put(value.getTerminalPhone(), value.getId());

}

return sessionIdMap.put(key, value);

}

public synchronized Session removeBySessionId(String sessionId) {

if (sessionId == null)

return null;

Session session = sessionIdMap.remove(sessionId);

if (session == null)

return null;

if (session.getTerminalPhone() != null)

this.phoneMap.remove(session.getTerminalPhone());

return session;

}

public Set keySet() {

return sessionIdMap.keySet();

}

public void forEach(BiConsumer super String, ? super Session> action) {

sessionIdMap.forEach(action);

}

public Set> entrySet() {

return sessionIdMap.entrySet();

}

public List toList() {

return this.sessionIdMap.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList());

}

}

3 demo级别java示例

请移步: https://github.com/hylexus/jt-808-protocol

另请不要吝啬,在GitHub给个star让小可装装逼…………^_^

急急忙忙写的博客,先写个大致的思路,有疑问可以联系本人,联系方式:

emial: hylexus@163.com

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Activiti explorer.war示例工程使用过程图解
下一篇:Spring security实现对账户进行加密
相关文章

 发表评论

暂时没有评论,来抢沙发吧~