import {Injectable, OnDestroy} from "@angular/core";
import {SignalRService} from "./signal-r.service";
import {
  from,
  Observable,
  ReplaySubject,
  retry,
  skipWhile,
  Subject, switchMap, tap,
} from "rxjs";
import {filter, map, takeUntil} from "rxjs/operators";
import {AlertService} from "../alert.service";
import {ISendObj} from "./i-send-obj";
import {
  DeepSerializerForServerService
} from "../object-serializers/deep-serializer-for-server-services/deep-serializer-for-server.service";
import {TracerServiceBase} from "../../modules/trace/tracers2/trace-services/tracer-base.service";
import {traceClass} from "../../modules/trace/decorators/class.decorator";
import {traceFunc} from "../../modules/trace/decorators/func.decorator";
import {RetryStorage} from "../../classes/retry-storage.class";
import {CustomRetryConfig} from "../../pipes/classes/custom-retry-config";

/** Тип {@link ISendObj} возвращаемый сервером */
type SendServerObjType = Omit<ISendObj<any>, 'isSelfInitiator'>;

/**
 * Hub для подписи/отписи прослушки методов signalR сервера
 * Данный hub сообщает серверу, что данный пользователю необходимо прослушивать методы signalR
 * На стороне сервера сообщения методов отправляются только тем пользователям, которые подписаны на него(для оптимизации трафика)
 */
@Injectable({providedIn: "root"})
@traceClass('SignalRHub')
export class SignalRHub implements OnDestroy{
  private readonly subscribeMethodName = 'SubscribeToMethods';
  private readonly unsubscribeMethodName = 'UnsubscribeFromMethods';

  /** Хранение подписок */
  private readonly storage = new SubscribersStorage();

  /** Стримы */
  private readonly streams$ = {
    unsubscribe: new ReplaySubject<any>(1)
  }

  /** Конструктор */
  public constructor(public readonly signalRService: SignalRService,
                     public readonly deepSerializerForServerService: DeepSerializerForServerService,
                     public readonly alertService: AlertService,

                     private readonly traceService: TracerServiceBase) {
    //При подключении signalR, так как это новое подключение, отправляем используемые подписи на сервер
    this.signalRService.hasConnection$
      .pipe(
        filter(value => value), //Только если соединение установлено
        map(() => this.storage.getAllMethods()),
        filter(value => value.length > 0), //Только если имеются подписи
        switchMap(methods => this.send$(this.subscribeMethodName, methods)),
      )
      .subscribe();
  }

  /**
   * Подписаться на прослушку метода signalR.<br>
   * Если уже есть подпись, вернет готовый стрим.<br>
   * Если подпись первая, то отправит запрос серверу на прослушку.<br>
   */
  @traceFunc()
  public subscribe<TData>(method: string): Observable<ISendObj<TData>>{
    return this.subscribes(method).get(method);
  }

  /**
   * Подписаться на прослушку нескольких методов signalR.<br>
   * Для каждого переданного метода будет выполнен принцип описанный в {@link subscribe}.<br>
   * @param methods Методы
   * @return {@link Map} где ключ это метод, а значение это стрим
   */
  @traceFunc()
  public subscribes(...methods: string[]): Map<string, Observable<ISendObj<any>>>{
    const result = new Map<string, Observable<ISendObj<any>>>();
    /** То что нужно отправить на сервер для подписки */
    const toSendSet = new Set<string>();

    for (let method of methods) {
      let stream$ = this.storage.getSubject(method);

      if (stream$) { //Если уже есть подпись
        this.storage.manage(method, 'subscribe');
        result.set(method, stream$);
        continue;
      }

      //Если подписи нет
      toSendSet.add(method);
      stream$ = new Subject<ISendObj<any>>();
      this.storage.add(method, stream$);
      result.set(method, stream$);

      this.signalRService.hubConnection.on(method, (value: SendServerObjType) => {
        const deserialized: SendServerObjType = this.deepSerializerForServerService.deserialize(value);

        const sendObj: ISendObj<any> = {
          ...deserialized,
          isSelfInitiator: deserialized.tokenGuid === this.signalRService.hubConnectionTokenGuid
        }

        stream$.next(sendObj);
      });
    }

    //Отправка подписей на сервер
    if(toSendSet.size > 0 && this.signalRService.hasConnection) {
      this.send$(this.subscribeMethodName, Array.from(toSendSet))
        .subscribe();
    }

    return result;
  }

  /**
   * Отписаться от прослушки метода signalR.<br>
   * Если больше подписантов к данному методу signalR нет, то отпишется от прослушки сервера
   * @return true если отписался от прослушки сервера
   */
  @traceFunc()
  public unsubscribe(method: string): boolean{
    return this.unsubscribes(method).get(method);
  }

  /**
   * Отписаться от прослушки методов signalR.<br>
   * Для каждого переданного метода будет выполнен принцип описанный в {@link unsubscribe}.<br>
   * @param methods методы подлежащие отписки
   * @return {@link Map} где ключ это метод, а значение это отписался от прослушки сервера или нет
   */
  @traceFunc()
  public unsubscribes(...methods: string[]): Map<string, boolean>{
    const result = new Map<string, boolean>();
    /** То что нужно отправить на сервер для подписки */
    const toSendSet = new Set<string>();

    for (let method of methods) {
      const isSubscriptionLeft = this.storage.manage(method, 'unsubscribe');
      if(isSubscriptionLeft){ //Если еще имеются подписки
        result.set(method, false);
        continue;
      }

      //Если подписок больше нет
      toSendSet.add(method);
      result.set(method, true);
      this.signalRService.hubConnection.off(method);
    }

    if(toSendSet.size > 0 && this.signalRService.hasConnection){ //Только если есть подключение signalR
      this.send$(this.unsubscribeMethodName, Array.from(toSendSet))
        .subscribe();
    }

    return result;
  }

