Issue Details

Key: MULE-1894
Type: Bug
Status: Closed
Resolution: Fixed
Priority: Critical
Assignee: Andrew Perepelytsya
Reporter: Quoc Le
Votes: 2
Watchers: 5
Mule

JMS Session Leak in JMS Transport

Created: 07/Jun/07 02:53 PM   Updated: 27/Nov/07 12:39 PM
Component/s: Transport: JMS
Affects Version/s: 1.3, 1.4.0, 1.4.1, 1.3.1, 1.3.2, 1.3.3
Fix Version/s: 1.4.4

Time Tracking:
Not Specified

Issue Links:
Related
 

Labels:
User impact: High


Description
There's a JMS session leak, which was originally observed in WebLogic JMS but is likely a more general problem than that. I am checking in a fix to trunk now.

Comments
Quoc Le - 07/Jun/07 03:00 PM

The AbstractJmsTransformer was opening a JMS session but not closing it. We should also look into an alternative to creating a JMS session just in the transformer (to reduce overhead)

Fix committed: r6970


Andrew Perepelytsya - 07/Jun/07 03:08 PM
Apply checkstyle, set fix version and priority.

Andrew Perepelytsya - 07/Jun/07 03:18 PM
This is not a fix, read my comments on http://fisheye.codehaus.org/changelog/mule/?cs=6972

Quoc Le - 07/Jun/07 03:53 PM
I see, I fixed the leak in the non-transacted case, but then broke the transacted case.

So, help me out here. Could I do something similar to the JmsMessageDispatcher, where I only close the session if it is neither transacted nor cached?

It's too bad we have to open a JMS session in the AbstractJmsTransformer at all, but I guess that's the only way to create the JmsMessage?
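
For readers following along, the "close only if not transacted or cached" rule asked about above could be sketched roughly as follows. This is an illustration only, not the committed fix: `SketchSession`, `CountingSession` and `SessionClosePolicy` are hypothetical stand-ins (the first replaces javax.jms.Session so the snippet compiles without a JMS jar), and the two boolean flags are assumed to be worked out by the caller.

```java
// Hypothetical stand-in for javax.jms.Session so the sketch compiles without a JMS jar.
interface SketchSession {
    void close() throws Exception;
}

// Test double that counts close() calls so the policy can be observed.
final class CountingSession implements SketchSession {
    int closeCalls = 0;

    @Override
    public void close() {
        closeCalls++;
    }
}

// Hypothetical helper, not part of Mule.
final class SessionClosePolicy {
    private SessionClosePolicy() {
    }

    /**
     * Closes the session only when the caller owns it, that is, when it is
     * neither enlisted in a transaction nor held in a session cache.
     * Returns true if close() was invoked.
     */
    static boolean closeIfOwned(SketchSession session, boolean transacted, boolean cached) {
        if (session == null || transacted || cached) {
            // The transaction or the cache is responsible for this session.
            return false;
        }
        try {
            session.close();
        } catch (Exception e) {
            // A failed close should not mask the result of the transformation.
            System.err.println("Exception closing session: " + e.getMessage());
        }
        return true;
    }
}
```

The point of the sketch is that ownership decides who closes: a session obtained only to build a message is closed by the transformer, while a session bound to a transaction or kept in a cache is left to whoever manages that transaction or cache.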


Justin Rowe - 08/Jun/07 05:28 AM
Please refer to case 1784 for potential fix.

Quoc Le - 10/Jun/07 10:07 PM

I checked in a fix, which is similar to my previous fix but handles the case where the session is in a transaction. All JMS tests passed, including the transaction tests.

This fix is for a customer case (00001091). I checked the fix into the sandbox this time, since it's a 1.4.0 fix that needs to be merged to trunk.

r7031


Quoc Le - 11/Jun/07 12:30 AM
Thanks Justin for the suggestion. I was heads down with developing a code fix, but I'll take a look at the config workaround you suggested.

Justin Rowe - 11/Jun/07 08:54 AM
No problem, Quoc. BTW my suggested fix is a code change to AbstractJmsTransformer.transformToMessage, not a config workaround. I believe this case to be synonymous with both 1079 and 1784.

Here is my suggested fix, I have tested with both no transaction and resource local transaction (haven't tested with XA yet but I will). I'd be happy to try out your proposed fix, if you can post it here (I don't have subversion)?

protected Message transformToMessage(Object src) throws TransformerException {
    Session session = null;
    try {
        Message result;

        if (src instanceof Message) {
            result = (Message) src;
            result.clearProperties();
        } else {
            session = this.getSession();
            result = JmsMessageUtils.toMessage(src, session);
        }

        // set the event properties on the Message
        UMOEventContext ctx = RequestContext.getEventContext();
        if (ctx == null) {
            logger.warn("There is no current event context");
            return result;
        }

        this.setJmsProperties(ctx.getMessage(), result);

        return result;
    } catch (TransformerException tex) {
        // rethrow
        throw tex;
    } catch (Exception e) {
        throw new TransformerException(this, e);
    } finally {
        if (session != null) {
            UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
            JmsConnector conn = (JmsConnector) endpoint.getConnector();
            if (tx == null || !tx.hasResource(conn.getConnection())) {
                try {
                    logger.info("closing JMS session");
                    session.close();
                } catch (JMSException e) {
                    logger.info("Exception closing JMS session: " + e.getMessage(), e);
                }
            }
        }
    }
}


Quoc Le - 11/Jun/07 11:13 AM

Justin,

Yes, it looks like we have roughly the same fix, except that I used slightly different criteria for whether the session is in a transaction. (Your criteria may actually be better.)

Here's the fix I proposed:

http://svn.codehaus.org/mule/trunk/mule-sandbox/transports/jms-leak-1.4.0/

I also had to modify one of the test cases slightly, that's nested down in the url above.

If you could test, that would be great. I will do the same and test your fix.

Thanks!
quoc


Justin Rowe - 12/Jun/07 07:12 AM
Thanks for sending that through, Quoc. I tested your fix in our test env with an XA transaction, a resource (i.e. WebSphere MQ) transaction and no transaction, and it works in all three cases, so well done! I believe that our fixes are equivalent, so there is no need to bother testing mine; just check yours in. I hope we'll see this in the Mule 1.4.2 release!

BTW session caching is performed in the message dispatcher, as such it is not available in the jms connector, so unless the jms connector is reworked to implement session caching you can't access the cached session from the transformer. Session caching is only applicable if there is no transaction available.

Another interesting thing I discovered is that the jms receiver endpoint also invokes the ObjectToJmsMessage response transformer by default, even if you set synchronous="false" on the endpoint, seems to be redundant in that case?

Cheers,

Justin


Quoc Le - 12/Jun/07 04:46 PM

Sounds good, yes I will merge the change to trunk and hopefully it gets into 1.4.2.

I appreciate the help and insight, Justin. That's great to hear the WebSphere tests also pass. Btw, the customer also verified the fix in their WebLogic environment.


Michal Porebski - 06/Jul/07 12:30 PM
Hi,
I tried this new Transformer code, dug into the Mule 1.4.1 code, and I think JMS session management should be redesigned again. My note is probably out of scope for this bug, but I cannot see any working solution if only the transformer code is changed. We're using Mule&JMS in a few applications that use WebSphere MQ queues and mostly work according to scenarios where the bug fix does not work:

1. Event from Transacted JMS Receiver -> JMS Transformer -> JMS Dispatch (one or n times)
Here the JMS session is always available through JMSConnector.getSessionFromTransaction, and the design is clear: the TransactedJmsMessageReceiver class keeps the whole transaction processing encapsulated. It knows when to start the transaction, keep the session, and end the transaction. So the transformer gets the JMS session from the transaction context.

2. Event from somewhere else -> JMS Transformer -> JMS Dispatch (one or n times) without a transaction
The first JMS session is created in the transformer and then a second one during dispatch in JMSMessageDispatcher. It can be cached within the endpoint. With a dynamic endpoint from AbstractRecipientList, where there is no control over the endpoint cache, the session count grows with the number of cached endpoints.

3. Event from somewhere else -> JMS Transformer -> JMS Dispatch (one or n times) within a transaction
The JMS session is created in the transformer, because that is the first time a session is needed. It is bound to the transaction context, but after commit/rollback this session is lost. When the next event reaches the transformer, it creates another transaction with a new JMS session.

I felt a bit lost, so I tried to find a solution somewhere else. I looked at JMSConnector, and I think this class is the right place for storing JMS sessions. They should be kept in a managed pool, like JDBC connections. The number of sessions needed is the count of concurrent JMS dispatchers and probably depends on the threading profile of the Mule components. I tried to write a quite primitive pool that controls the creation of JMS sessions. Here's the code. Better implementations could decorate the JmsTransaction class to release the session to the pool.



Michal Porebski - 06/Jul/07 12:31 PM
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.JMSException;
import javax.jms.Session;

import org.mule.providers.ConnectException;
import org.mule.providers.jms.JmsConnector;
import org.mule.transaction.TransactionCoordination;
import org.mule.umo.TransactionException;
import org.mule.umo.UMOTransaction;

public class SessionPoolJmsConnector extends JmsConnector {

	private Map<Thread, Session> sessionPool = new ConcurrentHashMap<Thread, Session>();

	@Override
	public Session getSession(boolean transacted, boolean topic)
			throws JMSException {
		if (!isConnected()) {
			throw new JMSException("Not connected");
		}
		Session session = getSessionFromTransaction();
		if (session != null) {
			return session;
		}

		// find session
		session = sessionPool.get(Thread.currentThread());
		if (session == null) {
			// bind to thread
			session = super.getSession(transacted, topic);
			sessionPool.put(Thread.currentThread(), session);
		} else if (transacted) {
			doBindSessionToTransaction(session);
		}
		return session;
	}

	private final void doBindSessionToTransaction(Session session) {
		UMOTransaction tx = TransactionCoordination.getInstance()
				.getTransaction();
		if (tx != null) {
			logger.debug("Binding session to current transaction");
			try {
				tx.bindResource(getConnection(), session);
			} catch (TransactionException e) {
				throw new RuntimeException(
						"Could not bind session to current transaction", e);
			}
		}
	}

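	private final void clearSessionPool() {
		for (Iterator<Session> it = sessionPool.values().iterator(); it
				.hasNext();) {
			// take the session before removing its entry; a lookup
			// after it.remove() would return null and close nothing
			Session session = it.next();
			it.remove();
			closeQuietly(session);
		}
	}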

	@Override
	protected void doDisconnect() throws ConnectException {
		clearSessionPool();
		super.doDisconnect();
	}

	@Override
	protected void doDispose() {
		clearSessionPool();
		super.doDispose();
	}

}
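
The "managed pool, like JDBC connections" idea, including the release step mentioned as a possible improvement, might look roughly like the sketch below. It is only an illustration under stated assumptions: `BoundedSessionPool` is a hypothetical name, the type parameter stands in for javax.jms.Session, the factory stands in for whatever call creates a session, and it uses `java.util.function.Supplier`, so it needs a newer Java than Mule 1.4 targets.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical bounded pool; T would be javax.jms.Session in a connector. */
final class BoundedSessionPool<T> {
    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final int maxSize;
    private final AtomicInteger created = new AtomicInteger();

    BoundedSessionPool(int maxSize, Supplier<T> factory) {
        this.maxSize = maxSize;
        this.idle = new ArrayBlockingQueue<T>(maxSize);
        this.factory = factory;
    }

    /** Reuses an idle session, creates one while under the cap, otherwise waits. */
    T borrow() {
        T session = idle.poll();
        if (session != null) {
            return session;
        }
        while (true) {
            int n = created.get();
            if (n >= maxSize) {
                try {
                    return idle.take(); // block until another thread releases
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for a session", e);
                }
            }
            // Reserve a creation slot atomically so the cap holds under contention.
            if (created.compareAndSet(n, n + 1)) {
                try {
                    return factory.get();
                } catch (RuntimeException e) {
                    created.decrementAndGet(); // give the slot back if creation failed
                    throw e;
                }
            }
        }
    }

    /** Returns a session to the pool, for example after commit or rollback. */
    void release(T session) {
        idle.offer(session);
    }

    int createdCount() {
        return created.get();
    }
}
```

Compared with keying sessions by thread, a borrow/release pool caps the number of open sessions regardless of how many threads come and go, at the cost of requiring every caller to release what it borrowed.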

Leon Barmakov - 09/Jul/07 06:31 AM
I've had to make the following change to the ObjectToJmsMessage transformer:
Additional code is underlined as inserted. Prior to this fix the transacted flag was not set correctly in the case where this transformer was the first to obtain the new session.

/**
 * Gets transformed message from object while cleanly closing any sessions. Added per MULE-1894.
 *
 * @param src source object
 * @return <code>Message</code> transformed message
 */
protected final Message getResult(Object src) throws JMSException, TransformerException {
    boolean transacted = false;
    boolean cached = false;

    // Figure out if the session comes from a Transaction or Cache. This
    // affects how we close the session in this transformer.
    // Note: This session is only used for

    // TODO: JmsMessageDispatcher has similar code to figure out if session is in transaction

    Session session = null;

    JmsConnector connector = (JmsConnector) endpoint.getConnector();

    session = connector.getSessionFromTransaction();

    if (session != null) {
        transacted = true;
    } else {
        session = this.getSession();

        UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
        if ((tx != null) && (tx.hasResource(connector.getConnection()))) {
            transacted = true;
        }
    }

    // TODO: figure out if JmsSession caching is on. Since it's off by default,
    // deferring this for now.

    try {
        return JmsMessageUtils.toMessage(src, session);
    } finally {
        if (session != null && !transacted && !cached) {
            session.close();
        }
    }
}


Justin Rowe - 10/Jul/07 03:52 AM
We also found a problem similar to the one Leon just posted, in the JMSMessageDispatcher dispatchMessage method, when it is used with a non-JMS receiver (e.g. DB).

The JMSMessageDispatcher checks the endpoint configuration to determine if it is transacted. This means that it is necessary to add an ALWAYS_JOIN transaction on the JMS endpoint, or else it won't participate in the transaction. The solution is to check whether the connector has been registered in the transaction (by connector.getSession); it will then be enlisted automatically, without the need to add a transaction on the endpoint. See the underlined code below...

private UMOMessage dispatchMessage(UMOEvent event) throws Exception
{
    Session session = null;
    MessageProducer producer = null;
    MessageConsumer consumer = null;
    Destination replyTo = null;
    boolean transacted = false;
    boolean cached = false;
    boolean remoteSync = useRemoteSync(event);

    if (logger.isDebugEnabled())
    {
        logger.debug("dispatching on endpoint: " + event.getEndpoint().getEndpointURI()
                     + ". Event id is: " + event.getId());
    }

    try
    {
        session = connector.getSessionFromTransaction();
        if (session != null)
        {
            transacted = true;

            // If a transaction is running, we can not receive any messages
            // in the same transaction.
            if (remoteSync)
            {
                throw new IllegalTransactionStateException(
                    JmsMessages.connectorDoesNotSupportSyncReceiveWhenTransacted());
            }
        }
        // Should we be caching sessions? Note this is not part of the JMS spec.
        // and is turned off by default.
        else if (event.getMessage().getBooleanProperty(JmsConstants.CACHE_JMS_SESSIONS_PROPERTY,
            connector.isCacheJmsSessions()))
        {
            cached = true;
            if (cachedSession != null)
            {
                session = cachedSession;
            }
            else
            {
                session = connector.getSession(event.getEndpoint());
                cachedSession = session;
            }
        }
        else
        {
            session = connector.getSession(event.getEndpoint());
            // check global transaction, not endpoint transaction config
            // since endpoint transaction config need not be set
            UMOTransaction tx = TransactionCoordination.getInstance().getTransaction();
            if ((tx != null) && (tx.hasResource(connector.getConnection())))
            {
                transacted = true;
            }

            /* if (event.getEndpoint().getTransactionConfig().isTransacted())
            {
                transacted = true;
            }
            */
        }


Andrew Perepelytsya - 10/Jul/07 09:37 AM
Justin, please file separate issues for the above, and use patches.

Andreas Guenther - 11/Sep/07 12:58 PM
Quoc,

Where do we stand with this issue? How long would it take to finish up the work?

Thank you,
-Andreas


Quoc Le - 11/Sep/07 01:12 PM
Andreas,

Incorporating the suggestions from Leon and Justin to address the setting of the transacted flag under certain scenarios, I'd say there are 3-4 days of work left to analyze, test, and check in. However, I am completely swamped.

Thanks,
quoc


Marie Claire Rizzo - 26/Sep/07 09:06 AM
Hi Quoc

Andrew Calafato and I have also stumbled across this problem. Seeing that you're swamped do you need a hand?

Thanks
Marie


Quoc Le - 26/Sep/07 10:25 AM
Sounds great!

Marie Claire Rizzo - 23/Oct/07 04:22 AM
Hi Quoc

I made some changes and put the session on the thread, to avoid opening and closing sessions in the transformer. The session leak seems to be fixed. However, I'm having problems with the XA transactions not picking up all the messages on the queue. I'm not sure whether this is related, so I'm not committing anything at this time.


Steve Olson - 31/Oct/07 04:36 PM
In case it helps anyone, I would like to volunteer to be a tester of the fix for this - we have MQ with and without XA and can run volume tests (10,000s of messages) on both cases. And we have a build environment of the current 1.4.4 trunk for patching/debugging/etc.

Marie Claire Rizzo - 02/Nov/07 04:27 AM
Update: The leak seems to be fixed and the XA transactions are working fine but I broke the topics. Will be working on them next. Since I put the session on the thread, all messageListeners that are on the thread get registered with the session and this blows up the topics.
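
For context, "putting the session on the thread" is commonly done with a ThreadLocal. The sketch below shows the general shape only and is not the code from the changeset: `ThreadBoundSession` is a hypothetical name, the type parameter stands in for javax.jms.Session, and the factory stands in for the call that opens a session.

```java
import java.util.function.Supplier;

/** Hypothetical per-thread holder; T would be javax.jms.Session. */
final class ThreadBoundSession<T> {
    private final ThreadLocal<T> current = new ThreadLocal<T>();
    private final Supplier<T> factory;

    ThreadBoundSession(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns this thread's session, creating it on first use. */
    T get() {
        T session = current.get();
        if (session == null) {
            session = factory.get();
            current.set(session);
        }
        return session;
    }

    /** Detaches the session from this thread and hands it back so the caller can close it. */
    T detach() {
        T session = current.get();
        current.remove();
        return session;
    }
}
```

With this shape, everything created on a given thread shares that thread's single session, which would be consistent with the topic problem described above: listeners registered from the same thread all end up on one session. Whether the final fix keeps this approach is not stated here.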

Andrew Perepelytsya - 12/Nov/07 08:59 PM
http://fisheye.codehaus.org/changelog/mule/?cs=9693

Comments inlined in the changeset