/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.util;

import java.io.EOFException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.BatchedWriteAheadLog;
import org.apache.spark.streaming.util.FileBasedWriteAheadLog;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogReader;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment;
import org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter;
import org.apache.spark.streaming.util.HdfsUtils$;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.util.ManualClock;
import org.apache.spark.util.Utils$;
import scala.Array$;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

public final class WriteAheadLogSuite$ {
    /** Singleton instance of this class; assigned by the private constructor. */
    public static WriteAheadLogSuite$ MODULE$;
    /** Hadoop configuration passed to the HDFS helpers, readers, writers and logs created in this class. */
    private final Configuration hadoopConf;

    // Instantiating the class here runs the private constructor, which stores the new instance in MODULE$.
    static {
        new WriteAheadLogSuite$();
    }

    /**
     * Returns the Hadoop configuration held by this singleton.
     *
     * @return the {@link Configuration} stored in the {@code hadoopConf} field
     */
    private Configuration hadoopConf() {
        return hadoopConf;
    }

    /**
     * Writes {@code data} directly to {@code file} as length-prefixed records, bypassing the
     * write-ahead-log writer classes.
     *
     * <p>With {@code allowBatching} all items are serialized together into a single record;
     * otherwise each item is serialized and written as its own record.
     *
     * @param data          strings to write
     * @param file          path of the file to create
     * @param allowBatching whether to store all items in one aggregated record
     * @return one segment (path, offset, length) per record written, in write order
     */
    public Seq<FileBasedWriteAheadLogSegment> writeDataManually(Seq<String> data, String file, boolean allowBatching) {
        ArrayBuffer segments = new ArrayBuffer();
        // try-with-resources closes the stream even when a write fails part-way through.
        try (FSDataOutputStream writer = HdfsUtils$.MODULE$.getOutputStream(file, this.hadoopConf());){
            if (allowBatching) {
                WriteAheadLogSuite$.writeToStream$1(this.wrapArrayArrayByte(data.toArray(ClassTag$.MODULE$.apply(String.class))).array(), writer, segments, file);
            } else {
                data.foreach((Function1 & Serializable & scala.Serializable)item -> {
                    WriteAheadLogSuite$.writeToStream$1(Utils$.MODULE$.serialize((Object)item), writer, segments, file);
                    return BoxedUnit.UNIT;
                });
            }
        }
        return segments;
    }

    /**
     * Writes each string in {@code data} to {@code filePath} through a
     * {@link FileBasedWriteAheadLogWriter}.
     *
     * @param filePath path of the log file to write
     * @param data     strings to write, each converted with {@code stringToByteBuffer}
     * @return the segments returned by the writer, one per item, in write order
     */
    public Seq<FileBasedWriteAheadLogSegment> writeDataUsingWriter(String filePath, Seq<String> data) {
        FileBasedWriteAheadLogWriter writer = new FileBasedWriteAheadLogWriter(filePath, this.hadoopConf());
        try {
            return (Seq)data.map((Function1 & Serializable & scala.Serializable)item -> writer.write(MODULE$.stringToByteBuffer((String)item)), Seq$.MODULE$.canBuildFrom());
        } finally {
            // Close the writer even if one of the writes throws, so the file handle is not leaked.
            writer.close();
        }
    }

