加载连接
ConnectionManagerImpl
/**
 * Creates the connection manager module (named "Connection Manager") and
 * instantiates one {@code ConnectionListener} per supported connection
 * endpoint: client-to-server (plain and legacy SSL), BOSH (plain and SSL),
 * server-to-server, external components (plain and legacy SSL), connection
 * managers/multiplexers (plain and legacy SSL) and the admin console (plain
 * and SSL).
 *
 * <p>This constructor only creates the listener objects; nothing in this body
 * starts them.
 *
 * <p>NOTE(review): the positional arguments passed to each
 * {@code ConnectionListener} appear, judging from the values supplied, to be:
 * connection type, port property, default port, "enabled" property,
 * max-threads property, max-read-buffer property, TLS policy (property name or
 * literal policy name), client-certificate auth policy property, bind address,
 * identity store configuration, trust store configuration, compression
 * property. Confirm against the {@code ConnectionListener} constructor.
 *
 * @throws IOException declared by the signature; which call raises it is not
 *                     visible in this block.
 */
public ConnectionManagerImpl() throws IOException
{
super("Connection Manager");
// Both addresses stay null when they cannot be resolved; the listeners below
// are still created with the null value.
InetAddress bindAddress = null;
InetAddress adminConsoleBindAddress = null;
try
{
bindAddress = getListenAddress();
}
catch ( UnknownHostException e )
{
Log.warn( "Unable to resolve bind address: ", e );
}
try
{
adminConsoleBindAddress = getAdminConsoleListenAddress();
// No dedicated admin console address: re-use the generic bind address.
if( adminConsoleBindAddress == null )
{
adminConsoleBindAddress = bindAddress;
}
}
catch( UnknownHostException e )
{
// Note: when resolution throws, the fallback to bindAddress above is skipped
// and adminConsoleBindAddress remains null.
Log.warn( "Unable to resolve admin console bind address: ", e );
}
final CertificateStoreManager certificateStoreManager = XMPPServer.getInstance().getCertificateStoreManager();
// client-to-server
clientListener = new ConnectionListener(
ConnectionType.SOCKET_C2S,
ConnectionSettings.Client.PORT,
DEFAULT_PORT,
ConnectionSettings.Client.SOCKET_ACTIVE,
ConnectionSettings.Client.MAX_THREADS,
ConnectionSettings.Client.MAX_READ_BUFFER,
ConnectionSettings.Client.TLS_POLICY,
ConnectionSettings.Client.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.SOCKET_C2S ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.SOCKET_C2S ),
ConnectionSettings.Client.COMPRESSION_SETTINGS
);
// client-to-server on the legacy SSL port
clientSslListener = new ConnectionListener(
ConnectionType.SOCKET_C2S,
ConnectionSettings.Client.OLD_SSLPORT,
DEFAULT_SSL_PORT,
ConnectionSettings.Client.ENABLE_OLD_SSLPORT,
ConnectionSettings.Client.MAX_THREADS_SSL,
ConnectionSettings.Client.MAX_READ_BUFFER_SSL,
Connection.TLSPolicy.legacyMode.name(), // force legacy mode
ConnectionSettings.Client.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.SOCKET_C2S ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.SOCKET_C2S ),
ConnectionSettings.Client.COMPRESSION_SETTINGS
);
// BOSH / HTTP-bind
boshListener = new ConnectionListener(
ConnectionType.BOSH_C2S,
HttpBindManager.HTTP_BIND_PORT,
HttpBindManager.HTTP_BIND_PORT_DEFAULT,
HttpBindManager.HTTP_BIND_ENABLED, // TODO this one property enables/disables both normal and legacymode port. Should be separated into two.
HttpBindManager.HTTP_BIND_THREADS,
null,
Connection.TLSPolicy.disabled.name(), // StartTLS over HTTP? Should use boshSslListener instead.
HttpBindManager.HTTP_BIND_AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.BOSH_C2S ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.BOSH_C2S ),
ConnectionSettings.Client.COMPRESSION_SETTINGS // Existing code re-used the generic client compression property. Should we have a BOSH-specific one?
);
// BOSH / HTTP-bind on the secure port
boshSslListener = new ConnectionListener(
ConnectionType.BOSH_C2S,
HttpBindManager.HTTP_BIND_SECURE_PORT,
HttpBindManager.HTTP_BIND_SECURE_PORT_DEFAULT,
HttpBindManager.HTTP_BIND_ENABLED, // TODO this one property enables/disables both normal and legacymode port. Should be separated into two.
HttpBindManager.HTTP_BIND_THREADS,
null,
Connection.TLSPolicy.legacyMode.name(),
HttpBindManager.HTTP_BIND_AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.BOSH_C2S ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.BOSH_C2S ),
ConnectionSettings.Client.COMPRESSION_SETTINGS // Existing code re-used the generic client compression property. Should we have a BOSH-specific one?
);
// server-to-server (federation)
serverListener = new ConnectionListener(
ConnectionType.SOCKET_S2S,
ConnectionSettings.Server.PORT,
DEFAULT_SERVER_PORT,
ConnectionSettings.Server.SOCKET_ACTIVE,
"xmpp.server.processing.threads", // passed as a literal here, unlike the ConnectionSettings constants used by the other listeners
null,
ConnectionSettings.Server.TLS_POLICY,
ConnectionSettings.Server.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.SOCKET_S2S ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.SOCKET_S2S ),
ConnectionSettings.Server.COMPRESSION_SETTINGS
);
// external components (XEP 0114)
componentListener = new ConnectionListener(
ConnectionType.COMPONENT,
ConnectionSettings.Component.PORT,
DEFAULT_COMPONENT_PORT,
ConnectionSettings.Component.SOCKET_ACTIVE,
ConnectionSettings.Component.MAX_THREADS,
null,
ConnectionSettings.Component.TLS_POLICY,
ConnectionSettings.Component.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.COMPONENT ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.COMPONENT ),
ConnectionSettings.Component.COMPRESSION_SETTINGS
);
// external components on the legacy SSL port
componentSslListener = new ConnectionListener(
ConnectionType.COMPONENT,
ConnectionSettings.Component.OLD_SSLPORT,
DEFAULT_COMPONENT_SSL_PORT,
ConnectionSettings.Component.ENABLE_OLD_SSLPORT,
ConnectionSettings.Component.MAX_THREADS_SSL,
null,
Connection.TLSPolicy.legacyMode.name(), // force legacy mode
ConnectionSettings.Component.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.COMPONENT ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.COMPONENT ),
ConnectionSettings.Component.COMPRESSION_SETTINGS
);
// Multiplexers (our proprietary connection manager implementation)
connectionManagerListener = new ConnectionListener(
ConnectionType.CONNECTION_MANAGER,
ConnectionSettings.Multiplex.PORT,
DEFAULT_MULTIPLEX_PORT,
ConnectionSettings.Multiplex.SOCKET_ACTIVE,
ConnectionSettings.Multiplex.MAX_THREADS,
null,
ConnectionSettings.Multiplex.TLS_POLICY,
ConnectionSettings.Multiplex.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.CONNECTION_MANAGER ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.CONNECTION_MANAGER ),
ConnectionSettings.Multiplex.COMPRESSION_SETTINGS
);
// Multiplexers on the legacy SSL port
connectionManagerSslListener = new ConnectionListener(
ConnectionType.CONNECTION_MANAGER,
ConnectionSettings.Multiplex.OLD_SSLPORT,
DEFAULT_MULTIPLEX_SSL_PORT,
ConnectionSettings.Multiplex.ENABLE_OLD_SSLPORT,
ConnectionSettings.Multiplex.MAX_THREADS_SSL,
null,
Connection.TLSPolicy.legacyMode.name(), // force legacy mode
ConnectionSettings.Multiplex.AUTH_PER_CLIENTCERT_POLICY,
bindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.CONNECTION_MANAGER ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.CONNECTION_MANAGER ),
ConnectionSettings.Multiplex.COMPRESSION_SETTINGS
);
// Admin console (the Openfire web-admin) // TODO these use the XML properties instead of normal properties!
// Both admin console listeners use adminConsoleBindAddress rather than bindAddress.
webAdminListener = new ConnectionListener(
ConnectionType.WEBADMIN,
"adminConsole.port",
9090,
null,
"adminConsole.serverThreads",
null,
Connection.TLSPolicy.disabled.name(), // StartTLS over HTTP? Should use webAdminSslListener instead.
null,
adminConsoleBindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.WEBADMIN ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.WEBADMIN ),
null // Should we have compression on the admin console?
);
webAdminSslListener = new ConnectionListener(
ConnectionType.WEBADMIN,
"adminConsole.securePort",
9091,
null,
"adminConsole.serverThreads",
null,
Connection.TLSPolicy.legacyMode.name(),
null,
adminConsoleBindAddress,
certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.WEBADMIN ),
certificateStoreManager.getTrustStoreConfiguration( ConnectionType.WEBADMIN ),
null // Should we have compression on the admin console?
);
}
/**
 * Starts this module: delegates to the superclass, starts the connection
 * listeners, starts the {@code SocketSendingTracker} singleton and registers
 * this instance as a {@code CertificateManager} listener.
 */
