EntropyEngine::Core::Concurrency::NodeScheduler

Manages graph node scheduling and overflow handling for work execution.

#include <NodeScheduler.h>

Public Classes:

  • struct Stats - Health metrics for the scheduler.
  • struct Config - Configuration parameters for tuning scheduler behavior.
  • struct Callbacks - Lifecycle hooks for monitoring node execution flow.
Public Functions:

  • ~NodeScheduler()
  • void setCallbacks(const Callbacks & callbacks) - Installs lifecycle monitoring callbacks.
  • size_t scheduleReadyNodes(const std::vector< NodeHandle > & nodes) - Batch scheduling for multiple ready nodes.
  • bool scheduleNode(const NodeHandle & node) - Attempts to schedule a node, deferring if necessary.
  • void resetStats() - Clears all statistics counters back to zero.
  • size_t processTimedDeferredNodes(size_t maxToSchedule = 0) - Checks timed deferrals and schedules nodes whose wake time has passed.
  • size_t processDeferredNodes(size_t maxToSchedule = 0) - Drains the deferred queue into available execution slots.
  • bool hasCapacity() const - Quick check if we can accept more work right now.
  • Stats getStats() const
  • size_t getMemoryUsage() const - Estimates memory consumption of the scheduler.
  • size_t getDeferredCount() const - Checks how many nodes are waiting in the deferred queue.
  • size_t getAvailableCapacity() const - Gets exact number of free execution slots.
  • bool deferNodeUntil(const NodeHandle & node, std::chrono::steady_clock::time_point wakeTime) - Defers a node until a specific time point.
  • bool deferNode(const NodeHandle & node) - Explicitly defers a node without trying to schedule first.
  • size_t clearDeferredNodes() - Nuclear option: drops all deferred nodes.
  • NodeScheduler(WorkContractGroup * contractGroup, const WorkGraph * graph, std::shared_mutex * graphMutex, Core::EventBus * eventBus = nullptr, const Config & config = {}) - Creates a scheduler bridging graph nodes to work execution.
class EntropyEngine::Core::Concurrency::NodeScheduler;

Manages graph node scheduling and overflow handling for work execution.

NodeScheduler serves as the interface between WorkGraph’s high-level dependency management and WorkContractGroup’s low-level execution system. When WorkGraph determines nodes are ready for execution, this scheduler manages their transition into the work queue. The scheduler handles capacity constraints by maintaining a deferred queue for nodes that cannot be immediately scheduled due to queue limitations.

This component bridges the abstraction gap between graph-based task management and queue-based execution. It addresses the practical constraint that ready nodes may exceed available execution slots, buffering the overflow and providing a foundation for more advanced scheduling policies.
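Conceptually, each ready node takes one of three paths: scheduled immediately, deferred, or dropped if the deferred queue is itself full. A minimal sketch of that decision follows; this is illustrative only, not the actual implementation, and submitToContractGroup and the member names are hypothetical:

// Conceptual sketch of the schedule/defer/drop decision (names hypothetical)
bool scheduleOrDefer(const NodeHandle& node) {
    if (hasCapacity()) {
        return submitToContractGroup(node);   // free slot: hand to WorkContractGroup now
    }
    if (deferredQueue.size() < config.maxDeferredNodes) {
        deferredQueue.push_back(node);        // buffered until capacity frees up
        return false;                         // deferred, not scheduled
    }
    ++stats.nodesDropped;                     // deferred queue also full: node is lost
    return false;
}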

Key responsibilities:

  • Immediate scheduling when there’s capacity
  • Deferred queue management when work queue is full
  • Batch scheduling (schedule multiple nodes in one go)
  • Lifecycle callbacks for monitoring and debugging
  • Statistics tracking for analysis

Complexity characteristics:

  • Deferred queue operations: O(1) push_back, O(1) pop_front

Suitable applications:

  • Task graphs that might generate bursts of ready nodes
  • Systems where work generation can outpace execution
  • Monitoring node execution lifecycle for debugging
  • Building priority scheduling on top (future enhancement)

Design trade-offs:

  • Uses mutex for thread safety (not lock-free like WorkContractGroup)
  • Deque for deferred queue (good cache locality, no allocations until overflow)
  • Separate stats tracking to avoid polluting hot path
// Basic usage with a WorkGraph
WorkContractGroup contractGroup(1024);
WorkGraph graph;
std::shared_mutex graphMutex; // required by the constructor; guards graph structure
NodeScheduler scheduler(&contractGroup, &graph, &graphMutex);

// Set up lifecycle monitoring
NodeScheduler::Callbacks callbacks;
callbacks.onNodeScheduled = [](NodeHandle node) {
    LOG_DEBUG("Node {} scheduled", node.index());
};
callbacks.onNodeDeferred = [](NodeHandle node) {
    LOG_WARN("Node {} deferred (queue full)", node.index());
};
scheduler.setCallbacks(callbacks);

// Schedule nodes as they become ready
if (!scheduler.scheduleNode(readyNode)) {
    // Node was deferred; it will be scheduled when capacity becomes available
}

// Process deferred nodes when work completes
size_t scheduled = scheduler.processDeferredNodes();
inline ~NodeScheduler()
inline void setCallbacks(
const Callbacks & callbacks
)

Installs lifecycle monitoring callbacks.

Parameters:

  • callbacks Structure with optional callback functions

Set before scheduling for lifecycle tracking. Not thread-safe with active scheduling - set during init.

NodeScheduler::Callbacks callbacks;
callbacks.onNodeDeferred = [&deferredCount](NodeHandle) {
    deferredCount++;
};
callbacks.onNodeDropped = [](NodeHandle node) {
    LOG_ERROR("Critical: Node {} dropped!", node.index());
};
scheduler.setCallbacks(callbacks);
size_t scheduleReadyNodes(
const std::vector< NodeHandle > & nodes
)

Batch scheduling for multiple ready nodes.

Parameters:

  • nodes Vector of nodes ready for execution

Return: Number scheduled immediately (rest deferred/dropped)

Handles multiple nodes efficiently. Schedules what fits, defers rest. Preserves order.

// After a node completes, schedule all its dependents
std::vector<NodeHandle> readyDependents = getDependents(completed);
size_t scheduled = scheduler.scheduleReadyNodes(readyDependents);
LOG_DEBUG("Scheduled {}/{} dependent nodes",
scheduled, readyDependents.size());
bool scheduleNode(
const NodeHandle & node
)

Attempts to schedule a node, deferring if necessary.

Parameters:

  • node Handle to the node that’s ready to execute

Return: true if scheduled immediately, false if deferred or dropped

Main entry point for execution. Tries immediate scheduling, defers if full, drops if deferred queue full. Thread-safe.

// In your graph's "node became ready" logic
if (scheduler.scheduleNode(readyNode)) {
    // Great, it's in the work queue
} else {
    // Deferred or dropped - check stats to see which
    auto stats = scheduler.getStats();
    if (stats.nodesDropped > 0) {
        // Houston, we have a problem
    }
}
inline void resetStats()

Clears all statistics counters back to zero.

For benchmarking or resetting after warmup. Only counters, not queue.

scheduler.resetStats();
runBenchmark();
auto stats = scheduler.getStats();
LOG_INFO("Benchmark: {} scheduled, {} deferred",
stats.nodesScheduled, stats.nodesDeferred);
size_t processTimedDeferredNodes(
size_t maxToSchedule =0
)

Checks timed deferrals and schedules nodes whose wake time has passed.

Parameters:

  • maxToSchedule Maximum nodes to schedule (0 = all ready nodes)

Return: Number of timed nodes actually scheduled

Examines the timed deferred queue and moves all nodes whose scheduled wake time has arrived into execution. This is called opportunistically during capacity callbacks and main thread pumping - no dedicated timer thread.

// Called during capacity callbacks
void onCapacityAvailable() {
    // Check if any timers are ready
    size_t scheduled = scheduler.processTimedDeferredNodes();
    if (scheduled > 0) {
        LOG_DEBUG("Woke {} timed nodes", scheduled);
    }
}
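The same call can also be pumped from the main thread (the “main thread pumping” mentioned above), typically paired with processDeferredNodes(). A minimal sketch:

// Per-frame pump (sketch): wake expired timers first, then drain the overflow queue
void pumpScheduler(NodeScheduler& scheduler) {
    scheduler.processTimedDeferredNodes(); // timed deferrals whose wake time has passed
    scheduler.processDeferredNodes();      // capacity-overflow queue, FIFO order
}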
size_t processDeferredNodes(
size_t maxToSchedule =0
)

Drains the deferred queue into available execution slots.

Parameters:

  • maxToSchedule How many to schedule max (0 = all possible)

Return: Number of nodes actually scheduled

Pulls nodes from deferred queue (FIFO) until empty or out of capacity. Use maxToSchedule to leave room for new work.

// After work completes, process waiting nodes
void onWorkCompleted() {
    size_t scheduled = scheduler.processDeferredNodes();
    if (scheduled > 0) {
        LOG_DEBUG("Scheduled {} deferred nodes", scheduled);
    }
}

// Or limit the batch to leave room for new work
size_t availableSlots = scheduler.getAvailableCapacity();
scheduler.processDeferredNodes(availableSlots / 2); // Use half for deferred
inline bool hasCapacity() const

Quick check if we can accept more work right now.

Return: true if work queue has free slots, false if full

Snapshot that may be stale. Use for optimization hints, not critical logic.

// Use for early-out optimization
if (!scheduler.hasCapacity()) {
    // Don't bother trying to schedule a bunch of nodes
    return;
}
inline Stats getStats() const
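
Returns a snapshot of the scheduler’s statistics counters. A typical health check, sketched assuming the Stats fields used in the other examples on this page (nodesScheduled, nodesDeferred, nodesDropped):

// Health check using the stats snapshot (sketch; field names from examples above)
auto stats = scheduler.getStats();
LOG_INFO("scheduled={} deferred={} dropped={}",
         stats.nodesScheduled, stats.nodesDeferred, stats.nodesDropped);
if (stats.nodesDropped > 0) {
    LOG_ERROR("Nodes are being dropped - consider raising Config::maxDeferredNodes");
}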
inline size_t getMemoryUsage() const

Estimates memory consumption of the scheduler.

Return: Approximate bytes used by this scheduler instance

Includes object + deferred queue. Conservative estimate.

// Check memory pressure
if (scheduler.getMemoryUsage() > 1024 * 1024) { // 1MB
    LOG_WARN("Scheduler using {}KB of memory",
             scheduler.getMemoryUsage() / 1024);
}
inline size_t getDeferredCount() const

Checks how many nodes are waiting in the deferred queue.

Return: Current number of nodes waiting for execution capacity

Monitor system pressure. Thread-safe but takes lock - avoid tight loops.

// Monitor for queue buildup
size_t deferred = scheduler.getDeferredCount();
if (deferred > 50) {
    LOG_WARN("High scheduling pressure: {} nodes waiting", deferred);
}
inline size_t getAvailableCapacity() const

Gets exact number of free execution slots.

Return: Number of nodes that could be scheduled immediately

More precise than hasCapacity(). Snapshot that might be stale.

// Batch scheduling optimization
size_t capacity = scheduler.getAvailableCapacity();
size_t toSchedule = std::min(capacity, readyNodes.size());
for (size_t i = 0; i < toSchedule; ++i) {
    scheduler.scheduleNode(readyNodes[i]);
}
bool deferNodeUntil(
const NodeHandle & node,
std::chrono::steady_clock::time_point wakeTime
)

Defers a node until a specific time point.

Parameters:

  • node The node to defer
  • wakeTime When the node should be reconsidered for scheduling

Return: true if successfully queued

Instead of immediate rescheduling, the node sleeps in a priority queue until the specified wake time. No CPU usage, no thread blocking - just passive waiting. Used by timers and delayed work.

// Timer that fires in 5 seconds
auto wakeTime = std::chrono::steady_clock::now() + std::chrono::seconds(5);
scheduler.deferNodeUntil(timerNode, wakeTime);
// Node sits in queue consuming zero CPU until wakeTime arrives
bool deferNode(
const NodeHandle & node
)

Explicitly defers a node without trying to schedule first.

Parameters:

  • node The node to add to deferred queue

Return: true if queued successfully, false if deferred queue is full

Bypasses capacity check, goes straight to deferred queue. Useful for batch operations when you know there’s no capacity.

// When you know the work queue is full
if (!scheduler.hasCapacity()) {
    // Don't even try to schedule, just defer
    for (auto& node : readyNodes) {
        if (!scheduler.deferNode(node)) {
            LOG_ERROR("Deferred queue full!");
            break;
        }
    }
}
inline size_t clearDeferredNodes()

Nuclear option: drops all deferred nodes.

Return: How many nodes were dropped from the queue

For aborting pending work. Nodes are lost - no execution or callbacks.

// Emergency abort
size_t dropped = scheduler.clearDeferredNodes();
if (dropped > 0) {
    LOG_WARN("Dropped {} pending nodes during abort", dropped);
}
inline NodeScheduler(
WorkContractGroup * contractGroup,
const WorkGraph * graph,
std::shared_mutex * graphMutex,
Core::EventBus * eventBus =nullptr,
const Config & config ={}
)

Creates a scheduler bridging graph nodes to work execution.

Parameters:

  • contractGroup Where to schedule work (must outlive scheduler)
  • graph The graph whose nodes we’re scheduling (must outlive scheduler)
  • graphMutex Shared mutex guarding the graph’s structure (must outlive scheduler)
  • eventBus Optional event system (can be nullptr)
  • config Tuning parameters

Sets up scheduling for a WorkGraph. Doesn’t own graph or contract group.

// Typical setup in a WorkGraph
auto contractGroup = std::make_unique<WorkContractGroup>(1024);
auto scheduler = std::make_unique<NodeScheduler>(
    contractGroup.get(),
    this,          // WorkGraph passes itself
    &_graphMutex,  // graph's shared mutex (illustrative member name)
    _eventBus,
    NodeScheduler::Config{.maxDeferredNodes = 200}
);
