import { Injectable, Inject, OnDestroy } from '@angular/core';
import { Observable, Subject, forkJoin, from, of, throwError } from 'rxjs';
import { HttpClient, HttpHeaders, HttpParams, HttpParamsOptions } from '@angular/common/http';
import { IngestQueueItem } from '../../ingest-queues/interfaces';
import { EndpointsApiService } from './endpoints.api.service';
import { SaveTransactionService } from '../utils/save-transaction/save-transaction.service';
import { Endpoint } from '../types';
import { ApiUpdatesService } from '../utils/updates/api-updates.service';
import { catchError, finalize, map, switchMap, take, takeUntil } from 'rxjs/operators';
import { AsyncCacheService } from '../utils/async-cache.service';
import { ApiConfigService } from './api-config.service';
import { StreamEndpoints } from '../types';
import { RAW_DATA } from '../constants/raw-data';
import { Filter } from '../../ingest-queues/interfaces';
import { ErrorMessageService } from './error-message.service';
import { AlertService } from './alert.service';
import { EdgeServicesApiService } from './edge-services.api.service';
import { EndpointsEnabled } from '../types/endpoints-selection.interface';
import { EndpointInfo } from '../types/endpoint-info.interface';
import { UrlService } from '../services/url.service';

export type Entity = 'Account' | 'Edge' | 'Arrivals' | 'Endpoint';

interface RawCollection {
   results: any[];
   channel?: any;
   version?: any;
}

interface ResultObj {
   data: RawCollection;
}

interface ItemToForce {
   id: string;
   forced: boolean;
}

export interface EntityInfo {
   entity: Entity;
   entityId: string;
}

export interface PriorityMap {
   [key: string]: number;
}

export interface PageOffset {
   [id: string]: {
      [key: string]: number | string;
      'id': number;
   };
}

export interface TableSort {
   active: string;
   direction: number;
}

export interface Actions {
   changeRank: boolean;
}

export interface EndpointsByEdge {
   [key: string]: string[];
}

const fileTrashableStatuses = ['actionRequired', 'duplicated', 'failed', 'ingested'];
const streamTrashableStatuses = ['ingested', 'failed'];
const canSendToTopOfQueueStatuses = ['queued', 'actionRequired', 'duplicated'];
const statusChangeMap = {
   queued: ['actionRequired'],
   ingested: ['queued'],
   ingesting: ['abort'],
   failed: ['queued', 'actionRequired'],
   missing: ['queued', 'actionRequired'],
   duplicated: ['queued'],
   actionRequired: ['queued'],
   deleted: [],
   unknown: []
};

@Injectable({
   providedIn: 'root',
})
export class IngestQueueItemsService implements OnDestroy {
   private apiUpdates: any;
   private asyncCache: any;
   private destroy$ = new Subject<void>();
   private moment: any;
   private apiVersion: string;
   private headers: HttpHeaders;
   private rawDataOptions = { headers: RAW_DATA, };
   private readonly endpointNotStartableWhen = ["disabled", "error", "warning"];

   constructor(
      private saveTransactionService: SaveTransactionService,
      private urlService: UrlService,
      private edgeServicesApi: EdgeServicesApiService,
      private alertService: AlertService,
      private httpClient: HttpClient,
      private endpointsApiService: EndpointsApiService,
      private apiUpdatesService: ApiUpdatesService,
      private asyncCacheService: AsyncCacheService<IngestQueueItem>,
      private errorMessageService: ErrorMessageService,
   )
   {
      this.moment = window.moment;
      this.apiUpdates = this.apiUpdatesService.initApiUpdatesService('IngestQueueItemsService');
      this.asyncCache = this.asyncCacheService.initAsyncCache('IngestQueueItemsService');
      this.apiVersion = ApiConfigService.settings.version;
      this.headers = new HttpHeaders().set('X-Api-Version', this.apiVersion);
   }

   ngOnDestroy() {
      this.destroy$.next();
      this.destroy$.complete();
   }

