package org.apache.beam.sdk.io;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.flink.api.python.shaded.org.joda.time.Duration;
import org.apache.flink.api.python.shaded.org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/Read.class */
public class Read {
    /**
     * Going by its name, the default number of splits to aim for when dividing a source.
     *
     * <p>NOTE(review): this constant is not referenced by name anywhere in the visible code. The
     * literal {@code 20} used as a divisor in {@code BoundedSourceAsSDFWrapperFn.splitRestriction}
     * looks like the compiler-inlined form of it — confirm against the original sources.
     */
    private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;

    /* loaded from: input_file:org/apache/beam/sdk/io/Read$Bounded.class */
    /**
     * {@link PTransform} that reads the records of a {@link BoundedSource}.
     *
     * <p>The expansion starts from an {@link Impulse}, emits the configured source as an element
     * via {@link OutputSingleSource}, and then reads it with {@link BoundedSourceAsSDFWrapperFn}.
     *
     * @param <T> type of the records produced by the source
     */
    public static class Bounded<T> extends PTransform<PBegin, PCollection<T>> {
        private final BoundedSource<T> source;

        /**
         * @param name transform name handed to the {@link PTransform} super constructor
         * @param boundedSource source to read; passed through
         *     {@link SerializableUtils#ensureSerializable} before being stored
         */
        private Bounded(String name, BoundedSource<T> boundedSource) {
            super(name);
            source = (BoundedSource<T>) SerializableUtils.ensureSerializable(boundedSource);
        }

        /**
         * Validates the source and builds the read pipeline fragment.
         *
         * @param pBegin the start of the pipeline this read is applied to
         * @return the collection of records, using the source's output coder
         */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public final PCollection<T> expand(PBegin pBegin) {
            source.validate();

            // Impulse -> one element carrying the source itself, encoded with Java serialization.
            PCollection<byte[]> impulse = pBegin.getPipeline().apply(Impulse.create());
            PCollection<BoundedSource<T>> sources =
                    impulse
                            .apply(ParDo.of(new OutputSingleSource<>(source)))
                            .setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {
                            }));

