百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Flink CDC中的事件队列的使用(flink cdc canal)

haoteby 2025-03-19 12:49 20 浏览

生产者/消费者模式是异步编程中最经典且最重要的协作模式之一,其核心思想是通过任务队列实现生产者与消费者之间的解耦,使程序能够高效处理大量任务。在现代软件开发中,这种模式几乎无处不在,无论是线程池、消息队列(如Kafka、RabbitMQ),还是高性能服务器架构,都大量应用了这一模式。

核心概念与实现机制

生产者/消费者模式的基本组成包括三个部分:

  1. 生产者(Producer):负责生成任务或数据的组件
  2. 任务队列(Queue):用于存储待处理任务的缓冲区
  3. 消费者(Consumer):负责处理任务的组件
    在实现机制上,生产者/消费者模式通常采用以下方式:
  • 线程池实现:生产者将任务提交到线程池的任务队列中,消费者是线程池中的线程,负责从队列中取出任务执行。
  • 消息队列实现:生产者将消息发送到消息中间件,消费者从消息队列中拉取消息进行处理。

Flink CDC中的应用

当前以flink CDC 中mysql connector 为源码进行分析。

这个过程涉及到了几个重要的部分:SplitReader、BinlogClient、DataChangeEvent、队列以及SourceReader。
SplitReader 是一个专门设计用来读取和初步处理数据的组件。它通过使用 BinlogClient 与 MySQL 数据库建立连接。Binlog(二进制日志)是 MySQL 中用于记录所有更改数据库状态的信息的一种机制,比如表的插入、更新或删除等操作都会被记录下来。通过 BinlogClient,SplitReader 能够从 MySQL 的 binlog 文件中读取出这些变更信息,SplitReader 不仅负责读取这些日志条目,还会对它们进行解析。这里的解析指的是将原始的日志格式转换为更加结构化且易于后续处理的形式 —— 即 DataChangeEvent 对象。每个 DataChangeEvent 都代表了数据库中发生的一个特定事件,如某条记录被修改了或是新添加了一条记录等,并包含了关于该事件的所有必要信息,如时间戳、受影响的表名、行的具体变化内容等。
一旦这些 DataChangeEvent 被创建出来后,就会被加入到一个队列当中。这个队列充当着缓冲区的角色,确保即使在短时间内产生大量变更时,系统也能够平稳地处理而不至于过载。同时,它还允许不同组件之间异步地交换数据,提高了整个系统的灵活性和响应速度。
SourceReader 是另一个关键组件,它的任务是从上述提到的队列中取出 DataChangeEvent,并将其转发给“下游”——即接下来需要利用这些数据执行进一步处理或分析的部分。这可以包括但不限于数据清洗、转换、加载至其他存储系统、实时计算等等。通过这种方式,SourceReader 担当了数据流动管道中的一个重要环节,确保信息能够高效准确地传递下去。

在配置Mysql connector 时,可以指定以下debezium的属性:

属性

说明

max.queue.size

队列存储的元素最大值

poll.interval.ms

消费者线程拉取数据最大间隔时间

max.queue.size.in.bytes

队列中最大存储的字节

源码分析

该内容的主要代码集中于 ChangeEventQueue, 关注两个方法 doEnqueue 和 poll 方法。

doEnqueue

protected void doEnqueue(T record) throws InterruptedException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        synchronized (this) {
            while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to drain queue
                this.notifyAll();
                // queue size or queue sizeInBytes threshold reached, so wait a bit
                this.wait(pollInterval.toMillis());
            }

            queue.add(record);
            // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
            if (maxQueueSizeInBytes > 0) {
                long messageSize = ObjectSizeCalculator.getObjectSize(record);
                sizeInBytesQueue.add(messageSize);
                currentQueueSizeInBytes += messageSize;
            }

            if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to start draining queue and do not wait
                this.notifyAll();
            }
        }
    }

加入元素的逻辑:

  1. 使用synchronized对当前对象进行锁定
  2. 判断当前队列中是否已经满了,或者存储的内存大小已经超过最大值,如果是则要通知正在等待的线程进行消费,并等待一段时间,重新检查
  3. 当前队列未满的情况下,直接加入队列

poll 逻辑