   public fetchIngests(endpointsInfo$: Observable<EndpointInfo[]>, endpointsSelection: EndpointsEnabled, offset: PageOffset, sort: TableSort, filterString: string, searchString: string, pageSize: number, priorityMap: PriorityMap, handler: any, errorHandler: any): Observable<RawCollection[]> | Observable<RawCollection> {
      return endpointsInfo$
         .pipe(
            switchMap((endpointsInfo: EndpointInfo[]) => {
               const filteredEndpoints = endpointsInfo.filter((endp: EndpointInfo) => (endpointsSelection[endp.id]));
               const endpByEdges: EndpointsByEdge = {};
               let edgeIds = [];
               const serviceGroupEndpoints: string[] = [];
               filteredEndpoints.forEach(endp => {
                  if (!endp.edgeServerId) {
                     if (!serviceGroupEndpoints.includes(endp.serviceGroupId)) {
                        serviceGroupEndpoints.push(endp.serviceGroupId);
                        endp.serviceGroupServices.forEach(serviceGroup => {
                           if( !(serviceGroup.edgeId in endpByEdges) ) {
                              endpByEdges[serviceGroup.edgeId] = [];
                              edgeIds.push(serviceGroup.edgeId);
                           }
                        });
                     }
                  }
                  else {
                     if( !(endp.edgeServerId in endpByEdges) ) {
                        endpByEdges[endp.edgeServerId] = [];
                        edgeIds.push(endp.edgeServerId);
                     }
                     endpByEdges[endp.edgeServerId].push(endp.id);
                  }
               });
               edgeIds = edgeIds.filter(eId => (offset === null || (eId in offset && offset[eId] !== null))); // we fetch ingest info from an edge if it is the first request or we have a valid offset
               const observableArray$ = edgeIds.map((edgeServerId: string) => {
                  const endpByEdge: EndpointsByEdge = {};
                  const edgeOffset: PageOffset = {};
                  const endps: string[] = endpByEdges[edgeServerId];
                  endpByEdge[edgeServerId] = endps.concat(serviceGroupEndpoints);
                  if (offset) {
                     edgeOffset[edgeServerId] = offset[edgeServerId];
                  }
                  return this.fetchIngestForEdgeServer(edgeServerId, edgeOffset, sort, filterString, searchString, pageSize, priorityMap, handler, errorHandler, endpByEdge);
               });
               return forkJoin(observableArray$);
            })
         );
   }


   public fetchUnprocessed(endpointsInfo$: Observable<EndpointInfo[]>, endpointsSelection: EndpointsEnabled, offset: PageOffset, sort: TableSort, filterString: string, searchString: string, pageSize: number, priorityMap: PriorityMap, handler: any, errorHandler: any): Observable<RawCollection[]> | Observable<RawCollection> {
      return endpointsInfo$
      .pipe(
         switchMap((endpointsInfo: EndpointInfo[]) => {
            const filteredEndpoints = endpointsInfo.filter(endp => (endpointsSelection[endp.id] && endp.online && endp?.category && endp.category === 'source' && (offset === null || (endp.id in offset && offset[endp.id] !== null))));
            const observableArray = filteredEndpoints.map((endp: EndpointInfo) => {
               const endpointOffset: PageOffset = {};
               if( offset ) {
                  endpointOffset[endp.id] = offset[endp.id];
               }
               return this.fetchUnprocessedForEndpoint(endp.id, endpointOffset, sort, searchString, filterString, pageSize, handler, errorHandler);
            });
            return forkJoin(observableArray);
         })
      );

   }

   public fetchEntityEndpoints(entityInfo: EntityInfo, forceFetch: boolean = false): Observable<Endpoint[]> {
      if( entityInfo.entity === "Account" ) {
         return this.endpointsApiService.fetchForAccountId(entityInfo.entityId, true, forceFetch=forceFetch); // fetches endpoints and serviceGroup Endpoints, for serviceGroup endpoints returns the edges associated
      }
      else if (entityInfo.entity === "Edge" ) {
         return this.endpointsApiService.fetchForServerId(entityInfo.entityId, forceFetch=forceFetch);
      }
      else if (entityInfo.entity === "Arrivals" ) {
         return this.endpointsApiService.fetchForArrivalsId(entityInfo.entityId, forceFetch=forceFetch);
      }
      else {
         of([]);
      }
   }


