消息处理

ConnectionHandler

ConnectionHandler.messageReceived()
public void messageReceived(IoSession session, Object message) throws Exception {
	// Get the stanza handler for this session
	StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
	// Get the parser to use to process stanza. For optimization there is going
	// to be a parser for each running thread. Each Filter will be executed
	// by the Executor placed as the first Filter. So we can have a parser associated
	// to each Thread
	final XMPPPacketReader parser = PARSER_CACHE.get();
	// Update counter of read btyes
	updateReadBytesCounter(session);
	//System.out.println("RCVD: " + message);
	// Let the stanza handler process the received stanza
	try {
		// 处理消息
		handler.process((String) message, parser);
	} catch (Exception e) {
		Log.error("Closing connection due to error while processing message: " + message, e);
		final Connection connection = (Connection) session.getAttribute(CONNECTION);
		if ( connection != null ) {
			connection.close();
		}

	}
}

StanzaHandler

StanzaHandler.process()
// 解析XML
public void process(String stanza, XMPPPacketReader reader) throws Exception {
	boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
	if (!sessionCreated || initialStream) {
		if (!initialStream) {
			// Allow requests for flash socket policy files directly on the client listener port
			if (stanza.startsWith("<policy-file-request/>")) {
				String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +
						XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +
						FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';
				connection.deliverRawText(crossDomainText);
				return;
			}
			else {
				// Ignore <?xml version="1.0"?>
				return;
			}
		}
		// Found an stream:stream tag...
		if (!sessionCreated) {
			sessionCreated = true;
			MXParser parser = reader.getXPPParser();
			parser.setInput(new StringReader(stanza));
			createSession(parser);
		}
		else if (startedTLS) {
			startedTLS = false;
			tlsNegotiated();
		}
		else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {
			startedSASL = false;
			saslSuccessful();
		}
		else if (waitingCompressionACK) {
			waitingCompressionACK = false;
			compressionSuccessful();
		}
		return;
	}

	// Verify if end of stream was requested
	if (stanza.equals("</stream:stream>")) {
		if (session != null) {
			session.getStreamManager().formalClose();
			Log.debug( "Closing session as an end-of-stream was received: {}", session );
			session.close();
		}
		return;
	}
	// Ignore <?xml version="1.0"?> stanzas sent by clients
	if (stanza.startsWith("<?xml")) {
		return;
	}
	// Create DOM object from received stanza
	Element doc = reader.read(new StringReader(stanza)).getRootElement();
	if (doc == null) {
		// No document found.
		return;
	}
	String tag = doc.getName();
	if ("starttls".equals(tag)) {
		// Negotiate TLS
		if (negotiateTLS()) {
			startedTLS = true;
		}
		else {
			connection.close();
			session = null;
		}
	}
	else if ("auth".equals(tag)) {
		// User is trying to authenticate using SASL
		startedSASL = true;
		// Process authentication stanza
		saslStatus = SASLAuthentication.handle(session, doc);
	} else if (startedSASL && "response".equals(tag) || "abort".equals(tag)) {
		// User is responding to SASL challenge. Process response
		saslStatus = SASLAuthentication.handle(session, doc);
	}
	else if ("compress".equals(tag)) {
		// Client is trying to initiate compression
		if (compressClient(doc)) {
			// Compression was successful so open a new stream and offer
			// resource binding and session establishment (to client sessions only)
			waitingCompressionACK = true;
		}
	} else if (isStreamManagementStanza(doc)) {
		session.getStreamManager().process( doc );
	}
	else {
		// 处理解析完毕的消息
		process(doc);
	}
}
StanzaHandler.process()
private void process(Element doc) throws UnauthorizedException {
	if (doc == null) {
		return;
	}

	// Ensure that connection was secured if TLS was required
	if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
			!connection.isSecure()) {
		closeNeverSecuredConnection();
		return;
	}

	String tag = doc.getName();
	if ("error".equals(tag)) {
		Log.info("The stream is being closed by the peer, which sent this stream error: " + doc.asXML());
		session.close();
	}
	else if ("message".equals(tag)) {	// message类型的消息
		Message packet;
		try {
			packet = new Message(doc, !validateJIDs());
		}
		catch (IllegalArgumentException e) {
			Log.debug("Rejecting packet. JID malformed", e);
			// The original packet contains a malformed JID so answer with an error.
			Message reply = new Message();
			reply.setID(doc.attributeValue("id"));
			reply.setTo(session.getAddress());
			reply.getElement().addAttribute("from", doc.attributeValue("to"));
			reply.setError(PacketError.Condition.jid_malformed);
			session.process(reply);
			return;
		}
		Log.info("message ================= " + packet.toString());
		processMessage(packet);
	}
	else if ("presence".equals(tag)) {		// presence类型的消息
		Presence packet;
		try {
			packet = new Presence(doc, !validateJIDs());
		}
		catch (IllegalArgumentException e) {
			Log.debug("Rejecting packet. JID malformed", e);
			// The original packet contains a malformed JID so answer an error
			Presence reply = new Presence();
			reply.setID(doc.attributeValue("id"));
			reply.setTo(session.getAddress());
			reply.getElement().addAttribute("from", doc.attributeValue("to"));
			reply.setError(PacketError.Condition.jid_malformed);
			session.process(reply);
			return;
		}
		// Check that the presence type is valid. If not then assume available type
		try {
			packet.getType();
		}
		catch (IllegalArgumentException e) {
			Log.warn("Invalid presence type", e);
			// The presence packet contains an invalid presence type so replace it with
			// an available presence type
			packet.setType(null);
		}
		// Check that the presence show is valid. If not then assume available show value
		try {
			packet.getShow();
		}
		catch (IllegalArgumentException e) {
			Log.debug("Invalid presence show for -" + packet.toXML(), e);
			// The presence packet contains an invalid presence show so replace it with
			// an available presence show
			packet.setShow(null);
		}
		if (session.getStatus() == Session.STATUS_CLOSED && packet.isAvailable()) {
			// Ignore available presence packets sent from a closed session. A closed
			// session may have buffered data pending to be processes so we want to ignore
			// just Presences of type available
			Log.warn("Ignoring available presence packet of closed session: " + packet);
			return;
		}
		processPresence(packet);
	}
	else if ("iq".equals(tag)) {			 // iq类型的消息
		IQ packet;
		try {
			packet = getIQ(doc);
		}
		catch (IllegalArgumentException e) {
			Log.debug("Rejecting packet. JID malformed", e);
			// The original packet contains a malformed JID so answer an error
			IQ reply = new IQ();
			if (!doc.elements().isEmpty()) {
				reply.setChildElement(((Element)doc.elements().get(0)).createCopy());
			}
			reply.setID(doc.attributeValue("id"));
			reply.setTo(session.getAddress());
			if (doc.attributeValue("to") != null) {
				reply.getElement().addAttribute("from", doc.attributeValue("to"));
			}
			reply.setError(PacketError.Condition.jid_malformed);
			session.process(reply);
			return;
		}
		if (packet.getID() == null && JiveGlobals.getBooleanProperty("xmpp.server.validation.enabled", false)) {
			// IQ packets MUST have an 'id' attribute so close the connection
			Log.debug( "Closing session, as it sent us an IQ packet that has no ID attribute: {}. Affected session: {}", packet.toXML(), session );
			StreamError error = new StreamError(StreamError.Condition.invalid_xml);
			session.deliverRawText(error.toXML());
			session.close();
			return;
		}
		processIQ(packet);
	}
	else {
		if (!processUnknowPacket(doc)) {
			Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") + doc.asXML() + ". Closing session: " + session);
			session.close();
		}
	}
}

StanzaHandler. processMessage() => PacketRouterImpl.route() => MessageRouter.route() => LocalSession.process => LocalClientSession.deliver() => NIOConnection.deliver()

NIOConnection

NIOConnection.deliver()
public void deliver(Packet packet) throws UnauthorizedException {
	if (isClosed()) {
		backupDeliverer.deliver(packet);
	}
	else {
		boolean errorDelivering = false;
		IoBuffer buffer = IoBuffer.allocate(4096);
		buffer.setAutoExpand(true);
		try {
			buffer.putString(packet.getElement().asXML(), encoder.get());
			if (flashClient) {
				buffer.put((byte) '\0');
			}
			buffer.flip();
			
			ioSessionLock.lock();
			try {
				ioSession.write(buffer);
			} finally {
				ioSessionLock.unlock();
			}
		}
		catch (Exception e) {
			Log.debug("Error delivering packet:\n" + packet, e);
			errorDelivering = true;
		}
		if (errorDelivering) {
			close();
			// Retry sending the packet again. Most probably if the packet is a
			// Message it will be stored offline
			backupDeliverer.deliver(packet);
		}
		else {
			session.incrementServerPacketCount();
		}
	}
}

Last updated