# EntropyEngine::Core::Concurrency::NodeScheduler
Manages graph node scheduling and overflow handling for work execution.
```cpp
#include <NodeScheduler.h>
```
## Public Classes

| Type | Name |
|---|---|
| struct | Stats Health metrics for the scheduler. |
| struct | Config Configuration parameters for tuning scheduler behavior. |
| struct | Callbacks Lifecycle hooks for monitoring node execution flow. |
## Public Functions

| Return | Name |
|---|---|
| | ~NodeScheduler() |
| void | setCallbacks(const Callbacks & callbacks) Installs lifecycle monitoring callbacks. |
| size_t | scheduleReadyNodes(const std::vector< NodeHandle > & nodes) Batch scheduling for multiple ready nodes. |
| bool | scheduleNode(const NodeHandle & node) Attempts to schedule a node, deferring if necessary. |
| void | resetStats() Clears all statistics counters back to zero. |
| size_t | processTimedDeferredNodes(size_t maxToSchedule =0) Checks timed deferrals and schedules nodes whose wake time has passed. |
| size_t | processDeferredNodes(size_t maxToSchedule =0) Drains the deferred queue into available execution slots. |
| bool | hasCapacity() const Quick check if we can accept more work right now. |
| Stats | getStats() const |
| size_t | getMemoryUsage() const Estimates memory consumption of the scheduler. |
| size_t | getDeferredCount() const Checks how many nodes are waiting in the deferred queue. |
| size_t | getAvailableCapacity() const Gets exact number of free execution slots. |
| bool | deferNodeUntil(const NodeHandle & node, std::chrono::steady_clock::time_point wakeTime) Defers a node until a specific time point. |
| bool | deferNode(const NodeHandle & node) Explicitly defers a node without trying to schedule first. |
| size_t | clearDeferredNodes() Nuclear option: drops all deferred nodes. |
| NodeScheduler(WorkContractGroup * contractGroup, const WorkGraph * graph, std::shared_mutex * graphMutex, Core::EventBus * eventBus =nullptr, const Config & config ={}) Creates a scheduler bridging graph nodes to work execution. |
## Detailed Description

```cpp
class EntropyEngine::Core::Concurrency::NodeScheduler;
```

Manages graph node scheduling and overflow handling for work execution.
NodeScheduler serves as the interface between WorkGraph’s high-level dependency management and WorkContractGroup’s low-level execution system. When WorkGraph determines nodes are ready for execution, this scheduler manages their transition into the work queue. The scheduler handles capacity constraints by maintaining a deferred queue for nodes that cannot be immediately scheduled due to queue limitations.
This component bridges the abstraction gap between graph-based task management and queue-based execution. It addresses the practical constraint that ready nodes may exceed available execution slots, providing buffering and advanced scheduling infrastructure.
Key responsibilities:
- Immediate scheduling when there’s capacity
- Deferred queue management when work queue is full
- Batch scheduling (schedule multiple nodes in one go)
- Lifecycle callbacks for monitoring and debugging
- Statistics tracking for analysis
Complexity characteristics:
- Deferred queue operations: O(1) push_back, O(1) pop_front
Suitable applications:
- Task graphs that might generate bursts of ready nodes
- Systems where work generation can outpace execution
- Monitoring node execution lifecycle for debugging
- Building priority scheduling on top (future enhancement)
Design trade-offs:
- Uses mutex for thread safety (not lock-free like WorkContractGroup)
- Deque for deferred queue (good cache locality, no allocations until overflow)
- Separate stats tracking to avoid polluting hot path
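The schedule-or-defer mechanics described above can be sketched with a toy model. This is a hypothetical `ToyScheduler` with integer node IDs, not the real API; it only illustrates the capacity check, the FIFO overflow deque, and the drain step.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Toy model of the schedule-or-defer flow (illustration only, not the real API).
struct ToyScheduler {
    std::size_t capacity;          // free execution slots
    std::deque<int> deferred;      // FIFO overflow queue (O(1) push_back/pop_front)
    std::vector<int> executing;    // nodes that made it into the work queue

    // Mirrors scheduleNode(): true if scheduled now, false if deferred.
    bool scheduleNode(int node) {
        if (executing.size() < capacity) {
            executing.push_back(node);
            return true;
        }
        deferred.push_back(node);  // overflow: buffer instead of dropping
        return false;
    }

    // Mirrors processDeferredNodes(): drain the FIFO into free slots.
    std::size_t processDeferredNodes() {
        std::size_t scheduled = 0;
        while (!deferred.empty() && executing.size() < capacity) {
            executing.push_back(deferred.front());
            deferred.pop_front();
            ++scheduled;
        }
        return scheduled;
    }
};
```

The real scheduler adds locking, callbacks, statistics, and drop handling on top of this shape.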
```cpp
// Basic usage with a WorkGraph
WorkContractGroup contractGroup(1024);
WorkGraph graph;
std::shared_mutex graphMutex;  // guards graph structure during scheduling
NodeScheduler scheduler(&contractGroup, &graph, &graphMutex);

// Set up lifecycle monitoring
NodeScheduler::Callbacks callbacks;
callbacks.onNodeScheduled = [](NodeHandle node) {
    LOG_DEBUG("Node {} scheduled", node.index());
};
callbacks.onNodeDeferred = [](NodeHandle node) {
    LOG_WARN("Node {} deferred (queue full)", node.index());
};
scheduler.setCallbacks(callbacks);

// Schedule nodes as they become ready
if (!scheduler.scheduleNode(readyNode)) {
    // Node was deferred, will be scheduled when capacity is available
}

// Process deferred nodes when work completes
size_t scheduled = scheduler.processDeferredNodes();
```

## Public Functions Documentation

### function ~NodeScheduler

```cpp
inline ~NodeScheduler()
```

### function setCallbacks

```cpp
inline void setCallbacks(
    const Callbacks & callbacks
)
```

Installs lifecycle monitoring callbacks.
Parameters:
- callbacks Structure with optional callback functions
Set before scheduling for lifecycle tracking. Not thread-safe with active scheduling - set during init.
```cpp
NodeScheduler::Callbacks callbacks;
callbacks.onNodeDeferred = [&deferredCount](NodeHandle) {
    deferredCount++;
};
callbacks.onNodeDropped = [](NodeHandle node) {
    LOG_ERROR("Critical: Node {} dropped!", node.index());
};
scheduler.setCallbacks(callbacks);
```

### function scheduleReadyNodes

```cpp
size_t scheduleReadyNodes(
    const std::vector< NodeHandle > & nodes
)
```

Batch scheduling for multiple ready nodes.
Parameters:
- nodes Vector of nodes ready for execution
Return: Number scheduled immediately (rest deferred/dropped)
Handles multiple nodes efficiently: schedules what fits, defers the rest, and preserves submission order.
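The schedule-what-fits, defer-the-rest behavior can be sketched with a toy model (hypothetical `ToyBatch` with integer node IDs, not the real implementation), showing how submission order is preserved across both the executing set and the overflow queue.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Toy sketch of batch scheduling (illustration only, not the real API):
// schedule what fits, defer the rest, preserving order end to end.
struct ToyBatch {
    std::size_t freeSlots;
    std::vector<int> executing;
    std::deque<int> deferred;

    // Mirrors scheduleReadyNodes(): returns how many ran immediately.
    std::size_t scheduleReadyNodes(const std::vector<int>& nodes) {
        std::size_t scheduled = 0;
        for (int n : nodes) {
            if (executing.size() < freeSlots) {
                executing.push_back(n);
                ++scheduled;
            } else {
                deferred.push_back(n);  // overflow keeps FIFO order
            }
        }
        return scheduled;
    }
};
```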
```cpp
// After a node completes, schedule all its dependents
std::vector<NodeHandle> readyDependents = getDependents(completed);
size_t scheduled = scheduler.scheduleReadyNodes(readyDependents);
LOG_DEBUG("Scheduled {}/{} dependent nodes", scheduled, readyDependents.size());
```

### function scheduleNode

```cpp
bool scheduleNode(
    const NodeHandle & node
)
```

Attempts to schedule a node, deferring if necessary.
Parameters:
- node Handle to the node that’s ready to execute
Return: true if scheduled immediately, false if deferred or dropped
Main entry point for execution. Tries immediate scheduling, defers if full, drops if deferred queue full. Thread-safe.
```cpp
// In your graph's "node became ready" logic
if (scheduler.scheduleNode(readyNode)) {
    // Great, it's in the work queue
} else {
    // Deferred or dropped - check stats to see which
    auto stats = scheduler.getStats();
    if (stats.nodesDropped > 0) {
        // Houston, we have a problem
    }
}
```

### function resetStats

```cpp
inline void resetStats()
```

Clears all statistics counters back to zero.
For benchmarking or resetting after warmup. Only counters, not queue.
```cpp
scheduler.resetStats();
runBenchmark();
auto stats = scheduler.getStats();
LOG_INFO("Benchmark: {} scheduled, {} deferred", stats.nodesScheduled, stats.nodesDeferred);
```

### function processTimedDeferredNodes

```cpp
size_t processTimedDeferredNodes(
    size_t maxToSchedule = 0
)
```

Checks timed deferrals and schedules nodes whose wake time has passed.
Parameters:
- maxToSchedule Maximum nodes to schedule (0 = all ready nodes)
Return: Number of timed nodes actually scheduled
Examines the timed deferred queue and moves all nodes whose scheduled wake time has arrived into execution. This is called opportunistically during capacity callbacks and main thread pumping - no dedicated timer thread.
```cpp
// Called during capacity callbacks
void onCapacityAvailable() {
    // Check if any timers are ready
    size_t scheduled = scheduler.processTimedDeferredNodes();
    if (scheduled > 0) {
        LOG_DEBUG("Woke {} timed nodes", scheduled);
    }
}
```

### function processDeferredNodes

```cpp
size_t processDeferredNodes(
    size_t maxToSchedule = 0
)
```

Drains the deferred queue into available execution slots.
Parameters:
- maxToSchedule How many to schedule max (0 = all possible)
Return: Number of nodes actually scheduled
Pulls nodes from deferred queue (FIFO) until empty or out of capacity. Use maxToSchedule to leave room for new work.
```cpp
// After work completes, process waiting nodes
void onWorkCompleted() {
    size_t scheduled = scheduler.processDeferredNodes();
    if (scheduled > 0) {
        LOG_DEBUG("Scheduled {} deferred nodes", scheduled);
    }
}

// Or limit to leave room for new work
size_t availableSlots = scheduler.getAvailableCapacity();
scheduler.processDeferredNodes(availableSlots / 2); // Use half for deferred
```

### function hasCapacity

```cpp
inline bool hasCapacity() const
```

Quick check if we can accept more work right now.
Return: true if work queue has free slots, false if full
Snapshot that may be stale. Use for optimization hints, not critical logic.
```cpp
// Use for early-out optimization
if (!scheduler.hasCapacity()) {
    // Don't bother trying to schedule a bunch of nodes
    return;
}
```

### function getStats

```cpp
inline Stats getStats() const
```

### function getMemoryUsage

```cpp
inline size_t getMemoryUsage() const
```

Estimates memory consumption of the scheduler.
Return: Approximate bytes used by this scheduler instance
Includes object + deferred queue. Conservative estimate.
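An estimate of this kind can be sketched as the object footprint plus a per-entry cost for the deferred queue. The `ToyEstimator` below is a hypothetical illustration of that shape, not the actual `getMemoryUsage()` implementation.

```cpp
#include <cstddef>
#include <deque>

// Toy sketch of a memory estimate (illustration only, not the real API):
// the scheduler object itself plus one entry-sized cost per deferred node.
struct ToyEstimator {
    std::deque<long> deferred;  // stand-in for the deferred NodeHandle queue

    std::size_t memoryUsage() const {
        // Ignores deque block overhead, so this is a simple approximation.
        return sizeof(*this) + deferred.size() * sizeof(long);
    }
};
```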
```cpp
// Check memory pressure
if (scheduler.getMemoryUsage() > 1024 * 1024) { // 1MB
    LOG_WARN("Scheduler using {}KB of memory", scheduler.getMemoryUsage() / 1024);
}
```

### function getDeferredCount

```cpp
inline size_t getDeferredCount() const
```

Checks how many nodes are waiting in the deferred queue.
Return: Current number of nodes waiting for execution capacity
Useful for monitoring scheduling pressure. Thread-safe, but takes a lock; avoid calling it in tight loops.
```cpp
// Monitor for queue buildup
size_t deferred = scheduler.getDeferredCount();
if (deferred > 50) {
    LOG_WARN("High scheduling pressure: {} nodes waiting", deferred);
}
```

### function getAvailableCapacity

```cpp
inline size_t getAvailableCapacity() const
```

Gets exact number of free execution slots.
Return: Number of nodes that could be scheduled immediately
More precise than hasCapacity(). Snapshot that might be stale.
```cpp
// Batch scheduling optimization
size_t capacity = scheduler.getAvailableCapacity();
size_t toSchedule = std::min(capacity, readyNodes.size());
for (size_t i = 0; i < toSchedule; ++i) {
    scheduler.scheduleNode(readyNodes[i]);
}
```

### function deferNodeUntil

```cpp
bool deferNodeUntil(
    const NodeHandle & node,
    std::chrono::steady_clock::time_point wakeTime
)
```

Defers a node until a specific time point.
Parameters:
- node The node to defer
- wakeTime When the node should be reconsidered for scheduling
Return: true if successfully queued
Instead of immediate rescheduling, the node sleeps in a priority queue until the specified wake time. No CPU usage, no thread blocking - just passive waiting. Used by timers and delayed work.
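The wake-time bookkeeping can be sketched with a min-heap keyed by wake time, popped opportunistically by a pump call. This `ToyTimedQueue` is a hypothetical model with integer ticks and node IDs, not the real implementation.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Toy sketch of timed deferral (illustration only, not the real API):
// nodes sleep in a min-heap keyed by wake time; a periodic pump pops
// every entry whose wake time has passed. Time is an integer tick here.
struct ToyTimedQueue {
    using Entry = std::pair<long, int>;  // (wakeTick, node), earliest on top
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> timed;

    void deferUntil(int node, long wakeTick) { timed.push({wakeTick, node}); }

    // Mirrors processTimedDeferredNodes(): schedule everything that is due.
    std::size_t pump(long nowTick, std::vector<int>& scheduled) {
        std::size_t n = 0;
        while (!timed.empty() && timed.top().first <= nowTick) {
            scheduled.push_back(timed.top().second);
            timed.pop();
            ++n;
        }
        return n;
    }
};
```

Because the heap is ordered by wake time, the pump only ever inspects the earliest entry, so nodes waiting far in the future cost nothing per pump.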
```cpp
// Timer that fires in 5 seconds
auto wakeTime = std::chrono::steady_clock::now() + std::chrono::seconds(5);
scheduler.deferNodeUntil(timerNode, wakeTime);
// Node sits in queue consuming zero CPU until wakeTime arrives
```

### function deferNode

```cpp
bool deferNode(
    const NodeHandle & node
)
```

Explicitly defers a node without trying to schedule first.
Parameters:
- node The node to add to deferred queue
Return: true if queued successfully, false if deferred queue is full
Bypasses capacity check, goes straight to deferred queue. Useful for batch operations when you know there’s no capacity.
```cpp
// When you know the work queue is full
if (!scheduler.hasCapacity()) {
    // Don't even try to schedule, just defer
    for (auto& node : readyNodes) {
        if (!scheduler.deferNode(node)) {
            LOG_ERROR("Deferred queue full!");
            break;
        }
    }
}
```

### function clearDeferredNodes

```cpp
inline size_t clearDeferredNodes()
```

Nuclear option: drops all deferred nodes.
Return: How many nodes were dropped from the queue
For aborting pending work. Nodes are lost - no execution or callbacks.
```cpp
// Emergency abort
size_t dropped = scheduler.clearDeferredNodes();
if (dropped > 0) {
    LOG_WARN("Dropped {} pending nodes during abort", dropped);
}
```

### function NodeScheduler

```cpp
inline NodeScheduler(
    WorkContractGroup * contractGroup,
    const WorkGraph * graph,
    std::shared_mutex * graphMutex,
    Core::EventBus * eventBus = nullptr,
    const Config & config = {}
)
```

Creates a scheduler bridging graph nodes to work execution.
Parameters:
- contractGroup Where to schedule work (must outlive scheduler)
- graph The graph whose nodes we’re scheduling (must outlive scheduler)
- graphMutex Shared mutex guarding graph structure during scheduling (must outlive scheduler)
- eventBus Optional event system (can be nullptr)
- config Tuning parameters
Sets up scheduling for a WorkGraph. Doesn’t own graph or contract group.
```cpp
// Typical setup in a WorkGraph
auto contractGroup = std::make_unique<WorkContractGroup>(1024);
auto scheduler = std::make_unique<NodeScheduler>(
    contractGroup.get(),
    this,           // WorkGraph passes itself
    &_graphMutex,   // the graph's shared mutex (member name assumed)
    _eventBus,
    NodeScheduler::Config{.maxDeferredNodes = 200});
```

Updated on 2026-01-26 at 16:50:32 -0500