   public fetchBookingsForEndpoint(edgeServerId: string,  endpointId: string): Observable<IngestQueueItem[]> {
      const filterString = "queued|streams";
      const url = '/api/edgeServers/' + edgeServerId + '/ingestQueueItems';

      const params: any = {};
      params.sortKey = 'id';
      params.sortDirection = 1;
      params.filters = filterString;
      params.pageSize = 100; // I suspect it is more than enough...
      return this.httpClient.get<RawCollection>(url, { params: this.ToHttpParams(params), headers: RAW_DATA })
         .pipe(
            map((rawResults: RawCollection) => {
               if ( rawResults && rawResults?.results && rawResults.results.length > 0 ) {
                  const endpointResults: IngestQueueItem[] = rawResults.results.filter(item => item.endpointId === endpointId);
                  endpointResults.map(item => this.unmarshall(item));
                  return endpointResults;
               }
               else {
                  return [];
               }
            }),
         );
   }

   public queueItemCommand(queueItem: Partial<IngestQueueItem>, command: string, data?: Partial<IngestQueueItem>): Observable<IngestQueueItem> {
      return this.queueItemPost({...data, ...{ id: queueItem.id, action: command }});
   }

   public saveQueueItemStatus(queueItem: IngestQueueItem, status: string, $event: JQLite): any {
      $event.stopPropagation();
      queueItem.status = status;
      if(!queueItem.hasOwnProperty('$$savigStates')) {
         queueItem.$$savingStates = {};
      }
      return this.saveTransactionService.save(queueItem, 'status', this.queueItemPost.bind(this, { id: queueItem.id, status: queueItem.status, }, 'status'));
   }

   public saveQueueItemNames(queueItem: IngestQueueItem): Observable<any> {
      return from(this.saveTransactionService.saveObject(queueItem,  this.queueItemPost.bind(this, { id: queueItem.id, reelName: queueItem.reelName, diskLabel: queueItem.diskLabel, clipName: queueItem.clipName, })));
   }

   public saveQueueItemsStatus(queueItems: IngestQueueItem[], status: string): Observable<any> {
      queueItems.forEach(queueItem => queueItem.status = status);
      const api = () => {
         return this.httpClient.post<RawCollection>('/api/ingestQueueItems', queueItems.map(item => { return { id: item.id, status: item.status, }; }), this.rawDataOptions)
            .pipe(
               map((results: RawCollection) => this.unmarshallResults(results)),
            );
      };
      return this.saveTransactionService.saveCollectionKey(queueItems, 'status', api);
   }

   public rankToTopQueueItemForEdgeServer(edgeServerId: string, queueItems: IngestQueueItem[]): Observable<any> {
      return this.rankToTop('/api/edgeServers/' + edgeServerId + '/ingestQueueItemRanks', queueItems);
   }

   public rankToTopQueueItemForArrivalsFolder(arrivalsFolderId: string, queueItems: IngestQueueItem[]): Observable<any> {
      return this.rankToTop('/api/arrivalsFolders/' + arrivalsFolderId + '/ingestQueueItemRanks', queueItems);
   }

   public rankToTopQueueItemForAccount(accountId: string, queueItems: IngestQueueItem[]): Observable<any> {
      return this.rankToTop('/api/accounts/' + accountId + '/ingestQueueItemRanks', queueItems);
   }

   public fetchActions(entityInfo: EntityInfo): Observable<Actions> {
      let url: string;
      if( entityInfo.entity === 'Edge' ) {
         url = '/api/edgeServers/' + entityInfo.entityId + '/ingestQueueItemActions';
      } else if( entityInfo.entity === 'Arrivals' ) {
         url = '/api/arrivalsFolders/' + entityInfo.entityId + '/ingestQueueItemActions';
      } else if( entityInfo.entity === 'Account' ) {
         url = '/api/accounts/' + entityInfo.entityId +  '/ingestQueueItemActions';
      }
      const api = this.httpClient.get<any>(url);
      return this.asyncCache.fetch('ingest-queue-actions-on-' + entityInfo.entityId, api);
   }

   public trashQueueItem(queueItem: IngestQueueItem, $event: JQLite): void {
      $event.stopPropagation();
      const obs$ = (queueItem.isSchedule) ?  this.queueItemCommand(queueItem, 'reschedule', { duration: null, startTime: null }): this.trashIngestQueueItem(queueItem);
      obs$
         .pipe(
            take(1),
            catchError(error => {
               this.alertService.show({
                  text: error,
                  type: 'danger'
               });
               return of(-1); // if the item cannot be trashed, a message should have been now displayed, but what to do with that?
            })
         )
         .subscribe(result => {
            this.alertService.show({
               text: `QueueItem: ${queueItem.id} deleted`,
               type: 'info',
            }, 0);
         });
   }

