import * as _ from 'lodash';
import * as moment from 'moment';
import { EMPTY, merge, Observable, of, Subject, Subscription, throwError } from 'rxjs';
import {
  catchError,
  delay,
  exhaustMap,
  filter,
  map,
  mergeMap,
  repeatWhen,
  startWith,
  takeWhile,
  tap,
} from 'rxjs/operators';
import { Logger } from '@shared/services/logger/logger';
import { RetryModel, RetryPriority, RetryType } from '../../../dta/shared/models-api-loop/retry.model';
import { BaseModel } from '../../../dta/shared/models-api-loop/base/base.model';
import { CommentBaseModel, CommentModel } from '../../../dta/shared/models-api-loop/comment/comment.model';
import { CONSTANTS } from '@shared/models/constants/constants';
import { OnDestroy } from '@angular/core';
import { AutoUnsubscribe } from '../../../dta/shared/utils/subscriptions/auto-unsubscribe';
import { Encryption, isWebApp } from '../../../dta/shared/utils/common-utils';
import { SharedUserManagerService } from '../../../dta/shared/services/shared-user-manager/shared-user-manager.service';
import { LogLevel, LogTag } from '../../../dta/shared/models/logger.model';
import { SyncServiceStatus } from '@shared/models/sync/synchronization-service-status.model';
import { DataServiceShared } from '@shared/services/data/data.service';

@AutoUnsubscribe()
export class RetrySynchronizationService implements OnDestroy {
  // State variables
  active: boolean = false;
  static dbLoaderPageSize: number = 20;
  static maxRetryCount: number = 5;
  private verboseLogging: boolean = true; // enable for extra logging (for i.e. debugging)

  // DB watch variables
  private _pollDB$: Subject<void> = new Subject<void>();

  // In memory retry queue
  private inMemoryRetryQueue: InMemoryQueue;
  private _inMemoryRetryQueueEmpty$: Subject<boolean> = new Subject<boolean>();

  // Triggers
  private _insertIntoQueue$: Subject<RetryModel[]> = new Subject();

  //////////////////
  // Subscriptions
  //////////////////
  private dbLoaderSub: Subscription;
  private retryWorkerSub: Subscription;
  private dbWatchAllSub: Subscription;

  constructor(
    private _userEmail: string,
    private _data: DataServiceShared,
    private _sharedUserManagerService: SharedUserManagerService,
  ) {}

  ngOnDestroy() {}

  get constructorName(): string {
    return 'RetrySynchronizationService';
  }

  /**
   * Start retry synchronization
   */
  start() {
    if (this.active || !this._sharedUserManagerService.isUserAccountVerified(this._userEmail)) {
      return;
    }

    Logger.log('[SYNC] - RetrySync [%s]: started', this._userEmail);
    this.active = true;

    // Init queue
    this.inMemoryRetryQueue = new InMemoryQueue();

    // Start both processes
    this.startDBLoaderWorker();
    this.startRetryWorker();
  }

  /**
   * Stop active synchronization
   */
  stop() {
    this.active = false;

    this.dbLoaderSub?.unsubscribe();
    this.retryWorkerSub?.unsubscribe();

    Logger.log('[SYNC] - RetrySync [%s]: stopped', this._userEmail);
  }

  /**
   * Persist new retry entry or entries
   */
  enqueueSynchronization(data: RetryModel | RetryModel[]) {
    let models = _.castArray(data);

    // Filter out duplicates
    let uniqueModels = _.uniqBy(models, (model: RetryModel) => model.data._id);

    this.filterOutEntriesAlreadyInQueue(uniqueModels)
      .pipe(
        /**
         * Log for better debugging
         */
        tap((uniqueModels: RetryModel[]) => {
          let logMessage = `Models scheduled for retry queue for ${this._userEmail}: `;
          _.forEach(uniqueModels, (uniqueModel: RetryModel) => {
            logMessage += `{id: ${uniqueModel.data.id}, retryType: ${uniqueModel.retryType}}`;
          });

          Logger.customLog(logMessage, LogLevel.INFO, LogTag.SYNC);
        }),
        mergeMap((uniqueModels: RetryModel[]) => {
          this._insertIntoQueue$.next(uniqueModels);

          return this._data.RetryQueueService.saveAllToQueue(this._userEmail, uniqueModels);
        }),
      )
      .subscribe();
  }

