import { sinkPromise } from "@redotech/util/promise";
import { nativeRandom, randomHexString } from "@redotech/util/random";
import { sleep } from "@redotech/util/schedule";
import { RedoMerchantClient } from "../client";
import { getConversationsStream, listen } from "../support/utils";

type Callback = () => any | (() => Promise<any>);

export class MerchantAppEventServer {
  private readonly abortController = new AbortController();

  constructor(private readonly client: RedoMerchantClient) {
    this.init();
  }

  private init() {
    sinkPromise(this.setupTeamListener());
  }

  private async setupTeamListener() {
    try {
      for await (const _ of listen({
        query: () =>
          getConversationsStream({
            authorization: this.client.authorization(),
            signal: this.abortController.signal,
          }),
        loopCondition: true,
      })) {
        Array.from(this.teamTopicListeners.values()).forEach((callback) =>
          callback(),
        );
      }
    } catch (e: unknown) {
      // Retry
      await sleep(Temporal.Duration.from({ seconds: 10 })).then(() => {
        sinkPromise(this.setupTeamListener());
      });
    }
  }

  private readonly teamTopicListeners: Map<string, Callback> = new Map();

  /**
   * Publish-subscribe pattern for getting notified when a conversation has changed. Pass the callback (e.g. a fetch, or setState) that will happen once conversation is updated,
   * and the callback will be run when the conversation has new data to fetch.
   * The callback function will immediately be run at the beginning as well, so you can set-and-forget.
   */
  public subscribeAndCallOnce(callback: Callback): Callback {
    callback();
    return this.subscribe(callback);
  }

  public subscribe(callback: Callback): Callback {
    const callbackId = randomHexString(nativeRandom, 24);
    this.teamTopicListeners.set(callbackId, callback);
    const unlistenCallback = () => this.teamTopicListeners.delete(callbackId);
    return unlistenCallback;
  }
}
