百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Flink CDC中的事件队列的使用(flink cdc canal)

haoteby 2025-03-19 12:49 28 浏览

生产者/消费者模式是异步编程中最经典且最重要的协作模式之一,其核心思想是通过任务队列实现生产者与消费者之间的解耦,使程序能够高效处理大量任务。在现代软件开发中,这种模式几乎无处不在,无论是线程池、消息队列(如Kafka、RabbitMQ),还是高性能服务器架构,都大量应用了这一模式。

核心概念与实现机制

生产者/消费者模式的基本组成包括三个部分:

  1. 生产者(Producer):负责生成任务或数据的组件
  2. 任务队列(Queue):用于存储待处理任务的缓冲区
  3. 消费者(Consumer):负责处理任务的组件
    在实现机制上,生产者/消费者模式通常采用以下方式:
  • 线程池实现:生产者将任务提交到线程池的任务队列中,消费者是线程池中的线程,负责从队列中取出任务执行。
  • 消息队列实现:生产者将消息发送到消息中间件,消费者从消息队列中拉取消息进行处理。

Flink CDC中的应用

当前以flink CDC 中mysql connector 为源码进行分析。

这个过程涉及到了几个重要的部分:SplitReader、BinlogClient、DataChangeEvent、队列以及SourceReader。
SplitReader 是一个专门设计用来读取和初步处理数据的组件。它通过使用 BinlogClient 与 MySQL 数据库建立连接。Binlog(二进制日志)是 MySQL 中用于记录所有更改数据库状态的信息的一种机制,比如表的插入、更新或删除等操作都会被记录下来。通过 BinlogClient,SplitReader 能够从 MySQL 的 binlog 文件中读取出这些变更信息,SplitReader 不仅负责读取这些日志条目,还会对它们进行解析。这里的解析指的是将原始的日志格式转换为更加结构化且易于后续处理的形式 —— 即 DataChangeEvent 对象。每个 DataChangeEvent 都代表了数据库中发生的一个特定事件,如某条记录被修改了或是新添加了一条记录等,并包含了关于该事件的所有必要信息,如时间戳、受影响的表名、行的具体变化内容等。
一旦这些 DataChangeEvent 被创建出来后,就会被加入到一个队列当中。这个队列充当着缓冲区的角色,确保即使在短时间内产生大量变更时,系统也能够平稳地处理而不至于过载。同时,它还允许不同组件之间异步地交换数据,提高了整个系统的灵活性和响应速度。
SourceReader 是另一个关键组件,它的任务是从上述提到的队列中取出 DataChangeEvent,并将其转发给“下游”——即接下来需要利用这些数据执行进一步处理或分析的部分。这可以包括但不限于数据清洗、转换、加载至其他存储系统、实时计算等等。通过这种方式,SourceReader 担当了数据流动管道中的一个重要环节,确保信息能够高效准确地传递下去。

在配置Mysql connector 时,可以指定以下debezium的属性:

属性

说明

max.queue.size

队列存储的元素最大值

poll.interval.ms

消费者线程拉取数据最大间隔时间

max.queue.size.in.bytes

队列中最大存储的字节

源码分析

该内容的主要代码集中于 ChangeEventQueue, 关注两个方法 doEnqueue 和 poll 方法。

doEnqueue

protected void doEnqueue(T record) throws InterruptedException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        synchronized (this) {
            while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to drain queue
                this.notifyAll();
                // queue size or queue sizeInBytes threshold reached, so wait a bit
                this.wait(pollInterval.toMillis());
            }

            queue.add(record);
            // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
            if (maxQueueSizeInBytes > 0) {
                long messageSize = ObjectSizeCalculator.getObjectSize(record);
                sizeInBytesQueue.add(messageSize);
                currentQueueSizeInBytes += messageSize;
            }

            if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to start draining queue and do not wait
                this.notifyAll();
            }
        }
    }

加入元素的逻辑:

  1. 使用synchronized对当前对象进行锁定
  2. 判断当前队列中是否已经满了,或者存储的内存大小已经超过最大值,如果是则要通知正在等待的线程进行消费,并等待一段时间,重新检查
  3. 当前队列未满的情况下,直接加入队列

poll 逻辑