  /**
   * Remove entry from retry queue
   */
  dequeueSynchronization(data: BaseModel | BaseModel[]) {
    let models = _.castArray(data);
    let modelsByDataId = _.map(models, (model: BaseModel) => model._id);

    this._data.RetryQueueService.removeByDataId(this._userEmail, modelsByDataId).subscribe();
  }

  getStatus(): SyncServiceStatus {
    return this.active ? SyncServiceStatus.ACTIVE : SyncServiceStatus.INACTIVE;
  }

  /**
   * Start DB loader (loads retry entry's from DB in batch)
   */
  private startDBLoaderWorker() {
    this.dbLoaderSub?.unsubscribe();
    this.dbLoaderSub = merge(
      this.watchRetryQueueForHighPriorityEntries(), // Based on priority
      this.watchRetryForQueueNewEntries(), // Based on new inserts
      this.watchRetryQueueForRetryableEntries(), // Based on "RetryAfter"
    )
      .pipe(
        exhaustMap(() => {
          if (this.verboseLogging) {
            Logger.log('[SYNC] - RetrySync [%s]: will fill in memory queue', this._userEmail);
          }
          return this.fillInMemoryQueue();
        }),
        catchError(err => {
          Logger.error(err, `[SYNC] - RetrySync [${this._userEmail}]: DBLoaderWorker failed`, LogTag.SYNC);

          this.stop();

          return EMPTY;
        }),
      )
      .subscribe();

    /**
     * Init db watches
     */
    this.initWatchRetryQueueForRetryableEntries();

    Logger.log('[SYNC] - RetrySync [%s]: DBLoaderWorker started', this._userEmail);
  }

  /**
   * Start Retry worker (retry sync or requeue)
   */
  private startRetryWorker() {
    this.retryWorkerSub?.unsubscribe();
    this.retryWorkerSub = this.inMemoryQueueNotEmpty()
      .pipe(
        exhaustMap(() => {
          if (this.verboseLogging) {
            Logger.log('[SYNC] - RetrySync [%s]: will retry entries from queue', this._userEmail);
          }

          return this.retryAll();
        }),
        catchError(err => {
          Logger.customLog(`[SYNC] - RetrySync [${this._userEmail}]: RetryWorker failed`, LogLevel.ERROR, LogTag.SYNC);

          this.stop();

          return EMPTY;
        }),
      )
      .subscribe();

    Logger.log('[SYNC] - RetrySync [%s]: RetryWorker started', this._userEmail);
  }

  //////////////////
  // RETRY HANDLERS
  //////////////////

  private retryAll() {
    return of(undefined).pipe(
      mergeMap(() => {
        let retryEntry = this.inMemoryRetryQueue.getFirst();

        if (retryEntry === undefined) {
          return EMPTY;
        }

        let retryAction;

        switch (retryEntry.retryType) {
          case RetryType.GET_COMMENT_HTML_BODY:
          case RetryType.GET_COMMENT_503:
            retryAction = this.retryGetComment(retryEntry);
            break;
          case RetryType.GET_FILE:
            retryAction = this.retryGetFile(retryEntry);
            break;
          case RetryType.GET_FILE_PREVIEW:
            retryAction = this.retryGetFilePreview(retryEntry);
            break;
          default:
            throw new Error('Retry type not supported: ' + retryEntry.retryType);
        }

        // Retry and remove from queue as last action
        return retryAction.pipe(
          tap(() => {
            this.inMemoryRetryQueue.removeFirst();
          }),
        );
      }),
      /**
       * Process queue until it is empty
       */
      repeatWhen((notifier: Observable<any>) => {
        return notifier.pipe(
          takeWhile(() => {
            let hasMore = this.inMemoryRetryQueue.size() > 0;

            if (!hasMore) {
              this._inMemoryRetryQueueEmpty$.next(true);
            }

            return hasMore && this.active;
          }),
        );
      }),
    );
  }

