import { ApiService } from '@shared/api/api-loop/api.module';
import { DataServiceShared } from '@shared/services/data/data.service';
import * as moment from 'moment';
import { StorageKey, StorageService } from '@dta/shared/services/storage/storage.service';
import {
  animationFrameScheduler,
  from,
  fromEvent,
  interval,
  map,
  merge,
  mergeMap,
  Observable,
  of,
  Subject,
  Subscription,
  switchMap,
  timer
} from 'rxjs';
import { SharedSubjects } from '@shared/services/communication/shared-subjects/shared-subjects';
import { CONSTANTS } from '@shared/models/constants/constants';
import { exhaustMap, finalize, tap, throttle } from 'rxjs/operators';
import * as _ from 'lodash';
import { Logger } from '@shared/services/logger/logger';
import { LogLevel, LogTag } from '@dta/shared/models/logger.model';
import { ProcessType, StopWatch } from '@dta/shared/utils/stop-watch';
import { PullSynchronizationService } from '@shared/synchronization/pull-synchronization/pull-synchronization.service';
import { ConversationHelper } from '@shared/services/data/collection/conversation-collection/conversation-helper';
import { ConversationEventList } from '@shared/api/api-loop/models/conversation-event-list';
import { ConversationModel } from '@dta/shared/models-api-loop/conversation-card/conversation/conversation.model';
import { ConversationChangeModel } from '@dta/shared/models-api-loop/conversation-card/conversation/conversation-change.model';
import { CardType } from '@shared/api/api-loop/models';
import { ClearStorageCache } from '@shared/services/communication/shared-subjects/shared-subjects-models';

/**
 * Base class for the periodic conversation synchronization of one account.
 *
 * The flow is: verify (or reset) the persisted sync state, then repeatedly
 * pull conversation events newer than the last persisted event id, re-apply
 * pending local changes on top of them and save the result.
 *
 * Subclasses supply the validity checks (`isHistoryIdValid`, `isInvalidState`)
 * and `getMaxChanges`.
 */
export abstract class ConversationSynchronizationService {
  /** True while a sync started through `startConversationSync` is running. */
  active: boolean = false;

  /** In-memory copy of the last synced conversation event id (lazily read from storage). */
  private conversationHistoryId: string;

  protected constructor(
    protected _userEmail: string,
    protected _data: DataServiceShared,
    protected _apiService: ApiService,
    protected _storageService: StorageService,
    protected _pullSynchronizationService: PullSynchronizationService
  ) {}

  /**
   * Label used as the prefix of the StopWatch name created in `runPeriodicSync`.
   * NOTE(review): the value does not match this class' name — confirm whether that is intended.
   */
  get constructorName(): string {
    return 'EventSynchronizationService';
  }

  /** Email of the account this service synchronizes. */
  get userEmail(): string {
    return this._userEmail;
  }

  /**
   * Default implementation: emits a single `undefined` and completes.
   */
  public start(): Observable<any> {
    return of(undefined);
  }

  /**
   * Starts the conversation sync unless one is already marked as active.
   *
   * @param unsubscribe$ forwarded to `onUnsubscribe`; every emission re-runs
   *   the synced-comment cleanup.
   * @returns the subscription driving the sync, or `undefined` when the sync
   *   is already active (in which case only a trace log is written).
   */
  startConversationSync(unsubscribe$: Subject<void>): Subscription {
    if (this.active) {
      Logger.customLog(
        `[SYNC] - ConversationSync [${this._userEmail}]: startConversationSync() called but process is marked as active`,
        LogLevel.TRACE,
        LogTag.SYNC
      );

      return;
    }

    this.onUnsubscribe(unsubscribe$);

    // Stays unassigned here; runPeriodicSync creates a fresh StopWatch per trigger.
    let watch: StopWatch;
    this.active = true;

    /**
     * Check if valid historyId
     */
    return this.verifySyncState()
      .pipe(
        mergeMap(() => {
          Logger.log(
            `[SYNC] - ConversationSync [${this._userEmail}]: contact sync completed. Will start conversation periodic sync`
          );
          return this.runPeriodicSync(watch);
        }),
        /**
         * Always deactivate, whether the stream completed, errored or was unsubscribed
         */
        finalize(() => {
          this.active = false;
        })
      )
      .subscribe();
  }

