import chunk from 'lodash/chunk';
import flatten from 'lodash/flatten';
import map from 'lodash/map';
import omit from 'lodash/omit';
import pick from 'lodash/pick';
import * as jsonpatch from 'readwise-fast-json-patch';
import { Operation } from 'readwise-fast-json-patch';
import { fillObjectDataBeforeInsert, RxDocumentData } from 'rxdb';
import { ulid } from 'ulid';

import Database from '../database/Database';
import documentUpsertChunkSize from '../database/internals/documentUpsertChunkSize';
import lockManager from '../lockManager.platform';
// eslint-disable-next-line import/no-cycle
import networkDetector from '../networkDetector.platform';
import safelyApplyJsonPatch from '../safelyApplyJsonPatch';
import {
  type IdToDocumentMap,
  type MinimalPersistentState,
  type PersistentState,
  type PersistentStateWithDocuments,
  type PersistentUpdate,
  type PersistentUpdateForServer,
  type StateFetchMeta,
  type StateSyncingOptions,
  type UserEventWithDataUpdate,
  AnyDocument,
  PersistentStateLoadingState,
} from '../types';
import type { DatabaseCollectionNamesToDocType } from '../types/database';
import cloneDeep from '../utils/cloneDeep';
import nowTimestamp from '../utils/dates/nowTimestamp';
import delay from '../utils/delay';
import { isDesktopApp, isExtension, isMobile, isTest, isWebApp } from '../utils/environment';
import exceptionHandler from '../utils/exceptionHandler.platform';
import getServerBaseUrl from '../utils/getServerBaseUrl.platform';
import makeLogger from '../utils/makeLogger';
import minimizeState from '../utils/minimizeState';
import removeNonSyncingPropertiesFromDocument from '../utils/removeNonSyncingPropertiesFromDocument';
import requestWithAuth from '../utils/requestWithAuth.platformIncludingExtension';
import stringifySafe from '../utils/stringifySafe';
import HttpError from './HttpError';
// eslint-disable-next-line import/no-cycle
import foreground, {
  portalGate as foregroundPortalGate,
} from './portalGates/toForeground/singleProcess';

// Module-level logger: mobile builds also emit `info`, every other platform only emits warnings and errors.
const logger = makeLogger(__filename, {
  shouldLog: (methodName) => {
    const enabledMethodNames = isMobile ? ['info', 'warning', 'error'] : ['warning', 'error'];
    return enabledMethodNames.includes(methodName);
  },
});

// Some important constants
// Sent as `schemaVersion` on state fetches and on update posts to the server.
const PERSISTENT_SCHEMA_VERSION = 9;
// Minimum time between two pulls of server state; the extension waits twice as long as other clients.
const STATE_PULL_FROM_SERVER_RATE_LIMIT_MS = isExtension ? 2000 : 1000;
// NOTE(review): presumably the delay in ms between update-queue loop iterations -- confirm at the usage site.
const UPDATE_QUEUE_INTERVAL = 200;
// NOTE(review): presumably the lock name passed to lockManager -- confirm at the usage site.
const WEB_LOCK_KEY = 'stateSyncing';

// Only the webapp running in a browser can potentially have multiple tabs syncing at once:
const environmentHasSingleTab = isExtension || isMobile || isDesktopApp;

export class StateSyncer {
  // Database handle supplied through the constructor options.
  database: Database;
  // When true, the next server pull fetches the full state instead of only the changes since the last fetch.
  // Set after a failed checksum; reset after every pull.
  useFullServerSyncOnNextPull = false;

  // State to coordinate syncing
  // Updates computed from server diffs that still have to be applied to the foreground.
  readonly serverToForegroundUpdateQueue: PersistentUpdate[] = [];
  // `meta` of the most recent server fetch; written to the cache once the fetched changes were applied.
  lastStateFetchMeta: StateFetchMeta | null = null;
  // Timestamp before which no further update POST is attempted (set after a failed request), or null.
  nextNetworkPostRetryAt: number | null = null;
  // Server pulls only happen while the current time is not past this timestamp.
  consumeServerToForegroundUpdatesUntilTime = 0;
  // Timestamp of the last server pull, used for rate limiting.
  lastServerToForegroundUpdateThisSession = 0;
  // Running counters for cached updates; `serverUpdates` counts the updates still waiting to be sent to the server.
  cachedUpdatesStats = {
    serverUpdates: 0,
    updates: 0,
    patches: 0,
  };

  // Flags for whether any dangerous operations are happening
  // (all three are consulted by safeToInterrupt())
  isMergingStateCache = false;
  isSendingUpdatesToServer = false;
  isCommittingServerChangesToCache = false;

  // Declare the default versions of these functions, which can be overridden by the options if needed
  onNewUpdatesForForeground: NonNullable<StateSyncingOptions['onNewUpdatesForForeground']>;
  getCurrentPersistentStateWithDocuments: StateSyncingOptions['getCurrentPersistentStateWithDocuments'];
  runStateChecksum: StateSyncingOptions['runStateChecksum'];
  onNewEventFromForeground: StateSyncingOptions['onNewEventFromForeground'];
  onLoggedOut: StateSyncingOptions['onLoggedOut'];
  shouldSkipSearchIndexing: boolean;

  // True until setUpInitialState() starts the update queues; set back to true on logout or a fatal sync failure.
  syncingStopped = true;

  // Focus tracking logic, assume tab is focused when syncer is created:
  isFocused = true;
  wentFromUnFocusedToFocused = false;

  // Temporary hack to clear out the caches of users who have overly-large payloads from a bug.
  // TODO (April 2023): remove this flag and all related logic
  truncateCachedEvents = false;

  // Optional hook invoked at the end of setUpInitialState().
  onPersistentStateLoadedFromServer: (() => void) | undefined = undefined;

  // Only used for testing:
  cacheKeyPrefix = '';
  // Last error passed to onSyncingError().
  _exception: any = null; // eslint-disable-line @typescript-eslint/no-explicit-any

  /**
   * Stores the database handle and the optional hooks from `options`, falling back to the
   * foreground portal gate (or to no-ops) for hooks that were not supplied, then registers
   * the unsafe-closing warning.
   */
  constructor(options: StateSyncingOptions) {
    // TODO(minor): add check here that this is only initialized once in non-tests, to make it a singleton
    const { cacheKeyPrefix, database, shouldSkipSearchIndexing } = options;

    this.database = database;
    this.shouldSkipSearchIndexing = shouldSkipSearchIndexing ?? false;
    if (cacheKeyPrefix) {
      this.cacheKeyPrefix = cacheKeyPrefix;
    }

    // Each fallback is only read when the matching option is missing, so callers that pass
    // every hook never touch `foreground` here.
    this.onNewUpdatesForForeground =
      options.onNewUpdatesForForeground ?? foreground.onBackgroundStateUpdates;
    // eslint-disable-next-line func-name-matching
    this.getCurrentPersistentStateWithDocuments =
      options.getCurrentPersistentStateWithDocuments ||
      foreground.getCurrentPersistentStateWithDocuments;
    this.runStateChecksum = options.runStateChecksum || foreground.runForegroundStateChecksum;

    // These two hooks default to doing nothing:
    this.onNewEventFromForeground = options.onNewEventFromForeground || (() => undefined);
    this.onLoggedOut = options.onLoggedOut || (() => undefined);

    this.initUnsafeAppClosingWarnings();
  }

  /** Returns the `isOnline` value currently reported by the platform network detector. */
  isOnline() {
    const { isOnline } = networkDetector;
    return isOnline;
  }

  /**
   * On the web app only: registers a `beforeunload` listener that asks the browser to warn
   * the user whenever closing right now would interrupt an unsafe operation.
   */
  initUnsafeAppClosingWarnings() {
    if (isWebApp) {
      const warnIfUnsafeToClose = (event: BeforeUnloadEvent) => {
        if (this.safeToInterrupt()) {
          return;
        }
        event.returnValue = "We're still saving your changes. If you exit now you risk losing them.";
      };
      window.addEventListener('beforeunload', warnIfUnsafeToClose);
    }
  }

  /**
   * Thin pass-through to the module-level `requestWithAuth` helper.
   *
   * @param input - Forwarded unchanged as the first argument.
   * @param init - Forwarded unchanged as the second argument.
   * @returns Whatever the module-level helper returns.
   */
  async requestWithAuth(input: RequestInfo, init?: RequestInit): ReturnType<typeof requestWithAuth> {
    // Helper just so we can stub this for tests
    return requestWithAuth(input, init);
  }

