package org.apache.hudi.timeline.service;

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
import io.javalin.Javalin;
import io.javalin.core.util.JettyServerUtil;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.org.apache.jetty.server.Server;
import org.apache.hudi.org.apache.jetty.util.thread.QueuedThreadPool;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/timeline/service/TimelineService.class */
public class TimelineService {
    // Class-wide log4j logger.
    private static final Logger LOG = LogManager.getLogger(TimelineService.class);
    // Upper bound on port-binding attempts when starting the service.
    // NOTE(review): not referenced by name in this file; startServiceOnPort loops with a literal 16 - confirm the two are meant to stay in sync.
    private static final int START_SERVICE_MAX_RETRIES = 16;
    // Sentinel thread count. NOTE(review): startService compares numThreads against a literal -1 rather than this constant.
    private static final int DEFAULT_NUM_THREADS = -1;
    // Port to bind; initialised from Config.serverPort and overwritten with the actually bound port by startService().
    private int serverPort;
    // Service settings handed to the constructor; also passed on to the RequestHandler.
    private Config timelineServerConf;
    // Hadoop configuration as returned by FSUtils.prepareHadoopConf for the constructor argument.
    private Configuration conf;
    // NOTE(review): the fields below are marked transient although this class does not implement Serializable.
    // Engine context supplied by the caller; forwarded to the RequestHandler.
    private transient HoodieEngineContext context;
    // File system supplied by the caller; forwarded to the RequestHandler and exposed via getFs().
    private transient FileSystem fs;
    // Javalin application; created in startService() and reset to null in close().
    private transient Javalin app = null;
    // View manager supplied by the caller; forwarded to the RequestHandler and closed in close().
    private transient FileSystemViewManager fsViewsManager;
    // Created in startService(), stopped in close().
    private transient RequestHandler requestHandler;

    /* loaded from: input_file:org/apache/hudi/timeline/service/TimelineService$Config.class */
    /**
     * Settings of the timeline service.
     *
     * <p>The public fields carry JCommander {@code @Parameter} annotations so that {@code main}
     * can populate an instance directly from the command line. {@link Builder} exposes the same
     * settings for programmatic construction.
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--server-port", "-p"}, description = " Server Port")
        public Integer serverPort = 26754;

        @Parameter(names = {"--view-storage", "-st"}, description = "View Storage Type. Default - SPILLABLE_DISK")
        public FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;

        @Parameter(names = {"--max-view-mem-per-table", "-mv"}, description = "Maximum view memory per table in MB to be used for storing file-groups. Overflow file-groups will be spilled to disk. Used for SPILLABLE_DISK storage type")
        public Integer maxViewMemPerTableInMB = 2048;

        @Parameter(names = {"--mem-overhead-fraction-pending-compaction", "-cf"}, description = "Memory Fraction of --max-view-mem-per-table to be allocated for managing pending compaction storage. Overflow entries will be spilled to disk. Used for SPILLABLE_DISK storage type")
        public Double memFractionForCompactionPerTable = 0.001d;

        @Parameter(names = {"--base-store-path", "-sp"}, description = "Directory where spilled view entries will be stored. Used for SPILLABLE_DISK storage type")
        public String baseStorePathForFileGroups = FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue();

        @Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB")
        public String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH.defaultValue();

        @Parameter(names = {"--threads", "-t"}, description = "Number of threads to use for serving requests")
        public int numThreads = -1;

        // Help text previously read "asyncronous"; only the spelling changed.
        @Parameter(names = {"--async"}, description = "Use asynchronous request processing")
        public boolean async = false;

        @Parameter(names = {"--compress"}, description = "Compress output using gzip")
        public boolean compress = true;

        @Parameter(names = {"--enable-marker-requests", "-em"}, description = "Enable handling of marker-related requests")
        public boolean enableMarkerRequests = false;

        @Parameter(names = {"--marker-batch-threads", "-mbt"}, description = "Number of threads to use for batch processing marker creation requests")
        public int markerBatchNumThreads = 20;

        @Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description = "The interval in milliseconds between two batch processing of marker creation requests")
        public long markerBatchIntervalMs = 50;

        @Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
        public int markerParallelism = 100;

        @Parameter(names = {"--help", "-h"})
        public Boolean help = false;

        /**
         * Fluent builder for {@link Config}.
         *
         * <p>Values are accumulated in a scratch {@link Config}, so every setting that is not
         * overridden keeps the default declared on the corresponding {@code Config} field. The
         * defaults therefore live in one place only. {@code help} is not settable here and keeps
         * its field default.
         */
        public static class Builder {
            // Scratch instance: supplies the defaults and collects overrides until build().
            private final Config pending = new Config();

