package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.QueryableStoreType;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.class */
/**
 * {@link StateStoreProvider} that looks up state stores among the tasks of a
 * single {@link StreamThread}.
 */
public class StreamThreadStateStoreProvider implements StateStoreProvider {
    private final StreamThread streamThread;

    /**
     * Creates a provider backed by the given stream thread.
     *
     * @param streamThread the thread whose tasks are searched for stores
     */
    public StreamThreadStateStoreProvider(StreamThread streamThread) {
        this.streamThread = streamThread;
    }

    /**
     * Collects every store named {@code str} that is held by a task of the
     * underlying stream thread and is accepted by {@code queryableStoreType}.
     *
     * @param str                the name of the state store to look up
     * @param queryableStoreType the store type used to filter candidate stores
     * @param <T>                the type the caller expects the stores to have
     * @return an empty list if the thread state is {@code DEAD}; otherwise the
     *         matching stores, one per task that holds a matching store
     *         (possibly empty)
     * @throws InvalidStateStoreException if
     *         {@link StreamThread#isRunningAndNotRebalancing()} returns
     *         {@code false} for a thread that is not {@code DEAD}, or if a
     *         matching store reports that it is not open
     */
    @Override
    public <T> List<T> stores(String str, QueryableStoreType<T> queryableStoreType) {
        // A dead thread is reported as "no stores" rather than as an error.
        if (this.streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        if (!this.streamThread.isRunningAndNotRebalancing()) {
            throw new InvalidStateStoreException("Cannot get state store " + str + " because the stream thread is " + this.streamThread.state() + ", not RUNNING");
        }
        final List<T> stores = new ArrayList<>();
        for (StreamTask streamTask : this.streamThread.tasks().values()) {
            final StateStore store = streamTask.getStore(str);
            if (store != null && queryableStoreType.accepts(store)) {
                if (!store.isOpen()) {
                    throw new InvalidStateStoreException("Cannot get state store " + str + " for task " + streamTask + " because the store is not open. The state store may have migrated to another instances.");
                }
                // The cast cannot be checked at runtime because T is erased;
                // queryableStoreType.accepts(store) is the only guard applied
                // before the store is handed out as a T.
                @SuppressWarnings("unchecked")
                final T typedStore = (T) store;
                stores.add(typedStore);
            }
        }
        return stores;
    }
}
