|
|
@@ -9,6 +9,7 @@ import com.gyjiot.common.enums.ServerType;
|
|
|
import com.gyjiot.common.utils.DateUtils;
|
|
|
import com.gyjiot.common.utils.StringUtils;
|
|
|
import com.gyjiot.common.utils.gateway.mq.TopicsUtils;
|
|
|
+import com.gyjiot.common.utils.json.JSONValidator;
|
|
|
import com.gyjiot.iot.ruleEngine.RuleProcess;
|
|
|
import com.gyjiot.mq.redischannel.producer.MessageProducer;
|
|
|
import com.gyjiot.mq.service.IDeviceReportMessageService;
|
|
|
@@ -65,34 +66,49 @@ public class MqttPublish implements MqttHandler {
|
|
|
/*获取客户端id*/
|
|
|
String clientId = AttributeUtils.getClientId(ctx.channel());
|
|
|
String topicName = publishMessage.variableHeader().topicName();
|
|
|
- log.debug("=>***客户端[{}],主题[{}],推送消息[{}]", clientId, topicName,
|
|
|
+ log.debug("=>***客户端[{}],主题[{}],推送消息:{}", clientId, topicName,
|
|
|
ByteBufUtil.hexDump(publishMessage.content()));
|
|
|
//格式化消息体
|
|
|
byte[] source = ByteBufUtil.getBytes(publishMessage.content());
|
|
|
String oldInfo = new String(source);
|
|
|
- log.debug("=>***客户端[{}],主题[{}],原消息[{}]", clientId, topicName, oldInfo);
|
|
|
+ log.debug("=>***客户端[{}],主题[{}],原消息:{}", clientId, topicName, oldInfo);
|
|
|
if(StringUtils.isNotEmpty(oldInfo)) {
|
|
|
- JSONObject infoObj = JSONObject.parseObject(oldInfo);
|
|
|
- if(infoObj.get("k0")!=null && infoObj.get("k1")!=null) {
|
|
|
- //走温湿度采集的逻辑
|
|
|
- BigDecimal humidity = infoObj.getBigDecimal("k0").divide(BigDecimal.valueOf(10), 1, RoundingMode.HALF_UP);
|
|
|
- BigDecimal temperature = infoObj.getBigDecimal("k1").divide(BigDecimal.valueOf(10), 1, RoundingMode.HALF_UP);
|
|
|
- String ts = infoObj.getString("time");
|
|
|
- JSONArray arr = new JSONArray();
|
|
|
- JSONObject humidityObj = new JSONObject();
|
|
|
- humidityObj.put("id", "humidity");
|
|
|
- humidityObj.put("name", "humidity");
|
|
|
- humidityObj.put("ts", ts);
|
|
|
- humidityObj.put("value", humidity);
|
|
|
- JSONObject temperatureObj = new JSONObject();
|
|
|
- temperatureObj.put("id", "temperature");
|
|
|
- temperatureObj.put("name", "temperature");
|
|
|
- temperatureObj.put("ts", ts);
|
|
|
- temperatureObj.put("value", temperature);
|
|
|
- arr.add(humidityObj);
|
|
|
- arr.add(temperatureObj);
|
|
|
- String modifiedJson = arr.toJSONString();
|
|
|
- ByteBuf byteBuf = Unpooled.copiedBuffer(modifiedJson, CharsetUtil.UTF_8);
|
|
|
+ if (JSONValidator.isValidJSONObject(oldInfo)) {
|
|
|
+ JSONObject infoObj = JSONObject.parseObject(oldInfo);
|
|
|
+ if(infoObj.get("k0")!=null && infoObj.get("k1")!=null) {
|
|
|
+ //走温湿度采集的逻辑
|
|
|
+ BigDecimal humidity = infoObj.getBigDecimal("k0").divide(BigDecimal.valueOf(10), 1, RoundingMode.HALF_UP);
|
|
|
+ BigDecimal temperature = infoObj.getBigDecimal("k1").divide(BigDecimal.valueOf(10), 1, RoundingMode.HALF_UP);
|
|
|
+ String ts = infoObj.getString("time");
|
|
|
+ JSONArray arr = new JSONArray();
|
|
|
+ JSONObject humidityObj = new JSONObject();
|
|
|
+ humidityObj.put("id", "humidity");
|
|
|
+ humidityObj.put("name", "humidity");
|
|
|
+ humidityObj.put("ts", ts);
|
|
|
+ humidityObj.put("value", humidity);
|
|
|
+ JSONObject temperatureObj = new JSONObject();
|
|
|
+ temperatureObj.put("id", "temperature");
|
|
|
+ temperatureObj.put("name", "temperature");
|
|
|
+ temperatureObj.put("ts", ts);
|
|
|
+ temperatureObj.put("value", temperature);
|
|
|
+ arr.add(humidityObj);
|
|
|
+ arr.add(temperatureObj);
|
|
|
+ String modifiedJson = arr.toJSONString();
|
|
|
+ ByteBuf byteBuf = Unpooled.copiedBuffer(modifiedJson, CharsetUtil.UTF_8);
|
|
|
+ // 重新构造 MqttPublishMessage
|
|
|
+ MqttPublishMessage newPublishMessage = MqttMessageBuilders.publish()
|
|
|
+ .topicName(topicName)
|
|
|
+ .retained(publishMessage.fixedHeader().isRetain()) // 保持 retain 标志
|
|
|
+ .qos(MqttQoS.valueOf(publishMessage.fixedHeader().qosLevel().value())) // 保持 QoS
|
|
|
+ .payload(byteBuf)
|
|
|
+ .build();
|
|
|
+ // 替换原始消息(如果是 ChannelHandlerContext 处理,可以覆盖 message)
|
|
|
+ ((MqttPublishMessage) message).release(); // 释放旧的 ByteBuf(避免内存泄漏)
|
|
|
+ publishMessage = newPublishMessage; // 替换为新消息
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //不是JSON
|
|
|
+ ByteBuf byteBuf = Unpooled.copiedBuffer(oldInfo, CharsetUtil.UTF_8);
|
|
|
// 重新构造 MqttPublishMessage
|
|
|
MqttPublishMessage newPublishMessage = MqttMessageBuilders.publish()
|
|
|
.topicName(topicName)
|