Skip to content

Ethereum Test Fixtures package

Ethereum test fixture format definitions.

BaseFixture

Bases: CamelModel

Represents a base Ethereum test fixture of any type.

Source code in packages/testing/src/execution_testing/fixtures/base.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class BaseFixture(CamelModel):
    """Represents a base Ethereum test fixture of any type."""

    # Base Fixture class properties
    # Registry of all fixture formats, keyed by format name; populated
    # automatically by `__pydantic_init_subclass__` as subclasses are defined.
    formats: ClassVar[Dict[str, Type["BaseFixture"]]] = {}
    # Discriminated-union adapter over all registered fixture formats,
    # rebuilt every time a new format registers itself.
    formats_type_adapter: ClassVar[TypeAdapter]

    # Free-form fixture metadata, serialized under the `_info` key.
    info: Dict[str, Dict[str, Any] | str] = Field(
        default_factory=dict, alias="_info"
    )
    post_verifications: PostVerifications | None = Field(
        default=None, alias="postVerifications"
    )

    # Fixture format properties
    format_name: ClassVar[str] = ""
    output_file_extension: ClassVar[str] = ".json"
    description: ClassVar[str] = "Unknown fixture format; it has not been set."
    # Filling phases this format participates in (fill-only by default).
    format_phases: ClassVar[Set[FixtureFillingPhase]] = {
        FixtureFillingPhase.FILL
    }
    transition_tool_cache_key: ClassVar[str] = ""

    @classmethod
    def output_base_dir_name(cls) -> str:
        """
        Return name of the subdirectory where this type of fixture should be
        dumped to.
        """
        # E.g. "blockchain_test" -> "blockchain_tests".
        return cls.format_name.replace("test", "tests")

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        """
        Register all subclasses of BaseFixture with a fixture format name set
        as possible fixture formats.
        """
        if cls.format_name:
            # Register the new fixture format
            BaseFixture.formats[cls.format_name] = cls
            if len(BaseFixture.formats) > 1:
                # Rebuild the discriminated union over every known format so
                # `_parse_into_subclass` can dispatch on the format name.
                BaseFixture.formats_type_adapter = TypeAdapter(
                    Annotated[
                        Union[
                            tuple(
                                Annotated[fixture_format, Tag(format_name)]
                                for (
                                    format_name,
                                    fixture_format,
                                ) in BaseFixture.formats.items()
                            )
                        ],
                        Discriminator(fixture_format_discriminator),
                    ]
                )
            else:
                # A union needs at least two members; with a single format
                # the adapter validates directly against that class.
                BaseFixture.formats_type_adapter = TypeAdapter(cls)

    @model_validator(mode="wrap")
    @classmethod
    def _parse_into_subclass(
        cls, v: Any, handler: ValidatorFunctionWrapHandler
    ) -> "BaseFixture":
        """Parse the fixture into the correct subclass."""
        # Only dispatch through the format registry when validating the base
        # class itself; subclasses validate normally via the wrapped handler.
        if cls is BaseFixture:
            return BaseFixture.formats_type_adapter.validate_python(v)
        return handler(v)

    @cached_property
    def json_dict(self) -> Dict[str, Any]:
        """Returns the JSON representation of the fixture."""
        # `info` is excluded so the fixture hash stays stable across
        # metadata-only changes.
        return self.model_dump(
            mode="json", by_alias=True, exclude_none=True, exclude={"info"}
        )

    @cached_property
    def hash(self) -> str:
        """Returns the hash of the fixture."""
        # Canonical JSON (sorted keys, no whitespace) gives a stable digest.
        json_str = json.dumps(
            self.json_dict, sort_keys=True, separators=(",", ":")
        )
        h = hashlib.sha256(json_str.encode("utf-8")).hexdigest()
        return f"0x{h}"

    def json_dict_with_info(self, hash_only: bool = False) -> Dict[str, Any]:
        """Return JSON representation of the fixture with the info field."""
        dict_with_info = self.json_dict.copy()
        dict_with_info["_info"] = {"hash": self.hash}
        if not hash_only:
            dict_with_info["_info"].update(self.info)
        return dict_with_info

    def model_post_init(self, __context: Any, /) -> None:
        """
        Model post-init hook that records this fixture's format name in the
        `_info` metadata.
        """
        super().model_post_init(__context)
        self.info["fixture-format"] = self.format_name

    def fill_info(
        self,
        t8n_version: str,
        test_case_description: str,
        fixture_source_url: str,
        ref_spec: ReferenceSpec | None,
        _info_metadata: Dict[str, Any] | None,
        metadata: Dict[str, Any] | None = None,
    ) -> None:
        """Fill the info field for this fixture."""
        # Preserve any comment that was set before filling.
        if "comment" not in self.info:
            self.info["comment"] = "`execution-specs` generated test"
        self.info["filling-transition-tool"] = t8n_version
        self.info["description"] = test_case_description
        self.info["url"] = fixture_source_url
        if metadata:
            self.info["metadata"] = metadata
        if ref_spec is not None:
            # The reference spec writes its own entries into the info dict.
            ref_spec.write_info(self.info)
        if _info_metadata:
            self.info.update(_info_metadata)

    def get_fork(self) -> Fork | TransitionFork | None:
        """Return the fork of the fixture; subclasses must implement this."""
        raise NotImplementedError

    @classmethod
    def supports_fork(cls, fork: Fork | TransitionFork) -> bool:
        """
        Return whether the fixture can be generated for the given fork.

        By default, all fixtures support all forks.
        """
        del fork
        return True

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the appropriate marker is
        used.

        Base implementation never discards; subclasses may override.
        """
        del fork, markers
        return False

output_base_dir_name() classmethod

Return name of the subdirectory where this type of fixture should be dumped to.

Source code in packages/testing/src/execution_testing/fixtures/base.py
90
91
92
93
94
95
96
@classmethod
def output_base_dir_name(cls) -> str:
    """
    Return name of the subdirectory where this type of fixture should be
    dumped to.
    """
    return cls.format_name.replace("test", "tests")

__pydantic_init_subclass__(**kwargs) classmethod

Register all subclasses of BaseFixture with a fixture format name set as possible fixture formats.

Source code in packages/testing/src/execution_testing/fixtures/base.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """
    Register all subclasses of BaseFixture with a fixture format name set
    as possible fixture formats.
    """
    if cls.format_name:
        # Register the new fixture format
        BaseFixture.formats[cls.format_name] = cls
        if len(BaseFixture.formats) > 1:
            BaseFixture.formats_type_adapter = TypeAdapter(
                Annotated[
                    Union[
                        tuple(
                            Annotated[fixture_format, Tag(format_name)]
                            for (
                                format_name,
                                fixture_format,
                            ) in BaseFixture.formats.items()
                        )
                    ],
                    Discriminator(fixture_format_discriminator),
                ]
            )
        else:
            BaseFixture.formats_type_adapter = TypeAdapter(cls)

json_dict cached property

Returns the JSON representation of the fixture.

hash cached property

Returns the hash of the fixture.

json_dict_with_info(hash_only=False)

Return JSON representation of the fixture with the info field.

Source code in packages/testing/src/execution_testing/fixtures/base.py
151
152
153
154
155
156
157
def json_dict_with_info(self, hash_only: bool = False) -> Dict[str, Any]:
    """Return JSON representation of the fixture with the info field."""
    dict_with_info = self.json_dict.copy()
    dict_with_info["_info"] = {"hash": self.hash}
    if not hash_only:
        dict_with_info["_info"].update(self.info)
    return dict_with_info

model_post_init(__context)

Model post-init hook that records the fixture's format name in the info field.

Source code in packages/testing/src/execution_testing/fixtures/base.py
159
160
161
162
163
164
165
def model_post_init(self, __context: Any, /) -> None:
    """
    Model post-init to assert that the custom pre-allocation was
    provided and the default was not used.
    """
    super().model_post_init(__context)
    self.info["fixture-format"] = self.format_name

fill_info(t8n_version, test_case_description, fixture_source_url, ref_spec, _info_metadata, metadata=None)

Fill the info field for this fixture.

Source code in packages/testing/src/execution_testing/fixtures/base.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def fill_info(
    self,
    t8n_version: str,
    test_case_description: str,
    fixture_source_url: str,
    ref_spec: ReferenceSpec | None,
    _info_metadata: Dict[str, Any] | None,
    metadata: Dict[str, Any] | None = None,
) -> None:
    """Fill the info field for this fixture."""
    if "comment" not in self.info:
        self.info["comment"] = "`execution-specs` generated test"
    self.info["filling-transition-tool"] = t8n_version
    self.info["description"] = test_case_description
    self.info["url"] = fixture_source_url
    if metadata:
        self.info["metadata"] = metadata
    if ref_spec is not None:
        ref_spec.write_info(self.info)
    if _info_metadata:
        self.info.update(_info_metadata)

get_fork()

Return the fork of the fixture; must be implemented by subclasses.

Source code in packages/testing/src/execution_testing/fixtures/base.py
189
190
191
def get_fork(self) -> Fork | TransitionFork | None:
    """Return fork of the fixture as a string."""
    raise NotImplementedError

supports_fork(fork) classmethod

Return whether the fixture can be generated for the given fork.

By default, all fixtures support all forks.

Source code in packages/testing/src/execution_testing/fixtures/base.py
193
194
195
196
197
198
199
200
201
@classmethod
def supports_fork(cls, fork: Fork | TransitionFork) -> bool:
    """
    Return whether the fixture can be generated for the given fork.

    By default, all fixtures support all forks.
    """
    del fork
    return True

discard_fixture_format_by_marks(fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/fixtures/base.py
203
204
205
206
207
208
209
210
211
212
213
214
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Discard a fixture format from filling if the appropriate marker is
    used.
    """
    del fork, markers
    return False

FixtureFillingPhase

Bases: Enum

Execution phase for fixture generation.

Source code in packages/testing/src/execution_testing/fixtures/base.py
60
61
62
63
64
class FixtureFillingPhase(Enum):
    """Enumerates the phases in which fixture generation can run."""

    # Pre-allocation groups are computed ahead of the main fill phase.
    PRE_ALLOC_GENERATION = 1
    # The standard phase in which test fixtures are filled.
    FILL = 2

