package org.apache.kafka.connect.transforms;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/* loaded from: input_file:org/apache/kafka/connect/transforms/Flatten.class */
public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String DELIMITER_DEFAULT = ".";
    private static final String PURPOSE = "flattening";
    private String delimiter;
    private Cache<Schema, Schema> schemaUpdateCache;
    public static final String OVERVIEW_DOC = "Flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. Applies to Struct when schema present, or a Map in the case of schemaless data. The default delimiter is '.'.<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
    private static final String DELIMITER_CONFIG = "delimiter";
    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(DELIMITER_CONFIG, ConfigDef.Type.STRING, ".", ConfigDef.Importance.MEDIUM, "Delimiter to insert between field names from the input record when generating field names for the output record");

    /* loaded from: input_file:org/apache/kafka/connect/transforms/Flatten$Key.class */
    public static class Key<R extends ConnectRecord<R>> extends Flatten<R> {
        @Override // org.apache.kafka.connect.transforms.Flatten
        protected Schema operatingSchema(R r) {
            return r.keySchema();
        }

        @Override // org.apache.kafka.connect.transforms.Flatten
        protected Object operatingValue(R r) {
            return r.key();
        }

        @Override // org.apache.kafka.connect.transforms.Flatten
        protected R newRecord(R r, Schema schema, Object obj) {
            return (R) r.newRecord(r.topic(), r.kafkaPartition(), schema, obj, r.valueSchema(), r.value(), r.timestamp());
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/transforms/Flatten$Value.class */
    public static class Value<R extends ConnectRecord<R>> extends Flatten<R> {
        @Override // org.apache.kafka.connect.transforms.Flatten
        protected Schema operatingSchema(R r) {
            return r.valueSchema();
        }

        @Override // org.apache.kafka.connect.transforms.Flatten
        protected Object operatingValue(R r) {
            return r.value();
        }

        @Override // org.apache.kafka.connect.transforms.Flatten
        protected R newRecord(R r, Schema schema, Object obj) {
            return (R) r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), schema, obj, r.timestamp());
        }
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.delimiter = new SimpleConfig(CONFIG_DEF, map).getString(DELIMITER_CONFIG);
        this.schemaUpdateCache = new SynchronizedCache(new LRUCache(16));
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public R apply(R r) {
        return operatingSchema(r) == null ? applySchemaless(r) : applyWithSchema(r);
    }

    @Override // org.apache.kafka.connect.transforms.Transformation, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    protected abstract Schema operatingSchema(R r);

    protected abstract Object operatingValue(R r);

    protected abstract R newRecord(R r, Schema schema, Object obj);

    private R applySchemaless(R r) {
        Map<String, Object> requireMap = Requirements.requireMap(operatingValue(r), PURPOSE);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        applySchemaless(requireMap, "", linkedHashMap);
        return newRecord(r, null, linkedHashMap);
    }

    private void applySchemaless(Map<String, Object> map, String str, Map<String, Object> map2) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            String fieldName = fieldName(str, entry.getKey());
            Object value = entry.getValue();
            if (value == null) {
                map2.put(fieldName(str, entry.getKey()), null);
                return;
            }
            Schema.Type schemaType = ConnectSchema.schemaType(value.getClass());
            if (schemaType == null) {
                throw new DataException("Flatten transformation was passed a value of type " + value.getClass() + " which is not supported by Connect's data API");
            }
            switch (schemaType) {
                case INT8:
                case INT16:
                case INT32:
                case INT64:
                case FLOAT32:
                case FLOAT64:
                case BOOLEAN:
                case STRING:
                case BYTES:
                    map2.put(fieldName(str, entry.getKey()), entry.getValue());
                    break;
                case MAP:
                    applySchemaless(Requirements.requireMap(entry.getValue(), PURPOSE), fieldName, map2);
                    break;
                default:
                    throw new DataException("Flatten transformation does not support " + entry.getValue().getClass() + " for record without schemas (for field " + fieldName + ").");
            }
        }
    }

    private R applyWithSchema(R r) {
        Struct requireStructOrNull = Requirements.requireStructOrNull(operatingValue(r), PURPOSE);
        Schema operatingSchema = operatingSchema(r);
        Schema schema = this.schemaUpdateCache.get(operatingSchema);
        if (schema == null) {
            SchemaBuilder copySchemaBasics = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
            buildUpdatedSchema(operatingSchema, "", copySchemaBasics, operatingSchema.isOptional(), (Struct) operatingSchema.defaultValue());
            schema = copySchemaBasics.build();
            this.schemaUpdateCache.put(operatingSchema, schema);
        }
        if (requireStructOrNull == null) {
            return newRecord(r, schema, null);
        }
        Struct struct = new Struct(schema);
        buildWithSchema(requireStructOrNull, "", struct);
        return newRecord(r, schema, struct);
    }

    private void buildUpdatedSchema(Schema schema, String str, SchemaBuilder schemaBuilder, boolean z, Struct struct) {
        for (Field field : schema.fields()) {
            String fieldName = fieldName(str, field.name());
            boolean z2 = z || field.schema().isOptional();
            Object obj = null;
            if (field.schema().defaultValue() != null) {
                obj = field.schema().defaultValue();
            } else if (struct != null) {
                obj = struct.get(field);
            }
            switch (field.schema().type()) {
                case INT8:
                case INT16:
                case INT32:
                case INT64:
                case FLOAT32:
                case FLOAT64:
                case BOOLEAN:
                case STRING:
                case BYTES:
                    schemaBuilder.field(fieldName, convertFieldSchema(field.schema(), z2, obj));
                    break;
                case MAP:
                default:
                    throw new DataException("Flatten transformation does not support " + field.schema().type() + " for record without schemas (for field " + fieldName + ").");
                case STRUCT:
                    buildUpdatedSchema(field.schema(), fieldName, schemaBuilder, z2, (Struct) obj);
                    break;
            }
        }
    }

    private Schema convertFieldSchema(Schema schema, boolean z, Object obj) {
        SchemaBuilder copySchemaBasics = SchemaUtil.copySchemaBasics(schema);
        if (z) {
            copySchemaBasics.optional();
        }
        if (obj != null) {
            copySchemaBasics.defaultValue(obj);
        }
        return copySchemaBasics.build();
    }

    private void buildWithSchema(Struct struct, String str, Struct struct2) {
        if (struct == null) {
            return;
        }
        for (Field field : struct.schema().fields()) {
            String fieldName = fieldName(str, field.name());
            switch (field.schema().type()) {
                case INT8:
                case INT16:
                case INT32:
                case INT64:
                case FLOAT32:
                case FLOAT64:
                case BOOLEAN:
                case STRING:
                case BYTES:
                    struct2.put(fieldName, struct.get(field));
                    break;
                case MAP:
                default:
                    throw new DataException("Flatten transformation does not support " + field.schema().type() + " for record without schemas (for field " + fieldName + ").");
                case STRUCT:
                    buildWithSchema(struct.getStruct(field.name()), fieldName, struct2);
                    break;
            }
        }
    }

    private String fieldName(String str, String str2) {
        return str.isEmpty() ? str2 : str + this.delimiter + str2;
    }
}