   public trashIngestQueueItem(queueItem: IngestQueueItem): Observable<any> {
      queueItem.$$deleting = true;
      queueItem.error = null;
      return this.httpClient.post<any>('/api/ingestQueueItems/' + queueItem.id + '/trash', {})
         .pipe(
            map(item => {
                  queueItem = item;
                  this.unmarshall(queueItem);
            }),
            finalize(() => {
               queueItem.$$deleting = false;
            }),
         );
   }

   public trashQueueItems(queueItemsToTrash: IngestQueueItem[], mode: 'file'| 'stream'): Observable<any> {
      queueItemsToTrash.map(queueItem => {
         queueItem.$$deleting = true;
         queueItem.error = null;
      });
      const idsToTrash = queueItemsToTrash.map(item => item.edgeServerId + ':' + item.endpointId + ':' + item.queueId.toString());
      return this.httpClient.post<any>('/api/ingestQueueItems/trash', { ids: idsToTrash, mode: mode} , this.rawDataOptions)
         .pipe(
            catchError(error => {
                  this.alertService.show({
                  text: error,
                  type: 'danger'
               });
               return throwError(error);
            }),
            map((results: RawCollection) => this.unmarshallResults(results)),
            finalize(() => queueItemsToTrash.forEach(item => {
               item.$$deleting = false;
               this.alertService.show({
                  text: `QueueItem: ${item.id} deleted`,
                  type: 'info',
               }, 0);
            })),
         );
   }

   public isQueueItemTrashable(queueItem: IngestQueueItem): boolean {
      if (queueItem.isStream) {
         return queueItem.isSchedule || streamTrashableStatuses.includes(queueItem.status);
      } else {
         return fileTrashableStatuses.includes(queueItem.status);
      }
   };

   public canSendToTop(queueItem: IngestQueueItem): boolean {
      return !queueItem.isStream && canSendToTopOfQueueStatuses.includes(queueItem.status);
   };

   public selectedChangableItem(queueItem: IngestQueueItem, selectedItems: string[], keyName: string): boolean {
      return selectedItems.includes(queueItem[keyName].toString()) && !queueItem?.$$saving && !queueItem?.$$deleting;
   }

   public forceUnprocessedFiles(unprocessedFiles: IngestQueueItem[]) {
      unprocessedFiles.forEach(item => item.forced = true);
      return this.saveTransactionService.saveCollectionKey(unprocessedFiles, 'forced', () =>  {
         const itemsToForce: ItemToForce[] = [];
         unprocessedFiles.forEach(item => itemsToForce.push({ id: item.id, forced: item.forced }));
         return this.httpClient.post('/api/unprocessedFiles', itemsToForce)
            .pipe(
               catchError(error => {
                  unprocessedFiles.forEach(item => item.forced = false);
                  this.alertService.show({
                     text: 'Unable to force items ' + error,
                     type: 'warning'
                  });
                  return of(-1);
               }),
            );
      });
   };

   public rescanUnprocessedFiles(unprocessedFiles: IngestQueueItem[]) {
      unprocessedFiles.forEach(item =>  { item.$$deleting = true; });
      const itemsToRescan=[];
      unprocessedFiles.forEach(item => itemsToRescan.push(item.id));
      return this.httpClient.post('/api/unprocessedFiles/trash', itemsToRescan)
         .pipe(
            take(1),
            catchError(error => {
               unprocessedFiles.forEach(item =>  { item.$$deleting = false; });
               this.alertService.show({
                  text: 'Unable to rescan multiple items ' + error,
                  type: 'warning'
               });
               return of(-1);
            }),
         )
         .subscribe(
            result => {
               unprocessedFiles.forEach(item => item.$$deleting = false);
            }
         );
   }

   public itemsThatCanBeTrashed(items: IngestQueueItem[]): IngestQueueItem[] {
      return items.filter(item => this.isQueueItemTrashable(item));
   };

   public itemsThatCanBeSentToTop(items: IngestQueueItem[]): IngestQueueItem[] {
      return items.filter(item => this.canSendToTop(item));
   }

