import { fabric } from '@grenton/gm-common';
import { SimpleMqttClient } from '@grenton/gm-logic';
import log from 'loglevel';

export class JsonMqttRpcClient {
    private pendingRequests = new Map<string, (response: fabric.JsonRpcResponse) => void>();

    constructor(private client: SimpleMqttClient) {
        this.client.onmessage.subscribe((msg) => {
            const onResponseCallback = this.pendingRequests.get(msg.topic);
            if (onResponseCallback) {
                log.info('mqtt - received rpc response', msg.payload);
                onResponseCallback(msg.payload);
            }
        });
    }

    async execute(request: fabric.JsonRpcRequest, topic: string, responseTopic?: string, requestTimeout = 10000): Promise<fabric.JsonRpcResponse | undefined> {
        return new Promise((resolve, rejected) => {
            if (request.id && responseTopic) {
                this.client.subscribe(responseTopic, { qos: 0 });

                let timerId: NodeJS.Timeout;
                const clean = () => {
                    this.pendingRequests.delete(responseTopic);
                    this.client.unsubscribe(responseTopic);
                    clearTimeout(timerId);
                };

                this.pendingRequests.set(responseTopic, (response: fabric.JsonRpcResponse) => {
                    clean();
                    resolve(response);
                });

                timerId = setTimeout(() => {
                    clean();
                    rejected(new Error('Request timed out'));
                }, requestTimeout);

                log.info('mqtt - send rpc request', request);
                this.client.publish(topic, request, { qos: 0 });
            } else {
                log.info('mqtt - send rpc notification', request);
                this.client.publish(topic, request, { qos: 0 });
                resolve(undefined);
            }
        });
    }
}

export class RpcError extends Error {
    constructor(readonly cause: fabric.JsonRpcError) {
        super(cause.message);
    }
}
