import { ExpandedConversation } from "@redotech/redo-model/conversation";
import { FilterOptionsV2 } from "@redotech/redo-model/conversation-filters";
import {
  SortableConversationTableColumn,
  TableSort,
} from "@redotech/redo-model/table";
import { GetUser } from "@redotech/redo-model/user";
import { TableFetcher, YieldType } from "@redotech/redo-web/table";
import { overwriteCombineMaps } from "@redotech/util/map";
import { BehaviorSubject } from "rxjs";
import { RedoMerchantClient } from "../client";
import { getConversations } from "../client/conversations";
import { getUsers } from "../client/user";

export class ConversationFetcher implements TableFetcher<ExpandedConversation> {
  constructor(
    private readonly client: RedoMerchantClient,
    private readonly userCache: Map<string, GetUser>,
    private readonly onConversationsLoaded?: (() => void) | undefined,
  ) {}

  private readonly conversationIdToConversationSubject = new BehaviorSubject<
    Map<string, ExpandedConversation>
  >(new Map());

  public conversationIdToConversation =
    this.conversationIdToConversationSubject.asObservable();

  private updateConversationsMap(conversations: ExpandedConversation[]) {
    const originalMap = this.conversationIdToConversationSubject.value;
    const newMap = new Map(conversations.map((c) => [c._id, c]));
    this.conversationIdToConversationSubject.next(
      overwriteCombineMaps(originalMap, newMap),
    );
  }

  private async fetchUsers(
    data: ExpandedConversation[],
    signal?: AbortSignal,
  ): Promise<void> {
    const userIdsToFetch = new Set<string>();
    for (const conversation of data) {
      for (const message of conversation.messages) {
        if (
          message.user &&
          !this.userCache.has(message.user) &&
          !userIdsToFetch.has(message.user)
        ) {
          userIdsToFetch.add(message.user);
        }
      }
    }
    if (userIdsToFetch.size > 0) {
      const users = await getUsers(this.client, {
        userIds: Array.from(userIdsToFetch),
        signal,
      });
      for (const user of users) {
        this.userCache.set(user._id, user);
      }
    }
  }
  async counts() {
    return {};
  }
  async *data(
    primaryFilter: string,
    filters: FilterOptionsV2,
    search: string | undefined,
    sort: TableSort<SortableConversationTableColumn> | undefined,
    pageSize: number = 50,
    pageNumber?: number,
    signal?: AbortSignal,
    passThroughValues?: FilterOptionsV2,
  ): AsyncIterator<YieldType<ExpandedConversation>> {
    let cursor: string | undefined;

    const refresh = async (
      pageStop: string | undefined,
      signal: AbortSignal | undefined,
    ) => {
      let items: ExpandedConversation[] = [];
      for (let refreshCursor: string | undefined; ; ) {
        try {
          const { data, pageNext } = await getConversations(this.client, {
            pageContinue: refreshCursor,
            pageStop,
            pageSize,
            filters: passThroughValues,
            sort: passThroughValues?.sort || sort,
            signal,
          });
          this.updateConversationsMap(data);
          await this.fetchUsers(data, signal);
          if (data && data.length > 0) {
            items = [...items, ...data];
          }
          if (pageNext) {
            refreshCursor = pageNext;
          } else {
            break;
          }
        } catch (e: any) {
          if (e.code === "ERR_CANCELED") {
            return [];
          }
          throw e;
        }
      }
      return items;
    };
    for (let pageContinue: string | undefined; ; ) {
      try {
        const { data, pageNext } = await getConversations(this.client, {
          pageContinue,
          pageSize,
          filters: passThroughValues,
          sort: passThroughValues?.sort || sort,
          signal,
        });
        this.updateConversationsMap(data);
        await this.fetchUsers(data, signal);

        cursor = pageNext;
        this.onConversationsLoaded?.();
        yield {
          items: data,
          refresh: async (signal) => refresh(cursor, signal),
        };
        if (pageNext === undefined) {
          break;
        }
        pageContinue = pageNext;
      } catch (e: any) {
        if (e.code !== "ERR_CANCELED") {
          throw e;
        }
        return {
          items: [],
          refresh: async (signal) => refresh(cursor, signal),
          aborted: true,
        } as YieldType<ExpandedConversation>;
      }
    }
  }
}