  /**
   * Builds the periodic sync stream.
   *
   * Stores a cut-off date of "now minus 14 days" and then, on every
   * `SharedSubjects._conversationUpdated$` emission or interval-worker message,
   * fetches new conversation events, applies pending local changes, drops
   * mail cards and saves the remaining conversations.
   *
   * The interval worker is created when the returned observable is subscribed
   * and terminated when that subscription is torn down.
   *
   * @param watch ignored on input; reassigned to a new StopWatch on every trigger.
   */
  public runPeriodicSync(watch: StopWatch): Observable<any> {
    const minimumCutOffDate = moment().subtract(14, 'days').toDate().toISOString();
    this._storageService.setItem(StorageKey.cutOffTimeStorageKey, minimumCutOffDate);

    return merge(SharedSubjects._conversationUpdated$, this.createIntervalWorkerTicks()).pipe(
      tap(() => {
        watch = new StopWatch(this.constructorName + '.startActiveSync', ProcessType.SERVICE, this._userEmail);
      }),
      /**
       * Emits, then ignores subsequent source values for a duration of interval
       */
      throttle(() => interval(500)),
      /**
       * Check for updates; triggers arriving while a fetch is in flight are dropped
       */
      exhaustMap(() => {
        watch.log('Fetching updates');
        return this.getConversationEvents().pipe(
          tap((conversationEventList: ConversationEventList) => {
            // NOTE(review): the event id is persisted before the conversations
            // below are saved — confirm a failed save cannot skip events.
            this.persistConversationHistoryId(conversationEventList.lastEventId);
          })
        );
      }),
      /**
       * Process and save updates
       */
      map((conversationEventList: ConversationEventList) => {
        return ConversationModel.createList(conversationEventList.resources);
      }),
      mergeMap((conversations: ConversationModel[]) => {
        if (_.isEmpty(conversations)) {
          return of(conversations);
        }

        // Request a 'smartCommentsApi' cache clear for every updated conversation
        const conversationIds = conversations.map(conversation => conversation.cardId);
        conversationIds.forEach(conversationId => {
          const cacheClearRequest = new ClearStorageCache();
          cacheClearRequest.storageName = {
            storageName: 'smartCommentsApi',
            keyToDelete: conversationId
          };
          SharedSubjects._clearStorageCache.next(cacheClearRequest);
        });

        return this._data.ConversationChangeService.findAllValidConversationsChanges(this._userEmail).pipe(
          mergeMap((changes: ConversationChangeModel[]) => {
            const changesByCardId = _.groupBy(changes, 'cardId');
            const changesToDelete = [];

            // TODO optimize
            _.forEach(conversations, conversation => {
              if (_.has(changesByCardId, conversation.cardId)) {
                // applyLocalChanges returns the entries that are removed right after
                changesToDelete.push(...conversation.applyLocalChanges(changesByCardId[conversation.cardId]));
              }
            });

            return this._data.ConversationChangeService.removeByIds(this._userEmail, changesToDelete).pipe(
              map(() => conversations)
            );
          })
        );
      }),
      map((_conversations: ConversationModel[]) => {
        /**
         * Remove CardMail from ConversationSync
         */
        return _.filter(_conversations, conversation => {
          return conversation.cardType !== CardType.CARD_MAIL;
        });
      }),
      mergeMap((_conversations: ConversationModel[]) => {
        return this._data.ConversationService.saveAllAndPublish(this._userEmail, _conversations);
      })
    );
  }

  /**
   * Remove _syncedComments on Conversation model in order for FE to re-fetch comments.
   *
   * Runs the removal once immediately and then again on every emission of
   * `unsubscribe$`.
   */
  onUnsubscribe(unsubscribe$: Subject<void>) {
    return this._data.ConversationService.removeSyncedCommentAttribute(this._userEmail, undefined)
      .pipe(
        switchMap(() => unsubscribe$),
        mergeMap(() => {
          return this._data.ConversationService.removeSyncedCommentAttribute(this._userEmail, undefined);
        })
      )
      .subscribe();
  }

