/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.extend.client;

import hector.me.prettyprint.cassandra.service.CassandraHostConfigurator;
import hector.me.prettyprint.cassandra.service.FailoverPolicy;
import hector.me.prettyprint.cassandra.service.ThriftCluster;
import hector.me.prettyprint.hector.api.Cluster;
import hector.me.prettyprint.hector.api.ConsistencyLevelPolicy;
import hector.me.prettyprint.hector.api.Keyspace;
import hector.me.prettyprint.hector.api.factory.HFactory;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.extend.client.BlobBean;
import org.apache.cassandra.extend.client.DataPiece;
import org.apache.cassandra.extend.client.FFSClientCommonModule;
import org.apache.cassandra.extend.client.FFSClientCompatible;
import org.apache.cassandra.extend.client.FFSClientDeleteModule;
import org.apache.cassandra.extend.client.FFSClientExecuteHandler;
import org.apache.cassandra.extend.client.FFSClientPutModule;
import org.apache.cassandra.extend.client.FFSClientReadBackupModule;
import org.apache.cassandra.extend.client.FFSClientReadModule;
import org.apache.cassandra.extend.client.FFSException;
import org.apache.cassandra.extend.client.FFSLoadBalancingPolicy;
import org.apache.cassandra.extend.client.InputStreamFromFFS;
import org.apache.cassandra.extend.client.UploadingBrokenPoint;
import org.apache.cassandra.extend.client.ZeroConsistencyLevelPolicy;
import org.apache.cassandra.extend.midlayer.common.BlockIndex;
import org.apache.cassandra.extend.midlayer.common.Constants;
import org.apache.cassandra.extend.midlayer.common.PartitionTable;
import org.apache.cassandra.thrift.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FFSClient
extends FFSClientCompatible {
    private int chunkMaxSize = 0x200000;
    private static Log logger;
    private final String defaultClusterName = "defaultClusterName";
    private FFSClientCommonModule commonModule;
    private FFSClientPutModule putModule;
    private FFSClientDeleteModule deleteModule;
    private FFSClientReadModule readModule;
    private FFSClientReadBackupModule backupReadModule;
    private static String charSetName;
    private static String appNameConnectingFFS;
    private static String pid;
    private static String clientVersion;
    private static Map<String, FFSClient> instances;
    private final PartitionTable partitionTable = new PartitionTable();
    public static final long INVAILABLE_RECORD_TIMESTAMP = 1L;
    private static final ThreadPoolExecutor executor;

    public static void setAppNameConnectingFFS(String appNameConnectingFFSP) {
        if (!"UNKNOW".equals(appNameConnectingFFSP)) {
            appNameConnectingFFS = appNameConnectingFFSP;
        }
    }

    public static void shutdown() {
        for (FFSClient client : instances.values()) {
            client.shutdownConnAndTasks();
        }
        instances.clear();
    }

    private void shutdownConnAndTasks() {
        this.commonModule.shutdown();
        this.partitionTable.clearPartitionTable();
    }

    public static void destory(FFSClient client) {
        for (String key : instances.keySet()) {
            if (client != instances.get(key)) continue;
            instances.remove(key);
        }
        client.shutdownConnAndTasks();
    }

    private void initPartitionTable() {
    }

    public static synchronized FFSClient getInstance(String hosts, String keyspaceName, IBlockSize blockSize, String username, String password) {
        return FFSClient.getInstance(hosts, keyspaceName, 100, blockSize, username, password);
    }

    public static synchronized FFSClient getInstance(String hosts, String keyspaceName, IBlockSize blockSize, String username, String password, String charsetName) {
        charSetName = charsetName;
        return FFSClient.getInstance(hosts, keyspaceName, 100, blockSize, username, password);
    }

    public static synchronized FFSClient getInstance(String hosts, String keyspaceName, IBlockSize blockSize) {
        return FFSClient.getInstance(hosts, keyspaceName, 100, blockSize, "", "");
    }

    public static synchronized FFSClient getInstanceForRemoteDC(String hosts, String keyspaceName, IBlockSize blockSize, int retrySubmitDelayInSeconds, String username, String password) {
        return FFSClient.getInstanceForRemoteDC(hosts, keyspaceName, 10, blockSize, retrySubmitDelayInSeconds, username, password);
    }

    public static synchronized FFSClient getInstanceForRemoteDC(String hosts, String keyspaceName, IBlockSize blockSize, int retrySubmitDelayInSeconds) {
        return FFSClient.getInstanceForRemoteDC(hosts, keyspaceName, 10, blockSize, retrySubmitDelayInSeconds, "", "");
    }

    static synchronized FFSClient getInstanceForRemoteDC(String hosts, String keyspaceName, int maxActive, IBlockSize blockSize, int retrySubmitDelayInSeconds, String username, String password) {
        if (username == null || password == null) {
            throw new RuntimeException("username or password is null");
        }
        String instanceKey = FFSClient.getInstanceKey(hosts, keyspaceName);
        FFSClient ret = instances.get(instanceKey);
        if (ret == null) {
            ret = new FFSClient(hosts, keyspaceName, maxActive, blockSize, retrySubmitDelayInSeconds, username, password);
            ret.initPartitionTable();
            instances.put(instanceKey, ret);
        }
        return ret;
    }

    static synchronized FFSClient getInstance(String hosts, String keyspaceName, int maxActive, IBlockSize blockSize, String username, String password) {
        if (username == null || password == null) {
            throw new RuntimeException("username or password is null");
        }
        String instanceKey = FFSClient.getInstanceKey(hosts, keyspaceName);
        FFSClient ret = instances.get(instanceKey);
        if (ret == null) {
            ret = new FFSClient(hosts, keyspaceName, maxActive, blockSize, username, password);
            ret.initPartitionTable();
            instances.put(instanceKey, ret);
        }
        return ret;
    }

    private static String getInstanceKey(String hosts, String keyspaceName) {
        return hosts + "~" + keyspaceName;
    }

    private FFSClient(String hosts, String keyspaceName, int maxActive, IBlockSize blockSize, String username, String password) {
        this.chunkMaxSize = blockSize.getBlockSize();
        try {
            CassandraHostConfigurator config = null;
            config = new CassandraHostConfigurator(hosts);
            config.setMaxActive(maxActive);
            config.setRetryDownedHostsDelayInSeconds(1);
            config.setMaxWaitTimeWhenExhausted(12000L);
            config.setCassandraThriftSocketTimeout(12000);
            FFSLoadBalancingPolicy policy = new FFSLoadBalancingPolicy(this.partitionTable);
            HashMap<String, String> credentials = new HashMap<String, String>();
            credentials.put("username", username);
            credentials.put("password", password);
            credentials.put("FFS_CREDENTIALS_PID", pid);
            credentials.put("FFS_CREDENTIALS_APP_NAME", appNameConnectingFFS);
            credentials.put("FFS_CREDENTIALS_CLIENT_VERSION", clientVersion);
            ThriftCluster cluster = new ThriftCluster("defaultClusterName", config, credentials);
            Keyspace keyspace = HFactory.createKeyspace((String)keyspaceName, (Cluster)cluster, (ConsistencyLevelPolicy)new ZeroConsistencyLevelPolicy());
            this.commonModule = new FFSClientCommonModule(keyspace, this.chunkMaxSize, (Cluster)cluster, this.partitionTable, policy, hosts);
            this.deleteModule = new FFSClientDeleteModule(this.commonModule);
            this.readModule = new FFSClientReadModule(this.commonModule, charSetName);
            this.putModule = new FFSClientPutModule(this.commonModule, this.readModule);
            this.backupReadModule = new FFSClientReadBackupModule(this.commonModule);
        }
        catch (Exception e) {
            logger.error((Object)"init FFSClient error:", (Throwable)e);
        }
    }

    private FFSClient(String hosts, String keyspaceName, int maxActive, IBlockSize blockSize, int retrySubmitDelayInSeconds, String username, String password) {
        this.chunkMaxSize = blockSize.getBlockSize();
        try {
            CassandraHostConfigurator config = null;
            config = new CassandraHostConfigurator(hosts);
            config.setMaxActive(maxActive);
            config.setRetryDownedHostsDelayInSeconds(1);
            config.setMaxWaitTimeWhenExhausted(12000L);
            config.setCassandraThriftSocketTimeout(12000);
            FFSLoadBalancingPolicy policy = new FFSLoadBalancingPolicy(this.partitionTable);
            HashMap<String, String> credentials = new HashMap<String, String>();
            credentials.put("username", username);
            credentials.put("password", password);
            credentials.put("FFS_CREDENTIALS_PID", pid);
            credentials.put("FFS_CREDENTIALS_APP_NAME", appNameConnectingFFS);
            credentials.put("FFS_CREDENTIALS_CLIENT_VERSION", clientVersion);
            ThriftCluster cluster = new ThriftCluster("defaultClusterName", config, credentials);
            Keyspace keyspace = HFactory.createKeyspace((String)keyspaceName, (Cluster)cluster, (ConsistencyLevelPolicy)new ZeroConsistencyLevelPolicy(), (FailoverPolicy)new FailoverPolicy(3, retrySubmitDelayInSeconds * 1000));
            this.commonModule = new FFSClientCommonModule(keyspace, this.chunkMaxSize, (Cluster)cluster, this.partitionTable, policy, hosts);
            this.deleteModule = new FFSClientDeleteModule(this.commonModule);
            this.readModule = new FFSClientReadModule(this.commonModule);
            this.putModule = new FFSClientPutModule(this.commonModule, this.readModule);
            this.backupReadModule = new FFSClientReadBackupModule(this.commonModule);
        }
        catch (Exception e) {
            logger.error((Object)"init FFSClient error:", (Throwable)e);
        }
    }

    public void delete(String cfName, String key) throws FFSException {
        this.deleteModule.delete(cfName, key, 1L);
    }

    void delete(String cfName, String key, long clock) throws FFSException {
        this.deleteModule.delete(cfName, key, clock);
    }

    public void putBlob(String tableName, String key, byte[] value, Map<String, String> meta) throws FFSException {
        this.checkMetas(meta);
        this.putModule.putBlob(tableName, key, value, meta, 1L);
    }

    public long putBlob(String tableName, String key, InputStream value, Map<String, String> meta) throws IOException, FFSException {
        this.checkMetas(meta);
        return this.putModule.putBlob(tableName, key, value, meta, 1L);
    }

    int putBlobAsInt(String tableName, String key, InputStream value, Map<String, String> meta) throws FFSException, IOException {
        long blobSize = this.putBlob(tableName, key, value, meta);
        if (blobSize <= Integer.MAX_VALUE && blobSize >= Integer.MIN_VALUE) {
            return (int)blobSize;
        }
        throw new FFSException("blob size should be integer,but is:" + blobSize);
    }

    void putBlob(String tableName, String key, byte[] value, Map<String, String> meta, long clock) throws FFSException {
        this.checkMetas(meta);
        this.putModule.putBlob(tableName, key, value, meta, clock);
    }

    long putBlob(String tableName, String key, InputStream value, Map<String, String> meta, long clock) throws IOException, FFSException {
        this.checkMetas(meta);
        return this.putModule.putBlob(tableName, key, value, meta, clock);
    }

    public void flashback(String tableName, String bussinessKey, long beforeWhichTime) throws FFSException {
        this.putModule.flashback(tableName, bussinessKey, beforeWhichTime);
    }

    public void updateRecordProteties(String tableName, String key, Map<String, String> metas) throws FFSException {
        this.checkMetas(metas);
        this.putModule.updateRecordProteties(tableName, key, metas, 1L);
    }

    private void checkMetas(Map<String, String> metas) {
        Set<String> metaNamsProtected = FFSClientCommonModule.getMetaNamsProtected();
        if (metas != null) {
            for (String meta_key : metas.keySet()) {
                if (!metaNamsProtected.contains(meta_key)) continue;
                throw new RuntimeException(String.format("word:%s is prtected", meta_key));
            }
        }
    }

    private void checkRecordVersion(long recordVersion) {
        if ((recordVersion + "").length() != "0000000000000000000".length()) {
            throw new RuntimeException("illegal recordVersion:" + recordVersion);
        }
    }

    public long putBlob(String tableName, String key, DataPiece dataPiece, Map<String, String> meta) throws IOException, FFSException {
        this.checkMetas(meta);
        return this.putModule.putBlob(tableName, key, dataPiece, meta, 1L);
    }

    public boolean tryToPutWithOutTransBlobs(String tableName, String key, List<String> blobMD5s, Map<String, String> meta) throws IOException, FFSException {
        this.checkMetas(meta);
        return this.putModule.tryToPutWithOutTransBlobs(tableName, key, blobMD5s, meta, 1L);
    }

    public UploadingBrokenPoint getUploadingBrokenPoint(String tableName, String key, long recordVersion, String digitSignFromUser) throws FFSException, IOException {
        return this.putModule.getUploadingBrokenPoint(tableName, key, recordVersion, digitSignFromUser);
    }

    public UploadingBrokenPoint getUploadingBrokenPoint(String tableName, String key, long recordVersion) throws FFSException, IOException {
        return this.putModule.getUploadingBrokenPoint(tableName, key, recordVersion, "defalutDigitSign");
    }

    public long getTimestamp(String cfName, String key) throws FFSException {
        try {
            BlobBean blobBean = this.readModule.getOneBlock(cfName, key, "0000000000000000000", "001", new String[]{"timestamp"});
            return blobBean.getTimestamp();
        }
        catch (IOException e) {
            throw new RuntimeException("should not throw IOException in getTimestamp()", e);
        }
    }

    public long getRecordSize(String cfName, String key) throws FFSException {
        String result = this.readModule.getRecordSize(cfName, key);
        return Long.valueOf(result);
    }

    public Map<String, String> getBlobMeta(String cfName, String key) throws FFSException {
        try {
            BlobBean blobBean = this.readModule.getOneBlock(cfName, key, "0000000000000000000", "001", new String[]{"meta"});
            return blobBean.getMeta();
        }
        catch (IOException e) {
            throw new RuntimeException("should not throw IOException in getBlobMeta()", e);
        }
    }

    public Map<String, String> getBlobMetaWithoutProtectedAttribute(String cfName, String key) throws FFSException {
        try {
            BlobBean blobBean = this.readModule.getOneBlock(cfName, key, "0000000000000000000", "001", new String[]{"meta"});
            return blobBean.getBlobMetaWithoutProtectedAttribute();
        }
        catch (IOException e) {
            throw new RuntimeException("should not throw IOException in getBlobMetaWithoutProtectedAttribute()", e);
        }
    }

    public BlobBean getBlobBean(String cfName, String key) throws FFSException {
        try {
            BlobBean blobBean = this.getBlobBean(cfName, key, null);
            return blobBean;
        }
        catch (IOException e) {
            throw new RuntimeException("should not throw IOException in getBlobBean without blobOS", e);
        }
    }

    public BlobBean getBackupBlobBean(String cfName, String key) throws FFSException {
        try {
            BlobBean blobBean = this.getBackupBlobBean(cfName, key, null);
            return blobBean;
        }
        catch (IOException e) {
            throw new RuntimeException("should not throw IOException in getBlobBean without blobOS", e);
        }
    }

    public BlobBean getBlobBean(String cfName, String key, OutputStream blobOS) throws FFSException, IOException {
        return this.readModule.getBlobBean(cfName, key, blobOS);
    }

    public BlobBean getBackupBlobBean(String cfName, String key, OutputStream blobOS) throws FFSException, IOException {
        return this.backupReadModule.getBlobBean(cfName, key, blobOS);
    }

    public BlobBean getBackupBlobBean(String cfName, String key, long recordVersion, long startPosition, long endPosition, OutputStream blobOS) throws FFSException, IOException {
        this.checkRecordVersion(recordVersion);
        return this.backupReadModule.getBlobBean(cfName, key, recordVersion, startPosition, endPosition, blobOS);
    }

    public long getRecordVersion(String cfName, String key) throws FFSException {
        return this.readModule.getRecordVersion(cfName, key);
    }

    public String getMD5(String cfName, String key) throws FFSException {
        return this.readModule.getMD5(cfName, key);
    }

    public BlobBean getBlobBean(String cfName, String key, long recordVersion, long startPosition, long endPosition, OutputStream blobOS) throws FFSException, IOException {
        this.checkRecordVersion(recordVersion);
        return this.readModule.getBlobBean(cfName, key, recordVersion, startPosition, endPosition, blobOS);
    }

    public BlobBean getOneBlock(String cfName, String key, String chunkIndex, String version) throws FFSException, IOException {
        BlobBean blobBean = this.readModule.getOneBlock(cfName, key, version, chunkIndex, new String[]{"meta", "version", "blob"});
        return blobBean;
    }

    public BlobBean fetchOriginalMetaForNotLating(String cfName, String key, String version) throws FFSException, IOException {
        BlobBean blobBean = this.readModule.getOneBlock(cfName, key, version, "001", new String[]{"blob", "timestamp", "FETCH_ORGINAL_META_FOR_NOT_LATING"});
        if (blobBean.getTimestamp() != 123456L) {
            throw new IOException("you got is not OriginalMeta!");
        }
        return blobBean;
    }

    public InputStreamFromFFS getBlobAsInputStreamFromFFS(String cfName, String key) {
        return new InputStreamFromFFS(cfName, key, this);
    }

    public String[] topRecordKeys(long num, String cfName) throws FFSException {
        return this.readModule.topRecord(num, cfName);
    }

    public String getBackupInfosOfSpecialBID(String cfName, String bussinessKey) throws FFSException {
        return this.readModule.getBackupInfosOfSpecialBID(cfName, bussinessKey);
    }

    public boolean forceReplicate(String table, String key) throws FFSException {
        return this.commonModule.forceReplicateData(table, key);
    }

    public List<String> copyRecordToOtherTable(String fromTableName, List<String> keys, String toTableName) throws FFSException {
        ArrayList<String> result = new ArrayList<String>();
        ArrayList<FFSClientExecuteHandler> executeHandlers = new ArrayList<FFSClientExecuteHandler>();
        for (String key : keys) {
            FFSClientExecuteHandler handler = new FFSClientExecuteHandler(this.commonModule, fromTableName, key, "", toTableName, false);
            executor.execute(handler);
            executeHandlers.add(handler);
        }
        for (FFSClientExecuteHandler handler : executeHandlers) {
            try {
                handler.get();
                boolean isSuccOperation = handler.isOperateSucc();
                if (!isSuccOperation) continue;
                result.add(handler.getKey());
            }
            catch (TimeoutException e) {
                logger.warn((Object)("batchCopyRecordToOtherTable timeout by key:" + handler.getKey()), (Throwable)e);
            }
        }
        return result;
    }

    public List<String> cutRecordToOtherTable(String fromTableName, List<String> keys, String toTableName) throws FFSException {
        logger.info((Object)("start  cutRecordToOtherTable: fromTableName:" + fromTableName + ";toTableName:" + toTableName + keys.toArray()));
        ArrayList<String> result = new ArrayList<String>();
        ArrayList<FFSClientExecuteHandler> executeHandlers = new ArrayList<FFSClientExecuteHandler>();
        for (String key : keys) {
            FFSClientExecuteHandler handler = new FFSClientExecuteHandler(this.commonModule, fromTableName, key, "", toTableName, true);
            executor.execute(handler);
            executeHandlers.add(handler);
        }
        for (FFSClientExecuteHandler handler : executeHandlers) {
            try {
                handler.get();
                boolean isSuccOperation = handler.isOperateSucc();
                if (!isSuccOperation) continue;
                result.add(handler.getKey());
            }
            catch (TimeoutException e) {
                logger.error((Object)("batchCutRecordToOtherTable timeout by key:" + handler.getKey()), (Throwable)e);
            }
        }
        logger.info((Object)("end  cutRecordToOtherTable: fromTableName:" + fromTableName + ";toTableName:" + toTableName + keys.toArray()));
        return result;
    }

    void repairReplicationOperationLog(String cfName, String key, long time) throws Exception {
        BlobBean blobBean = new BlobBean(null);
        String chunkIndex = "001";
        String version = "0000000000000000000";
        while (true) {
            this.readModule.getBlockAndFillToBlobBean(cfName, key, version, chunkIndex, new String[]{"version", "meta"}, blobBean, 0, Integer.MAX_VALUE);
            this.putModule.putBlob(cfName, key, chunkIndex, blobBean.attendLastBlock() ? (char)'b' : 'a', Long.valueOf(blobBean.getVersion()), Constants.BLOB_VALUE_FOR_IDENTIFYING_PUT_WRITING_LOG_ONLY_BYTEBUFFER.array(), null, blobBean.getTimestamp());
            if (blobBean.attendLastBlock()) break;
            version = String.valueOf(blobBean.getVersion());
            chunkIndex = BlockIndex.getNextIndexOfSplitedByString(chunkIndex);
        }
    }

    public boolean isExist(String cfName, String key) throws FFSException {
        try {
            this.getTimestamp(cfName, key);
        }
        catch (FFSException e) {
            if (this.commonModule.resolveException(e, 106)) {
                return false;
            }
            throw e;
        }
        return true;
    }

    int getDCNameConnected() throws FFSException {
        return this.commonModule.getDCNameConnected();
    }

    public static void main(String[] args) throws FFSException, IOException, InterruptedException {
        FFSClient client = FFSClient.getInstance("192.168.2.10:9160", "test", 1, BlockSize.BlockSize_1M, "", "");
        System.out.println("dafafa" + new String(client.getMD5("table1", "1")));
        for (int i = 2; i < 13; ++i) {
        }
        Thread.sleep(Long.MAX_VALUE);
        System.exit(0);
    }

    private static void testSimpleUpload(FFSClient client, String tableName, String key) throws FFSException, IOException {
        HashMap<String, String> meta = new HashMap<String, String>();
        meta.put("name", "liangfeng");
        byte[] value = new byte[10000];
        client.putBlob(tableName, key, value, meta);
    }

    private static void testSimpleUploadByStream(FFSClient client, String tableName, String key) throws FFSException, IOException {
        File file = new File("E:\\ffs\\apache-cassandra-1.0.7-src.tar");
        FileInputStream fileIS = new FileInputStream(file);
        HashMap<String, String> meta = new HashMap<String, String>();
        meta.put("name", "liangfeng");
        client.putBlob(tableName, key, fileIS, meta);
        fileIS.close();
    }

    private static void testUpload(FFSClient client, String tableName, String key) throws FFSException, IOException {
        int pieceLength;
        File file = new File("E:\\ffs\\apache-cassandra-1.0.7-src.tar");
        FileInputStream fileIS = new FileInputStream(file);
        HashMap<String, String> meta = new HashMap<String, String>();
        meta.put("name", "liangfeng");
        long recordVersion = 0L;
        int pieceMaxLength = 1048676;
        long startPosition = 0L;
        long endPosition = 0L;
        do {
            byte[] piece;
            if ((pieceLength = fileIS.read(piece = new byte[pieceMaxLength])) < piece.length) {
                piece = ArrayUtils.subarray((byte[])piece, (int)0, (int)pieceLength);
            }
            ByteArrayInputStream bs = new ByteArrayInputStream(piece);
            endPosition = startPosition + (long)pieceLength - 1L;
            DataPiece dataPiece = null;
            dataPiece = recordVersion == 0L ? new DataPiece(file.length(), bs, startPosition, endPosition) : new DataPiece(recordVersion, file.length(), (InputStream)bs, startPosition, endPosition);
            client.putBlob(tableName, key, dataPiece, meta);
            recordVersion = dataPiece.getRecordVersion();
            startPosition = client.getUploadingBrokenPoint(tableName, key, recordVersion).getBrokenSize();
            bs.close();
        } while (pieceLength >= pieceMaxLength);
        System.out.println(recordVersion);
        fileIS.close();
        System.out.println(client.isExist(tableName, key));
        System.out.println(client.getTimestamp(tableName, key));
        System.out.println(client.getBlobMeta(tableName, key).get("name"));
        System.out.println("meta_version_name" + client.getBlobMeta(tableName, key).get("ffs_v"));
        System.out.println("record_version" + client.getRecordVersion(tableName, key));
        System.out.println("meta_digitsign_name" + client.getBlobMeta(tableName, key).get("ffs_sign"));
        System.out.println("meta_size_name" + client.getBlobMeta(tableName, key).get("ffs_s"));
        System.out.println("meta_sumsize_name" + client.getBlobMeta(tableName, key).get("ffs_sum"));
    }

    private static void testNormalDownload(FFSClient client, String tableName, String key) throws FFSException, IOException {
        BlobBean blobBean = client.getBlobBean(tableName, key);
        System.out.println("blob in memory siz:" + blobBean.getBlob().length);
        FileOutputStream fileOS_without_brokenpoint = new FileOutputStream(new File("E:\\ffs\\apache-cassandra-1.0.7-src-downloaded.tar"));
        client.getBlobBean(tableName, key, fileOS_without_brokenpoint);
        fileOS_without_brokenpoint.close();
    }

    private static void testDownloadWithBrokenPoint(FFSClient client, String tableName, String key) throws FFSException, IOException {
        long version;
        BlobBean bean;
        FileOutputStream fileOS_with_brokenpoint = new FileOutputStream(new File("E:\\ffs\\apache-cassandra-1.0.7-src-downloaded.tar"), true);
        long sizeDownloaded = 0L;
        while (!(bean = client.getBlobBean(tableName, key, version = client.getRecordVersion(tableName, key), sizeDownloaded, sizeDownloaded + 200000L, fileOS_with_brokenpoint)).attendLastBlock() || (long)Integer.parseInt(bean.getMeta().get("ffs_sum")) != (sizeDownloaded += bean.getSumSizeOfCurrentContent())) {
        }
        fileOS_with_brokenpoint.close();
    }

    private static void testDownloadWithBrokenPointAtOneTime(FFSClient client, String tableName, String key) throws FFSException, IOException {
        FileOutputStream fileOS_with_brokenpoint = new FileOutputStream(new File("E:\\ffs\\apache-cassandra-1.0.7-src-downloaded.tar"), true);
        BlobBean bean = client.getBlobBean(tableName, key, 1367917606487144165L, 0L, Long.MAX_VALUE, fileOS_with_brokenpoint);
        fileOS_with_brokenpoint.close();
    }

    static {
        String clientDriverVersion;
        logger = LogFactory.getLog(FFSClient.class);
        appNameConnectingFFS = "UNKNOW";
        pid = "UNKNOW";
        clientVersion = "UNKNOW";
        instances = new ConcurrentHashMap<String, FFSClient>();
        String jvmName = ManagementFactory.getRuntimeMXBean().getName();
        pid = jvmName.split("@")[0];
        String clientImplementationVersion = FFSClient.class.getPackage().getImplementationVersion();
        if (clientImplementationVersion != null && clientImplementationVersion.length() > 0) {
            clientVersion = clientImplementationVersion;
        }
        if ((clientDriverVersion = Cluster.class.getPackage().getImplementationVersion()) != null && clientDriverVersion.length() > 0) {
            clientVersion = clientVersion + "~" + clientDriverVersion;
        }
        Thread cleaner = new Thread(){

            @Override
            public void run() {
                while (true) {
                    for (FFSClient ffsClient : instances.values()) {
                        try {
                            ffsClient.commonModule.releaseIdleConnections();
                            ffsClient.getDCNameConnected();
                        }
                        catch (Throwable e) {
                            logger.warn((Object)"failed to release idle connections!", e);
                        }
                    }
                    try {
                        Thread.sleep(30000L);
                    }
                    catch (InterruptedException interruptedException) {
                    }
                }
            }
        };
        cleaner.start();
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                for (FFSClient ffsClient : instances.values()) {
                }
            }
        });
        executor = new ThreadPoolExecutor(48, 96, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static class UploatTask
    extends Thread {
        private int taskNum;

        UploatTask(int taskNum) {
            this.taskNum = taskNum;
        }

        @Override
        public void run() {
            FFSClient client = FFSClient.getInstance("192.168.3.231:9169", "test" + this.taskNum, 1, BlockSize.BlockSize_1M, "", "");
            String tableName1 = "table" + this.taskNum + "_1";
            String tableName2 = "table" + this.taskNum + "_2";
            String key = "liang-";
            for (int i = 0; i < 2530; ++i) {
                HashMap<String, String> meta = new HashMap<String, String>();
                meta.put("name", "liangfeng" + i);
                byte[] value = new byte[10000];
                try {
                    client.putBlob(tableName1, key + i, value, meta);
                    client.putBlob(tableName2, key + i, value, meta);
                    continue;
                }
                catch (FFSException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static enum BlockSize implements IBlockSize
    {
        BlockSize_1M{

            @Override
            public int getBlockSize() {
                return 0x100000;
            }
        }
        ,
        BlockSize_2M{

            @Override
            public int getBlockSize() {
                return 0x200000;
            }
        }
        ,
        BlockSize_4M{

            @Override
            public int getBlockSize() {
                return 0x400000;
            }
        }
        ,
        BlockSize_5M{

            @Override
            public int getBlockSize() {
                return 0x500000;
            }
        };

    }

    public static interface IBlockSize {
        public int getBlockSize();
    }
}

