Tomcat源码解析三 Connector连接器

引言

上文分析了Tomcat的启动流程,我们已经大致理清了Tomcat启动的整个流程,本文将会对Connector连接器的创建进行分析

整体架构

image.png

上图完整了概括了整个Connector的架构体系,先简单的介绍一下各个组件的功能

  • Endpoint 用来处理底层Socket的网络连接
  • Processor 用来实现HTTP协议的
  • Adapter 将请求适配到Servlet容器进行具体的处理

org.apache.catalina.connector.Connector

我们先来看下org.apache.catalina.connector.Connector这个主体类的构造方法,Connector类初始化是在Tomcat读取配置文件时就完成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public Connector(String protocol) {
boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();
// 判断协议类型
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
}
} else {
protocolHandlerClassName = protocol;
}

// Instantiate protocol handler
ProtocolHandler p = null;
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(什么.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}

// Default for Connector depends on this system property
setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}

这里其实是拿到server.xmlConnector的协议配置,利用反射创建ProtocolHandleConnector就是使用ProtocolHandler来处理请求的,不同的ProtocolHandler代表不同的连接类型。

因为我这里使用的是tomcat9的源码版本,可以看到其已经淘汰了BIO。默认的http1.1协议处理类已经是org.apache.coyote.http11NioProtocol了,下面我们就以http11NioProtocol继续往下分析

org.apache.coyote.http11NioProtocol

通过查看Http11NioProtocol的构造方法,可知Endpoint的实现类是NioEndpoint

1
2
3
public Http11NioProtocol() {
super(new NioEndpoint());
}

Endpoint上文有说过是用来处理底层Socket网络连接的,下面就让我们来看下NioEndpoint的实现

NioEndpoint

还是先看下启动方法 startInternal中的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void startInternal() throws Exception {
.....
// 初始化Poller数组,启动Poller线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// 启动 Acceptor 线程
startAcceptorThreads();
}
}

这里我省略了其他代码,可以看到在这里初始化了多个Poller类,并单独启动了线程,这里的每个Poller其实都绑定了一个Selector选择器()。并且调用startAcceptorThreads方法启动了Acceptor线程,用来接收新的请求。下面我们继续看startAcceptorThreads方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void startAcceptorThreads() {
// 获取Acceptor线程数 默认是1
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);

for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}

上面的代码根据配置启动了多个Acceptor线程,下面就去看下Acceptor类的run方法

Acceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public void run() {

int errorDelay = 0;

// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
....
try {
// 接收新的请求
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;

// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// 设置新的Socket连接与Poller绑定,并设置相关参数
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {

}
state = AcceptorState.ENDED;
}

// NioEndpoint.class 中
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// 设置此Socket连接未非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
// 设置此Socket的相关参数值
socketProperties.setProperties(sock);

NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
// 判断是否开启ssl
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
// 绑定 Poller 其实就是绑定选择器 Selector
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
}
// Tell to close the socket
return false;
}
return true;
}

上面代码逻辑主要做了两件事情

  • 调用NioEndpointserverSocketAccept方法来接收新的请求,注意这里是阻塞的
  • 调用NioEndpointsetSocketOptions方法对新接收的Socket请求,配置相关信息,并绑定Poller(绑定选择器 Selector)

Poller

接下来我们将会分析Poller类,是NioEndpoint的内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}

register方法就是将新的Socket连接与Selector进行绑定,并注册监听读事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public void run() {
// Loop until destroy() is called
while (true) {

boolean hasEvents = false;

try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(什么.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(什么.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());

Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}//while

//process timeouts
timeout(keyCount,hasEvents);
}//while

getStopLatch().countDown();
}

run方法中的一大堆代码,多是与NIO相关,主要逻辑就是调用selectorselect()函数,监听就绪事件。这里我们可以直接看processKey方法,这里是根据SelectionKey来分别执行具体逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// 如果是可读事件就绪
if (sk.isReadable()) {
// 执行具体逻辑的地方
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
// 如果是可写事件就绪
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}

processKey方法也是直接调用了AbstractEndpointprocessSocket方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
// 创建一个 SocketProcessor 实例
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
// 执行
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(什么.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(什么.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

SocketProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
.... 省略

if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
// 获取 ConnectionHandler并调用process执行具体逻辑
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
}
}
}

SocketProcessor逻辑比较简单,doRun方法继续会往下调用,最终http协议的解析是在Http11Processorservice中进行,Http11Processor就对应上文架构图的Process模块,在Process完成Http协议解析之后,会由适配器进行适配后再交给Servlet容器进行具体处理

总结

本文分析TomcatConnector连接器的部分源码,ConnectorTomcat的核心组件,Connector组件用于等待用户的请求,包括支持http1.1http2等协议,解析用户请求,封装请求信息,最后才交给我们熟悉的 Servlet处理。阅读此源码对于理解http协议也有很大的帮助。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2013 - 2020 王俊男的技术杂谈 All Rights Reserved.

访客数 : | 访问量 :