  /**
   * Produces the persistent state the client starts with.
   *
   * The state is read from the local cache when present. Otherwise (new client) the full state is
   * downloaded from the server page by page, the documents of every page are handed to the initial
   * sync writer, and the remaining state blob plus the fetch meta are written to the key/value cache.
   * In both cases the update queues are started if syncing was stopped, server pulls are enabled for
   * the next 20 seconds and the `persistentStateLoadedFromServer` event is emitted.
   *
   * @param shouldMinimize - When truthy, the result is passed through `minimizeState` before returning.
   * @returns An object holding the (optionally minimized) persistent state.
   */
  async setUpInitialState({ shouldMinimize }: { shouldMinimize?: boolean } = {}): Promise<{
    persistentState: PersistentState | MinimalPersistentState;
  }> {
    // Be careful. It's best if the types are strictly correct in here. That's why lodash isn't used a lot.
    const prepareResult = (
      persistentState: PersistentState,
    ): Awaited<ReturnType<StateSyncer['setUpInitialState']>> => {
      // this is a bit of a hack to make sure that older clients with cached state don't have undefined keys:
      // (an empty object is normalized to an empty array as well)
      if (
        persistentState.rssFoldersAndItems === undefined ||
        JSON.stringify(persistentState.rssFoldersAndItems) === '{}'
      ) {
        persistentState.rssFoldersAndItems = [];
      }

      if (!shouldMinimize) {
        return { persistentState };
      }
      return minimizeState(persistentState);
    };

    let draftStateResult = await this.getPersistentStateFromCache();

    if (!draftStateResult) {
      // Special case: new client that had no state cached
      foreground.updatePersistentStateLoadingState(PersistentStateLoadingState.DownloadingDocuments);
      logger.time('initial state write');
      let totalNumberOfDocumentsToLoad = 0;
      // One promise per page write; all of them are awaited after the last page was fetched.
      const insertionPromises: Promise<void>[] = [];
      const writer = await this._createInitialSyncWriter();

      // Ensure search DB is initialized before starting initial sync to prevent migration state data races.
      await foreground.searchInit();
      // Fetch the documents from the server one page at a time, inserting as they arrive for optimal speed:
      const latestStateFromCloud = await this.pullLatestStateFromCloud(
        false,
        async (partialServerState, hasMoreToFetch, pageNumber) => {
          const documentValues = Object.values(partialServerState.documents ?? {});
          totalNumberOfDocumentsToLoad += documentValues.length;
          foreground.updatePersistentStateLoadingState(
            PersistentStateLoadingState.AddingDocumentsToDatabase,
          );
          // A trailing "+" tells the foreground that more pages are still to come.
          foreground.updatePersistentStateTotalNumberOfDocumentsToLoad(
            `${totalNumberOfDocumentsToLoad}${hasMoreToFetch ? '+' : ''}`,
          );
          // The write is started but deliberately not awaited here, so the next page can be fetched meanwhile.
          insertionPromises.push(writer.writeBatch(documentValues, insertionPromises, pageNumber));
        },
      );
      await Promise.all(insertionPromises);
      logger.timeEnd('initial state write');

      // Now that the docs exist in rxdb, we don't actually need them in state anymore.
      draftStateResult = latestStateFromCloud;
      // what a stupid ugly but necessary Typescript hack to prevent an unnecessary copy
      delete (draftStateResult as PersistentState & { documents?: IdToDocumentMap }).documents;
      // Persist the fetch meta and the document-less state blob so the next start can load from cache.
      await this.database.collections.state_syncing_key_value_pairs.bulkUpsert(
        [
          {
            id: this._prefixCacheKey('stateFetchMeta'),
            value: stringifySafe(this.lastStateFetchMeta),
          },
          {
            id: this._prefixCacheKey('persistentStateBlob2'),
            value: stringifySafe(draftStateResult),
          },
        ],
        {},
      );
    }

    // Start the queue consumer only once; it is intentionally not awaited.
    if (this.syncingStopped) {
      this.syncingStopped = false;
      this.consumeUpdateQueues();
    }

    this.syncServerStateForNext(20);

    // Emitted on both paths, i.e. also when the state came from the local cache.
    foregroundPortalGate.emit('persistentStateLoadedFromServer');
    if (this.onPersistentStateLoadedFromServer) {
      this.onPersistentStateLoadedFromServer();
    }

    return prepareResult(draftStateResult);
  }

  /**
   * Central handler for syncing failures.
   *
   * A 401 response means the user is logged out: syncing is stopped and `onLoggedOut` is called,
   * nothing is reported. Every other error is logged and sent to the exception handler together
   * with a snapshot of the syncer's coordination state, and remembered in `_exception`.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onSyncingError(err: any, errorMessage: string, extraData: any) {
    const isUnauthorized = err?.response?.status === 401;
    if (isUnauthorized) {
      // User is logged out: just stop syncing
      // TODO: should we clear the cache here? Probably...
      this.syncingStopped = true;
      this.onLoggedOut();
      return;
    }

    const {
      useFullServerSyncOnNextPull,
      serverToForegroundUpdateQueue,
      consumeServerToForegroundUpdatesUntilTime,
      lastServerToForegroundUpdateThisSession,
      cachedUpdatesStats,
      isMergingStateCache,
      isSendingUpdatesToServer,
      isCommittingServerChangesToCache,
    } = this;
    const currentSyncingState = {
      useFullServerSyncOnNextPull,
      serverToForegroundUpdateQueue,
      consumeServerToForegroundUpdatesUntilTime,
      lastServerToForegroundUpdateThisSession,
      cachedUpdatesStats,
      isMergingStateCache,
      isSendingUpdatesToServer,
      isCommittingServerChangesToCache,
    };

    // TODO: do we want to capture more debugging info here, eg cached persistent state, relevant state data, properties from this class instance, etc?
    logger.error(errorMessage, { err, extraData, currentSyncingState });
    const extra = { errorMessage, ...currentSyncingState, ...extraData };
    exceptionHandler.captureException(err, { extra });
    this._exception = err;
  }

  /**
   * Keeps server pulls enabled for at least `numSeconds` from now. An already later deadline is
   * never shortened.
   */
  async syncServerStateForNext(numSeconds: number) {
    const currentDeadline = this.consumeServerToForegroundUpdatesUntilTime;
    const requestedDeadline = nowTimestamp() + numSeconds * 1000;
    this.consumeServerToForegroundUpdatesUntilTime = Math.max(currentDeadline, requestedDeadline);
  }

  /** Returns how many updates are still waiting to be sent to the server (always checks the disk). */
  async foregroundToServerUpdatesStillQueued() {
    const queuedKeys = await this.getQueuedUpdateKeysForServer(true);
    return queuedKeys.length;
  }

  /** Returns the ids of the updates that are still queued for the server. */
  async getQueuedUpdateKeysForServer(forceDiskCheck = false): Promise<string[]> {
    const queuedUpdates = await this.getQueuedUpdatesForServer(forceDiskCheck);
    return queuedUpdates.map((queuedUpdate) => queuedUpdate.id);
  }

  /**
   * Returns the updates that are queued for the server and refreshes the cached counter.
   *
   * @param forceDiskCheck - Read from disk even when the in-memory counter says nothing is queued.
   * @returns The queued updates, or an empty array when the read is skipped or the database is not initialized.
   */
  async getQueuedUpdatesForServer(
    forceDiskCheck = false,
  ): Promise<DatabaseCollectionNamesToDocType['state_syncing_updates'][]> {
    // We use our serverUpdates count solely for this optimization, to avoid expensive calls to cache.keys()
    const canSkipDiskRead = !forceDiskCheck && this.cachedUpdatesStats.serverUpdates <= 0;
    if (canSkipDiskRead || !this.database.isInitialized()) {
      return [];
    }

    const queuedUpdates = await this.getAllUpdatesFromDisk(0);
    this.cachedUpdatesStats.serverUpdates = queuedUpdates.length;
    return queuedUpdates;
  }

  /** Returns every entry of the `state_syncing_updates` collection matched by this syncer's id-prefix selector. */
  async getAllCachedUpdates(): Promise<DatabaseCollectionNamesToDocType['state_syncing_updates'][]> {
    const { state_syncing_updates: updatesCollection } = this.database.collections;
    const selector = this._getIdPrefixSelector();
    return updatesCollection.find({ selector });
  }

