/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001-4A!\u0001\u0002\u0001'\tIr+\u001b8e_^$\u0016M\u00197f\rVt7\r^5p]&#6)Y:f\u0015\t\u0019A!A\u0002tc2T!!\u0002\u0004\u0002\rM$(/Z1n\u0015\t9\u0001\"A\u0004sk:$\u0018.\\3\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\r\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/\u001a\u0005\t7\u0001\u0011\t\u0011)A\u00059\u0005!Qn\u001c3f!\ti\u0012G\u0004\u0002\u001f_9\u0011qD\f\b\u0003A5r!!\t\u0017\u000f\u0005\tZcBA\u0012+\u001d\t!\u0013F\u0004\u0002&Q5\taE\u0003\u0002(%\u00051AH]8pizJ\u0011!E\u0005\u0003\u001fAI!!\u0004\b\n\u0005-a\u0011BA\u0005\u000b\u0013\t9\u0001\"\u0003\u0002\u0018\r%\u0011\u0001GF\u0001\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/Z\u0005\u0003eM\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u0005A2\u0002\"B\u001b\u0001\t\u00031\u0014A\u0002\u001fj]&$h\b\u0006\u00028sA\u0011\u0001\bA\u0007\u0002\u0005!)1\u0004\u000ea\u00019!)1\b\u0001C!y\u00051!-\u001a4pe\u0016$\u0012!\u0010\t\u0003}\u0005k\u0011a\u0010\u0006\u0002\u0001\u0006)1oY1mC&\u0011!i\u0010\u0002\u0005+:LG\u000f\u000b\u0002;\tB\u0011Q\tS\u0007\u0002\r*\u0011q\tE\u0001\u0006UVt\u0017\u000e^\u0005\u0003\u0013\u001a\u0013aAQ3g_J,\u0007\"B&\u0001\t\u0003a\u0014\u0001\u0005;fgR$V/\u001c2mK^Kg\u000eZ8xQ\tQU\n\u0005\u0002F\u001d&\u0011qJ\u0012\u0002\u0005)\u0016\u001cH\u000fC\u0003R\u0001\u0011\u0005A(A\u000fuKN$H+^7cY\u0016<\u0016N\u001c3poR3fiV5uQ>3gm]3uQ\t\u0001V\nC\u0003U\u0001\u0011\u0005A(A\u0013uKN$H+^7cY\u0016<\u0016N\u001c3poR3fiV5uQ:+w-\u0019;jm\u0016|eMZ:fi\"\u00121+\u0014\u0005\u0006/\u0002!\t\u0001P\u0001\u000ei\u0016\u001cH\u000fS8q/&tGm\\<)\u0005Yk\u0005\"\u0002.\u0001\t\u0003a\u0014A\u0005;fgR\u001cU/\\;mCR,w+\u001b8e_^D#!W')\t\u0001i6\r\u001a
\t\u0003=\u0006l\u0011a\u0018\u0006\u0003A\u001a\u000baA];o]\u0016\u0014\u0018B\u00012`\u0005\u001d\u0011VO\\,ji\"\fQA^1mk\u0016\u001c\u0013!\u001a\t\u0003M&l\u0011a\u001a\u0006\u0003Q\u001a\u000bqA];o]\u0016\u00148/\u0003\u0002kO\ni\u0001+\u0019:b[\u0016$XM]5{K\u0012\u0004")
public class WindowTableFunctionITCase
extends StreamingWithStateTestBase {
    /**
     * Prepares the environment shared by every test in this class.
     *
     * <p>After the base-class setup this enables exactly-once checkpointing (interval argument
     * {@code 100L}), installs a fixed-delay restart strategy with one restart attempt and zero
     * delay, resets {@link FailingCollectionSource}, registers the timestamped window test data
     * and creates table {@code T1} on the {@code values} connector with
     * {@code 'failing-source' = 'true'}. {@code T1} exposes a computed {@code rowtime} column
     * ({@code TO_TIMESTAMP(ts)}) with a watermark lagging one second behind it.
     */
    @Override
    @Before
    public void before() {
        super.before();

        // Checkpointing plus a restart strategy, combined with the failing source configured below.
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        this.env().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        FailingCollectionSource.reset();

        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());

        // The DDL is assembled around the generated data id; stripMargin() removes the '|' gutters.
        StringBuilder ddl = new StringBuilder(697);
        ddl.append("\n                       |CREATE TABLE T1 (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `rowtime` AS TO_TIMESTAMP(`ts`),\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '");
        ddl.append(dataId);
        ddl.append("',\n                       | 'failing-source' = 'true'\n                       |)\n                       |");
        String createTable = new StringOps(Predef$.MODULE$.augmentString(ddl.toString())).stripMargin();

        this.tEnv().executeSql(createTable);
    }

    /**
     * Runs the {@code TUMBLE} table-valued function over {@code T1} with a 5-second window and
     * checks that every input row is emitted once, extended with {@code window_start},
     * {@code window_end} and {@code window_time} of the window containing its {@code rowtime}.
     *
     * <p>Actual and expected rows are both sorted as strings before comparison, so emission order
     * does not matter.
     */
    @Test
    public void testTumbleWindow() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(this.tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink((SinkFunction)appendSink);
        this.env().execute();

        // Windows are aligned to multiples of five seconds: [00:00, 00:05), [00:05, 00:10), ...
        String[] rows = new String[]{
            "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"
        };
        Seq expectedRows = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])rows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Runs {@code TUMBLE} over {@code T1} with a 5-second window and a positive 1-second offset,
     * and checks that window boundaries are shifted one second later (for example
     * {@code [00:00:01, 00:00:06)}).
     *
     * <p>Actual and expected rows are both sorted as strings before comparison, so emission order
     * does not matter.
     */
    @Test
    public void testTumbleWindowTVFWithOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(this.tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink((SinkFunction)appendSink);
        this.env().execute();

        // With the +1s offset the windows become [00:01, 00:06), [00:06, 00:11), ...
        String[] rows = new String[]{
            "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999"
        };
        Seq expectedRows = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])rows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Runs {@code TUMBLE} over {@code T1} with a 5-second window and a negative 1-second offset,
     * and checks that window boundaries are shifted one second earlier (for example
     * {@code [23:59:59, 00:00:04)} on the previous/current day boundary).
     *
     * <p>Actual and expected rows are both sorted as strings before comparison, so emission order
     * does not matter.
     */
    @Test
    public void testTumbleWindowTVFWithNegativeOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM TABLE(\n         |  TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(this.tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink((SinkFunction)appendSink);
        this.env().execute();

        // With the -1s offset the windows become [23:59:59, 00:04), [00:04, 00:09), ...
        String[] rows = new String[]{
            "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999",
            "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:29,2020-10-10T00:00:34,2020-10-10T00:00:33.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999"
        };
        Seq expectedRows = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])rows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Runs the {@code HOP} table-valued function over {@code T1} with interval arguments of 5 and
     * 10 seconds and checks that every input row is emitted twice: once for each of the two
     * overlapping 10-second windows (starting 5 seconds apart) that contain its {@code rowtime}.
     *
     * <p>Actual and expected rows are both sorted as strings before comparison, so emission order
     * does not matter.
     */
    @Test
    public void testHopWindow() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(\n        |  HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$
                .tableConversions(this.tEnv().sqlQuery(query))
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink((SinkFunction)appendSink);
        this.env().execute();

        // Each input row appears in two consecutive overlapping windows.
        String[] rows = new String[]{
            "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:10,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999"
        };
        Seq expectedRows = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])rows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String expectedText = ((TraversableOnce)expectedRows.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    @Test
    public void testCumulateWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(\n        |  CUMULATE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '15' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 
00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", 
"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * @param mode state backend mode, forwarded unchanged to the
     *     {@link StreamingWithStateTestBase} constructor; presumably supplied by the
     *     {@code Parameterized} runner declared on this class (the parameter source is not
     *     visible in this file, so confirm against the base class)
     */
    public WindowTableFunctionITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

