import {GrpcWebError, Status} from '@pbkit/grpc-web-client';
import {first, fromSingle} from 'pbkit/core/runtime/async/async-generator';
import {defer} from 'pbkit/core/runtime/async/observer';
import {RpcClientImpl} from 'pbkit/core/runtime/rpc';

interface RetryConfig<TMetadata, THeader, TTrailer> {
  rpcClientImpl: RpcClientImpl<TMetadata, THeader, TTrailer>;
  retryCount: number;
  refreshTokenFn: () => any;
}

export function retry<TMetadata, THeader, TTrailer>({
  rpcClientImpl,
  retryCount,
  refreshTokenFn,
}: RetryConfig<TMetadata, THeader, TTrailer>): RpcClientImpl<TMetadata, THeader, TTrailer> {
  return function newRpcClientImpl(methodDescriptor) {
    if (methodDescriptor.requestStream || methodDescriptor.responseStream) {
      throw new Error('retry only supports unary call');
    }
    const rpcMethodImpl = rpcClientImpl(methodDescriptor);
    return function newRpcMethodImpl(req, metadata) {
      const headerPromise = defer<THeader>();
      const trailerPromise = defer<TTrailer>();
      const resAsyncGenerator = (async function* () {
        let tmp: Error | undefined;
        const reqPayload = await first(req);
        for (let i = 0; i <= retryCount; i++) {
          try {
            const rpcMethodResult = rpcMethodImpl(fromSingle(reqPayload), metadata);
            const [header, payload, trailer] = await Promise.all([
              rpcMethodResult[1],
              first(rpcMethodResult[0]),
              rpcMethodResult[2],
            ]);
            headerPromise.resolve(header);
            yield payload;
            trailerPromise.resolve(trailer);
            return;
          } catch (error) {
            tmp = error as Error;
            if (error instanceof GrpcWebError && error.status === Status.UNAUTHENTICATED) await refreshTokenFn();
            else break;
          }
        }
        if (tmp) throw tmp;
      })();
      return [resAsyncGenerator, headerPromise, trailerPromise];
    };
  };
}
