生产者/消费者模式是异步编程中最经典且最重要的协作模式之一,其核心思想是通过任务队列实现生产者与消费者之间的解耦,使程序能够高效处理大量任务。在现代软件开发中,这种模式几乎无处不在,无论是线程池、消息队列(如Kafka、RabbitMQ),还是高性能服务器架构,都大量应用了这一模式。
核心概念与实现机制
生产者/消费者模式的基本组成包括三个部分:
- 生产者(Producer):负责生成任务或数据的组件
- 任务队列(Queue):用于存储待处理任务的缓冲区
- 消费者(Consumer):负责处理任务的组件
在实现机制上,生产者/消费者模式通常采用以下方式:
- 线程池实现:生产者将任务提交到线程池的任务队列中,消费者是线程池中的线程,负责从队列中取出任务执行。
- 消息队列实现:生产者将消息发送到消息中间件,消费者从消息队列中拉取消息进行处理。
Flink CDC中的应用
当前以flink CDC 中mysql connector 为源码进行分析。
这个过程涉及到了几个重要的部分:SplitReader、BinlogClient、DataChangeEvent、队列以及SourceReader。
SplitReader 是一个专门设计用来读取和初步处理数据的组件。它通过使用 BinlogClient 与 MySQL 数据库建立连接。Binlog(二进制日志)是 MySQL 中用于记录所有更改数据库状态的信息的一种机制,比如表的插入、更新或删除等操作都会被记录下来。通过 BinlogClient,SplitReader 能够从 MySQL 的 binlog 文件中读取出这些变更信息,SplitReader 不仅负责读取这些日志条目,还会对它们进行解析。这里的解析指的是将原始的日志格式转换为更加结构化且易于后续处理的形式 —— 即 DataChangeEvent 对象。每个 DataChangeEvent 都代表了数据库中发生的一个特定事件,如某条记录被修改了或是新添加了一条记录等,并包含了关于该事件的所有必要信息,如时间戳、受影响的表名、行的具体变化内容等。
一旦这些 DataChangeEvent 被创建出来后,就会被加入到一个队列当中。这个队列充当着缓冲区的角色,确保即使在短时间内产生大量变更时,系统也能够平稳地处理而不至于过载。同时,它还允许不同组件之间异步地交换数据,提高了整个系统的灵活性和响应速度。
SourceReader 是另一个关键组件,它的任务是从上述提到的队列中取出 DataChangeEvent,并将其转发给“下游”——即接下来需要利用这些数据执行进一步处理或分析的部分。这可以包括但不限于数据清洗、转换、加载至其他存储系统、实时计算等等。通过这种方式,SourceReader 担当了数据流动管道中的一个重要环节,确保信息能够高效准确地传递下去。
在配置Mysql connector 时,可以指定以下debezium的属性:
属性 | 说明 |
max.queue.size | 队列存储的元素最大值 |
poll.interval.ms | 消费者线程拉取数据最大间隔时间 |
max.queue.size.in.bytes | 队列中最大存储的字节 |
源码分析
该内容的主要代码集中于 ChangeEventQueue, 关注两个方法 doEnqueue 和 poll 方法。
doEnqueue
protected void doEnqueue(T record) throws InterruptedException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Enqueuing source record '{}'", record);
}
synchronized (this) {
while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
// notify poll() to drain queue
this.notifyAll();
// queue size or queue sizeInBytes threshold reached, so wait a bit
this.wait(pollInterval.toMillis());
}
queue.add(record);
// If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
if (maxQueueSizeInBytes > 0) {
long messageSize = ObjectSizeCalculator.getObjectSize(record);
sizeInBytesQueue.add(messageSize);
currentQueueSizeInBytes += messageSize;
}
if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
// notify poll() to start draining queue and do not wait
this.notifyAll();
}
}
}
加入元素的逻辑:
- 使用synchronized对当前对象进行锁定
- 判断当前队列中是否已经满了,或者存储的内存大小已经超过最大值,如果是则要通知正在等待的线程进行消费,并等待一段时间,重新检查
- 当前队列未满的情况下,直接加入队列
poll 逻辑
public List poll() throws InterruptedException {
LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();
try {
LOGGER.debug("polling records...");
final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.min(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
synchronized (this) {
List records = new ArrayList<>(Math.min(maxBatchSize, queue.size()));
// 从队列中获取数据到records,如果不够,等待pollInterval时间
while (drainRecords(records, maxBatchSize - records.size()) < maxBatchSize
&& (maxQueueSizeInBytes == 0 || currentQueueSizeInBytes < maxqueuesizeinbytes timeout.expired throwproducerexceptionifpresent logger.debugno records available yet sleeping a bit... long remainingtimeoutmills='timeout.remaining().toMillis();' if remainingtimeoutmills> 0) {
// notify doEnqueue() to resume processing (if anything is on wait())
this.notify();
// no records yet, so wait a bit
this.wait(remainingTimeoutMills);
}
LOGGER.debug("checking for more records...");
}
// notify doEnqueue() to resume processing
this.notify();
return records;
}
}
finally {
previousContext.restore();
}
}
- 通过synchronized进行锁定当前的对象
- 按当前的队列中数据量创建临时列表,从当前队列中获取数据,如果当前获取的数据小于最大值并且如果有设置内存判断且当前的的内存大小未达到最大值,在未超时的前提下,则等待pollInterval时间