@Override
public void start() {
    super.start();
    // Start the ConnectionListener instances.
    startListeners();
    SocketSendingTracker.getInstance().start();
    CertificateManager.addListener(this);
}
ConnectionListener
/**
 * Starts the connection acceptor for this listener.
 *
 * <p>Nothing is started when the connection type is BOSH_C2S or WEBADMIN, or
 * when the listener is disabled. When an acceptor already exists, a busy one
 * blocks the start (with a warning) while an idle one is stopped and replaced.
 * Server-to-server listeners get a {@code LegacyConnectionAcceptor}; every
 * other type gets a {@code MINAConnectionAcceptor}.
 */
public synchronized void start()
{
    // TODO Start all connection types here, by supplying more connection acceptors other than a MINA-based one.
    switch ( getType() )
    {
        case BOSH_C2S:
        case WEBADMIN:
            Log.debug( "Not starting a (MINA-based) connection acceptor, as connections of type " + getType() + " depend on another IO technology.");
            return;
        default:
            break;
    }

    final boolean disabled = !isEnabled();
    if ( disabled )
    {
        Log.debug( "Not starting: disabled by configuration." );
        return;
    }

    if ( connectionAcceptor != null )
    {
        // This might indicate an illegal state. Legacy code allows for this, so we won't throw a runtime exception (for now).
        if ( connectionAcceptor.isIdle() )
        {
            Log.warn( "Stopping (in order to restart) an instance that has already been started, but is idle. This start would have failed if the listener was not idle. The implementation should have called stop() or restart() first, to ensure a clean restart!" );
            connectionAcceptor.stop();
        }
        else
        {
            Log.warn( "Unable to start: it appears to have already been started (and it is currently serving connections)! To restart, first stop this listener explicitly." );
            return;
        }
    }

    Log.debug( "Starting..." );

    // Server-to-server uses the legacy (deprecated) acceptor; all other types use the MINA-based one.
    final boolean serverToServer = getType() == ConnectionType.SOCKET_S2S;
    connectionAcceptor = serverToServer
        ? new LegacyConnectionAcceptor( generateConnectionConfiguration() )
        : new MINAConnectionAcceptor( generateConnectionConfiguration() );

    // Start whichever acceptor was created above.
    connectionAcceptor.start();
    Log.info( "Started." );
}
MINAConnectionAcceptor
/**
 * Creates an acceptor for the given configuration, selecting the connection
 * handler that matches the configured connection type.
 *
 * <p>The acceptor name is the lower-cased connection type, suffixed with
 * {@code "_ssl"} when the TLS policy is {@code legacyMode}. Lower-casing uses
 * {@code Locale.ROOT} so the name (also used for the logger and thread names)
 * does not vary with the JVM default locale; for example, a Turkish locale
 * would otherwise turn {@code CONNECTION_MANAGER} into
 * {@code connectıon_manager}.
 *
 * @param configuration the configuration for the acceptor to create.
 * @throws IllegalStateException when the configured connection type is not
 *         SOCKET_S2S, SOCKET_C2S, COMPONENT or CONNECTION_MANAGER.
 */
