
Key: MULE-3418
Type: Bug
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Daniel Feist
Reporter: Benjamin Cox
Votes: 0
Watchers: 1

Mule

VM Connector cannot be configured to use a single thread on a singleton component

Created: 30/May/08 06:16 PM   Updated: 25/Nov/08 09:42 AM
Component/s: Transport: VM
Affects Version/s: 2.0.0, 2.0.1
Fix Version/s: 2.1.2

Time Tracking:
Not Specified

File Attachments: 1. TestApp.zip (3 kB)

Environment: Mule embedded, Spring-based configuration
Issue Links:
Block
 

Labels:
User impact: High
Effort points: 1


Description
Back in 1.3, I had a simple component with exactly one instance. If it was busy, incoming messages would be queued in the vmQueue connector. That let me hold a lot of state in my component without worrying about thread safety or about loading all that state multiple times.

Since moving to 2.0 I don't seem to be able to replicate this behavior. I am using a singleton spring bean, so there is only one instance in the context, but I haven't been able to make it so that there is only one thread running through this code at one time.

Here is my previous, working config (relevant parts):

<connector name="vmQueue" className="org.mule.providers.vm.VMConnector">
    <properties>
        <property name="queueEvents" value="true"/>
    </properties>
</connector>

...

<mule-descriptor name="JobWorker" implementation="com.mycompany.JobWorker">
    <inbound-router>
        <endpoint address="vm://JobWorkerQueue" connector="vmQueue"/>
    </inbound-router>
    <pooling-profile maxActive="1" maxIdle="1" initialisationPolicy="INITIALISE_FIRST"
                     exhaustedAction="WAIT" maxWait="0"/>
</mule-descriptor>

And the new one:

<vm:connector name="vmQueue" queueEvents="true"> <!-- createDispatcherPerRequest="false"> -->
    <vm:queueProfile maxOutstandingMessages="0" persistent="false"/>
</vm:connector>

<vm:endpoint name="jobWorkerQueueEndpoint" address="vm://JobWorkerQueue" connector-ref="vmQueue"/>

...

<mule:service name="jobWorkerService">
    <mule:inbound>
        <mule:inbound-endpoint ref="jobWorkerQueueEndpoint"/>
    </mule:inbound>
    <mule:component>
        <mule:spring-object bean="jobWorker"/>
    </mule:component>
</mule:service>

...

<bean name="jobWorker" class="com.mycompany.JobWorker"/>

Among other things, I tried switching to a pooled-component with a 1-object pooling-profile (this basically doesn't work and seems to replicate the spring-object functionality). Then I tried adding the following component-threading-profile to the jobWorkerService definition:

<mule:component-threading-profile maxThreadsActive="1" maxThreadsIdle="1" poolExhaustedAction="WAIT" />

ERROR 2008-05-29 19:35:30,189 [jobWorkerService.1] com.mycompany.mule.TestComponentExceptionStrategy: 
******************************************************************************** 
Message               : Failed to borrow object from pool. Component that caused exception is: jobWorkerService. Message payload is of type: DefaultOPSyncJob 
Type                  : org.mule.api.service.ServiceException 
Code                  : MULE_ERROR--2 
Payload               : com.mycompany.datamodel.DefaultJob@1bf68d3 
JavaDoc               : http://mule.mulesource.org/docs/apidocs/org/mule/api/service/ServiceException.html
******************************************************************************** 
Exception stack is: 
1. null (edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException) 
  org.mule.util.concurrent.WaitPolicy:56 (null) 
2. Failed to borrow object from pool. Component that caused exception is: jobWorkerService. Message payload is of type: DefaultJob (org.mule.api.service.ServiceException) 
  org.mule.model.seda.SedaService:358 (http://mule.mulesource.org/docs/apidocs/org/mule/api/service/ServiceException.html) 
******************************************************************************** 
Root Exception stack trace: 
edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException 
        at org.mule.util.concurrent.WaitPolicy.rejectedExecution(WaitPolicy.java:56) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:765) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1288) 
        at org.mule.work.ScheduleWorkExecutor.doExecute(ScheduleWorkExecutor.java:41) 
        at org.mule.work.MuleWorkManager.executeWork(MuleWorkManager.java:269) 
        at org.mule.work.MuleWorkManager.scheduleWork(MuleWorkManager.java:232) 
        at org.mule.model.seda.SedaService.run(SedaService.java:325) 
        at org.mule.work.WorkerContext.run(WorkerContext.java:310) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575) 
        at java.lang.Thread.run(Thread.java:619) 

******************************************************************************** 

FATAL 2008-05-29 19:35:30,189 [jobWorkerService.1] com.mycompany.mule.TestComponentExceptionStrategy: The Default Component Exception Strategy has been invoked but there is no current event on the context 
FATAL 2008-05-29 19:35:30,189 [jobWorkerService.1] com.mycompany.mule.TestComponentExceptionStrategy: The error is: Failed to borrow object from pool. Component that caused exception is: jobWorkerService. Message payload is of type: DefaultJob 
org.mule.api.service.ServiceException: Failed to borrow object from pool. Component that caused exception is: jobWorkerService. Message payload is of type: DefaultJob 
        at org.mule.model.seda.SedaService.run(SedaService.java:358) 
        at org.mule.work.WorkerContext.run(WorkerContext.java:310) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575) 
        at java.lang.Thread.run(Thread.java:619) 
Caused by: edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException 
        at org.mule.util.concurrent.WaitPolicy.rejectedExecution(WaitPolicy.java:56) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:765) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1288) 
        at org.mule.work.ScheduleWorkExecutor.doExecute(ScheduleWorkExecutor.java:41) 
        at org.mule.work.MuleWorkManager.executeWork(MuleWorkManager.java:269) 
        at org.mule.work.MuleWorkManager.scheduleWork(MuleWorkManager.java:232) 
        at org.mule.model.seda.SedaService.run(SedaService.java:325) 
        ... 4 more 
FATAL 2008-05-29 19:35:30,189 [jobWorkerService.1] com.mycompany.TestComponentExceptionStrategy: The Default Service Exception Strategy has been invoked but there is no current event on the context 
FATAL 2008-05-29 19:35:30,189 [jobWorkerService.1] com.mycompany.mule.MuleShutdownExceptionHandler: Thu May 29 19:35:30 PDT 2008  - Got a FatalIntegrationException: Failed to borrow object from pool. Component that caused exception is: jobWorkerService. Message payload is of type: DefaultJob 
Caused by: null 

edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException 
        at org.mule.util.concurrent.WaitPolicy.rejectedExecution(WaitPolicy.java:56) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:765) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1288) 
        at org.mule.work.ScheduleWorkExecutor.doExecute(ScheduleWorkExecutor.java:41) 
        at org.mule.work.MuleWorkManager.executeWork(MuleWorkManager.java:269) 
        at org.mule.work.MuleWorkManager.scheduleWork(MuleWorkManager.java:232) 
        at org.mule.model.seda.SedaService.run(SedaService.java:325) 
        at org.mule.work.WorkerContext.run(WorkerContext.java:310) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061) 
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575) 
        at java.lang.Thread.run(Thread.java:619) 
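
One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: the thread named jobWorkerService.1 is already running SedaService.run when it tries to schedule further work, so a pool capped at one thread has no free worker to accept it. The sketch below reproduces that pattern with plain java.util.concurrent rather than Mule's classes. The class name SelfSubmitRejectionSketch is hypothetical, and it relies on the JDK's default rejection handler instead of Mule's WaitPolicy.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration; not Mule code. */
public class SelfSubmitRejectionSketch {

    /**
     * Returns true if a task that occupies the only thread of a pool is
     * rejected when it hands follow-up work to that same pool.
     */
    public static boolean followUpIsRejected() {
        // One thread, and a hand-off queue that holds no waiting tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        try {
            Future<Boolean> outcome = pool.submit(() -> {
                try {
                    // The only worker is busy running this task, so nothing can take the work.
                    pool.execute(() -> { });
                    return false;
                } catch (RejectedExecutionException e) {
                    return true;
                }
            });
            return outcome.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("follow-up rejected: " + followUpIsRejected());
    }
}
```

If this reading is right, capping the service's threads at one leaves no thread for the component invocation itself, which would be consistent with the failure appearing as soon as the first job is submitted.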

The JobTester class is a JUnit test which uses DefaultJobSubmitter to push DefaultJob objects into the mule endpoint. The JobWorker listens via that endpoint (configured as "jobWorkerService").

JobWorker will have startJob() called by Mule, at which point it will spawn a thread to "run" the Job. The DefaultJob just sleeps for 10 seconds. When finished it triggers a listener which the JobWorker is waiting for, at which point the JobWorker will return from the startJob() call and the next one should go through. If a job is currently running when startJob() is called again, it will throw an IllegalStateException.

  • If you run the test case as it is now, it will throw the IllegalStateException to the console once the second job is submitted.
  • If you tweak the JobTester.WAIT_PERIOD value to 15000, it will not throw any exception.
  • If you uncomment the component-threading-profile tag in the jobWorkerService definition, you can see it throw an exception as soon as the first job is submitted.
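
The real JobWorker ships in TestApp.zip and is not reproduced in this issue. As a rough, hypothetical stand-in, the guard described above (reject startJob() while another job is still in flight) could look like the following. The class name GuardedWorkerSketch and the use of a Runnable for the job are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the JobWorker in TestApp.zip; not the attached code. */
public class GuardedWorkerSketch {

    // True while a job is being processed.
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Runs one job at a time; a call that arrives during a running job fails fast. */
    public void startJob(Runnable job) {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("A job is already running");
        }
        try {
            job.run();
        } finally {
            running.set(false);
        }
    }

    /** Returns true if a call made while a job is in flight is rejected. */
    public static boolean overlappingCallIsRejected() {
        GuardedWorkerSketch worker = new GuardedWorkerSketch();
        boolean[] rejected = {false};
        worker.startJob(() -> {
            try {
                // Simulates a second message arriving before the first job finishes.
                worker.startJob(() -> { });
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("overlap rejected: " + overlappingCallIsRejected());
    }
}
```

A guard like this makes concurrent delivery visible immediately, which is why the test case surfaces the problem as an IllegalStateException rather than as silent state corruption.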

The preferred behavior is for the VM Connector to queue up events and have one thread consume the queue and call startJob() serially, rather than multiple threads doing so in parallel.
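
To make the preferred behavior concrete, here is a minimal sketch in plain java.util.concurrent, not Mule's API: submitted jobs wait in a queue and a single consumer thread runs them one after another. The class name SerialQueueSketch and the 20 ms sleep standing in for job work are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of queue-then-serialize; not Mule code. */
public class SerialQueueSketch {

    /** Submits jobCount jobs at once and returns the highest number seen running together. */
    public static int maxConcurrency(int jobCount) {
        // One worker thread backed by an unbounded queue: extra jobs wait their turn.
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        for (int i = 0; i < jobCount; i++) {
            consumer.execute(() -> {
                int now = active.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stands in for the real job's work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
            });
        }
        consumer.shutdown();
        try {
            if (!consumer.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent jobs: " + maxConcurrency(5));
    }
}
```

With this arrangement the component never needs its own locking, because the queue absorbs bursts and the single consumer guarantees serial calls.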

Does this seem like a tricky problem, or fairly straightforward? I could get Mule source and poke around if you think it's likely to be effective.

Thanks,

Ben



Benjamin Cox added a comment - 30/May/08 06:24 PM
The attached file is a simple test case, including 4 classes and a Spring-centric Mule config file.


Benjamin Cox added a comment - 30/May/08 06:25 PM
P.S. Sorry about the formatting of the initial issue - I don't seem to have the usual Edit link on the left side

Daniel Feist added a comment - 25/Nov/08 09:42 AM
This should now be fixed. See the two issues linked as blocking for more information; both have now been fixed with test cases.

You can now also use doThreading="false" instead of having to set the number of threads to 1.