LabeledFixtureFormat

Represents a fixture format with a custom label.

This label will be used in the test id and also will be added as a marker to the generated test case when filling the test.

Source code in packages/testing/src/execution_testing/fixtures/base.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
class LabeledFixtureFormat:
    """
    A fixture format paired with a human-readable label.

    The label appears in the generated test id and is also attached as a
    marker to the test case produced when the test is filled.
    """

    format: Type[BaseFixture]
    label: str
    description: str

    registered_labels: ClassVar[Dict[str, "LabeledFixtureFormat"]] = {}

    def __init__(
        self,
        fixture_format: "Type[BaseFixture] | LabeledFixtureFormat",
        label: str,
        description: str,
    ):
        """Create a labeled wrapper around the given fixture format."""
        # Unwrap an already-labeled format so `format` is always the class.
        if isinstance(fixture_format, LabeledFixtureFormat):
            self.format = fixture_format.format
        else:
            self.format = fixture_format
        self.label = label
        self.description = description
        # First registration wins; later uses of the same label are ignored.
        registry = LabeledFixtureFormat.registered_labels
        if label not in registry:
            registry[label] = self

    @property
    def format_name(self) -> str:
        """Name of the underlying filling format."""
        return self.format.format_name

    @property
    def format_phases(self) -> Set[FixtureFillingPhase]:
        """Filling phases in which the underlying format is included."""
        return self.format.format_phases

    @property
    def transition_tool_cache_key(self) -> str:
        """Transition tool cache key of the underlying format."""
        return self.format.transition_tool_cache_key

    def __eq__(self, other: Any) -> bool:
        """
        Compare labeled fixture formats by their underlying format.

        A plain FixtureFormat type is considered equal when it matches the
        format wrapped by this instance.
        """
        if isinstance(other, LabeledFixtureFormat):
            return self.format == other.format
        if isinstance(other, type) and issubclass(other, BaseFixture):
            return self.format == other
        return False

__init__(fixture_format, label, description)

Initialize the fixture format with a custom label.

Source code in packages/testing/src/execution_testing/fixtures/base.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def __init__(
    self,
    fixture_format: "Type[BaseFixture] | LabeledFixtureFormat",
    label: str,
    description: str,
):
    """Initialize the fixture format with a custom label."""
    self.format = (
        fixture_format.format
        if isinstance(fixture_format, LabeledFixtureFormat)
        else fixture_format
    )
    self.label = label
    self.description = description
    if label not in LabeledFixtureFormat.registered_labels:
        LabeledFixtureFormat.registered_labels[label] = self

format_name property

Get the filling format name.

format_phases property

Get the filling format phases where it should be included.

transition_tool_cache_key property

Get the transition tool cache key.

__eq__(other)

Check if two labeled fixture formats are equal.

If the other object is a FixtureFormat type, the format of the labeled fixture format will be compared with the format of the other object.

Source code in packages/testing/src/execution_testing/fixtures/base.py
263
264
265
266
267
268
269
270
271
272
273
274
def __eq__(self, other: Any) -> bool:
    """
    Check if two labeled fixture formats are equal.

    If the other object is a FixtureFormat type, the format of the labeled
    fixture format will be compared with the format of the other object.
    """
    if isinstance(other, LabeledFixtureFormat):
        return self.format == other.format
    if isinstance(other, type) and issubclass(other, BaseFixture):
        return self.format == other
    return False

strip_fixture_format_from_node(item)

Remove fixture format suffix from a test nodeid.

Used for cache keys and xdist grouping to ensure related fixture formats (e.g., blockchain_test and blockchain_test_engine) share the same key.

Example

'test.py::test[fork_Osaka-state_test]' -> 'test.py::test[fork_Osaka]'

Source code in packages/testing/src/execution_testing/fixtures/base.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def strip_fixture_format_from_node(
    item: PytestItemProtocol,
) -> str:
    """
    Strip the fixture-format suffix from a test node id.

    Ensures related fixture formats (e.g., blockchain_test and
    blockchain_test_engine) map to the same cache key / xdist group.

    Example:
        'test.py::test[fork_Osaka-state_test]' -> 'test.py::test[fork_Osaka]'

    """
    node_id = item.nodeid
    marker = item.get_closest_marker("fixture_format_id")
    # Without the marker there is no suffix to remove.
    if marker is None:
        return node_id
    assert len(marker.args) == 1
    format_id = marker.args[0]
    # Only rewrite the node id when the suffix is actually present.
    return node_id.replace(format_id, "") if format_id in node_id else node_id

BlockchainEngineFixture

Bases: BlockchainEngineFixtureCommon

Engine specific test fixture information.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
738
739
740
741
742
743
744
745
746
747
748
749
750
751
class BlockchainEngineFixture(BlockchainEngineFixtureCommon):
    """Engine specific test fixture information."""

    format_name: ClassVar[str] = "blockchain_test_engine"
    description: ClassVar[str] = (
        "Tests that generate a blockchain test fixture in Engine API format."
    )
    # Full pre-allocation embedded in the fixture (unlike Engine X, which
    # references shared pre-allocation groups).
    pre: Alloc
    genesis: FixtureHeader = Field(..., alias="genesisBlockHeader")
    post_state: Alloc | None = Field(None)
    # Blocks delivered as Engine API new-payload requests.
    payloads: List[FixtureEngineNewPayload] = Field(
        ..., alias="engineNewPayloads"
    )
    # Shares the transition-tool cache key with the `blockchain_test` format.
    transition_tool_cache_key: ClassVar[str] = "blockchain_test"

BlockchainEngineFixtureCommon

Bases: BaseFixture

Base blockchain test fixture model for Engine API based execution.

Similar to BlockchainFixtureCommon but excludes the 'pre' field to avoid duplicating large pre-allocations.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
@post_state_validator()
class BlockchainEngineFixtureCommon(BaseFixture):
    """
    Base blockchain test fixture model for Engine API based execution.

    Similar to BlockchainFixtureCommon but excludes the 'pre' field to avoid
    duplicating large pre-allocations.
    """

    # Serialized under the legacy alias `network`.
    fork: Fork | TransitionFork = Field(..., alias="network")
    post_state_hash: Hash | None = Field(None)
    # FIXME: lastBlockHash
    last_block_hash: Hash = Field(..., alias="lastblockhash")
    config: FixtureConfig

    def get_fork(self) -> Fork | TransitionFork | None:
        """Return fixture's `Fork`."""
        return self.fork

    @classmethod
    def supports_fork(cls, fork: Fork | TransitionFork) -> bool:
        """
        Return whether the fixture can be generated for the given fork.

        The Engine API is available only on Paris and afterwards.
        """
        # Resolve transition forks to the fork at block 0 / timestamp 0
        # before comparing against Paris.
        return fork.fork_at(block_number=0, timestamp=0) >= Paris

get_fork()

Return fixture's Fork.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
724
725
726
def get_fork(self) -> Fork | TransitionFork | None:
    """Return fixture's `Fork`."""
    return self.fork

supports_fork(fork) classmethod

Return whether the fixture can be generated for the given fork.

The Engine API is available only on Paris and afterwards.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
728
729
730
731
732
733
734
735
@classmethod
def supports_fork(cls, fork: Fork | TransitionFork) -> bool:
    """
    Return whether the fixture can be generated for the given fork.

    The Engine API is available only on Paris and afterwards.
    """
    return fork.fork_at(block_number=0, timestamp=0) >= Paris

BlockchainEngineStatefulFixture

Bases: BlockchainEngineFixtureCommon

Engine fixture for snapshot-based stateful testing.

Instead of embedding pre-allocation or referencing a computed group, this fixture references an external snapshot that the consumer must pre-load. Setup payloads deploy contracts and seed accounts on top of the snapshot before the actual test payloads execute.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
class BlockchainEngineStatefulFixture(BlockchainEngineFixtureCommon):
    """
    Engine fixture for snapshot-based stateful testing.

    Instead of embedding pre-allocation or referencing a computed group,
    this fixture references an external snapshot that the consumer must
    pre-load. Setup payloads deploy contracts and seed accounts on top
    of the snapshot before the actual test payloads execute.
    """

    # Extra fields in the shared fixture data are tolerated and dropped.
    model_config = CamelModel.model_config | {"extra": "ignore"}

    format_name: ClassVar[str] = "blockchain_test_stateful_engine"
    description: ClassVar[str] = (
        "Tests that generate a Blockchain Test fixture for "
        "snapshot-based stateful Engine API testing."
    )
    # Requires the pre-allocation generation phase in addition to filling.
    format_phases: ClassVar[Set[FixtureFillingPhase]] = {
        FixtureFillingPhase.FILL,
        FixtureFillingPhase.PRE_ALLOC_GENERATION,
    }

    # Block number and hash of the external snapshot the consumer pre-loads.
    snapshot_block_number: HexNumber
    snapshot_block_hash: Hash

    # Payloads executed on top of the snapshot before the test payloads.
    setup_payloads: List[FixtureEngineNewPayload] = Field(
        ..., alias="setupEngineNewPayloads"
    )
    payloads: List[FixtureEngineNewPayload] = Field(
        ..., alias="engineNewPayloads"
    )

BlockchainEngineSyncFixture

Bases: BlockchainEngineFixture

Engine Sync specific test fixture information.

This fixture format is specifically designed for sync testing where: - The client under test receives all payloads - A sync client attempts to sync from the client under test - Both client types are parametrized from hive client config

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
class BlockchainEngineSyncFixture(BlockchainEngineFixture):
    """
    Engine Sync specific test fixture information.

    This fixture format is specifically designed for sync testing where:
    - The client under test receives all payloads
    - A sync client attempts to sync from the client under test
    - Both client types are parametrized from hive client config
    """

    format_name: ClassVar[str] = "blockchain_test_sync"
    description: ClassVar[str] = (
        "Tests that generate a blockchain test fixture for Engine API "
        "testing with client sync."
    )
    # Reset to empty, overriding the parent's "blockchain_test" cache key.
    transition_tool_cache_key: ClassVar[str] = ""
    # NOTE(review): presumably the payload the sync client is driven with;
    # confirm against the consumer implementation.
    sync_payload: FixtureEngineNewPayload | None = None

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """Discard the fixture format based on the provided markers."""
        del fork
        marker_names = [m.name for m in markers]
        # Sync fixtures are only produced for tests explicitly marked with
        # `verify_sync`; discard (return True) otherwise.
        return "verify_sync" not in marker_names