public MINAConnectionAcceptor( ConnectionConfiguration configuration )
{
    super( configuration );
    this.name = configuration.getType().toString().toLowerCase( java.util.Locale.ROOT ) + ( configuration.getTlsPolicy() == Connection.TLSPolicy.legacyMode ? "_ssl" : "" );
    Log = LoggerFactory.getLogger( MINAConnectionAcceptor.class.getName() + "[" + name + "]" );
    switch ( configuration.getType() )
    {
        case SOCKET_S2S: // server to server
            connectionHandler = new ServerConnectionHandler( configuration );
            break;
        case SOCKET_C2S: // client to server
            // Original author's note: ClientConnectionHandler extends ConnectionHandler,
            // which extends IoHandlerAdapter; the MINA framework hands incoming
            // messages to that IoHandlerAdapter for processing.
            connectionHandler = new ClientConnectionHandler( configuration );
            break;
        case COMPONENT:
            connectionHandler = new ComponentConnectionHandler( configuration );
            break;
        case CONNECTION_MANAGER:
            connectionHandler = new MultiplexerConnectionHandler( configuration );
            break;
        default:
            throw new IllegalStateException( "This implementation does not support the connection type as defined in the provided configuration: " + configuration.getType() );
    }
    this.encryptionArtifactFactory = new EncryptionArtifactFactory( configuration );
}
/**
 * Starts this acceptor: builds the thread pool and socket acceptor, assembles
 * the filter chain, installs the connection handler and binds to the
 * configured address and port.
 *
 * <p>When {@code socketAcceptor} is already set, a warning is logged and
 * nothing else happens. Any exception raised during start-up is reported on
 * {@code System.err} and in the log instead of being rethrown; the acceptor is
 * then unbound and {@code socketAcceptor} is reset to {@code null} so that a
 * later start can be attempted.
 */
