/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.core.distributed.raft;

import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.InternetAddressUtil;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.consistency.RequestProcessor;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.exception.ConsistencyException;
import com.alibaba.nacos.core.distributed.raft.JRaftMaintainService;
import com.alibaba.nacos.core.distributed.raft.NacosClosure;
import com.alibaba.nacos.core.distributed.raft.NacosStateMachine;
import com.alibaba.nacos.core.distributed.raft.RaftConfig;
import com.alibaba.nacos.core.distributed.raft.exception.DuplicateRaftGroupException;
import com.alibaba.nacos.core.distributed.raft.exception.JRaftException;
import com.alibaba.nacos.core.distributed.raft.exception.NoLeaderException;
import com.alibaba.nacos.core.distributed.raft.exception.NoSuchRaftGroupException;
import com.alibaba.nacos.core.distributed.raft.utils.FailoverClosure;
import com.alibaba.nacos.core.distributed.raft.utils.FailoverClosureImpl;
import com.alibaba.nacos.core.distributed.raft.utils.JRaftUtils;
import com.alibaba.nacos.core.distributed.raft.utils.RaftExecutor;
import com.alibaba.nacos.core.distributed.raft.utils.RaftOptionsBuilder;
import com.alibaba.nacos.core.monitor.MetricsMonitor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.RaftServiceFactory;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.CliServiceImpl;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.CliClientService;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.google.protobuf.Message;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.springframework.util.CollectionUtils;

public class JRaftServer {
    private RpcServer rpcServer;
    private CliClientServiceImpl cliClientService;
    private CliService cliService;
    private Map<String, RaftGroupTuple> multiRaftGroup = new ConcurrentHashMap<String, RaftGroupTuple>();
    private volatile boolean isStarted = false;
    private volatile boolean isShutdown = false;
    private Configuration conf;
    private RpcProcessor userProcessor;
    private NodeOptions nodeOptions;
    private Serializer serializer;
    private Collection<RequestProcessor4CP> processors = Collections.synchronizedSet(new HashSet());
    private String selfIp;
    private int selfPort;
    private RaftConfig raftConfig;
    private PeerId localPeerId;
    private int failoverRetries;
    private int rpcRequestTimeoutMs;

    public JRaftServer() {
        this.conf = new Configuration();
    }

    public void setFailoverRetries(int failoverRetries) {
        this.failoverRetries = failoverRetries;
    }

    void init(RaftConfig config) {
        this.raftConfig = config;
        this.serializer = SerializeFactory.getDefault();
        Loggers.RAFT.info("Initializes the Raft protocol, raft-config info : {}", (Object)config);
        RaftExecutor.init(config);
        String self = config.getSelfMember();
        String[] info = InternetAddressUtil.splitIPPortStr((String)self);
        this.selfIp = info[0];
        this.selfPort = Integer.parseInt(info[1]);
        this.localPeerId = PeerId.parsePeer((String)self);
        this.nodeOptions = new NodeOptions();
        int electionTimeout = Math.max(ConvertUtils.toInt((String)config.getVal("election_timeout_ms"), (int)5000), 5000);
        this.rpcRequestTimeoutMs = ConvertUtils.toInt((String)this.raftConfig.getVal("rpc_request_timeout_ms"), (int)5000);
        this.nodeOptions.setSharedElectionTimer(true);
        this.nodeOptions.setSharedVoteTimer(true);
        this.nodeOptions.setSharedStepDownTimer(true);
        this.nodeOptions.setSharedSnapshotTimer(true);
        this.nodeOptions.setElectionTimeoutMs(electionTimeout);
        RaftOptions raftOptions = RaftOptionsBuilder.initRaftOptions(this.raftConfig);
        this.nodeOptions.setRaftOptions(raftOptions);
        this.nodeOptions.setEnableMetrics(true);
        CliOptions cliOptions = new CliOptions();
        this.cliService = RaftServiceFactory.createAndInitCliService((CliOptions)cliOptions);
        this.cliClientService = (CliClientServiceImpl)((CliServiceImpl)this.cliService).getCliClientService();
    }

