首页 > 其他分享 >Impala与Flink开发应用_tyt2023

Impala与Flink开发应用_tyt2023

时间:2023-12-27 12:33:39浏览次数:40  
标签:info Flink flink xxx user Impala tyt2023

本实验基于MRS环境,Impala部分主要介绍基本操作。假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,使用Impala客户端实现A业务操作流程。Flink部分主要介绍如何实现Flink与Kafka的连接以满足实时计算场景应用。

购买MRS集群

选择“自定义购买”

区域:华北-北京四

集群名称:mrs

版本类型:普通版

集群版本:MRS 3.1.0 WXL

集群类型:自定义

勾选组件:Hadoop/Impala/Kafka/Flink/Zookeeper/Ranger

开启“拓扑调整”,勾选如下图位置所示的“DN, NM, B”。勾选所有节点上的Impalad服务

 

添加安全组规则,默认情况下华为云外部无法直接连接集群,我们需要放开安全组限制。

 

 

链接MRS集群

打开实验桌面的“Xfce终端”,使用ssh命令连接集群。

Impala实验

 Impala是用于处理存储在Hadoop集群中的大量数据的MPP(大规模并行处理)SQL查询引擎。 它是一个用C++和Java编写的开源软件。 与其他Hadoop的SQL引擎相比,它拥有高性能和低延迟的特点。

2.1背景信息

假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,使用Impala客户端实现A业务操作流程如下:

普通表的操作:

1.创建用户信息表user_info。

2.在用户信息中新增用户的学历、职称信息。

3.根据用户编号查询用户姓名和地址。

4.A业务结束后,删除用户信息表。

安装Impala客户端

在MRS集群详情页面,点击“前往Manager”

 

 打开实验桌面的“Xfce终端”,使用ssh命令连接集群

进入Impala客户端下载目录并解压

cd /tmp/FusionInsight-Client/
tar -vxf FusionInsight_Cluster_1_Impala_Client.tar
tar -vxf FusionInsight_Cluster_1_Impala_ClientConfig.tar

安装Impala客户端至/opt/client目录

cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Impala_ClientConfig
./install.sh /opt/client

 切换到Impala安装目录并启动

cd /opt/client
source bigdata_env
impala-shell

退出Impala环境

quit;

创建用户信息数据

创建用户信息表user_info。

 
                                vi user_info

 

登录Impala客户端

运行Impala客户端命令

impala-shell

内部表的操作

创建用户信息表user_info并添加相关数据。

create table user_info(id string,name string,gender string,age int,addr string);

insert into table user_info(id,name,gender,age,addr) values ("12005000201","A","男",19,"A城市");

在用户信息表user_info中新增用户的学历、职称信息。


alter table user_info add columns(education string,technical string);

根据用户编号查询用户姓名和地址。

 
  select name,addr from user_info where id='12005000201';

 

删除用户信息表。

 
                                drop table user_info;

外部表的操作

创建外部表。

create external table user_info(id string,name string,gender string,age int,addr string) partitioned by(year string) row format delimited fields terminated by ' ' lines terminated by '\n' stored as textfile location '/hive/user_info';
 

使用insert语句插入数据。

 
insert into user_info partition(year="2018") values ("12005000201","A","男",19,"A城市");

执行以下SQL语句,查看数据插入成功。

 
                                select * from user_info;
退出Impala环境

使用load data命令导入文件数据

上传文件至hdfs。

hdfs dfs -put user_info /tmp

进入Impala环境

 impala-shell
加载数据到表中。
load data inpath '/tmp/user_info' into table user_info partition (year='2018');

select * from user_info;
 删除用户信息表。
drop table user_info;
 Flink实验

场景介绍

假定某个Flink业务每秒就会收到1个消息记录。基于某些业务要求,开发的Flink应用程序实现功能:实时输出带有前缀的消息内容。

安装JDK环境

wget https://sandbox-expriment-files.obs.cn-north-1.myhuaweicloud.com/hccdp/HCCDP/jdk-8u341-linux-x64.tar.gz
 

下载完成后,运行下列命令进行解压:

 
 tar -zxvf jdk-8u341-linux-x64.tar.gz

3新建Flink Maven工程

打开实验桌面的eclipse,点击“File->New->Project”

Group Id: com.huawei

Artifact Id: FlinkExample

修改JDK路径

右上角选择Window标签,在下拉菜单最后一栏中找到Preferences。