public List poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.min(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            synchronized (this) {
                List records = new ArrayList<>(Math.min(maxBatchSize, queue.size()));
                // 从队列中获取数据到records,如果不够,等待pollInterval时间
                while (drainRecords(records, maxBatchSize - records.size()) < maxBatchSize
                        && (maxQueueSizeInBytes == 0 || currentQueueSizeInBytes < maxqueuesizeinbytes timeout.expired throwproducerexceptionifpresent logger.debugno records available yet sleeping a bit... long remainingtimeoutmills='timeout.remaining().toMillis();' if remainingtimeoutmills> 0) {
                        // notify doEnqueue() to resume processing (if anything is on wait())
                        this.notify();
                        // no records yet, so wait a bit
                        this.wait(remainingTimeoutMills);
                    }
                    LOGGER.debug("checking for more records...");
                }
                // notify doEnqueue() to resume processing
                this.notify();
                return records;
            }
        }
        finally {
            previousContext.restore();
        }
    }
  1. 通过synchronized进行锁定当前的对象
  2. 按当前的队列中数据量创建临时列表,从当前队列中获取数据,如果当前获取的数据小于最大值并且如果有设置内存判断且当前的的内存大小未达到最大值,在未超时的前提下,则等待pollInterval时间

相关推荐

Chrome OS 41 用 Freon 取代 X11_chrome os atom

在刚发布的ChromeOS41里,除了常规的Wi-Fi稳定性提升(几乎所有系统的changelog里都会包含这一项)、访客模式壁纸等之外,还存在底层改变。这一更新中Google移除...

苹果iPad Pro再曝光 有望今年六月发布

自进入2015年以后,有关大屏iPad的消息便一直不绝于耳,之前就有不少媒体猜想这款全新的平板电脑将会在三月发布,不过可惜的是我么只在那次发布会上看到了MacBookPro。近日@Ubuntu团队便...

雷卯针对香橙派Orange Pi 5 Max开发板防雷防静电方案

一、应用场景高端平板、边缘计算、人工智能、云计算、AR/VR、智能安防、智能家居、Linux桌面计算机、Linux网络服务器、Android平板、Android游戏机...

Ubuntu Server无法更新问题解决_ubuntu server not found

上周老家的一台运行UbuntuServer的盒子无法连接上了,中秋这两天回来打开,顺手更新一下发现更新报错。提示`E:Releasefileforhttps://mirrors.aliyun...

虚幻引擎5正式版发布:古墓丽影&amp;巫师新作采用、新一代实时渲染

机器之心报道编辑:杜伟、陈萍虚幻引擎5的目标是「助力各种规模的团队在视觉领域和互动领域挑战极限,施展无限潜能」。...

AMD Milan-X双路霄龙7773X平台基准测试曝光 CPU缓存总量超1.5GB

OpenBenchmarking基准测试数据库刚刚曝光了AMDMilan-X双路霄龙7773X平台的跑分成绩,虽然很快就被撤下,但我们还是知晓了高达1.6GB的总CPU缓存。早些时...

ROS机器人建模_ros机器人硬件搭建

...

全网最新的Dify(1.7.2)私有化离线部署教程(ARM架构)

Hello,大家好!近期工作中有涉及到Dify私有化离线部署,特别是针对于一些国产设备。因此特别整理了该教程,实测有效!有需要的小伙伴可以参考下!本文主要针对Dify1.7.2最新版本+国产操作系...

在ubuntu下新建asp.net core项目_创建ubuntu

本文一步步讲述在ubuntu下用visualstudiocode创建asp.netcore项目的过程。step1:环境操作系统:virtualbox下安装的lubuntu。请不要开启“硬件...

在晶晨A311D2处理器上进行Linux硬件视频编码
在晶晨A311D2处理器上进行Linux硬件视频编码

在KhadasVIM4AmogicA311D2SBC上,我更多的时间是在使用Ubuntu22.04。它的总体性能还不错,只不过缺少3D图形加速和硬件视...

2025-08-26 17:22 haoteby

Nacos3.0重磅来袭!全面拥抱AI,单机及集群模式安装详细教程!

之前和大家分享过JDK17的多版本管理及详细安装过程,然后在项目升级完jdk17后又发现之前的注册和配置中心nacos又用不了,原因是之前的nacos1.3版本的,版本太老了,已经无法适配当前新的JD...

电影质量级渲染来了!虚幻引擎5.3正式发布:已开放下载

快科技9月8日消息,日前,Unrealengine正式发布了虚幻引擎5.3,带来了大量全方位的改进。...

2025如何选购办公电脑?极摩客mini主机英特尔系列选购指南

当下,迷你主机的性能越来越强,品类也越来越多。但是CPU是不变的,基本都是AMD和英特尔的。有一个小伙伴在评论区提问,我应该如何在众多机器中选购一台符合自己的迷你主机呢?那今天我们优先把我们的系列,分...

ubuntu 20.04+RTX4060 Ti+CUDA 11.7+cudnn

ububtu添加国内源sudocp/etc/apt/sources.list/etc/apt/sources.list.backupsudovim/etc/apt/sources.lis...

Linux Mint 18将重新基于Ubuntu 16.04 带来更好硬件支持

项目负责人ClementLefebvre在本月6日披露了关于LinuxMint18“Sarah”操作系统的大量信息,包括带来全新扁平化体验的Mint-Y主题。而现在,这款将于年底之前上线的操作...