import logger from '@wisdom-utils/utils/lib/logger'; import _, { isEmpty, isString } from 'lodash'; import MqttClient from 'mqtt-client'; import { isJSON } from '@wisdom-utils/components/lib/AppLayout/helpers/index'; import { api } from './api'; import { replaceSpeak, generatedId } from './utils'; import { postInformationStatus } from './api/service'; import { DEFAULT_KEEPLIVE, DEFAULT_MQTT_PATH, DEFAULT_PARSE_LEVEL, DEFAULT_TCP_IP, DEFAULT_TCP_PORT, DEFAULT_TIMEOUT, ERR_OK, MESSAGE_TYPE, NEW_MESSAGE, PASSWORD, PLATFORM_LEVEL, REQUEST_SERVICE, SYS_LEVEL, USERNAME, VIDEO_LEVEL, } from './constants'; import { createMessageFromHis, createMessageFromReal } from './message'; /* eslint-disable */ // eslint-disable-next-line no-undef const Logger = logger('mqtt'); class Notifier { constructor(userInfo, renderVideo, renderPlatform, renderSysPlatform, props) { this.userInfo = userInfo; this.messageCache = { totalCount: 0, messages: [], }; // 当前消息缓存 this._subscribers = {}; // 订阅器缓存 this._siteConfig = { site_code: this.userInfo.site, TcpIP: '', TcpPort: DEFAULT_TCP_PORT, TimeOut: '', KeepAlive: '', IsSSL: true, mqtt_path: DEFAULT_MQTT_PATH, mqtt_mess: {}, nginxStart: false, }; this.MQTTCount = 0; this.MQTTClient = null; this.MQTTOptions = {}; this.IsNeedReconnect = true; this.currentPageIndex = 1; this.currentPageSize = 10; this.start = this.start.bind(this); this.stop = this.stop.bind(this); this.subscribe = this.subscribe.bind(this); this.unsubscribe = this.unsubscribe.bind(this); this.confirmRead = this.confirmRead.bind(this); this.loadMore = this.loadMore.bind(this); this.hasMore = this.hasMore.bind(this); this.renderVideo = renderVideo; this.renderPlatform = renderPlatform; this.renderSysPlatform = renderSysPlatform; this.props = props; this.speakState = new window.SpeechSynthesisUtterance(); this.speakState.rate = 1; this.speakState.lang = 'zh'; } // 对外接口 async start() { this.getMqttSiteCode().then(res => { this.loadHisMessages(this.currentPageIndex, this.currentPageSize); this.connectMQTTServer(); }); } stop() { this.disconnectMQTTServer(); } subscribe(type, handler) { if (!(type in this._subscribers)) { this._subscribers[type] = []; } this._subscribers[type].push(handler); } unsubscribe(type, handler) { if (!(type in this._subscribers)) { logger.info('无效事件无法删除'); } if (!handler) { delete this._subscribers[type]; } else { const idx = this._subscribers[type].findIndex(ele => ele === handler); if (idx === -1) { logger.info('无效事件无法删除'); return; } this._subscribers[type].splice(idx, 1); if (this._subscribers[type].length === 0) { delete this._subscribers[type]; } } } publish(type, payload) { if (!(type in this._subscribers) || this._subscribers[type].length === 0) { return; } this._subscribers[type].forEach(handler => { try { handler(payload); } catch (e) { // eslint-disable-next-line no-undef logger.warn(`订阅器委托错误${e.message}`); } }); } confirmRead(isAll = false, hisIDs = []) { if (this.messageCache && this.messageCache.totalCount === 0 && this.messageCache.messages.length === 0) { return; } if (isAll) hisIDs = this.messageCache.messages.map(item => item.id); const self = this; // eslint-disable-next-line no-undef postInformationStatus({ query: { userID: this.userInfo.OID || window.globalConfig.userInfo.OID, hisID: hisIDs.join(','), isAll: isAll ? 1 : '', }, }) .then(res => { if (res.code !== 0) { Logger.info(res.errMsg); return; } if (isAll) { self.messageCache.totalCount = 0; self.messageCache.messages = []; self.currentPageIndex = 1; } else { hisIDs.forEach(id => { const index = self.messageCache.messages.findIndex(item => item.id === id); if (index > -1) { self.messageCache.messages.splice(index, 1); // eslint-disable-next-line no-plusplus self.messageCache.totalCount--; } }); } self.publish(NEW_MESSAGE, self.messageCache); }) .catch(err => { // eslint-disable-next-line no-undef logger.error(`postInformationStatus调用失败${err}`); }); } hasMore() { if (!this.messageCache) return false; if (!this.messageCache.totalCount) return false; return this.messageCache.totalCount > this.messageCache.messages.length; } loadMore(callback) { if (!this.hasMore()) return Promise.resolve([]); this.currentPageIndex += 1; return this.loadHisMessages(this.currentPageIndex, this.currentPageSize); } // mqtt async connectMQTTServer() { const hostname = this._siteConfig.TcpIP; const port = this._siteConfig.TcpPort; const clientId = `client-${generatedId()}`; const timeout = DEFAULT_TIMEOUT; const keepAlive = DEFAULT_KEEPLIVE; const cleanSession = true; const ssl = this._siteConfig.IsSSL; const userName = USERNAME; const password = PASSWORD; const path = this._siteConfig.mqtt_path; const siteCode = this._siteConfig.site_code ?? ''; this.MQTTCount = 0; if (hostname) { this.MQTTClient = new MqttClient.Client(hostname, port, `${path}?_site=${siteCode}`, clientId); this.MQTTOptions = { invocationContext: { host: hostname, port, path, clientId, }, timeout, keepAliveInterval: keepAlive, cleanSession, useSSL: ssl, userName, password, onSuccess: this.onMQTTConnect.bind(this), onFailure(e) { console.log(e); }, }; this.MQTTClient.connect(this.MQTTOptions); this.MQTTClient.onConnectionLost = this.onMQTTConnectionLost.bind(this); this.MQTTClient.onMessageArrived = this.onMessageArrived.bind(this); } } disconnectMQTTServer() { if (this.MQTTClient) { this.IsNeedReconnect = false; this.MQTTClient.disconnect(); this.MQTTClient = null; } } async getMqttSiteCode() { const self = this; return api.getMqttSiteCode({ 'request.preventCache': Date.now() }).then(res => { if (res && res.code === 0) { let mqttConfig = { mqtt_mess: {}, mqtt_path: self._siteConfig.mqtt_path, nginxStart: self._siteConfig.NginxStart, mqtt_IsSSL: true, }; if (Array.isArray(res.data) && res.data.length > 0) { if (res.data[0]) { const data = res.data[0]; mqttConfig.mqtt_IsSSL = self._siteConfig.IsSSL = data.IsSSL ? data.IsSSL : false; mqttConfig.mqtt_mess.site_code = mqttConfig.mqtt_site_code = self._siteConfig.site_code = data.SiteCode || self._siteConfig.site_code; mqttConfig.mqtt_mess.TcpIP = self._siteConfig.TcpIP = data.TcpIP; mqttConfig.mqtt_mess.TcpPort = self._siteConfig.TcpPort = data.TcpPort ? parseInt(data.TcpPort) : 8083; mqttConfig.mqtt_mess.MessageLevel = self._siteConfig.MessageLevel = data.MessageLevel ? data.MessageLevel : DEFAULT_PARSE_LEVEL; if (data.NginxStart) { mqttConfig.NginxStart = self._siteConfig.NginxStart = data.NginxStart; mqttConfig.mqtt_mess.TcpIP = self._siteConfig.TcpIP = self._siteConfig.mqtt_mess.TcpIP = window.location.hostname; mqttConfig.mqtt_mess.TcpPort = self._siteConfig.TcpPort = self._siteConfig.mqtt_mess.TcpPort = parseInt(window.location.port) || 443; mqttConfig.mqtt_path = self._siteConfig.mqtt_path = '/ws/'; } else { mqttConfig.nginxStart = data.NginxStart; } } else { mqttConfig.mqtt_mess.TcpIP = self._siteConfig.TcpIP = self._siteConfig.mqtt_mess.TcpIP = DEFAULT_TCP_IP; mqttConfig.mqtt_mess.TcpPort = self._siteConfig.TcpPort = self._siteConfig.mqtt_mess.TcpPort = DEFAULT_TCP_PORT; mqttConfig.mqtt_IsSSL = self._siteConfig.IsSSL = self._siteConfig.mqtt_mess.TcpIP + ':' + self._siteConfig.mqtt_mess.TcpPort; } mqttConfig.mqtt_iotIP = self._siteConfig.mqtt_iotIP = mqttConfig.mqtt_mess.TcpIP + ':' + (mqttConfig.mqtt_mess.TcpPort ? mqttConfig.mqtt_mess.TcpPort : '443'); // self.props.updateConfig && self.props.updateConfig(Object.assign({}, self.props.global, { // ...mqttConfig // })) // 应用共享状态只在initMicro中更新了一次,异步修改了的globalConfig,子应用读取到的是旧的,需要更新一下应用共享状态 // setGlobalState({ // globalConfig: window.globalConfig // }); } } else { Logger.info('获取mqtt服务器参数失败'); } }); } getSiteCode() { return this._siteConfig.site_code; } getUserInfo() { return this.userInfo; } getSiteConfig() { return this._siteConfig; } getMessageVersion() { return this._siteConfig.MessageLevel; } onMQTTConnect() { const site = this.getSiteCode(); // 信息化主题 this.MQTTClient.subscribe(site + REQUEST_SERVICE.EIMTopic); // 节水主题 this.MQTTClient.subscribe(site + REQUEST_SERVICE.SaveWaTopic); // 系统主题 this.MQTTClient.subscribe(site + REQUEST_SERVICE.SystemTopic); // 工单主题 this.MQTTClient.subscribe(site + REQUEST_SERVICE.WorkerOrderTopic); // 报警主题 this.MQTTClient.subscribe(site + REQUEST_SERVICE.ScadaTopic); // 用户主题 this.MQTTClient.subscribe(`${site}${REQUEST_SERVICE.UserTopic}${this.userInfo.OID}`); } onMQTTConnectionLost(responseObject) { const self = this; if (this.IsNeedReconnect) { this.MQTTClient.connect(self.MQTTOptions); this.MQTTtester = setInterval(function() { if (self.MQTTClient.isConnected()) { clearInterval(self.MQTTtester); } else { self.MQTTClient.connect(self.MQTTOptions); } }, 1000); } } onMessageArrived(buffer) { try { const parseMessage = JSON.parse(buffer.payloadString); const userInfo = this.getUserInfo(); if ( _.isEmpty(parseMessage.tousers) || userInfo.OID == parseMessage.tousers || parseMessage.tousers.includes(`${userInfo.OID},`) || parseMessage.tousers.includes(`,${userInfo.OID}`) ) { const message = createMessageFromReal(parseMessage, { version: this.getMessageVersion(), }); // 创建消息对象 this.messageCache.totalCount += 1; // if (message.infoLevel === SYS_LEVEL) { this.messageCache.messages.unshift(message); // 新消息置顶 // } else { // this.messageCache.messages.push(message); // } this.publish(NEW_MESSAGE, this.messageCache); this.renderWindowsInfo(buffer); // 1.windows桌面消息提醒 this.speakMessage(message); // 2.语音播报 this.noticeHandler(message); // 3.其他业务 } } catch (e) { Logger.error(`收到消息处理异常:${e.message}`); } finally { window?.share?.event?.emit?.('event:messageArrived', buffer); } } /** * 实时消息浏览器桌面通知 * @param {*} message 推送的原始实时消息 */ renderWindowsInfo(message) { if (document.visibilityState !== 'visible' && !document.visibilityState !== 'hidden') return; const self = this; function notifyMessage(message) { const parseMessage = JSON.parse(message.payloadString); let content = ''; if (message.level !== SYS_LEVEL) { if (self.getMessageVersion() === '2.0') { if (message.level === '4') { const messageContent = JSON.parse(parseMessage.content); content += `${messageContent.alarmType} ${messageContent.alarmDevice} ${messageContent.alarmContent} ${ messageContent.alarmValue } / ${messageContent.alarmThreshold}`; } } } const messageBody = { title: '', content, }; if (parseMessage.tousers === '') { messageBody.title = '新公告:'; } else { messageBody.title = '新通知:'; } if (content !== '') { if (!('Notification' in window)) { message.warn('This browser does not support desktop notification'); } else if (Notification.permission === 'granted') { const notification = new Notification(messageBody.title, { body: `${messageBody.content}`, icon: 'https://panda-water.com/web4/assets/images/icon/熊猫新1.png', }); notification.onclick = () => { notification.close(); }; } else if (Notification.permission !== 'denied') { Notification.requestPermission(permission => { if (permission === 'granted') { const notification = new Notification(messageBody.title, { body: `${messageBody.content}`, icon: 'https://panda-water.com/web4/assets/images/icon/熊猫新1.png', }); notification.onclick = () => { notification.close(); }; } }); } } } try { notifyMessage(message); } catch (error) {} } /** * 实时消息语音播报 * @param {*} message * @returns */ speakMessage = message => { if (!message) return; const { version, webVoice } = message; if(webVoice) return this.speakWebVoice(message); if(version === '3.0') return this.speakDefault(message); switch(message.infoType) { case 'SCADA报警': case '通用报警': this.speakAlarm(message); break; case '工单流程': case '工单提醒': this.speakCase(message); break; case '系统通知': case '系统消息': this.speakSys(message); break; default: this.speakOther(message); } }; speakAlarm = message => { const device = message?.infoContent?.title ?.replace(/_/g, ',') ?.split(' ') ?.splice(1) ?.join(' ') ?? ''; // 报警设备:“【阈值报警】 二供泵房 | 光谷物联港” const alarmType = message?.infoContent?.alarmType; // 报警类型:“状态报警” const content = message?.infoContent?.alarmContent; // 报警内容:“出水超压报警” const alarmValue = message?.infoContent?.alarmValue ?? ''; // 报警值 const level = message?.infoLevel !== '1' ? '紧急报警,紧急报警,紧急报警:' : '' let msg = `${level}${device},${alarmType},${content},报警值:${isString(alarmValue) ? replaceSpeak(alarmValue) :'' },请注意!!!`; this.speak(msg); }; speakCase = message => { const caseType = message?.infoContent?.caseType; const flowName = message?.infoContent?.flowName; const nodeName = message?.infoContent?.aliasNodeName || message?.infoContent?.nodeName; const flowNo = message?.infoContent?.flowNo; const content = message?.infoContent?.content; let msg = `您有新的${caseType}:${flowName},${nodeName},${flowNo ? '工单编号:' + flowNo : ''},${content}`; this.speak(msg); }; speakSys = message => { const noticeType = message?.infoContent?.noticeType ?? '系统通知'; const noticeTitle = message?.infoContent?.noticeTitle ?? ''; const noticeContent = message?.infoContent?.noticeContent ?? ''; let msg = `${noticeType}:${noticeTitle},${noticeContent}`; this.speak(msg); }; speakOther = message => { // 类型区分‘全员公告’ 还是 ‘个人消息’ const type = message.title ?? (isEmpty(message.tousers) || parseMessage.tousers === '' ? '公告' : '消息'); // 构建语音消息内容 const { infoContent, time } = message; let content = replaceSpeak((infoContent?.content ?? message.defaultContent ?? '').replace(/\\n/g, ',')); const text = content.substring(0, content.lastIndexOf(',')).replace(':', ','); let msg = `您有新的${type}:${text !== '' ? text : content} 时间:${time}`; this.speak(msg); }; speakDefault = message => { let msg = replaceSpeak(`${message.title},${message.title}: ${message.infoContent ?? ''}`) this.speak(msg); }; speakWebVoice = message => { const { webVoice } = message; if(typeof webVoice === "string") { this.speak(replaceSpeak(webVoice.replace(/_/g, ','))); } } speak = msg => { if (!this.speakState) return; this.speakState.text = msg; window.speechSynthesis.cancel(); window.speechSynthesis.speak(this.speakState); }; /** * 实时消息特殊处理 * @param {*} message * @returns */ noticeHandler = message => { if (!message) return; if (message.infoLevel === SYS_LEVEL) { this.renderSysNoticePlatform(message); // 显示系统弹窗 return; } if (message.infoLevel === VIDEO_LEVEL) { this.renderPopVideo(message); // 显示视频弹窗 return; } if (message.infoLevel === PLATFORM_LEVEL) { this.renderPopPlatform(message); // 显示报警弹窗 return; } }; renderPopPlatform(message) { this.renderPlatformElement = this.renderPlatform && this.renderPlatform(message); } renderSysNoticePlatform(message) { this.renderSysPlatform && this.renderSysPlatform(message); } renderPopVideo(message) { this.renderVideoElement = this.renderVideo && this.renderVideo(message); } // 工具类 async loadHisMessages(pageIndex, pageSize) { const self = this; return api .getInformationInfo({ userID: self.getUserInfo().OID, pageIndex, pageSize, 'request.preventCache': Date.now(), }) .then(res => { if (res && res.code === 0) { const data = res.data || {}; const result = { totalCount: data.totalCount, messages: (Array.isArray(data.list) ? data.list : []).map(item => createMessageFromHis(item, { version: this.getMessageVersion() }), ), }; self.messageCache.totalCount = result.totalCount; (result.messages || []).forEach(message => { const index = self.messageCache.messages.findIndex(item => item.id === message.id); if (index === -1) { self.messageCache.messages.push(message); } else { self.messageCache.messages[index].time = message.time; } }); if (self.messageCache.totalCount > self.messageCache.messages.length && data.list.length === 0) { // 服务端返回总数还有,但是查不到数据了,前端修正服务端返回的总数 self.messageCache.totalCount = self.messageCache.messages.length; } self.publish(NEW_MESSAGE, self.messageCache); console.log('render'); const selectData = result.messages.find(item => { return item.infoLevel == 6; }); if (selectData) { self.renderSysNoticePlatform(selectData); } return Promise.resolve(result); } }); } // 1.0报警消息格式解析播报 parseScadaMessage(message) { const data = message.messContent.split('\\n'); const last = data[2]; const alarmType = data[0].split('】')[0].split('【')[1]; const sensor = data[1]; const station = data[0].split('】')[1]; let lastValue = last; if (last.includes(' / ')) { lastValue = ''; const alaVal = last.split(' / '); let newVal = 0; let setVal = 0; let unit = ''; if (alaVal[0].includes(' ')) { newVal = alaVal[0].split(' ')[0] * 1; setVal = alaVal[1].split(' ')[0] * 1; unit = alaVal[0].split(' ')[1]; } else { newVal = alaVal[0] * 1; setVal = alaVal[1] * 1; } lastValue = Math.abs(setVal - newVal).toFixed(2) + unit; } else { lastValue = `,报警实时值${lastValue}`; } let msg = `紧急报警:${station},${alarmType},${sensor}${lastValue},请注意!!!`; for (let i = 0; i < 3; i++) { msg += msg; } const state = new window.SpeechSynthesisUtterance(msg); state.lang = 'zh'; state.rate = 1; window.speechSynthesis.cancel(); window.speechSynthesis.speak(state); } } export default Notifier;