import { Injectable } from '@angular/core';
import { forEach } from 'lodash';

import { UpdatesService } from './updates.service';
import { take } from 'rxjs/operators';

@Injectable()
export class ApiUpdatesService {
   private globalUpdates = {};

   constructor(private updatesService: UpdatesService)
   {}

   public initApiUpdatesService(serviceKey: string, modelFetcher?: (...args: any) => void) {
      this.globalUpdates[serviceKey] ||= {
         updates: this.updatesService.init((update) => {
            this.globalUpdates[serviceKey].updaters[update.channel](update.data);
         }),
         updaters: {}
      };

      const updates = this.globalUpdates[serviceKey].updates;
      const updaters = this.globalUpdates[serviceKey].updaters;

      function _registerUpdater(channel, updater) {
         updaters[channel] = updater;
      }

      function _registerModelUpdater(model) {
         _registerUpdater(model._channel, function() {
            modelFetcher(model);
         });
         return model;
      }

      function subscribeToUpdate(model) {
         _registerModelUpdater(model);
         updates.subscribeToUpdate([{channel: model._channel}]);
         return model;
      }

      function unsubscribeToUpdate(model) {
         updates.unsubscribeToUpdate([{channel: model._channel}]);
         return model;
      }

      // Subscribe to a channel that triggers incremental updates for
      // some thing, using some kind of update offset data. Only one
      // callback to updater will run at once, another will not start
      // until updater's promise resolves. updater must return the new
      // updateOffset token which will be passed back in to it. Only
      // one updater is supported per channel - new subscribes will
      // override old ones.
      function subscribeToIncrementalUpdates(updater, channel, initialUpdateOffset) {
         let updateOffset = initialUpdateOffset; // null during a running update
         let pendingUpdate = false; // set if updates come in while an update is running

         function update() {
            const offset = updateOffset;
            if (!offset) {
               pendingUpdate = true;
               return;
            }
            updateOffset = null;
            updater(offset)
               .pipe(take(1))
               .subscribe(
                  newUpdateOffset => {
                     updateOffset = newUpdateOffset;
                     if (pendingUpdate) {
                        pendingUpdate = false;
                        update();
                     }
                  }
               );
         }
         _registerUpdater(channel, update);
         updates.subscribeToUpdate([{channel: channel}]);
      }

      function subscribeToCollection(rawCollection, fetcher, fetcherArgs, fetcherObservable=false) {
         const channels = [];

         forEach(rawCollection.results, (model) => {
            if (model._channel) {
               _registerModelUpdater(model);
               channels.push({channel: model._channel});
            }
         });

         if (rawCollection.channel && fetcher) {
            _registerUpdater(rawCollection.channel, () => {
               if ( fetcherObservable ) {
                  fetcher(...fetcherArgs)
                     .pipe(take(1))
                     .subscribe();
               }
               else {
                  fetcher.apply(null, fetcherArgs);
               }
            });
            channels.push({channel: rawCollection.channel});
         }

         updates.subscribeToUpdate(channels);

         return rawCollection.results;
      }

      return { subscribeToUpdate, unsubscribeToUpdate, subscribeToCollection, subscribeToIncrementalUpdates };
   }
}