public List poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.min(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            synchronized (this) {
                List records = new ArrayList<>(Math.min(maxBatchSize, queue.size()));
                // 从队列中获取数据到records,如果不够,等待pollInterval时间
                while (drainRecords(records, maxBatchSize - records.size()) < maxBatchSize
                        && (maxQueueSizeInBytes == 0 || currentQueueSizeInBytes < maxqueuesizeinbytes timeout.expired throwproducerexceptionifpresent logger.debugno records available yet sleeping a bit... long remainingtimeoutmills='timeout.remaining().toMillis();' if remainingtimeoutmills> 0) {
                        // notify doEnqueue() to resume processing (if anything is on wait())
                        this.notify();
                        // no records yet, so wait a bit
                        this.wait(remainingTimeoutMills);
                    }
                    LOGGER.debug("checking for more records...");
                }
                // notify doEnqueue() to resume processing
                this.notify();
                return records;
            }
        }
        finally {
            previousContext.restore();
        }
    }
  1. 通过synchronized进行锁定当前的对象
  2. 按当前的队列中数据量创建临时列表,从当前队列中获取数据,如果当前获取的数据小于最大值并且如果有设置内存判断且当前的的内存大小未达到最大值,在未超时的前提下,则等待pollInterval时间

相关推荐

Python爬虫进阶教程(二):线程、协程

简介线程线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能...

A320-V2500发动机系统FADEC介绍(2)

目的全权数字发动机控制(FADEC)系统在所有飞行和运行阶段提供全范围发动机控制。...

三国志战棋版:玩家“二叔”用这套群DOT在比武中拿下31胜5负

声明:本文首发于今日头条,而后发布于“鼎叔闯三棋”的微信公众号、抖音、哔哩哔哩和小红书平台,如果在其他平台就是抄袭。...

真正的独一无二:Dot One 推出 DNA 定制系列 139英镑起

相信很多人在挑选衣物时有着这样的困扰,综合了性价比、面料等因素后好不容易找到了心仪的款式,还要担心是否会撞衫,不管是擦肩而过的陌生人还是身边的熟人,都令人尴尬。小部分人为此热衷于购买少量的古着或者限量...

崩铁:周年庆福利再升级,老角色加强时间确定,3.xdot体系反转

#埃安UT大一圈高级很多#...

Dotgo推出RBMHub,扩大了CPaaS提供商的覆盖范围和功能

据telecompaper网7月15日报道,用于商业消息传递的RichCommunicationServices(RCS)解决方案的领先提供商Dotgo宣布推出RBMHub。RBMHub的推出扩大了C...

深度解析:快照取消Dot职业的将何去何从

写在前面曾几何时,术士的出现便被冠以dot大师的名头,从远古时期的献祭腐蚀虹吸不如暗牧一个痛,到TBC上满dot=荣誉击杀+1,到wlk接近全暴击的冰晶腐蚀,再到CTM就算了吧MOP的各种变态吸x放...

星穹铁道:抽卡芙卡之前,你必须了解什么是dot!

卡妈终于上线了,可还是有很多人不明白什么是dot伤害,抽了卡妈直接玩起了直伤流,把一个持续伤害的引爆器玩成了打手,卡妈打dot伤害是远高于直伤的,有了卡妈的玩家一直了解dot,不然这卡妈就真被玩成四不...

游戏界的闪耀星辰陨落:悼念知名游戏博主″dotα牛娃″

无尽哀思!在数字时代浪潮中,游戏不仅是消遣娱乐的代名词,更是连接心灵的桥梁,构筑了无数人的青春回忆。在这片浩瀚无垠的游戏宇宙中,有这样一位博主,他以独特的风采、深邃的洞察力和无尽的热情,成为了玩家心中...

直击2017新加坡同性恋聚会Pink Dot,自由爱!

今年的“粉红点”又来啦~这个支持LGBT群体(男女同志、双性恋、跨性别等)群体的活动,从2009年起,已经在新加坡举办8年了!”这个非营利的同性恋权益活动,主要是希望大家了解到,不管一个人的性倾向或...

python-dotenv,一款超级实用处理环境变量python库

python-dotenv,一款超级实用处理环境变量python库python-dotenv概述:...

亚马逊语音助手毫无征兆发笑 诡异至极吓坏用户

来源:新华网美国电商亚马逊7日承诺,将更改名下“亚历克萨”语音系统设置,令它不会莫名发笑,免得吓坏用户。“亚历克萨”是亚马逊开发的语音助手软件,可服从用户语音指令完成对话、播放音乐等任务。依照原来设计...

2022最火英文网名男女生

精选好听英文昵称带翻译1.moveon(离开)2.Monster(怪物)3.Solo吉他手4.Finish.(散场)...

智能家具 RecycleDot 的出现给传统家具厂商带来新的挑战

从可穿戴手环、手表到智能衣服,智能硬件逐步渗透到每一个领域。最近有一对父子MikeSandru和JohnSandru在自家的车库中设计了一款智能家具RecycleDot,给日渐萧条的家具行...

欧洲通信卫星公司 OneWeb 敦促印度DoT尽早批准提供卫星宽带服务

据telecomtalk2月17日报道,欧洲通信卫星公司EutelsatOneWeb近日敦促印度电信部(DoT)尽快批准其在印度部署双地球站网关的计划,以便连接其近地轨道(LEO)全球卫星星座,并...