首页 > 其他分享 >实现一个实时数据平台的小型demo

实现一个实时数据平台的小型demo

时间:2024-06-18 20:29:07浏览次数:25  
标签:小型 java demo flink 实时 springframework import org public

近期自己梳理了一下自己所属业务线上的数据中台技术栈,以常见的实时链路为例,从最初的埋点到数据服务层查询到结果,依次经过:

1、埋点上报

2、写入消息队列

3、flink读取队列

4、flink写入clickhouse或hbase

5、spring项目提供查询和接口返回

搭建个简易版的实时数据平台流程跑通,手操实践一下,对自己使用过的技术做一个总结,对整体脉络做一个梳理。

完整工程结构

my-java-project
│
├── src
│   ├── main
│   │   ├── java
│   │   │   ├── com
│   │   │   │   ├── example
│   │   │   │   │   ├── controller
│   │   │   │   │   │   ├── EventController.java
│   │   │   │   │   │   ├── QueryController.java
│   │   │   │   │   ├── flink
│   │   │   │   │   │   ├── FlinkKafkaConsumerJob.java
│   │   │   │   │   │   ├── ClickHouseSinkFunction.java
│   │   ├── resources
│   │   │   ├── application.properties
│
├── pom.xml

1. 埋点上报

1.1 创建埋点上报服务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/events")
public class EventController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/report")
    public String reportEvent(@RequestBody String event) {
        kafkaTemplate.send("events_topic", event);
        return "Event reported successfully";
    }
}
1.2 配置 Kafka

application.properties 中添加 Kafka 配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

2. 写入消息队列

使用 Kafka 作为消息队列,已经在上面的代码中实现了写入 Kafka 的功能。

3. Flink 读取队列

3.1 创建 Flink 项目

pom.xml 中添加 Flink 和 Kafka 依赖:

<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.14.2</version>
    </dependency>
    <!-- Add dependencies for ClickHouse or HBase -->
</dependencies>
3.2 Flink 读取 Kafka 数据
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Properties;

public class FlinkKafkaConsumerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink_consumer");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "events_topic",
                new SimpleStringSchema(),
                properties
        );

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process stream and write to ClickHouse or HBase
        stream.addSink(new ClickHouseSinkFunction());

        env.execute("Flink Kafka Consumer Job");
    }
}

4. Flink 写入 ClickHouse 或 HBase

4.1 写入 ClickHouse

添加 ClickHouse 依赖:

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

实现写入 ClickHouse 的 Sink Function:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ClickHouseSinkFunction implements SinkFunction<String> {

    private transient ClickHouseConnection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ClickHouseDataSource dataSource = new ClickHouseDataSource("jdbc:clickhouse://localhost:8123/default");
        connection = dataSource.getConnection();
        statement = connection.prepareStatement("INSERT INTO events (event) VALUES (?)");
    }

    @Override
    public void invoke(String value, Context context) throws SQLException {
        statement.setString(1, value);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

5. Spring 项目提供查询和接口返回

5.1 创建查询接口
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/api")
public class QueryController {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GetMapping("/events")
    public List<String> getEvents() {
        return jdbcTemplate.queryForList("SELECT event FROM events", String.class);
    }
}
5.2 配置 ClickHouse 数据源

application.properties 中添加 ClickHouse 配置:

spring.datasource.url=jdbc:clickhouse://localhost:8123/default
spring.datasource.username=default
spring.datasource.password=
spring.datasource.driver-class-name=ru.yandex.clickhouse.ClickHouseDriver

标签:小型,java,demo,flink,实时,springframework,import,org,public
From: https://blog.csdn.net/weixin_48313678/article/details/139782246

相关文章

  • 高效、智能、安全:小型机房EasyCVR+AI视频综合监控解决方案
    一、背景需求分析随着信息技术的迅猛发展,小型机房在企事业单位中扮演着越来越重要的角色。为了确保机房的安全稳定运行,远程监控成为了必不可少的手段。二、视频监控视频监控是机房远程监控的重要组成部分。通过安装IP摄像机及部署视频监控系统EasyCVR平台,可以实现机房的实时监......
  • 由心知天气服务器响应的实时天气数据并进行JSON解析
    由心知天气服务器响应的实时天气数据并进行JSON解析#include<netinet/in.h>#include<arpa/inet.h>#include<stdio.h>#include<errno.h>#include<sys/socket.h>#include<netinet/in.h>#include<netinet/ip.h>#include<arpa/inet.h>......
  • 更简易、高效的微软Edge-tts库实时详细的经验分享
    Edge-tts是一个利用微软AzureCognitiveServices的在线文本转语音服务的Python库。它允许开发者在Python代码中使用该服务,而无需安装MicrosoftEdge浏览器、Windows操作系统或使用API密钥。这个Edge-tts库通过调用微软Edge浏览器的文本朗读相关API实现文本转......
  • 基于html,css,js,jQuery,ajax实时天气的网页
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><linkrel="stylesheet"hr......
  • 英伟达开源 3400 亿参数模型;苹果 iOS 18 紧急 SOS 新增实时视频功能丨 RTE 开发者日报
       开发者朋友们大家好: 这里是「RTE开发者日报」,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享RTE(Real-TimeEngagement)领域内「有话题的新闻」、「有态度的观点」、「有意思的数据」、「有思考的文章」、「有看点的会议」,但内容仅代表编辑的个人观......
  • 实时工业数据采集分析系统高效处理产线信息!
    对于大部分制造业企业,测量仪器的自动数据采集一直是个令人烦恼的事情,即使仪器已经具有RS232/485等接口,但仍然在使用一边测量,一边手工记录到纸张,再输入到PC中处理的方式,不但工作繁重,同时也无法保证数据的准确性,常常管理人员得到的数据已经是滞后了一两天的数据;而对于现场的不......
  • Elasticsearch 近实时搜索的底层原理
    我们都知道Elasticsearch的搜索是近实时的,数据写入后,立即搜索(不通过id)文档是搜不到的。这一切的原因要归于lucene所提供的API,因为lucene的API就是非实时的,Elasticsearch在lucene之上盖房子,通过一些增强,实现了查询的近实时和id查询的实时性。本文就来看看这个近实时......
  • 使用OpenCV进行实时性别和年龄识别
            在计算机视觉领域,使用深度学习技术进行实时性别和年龄识别是一项具有挑战性和实用性的任务。本文将深入解析一个使用OpenCV和预训练模型实现的实时性别和年龄识别代码,并逐行进行详细的注释解析,帮助读者理解代码的工作原理和实现细节。importcv2importnumpy......
  • 2024 最新谷歌邮箱 Gmail 账号注册完整指南 (多种方法 实时更新)
    Gmail是目前国内外是最常见、使用最广泛的邮箱,基本上持有谷歌邮箱的人可以”横行互联网“。针对很多人反映自己在注册谷歌账号时总是失败,本文整理了截止2024年6月亲测可用的所有注册方法,以图文结合的形式详细手把手带你注册Gmail新账户。 本文将包括:注册Gmail的主......
  • Demo | 利用机器学习构建作物模型的Python源码
    作物模型提出很早,但应用有限。看起来复杂,其实解决的是环境与表型间的关联,可参考前期推文:作物生长模型CropGrow。环境组的复杂,关键在于数据的准确性获取。对于数据分析人员来说,如果不care数据准确性,分析其实很简单的,就是经典的机器学习流程。这里提供一段伪代码仅供参考。1.导库......