  private retryGetComment(retryEntry: RetryModel): Observable<any> {
    if (this.verboseLogging) {
      Logger.log('[SYNC] - RetrySync [%s]: will retry comment get', this._userEmail);
    }

    return this._data.CommentService.updateCommentBody(
      this._userEmail,
      CommentBaseModel.create(retryEntry.data),
      false,
    ).pipe(
      mergeMap((response: CommentModel) => {
        if (!this._data.CommentService.hasBody(response)) {
          Logger.customLog(
            `${this.constructorName} fetch for ${response.id} successful but wrong body.` +
              (isWebApp() ? '' : ` Got ${Encryption.encrypt(JSON.stringify(response))}`),
            LogLevel.WARN,
            [LogTag.SYNC, LogTag.INTERESTING_ERROR],
            true,
            'Wrong response for comment retry',
          );

          return throwError('Wrong response for comment retry');
        }

        // Save updated comment body
        return this._data.CommentService.save(this._userEmail, response).pipe(
          mergeMap(() => {
            // Remove from retry queue
            return this.handleRetrySuccessful(retryEntry);
          }),
        );
      }),
      catchError(err => {
        // Retry in any case
        return this.handleBackToRetryQueue(retryEntry);
      }),
    );
  }

  private retryGetFile(retryEntry: RetryModel): Observable<any> {
    if (this.verboseLogging) {
      Logger.log('[SYNC] - RetrySync [%s]: will retry file get', this._userEmail);
    }
    throw new Error('Method not implemented');
  }

  private retryGetFilePreview(retryEntry: RetryModel): Observable<any> {
    if (this.verboseLogging) {
      Logger.log('[SYNC] - RetrySync [%s]: will retry file preview get', this._userEmail);
    }
    throw new Error('Method not implemented');
  }

  ///////////////////
  // PRIVATE HELPERS
  ///////////////////

  /**
   * Watch queue for HIGH priority entries and escalate them right away
   */
  private watchRetryQueueForHighPriorityEntries(): Observable<any> {
    return this._insertIntoQueue$.asObservable().pipe(
      filter((models: RetryModel[]) =>
        _.some(models, (model: RetryModel) => model.retryCount === 0 && model.retryPriority === RetryPriority.HIGH),
      ),
      delay(100),
    );
  }

  /**
   * Watch queue for HIGH priority entries and escalate them right away
   */
  private watchRetryForQueueNewEntries(): Observable<any> {
    return this._insertIntoQueue$.asObservable().pipe(
      filter((models: RetryModel[]) => _.some(models, (model: RetryModel) => model.retryCount === 0)),
      delay(100),
      /**
       * Trigger only when in memory queue is empty
       */
      filter(() => {
        return this.inMemoryRetryQueue.size() <= 0;
      }),
    );
  }

  /**
   * Periodically poll for retry entries to be retried (based on retry date)
   */
  private initWatchRetryQueueForRetryableEntries() {
    this.dbWatchAllSub?.unsubscribe();
    this.dbWatchAllSub = of(undefined)
      .pipe(
        /**
         * Poll db only if in memory queue is empty
         */
        filter(() => {
          return this.inMemoryRetryQueue.size() <= 0;
        }),
        /**
         * Poll db for pending retries
         */
        mergeMap(() => {
          return this._data.RetryQueueService.countWaiting(this._userEmail);
        }),
        /**
         * Trigger only when in memory queue is empty
         */
        filter((count: Number) => {
          return count > 0;
        }),
        /**
         * Emit value and register condition with new date
         */
        tap(() => {
          this._pollDB$.next();
        }),
        /**
         * Poll DB while sync is active
         */
        repeatWhen((notifier: Observable<any>) => {
          return notifier.pipe(
            filter(() => {
              return this.active;
            }),
            delay(CONSTANTS.RETRY_SYNC_DB_POLL_PERIOD),
          );
        }),
      )
      .subscribe();
  }

  /**
   * Watch queue for any entries and escalate them when in memory queue is empty
   */
  private watchRetryQueueForRetryableEntries() {
    return this._pollDB$;
  }