    /**
     * Writes {@code data} through a write-ahead log created by {@code createWriteAheadLog},
     * advancing {@code manualClock} before every write.
     *
     * @param logDirectory        directory handed to the created log
     * @param data                strings to write, each converted with {@code stringToByteBuffer}
     * @param closeFileAfterWrite forwarded to {@code createWriteAheadLog}
     * @param allowBatching       forwarded to {@code createWriteAheadLog}
     * @param manualClock         clock advanced by {@code clockAdvanceTime} before each write; its
     *                            current time is passed to the log as the record time
     * @param closeLog            whether to close the log before returning it
     * @param clockAdvanceTime    amount passed to {@code manualClock.advance} per record
     * @return the log that was written to (already closed when {@code closeLog} is true)
     */
    public WriteAheadLog writeDataUsingWriteAheadLog(String logDirectory, Seq<String> data, boolean closeFileAfterWrite, boolean allowBatching, ManualClock manualClock, boolean closeLog, int clockAdvanceTime) {
        // A clock reading below 100000 is reset to 10000 before any record is written.
        if (manualClock.getTimeMillis() < 100000L) {
            manualClock.setTime(10000L);
        }
        final WriteAheadLog log = this.createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching);
        final long step = (long)clockAdvanceTime;
        data.foreach((Function1 & Serializable & scala.Serializable)record -> {
            manualClock.advance(step);
            return log.write(MODULE$.stringToByteBuffer((String)record), manualClock.getTimeMillis());
        });
        if (closeLog) {
            log.close();
        }
        return log;
    }

    /**
     * Returns a newly constructed {@link ManualClock}.
     *
     * NOTE(review): judging by the {@code $default$5} suffix this is presumably the default for the
     * fifth parameter ({@code manualClock}) of {@code writeDataUsingWriteAheadLog} -- confirm.
     */
    public ManualClock writeDataUsingWriteAheadLog$default$5() {
        return new ManualClock();
    }

    /**
     * Returns {@code true}.
     *
     * NOTE(review): judging by the {@code $default$6} suffix this is presumably the default for the
     * sixth parameter ({@code closeLog}) of {@code writeDataUsingWriteAheadLog} -- confirm.
     */
    public boolean writeDataUsingWriteAheadLog$default$6() {
        return true;
    }

    /**
     * Returns {@code 500}.
     *
     * NOTE(review): judging by the {@code $default$7} suffix this is presumably the default for the
     * seventh parameter ({@code clockAdvanceTime}) of {@code writeDataUsingWriteAheadLog} -- confirm.
     */
    public int writeDataUsingWriteAheadLog$default$7() {
        return 500;
    }

    public Seq<String> readDataManually(Seq<FileBasedWriteAheadLogSegment> segments) {
        return (Seq)segments.map((Function1 & Serializable & scala.Serializable)segment -> {
            void v0;
            try (FSDataInputStream reader = HdfsUtils$.MODULE$.getInputStream(segment.path(), MODULE$.hadoopConf());){
                void var3_3;
                reader.seek(segment.offset());
                byte[] bytes = new byte[segment.length()];
                reader.readInt();
                reader.readFully(bytes);
                String data = (String)Utils$.MODULE$.deserialize(bytes);
                reader.close();
                v0 = var3_3;
            }
            return v0;
        }, Seq$.MODULE$.canBuildFrom());
    }

    /*
     * WARNING - void declaration
     */
    public <T> Seq<T> readDataManually(String file) {
        void var3_3;
        ArrayBuffer buffer = new ArrayBuffer();
        try (FSDataInputStream reader = HdfsUtils$.MODULE$.getInputStream(file, this.hadoopConf());){
            try {
                while (true) {
                    int length = reader.readInt();
                    byte[] bytes = new byte[length];
                    reader.read(bytes);
                    buffer.$plus$eq(Utils$.MODULE$.deserialize(bytes));
                }
            }
            catch (EOFException ex) {
            }
        }
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public Seq<String> readDataUsingReader(String file) {
        void var3_3;
        FileBasedWriteAheadLogReader reader = new FileBasedWriteAheadLogReader(file, this.hadoopConf());
        List readData = (List)reader.toList().map((Function1 & Serializable & scala.Serializable)byteBuffer -> MODULE$.byteBufferToString((ByteBuffer)byteBuffer), List$.MODULE$.canBuildFrom());
        reader.close();
        return var3_3;
    }

    /**
     * Reads every record available through a write-ahead log created by
     * {@code createWriteAheadLog} and converts each one back to a string.
     *
     * @param logDirectory        directory handed to the created log
     * @param closeFileAfterWrite forwarded to {@code createWriteAheadLog}
     * @param allowBatching       forwarded to {@code createWriteAheadLog}
     * @return the strings obtained via {@code byteBufferToString}, in {@code readAll()} order
     */
    public Seq<String> readDataUsingWriteAheadLog(String logDirectory, boolean closeFileAfterWrite, boolean allowBatching) {
        WriteAheadLog wal = this.createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching);
        String[] data;
        try {
            // toArray drains the iterator, so every record is read before the log is closed.
            data = (String[])((Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(wal.readAll()).asScala()).map((Function1 & Serializable & scala.Serializable)byteBuffer -> MODULE$.byteBufferToString((ByteBuffer)byteBuffer)).toArray(ClassTag$.MODULE$.apply(String.class));
        } finally {
            // Close the log even if reading or deserialization throws.
            wal.close();
        }
        return Predef$.MODULE$.wrapRefArray((Object[])data);
    }

    /**
     * Lists the files in {@code directory} as strings, ordered by the number embedded in each
     * file name.
     *
     * <p>The sort key of an entry is the second dash-separated field of its file name, parsed as
     * a long. A leading {@code "file:"} is stripped from every returned path.
     *
     * @param directory directory to list
     * @return the sorted path strings, or an empty sequence when {@code directory} does not exist
     *         or is not a directory
     */
    public Seq<String> getLogFilesInDirectory(String directory) {
        Path logDirectoryPath = new Path(directory);
        FileSystem fileSystem = HdfsUtils$.MODULE$.getFileSystemForPath(logDirectoryPath, this.hadoopConf());
        // getFileStatus is only consulted once the path is known to exist.
        if (!fileSystem.exists(logDirectoryPath) || !fileSystem.getFileStatus(logDirectoryPath).isDirectory()) {
            return (Seq)Seq$.MODULE$.empty();
        }
        Object[] statuses = (Object[])fileSystem.listStatus(logDirectoryPath);
        Object[] paths = (Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(statuses)).map((Function1 & Serializable & scala.Serializable)status -> status.getPath(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Path.class)));
        Object[] sortedPaths = (Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(paths)).sortBy((Function1 & Serializable & scala.Serializable)path -> BoxesRunTime.boxToLong((long)WriteAheadLogSuite$.$anonfun$getLogFilesInDirectory$2(path)), (Ordering)Ordering.Long$.MODULE$);
        return (Seq)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(sortedPaths)).map((Function1 & Serializable & scala.Serializable)path -> new StringOps(Predef$.MODULE$.augmentString(path.toString())).stripPrefix("file:"), Array$.MODULE$.fallbackCanBuildFrom(Predef.DummyImplicit$.MODULE$.dummyImplicit()));
    }

    /**
     * Creates a file-based write-ahead log over {@code logDirectory}, optionally wrapped in a
     * batching log.
     *
     * @param logDirectory        directory passed to {@link FileBasedWriteAheadLog}
     * @param closeFileAfterWrite passed through to {@link FileBasedWriteAheadLog}
     * @param allowBatching       when true the file-based log is wrapped in a
     *                            {@link BatchedWriteAheadLog} sharing the same {@link SparkConf}
     * @return the file-based log itself, or the batching wrapper around it
     */
    public WriteAheadLog createWriteAheadLog(String logDirectory, boolean closeFileAfterWrite, boolean allowBatching) {
        SparkConf conf = new SparkConf();
        // The two literal 1 arguments are constructor settings defined by FileBasedWriteAheadLog.
        FileBasedWriteAheadLog fileLog = new FileBasedWriteAheadLog(conf, logDirectory, this.hadoopConf(), 1, 1, closeFileAfterWrite);
        if (!allowBatching) {
            return fileLog;
        }
        return new BatchedWriteAheadLog((WriteAheadLog)fileLog, conf);
    }

    /**
     * Produces the test payload: the integers 1 through 100 (inclusive) rendered as strings.
     * Despite the name, the result is the same on every call.
     *
     * @return a sequence of 100 strings, "1" to "100", in ascending order
     */
    public Seq<String> generateRandomData() {
        Function1 render = (Function1 & Serializable & scala.Serializable)number -> WriteAheadLogSuite$.$anonfun$generateRandomData$1(BoxesRunTime.unboxToInt((Object)number));
        return (Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 100).map(render, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /**
     * Reads all records from the given log files with {@code readDataManually(String)} and
     * flattens them into one sequence of strings.
     *
     * <p>Without batching, each record is taken as one string. With batching, each record is
     * treated as an array whose elements are individually deserialized into strings.
     *
     * @param logFiles      paths of the files to read, processed in order
     * @param allowBatching whether the records are aggregated batches
     * @return all strings found, in file order and record order
     */
    public Seq<String> readAndDeserializeDataManually(Seq<String> logFiles, boolean allowBatching) {
        if (!allowBatching) {
            return (Seq)logFiles.flatMap((Function1 & Serializable & scala.Serializable)file -> MODULE$.readDataManually((String)file), Seq$.MODULE$.canBuildFrom());
        }
        return (Seq)logFiles.flatMap((Function1 & Serializable & scala.Serializable)file -> {
            Seq batches = MODULE$.readDataManually((String)file);
            return (Seq)batches.flatMap((Function1 & Serializable & scala.Serializable)batch -> {
                // Deserialize every element of the batch, then expose the result as a collection.
                Object[] decoded = (Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])batch)).map((Function1 & Serializable & scala.Serializable)serialized -> (String)Utils$.MODULE$.deserialize(serialized), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
                return new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(decoded));
            }, Seq$.MODULE$.canBuildFrom());
        }, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Serializes {@code str} with {@code Utils.serialize} and wraps the resulting bytes.
     *
     * @param str string to serialize
     * @return a buffer wrapping the serialized bytes
     */
    public ByteBuffer stringToByteBuffer(String str) {
        byte[] serialized = Utils$.MODULE$.serialize((Object)str);
        return ByteBuffer.wrap(serialized);
    }

    /**
     * Deserializes the string held in {@code byteBuffer}.
     *
     * <p>The whole backing array is deserialized; the buffer's position and limit are not
     * consulted.
     *
     * @param byteBuffer array-backed buffer containing a serialized string
     * @return the deserialized string
     */
    public String byteBufferToString(ByteBuffer byteBuffer) {
        byte[] backing = byteBuffer.array();
        return (String)Utils$.MODULE$.deserialize(backing);
    }

    /**
     * Packs an array of records into a single buffer: each element is serialized on its own,
     * the resulting array of byte arrays is serialized again, and the final bytes are wrapped.
     *
     * @param records array whose elements are to be serialized
     * @return a buffer wrapping the serialized array of serialized elements
     */
    public <T> ByteBuffer wrapArrayArrayByte(Object records) {
        Object serializedRecords = Predef$.MODULE$.genericArrayOps(records).map((Function1 & Serializable & scala.Serializable)record -> Utils$.MODULE$.serialize(record), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))));
        byte[] payload = Utils$.MODULE$.serialize(serializedRecords);
        return ByteBuffer.wrap(payload);
    }

    /**
     * Appends one length-prefixed record to the stream and records where it was written.
     *
     * @param bytes      payload to write
     * @param writer$2   stream to append to; its position before the write is the segment offset
     * @param segments$1 buffer that receives the new segment (file, offset, payload length)
     * @param file$1     path stored in the recorded segment
     */
    private static final void writeToStream$1(byte[] bytes, FSDataOutputStream writer$2, ArrayBuffer segments$1, String file$1) {
        final long startOffset = writer$2.getPos();
        final int recordLength = bytes.length;
        // The int length goes first, then the payload itself.
        writer$2.writeInt(recordLength);
        writer$2.write(bytes);
        segments$1.$plus$eq((Object)new FileBasedWriteAheadLogSegment(file$1, startOffset, recordLength));
    }

    /**
     * Sort key used by {@code getLogFilesInDirectory}: the second dash-separated field of the
     * path's file name, parsed as a long.
     *
     * @param x$5 path whose file name contains at least one {@code '-'}
     * @return the numeric value of the field after the first dash
     */
    public static final /* synthetic */ long $anonfun$getLogFilesInDirectory$2(Path x$5) {
        String fileName = x$5.getName();
        String secondField = fileName.split("-")[1];
        return new StringOps(Predef$.MODULE$.augmentString(secondField)).toLong();
    }

    /**
     * Element mapper used by {@code generateRandomData}: renders an int as its decimal string.
     *
     * @param x$7 number to render
     * @return the decimal string form of {@code x$7}
     */
    public static final /* synthetic */ String $anonfun$generateRandomData$1(int x$7) {
        return String.valueOf(x$7);
    }

    /**
     * Private constructor, invoked from the static initializer. Publishes the new instance
     * through {@code MODULE$} and creates the Hadoop configuration used by the helper methods.
     */
    private WriteAheadLogSuite$() {
        // The instance is published before its hadoopConf field is assigned.
        MODULE$ = this;
        this.hadoopConf = new Configuration();
    }
}