    synchronized void start() {
        if (!this.isStarted) {
            Loggers.RAFT.info("========= The raft protocol is starting... =========");
            try {
                NodeManager raftNodeManager = NodeManager.getInstance();
                for (String address : this.raftConfig.getMembers()) {
                    PeerId peerId = PeerId.parsePeer((String)address);
                    this.conf.addPeer(peerId);
                    raftNodeManager.addAddress(peerId.getEndpoint());
                }
                this.nodeOptions.setInitialConf(this.conf);
                this.rpcServer = JRaftUtils.initRpcServer(this, this.localPeerId);
                if (!this.rpcServer.init(null)) {
                    Loggers.RAFT.error("Fail to init [BaseRpcServer].");
                    throw new RuntimeException("Fail to init [BaseRpcServer].");
                }
                this.isStarted = true;
                this.createMultiRaftGroup(this.processors);
                Loggers.RAFT.info("========= The raft protocol start finished... =========");
            }
            catch (Exception e) {
                Loggers.RAFT.error("raft protocol start failure, cause: ", (Throwable)e);
                throw new JRaftException(e);
            }
        }
    }

    synchronized void createMultiRaftGroup(Collection<RequestProcessor4CP> processors) {
        if (!this.isStarted) {
            this.processors.addAll(processors);
            return;
        }
        String parentPath = Paths.get(EnvUtil.getNacosHome(), "data/protocol/raft").toString();
        for (RequestProcessor4CP processor : processors) {
            String groupName = processor.group();
            if (this.multiRaftGroup.containsKey(groupName)) {
                throw new DuplicateRaftGroupException(groupName);
            }
            Configuration configuration = this.conf.copy();
            NodeOptions copy = this.nodeOptions.copy();
            JRaftUtils.initDirectory(parentPath, groupName, copy);
            NacosStateMachine machine = new NacosStateMachine(this, processor);
            copy.setFsm((StateMachine)machine);
            copy.setInitialConf(configuration);
            int doSnapshotInterval = ConvertUtils.toInt((String)this.raftConfig.getVal("snapshot_interval_secs"), (int)1800);
            doSnapshotInterval = CollectionUtils.isEmpty((Collection)processor.loadSnapshotOperate()) ? 0 : doSnapshotInterval;
            copy.setSnapshotIntervalSecs(doSnapshotInterval);
            Loggers.RAFT.info("create raft group : {}", (Object)groupName);
            RaftGroupService raftGroupService = new RaftGroupService(groupName, this.localPeerId, copy, this.rpcServer, true);
            Node node = raftGroupService.start(false);
            machine.setNode(node);
            RouteTable.getInstance().updateConfiguration(groupName, configuration);
            RaftExecutor.executeByCommon(() -> this.registerSelfToCluster(groupName, this.localPeerId, configuration));
            Random random = new Random();
            long period = this.nodeOptions.getElectionTimeoutMs() + random.nextInt(5000);
            RaftExecutor.scheduleRaftMemberRefreshJob(() -> this.refreshRouteTable(groupName), this.nodeOptions.getElectionTimeoutMs(), period, TimeUnit.MILLISECONDS);
            this.multiRaftGroup.put(groupName, new RaftGroupTuple(node, (RequestProcessor)processor, raftGroupService, machine));
        }
    }