            public Builder serverPort(int i) {
                this.pending.serverPort = i;
                return this;
            }

            public Builder viewStorageType(FileSystemViewStorageType fileSystemViewStorageType) {
                this.pending.viewStorageType = fileSystemViewStorageType;
                return this;
            }

            public Builder maxViewMemPerTableInMB(int i) {
                this.pending.maxViewMemPerTableInMB = i;
                return this;
            }

            public Builder memFractionForCompactionPerTable(double d) {
                this.pending.memFractionForCompactionPerTable = d;
                return this;
            }

            public Builder baseStorePathForFileGroups(String str) {
                this.pending.baseStorePathForFileGroups = str;
                return this;
            }

            public Builder rocksDBPath(String str) {
                this.pending.rocksDBPath = str;
                return this;
            }

            public Builder numThreads(int i) {
                this.pending.numThreads = i;
                return this;
            }

            public Builder async(boolean z) {
                this.pending.async = z;
                return this;
            }

            public Builder compress(boolean z) {
                this.pending.compress = z;
                return this;
            }

            public Builder enableMarkerRequests(boolean z) {
                this.pending.enableMarkerRequests = z;
                return this;
            }

            public Builder markerBatchNumThreads(int i) {
                this.pending.markerBatchNumThreads = i;
                return this;
            }

            public Builder markerBatchIntervalMs(long j) {
                this.pending.markerBatchIntervalMs = j;
                return this;
            }

            public Builder markerParallelism(int i) {
                this.pending.markerParallelism = i;
                return this;
            }

            /**
             * Creates a new {@link Config} carrying the values collected so far.
             *
             * <p>A fresh instance is returned on every call, so later changes to this builder
             * do not affect configurations that were already built.
             *
             * @return a new, independent configuration
             */
            public Config build() {
                Config config = new Config();
                config.serverPort = this.pending.serverPort;
                config.viewStorageType = this.pending.viewStorageType;
                config.maxViewMemPerTableInMB = this.pending.maxViewMemPerTableInMB;
                config.memFractionForCompactionPerTable = this.pending.memFractionForCompactionPerTable;
                config.baseStorePathForFileGroups = this.pending.baseStorePathForFileGroups;
                config.rocksDBPath = this.pending.rocksDBPath;
                config.numThreads = this.pending.numThreads;
                config.async = this.pending.async;
                config.compress = this.pending.compress;
                config.enableMarkerRequests = this.pending.enableMarkerRequests;
                config.markerBatchNumThreads = this.pending.markerBatchNumThreads;
                config.markerBatchIntervalMs = this.pending.markerBatchIntervalMs;
                config.markerParallelism = this.pending.markerParallelism;
                return config;
            }
        }