看到项目名称下有一个类似JRE System Library [J2SE-1.5]的标签。右键点击该标签,选择Build Path->Configure Build Path:

在新窗口点击Add Library

 选择JRE System Library,点击Next。 此时应该能看到Workspace default JRE (jdk1.8.0_341),保证勾选后点击Finish: 将之前的J2SE-1.5直接删除。选择该模块,在右边找到Remove:

配置POM文件

打开“FlinkExample->pom.xml”文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.huawei</groupId>
<artifactId>FlinkExample</artifactId>
<version>0.0.1-SNAPSHOT</version>

<properties>
<flink.version>1.12.0-hw-ei-310003</flink.version>
<flink.shaded.zookeeper.version>3.5.6-12.0</flink.shaded.zookeeper.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-zookeeper-3</artifactId>
<version>${flink.shaded.zookeeper.version}</version>
</dependency>
</dependencies>

<repositories>
<repository>
<id>huaweicloud2</id>
<name>huaweicloud2</name>
<url>https://mirrors.huaweicloud.com/repository/maven/</url>
</repository>
<repository>
<id>huaweicloud1</id>
<name>huaweicloud1</name>
<url>https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
</repository>
</repositories>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

关闭pom.xml,下载第三方依赖程序包需要一定时间,预计约需5分钟左右。

开发程序

3.6.1创建程序包

在“src/main/java”处右键“new->package”

创建包FlinkDemo,点击“Finish”

创建类WriteIntoKafka

在上步FlinkDemo处右键点击,选择“New->Class”创建Java类WriteIntoKafka。

package FlinkDemo;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WriteIntoKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ParameterTool paraTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
messageStream.addSink(
new FlinkKafkaProducer<String>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties()));
env.execute();
}

public static class SimpleStringGenerator implements SourceFunction<String> {
private static final long serialVersionUID = 2174904787118597072L;
boolean running = true;
long i = 0;

public void run(SourceContext<String> ctx) throws Exception {
while (running) {
ctx.collect("element-" + (i++));
Thread.sleep(1000);
}
}

public void cancel() {
running = false;
}
}
}

程序打包

在FlinkExample右键,选择“Run As->Maven install”导出Jar包。

成功导出Jar包后,可以在target文件夹下找到FlinkExample-0.0.1-SNAPSHOT.jar

将FlinkExample-0.0.1-SNAPSHOT.jar上传至大数据集群/home/omm目录下

scp /home/user/eclipse-workspace/FlinkExample/target/FlinkExample-0.0.1-SNAPSHOT.jar root@xxx.xxx.xxx.xxx:/home/omm

安装Flink客户端

类似于Impala客户端安装,需要进入MRS Manager页面,找到对应Flink服务。

 打开实验桌面的“Xfce终端”,使用ssh命令连接集群。

修改文件所属用户、用户组信息、操作权限

cd /home/omm
chown omm:wheel FlinkExample-0.0.1-SNAPSHOT.jar
chmod 777 FlinkExample-0.0.1-SNAPSHOT.jar
chmod 777 FusionInsight_Cluster_1_Flink_Client.tar

解压客户端

tar -vxf FusionInsight_Cluster_1_Flink_Client.tar

tar -vxf FusionInsight_Cluster_1_Flink_ClientConfig.tar

安装Flink客户端至/home/omm目录
cd FusionInsight_Cluster_1_Flink_ClientConfig
./install.sh /home/omm/FlinkClient

WriteIntoKafka程序运行

查询集群Kafka Broker信息

查询集群Zookeeper信息

创建kafka主题Topic

进入Kakfa客户端目录

cd /opt/Bigdata/components/FusionInsight_HD_8.1.0.1/Kafka/client/install_files/kafka/bin
 为脚本添加可执行权限:

chmod 777 kafka-topics.sh
chmod 777 kafka-run-class.sh
chmod 777 kafka-console-producer.sh
chmod 777 kafka-console-consumer.sh

创建Topic:

./kafka-topics.sh --create --zookeeper xxx.xxx.xxx.xxx:2181/kafka --topic FlinkTopic --replication-factor 2 --partitions 2
 

查看topic

./kafka-topics.sh --list --zookeeper xxx.xxx.xxx.xxx:2181/kafka
 

WriteIntoKafka程序运行

 创建Kafka消费者

新建Xfce终端窗口,登录集群。