discard_fixture_format_by_marks(fork, markers) classmethod

Discard the fixture format based on the provided markers.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
843
844
845
846
847
848
849
850
851
852
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """Discard the fixture format based on the provided markers."""
    del fork
    marker_names = [m.name for m in markers]
    return "verify_sync" not in marker_names

BlockchainEngineXFixture

Bases: BlockchainEngineFixtureCommon

Engine X specific test fixture information.

Uses pre-allocation groups (and a single client instance) for efficient test execution without client restarts.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
@post_state_validator(alternate_field="post_state_diff")
class BlockchainEngineXFixture(BlockchainEngineFixtureCommon):
    """
    Engine X specific test fixture information.

    Uses pre-allocation groups (and a single client instance) for efficient
    test execution without client restarts.
    """

    # Allow extra fields: BlockchainEngineXFixture is constructed from shared
    # fixture_data that has fields for other fixture formats (e.g. genesis).
    model_config = CamelModel.model_config | {"extra": "ignore"}

    format_name: ClassVar[str] = "blockchain_test_engine_x"
    description: ClassVar[str] = (
        "Tests that generate a Blockchain Test Engine X fixture."
    )
    # Requires the pre-allocation generation phase in addition to filling.
    format_phases: ClassVar[Set[FixtureFillingPhase]] = {
        FixtureFillingPhase.FILL,
        FixtureFillingPhase.PRE_ALLOC_GENERATION,
    }
    # Explicitly empty (matches the BaseFixture default).
    transition_tool_cache_key: ClassVar[str] = ""

    pre_hash: str
    """Hash of the pre-allocation group this test belongs to."""

    post_state_diff: Alloc | None = None
    """
    State difference from genesis after test execution (efficiency
    optimization).
    """

    payloads: List[FixtureEngineNewPayload] = Field(
        ..., alias="engineNewPayloads"
    )
    """Engine API payloads for blockchain execution."""

pre_hash instance-attribute

Hash of the pre-allocation group this test belongs to.

post_state_diff = None class-attribute instance-attribute

State difference from genesis after test execution (efficiency optimization).

payloads = Field(..., alias='engineNewPayloads') class-attribute instance-attribute

Engine API payloads for blockchain execution.

BlockchainFixture

Bases: BlockchainFixtureCommon

Cross-client specific blockchain test model used in JSON fixtures.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
695
696
697
698
699
700
701
702
703
704
705
706
class BlockchainFixture(BlockchainFixtureCommon):
    """Cross-client specific blockchain test model used in JSON fixtures."""

    format_name: ClassVar[str] = "blockchain_test"
    description: ClassVar[str] = (
        "Tests that generate a blockchain test fixture."
    )

    # RLP-encoded genesis block, serialized under `genesisRLP`.
    genesis_rlp: Bytes = Field(..., alias="genesisRLP")
    blocks: List[FixtureBlock | InvalidFixtureBlock]
    # Only the "NoProof" seal engine is supported.
    seal_engine: Literal["NoProof"] = Field("NoProof")
    transition_tool_cache_key: ClassVar[str] = "blockchain_test"

BlockchainFixtureCommon

Bases: BaseFixture

Base blockchain test fixture model.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
@post_state_validator()
class BlockchainFixtureCommon(BaseFixture):
    """Base blockchain test fixture model."""

    # Serialized under the legacy alias `network`.
    fork: Fork | TransitionFork = Field(..., alias="network")
    genesis: FixtureHeader = Field(..., alias="genesisBlockHeader")
    pre: Alloc
    post_state: Alloc | None = Field(None)
    post_state_hash: Hash | None = Field(None)
    # FIXME: lastBlockHash
    last_block_hash: Hash = Field(..., alias="lastblockhash")
    config: FixtureConfig

    @model_validator(mode="before")
    @classmethod
    def config_defaults_for_backwards_compatibility(cls, data: Any) -> Any:
        """
        Check if the config field is populated, otherwise use the root-level
        field values for backwards compatibility.
        """
        if isinstance(data, dict):
            if "config" not in data:
                data["config"] = {}
            if isinstance(data["config"], dict):
                if "network" not in data["config"]:
                    # NOTE(review): raises KeyError if "network" is missing at
                    # the root — presumably guaranteed by the fixture schema.
                    data["config"]["network"] = data["network"]
                if "chainid" not in data["config"]:
                    # Legacy fixtures predate chain id config; default to
                    # mainnet's chain id 1.
                    data["config"]["chainid"] = "0x01"
        return data

    def get_fork(self) -> Fork | TransitionFork | None:
        """Return the fork of the fixture."""
        return self.fork

config_defaults_for_backwards_compatibility(data) classmethod

Check if the config field is populated, otherwise use the root-level field values for backwards compatibility.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
@model_validator(mode="before")
@classmethod
def config_defaults_for_backwards_compatibility(cls, data: Any) -> Any:
    """
    Check if the config field is populated, otherwise use the root-level
    field values for backwards compatibility.
    """
    if isinstance(data, dict):
        if "config" not in data:
            data["config"] = {}
        if isinstance(data["config"], dict):
            if "network" not in data["config"]:
                data["config"]["network"] = data["network"]
            if "chainid" not in data["config"]:
                data["config"]["chainid"] = "0x01"
    return data

get_fork()

Return the fork of the fixture.

Source code in packages/testing/src/execution_testing/fixtures/blockchain.py
690
691
692
def get_fork(self) -> Fork | TransitionFork | None:
    """Return fork of the fixture as a string."""
    return self.fork

FixtureCollector dataclass