        /**
         * Returns a new {@link Builder} pre-loaded with the default settings.
         */
        public static Builder builder() {
            return new Builder();
        }
    }

    /**
     * Returns the port of this service: the configured port until {@code startService()}
     * succeeds, and the port actually bound afterwards.
     */
    public int getServerPort() {
        return serverPort;
    }

    /**
     * Creates a timeline service; nothing is started until {@code startService()} is called.
     *
     * @param hoodieEngineContext   engine context forwarded to the request handler
     * @param configuration         Hadoop configuration; stored after FSUtils.prepareHadoopConf
     * @param config                service settings (port, threads, compression, ...)
     * @param fileSystem            file system forwarded to the request handler
     * @param fileSystemViewManager view manager served by this instance; closed by {@code close()}
     * @throws IOException declared by the signature; no statement in this constructor body throws it explicitly
     */
    public TimelineService(HoodieEngineContext hoodieEngineContext, Configuration configuration, Config config, FileSystem fileSystem, FileSystemViewManager fileSystemViewManager) throws IOException {
        conf = FSUtils.prepareHadoopConf(configuration);
        timelineServerConf = config;
        // Auto-unboxing of the Integer field; a null port fails here just as intValue() would.
        serverPort = config.serverPort;
        context = hoodieEngineContext;
        fs = fileSystem;
        fsViewsManager = fileSystemViewManager;
    }

    /**
     * Starts the Javalin app, trying a series of ports until one can be bound.
     *
     * <p>For a non-zero {@code startPort}, attempt {@code k} tries {@code startPort + k},
     * wrapping around inside the range 1024..65535. For {@code startPort == 0} every attempt
     * passes 0 again, leaving the port choice to the server.
     *
     * @param startPort first port to try: 0, or a value in 1024..65535
     * @return the port reported by the app after a successful start
     * @throws IllegalArgumentException if {@code startPort} is neither 0 nor within 1024..65535
     * @throws IOException if every attempt failed; the last failure is attached as the cause
     */
    private int startServiceOnPort(int startPort) throws IOException {
        final int minPort = 1024;    // lowest port accepted and probed
        final int portLimit = 65536; // one past the highest valid port
        final int maxAttempts = 16;  // same value as START_SERVICE_MAX_RETRIES
        if (startPort != 0 && (startPort < minPort || startPort >= portLimit)) {
            throw new IllegalArgumentException(String.format("startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port. but now is %s.", startPort));
        }
        // Remembered so the final IOException can report why the last attempt failed.
        Exception lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int tryPort = startPort == 0 ? startPort : ((startPort + attempt - minPort) % (portLimit - minPort)) + minPort;
            try {
                this.app.start(tryPort);
                return this.app.mo89port();
            } catch (Exception e) {
                lastFailure = e;
                boolean bindFailure = e.getMessage() != null && e.getMessage().contains("Failed to bind to");
                if (!bindFailure) {
                    // Unexpected failure: keep the stack trace in the log.
                    LOG.warn(String.format("Timeline server start failed on port %d. Attempting port %d + 1.", tryPort, tryPort), e);
                } else if (tryPort == 0) {
                    LOG.warn("Timeline server could not bind on a random free port.");
                } else {
                    LOG.warn(String.format("Timeline server could not bind on port %d. Attempting port %d + 1.", tryPort, tryPort));
                }
            }
        }
        throw new IOException(String.format("Timeline server start failed on port %d, after retry %d times", startPort, maxAttempts), lastFailure);
    }

    /**
     * Builds the Javalin application, registers the request handlers and starts listening.
     *
     * <p>Steps, in order: choose the Jetty server (default server when {@code numThreads} is -1,
     * otherwise one backed by a {@code QueuedThreadPool} of that size), disable dynamic gzip
     * unless compression is enabled, create and register the {@code RequestHandler}, add a
     * greeting route on "/", then bind a port via {@code startServiceOnPort}.
     *
     * @return the port the server is bound to; also stored as this service's port
     * @throws IOException if no port could be bound
     */
    public int startService() throws IOException {
        final Server jettyServer;
        if (this.timelineServerConf.numThreads == -1) {
            jettyServer = JettyServerUtil.defaultServer();
        } else {
            jettyServer = new Server(new QueuedThreadPool(this.timelineServerConf.numThreads));
        }
        this.app = Javalin.create().mo91server(() -> jettyServer);

        if (!this.timelineServerConf.compress) {
            this.app.disableDynamicGzip();
        }

        this.requestHandler = new RequestHandler(this.app, this.conf, this.timelineServerConf, this.context, this.fs, this.fsViewsManager);
        this.app.get("/", ctx -> ctx.result("Hello Hudi"));
        this.requestHandler.register();

        final int boundPort = startServiceOnPort(this.serverPort);
        LOG.info("Starting Timeline server on port :" + boundPort);
        this.serverPort = boundPort;
        return boundPort;
    }

    /**
     * Starts the service by delegating to {@code startService()}, discarding the returned port.
     *
     * @throws IOException if {@code startService()} fails
     */
    public void run() throws IOException {
        startService();
    }

    /**
     * Creates a {@link FileSystemViewManager} for the storage type selected in {@code config}.
     *
     * <p>Supported types are {@code MEMORY}, {@code SPILLABLE_DISK} (uses the base store path,
     * the per-table memory budget and the pending-compaction fraction) and
     * {@code EMBEDDED_KV_STORE} (uses the RocksDB path). Metadata and common configs are built
     * with their defaults.
     *
     * @param config                    service settings; only the view-storage fields are read
     * @param serializableConfiguration wrapper whose Hadoop configuration backs the local engine context
     * @return a view manager configured for {@code config.viewStorageType}
     * @throws IllegalArgumentException if the storage type is none of the three supported ones
     */
    public static FileSystemViewManager buildFileSystemViewManager(Config config, SerializableConfiguration serializableConfiguration) {
        HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(serializableConfiguration.get());
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
        HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
        switch (config.viewStorageType) {
            case MEMORY:
                FileSystemViewStorageConfig.Builder memoryConfBuilder = FileSystemViewStorageConfig.newBuilder();
                memoryConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
                return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, memoryConfBuilder.build(), commonConfig);
            case SPILLABLE_DISK:
                // MB -> bytes in long arithmetic: in int, the default 2048 MB (2^31 bytes)
                // wraps to a negative value.
                long maxViewMemBytes = config.maxViewMemPerTableInMB * 1024L * 1024L;
                FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
                spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
                    .withBaseStoreDir(config.baseStorePathForFileGroups)
                    .withMaxMemoryForView(maxViewMemBytes)
                    .withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
                return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build(), commonConfig);
            case EMBEDDED_KV_STORE:
                FileSystemViewStorageConfig.Builder rocksDbConfBuilder = FileSystemViewStorageConfig.newBuilder();
                rocksDbConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).withRocksDBPath(config.rocksDBPath);
                return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDbConfBuilder.build(), commonConfig);
            default:
                throw new IllegalArgumentException("Invalid view manager storage type :" + config.viewStorageType);
        }
    }

    /**
     * Shuts the service down: stops the request handler, stops and discards the Javalin app,
     * then closes the view manager.
     *
     * <p>Each collaborator is skipped when absent, so this is safe to call on an instance that
     * was never started or that was constructed without a view manager.
     */
    public void close() {
        LOG.info("Closing Timeline Service");
        if (this.requestHandler != null) {
            this.requestHandler.stop();
        }
        if (this.app != null) {
            this.app.mo94stop();
            this.app = null;
        }
        // Guarded like its siblings: the constructor accepts the manager without a null check.
        if (this.fsViewsManager != null) {
            this.fsViewsManager.close();
        }
        LOG.info("Closed Timeline Service");
    }

    /**
     * Returns the Hadoop configuration held by this service, i.e. the result of
     * FSUtils.prepareHadoopConf applied to the constructor argument.
     */
    public Configuration getConf() {
        return conf;
    }

    /**
     * Returns the file system that was passed to the constructor.
     */
    public FileSystem getFs() {
        return fs;
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, then builds and
     * runs a {@link TimelineService} on the default Hadoop file system.
     *
     * <p>When {@code --help} is given, usage is printed and the JVM exits with status 1.
     *
     * @param strArr command-line arguments, see the {@code @Parameter} fields of {@link Config}
     * @throws Exception if argument parsing, file-system lookup or service start-up fails
     */
    public static void main(String[] strArr) throws Exception {
        Config cliConfig = new Config();
        JCommander commander = new JCommander(cliConfig, null, strArr);
        if (cliConfig.help) {
            commander.usage();
            System.exit(1);
        }

        // Collaborators are created in the same order the original single expression evaluated them.
        HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(FSUtils.prepareHadoopConf(new Configuration()));
        Configuration hadoopConf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new Configuration());
        SerializableConfiguration viewManagerConf = new SerializableConfiguration(FSUtils.prepareHadoopConf(new Configuration()));
        FileSystemViewManager viewManager = buildFileSystemViewManager(cliConfig, viewManagerConf);

        TimelineService service = new TimelineService(engineContext, hadoopConf, cliConfig, fileSystem, viewManager);
        service.run();
    }
}
