Flink操練(三十四)之ValueState求平均值

1、程式碼邏輯實現

package day03;import org。apache。flink。api。common。state。ValueState;import org。apache。flink。api。common。state。ValueStateDescriptor;import org。apache。flink。api。common。typeinfo。Types;import org。apache。flink。api。java。tuple。Tuple2;import org。apache。flink。configuration。Configuration;import org。apache。flink。streaming。api。datastream。DataStreamSource;import org。apache。flink。streaming。api。datastream。SingleOutputStreamOperator;import org。apache。flink。streaming。api。environment。StreamExecutionEnvironment;import org。apache。flink。streaming。api。functions。KeyedProcessFunction;import org。apache。flink。streaming。api。functions。source。SourceFunction;import org。apache。flink。util。Collector;import java。util。Random;/** * @program: Flink_learn * @description: 狀態變數 * @author: Mr。逗 * @create: 2021-09-17 15:21 **/public class StateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); env。setParallelism(1); DataStreamSource source = env。addSource(new SourceFunction() { private boolean isRunning = true; private Random random = new Random(); @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { ctx。collect( random。nextInt(10) ); Thread。sleep(1000); } } @Override public void cancel() { isRunning = false; } }); SingleOutputStreamOperator process = source。keyBy(v -> true) 。process(new KeyedProcessFunction() { //宣告一個狀態變數作為累加器 // 狀態變數的可見範圍是當前key // 狀態變數是單例,只能被例項化一次 private ValueState> valueState; // 儲存定時器的時間戳 private ValueState timeTs; @Override public void open(Configuration parameters) throws Exception { super。open(parameters); //例項化狀態變數 valueState = getRuntimeContext()。getState(new ValueStateDescriptor>(“sum_count”, Types。TUPLE(Types。INT, Types。INT))); timeTs = getRuntimeContext()。getState(new ValueStateDescriptor(“timer”, Types。LONG)); } @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { // 當第一條資料到來時,狀態變數的值為null // 使用。value()方法讀取狀態變數的值,使用。update()方法更新狀態變數的值 if (valueState。value() == null) { valueState。update(Tuple2。of(value, 1)); } else { Tuple2 tmp = valueState。value(); valueState。update(Tuple2。of(tmp。f0 + value, tmp。f1 + 1)); } if (timeTs。value() == null) { long tenSecLater = ctx。timerService()。currentProcessingTime() + 10 * 1000; ctx。timerService()。registerEventTimeTimer(tenSecLater); timeTs。update(tenSecLater); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { super。onTimer(timestamp, ctx, out); if (valueState。value() != null) { out。collect((double) valueState。value()。f0 / valueState。value()。f1); timeTs。clear(); } } }); process。print(); String name = StateDemo。class。getName(); env。execute(name); }}

2、結果之展示

“C:\Program Files\Java\jdk1。8。0_191\bin\java。exe” “-javaagent:F:\app\IntelliJ IDEA 2019。3。3\lib\idea_rt。jar=55777:F:\app\IntelliJ IDEA 2019。3。3\bin” -Dfile。encoding=UTF-8 -classpath “C:\Program Files\Java\jdk1。8。0_191\jre\lib\charsets。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\deploy。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\access-bridge-64。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\cldrdata。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\dnsns。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\jaccess。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\jfxrt。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\localedata。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\nashorn。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\sunec。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\sunjce_provider。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\sunmscapi。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\sunpkcs11。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\ext\zipfs。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\javaws。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\jce。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\jfr。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\jfxswt。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\jsse。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\management-agent。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\plugin。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\resources。jar;C:\Program Files\Java\jdk1。8。0_191\jre\lib\rt。jar;D:\bigData\bigData_learn\Flink_learn\target\classes;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-java\1。13。0\flink-java-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-core\1。13。0\flink-core-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-annotations\1。13。0\flink-annotations-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-metrics-core\1。13。0\flink-metrics-core-1。13。0。jar;C:\Users\Administrator\。m2\repository\com\esotericsoftware\kryo\kryo\2。24。0\kryo-2。24。0。jar;C:\Users\Administrator\。m2\repository\com\esotericsoftware\minlog\minlog\1。2\minlog-1。2。jar;C:\Users\Administrator\。m2\repository\org\objenesis\objenesis\2。1\objenesis-2。1。jar;C:\Users\Administrator\。m2\repository\commons-collections\commons-collections\3。2。2\commons-collections-3。2。2。jar;C:\Users\Administrator\。m2\repository\org\apache\commons\commons-compress\1。20\commons-compress-1。20。jar;C:\Users\Administrator\。m2\repository\org\apache\commons\commons-lang3\3。3。2\commons-lang3-3。3。2。jar;C:\Users\Administrator\。m2\repository\org\apache\commons\commons-math3\3。5\commons-math3-3。5。jar;C:\Users\Administrator\。m2\repository\org\slf4j\slf4j-api\1。7。15\slf4j-api-1。7。15。jar;C:\Users\Administrator\。m2\repository\com\google\code\findbugs\jsr305\1。3。9\jsr305-1。3。9。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\force-shading\1。13。0\force-shading-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-streaming-java_2。12\1。13。0\flink-streaming-java_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-file-sink-common\1。13。0\flink-file-sink-common-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-runtime_2。12\1。13。0\flink-runtime_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-queryable-state-client-java\1。13。0\flink-queryable-state-client-java-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-hadoop-fs\1。13。0\flink-hadoop-fs-1。13。0。jar;C:\Users\Administrator\。m2\repository\commons-io\commons-io\2。7\commons-io-2。7。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-shaded-netty\4。1。49。Final-13。0\flink-shaded-netty-4。1。49。Final-13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-shaded-jackson\2。12。1-13。0\flink-shaded-jackson-2。12。1-13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3。4。14-13。0\flink-shaded-zookeeper-3-3。4。14-13。0。jar;C:\Users\Administrator\。m2\repository\org\javassist\javassist\3。24。0-GA\javassist-3。24。0-GA。jar;C:\Users\Administrator\。m2\repository\com\typesafe\akka\akka-actor_2。12\2。5。21\akka-actor_2。12-2。5。21。jar;C:\Users\Administrator\。m2\repository\com\typesafe\config\1。3。3\config-1。3。3。jar;C:\Users\Administrator\。m2\repository\org\scala-lang\modules\scala-java8-compat_2。12\0。8。0\scala-java8-compat_2。12-0。8。0。jar;C:\Users\Administrator\。m2\repository\com\typesafe\akka\akka-stream_2。12\2。5。21\akka-stream_2。12-2。5。21。jar;C:\Users\Administrator\。m2\repository\org\reactivestreams\reactive-streams\1。0。2\reactive-streams-1。0。2。jar;C:\Users\Administrator\。m2\repository\com\typesafe\ssl-config-core_2。12\0。3。7\ssl-config-core_2。12-0。3。7。jar;C:\Users\Administrator\。m2\repository\org\scala-lang\modules\scala-parser-combinators_2。12\1。1。1\scala-parser-combinators_2。12-1。1。1。jar;C:\Users\Administrator\。m2\repository\com\typesafe\akka\akka-protobuf_2。12\2。5。21\akka-protobuf_2。12-2。5。21。jar;C:\Users\Administrator\。m2\repository\com\typesafe\akka\akka-slf4j_2。12\2。5。21\akka-slf4j_2。12-2。5。21。jar;C:\Users\Administrator\。m2\repository\org\clapper\grizzled-slf4j_2。12\1。3。2\grizzled-slf4j_2。12-1。3。2。jar;C:\Users\Administrator\。m2\repository\com\github\scopt\scopt_2。12\3。5。0\scopt_2。12-3。5。0。jar;C:\Users\Administrator\。m2\repository\org\xerial\snappy\snappy-java\1。1。8。3\snappy-java-1。1。8。3。jar;C:\Users\Administrator\。m2\repository\com\twitter\chill_2。12\0。7。6\chill_2。12-0。7。6。jar;C:\Users\Administrator\。m2\repository\com\twitter\chill-java\0。7。6\chill-java-0。7。6。jar;C:\Users\Administrator\。m2\repository\org\lz4\lz4-java\1。6。0\lz4-java-1。6。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-shaded-guava\18。0-13。0\flink-shaded-guava-18。0-13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-clients_2。12\1。13。0\flink-clients_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-optimizer_2。12\1。13。0\flink-optimizer_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\commons-cli\commons-cli\1。3。1\commons-cli-1。3。1。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-api-java-bridge_2。12\1。13。0\flink-table-api-java-bridge_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-api-java\1。13。0\flink-table-api-java-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-planner-blink_2。12\1。13。0\flink-table-planner-blink_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-api-scala_2。12\1。13。0\flink-table-api-scala_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-api-scala-bridge_2。12\1。13。0\flink-table-api-scala-bridge_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-runtime-blink_2。12\1。13。0\flink-table-runtime-blink_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\codehaus\janino\janino\3。0。11\janino-3。0。11。jar;C:\Users\Administrator\。m2\repository\org\codehaus\janino\commons-compiler\3。0。11\commons-compiler-3。0。11。jar;C:\Users\Administrator\。m2\repository\org\apache\calcite\avatica\avatica-core\1。17。0\avatica-core-1。17。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-streaming-scala_2。12\1。13。0\flink-streaming-scala_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-scala_2。12\1。13。0\flink-scala_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\scala-lang\scala-reflect\2。12。7\scala-reflect-2。12。7。jar;C:\Users\Administrator\。m2\repository\org\scala-lang\scala-library\2。12。7\scala-library-2。12。7。jar;C:\Users\Administrator\。m2\repository\org\scala-lang\scala-compiler\2。12。7\scala-compiler-2。12。7。jar;C:\Users\Administrator\。m2\repository\org\scala-lang\modules\scala-xml_2。12\1。0。6\scala-xml_2。12-1。0。6。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-table-common\1。13。0\flink-table-common-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-connector-files\1。13。0\flink-connector-files-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-shaded-asm-7\7。1-13。0\flink-shaded-asm-7-7。1-13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-cep_2。12\1。13。0\flink-cep_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-csv\1。13。0\flink-csv-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-connector-kafka_2。12\1。13。0\flink-connector-kafka_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\kafka\kafka-clients\2。4。1\kafka-clients-2。4。1。jar;C:\Users\Administrator\。m2\repository\com\github\luben\zstd-jni\1。4。3-1\zstd-jni-1。4。3-1。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-connector-base\1。13。0\flink-connector-base-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\apache\bahir\flink-connector-redis_2。11\1。0\flink-connector-redis_2。11-1。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-streaming-java_2。11\1。2。0\flink-streaming-java_2。11-1。2。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-runtime_2。11\1。2。0\flink-runtime_2。11-1。2。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-shaded-hadoop2\1。2。0\flink-shaded-hadoop2-1。2。0。jar;C:\Users\Administrator\。m2\repository\org\tukaani\xz\1。0\xz-1。0。jar;C:\Users\Administrator\。m2\repository\xmlenc\xmlenc\0。52\xmlenc-0。52。jar;C:\Users\Administrator\。m2\repository\commons-codec\commons-codec\1。4\commons-codec-1。4。jar;C:\Users\Administrator\。m2\repository\commons-net\commons-net\3。1\commons-net-3。1。jar;C:\Users\Administrator\。m2\repository\javax\servlet\servlet-api\2。5\servlet-api-2。5。jar;C:\Users\Administrator\。m2\repository\org\mortbay\jetty\jetty-util\6。1。26\jetty-util-6。1。26。jar;C:\Users\Administrator\。m2\repository\com\sun\jersey\jersey-core\1。9\jersey-core-1。9。jar;C:\Users\Administrator\。m2\repository\commons-el\commons-el\1。0\commons-el-1。0。jar;C:\Users\Administrator\。m2\repository\commons-logging\commons-logging\1。1。3\commons-logging-1。1。3。jar;C:\Users\Administrator\。m2\repository\com\jamesmurty\utils\java-xmlbuilder\0。4\java-xmlbuilder-0。4。jar;C:\Users\Administrator\。m2\repository\commons-lang\commons-lang\2。6\commons-lang-2。6。jar;C:\Users\Administrator\。m2\repository\commons-configuration\commons-configuration\1。7\commons-configuration-1。7。jar;C:\Users\Administrator\。m2\repository\commons-digester\commons-digester\1。8。1\commons-digester-1。8。1。jar;C:\Users\Administrator\。m2\repository\org\codehaus\jackson\jackson-core-asl\1。8。8\jackson-core-asl-1。8。8。jar;C:\Users\Administrator\。m2\repository\org\codehaus\jackson\jackson-mapper-asl\1。8。8\jackson-mapper-asl-1。8。8。jar;C:\Users\Administrator\。m2\repository\org\apache\avro\avro\1。7。7\avro-1。7。7。jar;C:\Users\Administrator\。m2\repository\com\thoughtworks\paranamer\paranamer\2。3\paranamer-2。3。jar;C:\Users\Administrator\。m2\repository\com\jcraft\jsch\0。1。42\jsch-0。1。42。jar;C:\Users\Administrator\。m2\repository\commons-beanutils\commons-beanutils-bean-collections\1。8。3\commons-beanutils-bean-collections-1。8。3。jar;C:\Users\Administrator\。m2\repository\commons-daemon\commons-daemon\1。0。13\commons-daemon-1。0。13。jar;C:\Users\Administrator\。m2\repository\javax\xml\bind\jaxb-api\2。2。2\jaxb-api-2。2。2。jar;C:\Users\Administrator\。m2\repository\javax\xml\stream\stax-api\1。0-2\stax-api-1。0-2。jar;C:\Users\Administrator\。m2\repository\javax\activation\activation\1。1\activation-1。1。jar;C:\Users\Administrator\。m2\repository\io\netty\netty-all\4。0。27。Final\netty-all-4。0。27。Final。jar;C:\Users\Administrator\。m2\repository\com\data-artisans\flakka-actor_2。11\2。3-custom\flakka-actor_2。11-2。3-custom。jar;C:\Users\Administrator\。m2\repository\com\data-artisans\flakka-remote_2。11\2。3-custom\flakka-remote_2。11-2。3-custom。jar;C:\Users\Administrator\。m2\repository\io\netty\netty\3。8。0。Final\netty-3。8。0。Final。jar;C:\Users\Administrator\。m2\repository\org\uncommons\maths\uncommons-maths\1。2。2a\uncommons-maths-1。2。2a。jar;C:\Users\Administrator\。m2\repository\com\data-artisans\flakka-slf4j_2。11\2。3-custom\flakka-slf4j_2。11-2。3-custom。jar;C:\Users\Administrator\。m2\repository\org\clapper\grizzled-slf4j_2。11\1。0。2\grizzled-slf4j_2。11-1。0。2。jar;C:\Users\Administrator\。m2\repository\com\github\scopt\scopt_2。11\3。2。0\scopt_2。11-3。2。0。jar;C:\Users\Administrator\。m2\repository\com\fasterxml\jackson\core\jackson-core\2。7。4\jackson-core-2。7。4。jar;C:\Users\Administrator\。m2\repository\com\fasterxml\jackson\core\jackson-databind\2。7。4\jackson-databind-2。7。4。jar;C:\Users\Administrator\。m2\repository\com\fasterxml\jackson\core\jackson-annotations\2。7。0\jackson-annotations-2。7。0。jar;C:\Users\Administrator\。m2\repository\org\apache\zookeeper\zookeeper\3。4。6\zookeeper-3。4。6。jar;C:\Users\Administrator\。m2\repository\jline\jline\0。9。94\jline-0。9。94。jar;C:\Users\Administrator\。m2\repository\junit\junit\3。8。1\junit-3。8。1。jar;C:\Users\Administrator\。m2\repository\com\twitter\chill_2。11\0。7。4\chill_2。11-0。7。4。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-clients_2。11\1。2。0\flink-clients_2。11-1。2。0。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-optimizer_2。11\1。2。0\flink-optimizer_2。11-1。2。0。jar;C:\Users\Administrator\。m2\repository\org\apache\sling\org。apache。sling。commons。json\2。0。6\org。apache。sling。commons。json-2。0。6。jar;C:\Users\Administrator\。m2\repository\mysql\mysql-connector-java\8。0。21\mysql-connector-java-8。0。21。jar;C:\Users\Administrator\。m2\repository\com\google\protobuf\protobuf-java\3。11。4\protobuf-java-3。11。4。jar;C:\Users\Administrator\。m2\repository\org\apache\flink\flink-connector-jdbc_2。12\1。13。0\flink-connector-jdbc_2。12-1。13。0。jar;C:\Users\Administrator\。m2\repository\org\slf4j\slf4j-log4j12\1。7。30\slf4j-log4j12-1。7。30。jar;C:\Users\Administrator\。m2\repository\log4j\log4j\1。2。17\log4j-1。2。17。jar;C:\Users\Administrator\。m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2。14。0\log4j-to-slf4j-2。14。0。jar;C:\Users\Administrator\。m2\repository\org\apache\logging\log4j\log4j-api\2。14。0\log4j-api-2。14。0。jar;C:\Users\Administrator\。m2\repository\redis\clients\jedis\2。9。0\jedis-2。9。0。jar;C:\Users\Administrator\。m2\repository\org\apache\commons\commons-pool2\2。4。2\commons-pool2-2。4。2。jar;C:\Users\Administrator\。m2\repository\com\google\code\gson\gson\2。8。5\gson-2。8。5。jar” day03。StateDemolog4j:WARN No appenders could be found for logger (org。apache。flink。api。java。ClosureCleaner)。log4j:WARN Please initialize the log4j system properly。log4j:WARN See http://logging。apache。org/log4j/1。2/faq。html#noconfig for more info。