Collects all fixtures generated by the test cases.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@dataclass(kw_only=True)
class FixtureCollector:
    """
    Collects all fixtures generated by the test cases.

    In file-output mode, fixtures are streamed to per-worker partial JSONL
    files as soon as they are added (no in-memory accumulation); in stdout
    mode they are accumulated in ``all_fixtures`` and dumped as a single
    JSON document by ``dump_fixtures()``.
    """

    output_dir: Path
    fill_static_tests: bool
    single_fixture_per_file: bool
    filler_path: Path
    base_dump_dir: Optional[Path] = None
    generate_index: bool = True
    # Worker ID for partial files. None = read from env var.
    worker_id: Optional[str] = None

    # Internal state (only used for stdout mode)
    all_fixtures: Dict[Path, Fixtures] = field(default_factory=dict)

    # Streaming file handles - kept open for module duration
    _partial_fixture_files: Dict[Path, IO[str]] = field(default_factory=dict)
    _partial_index_file: Optional[IO[str]] = field(default=None)
    _worker_id_cached: bool = field(default=False, init=False)

    # Lightweight tracking for verification (path, format class, debug_path)
    # Only stores metadata, not fixture data - memory efficient
    _fixtures_to_verify: List[Tuple[Path, type, Optional[Path]]] = field(
        default_factory=list
    )

    def get_fixture_basename(self, info: TestInfo) -> Path:
        """Return basename of the fixture file for a given test case."""
        module_relative_output_dir = info.get_module_relative_output_dir(
            self.filler_path
        )

        # Each legacy test filler has only 1 test per file if it's a !state
        # test! So no need to create directory Add11/add11.json it can be plain
        # add11.json
        if self.fill_static_tests:
            return module_relative_output_dir.parent / info.original_name

        if self.single_fixture_per_file:
            return module_relative_output_dir / info.get_single_test_name(
                mode="test"
            )
        return module_relative_output_dir / info.get_single_test_name(
            mode="module"
        )

    def _get_worker_id(self) -> str | None:
        """
        Get the worker ID (from constructor or environment).

        The environment lookup is performed at most once; its result (or
        absence) is cached via ``_worker_id_cached``.
        """
        if self.worker_id is not None:
            return self.worker_id
        if not self._worker_id_cached:
            # Cache the env var lookup
            env_worker_id = os.environ.get("PYTEST_XDIST_WORKER")
            if env_worker_id:
                self.worker_id = env_worker_id
            self._worker_id_cached = True
        return self.worker_id

    def add_fixture(
        self,
        info: TestInfo,
        fixture: BaseFixture,
        output_subdir: Path | None = None,
    ) -> Path:
        """
        Add fixture and immediately stream to partial JSONL file.

        Returns the path of the final fixture file this entry belongs to
        (the partial files are merged into it at session end).
        """
        fixture_basename = self.get_fixture_basename(info)
        if (
            output_subdir is not None
            and SUBFOLDER_LEVEL_SEPARATOR in output_subdir.name
        ):
            parts = fixture_basename.parts
            if parts and parts[0] == "benchmark":
                # Strip the "benchmark/" prefix from the fixture path so
                # files land directly under the gas-limit subdirectory.
                fixture_basename = Path(*parts[1:])

        format_output_dir = self.output_dir / fixture.output_base_dir_name()
        if output_subdir is not None and self.output_dir.name != "stdout":
            format_output_dir = format_output_dir / output_subdir

        fixture_path = format_output_dir / fixture_basename.with_suffix(
            fixture.output_file_extension
        )

        # Stream fixture directly to partial JSONL (no memory accumulation)
        if self.output_dir.name != "stdout":
            self._stream_fixture_to_partial(
                fixture_path, info.get_id(), fixture
            )
            # Track for verification (lightweight - only path and format class)
            debug_path = self._get_consume_direct_dump_dir(info)
            self._fixtures_to_verify.append(
                (fixture_path, fixture.__class__, debug_path)
            )
        else:
            # stdout mode: accumulate for final JSON dump
            if fixture_path not in self.all_fixtures:
                self.all_fixtures[fixture_path] = Fixtures(root={})
            self.all_fixtures[fixture_path][info.get_id()] = fixture

        # Stream index entry directly to partial JSONL
        if self.generate_index and self.output_dir.name != "stdout":
            relative_path = fixture_path.relative_to(self.output_dir)
            fixture_fork = fixture.get_fork()
            index_entry = {
                "id": info.get_id(),
                "json_path": str(relative_path),
                "fixture_hash": str(fixture.hash) if fixture.hash else None,
                "fork": fixture_fork.name() if fixture_fork else None,
                "format": fixture.format_name,
            }
            if (pre_hash := getattr(fixture, "pre_hash", None)) is not None:
                index_entry["pre_hash"] = pre_hash
            self._stream_index_entry_to_partial(index_entry)

        return fixture_path

    def _get_partial_fixture_file(self, fixture_path: Path) -> "IO[str]":
        """Get or create a file handle for streaming fixtures."""
        worker_id = self._get_worker_id()
        suffix = f".{worker_id}" if worker_id else ".main"
        partial_path = fixture_path.with_suffix(f".partial{suffix}.jsonl")

        if partial_path not in self._partial_fixture_files:
            partial_path.parent.mkdir(parents=True, exist_ok=True)
            # Fix: JSON is UTF-8 by specification; be explicit so the
            # output does not depend on the platform's locale encoding.
            self._partial_fixture_files[partial_path] = open(
                partial_path, "a", encoding="utf-8"
            )

        return self._partial_fixture_files[partial_path]

    def _stream_fixture_to_partial(
        self,
        fixture_path: Path,
        fixture_id: str,
        fixture: BaseFixture,
    ) -> None:
        """Stream a single fixture to its partial JSONL file."""
        # The fixture JSON is pre-serialized (indent=4) and embedded as a
        # string value so the merge step can re-indent it verbatim.
        value = json.dumps(fixture.json_dict_with_info(), indent=4)
        line = json.dumps({"k": fixture_id, "v": value}) + "\n"

        f = self._get_partial_fixture_file(fixture_path)
        f.write(line)
        f.flush()  # Ensure data is written immediately

    def _get_partial_index_file(self) -> "IO[str]":
        """Get or create the file handle for streaming index entries."""
        if self._partial_index_file is None:
            worker_id = self._get_worker_id()
            suffix = f".{worker_id}" if worker_id else ".main"
            partial_index_path = (
                self.output_dir / ".meta" / f"partial_index{suffix}.jsonl"
            )
            partial_index_path.parent.mkdir(parents=True, exist_ok=True)
            # Fix: explicit encoding for the JSONL index (see above).
            self._partial_index_file = open(
                partial_index_path, "a", encoding="utf-8"
            )

        return self._partial_index_file

    def _stream_index_entry_to_partial(self, entry: Dict) -> None:
        """Stream a single index entry to partial JSONL file."""
        f = self._get_partial_index_file()
        f.write(json.dumps(entry) + "\n")
        f.flush()  # Ensure data is written immediately

    def close_streaming_files(self) -> None:
        """Close all open streaming file handles."""
        for f in self._partial_fixture_files.values():
            f.close()
        self._partial_fixture_files.clear()

        if self._partial_index_file is not None:
            self._partial_index_file.close()
            self._partial_index_file = None

    def dump_fixtures(self) -> None:
        """Dump collected fixtures (only used for stdout mode)."""
        if self.output_dir.name == "stdout":
            combined_fixtures = {
                k: to_json(v)
                for fixture in self.all_fixtures.values()
                for k, v in fixture.items()
            }
            json.dump(combined_fixtures, sys.stdout, indent=4)
            self.all_fixtures.clear()
        # For file output, fixtures are already streamed in add_fixture()

    def _get_consume_direct_dump_dir(self, info: TestInfo) -> Path | None:
        """
        Directory to dump the current test function's fixture.json and fixture
        verification debug output.
        """
        if not self.base_dump_dir:
            return None
        if self.single_fixture_per_file:
            return info.get_dump_dir_path(
                self.base_dump_dir, self.filler_path, level="test_parameter"
            )
        else:
            return info.get_dump_dir_path(
                self.base_dump_dir, self.filler_path, level="test_function"
            )

    def verify_fixture_files(
        self, evm_fixture_verification: FixtureConsumer
    ) -> None:
        """
        Run `evm [state|block]test` on each fixture.

        For streaming mode, uses lightweight tracking of fixture paths/formats
        rather than keeping full fixtures in memory.
        """
        if self.output_dir.name == "stdout":
            # stdout mode: fixtures are in memory
            for fixture_path, name_fixture_dict in self.all_fixtures.items():
                for _fixture_name, fixture in name_fixture_dict.items():
                    if evm_fixture_verification.can_consume(fixture.__class__):
                        evm_fixture_verification.consume_fixture(
                            fixture.__class__,
                            fixture_path,
                            fixture_name=None,
                            debug_output_path=None,
                        )
        else:
            # Streaming mode: use tracked fixture metadata
            for entry in self._fixtures_to_verify:
                fixture_path, fixture_format, debug_path = entry
                if evm_fixture_verification.can_consume(fixture_format):
                    evm_fixture_verification.consume_fixture(
                        fixture_format,
                        fixture_path,
                        fixture_name=None,
                        debug_output_path=debug_path,
                    )
            # Clear tracking after verification
            self._fixtures_to_verify.clear()

get_fixture_basename(info)

Return basename of the fixture file for a given test case.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def get_fixture_basename(self, info: TestInfo) -> Path:
    """Return basename of the fixture file for a given test case."""
    base_dir = info.get_module_relative_output_dir(self.filler_path)

    # Static (legacy) fillers hold a single test per file, so the fixture
    # can live directly next to its siblings (plain add11.json) instead of
    # inside a per-test directory (Add11/add11.json).
    if self.fill_static_tests:
        return base_dir.parent / info.original_name

    naming_mode = "test" if self.single_fixture_per_file else "module"
    return base_dir / info.get_single_test_name(mode=naming_mode)

add_fixture(info, fixture, output_subdir=None)

Add fixture and immediately stream to partial JSONL file.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def add_fixture(
    self,
    info: TestInfo,
    fixture: BaseFixture,
    output_subdir: Path | None = None,
) -> Path:
    """
    Add fixture and immediately stream to partial JSONL file.

    In stdout mode the fixture is accumulated in memory instead; in file
    mode an index entry is also streamed when ``generate_index`` is set.
    Returns the path of the final fixture file this entry belongs to.
    """
    fixture_basename = self.get_fixture_basename(info)
    if (
        output_subdir is not None
        and SUBFOLDER_LEVEL_SEPARATOR in output_subdir.name
    ):
        parts = fixture_basename.parts
        if parts and parts[0] == "benchmark":
            # Strip the "benchmark/" prefix from the fixture path so
            # files land directly under the gas-limit subdirectory.
            fixture_basename = Path(*parts[1:])

    # Final on-disk location: <output_dir>/<format dir>[/<subdir>]/<basename>
    format_output_dir = self.output_dir / fixture.output_base_dir_name()
    if output_subdir is not None and self.output_dir.name != "stdout":
        format_output_dir = format_output_dir / output_subdir

    fixture_path = format_output_dir / fixture_basename.with_suffix(
        fixture.output_file_extension
    )

    # Stream fixture directly to partial JSONL (no memory accumulation)
    if self.output_dir.name != "stdout":
        self._stream_fixture_to_partial(
            fixture_path, info.get_id(), fixture
        )
        # Track for verification (lightweight - only path and format class)
        debug_path = self._get_consume_direct_dump_dir(info)
        self._fixtures_to_verify.append(
            (fixture_path, fixture.__class__, debug_path)
        )
    else:
        # stdout mode: accumulate for final JSON dump
        if fixture_path not in self.all_fixtures:
            self.all_fixtures[fixture_path] = Fixtures(root={})
        self.all_fixtures[fixture_path][info.get_id()] = fixture

    # Stream index entry directly to partial JSONL
    if self.generate_index and self.output_dir.name != "stdout":
        relative_path = fixture_path.relative_to(self.output_dir)
        fixture_fork = fixture.get_fork()
        index_entry = {
            "id": info.get_id(),
            "json_path": str(relative_path),
            "fixture_hash": str(fixture.hash) if fixture.hash else None,
            "fork": fixture_fork.name() if fixture_fork else None,
            "format": fixture.format_name,
        }
        # pre_hash is only present on some fixture formats; include it
        # in the index only when the fixture actually carries it.
        if (pre_hash := getattr(fixture, "pre_hash", None)) is not None:
            index_entry["pre_hash"] = pre_hash
        self._stream_index_entry_to_partial(index_entry)

    return fixture_path

close_streaming_files()

Close all open streaming file handles.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
359
360
361
362
363
364
365
366
367
def close_streaming_files(self) -> None:
    """Close and forget every open streaming file handle."""
    # Fixture partials: close each handle and drop it from the mapping.
    while self._partial_fixture_files:
        _, handle = self._partial_fixture_files.popitem()
        handle.close()

    # Index partial: close it if it was ever opened.
    index_handle = self._partial_index_file
    if index_handle is not None:
        index_handle.close()
        self._partial_index_file = None

dump_fixtures()

Dump collected fixtures (only used for stdout mode).

Source code in packages/testing/src/execution_testing/fixtures/collector.py
369
370
371
372
373
374
375
376
377
378
def dump_fixtures(self) -> None:
    """Dump collected fixtures (only used for stdout mode)."""
    if self.output_dir.name != "stdout":
        # File output: fixtures were already streamed in add_fixture().
        return
    merged = {
        fixture_id: to_json(fixture)
        for fixtures in self.all_fixtures.values()
        for fixture_id, fixture in fixtures.items()
    }
    json.dump(merged, sys.stdout, indent=4)
    self.all_fixtures.clear()

verify_fixture_files(evm_fixture_verification)

Run evm [state|block]test on each fixture.