  /**
   * Runs `removeNonSyncingPropertiesFromDocument` on every document of the given map.
   * Does nothing when no map is passed.
   */
  removeNonSyncingPropertiesFromAllDocuments(documents: IdToDocumentMap | null) {
    if (!documents) {
      return;
    }

    for (const documentId of Object.keys(documents)) {
      removeNonSyncingPropertiesFromDocument(documents[documentId]);
    }
  }

  /**
   * Fetches the persistent state from the server, following pagination until every page was
   * retrieved, and merges the documents of all pages into the returned state.
   *
   * The `meta` of the first page is stored in `lastStateFetchMeta` and stripped from the result;
   * it is committed to the cache later, once the fetched changes were applied.
   *
   * @param onlyFetchUpdates - When true, only changes newer than the cached `time_of_last_fetch`
   *   are requested (if such a timestamp is cached).
   * @param runInBatch - Called once per page, in page order, with that page, whether more pages
   *   follow and the zero-based page number. It is awaited, so an error it throws rejects this
   *   call; long-running work should be started but not awaited inside the callback if it must
   *   not delay the next page fetch.
   * @returns The merged server state without its `meta`.
   * @throws Error when the first page carries no `meta`.
   */
  async pullLatestStateFromCloud(
    onlyFetchUpdates = false,
    // optional callback to allow processing server state in batches (of 10000 elements)
    runInBatch: (
      partialServerState: ReaderStateResponse,
      hasMoreToFetch: boolean,
      pageNumber: number,
    ) => Promise<void> = async () => {},
  ): Promise<PersistentStateWithDocuments> {
    const queryParams = new URLSearchParams();
    queryParams.append('schemaVersion', PERSISTENT_SCHEMA_VERSION.toString());

    if (onlyFetchUpdates) {
      const timeOfLastStateFetch = (
        (await this._getKeyValuePairFromDatabaseById(
          this._prefixCacheKey('stateFetchMeta'),
        )) as StateFetchMeta | null
      )?.time_of_last_fetch;
      if (timeOfLastStateFetch) {
        queryParams.append('filter[updated_at][gt]', timeOfLastStateFetch.toString());
      }
    }
    const serverState: ReaderStateResponse = await this._fetchServerState(queryParams);
    let paginationStartId: string | undefined = serverState.meta?.pagination_next_start_id;
    let pageNumber = 0;
    // Awaited so that a failing callback surfaces here instead of as an unhandled rejection.
    await runInBatch(serverState, Boolean(paginationStartId), pageNumber);

    // If there are more documents, keep on fetching pages until we have all of them:
    while (paginationStartId !== undefined) {
      queryParams.set('pagination_start_id', paginationStartId);
      const partialServerState = await this._fetchServerState(queryParams);
      paginationStartId = partialServerState.meta?.pagination_next_start_id;
      pageNumber += 1;
      this.removeNonSyncingPropertiesFromAllDocuments(partialServerState.documents);
      if (partialServerState.documents) {
        // Assign the merge result back: if the first page had no `documents`, the merge target is
        // a fresh object that would otherwise be thrown away together with this page's documents.
        serverState.documents = Object.assign(
          serverState.documents ?? {},
          partialServerState.documents,
        );
      }
      await runInBatch(partialServerState, Boolean(paginationStartId), pageNumber);
    }

    // Pop the syncing metadata (from the server) of the last state fetch off of our response and save it
    // We'll commit it to cache once this sync goes through...
    if (serverState.meta === undefined) {
      throw new Error('fetch state returned no meta');
    }
    this.lastStateFetchMeta = serverState.meta;
    delete serverState.meta;

    return serverState;
  }

  /**
   * Pulls state from the server, diffs it against the local state and applies the resulting
   * patches to the foreground.
   *
   * Nothing happens when the last pull was less than STATE_PULL_FROM_SERVER_RATE_LIMIT_MS ago,
   * when no sync was requested (see `syncServerStateForNext`) or while local updates are still
   * queued for the server. Normally only changed documents are fetched ("quick sync"); when
   * `useFullServerSyncOnNextPull` is set the full state is fetched and compared instead.
   *
   * Document changes and other state changes are queued as separate updates, each with a forward
   * and a reverse patch computed at the same root.
   */
  async consumeServerToForegroundUpdates(): Promise<void> {
    // Only pull from the server once every STATE_PULL_FROM_SERVER_RATE_LIMIT_MS to not overwhelm the server
    if (
      this.lastServerToForegroundUpdateThisSession >
      nowTimestamp() - STATE_PULL_FROM_SERVER_RATE_LIMIT_MS
    ) {
      return;
    }
    // Only pull state if something has triggered a sync -- we don't want to be running this allll the time.
    // TODO(minor): fix this so that a client will pull from server even if it was just sending out updates for the first n seconds.
    if (nowTimestamp() > this.consumeServerToForegroundUpdatesUntilTime) {
      return;
    }

    // If we have queued updates to be sent out from the client to server, then wait until they're done first
    // before pulling the updates from the cloud. This is to avoid a race condition where our stale server state
    // overwrites local changes.
    if (await this.foregroundToServerUpdatesStillQueued()) {
      logger.debug(
        'Waiting till next loop before pulling updates from server because we already have updates queued from the client..',
      );
      return;
    }

    let currentState: PersistentStateWithDocuments | undefined;
    let newState: PersistentStateWithDocuments;

    if (!this.useFullServerSyncOnNextPull) {
      // Quick sync: only fetch what changed and compare it against the matching local documents.
      newState = await this.pullLatestStateFromCloud(true);
      currentState = await this.getCurrentPersistentStateWithDocuments(
        this.database,
        Object.keys(newState.documents ?? {}),
      );

      if (!currentState || !currentState.documents) {
        return;
      }

      const onlyRelevantCachedDocuments: IdToDocumentMap = {};
      // Loop through each document that the server has told us was updated:
      Object.keys(newState?.documents ?? {}).forEach((id) => {
        // If the document already existed locally, use the cached local document for diffing.
        if (currentState?.documents?.[id]) {
          onlyRelevantCachedDocuments[id] = currentState.documents[id];
        }

        // If the server is telling us this document was deleted, remove it from our new state entirely.
        if (newState?.documents?.[id] && newState?.documents?.[id]?.deleted_at) {
          // This will cause the jsondiff to delete the document locally if it does exist.
          // If it doesn't exist locally, nothing will happen.
          delete newState.documents[id];
        }
      });
      // eslint-disable-next-line require-atomic-updates
      currentState.documents = onlyRelevantCachedDocuments;
    } else {
      currentState = await this.getCurrentPersistentStateWithDocuments(this.database);
      if (!currentState || !currentState.documents) {
        return;
      }
      // If we're not doing a quick sync, just compare the full state blob we have from the server:
      try {
        newState = await this.pullLatestStateFromCloud();
      } catch (e) {
        // Best effort: the full sync flag stays set, so the pull is retried on a later loop.
        return;
      }
    }
    // eslint-disable-next-line require-atomic-updates
    this.lastServerToForegroundUpdateThisSession = nowTimestamp();
    // eslint-disable-next-line require-atomic-updates
    this.useFullServerSyncOnNextPull = false;

    // Generate PersistentUpdates for changes to both documents or normal state (these can't mix):
    const currentDocumentsOnly = pick(currentState, 'documents');
    const newDocumentsOnly = pick(newState, 'documents');
    const docPatch = jsonpatch.compare(currentDocumentsOnly, newDocumentsOnly);
    if (docPatch.length > 0) {
      this.serverToForegroundUpdateQueue.push({
        patch: docPatch,
        // Same root as the forward patch, so the reverse paths also start with `/documents`; this
        // also avoids handing `undefined` to compare() when the server state has no documents.
        reverse: jsonpatch.compare(newDocumentsOnly, currentDocumentsOnly),
        timestamp: nowTimestamp(),
      });
    }
    const statePatch = jsonpatch.compare(omit(currentState, 'documents'), omit(newState, 'documents'));
    if (statePatch.length > 0) {
      this.serverToForegroundUpdateQueue.push({
        patch: statePatch,
        reverse: jsonpatch.compare(omit(newState, 'documents'), omit(currentState, 'documents')),
        timestamp: nowTimestamp(),
      });
    }

    if (docPatch.length === 0 && statePatch.length === 0) {
      return;
    }
    // If we have updates, apply them to the foreground:
    logger.debug(
      `Updates from server on consumeServerToForegroundUpdates: at ${nowTimestamp()} ${
        this.cacheKeyPrefix
      }`,
      {
        docPatch,
        statePatch,
      },
    );
    foreground.setAreServerChangesBeingAppliedToForeground(true);
    try {
      await this.applyServerChangesToForeground();
    } finally {
      // Always clear the flag, even when committing the changes throws.
      foreground.setAreServerChangesBeingAppliedToForeground(false);
    }
  }

