设为首页收藏本站language→→ 语言切换

鸿鹄论坛

 找回密码
 论坛注册

QQ登录

先注册再绑定QQ

查看: 231|回复: 1
收起左侧

[其他] 泰涨知识 | 10分钟带你快速上手Flink编程

[复制链接]
发表于 2024-6-7 15:37:27 | 显示全部楼层 |阅读模式
本帖最后由 泰克Tech 于 2024-6-7 15:38 编辑

一、套接字流作为Flink的数据源1. 编写pom.xml
  1. <pre style="overflow-wrap: initial; background: rgb(248, 248, 250); border-radius: 4px; font-size: 0.9em; overflow: auto; padding: calc(0.888889em); word-break: initial; color: rgb(25, 27, 31);"><code class="language-text" style="background-color: inherit; border-radius: 0px; font-family: Menlo, Monaco, Consolas, &quot;Andale Mono&quot;, &quot;lucida console&quot;, &quot;Courier New&quot;, monospace; font-size: inherit;"> <!--   Flink依赖     -->
  2.        <dependency>
  3.            <groupId>org.apache.flink</groupId>
  4.            <artifactId>flink-scala_2.12</artifactId>
  5.            <version>1.14.0</version>
  6.        </dependency>
  7.        <dependency>
  8.            <groupId>org.apache.flink</groupId>
  9.            <artifactId>flink-streaming-scala_2.12</artifactId>
  10.            <version>1.14.0</version>
  11.        </dependency>
  12.        <dependency>
  13.            <groupId>org.apache.flink</groupId>
  14.            <artifactId>flink-clients_2.12</artifactId>
  15.            <version>1.14.0</version>
  16.        </dependency></code></pre>
复制代码


2. 编写代码

复制代码


3.去到服务器,执行nc -l命令发送数据到Flink
  1. <span style="font-family: Consolas, &quot;Liberation Mono&quot;, Menlo, Courier, monospace; white-space: pre; color: rgb(51, 51, 51); font-size: 14px; letter-spacing: 1px;">#如果服务器是最小化安装的,可能不存在nc命令,需要手动安装</span>
复制代码


4.输出结果
回到IDEA运行代码,在终端输入文字(使用空格分开),在idea终端会显示计算结果:

                               
登录/注册后可看大图



二、Kafka作为Flink的数据源1.执行逻辑

                               
登录/注册后可看大图
2.编写pom.xml
  1. <pre style="overflow-wrap: initial; background: rgb(248, 248, 250); border-radius: 4px; font-size: 0.9em; overflow: auto; padding: calc(0.888889em); word-break: initial; color: rgb(25, 27, 31);"><code class="language-text" style="background-color: inherit; border-radius: 0px; font-family: Menlo, Monaco, Consolas, &quot;Andale Mono&quot;, &quot;lucida console&quot;, &quot;Courier New&quot;, monospace; font-size: inherit;">  <!--   Flink集成Kafka   -->
  2.        <dependency>
  3.            <groupId>org.apache.flink</groupId>
  4.            <artifactId>flink-connector-kafka_2.12</artifactId>
  5.            <version>1.14.0</version>
  6.        </dependency></code></pre>
复制代码



3.编写代码
  1. <pre style="overflow-wrap: initial; background: rgb(248, 248, 250); border-radius: 4px; font-size: 0.9em; overflow: auto; padding: calc(0.888889em); word-break: initial; color: rgb(25, 27, 31);"><code class="language-text" style="background-color: inherit; border-radius: 0px; font-family: Menlo, Monaco, Consolas, &quot;Andale Mono&quot;, &quot;lucida console&quot;, &quot;Courier New&quot;, monospace; font-size: inherit;">import org.apache.flink.api.common.serialization.SimpleStringSchema
  2. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

  4. import java.util.Properties

  5. object flink_demo3 {
  6. def main(args: Array[String]): Unit = {

  7.    //1.创建执行环境
  8.    val env = StreamExecutionEnvironment.getExecutionEnvironment

  9.    val text = env.socketTextStream("192.168.80.145",4444,'\n')

  10.    //2.执行逻辑
  11.    //2.1 kafka的主题
  12.    val topic = "bigdata"
  13.    //2.2 配置其他的Kafka参数
  14.    val prop = new Properties()
  15.    // Kafka的服务器路径及端口
  16.    prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
  17.    
  18.    val myProducer = new FlinkKafkaProducer[String]("192.168.80.145:9092",topic,new SimpleStringSchema())
  19.    
  20.    text.print()
  21.    text.addSink(myProducer)

  22.    //3.执行计算
  23.    env.execute("kafka flink sink")
  24. }
  25. }</code></pre>
复制代码


4.查看结果
去到服务器启动Kafka:
4.1 启动zookeeper
  1. zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
复制代码


4.2 另起一个终端,启动Kafka
  1. kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties
复制代码


4.3 另起一个终端,启动生产者
  1. kafka-console-producer.sh --broker-list master:9092 --topic bigdata
复制代码


4.4 在生产者端口输入数据,然后就能在IDEA终端(Flink)查看到结果

                               
登录/注册后可看大图
三、Kafka作为Flink的输出Sink1.执行逻辑套接字流作为Flink的数据源Source,将数据发送给Flink,Flink再将数据发送给Kafka(相当于Kafka作为Flink的Sink,或者说Flink作为Kafka的生产者),服务器终端的消费者可以消费到Flink发送过来的数据:

                               
登录/注册后可看大图
2.编写pom.xml
和Kafka作为Flink的数据源一致:
  1.   <!--   Flink集成Kafka   -->
  2.        <dependency>
  3.            <groupId>org.apache.flink</groupId>
  4.            <artifactId>flink-connector-kafka_2.12</artifactId>
  5.            <version>1.14.0</version>
  6.        </dependency>