   public queueItemCanChangeTo(queueItem: IngestQueueItem, status: string): boolean {
      return !(queueItem.isStream || (queueItem?.endpoint && queueItem.endpoint.category === "stream")) && queueItem.status in statusChangeMap && statusChangeMap[queueItem.status].includes(status);
   };

   public itemsThatCanChangeTo(items: IngestQueueItem[], status: string): IngestQueueItem[] {
      return items.filter(item =>  this.queueItemCanChangeTo(item, status));
   };

   public queueItemCanStart(queueItem: IngestQueueItem): boolean {
      return queueItem.isStream && queueItem.status === 'actionRequired' && !this.endpointNotStartableWhen.includes(queueItem.endpoint.status);
   };

   public queueItemCanStop(queueItem: IngestQueueItem): boolean {
      return queueItem.isStream && queueItem.status === 'ingesting';
   };

   public queueItemCanAbort(queueItem: IngestQueueItem): boolean {
      return !queueItem.isStream && queueItem.status === 'ingesting';
   };

   public queueItemStart(queueItem: IngestQueueItem, $event: JQLite) {
      $event.stopPropagation();
      return this.queueItemCommand(queueItem, 'start')
         .pipe(
            take(1),
            catchError(error => {
               this.alertService.show({
                  text: 'Unable to start stream - ' + error,
                  type: 'danger'
               });
               return throwError(error);
            }),
         ).subscribe();
   };

   public queueItemStop(queueItem: IngestQueueItem, $event: JQLite) {
      $event.stopPropagation();
      return this.queueItemCommand(queueItem, 'stop')
         .pipe(
            take(1),
            catchError(error => {
               this.alertService.show({
                  text: 'Unable to stop stream - ' + error,
                  type: 'danger'
               });
               return throwError(error);
            }),
         ).subscribe();
   };

   public queueItemCanEdit(queueItem: IngestQueueItem): boolean {
      return queueItem.status === 'actionRequired' || queueItem.isSchedule;
   };

   public queueItemEndpointLink(queueItem: IngestQueueItem): string {
      if (queueItem.endpoint && queueItem.edgeServerId) {
         return this.urlService.makeUrl('edge-server-endpoint', {
            edgeServerId: queueItem.edgeServerId,
            serviceId: queueItem.endpoint.serviceId,
            endpointId: queueItem.endpoint.id
         });
      } else {
         return null;
      }
   };

   public queueItemCanCreateSchedule(queueItem: IngestQueueItem): boolean {
      return (queueItem.isStream && queueItem.status === 'actionRequired') ||
         queueItem.isSchedule;
   };

   public queueItemMonitorUrl(queueItem: IngestQueueItem, width: number, height: number): string {
      return '/api/ingestQueueItems/' + queueItem.id +
         '/thumbnail?width=' + width + '&height=' + height;
   }

   public queueItemLogUrl(queueItem: IngestQueueItem): string | null {
      if (['failed', 'ingesting', 'ingested'].includes(queueItem.status)) {
         return '/api/ingestQueueItems/' + queueItem.id + '/log';
      }
      return null;
   }

   public canMonitor(endpoint: Endpoint): boolean {
      return endpoint.typeId in StreamEndpoints && endpoint.queueItemId && endpoint.recording === true;
   }

   public canControl(endpoint: Endpoint): boolean | undefined {
      // Recording controls to be displayed for manual and hourly recording mode
      // ondemand has automated start/stop so no controls to be displayed.
      return endpoint.typeId in StreamEndpoints && endpoint.options.autoCapture.value !== 'ondemand';
   }

   public monitorUrl(endpoint: Endpoint, width: number, height: number): string {
      return '/api/ingestQueueItems/' + endpoint.queueItemId + '/thumbnail?width=' + width + '&height=' + height;
   }

   public startEndpoint(endpoint: Endpoint, $event: JQLite) {
      this.start(endpoint.queueItemId, $event, endpoint.name);
   }

   public stopEndpoint(endpoint: Endpoint, $event: JQLite) {
      this.stop(endpoint.queueItemId, $event, endpoint.name);
   }

   public start(id: string, $event: JQLite, name?: string): void {
      $event.stopPropagation();
      const queueItem = { id: id };
      this.queueItemCommand(queueItem, 'start')
            .pipe(
               take(1),
               catchError((error) => {
                  this.alertService.show({
                     text: 'Unable to start stream ' +  name + ' - ' + this.errorMessageService.errorMessage(error),
                     type: 'danger'
                  });
               return throwError(error);
               })
            )
            .subscribe();
   }

