import MqttConnection from './mqtt-connetion';

/**
 * A single listener registered for one topic.
 */
interface subscription {
    /** Identifier supplied by the caller; used to find, replace or remove this entry within a topic. */
    id: string;
    /**
     * Handler invoked for each incoming message on the topic.
     * Receives the `message` and `data` fields of the incoming payload.
     */
    callback: (message: string, data: any) => void;
}
/**
 * Singleton that fans incoming MQTT messages out to per-topic callbacks.
 *
 * It keeps a map from (environment-prefixed) topic to registered subscribers,
 * listens on the shared connection's `message$` stream, and asks the
 * connection to subscribe/unsubscribe as the first/last subscriber of a topic
 * comes and goes.
 */
export class MqttBroker {
    /** Delay, in milliseconds, before a topic without subscribers is dropped from the connection. */
    private static readonly UNSUBSCRIBE_DELAY_MS = 2000;

    private static instance: MqttBroker | null = null;
    private mqttConnection = MqttConnection.getInstance();
    private subscribers: Map<string, subscription[]> = new Map<string, subscription[]>();

    private constructor() {
        this.mqttConnection.message$.subscribe((message) => {
            const topicSubscribers = this.subscribers.get(message.topic);
            if (!topicSubscribers) return;

            // Iterate over a snapshot: callbacks may call unSubscribe(), which
            // splices the live array and would otherwise cause the following
            // subscriber to be skipped for this message.
            for (const subscriber of [...topicSubscribers]) {
                // Do not deliver to a subscriber that an earlier callback
                // removed during this same dispatch.
                const current = this.subscribers.get(message.topic);
                if (!current || !current.includes(subscriber)) continue;
                subscriber.callback(message.payload.message, message.payload.data);
            }
        });
    }

    /**
     * Returns the shared broker, creating it on first use.
     */
    public static getInstance(): MqttBroker {
        if (!MqttBroker.instance) {
            MqttBroker.instance = new MqttBroker();
        }
        return MqttBroker.instance;
    }

    /**
     * Builds the topic actually used on the connection by prefixing it with the
     * value of the `NX_PUBLIC_BUILD_ENV` environment variable and a slash.
     */
    private prefixTopic(topic: string): string {
        const env = process.env['NX_PUBLIC_BUILD_ENV'];
        return env + '/' + topic;
    }

    /**
     * Registers `callback` for `topic` under the given `id`.
     *
     * If `id` is already registered for the topic its callback is replaced.
     * The connection is asked to subscribe only when this is the first
     * subscriber for the topic and the connection does not already list it.
     *
     * @param id       Identifier of the subscriber within the topic.
     * @param topic    Topic name without the environment prefix.
     * @param callback Invoked with the payload's `message` and `data` fields.
     */
    subscribe(id: string, topic: string, callback: (value: string, data: any) => void): void {
        const fullTopic = this.prefixTopic(topic);
        const topicSubscribers = this.subscribers.get(fullTopic);

        if (topicSubscribers) {
            const existing = topicSubscribers.find((s) => s.id === id);
            if (existing) {
                existing.callback = callback;
            } else {
                topicSubscribers.push({ id, callback });
            }
            return;
        }

        this.subscribers.set(fullTopic, [{ id, callback }]);
        if (!this.mqttConnection.topics.includes(fullTopic)) {
            this.mqttConnection.subscribe(fullTopic);
        }
    }

    /**
     * Removes the subscriber `id` from `topic`; unknown topics or ids are ignored.
     *
     * When the last subscriber of a topic is removed, the topic is dropped from
     * the local map immediately and the connection-level unsubscribe is issued
     * after a delay, and only if nobody has subscribed to the topic again by then.
     *
     * @param id    Identifier used when subscribing.
     * @param topic Topic name without the environment prefix.
     */
    unSubscribe(id: string, topic: string): void {
        const fullTopic = this.prefixTopic(topic);
        const topicSubscribers = this.subscribers.get(fullTopic);
        if (!topicSubscribers) return;

        const index = topicSubscribers.findIndex((s) => s.id === id);
        if (index === -1) return;
        topicSubscribers.splice(index, 1);

        if (topicSubscribers.length === 0) {
            this.subscribers.delete(fullTopic);
            setTimeout(() => {
                // Re-check at fire time: a new subscriber may have arrived meanwhile.
                if (!this.subscribers.has(fullTopic)) {
                    this.mqttConnection.unsubscribe(fullTopic);
                }
            }, MqttBroker.UNSUBSCRIBE_DELAY_MS);
        }
    }
}

// Also expose the class as this module's default export (it is exported by name above).
export default MqttBroker;

/**
 * Exported message shape. It is not referenced by any other code in this file.
 *
 * NOTE(review): the meaning of the individual fields is not established here;
 * confirm against the producers/consumers of this type before relying on it.
 */
export interface PubSubMessage {
    account_id: string;
    iot_id: string;
    workflow_id: string;
    /** Optional identifier. */
    id?: string;
    /** Untyped; callers must narrow before use. */
    data: unknown;
    type: string;
    /** Untyped; callers must narrow before use. */
    ids: unknown;
}
