import {
  HubConnection,
  HubConnectionBuilder,
  HubConnectionState,
  RetryContext,
} from "@microsoft/signalr";
import { Injectable, OnDestroy } from "@angular/core";
import { IRealTimeNotification } from "@models";
import { AuthTokenService } from "@core/auth";
import { LoggerService } from "./loggers/logger.service";
import { BehaviorSubject, Observable, Subject } from "rxjs";
import { distinctUntilChanged, map, takeUntil } from "rxjs/operators";
import { SeverityLevel } from "./loggers/severity-level";
import { environment } from "@env/environment";
import { StatusCodes } from "@core/api";
import { MessageVm } from "@core/api/api-client";
import { debug } from "@core/utils";

@Injectable()
export class RealTimeService implements OnDestroy {
  readonly #connected$ = new BehaviorSubject<boolean>(false);
  readonly #message$ = new Subject<MessageVm>();
  readonly #notification$ = new Subject<IRealTimeNotification>();
  readonly #destroy$ = new Subject<void>();
  #hubConnection?: HubConnection;

  /** The state of the connection. */
  get connected$(): Observable<boolean> {
    return this.#connected$.pipe(distinctUntilChanged());
  }

  /** Real time messages stream. */
  get message$(): Observable<MessageVm> {
    return this.#message$.pipe(debug("New realtime message"));
  }

  /** Real time notification stream. */
  get notification$(): Observable<IRealTimeNotification> {
    return this.#notification$.pipe(debug("New realtime notification"));
  }

  /** Indicates the current state of the connection to the serve. */
  get state(): HubConnectionState {
    return this.#hubConnection?.state || HubConnectionState.Disconnected;
  }

  constructor(
    private readonly _tokenSrv: AuthTokenService,
    private readonly _logger: LoggerService
  ) {}

  ngOnDestroy(): void {
    this._disconnect();
    this.#destroy$.next();
    this.#destroy$.complete();
  }

  connect(): void {
    this._tokenSrv.token$
      .pipe(
        map((token) => !!token && token.isValid),
        takeUntil(this.#destroy$)
      )
      .subscribe((isAuthenticated) => {
        if (!isAuthenticated) {
          this._disconnect();
          return;
        }

        if (
          !this.#hubConnection ||
          this.#hubConnection.state === HubConnectionState.Disconnected
        ) {
          this._build();
          this._connect();
        }
      });
  }

  private _build(): void {
    this.#hubConnection = new HubConnectionBuilder()
      .withAutomaticReconnect({
        nextRetryDelayInMilliseconds: this._exponentialRetryPolicy,
      })
      .withUrl(environment.apiUrl + "/hub", {
        accessTokenFactory: () => this._tokenSrv.token.value,
      })
      .build();

    // Register any new real-time methods here.
    this.#hubConnection.on("message", (data: MessageVm) => {
      this.#message$.next(data);
    });

    this.#hubConnection.on("notify", (data: IRealTimeNotification) => {
      this.#notification$.next(data);
    });
  }

  private _connect(): void {
    if (!this.#hubConnection) {
      return;
    }

    this.#hubConnection
      .start()
      .then(() => {
        this._logger.logTrace(
          `@Hub connection started [${this.#hubConnection.connectionId}]`
        );
        this.#connected$.next(true);
      })
      .catch((err) => {
        if (err && err.status === StatusCodes.Unauthorized) {
          this._tokenSrv.clear();
          return;
        }

        this._logger.logException(err, SeverityLevel.Error, {
          message: "Failed to connect to hub",
        });
      });

    this.#hubConnection.onreconnected((_) => this.#connected$.next(true));
    this.#hubConnection.onclose((_) => this.#connected$.next(false));
  }

  private _disconnect(): void {
    if (!this.#hubConnection) {
      return;
    }

    this.#hubConnection.stop().then(
      () => {
        this.#hubConnection = undefined;
        this._logger.logTrace("@Hub connection closed...");
      },
      (err) =>
        this._logger.logException(err, SeverityLevel.Error, {
          message: "Failed to stop hub connection",
        })
    );
  }

  /** Exponential retry to reconnect for 3 minutes and then stop. */
  private _exponentialRetryPolicy(retryContext: RetryContext): number | null {
    // start with 100 milliseconds.
    if (retryContext.previousRetryCount === 1) {
      return 100;
    }

    // stop after 3min.
    if (retryContext.elapsedMilliseconds > 180000) {
      return null;
    }

    // log after the 5th failure.
    if (retryContext.previousRetryCount > 5) {
      this._logger.logException(retryContext.retryReason, SeverityLevel.Error, {
        message: `Can not establish hub connection. retry #${
          retryContext.previousRetryCount - 5
        }`,
      });
    }

    return 2 * retryContext.elapsedMilliseconds;
  }
}