   public stop(id: string, $event: JQLite, name?: string): void {
      $event.stopPropagation();
      const queueItem = { id: id };
      this.queueItemCommand(queueItem, 'stop')
         .pipe(
            take(1),
            catchError(error => {
               this.alertService.show({
                  text: 'Unable to stop stream - '+ name + this.errorMessageService.errorMessage(error),
                  type: 'danger'
               });
            return throwError(error);
            }),
         )
         .subscribe();
   }

   public filterToString(obj: Filter): string {
      const trueKeys = Object.keys(obj).filter(key => obj[key] === true);
      return trueKeys.join('|');
   }

   public filterFromString(filterString: string): Filter {
      const filter: Filter = {};
      if ( filterString ) {
         const filtArray = filterString.split('|').map(item => item.trim());
         filtArray.map(key => {
            filter[key] = true;
         });
      }
      return filter;
   }

   public rankToTopFor(queueItems: IngestQueueItem[], entityInfo: EntityInfo): Observable<IngestQueueItem[]> {
      const url = (entityInfo.entity === 'Edge') ? '/api/edgeServers/' + entityInfo.entityId + '/ingestQueueItemRanks' :
                  (entityInfo.entity === 'Arrivals') ? '/api/arrivalsFolders/' + entityInfo.entityId + '/ingestQueueItemRanks':
                  '/api/accounts/' + entityInfo.entityId + '/ingestQueueItemRanks';
      return this.rankToTop(url, queueItems);
   }


   public selectedFilesThatCanBeRescanned(ingestQueueItems: IngestQueueItem[]): IngestQueueItem[] {
      return ingestQueueItems.filter(item => this.fileCanBeRescanned(item));
   };

   public selectedFilesThatCanBeForced(ingestQueueItems: IngestQueueItem[]): IngestQueueItem[] {
      return ingestQueueItems.filter(item => this.fileCanBeForced(item));
   };

   public fileCanBeRescanned(item: IngestQueueItem): boolean {
      // All files can be rescanned
      return true;
   };

   public fileCanBeForced(item: IngestQueueItem): boolean {
      return item.status === 'missingFiles';
   };

   public unsubscribeUpdateChannel(channel: string) {
      this.apiUpdates.unsubscribeToUpdate({_channel: channel});
   }

   public fetchUnprocessedForEndpoint(endpointId: string,offset: PageOffset, sort: TableSort, searchString: string, filterString: string, pageSize: number, handler, errorHandler): Observable<any> {
      const asyncCacheKey = 'unprocessed-for-endpoint-' + endpointId + '-' + offset + '-' + sort.active + '-' + sort.direction + searchString;
      const url = '/api/endpoints/' + endpointId + '/unprocessedFiles';
      const channelUrl = '/api/endpoints/' + endpointId;
      return this.fetchAndSubscribe(url, asyncCacheKey, offset, sort, filterString, null, searchString, pageSize, null, handler, errorHandler, {}, channelUrl);
   }

   public fetchIngestForEdgeServer(edgeServerId: string, offset: PageOffset, sort: TableSort, filterString: string, searchString: string, pageSize: number, priorityMap: PriorityMap, handler, errorHandler, edgesEndpointIds: EndpointsByEdge): Observable<any> {
      const asyncCacheKey = 'ingest-queue-items-for-edge-server-' + edgeServerId + '-' + offset + '-' + sort.active + '-' + sort.direction + '-' + filterString + '-' + searchString + '-' + pageSize;
      const url = '/api/edgeServers/' + edgeServerId + '/ingestQueueItems';
      return this.fetchAndSubscribe(url, asyncCacheKey, offset, sort, filterString, null, searchString, pageSize, priorityMap, handler, errorHandler, edgesEndpointIds);
   }

   private fetchIngestForArrivalsFolder(arrivalsFolderId: string, offset: PageOffset, sort: TableSort, filterString: string, searchString: string, pageSize: number, priorityMap: PriorityMap, handler, errorHandler): Observable<any> {
      const asyncCacheKey = 'ingest-queue-items-for-arrivals-folder-' + arrivalsFolderId + '-' + offset + '-' + sort.active + '-' + sort.direction + '-' + filterString + '-' + searchString + '-' + pageSize;
      const url = '/api/arrivalsFolders/' + arrivalsFolderId + '/ingestQueueItems';
      return this.fetchAndSubscribe(url, asyncCacheKey, offset, sort, filterString, null, searchString, pageSize, priorityMap, handler, errorHandler, {});
   }

