Skip to content

Customize the Pipeline

The default pipeline is three stages: AssignPossibleEntitiesToAllocations, RunPreliminaryRules, RunAlgorithmAndDispatchResults. Replace the whole sequence by passing stages= to BeeKeeper:

from beekeeper import BeeKeeper
from beekeeper.flow.flow_stages.assign_possible_entities_to_allocations import (
    AssignPossibleEntitiesToAllocations,
)
from beekeeper.flow.flow_stages.run_preliminary_rules import RunPreliminaryRules
from beekeeper.flow.flow_stages.run_algorithm_and_dispatch_results import (
    RunAlgorithmAndDispatchResults,
)
from my_app.stages import LogCandidatesStage


bk = BeeKeeper[MyWorker, MyRequest](
    input_adapter=...,
    stages=[
        AssignPossibleEntitiesToAllocations[MyWorker, MyRequest](),
        LogCandidatesStage(),  # custom stage you wrote
        RunPreliminaryRules[MyWorker, MyRequest](),
        RunAlgorithmAndDispatchResults(algorithms=[...], output_adapters=[...]),
    ],
)

When you supply stages=, you own the wiring entirely. The algorithm and output_adapters kwargs at the BeeKeeper level become unavailable on the stages= overload — RunAlgorithmAndDispatchResults takes its algorithms= chain and output_adapters= directly in its own constructor, as in the snippet above.

Writing a custom stage

from beekeeper import AnyEntity, AnyRequest
from beekeeper.flow.beekeeper_flow_state import BeeKeeperFlowState
from beekeeper.flow.flow_stages.base_beekeeper_flow_stage import BaseBeeKeeperFlowStage


class LogCandidatesStage[TEntity: AnyEntity, TAllocReq: AnyRequest](
    BaseBeeKeeperFlowStage[TEntity, TAllocReq],
):
    def run_stage(self, state: BeeKeeperFlowState[TEntity, TAllocReq]):
        for alloc_id, candidates in state.candidate_map.items():
            print(f"alloc {alloc_id}: {len(candidates)} candidates")
        return state

Each stage takes the in-flight BeeKeeperFlowState and returns it (typically the same object, mutated). The state carries entities, allocations, preliminary_rules, stateful_rules, and the per-stage-1-built candidate_map.