复制代码

3.编写代码
  1. import org.apache.flink.api.common.serialization.SimpleStringSchema
  2. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

  4. import java.util.Properties

  5. object flink_demo3 {
  6. def main(args: Array[String]): Unit = {

  7.    //1.创建执行环境
  8.    val env = StreamExecutionEnvironment.getExecutionEnvironment

  9.    val text = env.socketTextStream("192.168.80.145",4444,'\n')

  10.    //2.执行逻辑
  11.    //2.1 kafka的主题
  12.    val topic = "bigdata"
  13.    //2.2 配置其他的Kafka参数
  14.    val prop = new Properties()
  15.    // Kafka的服务器路径及端口
  16.    prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
  17.    
  18.    val myProducer = new FlinkKafkaProducer[String]("192.168.80.145:9092",topic,new SimpleStringSchema())
  19.    
  20.    text.print()
  21.    text.addSink(myProducer)

  22.    //3.执行计算
  23.    env.execute("kafka flink sink")
  24. }
  25. }
复制代码



4.在终端启动套接字流
去到服务器,执行nc -l命令发送数据到Flink:
  1. nc -l 4444
复制代码

5.查看结果
5.1 前提:Zookeeper、Kafka已经启动了,如果没有启动,执行下述命令:
  1. #启动zookeeper
  2. zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
  3. #另起一个终端,启动Kafka
  4. kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties
复制代码


5.2 回到IDEA即可看到套接字流发送到Flink的内容。

5.3 在服务器终端启动一个消费者,然后查看Flink是否将数据发送给Kafka。

                               
登录/注册后可看大图
四、Redis作为Flink的输出Sink1.执行逻辑

                               
登录/注册后可看大图
2.编写pom.xml
  1. <!--   Flink集成Redis     -->
  2.        <dependency>
  3.            <groupId>org.apache.bahir</groupId>
  4.            <artifactId>flink-connector-redis_2.12</artifactId>
  5.            <version>1.1.0</version>
  6.        </dependency>
复制代码

3.编写代码
  1. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  2. import org.apache.flink.streaming.connectors.redis.RedisSink
  3. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
  4. import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
  5. import org.apache.flink.util.StringUtils

  6. /**
  7. * 使用Redis作为Flink的Sink
  8. */
  9. object flink_demo4 {

  10. class myRedisMapper extends RedisMapper[(String,String)]{
  11.    override def getCommandDescription: RedisCommandDescription =
  12.      new RedisCommandDescription(RedisCommand.SET)

  13.    override def getKeyFromData(t: (String, String)): String = t._1

  14.    override def getValueFromData(t: (String, String)): String = t._2
  15. }

  16. def main(args: Array[String]): Unit = {


  17.    //1.创建执行环境
  18.    val env = StreamExecutionEnvironment.getExecutionEnvironment

  19.    //从套接字流获取数据(即使用socket作为Flink的Source)
  20.    val text = env.socketTextStream("192.168.80.145", 4444, '\n')

  21.    //隐式转换
  22.    import org.apache.flink.api.scala._

  23.    //对获取到的数据进行处理
  24.    val data = text
  25.     .filter(!StringUtils.isNullOrWhitespaceOnly(_))//过滤空行
  26.     .flatMap(_.split(" "))//根据空格拆分
  27.     .map((_,1))//转换为键值对
  28.     .keyBy(0).sum(1)//根据键统计值,即相同key的值加1
  29.      //到这个位置获取到的内容相当于("hadoop",1)("flink",2)
  30.     .map(
  31.      x=>{
  32.        println(x)//在控制台打印键值对
  33.       (x._1,x._2.toString) //因为之前的键值对的值是整型,但Redis不支持整型,故将其转换成字符串
  34.     }
  35.   )
  36.    //配置Redis连接池
  37.    val redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.80.145").setPort(6379).build()

  38.    //配置Redis为Flink的sink
  39.    val redisSink = new RedisSink[(String,String)](redisConfig,new myRedisMapper)
  40.    //指定redisSink为Flink的Sink
  41.    data.addSink(redisSink)

  42.    //执行代码
  43.    env.execute()
  44. }
  45. }
复制代码

4.修改Redis配置
  1. vi /usr/local/src/redis-6.2.7/redis.conf

  2. #①在配置文件redis.conf修改bind 127.0.0.1,在最前面加入#注释他
  3. #bind 127.0.0.1 -::1

  4. #找到protected-mode yes将其修改为protected-mode no
复制代码

5.启动Redis的服务,终端会卡住,不要停止它
  1. redis-server /usr/local/src/redis-6.2.7/redis.conf
复制代码

6.另起一个终端,启动Redis命令行
  1. redis-cli
复制代码

7.另起一个终端,启动套接字流
  1. nc -l 4444
复制代码


8.回到IDEA,运行代码

然后再套接字流终端输入内容,即可在IDEA终端查看到输出的内容:

                               
登录/注册后可看大图

9.回到Redis命令行终端查看结果

回到Redis命令行终端,输入keys *命令即可查看到Flink输入Redis的内容:

                               
登录/注册后可看大图


扫码获取更多学习资料、备考资讯、岗位内推机会~


                               
登录/注册后可看大图

您需要登录后才可以回帖 登录 | 论坛注册

本版积分规则

QQ|Archiver|手机版|小黑屋|sitemap|鸿鹄论坛 ( 京ICP备14027439号 )  

GMT+8, 2024-6-20 03:38 , Processed in 0.057620 second(s), 9 queries , Redis On.  

  Powered by Discuz!

  © 2001-2024 HH010.COM

快速回复 返回顶部 返回列表