/*
 * Decompiled with CFR 0.152.
 */
package com.centit.dde.bizopt;

import com.alibaba.fastjson2.JSONObject;
import com.centit.dde.bizopt.BuiltInOperation;
import com.centit.dde.core.BizModel;
import com.centit.dde.core.BizOperation;
import com.centit.dde.core.DataOptContext;
import com.centit.dde.core.DataSet;
import com.centit.dde.producer.KafkaProducerConfig;
import com.centit.dde.utils.BizModelJSONTransform;
import com.centit.dde.utils.DataSetOptUtil;
import com.centit.framework.common.ResponseData;
import com.centit.product.metadata.dao.SourceInfoDao;
import com.centit.product.metadata.po.SourceInfo;
import com.centit.support.algorithm.BooleanBaseOpt;
import com.centit.support.algorithm.NumberBaseOpt;
import com.centit.support.algorithm.StringBaseOpt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerBizOperation
implements BizOperation {
    private SourceInfoDao sourceInfoDao;

    public ProducerBizOperation(SourceInfoDao sourceInfoDao) {
        this.sourceInfoDao = sourceInfoDao;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ResponseData runOpt(BizModel bizModel, JSONObject bizOptJson, DataOptContext dataOptContext) throws ExecutionException, InterruptedException {
        String databaseId = bizOptJson.getString("databaseId");
        String topic = bizOptJson.getString("topic");
        if (StringUtils.isBlank((CharSequence)topic) || StringUtils.isBlank((CharSequence)databaseId)) {
            return ResponseData.makeErrorMessage((String)"Kafka\u670d\u52a1\u5730\u5740\u6216topic\u4e0d\u80fd\u4e3a\u7a7a\uff01");
        }
        String source = bizOptJson.getString("source");
        if (StringUtils.isBlank((CharSequence)source)) {
            return ResponseData.makeErrorMessage((String)"\u63a8\u9001\u6d88\u606f\u4e0d\u80fd\u4e3a\u7a7a\uff01");
        }
        SourceInfo sourceInfo = this.sourceInfoDao.getDatabaseInfoById(databaseId);
        if (sourceInfo == null) {
            return ResponseData.makeErrorMessage((String)"Kafka\u670d\u52a1\u8d44\u6e90\u4e0d\u5b58\u5728\u6216\u5df2\u88ab\u5220\u9664\uff01");
        }
        KafkaProducer producer = KafkaProducerConfig.getKafkaProducer(sourceInfo);
        Integer partition = NumberBaseOpt.castObjectToInteger((Object)bizOptJson.getInteger("partition"));
        String key = bizOptJson.getString("key");
        key = StringBaseOpt.castObjectToString((Object)DataSetOptUtil.fetchFieldValue((Object)new BizModelJSONTransform(bizModel), (String)bizOptJson.getString("key")), (String)key);
        Boolean isAsyn = BooleanBaseOpt.castObjectToBoolean((Object)bizOptJson.getBoolean("isAsyn"), (Boolean)false);
        DataSet dataSet = bizModel.getDataSet(source);
        ProducerRecord record = new ProducerRecord(topic, partition, (Object)key, (Object)StringBaseOpt.castObjectToString((Object)dataSet.getData()));
        AtomicReference<String> resut = new AtomicReference<String>("");
        String id = bizOptJson.getString("id");
        try {
            if (!isAsyn.booleanValue()) {
                RecordMetadata res = (RecordMetadata)producer.send(record).get();
                if (res != null) {
                    resut.set("\u6d88\u606f\u53d1\u9001\u6210\u529f\uff0ctopic=" + res.topic() + "\uff0c\u5206\u533a=" + res.partition() + "\uff0coffset=" + res.offset());
                }
                bizModel.putDataSet(id, new DataSet(resut));
            } else {
                producer.send(record, (metadata, exception) -> {
                    if (metadata != null) {
                        resut.set("\u6d88\u606f\u53d1\u9001\u6210\u529f\uff0ctopic=" + metadata.topic() + "\uff0c\u5206\u533a=" + metadata.partition() + "\uff0coffset=" + metadata.offset());
                    } else if (exception != null) {
                        resut.set("\u6d88\u606f\u53d1\u9001\u5931\u8d25\uff0c\u5f02\u5e38\u4fe1\u606f\uff1a" + exception.getMessage());
                    }
                    bizModel.putDataSet(id, new DataSet((Object)resut));
                });
            }
        }
        finally {
            if (producer != null) {
                producer.close();
            }
        }
        return BuiltInOperation.createResponseSuccessData((int)bizModel.getDataSet(id).getSize());
    }
}

