I. Socket Stream as a Flink Data Source
1. Edit pom.xml
<!-- Flink dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
2. Write the code; a sketch follows.
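A minimal word-count sketch for this step, assuming the same server (192.168.80.145) and socket port (4444) as the later examples; the object name flink_demo1 is illustrative:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object flink_demo1 {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Socket stream as the Source (the server runs nc -l)
    val text = env.socketTextStream("192.168.80.145", 4444, '\n')
    // Implicit conversions / TypeInformation for the Scala DataStream API
    import org.apache.flink.api.scala._
    // 2. Split each line on spaces and count the words
    val counts = text
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    counts.print()
    // 3. Trigger execution
    env.execute("socket flink source")
  }
}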
3. Go to the server and run the nc -l command to send data to Flink; the commands are sketched below.
# If the server is a minimal install, the nc command may be missing and has to be installed manually
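A sketch of the server-side commands, assuming a CentOS-style system and port 4444 to match the code (some nc variants need nc -l -p 4444 instead):

yum install -y nc   # only needed if nc is missing
nc -lk 4444         # listen on port 4444 and keep the connection open; type lines here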
4. View the output. Back in IDEA, run the code, then type words (separated by spaces) into the nc terminal; the computed word counts appear in the IDEA console:
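For instance (illustrative only; print() may also prefix each line with a subtask index such as "3>"), typing hadoop flink hadoop could produce:

(hadoop,1)
(flink,1)
(hadoop,2)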
II. Kafka as a Flink Data Source
1. Execution logic: a console producer on the server writes messages to a Kafka topic, and Flink consumes that topic as its Source, printing the messages in the IDEA console.
2. Edit pom.xml
<!-- Flink Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
3. Write the code
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties

// Kafka as the Source (object name illustrative)
object flink_demo2 {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit conversions / TypeInformation for the Scala DataStream API
    import org.apache.flink.api.scala._
    // 2. Execution logic
    // 2.1 The Kafka topic to consume
    val topic = "bigdata"
    // 2.2 Other Kafka parameters
    val prop = new Properties()
    // Kafka broker address and port
    prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
    prop.setProperty("group.id", "flink-consumer")

    // Flink consumes the topic and prints each message
    val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    val text = env.addSource(myConsumer)
    text.print()
    // 3. Trigger execution
    env.execute("kafka flink source")
  }
}
4. View the results. Go to the server and start Kafka:
4.1 Start ZooKeeper
zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
4.2 Open another terminal and start Kafka
kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties
4.3 Open another terminal and start a console producer
kafka-console-producer.sh --broker-list master:9092 --topic bigdata
4.4 Type data into the producer terminal; the messages then show up in the IDEA (Flink) console.

III. Kafka as a Flink Output Sink
1. Execution logic: a socket stream serves as Flink's data Source and sends data to Flink; Flink then forwards the data to Kafka (Kafka acts as Flink's Sink, or put differently, Flink acts as a Kafka producer), so a consumer in a server terminal can consume the data Flink sends:
2. Edit pom.xml, identical to the Kafka-as-Source dependency:
<!-- Flink Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
3. Write the code

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import java.util.Properties

object flink_demo3 {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Socket stream as the Source
    val text = env.socketTextStream("192.168.80.145", 4444, '\n')
    // 2. Execution logic
    // 2.1 The Kafka topic to write to
    val topic = "bigdata"
    // 2.2 Other Kafka parameters
    val prop = new Properties()
    // Kafka broker address and port
    prop.setProperty("bootstrap.servers", "192.168.80.145:9092")

    // Kafka as the Sink: Flink produces each line to the topic
    val myProducer = new FlinkKafkaProducer[String](topic, new SimpleStringSchema(), prop)

    text.print()
    text.addSink(myProducer)
    // 3. Trigger execution
    env.execute("kafka flink sink")
  }
}
4. Start the socket stream in a terminal: go to the server and run the nc command from Part I (listening on port 4444) to send data to Flink:
5. View the results
5.1 Prerequisite: ZooKeeper and Kafka are already running. If not, run the following:
# Start ZooKeeper
zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
# Open another terminal and start Kafka
kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties
5.2 Back in IDEA, you can see the content the socket stream sent to Flink.
5.3 Start a consumer in a server terminal and check whether Flink delivered the data to Kafka; see the command sketch below.
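A sketch of the console consumer, assuming the same broker and topic as the producer above:

kafka-console-consumer.sh --bootstrap-server master:9092 --topic bigdata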
IV. Redis as a Flink Output Sink
1. Execution logic: Flink reads words from the socket stream, counts them, and writes each (word, count) pair into Redis.
2. Edit pom.xml
<!-- Flink Redis connector -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
3. Write the code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.StringUtils

/**
 * Use Redis as Flink's Sink
 */
object flink_demo4 {
  // Maps each (word, count) pair to a Redis SET command: key = word, value = count
  class myRedisMapper extends RedisMapper[(String, String)] {
    override def getCommandDescription: RedisCommandDescription =
      new RedisCommandDescription(RedisCommand.SET)
    override def getKeyFromData(t: (String, String)): String = t._1
    override def getValueFromData(t: (String, String)): String = t._2
  }

  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read from the socket stream (socket as Flink's Source)
    val text = env.socketTextStream("192.168.80.145", 4444, '\n')
    // Implicit conversions for the Scala DataStream API
    import org.apache.flink.api.scala._
    // Process the incoming data
    val data = text
      .filter(line => !StringUtils.isNullOrWhitespaceOnly(line)) // drop blank lines
      .flatMap(_.split(" "))  // split on spaces
      .map((_, 1))            // turn each word into a key-value pair
      .keyBy(_._1).sum(1)     // sum the counts per key
      // At this point the stream looks like ("hadoop",1), ("flink",2), ...
      .map(x => {
        println(x) // print each pair to the console
        (x._1, x._2.toString) // Redis stores strings, so convert the integer count
      })
    // Configure the Redis connection pool
    val redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.80.145").setPort(6379).build()
    // Configure Redis as Flink's sink
    val redisSink = new RedisSink[(String, String)](redisConfig, new myRedisMapper)
    // Attach redisSink as the stream's Sink
    data.addSink(redisSink)
    // Trigger execution
    env.execute()
  }
}
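A note on the mapper above: with RedisCommand.SET every word becomes its own Redis string key. To keep all counts under one key instead, the bahir connector also supports hashes; the command description then takes an additional key, e.g. new RedisCommandDescription(RedisCommand.HSET, "wordcount"), where the hash name "wordcount" is illustrative.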
4. Modify the Redis configuration

vi /usr/local/src/redis-6.2.7/redis.conf
# 1) In redis.conf, find the "bind 127.0.0.1" line and comment it out with a leading #
#bind 127.0.0.1 -::1
# 2) Find "protected-mode yes" and change it to "protected-mode no"
5. Start the Redis server. The terminal will appear to hang; do not stop it.
redis-server /usr/local/src/redis-6.2.7/redis.conf
6. Open another terminal and start the Redis command line; see the sketch below.
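A sketch, assuming the client runs on the Redis host itself (otherwise add -h 192.168.80.145):

redis-cli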
7. Open another terminal and start the socket stream (nc, listening on port 4444, as in Part I).
8. Back in IDEA, run the code.
Then type text into the socket-stream terminal; the output appears in the IDEA console:
9. Return to the Redis CLI terminal to view the results.
Back in the Redis CLI terminal, run the keys * command to see the data Flink wrote to Redis:
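An illustrative session (the actual keys depend on what was typed into the socket stream):

127.0.0.1:6379> keys *
1) "hadoop"
2) "flink"
127.0.0.1:6379> get hadoop
"2"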