Debezium Source Code Analysis
The core classes Debezium uses to parse the binlog are:
DebeziumEngine,
MySqlStreamingChangeEventSource,
com.github.shyiko.mysql.binlog.BinaryLogClient,
and the low-level communication class PacketChannel.
The low-level socket communication class is PacketChannel (com.github.shyiko.mysql.binlog.network.protocol); BinaryLogClient calls it to read data from the server.
/*** @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>*/
public class PacketChannel implements Channel {private int packetNumber = 0;private boolean authenticationComplete;private boolean isSSL = false;private Socket socket;private ByteArrayInputStream inputStream;private ByteArrayOutputStream outputStream;public PacketChannel(String hostname, int port) throws IOException {this(new Socket(hostname, port));}public PacketChannel(Socket socket) throws IOException {this.socket = socket;this.inputStream = new ByteArrayInputStream(new BufferedSocketInputStream(socket.getInputStream()));this.outputStream = new ByteArrayOutputStream(socket.getOutputStream());}public ByteArrayInputStream getInputStream() {return inputStream;}public ByteArrayOutputStream getOutputStream() {return outputStream;}public void authenticationComplete() {authenticationComplete = true;}public byte[] read() throws IOException {int length = inputStream.readInteger(3);int sequence = inputStream.read(); // sequenceif ( sequence != packetNumber++ ) {throw new IOException("unexpected sequence #" + sequence);}return inputStream.read(length);}public void write(Command command) throws IOException {byte[] body = command.toByteArray();ByteArrayOutputStream buffer = new ByteArrayOutputStream();buffer.writeInteger(body.length, 3); // packet length// see https://dev.mysql.com/doc/dev/mysql-server/8.0.11/page_protocol_basic_packets.html#sect_protocol_basic_packets_sequence_id// we only have to maintain a sequence number in the authentication phase.// what the point is, I do not knowif ( authenticationComplete ) {packetNumber = 0;}buffer.writeInteger(packetNumber++, 1);buffer.write(body, 0, body.length);outputStream.write(buffer.toByteArray());// though it has no effect in case of default (underlying) output stream (SocketOutputStream),// it may be necessary in case of non-default oneoutputStream.flush();}public void upgradeToSSL(SSLSocketFactory sslSocketFactory, HostnameVerifier hostnameVerifier) throws IOException {SSLSocket sslSocket = sslSocketFactory.createSocket(this.socket);sslSocket.startHandshake();socket = sslSocket;inputStream = new ByteArrayInputStream(sslSocket.getInputStream());outputStream = new ByteArrayOutputStream(sslSocket.getOutputStream());if (hostnameVerifier != null && !hostnameVerifier.verify(sslSocket.getInetAddress().getHostName(),sslSocket.getSession())) {throw new IdentityVerificationException("\"" + sslSocket.getInetAddress().getHostName() +"\" identity was not confirmed");}isSSL = true;}public boolean isSSL() {return isSSL;}@Overridepublic boolean isOpen() {return !socket.isClosed();}@Overridepublic void close() throws IOException {try {socket.shutdownInput(); // for socketInputStream.setEOF(true)} catch (Exception e) {// ignore}try {socket.shutdownOutput();} catch (Exception e) {// ignore}socket.close();}
}
BinaryLogClient source code
The BinaryLogClient source is shown below. Its core is the connect method, which is responsible for:
1. Connecting to the database and authenticating;
2. Issuing the dump command so that the client acts as a replica and receives the binlog from the requested file and position;
3. Running the heartbeat and binlog checksum SQL statements to obtain the binlog checksum type and set the master's heartbeat interval;
4. Using PacketChannel to read the binary binlog stream and convert it into Event objects;
5. Registering event listeners that process each event and hand it on towards the ChangeEventQueue,
so that DebeziumEngine's run method can fetch the Event and consume it;
6. Starting a background thread that listens for binlog packets and performs steps 4 and 5.
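Before walking through the full source, a minimal usage sketch (host, credentials and server id below are placeholder assumptions, not Debezium's own wiring) shows those responsibilities from the caller's side:

import com.github.shyiko.mysql.binlog.BinaryLogClient;

public class BinlogClientDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; serverId must be unique within the replication group.
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "repl_user", "repl_pass");
        client.setServerId(65535);
        // Every deserialized binlog packet arrives here as an Event.
        client.registerEventListener(event -> System.out.println(event.getHeader().getEventType()));
        // connect() blocks: it authenticates, sends the dump command and then loops in listenForEventPackets().
        client.connect();
    }
}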
The full source is:
/** Copyright 2013 Stanley Shyiko** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.EventDataWrapper;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.Authenticator;
import com.github.shyiko.mysql.binlog.network.ClientCapabilities;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
import com.github.shyiko.mysql.binlog.network.ServerException;
import com.github.shyiko.mysql.binlog.network.SocketFactory;
import com.github.shyiko.mysql.binlog.network.TLSHostnameVerifier;
import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
import com.github.shyiko.mysql.binlog.network.protocol.Packet;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSecurityPasswordCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.SSLRequestCommand;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.security.GeneralSecurityException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * MySQL replication stream client.
 *
 * @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
 */
public class BinaryLogClient implements BinaryLogClientMXBean {private static final SSLSocketFactory DEFAULT_REQUIRED_SSL_MODE_SOCKET_FACTORY = new DefaultSSLSocketFactory() {@Overrideprotected void initSSLContext(SSLContext sc) throws GeneralSecurityException {sc.init(null, new TrustManager[]{new X509TrustManager() {@Overridepublic void checkClientTrusted(X509Certificate[] x509Certificates, String s)throws CertificateException { }@Overridepublic void checkServerTrusted(X509Certificate[] x509Certificates, String s)throws CertificateException { }@Overridepublic X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];}}}, null);}};private static final SSLSocketFactory DEFAULT_VERIFY_CA_SSL_MODE_SOCKET_FACTORY = new DefaultSSLSocketFactory();// https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.htmlprivate static final int MAX_PACKET_LENGTH = 16777215;private final Logger logger = Logger.getLogger(getClass().getName());private final String hostname;private final int port;private final String schema;private final String username;private final String password;private boolean blocking = true;private long serverId = 65535;private volatile String binlogFilename;private volatile long binlogPosition = 4;private volatile long connectionId;private SSLMode sslMode = SSLMode.DISABLED;private GtidSet gtidSet;private final Object gtidSetAccessLock = new Object();private boolean gtidSetFallbackToPurged;private boolean useBinlogFilenamePositionInGtidMode;private String gtid;private boolean tx;private EventDeserializer eventDeserializer = new EventDeserializer();private final List<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();private final List<LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<LifecycleListener>();private SocketFactory socketFactory;private SSLSocketFactory sslSocketFactory;private volatile PacketChannel channel;private volatile boolean connected;private volatile long masterServerId = -1;private ThreadFactory threadFactory;private boolean keepAlive = true;private long keepAliveInterval = TimeUnit.MINUTES.toMillis(1);private long heartbeatInterval;private volatile long eventLastSeen;private long connectTimeout = TimeUnit.SECONDS.toMillis(3);private volatile ExecutorService keepAliveThreadExecutor;private final Lock connectLock = new ReentrantLock();private final Lock keepAliveThreadExecutorLock = new ReentrantLock();/*** Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).* @see BinaryLogClient#BinaryLogClient(String, int, String, String, String)* @param username login name* @param password password*/public BinaryLogClient(String username, String password) {this("localhost", 3306, null, username, password);}/*** Alias for BinaryLogClient("localhost", 3306, schema, username, password).* @see BinaryLogClient#BinaryLogClient(String, int, String, String, String)* @param schema database name, nullable* @param username login name* @param password password*/public BinaryLogClient(String schema, String username, String password) {this("localhost", 3306, schema, username, password);}/*** Alias for BinaryLogClient(hostname, port, <no schema> = null, username, password).* @see BinaryLogClient#BinaryLogClient(String, int, String, String, String)* @param hostname mysql server hostname* @param port mysql server port* @param username login name* @param password password*/public BinaryLogClient(String hostname, int port, String username, String password) {this(hostname, port, null, username, 
password);}/*** @param hostname mysql server hostname* @param port mysql server port* @param schema database name, nullable. Note that this parameter has nothing to do with event filtering. It's* used only during the authentication.* @param username login name* @param password password*/public BinaryLogClient(String hostname, int port, String schema, String username, String password) {this.hostname = hostname;this.port = port;this.schema = schema;this.username = username;this.password = password;}public boolean isBlocking() {return blocking;}/*** @param blocking blocking mode. If set to false - BinaryLogClient will disconnect after the last event.*/public void setBlocking(boolean blocking) {this.blocking = blocking;}public SSLMode getSSLMode() {return sslMode;}public void setSSLMode(SSLMode sslMode) {if (sslMode == null) {throw new IllegalArgumentException("SSL mode cannot be NULL");}this.sslMode = sslMode;}public long getMasterServerId() {return this.masterServerId;}/*** @return server id (65535 by default)* @see #setServerId(long)*/public long getServerId() {return serverId;}/*** @param serverId server id (in the range from 1 to 2^32 - 1). This value MUST be unique across whole replication* group (that is, different from any other server id being used by any master or slave). Keep in mind that each* binary log client (mysql-binlog-connector-java/BinaryLogClient, mysqlbinlog, etc) should be treated as a* simplified slave and thus MUST also use a different server id.* @see #getServerId()*/public void setServerId(long serverId) {this.serverId = serverId;}/*** @return binary log filename, nullable (and null be default). Note that this value is automatically tracked by* the client and thus is subject to change (in response to {@link EventType#ROTATE}, for example).* @see #setBinlogFilename(String)*/public String getBinlogFilename() {return binlogFilename;}/*** @param binlogFilename binary log filename.* Special values are:* <ul>* <li>null, which turns on automatic resolution (resulting in the last known binlog and position). This is what* happens by default when you don't specify binary log filename explicitly.</li>* <li>"" (empty string), which instructs server to stream events starting from the oldest known binlog.</li>* </ul>* @see #getBinlogFilename()*/public void setBinlogFilename(String binlogFilename) {this.binlogFilename = binlogFilename;}/*** @return binary log position of the next event, 4 by default (which is a position of first event). Note that this* value changes with each incoming event.* @see #setBinlogPosition(long)*/public long getBinlogPosition() {return binlogPosition;}/*** @param binlogPosition binary log position. Any value less than 4 gets automatically adjusted to 4 on connect.* @see #getBinlogPosition()*/public void setBinlogPosition(long binlogPosition) {this.binlogPosition = binlogPosition;}/*** @return thread id*/public long getConnectionId() {return connectionId;}/*** @return GTID set. Note that this value changes with each received GTID event (provided client is in GTID mode).* @see #setGtidSet(String)*/public String getGtidSet() {synchronized (gtidSetAccessLock) {return gtidSet != null ? 
gtidSet.toString() : null;}}/*** @param gtidSet GTID set (can be an empty string).* <p>NOTE #1: Any value but null will switch BinaryLogClient into a GTID mode (this will also set binlogFilename* to "" (provided it's null) forcing MySQL to send events starting from the oldest known binlog (keep in mind* that connection will fail if gtid_purged is anything but empty (unless* {@link #setGtidSetFallbackToPurged(boolean)} is set to true))).* <p>NOTE #2: GTID set is automatically updated with each incoming GTID event (provided GTID mode is on).* @see #getGtidSet()* @see #setGtidSetFallbackToPurged(boolean)*/public void setGtidSet(String gtidSet) {if (gtidSet != null && this.binlogFilename == null) {this.binlogFilename = "";}synchronized (gtidSetAccessLock) {this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;}}/*** @see #setGtidSetFallbackToPurged(boolean)* @return whether gtid_purged is used as a fallback*/public boolean isGtidSetFallbackToPurged() {return gtidSetFallbackToPurged;}/*** @param gtidSetFallbackToPurged true if gtid_purged should be used as a fallback when gtidSet is set to "" and* MySQL server has purged some of the binary logs, false otherwise (default).*/public void setGtidSetFallbackToPurged(boolean gtidSetFallbackToPurged) {this.gtidSetFallbackToPurged = gtidSetFallbackToPurged;}/*** @see #setUseBinlogFilenamePositionInGtidMode(boolean)* @return value of useBinlogFilenamePostionInGtidMode*/public boolean isUseBinlogFilenamePositionInGtidMode() {return useBinlogFilenamePositionInGtidMode;}/*** @param useBinlogFilenamePositionInGtidMode true if MySQL server should start streaming events from a given* {@link #getBinlogFilename()} and {@link #getBinlogPosition()} instead of "the oldest known binlog" when* {@link #getGtidSet()} is set, false otherwise (default).*/public void setUseBinlogFilenamePositionInGtidMode(boolean useBinlogFilenamePositionInGtidMode) {this.useBinlogFilenamePositionInGtidMode = useBinlogFilenamePositionInGtidMode;}/*** @return true if "keep alive" thread should be automatically started (default), false otherwise.* @see #setKeepAlive(boolean)*/public boolean isKeepAlive() {return keepAlive;}/*** @param keepAlive true if "keep alive" thread should be automatically started (recommended and true by default),* false otherwise.* @see #isKeepAlive()* @see #setKeepAliveInterval(long)*/public void setKeepAlive(boolean keepAlive) {this.keepAlive = keepAlive;}/*** @return "keep alive" interval in milliseconds, 1 minute by default.* @see #setKeepAliveInterval(long)*/public long getKeepAliveInterval() {return keepAliveInterval;}/*** @param keepAliveInterval "keep alive" interval in milliseconds.* @see #getKeepAliveInterval()* @see #setHeartbeatInterval(long)*/public void setKeepAliveInterval(long keepAliveInterval) {this.keepAliveInterval = keepAliveInterval;}/*** @return "keep alive" connect timeout in milliseconds.* @see #setKeepAliveConnectTimeout(long)** @deprecated in favour of {@link #getConnectTimeout()}*/public long getKeepAliveConnectTimeout() {return connectTimeout;}/*** @param connectTimeout "keep alive" connect timeout in milliseconds.* @see #getKeepAliveConnectTimeout()** @deprecated in favour of {@link #setConnectTimeout(long)}*/public void setKeepAliveConnectTimeout(long connectTimeout) {this.connectTimeout = connectTimeout;}/*** @return heartbeat period in milliseconds (0 if not set (default)).* @see #setHeartbeatInterval(long)*/public long getHeartbeatInterval() {return heartbeatInterval;}/*** @param heartbeatInterval heartbeat period in 
milliseconds.* <p>* If set (recommended)* <ul>* <li> HEARTBEAT event will be emitted every "heartbeatInterval".* <li> if {@link #setKeepAlive(boolean)} is on then keepAlive thread will attempt to reconnect if no* HEARTBEAT events were received within {@link #setKeepAliveInterval(long)} (instead of trying to send* PING every {@link #setKeepAliveInterval(long)}, which is fundamentally flawed -* https://github.com/shyiko/mysql-binlog-connector-java/issues/118).* </ul>* Note that when used together with keepAlive heartbeatInterval MUST be set less than keepAliveInterval.** @see #getHeartbeatInterval()*/public void setHeartbeatInterval(long heartbeatInterval) {this.heartbeatInterval = heartbeatInterval;}/*** @return connect timeout in milliseconds, 3 seconds by default.* @see #setConnectTimeout(long)*/public long getConnectTimeout() {return connectTimeout;}/*** @param connectTimeout connect timeout in milliseconds.* @see #getConnectTimeout()*/public void setConnectTimeout(long connectTimeout) {this.connectTimeout = connectTimeout;}/*** @param eventDeserializer custom event deserializer*/public void setEventDeserializer(EventDeserializer eventDeserializer) {if (eventDeserializer == null) {throw new IllegalArgumentException("Event deserializer cannot be NULL");}this.eventDeserializer = eventDeserializer;}/*** @param socketFactory custom socket factory. If not provided, socket will be created with "new Socket()".*/public void setSocketFactory(SocketFactory socketFactory) {this.socketFactory = socketFactory;}/*** @param sslSocketFactory custom ssl socket factory*/public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {this.sslSocketFactory = sslSocketFactory;}/*** @param threadFactory custom thread factory. If not provided, threads will be created using simple "new Thread()".*/public void setThreadFactory(ThreadFactory threadFactory) {this.threadFactory = threadFactory;}/*** Connect to the replication stream. Note that this method blocks until disconnected.* @throws AuthenticationException if authentication fails* @throws ServerException if MySQL server responds with an error* @throws IOException if anything goes wrong while trying to connect* @throws IllegalStateException if binary log client is already connected*/public void connect() throws IOException, IllegalStateException {if (!connectLock.tryLock()) {throw new IllegalStateException("BinaryLogClient is already connected");}boolean notifyWhenDisconnected = false;try {Callable cancelDisconnect = null;try {try {long start = System.currentTimeMillis();channel = openChannel();//建立tcp连接,这个时候没有完成用户名,密码认证,只是建立了连接,连接建立后Mysql会发送GreetingPacket包, 里面包含了mysql版本等信息,具体内容为:/**mysql: [Warning] Using a password on the command line interface can be insecure.Welcome to the MySQL monitor. Commands end with ; or \g.Your MySQL connection id is 5 connectionId Server version: 5.7.30-log MySQL Community Server (GPL) MYSQL VERSION Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.Oracle is a registered trademark of Oracle Corporation and/or itsaffiliates. Other names may be trademarks of their respectiveowners.Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.*/if (connectTimeout > 0 && !isKeepAliveThreadRunning()) {cancelDisconnect = scheduleDisconnectIn(connectTimeout -(System.currentTimeMillis() - start));}if (channel.getInputStream().peek() == -1) {throw new EOFException();}} catch (IOException e) {throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +". 
Please make sure it's running.", e);}GreetingPacket greetingPacket = receiveGreeting();//根据配置判断是否升级为SSL协议,这一步仍然没有用户名,密码认证tryUpgradeToSSL(greetingPacket);
//进行用户名密码认证,构建相关的Commandnew Authenticator(greetingPacket, channel, schema, username, password).authenticate();channel.authenticationComplete();connectionId = greetingPacket.getThreadId();if ("".equals(binlogFilename)) {synchronized (gtidSetAccessLock) {if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {gtidSet = new GtidSet(fetchGtidPurged());}}}if (binlogFilename == null) {//执行show master status\G语句,获取当前bilog 文件名以及偏移量,并保存到属性值fetchBinlogFilenameAndPosition();}if (binlogPosition < 4) {if (logger.isLoggable(Level.WARNING)) {logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);}binlogPosition = 4;}//设置心跳发送间隔.setupConnection();gtid = null;tx = false;requestBinaryLogStream();//发送dump协议,开始获取binlog} catch (IOException e) {disconnectChannel();throw e;} finally {if (cancelDisconnect != null) {try {cancelDisconnect.call();} catch (Exception e) {if (logger.isLoggable(Level.WARNING)) {logger.warning("\"" + e.getMessage() +"\" was thrown while canceling scheduled disconnect call");}}}}connected = true;notifyWhenDisconnected = true;if (logger.isLoggable(Level.INFO)) {String position;synchronized (gtidSetAccessLock) {position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;}logger.info("Connected to " + hostname + ":" + port + " at " + position +" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");}for (LifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onConnect(this);}if (keepAlive && !isKeepAliveThreadRunning()) {spawnKeepAliveThread();}ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);synchronized (gtidSetAccessLock) {if (gtidSet != null) {ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);}}listenForEventPackets();} finally {connectLock.unlock();if (notifyWhenDisconnected) {//执行回调函数for (LifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onDisconnect(this);}}}}/*** Apply additional options for connection before requesting binlog stream.*/protected void setupConnection() throws IOException {ChecksumType checksumType = fetchBinlogChecksum();if (checksumType != ChecksumType.NONE) {confirmSupportOfChecksum(checksumType);}setMasterServerId();if (heartbeatInterval > 0) {enableHeartbeat();}}private PacketChannel openChannel() throws IOException {Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();socket.connect(new InetSocketAddress(hostname, port), (int) connectTimeout);return new PacketChannel(socket);}private Callable scheduleDisconnectIn(final long timeout) {final BinaryLogClient self = this;final CountDownLatch connectLatch = new CountDownLatch(1);final Thread thread = newNamedThread(new Runnable() {@Overridepublic void run() {try {connectLatch.await(timeout, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, e.getMessage());}}if (connectLatch.getCount() != 0) {if (logger.isLoggable(Level.WARNING)) {logger.warning("Failed to establish connection in " + timeout + "ms. 
" +"Forcing disconnect.");}try {self.disconnectChannel();} catch (IOException e) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, e.getMessage());}}}}}, "blc-disconnect-" + hostname + ":" + port);thread.start();return new Callable() {public Object call() throws Exception {connectLatch.countDown();thread.join();return null;}};}private void checkError(byte[] packet) throws IOException {if (packet[0] == (byte) 0xFF /* error */) {byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length);ErrorPacket errorPacket = new ErrorPacket(bytes);throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),errorPacket.getSqlState());}}private GreetingPacket receiveGreeting() throws IOException {byte[] initialHandshakePacket = channel.read();checkError(initialHandshakePacket);return new GreetingPacket(initialHandshakePacket);}private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOException {int collation = greetingPacket.getServerCollation();if (sslMode != SSLMode.DISABLED) {boolean serverSupportsSSL = (greetingPacket.getServerCapabilities() & ClientCapabilities.SSL) != 0;if (!serverSupportsSSL && (sslMode == SSLMode.REQUIRED || sslMode == SSLMode.VERIFY_CA ||sslMode == SSLMode.VERIFY_IDENTITY)) {throw new IOException("MySQL server does not support SSL");}if (serverSupportsSSL) {SSLRequestCommand sslRequestCommand = new SSLRequestCommand();sslRequestCommand.setCollation(collation);channel.write(sslRequestCommand);SSLSocketFactory sslSocketFactory =this.sslSocketFactory != null ?this.sslSocketFactory :sslMode == SSLMode.REQUIRED || sslMode == SSLMode.PREFERRED ?DEFAULT_REQUIRED_SSL_MODE_SOCKET_FACTORY :DEFAULT_VERIFY_CA_SSL_MODE_SOCKET_FACTORY;channel.upgradeToSSL(sslSocketFactory,sslMode == SSLMode.VERIFY_IDENTITY ? new TLSHostnameVerifier() : null);logger.info("SSL enabled");return true;}}return false;}private void enableHeartbeat() throws IOException {channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000));byte[] statementResult = channel.read();checkError(statementResult);}private void setMasterServerId() throws IOException {channel.write(new QueryCommand("select @@server_id"));ResultSetRowPacket[] resultSet = readResultSet();if (resultSet.length >= 0) {this.masterServerId = Long.parseLong(resultSet[0].getValue(0));}}private void requestBinaryLogStream() throws IOException {long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178Command dumpBinaryLogCommand;synchronized (gtidSetAccessLock) {if (gtidSet != null) {dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,useBinlogFilenamePositionInGtidMode ? binlogFilename : "",useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,gtidSet);} else {dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);}}channel.write(dumpBinaryLogCommand);}private void ensureEventDataDeserializer(EventType eventType,Class<? 
extends EventDataDeserializer> eventDataDeserializerClass) {EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType);if (eventDataDeserializer.getClass() != eventDataDeserializerClass &&eventDataDeserializer.getClass() != EventDataWrapper.Deserializer.class) {EventDataDeserializer internalEventDataDeserializer;try {internalEventDataDeserializer = eventDataDeserializerClass.newInstance();} catch (Exception e) {throw new RuntimeException(e);}eventDeserializer.setEventDataDeserializer(eventType,new EventDataWrapper.Deserializer(internalEventDataDeserializer,eventDataDeserializer));}}private void spawnKeepAliveThread() {final ExecutorService threadExecutor =Executors.newSingleThreadExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable runnable) {return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);}});try {keepAliveThreadExecutorLock.lock();threadExecutor.submit(new Runnable() {@Overridepublic void run() {while (!threadExecutor.isShutdown()) {try {Thread.sleep(keepAliveInterval);} catch (InterruptedException e) {// expected in case of disconnect}if (threadExecutor.isShutdown()) {return;}boolean connectionLost = false;if (heartbeatInterval > 0) {connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;} else {try {channel.write(new PingCommand());} catch (IOException e) {connectionLost = true;}}if (connectionLost) {if (logger.isLoggable(Level.INFO)) {logger.info("Trying to restore lost connection to " + hostname + ":" + port);}try {terminateConnect();connect(connectTimeout);} catch (Exception ce) {if (logger.isLoggable(Level.WARNING)) {logger.warning("Failed to restore connection to " + hostname + ":" + port +". Next attempt in " + keepAliveInterval + "ms");}}}}}});keepAliveThreadExecutor = threadExecutor;} finally {keepAliveThreadExecutorLock.unlock();}}private Thread newNamedThread(Runnable runnable, String threadName) {Thread thread = threadFactory == null ? 
new Thread(runnable) : threadFactory.newThread(runnable);thread.setName(threadName);return thread;}boolean isKeepAliveThreadRunning() {try {keepAliveThreadExecutorLock.lock();return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();} finally {keepAliveThreadExecutorLock.unlock();}}/*** Connect to the replication stream in a separate thread.* @param timeout timeout in milliseconds* @throws AuthenticationException if authentication fails* @throws ServerException if MySQL server responds with an error* @throws IOException if anything goes wrong while trying to connect* @throws TimeoutException if client was unable to connect within given time limit*/public void connect(final long timeout) throws IOException, TimeoutException {final CountDownLatch countDownLatch = new CountDownLatch(1);AbstractLifecycleListener connectListener = new AbstractLifecycleListener() {@Overridepublic void onConnect(BinaryLogClient client) {countDownLatch.countDown();}};registerLifecycleListener(connectListener);final AtomicReference<IOException> exceptionReference = new AtomicReference<IOException>();Runnable runnable = new Runnable() {@Overridepublic void run() {try {setConnectTimeout(timeout);connect();} catch (IOException e) {exceptionReference.set(e);countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout"} catch (Exception e) {exceptionReference.set(new IOException(e)); // method is asynchronous, catch all exceptions so that they are not lostcountDownLatch.countDown(); // making sure we don't end up waiting whole "timeout"}}};newNamedThread(runnable, "blc-" + hostname + ":" + port).start();boolean started = false;try {started = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, e.getMessage());}}unregisterLifecycleListener(connectListener);if (exceptionReference.get() != null) {throw exceptionReference.get();}if (!started) {try {terminateConnect();} finally {throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms");}}}/*** @return true if client is connected, false otherwise*/public boolean isConnected() {return connected;}private String fetchGtidPurged() throws IOException {channel.write(new QueryCommand("show global variables like 'gtid_purged'"));ResultSetRowPacket[] resultSet = readResultSet();if (resultSet.length != 0) {return resultSet[0].getValue(1).toUpperCase();}return "";}private void fetchBinlogFilenameAndPosition() throws IOException {ResultSetRowPacket[] resultSet;channel.write(new QueryCommand("show master status"));resultSet = readResultSet();if (resultSet.length == 0) {throw new IOException("Failed to determine binlog filename/position");}ResultSetRowPacket resultSetRow = resultSet[0];binlogFilename = resultSetRow.getValue(0);binlogPosition = Long.parseLong(resultSetRow.getValue(1));}private ChecksumType fetchBinlogChecksum() throws IOException {channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));ResultSetRowPacket[] resultSet = readResultSet();if (resultSet.length == 0) {return ChecksumType.NONE;}return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase());}private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException {channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum"));byte[] statementResult = channel.read();checkError(statementResult);eventDeserializer.setChecksumType(checksumType);}private void listenForEventPackets() 
throws IOException {ByteArrayInputStream inputStream = channel.getInputStream();boolean completeShutdown = false;try {while (inputStream.peek() != -1) {int packetLength = inputStream.readInteger(3);inputStream.skip(1); // 1 byte for sequenceint marker = inputStream.read();if (marker == 0xFF) {ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),errorPacket.getSqlState());}if (marker == 0xFE && !blocking) {completeShutdown = true;break;}Event event;try {event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :inputStream);if (event == null) {throw new EOFException();}} catch (Exception e) {Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;if (cause instanceof EOFException || cause instanceof SocketException) {throw e;}if (isConnected()) {for (LifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onEventDeserializationFailure(this, e);}}continue;}if (isConnected()) {eventLastSeen = System.currentTimeMillis();updateGtidSet(event);notifyEventListeners(event);updateClientBinlogFilenameAndPosition(event);}}} catch (Exception e) {if (isConnected()) {for (LifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onCommunicationFailure(this, e);}}} finally {if (isConnected()) {if (completeShutdown) {disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)} else {disconnectChannel();}}}}private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException {byte[] result = inputStream.read(packetLength);int chunkLength;do {chunkLength = inputStream.readInteger(3);inputStream.skip(1); // 1 byte for sequenceresult = Arrays.copyOf(result, result.length + chunkLength);inputStream.fill(result, result.length - chunkLength, chunkLength);} while (chunkLength == Packet.MAX_LENGTH);return result;}private void updateClientBinlogFilenameAndPosition(Event event) {EventHeader eventHeader = event.getHeader();EventType eventType = eventHeader.getEventType();if (eventType == EventType.ROTATE) {RotateEventData rotateEventData = (RotateEventData) EventDataWrapper.internal(event.getData());binlogFilename = rotateEventData.getBinlogFilename();binlogPosition = rotateEventData.getBinlogPosition();} else// do not update binlogPosition on TABLE_MAP so that in case of reconnect (using a different instance of// client) table mapping cache could be reconstructed before hitting row mutation eventif (eventType != EventType.TABLE_MAP && eventHeader instanceof EventHeaderV4) {EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;long nextBinlogPosition = trackableEventHeader.getNextPosition();if (nextBinlogPosition > 0) {binlogPosition = nextBinlogPosition;}}}private void updateGtidSet(Event event) {synchronized (gtidSetAccessLock) {if (gtidSet == null) {return;}}EventHeader eventHeader = event.getHeader();switch(eventHeader.getEventType()) {case GTID:GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData());gtid = gtidEventData.getGtid();break;case XID:commitGtid();tx = false;break;case QUERY:QueryEventData queryEventData = (QueryEventData) EventDataWrapper.internal(event.getData());String sql = queryEventData.getSql();if (sql == null) {break;}if ("BEGIN".equals(sql)) {tx = true;} elseif ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) 
{commitGtid();tx = false;} elseif (!tx) {// auto-commit query, likely DDLcommitGtid();}default:}}private void commitGtid() {if (gtid != null) {synchronized (gtidSetAccessLock) {gtidSet.add(gtid);}}}private ResultSetRowPacket[] readResultSet() throws IOException {List<ResultSetRowPacket> resultSet = new LinkedList<>();byte[] statementResult = channel.read();checkError(statementResult);while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ }for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) {checkError(bytes);resultSet.add(new ResultSetRowPacket(bytes));}return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]);}/*** @return registered event listeners*/public List<EventListener> getEventListeners() {return Collections.unmodifiableList(eventListeners);}/*** Register event listener. Note that multiple event listeners will be called in order they* where registered.* @param eventListener event listener*/public void registerEventListener(EventListener eventListener) {eventListeners.add(eventListener);}/*** Unregister all event listener of specific type.* @param listenerClass event listener class to unregister*/public void unregisterEventListener(Class<? extends EventListener> listenerClass) {for (EventListener eventListener: eventListeners) {if (listenerClass.isInstance(eventListener)) {eventListeners.remove(eventListener);}}}/*** Unregister single event listener.* @param eventListener event listener to unregister*/public void unregisterEventListener(EventListener eventListener) {eventListeners.remove(eventListener);}private void notifyEventListeners(Event event) {if (event.getData() instanceof EventDataWrapper) {event = new Event(event.getHeader(), ((EventDataWrapper) event.getData()).getExternal());}for (EventListener eventListener : eventListeners) {try {eventListener.onEvent(event);} catch (Exception e) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, eventListener + " choked on " + event, e);}}}}/*** @return registered lifecycle listeners*/public List<LifecycleListener> getLifecycleListeners() {return Collections.unmodifiableList(lifecycleListeners);}/*** Register lifecycle listener. Note that multiple lifecycle listeners will be called in order they* where registered.* @param lifecycleListener lifecycle listener to register*/public void registerLifecycleListener(LifecycleListener lifecycleListener) {lifecycleListeners.add(lifecycleListener);}/*** Unregister all lifecycle listener of specific type.* @param listenerClass lifecycle listener class to unregister*/public void unregisterLifecycleListener(Class<? 
extends LifecycleListener> listenerClass) {for (LifecycleListener lifecycleListener : lifecycleListeners) {if (listenerClass.isInstance(lifecycleListener)) {lifecycleListeners.remove(lifecycleListener);}}}/*** Unregister single lifecycle listener.* @param eventListener lifecycle listener to unregister*/public void unregisterLifecycleListener(LifecycleListener eventListener) {lifecycleListeners.remove(eventListener);}/*** Disconnect from the replication stream.* Note that this does not cause binlogFilename/binlogPosition to be cleared out.* As the result following {@link #connect()} resumes client from where it left off.*/public void disconnect() throws IOException {terminateKeepAliveThread();terminateConnect();}private void terminateKeepAliveThread() {try {keepAliveThreadExecutorLock.lock();ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;if (keepAliveThreadExecutor == null) {return;}keepAliveThreadExecutor.shutdownNow();while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {// ignore}} finally {keepAliveThreadExecutorLock.unlock();}}private static boolean awaitTerminationInterruptibly(ExecutorService executorService, long timeout, TimeUnit unit) {try {return executorService.awaitTermination(timeout, unit);} catch (InterruptedException e) {return false;}}private void terminateConnect() throws IOException {do {disconnectChannel();} while (!tryLockInterruptibly(connectLock, 1000, TimeUnit.MILLISECONDS));connectLock.unlock();}private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit) {try {return lock.tryLock(time, unit);} catch (InterruptedException e) {return false;}}private void disconnectChannel() throws IOException {connected = false;if (channel != null && channel.isOpen()) {channel.close();}}/*** {@link BinaryLogClient}'s event listener.*/public interface EventListener {void onEvent(Event event);}/*** {@link BinaryLogClient}'s lifecycle listener.*/public interface LifecycleListener {/*** Called once client has successfully logged in but before started to receive binlog events.* @param client the client that logged in*/void onConnect(BinaryLogClient client);/*** It's guarantied to be called before {@link #onDisconnect(BinaryLogClient)}) in case of* communication failure.* @param client the client that triggered the communication failure* @param ex The exception that triggered the communication failutre*/void onCommunicationFailure(BinaryLogClient client, Exception ex);/*** Called in case of failed event deserialization. Note this type of error does NOT cause client to* disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually.* @param client the client that failed event deserialization* @param ex The exception that triggered the failutre*/void onEventDeserializationFailure(BinaryLogClient client, Exception ex);/*** Called upon disconnect (regardless of the reason).* @param client the client that disconnected*/void onDisconnect(BinaryLogClient client);}/*** Default (no-op) implementation of {@link LifecycleListener}.*/public static abstract class AbstractLifecycleListener implements LifecycleListener {public void onConnect(BinaryLogClient client) { }public void onCommunicationFailure(BinaryLogClient client, Exception ex) { }public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { }public void onDisconnect(BinaryLogClient client) { }}}
Analysis of BinaryLogClient's listenForEventPackets method
The listenForEventPackets method listens to the binlog stream sent by the MySQL master: it receives binlog packets, converts them into Event objects and places those events into the ChangeEventQueue so that DebeziumEngine can pick them up.
Reading the listenForEventPackets source, the MySQL binlog packet format is as follows:
3 bytes       | 1 byte                | 1 byte                            | remaining bytes      |
packet length | MySQL sequence number | marker indicating an error packet | binlog event payload |
As you can see, at the application layer the MySQL binlog protocol transmits data event by event: one binlog packet carries exactly one event (packets larger than 16 MB are split into chunks and reassembled by readPacketSplitInChunks).
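As a quick illustration of that framing, here is a tiny self-contained sketch that reads a fabricated packet with the library's ByteArrayInputStream helpers, mirroring the start of the listenForEventPackets loop:

import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import java.io.IOException;

public class PacketFramingSketch {
    public static void main(String[] args) throws IOException {
        // A fabricated 5-byte "packet": length = 1, sequence = 0, one payload byte = 0x00.
        byte[] wire = {0x01, 0x00, 0x00, 0x00, 0x00};
        ByteArrayInputStream in = new ByteArrayInputStream(wire);

        int packetLength = in.readInteger(3); // 3-byte little-endian packet length
        in.skip(1);                           // 1-byte sequence number
        int marker = in.read();               // 0xFF marks an error packet, 0xFE an EOF packet
        System.out.println("length=" + packetLength + ", marker=" + marker);
        // In listenForEventPackets the remaining (packetLength - 1) bytes are then fed to
        // EventDeserializer.nextEvent(...) to produce exactly one Event.
    }
}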
The listenForEventPackets code breaks down into four parts:
1. Blocking read of a binlog packet
2. Deserializing the packet into an Event
3. Dispatching the Event to the listeners
4. Recording the binlog filename and offset that have been synchronized
1. Blocking read
2. Deserialization
The EventDeserializer used in this step is an anonymous inner class created in the constructor of MySqlStreamingChangeEventSource; the code is:
public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlConnection connection, EventDispatcher<MySqlPartition, TableId> dispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {.......省略其他代码EventDeserializer eventDeserializer = new EventDeserializer() {public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {try {Event event = super.nextEvent(inputStream);if (event.getHeader().getEventType() == EventType.TABLE_MAP) {TableMapEventData tableMapEvent = (TableMapEventData)event.getData();tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);}return event;} catch (EventDataDeserializationException var5) {if (var5.getCause() instanceof IOException) {throw var5;} else {EventHeaderV4 header = new EventHeaderV4();header.setEventType(EventType.INCIDENT);header.setTimestamp(var5.getEventHeader().getTimestamp());header.setServerId(var5.getEventHeader().getServerId());if (var5.getEventHeader() instanceof EventHeaderV4) {header.setEventLength(((EventHeaderV4)var5.getEventHeader()).getEventLength());header.setNextPosition(((EventHeaderV4)var5.getEventHeader()).getNextPosition());header.setFlags(((EventHeaderV4)var5.getEventHeader()).getFlags());}EventData data = new EventDataDeserializationExceptionData(var5);return new Event(header, data);}}}};eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
}eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, (new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, (new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, (new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)).setMayContainExtraInformation(true));this.client.setEventDeserializer(eventDeserializer);//注册EventDeserializer
This anonymous EventDeserializer extends EventDeserializer; the parent class's nextEvent method looks like this:
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
    if (inputStream.peek() == -1) {
        return null;
    }
    // deserialize the header
    EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
    EventData eventData;
    switch (eventHeader.getEventType()) {
        case FORMAT_DESCRIPTION:
            eventData = deserializeFormatDescriptionEventData(inputStream, eventHeader);
            break;
        case TABLE_MAP:
            eventData = deserializeTableMapEventData(inputStream, eventHeader);
            /* tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
               stores the TABLE_MAP information in the map ahead of time so that the
               following ROWS_EVENTs can be deserialized correctly */
            break;
        default:
            // pick the deserializer based on the event type from the header
            EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
            // deserialize into the EventData
            eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
    }
    // build the Event
    return new Event(eventHeader, eventData);
}
As you can see, this class simply deserializes the stream into Event objects and does nothing else; Debezium's anonymous subclass only adds one thing on top of it, recording TABLE_MAP events ahead of time so that the row events that follow can be deserialized later.
According to the binlog format, a ROWS_EVENT is stateless and carries no schema. To let replicas deserialize ROWS_EVENTs correctly, the master writes a TABLE_MAP event before the row events; the TABLE_MAP event describes each column of the table.
Correspondingly, when the binlog is written, row events belonging to the same table are written together rather than strictly in SQL statement order.
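To make the TABLE_MAP / ROWS_EVENT relationship concrete, here is a small listener sketch (my own illustration, not Debezium code) that caches TABLE_MAP metadata by table id and uses it to label incoming write-rows data:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TableMapAwareListener implements BinaryLogClient.EventListener {
    // TABLE_MAP events always arrive before the row events that reference them.
    private final Map<Long, TableMapEventData> tableMapByTableId = new HashMap<>();

    @Override
    public void onEvent(Event event) {
        EventType type = event.getHeader().getEventType();
        if (type == EventType.TABLE_MAP) {
            TableMapEventData tableMap = event.getData();
            tableMapByTableId.put(tableMap.getTableId(), tableMap);
        } else if (type == EventType.EXT_WRITE_ROWS || type == EventType.WRITE_ROWS) {
            WriteRowsEventData rows = event.getData();
            TableMapEventData tableMap = tableMapByTableId.get(rows.getTableId());
            for (Serializable[] row : rows.getRows()) {
                // Column values are ordered exactly as described by the preceding TABLE_MAP event.
                System.out.println(tableMap.getDatabase() + "." + tableMap.getTable()
                        + " insert with " + row.length + " columns");
            }
        }
    }
}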
3. Event listeners process the events
First we need to know which event listeners BinaryLogClient has registered. The registration code appears in
MySqlStreamingChangeEventSource's execute method:
public void execute(ChangeEventSource.ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
    ......
    BinaryLogClient.EventListener listener;
    // Both listener variants end up calling handleEvent; which one is used depends on the configured buffer size.
    // The buffered variant is mainly meant for statement-based binlogs, where SQL of transactions that are later
    // rolled back is also recorded: the buffer temporarily holds one transaction's events so the rolled-back
    // statements can be dropped and never reach the ChangeEventQueue. Apart from that the two listeners are the same.
    if (this.connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) {
        listener = (event) -> {
            this.handleEvent(partition, effectiveOffsetContext, event);
        };
    } else {
        EventBuffer buffer = new EventBuffer(this.connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context);
        listener = (event) -> {
            buffer.add(partition, effectiveOffsetContext, event);
        };
    }
    this.client.registerEventListener(listener);
    this.client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext));
    // listener that records some statistics
    this.client.registerEventListener((event) -> {
        this.onEvent(effectiveOffsetContext, event);
    });
    // listener that logs each event (only when debug logging is enabled)
    if (LOGGER.isDebugEnabled()) {
        this.client.registerEventListener((event) -> {
            this.logEvent(effectiveOffsetContext, event);
        });
    }
From the code we can see that BinaryLogClient has three event listeners registered, with the following roles:
1. Call MySqlStreamingChangeEventSource's handleEvent method to process and dispatch the event
2. Call MySqlStreamingChangeEventSource's onEvent method to collect some statistics
3. Write the event to the log
So the core of the listeners is MySqlStreamingChangeEventSource's handleEvent method:
Analysis of the handleEvent method
In handleEvent, the handler looked up from eventHandlers is invoked through its accept method, and at the end of that call the event-publishing logic runs and puts the data into the queue.
At the end of accept the eventDispatcher's methods are called, which enqueue the event.
protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    if (event != null) {
        EventHeader eventHeader = event.getHeader();
        ......
        ((BlockingConsumer) this.eventHandlers.getOrDefault(eventType, (e) -> {
            this.ignoreEvent(offsetContext, e);
        })).accept(event);
        this.eventDispatcher.dispatchHeartbeatEvent(partition, offsetContext);
        offsetContext.completeEvent();
        .......
    }
}
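The eventHandlers lookup is simply a map from EventType to a handler, with an "ignore" handler as the default. A simplified, self-contained sketch of that dispatch pattern (the enum and the handler bodies here are illustrative stand-ins, not Debezium's):

import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// The real map in MySqlStreamingChangeEventSource uses the binlog Event type and
// io.debezium.function.BlockingConsumer; this sketch only shows the dispatch shape.
public class HandlerDispatchSketch {

    enum EventType { WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS, HEARTBEAT }

    public static void main(String[] args) {
        Map<EventType, Consumer<String>> eventHandlers = new EnumMap<>(EventType.class);
        eventHandlers.put(EventType.WRITE_ROWS, e -> System.out.println("handleInsert: " + e));
        eventHandlers.put(EventType.UPDATE_ROWS, e -> System.out.println("handleUpdate: " + e));
        eventHandlers.put(EventType.DELETE_ROWS, e -> System.out.println("handleDelete: " + e));

        EventType incoming = EventType.HEARTBEAT;
        // Event types without a registered handler fall through to an "ignore" handler,
        // exactly like the getOrDefault call in handleEvent.
        eventHandlers.getOrDefault(incoming, e -> System.out.println("ignoreEvent: " + e))
                     .accept("event payload");
    }
}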
Taking WRITE_ROWS (inserts) as an example, the handler actually invoked is MySqlStreamingChangeEventSource's handleInsert method, and the call chain is:
MySqlStreamingChangeEventSource.handleEvent
-> MySqlStreamingChangeEventSource.handleInsert
-> MySqlStreamingChangeEventSource.handleChange
-> BinlogChangeEmitter.emit
-> EventDispatcher.dispatchDataChangeEvent
-> ChangeRecordEmitter.emitChangeRecords
-> StreamingChangeRecordReceiver.changeRecord
-> ChangeEventQueue.enqueue
First, the object from which DebeziumEngine's run method obtains events is the queue field of MySqlConnectorTask, which is a ChangeEventQueue. In the method
public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Configuration config)
this queue is passed to the eventDispatcher:
EventDispatcher<MySqlPartition, TableId> dispatcher = new EventDispatcher(connectorConfig, topicSelector, this.schema, this.queue, connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, (EventDispatcher.InconsistentSchemaHandler)null, metadataProvider, new HeartbeatFactory(connectorConfig, topicSelector, schemaNameAdjuster, () -> {return new MySqlConnection(new MySqlConnection.MySqlConnectionConfiguration(config), (MySqlFieldReader)(connectorConfig.useCursorFetch() ? new MySqlBinaryProtocolFieldReader(connectorConfig) : new MySqlTextProtocolFieldReader(connectorConfig)));}, (exception) -> {switch (exception.getSQLState()) {case "42000":throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);case "3D000":throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);default:}}), schemaNameAdjuster);
Looking at the EventDispatcher constructor, the changeRecord method of its inner StreamingChangeRecordReceiver class is what places the Event into the ChangeEventQueue; the code is:
private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver<P> {@Overridepublic void changeRecord(P partition,DataCollectionSchema dataCollectionSchema,Operation operation,Object key, Struct value,OffsetContext offsetContext,ConnectHeaders headers)throws InterruptedException {Objects.requireNonNull(value, "value must not be null");LOGGER.trace("Received change record for {} operation on key {}", operation, key);// Truncate events must have null key schema as they are sent to table topics without keysSchema keySchema = (key == null && operation == Operation.TRUNCATE) ? null: dataCollectionSchema.keySchema();String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());//生成SourceRecordSourceRecord record = new SourceRecord(partition.getSourcePartition(),offsetContext.getOffset(), //map一些描述信息,topicName, null,keySchema, key, //仍然是schemadataCollectionSchema.getEnvelopeSchema().schema(),//表schema,也就是各列对应的name和mysqlTypevalue, //真正的数据null,headers);//入队queue.enqueue(changeEventCreator.createDataChangeEvent(record));if (emitTombstonesOnDelete && operation == Operation.DELETE) {SourceRecord tombStone = record.newRecord(record.topic(),record.kafkaPartition(),record.keySchema(),record.key(),null, // value schemanull, // valuerecord.timestamp(),record.headers());queue.enqueue(changeEventCreator.createDataChangeEvent(tombStone));}}}
The methods of the EventDispatcher object perform the actual enqueueing of the Event:
public boolean dispatchDataChangeEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) throws InterruptedException {
    ......
    // this call builds the SourceRecord and places it into the ChangeEventQueue
    streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
    ....
}
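To make the hand-off concrete, here is a rough producer/consumer sketch of Debezium's ChangeEventQueue (builder method names as in io.debezium.connector.base.ChangeEventQueue; the sizes and the logging-context values are illustrative):

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.List;

public class QueueHandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        // Illustrative sizes; the real values come from the connector configuration.
        ChangeEventQueue<DataChangeEvent> queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(Duration.ofMillis(500))
                .maxBatchSize(2048)
                .maxQueueSize(8192)
                .loggingContextSupplier(() -> LoggingContext.forConnector("MySQL", "demo", "streaming"))
                .build();

        // Producer side (streaming thread): StreamingChangeRecordReceiver.changeRecord ends with
        //   queue.enqueue(changeEventCreator.createDataChangeEvent(record));

        // Consumer side (MySqlConnectorTask.poll, driven by DebeziumEngine.run): poll() waits up to
        // the poll interval and returns a batch of events.
        List<DataChangeEvent> batch = queue.poll();
        System.out.println("polled " + batch.size() + " events");
    }
}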
Summary:
In fact, between BinaryLogClient reading the stream and the Event being enqueued into the ChangeEventQueue there are no extra data copies or system calls, so this path is very fast. The real question is whether the consumer can drain the messages quickly enough: synchronizing data directly inside the consumer is obviously slow, so a better and safer approach is to write the records to a message queue such as Kafka or RocketMQ and let downstream consumers process them asynchronously.
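A minimal sketch of that pattern with the embedded engine API (io.debezium.engine.DebeziumEngine; the properties and the message-queue hand-off below are illustrative assumptions, not a complete configuration):

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EngineHandoffSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Illustrative minimal configuration; a real deployment also needs offset storage,
        // database.* connection settings and so on.
        props.setProperty("name", "engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    // Keep this callback cheap: hand the record to Kafka/RocketMQ (or another queue)
                    // instead of writing to the target database here.
                    forwardToMessageQueue(record.key(), record.value());
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine); // DebeziumEngine implements Runnable; run() drains the ChangeEventQueue
    }

    private static void forwardToMessageQueue(String key, String value) {
        // Hypothetical producer call, e.g. kafkaProducer.send(new ProducerRecord<>(topic, key, value))
    }
}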
EventDispatcher
The eventDispatcher implements the forwarding of events. (This part is still to be expanded.)
What we need to pay attention to is who puts the Event into the ChangeEventQueue. Reading the source shows that the core class implementing this forwarding is EventDispatcher, in particular its dispatchDataChangeEvent and dispatchSnapshotEvent methods.
The socket that talks to MySQL is held inside PacketChannel, together with its Input/OutputStream; the binlog bytes are read directly from the InputStream, deserialized into Event objects by the EventDeserializer, and placed into the ChangeEventQueue, from which DebeziumEngine's run method takes them and invokes the consumer.
The consumer therefore has to implement the persistence logic itself.
BinaryLogClient requests a dump from the MySQL master: the starting binlog filename and offset are defined here, and the master then uses them as the starting point for streaming binlog content to the client.
private void requestBinaryLogStream() throws IOException {long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178Command dumpBinaryLogCommand;synchronized (gtidSetAccessLock) {if (gtidSet != null) {
//请求dumpdumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,useBinlogFilenamePositionInGtidMode ? binlogFilename : "",useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,gtidSet);} else {dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);}}channel.write(dumpBinaryLogCommand);}public class DumpBinaryLogCommand implements Command {private long serverId;private String binlogFilename;private long binlogPosition;public DumpBinaryLogCommand(long serverId, String binlogFilename, long binlogPosition) {this.serverId = serverId;this.binlogFilename = binlogFilename;this.binlogPosition = binlogPosition;}@Override //序列化public byte[] toByteArray() throws IOException {ByteArrayOutputStream buffer = new ByteArrayOutputStream();buffer.writeInteger(CommandType.BINLOG_DUMP.ordinal(), 1);buffer.writeLong(this.binlogPosition, 4);buffer.writeInteger(0, 2); // flagbuffer.writeLong(this.serverId, 4);buffer.writeString(this.binlogFilename);return buffer.toByteArray();}}public void write(Command command) throws IOException {byte[] body = command.toByteArray();ByteArrayOutputStream buffer = new ByteArrayOutputStream();buffer.writeInteger(body.length, 3); // packet length 包的长度// see https://dev.mysql.com/doc/dev/mysql-server/8.0.11/page_protocol_basic_packets.html#sect_protocol_basic_packets_sequence_id// we only have to maintain a sequence number in the authentication phase.// what the point is, I do not knowif ( authenticationComplete ) {packetNumber = 0;}buffer.writeInteger(packetNumber++, 1);buffer.write(body, 0, body.length);outputStream.write(buffer.toByteArray());// though it has no effect in case of default (underlying) output stream (SocketOutputStream),// it may be necessary in case of non-default oneoutputStream.flush();}
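Putting toByteArray and write together, the COM_BINLOG_DUMP payload sent to the server is laid out roughly as follows (field sizes taken from the code above):

1 byte   command type (BINLOG_DUMP)
4 bytes  binlog position to start from
2 bytes  flags (0)
4 bytes  slave/client server id
N bytes  binlog filename

PacketChannel.write then prepends the 3-byte packet length and the 1-byte sequence number before flushing the packet to the socket.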
Deserialization
The binlog deserialization relies on the EventDeserializer field: each kind of binlog event is first converted into the corresponding EventData and then wrapped into an Event that is passed on to Debezium's downstream. All of these deserializers follow the MySQL network protocol.
private final EventHeaderDeserializer eventHeaderDeserializer;private final EventDataDeserializer defaultEventDataDeserializer;private final Map<EventType, EventDataDeserializer> eventDataDeserializers;private void registerDefaultEventDataDeserializers() {eventDataDeserializers.put(EventType.FORMAT_DESCRIPTION,new FormatDescriptionEventDataDeserializer());eventDataDeserializers.put(EventType.ROTATE,new RotateEventDataDeserializer());eventDataDeserializers.put(EventType.INTVAR,new IntVarEventDataDeserializer());eventDataDeserializers.put(EventType.QUERY,new QueryEventDataDeserializer());eventDataDeserializers.put(EventType.TABLE_MAP,new TableMapEventDataDeserializer());eventDataDeserializers.put(EventType.XID,new XidEventDataDeserializer());eventDataDeserializers.put(EventType.WRITE_ROWS,new WriteRowsEventDataDeserializer(tableMapEventByTableId));eventDataDeserializers.put(EventType.UPDATE_ROWS,new UpdateRowsEventDataDeserializer(tableMapEventByTableId));eventDataDeserializers.put(EventType.DELETE_ROWS,new DeleteRowsEventDataDeserializer(tableMapEventByTableId));eventDataDeserializers.put(EventType.EXT_WRITE_ROWS,new WriteRowsEventDataDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));eventDataDeserializers.put(EventType.EXT_UPDATE_ROWS,new UpdateRowsEventDataDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));eventDataDeserializers.put(EventType.EXT_DELETE_ROWS,new DeleteRowsEventDataDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));eventDataDeserializers.put(EventType.ROWS_QUERY,new RowsQueryEventDataDeserializer());eventDataDeserializers.put(EventType.GTID,new GtidEventDataDeserializer());eventDataDeserializers.put(EventType.PREVIOUS_GTIDS,new PreviousGtidSetDeserializer());eventDataDeserializers.put(EventType.XA_PREPARE,new XAPrepareEventDataDeserializer());}
在哪里调用这些反序列化方法并完成转换呢?答案是 EventDeserializer 的 nextEvent 方法里。
/**
 * @return deserialized event or null in case of end-of-stream
 * @param inputStream input stream to fetch event from
 * @throws IOException if connection gets closed
 */
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
    if (inputStream.peek() == -1) {
        return null;
    }
    //头部的格式都是一致的,所以先调用 eventHeaderDeserializer 反序列化头部
    EventHeader eventHeader = eventHeaderDeserializer.deserialize(inputStream);
    EventData eventData;
    switch (eventHeader.getEventType()) {
        //根据头部的 eventType 选择对应的反序列化器,转换成 EventData
        case FORMAT_DESCRIPTION:
            eventData = deserializeFormatDescriptionEventData(inputStream, eventHeader);
            break;
        case TABLE_MAP:
            eventData = deserializeTableMapEventData(inputStream, eventHeader);
            break;
        default:
            EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
            eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
    }
    //最后再组装成 Event,这里就可以交给 Debezium 下游了
    return new Event(eventHeader, eventData);
}
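nextEvent 产出的 Event 最终会回调到注册在 BinaryLogClient 上的 EventListener。下面是一个消费 TABLE_MAP 与行写入事件的极简示意,连接参数为假设值:

import java.util.Arrays;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

public class EventListenerSketch {
    public static void main(String[] args) throws Exception {
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "repl", "repl_pwd");
        client.registerEventListener(event -> {
            EventType type = event.getHeader().getEventType();
            if (type == EventType.TABLE_MAP) {
                TableMapEventData data = event.getData();
                System.out.println("table: " + data.getDatabase() + "." + data.getTable());
            } else if (EventType.isWrite(type)) {
                WriteRowsEventData data = event.getData();
                data.getRows().forEach(row -> System.out.println("insert: " + Arrays.toString(row)));
            }
        });
        client.connect(); // 阻塞在事件循环里
    }
}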
谁将Event放入ChangeEventQueue里
MySqlStreamingChangeEventSource 的 handleEvent 方法会把转换好的 Event 分发处理后,放入 MySqlConnectorTask 里创建的 ChangeEventQueue,供 DebeziumEngine 的 run 方法获取并处理。
protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    if (event != null) {
        EventHeader eventHeader = event.getHeader();
        this.eventTimestamp = !eventHeader.getEventType().equals(EventType.HEARTBEAT)
                ? Instant.ofEpochMilli(eventHeader.getTimestamp())
                : null;
        offsetContext.setBinlogServerId(eventHeader.getServerId());
        EventType eventType = eventHeader.getEventType();
        if (eventType == EventType.ROTATE) {
            EventData eventData = event.getData();
            RotateEventData rotateEventData;
            if (eventData instanceof EventDeserializer.EventDataWrapper) {
                rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
            } else {
                rotateEventData = (RotateEventData) eventData;
            }
            offsetContext.setBinlogStartPoint(rotateEventData.getBinlogFilename(), rotateEventData.getBinlogPosition());
        } else if (eventHeader instanceof EventHeaderV4) {
            EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
            offsetContext.setEventPosition(trackableEventHeader.getPosition(), trackableEventHeader.getEventLength());
        }
        try {
            //按 eventType 查表取出事件处理器处理事件,处理器会把记录放入 ChangeEventQueue
            ((BlockingConsumer) this.eventHandlers.getOrDefault(eventType, (e) -> {
                this.ignoreEvent(offsetContext, e);
            })).accept(event);
            //根据 partition、offsetContext 按需派发心跳记录
            this.eventDispatcher.dispatchHeartbeatEvent(partition, offsetContext);
            offsetContext.completeEvent(); //事件完成
            this.lastOffset = offsetContext.getOffset();
            if (this.skipEvent) {
                --this.initialEventsToSkip;
                this.skipEvent = this.initialEventsToSkip > 0L;
            }
        } catch (RuntimeException var8) {
            this.logStreamingSourceState();
            this.errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog event", var8));
            this.eventHandlers.clear();
            LOGGER.info("Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.");
        } catch (InterruptedException var9) {
            Thread.currentThread().interrupt();
            this.eventHandlers.clear();
            LOGGER.info("Stopped processing binlog events due to thread interruption");
        }
    }
}
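handleEvent 的核心是按 EventType 在 eventHandlers 表里查处理函数并分发。下面用一个与 Debezium 源码无关的极简示意说明这种"查表分发"的写法,Map 内容与方法名均为示例假设:

import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;

public class HandlerMapSketch {

    private final Map<EventType, Consumer<Event>> eventHandlers = new EnumMap<>(EventType.class);

    public HandlerMapSketch() {
        // 为关心的事件类型注册处理函数,其余类型落到默认的 ignore 分支
        eventHandlers.put(EventType.QUERY, e -> System.out.println("query/ddl: " + e));
        eventHandlers.put(EventType.EXT_WRITE_ROWS, e -> System.out.println("insert rows: " + e));
        eventHandlers.put(EventType.XID, e -> System.out.println("commit: " + e));
    }

    public void handle(Event event) {
        EventType type = event.getHeader().getEventType();
        eventHandlers.getOrDefault(type, e -> { /* ignore */ }).accept(event);
    }
}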
BinaryLogClient 的 listenForEventPackets 方法会形成事件循环,在循环里不断读取数据库连接的 InputStream,消费最新的 binlog,因此它运行在另外一个线程里。那么是谁调用了这个方法?实际上正是 BinaryLogClient 的 connect() 方法在最后调用了它,而 connect() 又是由 connect(long timeout) 通过启动新线程执行的。
调用链为:
MySqlStreamingChangeEventSource.execute
->BinaryLogClient.connect(long timeout)
->BinaryLogClient.connect()
->listenForEventPackets()
->notifyEventListeners()
这个调用链把 socket 里的 binlog 转换为 Event,再添加到 ChangeEventQueue 里;之后 DebeziumEngine 的 run() 方法通过 task.poll() 拿到 Event,并调用我们自定义的 Consumer 消费它。
我们可以把该 event 同步到 kafka、elasticsearch 等存储里,实现数据热更新。
private void listenForEventPackets() throws IOException {
    ByteArrayInputStream inputStream = channel.getInputStream();
    boolean completeShutdown = false;
    try {
        while (inputStream.peek() != -1) {
            int packetLength = inputStream.readInteger(3);
            inputStream.skip(1); // 1 byte for sequence
            int marker = inputStream.read();
            if (marker == 0xFF) {
                ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                        errorPacket.getSqlState());
            }
            if (marker == 0xFE && !blocking) {
                completeShutdown = true;
                break;
            }
            Event event;
            try {
                event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH
                        ? new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1))
                        : inputStream);
                if (event == null) {
                    throw new EOFException();
                }
            } catch (Exception e) {
                Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                if (cause instanceof EOFException || cause instanceof SocketException) {
                    throw e;
                }
                if (isConnected()) {
                    for (LifecycleListener lifecycleListener : lifecycleListeners) {
                        lifecycleListener.onEventDeserializationFailure(this, e);
                    }
                }
                continue;
            }
            if (isConnected()) {
                eventLastSeen = System.currentTimeMillis();
                updateGtidSet(event);
                notifyEventListeners(event); //事件处理函数
                updateClientBinlogFilenameAndPosition(event); //更新消费进度
            }
        }
    } catch (Exception e) {
        if (isConnected()) {
            for (LifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onCommunicationFailure(this, e);
            }
        }
    } finally {
        if (isConnected()) {
            if (completeShutdown) {
                disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
            } else {
                disconnectChannel();
            }
        }
    }
}
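消费端的入口就是前面提到的 DebeziumEngine:run() 里 task.poll() 拿到记录后交给我们注册的 Consumer。下面是一个嵌入式用法的示意,连接与路径等配置均为假设值,必需配置项随 Debezium 版本略有差异:

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class EmbeddedEngineSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "demo-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("database.hostname", "127.0.0.1");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "repl");
        props.setProperty("database.password", "repl_pwd");
        props.setProperty("database.server.id", "65535");
        props.setProperty("database.server.name", "demo"); // 2.x 及以后版本改用 topic.prefix
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/debezium-offsets.dat");

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    // 我们自定义的 Consumer:这里可以写入 kafka、elasticsearch 等
                    System.out.println(record.value());
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine); // engine 的 run() 内部就是 task.poll() 循环
    }
}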
binlog消费位移的提交:
位移的提交使用了代理模式,由 EmbeddedEngine 下的 ChangeConsumer 的 handleBatch 方法实现:每条记录消费完后先修改内存中的位移字段,每个批次处理完后再统一 flush。
源码:
public static interface ChangeConsumer extends DebeziumEngine.ChangeConsumer<SourceRecord> {
}

private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord> consumer) {
    return new ChangeConsumer() {
        /**
         * the default implementation that is compatible with the old Consumer api.
         *
         * On every record, it calls the consumer, and then only marks the record
         * as processed when accept returns, additionally, it handles StopConnectorExceptions
         * and ensures that we always try and mark a batch as finished, even with exceptions
         * @param records the records to be processed
         * @param committer the committer that indicates to the system that we are finished
         */
        @Override
        public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
            for (SourceRecord record : records) {
                try {
                    consumer.accept(record);         //用户自定义逻辑
                    committer.markProcessed(record); //标记已经处理
                }
                catch (StopConnectorException | StopEngineException ex) {
                    // ensure that we mark the record as finished in this case
                    committer.markProcessed(record);
                    throw ex;
                }
            }
            //在这里才真正地触发 flush,落入硬盘或者提交到消息队列里
            committer.markBatchFinished();
        }
    };
}

protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout,
                          SourceTask task) throws InterruptedException {
    // Determine if we need to commit to offset storage ...
    long timeSinceLastCommitMillis = clock.currentTimeInMillis() - timeOfLastCommitMillis;
    //判断距离上一次刷新的时间间隔是否允许本次 flush
    if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
        //最终根据实现方式选择发送到 kafka 或者写入本地文件进行持久化
        commitOffsets(offsetWriter, commitTimeout, task);
    }
}

protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) {
    return new RecordCommitter() {
        @Override
        public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
            task.commitRecord(record);   //只是在内存中标记,MySqlConnectorTask 中记录 lastOffset
            recordsSinceLastCommit += 1; //本批已处理的记录数量
            offsetWriter.offset((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset()); //记录偏移量
        }
        // 省略 markBatchFinished 等其他方法
    };
}
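如果不想用默认的单条 Consumer,也可以直接实现 DebeziumEngine.ChangeConsumer,自己控制 markProcessed / markBatchFinished 的时机。下面是一个批量消费的示意,下游写入逻辑为假设:

import java.util.List;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

public class BatchConsumerSketch implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {

    @Override
    public void handleBatch(List<ChangeEvent<String, String>> records,
                            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
            throws InterruptedException {
        for (ChangeEvent<String, String> record : records) {
            // 写入下游(kafka / elasticsearch 等),成功后再标记
            System.out.println(record.value());
            committer.markProcessed(record); // 只更新内存中的位移
        }
        committer.markBatchFinished();       // 整批结束,由提交策略决定是否真正 flush
    }
}

把它传给 DebeziumEngine.create(...).notifying(new BatchConsumerSketch()) 即可替换默认实现。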
consumer 消费与位移更新的流程为:
a. 每条记录消费完成后,更新 MySqlConnectorTask 里的内存字段:
private volatile Map<String, ?> lastOffset;
更新例子如下:
sourceOffset={
  transaction_id=null,
  ts_sec=1759148658,
  file=mysql-bin.000006,
  pos=2660,
  row=99,
  server_id=1,
  event=2
}
MysqlPartition如下:
b.在ChangeConsumer里最后调用 committer.markBatchFinished();该方法根据
policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))
的返回值确定是否要进行持久化保存.持久化保存的方式有两种:
1.写入kafka消息队列
2.写入本地文件.
默认策略是周期性提交(间隔由 offset.flush.interval.ms 控制),配置示意见下。
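两种持久化方式对应嵌入式引擎的 offset.storage 配置。下面是一个配置示意,文件路径、topic、bootstrap 地址均为假设值:

import java.util.Properties;

public class OffsetStorageConfigSketch {
    public static Properties offsetProps() {
        Properties props = new Properties();
        // 方式一:写入本地文件
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/debezium-offsets.dat");
        // 方式二:写入 kafka topic(与方式一二选一)
        // props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
        // props.setProperty("offset.storage.topic", "debezium-offsets");
        // props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // 周期性提交的间隔,即上文 policy.performCommit 的判断依据
        props.setProperty("offset.flush.interval.ms", "60000");
        return props;
    }
}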
Debezium调参
BinaryLogClient 的参数设置集中在 MySqlStreamingChangeEventSource 的构造方法和 execute 方法里,重点关注这两个方法(对应的 BinaryLogClient 设置示意见本节末尾):
构造方法设置的参数:
1.keepAliveInterval: BinaryLogClient 有一个后台线程负责监控连接状态并在需要时重新发起连接,keepAliveInterval 就是该线程每次 sleep 的时间;
2.heartbeatInterval = keepAliveInterval * 0.8
3.SSLMode
4.serverId: 主从复制使用
execute方法:
binlogFileName,
binlogPosition
GtidSet
MySqlTaskContext 构造方法:
username,password,hostname,port
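把上面这些参数对应到 BinaryLogClient 的公开 setter 上,大致如下;数值均为假设,Debezium 实际是从 connector 配置推导出来再调用这些 setter 的:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.SSLMode;

public class ClientTuningSketch {
    public static void main(String[] args) {
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "repl", "repl_pwd");
        long keepAliveInterval = 30_000L;                              // keep-alive 后台线程的 sleep 间隔
        client.setKeepAliveInterval(keepAliveInterval);
        client.setHeartbeatInterval((long) (keepAliveInterval * 0.8)); // 主库心跳间隔
        client.setSSLMode(SSLMode.DISABLED);                           // SSLMode
        client.setServerId(65535);                                     // serverId:主从复制使用
        client.setBinlogFilename("mysql-bin.000006");                  // execute 方法里设置的起始文件
        client.setBinlogPosition(2660);                                // 起始位点
        client.setGtidSet("");                                         // 如启用 GTID 模式则设置已消费的 GtidSet
    }
}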
未完成......
Debezium数据同步测试.
使用 MyBatis 插入 10000 条数据,用时 13516ms(约 13.5s),Debezium 这边基本是瞬时就同步过来了。但在大批量场景下,上下游消费速度的差异可能会拖垮 Debezium,因为 Debezium 会把 Event 暂存在 ChangeEventQueue 里,而该队列的容量是有限制的:
this.queue = (new ChangeEventQueue.Builder())
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> {return this.taskContext.configureLoggingContext("mysql-connector-task");}).buffering().build();
ChangeEventQueue 本身也有限流措施,体现在 doEnqueue 里:如果队列当前占用的字节数超过配置上限,就会让生产线程休眠,从而阻止继续从 InputStream 里读取新的 Event。
protected void doEnqueue(T record) throws InterruptedException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Enqueuing source record '{}'", record);
    }
    while (this.maxQueueSizeInBytes > 0L && this.currentQueueSizeInBytes.get() > this.maxQueueSizeInBytes) {
        //阻塞当前线程,每次 sleep pollInterval 毫秒
        Thread.sleep(this.pollInterval.toMillis());
    }
    if (this.maxQueueSizeInBytes > 0L) {
        long messageSize = ObjectSizeCalculator.getObjectSize(record);
        this.objectMap.put(record, messageSize);
        this.currentQueueSizeInBytes.addAndGet(messageSize);
    }
    this.queue.put(record);
}
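队列相关的上限都来自 connector 配置,调大它们可以缓解短时的生产/消费速度差。下面是一个配置示意,数值为假设,需按内存情况调整:

import java.util.Properties;

public class QueueTuningSketch {
    public static Properties queueProps() {
        Properties props = new Properties();
        props.setProperty("max.batch.size", "4096");               // 对应 maxBatchSize
        props.setProperty("max.queue.size", "16384");              // 对应 maxQueueSize
        props.setProperty("max.queue.size.in.bytes", "268435456"); // 对应 maxQueueSizeInBytes,256MB
        props.setProperty("poll.interval.ms", "100");              // 对应 pollInterval,也是 doEnqueue 阻塞时的 sleep 间隔
        return props;
    }
}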
Debezium增加吞吐量思路:
一个比较可行的思路是利用 Linux 的内存文件系统和 mmap 映射,来缓冲 MySQL binlog 传输与 Debezium 消费之间的速度差:BinaryLogClient 每次把读到的数据写进内存文件的 PageCache,Java 侧再从 PageCache 读取;如果预分配一个 2G 的文件,缓冲容量就相当可观。
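这只是思路层面的一个极简示意,并非 Debezium 的现有实现;文件放在 tmpfs(/dev/shm)上以便落在 PageCache 里,路径和大小均为假设(单个 MappedByteBuffer 的映射上限约 2GB,更大的文件需要分段映射):

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MmapBufferSketch {
    public static void main(String[] args) throws Exception {
        long size = 64L * 1024 * 1024; // 示例只预分配 64MB
        try (FileChannel channel = FileChannel.open(Paths.get("/dev/shm/binlog-buffer.dat"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // 写端(类比 BinaryLogClient):把一段 event 字节写进 PageCache
            MappedByteBuffer writer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            byte[] event = "fake-binlog-event".getBytes(StandardCharsets.UTF_8);
            writer.putInt(event.length);
            writer.put(event);
            // 读端(类比 Debezium 消费侧):从同一文件头部读回数据
            MappedByteBuffer reader = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
            byte[] copy = new byte[reader.getInt()];
            reader.get(copy);
            System.out.println(new String(copy, StandardCharsets.UTF_8));
        }
    }
}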