import { Logger } from '@frontend/Logger';
import mqtt from 'mqtt';
import { Subject } from 'rxjs';

export default class MqttConnection {
    private static instance: MqttConnection | null = null;
    private options: mqtt.IClientOptions = {
        host: process.env['NX_PUBLIC_MQTT_HOST'],
        port: process.env['NX_PUBLIC_MQTT_PORT'] as unknown as number,
        protocol: process.env['NX_PUBLIC_MQTT_PROTOCOL'] as unknown as mqtt.MqttProtocol,
        path: process.env['NX_PUBLIC_MQTT_PATH'] || '/',
        username: process.env['NX_PUBLIC_MQTT_USERNAME'],
        password: process.env['NX_PUBLIC_MQTT_PASSWORD'],
        log: (args) => Logger.debug('MQTT log', {}, args)
    };
    private client: mqtt.MqttClient | null = null;
    private _topics: string[] = [];
    public get topics(): string[] {
        return this._topics;
    }

    private _messageSubject = new Subject<{ topic: string; payload: { message: string; data: any } }>();
    public get message$() {
        return this._messageSubject.asObservable();
    }

    private constructor() {
        this.client = mqtt.connect(this.options);
        this.init(this.client);
    }

    static getInstance(): MqttConnection {
        if (this.instance == null) {
            this.instance = new MqttConnection();
        }
        return this.instance;
    }

    onMessage(topic: string, message: Uint8Array): any {
        Logger.log('MQTT message received', {}, topic);
        this._messageSubject.next({ topic, payload: JSON.parse(new TextDecoder().decode(message)) });
    }

    onConnect(): void {
        Logger.log('Connected to MQTT broker');
    }

    onError(event: any): any {
        Logger.error('MQTT connection error', {}, event);
    }

    onDisconnect(): void {
        Logger.log('Disconnected from MQTT broker');
    }

    async subscribe(topic: string): Promise<void> {
        await this.client!.subscribeAsync(topic).then((res) => {
            this.topics.push(topic);
        });
    }

    async unsubscribe(topic: string): Promise<void> {
        await this.client!.unsubscribeAsync(topic).then((res) => {
            if (this.topics.indexOf(topic) !== -1) this.topics.splice(this.topics.indexOf(topic), 1);
        });
    }

    private init(client: mqtt.MqttClient) {
        client.on('connect', this.onConnect.bind(this));
        client.on('message', this.onMessage.bind(this));
        client.on('error', this.onError.bind(this));
    }
}