  /**
   * Hands the queued server updates to the foreground and, once applied, commits them locally.
   *
   * - Returns immediately when the queue is empty or the database is not initialized.
   * - Drops the queue without applying it when local updates are still waiting to be sent to the
   *   server, or when the foreground fails to apply the updates (the error is reported).
   * - After a successful apply, non-document updates are written to `state_syncing_updates` as
   *   already sent, the last fetch meta is saved, the state checksum is verified (a mismatch forces
   *   a full sync on the next pull) and the queue is cleared.
   *
   * `isCommittingServerChangesToCache` is true only while the cache writes are running and is
   * reset even when one of them throws.
   */
  async applyServerChangesToForeground() {
    if (!this.serverToForegroundUpdateQueue.length || !this.database.isInitialized()) {
      return;
    }

    // Last chance: don't load our updates from the server until after we send out our local updates _to_ the server
    // TODO(minor): we can actually solve this by only "committing" changes from the foreground from memory->disk in the core event loop...
    //  this would also save memory, but would mean users could lose changes while waiting for network pulls to finish.
    if (await this.foregroundToServerUpdatesStillQueued()) {
      this.serverToForegroundUpdateQueue.splice(0, this.serverToForegroundUpdateQueue.length);
      logger.debug(
        'Waiting till next loop before pulling updates from server because we already have updates queued from the client..',
      );
      return;
    }

    try {
      await this.onNewUpdatesForForeground(this.serverToForegroundUpdateQueue);
    } catch (err) {
      this.onSyncingError(
        err,
        'Error applying server -> foreground update. Pulling from server to try again...',
        { updates: this.serverToForegroundUpdateQueue },
      );
      this.serverToForegroundUpdateQueue.splice(0, this.serverToForegroundUpdateQueue.length);
      return;
    }

    // Now that we've successfully applied our server patch to the foreground, commit it to the cache:
    this.isCommittingServerChangesToCache = true;
    try {
      for (const update of this.serverToForegroundUpdateQueue) {
        if (_isDocumentUpdate(update)) {
          // Document changes are already persisted via rxdb -- after they're sent to server we don't need them
          continue;
        }
        await this.database.collections.state_syncing_updates.incrementalUpsert(
          {
            id: this._prefixCacheKey(ulid().toLowerCase()),
            sentToServer: 1,
            value: stringifySafe(update),
          },
          {},
        );
        this.cachedUpdatesStats.updates++;
        this.cachedUpdatesStats.patches += update.patch.length;
      }

      // Now that the changes are committed to disk, save our stateFetchMeta so next time we only pull newer changes
      await this.database.collections.state_syncing_key_value_pairs.incrementalUpsert(
        {
          id: this._prefixCacheKey('stateFetchMeta'),
          value: stringifySafe(this.lastStateFetchMeta),
        },
        {},
      );
    } finally {
      // Reset in every case: a failed write must not leave safeToInterrupt() false forever.
      this.isCommittingServerChangesToCache = false;
    }

    if (this.lastStateFetchMeta?.documents_checksum) {
      // Run our checksum to make sure the sync worked
      const checkSumSucceeded = await this.runStateChecksum(this.lastStateFetchMeta.documents_checksum);
      if (!checkSumSucceeded) {
        // TODO(minor): make sure we reset state to the server even more safely than just skipping quick sync..
        //  Maybe even move out this checksum stuff to be independent of this loop (in case it fails somehow and never gets here)
        this.useFullServerSyncOnNextPull = true;
      }
    }

    // Remove the updates we've consumed
    this.serverToForegroundUpdateQueue.splice(0, this.serverToForegroundUpdateQueue.length);
  }

  /**
   * Persists a foreground event as a not-yet-sent update, bumps the cached counters and notifies
   * `onNewEventFromForeground`.
   */
  async queueStateUpdateFromForeground(event: UserEventWithDataUpdate): Promise<void> {
    // TODO(minor): we could probably batch the writes of these events to disk by keeping them in memory first
    //  and then using some kind of bulk write op (which localforage doesn't support natively, but indexeddb/sqlite do)
    const { forwardPatch, reversePatch } = event.dataUpdates;
    logger.debug('Queueing foreground update:', { event, forwardPatch, reversePatch });

    await this.database.collections.state_syncing_updates.incrementalUpsert(
      {
        id: this._prefixCacheKey(event.id),
        sentToServer: 0,
        value: stringifySafe({
          patch: forwardPatch,
          reverse: reversePatch,
          timestamp: event.timestamp,
          eventForServer: event,
        }),
      },
      {},
    );

    const stats = this.cachedUpdatesStats;
    stats.serverUpdates += 1;
    stats.updates += 1;
    stats.patches += forwardPatch.length;
    this.onNewEventFromForeground(event);
  }

  /**
   * Tells whether the app can be closed right now without interrupting a cache-affecting operation.
   */
  safeToInterrupt() {
    // When any of these flags are true, we are committing changes to cache that, if interrupted, could
    // leave the user's client in a broken state.
    const isDoingDangerousWork =
      this.isSendingUpdatesToServer ||
      this.isMergingStateCache ||
      this.isCommittingServerChangesToCache;

    // This function is only a stopgap -- we can only ever really be sure about safety by
    // batching our cache writes in transactions that either succeed or fail fully together as well as
    // having robust fallback mechanisms when syncing is broken locally.
    return !isDoingDangerousWork;
  }

  /**
   * Decides how many of the given (already ordered) updates go into one request to the server.
   *
   * At most 10 updates are sent per request. If the first 10 updates together contain more than
   * 300 patches, the batch is cut off before the update that would exceed that limit. A single
   * oversized update is still sent on its own, since otherwise the queue could never drain.
   *
   * @param updates - Queued updates in the order they will be sent.
   * @returns The number of leading updates to send; 10 when the patch limit is not hit (also when
   *   fewer than 10 updates are queued).
   */
  getNumUpdatesToSendToServerInSingleRequest(updates: PersistentUpdateForServer[]) {
    // TODO: hopefully we can raise this a lot after we make update_state faster. See RW-5503.
    const maxPatchesPerRequest = 300;
    const maxUpdatesPerRequest = 10;

    // Only the updates that could be part of this request count towards the patch limit.
    const candidateCount = Math.min(updates.length, maxUpdatesPerRequest);
    let totalPatches = 0;

    for (let i = 0; i < candidateCount; i++) {
      totalPatches += updates[i].patch.length;
      if (totalPatches > maxPatchesPerRequest) {
        // Update `i` pushes the request over the limit: send only the ones before it,
        // but never fewer than one.
        return Math.max(i, 1);
      }
    }

    return maxUpdatesPerRequest;
  }

