/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.core.cluster.remote;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberUtil;
import com.alibaba.nacos.core.cluster.MembersChangeEvent;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.env.EnvUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Proxy that maintains one RPC client per remote cluster member and routes requests
 * to members through those clients.
 *
 * <p>Clients are registered in {@link RpcClientFactory} under the key
 * {@code "Cluster-" + member.getAddress()}. The set of clients is rebuilt on start up
 * and every time a {@link MembersChangeEvent} is received.
 */
@Service
public class ClusterRpcClientProxy
extends MemberChangeListener {
    /** Timeout, in milliseconds, used by {@link #sendRequest(Member, Request)}. */
    private static final long DEFAULT_REQUEST_TIME_OUT = 3000L;

    /** Prefix of the factory keys owned by this proxy. */
    private static final String CLUSTER_CLIENT_KEY_PREFIX = "Cluster-";

    @Autowired
    ServerMemberManager serverMemberManager;

    /**
     * Registers this listener with the {@link NotifyCenter} and builds the initial set of
     * clients for all members except this node.
     *
     * <p>A {@link NacosException} raised while refreshing is logged and not propagated.
     */
    @PostConstruct
    public void init() {
        try {
            NotifyCenter.registerSubscriber(this);
            List<Member> members = this.serverMemberManager.allMembersWithoutSelf();
            this.refresh(members);
            Loggers.CLUSTER.warn("[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ", members);
        }
        catch (NacosException e) {
            // The trailing throwable keeps the stack trace in the log instead of the message only.
            Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage(), e);
        }
    }

    /**
     * Brings the registered cluster clients in line with the given member list.
     *
     * <p>First a client is created (and started if needed) for every member accepted by
     * {@link MemberUtil#isSupportedLongCon(Member)}. Then every factory entry whose key
     * starts with {@code "Cluster-"} but does not correspond to one of those members is
     * shut down and removed through the entry iterator.
     *
     * @param members the members that should have a client
     * @throws NacosException if creating, starting or shutting down a client fails
     */
    private void refresh(List<Member> members) throws NacosException {
        for (Member member : members) {
            if (MemberUtil.isSupportedLongCon(member)) {
                this.createRpcClientAndStart(member, ConnectionType.GRPC);
            }
        }

        // A set gives constant-time membership checks in the clean-up loop below.
        Set<String> liveMemberKeys = members.stream()
                .filter(MemberUtil::isSupportedLongCon)
                .map(this::memberClientKey)
                .collect(Collectors.toSet());

        Iterator<Map.Entry<String, RpcClient>> iterator = RpcClientFactory.getAllClientEntries().iterator();
        while (iterator.hasNext()) {
            String clientKey = iterator.next().getKey();
            if (!clientKey.startsWith(CLUSTER_CLIENT_KEY_PREFIX) || liveMemberKeys.contains(clientKey)) {
                continue;
            }
            Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", clientKey);
            // getClient is treated as nullable elsewhere in this class; guard here as well so a
            // missing client cannot abort the clean-up with a NullPointerException.
            RpcClient staleClient = RpcClientFactory.getClient(clientKey);
            if (staleClient != null) {
                staleClient.shutdown();
            }
            iterator.remove();
        }
    }

    /**
     * Builds the factory key for the client that talks to the given member.
     *
     * @param member the target member
     * @return {@code "Cluster-"} followed by the member address
     */
    private String memberClientKey(Member member) {
        return CLUSTER_CLIENT_KEY_PREFIX + member.getAddress();
    }

    /**
     * Obtains the client for a member from the factory and starts it if it is still
     * waiting to be initiated.
     *
     * <p>If the client returned by the factory has a different connection type than
     * requested, it is destroyed and built again with the requested type.
     *
     * @param member the member to connect to
     * @param type the connection type the client must use
     * @throws NacosException if destroying or starting the client fails
     */
    private void createRpcClientAndStart(final Member member, ConnectionType type) throws NacosException {
        Map<String, String> labels = new HashMap<String, String>(2);
        labels.put("source", "cluster");
        String memberClientKey = this.memberClientKey(member);
        RpcClient client = this.buildRpcClient(type, labels, memberClientKey);
        if (!client.getConnectionType().equals(type)) {
            Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);
            RpcClientFactory.destroyClient(memberClientKey);
            client = this.buildRpcClient(type, labels, memberClientKey);
        }
        if (client.isWaitInitiated()) {
            Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);
            // The client only ever talks to this one member, so every server-list query
            // answers with the member's address.
            client.serverListFactory(new ServerListFactory(){

                @Override
                public String genNextServer() {
                    return member.getAddress();
                }

                @Override
                public String getCurrentServer() {
                    return member.getAddress();
                }

                @Override
                public List<String> getServerList() {
                    return CollectionUtils.list(member.getAddress());
                }
            });
            client.start();
        }
    }

    /**
     * Asks the factory for the cluster client registered under the given key.
     *
     * @param type the connection type
     * @param labels the labels passed to the client
     * @param memberClientKey the factory key of the client
     * @return the client provided by {@link RpcClientFactory#createClusterClient}
     */
    private RpcClient buildRpcClient(ConnectionType type, Map<String, String> labels, String memberClientKey) {
        return RpcClientFactory.createClusterClient(memberClientKey, type, EnvUtil.getAvailableProcessors(2), EnvUtil.getAvailableProcessors(8), labels);
    }

    /**
     * Looks up the client registered for a member.
     *
     * @param member the target member
     * @return the registered client, never {@code null}
     * @throws NacosException with code {@code -400} if no client is registered for the member
     */
    private RpcClient requireClient(Member member) throws NacosException {
        RpcClient client = RpcClientFactory.getClient(this.memberClientKey(member));
        if (client == null) {
            throw new NacosException(-400, "No rpc client related to member: " + member);
        }
        return client;
    }

    /**
     * Sends a request to a member using the default timeout of 3000 milliseconds.
     *
     * @param member the target member
     * @param request the request to send
     * @return the response returned by the member's client
     * @throws NacosException if no client is registered for the member or the request fails
     */
    public Response sendRequest(Member member, Request request) throws NacosException {
        return this.sendRequest(member, request, DEFAULT_REQUEST_TIME_OUT);
    }

    /**
     * Sends a request to a member with an explicit timeout.
     *
     * @param member the target member
     * @param request the request to send
     * @param timeoutMills the timeout handed to the client, in milliseconds
     * @return the response returned by the member's client
     * @throws NacosException if no client is registered for the member or the request fails
     */
    public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
        return this.requireClient(member).request(request, timeoutMills);
    }

    /**
     * Sends a request to a member asynchronously; the outcome is delivered to the callback.
     *
     * @param member the target member
     * @param request the request to send
     * @param callBack the callback handed to the client
     * @throws NacosException if no client is registered for the member or the client rejects the request
     */
    public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
        this.requireClient(member).asyncRequest(request, callBack);
    }

    /**
     * Sends a request to every member except this node, one after another, with the
     * default timeout.
     *
     * <p>The first {@link NacosException} propagates to the caller; members later in the
     * list are not contacted in that case.
     *
     * @param request the request to send
     * @throws NacosException if sending to any member fails
     */
    public void sendRequestToAllMembers(Request request) throws NacosException {
        List<Member> members = this.serverMemberManager.allMembersWithoutSelf();
        for (Member member : members) {
            this.sendRequest(member, request);
        }
    }

    /**
     * Rebuilds the cluster clients from the current member list when membership changes.
     *
     * <p>The member list is read from {@link ServerMemberManager}, not from the event.
     * A {@link NacosException} raised while refreshing is logged and not propagated.
     *
     * @param event the membership change notification
     */
    @Override
    public void onEvent(MembersChangeEvent event) {
        try {
            List<Member> members = this.serverMemberManager.allMembersWithoutSelf();
            this.refresh(members);
        }
        catch (NacosException e) {
            // The trailing throwable keeps the stack trace in the log instead of the message only.
            Loggers.CLUSTER.warn("[serverlist] fail to refresh cluster rpc client, event:{}, msg: {} ", event, e.getMessage(), e);
        }
    }
}

