com.mysql.cj.jdbc.Driver 解析
Driver类的文档描述:
package com.mysql.cj.jdbc;import java.sql.SQLException;/*** The Java SQL framework allows for multiple database drivers. Each driver should supply a class that implements the Driver interface* * <p>* The DriverManager will try to load as many drivers as it can find and then for any given connection request, it will ask each driver in turn to try to* connect to the target URL.* * <p>* It is strongly recommended that each Driver class should be small and standalone so that the Driver class can be loaded and queried without bringing in vast* quantities of supporting code.* * <p>* When a Driver class is loaded, it should create an instance of itself and register it with the DriverManager. This means that a user can load and register a* driver by doing Class.forName("foo.bah.Driver")*/
public class Driver extends NonRegisteringDriver implements java.sql.Driver {

    // Register ourselves with the DriverManager as soon as this class is loaded.
    static {
        try {
            java.sql.DriverManager.registerDriver(new Driver());
        } catch (SQLException e) {
            // Chain the original SQLException so the reason for the failed registration is not lost.
            throw new RuntimeException("Can't register driver!", e);
        }
    }

    /**
     * Constructs a new driver instance. Registration with the DriverManager is performed by the
     * static initializer of this class, not by this constructor.
     *
     * @throws SQLException
     *             if a database error occurs.
     */
    public Driver() throws SQLException {
        // Required for Class.forName().newInstance()
    }
}
这里的英文描述核心是说:
1.Driver类要保证自己被加载时就主动注册到DriverManager里
2.DriverManager会依次尝试使用每个Driver与数据库建立连接
3.每个Driver实现类需要实现java.sql.Driver接口
4.Driver实现类应当小而独立,这样在加载和查询Driver类时不必引入大量的支持代码;
对于描述1,可以看到Driver的static代码块中调用了
java.sql.DriverManager.registerDriver(new Driver())
来保证Class被加载时就能注册进DriverManager里。这行代码是怎么执行注册的呢?跟踪发现调用链:
java.sql.DriverManager.registerDriver
->registerDriver(java.sql.Driver driver,DriverAction da /*注册时该参数为null;DriverAction用于在驱动被取消注册时接收DriverManager的通知*/)
->registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
registeredDrivers的定义如下,所以注册就是把驱动加入DriverManager的静态列表registeredDrivers里
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
com.mysql.cj.jdbc.Driver类通过继承NonRegisteringDriver来实现java.sql.Driver里的方法.
NonRegisteringDriver类
NonRegisteringDriver 的静态代码块会开启一个清理mysql弃置连接的后台线程,核心是通过虚引用(PhantomReference)配合引用队列(ReferenceQueue)实现的.
static {try {Class.forName(AbandonedConnectionCleanupThread.class.getName());} catch (ClassNotFoundException e) {// ignore}}
referenceQueue是一个引用队列,从中取出的Reference<? extends MysqlConnection> reference是虚引用(ConnectionFinalizerPhantomReference)。当MysqlConnection对象不再可达、被垃圾回收器判定为可回收后,jvm会把对应的虚引用(而不是被回收的对象本身)放到这个队列里,AbandonedConnectionCleanupThread对象拿到后调用finalizeResource方法关闭MysqlConnection的底层连接以及InputStream和OutputStream
AbandonedConnectionCleanupThread
public void run() {for (;;) {try {checkThreadContextClassLoader();Reference<? extends MysqlConnection> reference = referenceQueue.remove(5000);if (reference != null) {finalizeResource((ConnectionFinalizerPhantomReference) reference);}} catch (InterruptedException e) {threadRefLock.lock();try {threadRef = null;// Finalize remaining references.Reference<? extends MysqlConnection> reference;while ((reference = referenceQueue.poll()) != null) {finalizeResource((ConnectionFinalizerPhantomReference) reference);}connectionFinalizerPhantomRefs.clear();} finally {threadRefLock.unlock();}return;} catch (Exception ex) {// Nowhere to really log this.}}}
ConnectionImpl
ConnectionImpl类完成参数设置,交给NativeSocketConnection完成底层连接,构造函数里的createNewIO方法完成连接
HostInfo类保存连接的信息,比如host,port,username,password, hostProperties
NativeSocketConnection
NativeSocketConnection 的connect方法完成建立连接以及读写缓冲区分配,读写缓冲区大小均为16KB(16384字节):
/**
 * Opens the socket to the server and sets up the input and output streams used for the protocol.
 * <p>
 * Stores the arguments on this instance, creates the socket factory named by the
 * {@code socketFactory} property, connects, applies the {@code socketTimeout} property to the socket
 * when it is non-zero, calls {@code beforeHandshake()} on the factory, and finally wraps the socket
 * streams. The input stream is a {@code ReadAheadInputStream} when {@code useReadAheadInput} is set,
 * the raw socket stream when {@code useUnbufferedInput} is set, and a {@link BufferedInputStream}
 * otherwise; the output stream is always a {@link BufferedOutputStream}. All buffers are 16384 bytes.
 *
 * @param hostName
 *            host to connect to
 * @param portNumber
 *            port to connect to
 * @param propSet
 *            properties consulted for the socket factory, timeout and stream options
 * @param excInterceptor
 *            exception interceptor stored on this connection
 * @param log
 *            logger passed to the read-ahead input stream
 * @param loginTimeout
 *            timeout value forwarded to the socket factory's {@code connect} call
 */
@Override
public void connect(String hostName, int portNumber, PropertySet propSet, ExceptionInterceptor excInterceptor, Log log, int loginTimeout) {
    // Buffer size, in bytes, shared by every stream wrapper created below.
    final int streamBufferSize = 16384;

    try {
        this.port = portNumber;
        this.host = hostName;
        this.propertySet = propSet;
        this.exceptionInterceptor = excInterceptor;

        final String socketFactoryName = propSet.getStringProperty(PropertyKey.socketFactory).getStringValue();
        this.socketFactory = createSocketFactory(socketFactoryName);
        this.mysqlSocket = this.socketFactory.connect(this.host, this.port, propSet, loginTimeout);

        final int readTimeout = propSet.getIntegerProperty(PropertyKey.socketTimeout).getValue();
        if (readTimeout != 0) {
            try {
                this.mysqlSocket.setSoTimeout(readTimeout);
            } catch (Exception ex) {
                /* Ignore if the platform does not support it */
            }
        }

        this.socketFactory.beforeHandshake();

        final InputStream socketInput;
        if (propSet.getBooleanProperty(PropertyKey.useReadAheadInput).getValue()) {
            final boolean traceProtocol = propSet.getBooleanProperty(PropertyKey.traceProtocol).getValue();
            socketInput = new ReadAheadInputStream(this.mysqlSocket.getInputStream(), streamBufferSize, traceProtocol, log);
        } else if (propSet.getBooleanProperty(PropertyKey.useUnbufferedInput).getValue()) {
            socketInput = this.mysqlSocket.getInputStream();
        } else {
            socketInput = new BufferedInputStream(this.mysqlSocket.getInputStream(), streamBufferSize);
        }

        this.mysqlInput = new FullReadInputStream(socketInput);
        this.mysqlOutput = new BufferedOutputStream(this.mysqlSocket.getOutputStream(), streamBufferSize);

    } catch (IOException ioEx) {
        // Any I/O failure while connecting or wrapping the streams is reported as a communications exception.
        throw ExceptionFactory.createCommunicationsException(propSet, null, new PacketSentTimeHolder() {
        }, null, ioEx, getExceptionInterceptor());
    }
}
NativeProtocol完成与mysql的交互,SimplePacketReader,SimplePacketSender完成底层socket的读写
public class SimplePacketSender implements MessageSender<NativePacketPayload> {private BufferedOutputStream outputStream;public SimplePacketSender(BufferedOutputStream outputStream) {this.outputStream = outputStream;}
//send方法利用mysql交互协议完成socket写public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {PacketSplitter packetSplitter = new PacketSplitter(packetLen);while (packetSplitter.nextPacket()) {this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));this.outputStream.write(packetSequence++);this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());}this.outputStream.flush();}@Overridepublic MessageSender<NativePacketPayload> undecorateAll() {return this;}@Overridepublic MessageSender<NativePacketPayload> undecorate() {return this;}
}
/** Copyright (c) 2016, 2021, Oracle and/or its affiliates.** This program is free software; you can redistribute it and/or modify it under* the terms of the GNU General Public License, version 2.0, as published by the* Free Software Foundation.** This program is also distributed with certain software (including but not* limited to OpenSSL) that is licensed under separate terms, as designated in a* particular file or component or in included license documentation. The* authors of MySQL hereby grant you an additional permission to link the* program and your derivative works with the separately licensed software that* they have included with MySQL.** Without limiting anything contained in the foregoing, this file, which is* part of MySQL Connector/J, is also subject to the Universal FOSS Exception,* version 1.0, a copy of which can be found at* http://oss.oracle.com/licenses/universal-foss-exception.** This program is distributed in the hope that it will be useful, but WITHOUT* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS* FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,* for more details.** You should have received a copy of the GNU General Public License along with* this program; if not, write to the Free Software Foundation, Inc.,* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA*/package com.mysql.cj.protocol.a;import java.io.IOException;
import java.util.Optional;import com.mysql.cj.Messages;
import com.mysql.cj.conf.RuntimeProperty;
import com.mysql.cj.exceptions.CJPacketTooBigException;
import com.mysql.cj.protocol.MessageReader;
import com.mysql.cj.protocol.SocketConnection;/*** Simple implementation of {@link MessageReader} which handles the receiving of logical MySQL packets from the provided socket input stream.* Multi-packets are handled outside of this reader.*/
public class SimplePacketReader implements MessageReader<NativePacketHeader, NativePacketPayload> {protected SocketConnection socketConnection;protected RuntimeProperty<Integer> maxAllowedPacket;private byte readPacketSequence = -1;NativePacketHeader lastHeader = null;NativePacketPayload lastMessage = null;public SimplePacketReader(SocketConnection socketConnection, RuntimeProperty<Integer> maxAllowedPacket) {this.socketConnection = socketConnection;this.maxAllowedPacket = maxAllowedPacket;}@Overridepublic NativePacketHeader readHeader() throws IOException {if (this.lastHeader == null) {return readHeaderLocal();}NativePacketHeader hdr = this.lastHeader;this.lastHeader = null;this.readPacketSequence = hdr.getMessageSequence();return hdr;}@Overridepublic NativePacketHeader probeHeader() throws IOException {this.lastHeader = readHeaderLocal();return this.lastHeader;}private NativePacketHeader readHeaderLocal() throws IOException {NativePacketHeader hdr = new NativePacketHeader();try {this.socketConnection.getMysqlInput().readFully(hdr.getBuffer().array(), 0, NativeConstants.HEADER_LENGTH);int packetLength = hdr.getMessageSize();if (packetLength > this.maxAllowedPacket.getValue()) {throw new CJPacketTooBigException(packetLength, this.maxAllowedPacket.getValue());}} catch (IOException | CJPacketTooBigException e) {try {this.socketConnection.forceClose();} catch (Exception ex) {// ignore}throw e;}this.readPacketSequence = hdr.getMessageSequence();return hdr;}@Overridepublic NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {if (this.lastMessage == null) {return readMessageLocal(reuse, header);}NativePacketPayload buf = this.lastMessage;this.lastMessage = null;return buf;}@Overridepublic NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {this.lastMessage = readMessageLocal(reuse, header);return this.lastMessage;}private NativePacketPayload 
readMessageLocal(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {try {int packetLength = header.getMessageSize();NativePacketPayload message;if (reuse.isPresent()) {message = reuse.get();// Set the Buffer to it's original statemessage.setPosition(0);// Do we need to re-alloc the byte buffer?if (message.getByteBuffer().length < packetLength) {// Note: We actually check the length of the buffer, rather than getBufLength(), because getBufLength()// is not necessarily the actual length of the byte array used as the buffermessage.setByteBuffer(new byte[packetLength]);}// Set the new lengthmessage.setPayloadLength(packetLength);} else {message = new NativePacketPayload(new byte[packetLength]);}// Read the data from the serverint numBytesRead = this.socketConnection.getMysqlInput().readFully(message.getByteBuffer(), 0, packetLength);if (numBytesRead != packetLength) {throw new IOException(Messages.getString("PacketReader.1", new Object[] { packetLength, numBytesRead }));}return message;} catch (IOException e) {try {this.socketConnection.forceClose();} catch (Exception ex) {// ignore}throw e;}}@Overridepublic byte getMessageSequence() {return this.readPacketSequence;}@Overridepublic void resetMessageSequence() {this.readPacketSequence = 0;}}