  /**
   * Schedules the next attempt to send updates after a failed request: 1 second after a first
   * failure, 10 seconds when a retry was already pending. Also clears the "sending" flag.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  onServerUpdateRequestError(error: any) {
    // We're either offline or the server is erroring, leave the updates in our queue to be tried again
    // TODO(minor): detect if user is consistently 500ing, offer them to fix (clear cache) with risk of some
    //  of their changes being lost.
    let retryInSeconds = 1;
    if (this.nextNetworkPostRetryAt) {
      retryInSeconds = 10;
    }
    logger.debug(`Error in network request, trying again in ${retryInSeconds}s: `, error);

    const retryDelayMs = retryInSeconds * 1000;
    this.nextNetworkPostRetryAt = nowTimestamp() + retryDelayMs;
    this.isSendingUpdatesToServer = false;
  }

  /**
   * Sends the oldest queued local updates to the server in a single request.
   *
   * Does nothing when no updates are queued or while a retry delay from an earlier failure is
   * still running. On success, document updates are removed from the queue and all other updates
   * are re-written as "sent to server". On failure the updates stay queued and a retry is
   * scheduled via `onServerUpdateRequestError`. Special cases:
   * - 401: the user is told to log in (not in the extension or in tests) and `onLoggedOut` is called.
   * - 413: the extension wipes its local sync data and logs out; other clients start truncating
   *   cached events on later attempts.
   *
   * `isSendingUpdatesToServer` is true only while the request and its bookkeeping run; it is reset
   * on every exit path, including errors thrown while handling a failure.
   */
  async consumeBackgroundToServerUpdates() {
    // If we're on web, we have to check the cache every time, as there could be updates
    //  from other tabs that we need to send to the server.
    const updatesFromDatabase = await this.getQueuedUpdatesForServer(isWebApp || isExtension);

    if (!updatesFromDatabase.length) {
      return;
    }
    if (this.nextNetworkPostRetryAt && nowTimestamp() < this.nextNetworkPostRetryAt) {
      return;
    }

    logger.debug('Starting consumeBackgroundToServerUpdates (we have updates)');

    logger.time('fetchingAllServerUpdatesFromDisk');
    const updates = updatesFromDatabase.map(
      ({ value }) => JSON.parse(value) as PersistentUpdateForServer,
    );
    logger.timeEnd('fetchingAllServerUpdatesFromDisk');

    // Sort the events from earliest to latest so we can send them to server in the right order:
    updates.sort((updateA, updateB) => updateA.timestamp - updateB.timestamp);

    if (this.truncateCachedEvents) {
      for (const update of updates) {
        for (const operation of update.reverse) {
          // Only operations that add a whole document (path `/documents/<id>`) are stripped.
          const isFullDocumentAdd =
            operation.op === 'add' &&
            operation.path.startsWith('/documents/') &&
            operation.path.split('/').length === 3;
          if (!isFullDocumentAdd) {
            continue;
          }
          removeNonSyncingPropertiesFromDocument(operation.value);
        }
      }
    }

    const updatesToSendThisRequest = this.getNumUpdatesToSendToServerInSingleRequest(updates);

    // Keep only the leading updates that fit into this request:
    updates.splice(updatesToSendThisRequest, updates.length - updatesToSendThisRequest);

    // It's relaaatively safe for the user to interrupt these requests. It could result in us sending the same events
    // to the server again in the next session, but the backend is safe towards that -- it will swallow duplicate events
    // and return successfully so we'd clean up the cache in the next session.
    this.isSendingUpdatesToServer = true;
    try {
      const response = await this.requestWithAuth(`${getServerBaseUrl()}/reader/api/state/update/`, {
        ...(await this._getCommonFetchOptions()),
        body: JSON.stringify({
          events: updates.map((u) => u.eventForServer),
          schemaVersion: PERSISTENT_SCHEMA_VERSION,
        }),
        method: 'POST',
      });

      if (response.ok) {
        // TODO(minor): find a better way to communicate. We shouldn't need events.
        // It should be via the app state:
        // https://readwise.slack.com/archives/C6MTBU1J4/p1637249275060300
        foregroundPortalGate.emit(
          'backgroundToServerUpdatesConsumed',
          updates.map(({ eventForServer }) => eventForServer.id),
        );
      } else {
        const responseBody = await response.text();
        logger.error('Update network request failed', {
          responseStatus: response.status,
          responseBodyLength: responseBody.length,
          updates,
        });
        // alert(`ERROR: Update network request failed: Status ${response.status} \n\n See console for more details. \n\nEvents: ${JSON.stringify(updates)}`);
        this.onServerUpdateRequestError(null);
        return;
      }

      await Promise.all(
        map(updates, async (update) => {
          if (_isDocumentUpdate(update)) {
            // We don't need to hold on to document updates after sending to server, they're persisted via rxdb locally
            await this.database.collections.state_syncing_updates.deleteByIds(
              [this._prefixCacheKey(update.eventForServer.id)],
              {},
            );
            return;
          }
          // Now that it's been sent to the server, we convert our PersistentUpdateForServer to a regular PersistentUpdate
          const newUpdate: PersistentUpdate = omit(update, 'eventForServer');
          await this.database.collections.state_syncing_updates.incrementalUpsert(
            {
              id: this._prefixCacheKey(update.eventForServer.id),
              sentToServer: 1,
              value: stringifySafe(newUpdate),
            },
            {},
          );
        }),
      );
      this.cachedUpdatesStats.serverUpdates -= updates.length;

      // eslint-disable-next-line @typescript-eslint/no-explicit-any
    } catch (error: any) {
      // Guard against a nullish thrown value, like onSyncingError does.
      const status = error?.response?.status;
      if (status === 401) {
        if (!isExtension && !isTest) {
          // eslint-disable-next-line no-alert
          alert(
            "You're not logged in to the correct account! Try signing in at readwise.io before continuing...",
          );
        }
        this.onLoggedOut();
      } else if (status === 413) {
        if (isExtension) {
          // This is catastrophic: sometimes a request payload is too large (due to a bug) and there's nothing we can do with it
          this.syncingStopped = true;
          await Promise.all([
            this.database.collections.documents.delete(
              {
                selector: this._getIdPrefixSelector(),
              },
              {
                eventName: 'documents-deleted',
                isUndoable: false,
                shouldNotSendPersistentChangesToServer: true,
                userInteraction: null,
              },
            ),
            this.database.collections.state_syncing_key_value_pairs.delete(
              {
                selector: this._getIdPrefixSelector(),
              },
              {},
            ),
            this.database.collections.state_syncing_updates.delete(
              {
                selector: this._getIdPrefixSelector(),
              },
              {},
            ),
          ]);
          this.onLoggedOut();
        } else {
          this.truncateCachedEvents = true;
        }
      }
      this.onServerUpdateRequestError(error);
      return;
    } finally {
      // Runs on success, on early return and when the error handling above throws itself, so the
      // flag can never get stuck and block safeToInterrupt().
      this.isSendingUpdatesToServer = false;
    }
    // eslint-disable-next-line require-atomic-updates
    this.nextNetworkPostRetryAt = null;
    logger.debug('Finished consumeBackgroundToServerUpdates');
  }

  /**
   * Applies the patches of all non-document updates to `state`, one operation at a time and in
   * the given order. `state` is modified in place.
   *
   * @returns true when every operation was applied; false when one failed. The failure is
   *   reported through `onSyncingError` and operations applied before it are not rolled back.
   */
  applyUpdatesToStateBlob(state: PersistentState, updates: PersistentUpdate[]): boolean {
    let currentOperation: Operation | null = null;
    try {
      const nonDocumentUpdates = updates.filter(_isNonDocumentUpdate);
      for (const update of nonDocumentUpdates) {
        for (const operation of update.patch) {
          // TODO: why does currentOperation stay set to null in the Sentry call despite it being set here?
          currentOperation = cloneDeep(operation);
          safelyApplyJsonPatch(state, [operation]);
        }
      }
      return true;
    } catch (err) {
      this.onSyncingError(err, 'FAILED to apply cached updates to state blob', { currentOperation });
      return false;
    }
  }

  /**
   * Rebuilds the latest non-document persistent state from the local cache: the stored
   * `persistentStateBlob2` snapshot with every cached non-document update replayed on top of it,
   * ordered by ascending `timestamp`.
   *
   * Side effects: refreshes `cachedUpdatesStats` with the number of cached non-document updates
   * and patches. If replaying fails, this prefix's documents, the state blob, and the cached
   * updates already flagged as sent to the server are deleted.
   *
   * @returns The rebuilt state, or `null` when no blob is cached or the updates could not be applied.
   */
  async getPersistentStateFromCache(): Promise<PersistentState | null> {
    logger.debug('Starting getPersistentStateFromCache');
    logger.time('getPersistentStateFromCache');
    const persistentState = (await this._getKeyValuePairFromDatabaseById(
      this._prefixCacheKey('persistentStateBlob2'),
    )) as PersistentState | null;
    if (!persistentState) {
      // Close the timer on this exit as well, so every `time` has a matching `timeEnd`.
      logger.timeEnd('getPersistentStateFromCache');
      return null;
    }

    const allUpdatesFromDatabase = await this.database.collections.state_syncing_updates.find({
      selector: this._getIdPrefixSelector(),
    });
    const nonDocumentUpdates = allUpdatesFromDatabase
      .map(({ value }) => JSON.parse(value) as PersistentUpdate)
      .filter(_isNonDocumentUpdate);
    // Replay order matters for JSON patches: oldest update first.
    nonDocumentUpdates.sort((updateA, updateB) => updateA.timestamp - updateB.timestamp);

    this.cachedUpdatesStats.updates = nonDocumentUpdates.length;
    this.cachedUpdatesStats.patches = flatten(nonDocumentUpdates.map((u) => u.patch)).length;

    const didApplyUpdates = this.applyUpdatesToStateBlob(persistentState, nonDocumentUpdates);

    // Did it not apply updates?
    if (!didApplyUpdates) {
      // TODO: in this case, do we actually need to clear the docs? We do right now but may not be necessary.
      // If we fail applying updates from cache, revert to the state from the server...
      // This was potentially caused by the user switching accounts (without their cache being cleared) or some other bug.
      await Promise.all([
        this.database.collections.documents.delete(
          {
            selector: this._getIdPrefixSelector(),
          },
          {
            eventName: 'documents-deleted',
            isUndoable: false,
            shouldNotSendPersistentChangesToServer: true,
            userInteraction: null,
          },
        ),
        this.database.collections.state_syncing_key_value_pairs.deleteByIds(
          [this._prefixCacheKey('persistentStateBlob2')],
          {},
        ),
        // Only updates already flagged as sent are dropped; unsent ones stay in the cache.
        this.database.collections.state_syncing_updates.deleteByIds(
          allUpdatesFromDatabase.filter(({ sentToServer }) => sentToServer).map(({ id }) => id),
          {},
        ),
      ]);
      // Close the timer on the recovery exit too (it was previously left running).
      logger.timeEnd('getPersistentStateFromCache');
      return null;
    }

    logger.timeEnd('getPersistentStateFromCache');
    logger.debug('Finished getPersistentStateFromCache');
    return persistentState;
  }

