Task Manager
The Task Manager is the orchestration layer for all MRPF scan workflows. It coordinates task lifecycle, fan-out/fan-in execution, scheduling, and worker dispatch. It runs as a stateless AWS Lambda triggered by SQS messages and scheduled events.
Core Concepts
Manager – Stateless orchestrator holding a DbClient and QueueClient. Initialized once during Lambda cold start and reused across invocations. Processes incoming SQS messages through a carefully ordered pipeline. Holds no mutable state between invocations.
TaskCollection – A complete workflow definition wrapping a root TaskContainer. Serialized as JSON for SQS transport. The collection’s state is derived from its root container’s state.
TaskContainer – Tree node with Sequential or Parallel execution mode. Children are either nested TaskContainers or Tasks. Child ordering matters in sequential mode. Container state is computed from its children’s states (not stored directly).
Task – Individual work unit. Holds a TaskKind (the definition/payload), TaskState, CompletionPolicy, and a timeout. Task IDs are UUIDv7, generated in code (not by the database) to support idempotent at-least-once SQS delivery.
TaskKind – Enum of all concrete task types: generators (TcpSynGenerator, DnsScanGenerator, TlsScanGenerator), workers (TcpSyn, DnsScan, TlsScan), aggregators (TcpSynAggregator, DnsScanAggregator, TlsScanAggregator), data tasks (InsertData, ListModels, UpsertModels, Filter variants, Conditional), and notifications (ErrorNotification, GenericNotification, TaskResultNotification).
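The relationship between these types can be sketched in a few lines of Rust. This is a simplified illustration, not the MRPF definitions: the field names, the use of plain strings for IDs and task kinds, and the `task_ids` and `example_collection` helpers are all assumptions made for the example.

```rust
/// How a container runs its children.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    Sequential,
    Parallel,
}

/// Simplified task. The real Task also carries a TaskState,
/// a CompletionPolicy, and a timeout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Task {
    pub id: String,   // UUIDv7 in MRPF; a plain string in this sketch
    pub kind: String, // stand-in for the TaskKind enum
}

/// A child is either a nested container or a leaf task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Node {
    Container(TaskContainer),
    Task(Task),
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskContainer {
    pub mode: ExecutionMode,
    pub children: Vec<Node>, // order matters in Sequential mode
}

/// The collection wraps exactly one root container.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskCollection {
    pub root: TaskContainer,
}

/// Depth-first walk that returns task IDs in child order.
pub fn task_ids(container: &TaskContainer) -> Vec<&str> {
    let mut ids = Vec::new();
    for child in &container.children {
        match child {
            Node::Container(inner) => ids.extend(task_ids(inner)),
            Node::Task(task) => ids.push(task.id.as_str()),
        }
    }
    ids
}

/// Hypothetical collection: a sequential root holding a generator,
/// a parallel container of workers, and an aggregator.
pub fn example_collection() -> TaskCollection {
    let task = |id: &str, kind: &str| {
        Node::Task(Task { id: id.to_string(), kind: kind.to_string() })
    };
    TaskCollection {
        root: TaskContainer {
            mode: ExecutionMode::Sequential,
            children: vec![
                task("gen", "TcpSynGenerator"),
                Node::Container(TaskContainer {
                    mode: ExecutionMode::Parallel,
                    children: vec![task("w1", "TcpSyn"), task("w2", "TcpSyn")],
                }),
                task("agg", "TcpSynAggregator"),
            ],
        },
    }
}
```

Because `Node` is an enum, a walker has to handle both the nested-container case and the leaf-task case, which is the same shape of recursion the Manager needs when it reassembles a collection from the database.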
Task State Lifecycle
Pending -> Running -> Succeeded
                   -> Failed
                   -> Timeout
                   -> PendingTaskCreation -> Succeeded (when all child tasks created)
                   -> PartiallyFailed
Terminal states: Succeeded, Failed, Timeout, PartiallyFailed, Disabled
- Pending – Not yet scheduled.
- Running – Dispatched to a worker and actively executing.
- PendingTaskCreation – Exclusive to generator tasks. The generator has finished producing CreateTask messages but is waiting for the Manager to confirm all child tasks have been inserted (via created_task_ids matching expected_task_count).
- PartiallyFailed – Applied to containers when some (but not all) children failed. Also set when tolerated failures exist.
- Disabled – Skipped entirely. Disabled children do not count toward container state.
- Timeout – The task exceeded its timeout_secs while in Running or PendingTaskCreation state.
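The lifecycle above can be expressed as a small state machine. The sketch below is an assumption-laden reading of the diagram and the state descriptions, not the MRPF implementation: the method names `is_terminal` and `can_transition_to` are hypothetical, and transitions into Disabled are left out because the text does not say how a task becomes disabled.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Running,
    PendingTaskCreation,
    Succeeded,
    Failed,
    Timeout,
    PartiallyFailed,
    Disabled,
}

impl TaskState {
    /// Terminal states as listed in the section above.
    pub fn is_terminal(self) -> bool {
        matches!(
            self,
            TaskState::Succeeded
                | TaskState::Failed
                | TaskState::Timeout
                | TaskState::PartiallyFailed
                | TaskState::Disabled
        )
    }

    /// Transitions read off the lifecycle diagram, plus
    /// PendingTaskCreation -> Timeout from the Timeout description.
    pub fn can_transition_to(self, next: TaskState) -> bool {
        match (self, next) {
            (TaskState::Pending, TaskState::Running) => true,
            (
                TaskState::Running,
                TaskState::Succeeded
                | TaskState::Failed
                | TaskState::Timeout
                | TaskState::PendingTaskCreation
                | TaskState::PartiallyFailed,
            ) => true,
            (
                TaskState::PendingTaskCreation,
                TaskState::Succeeded | TaskState::Timeout,
            ) => true,
            // Everything else, including any move out of a terminal state.
            _ => false,
        }
    }
}
```

A guard of this kind is useful with at-least-once delivery: a duplicate CompleteTask message for a task that is already terminal would be rejected rather than applied twice.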
Manager Pipeline
The Manager::run method processes messages in a fixed order. The ordering prevents race conditions and ensures each step operates on the most current state.
- parse_messages – Categorize incoming SQS messages into three buckets: CreateTaskCollection, CreateTask, and CompleteTask.
- process_task_timeouts – Query for running/pending-creation tasks that have exceeded their timeout. Set them to Timeout. Returns affected root container IDs.
- create_task_collections – Insert new task collections into the database (containers first, then tasks, in a single transaction with READ COMMITTED isolation).
- create_tasks_from_generators – Insert tasks produced by generator workers. Updates the generator’s created_task_ids array. Looks up the generator’s parent container if no explicit parent_id is provided.
- process_task_completions – Apply CompleteTask messages: update task state in the database (with FOR UPDATE row lock), optionally store task data. Always adds the root container to the re-evaluation set, even on error.
- complete_generator_tasks – Find all tasks in PendingTaskCreation state where created_task_ids length equals expected_task_count. Transition them to Succeeded. Uses a recursive CTE to resolve root container IDs.
- schedule_next_tasks – For each root container that had activity, reconstruct the full TaskCollection from the database (using REPEATABLE READ isolation), walk the tree to find pending tasks, set them to Running, and dispatch to the appropriate worker queue. If bare metal tasks are dispatched, launch EC2 instances.
- finalize_task_collections – Delete completed collections from task_manager tables (CASCADE handles descendants). Update recon.job_history with final state and completion timestamp.
- start_due_task_collections – Query recon.jobs for eligible jobs (not running, recurrence not 0) using FOR UPDATE ... SKIP LOCKED. Evaluate cron schedules. Insert job_history rows, decrement recurrence, and send CreateTaskCollection messages back to the Manager queue.
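As an illustration of one pipeline step, the timeout check in process_task_timeouts can be modeled in memory. The real step is a database query; the `TaskRow` struct, the `started_at` field, the integer IDs, and the explicit `now` parameter below are all assumptions introduced for this sketch.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Running,
    PendingTaskCreation,
    Succeeded,
    Timeout,
}

/// Hypothetical in-memory view of a task row.
#[derive(Debug, Clone)]
pub struct TaskRow {
    pub id: u32,
    pub root_container_id: u32,
    pub state: TaskState,
    pub started_at: u64,   // assumed column, epoch seconds
    pub timeout_secs: u64,
}

/// Marks overdue Running / PendingTaskCreation tasks as Timeout and
/// returns the distinct root container IDs that were affected.
pub fn process_task_timeouts(tasks: &mut [TaskRow], now: u64) -> Vec<u32> {
    let mut roots = Vec::new();
    for task in tasks.iter_mut() {
        let can_time_out = matches!(
            task.state,
            TaskState::Running | TaskState::PendingTaskCreation
        );
        if can_time_out && now.saturating_sub(task.started_at) > task.timeout_secs {
            task.state = TaskState::Timeout;
            if !roots.contains(&task.root_container_id) {
                roots.push(task.root_container_id);
            }
        }
    }
    roots
}
```

Returning root container IDs rather than task IDs matches the pipeline's design: later steps re-evaluate whole collections, so they only need to know which trees changed.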
Fan-out/Fan-in Pattern
Distributed tasks follow a three-stage pattern within a sequential container:
- Generator (e.g., TcpSynGenerator) – Reads input parameters, partitions work, and produces N worker tasks plus 1 aggregator task. Sends CreateTask messages to the Manager queue. Transitions to PendingTaskCreation with an expected_task_count.
- Worker tasks (e.g., TcpSyn) – Execute in parallel within a parallel container. Each stores intermediate results in task_manager.task_data.
- Aggregator (e.g., TcpSynAggregator) – Runs after all worker tasks complete. Reads results from task_data/task_collection_data, combines them, and writes final output.
The sequential container ensures the aggregator waits for all parallel workers to reach a terminal state before starting.
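The generator's handshake with the Manager can be sketched as follows. This is a minimal in-memory model under stated assumptions: the struct and method names are hypothetical, IDs are plain strings, and the duplicate check in `record_created` is an assumed way to stay idempotent under at-least-once SQS delivery rather than a documented behavior.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Running,
    PendingTaskCreation,
    Succeeded,
}

#[derive(Debug)]
pub struct GeneratorTask {
    pub state: TaskState,
    pub created_task_ids: Vec<String>,
    pub expected_task_count: Option<usize>,
}

impl GeneratorTask {
    /// Called when a CreateTask message for this generator is inserted.
    /// A redelivered message with the same ID is ignored (assumption).
    pub fn record_created(&mut self, task_id: &str) {
        if !self.created_task_ids.iter().any(|id| id.as_str() == task_id) {
            self.created_task_ids.push(task_id.to_string());
        }
    }

    /// Called when the generator reports it has sent all CreateTask messages.
    pub fn finish_generating(&mut self, expected: usize) {
        self.state = TaskState::PendingTaskCreation;
        self.expected_task_count = Some(expected);
    }

    /// Mirrors the complete_generator_tasks check: succeed once the
    /// number of created tasks equals the expected count.
    pub fn complete_if_ready(&mut self) -> bool {
        let ready = self.state == TaskState::PendingTaskCreation
            && self.expected_task_count == Some(self.created_task_ids.len());
        if ready {
            self.state = TaskState::Succeeded;
        }
        ready
    }
}
```

For a generator that produces two workers and one aggregator, the expected count would be 3. The generator stays in PendingTaskCreation until the third distinct ID has been recorded, regardless of the order in which the CreateTask and CompleteTask messages arrive.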
CompletionPolicy
Each task carries a CompletionPolicy that controls sequential container behavior on failure:
- FailOnFailure (default) – A failed task stops the sequential container. The container propagates the failure.
- ContinueOnPartialFailure – Continue if the task partially fails (some sub-tasks failed).
- ContinueOnFailure – Skip the failed task and proceed to the next child. The failure is tolerated but tracked: the container enters PartiallyFailed at completion if any tolerated failures occurred.
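One way to picture how these policies steer a sequential container is a function that scans the children in order and decides what happens next. The sketch below is an interpretation, not the MRPF scheduler: the `Step` enum and `next_step` function are hypothetical, Timeout is assumed to be handled like Failed, and ContinueOnFailure is assumed to tolerate partial failures as well.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Running,
    PendingTaskCreation,
    Succeeded,
    Failed,
    Timeout,
    PartiallyFailed,
    Disabled,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompletionPolicy {
    #[default]
    FailOnFailure,
    ContinueOnPartialFailure,
    ContinueOnFailure,
}

/// Hypothetical result of evaluating a sequential container.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Step {
    /// The child at this index is Pending and should be dispatched.
    Schedule(usize),
    /// A child is still in flight; nothing to do yet.
    Wait,
    /// The child at this index failed and its policy does not tolerate it.
    Stopped(usize),
    /// Every child is done; the flag says whether failures were tolerated.
    Finished { tolerated_failures: bool },
}

pub fn next_step(children: &[(TaskState, CompletionPolicy)]) -> Step {
    let mut tolerated = false;
    for (index, &(state, policy)) in children.iter().enumerate() {
        match (state, policy) {
            // Completed or skipped children never block the sequence.
            (TaskState::Succeeded | TaskState::Disabled, _) => {}
            (TaskState::Pending, _) => return Step::Schedule(index),
            (TaskState::Running | TaskState::PendingTaskCreation, _) => return Step::Wait,
            (TaskState::Failed | TaskState::Timeout, CompletionPolicy::ContinueOnFailure) => {
                tolerated = true
            }
            (
                TaskState::PartiallyFailed,
                CompletionPolicy::ContinueOnPartialFailure | CompletionPolicy::ContinueOnFailure,
            ) => tolerated = true,
            // Any remaining failure is not tolerated by the child's policy.
            (TaskState::Failed | TaskState::Timeout | TaskState::PartiallyFailed, _) => {
                return Step::Stopped(index)
            }
        }
    }
    Step::Finished { tolerated_failures: tolerated }
}
```

A `Finished` result with `tolerated_failures: true` corresponds to the container ending in PartiallyFailed, while `Stopped` corresponds to the default FailOnFailure behavior where the failure propagates.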
Concurrency Safety
- The Manager is the single point of coordination. Workers never modify task state directly in the database.
- Workers send CompleteTask messages via SQS. The Manager applies state transitions.
- Task state updates use SELECT ... FOR UPDATE row locks to prevent concurrent modifications across Lambda invocations.
- start_due_jobs uses FOR UPDATE ... SKIP LOCKED so concurrent Manager invocations do not double-start the same job.
- Task collection reads use REPEATABLE READ, READ ONLY transaction isolation for a consistent snapshot during tree assembly.
- Task collection writes use READ COMMITTED, READ WRITE isolation.
SQS Message Types
TaskManagerQueueMessage (Manager queue):
- CreateTaskCollection { collection } – Full workflow definition to insert and start.
- CreateTask { task, generator_id, parent_id } – New task to insert, generated by a running generator.
- CompleteTask { task_id, state, root_container_id, data, expected_task_count } – Worker completion notification.
WorkerQueueMessage (Worker queues):
- StartTask { root_container_id, task } – Dispatched by the Manager to a worker queue.
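The Manager-queue message shapes, and the three-bucket split performed by parse_messages, can be sketched as a Rust enum. The variant and field names come from the list above; the field types (plain strings, `Option` for `parent_id`, `data`, and `expected_task_count`) and the `ParsedMessages` struct are assumptions, and JSON deserialization from the SQS body is omitted.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum TaskManagerQueueMessage {
    CreateTaskCollection {
        collection: String, // stand-in for the serialized TaskCollection
    },
    CreateTask {
        task: String, // stand-in for the Task payload
        generator_id: String,
        parent_id: Option<String>, // resolved from the generator when absent
    },
    CompleteTask {
        task_id: String,
        state: String, // stand-in for TaskState
        root_container_id: String,
        data: Option<String>,
        expected_task_count: Option<usize>, // assumed to be set by generators only
    },
}

/// Hypothetical container for the three buckets.
#[derive(Debug, Default)]
pub struct ParsedMessages {
    pub create_task_collections: Vec<TaskManagerQueueMessage>,
    pub create_tasks: Vec<TaskManagerQueueMessage>,
    pub complete_tasks: Vec<TaskManagerQueueMessage>,
}

/// Sorts a batch into buckets so later pipeline steps can run in a
/// fixed order regardless of the order messages arrived in.
pub fn parse_messages(messages: Vec<TaskManagerQueueMessage>) -> ParsedMessages {
    let mut parsed = ParsedMessages::default();
    for message in messages {
        match message {
            TaskManagerQueueMessage::CreateTaskCollection { .. } => {
                parsed.create_task_collections.push(message)
            }
            TaskManagerQueueMessage::CreateTask { .. } => parsed.create_tasks.push(message),
            TaskManagerQueueMessage::CompleteTask { .. } => parsed.complete_tasks.push(message),
        }
    }
    parsed
}
```

Bucketing before processing is what lets the Manager insert tasks from CreateTask messages before it applies CompleteTask messages from the same batch.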