For streaming mode, uses lightweight tracking of fixture paths/formats rather than keeping full fixtures in memory.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def verify_fixture_files(
    self, evm_fixture_verification: FixtureConsumer
) -> None:
    """
    Run `evm [state|block]test` on each fixture.

    In stdout mode the fixtures are still held in memory; otherwise only
    the lightweight (path, format, debug dir) records collected while
    streaming are used, keeping memory usage low.
    """
    if self.output_dir.name == "stdout":
        # stdout mode: fixtures are in memory
        for path, fixtures_by_name in self.all_fixtures.items():
            for fixture in fixtures_by_name.values():
                fixture_format = fixture.__class__
                if not evm_fixture_verification.can_consume(fixture_format):
                    continue
                evm_fixture_verification.consume_fixture(
                    fixture_format,
                    path,
                    fixture_name=None,
                    debug_output_path=None,
                )
        return

    # Streaming mode: consume the tracked metadata, then forget it.
    for path, fixture_format, debug_dir in self._fixtures_to_verify:
        if evm_fixture_verification.can_consume(fixture_format):
            evm_fixture_verification.consume_fixture(
                fixture_format,
                path,
                fixture_name=None,
                debug_output_path=debug_dir,
            )
    self._fixtures_to_verify.clear()

TestInfo dataclass

Contains test information from the current node.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@dataclass(kw_only=True, slots=True)
class TestInfo:
    """Contains test information from the current node."""

    name: str  # pytest: Item.name, e.g. test_paris_one[fork_Paris-state_test]
    id: str  # pytest: Item.nodeid, e.g.
    # tests/paris/test_module_paris.py::test_paris_one[...]
    original_name: str  # pytest: Item.originalname, e.g. test_paris_one
    module_path: Path  # pytest: Item.path, e.g.
    # .../tests/paris/test_module_paris.py

    test_prefix: ClassVar[str] = "test_"  # Python test prefix
    filler_suffix: ClassVar[str] = "Filler"  # Static test suffix

    @classmethod
    def strip_test_name(cls, name: str) -> str:
        """Remove test prefix from a python test case name."""
        if name.startswith(cls.test_prefix):
            return name.removeprefix(cls.test_prefix)
        if name.endswith(cls.filler_suffix):
            return name.removesuffix(cls.filler_suffix)
        return name

    def get_name_and_parameters(self) -> Tuple[str, str]:
        """
        Convert test name to a tuple containing the test name and test
        parameters.

        Example: test_push0_key_sstore[fork_Shanghai] -> test_push0_key_sstore,
        fork_Shanghai
        """
        test_name, parameters = self.name.split("[")
        return test_name, re.sub(r"[\[\-]", "_", parameters).replace("]", "")

    def get_single_test_name(
        self, mode: Literal["module", "test"] = "module"
    ) -> str:
        """Convert test name to a single test name."""
        if mode == "module":
            # Use the module name as the test name
            return self.strip_test_name(self.original_name)
        elif mode == "test":
            # Mix the module name and the test name/arguments
            test_name, test_parameters = self.get_name_and_parameters()
            test_name = self.strip_test_name(test_name)
            return f"{test_name}__{test_parameters}"

    def get_dump_dir_path(
        self,
        base_dump_dir: Optional[Path],
        filler_path: Path,
        level: Literal[
            "test_module", "test_function", "test_parameter"
        ] = "test_parameter",
    ) -> Optional[Path]:
        """Path to dump the debug output as defined by the level to dump at."""
        if not base_dump_dir:
            return None
        test_module_relative_dir = self.get_module_relative_output_dir(
            filler_path
        )
        if level == "test_module":
            return Path(base_dump_dir) / Path(
                str(test_module_relative_dir).replace(os.sep, "__")
            )
        test_name, test_parameter_string = self.get_name_and_parameters()
        dir_str = str(test_module_relative_dir).replace(os.sep, "__")
        flat_path = f"{dir_str}__{test_name}"
        if level == "test_function":
            return Path(base_dump_dir) / flat_path
        elif level == "test_parameter":
            return Path(base_dump_dir) / flat_path / test_parameter_string
        raise Exception("Unexpected level.")

    def get_id(self) -> str:
        """Return the test id."""
        return self.id

    def get_module_relative_output_dir(self, filler_path: Path) -> Path:
        """
        Return a directory name for the provided test_module (relative to the
        base ./tests directory) that can be used for output (within the
        configured fixtures output path or the base_dump_dir directory).

        Example: tests/shanghai/eip3855_push0/test_push0.py ->
        shanghai/eip3855_push0/test_push0
        """
        basename = self.module_path.with_suffix("").absolute()
        basename_relative = basename.relative_to(
            os.path.commonpath([filler_path.absolute(), basename])
        )
        module_path = basename_relative.parent / self.strip_test_name(
            basename_relative.stem
        )
        return module_path

strip_test_name(name) classmethod

Remove test prefix from a python test case name.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
113
114
115
116
117
118
119
120
@classmethod
def strip_test_name(cls, name: str) -> str:
    """Remove test prefix from a python test case name."""
    prefix = cls.test_prefix
    if name.startswith(prefix):
        return name[len(prefix):]
    suffix = cls.filler_suffix
    if name.endswith(suffix):
        return name[: len(name) - len(suffix)]
    return name

get_name_and_parameters()

Convert test name to a tuple containing the test name and test parameters.

Example: test_push0_key_sstore[fork_Shanghai] -> test_push0_key_sstore, fork_Shanghai

Source code in packages/testing/src/execution_testing/fixtures/collector.py
122
123
124
125
126
127
128
129
130
131
def get_name_and_parameters(self) -> Tuple[str, str]:
    """
    Split the pytest item name into (test name, normalized parameters).

    Example: test_push0_key_sstore[fork_Shanghai] -> test_push0_key_sstore,
    fork_Shanghai
    """
    test_name, raw_parameters = self.name.split("[")
    # Normalize: brackets and dashes become underscores; the trailing
    # closing bracket is dropped entirely.
    sanitized = re.sub(r"[\[\-]", "_", raw_parameters)
    return test_name, sanitized.replace("]", "")

get_single_test_name(mode='module')

Convert test name to a single test name.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
133
134
135
136
137
138
139
140
141
142
143
144
def get_single_test_name(
    self, mode: Literal["module", "test"] = "module"
) -> str:
    """Convert test name to a single test name."""
    if mode == "test":
        # Combine the stripped test name with its parameter string.
        base_name, parameter_string = self.get_name_and_parameters()
        return f"{self.strip_test_name(base_name)}__{parameter_string}"
    if mode == "module":
        # The module-level name is simply the stripped original name.
        return self.strip_test_name(self.original_name)

get_dump_dir_path(base_dump_dir, filler_path, level='test_parameter')

Path to dump the debug output as defined by the level to dump at.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def get_dump_dir_path(
    self,
    base_dump_dir: Optional[Path],
    filler_path: Path,
    level: Literal[
        "test_module", "test_function", "test_parameter"
    ] = "test_parameter",
) -> Optional[Path]:
    """Path to dump the debug output as defined by the level to dump at."""
    if not base_dump_dir:
        return None
    relative_dir = self.get_module_relative_output_dir(filler_path)
    # Flatten the module path into a single directory component.
    flattened_dir = str(relative_dir).replace(os.sep, "__")
    if level == "test_module":
        return Path(base_dump_dir) / Path(flattened_dir)
    test_name, parameter_string = self.get_name_and_parameters()
    function_dir = f"{flattened_dir}__{test_name}"
    if level == "test_function":
        return Path(base_dump_dir) / function_dir
    if level == "test_parameter":
        return Path(base_dump_dir) / function_dir / parameter_string
    raise Exception("Unexpected level.")

get_id()

Return the test id.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
173
174
175
def get_id(self) -> str:
    """Return the test id."""
    test_id = self.id
    return test_id

get_module_relative_output_dir(filler_path)

Return a directory name for the provided test_module (relative to the base ./tests directory) that can be used for output (within the configured fixtures output path or the base_dump_dir directory).

Example: tests/shanghai/eip3855_push0/test_push0.py -> shanghai/eip3855_push0/test_push0

Source code in packages/testing/src/execution_testing/fixtures/collector.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_module_relative_output_dir(self, filler_path: Path) -> Path:
    """
    Return a directory name for the provided test_module (relative to the
    base ./tests directory) that can be used for output (within the
    configured fixtures output path or the base_dump_dir directory).

    Example: tests/shanghai/eip3855_push0/test_push0.py ->
    shanghai/eip3855_push0/test_push0
    """
    # Drop the .py suffix and anchor both paths absolutely so the common
    # prefix can be computed reliably.
    absolute_module = self.module_path.with_suffix("").absolute()
    common_root = os.path.commonpath(
        [filler_path.absolute(), absolute_module]
    )
    relative_module = absolute_module.relative_to(common_root)
    return relative_module.parent / self.strip_test_name(
        relative_module.stem
    )

merge_partial_fixture_files(output_dir)

Merge all partial fixture JSONL files into final JSON fixture files.

Called at session end after all workers have written their partials. Each partial file contains JSONL lines: {"k": fixture_id, "v": json_str}

Processes one target file at a time, reading its partials sequentially into a dict. Memory = O(entries per target), freed before next target.

