/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.cache.Cache;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.cache.LRUCache;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.cache.SynchronizedCache;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.connector.ConnectRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.Transformation;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.Requirements;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Transformation that replaces a record's key with a new key assembled from a
 * configured subset of the fields in the record's value.
 *
 * <p>Records whose value schema is {@code null} are handled as schemaless (the value is
 * treated as a {@link Map} and the new key is a {@link Map}); all other records are handled
 * as schema-based (the value is treated as a {@link Struct} and the new key is a
 * {@link Struct} with a derived key schema).
 *
 * @param <R> the concrete record type being transformed
 */
public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {
    public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
    public static final String FIELDS_CONFIG = "fields";
    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Field names on the record value to extract as the record key.");
    private static final String PURPOSE = "copying fields from value to key";

    /** Names of the value fields copied into the key; assigned in {@link #configure(Map)}. */
    private List<String> fields;

    /** Maps a value schema to the key schema previously derived from it. */
    private Cache<Schema, Schema> valueToKeySchemaCache;

    /**
     * Reads the {@value #FIELDS_CONFIG} setting and creates a fresh schema cache.
     *
     * @param configs raw configuration entries, parsed against {@link #CONFIG_DEF}
     */
    @Override
    public void configure(Map<String, ?> configs) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
        this.fields = config.getList(FIELDS_CONFIG);
        // NOTE(review): 16 is presumably the maximum number of cached schemas - confirm against LRUCache.
        this.valueToKeySchemaCache = new SynchronizedCache<Schema, Schema>(new LRUCache<Schema, Schema>(16));
    }

    /**
     * Builds a new record whose key is derived from the value of {@code record}.
     *
     * @param record the record to transform
     * @return a record created through {@code record.newRecord(...)} carrying the new key
     */
    @Override
    public R apply(R record) {
        if (record.valueSchema() == null) {
            return this.applySchemaless(record);
        }
        return this.applyWithSchema(record);
    }

    /**
     * Schemaless path: copies the configured entries of the value map into a new key map.
     * A configured field that is absent from the value map is stored in the key with a
     * {@code null} value, because the result of {@link Map#get(Object)} is copied as-is.
     */
    private R applySchemaless(R record) {
        // NOTE(review): requireMap presumably rejects non-Map values with an exception - confirm in Requirements.
        final Map<String, Object> value = Requirements.requireMap(record.value(), PURPOSE);
        final Map<String, Object> key = new HashMap<String, Object>(this.fields.size());
        for (String field : this.fields) {
            key.put(field, value.get(field));
        }
        // The new key has no schema (null key schema); the value and its schema are passed through unchanged.
        return record.newRecord(record.topic(), record.kafkaPartition(), null, key, record.valueSchema(), record.value(), record.timestamp());
    }

    /**
     * Schema-based path: derives (or fetches from the cache) a key schema containing the
     * configured fields, then copies those fields from the value struct into a new key struct.
     *
     * @throws DataException if a configured field is not present in the value's schema
     */
    private R applyWithSchema(R record) {
        // NOTE(review): requireStruct presumably rejects non-Struct values with an exception - confirm in Requirements.
        final Struct value = Requirements.requireStruct(record.value(), PURPOSE);
        final Schema valueSchema = value.schema();

        Schema keySchema = this.valueToKeySchemaCache.get(valueSchema);
        if (keySchema == null) {
            // Cache miss: build the key schema from the matching value-schema fields and remember it.
            final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
            for (String field : this.fields) {
                final Field fieldFromValue = valueSchema.field(field);
                if (fieldFromValue == null) {
                    throw new DataException("Field does not exist: " + field);
                }
                keySchemaBuilder.field(field, fieldFromValue.schema());
            }
            keySchema = keySchemaBuilder.build();
            this.valueToKeySchemaCache.put(valueSchema, keySchema);
        }

        final Struct key = new Struct(keySchema);
        for (String field : this.fields) {
            key.put(field, value.get(field));
        }
        return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, valueSchema, value, record.timestamp());
    }

    /**
     * @return the configuration definition for this transformation ({@link #CONFIG_DEF})
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /**
     * Drops the reference to the schema cache. The cache is only recreated by a subsequent
     * call to {@link #configure(Map)}.
     */
    @Override
    public void close() {
        this.valueToKeySchemaCache = null;
    }
}

