Note: All core queuing concepts from the Queuing Guide also apply to AsyncQueuer. AsyncQueuer extends these concepts with advanced features like concurrency (multiple tasks at once) and robust error handling. If you are new to queuing, start with the Queuing Guide to learn about FIFO/LIFO, priority, expiration, rejection, and queue management. This guide focuses on what makes AsyncQueuer unique and powerful for asynchronous and concurrent task processing.
While the Queuer provides synchronous queuing with timing controls, the AsyncQueuer is designed specifically for handling concurrent asynchronous operations. It implements what is traditionally known as a "task pool" or "worker pool" pattern, allowing multiple operations to be processed simultaneously while maintaining control over concurrency and timing. The implementation is mostly copied from Swimmer, Tanner's original task pooling utility that has been serving the JavaScript community since 2017.
Async queuing extends the basic queuing concept by adding concurrent processing capabilities. Instead of processing one item at a time, an async queuer can process multiple items simultaneously while still maintaining order and control over the execution. This is particularly useful when dealing with I/O operations, network requests, or any tasks that spend most of their time waiting rather than consuming CPU.
Async Queuing (concurrency: 2, wait: 2 ticks)
Timeline: [1 second per tick]
Calls:       ⬇️  ⬇️  ⬇️  ⬇️  ⬇️  ⬇️  ⬇️
Queue:       [ABC]   [C]     [CDE]   [E]     []
Active:      [A,B]   [B,C]   [C,D]   [D,E]   [E]
Completed:   -       A       B       C       D,E
             [=================================================================]
             ^ Unlike regular queuing, multiple items
               can be processed concurrently

             [Items queue up]   [Process 2 at once]   [Complete]
              when busy          with wait between     all items
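To make the task pool idea concrete, here is a minimal hand-rolled sketch of a concurrency-limited pool in plain TypeScript. It is not how AsyncQueuer is implemented: `runPool`, `sleep`, and `poolDemo` are hypothetical names used only for illustration, and the sketch leaves out `wait`, priorities, and error handling.

```typescript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms))

// Runs `worker` over `items`, keeping at most `concurrency` calls in flight.
// Results are returned in the same order as `items`.
async function runPool<T, R>(
  items: T[],
  worker: (item: T) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let nextIndex = 0

  // Each lane claims the next unprocessed index until none remain.
  async function lane(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++
      results[index] = await worker(items[index])
    }
  }

  const laneCount = Math.max(1, Math.min(concurrency, items.length))
  await Promise.all(Array.from({ length: laneCount }, () => lane()))
  return results
}

// Track how many tasks overlap to confirm the limit is respected.
let active = 0
let maxActive = 0

const poolDemo = runPool(
  ['A', 'B', 'C', 'D', 'E'],
  async (item) => {
    active++
    maxActive = Math.max(maxActive, active)
    await sleep(10) // stands in for I/O such as a network request
    active--
    return item.toLowerCase()
  },
  2,
)
```

Because each lane only awaits its own task, a slow item never blocks the other lane, which is the behaviour the diagram above illustrates with `[A,B]`, `[B,C]`, and so on.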
Async queuing is particularly effective when you need to:
The AsyncQueuer is very versatile and can be used in many situations. If you don't need concurrent processing, use Queuing instead. If you don't need all executions that are queued to go through, use Throttling instead. If you want to group operations together, use Batching instead.
TanStack Pacer provides async queuing through the simple asyncQueue function and the more powerful AsyncQueuer class. All queue types and ordering strategies (FIFO, LIFO, priority, etc.) are supported just like in the core queuing guide.
The asyncQueue function provides a simple way to create an always-running async queue:
import { asyncQueue } from '@tanstack/pacer'

// Create a queue that processes up to 2 items concurrently
const processItems = asyncQueue(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2,
    onItemsChange: (queuer) => {
      console.log('Active tasks:', queuer.peekActiveItems().length)
    }
  }
)

// Add items to be processed
processItems(1)
processItems(2)
For more control over the queue, use the AsyncQueuer class directly.
The AsyncQueuer class provides complete control over async queue behavior, including all the core queuing features plus:
import { AsyncQueuer } from '@tanstack/pacer'

const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    const result = await fetchData(item)
    return result
  },
  {
    concurrency: 2, // Process 2 items at once
    wait: 1000, // Wait 1 second between starting new items
    started: true // Start processing immediately
  }
)

// Add error and success handlers via options
queue.setOptions({
  onError: (error, queuer) => {
    console.error('Task failed:', error)
    // You can access queue state here
    console.log('Error count:', queuer.store.state.errorCount)
  },
  onSuccess: (result, queuer) => {
    console.log('Task completed:', result)
    // You can access queue state here
    console.log('Success count:', queuer.store.state.successCount)
  },
  onSettled: (queuer) => {
    // Called after each execution (success or failure)
    console.log('Total settled:', queuer.store.state.settledCount)
  }
})

// Add items to be processed
queue.addItem(1)
queue.addItem(2)
All queue types and ordering strategies (FIFO, LIFO, priority, etc.) are supported—see the Queuing Guide for details. AsyncQueuer adds:
const priorityQueue = new AsyncQueuer(
  async (item: { value: string; priority: number }) => {
    // Process each item asynchronously
    return await processTask(item.value)
  },
  {
    concurrency: 2,
    getPriority: (item) => item.priority // Higher numbers have priority
  }
)

priorityQueue.addItem({ value: 'low', priority: 1 })
priorityQueue.addItem({ value: 'high', priority: 3 })
priorityQueue.addItem({ value: 'medium', priority: 2 })
// Processes: high and medium concurrently, then low
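To make the ordering rule concrete, the sketch below arranges pending items the way the comment describes: highest `getPriority` value first, then split into groups of `concurrency`. `planWaves` is a hypothetical helper, not a Pacer function. It assumes every item is already queued before processing starts and that each group finishes together, which real tasks rarely do.

```typescript
interface Task {
  value: string
  priority: number
}

// Hypothetical helper: sort by descending priority, then chunk by concurrency.
function planWaves<T>(
  items: T[],
  getPriority: (item: T) => number,
  concurrency: number,
): T[][] {
  // Guard against zero or fractional values so the loop always advances.
  const step = Math.max(1, Math.floor(concurrency))
  // Copy before sorting so the caller's array is left untouched.
  const ordered = [...items].sort((a, b) => getPriority(b) - getPriority(a))
  const waves: T[][] = []
  for (let i = 0; i < ordered.length; i += step) {
    waves.push(ordered.slice(i, i + step))
  }
  return waves
}

const pending: Task[] = [
  { value: 'low', priority: 1 },
  { value: 'high', priority: 3 },
  { value: 'medium', priority: 2 },
]

const waves = planWaves(pending, (task) => task.priority, 2)
const waveLabels = waves.map((wave) => wave.map((task) => task.value))
// waveLabels is [['high', 'medium'], ['low']]
```

In a running queue the picture is less tidy: a new item is picked up as soon as any slot frees, so the groups overlap rather than run in lockstep.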
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    if (item < 0) throw new Error('Negative item')
    return await processTask(item)
  },
  {
    onError: (error, queuer) => {
      console.error('Task failed:', error)
      // You can access queue state here
      console.log('Error count:', queuer.store.state.errorCount)
    },
    throwOnError: true, // Will throw errors even with onError handler
    onSuccess: (result, queuer) => {
      console.log('Task succeeded:', result)
      // You can access queue state here
      console.log('Success count:', queuer.store.state.successCount)
    },
    onSettled: (queuer) => {
      // Called after each execution (success or failure)
      console.log('Total settled:', queuer.store.state.settledCount)
    }
  }
)

queue.addItem(-1) // Will trigger error handling
queue.addItem(2)
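The callback order in the example above can be sketched without the library. The following is only an approximation under stated assumptions: `executeTask`, `Counters`, and `Callbacks` are hypothetical names, the counters stand in for `successCount`, `errorCount`, and `settledCount`, and the error is rethrown only when `throwOnError` is explicitly set. How AsyncQueuer picks a default for `throwOnError` is not modelled here.

```typescript
interface Counters {
  successCount: number
  errorCount: number
  settledCount: number
}

interface Callbacks<R> {
  onSuccess?: (result: R) => void
  onError?: (error: unknown) => void
  onSettled?: () => void
  throwOnError?: boolean
}

type Outcome<R> = { ok: true; value: R } | { ok: false; error: unknown }

// Hypothetical helper mirroring the success / error / settled flow.
async function executeTask<T, R>(
  fn: (item: T) => Promise<R>,
  item: T,
  counters: Counters,
  callbacks: Callbacks<R>,
): Promise<R | undefined> {
  // Capture the outcome first so a throwing callback is not mistaken
  // for a failure of the task itself.
  const outcome: Outcome<R> = await fn(item).then(
    (value): Outcome<R> => ({ ok: true, value }),
    (error: unknown): Outcome<R> => ({ ok: false, error }),
  )

  if (outcome.ok) {
    counters.successCount++
    callbacks.onSuccess?.(outcome.value)
  } else {
    counters.errorCount++
    callbacks.onError?.(outcome.error)
  }

  // Settled runs after every execution, whether it succeeded or failed.
  counters.settledCount++
  callbacks.onSettled?.()

  // Assumption: rethrow only when explicitly requested.
  if (!outcome.ok && callbacks.throwOnError) throw outcome.error
  return outcome.ok ? outcome.value : undefined
}

const counters: Counters = { successCount: 0, errorCount: 0, settledCount: 0 }
const seenErrors: string[] = []

const errorDemo = (async () => {
  const task = async (n: number) => {
    if (n < 0) throw new Error('Negative item')
    return n * 2
  }
  const callbacks: Callbacks<number> = {
    onError: (error) => seenErrors.push(String(error)),
  }
  const failed = await executeTask(task, -1, counters, callbacks)
  const doubled = await executeTask(task, 2, counters, callbacks)
  return { failed, doubled }
})()
```

With `throwOnError` left unset, the failing item is reported through `onError` and the next item still runs. Setting it to `true` would additionally reject the promise returned by `executeTask`.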
const queue = new AsyncQueuer(
  async (item: number) => {
    // Process each item asynchronously
    return await processTask(item)
  },
  {
    // Dynamic concurrency based on how many items are currently active
    concurrency: (queuer) => {
      return Math.max(1, 4 - queuer.store.state.activeItems.length)
    },
    // Dynamic wait time based on queue size
    wait: (queuer) => {
      return queuer.store.state.size > 10 ? 2000 : 1000
    }
  }
)
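One way to read the function form of `concurrency` and `wait` is that the option is evaluated against the current queue state whenever the value is needed. A minimal sketch of that idea follows. `resolveOption`, `MaybeDynamic`, and `QueueSnapshot` are hypothetical and not part of the Pacer API, and when AsyncQueuer actually re-evaluates these functions is an assumption here.

```typescript
// Simplified stand-in for the queuer state; not the real AsyncQueuerState.
interface QueueSnapshot {
  size: number
  activeCount: number
}

// An option is either a fixed value or a function of the current snapshot.
type MaybeDynamic<T> = T | ((snapshot: QueueSnapshot) => T)

// Hypothetical helper: evaluate the option against the latest snapshot.
function resolveOption<T>(option: MaybeDynamic<T>, snapshot: QueueSnapshot): T {
  return typeof option === 'function'
    ? (option as (snapshot: QueueSnapshot) => T)(snapshot)
    : option
}

// Same rule as the `wait` option above: slow down when the queue is long.
const dynamicWait: MaybeDynamic<number> = (snapshot) =>
  snapshot.size > 10 ? 2000 : 1000

const waitWhenQuiet = resolveOption<number>(dynamicWait, { size: 3, activeCount: 1 })
const waitWhenBusy = resolveOption<number>(dynamicWait, { size: 25, activeCount: 2 })
const fixedConcurrency = resolveOption<number>(2, { size: 3, activeCount: 1 })
```

Accepting both forms through one helper keeps the call sites identical, so a fixed number can later be swapped for a function without touching the code that consumes the option.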
AsyncQueuer provides all the queue management and monitoring methods from the core queuing guide, plus async-specific ones:
See the Queuing Guide for more on queue management concepts.
AsyncQueuer supports expiration and rejection just like the core queuer:
See the Queuing Guide for details and examples.
The AsyncQueuer class uses TanStack Store for reactive state management, providing real-time access to queue state, processing statistics, and concurrent task tracking.
When using the AsyncQueuer class directly, access state via the store.state property:
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
// Access current state
console.log(queue.store.state.isIdle)
When using framework adapters like React or Solid, the state is exposed directly as a reactive property:
// React example
const queue = useAsyncQueuer(processFn, { concurrency: 2, wait: 1000 })
// Access state directly (reactive)
console.log(queue.state.successCount) // Reactive value
console.log(queue.state.activeItems.length) // Reactive value
You can provide initial state values when creating an async queuer:
const queue = new AsyncQueuer(processFn, {
  concurrency: 2,
  wait: 1000,
  initialState: {
    successCount: 10, // Start with 10 successful tasks
    errorCount: 2, // Start with 2 failed tasks
    isRunning: false, // Start paused
    lastResult: 'initial-result', // Start with initial result
  }
})
The store is reactive and supports subscriptions:
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 1000 })

// Subscribe to state changes
const unsubscribe = queue.store.subscribe((state) => {
  console.log('Active tasks:', state.activeItems.length)
  console.log('Success count:', state.successCount)
  console.log('Error count:', state.errorCount)
  console.log('Queue size:', state.size)
  console.log('Is running:', state.isRunning)
})

// Unsubscribe when done
unsubscribe()
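The subscription shown above follows the usual observable-store contract: `subscribe` registers a listener and returns a function that removes it. A minimal self-contained sketch of that contract follows. `createTinyStore` is hypothetical; it is not TanStack Store and only imitates the shape used in this guide.

```typescript
type Listener<S> = (state: S) => void

// Hypothetical miniature store: holds state, notifies listeners on change.
function createTinyStore<S extends object>(initial: S) {
  let current = initial
  const listeners = new Set<Listener<S>>()

  return {
    // Read-only view of the latest state, like `queue.store.state`.
    get state(): S {
      return current
    },
    // Merge a partial update and notify every registered listener.
    setState(patch: Partial<S>): void {
      current = { ...current, ...patch }
      listeners.forEach((listener) => listener(current))
    },
    // Register a listener and return the function that removes it.
    subscribe(listener: Listener<S>): () => void {
      listeners.add(listener)
      return () => {
        listeners.delete(listener)
      }
    },
  }
}

const tinyStore = createTinyStore({ size: 0, successCount: 0 })
const observedSizes: number[] = []

const stopObserving = tinyStore.subscribe((s) => observedSizes.push(s.size))

tinyStore.setState({ size: 1 })
tinyStore.setState({ size: 2 })
stopObserving()
tinyStore.setState({ size: 3 }) // no listener is notified any more
```

Calling the returned function when a consumer goes away matters for long-lived queues, since a listener that is never removed keeps receiving every state change.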
The AsyncQueuerState includes all properties from the core queuing guide plus:
The async queuer supports flushing items to process them immediately:
const queue = new AsyncQueuer(processFn, { concurrency: 2, wait: 5000 })
queue.addItem('item1')
queue.addItem('item2')
queue.addItem('item3')
console.log(queue.store.state.size) // 3
// Flush all items immediately instead of waiting
queue.flush()
console.log(queue.store.state.activeItems.length) // 2 (processing concurrently)
console.log(queue.store.state.size) // 1 (one remaining)
// Or flush a specific number of items
queue.flush(1) // Process 1 more item
console.log(queue.store.state.activeItems.length) // 3 (all processing concurrently)
Each framework adapter builds convenient hooks and functions around the async queuer classes. Hooks like useAsyncQueuer or useAsyncQueuedState are small wrappers that can cut down on the boilerplate needed in your own code for some common use cases.