Source code in packages/testing/src/execution_testing/fixtures/collector.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def merge_partial_fixture_files(output_dir: Path) -> None:
    """
    Merge all partial fixture JSONL files into final JSON fixture files.

    Called at session end after all workers have written their partials.
    Each partial file contains JSONL lines: {"k": fixture_id, "v": json_str}

    Processes one target file at a time, reading its partials sequentially
    into a dict. Memory = O(entries per target), freed before next target.
    """
    # Find all partial files
    partial_files = list(output_dir.rglob("*.partial.*.jsonl"))
    if not partial_files:
        return

    # Group partial files by their target fixture file
    # e.g., "test.partial.gw0.jsonl" -> "test.json"
    partials_by_target: Dict[Path, List[Path]] = {}
    for partial in partial_files:
        # Remove .partial.{worker_id}.jsonl suffix to get target
        name = partial.name
        # Find ".partial." and remove everything after
        idx = name.find(".partial.")
        if idx == -1:
            continue
        target_path = partial.parent / (name[:idx] + ".json")
        partials_by_target.setdefault(target_path, []).append(partial)

    # Merge each group into its target file
    for target_path, partials in partials_by_target.items():
        # Read partials sequentially into dict (one at a time); a later
        # entry with the same id overwrites an earlier one.
        entries: Dict[str, str] = {}
        for partial in partials:
            # Fix: JSON is UTF-8 by specification; be explicit so the
            # merge does not depend on the platform's locale encoding.
            with open(partial, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        entry = json.loads(line)
                        entries[entry["k"]] = entry["v"]

        # Write sorted entries to output file
        with open(target_path, "w", encoding="utf-8") as out_f:
            out_f.write("{\n")
            sorted_keys = sorted(entries.keys())
            last_idx = len(sorted_keys) - 1
            for i, key in enumerate(sorted_keys):
                key_json = json.dumps(key)
                # Re-indent the pre-serialized fixture JSON one level
                # deeper so it nests inside the enclosing object.
                value_indented = entries[key].replace("\n", "\n    ")
                out_f.write(f"    {key_json}: {value_indented}")
                out_f.write(",\n" if i < last_idx else "\n")
            out_f.write("}")

        # Free memory before processing next target
        entries.clear()

        # Clean up partial files
        for partial in partials:
            partial.unlink()
            # Also remove lock files (only the last suffix is replaced,
            # e.g. "x.partial.gw0.jsonl" -> "x.partial.gw0.lock")
            lock_file = partial.with_suffix(".lock")
            if lock_file.exists():
                lock_file.unlink()

FixtureConsumer

Bases: ABC

Abstract class for verifying Ethereum test fixtures.

Source code in packages/testing/src/execution_testing/fixtures/consume.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class FixtureConsumer(ABC):
    """Abstract class for verifying Ethereum test fixtures."""

    # Fixture formats this consumer is able to verify.
    fixture_formats: List[FixtureFormat]

    def can_consume(
        self,
        fixture_format: FixtureFormat,
    ) -> bool:
        """Return whether the fixture format is consumable by this consumer."""
        supported_formats = self.fixture_formats
        return fixture_format in supported_formats

    @abstractmethod
    def consume_fixture(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: str | None = None,
        debug_output_path: Path | None = None,
    ) -> None:
        """
        Test the client with the specified fixture using its direct consumer
        interface.
        """
        # Concrete consumers must override this; the base implementation
        # only signals that direct consumption is unsupported.
        raise NotImplementedError(
            "The `consume_fixture()` function is not supported by this tool."
        )

can_consume(fixture_format)

Return whether the fixture format is consumable by this consumer.

Source code in packages/testing/src/execution_testing/fixtures/consume.py
22
23
24
25
26
27
def can_consume(
    self,
    fixture_format: FixtureFormat,
) -> bool:
    """Return whether the fixture format is consumable by this consumer."""
    supported_formats = self.fixture_formats
    return fixture_format in supported_formats

consume_fixture(fixture_format, fixture_path, fixture_name=None, debug_output_path=None) abstractmethod

Test the client with the specified fixture using its direct consumer interface.

Source code in packages/testing/src/execution_testing/fixtures/consume.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@abstractmethod
def consume_fixture(
    self,
    fixture_format: FixtureFormat,
    fixture_path: Path,
    fixture_name: str | None = None,
    debug_output_path: Path | None = None,
) -> None:
    """
    Test the client with the specified fixture using its direct consumer
    interface.

    Concrete consumers must override this method; the base implementation
    only raises to signal that direct consumption is unsupported.
    """
    raise NotImplementedError(
        "The `consume_fixture()` function is not supported by this tool."
    )

AccountCheck

Bases: CamelModel

Capture which fields are verified for a single account.

A None value means the field is not checked (it was not explicitly set by the test author). A present value records the expected value that check_alloc would assert against.

Source code in packages/testing/src/execution_testing/fixtures/post_verifications.py
18
19
20
21
22
23
24
25
26
27
28
29
30
class AccountCheck(CamelModel):
    """
    Capture which fields are verified for a single account.

    A ``None`` value means the field is not checked (it was not
    explicitly set by the test author).  A present value records the
    expected value that ``check_alloc`` would assert against.
    """

    # Expected account nonce, or None when the nonce is not verified.
    nonce: ZeroPaddedHexNumber | None = None
    # Expected account balance, or None when the balance is not verified.
    balance: ZeroPaddedHexNumber | None = None
    # Expected account bytecode, or None when the code is not verified.
    code: Bytes | None = None
    # Expected storage slot -> value mapping, or None when storage is not
    # verified.
    storage: Mapping[ZeroPaddedHexNumber, ZeroPaddedHexNumber] | None = None

PostVerifications

Bases: CamelModel

Record every post-state check performed during a fill session.

Accounts mapped to None represent should-not-exist checks.

Source code in packages/testing/src/execution_testing/fixtures/post_verifications.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class PostVerifications(CamelModel):
    """
    Record every post-state check performed during a fill session.

    Accounts mapped to ``None`` represent *should-not-exist* checks.
    """

    accounts: Dict[Address, AccountCheck | None]

    @classmethod
    def from_alloc(cls, alloc: Alloc) -> PostVerifications:
        """
        Derive verification checks from an expected post ``Alloc``.

        Only the fields a test author explicitly set (as reported by
        ``model_fields_set``) are recorded, mirroring what
        ``Account.check_alloc`` actually compares.
        """
        checks: Dict[Address, AccountCheck | None] = {}
        for address, account in alloc.root.items():
            # A missing account is a "must not exist" assertion.
            if account is None:
                checks[address] = None
                continue
            explicitly_set = account.model_fields_set
            checks[address] = AccountCheck(
                nonce=account.nonce if "nonce" in explicitly_set else None,
                balance=(
                    account.balance if "balance" in explicitly_set else None
                ),
                code=account.code if "code" in explicitly_set else None,
                storage=(
                    dict(account.storage.root)
                    if "storage" in explicitly_set
                    else None
                ),
            )
        return cls(accounts=checks)

from_alloc(alloc) classmethod

Derive verification checks from an expected post Alloc.

Walk each address/account pair and inspect model_fields_set to determine which fields will actually be compared by Account.check_alloc.

Source code in packages/testing/src/execution_testing/fixtures/post_verifications.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@classmethod
def from_alloc(cls, alloc: Alloc) -> PostVerifications:
    """
    Derive verification checks from an expected post ``Alloc``.

    Only the fields a test author explicitly set (as reported by
    ``model_fields_set``) are recorded, mirroring what
    ``Account.check_alloc`` actually compares.
    """
    checks: Dict[Address, AccountCheck | None] = {}
    for address, account in alloc.root.items():
        # A missing account is a "must not exist" assertion.
        if account is None:
            checks[address] = None
            continue
        explicitly_set = account.model_fields_set
        checks[address] = AccountCheck(
            nonce=account.nonce if "nonce" in explicitly_set else None,
            balance=(
                account.balance if "balance" in explicitly_set else None
            ),
            code=account.code if "code" in explicitly_set else None,
            storage=(
                dict(account.storage.root)
                if "storage" in explicitly_set
                else None
            ),
        )
    return cls(accounts=checks)

PreAllocGroup

Bases: PreAllocGroupBuilder

Pre-allocation group for tests with identical Environment and fork values.

Groups tests by a hash of their fixture Environment and fork to enable pre-allocation group optimization.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
class PreAllocGroup(PreAllocGroupBuilder):
    """
    Pre-allocation group for tests with identical Environment and fork values.

    Groups tests by a hash of their fixture Environment and fork to enable
    pre-allocation group optimization.
    """

    # Merged pre-allocation shared by every test in the group.
    pre: GroupPreAlloc
    # Genesis header computed from the group's environment and pre-state.
    genesis: FixtureHeader
    # Number of accounts in `pre` at build time.
    pre_account_count: int
    # Number of tests that share this pre-allocation group.
    test_count: int

    def model_post_init(self, __context: Any) -> None:
        """
        Model post init method to cache the state root in GroupPreAlloc.
        """
        super().model_post_init(__context)
        # The genesis header already holds the computed state root; seed the
        # pre-alloc's cache with it to avoid recomputing the expensive root.
        self.pre._cached_state_root = self.genesis.state_root

    @classmethod
    def from_file(cls, file: Path) -> Self:
        """
        Load a pre-allocation group from a JSON file.

        Files are stored in builder format (without genesis). Genesis is
        computed on-demand when loading, ensuring state root computation
        happens exactly once in Phase 2, not during Phase 1 merging.
        """
        with open(file) as f:
            data = f.read()

        builder = PreAllocGroupBuilder.model_validate_json(data)
        # build() computes the genesis header (and thus the state root).
        built = builder.build()
        # Use cls.model_validate to ensure proper Self return type
        return cls.model_validate(built.model_dump())

model_post_init(__context)

Model post init method to cache the state root in GroupPreAlloc.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
341
342
343
344
345
346
def model_post_init(self, __context: Any) -> None:
    """
    Model post init method to cache the state root in GroupPreAlloc.

    Runs after pydantic validation; `__context` is the pydantic context.
    """
    super().model_post_init(__context)
    # The genesis header already carries the computed state root, so cache
    # it on the pre-alloc to avoid recomputing it later.
    self.pre._cached_state_root = self.genesis.state_root

from_file(file) classmethod

Load a pre-allocation group from a JSON file.

Files are stored in builder format (without genesis). Genesis is computed on-demand when loading, ensuring state root computation happens exactly once in Phase 2, not during Phase 1 merging.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
@classmethod
def from_file(cls, file: Path) -> Self:
    """
    Load a pre-allocation group from a JSON file.

    Files are stored in builder format (without genesis). Genesis is
    computed on demand when loading, so the state-root computation
    happens exactly once in Phase 2 rather than during Phase 1 merging.
    """
    raw_json = file.read_text()
    builder = PreAllocGroupBuilder.model_validate_json(raw_json)
    # Round-trip through model_validate so the return type is `Self`.
    return cls.model_validate(builder.build().model_dump())

PreAllocGroupBuilder

Bases: CamelModel

Pre-allocation group builder.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class PreAllocGroupBuilder(CamelModel):
    """
    Pre-allocation group builder.

    Accumulates the test ids and merged pre-allocation for one group
    before the (expensive) genesis header is computed by ``build()``.
    """

    # Ids of the tests that share this group's pre-allocation.
    test_ids: List[str] = Field(default_factory=list)
    environment: Environment = Field(
        ..., description="Grouping environment for this test group"
    )
    # Serialized under the "network" key for compatibility.
    fork: Fork | TransitionFork = Field(..., alias="network")
    chain_id: int = DEFAULT_CHAIN_ID
    # Merged pre-allocation of all tests in the group.
    pre: Alloc

    def get_pre_account_count(self) -> int:
        """Return the amount of accounts the pre-allocation group holds."""
        return len(self.pre.root)

    def get_test_count(self) -> int:
        """Return the amount of tests that use this pre-allocation group."""
        return len(self.test_ids)

    def calculate_genesis(self) -> FixtureHeader:
        """Get the genesis header for this group."""
        # NOTE(review): transitions_from() presumably yields the fork active
        # at genesis for transition forks — confirm against Fork API.
        return FixtureHeader.genesis(
            self.fork.transitions_from(),
            self.environment,
            self.pre.state_root(),
        )

    def add_test_alloc(self, test_id: str, new_pre: Alloc) -> None:
        """Adds a pre to this builder's pre."""
        self.pre = Alloc.merge(
            self.pre,
            new_pre,
            # Identical duplicate accounts across tests are tolerated.
            key_collision_mode=Alloc.KeyCollisionMode.ALLOW_IDENTICAL_ACCOUNTS,
        )
        self.test_ids.append(test_id)

    def build(self) -> "PreAllocGroup":
        """Build the pre-alloc group."""
        return PreAllocGroup(
            test_ids=self.test_ids,
            environment=self.environment,
            fork=self.fork,
            chain_id=self.chain_id,
            pre=self.pre.model_dump(),
            pre_account_count=self.get_pre_account_count(),
            test_count=self.get_test_count(),
            # Genesis (and thus the state root) is computed only here.
            genesis=self.calculate_genesis(),
        )

    def to_partial_file(
        self, file: Path, worker_id: Optional[str] = None
    ) -> None:
        """
        Save PreAllocGroupBuilder to a partial file (no locking).

        Each worker writes its own partial file, which are merged at session
        end by merge_partial_group_files(). This eliminates lock contention
        that caused workers to take 30-180+ seconds each.

        Saves the builder format (without genesis/state_root) to avoid
        expensive state root computation during Phase 1. State root is
        computed once when loading in Phase 2 via PreAllocGroup.from_file().
        """
        # e.g. "<stem>.partial.gw3.json" per worker, ".partial.main.json"
        # when running without xdist workers.
        suffix = f".{worker_id}" if worker_id else ".main"
        partial_path = file.with_suffix(f".partial{suffix}.json")
        partial_path.write_text(
            self.model_dump_json(by_alias=True, exclude_none=True, indent=2)
        )

get_pre_account_count()

Return the amount of accounts the pre-allocation group holds.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
45
46
47
def get_pre_account_count(self) -> int:
    """Return the number of accounts held by this pre-allocation group."""
    account_map = self.pre.root
    return len(account_map)

get_test_count()

Return the amount of tests that use this pre-allocation group.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
49
50
51
def get_test_count(self) -> int:
    """Return the number of tests that use this pre-allocation group."""
    ids = self.test_ids
    return len(ids)

calculate_genesis()

Get the genesis header for this group.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
53
54
55
56
57
58
59
def calculate_genesis(self) -> FixtureHeader:
    """
    Get the genesis header for this group.

    Built from the group's fork, grouping environment, and the
    pre-allocation's state root (an expensive computation).
    """
    # NOTE(review): transitions_from() presumably yields the fork active at
    # genesis for transition forks — confirm against the Fork API.
    return FixtureHeader.genesis(
        self.fork.transitions_from(),
        self.environment,
        self.pre.state_root(),
    )

add_test_alloc(test_id, new_pre)

Merges a test's pre-allocation into this builder's pre-allocation and records the test id.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
61
62
63
64
65
66
67
68
def add_test_alloc(self, test_id: str, new_pre: Alloc) -> None:
    """
    Adds a pre to this builder's pre.

    Merges `new_pre` into the group's pre-allocation and records
    `test_id` as a member of the group.
    """
    self.pre = Alloc.merge(
        self.pre,
        new_pre,
        # Identical duplicate accounts across tests are tolerated.
        key_collision_mode=Alloc.KeyCollisionMode.ALLOW_IDENTICAL_ACCOUNTS,
    )
    self.test_ids.append(test_id)

build()

Build the pre-alloc group.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
70
71
72
73
74
75
76
77
78
79
80
81
def build(self) -> "PreAllocGroup":
    """Build the pre-alloc group, computing its genesis header."""
    return PreAllocGroup(
        test_ids=self.test_ids,
        environment=self.environment,
        fork=self.fork,
        chain_id=self.chain_id,
        pre=self.pre.model_dump(),
        pre_account_count=self.get_pre_account_count(),
        test_count=self.get_test_count(),
        # Computing genesis here triggers the state-root calculation.
        genesis=self.calculate_genesis(),
    )

to_partial_file(file, worker_id=None)

Save PreAllocGroupBuilder to a partial file (no locking).

Each worker writes its own partial file, which are merged at session end by merge_partial_group_files(). This eliminates lock contention that caused workers to take 30-180+ seconds each.

Saves the builder format (without genesis/state_root) to avoid expensive state root computation during Phase 1. State root is computed once when loading in Phase 2 via PreAllocGroup.from_file().

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def to_partial_file(
    self, file: Path, worker_id: Optional[str] = None
) -> None:
    """
    Save PreAllocGroupBuilder to a partial file (no locking).

    Each worker writes its own partial file; the partial files are merged
    at session end by merge_partial_group_files(). This avoids the lock
    contention that previously made workers take 30-180+ seconds each.

    The builder format (without genesis/state_root) is saved so the
    expensive state-root computation is skipped during Phase 1 and done
    once in Phase 2 via PreAllocGroup.from_file().
    """
    if worker_id:
        worker_suffix = f".{worker_id}"
    else:
        worker_suffix = ".main"
    target = file.with_suffix(f".partial{worker_suffix}.json")
    serialized = self.model_dump_json(
        by_alias=True, exclude_none=True, indent=2
    )
    target.write_text(serialized)

PreAllocGroupBuilders

Bases: EthereumTestRootModel

Root model mapping pre-allocation group hashes to test group builders.

Note: unlike PreAllocGroups, this model does not support lazy loading; all group builders are held in memory.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class PreAllocGroupBuilders(EthereumTestRootModel):
    """
    Root model mapping pre-allocation group hashes to test group builders.

    NOTE(review): the previous lazy_load remarks here appear to have been
    copied from ``PreAllocGroups``; this model keeps all builders in
    memory and has no lazy-loading support.
    """

    root: Dict[str, PreAllocGroupBuilder]

    def to_folder(self, folder: Path, worker_id: Optional[str] = None) -> None:
        """
        Save PreAllocGroups to a folder as partial files.

        Each worker writes its own partial files (no lock contention).
        Call merge_partial_group_files() on master after all workers finish.
        """
        for key, value in self.root.items():
            # Defensive: a None group would indicate corrupted builder state.
            assert value is not None, f"Value for key {key} is None"
            value.to_partial_file(folder / f"{key}.json", worker_id=worker_id)

    def add_test_pre(
        self,
        *,
        pre_alloc_hash: str,
        test_id: str,
        fork: Fork | TransitionFork,
        chain_id: int,
        environment: Environment,
        pre: Alloc,
    ) -> None:
        """Adds a single test to the appropriate group based on the hash."""
        if pre_alloc_hash in self.root:
            # Update existing group - just merge pre-allocations
            group = self.root[pre_alloc_hash]
            assert group.fork == fork, (
                f"Incompatible fork: {group.fork}!={fork}"
            )
            assert group.chain_id == chain_id, (
                f"Incompatible chain id: {group.chain_id}!={chain_id}"
            )
            group.add_test_alloc(test_id, pre)
        else:
            # Create new group - use Environment instead of expensive genesis
            # generation
            group = PreAllocGroupBuilder(
                test_ids=[test_id],
                fork=fork,
                chain_id=chain_id,
                environment=environment,
                # Seed the group with the fork's base pre-allocation, then
                # layer the test's own accounts on top.
                pre=Alloc.merge(
                    Alloc.model_validate(
                        fork.transitions_to().pre_allocation_blockchain()
                    ),
                    pre,
                ),
            )
            self.root[pre_alloc_hash] = group

to_folder(folder, worker_id=None)

Save PreAllocGroups to a folder as partial files.

Each worker writes its own partial files (no lock contention). Call merge_partial_group_files() on master after all workers finish.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
195
196
197
198
199
200
201
202
203
204
def to_folder(self, folder: Path, worker_id: Optional[str] = None) -> None:
    """
    Save PreAllocGroups to a folder as partial files.

    Each worker writes its own partial files (no lock contention).
    Call merge_partial_group_files() on master after all workers finish.
    """
    for key, value in self.root.items():
        # Defensive: a None group would indicate corrupted builder state.
        assert value is not None, f"Value for key {key} is None"
        value.to_partial_file(folder / f"{key}.json", worker_id=worker_id)

add_test_pre(*, pre_alloc_hash, test_id, fork, chain_id, environment, pre)

Adds a single test to the appropriate group based on the hash.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def add_test_pre(
    self,
    *,
    pre_alloc_hash: str,
    test_id: str,
    fork: Fork | TransitionFork,
    chain_id: int,
    environment: Environment,
    pre: Alloc,
) -> None:
    """
    Adds a single test to the appropriate group based on the hash.

    Args:
        pre_alloc_hash: Hash key selecting the target group.
        test_id: Id of the test being added.
        fork: Fork of the test; must match an existing group's fork.
        chain_id: Chain id of the test; must match an existing group's.
        environment: Grouping environment, used only when creating a group.
        pre: The test's pre-allocation to merge into the group.
    """
    if pre_alloc_hash in self.root:
        # Update existing group - just merge pre-allocations
        group = self.root[pre_alloc_hash]
        assert group.fork == fork, (
            f"Incompatible fork: {group.fork}!={fork}"
        )
        assert group.chain_id == chain_id, (
            f"Incompatible chain id: {group.chain_id}!={chain_id}"
        )
        group.add_test_alloc(test_id, pre)
    else:
        # Create new group - use Environment instead of expensive genesis
        # generation
        group = PreAllocGroupBuilder(
            test_ids=[test_id],
            fork=fork,
            chain_id=chain_id,
            environment=environment,
            # Base fork allocation first, then the test's own accounts.
            pre=Alloc.merge(
                Alloc.model_validate(
                    fork.transitions_to().pre_allocation_blockchain()
                ),
                pre,
            ),
        )
        self.root[pre_alloc_hash] = group

PreAllocGroups

Bases: EthereumTestRootModel

Root model mapping pre-allocation group hashes to test groups.

If lazy_load is True, the groups are not loaded from the folder until they are accessed.

Iterating will fail if lazy_load is True.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class PreAllocGroups(EthereumTestRootModel):
    """
    Root model mapping pre-allocation group hashes to test groups.

    If lazy_load is True, the groups are not loaded from the folder until they
    are accessed.

    Iterating will fail if lazy_load is True.
    """

    # None values are placeholders for groups not yet loaded (lazy mode).
    root: Dict[str, PreAllocGroup | None]

    # Folder to lazily load groups from; None when fully loaded in memory.
    _folder_source: Path | None = PrivateAttr(None)

    def __setitem__(self, key: str, value: Any) -> None:
        """Set item in root dict."""
        # Once a folder source is bound, the folder is the source of truth
        # and in-memory mutation would silently diverge from it.
        assert self._folder_source is None, (
            "Cannot set item in root dict after folder source is set"
        )
        self.root[key] = value

    @classmethod
    def from_folder(cls, folder: Path, *, lazy_load: bool = False) -> Self:
        """Create PreAllocGroups from a folder of pre-allocation files."""
        # First check for collision failures
        for fail_file in folder.glob("*.fail"):
            with open(fail_file) as f:
                raise Alloc.CollisionError.from_json(json.loads(f.read()))

        data: Dict[str, PreAllocGroup | None] = {}
        for file in folder.glob("*.json"):
            if lazy_load:
                # Defer the expensive load until __getitem__ is called.
                data[file.stem] = None
            else:
                data[file.stem] = PreAllocGroup.from_file(file)
        instance = cls(root=data)
        if lazy_load:
            instance._folder_source = folder
        return instance

    def __getitem__(self, item: str) -> PreAllocGroup:
        """Get item from root dict, loading it from disk when lazy."""
        if self._folder_source is None:
            value = self.root[item]
            assert value is not None, f"Item {item} is None"
            return value
        else:
            if self.root[item] is None:
                # Lazy path: load and cache the group on first access.
                self.root[item] = PreAllocGroup.from_file(
                    self._folder_source / f"{item}.json"
                )
            result = self.root[item]
            assert result is not None
            return result

    def __iter__(self) -> Iterator[str]:  # type: ignore [override]
        """Iterate over root dict."""
        # Yields only keys; no group loading is triggered.
        return iter(self.root)

    def __contains__(self, item: str) -> bool:
        """Check if item in root dict."""
        # Key presence only; does not trigger lazy loading.
        return item in self.root

    def __len__(self) -> int:
        """Get length of root dict."""
        # Counts all groups, loaded or not.
        return len(self.root)

    def keys(self) -> KeysView[str]:
        """Get keys from root dict."""
        return self.root.keys()

    def values(self) -> Generator[PreAllocGroup, None, None]:
        """Get values from root dict."""
        for value in self.root.values():
            # Fails for lazily-declared groups that were never loaded.
            assert value is not None, "Value is None"
            yield value

    def items(self) -> Generator[Tuple[str, PreAllocGroup], None, None]:
        """Get items from root dict."""
        for key, value in self.root.items():
            # Fails for lazily-declared groups that were never loaded.
            assert value is not None, f"Value for key {key} is None"
            yield key, value

__setitem__(key, value)

Set item in root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
380
381
382
383
384
385
def __setitem__(self, key: str, value: Any) -> None:
    """Set item in root dict."""
    # Once a folder source is bound, the folder is the source of truth and
    # in-memory mutation would silently diverge from it.
    assert self._folder_source is None, (
        "Cannot set item in root dict after folder source is set"
    )
    self.root[key] = value

from_folder(folder, *, lazy_load=False) classmethod

Create PreAllocGroups from a folder of pre-allocation files.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
@classmethod
def from_folder(cls, folder: Path, *, lazy_load: bool = False) -> Self:
    """Create PreAllocGroups from a folder of pre-allocation files."""
    # Surface any recorded collision failures before loading anything.
    for fail_file in folder.glob("*.fail"):
        with open(fail_file) as f:
            raise Alloc.CollisionError.from_json(json.load(f))

    # In lazy mode record a None placeholder per file instead of loading.
    groups: Dict[str, PreAllocGroup | None] = {
        file.stem: None if lazy_load else PreAllocGroup.from_file(file)
        for file in folder.glob("*.json")
    }
    result = cls(root=groups)
    if lazy_load:
        result._folder_source = folder
    return result

__getitem__(item)

Get item from root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def __getitem__(self, item: str) -> PreAllocGroup:
    """Get item from root dict, loading it from disk when lazy."""
    if self._folder_source is None:
        value = self.root[item]
        assert value is not None, f"Item {item} is None"
        return value
    else:
        if self.root[item] is None:
            # Lazy path: load and cache the group on first access.
            self.root[item] = PreAllocGroup.from_file(
                self._folder_source / f"{item}.json"
            )
        result = self.root[item]
        assert result is not None
        return result

__iter__()

Iterate over root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
421
422
423
def __iter__(self) -> Iterator[str]:  # type: ignore [override]
    """Iterate over root dict."""
    # Yields only keys; no group loading is triggered.
    return iter(self.root)

__contains__(item)

Check if item in root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
425
426
427
def __contains__(self, item: str) -> bool:
    """Check if item in root dict."""
    # Key presence only; does not trigger lazy loading.
    return item in self.root

__len__()

Get length of root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
429
430
431
def __len__(self) -> int:
    """Get length of root dict."""
    # Counts all groups, loaded or not.
    return len(self.root)

keys()

Get keys from root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
433
434
435
def keys(self) -> KeysView[str]:
    """Get keys from root dict."""
    # Live view over group hashes; no loading occurs.
    return self.root.keys()

values()

Get values from root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
437
438
439
440
441
def values(self) -> Generator[PreAllocGroup, None, None]:
    """Get values from root dict."""
    for value in self.root.values():
        # Fails for lazily-declared groups that were never loaded.
        assert value is not None, "Value is None"
        yield value

items()

Get items from root dict.

Source code in packages/testing/src/execution_testing/fixtures/pre_alloc_groups.py
443
444
445
446
447
def items(self) -> Generator[Tuple[str, PreAllocGroup], None, None]:
    """Get items from root dict."""
    for key, value in self.root.items():
        # Fails for lazily-declared groups that were never loaded.
        assert value is not None, f"Value for key {key} is None"
        yield key, value

StateFixture

Bases: BaseFixture

Fixture for a single StateTest.

Source code in packages/testing/src/execution_testing/fixtures/state.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class StateFixture(BaseFixture):
    """Fixture for a single StateTest."""

    format_name: ClassVar[str] = "state_test"
    description: ClassVar[str] = "Tests that generate a state test fixture."

    env: FixtureEnvironment
    pre: Alloc
    transaction: FixtureTransaction
    # Per-fork expected post-state results; a state test fixture carries
    # exactly one fork entry (enforced by get_fork).
    post: Mapping[Fork | TransitionFork, List[FixtureForkPost]]
    config: FixtureConfig

    def get_fork(self) -> Fork | TransitionFork | None:
        """Return the single fork targeted by this fixture."""
        forks = list(self.post.keys())
        assert len(forks) == 1, "Expected state test fixture with single fork"
        return forks[0]

get_fork()

Return the single fork targeted by the fixture.

Source code in packages/testing/src/execution_testing/fixtures/state.py
126
127
128
129
130
def get_fork(self) -> Fork | TransitionFork | None:
    """Return the single fork targeted by this state test fixture."""
    post_forks = [*self.post]
    assert len(post_forks) == 1, "Expected state test fixture with single fork"
    return post_forks[0]

TransactionFixture

Bases: BaseFixture

Fixture for a single TransactionTest.

Source code in packages/testing/src/execution_testing/fixtures/transaction.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class TransactionFixture(BaseFixture):
    """Fixture for a single TransactionTest."""

    format_name: ClassVar[str] = "transaction_test"
    description: ClassVar[str] = (
        "Tests that generate a transaction test fixture."
    )

    # Per-fork expected results; a transaction test fixture carries exactly
    # one fork entry (enforced by get_fork).
    result: Mapping[Fork | TransitionFork, FixtureResult]
    # Raw RLP-serialized transaction; serialized under the "txbytes" key.
    transaction: Bytes = Field(..., alias="txbytes")

    def get_fork(self) -> Fork | TransitionFork | None:
        """Return the single fork targeted by this fixture."""
        forks = list(self.result.keys())
        assert len(forks) == 1, (
            "Expected transaction test fixture with single fork"
        )
        return forks[0]

get_fork()

Return the single fork targeted by the fixture.

Source code in packages/testing/src/execution_testing/fixtures/transaction.py
40
41
42
43
44
45
46
def get_fork(self) -> Fork | TransitionFork | None:
    """Return the single fork targeted by this transaction test fixture."""
    result_forks = [*self.result]
    assert len(result_forks) == 1, (
        "Expected transaction test fixture with single fork"
    )
    return result_forks[0]