package com.centit.dde.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.centit.framework.common.ResponseData;
import com.centit.framework.common.ResponseSingleData;
import com.centit.framework.core.dao.PageQueryResult;
import com.centit.product.adapter.po.SourceInfo;
import com.centit.product.metadata.dao.SourceInfoDao;
import com.centit.support.security.AESSecurityUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import java.io.IOException;
import java.util.Properties;
import javax.annotation.Resource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Api(value = "kafka参数管理接口", tags = {"kafka参数管理接口"})
@RequestMapping({"/kafka"})
@Controller
@ResponseBody
/* loaded from: input_file:WEB-INF/lib/centit-dde-kafka-plugin-5.2-SNAPSHOT.jar:com/centit/dde/controller/KafkaParameterController.class */
public class KafkaParameterController {

    @Resource
    private SourceInfoDao sourceInfoDao;

    @GetMapping({"/consumer"})
    @ApiOperation("消费者参数配置")
    public JSONObject consumerParame() throws IOException {
        return (JSONObject) JSON.parseObject(String.join("\n", IOUtils.readLines(new ClassPathResource("kafka/consumerParameter.json").getInputStream(), "UTF-8")), JSONObject.class);
    }

    @GetMapping({"/producer"})
    @ApiOperation("生产者参数配置")
    public JSONObject producerParame() throws IOException {
        return (JSONObject) JSON.parseObject(String.join("\n", IOUtils.readLines(new ClassPathResource("kafka/producerParameter.json").getInputStream(), "UTF-8")), JSONObject.class);
    }

    @GetMapping({"/topics/{databaseCode}"})
    @ApiOperation("获取topic列表")
    public ResponseData topics(@PathVariable String str) throws Exception {
        SourceInfo databaseInfoById = this.sourceInfoDao.getDatabaseInfoById(str);
        JSONArray jSONArray = new JSONArray();
        Properties properties = new Properties();
        String username = databaseInfoById.getUsername();
        String password = databaseInfoById.getPassword();
        if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
            properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"" + username + "\"\npassword=\"" + AESSecurityUtils.decryptBase64String(password, "0123456789abcdefghijklmnopqrstuvwxyzABCDEF") + "\";");
        }
        properties.put("bootstrap.servers", databaseInfoById.getDatabaseUrl());
        AdminClient create = KafkaAdminClient.create(properties);
        try {
            for (String str2 : create.listTopics().names().get()) {
                JSONObject jSONObject = new JSONObject();
                jSONObject.put("name", (Object) str2);
                jSONArray.add(jSONObject);
            }
            return ResponseSingleData.makeResponseData(PageQueryResult.createResult(jSONArray, null));
        } finally {
            create.close();
        }
    }
}
