/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal;

import alluxio.Configuration;
import alluxio.exception.ExceptionMessage;
import alluxio.master.MasterContext;
import alluxio.master.journal.Journal;
import alluxio.master.journal.JournalOutputStream;
import alluxio.proto.journal.Journal;
import alluxio.underfs.UnderFileSystem;
import com.google.common.base.Preconditions;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class JournalWriter {
    private static final Logger LOG = LoggerFactory.getLogger((String)"alluxio.logger.type");
    private final Journal mJournal;
    private final String mJournalDirectory;
    private final String mCompletedDirectory;
    private final String mTempCheckpointPath;
    private final UnderFileSystem mUfs;
    private final long mMaxLogSize;
    private long mNextCompleteLogNumber = 1L;
    private CheckpointOutputStream mCheckpointOutputStream = null;
    private EntryOutputStream mEntryOutputStream = null;
    private long mNextEntrySequenceNumber = 1L;

    JournalWriter(Journal journal) {
        this.mJournal = (Journal)Preconditions.checkNotNull((Object)journal);
        this.mJournalDirectory = this.mJournal.getDirectory();
        this.mCompletedDirectory = this.mJournal.getCompletedDirectory();
        this.mTempCheckpointPath = this.mJournal.getCheckpointFilePath() + ".tmp";
        Configuration conf = MasterContext.getConf();
        this.mUfs = UnderFileSystem.get((String)this.mJournalDirectory, (Configuration)conf);
        this.mMaxLogSize = conf.getBytes("alluxio.master.journal.log.size.bytes.max");
    }

    public synchronized void completeAllLogs() throws IOException {
        LOG.info("Marking all logs as complete.");
        this.mNextCompleteLogNumber = 1L;
        String logFilename = this.mJournal.getCompletedLogFilePath(this.mNextCompleteLogNumber);
        while (this.mUfs.exists(logFilename)) {
            ++this.mNextCompleteLogNumber;
            logFilename = this.mJournal.getCompletedLogFilePath(this.mNextCompleteLogNumber);
        }
        this.completeCurrentLog();
    }

    public synchronized JournalOutputStream getCheckpointOutputStream(long latestSequenceNumber) throws IOException {
        if (this.mCheckpointOutputStream == null) {
            LOG.info("Creating tmp checkpoint file: {}", (Object)this.mTempCheckpointPath);
            if (!this.mUfs.exists(this.mJournalDirectory)) {
                LOG.info("Creating journal folder: {}", (Object)this.mJournalDirectory);
                this.mUfs.mkdirs(this.mJournalDirectory, true);
            }
            this.mNextEntrySequenceNumber = latestSequenceNumber + 1L;
            LOG.info("Latest journal sequence number: {} Next journal sequence number: {}", (Object)latestSequenceNumber, (Object)this.mNextEntrySequenceNumber);
            this.mCheckpointOutputStream = new CheckpointOutputStream(new DataOutputStream(this.mUfs.create(this.mTempCheckpointPath)));
        }
        return this.mCheckpointOutputStream;
    }

    public synchronized JournalOutputStream getEntryOutputStream() throws IOException {
        if (this.mCheckpointOutputStream == null || !this.mCheckpointOutputStream.isClosed()) {
            throw new IOException("The checkpoint must be written and closed before writing entries.");
        }
        if (this.mEntryOutputStream == null) {
            this.mEntryOutputStream = new EntryOutputStream(this.openCurrentLog());
        }
        return this.mEntryOutputStream;
    }

    public synchronized void close() throws IOException {
        if (this.mCheckpointOutputStream != null) {
            this.mCheckpointOutputStream.close();
        }
        if (this.mEntryOutputStream != null) {
            this.mEntryOutputStream.close();
        }
        this.mUfs.close();
    }

    private synchronized OutputStream openCurrentLog() throws IOException {
        String currentLogFile = this.mJournal.getCurrentLogFilePath();
        OutputStream os = this.mUfs.create(currentLogFile);
        LOG.info("Opened current log file: {}", (Object)currentLogFile);
        return os;
    }

    private synchronized void deleteCompletedLogs() throws IOException {
        LOG.info("Deleting all completed log files...");
        long logNumber = 1L;
        String logFilename = this.mJournal.getCompletedLogFilePath(logNumber);
        while (this.mUfs.exists(logFilename)) {
            LOG.info("Deleting completed log: {}", (Object)logFilename);
            this.mUfs.delete(logFilename, true);
            logFilename = this.mJournal.getCompletedLogFilePath(++logNumber);
        }
        LOG.info("Finished deleting all completed log files.");
        this.mNextCompleteLogNumber = 1L;
    }

    private synchronized void completeCurrentLog() throws IOException {
        String currentLog = this.mJournal.getCurrentLogFilePath();
        if (!this.mUfs.exists(currentLog)) {
            return;
        }
        if (!this.mUfs.exists(this.mCompletedDirectory)) {
            this.mUfs.mkdirs(this.mCompletedDirectory, true);
        }
        String completedLog = this.mJournal.getCompletedLogFilePath(this.mNextCompleteLogNumber);
        this.mUfs.rename(currentLog, completedLog);
        LOG.info("Completed current log: {} to completed log: {}", (Object)currentLog, (Object)completedLog);
        ++this.mNextCompleteLogNumber;
    }

    @ThreadSafe
    private class EntryOutputStream
    implements JournalOutputStream {
        private OutputStream mRawOutputStream;
        private DataOutputStream mDataOutputStream;
        private boolean mIsClosed = false;

        EntryOutputStream(OutputStream outputStream) {
            this.mRawOutputStream = outputStream;
            this.mDataOutputStream = new DataOutputStream(outputStream);
        }

        @Override
        public synchronized void writeEntry(Journal.JournalEntry entry) throws IOException {
            if (this.mIsClosed) {
                throw new IOException(ExceptionMessage.JOURNAL_WRITE_AFTER_CLOSE.getMessage(new Object[0]));
            }
            JournalWriter.this.mJournal.getJournalFormatter().serialize(entry.toBuilder().setSequenceNumber(JournalWriter.this.mNextEntrySequenceNumber++).build(), this.mDataOutputStream);
        }

        @Override
        public synchronized void close() throws IOException {
            if (this.mIsClosed) {
                return;
            }
            if (this.mDataOutputStream != null) {
                this.mDataOutputStream.close();
            }
            this.mIsClosed = true;
        }

        @Override
        public synchronized void flush() throws IOException {
            boolean overSize;
            if (this.mIsClosed) {
                return;
            }
            this.mDataOutputStream.flush();
            if (this.mRawOutputStream instanceof FSDataOutputStream) {
                ((FSDataOutputStream)this.mRawOutputStream).sync();
            }
            boolean bl = overSize = (long)this.mDataOutputStream.size() > JournalWriter.this.mMaxLogSize;
            if (overSize || JournalWriter.this.mUfs.getUnderFSType() == UnderFileSystem.UnderFSType.S3 || JournalWriter.this.mUfs.getUnderFSType() == UnderFileSystem.UnderFSType.OSS) {
                if (overSize) {
                    LOG.info("Rotating log file. size: {} maxSize: {}", (Object)this.mDataOutputStream.size(), (Object)JournalWriter.this.mMaxLogSize);
                }
                this.mDataOutputStream.close();
                JournalWriter.this.completeCurrentLog();
                this.mRawOutputStream = JournalWriter.this.openCurrentLog();
                this.mDataOutputStream = new DataOutputStream(this.mRawOutputStream);
            }
        }
    }

    private class CheckpointOutputStream
    implements JournalOutputStream {
        private final DataOutputStream mOutputStream;
        private boolean mIsClosed = false;

        CheckpointOutputStream(DataOutputStream outputStream) {
            this.mOutputStream = outputStream;
        }

        boolean isClosed() {
            return this.mIsClosed;
        }

        @Override
        public synchronized void writeEntry(Journal.JournalEntry entry) throws IOException {
            if (this.mIsClosed) {
                throw new IOException(ExceptionMessage.JOURNAL_WRITE_AFTER_CLOSE.getMessage(new Object[0]));
            }
            JournalWriter.this.mJournal.getJournalFormatter().serialize(entry.toBuilder().setSequenceNumber(JournalWriter.this.mNextEntrySequenceNumber++).build(), this.mOutputStream);
        }

        @Override
        public synchronized void close() throws IOException {
            if (this.mIsClosed) {
                return;
            }
            this.mOutputStream.flush();
            this.mOutputStream.close();
            LOG.info("Successfully created tmp checkpoint file: {}", (Object)JournalWriter.this.mTempCheckpointPath);
            JournalWriter.this.mUfs.delete(JournalWriter.this.mJournal.getCheckpointFilePath(), false);
            JournalWriter.this.mUfs.rename(JournalWriter.this.mTempCheckpointPath, JournalWriter.this.mJournal.getCheckpointFilePath());
            JournalWriter.this.mUfs.delete(JournalWriter.this.mTempCheckpointPath, false);
            LOG.info("Renamed checkpoint file {} to {}", (Object)JournalWriter.this.mTempCheckpointPath, (Object)JournalWriter.this.mJournal.getCheckpointFilePath());
            JournalWriter.this.deleteCompletedLogs();
            JournalWriter.this.completeCurrentLog();
            this.mIsClosed = true;
        }

        @Override
        public synchronized void flush() throws IOException {
            if (this.mIsClosed) {
                return;
            }
            this.mOutputStream.flush();
        }
    }
}