public synchronized void start()
{
if ( socketAcceptor != null )
{
Log.warn( "Unable to start acceptor (it is already started!)" );
return;
}
try
{
// Configure the thread pool that is to be used.
// Initial pool size is a quarter of the configured maximum (integer division), plus one.
final int initialSize = ( configuration.getMaxThreadPoolSize() / 4 ) + 1;
final ExecutorFilter executorFilter = new ExecutorFilter( initialSize, configuration.getMaxThreadPoolSize(), 60, TimeUnit.SECONDS );
final ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) executorFilter.getExecutor();
// Wrap the executor's existing thread factory so threads are named after this acceptor.
// NOTE(review): the meaning of the trailing 'true, null' arguments is not visible here - confirm against NamedThreadFactory.
final ThreadFactory threadFactory = new NamedThreadFactory( name + "-thread-", eventExecutor.getThreadFactory(), true, null );
eventExecutor.setThreadFactory( threadFactory );
// Construct a new socket acceptor, and configure it.
socketAcceptor = buildSocketAcceptor();
if ( JMXManager.isEnabled() )
{
configureJMX( socketAcceptor, name );
}
final DefaultIoFilterChainBuilder filterChain = socketAcceptor.getFilterChain();
filterChain.addFirst( ConnectionManagerImpl.EXECUTOR_FILTER_NAME, executorFilter );
// Add the XMPP codec filter
filterChain.addAfter( ConnectionManagerImpl.EXECUTOR_FILTER_NAME, ConnectionManagerImpl.XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter( new XMPPCodecFactory() ) );
// Kill sessions whose outgoing queues keep growing and fail to send traffic
filterChain.addAfter( ConnectionManagerImpl.XMPP_CODEC_FILTER_NAME, ConnectionManagerImpl.CAPACITY_FILTER_NAME, new StalledSessionsFilter() );
// Ports can be configured to start connections in SSL (as opposed to upgrade a non-encrypted socket to an encrypted one, typically using StartTLS)
if ( configuration.getTlsPolicy() == Connection.TLSPolicy.legacyMode )
{
final SslFilter sslFilter = encryptionArtifactFactory.createServerModeSslFilter();
// Inserted relative to the executor filter, so the TLS filter ends up ahead of the codec filter added above.
filterChain.addAfter( ConnectionManagerImpl.EXECUTOR_FILTER_NAME, ConnectionManagerImpl.TLS_FILTER_NAME, sslFilter );
}
// Throttle sessions who send data too fast
// A non-positive configured buffer size leaves the session's read buffer setting untouched.
if ( configuration.getMaxBufferSize() > 0 )
{
socketAcceptor.getSessionConfig().setMaxReadBufferSize( configuration.getMaxBufferSize() );
Log.debug( "Throttling read buffer for connections to max={} bytes", configuration.getMaxBufferSize() );
}
// Start accepting connections
// Register connectionHandler as the handler that processes messages for this acceptor.
socketAcceptor.setHandler( connectionHandler );
// Bind to the configured address and port to start listening on the network.
socketAcceptor.bind( new InetSocketAddress( configuration.getBindAddress(), configuration.getPort() ) );
}
catch ( Exception e )
{
// The failure is reported but not rethrown: callers are not notified through an exception.
System.err.println( "Error starting " + configuration.getPort() + ": " + e.getMessage() );
Log.error( "Error starting: " + configuration.getPort(), e );
// Reset for future use.
if (socketAcceptor != null) {
try {
socketAcceptor.unbind();
} finally {
// Clear the field even when unbind() itself throws (that exception then propagates out of start()).
socketAcceptor = null;
}
}
}
}
Last updated