import * as _ from 'lodash';
import { Observable, of, Subject, Subscription, throwError } from 'rxjs';
import { Logger } from '@shared/services/logger/logger';
import { catchError, delay, filter, mergeMap, repeatWhen, takeUntil, tap } from 'rxjs/operators';
import { LogEntry } from '@shared/api/log-loop/models/log-entry';
import { LogService } from '@shared/api/log-loop/api.module';
import { SyncServiceStatus } from '@shared/models/sync/synchronization-service-status.model';
import { WatchdogService } from '@shared/services/watchdog/watchdog.service';
import { LogTag } from '@dta/shared/models/logger.model';
import { ProcessType, StopWatch } from '@dta/shared/utils/stop-watch';

export abstract class LogSynchronizationService {
  // State variables
  active: boolean = false;
  protected syncPeriodSeconds: number = 10;

  constructor(
    protected _watchdogService: WatchdogService,
    protected _logService: LogService,
  ) {}

  get constructorName(): string {
    return 'LogSynchronizationService';
  }

  protected abstract getNextLogBundle(): Observable<{ logBundle: LogEntry[]; inflightLogPaths: string[] }>;

  protected abstract removeInflightLogBundle(inflightLogPaths: string[]): Observable<any>;

  getStatus(): SyncServiceStatus {
    return this.active ? SyncServiceStatus.ACTIVE : SyncServiceStatus.INACTIVE;
  }

  /**
   * Start logs synchronization
   */
  startLogSync(unsubscribe$: Subject<void>): Subscription {
    if (this.active) {
      return;
    }

    let watch: StopWatch;
    this.active = true;

    Logger.log(`[SYNC] - LogSync: started`);
    return of(undefined)
      .pipe(
        filter(() => {
          return this._watchdogService.isConnectionActive;
        }),
        tap(() => {
          watch = new StopWatch(this.constructorName + '.startLogSync', ProcessType.SERVICE);
        }),
        mergeMap(() => {
          return this.getNextLogBundle();
        }),
        mergeMap((data: { logBundle: LogEntry[]; inflightLogPaths: string[] }) => {
          if (_.isEmpty(data)) {
            return of([]);
          }

          let logs = data.logBundle.filter((log: LogEntry) => !_.isUndefined(log));
          if (logs.length > 0) {
            let logBearer = logs[0].deviceId + ';' + logs[0].localTime;
            return this._logService.LogApiService.Log_ClientLog({ logEntries: logs }, logBearer).pipe(
              mergeMap(() => of(data.inflightLogPaths)),
            );
          }
          return of(data.inflightLogPaths);
        }),
        catchError(err => {
          console.log('Error when sending logs', err);
          return of([]);
        }),
        mergeMap((inflightLogPaths: string[]) => {
          return this.removeInflightLogBundle(inflightLogPaths);
        }),
        repeatWhen((notifier: Observable<any>) => {
          return notifier.pipe(
            takeUntil(unsubscribe$),
            filter(() => {
              return this.active;
            }),
            delay(this.syncPeriodSeconds * 1000),
          );
        }),
        catchError(err => {
          // Watchdog will restart the sync
          this.active = false;

          Logger.error(err, `[SYNC] - LogSync: error`, LogTag.SYNC);
          return throwError(err);
        }),
      )
      .subscribe();
  }
}
