import { Injectable } from '@angular/core';

// 3rd party
import { combineLatest, Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// App
import { DeviceService } from '../device';
import {
  RealtimeSocketMessages,
  RealtimeServerSocketMessage,
  SocketAction,
  SocketConnectionState,
  SocketService
} from '../socket';
import { NotifierSubject, PaginatedQueryNotifierSubject } from './models';
import {
  PaginatedQueryInput,
  QueryInput,
  QuerySummary,
  RealtimeSocketEventHandler,
  ObserversChangedCallback
} from './types';
import { PaginatedQuerySummary } from 'models';

/*

A utility service designed to be utilized by store services to construct
real-time observable streams of data for both document lookups and collection
queries.

The service is designed to be agnostic as to the data structures used and
provides mechanisms for common use cases such as infinite scrolling and
optimistic rendering. For more, see:

https://www.notion.so/norby/Observable-Data-Service-d15acbcca6bd4fdeaa0f4f5154b15279

*/

@Injectable({
  providedIn: 'root'
})
export class ObservableDataService {
  // Slug seen on the previous emission; used to detect slug changes
  private _slug: string;

  // Socket state seen on the previous emission; used to detect user
  // changes and disconnected -> connected transitions
  private _socketState: SocketConnectionState;

  // Internal cache of notifier streams, keyed by each stream's query key
  private _cache: { [k: string]: NotifierSubject<any> } = {};

  // Map of topics to keys for active streams
  // Outer keys are concatenated topics and resource identifiers
  // ex: { CONTENT_UPDATED_IN_SLUG_resourceId: { [someStreamKey]: true } }
  // Entries are removed once no stream is registered for them
  private _topicRegistry: { [k: string]: { [k: string]: boolean } } = {};

  constructor(
    private _socket: SocketService,
    private _device: DeviceService
  ) {
    // React to every change of the socket connection state or current slug
    combineLatest([
      this._socket.connectionStateChanged$,
      this._device.currentSlug$
    ]).subscribe(([state, slug]) => {
      const { connected, uid } = state;

      // A change only counts when there was a previous value to compare to,
      // so the very first emission never clears the cache
      const userDidChange =
        this._socketState?.uid && uid !== this._socketState.uid;
      const slugDidChange = this._slug && slug !== this._slug;

      if (userDidChange || slugDidChange) {
        // If the current user or slug changed, close all streams
        // and clear the cache
        this._clearCache();
      } else if (uid && connected && !this._socketState?.connected) {
        // If the user didn't change but we're now connected
        // and we were previously disconnected (e.g. because
        // of an error), restart listening on all the cached
        // streams that still have active observers
        this._restartCachedStreams();
      }

      // Remember the latest values for the next comparison
      this._socketState = state;
      this._slug = slug;
    });
  }

  // Paginated query entry point
  // Returns the cached stream for the query (creating it if needed),
  // truncated to `input.args.limit` items when a limit is provided
  query$<T>(input: PaginatedQueryInput): Observable<PaginatedQuerySummary<T>> {
    // Create or fetch the stream from the cache
    const protectedStream = this._getNotifierSubjectForQuery<T>(input);

    // Wrap the stream in callbacks
    protectedStream.enable(
      this._getOnObserverAddedCallback<T>(protectedStream, input),
      this._getOnObserverRemovedCallback<T>(protectedStream, input)
    );

    // Limit the output stream if necessary
    // The summary is copied rather than mutated, so the value held by the
    // shared cached stream keeps all of its items
    return protectedStream.stream.pipe(
      map((summary) =>
        input?.args?.limit &&
        summary?.items?.length &&
        input.args.limit < summary.items.length
          ? {
              ...summary,
              items: summary.items.slice(0, input.args.limit)
            }
          : summary
      )
    );
  }

  // Document query entry point
  // Returns the cached stream for the lookup (creating it if needed)
  document$<T>(input: QueryInput): Observable<QuerySummary<T>> {
    // Create or fetch the stream from the cache
    const protectedStream = this._getNotifierSubjectForDocumentLookup<T>(input);

    // Wrap the stream in callbacks
    protectedStream.enable(
      this._getOnObserverAddedCallback<T>(protectedStream, input),
      this._getOnObserverRemovedCallback<T>(protectedStream, input)
    );

    return protectedStream.stream;
  }

  // Concatenated topic and resource ID to form keys for
  // the internal topic registry
  // A handler without a payload yields `<event>_undefined` instead of throwing
  private _getKeyForHandler(handler: RealtimeSocketEventHandler): string {
    return `${handler.event}_${handler.payload?.resourceId}`;
  }

  // Remove the given stream from the registry entry for the handler's
  // topic/resourceId key, dropping the entry once it is empty
  // Returns true when no stream is registered for that key afterwards
  // (including when the key was never registered), i.e. when it is safe
  // to tell the server to stop sending messages for it
  private _unregisterHandler(
    handler: RealtimeSocketEventHandler,
    stream: NotifierSubject<any>
  ): boolean {
    const topicRegistryKey = this._getKeyForHandler(handler);
    const interestedStreams = this._topicRegistry[topicRegistryKey];

    // Nothing was ever registered for this key, e.g. the stream went
    // dormant before its socket events were initialized
    if (!interestedStreams) {
      return true;
    }

    delete interestedStreams[stream.key];
    if (Object.keys(interestedStreams).length > 0) {
      return false;
    }

    // Prune the now-empty entry so the registry doesn't grow unbounded
    delete this._topicRegistry[topicRegistryKey];
    return true;
  }

  // Run whenever a new observer subscribes to the stream
  // If we're the first subscriber, trigger a lookup call
  // and start listening for events over the socket
  private _getOnObserverAddedCallback<T>(
    protectedStream: NotifierSubject<T>,
    input: QueryInput
  ): ObserversChangedCallback {
    return (oldCount, newCount) => {
      const isFirstObserver = oldCount === 0 && newCount === 1;
      if (isFirstObserver) {
        protectedStream.fetch().then(() => {
          // The observer may have unsubscribed while the fetch was in
          // flight. Starting socket events for a stream nobody observes
          // would leave a subscription and registry entry behind that no
          // later "last observer removed" event would clean up
          if (protectedStream.numberOfObservers > 0) {
            this._initSocketEvents(input.handlers, protectedStream);
          }
        });
      }
    };
  }

  // Run whenever an observer unsubscribes from the stream
  // If we're the last subscriber, we can stop listening for
  // the specified events and reset the cursor
  private _getOnObserverRemovedCallback<T>(
    protectedStream: NotifierSubject<T>,
    input: QueryInput
  ): ObserversChangedCallback {
    return (oldCount, newCount) => {
      const isLastObserver = newCount === 0;
      if (isLastObserver) {
        // If the last subscriber just unsubscribed, the stream
        // should go dormant
        input.handlers?.forEach((handler) => {
          // Remove the handler dispatcher subscription
          handler.subscription?.unsubscribe();

          // Unregister this stream as being interested in the handler's
          // topic/resourceId key. If there are no other streams that are
          // interested in that key, tell the server to stop sending
          // messages for that topic/resourceId
          const topicIsUnused = this._unregisterHandler(
            handler,
            protectedStream
          );
          if (topicIsUnused) {
            this._sendSocketMessage(handler.event, 'stop', handler.payload);
          }
        });

        // If this is a paginated query, also reset our max cursor
        if (protectedStream instanceof PaginatedQueryNotifierSubject) {
          protectedStream.maxCursor = null;
        }
      }
    };
  }

  // Fetch the cached paginated stream for the query, creating and
  // caching a new one on first use
  private _getNotifierSubjectForQuery<T>(
    input: PaginatedQueryInput
  ): PaginatedQueryNotifierSubject<T> {
    const key = PaginatedQueryNotifierSubject.constructQueryKey(input);
    this._cache[key] =
      this._cache[key] || new PaginatedQueryNotifierSubject<T>(input);
    return this._cache[key] as PaginatedQueryNotifierSubject<T>;
  }

  // Fetch the cached document stream for the lookup, creating and
  // caching a new one on first use
  private _getNotifierSubjectForDocumentLookup<T>(
    input: QueryInput
  ): NotifierSubject<T> {
    const key = NotifierSubject.constructQueryKey(input);
    this._cache[key] = this._cache[key] || new NotifierSubject<T>(input);
    return this._cache[key];
  }

  // Remove all private cached streams and close all private subscriptions
  // Public streams are left in the cache untouched
  private _clearCache(): void {
    const keys = Object.keys(this._cache);
    keys.forEach((key) => {
      const stream = this._cache[key];

      // No op if public stream
      if (stream.isPublic) {
        return;
      }

      // Disconnect all event handlers
      stream.handlers?.forEach((handler) => {
        handler.subscription?.unsubscribe();

        // Only tell the server to stop when no other stream (e.g. a public
        // one that stays cached) is still registered for the same
        // topic/resourceId, and only while the socket is connected
        const topicIsUnused = this._unregisterHandler(handler, stream);
        if (topicIsUnused && this._socketState?.connected) {
          this._sendSocketMessage(handler.event, 'stop', handler.payload);
        }
      });

      // Clear the current value of the stream
      stream.next(null);
      stream.complete();
      delete this._cache[key];
    });
  }

  // After a disconnect, restart listening for socket events
  // Only restart on streams that still have active observers
  private _restartCachedStreams(): void {
    Object.values(this._cache).forEach((stream) => {
      if (stream?.numberOfObservers > 0) {
        stream?.handlers?.forEach((handler) => {
          this._sendSocketMessage(handler.event, 'start', handler.payload);
        });
      }
    });
  }

  // Send a start or stop message
  // The handler payload is forwarded with the `listen` action merged in
  private _sendSocketMessage(
    message: RealtimeSocketMessages,
    listen: SocketAction,
    payload: any = {}
  ): void {
    const obj = {
      message,
      payload: {
        ...payload,
        listen
      }
    };

    this._socket.sendMessage(obj);
  }

  // Start listening for events for a given query
  // Subscribe to the real time socket event stream, filter
  // for the event types we're interested in, and use the
  // handler's provided transformer function to incorporate
  // updates into the stream
  private _initSocketEvents(
    handlers: RealtimeSocketEventHandler[],
    protectedStream: NotifierSubject<any>
  ): void {
    handlers?.forEach((handler) => {
      // Add the topic for this handler to the topic registry if it's not already there
      // Register this stream as interested in that topic
      const topicRegistryKey = this._getKeyForHandler(handler);
      this._topicRegistry[topicRegistryKey] = {
        ...this._topicRegistry[topicRegistryKey],
        [protectedStream.key]: true
      };

      // Initiate socket events for this topic
      this._sendSocketMessage(handler.event, 'start', handler.payload);

      // Remove any previous subscription and set up a new subscription on socket events
      // to dispatch relevant events when they come in
      handler.subscription?.unsubscribe();
      handler.subscription = this._socket.realtimeSocketServerMessages$
        .pipe(
          filter((event: RealtimeServerSocketMessage) => {
            // Only events for this handler's topic and resource are relevant
            return (
              event &&
              event.topic === handler.event &&
              event.identifier === handler.payload?.resourceId
            );
          })
        )
        .subscribe((event: RealtimeServerSocketMessage) =>
          // Fold the event into the stream's current value
          protectedStream.next(
            handler.transformer(event, protectedStream.currentValue)
          )
        );
    });
  }
}
