消息处理
ConnectionHandler
/**
 * Hands a message received on the given I/O session to that session's stanza handler.
 *
 * <p>The handler is read from the session attribute {@code HANDLER} and the parser is taken
 * from {@code PARSER_CACHE}. If processing throws, the error is logged and the connection
 * stored under the session attribute {@code CONNECTION} (if any) is closed; the exception is
 * not rethrown.
 *
 * @param session the I/O session the message arrived on
 * @param message the received message; it is cast to {@code String} before processing
 * @throws Exception declared by the signature; failures raised while processing the stanza
 *                   are caught and handled here
 */
public void messageReceived(IoSession session, Object message) throws Exception {
    // Stanza handler bound to this session.
    final StanzaHandler stanzaHandler = (StanzaHandler) session.getAttribute(HANDLER);

    // Parser used to process the stanza. As an optimization there is meant to be one parser
    // per running thread: each Filter is executed by the Executor placed as the first
    // Filter, so a parser can be associated with each Thread.
    final XMPPPacketReader reader = PARSER_CACHE.get();

    // Update the counter of read bytes.
    updateReadBytesCounter(session);

    try {
        // Let the stanza handler process the received stanza.
        final String stanza = (String) message;
        stanzaHandler.process(stanza, reader);
    }
    catch (Exception e) {
        Log.error("Closing connection due to error while processing message: " + message, e);
        final Object attached = session.getAttribute(CONNECTION);
        if (attached != null) {
            ((Connection) attached).close();
        }
    }
}
StanzaHandler
// 解析XML
/**
 * Processes one raw stanza received from the peer.
 *
 * <p>While no session has been created, anything that is not a stream header is either
 * answered (flash socket policy request) or ignored. A stream header either creates the
 * session or completes a pending TLS, SASL or compression step. Once past that stage the
 * stanza is checked for end-of-stream, XML declarations are skipped, and everything else is
 * parsed into a DOM element and dispatched on the name of its root element.
 *
 * @param stanza raw text of the received stanza
 * @param reader reader used to obtain the pull parser and to build the DOM for the stanza
 * @throws Exception if any negotiation, parsing or processing step throws
 */