  /**
   * Brings this tab's in-memory non-document state in line with what is stored in the cache.
   * The difference between the two is computed as a JSON patch and handed to
   * `onNewUpdatesForForeground`. Nothing is sent when the states already match, when the cache
   * has no usable state, or when reading the cache throws (the next loop iteration retries).
   */
  async catchUpForegroundStateFromChangesOnOtherTabs() {
    // Documents live in rxdb, so cross-tab catch-up is only needed for the rest of the state.
    const memoryStateWithDocuments = await this.getCurrentPersistentStateWithDocuments(
      this.database,
      [],
    );
    const latestMemoryState = omit(memoryStateWithDocuments, 'documents');

    let latestCachedState;
    try {
      latestCachedState = await this.getPersistentStateFromCache();
    } catch (exception) {
      // This can race with another tab that is merging state at the same moment.
      // That is harmless: bail out and let the next loop iteration try again.
      logger.warn(
        'Exception getting persistent state from cache to catch up current tab... will try again',
        { exception },
      );
      return;
    }

    const hasBothStates = Boolean(latestCachedState) && Boolean(latestMemoryState);
    if (!hasBothStates) {
      return;
    }

    const patch = jsonpatch.compare(latestMemoryState, latestCachedState);
    logger.debug('Applying update to foreground to get tab up to date:', { patch });
    if (patch.length === 0) {
      return;
    }

    const reverse = jsonpatch.compare(latestCachedState, latestMemoryState);
    const update: PersistentUpdate = {
      patch,
      reverse,
      timestamp: nowTimestamp(),
    };
    await this.onNewUpdatesForForeground([update]);
  }

  /**
   * Loads every cached update under this instance's id prefix that has the given `sentToServer`
   * value, ordered by ascending `id`.
   *
   * Results are fetched in pages of 1000 (to avoid timeouts), using the last fetched id as the
   * cursor for the next page.
   *
   * @param sentToServer - Value of the `sentToServer` field to match.
   * @returns All matching rows, in ascending `id` order.
   */
  async getAllUpdatesFromDisk(sentToServer: number) {
    // Get all updates, but page 1000 at a time to avoid timeouts
    const limit = 1000;
    const prefixSelector = this._getIdPrefixSelector();
    // The prefix filter (if any) lives under the `id` key. The paging cursor below also needs the
    // `id` key, so both conditions must be merged into one object: spreading the prefix selector
    // and then writing `id: { $gt }` would silently replace the prefix condition.
    const idPrefixCondition = 'id' in prefixSelector ? prefixSelector.id : undefined;

    const allUpdatesFromDatabase = await this.database.collections.state_syncing_updates.find({
      selector: {
        ...prefixSelector,
        sentToServer,
      },
      sort: [{ id: 'asc' }],
      limit,
    });
    // A full first page means there may be more rows to fetch.
    if (allUpdatesFromDatabase.length === limit) {
      let moreUpdates = [];
      do {
        const lastFetchedId = allUpdatesFromDatabase[allUpdatesFromDatabase.length - 1].id;
        moreUpdates = await this.database.collections.state_syncing_updates.find({
          selector: {
            ...prefixSelector,
            sentToServer,
            id: { ...idPrefixCondition, $gt: lastFetchedId },
          },
          sort: [{ id: 'asc' }],
          limit,
        });
        allUpdatesFromDatabase.push(...moreUpdates);
      } while (moreUpdates.length > 0);
    }
    return allUpdatesFromDatabase;
  }

  /**
   * Folds every cached update flagged `sentToServer = 1` into the cached `persistentStateBlob2`
   * snapshot, writes the snapshot back (without `documents`), deletes the merged updates in
   * groups of 1000, and resets `cachedUpdatesStats`.
   *
   * `isMergingStateCache` is `true` only while the cache is being rewritten; it is reset even
   * when a write fails.
   *
   * @throws Error when there is no cached state blob to merge into, or when the updates cannot
   *   be applied to it. Errors from the database writes propagate unchanged.
   */
  async mergeUpdatesIntoCachedState() {
    logger.debug('Starting mergeUpdatesIntoCachedState');
    logger.time('mergeUpdatesIntoCachedState');

    // TODO: this only works if all sentToServer=1 updates are before any sentToServer=0 ones
    //  this should be the case, but what if there's a race? Maybe test this?
    const allUpdatesFromDatabase = await this.getAllUpdatesFromDisk(1);

    const allPersistentUpdates = allUpdatesFromDatabase.map(
      (item) => JSON.parse(item.value) as PersistentUpdateForServer,
    );
    allPersistentUpdates.sort((updateA, updateB) => updateA.timestamp - updateB.timestamp);

    const currentState: PersistentStateWithDocuments | null =
      await this._getKeyValuePairFromDatabaseById(this._prefixCacheKey('persistentStateBlob2'));
    if (!currentState) {
      // The lookup returns null when the blob row does not exist. Merging into nothing would
      // end up persisting an empty object as the state blob, so fail loudly instead.
      throw new Error('No cached state blob to merge updates into in mergeUpdatesIntoCachedState');
    }

    logger.time('writing data for mergeUpdatesIntoCachedState');
    logger.debug(
      'mergeUpdatesIntoCachedState: fetched currentState and allPersistentUpdates from cache. Going to apply changes now...',
    );

    const didApplyUpdates = this.applyUpdatesToStateBlob(currentState, allPersistentUpdates);
    if (!didApplyUpdates) {
      throw new Error('Failed to apply changes to cached state in mergeUpdatesIntoCachedState');
    }

    this.isMergingStateCache = true;
    try {
      await this.database.collections.state_syncing_key_value_pairs.incrementalUpsert(
        {
          id: this._prefixCacheKey('persistentStateBlob2'),
          value: stringifySafe(omit(currentState, ['documents'])),
        },
        {},
      );

      const groupsOfIds = chunk(
        allUpdatesFromDatabase.map(({ id }) => id),
        1000,
      );
      for (const ids of groupsOfIds) {
        await this.database.collections.state_syncing_updates.deleteByIds(ids, {});
      }

      this.cachedUpdatesStats.updates = 0;
      this.cachedUpdatesStats.patches = 0;
    } finally {
      // Never leave the flag stuck on `true` if one of the writes above throws.
      this.isMergingStateCache = false;
    }

    logger.timeEnd('writing data for mergeUpdatesIntoCachedState');
    logger.timeEnd('mergeUpdatesIntoCachedState');
    logger.debug('Finished mergeUpdatesIntoCachedState');
  }