  private inMemoryQueueNotEmpty(): Observable<any> {
    return this._inMemoryRetryQueueEmpty$.pipe(
      startWith(true),
      filter((isEmpty: boolean) => {
        return !isEmpty;
      }),
    );
  }

  private fillInMemoryQueue(): Observable<any> {
    return this._data.RetryQueueService.findNextBatch(
      this._userEmail,
      RetrySynchronizationService.dbLoaderPageSize,
    ).pipe(
      tap((retryEntries: RetryModel[]) => {
        if (this.verboseLogging) {
          Logger.log(
            '[SYNC] - RetrySync [%s]: filling in memory queue with [%d] entries',
            this._userEmail,
            retryEntries.length,
          );
        }

        if (retryEntries.length) {
          this.inMemoryRetryQueue.sortInsert(retryEntries);
          this._inMemoryRetryQueueEmpty$.next(false);
        }
      }),
    );
  }

  /**
   * Calculate page size
   */
  private calculateOffsetDate(retryCount: number, lastTryAfterDate: string): string {
    let offsetSeconds = Math.pow(10, retryCount);
    return moment(lastTryAfterDate).add(offsetSeconds, 'seconds').toDate().toISOString();
  }

  private filterOutEntriesAlreadyInQueue(models: RetryModel[]): Observable<any> {
    // Get ids from models
    let modelsByDataId = _.map(models, (model: RetryModel) => model.data._id);

    // Find entries with given model ids and filter out only unique ones
    return this._data.RetryQueueService.findByDataId(this._userEmail, modelsByDataId).pipe(
      map((dbModels: RetryModel[]) => {
        return _.differenceBy(models, dbModels, 'data._id');
      }),
    );
  }

  private handleBackToRetryQueue(retryEntry: RetryModel): Observable<any> {
    retryEntry.retryCount += 1;

    if (retryEntry.retryCount > RetrySynchronizationService.maxRetryCount) {
      Logger.customLog(
        `[SYNC] - RetrySync [${this._userEmail}]: will remove entry. Retry count exceeded. Object id: [${retryEntry.data.id}], retry type ${retryEntry.retryType}`,
        LogLevel.ERROR,
        [LogTag.SYNC, LogTag.INTERESTING_ERROR],
        true,
        'Retry count exceeded',
      );
      return this._data.RetryQueueService.removeById(this._userEmail, retryEntry._id);
    } else {
      if (this.verboseLogging) {
        Logger.log(
          '[SYNC] - RetrySync [%s]: will retry with retry count of [%d]',
          this._userEmail,
          retryEntry.retryCount,
        );
      }
      retryEntry.retryAfter = this.calculateOffsetDate(retryEntry.retryCount, retryEntry.retryAfter);

      this._insertIntoQueue$.next([retryEntry]);
      return this._data.RetryQueueService.saveToQueue(this._userEmail, retryEntry);
    }
  }

  private handleRetrySuccessful(retryEntry: RetryModel): Observable<any> {
    return this._data.RetryQueueService.removeById(this._userEmail, retryEntry._id);
  }
}

/**
 * Class for in memory queue
 */
export class InMemoryQueue {
  private queue = [];

  sortInsert(retryModels: RetryModel[]) {
    retryModels.forEach(retryModel => {
      this.sortInsertSingle(retryModel);
    });
  }

  private sortInsertSingle(retryModel: RetryModel) {
    for (let i = 0; i < this.size(); i++) {
      if (retryModel.retryPriority < this.queue[i].retryPriority) {
        this.queue.splice(i, 0, retryModel);
        return;
      }
    }

    this.queue.push(retryModel);
  }

  size(): number {
    let size = this.queue.length;

    let dbPageSize = RetrySynchronizationService.dbLoaderPageSize;
    if (size > dbPageSize * 3) {
      Logger.log(
        '[SYNC] - RetrySync [%s]: queue size is over the limit. DB page size is [%d] but current size id [%d].',
        dbPageSize,
        size,
      );
    }

    return size;
  }

  getFirst(): RetryModel {
    return this.queue[0];
  }

  removeFirst() {
    this.queue.splice(0, 1);
  }
}