  /** Отправить подписи на сервер */
  @traceFunc()
  private send$(method: typeof this.subscribeMethodName | typeof this.unsubscribeMethodName,
                methods: string[]){
    return this._send$(method, methods)
      .pipe(
        takeUntil(this.signalRService.hasConnection$.pipe(skipWhile(value => value))), //Пока есть подключение singalR
        takeUntil(this.streams$.unsubscribe)
      )
  }

  /** Отправить подписи на сервер. Внутренний метод */
  private _send$(method: typeof this.subscribeMethodName | typeof this.unsubscribeMethodName,
                 methods: string[]){
    const methodsAsStr = methods.join(', ');
    return from(this.signalRService.hubConnection.send(method, methods))
      .pipe(
        retry(new CustomRetryConfig(
          new RetryStorage(100, {count: 4, delay: 500}, {count: 200, delay: 1000}),
          this.signalRService.serverTimeout * 2 , //Дальше будет переподключение
          (err, retryCount) => this.traceService.add(`Повтор ${retryCount} выполнить метод ${method} signalR с передачей ${methodsAsStr}`)
        )),
        tap(() => this.traceService.add(`Удачно выполнен метод ${method} signalR с передачей ${methodsAsStr}`)),
      );
  }

  /** @inheritDoc */
  @traceFunc()
  public ngOnDestroy() {
    this.streams$.unsubscribe.next(null);
    this.streams$.unsubscribe.complete();
    this.storage.onDestroy();
  }
}

/** Класс хранения подписок */
class SubscribersStorage{
  /** {@link Map} для хранения метод/подписка */
  private readonly _map = new Map<string, SubscribersStorage_Subscription>();

  /** Содержится ли метод в хранилище */
  public has(method: string){
    return this._map.has(method);
  }

  /** Получить {@link Subject} по методу signalR. */
  public getSubject(method: string): Subject<any> | undefined {
    return this._map.get(method)?.subject;
  }

  /**
   * Добавление в хранилище
   * @exception Error Если такой метод присутствует
   */
  public add(method: string, subject: Subject<ISendObj<any>>){
    if(this.has(method)){
      throw new Error(`В массиве уже содержится элемент с методом ${method}`)
    }

    this._map.set(
      method,
      new SubscribersStorage_Subscription(method, subject)
    )
  }

  /**
   * Управлять подписками.
   * @return true если после действий еще имеются подписчики
   * @exception Error если элемент по методу НЕ найден
   */
  public manage(method: string, value: 'subscribe' | 'unsubscribe'): boolean{
    const item = this._map.get(method);
    if(!item){
      throw new Error('Элемент отсутствует в массиве')
    }

    switch (value){
      case 'subscribe':
        item.increment();
        return true;
      case 'unsubscribe':
        item.decrement();
        if(!item.hasSubscribers){
          this.remove(item.method)
          return false;
        }
        return true;
      default: throw new Error('out of range');
    }
  }

  /** Получить все методы */
  public getAllMethods(): string[]{
    return Array.from(this._map.keys());
  }

  /** Разрушить объект */
  public onDestroy(){
    for (let value of this._map.values()) {
      value.onDestroy();
    }

    this._map.clear();
  }

  /**
   * Удаление из хранилища.
   * @exception Error Если отсутствует элемент с методом
   * @exception Error Если у элемента есть подписчики
   */
  private remove(method: string){
    const subscribersStorageItem = this._map.get(method);

    if(!subscribersStorageItem){
      throw new Error(`В массиве отсутствует элемент с методом === '${method}'`)
    }

    if(subscribersStorageItem.hasSubscribers){
      throw new Error('Не допускается удаление из хранилища метод signalR если у него есть подписчики')
    }

    this._map.delete(method);
    subscribersStorageItem.onDestroy();
  }
}

/** Подписка на метод signalR */
class SubscribersStorage_Subscription {
  /** Разрушен ли объект */
  private isDestroyed: boolean;
  /** Количество подписок */
  private _count: number;

  /**
   * Конструктор
   * @param method метод signalR который слушает подписка
   * @param subject стрим трансляции из метода
   * @param стартовое количество подписок
   */
  constructor(public readonly method: string,
              public readonly subject: Subject<ISendObj<any>>,
              count: number = 1) {
    if(!method){ throw new Error('Не допускается добавление с !method'); }
    if(!subject){ throw new Error('Не допускается добавление с !subject') }
    if(typeof count !== 'number'){ throw new Error('Тип count переданного в конструктор !== number'); }
    if(count < 0){ throw new Error('Параметр count переданный в конструктор НЕ может быть меньше 0'); }

    this._count = count;
  }

  /** Добавить слушателя в подписку */
  public increment(){
    this.checkDestroyed();
    this._count += 1;
  }

  /** Убрать слушателя из подписки */
  public decrement(){
    this.checkDestroyed();
    if(this._count < 1){
      throw new Error('Количество подписчиков НЕ может быть отрицательным');
    }

    this._count -= 1;
  }

  /** Количество слушателей */
  public get count(){
    this.checkDestroyed();
    return this._count;
  }

  /** Отсутствуют ли слушатели */
  public get hasSubscribers(){
    this.checkDestroyed();
    return this._count > 0;
  }

  /**
   * Разрушить
   * @exception Error если {@link count} !== 0
   */
  public onDestroy(){
    this.checkDestroyed();
    this._count = 0;
    this.isDestroyed = true;
    this.subject.complete();
  }

  /** Проверка - разрушен ли объект */
  private checkDestroyed(){
    if(this.isDestroyed){
      throw new Error('Подписка уже разрушена');
    }
  }
}