  /**
   * Merges cached updates into the cached state blob once enough of them have piled up
   * (at least 100 updates or at least 800 patches according to `cachedUpdatesStats`).
   *
   * If the merge throws, the cache is rebuilt from the server instead: the latest state is
   * pulled, the `stateFetchMeta` and `persistentStateBlob2` entries are rewritten, and all
   * updates flagged `sentToServer = 1` are deleted. `isMergingStateCache` is `true` for the
   * duration of that rebuild and is always reset afterwards, even if the rebuild itself fails
   * (in which case the error propagates to the caller).
   */
  async maybeMergeUpdatesIntoCachedState() {
    if (this.cachedUpdatesStats.updates < 100 && this.cachedUpdatesStats.patches < 800) {
      return;
    }
    try {
      await this.mergeUpdatesIntoCachedState();
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
    } catch (err: any) {
      // A failure to apply the patches was already reported where it happened; report anything else.
      if (err?.message !== 'Failed to apply changes to cached state in mergeUpdatesIntoCachedState') {
        this.onSyncingError(err, 'Error in mergeUpdatesIntoCachedState', {});
      }

      // If we have any error merging state updates into the cache, clear the cache so we can load
      // fresh data from the server and not have any inconsistencies
      this.isMergingStateCache = true;
      try {
        const data = await this.pullLatestStateFromCloud(true);

        await Promise.all([
          this.database.collections.state_syncing_key_value_pairs.bulkUpsert(
            [
              {
                id: this._prefixCacheKey('stateFetchMeta'),
                value: stringifySafe(this.lastStateFetchMeta),
              },
              {
                id: this._prefixCacheKey('persistentStateBlob2'),
                value: stringifySafe(omit(data, ['documents'])),
              },
            ],
            {},
          ),
          this.database.collections.state_syncing_updates.delete(
            {
              selector: {
                ...this._getIdPrefixSelector(),
                sentToServer: 1,
              },
            },
            {},
          ),
        ]);
      } finally {
        // The pull or the writes can throw (e.g. network failure); without this the flag
        // would stay `true` from then on.
        this.isMergingStateCache = false;
      }
    }
  }

  /**
   * Runs `lockedFunctionToRun` under the syncing lock, so that only a single tab (on web) syncs
   * at any one time. This avoids duplicate updates caused by two tabs pulling/pushing state
   * simultaneously.
   *
   * @param lockedFunctionToRun - The work to perform while holding the lock.
   * @returns `true` if the function ran, `false` if it was skipped because this tab is not
   *   focused or another tab currently holds the lock.
   */
  async runWithSyncingLock(lockedFunctionToRun: () => Promise<void>): Promise<boolean> {
    if (!environmentHasSingleTab && lockManager) {
      // Unfocused web tabs do not sync at all.
      if (!this.isFocused) {
        logger.debug("Not taking lock, tab doesn't have focus ");
        return false;
      }

      const lockName = `${WEB_LOCK_KEY}-${this.cacheKeyPrefix}`;
      return lockManager.request(lockName, { ifAvailable: true }, async (lockAcquired) => {
        if (lockAcquired) {
          await lockedFunctionToRun();
          return true;
        }
        // Another tab owns the lock right now; skip this round and try again on the next loop.
        logger.debug('Not taking lock, another tab already has it ');
        return false;
      });
    }

    // Mobile/desktop/extension only ever have one thread writing to the cache (and some
    // environments have no lock manager), so the function can run without locking.
    await lockedFunctionToRun();
    return true;
  }

  /**
   * Throttles the syncing loop while the tab/app is not focused: waits up to 4000ms on mobile
   * (400ms elsewhere), polling the focus state every 100ms so the wait ends early as soon as
   * focus returns. Resolves immediately when already focused.
   */
  async slowDownSyncingLoopIfNotFocused() {
    if (this.isFocused) {
      return;
    }

    const delayInterval = 100;
    const totalDelayTime = isMobile ? 4000 : 400;

    let elapsed = 0;
    while (elapsed < totalDelayTime) {
      // Focus came back while idling: stop waiting right away.
      if (this.isFocused) {
        logger.debug('Expediting sync because unfocused tab was focused');
        return;
      }
      await delay(delayInterval);
      elapsed += delayInterval;
    }

    logger.debug(`slowed down syncing by ${totalDelayTime}ms because app in background`);
  }

  /**
   * One iteration of the syncing loop. Unless syncing has been stopped, it optionally idles
   * (unfocused tab/app), performs a sync pass under the syncing lock, and then schedules the
   * next iteration after `UPDATE_QUEUE_INTERVAL`.
   *
   * Errors thrown by the sync pass are reported through `onSyncingError` and do not break the loop.
   */
  async consumeUpdateQueues() {
    if (this.syncingStopped) {
      return;
    }

    await this.slowDownSyncingLoopIfNotFocused();

    // The work done while holding the syncing lock.
    const syncPass = async () => {
      logger.debug('Lock acquired, doing syncing ');

      const shouldCatchUpTab = this.wentFromUnFocusedToFocused && !environmentHasSingleTab;
      if (shouldCatchUpTab) {
        logger.debug('Catching up tab that went from un-focused to focused');
        await this.catchUpForegroundStateFromChangesOnOtherTabs();
        this.wentFromUnFocusedToFocused = false;
      }

      // Server round-trips need both connectivity and an initialized database.
      if (!this.isOnline() || !this.database.isInitialized()) {
        return;
      }

      await this.consumeBackgroundToServerUpdates();

      await this.consumeServerToForegroundUpdates();
      foreground.onConsumeServerToForegroundUpdatesFinished();

      await this.maybeMergeUpdatesIntoCachedState();
    };

    try {
      await this.runWithSyncingLock(syncPass);
    } catch (e) {
      // Make sure the foreground is not left waiting on server changes after a failed pass.
      foreground.setAreServerChangesBeingAppliedToForeground(false);
      foreground.onConsumeServerToForegroundUpdatesFinished();
      this.onSyncingError(e, 'Error raised from consumeUpdateQueues loop', {});
    }

    // Syncing may have been stopped during the pass; only keep looping if it was not.
    if (!this.syncingStopped) {
      setTimeout(this.consumeUpdateQueues.bind(this), UPDATE_QUEUE_INTERVAL);
    }
  }

  /**
   * Reads the full contents of the two state-syncing cache collections, for debugging.
   *
   * @returns An object with every row of `state_syncing_key_value_pairs` and `state_syncing_updates`.
   */
  async getCacheDataForDebugging() {
    const { state_syncing_key_value_pairs: keyValuePairs, state_syncing_updates: updates } =
      this.database.collections;

    // Read one collection after the other, key-value pairs first.
    const keyValuePairRows = await keyValuePairs.findAll();
    const updateRows = await updates.findAll();

    return {
      state_syncing_key_value_pairs: keyValuePairRows,
      state_syncing_updates: updateRows,
    };
  }

  /**
   * Triggers a browser download of the syncing cache contents as `debug.json`.
   *
   * Works by clicking a temporary `<a download>` element that points at an object URL for the
   * serialized data. The element is always removed again, and the object URL is revoked shortly
   * afterwards so the blob does not stay referenced for the lifetime of the page.
   */
  async downloadCacheForDebugging() {
    const cacheData = await this.getCacheDataForDebugging();
    const blob = new Blob([JSON.stringify(cacheData)], { type: 'text/json' });
    const objectUrl = window.URL.createObjectURL(blob);

    const elem = window.document.createElement('a');
    elem.href = objectUrl;
    elem.download = 'debug.json';
    window.document.body.appendChild(elem);
    try {
      elem.click();
    } finally {
      window.document.body.removeChild(elem);
      // Revocation is deferred rather than immediate so the browser has had a chance to start
      // the download before the URL becomes invalid.
      setTimeout(() => window.URL.revokeObjectURL(objectUrl), 10000);
    }
  }

  /**
   * Records a focus change of the tab/app.
   *
   * `wentFromUnFocusedToFocused` is raised only on an actual unfocused -> focused transition;
   * `consumeUpdateQueues` reads it to catch the tab up and clears it afterwards.
   *
   * @param isFocused - Whether the tab/app is focused now.
   */
  onAppFocusChange(isFocused: boolean) {
    logger.debug('App focus change: ', { isFocused });
    // Repeated "unfocused" notifications must not count as having regained focus.
    if (!this.isFocused && isFocused) {
      this.wentFromUnFocusedToFocused = true;
    }
    this.isFocused = isFocused;
  }

