Flink入门1

  |   0 评论   |   0 浏览

一:Flink是什么?

Flink是一个分布式计算框架。

Flink可以搭建廉价机群,快速处理任意规模的数据。

Flink总体架构如图,从左往右看。

Flink的实时处理是一个个Event(事件)驱动的(类比Kafka,Flume),不同于Spark Streaming中微批次。

(1)Flink的架构

简单理解无界流和有界流

无界流:流数据不会停止,没有边界,需要实时处理,绝对的实时处理,来一条,处理一条。

有界流:定义了数据的范围,类比Spark-Streaming中的微批次处理,Hive离线Mr处理。

Flink擅长于处理无界数据流(例如Kafka里的日志数据),有界数据集。

Fink可以部署在Yarn,K8s,Mesos多种资源调度框架中。

Fink可以处理任意数据量级。

  • 上万亿的Event处理。
  • 维护TB级别的处理状态。(类比Spark RDD中Cache,持久化TB级别的处理状态)
  • 运行在上千个核心的机群中。

Flink的状态持久化的优化

  • 当Flink计算Task中内存不足时候,Flink通过特殊的数据结构,高效的持久化到本地磁盘。
  • Flink会周期的异步持久化计算状态,防止Task进程挂掉,Task 主机意外宕机。并保证持久化数据的一致性。
  • Flink提供了CheckPoint,可以异步的将计算状态持久化到持久层(如HDFS,本地文件系统)

(2)Flink的应用

关于流处理的一些基本概念

流处理:

  • Flink可以处理有界或无界数据,提供了强力的无界数据处理的特性。
  • Flink更加偏向绝对的实时处理,来一条处理一条,而不是微批次。

状态:

计算状态是Flink的一等公民,Flink提供了许多特性来处理状态。

分层API

Flink提供了不同级别的API,满足各种应用场景的应用需求(类比RDD和Spark SQL)

ProcessFunction对应RDD,DataStream对应Spark Streaming, SQL/Table API对应Spark SQL

总结:

官网介绍的更加详细,有兴趣可以自行看下(有中文版,牛逼吧,有中文版)。

Flink实时处理很强,提供的ProcessFunction API可以非常细粒度灵活的控制Event处理。

Flink的提供了SQL支持,对Event进行数据聚合后,使用SQL进行数据分析。

Flink是来一个Event处理一次,对比按照时间间隔微批次处理,所以保证了数据实时处理。

Flink对计算状态持久化提供了非常多的特性。

二:Flink demo项目

Maven POM, 注意使用Scala2.11, 如果你想用Flink1.9,请选择Scala2.12。


    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

第一个WorkCount.scala

import org.apache.flink.api.scala.ExecutionEnvironment
//隐式转换
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
     //获取Flink运行环境
    val environment = ExecutionEnvironment.getExecutionEnvironment
    //注意 readTextFile路径填你自己的文件路径。
    val testDataSet = environment.readTextFile("1.txt")
    //可以理解testDataSet为一个RDD
   //groupby(0)意思是以元组第一个为key进行分组,sum(1)是对元组第二个位置数据进行累加。
    val result = testDataSet.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    result.print()
  }

}

1.txt

hello java
hello java
hello flink
hello flink

运行结果


Flink Sock实时统计

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.api.scala._

object SockDStream {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val unit = environment.socketTextStream("192.168.208.102", 7777)
    val result = unit.flatMap(_.split(" ")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)
    result.print()
    //执行
    environment.execute()

  }

}

linux 192.168.208.102 虚拟机内

#如果没有netcat请先安装
yum install -y nc

#启动sock隧道
nc -lk 7777

运行SockDStream.scala

Flink默认是状态保持的(输入两个hello java, 输出 hello 2, java 2),默认保存在内存内

三:Flink的部署

单节点模式

配置Jdk1.8以上的环境变量

下载Flink

上传至Linux服务器,解压。

tar xf flink****.tar  -C {指定目录}

./bin/start-cluster.sh

http://{节点Ip}:8081为Flink WebUI

机群搭建

Flink机群相对来说配置比较简单

配置Jdk1.8,同样的步骤, 上传解压。

注意机群模式需要配置主机访问从机的SSH免密。

编辑配置文件 vi conf/flink-conf.yaml

注意yaml文件 冒号后必须加一个空格再填写参数。

jobmanager.rpc.address: (此项设置为主机IP地址设置)

编写conf/slaves文件,填加从机IP地址。

从机配置地址

主机Ip地址配置

分发文件到从机,分发脚本如下。

#!/bin/bash
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

p1=$1
fname=`basename $p1`
echo fname=$fname

pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

user=`whoami`
//注意下一行你必须修改,换成主机名,或者你的IP
for((host=102;host<105;host++)); do 
echo  --------hadoop$host--------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done

启动机群

bin/start-cluster.sh 

查看Web UI

{主机地址}:8081

原文链接https://zhuanlan.zhihu.com/p/85086072