/*
 * Decompiled with CFR 0.152.
 */
package alluxio.worker.block;

import alluxio.Configuration;
import alluxio.Sessions;
import alluxio.exception.AlluxioException;
import alluxio.exception.BlockAlreadyExistsException;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.ConnectionFailedException;
import alluxio.exception.InvalidWorkerStateException;
import alluxio.exception.WorkerOutOfSpaceException;
import alluxio.heartbeat.HeartbeatExecutor;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.thrift.BlockWorkerClientService;
import alluxio.util.CommonUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.FileUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.FileInfo;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.AbstractWorker;
import alluxio.worker.DataServer;
import alluxio.worker.WorkerContext;
import alluxio.worker.WorkerIdRegistry;
import alluxio.worker.block.BlockHeartbeatReport;
import alluxio.worker.block.BlockHeartbeatReporter;
import alluxio.worker.block.BlockMasterClient;
import alluxio.worker.block.BlockMasterSync;
import alluxio.worker.block.BlockMetricsReporter;
import alluxio.worker.block.BlockStore;
import alluxio.worker.block.BlockStoreLocation;
import alluxio.worker.block.BlockStoreMeta;
import alluxio.worker.block.BlockWorkerClientServiceHandler;
import alluxio.worker.block.PinListSync;
import alluxio.worker.block.SessionCleaner;
import alluxio.worker.block.SpaceReserver;
import alluxio.worker.block.TieredBlockStore;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.BlockMeta;
import alluxio.worker.block.meta.TempBlockMeta;
import alluxio.worker.file.FileSystemMasterClient;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class BlockWorker
extends AbstractWorker {
    private static final Logger LOG = LoggerFactory.getLogger((String)"alluxio.logger.type");
    private BlockMasterSync mBlockMasterSync;
    private PinListSync mPinListSync;
    private SessionCleaner mSessionCleanerThread;
    private final BlockWorkerClientServiceHandler mServiceHandler;
    private final DataServer mDataServer;
    private final BlockMasterClient mBlockMasterClient;
    private final FileSystemMasterClient mFileSystemMasterClient;
    private final Configuration mConf = WorkerContext.getConf();
    private SpaceReserver mSpaceReserver = null;
    private BlockHeartbeatReporter mHeartbeatReporter;
    private BlockMetricsReporter mMetricsReporter;
    private Sessions mSessions;
    private BlockStore mBlockStore;

    public BlockStore getBlockStore() {
        return this.mBlockStore;
    }

    public BlockWorkerClientServiceHandler getWorkerServiceHandler() {
        return this.mServiceHandler;
    }

    public String getDataBindHost() {
        return this.mDataServer.getBindHost();
    }

    public int getDataLocalPort() {
        return this.mDataServer.getPort();
    }

    public BlockWorker() throws IOException {
        super(Executors.newFixedThreadPool(4, ThreadFactoryUtils.build((String)"block-worker-heartbeat-%d", (boolean)true)));
        this.mBlockMasterClient = new BlockMasterClient(NetworkAddressUtils.getConnectAddress((NetworkAddressUtils.ServiceType)NetworkAddressUtils.ServiceType.MASTER_RPC, (Configuration)this.mConf), this.mConf);
        this.mFileSystemMasterClient = new FileSystemMasterClient(NetworkAddressUtils.getConnectAddress((NetworkAddressUtils.ServiceType)NetworkAddressUtils.ServiceType.MASTER_RPC, (Configuration)this.mConf), this.mConf);
        this.mDataServer = DataServer.Factory.create(NetworkAddressUtils.getBindAddress((NetworkAddressUtils.ServiceType)NetworkAddressUtils.ServiceType.WORKER_DATA, (Configuration)this.mConf), this, this.mConf);
        this.mConf.set("alluxio.worker.data.port", Integer.toString(this.mDataServer.getPort()));
        this.mServiceHandler = new BlockWorkerClientServiceHandler(this);
        this.mHeartbeatReporter = new BlockHeartbeatReporter();
        this.mMetricsReporter = new BlockMetricsReporter(WorkerContext.getWorkerSource());
        this.mSessions = new Sessions();
        this.mBlockStore = new TieredBlockStore();
        this.mBlockStore.registerBlockStoreEventListener(this.mHeartbeatReporter);
        this.mBlockStore.registerBlockStoreEventListener(this.mMetricsReporter);
    }

    @Override
    public Map<String, TProcessor> getServices() {
        HashMap<String, TProcessor> services = new HashMap<String, TProcessor>();
        services.put("BlockWorkerClient", (TProcessor)new BlockWorkerClientService.Processor((BlockWorkerClientService.Iface)this.getWorkerServiceHandler()));
        return services;
    }

    @Override
    public void start() throws IOException {
        WorkerNetAddress netAddress;
        try {
            netAddress = WorkerContext.getNetAddress();
            WorkerIdRegistry.registerWithBlockMaster(this.mBlockMasterClient, netAddress);
        }
        catch (ConnectionFailedException e) {
            LOG.error("Failed to get a worker id from block master", (Throwable)e);
            throw Throwables.propagate((Throwable)e);
        }
        this.mBlockMasterSync = new BlockMasterSync(this, netAddress, this.mBlockMasterClient);
        this.mPinListSync = new PinListSync(this, this.mFileSystemMasterClient);
        this.mSessionCleanerThread = new SessionCleaner(this);
        if (this.mConf.getBoolean("alluxio.worker.tieredstore.reserver.enabled")) {
            this.mSpaceReserver = new SpaceReserver(this);
        }
        this.getExecutorService().submit((Runnable)new HeartbeatThread("Worker Block Sync", (HeartbeatExecutor)this.mBlockMasterSync, (long)WorkerContext.getConf().getInt("alluxio.worker.block.heartbeat.interval.ms")));
        this.getExecutorService().submit((Runnable)new HeartbeatThread("Worker Pin List Sync", (HeartbeatExecutor)this.mPinListSync, (long)WorkerContext.getConf().getInt("alluxio.worker.block.heartbeat.interval.ms")));
        this.getExecutorService().submit(this.mSessionCleanerThread);
        if (this.mSpaceReserver != null) {
            this.getExecutorService().submit(this.mSpaceReserver);
        }
    }

    @Override
    public void stop() throws IOException {
        this.mDataServer.close();
        this.mSessionCleanerThread.stop();
        this.mBlockMasterClient.close();
        if (this.mSpaceReserver != null) {
            this.mSpaceReserver.stop();
        }
        this.mFileSystemMasterClient.close();
        this.getExecutorService().shutdownNow();
        while (!this.mDataServer.isClosed()) {
            this.mDataServer.close();
            CommonUtils.sleepMs((long)100L);
        }
    }

    public void abortBlock(long sessionId, long blockId) throws BlockAlreadyExistsException, BlockDoesNotExistException, InvalidWorkerStateException, IOException {
        this.mBlockStore.abortBlock(sessionId, blockId);
    }

    public void accessBlock(long sessionId, long blockId) throws BlockDoesNotExistException {
        this.mBlockStore.accessBlock(sessionId, blockId);
    }

    public void cleanupSessions() {
        for (long session : this.mSessions.getTimedOutSessions()) {
            this.mSessions.removeSession(session);
            this.mBlockStore.cleanupSession(session);
        }
    }

    public void commitBlock(long sessionId, long blockId) throws BlockAlreadyExistsException, BlockDoesNotExistException, InvalidWorkerStateException, IOException, WorkerOutOfSpaceException {
        this.mBlockStore.commitBlock(sessionId, blockId);
        Long lockId = this.mBlockStore.lockBlock(sessionId, blockId);
        try {
            BlockMeta meta = this.mBlockStore.getBlockMeta(sessionId, blockId, lockId);
            BlockStoreLocation loc = meta.getBlockLocation();
            Long length = meta.getBlockSize();
            BlockStoreMeta storeMeta = this.mBlockStore.getBlockStoreMeta();
            Long bytesUsedOnTier = storeMeta.getUsedBytesOnTiers().get(loc.tierAlias());
            this.mBlockMasterClient.commitBlock(WorkerIdRegistry.getWorkerId(), bytesUsedOnTier, loc.tierAlias(), blockId, length);
        }
        catch (IOException ioe) {
            throw new IOException("Failed to commit block to master.", ioe);
        }
        catch (ConnectionFailedException e) {
            throw new IOException("Failed to commit block to master.", e);
        }
        finally {
            this.mBlockStore.unlockBlock(lockId);
        }
    }

    public String createBlock(long sessionId, long blockId, String tierAlias, long initialBytes) throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException {
        BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(tierAlias);
        TempBlockMeta createdBlock = this.mBlockStore.createBlockMeta(sessionId, blockId, loc, initialBytes);
        return createdBlock.getPath();
    }

    public void createBlockRemote(long sessionId, long blockId, String tierAlias, long initialBytes) throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException {
        BlockStoreLocation loc = BlockStoreLocation.anyDirInTier(tierAlias);
        TempBlockMeta createdBlock = this.mBlockStore.createBlockMeta(sessionId, blockId, loc, initialBytes);
        FileUtils.createBlockPath((String)createdBlock.getPath());
    }

    public void freeSpace(long sessionId, long availableBytes, String tierAlias) throws WorkerOutOfSpaceException, BlockDoesNotExistException, IOException, BlockAlreadyExistsException, InvalidWorkerStateException {
        BlockStoreLocation location = BlockStoreLocation.anyDirInTier(tierAlias);
        this.mBlockStore.freeSpace(sessionId, availableBytes, location);
    }

    public BlockWriter getTempBlockWriterRemote(long sessionId, long blockId) throws BlockDoesNotExistException, IOException {
        return this.mBlockStore.getBlockWriter(sessionId, blockId);
    }

    public BlockHeartbeatReport getReport() {
        return this.mHeartbeatReporter.generateReport();
    }

    public BlockStoreMeta getStoreMeta() {
        return this.mBlockStore.getBlockStoreMeta();
    }

    public BlockMeta getVolatileBlockMeta(long blockId) throws BlockDoesNotExistException {
        return this.mBlockStore.getVolatileBlockMeta(blockId);
    }

    public boolean hasBlockMeta(long blockId) {
        return this.mBlockStore.hasBlockMeta(blockId);
    }

    public long lockBlock(long sessionId, long blockId) throws BlockDoesNotExistException {
        return this.mBlockStore.lockBlock(sessionId, blockId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void moveBlock(long sessionId, long blockId, String tierAlias) throws BlockDoesNotExistException, BlockAlreadyExistsException, InvalidWorkerStateException, WorkerOutOfSpaceException, IOException {
        BlockStoreLocation dst = BlockStoreLocation.anyDirInTier(tierAlias);
        long lockId = this.mBlockStore.lockBlock(sessionId, blockId);
        try {
            BlockMeta meta = this.mBlockStore.getBlockMeta(sessionId, blockId, lockId);
            if (meta.getBlockLocation().belongsTo(dst)) {
                return;
            }
        }
        finally {
            this.mBlockStore.unlockBlock(lockId);
        }
        this.mBlockStore.moveBlock(sessionId, blockId, dst);
    }

    public String readBlock(long sessionId, long blockId, long lockId) throws BlockDoesNotExistException, InvalidWorkerStateException {
        BlockMeta meta = this.mBlockStore.getBlockMeta(sessionId, blockId, lockId);
        return meta.getPath();
    }

    public BlockReader readBlockRemote(long sessionId, long blockId, long lockId) throws BlockDoesNotExistException, InvalidWorkerStateException, IOException {
        return this.mBlockStore.getBlockReader(sessionId, blockId, lockId);
    }

    public void removeBlock(long sessionId, long blockId) throws InvalidWorkerStateException, BlockDoesNotExistException, IOException {
        this.mBlockStore.removeBlock(sessionId, blockId);
    }

    public void requestSpace(long sessionId, long blockId, long additionalBytes) throws BlockDoesNotExistException, WorkerOutOfSpaceException, IOException {
        this.mBlockStore.requestSpace(sessionId, blockId, additionalBytes);
    }

    public void unlockBlock(long lockId) throws BlockDoesNotExistException {
        this.mBlockStore.unlockBlock(lockId);
    }

    public void unlockBlock(long sessionId, long blockId) throws BlockDoesNotExistException {
        this.mBlockStore.unlockBlock(sessionId, blockId);
    }

    public void sessionHeartbeat(long sessionId, List<Long> metrics) {
        this.mSessions.sessionHeartbeat(sessionId);
        this.mMetricsReporter.updateClientMetrics(metrics);
    }

    public void updatePinList(Set<Long> pinnedInodes) {
        this.mBlockStore.updatePinnedInodes(pinnedInodes);
    }

    public FileInfo getFileInfo(long fileId) throws IOException {
        try {
            return this.mFileSystemMasterClient.getFileInfo(fileId);
        }
        catch (AlluxioException e) {
            throw new IOException(e);
        }
    }
}