            // Read the source through the splittable-DoFn wrapper.
            PCollection<T> records = sources.apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<T>()));
            return records.setCoder(source.getOutputCoder());
        }

        /** Returns the source this transform reads from. */
        public BoundedSource<T> getSource() {
            return source;
        }

        /** Returns {@code "Read(<approximate simple name of the source>)"}. */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public String getKindString() {
            String sourceName = NameUtils.approximateSimpleName(source);
            return String.format("Read(%s)", sourceName);
        }

        /** Adds the source class as a display item and includes the source's own display data. */
        @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                    .add(DisplayData.item("source", source.getClass()).withLabel("Read Source"))
                    .include("source", source);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/Read$BoundedSourceAsSDFWrapperFn.class */
    /**
     * {@link DoFn} that receives a {@link BoundedSource} as its input element and reads it, using
     * the source itself as the restriction.
     *
     * <p>NOTE(review): this is decompiler output; the body of the nested tracker's
     * {@code tryClaim} was not recovered and is a stub that always throws.
     *
     * @param <T> type of the records produced by the wrapped source
     */
    public static class BoundedSourceAsSDFWrapperFn<T> extends DoFn<BoundedSource<T>, T> {
        private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceAsSDFWrapperFn.class);

        /** Upper bound on the bundle size requested from {@code split}: 67108864 bytes = 64 MiB. */
        private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 67108864;

        /**
         * Restriction tracker whose restriction is a {@link BoundedSource} and whose position type
         * is a {@code TimestampedValue<T>[]}; {@code processElement} below passes a one-element
         * array and reads the claimed record back from slot 0.
         */
        /* loaded from: input_file:org/apache/beam/sdk/io/Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.class */
        private static class BoundedSourceAsSDFRestrictionTracker<T> extends RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> {
            /** Restriction this tracker was created with; reported until a reader exists. */
            private final BoundedSource<T> initialRestriction;
            private final PipelineOptions pipelineOptions;
            /** Reader over the restriction; not assigned anywhere in the visible code (presumably by tryClaim). */
            private BoundedSource.BoundedReader<T> currentReader;
            /** Checked by {@link #checkDone()}; not assigned anywhere in the visible code (presumably by tryClaim). */
            private boolean claimedAll;

            BoundedSourceAsSDFRestrictionTracker(BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) {
                this.initialRestriction = boundedSource;
                this.pipelineOptions = pipelineOptions;
            }

            /**
             * NOTE(review): the decompiler failed on this method, so the real logic is missing.
             * As written it unconditionally throws {@link UnsupportedOperationException}; the
             * original implementation must be restored from the upstream sources or a bytecode
             * dump before this class can be used.
             */
            /* JADX WARN: Removed duplicated region for block: B:40:0x00a3 A[EXC_TOP_SPLITTER, SYNTHETIC] */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public boolean tryClaim(org.apache.beam.sdk.values.TimestampedValue<T>[] r6) {
                /*
                    Method dump skipped, instructions count: 211
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: org.apache.beam.sdk.io.Read.BoundedSourceAsSDFWrapperFn.BoundedSourceAsSDFRestrictionTracker.tryClaim(org.apache.beam.sdk.values.TimestampedValue[]):boolean");
            }

            /**
             * Last-resort cleanup: closes the reader if one is still held when this tracker is
             * finalized. An {@link IOException} from {@code close()} is logged, not rethrown.
             */
            protected void finalize() throws Throwable {
                if (this.currentReader != null) {
                    try {
                        this.currentReader.close();
                    } catch (IOException e) {
                        BoundedSourceAsSDFWrapperFn.LOG.error("Failed to close BoundedReader due to failure processing bundle.", e);
                    }
                }
            }

            /**
             * Returns the initial restriction while no reader exists, otherwise the reader's
             * current source.
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public BoundedSource<T> currentRestriction() {
                return this.currentReader == null ? this.initialRestriction : this.currentReader.getCurrentSource();
            }

            /**
             * Attempts to split the remaining work at the given fraction.
             *
             * @param d fraction of the remainder at which to split
             * @return {@code null} when there is no reader yet or the reader declines the split;
             *     otherwise a {@link SplitResult} pairing the reader's current source with the
             *     source returned by {@code splitAtFraction}
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public SplitResult<BoundedSource<T>> trySplit(double d) {
                if (this.currentReader == null) {
                    return null;
                }
                Double fractionConsumed = this.currentReader.getFractionConsumed();
                double d2 = d;
                if (fractionConsumed != null) {
                    // Convert "fraction of what is left" into a fraction of the whole source:
                    // consumed + (1 - consumed) * d.
                    d2 = fractionConsumed.doubleValue() + ((1.0d - fractionConsumed.doubleValue()) * d);
                }
                // When the consumed fraction is unknown, d is passed to the reader unchanged.
                BoundedSource<T> splitAtFraction = this.currentReader.splitAtFraction(d2);
                if (splitAtFraction == null) {
                    return null;
                }
                return SplitResult.of(this.currentReader.getCurrentSource(), splitAtFraction);
            }

            /**
             * @throws IllegalStateException if {@code claimedAll} is false
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public void checkDone() throws IllegalStateException {
                Preconditions.checkState(this.claimedAll, "Expected all records to have been claimed but finished processing bounded source while some records may have not been read.");
            }

            /** Always reports {@code BOUNDED}. */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public RestrictionTracker.IsBounded isBounded() {
                return RestrictionTracker.IsBounded.BOUNDED;
            }
        }

        BoundedSourceAsSDFWrapperFn() {
        }

        /** The initial restriction is the input element (the source) itself. */
        @DoFn.GetInitialRestriction
        public BoundedSource<T> initialRestriction(@DoFn.Element BoundedSource<T> boundedSource) {
            return boundedSource;
        }

        /** Reports the restriction's size as the source's estimated size in bytes. */
        @DoFn.GetSize
        public double getSize(@DoFn.Restriction BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) throws Exception {
            return boundedSource.getEstimatedSizeBytes(pipelineOptions);
        }

        /**
         * Splits the restriction and outputs each resulting sub-source.
         *
         * <p>The desired bundle size passed to {@code split} is the estimated size divided by 20,
         * clamped to the range [1, {@link #DEFAULT_DESIRED_BUNDLE_SIZE_BYTES}].
         */
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction BoundedSource<T> boundedSource, DoFn.OutputReceiver<BoundedSource<T>> outputReceiver, PipelineOptions pipelineOptions) throws Exception {
            Iterator<? extends BoundedSource<T>> it = boundedSource.split(Math.min(DEFAULT_DESIRED_BUNDLE_SIZE_BYTES, Math.max(1L, boundedSource.getEstimatedSizeBytes(pipelineOptions) / 20)), pipelineOptions).iterator();
            while (it.hasNext()) {
                outputReceiver.output(it.next());
            }
        }

        /** Creates a new tracker for the given restriction. */
        @DoFn.NewTracker
        public RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> restrictionTracker(@DoFn.Restriction BoundedSource<T> boundedSource, PipelineOptions pipelineOptions) {
            return new BoundedSourceAsSDFRestrictionTracker(boundedSource, pipelineOptions);
        }

        /**
         * Outputs records for as long as the tracker keeps granting claims.
         *
         * <p>The one-element array is handed to {@code tryClaim} on every iteration and slot 0 is
         * read back afterwards, so the tracker is expected to store the claimed record there.
         */
        @DoFn.ProcessElement
        public void processElement(RestrictionTracker<BoundedSource<T>, TimestampedValue<T>[]> restrictionTracker, DoFn.OutputReceiver<T> outputReceiver) throws IOException {
            TimestampedValue<T>[] timestampedValueArr = new TimestampedValue[1];
            while (restrictionTracker.tryClaim(timestampedValueArr)) {
                outputReceiver.outputWithTimestamp(timestampedValueArr[0].getValue(), timestampedValueArr[0].getTimestamp());
            }
        }

        /** Restrictions (bounded sources) are encoded with {@link SerializableCoder}. */
        @DoFn.GetRestrictionCoder
        public Coder<BoundedSource<T>> restrictionCoder() {
            return SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() { // from class: org.apache.beam.sdk.io.Read.BoundedSourceAsSDFWrapperFn.1
            });
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/Read$Builder.class */
    /**
     * Carries a transform name and creates {@link Bounded} or {@link Unbounded} reads that use
     * it.
     */
    public static class Builder {
        private final String name;

        /**
         * @param transformName name given to every transform created by this builder
         */
        private Builder(String transformName) {
            name = transformName;
        }

        /**
         * Creates a bounded read of the given source.
         *
         * @param boundedSource source to read from
         * @return a {@link Bounded} transform carrying this builder's name
         */
        public <T> Bounded<T> from(BoundedSource<T> boundedSource) {
            Bounded<T> boundedRead = new Bounded<>(name, boundedSource);
            return boundedRead;
        }

        /**
         * Creates an unbounded read of the given source.
         *
         * @param unboundedSource source to read from
         * @return an {@link Unbounded} transform carrying this builder's name
         */
        public <T> Unbounded<T> from(UnboundedSource<T, ?> unboundedSource) {
            Unbounded<T> unboundedRead = new Unbounded<>(name, unboundedSource);
            return unboundedRead;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/Read$OutputSingleSource.class */
    /**
     * {@link DoFn} that ignores the content of its {@code byte[]} input and outputs the source it
     * was constructed with once per processed element.
     *
     * @param <T> type of the source being emitted
     */
    public static class OutputSingleSource<T extends HasDisplayData> extends DoFn<byte[], T> {
        private final T source;

        /**
         * @param source the value to emit for every input element
         */
        private OutputSingleSource(T source) {
            this.source = source;
        }

        /** Emits the held source. */
        @DoFn.ProcessElement
        public void processElement(DoFn.OutputReceiver<T> outputReceiver) {
            outputReceiver.output(source);
        }

        /** Adds the source class as a display item and includes the source's own display data. */
        @Override // org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                    .add(DisplayData.item("source", source.getClass()).withLabel("Read Source"))
                    .include("source", source);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/Read$Unbounded.class */
    /**
     * {@link PTransform} that reads the records of an {@link UnboundedSource}.
     *
     * <p>The expansion starts from an {@link Impulse}, emits the configured source as an element
     * via {@link OutputSingleSource}, reads it with {@link UnboundedSourceAsSDFWrapperFn},
     * optionally de-duplicates by record id, and finally strips the record ids.
     *
     * @param <T> type of the records produced by the source
     */
    public static class Unbounded<T> extends PTransform<PBegin, PCollection<T>> {
        private final UnboundedSource<T, UnboundedSource.CheckpointMark> source;

        /**
         * @param str transform name handed to the {@link PTransform} super constructor
         * @param unboundedSource source to read; passed through
         *     {@link SerializableUtils#ensureSerializable} before being stored
         */
        private Unbounded(String str, UnboundedSource<T, ?> unboundedSource) {
            super(str);
            this.source = (UnboundedSource) SerializableUtils.ensureSerializable(unboundedSource);
        }

        /**
         * Returns a bounded read of this source limited to {@code j} records (no time limit is
         * passed).
         */
        public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long j) {
            return new BoundedReadFromUnboundedSource<>(this.source, j, null);
        }

        /**
         * Returns a bounded read of this source limited to the given read time; the record limit
         * passed is {@link Long#MAX_VALUE}.
         */
        public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration duration) {
            return new BoundedReadFromUnboundedSource<>(this.source, Long.MAX_VALUE, duration);
        }

        /**
         * Validates the source and builds the read pipeline fragment.
         *
         * @param pBegin the start of the pipeline this read is applied to
         * @return the collection of records with their record ids stripped
         */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public final PCollection<T> expand(PBegin pBegin) {
            this.source.validate();

            // Impulse -> one element carrying the source -> records tagged with their record ids.
            PCollection<ValueWithRecordId<T>> outputWithIds =
                    pBegin.getPipeline()
                            .apply(Impulse.create())
                            .apply(ParDo.of(new OutputSingleSource<>(this.source)))
                            .setCoder(SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, UnboundedSource.CheckpointMark>>() { // from class: org.apache.beam.sdk.io.Read.Unbounded.1
                            }))
                            .apply(ParDo.of(new UnboundedSourceAsSDFWrapperFn<>(this.source.getCheckpointMarkCoder())))
                            .setCoder(ValueWithRecordId.ValueWithRecordIdCoder.of(this.source.getOutputCoder()));

            if (this.source.requiresDeduping()) {
                // The de-duplicated collection must replace outputWithIds. Previously the result
                // of this apply() was dropped, so the id stripping below still consumed the
                // collection that contained duplicates.
                outputWithIds = outputWithIds.apply(Deduplicate.<ValueWithRecordId<T>, byte[]>withRepresentativeValueFn(valueWithRecordId -> {
                    return valueWithRecordId.getId();
                }).withRepresentativeType(TypeDescriptor.of(byte[].class)));
            }
            return outputWithIds.apply(ParDo.of(new ValueWithRecordId.StripIdsDoFn<>()));
        }

        /** Returns the source this transform reads from. */
        public UnboundedSource<T, ?> getSource() {
            return this.source;
        }

        /** Returns {@code "Read(<approximate simple name of the source>)"}. */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public String getKindString() {
            return String.format("Read(%s)", NameUtils.approximateSimpleName(this.source));
        }

        /** Adds the source class as a display item and includes the source's own display data. */
        @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("source", this.source.getClass()).withLabel("Read Source")).include("source", this.source);
        }

        /*
         * Decompiler rendering of the compiler-synthesized lambda deserialization hook for the
         * record-id lambda used in expand(). Left exactly as decompiled.
         * NOTE(review): javac generates this method itself for serializable lambdas, and the
         * `boolean z = -1` / `case false` forms below are decompiler artifacts — presumably this
         * method should be dropped when turning the file back into compilable source; confirm.
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 853661846:
                    if (implMethodName.equals("lambda$expand$de390e22$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/Read$Unbounded") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/ValueWithRecordId;)[B")) {
                        return valueWithRecordId -> {
                            return valueWithRecordId.getId();
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @DoFn.UnboundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/io/Read$UnboundedSourceAsSDFWrapperFn.class */
    public static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends UnboundedSource.CheckpointMark> extends DoFn<UnboundedSource<OutputT, CheckpointT>, ValueWithRecordId<OutputT>> {
        private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class);

        /**
         * Going by its name, a limit in minutes related to bundle finalization.
         * NOTE(review): not referenced in the visible part of the file — confirm its use.
         */
        private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;

        /** Coder for checkpoint marks, supplied through the constructor. */
        private final Coder<CheckpointT> checkpointCoder;

        /**
         * Readers parked between tracker instances. Built in {@code setUp()} with a one-minute
         * expire-after-write and a maximum size of 100; the restriction tracker stores a reader
         * here in {@code trySplit} and takes it back out in {@code tryClaim}.
         */
        private Cache<Object, UnboundedSource.UnboundedReader<OutputT>> cachedReaders;

        /** Assigned in {@code setUp()} from {@code restrictionCoder()}. */
        private Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/Read$UnboundedSourceAsSDFWrapperFn$EmptyUnboundedSource.class */
        /**
         * Placeholder {@link UnboundedSource} whose readers never produce a record and report the
         * maximum timestamp as their watermark. A single shared {@link #INSTANCE} is used.
         */
        public static class EmptyUnboundedSource<OutputT, CheckpointT extends UnboundedSource.CheckpointMark> extends UnboundedSource<OutputT, CheckpointT> {
            private static final EmptyUnboundedSource INSTANCE = new EmptyUnboundedSource();

            /**
             * Reader with nothing to read. It only remembers the checkpoint mark it was created
             * with so that the mark can be handed back from {@link #getCheckpointMark()}.
             */
            public class EmptyUnboundedReader extends UnboundedSource.UnboundedReader<OutputT> {
                private final CheckpointT checkpointMark;

                private EmptyUnboundedReader(CheckpointT checkpointMark) {
                    this.checkpointMark = checkpointMark;
                }

                /** Never has a first record. */
                @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
                public boolean start() throws IOException {
                    return false;
                }

                /** Never has a next record. */
                @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
                public boolean advance() throws IOException {
                    return false;
                }

                /** Always throws {@link UnsupportedOperationException}. */
                @Override // org.apache.beam.sdk.io.Source.Reader
                public OutputT getCurrent() throws NoSuchElementException {
                    throw new UnsupportedOperationException("getCurrent is never meant to be invoked.");
                }

                /** Always throws {@link UnsupportedOperationException}. */
                @Override // org.apache.beam.sdk.io.Source.Reader
                public Instant getCurrentTimestamp() throws NoSuchElementException {
                    throw new UnsupportedOperationException("getCurrentTimestamp is never meant to be invoked.");
                }

                /** Nothing to release. */
                @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
                public void close() throws IOException {
                }

                /** The watermark is pinned at the maximum timestamp. */
                @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
                public Instant getWatermark() {
                    return BoundedWindow.TIMESTAMP_MAX_VALUE;
                }

                /** Returns the checkpoint mark this reader was created with. */
                @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
                public UnboundedSource.CheckpointMark getCheckpointMark() {
                    return checkpointMark;
                }

                /** Returns the shared empty source instance. */
                @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
                public UnboundedSource<OutputT, ?> getCurrentSource() {
                    return INSTANCE;
                }
            }

            private EmptyUnboundedSource() {
            }

            /** Always throws {@link UnsupportedOperationException}. */
            @Override // org.apache.beam.sdk.io.UnboundedSource
            public List<? extends UnboundedSource<OutputT, CheckpointT>> split(int i, PipelineOptions pipelineOptions) throws Exception {
                throw new UnsupportedOperationException("split is never meant to be invoked.");
            }

            /** Creates an empty reader that carries the given checkpoint mark; the options are unused. */
            @Override // org.apache.beam.sdk.io.UnboundedSource
            public UnboundedSource.UnboundedReader<OutputT> createReader(PipelineOptions pipelineOptions, CheckpointT checkpointt) {
                EmptyUnboundedReader emptyReader = new EmptyUnboundedReader(checkpointt);
                return emptyReader;
            }

            /** Always throws {@link UnsupportedOperationException}. */
            @Override // org.apache.beam.sdk.io.UnboundedSource
            public Coder<CheckpointT> getCheckpointMarkCoder() {
                throw new UnsupportedOperationException("getCheckpointMarkCoder is never meant to be invoked.");
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.class */
        /**
         * Restriction tracker over an {@link UnboundedSourceRestriction}.
         *
         * <p>The position type is a one-slot {@code UnboundedSourceValue<OutputT>[]}:
         * {@link #tryClaim} writes the claimed record (or {@code null} when the reader currently
         * has none) into slot 0. Once the tracker has been split, or the reader's watermark has
         * reached the maximum timestamp, the live reader is replaced by an
         * {@link EmptyUnboundedSource.EmptyUnboundedReader}, which marks the tracker as done.
         */
        private static class UnboundedSourceAsSDFRestrictionTracker<OutputT, CheckpointT extends UnboundedSource.CheckpointMark> extends RestrictionTracker<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]> implements RestrictionTracker.HasProgress {
            private final UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction;
            private final PipelineOptions pipelineOptions;
            private UnboundedSource.UnboundedReader<OutputT> currentReader;
            private boolean readerHasBeenStarted;
            private Cache<Object, UnboundedSource.UnboundedReader<OutputT>> cachedReaders;
            private Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder;

            UnboundedSourceAsSDFRestrictionTracker(UnboundedSourceRestriction<OutputT, CheckpointT> unboundedSourceRestriction, PipelineOptions pipelineOptions, Cache<Object, UnboundedSource.UnboundedReader<OutputT>> cache, Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> coder) {
                this.initialRestriction = unboundedSourceRestriction;
                this.pipelineOptions = pipelineOptions;
                this.cachedReaders = cache;
                this.restrictionCoder = coder;
            }

            /**
             * Builds the reader-cache key for a (source, checkpoint) pair: the restriction coder's
             * structural value of a restriction whose watermark is fixed at the minimum timestamp.
             */
            private Object createCacheKey(UnboundedSource<OutputT, CheckpointT> unboundedSource, CheckpointT checkpointt) {
                Preconditions.checkNotNull(restrictionCoder);
                UnboundedSourceRestriction<OutputT, CheckpointT> keyRestriction =
                        UnboundedSourceRestriction.create(unboundedSource, checkpointt, BoundedWindow.TIMESTAMP_MIN_VALUE);
                return restrictionCoder.structuralValue(keyRestriction);
            }

            /**
             * Sets {@code currentReader} for the initial restriction: a cached reader if one is
             * present (it is then removed from the cache and treated as already started),
             * otherwise a newly created one.
             */
            private void acquireReader() throws IOException {
                Object cacheKey = createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
                currentReader = cachedReaders.getIfPresent(cacheKey);
                if (currentReader != null) {
                    readerHasBeenStarted = true;
                    cachedReaders.invalidate(cacheKey);
                    return;
                }
                currentReader = initialRestriction.getSource().createReader(pipelineOptions, initialRestriction.getCheckpoint());
            }

            /**
             * Claims the next record, if any.
             *
             * @param unboundedSourceValueArr out-parameter; slot 0 receives the record, or
             *     {@code null} when the reader has no record available right now
             * @return {@code false} only when the current reader is the empty reader, {@code true}
             *     otherwise (including the "no record right now" case)
             * @throws RuntimeException wrapping any {@link IOException} from the reader; the reader
             *     is closed first and a failure to close is added as a suppressed exception
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public boolean tryClaim(UnboundedSourceValue<OutputT>[] unboundedSourceValueArr) {
                try {
                    if (currentReader == null) {
                        acquireReader();
                    }
                    if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
                        return false;
                    }

                    boolean recordAvailable;
                    if (readerHasBeenStarted) {
                        recordAvailable = currentReader.advance();
                    } else {
                        // Flag is flipped before start() so a failing start is not retried as a start.
                        readerHasBeenStarted = true;
                        recordAvailable = currentReader.start();
                    }

                    if (!recordAvailable) {
                        unboundedSourceValueArr[0] = null;
                        return true;
                    }
                    unboundedSourceValueArr[0] = UnboundedSourceValue.create(currentReader.getCurrentRecordId(), currentReader.getCurrent(), currentReader.getCurrentTimestamp());
                    return true;
                } catch (IOException readFailure) {
                    if (currentReader != null) {
                        try {
                            currentReader.close();
                        } catch (IOException closeFailure) {
                            readFailure.addSuppressed(closeFailure);
                        }
                    }
                    throw new RuntimeException(readFailure);
                }
            }

            /**
             * Returns the restriction describing the tracker's present state.
             *
             * <p>Before any reader exists this is the initial restriction. Otherwise it is built
             * from the reader's current source, checkpoint mark and bounded watermark. If a
             * non-empty reader reports the maximum timestamp as its watermark, it is closed and
             * swapped for an empty reader carrying its checkpoint mark before the restriction is
             * built.
             */
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() {
                if (currentReader == null) {
                    return initialRestriction;
                }
                Instant watermark = UnboundedSourceAsSDFWrapperFn.ensureTimestampWithinBounds(currentReader.getWatermark());
                boolean readerIsEmpty = currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader;
                if (!readerIsEmpty && BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
                    UnboundedSource.CheckpointMark finalCheckpoint = currentReader.getCheckpointMark();
                    try {
                        currentReader.close();
                    } catch (IOException e) {
                        UnboundedSourceAsSDFWrapperFn.LOG.warn("Failed to close UnboundedReader.", e);
                    } finally {
                        // The swap happens whether or not close() succeeded.
                        currentReader = EmptyUnboundedSource.INSTANCE.createReader(null, finalCheckpoint);
                    }
                }
                return UnboundedSourceRestriction.create(currentReader.getCurrentSource(), currentReader.getCheckpointMark(), watermark);
            }

            /**
             * Splits off everything that remains.
             *
             * @param d ignored
             * @return {@code null} if the current restriction already uses the empty source;
             *     otherwise a split whose first part is an empty-source restriction and whose
             *     second part is the current restriction. The live reader is parked in the cache
             *     under the current restriction's key and replaced with an empty reader.
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit(double d) {
                UnboundedSourceRestriction<OutputT, CheckpointT> remaining = currentRestriction();
                if (remaining.getSource() instanceof EmptyUnboundedSource) {
                    return null;
                }
                SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> split = SplitResult.of(UnboundedSourceRestriction.create(EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE), remaining);
                boolean readerIsEmpty = currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader;
                if (!readerIsEmpty) {
                    Object cacheKey = createCacheKey(remaining.getSource(), remaining.getCheckpoint());
                    cachedReaders.put(cacheKey, currentReader);
                }
                currentReader = EmptyUnboundedSource.INSTANCE.createReader(null, remaining.getCheckpoint());
                return split;
            }

            /**
             * @throws IllegalStateException unless the current reader is the empty reader
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public void checkDone() throws IllegalStateException {
                boolean done = currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader;
                Preconditions.checkState(done, "Expected all records to have been claimed but finished processing unbounded source while some records may have not been read.");
            }

            /** Always reports {@code UNBOUNDED}. */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
            public RestrictionTracker.IsBounded isBounded() {
                return RestrictionTracker.IsBounded.UNBOUNDED;
            }

            /**
             * Reports progress as (completed, remaining).
             *
             * <p>An empty source counts as finished: (1, 0). Otherwise completed is always 0 and
             * remaining is the reader's split backlog in bytes, or 1 when the reader reports
             * {@code -1}. A reader is created from the initial restriction if none exists yet.
             */
            @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress
            public RestrictionTracker.Progress getProgress() {
                if (currentRestriction().getSource() instanceof EmptyUnboundedSource) {
                    return RestrictionTracker.Progress.from(1.0d, 0.0d);
                }
                if (currentReader == null) {
                    try {
                        currentReader = initialRestriction.getSource().createReader(pipelineOptions, initialRestriction.getCheckpoint());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                long backlogBytes = currentReader.getSplitBacklogBytes();
                if (backlogBytes == -1) {
                    return RestrictionTracker.Progress.from(0.0d, 1.0d);
                }
                return RestrictionTracker.Progress.from(0.0d, backlogBytes);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Value type describing a restriction over an unbounded source: the source, a checkpoint
         * mark and a watermark. The implementation class is the AutoValue-generated one
         * instantiated in {@link #create}.
         */
        @AutoValue
        /* loaded from: input_file:org/apache/beam/sdk/io/Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestriction.class */
        public static abstract class UnboundedSourceRestriction<OutputT, CheckpointT extends UnboundedSource.CheckpointMark> implements Serializable {
            /**
             * Creates a restriction from its three components.
             *
             * @param unboundedSource the source
             * @param checkpointt the checkpoint mark; callers in this file also pass {@code null}
             * @param instant the watermark
             */
            public static <OutputT, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceRestriction<OutputT, CheckpointT> create(UnboundedSource<OutputT, CheckpointT> unboundedSource, CheckpointT checkpointt, Instant instant) {
                return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceRestriction<>(unboundedSource, checkpointt, instant);
            }

            // Accessor order is significant: it matches the argument order used in create().

            /** The source component. */
            public abstract UnboundedSource<OutputT, CheckpointT> getSource();

            /** The checkpoint mark component. */
            public abstract CheckpointT getCheckpoint();

            /** The watermark component. */
            public abstract Instant getWatermark();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.class */
        /**
         * Coder for {@link UnboundedSourceRestriction}. The wire format is the source, then the
         * checkpoint mark, then the watermark (via {@link InstantCoder}), each written with its
         * own coder.
         */
        public static class UnboundedSourceRestrictionCoder<OutputT, CheckpointT extends UnboundedSource.CheckpointMark> extends StructuredCoder<UnboundedSourceRestriction<OutputT, CheckpointT>> {
            private final Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder;
            private final Coder<CheckpointT> checkpointCoder;

            /**
             * @param sourceCoder coder for the source component
             * @param checkpointCoder coder for the checkpoint component
             */
            private UnboundedSourceRestrictionCoder(Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder, Coder<CheckpointT> checkpointCoder) {
                this.sourceCoder = sourceCoder;
                this.checkpointCoder = checkpointCoder;
            }

            /** Writes source, checkpoint and watermark to the stream, in that order. */
            @Override // org.apache.beam.sdk.coders.Coder
            public void encode(UnboundedSourceRestriction<OutputT, CheckpointT> unboundedSourceRestriction, OutputStream outputStream) throws CoderException, IOException {
                sourceCoder.encode(unboundedSourceRestriction.getSource(), outputStream);
                checkpointCoder.encode(unboundedSourceRestriction.getCheckpoint(), outputStream);
                InstantCoder.of().encode(unboundedSourceRestriction.getWatermark(), outputStream);
            }

            /** Reads source, checkpoint and watermark from the stream, in that order. */
            @Override // org.apache.beam.sdk.coders.Coder
            public UnboundedSourceRestriction<OutputT, CheckpointT> decode(InputStream inputStream) throws CoderException, IOException {
                UnboundedSource<OutputT, CheckpointT> decodedSource = sourceCoder.decode(inputStream);
                CheckpointT decodedCheckpoint = checkpointCoder.decode(inputStream);
                Instant decodedWatermark = InstantCoder.of().decode(inputStream);
                return UnboundedSourceRestriction.create(decodedSource, decodedCheckpoint, decodedWatermark);
            }

            /** Returns the source coder followed by the checkpoint coder. */
            @Override // org.apache.beam.sdk.coders.Coder
            public List<? extends Coder<?>> getCoderArguments() {
                return Arrays.asList(sourceCoder, checkpointCoder);
            }

            /**
             * Invokes the three-argument {@code verifyDeterministic} helper once per component
             * coder, each time with an empty trailing coder array.
             */
            @Override // org.apache.beam.sdk.coders.Coder
            public void verifyDeterministic() throws Coder.NonDeterministicException {
                Coder<?>[] noComponents = new Coder<?>[0];
                verifyDeterministic(sourceCoder, "source coder not deterministic", noComponents);
                verifyDeterministic((Coder<?>) checkpointCoder, "checkpoint coder not deterministic", noComponents);
                verifyDeterministic(InstantCoder.of(), "watermark coder not deterministic", noComponents);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Value type bundling one element with its id bytes and its timestamp.
         *
         * <p>Instances are built through {@link #create}; the concrete implementation is the
         * generated {@code AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue} class.
         */
        @AutoValue
        public abstract static class UnboundedSourceValue<T> {
            /**
             * Builds a value from its three components.
             *
             * @param bArr the id bytes, returned by {@link #getId()}
             * @param t the element, returned by {@link #getValue()}
             * @param instant the timestamp, returned by {@link #getTimestamp()}
             */
            public static <T> UnboundedSourceValue<T> create(byte[] bArr, T t, Instant instant) {
                return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue<>(bArr, t, instant);
            }

            /** Returns the id bytes supplied at creation. */
            public abstract byte[] getId();

            /** Returns the element supplied at creation. */
            public abstract T getValue();

            /** Returns the timestamp supplied at creation. */
            public abstract Instant getTimestamp();
        }

        /**
         * Creates the wrapper fn and stores the given checkpoint coder; the stored coder is later
         * wrapped in a {@code NullableCoder} when the restriction coder is built.
         *
         * @param coder coder for the source's checkpoint marks
         */
        private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> coder) {
            this.checkpointCoder = coder;
        }

        /**
         * Produces the starting restriction for a source element: the source itself, no checkpoint,
         * and a watermark of {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
         *
         * @param unboundedSource the source element being processed
         * @return the restriction describing a source that has not been read yet
         */
        @DoFn.GetInitialRestriction
        public UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction(@DoFn.Element UnboundedSource<OutputT, CheckpointT> unboundedSource) {
            final CheckpointT noCheckpoint = null;
            final Instant startingWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
            return UnboundedSourceRestriction.create(unboundedSource, noCheckpoint, startingWatermark);
        }

        /**
         * Initializes per-instance state: the restriction coder and the reader cache.
         *
         * <p>The cache expires entries one minute after they are written and holds at most 100
         * entries. Its removal listener closes the removed reader, but only for notifications that
         * report {@code wasEvicted()}; a failure to close is logged and otherwise ignored.
         */
        @DoFn.Setup
        public void setUp() throws Exception {
            this.restrictionCoder = restrictionCoder();
            this.cachedReaders = CacheBuilder.newBuilder()
                    .expireAfterWrite(1L, TimeUnit.MINUTES)
                    .maximumSize(100L)
                    .removalListener(notification -> {
                        if (!notification.wasEvicted()) {
                            return;
                        }
                        try {
                            ((UnboundedSource.UnboundedReader) notification.getValue()).close();
                        } catch (IOException closeFailure) {
                            LOG.warn("Failed to close UnboundedReader.", closeFailure);
                        }
                    })
                    .build();
        }

        /**
         * Emits the sub-restrictions for {@code unboundedSourceRestriction}.
         *
         * <ul>
         *   <li>A restriction over an {@code EmptyUnboundedSource} produces no output.
         *   <li>A restriction that already carries a checkpoint other than a
         *       {@code NoopCheckpointMark} is emitted unchanged and is not split: the sources
         *       returned by {@code split} are emitted without a checkpoint, so splitting here would
         *       emit checkpoint-less restrictions alongside the checkpointed one.
         *   <li>Otherwise the source is asked for 20 splits and each one is emitted with no
         *       checkpoint and the original restriction's watermark.
         * </ul>
         *
         * <p>If splitting throws, the failure is logged and the original restriction is emitted
         * instead.
         *
         * @param unboundedSourceRestriction the restriction to split
         * @param outputReceiver receiver for the resulting restrictions
         * @param pipelineOptions options forwarded to the source's {@code split}
         */
        @DoFn.SplitRestriction
        public void splitRestriction(@DoFn.Restriction UnboundedSourceRestriction<OutputT, CheckpointT> unboundedSourceRestriction, DoFn.OutputReceiver<UnboundedSourceRestriction<OutputT, CheckpointT>> outputReceiver, PipelineOptions pipelineOptions) throws Exception {
            if (unboundedSourceRestriction.getSource() instanceof EmptyUnboundedSource) {
                return;
            }
            CheckpointT checkpoint = unboundedSourceRestriction.getCheckpoint();
            if (checkpoint != null && !(checkpoint instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
                outputReceiver.output(unboundedSourceRestriction);
                // Stop here: falling through would also emit fresh splits of the same source
                // (and, on a split failure, this same restriction a second time).
                return;
            }
            try {
                for (UnboundedSource<OutputT, CheckpointT> split : unboundedSourceRestriction.getSource().split(20, pipelineOptions)) {
                    outputReceiver.output(UnboundedSourceRestriction.create(split, null, unboundedSourceRestriction.getWatermark()));
                }
            } catch (Exception e) {
                LOG.warn("Exception while splitting source. Source not split.", e);
                outputReceiver.output(unboundedSourceRestriction);
            }
        }

        /**
         * Creates the tracker for the given restriction.
         *
         * <p>Requires {@code restrictionCoder} and {@code cachedReaders} to be non-null; both are
         * assigned in {@code setUp()}. A {@code NullPointerException} is thrown otherwise.
         *
         * @param unboundedSourceRestriction the restriction the tracker starts from
         * @param pipelineOptions options handed to the tracker
         * @return a new {@code UnboundedSourceAsSDFRestrictionTracker}
         */
        @DoFn.NewTracker
        public RestrictionTracker<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]> restrictionTracker(@DoFn.Restriction UnboundedSourceRestriction<OutputT, CheckpointT> unboundedSourceRestriction, PipelineOptions pipelineOptions) {
            // Both fields are populated by setUp(); fail fast if it has not run.
            Preconditions.checkNotNull(this.restrictionCoder);
            Preconditions.checkNotNull(this.cachedReaders);
            return new UnboundedSourceAsSDFRestrictionTracker(unboundedSourceRestriction, pipelineOptions, this.cachedReaders, this.restrictionCoder);
        }

        /**
         * Drains the values the tracker lets this call claim, then reports the watermark and
         * registers checkpoint finalization.
         *
         * <p>Steps, in order:
         *
         * <ol>
         *   <li>Repeatedly call {@code tryClaim} with a one-element array; while the claim succeeds
         *       and the slot is non-null, emit the value with its record id and timestamp.
         *   <li>Set the watermark estimator to the watermark of the tracker's current restriction.
         *   <li>If that restriction has a checkpoint, is a different object from the restriction
         *       seen at the start, and the checkpoint is not a {@code NoopCheckpointMark}, register
         *       {@code finalizeCheckpoint} with the bundle finalizer, passing an instant ten
         *       minutes from now as its first argument.
         *   <li>Return {@code stop()} if the current restriction's source is an
         *       {@code EmptyUnboundedSource}, otherwise {@code resume()}.
         * </ol>
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation processElement(RestrictionTracker<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue[]> restrictionTracker, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, DoFn.OutputReceiver<ValueWithRecordId<OutputT>> outputReceiver, DoFn.BundleFinalizer bundleFinalizer) throws IOException {
            final UnboundedSourceRestriction<OutputT, CheckpointT> restrictionAtStart = restrictionTracker.currentRestriction();

            // Slot 0 is read back after every successful claim.
            @SuppressWarnings({"unchecked", "rawtypes"})
            final UnboundedSourceValue<OutputT>[] claimSlot = new UnboundedSourceValue[1];
            while (restrictionTracker.tryClaim(claimSlot)) {
                final UnboundedSourceValue<OutputT> claimed = claimSlot[0];
                if (claimed == null) {
                    break;
                }
                outputReceiver.outputWithTimestamp(new ValueWithRecordId<>(claimed.getValue(), claimed.getId()), claimed.getTimestamp());
            }

            final UnboundedSourceRestriction<OutputT, CheckpointT> restrictionAtEnd = restrictionTracker.currentRestriction();
            manualWatermarkEstimator.setWatermark(restrictionAtEnd.getWatermark());

            // Identity comparison: true only when the tracker now returns a different restriction
            // object than it did before the claim loop.
            final boolean restrictionReplaced = restrictionAtStart != restrictionAtEnd;
            if (restrictionAtEnd.getCheckpoint() != null
                    && restrictionReplaced
                    && !(restrictionTracker.currentRestriction().getCheckpoint() instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
                final Instant tenMinutesFromNow = Instant.now().plus(Duration.standardMinutes(10L));
                final CheckpointT checkpointToFinalize = restrictionAtEnd.getCheckpoint();
                // The bound method reference throws NullPointerException on a null receiver.
                bundleFinalizer.afterBundleCommit(tenMinutesFromNow, checkpointToFinalize::finalizeCheckpoint);
            }

            if (restrictionAtEnd.getSource() instanceof EmptyUnboundedSource) {
                return DoFn.ProcessContinuation.stop();
            }
            return DoFn.ProcessContinuation.resume();
        }

        /**
         * Returns the initial watermark estimator state, which is the supplied timestamp unchanged.
         *
         * @param instant the timestamp injected via {@code @DoFn.Timestamp}
         * @return {@code instant} itself
         */
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
            return instant;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Clamps a timestamp into the range
         * [{@link BoundedWindow#TIMESTAMP_MIN_VALUE}, {@link BoundedWindow#TIMESTAMP_MAX_VALUE}].
         *
         * @param instant the timestamp to clamp
         * @return the minimum bound if {@code instant} is before it, the maximum bound if
         *     {@code instant} is after it, otherwise {@code instant} itself
         */
        public static Instant ensureTimestampWithinBounds(Instant instant) {
            if (instant.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
                return BoundedWindow.TIMESTAMP_MIN_VALUE;
            }
            if (instant.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }
            return instant;
        }

        /**
         * Creates a manual watermark estimator seeded with the given state, after clamping it with
         * {@link #ensureTimestampWithinBounds(Instant)}.
         *
         * @param instant the watermark estimator state
         * @return a new {@code WatermarkEstimators.Manual} starting at the clamped timestamp
         */
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
            final Instant clampedWatermark = ensureTimestampWithinBounds(instant);
            return new WatermarkEstimators.Manual(clampedWatermark);
        }

        /**
         * Builds the coder for this fn's restrictions.
         *
         * <p>The source is encoded with a {@link SerializableCoder}; the checkpoint is encoded with
         * this fn's checkpoint coder wrapped in a {@link NullableCoder}.
         *
         * @return a new {@code UnboundedSourceRestrictionCoder}
         */
        @DoFn.GetRestrictionCoder
        public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() {
            final Coder<UnboundedSource<OutputT, CheckpointT>> serializableSourceCoder =
                    SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {});
            final Coder<CheckpointT> nullableCheckpointCoder = NullableCoder.of(this.checkpointCoder);
            return new UnboundedSourceRestrictionCoder<>(serializableSourceCoder, nullableCheckpointCoder);
        }
    }

    /**
     * Returns a new {@code Bounded} wrapping the given bounded source.
     *
     * <p>NOTE(review): the {@code null} first constructor argument is presumably an unset
     * transform name; confirm against {@code Bounded}'s constructor.
     *
     * @param boundedSource the source to wrap
     */
    public static <T> Bounded<T> from(BoundedSource<T> boundedSource) {
        return new Bounded<>(null, boundedSource);
    }

    /**
     * Returns a new {@code Unbounded} wrapping the given unbounded source.
     *
     * <p>NOTE(review): the {@code null} first constructor argument is presumably an unset
     * transform name; confirm against {@code Unbounded}'s constructor.
     *
     * @param unboundedSource the source to wrap
     */
    public static <T> Unbounded<T> from(UnboundedSource<T, ?> unboundedSource) {
        return new Unbounded<>(null, unboundedSource);
    }
}