   private fetchIngestForEndpoint(endpointId: string,offset: PageOffset, sort: TableSort, filterString: string, searchString: string, pageSize: number, priorityMap: PriorityMap, handler, errorHandler): Observable<any> {
      const asyncCacheKey = 'ingest-queue-items-for-endpoint-' + endpointId + '-' + offset + '-' + sort.active + '-' + sort.direction + '-' + filterString + '-' + searchString + '-' + pageSize;
      const url = '/api/endpoints/' + endpointId + '/ingestQueueItems';
      return this.fetchAndSubscribe(url, asyncCacheKey, offset, sort, filterString, null, searchString, pageSize, priorityMap, handler, errorHandler, {});
   }


   private queueItemPost(data: Partial<IngestQueueItem>): Observable<IngestQueueItem> {
      const httpParams: HttpParamsOptions = { } as HttpParamsOptions;
      const options = { params: new HttpParams(httpParams), headers: this.headers };

      return this.httpClient.post<any>('/api/ingestQueueItems/' + data.id, data)
         .pipe(
            catchError(error => {
               this.alertService.show({
                  text: this.errorMessageService.errorMessage(error),
                  type: 'danger'
               });
               return throwError(error);
            }),
            map(result => {
                  if(result.error) {
                     return result;
                  }
                  else {
                     return this.unmarshall(result);
                  }
            }),
         );
   }

   private queueItemPostPromise(data: Partial<IngestQueueItem>): Promise<IngestQueueItem> {
      const httpParams: HttpParamsOptions = { } as HttpParamsOptions;
      const options = { params: new HttpParams(httpParams), headers: this.headers };

      return new Promise((resolve, reject) => this.httpClient.post<any>('/api/ingestQueueItems/' + data.id, data)
         .pipe(
            take(1),
         )
         .subscribe(
            (result) => {
               if(result.error) {
                  this.alertService.show({
                     text: this.errorMessageService.errorMessage(result.error),
                     type: 'danger'
                  });
                     reject(result.error);
               }
               else {
                  resolve(this.unmarshall(result));
               }
            },
            (error) => {
               this.alertService.show({
                  text: this.errorMessageService.errorMessage(error),
                  type: 'danger'
               });
               reject(error);
            }));
   }

   private fetchAndSubscribe(url: string, asyncCacheKey: string, offset: PageOffset, sort: TableSort, filterString: string, searchString: string, extendedSearchString: string, pageSize: number, priorityMap: PriorityMap, handler: any, errorHandler: any, edgesEndpointIds: EndpointsByEdge, subscribeChannel: string = null): Observable<any> {
      const params: any = {};
      if (offset) {
         params.offset = offset;
      }
      if (sort) {
         params.sortKey = sort.active;
         params.sortDirection = sort.direction;
      }
      if (filterString) {
         params.filters = filterString;
      }
      if (searchString) {
         params.search = searchString;
      }
      else {
         params.search = '';
      }
      if (extendedSearchString) {
         params.extendedSearch = extendedSearchString;
      }
      if (pageSize) {
         params.pageSize = pageSize;
      }
      if (priorityMap) {
         params.priorityMap = priorityMap;
      }
      if (edgesEndpointIds) {
         params.edgesEndpointIds = edgesEndpointIds;
      }
      const api = this.httpClient.get<RawCollection>(url, { params: this.ToHttpParams(params), headers: RAW_DATA })
         .pipe(
            map((rawResults) => {
               if ( rawResults && rawResults?.results && rawResults.results.length > 0 ) {
                  rawResults.results.map(item => this.unmarshall(item));
                  return rawResults;
               }
               else {
                  return rawResults;
               }
            }),
         );
      const fetchRes$ = this.asyncCache.fetch(asyncCacheKey, api, true);
      return fetchRes$
         .pipe(
            catchError(error => {
               errorHandler(error);
               return throwError(error);
            }),
            map((results: RawCollection) => {
               handler(results);
               const updater = (sinceVersion) => {
                  return this.fetchSince(url, sinceVersion, filterString)
                     .pipe(
                        catchError(error => {
                           errorHandler(error);
                           return throwError(error);
                        }),
                        map((fetched: RawCollection) => {
                           if ( edgesEndpointIds && Object.keys(edgesEndpointIds).length === 1) { // we can filter the updates only if we have the list of the endpoints we request items from
                              const edgeId = Object.keys(edgesEndpointIds)[0];
                              fetched.results = fetched.results.filter(item => edgesEndpointIds[edgeId].includes(item.endpointId));
                           }
                           this.unmarshallResults(fetched);
                           handler(fetched);
                           return fetched.version;
                        }),
                     );
               };
               this.apiUpdates.subscribeToIncrementalUpdates(updater, (subscribeChannel) ? subscribeChannel : results.channel, results.version);
               return results;
            }),
         );
   }