public void process(String stanza, XMPPPacketReader reader) throws Exception {
    final boolean opensStream =
        stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");

    if (!opensStream && !sessionCreated) {
        // No session yet and this is not a stream header. Requests for flash socket policy
        // files are allowed directly on the client listener port; anything else
        // (e.g. <?xml version="1.0"?>) is ignored.
        if (stanza.startsWith("<policy-file-request/>")) {
            connection.deliverRawText(FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +
                XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +
                FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0');
        }
        return;
    }

    if (opensStream) {
        // Found a stream:stream tag: either the very first one, or the restart that follows
        // a completed negotiation step.
        if (!sessionCreated) {
            sessionCreated = true;
            final MXParser xpp = reader.getXPPParser();
            xpp.setInput(new StringReader(stanza));
            createSession(xpp);
        }
        else if (startedTLS) {
            startedTLS = false;
            tlsNegotiated();
        }
        else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {
            startedSASL = false;
            saslSuccessful();
        }
        else if (waitingCompressionACK) {
            waitingCompressionACK = false;
            compressionSuccessful();
        }
        return;
    }

    // Verify whether the end of the stream was requested.
    if ("</stream:stream>".equals(stanza)) {
        if (session != null) {
            session.getStreamManager().formalClose();
            Log.debug( "Closing session as an end-of-stream was received: {}", session );
            session.close();
        }
        return;
    }

    // Ignore <?xml version="1.0"?> stanzas sent by clients.
    if (stanza.startsWith("<?xml")) {
        return;
    }

    // Build a DOM element from the received stanza.
    final Element root = reader.read(new StringReader(stanza)).getRootElement();
    if (root == null) {
        // No document found.
        return;
    }

    final String name = root.getName();
    if ("starttls".equals(name)) {
        // Negotiate TLS; on failure drop the connection and forget the session.
        if (!negotiateTLS()) {
            connection.close();
            session = null;
        }
        else {
            startedTLS = true;
        }
    }
    else if ("auth".equals(name)) {
        // The peer is trying to authenticate using SASL.
        startedSASL = true;
        saslStatus = SASLAuthentication.handle(session, root);
    }
    else if ((startedSASL && "response".equals(name)) || "abort".equals(name)) {
        // The peer is responding to (or aborting) a SASL challenge.
        // NOTE(review): because && binds tighter than ||, "abort" is passed to SASL handling
        // even when startedSASL is false - confirm that this is intended.
        saslStatus = SASLAuthentication.handle(session, root);
    }
    else if ("compress".equals(name)) {
        // The client is trying to initiate compression. When that succeeds, a new stream is
        // expected, after which resource binding and session establishment are offered
        // (to client sessions only).
        if (compressClient(root)) {
            waitingCompressionACK = true;
        }
    }
    else if (isStreamManagementStanza(root)) {
        session.getStreamManager().process( root );
    }
    else {
        // Process the fully parsed stanza.
        process(root);
    }
}
/**
 * Dispatches a parsed stanza according to the name of its root element.
 *
 * <p>Nothing is processed when {@code doc} is {@code null}, or when the connection's TLS
 * policy is {@code required} and the connection is not secure (in which case
 * {@code closeNeverSecuredConnection()} is called). {@code error} closes the session;
 * {@code message}, {@code presence} and {@code iq} are converted to packets and forwarded;
 * any other element is offered to {@code processUnknowPacket} and the session is closed if
 * that returns {@code false}.
 *
 * @param doc root element of the received stanza, may be {@code null}
 * @throws UnauthorizedException if one of the downstream processing calls throws it
 */
private void process(Element doc) throws UnauthorizedException {
    if (doc == null) {
        return;
    }

    // Ensure that the connection was secured if TLS was required.
    final boolean tlsMandatory = connection.getTlsPolicy() == Connection.TLSPolicy.required;
    if (tlsMandatory && !connection.isSecure()) {
        closeNeverSecuredConnection();
        return;
    }

    final String name = doc.getName();
    if ("error".equals(name)) {
        Log.info("The stream is being closed by the peer, which sent this stream error: " + doc.asXML());
        session.close();
        return;
    }
    if ("message".equals(name)) {
        // Stanza of type "message".
        dispatchMessageElement(doc);
        return;
    }
    if ("presence".equals(name)) {
        // Stanza of type "presence".
        dispatchPresenceElement(doc);
        return;
    }
    if ("iq".equals(name)) {
        // Stanza of type "iq".
        dispatchIqElement(doc);
        return;
    }

    if (!processUnknowPacket(doc)) {
        Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") + doc.asXML() + ". Closing session: " + session);
        session.close();
    }
}

/**
 * Builds a {@code Message} from the element and forwards it to {@code processMessage};
 * answers with a {@code jid_malformed} error when the packet cannot be built.
 */
private void dispatchMessageElement(Element doc) throws UnauthorizedException {
    Message message;
    try {
        message = new Message(doc, !validateJIDs());
    }
    catch (IllegalArgumentException e) {
        Log.debug("Rejecting packet. JID malformed", e);
        // The original packet contains a malformed JID, so answer with an error.
        final Message bounce = new Message();
        bounce.setID(doc.attributeValue("id"));
        bounce.setTo(session.getAddress());
        bounce.getElement().addAttribute("from", doc.attributeValue("to"));
        bounce.setError(PacketError.Condition.jid_malformed);
        session.process(bounce);
        return;
    }
    // NOTE(review): this writes every message packet to the log at info level - confirm
    // that this is intended outside of debugging.
    Log.info("message ================= " + message.toString());
    processMessage(message);
}

/**
 * Builds a {@code Presence} from the element, normalises an invalid type or show value, and
 * forwards it to {@code processPresence}; answers with a {@code jid_malformed} error when
 * the packet cannot be built.
 */
private void dispatchPresenceElement(Element doc) throws UnauthorizedException {
    Presence presence;
    try {
        presence = new Presence(doc, !validateJIDs());
    }
    catch (IllegalArgumentException e) {
        Log.debug("Rejecting packet. JID malformed", e);
        // The original packet contains a malformed JID, so answer with an error.
        final Presence bounce = new Presence();
        bounce.setID(doc.attributeValue("id"));
        bounce.setTo(session.getAddress());
        bounce.getElement().addAttribute("from", doc.attributeValue("to"));
        bounce.setError(PacketError.Condition.jid_malformed);
        session.process(bounce);
        return;
    }

    // Check that the presence type is valid. If not, assume the available type.
    try {
        presence.getType();
    }
    catch (IllegalArgumentException e) {
        Log.warn("Invalid presence type", e);
        presence.setType(null);
    }

    // Check that the presence show is valid. If not, assume the available show value.
    try {
        presence.getShow();
    }
    catch (IllegalArgumentException e) {
        Log.debug("Invalid presence show for -" + presence.toXML(), e);
        presence.setShow(null);
    }

    // A closed session may still have buffered data pending to be processed, so only
    // presences of type available are ignored for it.
    if (session.getStatus() == Session.STATUS_CLOSED && presence.isAvailable()) {
        Log.warn("Ignoring available presence packet of closed session: " + presence);
        return;
    }
    processPresence(presence);
}

/**
 * Builds an {@code IQ} from the element via {@code getIQ} and forwards it to
 * {@code processIQ}; answers with a {@code jid_malformed} error when the packet cannot be
 * built, and closes the session when the id is missing and validation is enabled.
 */
private void dispatchIqElement(Element doc) throws UnauthorizedException {
    IQ iq;
    try {
        iq = getIQ(doc);
    }
    catch (IllegalArgumentException e) {
        Log.debug("Rejecting packet. JID malformed", e);
        // The original packet contains a malformed JID, so answer with an error that
        // carries a copy of the first child element, if there is one.
        final IQ bounce = new IQ();
        if (!doc.elements().isEmpty()) {
            bounce.setChildElement(((Element)doc.elements().get(0)).createCopy());
        }
        bounce.setID(doc.attributeValue("id"));
        bounce.setTo(session.getAddress());
        if (doc.attributeValue("to") != null) {
            bounce.getElement().addAttribute("from", doc.attributeValue("to"));
        }
        bounce.setError(PacketError.Condition.jid_malformed);
        session.process(bounce);
        return;
    }

    if (iq.getID() == null && JiveGlobals.getBooleanProperty("xmpp.server.validation.enabled", false)) {
        // IQ packets MUST have an 'id' attribute, so close the connection.
        Log.debug( "Closing session, as it sent us an IQ packet that has no ID attribute: {}. Affected session: {}", iq.toXML(), session );
        final StreamError streamError = new StreamError(StreamError.Condition.invalid_xml);
        session.deliverRawText(streamError.toXML());
        session.close();
        return;
    }
    processIQ(iq);
}
StanzaHandler.processMessage() => PacketRouterImpl.route() => MessageRouter.route() => LocalSession.process() => LocalClientSession.deliver() => NIOConnection.deliver()
NIOConnection
/**
 * Delivers a packet to the peer over this connection.
 *
 * <p>If the connection is already closed, the packet is handed to {@code backupDeliverer}.
 * Otherwise the packet's XML is encoded into a buffer (followed by a {@code '\0'} byte for
 * flash clients) and written to the I/O session while holding {@code ioSessionLock}. If
 * that fails, the connection is closed and the packet is handed to {@code backupDeliverer};
 * on success the session's server packet count is incremented.
 *
 * @param packet the packet to deliver
 * @throws UnauthorizedException declared by the signature; presumably raised by the backup
 *                               deliverer - TODO confirm
 */
public void deliver(Packet packet) throws UnauthorizedException {
    if (isClosed()) {
        backupDeliverer.deliver(packet);
        return;
    }

    final IoBuffer out = IoBuffer.allocate(4096);
    out.setAutoExpand(true);
    try {
        out.putString(packet.getElement().asXML(), encoder.get());
        if (flashClient) {
            out.put((byte) '\0');
        }
        out.flip();

        ioSessionLock.lock();
        try {
            ioSession.write(out);
        }
        finally {
            ioSessionLock.unlock();
        }
    }
    catch (Exception e) {
        Log.debug("Error delivering packet:\n" + packet, e);
        close();
        // Retry sending the packet through the backup deliverer. Most probably, if the
        // packet is a Message, it will be stored offline.
        backupDeliverer.deliver(packet);
        return;
    }

    // Only reached when the write was handed to the I/O session without an exception.
    session.incrementServerPacketCount();
}
Last updated