Tomcat-8.5.57版本
这是顶级容器,代表整个 Tomcat 实例。在一个 Tomcat 实例中通常只有一个 Server 元素。
配置文件:conf/server.xml
Service 是 Server 的子元素,每个 Service 负责处理一组的请求。
在一个 Server 中可以定义多个 Service 元素,但在大多数配置中通常只有一个 Service。
每个 Service 包含一个 Engine 和一个或多个连接器 Connector(例如 HTTP 或 AJP 连接器)。
Connector 是 Service 的子元素,每个 Connector 负责监听、处理客户端连接、读写请求。
Poller用于epoll.wait接收客户端读写事件,并处理。
Acceptor用于阻塞接收客户端连接。
连接可以有多种协议,Http11NioProtocol其中封装了NioEndpoint。
NioEndpoint封装了Java的nio操作。
Engine 是 Service 的子元素,负责实际处理请求。它代表了一个请求处理管道。
每个 Service 只能包含一个 Engine 元素。
Engine 管理多个虚拟主机Host。
Host 是 Engine 的子元素,代表一个虚拟主机或域名。
一个 Engine 可以包含多个 Host,每个 Host 代表一个独立的虚拟主机(例如 www.zcc6248.top 和 app.zcc6248.top
可以是两个不同的 Host)。
Context 是 Host 的子元素,代表一个应用。
在一个 Host 中可以定义多个 Context(每个 Context 代表一个 Web 应用程序)。
www.zcc6248.top/app1
、www.zcc6248.top/app2
即分别表示两个Context
每个Wrapper封装一个Servlet。
Tomcat创建Acceptor线程用于accept()
阻塞接收客户端连接,将请求封装为PollerEvent对象加入Poller其中一个线程的events队列中,在Poller线程中将events队列中的事件注册到自己的选择器epoll中,并且select()
其中准备好的客户端连接,根据读写状态位来进行处理。
xinterface Lifecycle {
//生命周期事件
String BEFORE_INIT_EVENT = "before_init";
String AFTER_INIT_EVENT = "after_init";
String START_EVENT = "start";
String BEFORE_START_EVENT = "before_start";
String AFTER_START_EVENT = "after_start";
String STOP_EVENT = "stop";
String BEFORE_STOP_EVENT = "before_stop";
String AFTER_STOP_EVENT = "after_stop";
String AFTER_DESTROY_EVENT = "after_destroy";
String BEFORE_DESTROY_EVENT = "before_destroy";
String PERIODIC_EVENT = "periodic";
String CONFIGURE_START_EVENT = "configure_start";
String CONFIGURE_STOP_EVENT = "configure_stop";
//添加事件观察者
void addLifecycleListener(LifecycleListener listener);
LifecycleListener[] findLifecycleListeners();
void removeLifecycleListener(LifecycleListener listener);
void init() throws LifecycleException;
void start() throws LifecycleException;
void stop() throws LifecycleException;
void destroy() throws LifecycleException;
LifecycleState getState();
String getStateName();
}
对接口默认实现,并且在init()
、start()
、stop()
、destroy()
中都进行了状态切换,并且通知了观察者
子类子需要实现initInternal()
、startInternal()
、stopInternal()
、destroyInternal()
。
xxxxxxxxxx
abstract class LifecycleBase implements Lifecycle {
final synchronized void init() throws LifecycleException {
try {
setStateInternal(LifecycleState.INITIALIZING, null, false);
initInternal();
setStateInternal(LifecycleState.INITIALIZED, null, false);
} catch (Throwable t) {
}
}
protected abstract void initInternal() throws LifecycleException;
protected abstract void startInternal() throws LifecycleException;
protected abstract void stopInternal() throws LifecycleException;
protected abstract void destroyInternal() throws LifecycleException;
LifecycleState getState() {
return state;
}
String getStateName() {
return getState().toString();
}
protected synchronized void setState(LifecycleState state)
throws LifecycleException {
setStateInternal(state, null, true);
}
private synchronized void setStateInternal(LifecycleState state,
Object data, boolean check) {
if (check) {
//...状态检测
}
this.state = state;
String lifecycleEvent = state.getLifecycleEvent();
if (lifecycleEvent != null) {
fireLifecycleEvent(lifecycleEvent, data);
}
}
}
LifecycleMBeanBase
提供的 JMX 集成功能,可以方便地监控和管理组件。例如,我们可以在 JConsole 或其他 JMX 客户端中查看和操作这些组件的状态。
组件Server
、Service
、Connector
只需要继承LifecycleMBeanBase
即可。
xxxxxxxxxx
StandardServer extends LifecycleMBeanBase implements Server;
StandardService extends LifecycleMBeanBase implements Service;
Connector extends LifecycleMBeanBase;
在Lifecycle
的基础上还需要包含一些其他事件时比如:添加移除子容器、添加移除通道pipelines
。
Engine
、Host
、Context
、Wrapper
这四种都属于容器。
xxxxxxxxxx
protected void initInternal() throws LifecycleException {
BlockingQueue<Runnable> startStopQueue = new LinkedBlockingQueue<>();
startStopExecutor = new ThreadPoolExecutor(
getStartStopThreadsInternal(),
getStartStopThreadsInternal(), 10, TimeUnit.SECONDS,
startStopQueue,
new StartStopThreadFactory(getName() + "-startStop-"));
startStopExecutor.allowCoreThreadTimeOut(true);
super.initInternal();
}
protected synchronized void startInternal() throws LifecycleException {
Container children[] = findChildren();//父类调用子类的start
List<Future<Void>> results = new ArrayList<>();
for (Container child : children) {
results.add(startStopExecutor.submit(new StartChild(child)));
}
for (Future<Void> result : results) {
result.get();
}
if (pipeline instanceof Lifecycle) {
((Lifecycle) pipeline).start();
}
setState(LifecycleState.STARTING);
threadStart();
}
//执行周期性后台任务:如热更新,高版本会变为ScheduleThread
protected void threadStart() {
if (thread != null)
return;
if (backgroundProcessorDelay <= 0)
return;
threadDone = false;
thread = new Thread(new ContainerBackgroundProcessor());
thread.setDaemon(true);
thread.start();
}
Tomcat
脚本启动时,会调用Bootstrap类的main方法。而其start实现如下
xxxxxxxxxx
daemon.setAwait(true);
daemon.load(args);
daemon.start();
xxxxxxxxxx
//Catalina类:
void load() {
//解析conf/server.xml文件
Digester digester = createStartDigester();
inputSource.setByteStream(inputStream);
digester.push(this);
digester.parse(inputSource);
//执行到此,server.xml中配置的所有数据都已经生成对应对象和属性。
getServer().setCatalina(this);
getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());
getServer().init();
}
void start() {
if (getServer() == null) {
load();
}
getServer().start();
//监听shutdown
if (await) {
await();
stop();
}
}
其实就是调用Catalina
的load
、start
方法。最后调用StandardServer
的init()
、start()
方法。至此整个生命周期就开始了。下面就可以从StandardServer
开始研究其到底干了什么事,是如何处理客户端请求的。
实现shutdown
服务的监听。
调用StandardService
的init()
方法。
xxxxxxxxxx
protected void initInternal() throws LifecycleException {
super.initInternal();
for (Service service : services) {
service.init();
}
}
调用StandardService
的start()
方法。
xxxxxxxxxx
protected void startInternal() throws LifecycleException {
//通知config_start事件
fireLifecycleEvent(CONFIGURE_START_EVENT, null);
//设置状态为starting
setState(LifecycleState.STARTING);
//调用子类
synchronized (servicesLock) {
for (Service service : services) {
service.start();
}
}
}
管理Engine
、Executor
、MapperListener
、Connector
最大线程数200,核心线程数25
tomcat对其进行了重新设计,使得和Java的线程池有所不同。
tomcat:首先核心线程,到达最大核心线程值启动非核心线程,到达最大线程数值时加入阻塞队列。
Java:首先核心线程,到达最大核心线程值时加入阻塞队列,队列满时启动非核心线程。
为什么这样设计:当核心线程满时,如果客户端请求来了,此时将请求加入阻塞队列,那么会出现大量500、超时。
在startInternal()
方法中会将其加入到所有组件、容器的观察者中,用于监听其创建和销毁。当创建时,将其对应的路径设置到mapper中,以后一个链接过来时,就可以根据mapper中的路径找到对应的Host
、Context
、Wrapper
等。
xxxxxxxxxx
protected void initInternal() throws LifecycleException {
super.initInternal();
if (engine != null) {
engine.init();
}
for (Executor executor : findExecutors()) {
if (executor instanceof JmxEnabled) {
((JmxEnabled) executor).setDomain(getDomain());
}
executor.init();
}
mapperListener.init();
synchronized (connectorsLock) {
for (Connector connector : connectors) {
connector.init();
}
}
}
xxxxxxxxxx
protected void startInternal() throws LifecycleException {
setState(LifecycleState.STARTING);
if (engine != null) {
synchronized (engine) {
engine.start();
}
}
synchronized (executors) {
for (Executor executor: executors) {
executor.start();
}
}
mapperListener.start();
synchronized (connectorsLock) {
for (Connector connector: connectors) {
if (connector.getState() != LifecycleState.FAILED)
{
connector.start();
}
}
}
}
Connector就是使用ProtocolHandler来处理请求的,不同的ProtocolHandler代表不同的连接类型,比如:Http11Protocol使用的是普通Socket来连接的, Http11NioProtocol使用的是NioSocket来连接的。
其中ProtocolHandler包含了三个部件:Endpoint
、Processor
、Adapter
。
Endpoint
用来处理底层Socket的网络连接,Processor
用于将 Endpoint
接收到的Socket封装成Request,Adapter
用于将Request交给 Container进行具体的处理。
Endpoint
由于是处理底层的Socket网络连接,因此Endpoint是用来实现TCP/IP协议的,而Processor
用来实现HTTP协议的,Adapter
将请求适配到Servlet容器进行具体的处理。
Endpoint
的抽象实现AbstractEndpoint
里面定义的Acceptor和AsyncTimeout两个内部类和一个Handler接口。Acceptor用于监听请求,AsyncTimeout用于检查异步Request的超时,Handler用于处理接收到的Socket,在内部调用Processor
进行处理。
其中会调用Http11NioProtocol
的init()
方法。Http11NioProtocol
其包装了网络IO、和HTTP解码等。最终会调用NioEndpoint.init()
方法调用bind
创建ServerSocketChannel
。
xxxxxxxxxx
protected void initInternal() throws LifecycleException {
super.initInternal();
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
// Http11NioProtocol
protocolHandler.init();
}
调用Http11NioProtocol
的start()
方法。protocolHandler.start()
方法会调用NioEndpoint.start()
方法。最后调用NioEndpoint.startInternal()
。其中会创建专门接收客户端连接的Acceptor
线程组和接收读写请求的Poller
线程组。
xxxxxxxxxx
protected void startInternal() throws LifecycleException {
setState(LifecycleState.STARTING);
protocolHandler.start();
}
xxxxxxxxxx
//NioEndpoint.startInternal()
void startInternal() throws Exception {
if (!running) {
running = true;
// 创建业务线程池
if ( getExecutor() == null ) {
createExecutor();
}
// 创建Poller线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread =
new Thread(pollers[i], getName());
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// 创建Acceptor线程
startAcceptorThreads();
}
}
run
方法:首先控制连接数,如果当前达到最大连接数就会阻塞不在accept()
新的连接,否则接收连接并且setSocketOptions()
方法中设置客户端属性,并且负载均衡获取一个Poller
将其注册到events队列中。
xxxxxxxxxx
protected class Acceptor extends AbstractEndpoint.Acceptor {
void run() {
int errorDelay = 0;
while (running) {
// 被暂停
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//达到最大连接数时阻塞,maxConnections = 10000
countUpOrAwaitConnection();
SocketChannel socket = null;
socket = serverSock.accept();
errorDelay = 0;
if (running && !paused) {
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
}
}
state = AcceptorState.ENDED;
}
}
protected boolean setSocketOptions(SocketChannel socket) {
try {
//设置为非阻塞
socket.configureBlocking(false);
Socket sock = socket.socket();
//设置属性:输入输出缓存区、连接超时时间等
socketProperties.setProperties(sock);
//缓存
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler =
new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel
(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//负载均衡,获取一个Poller注册Read事件。
getPoller0().register(channel);
} catch (Throwable t) {
return false;
}
return true;
}
void register(final NioChannel socket) {
socket.setPoller(this);
//在注册到epoll中时,可以设置attach。
//在下次可读可写时,可以获取到。
NioSocketWrapper ka =
new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);
if ( r==null)
r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
private void addEvent(PollerEvent event) {
events.offer(event);
//返回最新值,也即wakeupCounter=-1时,唤醒
if ( wakeupCounter.incrementAndGet() == 0 )
selector.wakeup();
}
run
方法:处理events
中的事件,即向epoll中注册read事件。selector.select
获取已经准备好的事件集,然后根据事件类型processSocket
进行处理。
wakeupCounter:
大于0:表示有新的事件需要处理,不能阻塞select
等于0:表示已经select
完成
等于-1:表示正在超时select(selectorTimeout)
xxxxxxxxxx
void run() {
while (true) {
//是否有向epoll中注册的事件
boolean hasEvents = false;
try {
if (!close) {
//处理events中的事件,即向epoll中注册read事件
hasEvents = events();
//设置为-1,返回之前值
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
selector.close();
break;
}
} catch (Throwable x) {
continue;
}
if ( keyCount == 0 )
hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator = keyCount > 0 ?
selector.selectedKeys().iterator() : null;
//处理准备好的事件
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
//Acceptor中设置
NioSocketWrapper attachment =
(NioSocketWrapper)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment); //处理读写事件
}
}
timeout(keyCount,hasEvents);
}
//关闭时break,此时减少数量
getStopLatch().countDown();
}
protected void processKey(SelectionKey sk,
NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
//直接sendFile,减少内核用户态之间内存复制
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
boolean closeSocket = false;
if (sk.isReadable()) {
if (!processSocket(attachment,
SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
//写事件忽略,因为在此不涉及写,写时使用其他epoll
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment,
SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
} else {
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
}
}
boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
//创建SocketProcessor
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
//使用业务线程池异步执行,减少poller线程压力
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
return false;
} catch (Throwable t) {
return false;
}
return true;
}
如果已经完成三次握手,则开始进入连接的生命周期即ConnectionHandler
类中
xxxxxxxxxx
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
try {
int handshake = -1;
//...是否三次握手成功
if (handshake == 0) {
SocketState state = SocketState.OPEN;
state = getHandler().process(socketWrapper, event);
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
}
}catch (Throwable t) {
socket.getPoller().cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
if (running && !paused) {
processorCache.push(this);
}
}
}
创建Http11Processor
,并调用process
方法
xxxxxxxxxx
protected static class ConnectionHandler<S>
implements AbstractEndpoint.Handler<S> {
SocketState process(SocketWrapperBase<S> wrapper,
SocketEvent status) {
if (wrapper == null) {
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
try {
if (processor == null) { // 先取缓存
processor = recycledProcessors.pop();
}
if (processor == null) { // 创建
processor = getProtocol().createProcessor();
}
connections.put(socket, processor);
SocketState state = SocketState.CLOSED;
do {
//核心其中会调用Http11Processor.service
state = processor.process(wrapper, status);
if (state == SocketState.UPGRADING) {
//协议升级
}
} while ( state == SocketState.UPGRADING);
return SocketState.CLOSED;
}
}
xxxxxxxxxx
protected Processor createProcessor() {
Http11Processor processor = new Http11Processor(this, getEndpoint());
processor.setAdapter(getAdapter());
processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
processor.setDisableUploadTimeout(getDisableUploadTimeout());
processor.setRestrictedUserAgents(getRestrictedUserAgents());
processor.setMaxSavePostSize(getMaxSavePostSize());
return processor;
}
解析http,并且检测并设置错误码400、500、502,调用适配器,请求开始进入Tomcat
的容器
xxxxxxxxxx
SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
// 解析HTTP
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
}
封装Tomcat
的Request
、Response
为HttpServletRequest
、HttpServletResponse
。最后调用Engine
的Pipeline
。Pipeline
工作原理下文介绍。
xxxxxxxxxx
void service(org.apache.coyote.Request req,
org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
}
//设置request.mappingData中的属性:
//包含对应的Host、contextPath、context、wrapper
postParseRequest(req, request, res, response);
// Calling the container
connector.getService().getContainer().
getPipeline().getFirst().invoke(request, response);
}
至此我们知道,Tomcat
如何接收、处理请求:
在Connector
的start
生命周期中,创建Acceptor
、Poller
线程组,并执行run
方法。
在Acceptor
中接收客户端连接并将其负载均衡到Poller
中的events
队列中。
Poller
在循环中首先将events
中的事件注册到自己的epoll
中。
Poller
开始select
已经准备好的请求,并且创建SocketProcessor
将其放入线程池中异步执行。
SocketProcessor
的run
方法会调用ConnectionHandler
开始处理请求的生命周期包含资源的释放等。
ConnectionHandler
最后调用Http11Processor
将请求解析,最后调用CoyoteAdapter
。
CoyoteAdapter
将请求封装为HttpServletRequest
、HttpServletResponse
。传入Engine
容器的Pipeline
中。
总体流程如上,只需要研究StandardWrapperValve
干了什么即可。
首先通过所有Filter
,之后调用servlet.service
,根据请求类型走不同的方法如:doPost
、doGet
、doDelete
xxxxxxxxxx
final void invoke(Request request, Response response)
throws IOException, ServletException {
StandardWrapper wrapper = (StandardWrapper) getContainer();
Servlet servlet = null;
Context context = (Context) wrapper.getParent();
try {
if (!unavailable) {
//通过反射方式创建用户定义的servlet
servlet = wrapper.allocate();
}
}catch (Throwable e) {
exception(request, response, e);
servlet = null;
}
ApplicationFilterChain filterChain = ApplicationFilterFactory
.createFilterChain(request, wrapper, servlet);
filterChain.doFilter(request.getRequest(),response.getResponse());
}
xxxxxxxxxx
// filterChain.doFilter
private void internalDoFilter(ServletRequest request,
ServletResponse response) {
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
try {
Filter filter = filterConfig.getFilter();
filter.doFilter(request, response, this);
} catch (Throwable e) {
}
return;
}
try {
//最后调用service方法。
servlet.service(request, response);
} catch (Throwable e) {
} finally {
if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
lastServicedRequest.set(null);
lastServicedResponse.set(null);
}
}
}
上一节,主要讲了一个Tomcat如何获取读请求到封装Servlet的整个过程,我们发现在Epoll中,写请求未作任何处理,主要是因为写请求不Poller中的epoll中进行检测,而是有自己独立的Epoll,这一节就主要来研究其工作原理。
xxxxxxxxxx
response.getOutputStream().write(jsonData);
我们一般使用这种方式会写数据,接下来主要研究其底层到底干了什么。
方法返回了一个CoyoteOutputStream对象,其中封装了Java nio的操作。将数据写入堆缓存中。主要是mark、position、limit、capacity四个变量。就不详细解释了,主要看flush是干了什么。
xxxxxxxxxx
public ServletOutputStream getOutputStream() {
usingOutputStream = true;
if (outputStream == null) {
outputStream = new CoyoteOutputStream(outputBuffer);
}
return outputStream;
}
//CoyoteOutputStream类
public void flush() throws IOException {
//调用OutputBuffer的flush方法
ob.flush();
}
xxxxxxxxxx
public void flush() throws IOException {
doFlush(true);
}
protected void doFlush(boolean realFlush) throws IOException {
try {
doFlush = true;
if (cb.remaining() > 0) {
flushCharBuffer(); //字节方式
}
if (bb.remaining() > 0) {
flushByteBuffer(); //流方式
}
} finally {
doFlush = false;
}
if (realFlush) {
// 调用flush方法
coyoteResponse.action(ActionCode.CLIENT_FLUSH, null);
}
}
private void flushByteBuffer() throws IOException {
//slice:创建一个新的byteBuffer,共享原来的地址空间。重新设置三个变量即可。
realWriteBytes(bb.slice());
clear(bb);
}
public void realWriteBytes(ByteBuffer buf) throws IOException {
//tomcat的Response对象,在CoyoteAdapter.service方法中设置的
if (coyoteResponse == null) {
return;
}
if (buf.remaining() > 0) {
coyoteResponse.doWrite(buf);
}
}
最终会调用NioEndpoint
的doWrite
方法。
xxxxxxxxxx
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
long writeTimeout = getWriteTimeout();
Selector selector = null;
try {
//选择一个epoll,用于注册写事件
selector = pool.get();
} catch (IOException x) {
}
try {
pool.write(from, getSocket(), selector, writeTimeout, block);
updateLastWrite();
} finally {
if (selector != null) {
pool.put(selector);
}
}
}
如果网络的写缓冲区满了不可写了,那么就需要特殊处理,将注册一个epoll的写事件,等到可写时再进行写入操作。
xxxxxxxxxx
public int write(ByteBuffer buf, NioChannel socket, Selector selector,
long writeTimeout, boolean block) throws IOException {
if ( SHARED && block ) {
return blockingSelector.write(buf,socket,writeTimeout);
}
SelectionKey key = null;
int written = 0;
boolean timedout = false;
int keycount = 1;
long time = System.currentTimeMillis();
try {
while ( (!timedout) && buf.hasRemaining() ) {
int cnt = 0;
if ( keycount > 0 ) {
cnt = socket.write(buf);
if (cnt == -1) throw new EOFException();
written += cnt;
if (cnt > 0) { //写入成功,继续
time = System.currentTimeMillis();
continue;
}
if (cnt==0 && (!block)) break;
}
if ( selector != null ) {
if (key==null)
key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
else key.interestOps(SelectionKey.OP_WRITE);
if (writeTimeout==0) {
timedout = buf.hasRemaining();
} else if (writeTimeout<0) {
keycount = selector.select();
} else {
keycount = selector.select(writeTimeout);
}
}
if (writeTimeout > 0 && (selector == null || keycount == 0) )
timedout = (System.currentTimeMillis()-time)>=writeTimeout;
}//while
if ( timedout ) throw new SocketTimeoutException();
} finally {
if (key != null) {
key.cancel();
if (selector != null) selector.selectNow();
}
}
return written;
}