    CompletableFuture<Response> get(final ReadRequest request) {
        String group = request.getGroup();
        final CompletableFuture<Response> future = new CompletableFuture<Response>();
        RaftGroupTuple tuple = this.findTupleByGroup(group);
        if (Objects.isNull(tuple)) {
            future.completeExceptionally(new NoSuchRaftGroupException(group));
            return future;
        }
        Node node = tuple.node;
        final RequestProcessor processor = tuple.processor;
        try {
            node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(){

                public void run(Status status, long index, byte[] reqCtx) {
                    if (status.isOk()) {
                        try {
                            Response response = processor.onRequest(request);
                            future.complete(response);
                        }
                        catch (Throwable t) {
                            MetricsMonitor.raftReadIndexFailed();
                            future.completeExceptionally((Throwable)new ConsistencyException("The conformance protocol is temporarily unavailable for reading", t));
                        }
                        return;
                    }
                    MetricsMonitor.raftReadIndexFailed();
                    Loggers.RAFT.error("ReadIndex has error : {}, go to Leader read.", (Object)status.getErrorMsg());
                    MetricsMonitor.raftReadFromLeader();
                    JRaftServer.this.readFromLeader(request, future);
                }
            });
            return future;
        }
        catch (Throwable e) {
            MetricsMonitor.raftReadFromLeader();
            Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}", (Object)e.toString());
            this.readFromLeader(request, future);
            return future;
        }
    }

    public void readFromLeader(ReadRequest request, CompletableFuture<Response> future) {
        this.commit(request.getGroup(), (Message)request, future);
    }

    public CompletableFuture<Response> commit(String group, Message data, CompletableFuture<Response> future) {
        LoggerUtils.printIfDebugEnabled((Logger)Loggers.RAFT, (String)"data requested this time : {}", (Object[])new Object[]{data});
        RaftGroupTuple tuple = this.findTupleByGroup(group);
        if (tuple == null) {
            future.completeExceptionally(new IllegalArgumentException("No corresponding Raft Group found : " + group));
            return future;
        }
        FailoverClosureImpl closure = new FailoverClosureImpl(future);
        Node node = tuple.node;
        if (node.isLeader()) {
            this.applyOperation(node, data, closure);
        } else {
            this.invokeToLeader(group, data, this.rpcRequestTimeoutMs, closure);
        }
        return future;
    }

    void registerSelfToCluster(String groupId, PeerId selfIp, Configuration conf) {
        while (true) {
            try {
                List peerIds = this.cliService.getPeers(groupId, conf);
                if (peerIds.contains(selfIp)) {
                    return;
                }
                Status status = this.cliService.addPeer(groupId, conf, selfIp);
                if (status.isOk()) {
                    return;
                }
                Loggers.RAFT.warn("Failed to join the cluster, retry...");
            }
            catch (Exception e) {
                Loggers.RAFT.error("Failed to join the cluster, retry...", (Throwable)e);
            }
            ThreadUtils.sleep((long)1000L);
        }
    }

    protected PeerId getLeader(String raftGroupId) {
        return RouteTable.getInstance().selectLeader(raftGroupId);
    }

    synchronized void shutdown() {
        if (this.isShutdown) {
            return;
        }
        this.isShutdown = true;
        try {
            Loggers.RAFT.info("========= The raft protocol is starting to close =========");
            for (Map.Entry<String, RaftGroupTuple> entry : this.multiRaftGroup.entrySet()) {
                RaftGroupTuple tuple = entry.getValue();
                Node node = tuple.getNode();
                tuple.node.shutdown();
                tuple.raftGroupService.shutdown();
            }
            this.cliService.shutdown();
            this.cliClientService.shutdown();
            Loggers.RAFT.info("========= The raft protocol has been closed =========");
        }
        catch (Throwable t) {
            Loggers.RAFT.error("There was an error in the raft protocol shutdown, cause: ", t);
        }
    }

    public void applyOperation(Node node, Message data, FailoverClosure closure) {
        Task task = new Task();
        task.setDone((Closure)new NacosClosure(data, status -> {
            NacosClosure.NacosStatus nacosStatus = (NacosClosure.NacosStatus)status;
            closure.setThrowable(nacosStatus.getThrowable());
            closure.setResponse(nacosStatus.getResponse());
            closure.run(nacosStatus);
        }));
        byte[] requestTypeFieldBytes = new byte[]{56, data instanceof ReadRequest ? (byte)1 : 2};
        byte[] dataBytes = data.toByteArray();
        task.setData(ByteBuffer.allocate(requestTypeFieldBytes.length + dataBytes.length).put(requestTypeFieldBytes).put(dataBytes).position(0));
        node.apply(task);
    }

    private void invokeToLeader(String group, Message request, int timeoutMillis, final FailoverClosure closure) {
        try {
            Endpoint leaderIp = Optional.ofNullable(this.getLeader(group)).orElseThrow(() -> new NoLeaderException(group)).getEndpoint();
            this.cliClientService.getRpcClient().invokeAsync(leaderIp, (Object)request, new InvokeCallback(){

                public void complete(Object o, Throwable ex) {
                    if (Objects.nonNull(ex)) {
                        closure.setThrowable(ex);
                        closure.run(new Status(RaftError.UNKNOWN, ex.getMessage(), new Object[0]));
                        return;
                    }
                    if (!((Response)o).getSuccess()) {
                        closure.setThrowable(new IllegalStateException(((Response)o).getErrMsg()));
                        closure.run(new Status(RaftError.UNKNOWN, ((Response)o).getErrMsg(), new Object[0]));
                        return;
                    }
                    closure.setResponse((Response)o);
                    closure.run(Status.OK());
                }

                public Executor executor() {
                    return RaftExecutor.getRaftCliServiceExecutor();
                }
            }, (long)timeoutMillis);
        }
        catch (Exception e) {
            closure.setThrowable(e);
            closure.run(new Status(RaftError.UNKNOWN, e.toString(), new Object[0]));
        }
    }

    boolean peerChange(final JRaftMaintainService maintainService, Set<String> newPeers) {
        HashSet<String> oldPeers = new HashSet<String>(this.raftConfig.getMembers());
        this.raftConfig.setMembers(this.localPeerId.toString(), newPeers);
        oldPeers.removeAll(newPeers);
        if (oldPeers.isEmpty()) {
            return true;
        }
        final HashSet<String> waitRemove = oldPeers;
        final AtomicInteger successCnt = new AtomicInteger(0);
        this.multiRaftGroup.forEach(new BiConsumer<String, RaftGroupTuple>(){

            @Override
            public void accept(String group, RaftGroupTuple tuple) {
                HashMap<String, String> params = new HashMap<String, String>();
                params.put("groupId", group);
                params.put("command", "removePeers");
                params.put("value", StringUtils.join((Collection)waitRemove, (String)","));
                RestResult<String> result = maintainService.execute(params);
                if (result.ok()) {
                    successCnt.incrementAndGet();
                } else {
                    Loggers.RAFT.error("Node removal failed : {}", result);
                }
            }
        });
        return successCnt.get() == this.multiRaftGroup.size();
    }

    void refreshRouteTable(String group) {
        if (this.isShutdown) {
            return;
        }
        String groupName = group;
        Status status = null;
        try {
            RouteTable instance = RouteTable.getInstance();
            Configuration oldConf = instance.getConfiguration(groupName);
            String oldLeader = Optional.ofNullable(instance.selectLeader(groupName)).orElse(PeerId.emptyPeer()).getEndpoint().toString();
            status = instance.refreshLeader((CliClientService)this.cliClientService, groupName, this.rpcRequestTimeoutMs);
            if (!status.isOk()) {
                Loggers.RAFT.error("Fail to refresh leader for group : {}, status is : {}", (Object)groupName, (Object)status);
            }
            if (!(status = instance.refreshConfiguration((CliClientService)this.cliClientService, groupName, this.rpcRequestTimeoutMs)).isOk()) {
                Loggers.RAFT.error("Fail to refresh route configuration for group : {}, status is : {}", (Object)groupName, (Object)status);
            }
        }
        catch (Exception e) {
            Loggers.RAFT.error("Fail to refresh raft metadata info for group : {}, error is : {}", (Object)groupName, (Object)e);
        }
    }

    public RaftGroupTuple findTupleByGroup(String group) {
        RaftGroupTuple tuple = this.multiRaftGroup.get(group);
        return tuple;
    }

    public Node findNodeByGroup(String group) {
        RaftGroupTuple tuple = this.multiRaftGroup.get(group);
        if (Objects.nonNull(tuple)) {
            return tuple.node;
        }
        return null;
    }

    Map<String, RaftGroupTuple> getMultiRaftGroup() {
        return this.multiRaftGroup;
    }

    @JustForTest
    void mockMultiRaftGroup(Map<String, RaftGroupTuple> map) {
        this.multiRaftGroup = map;
    }

    CliService getCliService() {
        return this.cliService;
    }

    static {
        System.getProperties().setProperty("bolt.netty.buffer.low.watermark", String.valueOf(0x8000000));
        System.getProperties().setProperty("bolt.netty.buffer.high.watermark", String.valueOf(0x10000000));
    }

    public static class RaftGroupTuple {
        private RequestProcessor processor;
        private Node node;
        private RaftGroupService raftGroupService;
        private NacosStateMachine machine;

        @JustForTest
        public RaftGroupTuple() {
        }

        public RaftGroupTuple(Node node, RequestProcessor processor, RaftGroupService raftGroupService, NacosStateMachine machine) {
            this.node = node;
            this.processor = processor;
            this.raftGroupService = raftGroupService;
            this.machine = machine;
        }

        public Node getNode() {
            return this.node;
        }

        public RequestProcessor getProcessor() {
            return this.processor;
        }

        public RaftGroupService getRaftGroupService() {
            return this.raftGroupService;
        }
    }
}

