import { Injectable } from '@angular/core';
import { Paho } from 'ng2-mqtt/mqttws31';
import { Observable, ReplaySubject } from 'rxjs';
export class TopicDefinition {
name: string;
option?: any;
}
@Injectable({
providedIn: 'root'
})
export class MqttService {
private client: Paho.MQTT.Client;
private topics: string[];
private payload: ReplaySubject<Paho.MQTT.Message>;
private connection: ReplaySubject<boolean>;
public status: boolean;
private config: any;
constructor() {
this.topics = [];
this.payload = new ReplaySubject<Paho.MQTT.Message>(1);
this.connection = new ReplaySubject<boolean>(1);
}
//constructor(host: string, clientId: string);
//constructor(host: string, port: number, clientId: string);
//constructor(host: string, port: number, path: string, clientId: string);
public initMqttClient(host: string, port: number, clientID?: string): Promise<boolean> {
clientID = clientID ? clientID : this.generateUUID();
//創(chuàng)建客戶端實(shí)例
this.client = new Paho.MQTT.Client(host, port, '', clientID);
// 建立連線,連接成功返回true,否則返回false
return new Promise((resolve, reject) => {
// 建立連線
this.client.connect({
cleanSession: false,
onSuccess: () => {
console.log(
'%c ?? MQTT Connection Success ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
this.status = true;
this.connection.next(true);
resolve(true);
}
});
// 無法連接或斷線時(shí)觸發(fā)
this.client.onConnectionLost = async (responseObject: object) => {
console.log(
'%c ? MQTT Connection Lost ',
'background: #000; color: ##E43935; line-height: 26px;'
);
this.status = false;
this.connection.next(false);
await this.reonnect();
reject(false);
};
// 當(dāng)接收到訂閱訊息時(shí)觸發(fā)
this.client.onMessageArrived = this.onMessageArrived.bind(this);
});
}
/**
* MQTT的Client Getter
*
* @method public
* @return 回傳MQTT的Client
*/
public get mqttClient(): Paho.MQTT.Client {
return this.client;
}
/**
* 監(jiān)聽MQTT連線是否斷線
*
* @method public
* @return 回傳一個(gè)Observable,讓呼叫者可以監(jiān)聽MQTT連線是否斷線
*/
public listenConnection(): Observable<boolean> {
return this.connection.asObservable();
}
/**
* MQTT重新連縣
*
* @method public
*/
public reonnect(): Promise<boolean> {
return new Promise<boolean>((resolve, reject) => {
this.client.connect({
cleanSession: false,
onSuccess: () => {
console.log(
'%c ?? MQTT Connection Success ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
this.status = true;
this.connection.next(true);
resolve(true);
}
});
});
}
/**
* 將MQTT斷線
*
* @method public
*/
public disconnect(): void {
this.client.disconnect();
}
/**
* ---------------------------------------------------------------------------
* @NOTE MQTT訂閱Topic
* ---------------------------------------------------------------------------
*/
/**
* 訂閱MQTT的Topic
*
* @method public
* @param topic 訂閱的Topics
*/
public subscribeTopic(topics: TopicDefinition[]): void {
console.log('Subscribe topic:', topics);
console.log(
'%c ?? Subscribe MQTT Topics ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
if (this.topics.length === 0) {
this.topics = [];
}
topics.forEach(topic => {
this.topics.push(topic.name);
this.client.subscribe(topic.name, topic.option);
});
}
/**
* 取消訂閱所有的Topics
*
* @method public
*/
public unsubscribeAllTopics(): void {
console.log(
'%c ?? Unsubscribe MQTT All Topics ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
if (this.topics) {
this.topics.forEach(topic => {
this.client.unsubscribe(topic, null);
});
this.topics = [];
}
}
/**
* 取消訂閱MQTT的特定Topic
*
* @method public
* @param topic MQTT的特定Topic
*/
public unsubscribeTopic(topic: string): void {
console.log(
`%c ?? Unsubscribe MQTT Topics: ${topic}`,
'background: #000; color: #5FBA7D; line-height: 26px;'
);
const topicIndex = this.topics.indexOf(topic);
this.topics.splice(topicIndex, 1);
this.client.unsubscribe(topic, null);
}
/**
* 當(dāng)收到MQTT訂閱的訊息
*
* @method private
* @param payload 訂閱的訊息
*/
private onMessageArrived(payload: Paho.MQTT.Message): void {
// console.log(
// '%c ?? MQTT Message Arrvived ',
// 'background: #000; color: #5FBA7D; line-height: 26px;'
// );
const payloads: Paho.MQTT.Message = {
destinationName: payload.destinationName,
payloadBytes: payload.payloadBytes,
payloadString: this.byteToString(payload.payloadBytes),
duplicate: payload.duplicate,
retained: payload.retained,
qos: payload.qos,
};
this.payload.next(payloads);
}
/**
* 監(jiān)聽MQTT訂閱的訊息
*
* @method public
* @return 回傳一個(gè)Observable,讓呼叫者可以訂閱MQTT的訊息
*/
public listenMessage(): Observable<Paho.MQTT.Message> {
return this.payload.asObservable();
}
/**
* 將MQTT Payload Bytes轉(zhuǎn)成String
*
* @method private
* @param bytes MQTT Payload的Bytes
* @return 回傳轉(zhuǎn)換後的Payload
*/
private decodePayloads(bytes: any): string {
const result = String.fromCharCode(...bytes);
return result;
}
/**
* ---------------------------------------------------------------------------
* @NOTE 送出訊息
* ---------------------------------------------------------------------------
*/
/**
* Publish至MQTT
*
* @method public
* @param topic 目的地Topic
* @param value 要送出的數(shù)據(jù)
*/
public send(topic: string, value: any): void {
const payloads = new Paho.MQTT.Message(value);
payloads.destinationName = topic;
this.client.send(payloads);
}
/**
* ---------------------------------------------------------------------------
* @NOTE UUID
* ---------------------------------------------------------------------------
*/
/**
* 產(chǎn)生UUID
*
* @method public
* @return 回傳UUID
*/
public generateUUID(): string {
return this.generateS4() + this.generateS4() + '-' + this.generateS4() + '-'
+ this.generateS4() + '-' + this.generateS4() + '-' + this.generateS4() +
this.generateS4() + this.generateS4();
}
/**
* 產(chǎn)生16位隨機(jī)碼
* @return 回傳隨機(jī)碼
*/
private generateS4(): string {
return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);
}
byteToString(arr): string {
if (typeof arr === 'string') {
return arr;
}
let str = '';
// tslint:disable-next-line:variable-name
const _arr = arr;
for (let i = 0; i < _arr.length; i++) {
// tslint:disable-next-line:one-variable-per-declaration
const one = _arr[i].toString(2),
v = one.match(/^1+?(?=0)/);
if (v && one.length === 8) {
const bytesLength = v[0].length;
let store = _arr[i].toString(2).slice(7 - bytesLength);
for (let st = 1; st < bytesLength; st++) {
store += _arr[st + i].toString(2).slice(2);
}
str += String.fromCharCode(parseInt(store, 2));
i += bytesLength - 1;
} else {
str += String.fromCharCode(_arr[i]);
}
}
return str;
}
InitMessage(msg): { evt_tp: any, value: any } {
try {
const rawMsg = JSON.parse(msg.payloadString);
// console.log(rawMsg.evt_data.value);
return { evt_tp: rawMsg.evt_tp, value: rawMsg.evt_data.value };
} catch (e) {
console.log(e);
}
}
InitMessageforlist(msg): { linename: any,stationname:any, value: any } {
try {
const rawMsg = JSON.parse(msg.payloadString);
return { linename: rawMsg.evt_data['line'],stationname:rawMsg.evt_data['station'], value: rawMsg.evt_data.value[0] };
} catch (e) {
console.log(e);
}
}
}
MQTT services
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 問題 早上使用SC命令部署Service,啟動(dòng)服務(wù)一直報(bào)The **** service on Local Com...
- composer 自己的 git 倉庫出現(xiàn)問題: composer update "hdll/services:d...
- 協(xié)議就是通信雙方的一個(gè)約定,即,表示第1位傳輸?shù)氖裁?、?位傳輸?shù)氖裁础?。在MQTT協(xié)議中,一個(gè)MQTT數(shù)據(jù)包由...
- 如果不了解MQTT的可以看這篇文章http://www.cnblogs.com/yangfengwu/p/7764...
- 在項(xiàng)目開發(fā)過程中,由于需要數(shù)據(jù)的實(shí)時(shí)推送,但是在實(shí)際過程中,用到了MQtt的方法,搜遍網(wǎng)上 那個(gè)的講解,...