package io.confluent.controlcenter.rest;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.confluent.controlcenter.ControlCenterRbacConfig;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.data.ClusterMetadataDao;
import io.confluent.controlcenter.data.ScopedKafkaMetadataDao;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.rest.req.RangeRequest;
import io.confluent.controlcenter.rest.res.DeliveryResponse;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.monitoring.record.Monitoring;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.ForbiddenException;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/json"})
@Path("/2.0/monitoring/{clusterId}")
/* loaded from: input_file:io/confluent/controlcenter/rest/MessageDeliveryResource.class */
public class MessageDeliveryResource {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MessageDeliveryResource.class);
    // Supplies the per-key-type serde whose key(...) output is used as the window-store lookup key.
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier;
    // Consulted via isRbacEnabled() to decide whether access checks and member filtering apply.
    private final ControlCenterRbacConfig rbacConfig;
    // Per-rollup window stores of member groupings; read by getAggregateGroupInfo.
    private Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>>> groupDb;
    // Per-rollup window stores read by getProducedMessages (the "produced" series).
    private Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> expectedDb;
    // Per-rollup window stores read by getConsumedMessages (the "consumed" series).
    private Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> actualDb;

    // Injected by the JAX-RS runtime; source of the consumer groups and topic names used in the
    // access checks. NOTE(review): presumably scoped to the calling user - confirm.
    @Context
    private ScopedKafkaMetadataDao scopedKafkaMetadataDao;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/confluent/controlcenter/rest/MessageDeliveryResource$AggregationType.class */
    /**
     * Selects how a delivery response is aggregated.
     *
     * <p>{@code Topic} makes {@code constructResponse} pick the {@code *_TOPICS} delivery types and
     * makes {@code addMembers} report primary members as sources; {@code ConsumerGroup} picks the
     * plain consumer-group delivery types and reports primary members as destinations.
     */
    public enum AggregationType {
        ConsumerGroup,
        Topic
    }

    /**
     * Rejects a missing cluster id and translates a present one through
     * {@link ClusterMetadataDao#getInternalKafkaId}.
     *
     * @param clusterId cluster id taken from the request path; may be {@code null}
     * @return the value returned by {@code ClusterMetadataDao.getInternalKafkaId(clusterId)}
     * @throws BadRequestException if {@code clusterId} is {@code null}
     */
    private static String validateClusterId(String clusterId) {
        if (clusterId != null) {
            return ClusterMetadataDao.getInternalKafkaId(clusterId);
        }
        throw new BadRequestException("missing clusterId");
    }

    /**
     * Creates the resource from its injected collaborators.
     *
     * @param provider per-rollup grouping stores (bound with {@code @GroupStore})
     * @param provider2 per-rollup stores backing the produced series
     *     (bound with {@code @MonitoringMessageAggregatorWindowsStore})
     * @param provider3 per-rollup stores backing the consumed series
     *     (bound with {@code @MonitoringStreamStore})
     * @param orderedKeyPrefixedSerdeSupplier supplier of the serdes used to build store lookup keys
     * @param controlCenterRbacConfig RBAC configuration consulted before serving data
     */
    @Inject
    public MessageDeliveryResource(
            @TopicStoreModule.GroupStore Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>>> provider,
            @TopicStoreModule.MonitoringMessageAggregatorWindowsStore Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> provider2,
            @TopicStoreModule.MonitoringStreamStore Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> provider3,
            OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> orderedKeyPrefixedSerdeSupplier,
            ControlCenterRbacConfig controlCenterRbacConfig) {
        this.keySerdeSupplier = orderedKeyPrefixedSerdeSupplier;
        this.rbacConfig = controlCenterRbacConfig;
        this.groupDb = provider;
        this.expectedDb = provider2;
        this.actualDb = provider3;
    }

    /**
     * Validates the common range-query parameters shared by every endpoint.
     *
     * @param i maximum number of windows to read; values up to and including 6000 are accepted
     * @param rollup rollup granularity; required
     * @param l range start in epoch milliseconds; required
     * @param l2 range stop in epoch milliseconds; required and not earlier than {@code l}
     * @throws BadRequestException if the limit is too large, a required value is missing, or the
     *     start is later than the stop
     */
    protected void checkParams(int i, Rollup rollup, Long l, Long l2) throws BadRequestException {
        if (i > 6000) {
            // 6000 itself is valid (it is the endpoints' default), so the bound is inclusive.
            throw new BadRequestException("limit must be <= 6000");
        }
        if (l == null) {
            throw new BadRequestException("missing startTimeMs");
        }
        if (l2 == null) {
            throw new BadRequestException("missing stopTimeMs");
        }
        if (l.longValue() > l2.longValue()) {
            throw new BadRequestException("startTimeMs cannot be later than stopTimeMs");
        }
        if (rollup == null) {
            throw new BadRequestException("missing rollup");
        }
    }

    /**
     * Collects the distinct members recorded in the grouping store for the request's key and
     * rollup-aligned time range.
     *
     * <p>At most {@code limit} windows are read. The members of all windows read are merged into a
     * single {@link Controlcenter.WindowedGrouping} whose window is set to {@code 0}.
     *
     * @param rangeRequest request supplying the rollup, the key spec and the time range
     * @param limit maximum number of windows to read from the store
     * @param keyType key type used to pick the serde that builds the lookup key
     * @return a grouping holding the de-duplicated members
     */
    private Controlcenter.WindowedGrouping getAggregateGroupInfo(RangeRequest rangeRequest, int limit, Keys.KeyType keyType) {
        ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping> store = this.groupDb.get().get(rangeRequest.rollup);
        // fetch() takes the store's key type, which is Bytes for this store.
        Bytes key = (Bytes) this.keySerdeSupplier.get(keyType).key(rangeRequest.createStartKeySpec());
        // try-with-resources closes the iterator on every path and keeps the primary exception
        // if close() fails as well.
        try (WindowStoreIterator<Controlcenter.WindowedGrouping> windows =
                store.fetch(key, rangeRequest.getRollupAlignedStartTimeMs(), rangeRequest.getRollupAlignedStopTimeMs())) {
            Set<Controlcenter.MemberInfo> members = new HashSet<>();
            for (int read = 0; windows.hasNext() && read < limit; read++) {
                members.addAll(windows.next().value.getMembersList());
            }
            return Controlcenter.WindowedGrouping.newBuilder().addAllMembers(members).setWindow(0L).build();
        }
    }

    /**
     * Returns one attribute of every member found in the consumer-side grouping for a request.
     *
     * <p>Works on a copy of {@code rangeRequest}, so the caller's request is not modified. The copy
     * is forced to the CONSUMER client type, and a ONE_MINUTE rollup is replaced by THREE_HOURS.
     * NOTE(review): presumably groupings are not kept at one-minute granularity - confirm.
     *
     * @param rangeRequest request describing the cluster, filters and time range
     * @param limit maximum number of grouping windows to read
     * @param keyType grouping key type to look up
     * @param function extracts the wanted attribute from each member; must not be {@code null}
     * @return the distinct extracted values
     */
    private Set<String> getMetricsData(RangeRequest rangeRequest, int limit, Keys.KeyType keyType, Function<Controlcenter.MemberInfo, String> function) {
        RangeRequest consumerRequest = new RangeRequest(rangeRequest);
        consumerRequest.clientType = Monitoring.ClientType.CONSUMER;
        if (consumerRequest.rollup == Rollup.ONE_MINUTE) {
            consumerRequest.rollup = Rollup.THREE_HOURS;
        }
        // The method reference adapts java.util.function.Function to the Guava function type that
        // transform() expects; it also fails fast with a NullPointerException on a null extractor.
        return FluentIterable.from(getAggregateGroupInfo(consumerRequest, limit, keyType).getMembersList())
                .transform(function::apply)
                .toSet();
    }

    /**
     * Returns the consumer group names found in the INFO_GROUPS_IN_CLIENTTYPE grouping for the
     * request.
     *
     * @param rangeRequest request describing the cluster, filters and time range
     * @param limit maximum number of grouping windows to read
     * @return the distinct group names
     */
    private Set<String> getMetricsConsumerGroups(RangeRequest rangeRequest, int limit) {
        return getMetricsData(
                rangeRequest,
                limit,
                Keys.KeyType.INFO_GROUPS_IN_CLIENTTYPE,
                Controlcenter.MemberInfo::getGroup);
    }

    /**
     * Returns the topic names found in the INFO_TOPICS_IN_CLIENTTYPE grouping for the request.
     *
     * @param rangeRequest request describing the cluster, filters and time range
     * @param limit maximum number of grouping windows to read
     * @return the distinct topic names
     */
    private Set<String> getMetricsTopics(RangeRequest rangeRequest, int limit) {
        return getMetricsData(
                rangeRequest,
                limit,
                Keys.KeyType.INFO_TOPICS_IN_CLIENTTYPE,
                Controlcenter.MemberInfo::getTopic);
    }

    /**
     * Returns an immutable snapshot of the consumer groups reported by the metadata DAO for a
     * cluster.
     *
     * @param clusterId cluster to query
     * @return immutable copy of the reported consumer group names
     */
    private Set<String> getConsumerGroups(String clusterId) throws InterruptedException, ExecutionException, TimeoutException {
        return ImmutableSet.copyOf(this.scopedKafkaMetadataDao.getConsumerGroups(clusterId));
    }

    /**
     * Returns an immutable snapshot of the topic names reported by the metadata DAO (from metadata
     * or its cache) for a cluster.
     *
     * @param clusterId cluster to query
     * @return immutable copy of the reported topic names
     */
    private Set<String> getTopics(String clusterId) throws InterruptedException, ExecutionException, TimeoutException {
        return ImmutableSet.copyOf(this.scopedKafkaMetadataDao.getTopicNamesFromMetadataOrCache(clusterId));
    }

    /**
     * Ensures every consumer group found in the metrics for the request's topic is among the
     * consumer groups returned by the metadata DAO.
     *
     * @param rangeRequest request whose {@code topic} must be set
     * @param limit maximum number of grouping windows to read
     * @throws IllegalArgumentException if the request has no topic
     * @throws ForbiddenException if at least one intercepted group is not accessible
     */
    private void verifyAccessToAllConsumerGroupsInTopic(RangeRequest rangeRequest, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        Preconditions.checkArgument(rangeRequest.topic != null);
        Set<String> accessibleGroups = getConsumerGroups(rangeRequest.clusterId);
        Set<String> interceptedGroups = getMetricsConsumerGroups(rangeRequest, limit);
        if (accessibleGroups.containsAll(interceptedGroups)) {
            return;
        }
        throw new ForbiddenException("cannot show topic overview data unless user has access to all intercepted consumer groups");
    }

    /**
     * Access check for a request that names neither a topic nor a group.
     *
     * <p>Every topic found in the metrics must be among the topics returned by the metadata DAO.
     * Each of those topics is then checked for access to all of its intercepted consumer groups.
     *
     * @param rangeRequest request with both {@code topic} and {@code group} unset
     * @param limit maximum number of grouping windows to read per lookup
     * @throws IllegalArgumentException if the request names a topic or a group
     * @throws ForbiddenException if any intercepted topic or consumer group is not accessible
     */
    private void verifyAccessToAllTopicsAndConsumerGroups(RangeRequest rangeRequest, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        Preconditions.checkArgument(rangeRequest.topic == null && rangeRequest.group == null);
        Set<String> interceptedTopics = getMetricsTopics(rangeRequest, limit);
        Set<String> accessibleTopics = getTopics(rangeRequest.clusterId);
        if (!accessibleTopics.containsAll(interceptedTopics)) {
            throw new ForbiddenException("cannot show total overview data unless user has access to all intercepted topics");
        }
        for (String topic : interceptedTopics) {
            // Check each topic on its own copy so the caller's request stays topic-less.
            RangeRequest topicRequest = new RangeRequest(rangeRequest);
            topicRequest.topic = topic;
            verifyAccessToAllConsumerGroupsInTopic(topicRequest, limit);
        }
    }

    /**
     * Applies the RBAC access checks to a request; does nothing when RBAC is disabled.
     *
     * <p>A request naming a group requires access to that group. A request without a group
     * requires access to every intercepted consumer group, for the named topic or for all
     * intercepted topics. A named topic must additionally be accessible itself.
     *
     * @param rangeRequest request to authorize
     * @param limit maximum number of grouping windows to read per lookup
     * @throws ForbiddenException if any required group or topic is not accessible
     */
    private void verifyRequestAccess(RangeRequest rangeRequest, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.rbacConfig.isRbacEnabled()) {
            return;
        }
        if (rangeRequest.group != null) {
            if (!getConsumerGroups(rangeRequest.clusterId).contains(rangeRequest.group)) {
                throw new ForbiddenException("no access to this consumer group");
            }
        } else if (rangeRequest.topic != null) {
            verifyAccessToAllConsumerGroupsInTopic(rangeRequest, limit);
        } else {
            verifyAccessToAllTopicsAndConsumerGroups(rangeRequest, limit);
        }
        if (rangeRequest.topic == null) {
            return;
        }
        if (!getTopics(rangeRequest.clusterId).contains(rangeRequest.topic)) {
            throw new ForbiddenException("no access to this topic");
        }
    }

    /**
     * Reads the consumed-side metric series for a request.
     *
     * <p>Side effect: sets {@code rangeRequest.clientType} to CONSUMER before the access check and
     * the store lookup.
     *
     * @param rangeRequest request describing the cluster, filters, rollup and time range
     * @param i maximum number of windows to read
     * @param j window size in milliseconds, passed through to {@code Util.extractValues}
     * @param keyType key type used to build the store lookup key
     * @return one value list per {@link Metric}
     * @throws ForbiddenException if RBAC is enabled and the request is not permitted
     */
    protected Map<Metric, ArrayList<Object>> getConsumedMessages(RangeRequest rangeRequest, int i, long j, Keys.KeyType keyType) throws InterruptedException, ExecutionException, TimeoutException {
        rangeRequest.clientType = Monitoring.ClientType.CONSUMER;
        verifyRequestAccess(rangeRequest, i);
        return readMetricSeries(this.actualDb.get().get(rangeRequest.rollup), rangeRequest, i, j, keyType, Metric.values());
    }

    /**
     * Reads the produced-side metric series for a request.
     *
     * <p>Side effect: sets {@code rangeRequest.clientType} to PRODUCER before the access check and
     * the store lookup. Only the {@code count}, {@code error} and {@code crc} metrics are returned.
     *
     * @param rangeRequest request describing the cluster, filters, rollup and time range
     * @param i maximum number of windows to read
     * @param j window size in milliseconds, passed through to {@code Util.extractValues}
     * @param keyType key type used to build the store lookup key
     * @return one value list for each of the three produced-side metrics
     * @throws ForbiddenException if RBAC is enabled and the request is not permitted
     */
    protected Map<Metric, ArrayList<Object>> getProducedMessages(RangeRequest rangeRequest, int i, long j, Keys.KeyType keyType) throws InterruptedException, ExecutionException, TimeoutException {
        rangeRequest.clientType = Monitoring.ClientType.PRODUCER;
        verifyRequestAccess(rangeRequest, i);
        return readMetricSeries(this.expectedDb.get().get(rangeRequest.rollup), rangeRequest, i, j, keyType, new Metric[]{Metric.count, Metric.error, Metric.crc});
    }

    /** Reads up to {@code limit} windows from {@code store} and extracts one value list per requested metric. */
    private Map<Metric, ArrayList<Object>> readMetricSeries(ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage> store, RangeRequest request, int limit, long windowMs, Keys.KeyType keyType, Metric[] metrics) {
        // fetch() takes the store's key type, which is Bytes for these stores.
        Bytes key = (Bytes) this.keySerdeSupplier.get(keyType).key(request.createStartKeySpec());
        long startMs = request.getRollupAlignedStartTimeMs();
        long stopMs = request.getRollupAlignedStopTimeMs();
        // try-with-resources closes the iterator on every path and keeps the primary exception
        // if close() fails as well.
        try (WindowStoreIterator<Monitoring.MonitoringMessage> windows = store.fetch(key, startMs, stopMs)) {
            ArrayList<Monitoring.MonitoringMessage> samples = new ArrayList<>();
            while (windows.hasNext() && samples.size() < limit) {
                samples.add(windows.next().value);
            }
            Map<Metric, ArrayList<Object>> series = new HashMap<>();
            for (Metric metric : metrics) {
                series.put(metric, Util.extractValues(samples, windowMs, startMs, stopMs, limit, metric));
            }
            return series;
        }
    }

    /**
     * Fills the response's aggregate {@code produced} and {@code consumed} series.
     *
     * <p>The produced series is read first and the consumed series second; each read resets
     * {@code rangeRequest.clientType}. The {@code group} metric is kept in the consumed series only
     * for the CONSUMER delivery type.
     *
     * @param deliveryResponse response to populate
     * @param rangeRequest request describing the cluster, filters, rollup and time range
     * @param deliveryType delivery type supplying the base key types
     * @param i maximum number of windows to read
     * @return the same {@code deliveryResponse} instance
     */
    protected DeliveryResponse addOverviewData(DeliveryResponse deliveryResponse, RangeRequest rangeRequest, DeliveryResponse.DeliveryType deliveryType, int i) throws InterruptedException, ExecutionException, TimeoutException {
        deliveryResponse.produced =
                getProducedMessages(rangeRequest, i, rangeRequest.rollup.getMillis(), deliveryType.baseProductionKeyType);
        deliveryResponse.consumed =
                getConsumedMessages(rangeRequest, i, rangeRequest.rollup.getMillis(), deliveryType.baseConsumptionKeyType);
        boolean keepGroupMetric = deliveryType == DeliveryResponse.DeliveryType.CONSUMER;
        if (!keepGroupMetric) {
            deliveryResponse.consumed.remove(Metric.group);
        }
        return deliveryResponse;
    }

    /**
     * Populates the response's {@code members}, {@code sources} and {@code destinations} lists from
     * the grouping stores.
     *
     * <p>When RBAC is enabled, the response's own group (if set) must be accessible, and members
     * are filtered against the accessible consumer groups. For {@code RequestType.ALL} each member
     * also gets its own consumed and produced series.
     *
     * <p>Side effects: may switch {@code rangeRequest.rollup} from ONE_MINUTE to THREE_HOURS and
     * may set {@code rangeRequest.clientType}; the local {@code rangeRequest} reference is also
     * reassigned inside the member loop.
     *
     * @param deliveryResponse response to populate
     * @param rangeRequest base request for the lookups
     * @param deliveryType supplies the grouping and per-member key types
     * @param i maximum number of windows to read per lookup
     * @param str value forwarded to {@code Util.createMergedRangeRequest}; callers pass the client id
     * @param str2 value forwarded to {@code Util.createMergedRangeRequest}; callers pass the group
     * @param l window size in milliseconds for the per-member series
     * @param requestType controls the rollup substitution and whether per-member series are read
     * @param aggregationType decides which of sources/destinations each member list feeds
     * @throws ForbiddenException if RBAC is enabled and the response's group is not accessible
     */
    protected void addMembers(DeliveryResponse deliveryResponse, RangeRequest rangeRequest, DeliveryResponse.DeliveryType deliveryType, int i, String str, String str2, Long l, RequestType requestType, AggregationType aggregationType) throws InterruptedException, ExecutionException, TimeoutException {
        // null means "RBAC disabled": no filtering is applied below.
        Set<String> consumerGroups = this.rbacConfig.isRbacEnabled() ? getConsumerGroups(rangeRequest.clusterId) : null;
        if (consumerGroups != null && deliveryResponse.group != null && !consumerGroups.contains(deliveryResponse.group)) {
            throw new ForbiddenException("no access to this consumer group");
        }
        // Member-list-only requests at one-minute rollup are served from the three-hour rollup.
        // NOTE(review): presumably groupings are not kept at one-minute granularity - confirm.
        if (requestType == RequestType.MEMBER_LIST && rangeRequest.rollup == Rollup.ONE_MINUTE) {
            rangeRequest.rollup = Rollup.THREE_HOURS;
        }
        // NOTE(review): this lookup runs before the subGroupingKeyType null check below, so a null
        // key type reaches keySerdeSupplier.get(...) - confirm that is intended.
        Controlcenter.WindowedGrouping aggregateGroupInfo = getAggregateGroupInfo(rangeRequest, i, deliveryType.subGroupingKeyType);
        deliveryResponse.members = Lists.newArrayList();
        deliveryResponse.sources = Lists.newArrayList();
        deliveryResponse.destinations = Lists.newArrayList();
        if (deliveryType.subGroupingKeyType != null) {
            for (Controlcenter.MemberInfo memberInfo : aggregateGroupInfo.getMembersList()) {
                // NOTE(review): rangeRequest is reassigned here, so each iteration merges on top of
                // the previous member's merged request, and the secondary lookup further down uses
                // the last merged request - confirm this carry-over is intended.
                rangeRequest = Util.createMergedRangeRequest(memberInfo, rangeRequest, str, str2);
                DeliveryResponse.MemberResponse memberResponse = new DeliveryResponse.MemberResponse(memberInfo);
                if (requestType == RequestType.ALL) {
                    memberResponse.consumed = getConsumedMessages(rangeRequest, i, l.longValue(), deliveryType.subConsumptionKeyType);
                    memberResponse.consumed.remove(Metric.group);
                    memberResponse.produced = getProducedMessages(rangeRequest, i, l.longValue(), deliveryType.subProductionKeyType);
                }
                // NOTE(review): with RBAC enabled and deliveryResponse.group set, this condition is
                // always false, so no members are reported for a group-scoped request - confirm.
                if (consumerGroups == null || (deliveryResponse.group == null && consumerGroups.contains(memberInfo.getGroup()))) {
                    deliveryResponse.members.add(memberResponse);
                    // Primary members are sources when aggregating by topic, destinations otherwise.
                    if (aggregationType == AggregationType.Topic) {
                        deliveryResponse.sources.add(new DeliveryResponse.MemberResponse(memberInfo));
                    } else {
                        deliveryResponse.destinations.add(new DeliveryResponse.MemberResponse(memberInfo));
                    }
                }
            }
        }
        if (deliveryType.secondarySubGroupingKeyType != null) {
            rangeRequest.clientType = Monitoring.ClientType.CONSUMER;
            for (Controlcenter.MemberInfo memberInfo2 : getAggregateGroupInfo(rangeRequest, i, deliveryType.secondarySubGroupingKeyType).getMembersList()) {
                if (consumerGroups == null || (deliveryResponse.group == null && consumerGroups.contains(memberInfo2.getGroup()))) {
                    // Secondary members fill the opposite list from the primary members.
                    if (aggregationType == AggregationType.Topic) {
                        deliveryResponse.destinations.add(new DeliveryResponse.MemberResponse(memberInfo2));
                    } else {
                        deliveryResponse.sources.add(new DeliveryResponse.MemberResponse(memberInfo2));
                    }
                }
            }
        }
    }

    /**
     * Builds the delivery response shared by all endpoints of this resource.
     *
     * <p>The delivery type is CONSUMER when a client id is given; otherwise it is the single-group
     * or all-groups variant (by topic or by consumer group) depending on whether a group is given.
     * {@code ALL} and {@code OVERVIEW} requests get aggregate series; {@code ALL} and
     * {@code MEMBER_LIST} requests get the member lists. Any other request type yields a single
     * member built from the {@code member*} arguments together with its series.
     *
     * @param i maximum number of windows to read per lookup
     * @param str cluster id used for the range request
     * @param str2 consumer group filter, or {@code null}
     * @param str3 client id filter, or {@code null}
     * @param rollup rollup granularity
     * @param l range start in epoch milliseconds
     * @param l2 range stop in epoch milliseconds
     * @param requestType which parts of the response to build
     * @param str4 group of the single member to report, or {@code null}
     * @param str5 topic of the single member to report, or {@code null}
     * @param num partition of the single member to report, or {@code null} for 0
     * @param str6 client id of the single member to report, or {@code null}
     * @param aggregationType whether results are aggregated by topic or by consumer group
     * @return the populated response
     * @throws BadRequestException if parameter validation fails or member lookup raises
     *     {@link IllegalArgumentException}
     */
    protected DeliveryResponse constructResponse(int i, String str, String str2, String str3, Rollup rollup, Long l, Long l2, RequestType requestType, String str4, String str5, Integer num, String str6, AggregationType aggregationType) throws InterruptedException, ExecutionException, TimeoutException {
        checkParams(i, rollup, l, l2);

        RangeRequest request = new RangeRequest(str, l.longValue(), l2.longValue());
        request.group = str2;
        request.rollup = rollup;
        request.clientId = str3;
        request.clientType = Monitoring.ClientType.CONSUMER;
        long windowMs = rollup.getMillis();

        boolean byTopic = aggregationType == AggregationType.Topic;
        DeliveryResponse.DeliveryType deliveryType;
        if (request.clientId != null) {
            deliveryType = DeliveryResponse.DeliveryType.CONSUMER;
        } else if (str2 != null) {
            deliveryType = byTopic ? DeliveryResponse.DeliveryType.CONSUMER_GROUP_TOPICS : DeliveryResponse.DeliveryType.CONSUMER_GROUP;
        } else {
            deliveryType = byTopic ? DeliveryResponse.DeliveryType.CONSUMER_GROUPS_TOPICS : DeliveryResponse.DeliveryType.CONSUMER_GROUPS;
        }

        DeliveryResponse response = new DeliveryResponse(request.rollup, request.getRollupAlignedStartTimeMs(), request.rollup.getMillis(), request.group, request.clientId, deliveryType);

        boolean wantsAll = requestType == RequestType.ALL;
        if (wantsAll || requestType == RequestType.OVERVIEW) {
            addOverviewData(response, request, deliveryType, i);
            if (!wantsAll) {
                // Overview-only requests stop after the aggregate series.
                return response;
            }
        }
        if (wantsAll || requestType == RequestType.MEMBER_LIST) {
            try {
                addMembers(response, request, deliveryType, i, str3, str2, Long.valueOf(windowMs), requestType, aggregationType);
            } catch (IllegalArgumentException e) {
                log.error("request=range", (Throwable) e);
                throw new BadRequestException(e);
            }
            return response;
        }

        // Remaining request types: report exactly one member described by the member* arguments.
        String memberTopic;
        Controlcenter.TopicPartition topicPartition;
        if (deliveryType == DeliveryResponse.DeliveryType.CONSUMER_GROUPS_TOPICS) {
            memberTopic = str5;
            topicPartition = Controlcenter.TopicPartition.getDefaultInstance();
        } else {
            memberTopic = null;
            int partition = num == null ? 0 : num.intValue();
            topicPartition = Controlcenter.TopicPartition.newBuilder()
                    .setTopic(Strings.nullToEmpty(str5))
                    .setPartition(partition)
                    .build();
        }
        Controlcenter.MemberInfo member = Controlcenter.MemberInfo.newBuilder()
                .setClientId(Strings.nullToEmpty(str6))
                .setGroup(Strings.nullToEmpty(str4))
                .setTopic(Strings.nullToEmpty(memberTopic))
                .setTopicPartition(topicPartition)
                .build();
        RangeRequest memberRequest = Util.createMergedRangeRequest(member, request, str3, str2);

        DeliveryResponse.MemberResponse memberResponse = new DeliveryResponse.MemberResponse(member);
        memberResponse.consumed = getConsumedMessages(memberRequest, i, windowMs, deliveryType.subConsumptionKeyType);
        memberResponse.consumed.remove(Metric.group);
        memberResponse.produced = getProducedMessages(memberRequest, i, windowMs, deliveryType.subProductionKeyType);

        response.members = Lists.newArrayList();
        response.members.add(memberResponse);
        return response;
    }

    /**
     * {@code GET /consumer}: delivery data for one consumer client within one consumer group.
     *
     * @param visibleCluster cluster resolved from the {@code clusterId} path parameter
     * @param i {@code limit} query parameter (default 6000)
     * @param str {@code group} query parameter; required
     * @param str2 {@code clientId} query parameter; required
     * @param rollup {@code rollup} query parameter
     * @param l {@code startTimeMs} query parameter
     * @param l2 {@code stopTimeMs} query parameter
     * @param requestType {@code type} query parameter (default ALL)
     * @param str3 {@code memberTopic} query parameter
     * @param num {@code memberPartition} query parameter
     * @return the delivery response
     * @throws BadRequestException if the group or the client id is missing
     */
    @GET
    @Path("/consumer")
    public DeliveryResponse consumer(@PathParam("clusterId") VisibleCluster visibleCluster, @QueryParam("limit") @DefaultValue("6000") int i, @QueryParam("group") String str, @QueryParam("clientId") String str2, @QueryParam("rollup") Rollup rollup, @QueryParam("startTimeMs") Long l, @QueryParam("stopTimeMs") Long l2, @QueryParam("type") @DefaultValue("ALL") RequestType requestType, @QueryParam("memberTopic") String str3, @QueryParam("memberPartition") Integer num) throws InterruptedException, ExecutionException, TimeoutException {
        if (str == null) {
            throw new BadRequestException("missing group");
        }
        if (str2 == null) {
            throw new BadRequestException("missing clientId");
        }
        String kafkaClusterId = validateClusterId(visibleCluster.getClusterId());
        return constructResponse(i, kafkaClusterId, str, str2, rollup, l, l2, requestType, null, str3, num, null, AggregationType.ConsumerGroup);
    }

    /**
     * {@code GET /consumer_group}: delivery data for one consumer group, aggregated by consumer
     * group.
     *
     * @param visibleCluster cluster resolved from the {@code clusterId} path parameter
     * @param i {@code limit} query parameter (default 6000)
     * @param str {@code group} query parameter; required
     * @param rollup {@code rollup} query parameter
     * @param l {@code startTimeMs} query parameter
     * @param l2 {@code stopTimeMs} query parameter
     * @param requestType {@code type} query parameter (default ALL)
     * @param str2 {@code memberClientId} query parameter
     * @return the delivery response
     * @throws BadRequestException if the group is missing
     */
    @GET
    @Path("/consumer_group")
    public DeliveryResponse consumerGroup(@PathParam("clusterId") VisibleCluster visibleCluster, @QueryParam("limit") @DefaultValue("6000") int i, @QueryParam("group") String str, @QueryParam("rollup") Rollup rollup, @QueryParam("startTimeMs") Long l, @QueryParam("stopTimeMs") Long l2, @QueryParam("type") @DefaultValue("ALL") RequestType requestType, @QueryParam("memberClientId") String str2) throws InterruptedException, ExecutionException, TimeoutException {
        if (str == null) {
            throw new BadRequestException("missing group");
        }
        String kafkaClusterId = validateClusterId(visibleCluster.getClusterId());
        return constructResponse(i, kafkaClusterId, str, null, rollup, l, l2, requestType, null, null, null, str2, AggregationType.ConsumerGroup);
    }

    /**
     * {@code GET /consumer_groups}: delivery data across consumer groups, aggregated by consumer
     * group. Neither the group nor the client id is required.
     *
     * @param visibleCluster cluster resolved from the {@code clusterId} path parameter
     * @param i {@code limit} query parameter (default 6000)
     * @param str {@code group} query parameter
     * @param str2 {@code clientId} query parameter
     * @param rollup {@code rollup} query parameter
     * @param l {@code startTimeMs} query parameter
     * @param l2 {@code stopTimeMs} query parameter
     * @param requestType {@code type} query parameter (default ALL)
     * @param str3 {@code memberGroup} query parameter
     * @return the delivery response
     */
    @GET
    @Path("/consumer_groups")
    public DeliveryResponse consumerGroups(@PathParam("clusterId") VisibleCluster visibleCluster, @QueryParam("limit") @DefaultValue("6000") int i, @QueryParam("group") String str, @QueryParam("clientId") String str2, @QueryParam("rollup") Rollup rollup, @QueryParam("startTimeMs") Long l, @QueryParam("stopTimeMs") Long l2, @QueryParam("type") @DefaultValue("ALL") RequestType requestType, @QueryParam("memberGroup") String str3) throws InterruptedException, ExecutionException, TimeoutException {
        String kafkaClusterId = validateClusterId(visibleCluster.getClusterId());
        return constructResponse(i, kafkaClusterId, str, str2, rollup, l, l2, requestType, str3, null, null, null, AggregationType.ConsumerGroup);
    }

    /**
     * {@code GET /consumer_group_topics}: delivery data for a consumer group, aggregated by topic.
     *
     * @param visibleCluster cluster resolved from the {@code clusterId} path parameter
     * @param i {@code limit} query parameter (default 6000)
     * @param str {@code group} query parameter
     * @param rollup {@code rollup} query parameter
     * @param l {@code startTimeMs} query parameter
     * @param l2 {@code stopTimeMs} query parameter
     * @param requestType {@code type} query parameter (default ALL)
     * @param str2 {@code memberTopic} query parameter
     * @param num {@code memberPartition} query parameter
     * @return the delivery response
     */
    @GET
    @Path("/consumer_group_topics")
    public DeliveryResponse consumerGroupTopics(@PathParam("clusterId") VisibleCluster visibleCluster, @QueryParam("limit") @DefaultValue("6000") int i, @QueryParam("group") String str, @QueryParam("rollup") Rollup rollup, @QueryParam("startTimeMs") Long l, @QueryParam("stopTimeMs") Long l2, @QueryParam("type") @DefaultValue("ALL") RequestType requestType, @QueryParam("memberTopic") String str2, @QueryParam("memberPartition") Integer num) throws InterruptedException, ExecutionException, TimeoutException {
        String kafkaClusterId = validateClusterId(visibleCluster.getClusterId());
        return constructResponse(i, kafkaClusterId, str, null, rollup, l, l2, requestType, null, str2, num, null, AggregationType.Topic);
    }

    /**
     * {@code GET /consumer_groups_topics}: delivery data across consumer groups, aggregated by
     * topic.
     *
     * @param visibleCluster cluster resolved from the {@code clusterId} path parameter
     * @param i {@code limit} query parameter (default 6000)
     * @param str {@code group} query parameter
     * @param rollup {@code rollup} query parameter
     * @param l {@code startTimeMs} query parameter
     * @param l2 {@code stopTimeMs} query parameter
     * @param requestType {@code type} query parameter (default ALL)
     * @param str2 {@code memberTopic} query parameter
     * @return the delivery response
     */
    @GET
    @Path("/consumer_groups_topics")
    public DeliveryResponse consumerGroupsTopics(@PathParam("clusterId") VisibleCluster visibleCluster, @QueryParam("limit") @DefaultValue("6000") int i, @QueryParam("group") String str, @QueryParam("rollup") Rollup rollup, @QueryParam("startTimeMs") Long l, @QueryParam("stopTimeMs") Long l2, @QueryParam("type") @DefaultValue("ALL") RequestType requestType, @QueryParam("memberTopic") String str2) throws InterruptedException, ExecutionException, TimeoutException {
        String kafkaClusterId = validateClusterId(visibleCluster.getClusterId());
        return constructResponse(i, kafkaClusterId, str, null, rollup, l, l2, requestType, null, str2, null, null, AggregationType.Topic);
    }
}