  /**
   * GETs `/reader/api/state/` on the server with the given query string.
   *
   * @param queryParams - Query parameters appended to the state endpoint URL.
   * @returns The parsed JSON body of a successful response.
   * @throws HttpError wrapping the response when it is not `ok`; `getReasonFromJson()` has been
   *   awaited on it before it is thrown, so callers can use the response data.
   */
  async _fetchServerState(queryParams: URLSearchParams): Promise<ReaderStateResponse> {
    const url = `${getServerBaseUrl()}/reader/api/state/?${queryParams}`;
    const commonOptions = await this._getCommonFetchOptions();
    const resp = await this.requestWithAuth(url, { ...commonOptions, method: 'GET' });

    if (resp.ok) {
      return resp.json();
    }

    const exception = new HttpError(resp);
    await exception.getReasonFromJson();
    throw exception;
  }

  /**
   * Builds the fetch options shared by server requests: credentials included, CORS mode, and an
   * empty (string-keyed) headers object that callers can add to.
   */
  async _getCommonFetchOptions() {
    const options: RequestInit & { headers: { [key: string]: string } } = {
      credentials: 'include',
      headers: {},
      mode: 'cors',
    };
    return options;
  }

  /**
   * Builds the query selector that restricts results to rows whose `id` starts with
   * `cacheKeyPrefix`.
   *
   * The prefix is matched literally, the same way `_prefixCacheKey` prepends it literally:
   * regular-expression metacharacters in the prefix are escaped before it is placed in `$regex`.
   *
   * @returns `{}` when there is no prefix (match everything), otherwise `{ id: { $regex } }`.
   */
  _getIdPrefixSelector() {
    if (!this.cacheKeyPrefix) {
      return {};
    }
    // Without escaping, a character such as `.` in the prefix would match any character and
    // could select rows stored under a different prefix.
    const escapedPrefix = this.cacheKeyPrefix.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    return {
      id: {
        $regex: `^${escapedPrefix}`,
      },
    };
  }

  /**
   * Looks up one row of `state_syncing_key_value_pairs` and returns its JSON-decoded value.
   *
   * @param id - The row id to look up.
   * @returns The parsed value, or `null` when no row with that id exists.
   * @throws Error when the row exists but its `value` is empty.
   */
  async _getKeyValuePairFromDatabaseById(id: string) {
    const row = await this.database.collections.state_syncing_key_value_pairs.findOne(id);
    if (!row) {
      return null;
    }

    const { value } = row;
    if (!value) {
      throw new Error('!queryResult.value');
    }
    return JSON.parse(value);
  }

  /**
   * Namespaces a cache key by prepending `cacheKeyPrefix` to it.
   *
   * @param input - The un-prefixed key.
   * @returns The prefix immediately followed by `input`.
   */
  _prefixCacheKey(input: string) {
    const { cacheKeyPrefix } = this;
    return `${cacheKeyPrefix}${input}`;
  }

  /**
   * Creates a writer used to bulk-insert documents during initial sync. The writer writes
   * straight to the `documents` collection's storage instance and runs the collection's
   * pre/post insert hooks by hand.
   *
   * @returns An object with a single `writeBatch` method (see below).
   * @throws Error when the RxDB instance is not available.
   */
  async _createInitialSyncWriter() {
    if (!this.database.rxDbInstance) {
      // this should never happen, but I need a type check
      throw new Error('database instance not ready for sync');
    }

    const collection = this.database.rxDbInstance.collections.documents;
    const collectionStorage = collection.storageInstance;
    const collectionSchema = collection.schema;
    // The three objects below are created once and the SAME references are assigned to every
    // document written by this writer (see the assignments inside writeBatch).
    const nullObject = {};
    const metaObject = {
      lwt: nowTimestamp(),
    };
    const lastRxDBUpdate = { id: ulid() };

    // Snapshot the collection's pre-insert hooks; they are applied manually to each document.
    const preInsertTransformers = collection.getHooks('pre', 'insert').series;
    const preInsertTransfomer = (doc: RxDocumentData<AnyDocument>) => {
      for (const transformer of preInsertTransformers) {
        transformer(doc);
      }
      return doc;
    };
    // Same for the post-insert hooks, which run after the storage write.
    const postInsertTransformers = collection.getHooks('post', 'insert').series;
    const postInsertTransformer = (doc: RxDocumentData<AnyDocument>) => {
      for (const transformer of postInsertTransformers) {
        transformer(doc);
      }
    };

    // Captured here because `this` inside writeBatch refers to the returned object, not the class.
    const shouldSkipSearchIndexing = this.shouldSkipSearchIndexing;

    return {
      /**
       * Writes one page of documents to storage in adaptively sized batches.
       *
       * @param values - Documents to write. NOTE: this array is consumed (emptied) via `splice`.
       * @param insertionPromises - Promises of the page writes, indexed by page number.
       * @param pageNumber - Index of this page; for pages after the first, the previous page's
       *   promise is awaited before writing.
       */
      async writeBatch(values: AnyDocument[], insertionPromises: Promise<void>[], pageNumber: number) {
        // Wait for the previous insertions to finish before doing more of them:
        if (pageNumber > 0) {
          await insertionPromises[pageNumber - 1];
        }

        let batchSize = documentUpsertChunkSize;
        while (true) {
          const start = nowTimestamp();
          // Take the next batch off the front of `values`; an empty batch means everything is written.
          const batch = values.splice(0, batchSize);
          if (batch.length === 0) {
            break;
          }

          // this is similar to what importJSON does + our middleware
          // TODO: there is a lot of object copying going on everywhere,
          //  maybe some room for improvement
          const docsToWrite = batch.map((doc) => {
            const document: RxDocumentData<AnyDocument> = preInsertTransfomer(
              fillObjectDataBeforeInsert(collectionSchema, doc),
            );

            // Fill in the bookkeeping fields by hand, using the shared objects created above.
            // NOTE(review): `rxdbOnly` is asserted non-null here — presumably populated by
            // fillObjectDataBeforeInsert or a pre-insert hook; confirm.
            document.rxdbOnly!.lastRxDBUpdate = lastRxDBUpdate;
            document._meta = metaObject;
            document._rev = '';
            document._attachments = nullObject;
            document._deleted = false;
            return { document };
          });

          await collectionStorage.bulkWrite(docsToWrite, 'state-sync');

          for (const doc of docsToWrite) {
            postInsertTransformer(doc.document);
          }

          // Adapt the next batch size to how long this batch took, scaling by 500 / took
          // (presumably aiming for ~500ms per batch, if nowTimestamp() is in milliseconds — confirm).
          // Edge cases: took === 0 gives factor Infinity (clamped to 1000 below); a batch slower
          // than twice the target rounds factor to 0 (clamped up to documentUpsertChunkSize).
          const took = nowTimestamp() - start;
          const factor = Math.round(500 / took);
          // make sure we never do less than documentUpsertChunkSize (the original comment says 100 —
          // TODO confirm), or more than 1000
          batchSize = Math.min(1000, Math.max(documentUpsertChunkSize, batchSize * factor));

          foreground.updatePersistentStateLoadedDocumentCountByN(batch.length);
          if (!shouldSkipSearchIndexing) {
            // Need to manually upsert into metadata search, because the insert$ Observable is disabled for initial sync.
            foreground.searchUpsertDocuments(batch);
          }
        }
      },
    };
  }
}

/**
 * Tells whether an update touches anything outside of `/documents/`.
 *
 * An update is expected to contain either document operations or non-document operations,
 * never both; a mixed update is logged as an error (and still counts as non-document).
 *
 * @param update - The update whose patch operations are inspected.
 * @returns `true` if at least one operation's path does not start with `/documents/`;
 *   `false` otherwise (including for an empty patch).
 */
const _isNonDocumentUpdate = (update: PersistentUpdate): boolean => {
  const documentOperationCount = update.patch.filter(({ path }) =>
    path.startsWith('/documents/'),
  ).length;
  const otherOperationCount = update.patch.length - documentOperationCount;

  const isMixedUpdate = documentOperationCount > 0 && otherOperationCount > 0;
  if (isMixedUpdate) {
    logger.error(
      'Syncing is trying to filter a PersistentUpdate with doc and non-doc operations -- this will break syncing!!',
      { update },
    );
  }
  return otherOperationCount > 0;
};

/**
 * Complement of `_isNonDocumentUpdate`: `true` when no operation of the update falls outside
 * `/documents/` (which includes an update with an empty patch).
 */
const _isDocumentUpdate = (update: PersistentUpdate): boolean => !_isNonDocumentUpdate(update);

/** Body returned by `_fetchServerState`: the persistent state with documents, plus optional fetch metadata. */
type ReaderStateResponse = PersistentStateWithDocuments & {
  meta?: StateFetchMeta;
};
