1、添加不同数据源
package com.baidu.keyue.deepsight.memory.test;
import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.WordCountData;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Word-count demo that shows how different Flink sources can feed the same pipeline.
 *
 * <p>{@link #main(String[])} wires the data-generator source into a tokenizer, groups the
 * resulting tuples by word and prints a running count. The file and Kafka factory methods
 * are alternative sources that can be swapped into {@code env.fromSource(...)}.
 */
public class EnvDemo {

    /**
     * Builds and runs the streaming word-count job on a local environment.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Pin the REST endpoint port so the local web UI is reachable at a known address.
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<String> text = env.fromSource(
                createDataGeneratorSource(), WatermarkStrategy.noWatermarks(), "datagen source");

        // Key by the tuple's first field through a typed KeySelector instead of the
        // deprecated positional keyBy(0), which yields an untyped Tuple key.
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new WordCount.Tokenizer())
                        .keyBy(wordWithCount -> wordWithCount.f0)
                        .sum(1);

        counts.print();
        env.execute("WordCount");
    }

    /**
     * Creates a source that reads {@code input/word.txt} line by line.
     *
     * @return a file source emitting one {@code String} per text line
     */
    public static FileSource<String> crateFileSource() {
        return FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt"))
                .build();
    }

    /**
     * Creates a Kafka source that reads string values starting from the earliest offsets.
     *
     * <p>The bootstrap servers, topic and group id are placeholder values and must be
     * replaced before this source is used.
     *
     * @return a Kafka source emitting record values as {@code String}
     */
    public static KafkaSource<String> crateKafkaSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("ip-port")
                .setTopics("topic")
                .setGroupId("groupId")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /**
     * Creates a generator source that emits {@code "hello" + index} strings.
     *
     * <p>The source is configured for 100 records, rate-limited to 5 records per second.
     *
     * @return a data-generator source of {@code String} records
     */
    public static DataGeneratorSource<String> createDataGeneratorSource() {
        return new DataGeneratorSource<>(
                (GeneratorFunction<Long, String>) value -> "hello" + value,
                100,
                RateLimiterStrategy.perSecond(5),
                Types.STRING
        );
    }
}
2、数据处理
package com.baidu.keyue.deepsight.memory.test;
import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Demonstrates the basic DataStream transformations (map, filter, flatMap, keyBy, reduce)
 * on a small, fixed set of {@code WaterSensor} records.
 *
 * <p>Every transformation is attached to a print sink so that its output is visible when
 * the job runs; the map/filter/flatMap sinks are labelled to tell their output apart.
 */
public class DataProcessDemo {

    /**
     * Builds and runs the transformation demo on a local environment.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Pin the REST endpoint port so the local web UI is reachable at a known address.
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // map: one output per input, formatted as "<id> : <vc>".
        SingleOutputStreamOperator<String> map =
                sensorDs.map(sensor -> sensor.getId() + " : " + sensor.getVc());
        map.print("map");

        // filter: keep only the records whose vc is greater than 1.
        SingleOutputStreamOperator<WaterSensor> filter =
                sensorDs.filter(sensor -> sensor.getVc() > 1);
        filter.print("filter");

        // flatMap: emits the id only for "s1", and both id and vc for every other sensor.
        // An anonymous class keeps the String output type visible to Flink's type
        // extraction; a lambda here would need an explicit returns(...) hint.
        SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {
                collector.collect(waterSensor.getId());
                if (!"s1".equals(waterSensor.getId())) {
                    collector.collect(waterSensor.getVc().toString());
                }
            }
        });
        flatMap.print("flatMap");

        // keyBy: partition the stream by sensor id so that reduce runs per sensor.
        KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(WaterSensor::getId);

        // reduce: the first argument is the value accumulated so far for the key, the second
        // is the newly arrived record. The log labels match the original output: "t1" is
        // the incoming record and "t2" is the accumulated one.
        keyBy.reduce((accumulated, incoming) -> {
            System.out.println("t1=" + incoming);
            System.out.println("t2=" + accumulated);
            return new WaterSensor(
                    incoming.getId(),
                    incoming.getTs() + accumulated.getTs(),
                    incoming.getVc() + accumulated.getVc());
        }).print();

        env.execute("WordCount");
    }
}
3、分流/合流
package com.baidu.keyue.deepsight.memory.test;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Demonstrates splitting a stream with filters and merging the parts back with
 * {@code union}.
 *
 * <p>The input is split by the parity of {@code vc} into an even stream and an odd stream.
 * Both are printed, and so is their union.
 */
public class FenliuDemo {

    /**
     * Builds and runs the split/merge demo on a local environment.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Pin the REST endpoint port so the local web UI is reachable at a known address.
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        SingleOutputStreamOperator<WaterSensor> oushu =
                sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0);
        // Test for "not even" rather than "== 1": Java's % yields -1 for negative odd
        // values, so an "== 1" check would drop them from both streams.
        SingleOutputStreamOperator<WaterSensor> jishu =
                sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 != 0);

        // Print labels: even stream, odd stream, merged stream.
        oushu.print("偶数流");
        jishu.print("奇数流");
        oushu.union(jishu).print("合并流");

        env.execute("WordCount");
    }
}
4、输出流 sink
package com.baidu.keyue.deepsight.memory.test;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates writing a stream to the file system with a row-format {@code FileSink}.
 *
 * <p>The output directory defaults to {@link #DEFAULT_OUTPUT_DIR} and can be overridden by
 * passing a path as the first command-line argument.
 */
public class SinkDemo {

    /** Output directory used when no path is supplied on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "/Users/chengyong03/Downloads/output/flink";

    /** Interval between checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 1000L;

    /**
     * Builds and runs the file-sink demo on a local environment.
     *
     * @param args optional; {@code args[0]} overrides the output directory
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        String outputDir = (args != null && args.length > 0) ? args[0] : DEFAULT_OUTPUT_DIR;

        Configuration configuration = new Configuration();
        // Pin the REST endpoint port so the local web UI is reachable at a known address.
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Choose between stream and batch execution.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // In STREAMING mode FileSink finalizes part files only on successful checkpoints;
        // without checkpointing they stay in the in-progress/pending state.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        // Read the input data.
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // Row format: each record is encoded as one UTF-8 text line.
        FileSink<WaterSensor> fileSink = FileSink
                .<WaterSensor>forRowFormat(new Path(outputDir), new SimpleStringEncoder<>("UTF-8"))
                .build();

        sensorDs.sinkTo(fileSink);
        env.execute("WordCount");
    }
}
5、Flink 流表互转,Flink SQL
package com.baidu.keyue.deepsight.memory.test;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Demonstrates converting between a DataStream and a Table and querying it with SQL.
 *
 * <p>The sensor stream is printed, registered as the temporary view {@code sensorTable},
 * filtered with a SQL query, converted back to a DataStream and printed again.
 */
public class SqlDemo {

    /**
     * Builds and runs the stream/table conversion demo.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        DataStream<WaterSensor> readings = streamEnv.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );
        // Print the raw input so it can be compared with the filtered result.
        readings.print();

        // Stream -> table: expose the stream to SQL under the name "sensorTable".
        Table inputTable = sqlEnv.fromDataStream(readings);
        sqlEnv.createTemporaryView("sensorTable", inputTable);

        Table filtered = sqlEnv.sqlQuery("select * from sensorTable where vc > 10");

        // Table -> stream: map the result rows back onto WaterSensor and print them.
        sqlEnv.toDataStream(filtered, WaterSensor.class).print();

        streamEnv.execute();
    }
}