首页 > 其他分享 >Kafka+Fink 实战+工具类

Kafka+Fink 实战+工具类

时间:2023-08-19 22:33:26浏览次数:40  
标签:实战 return String jsonObject param Kafka Fink static public

  • LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 记录日志
     *
     * @param request
     * @param shortLinkCode
     * @param accountNo
     * @return
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // ip、 浏览器信息
        String ip = CommonUtil.getIpAddr(request);
        // 全部请求头
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        Map<String,String> availableMap = new HashMap<>();
        availableMap.put("user-agent",headerMap.get("user-agent"));
        availableMap.put("referer",headerMap.get("referer"));
        availableMap.put("accountNo",accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                //日志类型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志内容
                .data(availableMap)
                //客户端ip
                .ip(ip)
                // 时间
                .ts(CommonUtil.getCurrentTimestamp())
                //业务唯一标识(短链码)
                .bizId(shortLinkCode).build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        //打印日志 in 控制台
        log.info(jsonLog);

        // 发送kafka
        kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

  • DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定义 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定义 消费组
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定义 消费组
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

        DataStreamSource<String> ds = env.addSource(kafkaConsumer);

        ds.print();

        SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端设备唯一标识
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
        });

        // 分组
        KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
        });


        // 识别新老访客    richMap open函数,对状态以及日期格式进行初始化

        SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

        jsonDSWithVisitorState.print("ods新老访客");

        // 存储到dwd
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

        jsonDSWithVisitorState.addSink(kafkaProducer);


        env.execute();
    }

    /**
     * 获取referer
     * @param jsonObject
     * @return
     */
    public static String getReferer(JSONObject jsonObject){
        JSONObject dataJsonObj = jsonObject.getJSONObject("data");
        if(dataJsonObj.containsKey("referer")){

            String referer = dataJsonObj.getString("referer");
            if(StringUtils.isNotBlank(referer)){
                try {
                    URL url = new URL(referer);
                    return url.getHost();
                } catch (MalformedURLException e) {
                    log.error("提取referer失败:{}",e.toString());
                }
            }
        }

        return "";

    }

    /**
     * 生成设备唯一标识
     *
     * @param jsonObject
     * @return
     */
    public static String getDeviceId(JSONObject jsonObject){
        Map<String,String> map= new TreeMap<>();

        try{
            map.put("ip",jsonObject.getString("ip"));
            map.put("event",jsonObject.getString("event"));
            map.put("bizId",jsonObject.getString("bizId"));
            map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));

            return DeviceUtil.geneWebUniqueDeviceId(map);

        }catch (Exception e){
            log.error("生产唯一deviceId异常:{}", jsonObject);
            return null;
        }


    }


}

  • KafkaUtil

    @Slf4j
    public class KafkaUtil {
    
        /**
         * kafka 的 broker 地址
         */
        private static String KAFKA_SERVER = null;
    
        static {
            Properties properties = new Properties();
    
            InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
    
            try {
                properties.load(in);
            } catch (IOException e) {
                e.printStackTrace();
                log.error("加载Kafka配置文件失败:{}",e.getMessage());
            }
    
            //获取配置文件中的value
            KAFKA_SERVER = properties.getProperty("kafka.servers");
    
        }
    
        /**
         * 获取flink的kafka消费者
         * @param topic
         * @param groupId
         * @return
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
    
            return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
        }
    
        /**
         * 获取flink的kafka生产者
         * @param topic
         * @return
         */
        public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
            return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
        }
    }
    
    
  • TimeUtil

    public class TimeUtil {
    
        /**
         * 默认日期格式
         */
        private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
    
        /**
         * 默认日期格式
         */
        private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER  = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
    
        private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
    
    
        /**
         * LocalDateTime 转 字符串,指定日期格式
         * @param localDateTime
         * @param pattern
         * @return
         */
        public static String format(LocalDateTime localDateTime, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * Date 转 字符串, 指定日期格式
         * @param time
         * @param pattern
         * @return
         */
        public static String format(Date time, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         *  Date 转 字符串,默认日期格式
         * @param time
         * @return
         */
        public static String format(Date time){
    
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         * timestamp 转 字符串,默认日期格式
         *
         * @param timestamp
         * @return
         */
        public static String format(long timestamp) {
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * 字符串 转 Date
         *
         * @param time
         * @return
         */
        public static Date strToDate(String time) {
            LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
            return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
    
        }
    
    }
    

标签:实战,return,String,jsonObject,param,Kafka,Fink,static,public
From: https://www.cnblogs.com/xietingwei/p/17643274.html

相关文章

  • 一线大厂性能优化实战解析,看到就是赚到
    前言我们平时在使用软件的过程中是不是遇到过这样的情况:"这个app怎么还没下载完!"、太卡了吧!"、"图片怎么还没加载出来!"、"怎么刚进去就卡了!"、"这怎么点了一下就退出了!"等等,这些情况其实包含了我们性能优化的主要内容.,性能的优化是一个老生常谈的点,也是一个比较重要的点.特......
  • 【web_逆向09】AES加密逆向实战
    目标网站话不多说,直接干:https://www.XXXX.com/rank_m/c7/,可以联系本人微信号:wxid_ps0bm4kbsl0t22寻找加密入口查看接口数据,发现入参、出参都是经过加密的,需要加密、解密查看Initiator中,发现promise。异步通过interceptors搜索,往回找不一定能找到,可以考虑正向搜索注......
  • uniapp APP微信登录、支付、分享以及支付宝支付 实战踩坑记录
    1、微信支付和支付宝支付  先上代码、封装好了的组件   html部分    <template> <viewclass="rows"> <!------------------------------充值的弹框开始------------------------------> <uni-popupclass="common-popup"ref="popupChongZhi":i......
  • yolov7实战
    目录1.网络结构(1)backbone(2)SPPCSPC层(3)RepConv层2.标签分配(1)对位置和anchor框大小做初筛(2)根据IOU和类别进行复筛3.计算损失4.预测结果YOLOV7主要的贡献在于:1.将模型重参数化引入到网络架构中,重参数化这一思想最早出现于REPVGG中。2.标签分配策略采用的是YOLOV5的跨网......
  • 部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)
    部署ZKdockerrun-d--namezookeeper-p2181:2181-twurstmeister/zookeeper部署Kafka-p9092:9092\-eKAFKA_BROKER_ID=0\--envKAFKA_HEAP_OPTS=-Xmx256M\--envKAFKA_HEAP_OPTS=-Xms128M\-eKAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181\-eKAFKA_ADVERTISED......
  • Debezium+KafkaConnect+Confluent实现企业级实时数据复制平台
    【I】集群规划5台节点IP地址  10.101.1.45 ZK、Kafka、DebeziumConnector、JDK、DebeziumUI、MySQL、Kafka-Eagle10.101.1.46 ZK、Kafka、DebeziumConnector、JDK10.101.1.47 ZK、Kafka、DebeziumConnector、JDK10.101.1.48 ZK、Kafka、DebeziumConnector、JDK10.......
  • vagrant实战爬坑
    为什么要用到这个技术?简单来说,vagrant是一个操作虚拟机的工具。它提供了一套高效而便利的虚拟机管理方式,通过命令和配置文件,当然也要基于vagrant自身的约定,就能很快的完成一套开发环境的部署,并可以打包传播,极大的方便了在工作环境中,各个开发环境不一致的问题,也解决了重复配置环......
  • 开源数据库Mysql_DBA运维实战 (总结)
    开源数据库Mysql_DBA运维实战(总结)SQL语句都包含哪些类型DDLDCLDMLDQLYum安装MySQL的配置文件配置文件:/etc/my.cnf日志目录:/var/log/mysqld.log错误日志:/var/log/mysql/error.logMySQL的主从切换查看主从复制状态停止主数据库的写入操作记录当前二级制日志文件和位置更新从数据库......
  • 软件调试与问题排查的修炼之路与实战经验
    久经沙场,才能练就丰富经验与实战能力。调试调试,调整与测试。那些机械工程师通常需要对仪器参数进行设置以便能够更好的观察。软件调试有种类似的含义,比如高级工程师会对一些参数进行设置以便达到更好的性能优化。而在通常意义上,调试通常是指对不合预期的状态进行定位、调......
  • C++项目实战之演讲比赛流程管理系统
    演讲比赛流程管理系统1.演讲比赛程序需求1.1比赛规则学校举行一场演讲比赛,共有12个人参加。比赛共两轮,第一轮为淘汰赛,第二轮为决赛每名选手都有对应的编号,如10001~10012比赛方式:分组比赛,每组6个人第一轮分为两个小组,整体按照选手编号进行抽签后顺序演讲10个......