  /**
   * Returns conversation event id that was last synced.
   *
   * The value is read from storage only while the in-memory copy is empty.
   */
  getConversationActiveSyncHistoryId(): string {
    if (_.isEmpty(this.conversationHistoryId)) {
      const key = this._storageService.getKey(this._userEmail, StorageKey.activeConversationSyncHistoryId);
      this.conversationHistoryId = this._storageService.getParsedItem(key);
    }

    return this.conversationHistoryId;
  }

  /**
   * Set conversation synced history id, both in memory and in storage.
   */
  persistConversationHistoryId(historyId: string) {
    this.conversationHistoryId = historyId;

    const key = this._storageService.getKey(this._userEmail, StorageKey.activeConversationSyncHistoryId);
    this._storageService.setStringifiedItem(key, historyId);
  }

  /** Used by `verifySyncState` when the state itself is not invalid. */
  protected abstract isHistoryIdValid(): Observable<any>;

  protected abstract getMaxChanges(): number;

  /**
   * Flags in `localStorage` that a sync restart is required and emits `undefined`.
   */
  protected handleWasOfflineTooLong(): Observable<any> {
    localStorage.setItem(StorageKey.syncRestartRequired, 'true');
    return of(undefined);
  }

  /**
   * Requests conversation events after the last synced event id,
   * passing 1024 as the first argument of the query builder.
   */
  private getConversationEvents(): Observable<ConversationEventList> {
    const historyId = this.getConversationActiveSyncHistoryId();
    const query = ConversationHelper.getEventConversationQuery(1024, historyId);
    return this._apiService.EventApiService.Event_GetConversationsEvent(query, this._userEmail);
  }

  /** When true, `verifySyncState` resets the sync state instead of validating the history id. */
  protected abstract isInvalidState(): boolean;

  /**
   * Check if all parameters are valid
   */
  private verifySyncState(): Observable<any> {
    // Reset state if there are any missing fields
    if (this.isInvalidState()) {
      Logger.customLog(
        `[SYNC] - PullSync [${this._userEmail}]: verifySyncState - invalid state, will resetSyncState()`,
        LogLevel.TRACE,
        LogTag.SYNC
      );

      return this.resetSyncState();
    }

    // Check if there are too many changes / invalid history ID
    return this.isHistoryIdValid();
  }

  /**
   * Requests events with `size: 0`, persists the returned `lastEventId` as the
   * new history id and then removes all stored conversations of this account.
   */
  protected resetSyncState(): Observable<any> {
    return this._apiService.EventApiService.Event_GetConversationsEvent({ size: 0 }, this._userEmail).pipe(
      tap((response: ConversationEventList) => {
        this.handleResetConversationSyncState(response.lastEventId);
      }),
      mergeMap(() => {
        return this._data.ConversationService.removeAll(this._userEmail);
      })
    );
  }

  private handleResetConversationSyncState(newConversationHistoryId: string) {
    this.persistConversationHistoryId(newConversationHistoryId);
  }

  /**
   * Emits every 'message' event of a dedicated interval worker.
   *
   * The worker is created per subscription and terminated on teardown, so it
   * no longer keeps running after the sync stream is unsubscribed, errors or
   * completes.
   */
  private createIntervalWorkerTicks(): Observable<Event> {
    return new Observable<Event>(subscriber => {
      const intervalWorker = new Worker(new URL('shared/modules/workers/interval-worker', import.meta.url));
      // Listen before configuring the worker so no message can be missed
      const ticks = fromEvent(intervalWorker, 'message').subscribe(subscriber);
      intervalWorker.postMessage({ period: CONSTANTS.CONVERSATION_PULL_PERIOD });

      return () => {
        ticks.unsubscribe();
        intervalWorker.terminate();
      };
    });
  }
}