   private ToHttpParams(params: any): HttpParams {
      let httpParams = new HttpParams();
      for (const key of Object.keys(params)) {
        const value = params[key];
        if (Array.isArray(value)) {
          // If the value is an array, convert it to a comma-separated string
          httpParams = httpParams.set(key, value.join(','));
        } else if (typeof value === 'object') {
          // If the value is an object, convert it to a JSON string
          httpParams = httpParams.set(key, JSON.stringify(value));
        } else {
          // For other types (string, number, etc.), just append them as-is
          httpParams = httpParams.set(key, value.toString());
        }
      }
      return httpParams;
   }

   private rankToTop(url: string, queueItems: IngestQueueItem[]): Observable<any> {
      queueItems.forEach(queueItem => {
         queueItem.$$saving = true;
      });
      return this.httpClient.post<any>(url, queueItems.map((queueItem, i) =>  { return { queueItemId: queueItem.id, rank: i+1, }; }), this.rawDataOptions)
         .pipe(
            map((results: RawCollection) => {
               this.unmarshallResults(results);
            }),
            finalize(() => queueItems.forEach(queueItem => { queueItem.$$saving = false; })),
         );
   }

   private fetchSince(url: string, sinceVersion: number, stateFilters?: string): Observable<RawCollection> {
      const params: any = {};
      params.sinceVersion = sinceVersion;
      if(stateFilters) {
         params.filters = stateFilters;
      }
      return this.httpClient.get<RawCollection>(url, { params: this.ToHttpParams(params), headers: RAW_DATA });
   }

   private unmarshallResults(results: RawCollection): RawCollection {
      if( results && results?.results && results.results.length ) {
         results.results.map(item => this.unmarshall(item));
      }
      return results;
   }

   private convertItems(queueItem: IngestQueueItem): void {
      if (queueItem.scheduledStartTime) {
         queueItem.scheduledStartTime = this.moment(queueItem.scheduledStartTime);
         queueItem.priority = -queueItem.scheduledStartTime.unix();
      }
      if (queueItem.whenQueued) {
         queueItem.whenQueued = this.moment(queueItem.whenQueued);
      }
      if (queueItem.whenStarted) {
         queueItem.whenStarted = this.moment(queueItem.whenStarted);
      }
      if (queueItem.whenFinished) {
         queueItem.whenFinished = this.moment(queueItem.whenFinished);
      }
      if (queueItem.duration) {
         queueItem.duration = this.moment.duration(queueItem.duration, 'seconds');
      }
      if (queueItem.proxyWidth && queueItem.proxyHeight) {
         queueItem.playable = true;
      }
   }

   private unmarshall(queueItem: IngestQueueItem): IngestQueueItem {
      if (queueItem.endpointId && !queueItem.endpoint) {
         queueItem.endpoint = this.endpointsApiService.getSkeleton(queueItem.endpointId);
         this.endpointsApiService.populateSkeleton(queueItem.endpoint)
            .pipe(
               takeUntil(this.destroy$),
               switchMap(endpoint => {
                  queueItem.endpoint = endpoint;
                  return this.edgeServicesApi.fetchService(endpoint.serviceId);
               }),
            )
            .subscribe(srv => {
               queueItem.endpoint.service = srv;
            });
            this.convertItems(queueItem);
            return this.saveTransactionService.prepare(queueItem);
      }
      else {
         this.convertItems(queueItem);
         return queueItem;
      }
   }

}

