/*
 * COPYRIGHT Motorola Solutions, INC.
 * ALL RIGHTS RESERVED.
 * MOTOROLA SOLUTIONS CONFIDENTIAL RESTRICTED
 */

import { Injectable, OnDestroy } from '@angular/core';
import { RxStomp, RxStompConfig, RxStompState } from '@stomp/rx-stomp';
import { Store } from '@ngrx/store';
import { from, of, Subject } from 'rxjs';
import { debounceTime, filter, takeUntil } from 'rxjs/operators';
import { updateStompConnectionState } from '../../call/+state/media.actions';
import { StompConsumer } from '../abstract/stomp-consumer';
import { selectPrimaryCtcStompConnectionProperties } from '../../user/+state/user.selectors';

@Injectable({
    providedIn: 'root'
})
export class StompService extends RxStomp implements OnDestroy {
    private unsubscribe$ = new Subject<void>();

    constructor(
        private store: Store
    ) {
        super();
        this.connectionState$.pipe(takeUntil(this.unsubscribe$)).subscribe((connectionState) => {
            console.info(`Stomp current connection state: ${RxStompState[connectionState]}`);
            this.store.dispatch(updateStompConnectionState({ rxStompState: connectionState }));
        });
        this.store
            .select(selectPrimaryCtcStompConnectionProperties)
            .pipe(
                filter(({ url, token }) => !!url && !!token),
                debounceTime(250)
            )
            .subscribe(({ url, token }) => {
                let configuration = this.getConfiguration(url, token);
                console.info(`Updating websocket configuration ${configuration.brokerURL}`);
                let hasNewBroker = this.stompClient.brokerURL !== configuration.brokerURL;
                this.configure(configuration);
                if (hasNewBroker) {
                    console.info(`Activating websocket: ${this.stompClient.brokerURL}`);
                    this.deactivate().then(() => this.activate());
                }
            });
    }

    ngOnDestroy(): void {
        this.unsubscribe$.next();
        this.unsubscribe$.complete();
        this.disconnectWebSocket();
    }

    private getConfiguration(url: string, token: string): RxStompConfig {
        let brokerUrl = new URL('/api/spring-websocket', url);
        brokerUrl.protocol = brokerUrl.protocol.replace('https', 'wss');
        brokerUrl.protocol = brokerUrl.protocol.replace('http', 'ws');
        return {
            brokerURL: brokerUrl.href,
            connectHeaders: {
                'X-Bearer-Token': `${token}`
            },
            reconnectDelay: 10000,
            heartbeatIncoming: 5000,
            heartbeatOutgoing: 5000
        };
    }

    public watchAsync(topic: string, receiptId: string, stompConsumer: StompConsumer) {
        let receipt = { receipt: receiptId, id: topic.replace('/', "").replaceAll('/', '-') };

        this.connected$.pipe(takeUntil(this.unsubscribe$)).subscribe(() =>
            this.asyncReceipt(receiptId).then(() => {
                stompConsumer.handleSubscriptionInit(topic, receiptId);
            })
        );

        this.connectionState$
            .pipe(
                filter((connectionState) => connectionState === RxStompState.CLOSED),
                takeUntil(this.unsubscribe$)
            )
            .subscribe(() => stompConsumer.handleSubscriptionEnd(topic, receiptId));

        this.stompErrors$.pipe(takeUntil(this.unsubscribe$)).subscribe((frame) => stompConsumer.handleStompError(topic, receiptId, frame.body));

        return super.watch(topic, receipt);
    }

    public connectWebSocket() {
        if (!this.connected()) {
            console.info(`Activating websocket`);
            this.activate();
        }
        return of(true);
    }

    public disconnectWebSocket() {
        console.info(`Deactivating websocket`);
        return from(this.deactivate());
    }
}
