From c229cfd4ccd7ddf9365daa68891a09b8618763bd Mon Sep 17 00:00:00 2001 From: wuhan <18852676227@163.com> Date: Mon, 2 Sep 2024 14:01:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=9F=BA=E7=A1=80=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{hcc_info.ts => hcc_info.entity.ts} | 5 +- src/main.ts | 2 - src/mqtt/mqtt.service.ts | 91 +++++++++++++++++++ src/mqtt/mqtt.ts | 48 ++++++++-- src/mqtt/mqttTool.ts | 42 ++++++++- 5 files changed, 172 insertions(+), 16 deletions(-) rename src/entities/{hcc_info.ts => hcc_info.entity.ts} (79%) create mode 100644 src/mqtt/mqtt.service.ts diff --git a/src/entities/hcc_info.ts b/src/entities/hcc_info.entity.ts similarity index 79% rename from src/entities/hcc_info.ts rename to src/entities/hcc_info.entity.ts index c3677d1..53cc879 100644 --- a/src/entities/hcc_info.ts +++ b/src/entities/hcc_info.entity.ts @@ -9,8 +9,11 @@ export class HccInfo extends BaseEntity { @PrimaryColumn({ type: 'varchar' }) id: string; - //标题 + //地址 @Column({ type: 'varchar', name: 'address_detail' }) addressDetail: string; + @Column({ type: 'varchar', name: 'address_code' }) + addressCode: string; + } diff --git a/src/main.ts b/src/main.ts index f114b20..d1a6610 100644 --- a/src/main.ts +++ b/src/main.ts @@ -12,7 +12,6 @@ import { AllExceptionsFilter } from './common/exceptions/base.exception.filter'; import { HttpExceptionFilter } from './common/exceptions/http.exception.filter'; import { NestExpressApplication } from '@nestjs/platform-express'; import { join } from 'path'; -import { Mqtt } from './mqtt/mqtt'; async function bootstrap() { if (module.hot) { @@ -37,7 +36,6 @@ async function bootstrap() { app.useGlobalPipes(new ValidationPipe()); app.useStaticAssets(join(__dirname, '..', 'public')); - Mqtt.init() await app.listen(7666, '0.0.0.0'); } bootstrap(); diff --git a/src/mqtt/mqtt.service.ts b/src/mqtt/mqtt.service.ts new file mode 100644 index 0000000..8ba52ee --- /dev/null +++ b/src/mqtt/mqtt.service.ts @@ -0,0 +1,91 @@ +// src/mqtt/mqtt.service.ts +import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import * as mqtt from 'mqtt'; +import MqttTool from './mqttTool'; +import AppLogger from '../utils/logger'; +import clientMapper from '../client/client.mapper'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +// import { EventEmitterService } from 'src/utils/event.service'; +// import Ev from '../event/event'; +const logger = new AppLogger(); + +@Injectable() +export class MqttService implements OnModuleInit, OnModuleDestroy { + constructor(private eventEmitter: EventEmitter2) { } + + private client: mqtt.MqttClient; + + + async onModuleInit() { + const mqttUrl = 'mqtt://192.168.3.18:1883'; + const { + clientId, + userName, + password, + securityKey, + } = await new MqttTool().getMqttInfo() + console.log(`clientId:${clientId},mqttUrl:${mqttUrl}`); + if (!clientId) { + console.log(`无法成功连接mqtt!!!`); + return; + } + + const options = { + clientId, + username: userName, + password, + clean: true, // true: 清除会话, false: 保留会话 + connectTimeout: 60 * 60, //超时时间 + rejectUnauthorized: false, + }; + + this.client = mqtt.connect(mqttUrl, options); + const that = this.eventEmitter + + // 订阅主题 + + this.client.on('connect', () => { + console.log('MQTT连接成功'); + that.emit('message', '101室报警') + // new EventGateway().message( + // `${111}${111}` + // // areaCode: address.addressCode + // ) + // 订阅主题 + this.client.subscribe(['rep/hcc/+/msgPush/report']); + + this.client.on('message', async (topic, mes) => { + const arr = topic.split('/'); + console.log(`订阅到topic: ${topic}`); + logger.info(null, `订阅到topic: ${topic}`); + const data = JSON.parse(mes.toString()) + logger.info(null, `data:${JSON.stringify(data)}`); + // 获取监听到的数据 + const { pushFlag, pushId, msgType, title, content } = data + // 查询当前信息箱所在地址 + const address = await clientMapper.getHccAddress(); + if (address) { + // new EventGateway().message( + // `${address.addressDetail}${content}` + // // areaCode: address.addressCode + // ) + } + }); + }); + + this.client.on('error', (err) => { + console.error('MQTT error:', err); + }); + } + + onModuleDestroy() { + if (this.client) { + this.client.end(); + } + } + + // 你可以添加更多方法来发送消息等 + publish(topic: string, message: string) { + this.client.publish(topic, message); + } +} \ No newline at end of file diff --git a/src/mqtt/mqtt.ts b/src/mqtt/mqtt.ts index 7dbf3e1..1fed53d 100644 --- a/src/mqtt/mqtt.ts +++ b/src/mqtt/mqtt.ts @@ -1,23 +1,29 @@ import * as mqtt from 'mqtt'; import MqttTool from './mqttTool' import AppLogger from '../utils/logger'; -import { EventGateway } from 'src/gateway/gateway'; -import { HccInfo } from 'src/entities/hcc_info'; +import { Gateway } from '../gateway/gateway'; +import clientMapper from '../client/client.mapper'; +import { Inject } from '@nestjs/common'; +const { EventEmitter } = require('events'); const logger = new AppLogger(); +const event = new EventEmitter(); export class Mqtt { - protected static client: mqtt.MqttClient; + @Inject() + protected gateway: Gateway; + protected client: mqtt.MqttClient; - static async init() { + async init() { + const mqttUrl = 'mqtt://192.168.3.18:1883' const { clientId, userName, password, securityKey, - mqttUrl } = await new MqttTool().getMqttInfo() console.log(`clientId:${clientId},mqttUrl:${mqttUrl}`); if (!clientId) { + console.log(`无法成功连接mqtt!!!`); return; } @@ -27,12 +33,13 @@ export class Mqtt { password, clean: true, // true: 清除会话, false: 保留会话 connectTimeout: 60 * 60, //超时时间 + rejectUnauthorized: false, }; this.client = mqtt.connect(mqttUrl, options); // 订阅主题 - this.client.subscribe(['rep/hcc/#/msgPush/report']); + this.client.subscribe(['rep/hcc/+/msgPush/report']); this.client.on('connect', async () => { //初始化日志数据 @@ -47,15 +54,20 @@ export class Mqtt { this.client.on('message', async (topic, mes) => { const arr = topic.split('/'); + console.log(`订阅到topic: ${topic}`); logger.info(null, `订阅到topic: ${topic}`); const data = JSON.parse(mes.toString()) logger.info(null, `data:${JSON.stringify(data)}`); // 获取监听到的数据 const { pushFlag, pushId, msgType, title, content } = data // 查询当前信息箱所在地址 - const list = await HccInfo.find() - const address = list[0].addressDetail - new EventGateway().message({ message: `${address}${content}` }) + const address = await clientMapper.getHccAddress(); + if (address) { + // new EventGateway().message( + // `${address.addressDetail}${content}` + // // areaCode: address.addressCode + // ) + } }); this.client.on('error', (err) => { @@ -68,4 +80,22 @@ export class Mqtt { console.log('已断开'); }); } + + async so() { + console.log('socket推送中'); + // cli.emit( + // 'message', `${100}${'报警'}` + // // areaCode: address.addressCode + // ) + // this.emit('message', `${100}${'报警'}`) + let currentTime = 0; + // 每秒触发一次 update 事件 + setInterval(() => { + currentTime++; + this.gateway.message( + `${101}${111}` + // areaCode: address.addressCode + ) + }, 1000); + } } \ No newline at end of file diff --git a/src/mqtt/mqttTool.ts b/src/mqtt/mqttTool.ts index ee579f0..cd4e2f6 100644 --- a/src/mqtt/mqttTool.ts +++ b/src/mqtt/mqttTool.ts @@ -1,16 +1,50 @@ import axios from 'axios'; +import clientMapper from 'src/client/client.mapper'; +const https = require('https') export default class MqttTool { // 先请求接口获取mqtt连接地址 async getMqttInfo() { try { - const result = await axios.post('http:localhost:8000/iot/hcc/getConnInfo') - const connInfo = result.data.connInfo; - return connInfo; + let id: any; + const ids = await clientMapper.getAgentId(); + if (ids.length) { + id = ids[0].id + } + const result = await axios.post('https://192.168.3.18:60011/iot/hcc/getConnInfo', { + agentId: id, + agentType: 5, + mac: this.getMac() + }, + { + httpsAgent: new https.Agent({ + rejectUnauthorized: false + }), + headers: { + accessToken: '0f089849bace7ebf5e2a8cc4683a10283c0941cb9ed617b7ad3ce052950568c2' + }, + } + ) + const connInfo = result.data?.respData?.connInfo; + return connInfo || {}; } catch (error) { - console.log('请求失败'); + console.log(error); return {} } } + getMac() { + var os = require("os"); + var mac = '' + var networkInterfaces = os.networkInterfaces(); + for (var i in networkInterfaces) { + for (var j in networkInterfaces[i]) { + if (networkInterfaces[i][j]["family"] === "IPv4" && networkInterfaces[i][j]["mac"] !== "00:00:00:00:00:00" && networkInterfaces[i][j]["address"] !== "127.0.0.1") { + mac = networkInterfaces[i][j]["mac"] + } + } + } + return mac + } + } \ No newline at end of file