import {
  BehaviorSubject,
  defer,
  Observable,
  ReplaySubject,
  share,
  Subject,
  switchMap,
} from "rxjs";
import {catchError, finalize, takeUntil} from "rxjs/operators";

/**
 * Класс создает очередь из {@link Observable}<br>
 * Следующий стрим выполнится только после завершения предыдущего стрима<br>
 * Действие при ошибке стрима в очереди определяются параметром переданным в конструктор<br>
 */
export class ObservableQueue{
  public static readonly awaitErrorMessage = 'Предыдущий стрим в очереди завершился ошибкой';

  /** Был ли вызван метод {@link onDestroy} */
  private isDestroyed = false;
  private _unsubscribe$ = new ReplaySubject<any>(1);

  /** Очередь действующих стримов */
  private queues: Subject<any>[] = [];

  /** Длина очереди */
  public get length(){
    return this.queues.length;
  }

  /**
   * Конструктор
   * @param throwIfPreviousObservableThrown завершить все последующие стримы ошибкой {@link awaitErrorMessage}, если текущий завершился ошибкой.
   */
  constructor(private throwIfPreviousObservableThrown: boolean) {
  }

  /**
   * Добавить стрим в очередь.<br>
   * ВНИМАНИЕ: Добавление в очередь произойдет только в момент подписки на возвращаемый стрим.<br>
   * ВНИМАНИЕ: Порядок в очереди == порядку подписке на результирующий стрим
   * ВНИМАНИЕ: действие со следующими стримами при ошибке текущего зависят от {@link throwIfPreviousObservableThrown}
   * @param stream$ стрим
   */
  public push$<T>(stream$: Observable<T>): Observable<T>{
    if(this.isDestroyed){
      throw new Error('Объект разрушен');
    }

    return defer(() => {
      const await$: Subject<any> = this.length === 0 ? new BehaviorSubject(null) : new Subject<any>(); //если нет стримов то сразу запустит первый
      this.queues.push(await$);

      let hasError = false;

      return await$.pipe(
        switchMap(value => stream$.pipe(
          finalize(() => {await$.complete()}) // БЕЗ ЭТОГО НЕ ЗАПУСКАЕТ finalize ниже
        )),
        catchError((err, caught) => {
          hasError = true;
          throw err;
        }),
        finalize(() => {
          this.complete(await$);
          this.emitFirst(hasError && this.throwIfPreviousObservableThrown ? ObservableQueue.awaitErrorMessage : undefined);
        })
      );
    }).pipe(
      takeUntil(this._unsubscribe$),
      share()
    )
  }

  /** Завершить стрим ожидания */
  private complete(await$: Subject<any>){
    this.removeStream$(await$);
    await$.complete();
  }

  /** Запустить, или сообщить об ошибке, первый стрим ожидания. Под индексом [0], если имеется */
  private emitFirst(errorMessage: string){
    if(this.length === 0){
      return;
    }

    if(!errorMessage){
      this.queues[0].next(null);
    } else {
      this.queues[0].error(new Error(errorMessage));
    }
  }

  /** Освободить ресурсы. Все стримы в очереди будут завершены */
  public onDestroy(){
    if(!this.isDestroyed){
      this.isDestroyed = true;
      this._unsubscribe$.next(null);
      this._unsubscribe$.complete();
    }
  }

  /** Удалить из очереди стрим */
  private removeStream$(stream$: Observable<any>){
    this.queues = this.queues.filter(x => x !== stream$);
  }
}
