import { Injectable } from '@angular/core';
import { RxStomp, RxStompConfig, RxStompState } from '@stomp/rx-stomp';
import { Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';
import * as SockJS from 'sockjs-client';
import { environment } from 'src/environments/environment';
import { SessionService } from '../session/session.service';
import { MessageObserver, MessagingMessage } from './messaging.model';


@Injectable({
    providedIn: 'root'
})
export class MessagingService {

    constructor(
    ) { }

    private rxStomp = new RxStomp();  //library which provides RxJs STOMP over websocket
    private config: RxStompConfig; //Represents a configuration object
    private socketDisconnectedTimer: any //NodeJS.Timer;
    private socketRetryCount = 0;

    private init(): void {
        const me = this;
        if (!me.rxStomp.active) {

            me.config = {
                // Keep it off for production, it can be quit verbose
                // Skip this key to disable
                debug(str) {
                    // tslint:disable-next-line: no-console
                },

                // If disconnected, it will retry after 200ms
                reconnectDelay: 3000,

                webSocketFactory: () => {
                    //function returns a websocket or similar object
                    return new SockJS(
                        (environment?.api?.core?.base + '/ws').toString(),
                        null,
                        {

                        }
                    );
                }
            };

            me.rxStomp.configure(me.config);   // sets configuration, may be called several times and each call will add to the existing configuration
            me.rxStomp.activate();   //initiates the connection
        }

        // Socket disconnection event capture

        const socketDisconnected = me.rxStomp.connectionState$.pipe(  //connectionState will emit current state
            filter((currentState: RxStompState) => {
                return currentState === RxStompState.CLOSED;
            })
        );

        if (me.socketDisconnectedTimer) {
            clearTimeout(me.socketDisconnectedTimer);
        }

        // will be called for every disconnect

        me.socketDisconnectedTimer = setTimeout(() => {
            socketDisconnected.subscribe(() => {
                me.socketRetryCount++;
                if (me.socketRetryCount === 5) {
                    // me.alertDialog();
                    me.rxStomp.deactivate();
                }
            });
        }, 0);

    }

    //watch() will subscribe to server message queues

    watch(destination: string): Observable<MessagingMessage> {
        const me = this;

        const output: MessageObserver<MessagingMessage> = new MessageObserver<MessagingMessage>();

        me.init();

        const subscription: Subscription = me.rxStomp.watch(destination).subscribe((message) => {
            try {
                const m: MessagingMessage = JSON.parse(message.body); //converts string into javascript object
                output.next(m);

            } catch (e) {
                output.error(e);

            }
        }, (e: any) => {
            output.error(e);

        });

        output.onUnsubscribe(() => {     //takes no argument and disposes the resource held by subscription
            if (subscription) {
                subscription.unsubscribe();

            }
        });
        return output;
        
    }

    send(destine: string, message: MessagingMessage | any) {
        const me = this;
        me.init();
        me.rxStomp.publish({         // publish willl send the message to named destination , body must be string
            destination: destine,
            body: JSON.stringify(message)
        });
    }
}
