百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Flink CDC中的事件队列的使用(flink cdc canal)

haoteby 2025-03-19 12:49 32 浏览

生产者/消费者模式是异步编程中最经典且最重要的协作模式之一,其核心思想是通过任务队列实现生产者与消费者之间的解耦,使程序能够高效处理大量任务。在现代软件开发中,这种模式几乎无处不在,无论是线程池、消息队列(如Kafka、RabbitMQ),还是高性能服务器架构,都大量应用了这一模式。

核心概念与实现机制

生产者/消费者模式的基本组成包括三个部分:

  1. 生产者(Producer):负责生成任务或数据的组件
  2. 任务队列(Queue):用于存储待处理任务的缓冲区
  3. 消费者(Consumer):负责处理任务的组件
    在实现机制上,生产者/消费者模式通常采用以下方式:
  • 线程池实现:生产者将任务提交到线程池的任务队列中,消费者是线程池中的线程,负责从队列中取出任务执行。
  • 消息队列实现:生产者将消息发送到消息中间件,消费者从消息队列中拉取消息进行处理。

Flink CDC中的应用

当前以flink CDC 中mysql connector 为源码进行分析。

这个过程涉及到了几个重要的部分:SplitReader、BinlogClient、DataChangeEvent、队列以及SourceReader。
SplitReader 是一个专门设计用来读取和初步处理数据的组件。它通过使用 BinlogClient 与 MySQL 数据库建立连接。Binlog(二进制日志)是 MySQL 中用于记录所有更改数据库状态的信息的一种机制,比如表的插入、更新或删除等操作都会被记录下来。通过 BinlogClient,SplitReader 能够从 MySQL 的 binlog 文件中读取出这些变更信息,SplitReader 不仅负责读取这些日志条目,还会对它们进行解析。这里的解析指的是将原始的日志格式转换为更加结构化且易于后续处理的形式 —— 即 DataChangeEvent 对象。每个 DataChangeEvent 都代表了数据库中发生的一个特定事件,如某条记录被修改了或是新添加了一条记录等,并包含了关于该事件的所有必要信息,如时间戳、受影响的表名、行的具体变化内容等。
一旦这些 DataChangeEvent 被创建出来后,就会被加入到一个队列当中。这个队列充当着缓冲区的角色,确保即使在短时间内产生大量变更时,系统也能够平稳地处理而不至于过载。同时,它还允许不同组件之间异步地交换数据,提高了整个系统的灵活性和响应速度。
SourceReader 是另一个关键组件,它的任务是从上述提到的队列中取出 DataChangeEvent,并将其转发给“下游”——即接下来需要利用这些数据执行进一步处理或分析的部分。这可以包括但不限于数据清洗、转换、加载至其他存储系统、实时计算等等。通过这种方式,SourceReader 担当了数据流动管道中的一个重要环节,确保信息能够高效准确地传递下去。

在配置Mysql connector 时,可以指定以下debezium的属性:

属性

说明

max.queue.size

队列存储的元素最大值

poll.interval.ms

消费者线程拉取数据最大间隔时间

max.queue.size.in.bytes

队列中最大存储的字节

源码分析

该内容的主要代码集中于 ChangeEventQueue, 关注两个方法 doEnqueue 和 poll 方法。

doEnqueue

protected void doEnqueue(T record) throws InterruptedException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        synchronized (this) {
            while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to drain queue
                this.notifyAll();
                // queue size or queue sizeInBytes threshold reached, so wait a bit
                this.wait(pollInterval.toMillis());
            }

            queue.add(record);
            // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
            if (maxQueueSizeInBytes > 0) {
                long messageSize = ObjectSizeCalculator.getObjectSize(record);
                sizeInBytesQueue.add(messageSize);
                currentQueueSizeInBytes += messageSize;
            }

            if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to start draining queue and do not wait
                this.notifyAll();
            }
        }
    }

加入元素的逻辑:

  1. 使用synchronized对当前对象进行锁定
  2. 判断当前队列中是否已经满了,或者存储的内存大小已经超过最大值,如果是则要通知正在等待的线程进行消费,并等待一段时间,重新检查
  3. 当前队列未满的情况下,直接加入队列

poll 逻辑

