百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Flink CDC中的事件队列的使用(flink cdc canal)

haoteby 2025-03-19 12:49 23 浏览

生产者/消费者模式是异步编程中最经典且最重要的协作模式之一,其核心思想是通过任务队列实现生产者与消费者之间的解耦,使程序能够高效处理大量任务。在现代软件开发中,这种模式几乎无处不在,无论是线程池、消息队列(如Kafka、RabbitMQ),还是高性能服务器架构,都大量应用了这一模式。

核心概念与实现机制

生产者/消费者模式的基本组成包括三个部分:

  1. 生产者(Producer):负责生成任务或数据的组件
  2. 任务队列(Queue):用于存储待处理任务的缓冲区
  3. 消费者(Consumer):负责处理任务的组件
    在实现机制上,生产者/消费者模式通常采用以下方式:
  • 线程池实现:生产者将任务提交到线程池的任务队列中,消费者是线程池中的线程,负责从队列中取出任务执行。
  • 消息队列实现:生产者将消息发送到消息中间件,消费者从消息队列中拉取消息进行处理。

Flink CDC中的应用

当前以flink CDC 中mysql connector 为源码进行分析。

这个过程涉及到了几个重要的部分:SplitReader、BinlogClient、DataChangeEvent、队列以及SourceReader。
SplitReader 是一个专门设计用来读取和初步处理数据的组件。它通过使用 BinlogClient 与 MySQL 数据库建立连接。Binlog(二进制日志)是 MySQL 中用于记录所有更改数据库状态的信息的一种机制,比如表的插入、更新或删除等操作都会被记录下来。通过 BinlogClient,SplitReader 能够从 MySQL 的 binlog 文件中读取出这些变更信息,SplitReader 不仅负责读取这些日志条目,还会对它们进行解析。这里的解析指的是将原始的日志格式转换为更加结构化且易于后续处理的形式 —— 即 DataChangeEvent 对象。每个 DataChangeEvent 都代表了数据库中发生的一个特定事件,如某条记录被修改了或是新添加了一条记录等,并包含了关于该事件的所有必要信息,如时间戳、受影响的表名、行的具体变化内容等。
一旦这些 DataChangeEvent 被创建出来后,就会被加入到一个队列当中。这个队列充当着缓冲区的角色,确保即使在短时间内产生大量变更时,系统也能够平稳地处理而不至于过载。同时,它还允许不同组件之间异步地交换数据,提高了整个系统的灵活性和响应速度。
SourceReader 是另一个关键组件,它的任务是从上述提到的队列中取出 DataChangeEvent,并将其转发给“下游”——即接下来需要利用这些数据执行进一步处理或分析的部分。这可以包括但不限于数据清洗、转换、加载至其他存储系统、实时计算等等。通过这种方式,SourceReader 担当了数据流动管道中的一个重要环节,确保信息能够高效准确地传递下去。

在配置Mysql connector 时,可以指定以下debezium的属性:

属性

说明

max.queue.size

队列存储的元素最大值

poll.interval.ms

消费者线程拉取数据最大间隔时间

max.queue.size.in.bytes

队列中最大存储的字节

源码分析

该内容的主要代码集中于 ChangeEventQueue, 关注两个方法 doEnqueue 和 poll 方法。

doEnqueue

protected void doEnqueue(T record) throws InterruptedException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        synchronized (this) {
            while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to drain queue
                this.notifyAll();
                // queue size or queue sizeInBytes threshold reached, so wait a bit
                this.wait(pollInterval.toMillis());
            }

            queue.add(record);
            // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
            if (maxQueueSizeInBytes > 0) {
                long messageSize = ObjectSizeCalculator.getObjectSize(record);
                sizeInBytesQueue.add(messageSize);
                currentQueueSizeInBytes += messageSize;
            }

            if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to start draining queue and do not wait
                this.notifyAll();
            }
        }
    }

加入元素的逻辑:

  1. 使用synchronized对当前对象进行锁定
  2. 判断当前队列中是否已经满了,或者存储的内存大小已经超过最大值,如果是则要通知正在等待的线程进行消费,并等待一段时间,重新检查
  3. 当前队列未满的情况下,直接加入队列

poll 逻辑