进入Kafka目录。
cd /opt/Bigdata/components/FusionInsight_HD_8.1.0.1/Kafka/client/install_files/kafka/bin
 执行命令创建消费者,这里IP地址为broker的任一业务IP地址。
./kafka-console-consumer.sh --topic FlinkTopic --bootstrap-server xxx.xxx.xxx.xxx:9092 --from-beginning
 

启动WriteIntoKafka作为Kafka生产者

新建Xfce终端窗口,登录集群

Flink应用处于安全性考虑,需要由非root用户启动

su omm

切换到Flink安装目录

 
cd /home/omm/FlinkClient
 应用环境变量信息
source bigdata_env
 检查“FlinkYarnSessionCli”进程是否启动 jps   注意:若无“FlinkYarnSessionCli”进程,则启动Yarn集群 cd Flink/flink/bin ./yarn-session.sh &     运行Flink应用程序
flink run --class FlinkDemo.WriteIntoKafka /home/omm/FlinkExample-0.0.1-SNAPSHOT.jar --topic FlinkTopic --bootstrap.servers xxx.xxx.xxx.xxx:9092
 

标签:info,Flink,flink,xxx,user,Impala,tyt2023
From: https://www.cnblogs.com/playforever/p/17930282.html

相关文章

  • flink 的安装以及fink-cdc 基于多数据源导入的es 的简单使用
    此文档是参照flink-cdc文档(https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html)案例 的最佳实践1.下载flinkrelease最新版本1.18.0并解压, https://repo.maven.apache.org/maven2/org/apache/flink/flink-......
  • Kafka与ClickHouse开发与应用_tyt2023
    本实验基于MRS环境,Kafka部分主要介绍kafka命令行脚本的基本使用规范,以及通过介绍一个电商数据实时分析的场景将Kafka和SparkStreaming进行组合,帮助大家更好地掌握实际项目的开发流程。ClickHouse部分主要介绍常见的业务操作,代码样例中所涉及的SQL操作主要包括创建数据库、创建表......
  • Spark 开发与应用_tyt2023
    本实验基于MRS环境,主要介绍如何利用SparkRDD的常用算子进行简单统计分析,以及如何利用SparkSQL进行结构化批处理。购买弹性公网IP购买MRS集群 选择“自定义购买”区域:华北—北京四计费模式:按需计费集群名称:mrs-bigdata版本类型:普通版集群版本:MRS3.1.0WXL......
  • MRS基础组件之HBase与Hive开发应用_tyt2023
    MRS基础组件之HBase与Hive开发应用本实验基于MRS环境,介绍如何利用HBase与Hive来进行相关操作。其中,HBase主要介绍包括如何利用JavaAPI创建数据表、写入数据、查看数据以及删除数据;而Hive则通过介绍UDF、UDTF和UDAF等自定义函数的基本操作,讲解如何进行自定义数据处理和清洗作业。......
  • HDFS与MapReduce_tyt2023
    1.购买弹性公网IP产品->网络->弹性公网IPEIP计费模式:按需计费区域:华北-北京四线路:全动态BGP公网带宽:按流量计费带宽大小:100IPv6:不开启弹性公网IP名称:eip-bigdata1购买量:12.购买MRS集群产品-》大数据=》MapReduce服务选择“自定义购买”区域:华北—......
  • Flink计算TopN
    在ApacheFlink中实现高效的TopN数据处理,尤其是涉及时间窗口和多条件排序时,需要精细地控制数据流和状态管理。普通计算TopN:1.定义数据源(Source)首先,我们需要定义数据源。这可能是Kafka流、文件、数据库或任何其他支持的数据源。valstream:DataStream[YourType]=en......
  • 【Flink从入门到精通 05】Source&Sink
    【Flink从入门到精通05】Source&SinkFlink用于处理有状态的流式计算,需要对Source端的数据进行加工处理,然后写入到Sink端,下图展示了在Flink中数据所经历的过程,今天就根据这张图分别给大家分享下。01EnvironmentFlink所有的程序都从这一步开始,只有创建了执行环境,才能开......
  • Java版Flink(一)概述和入门案例
    一、概述1、Flink是什么ApacheFlinkisaframeworkanddistributedprocessingengineforstatefulcomputationsoverunboundedandboundeddatastreams.ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。官网地址2、Flink特点......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 【flink番外篇】5、flink的window(介绍、分类、函数及Tumbling、Sliding、session窗口
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......