public List poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.min(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            synchronized (this) {
                List records = new ArrayList<>(Math.min(maxBatchSize, queue.size()));
                // 从队列中获取数据到records,如果不够,等待pollInterval时间
                while (drainRecords(records, maxBatchSize - records.size()) < maxBatchSize
                        && (maxQueueSizeInBytes == 0 || currentQueueSizeInBytes < maxqueuesizeinbytes timeout.expired throwproducerexceptionifpresent logger.debugno records available yet sleeping a bit... long remainingtimeoutmills='timeout.remaining().toMillis();' if remainingtimeoutmills> 0) {
                        // notify doEnqueue() to resume processing (if anything is on wait())
                        this.notify();
                        // no records yet, so wait a bit
                        this.wait(remainingTimeoutMills);
                    }
                    LOGGER.debug("checking for more records...");
                }
                // notify doEnqueue() to resume processing
                this.notify();
                return records;
            }
        }
        finally {
            previousContext.restore();
        }
    }
  1. 通过synchronized进行锁定当前的对象
  2. 按当前的队列中数据量创建临时列表,从当前队列中获取数据,如果当前获取的数据小于最大值并且如果有设置内存判断且当前的的内存大小未达到最大值,在未超时的前提下,则等待pollInterval时间

相关推荐

如何为MySQL服务器和客户机启用SSL?

用户想要与MySQL服务器建立一条安全连接时,常常依赖VPN隧道或SSH隧道。不过,获得MySQL连接的另一个办法是,启用MySQL服务器上的SSL封装器(SSLwrapper)。这每一种方法各有其...

OpenVPN客户端配置_openvpn客户端配置文件解析

...

k8s 证书问题排查_k8s dashboard 证书

从去年开始一些老项目上陆陆续续出现一些列的证书问题,(证书原理这里就不说了,官方文档一堆)多数刚开始的表现就是节点的kubelet服务起不来,节点状态NotReady表现日志如下failed...

企业级网络互通方案:云端OpenVPN+爱快路由器+Win11互联实战

企业级网络互通方案:OpenVPN搭建公有云+爱快路由器+Win11三地互联实战指南「安全高效」三地局域网秒变局域网实施环境说明...

OpenV** Server/Client配置文件详解

Server配置详解...

接口基础认知:关键信息与合规前提

1.核心技术参数(必记)...

S交换机通过SSH登录设备配置示例(RADIUS认证+本地认证独立)

说明:●本示例只介绍设备的认证相关配置,请同时确保已在RADIUS服务器上做了相关配置,如设备地址、共享密钥、创建用户等配置。●通过不同的管理域来实现RADIUS认证与本地认证两种方式同时使用,两...

SSL证书如何去除私钥密码保护_ssl证书怎么取消

有时候我们在生成证书的时候可以加入了密码保护。然后申请到证书安装到了web服务器。但是这样可能会带来麻烦。每次重启apache或者nginx的时候,都需要输入密码。那么SSL证书如何去除私钥密码保护。...

SSL证书基础知识与自签名证书生成指南

一、证书文件类型解析...

S交换机通过SSH登录设备配置示例(RADIUS认证)

说明:本示例只介绍设备的认证相关配置,请同时确保已在RADIUS服务器上做了相关配置,如设备地址、共享密钥、创建用户等配置。假设已在RADIUS服务器上创建了用户名yc123,密码test#123。对...

HTTPS是什么?加密原理和证书。SSL/TLS握手过程

秘钥的产生过程非对称加密...

HTTPS TLS握手流程_进行tls握手

1.客户端向服务器发送`ClientHello`消息,包括支持的TLS版本、加密套件、随机数等信息。2.服务器收到`ClientHello`消息后,解析其中的信息,并根据配置选择一个加密套件。3....

Spring Boot 单点登录(SSO)实现_spring boot 单点登录jwt

SpringBoot单点登录(SSO)实现全指南单点登录(SingleSign-On,SSO)是一种身份验证机制,允许用户使用一组凭证登录多个相关但独立的系统。在微服务架构和企业级系统中,SS...

源码分享:在pdf上加盖电子签章_pdf如何加盖电子公章

在pdf上加盖电子签章,并不是只是加个印章图片,。而是要使用一对密钥中的私钥对文件进行签字。为啥要用私钥呢?很简单,因为公钥是公开的,其他人才可以用公钥为你证明,这个文件是你签的。这就是我们常说的:私...

微信支付商户API证书到期 怎么更换

微信支付商户API证书到期更换是一个非常重要的操作,需要仔细按照流程进行。如果证书过期,所有通过API的支付、退款等操作都会失败,将直接影响您的业务。请按照以下详细步骤进行操作:重要前提:分清...