public List poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.min(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            synchronized (this) {
                List records = new ArrayList<>(Math.min(maxBatchSize, queue.size()));
                // 从队列中获取数据到records,如果不够,等待pollInterval时间
                while (drainRecords(records, maxBatchSize - records.size()) < maxBatchSize
                        && (maxQueueSizeInBytes == 0 || currentQueueSizeInBytes < maxqueuesizeinbytes timeout.expired throwproducerexceptionifpresent logger.debugno records available yet sleeping a bit... long remainingtimeoutmills='timeout.remaining().toMillis();' if remainingtimeoutmills> 0) {
                        // notify doEnqueue() to resume processing (if anything is on wait())
                        this.notify();
                        // no records yet, so wait a bit
                        this.wait(remainingTimeoutMills);
                    }
                    LOGGER.debug("checking for more records...");
                }
                // notify doEnqueue() to resume processing
                this.notify();
                return records;
            }
        }
        finally {
            previousContext.restore();
        }
    }
  1. 通过synchronized进行锁定当前的对象
  2. 按当前的队列中数据量创建临时列表,从当前队列中获取数据,如果当前获取的数据小于最大值并且如果有设置内存判断且当前的的内存大小未达到最大值,在未超时的前提下,则等待pollInterval时间

相关推荐

别争了,Access数据库才是真正的低代码开发平台

Access数据库是微软公司搞出来的“奇葩”产品。...

Access开发轻松一键将 Access 全库表格导出为 Excel

hi,大家好呀!在日常工作中,Access常常是我们忠实的数据管家,默默守护着项目信息、客户列表或是库存记录。它结构清晰,录入便捷,对于许多中小型应用场景来说,无疑是个得力助手。然而,当我们需要对这...

跟我学:从零开始用Access设计一套完整的系统(一)

序言:Access是一款强大而灵活的数据库软件,可以设计和开发各种类型和规模的数据库应用程序。本文旨在为您提供从零开始设计Access数据库系统的详细指导,并通过实际案例演示如何在Access中设计和...

问卷调查管理程序 Access数据库 功能介绍和VBA代码分享

o本系统包含主要功能有:问卷管理,题目管理,问卷填写,调查结果统计,数据汇总导出o数据库系统包含:表,查询,窗体,VBA代码...

非绑定记录窗体查看管理数据 Access数据库功能模块 VBA代码编程

模块Public成绩IDnumAsLong学生成绩管理PrivateSubCommand更新_Click()DoCmd.SetWarnings(False)...

ACCESS中的DLookUp函数是如何运算的?

一、DLookUp函数介绍1.DLookUp函数的用途:可以用于从指定集合(一个域)中获取符合条件的特定字段的值。2.DLookUp函数的格式为:DLookUp(expr,domain,...

Excel常用技能分享与探讨(5-宏与VBA简介 VBA之用户窗体-一)

用户窗体(UserForm)是VBA中创建交互式界面的核心工具,可用于数据录入、设置参数或展示信息。...

【每日任务计划管理系统】Access数据库管理系统 VBA代码分享

窗体系统主页文本框,组合框,按钮,子窗体OptionCompareDatabase...

VBA高效开发:用用户窗体打造个性化数据录入工具

在日常办公中,Excel的数据录入是否总让你陷入这些困境?手动输入易错、格式混乱难追溯、重复操作耗时费力。今天,我们将突破常规,利用VBA的用户窗体(UserForm)构建一套“智能校验、流程清晰、...

VBA编程(基于Access)第1课:VBA的作用和学习方法

VBA,英文全称VisualBasicforApplications,直接翻译过来叫做“可以直接使用的VB语言”。...

Access数据库宏与VBA代码的使用(精品一)

Access数据库的宏相当于实现某一功能的一系列命令和操作,我们无需写代码,系统已经将主体代码集成一块,我们只需要做一些简单的操作即可,而VBA代码则是实实在在的代码写到程序里面,我们可以自己编写,也...

【每日任务管理系统】(2) VB 管理系统 代码 Visual Basic access数据库

窗体全部任务DimdhAsLong'存储高度差DimdwAsLong'存储宽度差...

VBA连接access数据库开发软件(vba调用数据库连接)

VBA连接access数据库开发小软件虽然VBA(包括VB)已不再流行,但是在某些场合还是比较方便的,尤其对非编程专业人员。灵活使用VBA,可以很十分方便的处理excel,access数据,提高工作效...

九章云极发布全新AI智算云平台:支持秒级生成百万级Token

6月16日,在在九章云极智能计算科技论坛上,九章云极宣布推出全新AI智算云平台“九章智算云AlayaNeWCloud2.0”,该平台基于Serverless技术架构与强化学习技术(RL)的深度融合...

浅谈基于大数据技术下的“云旅游”平台运营策略研究

云旅游体验平台是利用大数据和虚拟技术,构建虚拟旅游环境,能够改变旅游企业的营销模式和旅游者的消费模式。本文从云旅平台多维度数据信息的获取与分析,平台体验质量反馈信息数据构建,云旅游平台服务功能设计方案...