Skip to content

Ethereum Test Specs package

Test spec definitions and utilities.

BaseTest

Bases: BaseModel

Represents a base Ethereum test which must return a single test fixture.

Source code in packages/testing/src/execution_testing/specs/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
class BaseTest(BaseModel):
    """
    Represents a base Ethereum test which must return a single test fixture.
    """

    model_config = ConfigDict(extra="forbid")

    fork: Fork | TransitionFork = (
        BaseFork
        # default to BaseFork to allow the filler to set it,
        # instead of each test having to set it
    )
    operation_mode: OpMode | None = None
    gas_optimization_max_gas_limit: int | None = None
    expected_benchmark_gas_used: int | None = None
    skip_gas_used_validation: bool = False
    expected_receipt_status: int | None = None
    is_tx_gas_heavy_test: bool = False
    is_exception_test: bool = False

    # Class variables, to be set by subclasses
    spec_types: ClassVar[Dict[str, Type["BaseTest"]]] = {}
    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = []
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = []

    supported_markers: ClassVar[Dict[str, str]] = {}

    def model_post_init(self, __context: Any, /) -> None:
        """
        Model post-init to assert that the custom pre-allocation was
        provided and the default was not used.
        """
        super().model_post_init(__context)
        assert self.fork != BaseFork, (
            "Fork was not provided by the filler/executor."
        )

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the appropriate marker is
        used.
        """
        del fork, fixture_format, markers
        return False

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        """
        Register all subclasses of BaseFixture with a fixture format name set
        as possible fixture formats.
        """
        super().__pydantic_init_subclass__(**kwargs)

        # Don't register dynamically generated wrapper classes
        if getattr(cls, "__is_base_test_wrapper__", False):
            return

        if cls.pytest_parameter_name():
            # Register the new fixture format
            BaseTest.spec_types[cls.pytest_parameter_name()] = cls

    @classmethod
    def from_test(
        cls: Type[Self],
        *,
        base_test: "BaseTest",
        **kwargs: Any,
    ) -> Self:
        """Create a test in a different format from a base test."""
        for k in BaseTest.model_fields.keys():
            if k not in kwargs and k in base_test.model_fields_set:
                kwargs[k] = getattr(base_test, k)
        return cls(**kwargs)

    @classmethod
    def discard_execute_format_by_marks(
        cls,
        execute_format: ExecuteFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard an execute format from executing if the appropriate marker is
        used.
        """
        del execute_format, fork, markers
        return False

    @abstractmethod
    def generate(
        self,
        *,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the test fixture using the given fixture format."""
        pass

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """Generate the list of test fixtures."""
        raise Exception(f"Unsupported execute format: {execute_format}")

    @classmethod
    def pytest_parameter_name(cls) -> str:
        """
        Must return the name of the parameter used in pytest to select this
        spec type as filler for the test.

        By default, it returns the underscore separated name of the class.
        """
        if cls == BaseTest:
            return ""
        return reduce(
            lambda x, y: x + ("_" if y.isupper() else "") + y, cls.__name__
        ).lower()

    def check_exception_test(
        self,
        *,
        exception: bool,
    ) -> None:
        """Compare the test marker against the outcome of the test."""
        if self.is_exception_test != exception:
            if exception:
                raise Exception(
                    "Test produced an invalid block or transaction but was "
                    "not marked with the `exception_test` marker. Add the "
                    "`@pytest.mark.exception_test` decorator to the test."
                )
            else:
                raise Exception(
                    "Test didn't produce an invalid block or transaction but "
                    "was marked with the `exception_test` marker. Remove the "
                    "`@pytest.mark.exception_test` decorator from the test."
                )

    def get_genesis_environment(self) -> Environment:
        """
        Get the genesis environment for pre-allocation groups.

        Must be implemented by subclasses to provide the appropriate
        environment.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} must implement genesis environment "
            "access for use with pre-allocation groups."
        )

    def validate_benchmark_gas(
        self, *, benchmark_gas_used: int | None, gas_benchmark_value: int
    ) -> None:
        """
        Validates the total consumed gas of the last block in the test matches
        the expectation of the benchmark test.

        Requires the following fields to be set:
        - expected_benchmark_gas_used
        - operation_mode
        """
        if self.operation_mode != OpMode.BENCHMARKING:
            return
        assert benchmark_gas_used is not None, "_benchmark_gas_used is not set"
        # Perform gas validation if required for benchmarking.
        # Ensures benchmark tests consume exactly the expected gas.
        if not self.skip_gas_used_validation:
            # Verify that the total gas consumed in the last block
            # matches expectations
            expected_benchmark_gas_used = self.expected_benchmark_gas_used
            if expected_benchmark_gas_used is None:
                expected_benchmark_gas_used = gas_benchmark_value
            diff = benchmark_gas_used - expected_benchmark_gas_used
            assert benchmark_gas_used == expected_benchmark_gas_used, (
                f"Total gas used ({benchmark_gas_used}) does not "
                "match expected benchmark gas "
                f"({expected_benchmark_gas_used}), "
                f"difference: {diff}"
            )
        # Gas used should never exceed the maximum benchmark gas allowed.
        assert benchmark_gas_used <= gas_benchmark_value, (
            f"benchmark_gas_used ({benchmark_gas_used}) exceeds maximum "
            "benchmark gas allowed for this configuration: "
            f"{gas_benchmark_value}"
        )

    def validate_receipt_status(
        self,
        *,
        receipts: List[TransactionReceipt],
        block_number: int,
    ) -> None:
        """
        Validate receipt status for every transaction in a block.

        When expected_receipt_status is set, verify that all
        receipts match. Catches silent OOG failures that roll
        back state and invalidate benchmarks.
        """
        if "expected_receipt_status" not in self.model_fields_set:
            return
        for i, receipt in enumerate(receipts):
            if receipt.status is not None and (
                int(receipt.status) != self.expected_receipt_status
            ):
                raise Exception(
                    f"Transaction {i} in block "
                    f"{block_number} has receipt "
                    f"status {int(receipt.status)}, "
                    f"expected "
                    f"{self.expected_receipt_status}."
                )

model_post_init(__context)

Model post-init to assert that the custom pre-allocation was provided and the default was not used.

Source code in packages/testing/src/execution_testing/specs/base.py
131
132
133
134
135
136
137
138
139
def model_post_init(self, __context: Any, /) -> None:
    """
    Model post-init to assert that the custom pre-allocation was
    provided and the default was not used.
    """
    super().model_post_init(__context)
    assert self.fork != BaseFork, (
        "Fork was not provided by the filler/executor."
    )

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/base.py
141
142
143
144
145
146
147
148
149
150
151
152
153
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Discard a fixture format from filling if the appropriate marker is
    used.
    """
    del fork, fixture_format, markers
    return False

__pydantic_init_subclass__(**kwargs) classmethod

Register all subclasses of BaseFixture with a fixture format name set as possible fixture formats.

Source code in packages/testing/src/execution_testing/specs/base.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """
    Register all subclasses of BaseFixture with a fixture format name set
    as possible fixture formats.
    """
    super().__pydantic_init_subclass__(**kwargs)

    # Don't register dynamically generated wrapper classes
    if getattr(cls, "__is_base_test_wrapper__", False):
        return

    if cls.pytest_parameter_name():
        # Register the new fixture format
        BaseTest.spec_types[cls.pytest_parameter_name()] = cls

from_test(*, base_test, **kwargs) classmethod

Create a test in a different format from a base test.

Source code in packages/testing/src/execution_testing/specs/base.py
171
172
173
174
175
176
177
178
179
180
181
182
@classmethod
def from_test(
    cls: Type[Self],
    *,
    base_test: "BaseTest",
    **kwargs: Any,
) -> Self:
    """Create a test in a different format from a base test."""
    for k in BaseTest.model_fields.keys():
        if k not in kwargs and k in base_test.model_fields_set:
            kwargs[k] = getattr(base_test, k)
    return cls(**kwargs)

discard_execute_format_by_marks(execute_format, fork, markers) classmethod

Discard an execute format from executing if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/base.py
184
185
186
187
188
189
190
191
192
193
194
195
196
@classmethod
def discard_execute_format_by_marks(
    cls,
    execute_format: ExecuteFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Discard an execute format from executing if the appropriate marker is
    used.
    """
    del execute_format, fork, markers
    return False

generate(*, t8n, fixture_format) abstractmethod

Generate the test fixture using the given fixture format.

Source code in packages/testing/src/execution_testing/specs/base.py
198
199
200
201
202
203
204
205
206
@abstractmethod
def generate(
    self,
    *,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """Generate the test fixture using the given fixture format."""
    pass

execute(*, execute_format)

Generate the list of test fixtures.

Source code in packages/testing/src/execution_testing/specs/base.py
208
209
210
211
212
213
214
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """Generate the list of test fixtures."""
    raise Exception(f"Unsupported execute format: {execute_format}")

pytest_parameter_name() classmethod

Must return the name of the parameter used in pytest to select this spec type as filler for the test.

By default, it returns the underscore separated name of the class.

Source code in packages/testing/src/execution_testing/specs/base.py
216
217
218
219
220
221
222
223
224
225
226
227
228
@classmethod
def pytest_parameter_name(cls) -> str:
    """
    Must return the name of the parameter used in pytest to select this
    spec type as filler for the test.

    By default, it returns the underscore separated name of the class.
    """
    if cls == BaseTest:
        return ""
    return reduce(
        lambda x, y: x + ("_" if y.isupper() else "") + y, cls.__name__
    ).lower()

check_exception_test(*, exception)

Compare the test marker against the outcome of the test.

Source code in packages/testing/src/execution_testing/specs/base.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def check_exception_test(
    self,
    *,
    exception: bool,
) -> None:
    """Compare the test marker against the outcome of the test."""
    if self.is_exception_test != exception:
        if exception:
            raise Exception(
                "Test produced an invalid block or transaction but was "
                "not marked with the `exception_test` marker. Add the "
                "`@pytest.mark.exception_test` decorator to the test."
            )
        else:
            raise Exception(
                "Test didn't produce an invalid block or transaction but "
                "was marked with the `exception_test` marker. Remove the "
                "`@pytest.mark.exception_test` decorator from the test."
            )

get_genesis_environment()

Get the genesis environment for pre-allocation groups.

Must be implemented by subclasses to provide the appropriate environment.

Source code in packages/testing/src/execution_testing/specs/base.py
250
251
252
253
254
255
256
257
258
259
260
def get_genesis_environment(self) -> Environment:
    """
    Get the genesis environment for pre-allocation groups.

    Must be implemented by subclasses to provide the appropriate
    environment.
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} must implement genesis environment "
        "access for use with pre-allocation groups."
    )

validate_benchmark_gas(*, benchmark_gas_used, gas_benchmark_value)

Validates the total consumed gas of the last block in the test matches the expectation of the benchmark test.

Requires the following fields to be set: - expected_benchmark_gas_used - operation_mode

Source code in packages/testing/src/execution_testing/specs/base.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def validate_benchmark_gas(
    self, *, benchmark_gas_used: int | None, gas_benchmark_value: int
) -> None:
    """
    Validates the total consumed gas of the last block in the test matches
    the expectation of the benchmark test.

    Requires the following fields to be set:
    - expected_benchmark_gas_used
    - operation_mode
    """
    if self.operation_mode != OpMode.BENCHMARKING:
        return
    assert benchmark_gas_used is not None, "_benchmark_gas_used is not set"
    # Perform gas validation if required for benchmarking.
    # Ensures benchmark tests consume exactly the expected gas.
    if not self.skip_gas_used_validation:
        # Verify that the total gas consumed in the last block
        # matches expectations
        expected_benchmark_gas_used = self.expected_benchmark_gas_used
        if expected_benchmark_gas_used is None:
            expected_benchmark_gas_used = gas_benchmark_value
        diff = benchmark_gas_used - expected_benchmark_gas_used
        assert benchmark_gas_used == expected_benchmark_gas_used, (
            f"Total gas used ({benchmark_gas_used}) does not "
            "match expected benchmark gas "
            f"({expected_benchmark_gas_used}), "
            f"difference: {diff}"
        )
    # Gas used should never exceed the maximum benchmark gas allowed.
    assert benchmark_gas_used <= gas_benchmark_value, (
        f"benchmark_gas_used ({benchmark_gas_used}) exceeds maximum "
        "benchmark gas allowed for this configuration: "
        f"{gas_benchmark_value}"
    )

validate_receipt_status(*, receipts, block_number)

Validate receipt status for every transaction in a block.

When expected_receipt_status is set, verify that all receipts match. Catches silent OOG failures that roll back state and invalidate benchmarks.

Source code in packages/testing/src/execution_testing/specs/base.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def validate_receipt_status(
    self,
    *,
    receipts: List[TransactionReceipt],
    block_number: int,
) -> None:
    """
    Validate receipt status for every transaction in a block.

    When expected_receipt_status is set, verify that all
    receipts match. Catches silent OOG failures that roll
    back state and invalidate benchmarks.
    """
    if "expected_receipt_status" not in self.model_fields_set:
        return
    for i, receipt in enumerate(receipts):
        if receipt.status is not None and (
            int(receipt.status) != self.expected_receipt_status
        ):
            raise Exception(
                f"Transaction {i} in block "
                f"{block_number} has receipt "
                f"status {int(receipt.status)}, "
                f"expected "
                f"{self.expected_receipt_status}."
            )

BaseStaticTest

Bases: BaseModel

Represents a base class that reads cases from static files.

Source code in packages/testing/src/execution_testing/specs/base_static.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class BaseStaticTest(BaseModel):
    """Represents a base class that reads cases from static files."""

    formats: ClassVar[List[Type["BaseStaticTest"]]] = []
    formats_type_adapter: ClassVar[TypeAdapter]

    format_name: ClassVar[str] = ""

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        """
        Register all subclasses of BaseStaticTest with a static test format
        name set as possible static test format.
        """
        if cls.format_name:
            # Register the new fixture format
            BaseStaticTest.formats.append(cls)
            if len(BaseStaticTest.formats) > 1:
                BaseStaticTest.formats_type_adapter = TypeAdapter(
                    Union[tuple(BaseStaticTest.formats)],
                )
            else:
                BaseStaticTest.formats_type_adapter = TypeAdapter(cls)

    @model_validator(mode="wrap")
    @classmethod
    def _parse_into_subclass(
        cls, v: Any, handler: ValidatorFunctionWrapHandler
    ) -> "BaseStaticTest":
        """Parse the static test into the correct subclass."""
        if cls is BaseStaticTest:
            return BaseStaticTest.formats_type_adapter.validate_python(v)
        return handler(v)

    @abstractmethod
    def fill_function(self) -> Callable:
        """
        Return the test function that can be used to fill the test.

        This method should be implemented by the subclasses.

        The function returned can be optionally decorated with the
        `@pytest.mark.parametrize` decorator to parametrize the test with the
        number of sub test cases.

        Example:
        ```
        @pytest.mark.parametrize("n", [1])
        @pytest.mark.parametrize("m", [1, 2])
        @pytest.mark.valid_from("Homestead")
        def test_state_filler(
            state_test: StateTestFiller,
            fork: Fork,
            pre: Alloc,
            n: int,
            m: int
        ):
            \"\"\"Generate a test from a static state filler.\"\"\"
            assert n == 1
            assert m in [1, 2]
            env = Environment(**self.env.model_dump())
            sender = pre.fund_eoa()
            tx = Transaction(
                ty=0x0,
                nonce=0,
                to=Address(0x1000),
                gas_limit=500000,
                protected=False if fork in [Frontier, Homestead] else True,
                data="",
                sender=sender,
            )
            state_test(env=env, pre=pre, post={}, tx=tx)
        ```

        To aid the generation of the test, the function can be defined and then
        the decorator be applied after defining the function:

        ```
        def test_state_filler(
        state_test: StateTestFiller,
            fork: Fork,
            pre: Alloc,
            n: int,
            m: int,
        ):

        ...

        test_state_filler = pytest.mark.parametrize("n",
            [1])(test_state_filler
        )
        test_state_filler = pytest.mark.parametrize("m",
            [1, 2])(test_state_filler
        )

        if self.valid_from:
            test_state_filler = pytest.mark.valid_from(
                self.valid_from
            )(test_state_filler)

        if self.valid_until:
            test_state_filler = pytest.mark.valid_until(
                self.valid_until
            )(test_state_filler)

        return test_state_filler
        ```

        The function can contain the following parameters on top of the spec
        type parameter (`state_test` in the example above): - `fork`: The fork
        for which the test is currently being filled. - `pre`: The pre-state of
        the test.

        """
        raise NotImplementedError

    @staticmethod
    def remove_comments(data: Dict) -> Dict:
        """Remove comments from a dictionary."""
        result = {}
        for k, v in data.items():
            if isinstance(k, str) and k.startswith("//"):
                continue
            if isinstance(v, dict):
                v = BaseStaticTest.remove_comments(v)
            elif isinstance(v, list):
                v = [
                    BaseStaticTest.remove_comments(i)
                    if isinstance(i, dict)
                    else i
                    for i in v
                ]
            result[k] = v
        return result

    @model_validator(mode="before")
    @classmethod
    def remove_comments_from_model(cls, data: Any) -> Any:
        """Remove comments from the static file loaded, if any."""
        if isinstance(data, dict):
            return BaseStaticTest.remove_comments(data)
        return data

__pydantic_init_subclass__(**kwargs) classmethod

Register all subclasses of BaseStaticTest with a static test format name set as possible static test format.

Source code in packages/testing/src/execution_testing/specs/base_static.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """
    Register all subclasses of BaseStaticTest with a static test format
    name set as possible static test format.
    """
    if cls.format_name:
        # Register the new fixture format
        BaseStaticTest.formats.append(cls)
        if len(BaseStaticTest.formats) > 1:
            BaseStaticTest.formats_type_adapter = TypeAdapter(
                Union[tuple(BaseStaticTest.formats)],
            )
        else:
            BaseStaticTest.formats_type_adapter = TypeAdapter(cls)

fill_function() abstractmethod

Return the test function that can be used to fill the test.

This method should be implemented by the subclasses.

The function returned can be optionally decorated with the @pytest.mark.parametrize decorator to parametrize the test with the number of sub test cases.

Example:

@pytest.mark.parametrize("n", [1])
@pytest.mark.parametrize("m", [1, 2])
@pytest.mark.valid_from("Homestead")
def test_state_filler(
    state_test: StateTestFiller,
    fork: Fork,
    pre: Alloc,
    n: int,
    m: int
):
    """Generate a test from a static state filler."""
    assert n == 1
    assert m in [1, 2]
    env = Environment(**self.env.model_dump())
    sender = pre.fund_eoa()
    tx = Transaction(
        ty=0x0,
        nonce=0,
        to=Address(0x1000),
        gas_limit=500000,
        protected=False if fork in [Frontier, Homestead] else True,
        data="",
        sender=sender,
    )
    state_test(env=env, pre=pre, post={}, tx=tx)

To aid the generation of the test, the function can be defined and then the decorator be applied after defining the function:

def test_state_filler(
state_test: StateTestFiller,
    fork: Fork,
    pre: Alloc,
    n: int,
    m: int,
):

...

test_state_filler = pytest.mark.parametrize("n",
    [1])(test_state_filler
)
test_state_filler = pytest.mark.parametrize("m",
    [1, 2])(test_state_filler
)

if self.valid_from:
    test_state_filler = pytest.mark.valid_from(
        self.valid_from
    )(test_state_filler)

if self.valid_until:
    test_state_filler = pytest.mark.valid_until(
        self.valid_until
    )(test_state_filler)

return test_state_filler

The function can contain the following parameters on top of the spec type parameter (state_test in the example above): - fork: The fork for which the test is currently being filled. - pre: The pre-state of the test.

Source code in packages/testing/src/execution_testing/specs/base_static.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@abstractmethod
def fill_function(self) -> Callable:
    """
    Return the test function that can be used to fill the test.

    This method should be implemented by the subclasses.

    The function returned can be optionally decorated with the
    `@pytest.mark.parametrize` decorator to parametrize the test with the
    number of sub test cases.

    Example:
    ```
    @pytest.mark.parametrize("n", [1])
    @pytest.mark.parametrize("m", [1, 2])
    @pytest.mark.valid_from("Homestead")
    def test_state_filler(
        state_test: StateTestFiller,
        fork: Fork,
        pre: Alloc,
        n: int,
        m: int
    ):
        \"\"\"Generate a test from a static state filler.\"\"\"
        assert n == 1
        assert m in [1, 2]
        env = Environment(**self.env.model_dump())
        sender = pre.fund_eoa()
        tx = Transaction(
            ty=0x0,
            nonce=0,
            to=Address(0x1000),
            gas_limit=500000,
            protected=False if fork in [Frontier, Homestead] else True,
            data="",
            sender=sender,
        )
        state_test(env=env, pre=pre, post={}, tx=tx)
    ```

    To aid the generation of the test, the function can be defined and then
    the decorator be applied after defining the function:

    ```
    def test_state_filler(
    state_test: StateTestFiller,
        fork: Fork,
        pre: Alloc,
        n: int,
        m: int,
    ):

    ...

    test_state_filler = pytest.mark.parametrize("n",
        [1])(test_state_filler
    )
    test_state_filler = pytest.mark.parametrize("m",
        [1, 2])(test_state_filler
    )

    if self.valid_from:
        test_state_filler = pytest.mark.valid_from(
            self.valid_from
        )(test_state_filler)

    if self.valid_until:
        test_state_filler = pytest.mark.valid_until(
            self.valid_until
        )(test_state_filler)

    return test_state_filler
    ```

    The function can contain the following parameters on top of the spec
    type parameter (`state_test` in the example above): - `fork`: The fork
    for which the test is currently being filled. - `pre`: The pre-state of
    the test.

    """
    raise NotImplementedError

remove_comments(data) staticmethod

Remove comments from a dictionary.

Source code in packages/testing/src/execution_testing/specs/base_static.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@staticmethod
def remove_comments(data: Dict) -> Dict:
    """Remove comments from a dictionary."""
    result = {}
    for k, v in data.items():
        if isinstance(k, str) and k.startswith("//"):
            continue
        if isinstance(v, dict):
            v = BaseStaticTest.remove_comments(v)
        elif isinstance(v, list):
            v = [
                BaseStaticTest.remove_comments(i)
                if isinstance(i, dict)
                else i
                for i in v
            ]
        result[k] = v
    return result

remove_comments_from_model(data) classmethod

Remove comments from the static file loaded, if any.

Source code in packages/testing/src/execution_testing/specs/base_static.py
154
155
156
157
158
159
160
@model_validator(mode="before")
@classmethod
def remove_comments_from_model(cls, data: Any) -> Any:
    """Remove comments from the static file loaded, if any."""
    if isinstance(data, dict):
        return BaseStaticTest.remove_comments(data)
    return data

BenchmarkTest

Bases: BaseTest

Test type designed specifically for benchmark test cases.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
class BenchmarkTest(BaseTest):
    """Test type designed specifically for benchmark test cases."""

    model_config = ConfigDict(extra="forbid")

    pre: Alloc = Field(default_factory=Alloc)
    post: Alloc = Field(default_factory=Alloc)
    tx: Transaction | None = None
    setup_blocks: List[Block] = Field(default_factory=list)
    blocks: List[Block] | None = None
    block_exception: (
        List[TransactionException | BlockException]
        | TransactionException
        | BlockException
        | None
    ) = None
    env: Environment = Field(default_factory=Environment)
    expected_benchmark_gas_used: int | None = None
    gas_benchmark_value: int = Field(
        default_factory=lambda: int(Environment().gas_limit)
    )
    fixed_opcode_count: float | None = None
    expected_opcode_count: int | None = None
    target_opcode: Op | OpcodeTarget | None = None
    code_generator: BenchmarkCodeGenerator | None = None
    # By default, benchmark tests require neither of these
    include_full_post_state_in_output: bool = False
    include_tx_receipts_in_output: bool = False

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        BlockchainFixture,
        BlockchainEngineFixture,
        BlockchainEngineXFixture,
        BlockchainEngineStatefulFixture,
    ]

    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "benchmark_test",
            "An execute test derived from a benchmark test",
        ),
    ]

    supported_markers: ClassVar[Dict[str, str]] = {
        "blockchain_test_engine_only": (
            "Only generate a blockchain test engine fixture"
        ),
        "blockchain_test_only": "Only generate a blockchain test fixture",
        "repricing": "Mark test as reference test for gas repricing analysis",
    }

    def model_post_init(self, __context: Any, /) -> None:
        """
        Model post-init to assert that the custom pre-allocation was
        provided and the default was not used.
        """
        super().model_post_init(__context)
        assert "pre" in self.model_fields_set, (
            "pre allocation was not provided"
        )

        set_props = [
            name
            for name, val in [
                ("code_generator", self.code_generator),
                ("blocks", self.blocks),
                ("tx", self.tx),
            ]
            if val is not None
        ]

        if len(set_props) != 1:
            raise ValueError(
                f"Exactly one must be set, but got {len(set_props)}: "
                f"{', '.join(set_props)}"
            )

        blocks: List[Block] = self.setup_blocks

        if self.fixed_opcode_count is not None and self.code_generator is None:
            pytest.skip(
                "Cannot run fixed opcode count tests without a code generator"
            )

        if self.code_generator is not None:
            # Inject fixed_opcode_count into the code generator if provided
            self.code_generator.fixed_opcode_count = self.fixed_opcode_count

            # In fixed opcode count mode, skip gas validation since we're
            # measuring performance by operation count, not gas usage
            if self.fixed_opcode_count is not None:
                self.skip_gas_used_validation = True
                generated_blocks = (
                    self.generate_fixed_opcode_count_transactions()
                )
            else:
                generated_blocks = self.generate_blocks_from_code_generator()
            blocks += generated_blocks

        elif self.blocks is not None:
            blocks += self.blocks

        elif self.tx is not None:
            gas_limit = (
                self.fork.fork_at(
                    block_number=0, timestamp=0
                ).transaction_gas_limit_cap()
                or self.gas_benchmark_value
            )

            transactions = self.split_transaction(self.tx, gas_limit)

            blocks.append(Block(txs=transactions))

        else:
            raise ValueError(
                "Cannot create BlockchainTest without a code generator, "
                "transactions, or blocks"
            )

        self.blocks = blocks

    @classmethod
    def pytest_parameter_name(cls) -> str:
        """
        Return the parameter name used in pytest
        to select this spec type.
        """
        return "benchmark_test"

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the
        appropriate marker is used.
        """
        del fork

        if "blockchain_test_only" in [m.name for m in markers]:
            return fixture_format != BlockchainFixture
        if "blockchain_test_engine_only" in [m.name for m in markers]:
            return fixture_format != BlockchainEngineFixture
        return False

    def get_genesis_environment(self) -> Environment:
        """Get the genesis environment for this benchmark test."""
        return self.generate_blockchain_test().get_genesis_environment()

    def split_transaction(
        self, tx: Transaction, gas_limit_cap: int | None
    ) -> List[Transaction]:
        """
        Split a transaction that exceeds the gas
        limit cap into multiple transactions.
        """
        if gas_limit_cap is None:
            tx.gas_limit = HexNumber(self.gas_benchmark_value)
            return [tx]

        if gas_limit_cap >= self.gas_benchmark_value:
            tx.gas_limit = HexNumber(self.gas_benchmark_value)
            return [tx]

        num_splits = math.ceil(self.gas_benchmark_value / gas_limit_cap)
        remaining_gas = self.gas_benchmark_value

        split_transactions = []
        for i in range(num_splits):
            split_tx = tx.model_copy()
            split_tx.gas_limit = HexNumber(
                remaining_gas if i == num_splits - 1 else gas_limit_cap
            )
            remaining_gas -= gas_limit_cap
            split_tx.nonce = HexNumber(tx.nonce + i)
            split_transactions.append(split_tx)

        return split_transactions

    def generate_blocks_from_code_generator(self) -> List[Block]:
        """Generate blocks using the code generator."""
        if self.code_generator is None:
            raise Exception("Code generator is not set")
        self.code_generator.deploy_contracts(
            pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
        )
        gas_limit = (
            self.fork.fork_at(
                block_number=0, timestamp=0
            ).transaction_gas_limit_cap()
            or self.gas_benchmark_value
        )
        benchmark_tx = self.code_generator.generate_transaction(
            pre=self.pre, gas_benchmark_value=gas_limit
        )

        execution_txs = self.split_transaction(benchmark_tx, gas_limit)
        execution_block = Block(txs=execution_txs)

        return [execution_block]

    def generate_fixed_opcode_count_transactions(self) -> List[Block]:
        """Generate transactions with a fixed opcode count."""
        if self.code_generator is None:
            raise Exception("Code generator is not set")
        self.code_generator.deploy_fix_count_contracts(
            pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
        )
        gas_limit = (
            self.fork.fork_at(
                block_number=0, timestamp=0
            ).transaction_gas_limit_cap()
            or self.gas_benchmark_value
        )
        benchmark_tx = self.code_generator.generate_transaction(
            pre=self.pre, gas_benchmark_value=gas_limit
        )
        execution_block = Block(txs=[benchmark_tx])
        return [execution_block]

    def generate_blockchain_test(self) -> BlockchainTest:
        """Create a BlockchainTest from this BenchmarkTest."""
        return BlockchainTest.from_test(
            base_test=self,
            genesis_environment=self.env,
            pre=self.pre,
            post=self.post,
            blocks=self.blocks,
            include_full_post_state_in_output=self.include_full_post_state_in_output,
            include_tx_receipts_in_output=self.include_tx_receipts_in_output,
        )

    @staticmethod
    def _verify_target_opcode_count(
        *,
        target_opcode: Op | OpcodeTarget,
        opcode_count: OpcodeCount,
        expected_opcode_count: int,
        tolerance: float = 0.05,
    ) -> None:
        """Verify target opcode was executed the expected number of times."""
        count_opcode = (
            target_opcode.opcode
            if isinstance(target_opcode, OpcodeTarget)
            else target_opcode
        )
        actual = opcode_count.root.get(count_opcode, 0)
        max_difference = expected_opcode_count * tolerance  # 5% tolerance

        if abs(actual - expected_opcode_count) > max_difference:
            raise ValueError(
                f"Target opcode {target_opcode} count mismatch: "
                f"expected ~{expected_opcode_count} "
                f"(±{tolerance * 100}%), got {actual}"
            )

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the blockchain test fixture."""
        self.check_exception_test(
            exception=self.tx.error is not None if self.tx else False
        )
        if fixture_format in BlockchainTest.supported_fixture_formats:
            fill_result = self.generate_blockchain_test().generate(
                t8n=t8n, fixture_format=fixture_format
            )

            # Verify target opcode count if specified
            if (
                self.fixed_opcode_count is not None
                or self.expected_opcode_count is not None
            ):
                target_opcode = self.target_opcode
                assert target_opcode is not None, "target_opcode is not set"
                opcode_count = fill_result.benchmark_opcode_count
                if opcode_count is not None:
                    if self.fixed_opcode_count is not None:
                        self._verify_target_opcode_count(
                            target_opcode=target_opcode,
                            opcode_count=opcode_count,
                            expected_opcode_count=int(
                                # fixed_opcode_count is in thousands units
                                self.fixed_opcode_count * 1000
                            ),
                        )
                    elif self.expected_opcode_count is not None:
                        self._verify_target_opcode_count(
                            target_opcode=target_opcode,
                            opcode_count=opcode_count,
                            expected_opcode_count=self.expected_opcode_count,
                        )
                    else:
                        raise Exception(
                            "Opcode verification needs either "
                            "`fixed_opcode_count` or `expected_opcode_count` "
                            "to be set."
                        )

            if self.target_opcode is not None:
                fill_result.metadata["target_opcode"] = str(self.target_opcode)

            return fill_result
        else:
            raise Exception(f"Unsupported fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """Execute the benchmark test by sending it to the live network."""
        if execute_format == TransactionPost:
            assert self.blocks is not None
            return TransactionPost(
                blocks=[block.txs for block in self.blocks],
                post=self.post,
                benchmark_mode=True,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

model_post_init(__context)

Model post-init to assert that the custom pre-allocation was provided and the default was not used.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def model_post_init(self, __context: Any, /) -> None:
    """
    Model post-init to assert that the custom pre-allocation was
    provided and the default was not used.
    """
    super().model_post_init(__context)
    assert "pre" in self.model_fields_set, (
        "pre allocation was not provided"
    )

    set_props = [
        name
        for name, val in [
            ("code_generator", self.code_generator),
            ("blocks", self.blocks),
            ("tx", self.tx),
        ]
        if val is not None
    ]

    if len(set_props) != 1:
        raise ValueError(
            f"Exactly one must be set, but got {len(set_props)}: "
            f"{', '.join(set_props)}"
        )

    blocks: List[Block] = self.setup_blocks

    if self.fixed_opcode_count is not None and self.code_generator is None:
        pytest.skip(
            "Cannot run fixed opcode count tests without a code generator"
        )

    if self.code_generator is not None:
        # Inject fixed_opcode_count into the code generator if provided
        self.code_generator.fixed_opcode_count = self.fixed_opcode_count

        # In fixed opcode count mode, skip gas validation since we're
        # measuring performance by operation count, not gas usage
        if self.fixed_opcode_count is not None:
            self.skip_gas_used_validation = True
            generated_blocks = (
                self.generate_fixed_opcode_count_transactions()
            )
        else:
            generated_blocks = self.generate_blocks_from_code_generator()
        blocks += generated_blocks

    elif self.blocks is not None:
        blocks += self.blocks

    elif self.tx is not None:
        gas_limit = (
            self.fork.fork_at(
                block_number=0, timestamp=0
            ).transaction_gas_limit_cap()
            or self.gas_benchmark_value
        )

        transactions = self.split_transaction(self.tx, gas_limit)

        blocks.append(Block(txs=transactions))

    else:
        raise ValueError(
            "Cannot create BlockchainTest without a code generator, "
            "transactions, or blocks"
        )

    self.blocks = blocks

pytest_parameter_name() classmethod

Return the parameter name used in pytest to select this spec type.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
412
413
414
415
416
417
418
@classmethod
def pytest_parameter_name(cls) -> str:
    """
    Return the parameter name used in pytest
    to select this spec type.
    """
    return "benchmark_test"

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Discard a fixture format from filling if the
    appropriate marker is used.
    """
    del fork

    if "blockchain_test_only" in [m.name for m in markers]:
        return fixture_format != BlockchainFixture
    if "blockchain_test_engine_only" in [m.name for m in markers]:
        return fixture_format != BlockchainEngineFixture
    return False

get_genesis_environment()

Get the genesis environment for this benchmark test.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
439
440
441
def get_genesis_environment(self) -> Environment:
    """Get the genesis environment for this benchmark test."""
    return self.generate_blockchain_test().get_genesis_environment()

split_transaction(tx, gas_limit_cap)

Split a transaction that exceeds the gas limit cap into multiple transactions.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def split_transaction(
    self, tx: Transaction, gas_limit_cap: int | None
) -> List[Transaction]:
    """
    Split a transaction that exceeds the gas
    limit cap into multiple transactions.
    """
    if gas_limit_cap is None:
        tx.gas_limit = HexNumber(self.gas_benchmark_value)
        return [tx]

    if gas_limit_cap >= self.gas_benchmark_value:
        tx.gas_limit = HexNumber(self.gas_benchmark_value)
        return [tx]

    num_splits = math.ceil(self.gas_benchmark_value / gas_limit_cap)
    remaining_gas = self.gas_benchmark_value

    split_transactions = []
    for i in range(num_splits):
        split_tx = tx.model_copy()
        split_tx.gas_limit = HexNumber(
            remaining_gas if i == num_splits - 1 else gas_limit_cap
        )
        remaining_gas -= gas_limit_cap
        split_tx.nonce = HexNumber(tx.nonce + i)
        split_transactions.append(split_tx)

    return split_transactions

generate_blocks_from_code_generator()

Generate blocks using the code generator.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def generate_blocks_from_code_generator(self) -> List[Block]:
    """Generate blocks using the code generator."""
    if self.code_generator is None:
        raise Exception("Code generator is not set")
    self.code_generator.deploy_contracts(
        pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
    )
    gas_limit = (
        self.fork.fork_at(
            block_number=0, timestamp=0
        ).transaction_gas_limit_cap()
        or self.gas_benchmark_value
    )
    benchmark_tx = self.code_generator.generate_transaction(
        pre=self.pre, gas_benchmark_value=gas_limit
    )

    execution_txs = self.split_transaction(benchmark_tx, gas_limit)
    execution_block = Block(txs=execution_txs)

    return [execution_block]

generate_fixed_opcode_count_transactions()

Generate transactions with a fixed opcode count.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def generate_fixed_opcode_count_transactions(self) -> List[Block]:
    """Generate transactions with a fixed opcode count."""
    if self.code_generator is None:
        raise Exception("Code generator is not set")
    self.code_generator.deploy_fix_count_contracts(
        pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
    )
    gas_limit = (
        self.fork.fork_at(
            block_number=0, timestamp=0
        ).transaction_gas_limit_cap()
        or self.gas_benchmark_value
    )
    benchmark_tx = self.code_generator.generate_transaction(
        pre=self.pre, gas_benchmark_value=gas_limit
    )
    execution_block = Block(txs=[benchmark_tx])
    return [execution_block]

generate_blockchain_test()

Create a BlockchainTest from this BenchmarkTest.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
514
515
516
517
518
519
520
521
522
523
524
def generate_blockchain_test(self) -> BlockchainTest:
    """Create a BlockchainTest from this BenchmarkTest."""
    return BlockchainTest.from_test(
        base_test=self,
        genesis_environment=self.env,
        pre=self.pre,
        post=self.post,
        blocks=self.blocks,
        include_full_post_state_in_output=self.include_full_post_state_in_output,
        include_tx_receipts_in_output=self.include_tx_receipts_in_output,
    )

generate(t8n, fixture_format)

Generate the blockchain test fixture.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """Generate the blockchain test fixture."""
    self.check_exception_test(
        exception=self.tx.error is not None if self.tx else False
    )
    if fixture_format in BlockchainTest.supported_fixture_formats:
        fill_result = self.generate_blockchain_test().generate(
            t8n=t8n, fixture_format=fixture_format
        )

        # Verify target opcode count if specified
        if (
            self.fixed_opcode_count is not None
            or self.expected_opcode_count is not None
        ):
            target_opcode = self.target_opcode
            assert target_opcode is not None, "target_opcode is not set"
            opcode_count = fill_result.benchmark_opcode_count
            if opcode_count is not None:
                if self.fixed_opcode_count is not None:
                    self._verify_target_opcode_count(
                        target_opcode=target_opcode,
                        opcode_count=opcode_count,
                        expected_opcode_count=int(
                            # fixed_opcode_count is in thousands units
                            self.fixed_opcode_count * 1000
                        ),
                    )
                elif self.expected_opcode_count is not None:
                    self._verify_target_opcode_count(
                        target_opcode=target_opcode,
                        opcode_count=opcode_count,
                        expected_opcode_count=self.expected_opcode_count,
                    )
                else:
                    raise Exception(
                        "Opcode verification needs either "
                        "`fixed_opcode_count` or `expected_opcode_count` "
                        "to be set."
                    )

        if self.target_opcode is not None:
            fill_result.metadata["target_opcode"] = str(self.target_opcode)

        return fill_result
    else:
        raise Exception(f"Unsupported fixture format: {fixture_format}")

execute(*, execute_format)

Execute the benchmark test by sending it to the live network.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """Execute the benchmark test by sending it to the live network."""
    if execute_format == TransactionPost:
        assert self.blocks is not None
        return TransactionPost(
            blocks=[block.txs for block in self.blocks],
            post=self.post,
            benchmark_mode=True,
        )
    raise Exception(f"Unsupported execute format: {execute_format}")

OpcodeTarget dataclass

Map a display name to an underlying opcode for count validation.

Use when the fixture metadata should show a descriptive label (e.g. a precompile name) while opcode-count validation targets the real EVM opcode that gets executed (e.g. STATICCALL).

Source code in packages/testing/src/execution_testing/specs/benchmark.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass(frozen=True)
class OpcodeTarget:
    """
    Map a display name to an underlying opcode for count validation.

    Use when the fixture metadata should show a descriptive label (e.g. a
    precompile name) while opcode-count validation targets the real EVM
    opcode that gets executed (e.g. STATICCALL).
    """

    name: str
    opcode: Op

    def __str__(self) -> str:
        """Return the display name."""
        return self.name

__str__()

Return the display name.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
62
63
64
def __str__(self) -> str:
    """Return the display name."""
    return self.name

BlobsTest

Bases: BaseTest

Test specification for blob tests.

Source code in packages/testing/src/execution_testing/specs/blobs.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class BlobsTest(BaseTest):
    """Test specification for blob tests."""

    pre: Alloc
    txs: List[NetworkWrappedTransaction | Transaction]
    nonexisting_blob_hashes: List[Hash] | None = None
    get_blobs_version: int | None = None

    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            BlobTransaction,
            "blob_transaction_test",
            "A test that executes a blob transaction",
        ),
    ]

    def generate(
        self,
        *,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the list of test fixtures."""
        del t8n
        raise Exception(f"Unknown fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """Generate the list of test fixtures."""
        if execute_format == BlobTransaction:
            return BlobTransaction(
                txs=self.txs,
                nonexisting_blob_hashes=self.nonexisting_blob_hashes,
                get_blobs_version=self.get_blobs_version,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

generate(*, t8n, fixture_format)

Generate the list of test fixtures.

Source code in packages/testing/src/execution_testing/specs/blobs.py
36
37
38
39
40
41
42
43
44
def generate(
    self,
    *,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """Generate the list of test fixtures."""
    del t8n
    raise Exception(f"Unknown fixture format: {fixture_format}")

execute(*, execute_format)

Generate the list of test fixtures.

Source code in packages/testing/src/execution_testing/specs/blobs.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """Generate the list of test fixtures."""
    if execute_format == BlobTransaction:
        return BlobTransaction(
            txs=self.txs,
            nonexisting_blob_hashes=self.nonexisting_blob_hashes,
            get_blobs_version=self.get_blobs_version,
        )
    raise Exception(f"Unsupported execute format: {execute_format}")

Block

Bases: Header

Block type used to describe block properties in test specs.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
class Block(Header):
    """Block type used to describe block properties in test specs."""

    header_verify: Header | None = None
    # If set, the block header will be verified against the specified values.
    rlp_modifier: Header | None = None
    """
    An RLP modifying header which values would be used to override the ones
    returned by the `ethereum_clis.TransitionTool`.
    """
    expected_block_access_list: BlockAccessListExpectation | None = None
    """
    If set, the block access list will be verified and potentially corrupted
    for invalid tests.
    """
    exception: BLOCK_EXCEPTION_TYPE = None
    # If set, the block is expected to be rejected by the client.
    skip_exception_verification: bool = False
    """
    Skip verifying that the exception is returned by the transition tool. This
    could be because the exception is inserted in the block after the
    transition tool evaluates it.
    """
    engine_api_error_code: EngineAPIError | None = None
    """
    If set, the block is expected to produce an error response from the Engine
    API.
    """
    include_receipts_in_output: bool | None = None
    """
    If set to `True`, the block’s output fixture representation will include
    full transaction receipts. If unset, the test-level value is used.
    """
    txs: List[Transaction] = Field(default_factory=list)
    """List of transactions included in the block."""
    ommers: List[Header] | None = None
    """List of ommer headers included in the block."""
    withdrawals: List[Withdrawal] | None = None
    """List of withdrawals to perform for this block."""
    requests: List[Bytes] | None = None
    """Custom list of requests to embed in this block."""
    expected_post_state: Alloc | None = None
    """Post state for verification after block execution in BlockchainTest"""
    block_access_list: Bytes | None = Field(None)
    """EIP-7928: Block-level access lists (serialized)."""
    expected_gas_used: int | None = None
    """Expected gas used for the block."""

    @property
    def phase(self) -> TestPhase | None:
        """
        Return the single phase shared by all txs, or ``None`` when the
        block has no phase-tagged txs.

        Mixed-phase blocks must be split via ``_split_blocks_by_phase``
        before this property is read — they would otherwise need an
        arbitrary tiebreaker, which is a bug, not a default.
        """
        phases = {_tx_phase(tx) for tx in self.txs}
        phases.discard(None)
        if not phases:
            return None
        if len(phases) == 1:
            return next(iter(phases))
        raise AssertionError(
            f"Block.phase called on mixed-phase block (phases={phases}); "
            "split via _split_blocks_by_phase first."
        )

    def set_environment(self, env: Environment) -> Environment:
        """
        Create copy of the environment with the characteristics of this
        specific block.
        """
        new_env_values: Dict[str, Any] = {}

        """
        Values that need to be set in the environment and are `None` for this
        block need to be set to their defaults.
        """
        new_env_values["difficulty"] = self.difficulty
        new_env_values["prev_randao"] = self.prev_randao
        new_env_values["fee_recipient"] = (
            self.fee_recipient
            if self.fee_recipient is not None
            else Environment().fee_recipient
        )
        new_env_values["gas_limit"] = (
            self.gas_limit or env.parent_gas_limit or Environment().gas_limit
        )
        if not isinstance(self.base_fee_per_gas, Removable):
            new_env_values["base_fee_per_gas"] = self.base_fee_per_gas
        new_env_values["withdrawals"] = self.withdrawals
        if not isinstance(self.excess_blob_gas, Removable):
            new_env_values["excess_blob_gas"] = self.excess_blob_gas
        if not isinstance(self.blob_gas_used, Removable):
            new_env_values["blob_gas_used"] = self.blob_gas_used
        if not isinstance(self.parent_beacon_block_root, Removable):
            new_env_values["parent_beacon_block_root"] = (
                self.parent_beacon_block_root
            )
        if (
            not isinstance(self.requests_hash, Removable)
            and self.block_access_list is not None
        ):
            new_env_values["block_access_list_hash"] = (
                self.block_access_list.keccak256()
            )
            new_env_values["block_access_list"] = self.block_access_list
        if (
            not isinstance(self.block_access_list, Removable)
            and self.block_access_list is not None
        ):
            new_env_values["block_access_list"] = self.block_access_list
        if not isinstance(self.slot_number, Removable):
            new_env_values["slot_number"] = self.slot_number
        """
        These values are required, but they depend on the previous environment,
        so they can be calculated here.
        """
        if self.number is not None:
            new_env_values["number"] = self.number
        else:
            # calculate the next block number for the environment
            if len(env.block_hashes) == 0:
                new_env_values["number"] = 0
            else:
                new_env_values["number"] = (
                    max([Number(n) for n in env.block_hashes.keys()]) + 1
                )

        if self.timestamp is not None:
            new_env_values["timestamp"] = self.timestamp
        else:
            assert env.parent_timestamp is not None
            new_env_values["timestamp"] = int(
                Number(env.parent_timestamp) + 12
            )

        return env.copy(**new_env_values)

rlp_modifier = None class-attribute instance-attribute

An RLP modifying header which values would be used to override the ones returned by the ethereum_clis.TransitionTool.

expected_block_access_list = None class-attribute instance-attribute

If set, the block access list will be verified and potentially corrupted for invalid tests.

skip_exception_verification = False class-attribute instance-attribute

Skip verifying that the exception is returned by the transition tool. This could be because the exception is inserted in the block after the transition tool evaluates it.

engine_api_error_code = None class-attribute instance-attribute

If set, the block is expected to produce an error response from the Engine API.

include_receipts_in_output = None class-attribute instance-attribute

If set to True, the block’s output fixture representation will include full transaction receipts. If unset, the test-level value is used.

txs = Field(default_factory=list) class-attribute instance-attribute

List of transactions included in the block.

ommers = None class-attribute instance-attribute

List of ommer headers included in the block.

withdrawals = None class-attribute instance-attribute

List of withdrawals to perform for this block.

requests = None class-attribute instance-attribute

Custom list of requests to embed in this block.

expected_post_state = None class-attribute instance-attribute

Post state for verification after block execution in BlockchainTest

block_access_list = Field(None) class-attribute instance-attribute

EIP-7928: Block-level access lists (serialized).

expected_gas_used = None class-attribute instance-attribute

Expected gas used for the block.

phase property

Return the single phase shared by all txs, or None when the block has no phase-tagged txs.

Mixed-phase blocks must be split via _split_blocks_by_phase before this property is read — they would otherwise need an arbitrary tiebreaker, which is a bug, not a default.

set_environment(env)

Create copy of the environment with the characteristics of this specific block.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def set_environment(self, env: Environment) -> Environment:
    """
    Create copy of the environment with the characteristics of this
    specific block.
    """
    new_env_values: Dict[str, Any] = {}

    """
    Values that need to be set in the environment and are `None` for this
    block need to be set to their defaults.
    """
    new_env_values["difficulty"] = self.difficulty
    new_env_values["prev_randao"] = self.prev_randao
    new_env_values["fee_recipient"] = (
        self.fee_recipient
        if self.fee_recipient is not None
        else Environment().fee_recipient
    )
    new_env_values["gas_limit"] = (
        self.gas_limit or env.parent_gas_limit or Environment().gas_limit
    )
    if not isinstance(self.base_fee_per_gas, Removable):
        new_env_values["base_fee_per_gas"] = self.base_fee_per_gas
    new_env_values["withdrawals"] = self.withdrawals
    if not isinstance(self.excess_blob_gas, Removable):
        new_env_values["excess_blob_gas"] = self.excess_blob_gas
    if not isinstance(self.blob_gas_used, Removable):
        new_env_values["blob_gas_used"] = self.blob_gas_used
    if not isinstance(self.parent_beacon_block_root, Removable):
        new_env_values["parent_beacon_block_root"] = (
            self.parent_beacon_block_root
        )
    if (
        not isinstance(self.requests_hash, Removable)
        and self.block_access_list is not None
    ):
        new_env_values["block_access_list_hash"] = (
            self.block_access_list.keccak256()
        )
        new_env_values["block_access_list"] = self.block_access_list
    if (
        not isinstance(self.block_access_list, Removable)
        and self.block_access_list is not None
    ):
        new_env_values["block_access_list"] = self.block_access_list
    if not isinstance(self.slot_number, Removable):
        new_env_values["slot_number"] = self.slot_number
    """
    These values are required, but they depend on the previous environment,
    so they can be calculated here.
    """
    if self.number is not None:
        new_env_values["number"] = self.number
    else:
        # calculate the next block number for the environment
        if len(env.block_hashes) == 0:
            new_env_values["number"] = 0
        else:
            new_env_values["number"] = (
                max([Number(n) for n in env.block_hashes.keys()]) + 1
            )

    if self.timestamp is not None:
        new_env_values["timestamp"] = self.timestamp
    else:
        assert env.parent_timestamp is not None
        new_env_values["timestamp"] = int(
            Number(env.parent_timestamp) + 12
        )

    return env.copy(**new_env_values)

BlockchainTest

Bases: BaseTest

Filler type that tests multiple blocks (valid or invalid) in a chain.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
class BlockchainTest(BaseTest):
    """Filler type that tests multiple blocks (valid or invalid) in a chain."""

    pre: Alloc
    post: Alloc
    blocks: List[Block]
    genesis_environment: Environment = Field(default_factory=Environment)
    chain_id: int = Field(
        default_factory=lambda: ChainConfigDefaults.chain_id,
        validate_default=True,
    )
    include_full_post_state_in_output: bool = True
    """
    Include the post state in the fixture output. Otherwise, the state
    verification is only performed based on the state root.
    """
    include_tx_receipts_in_output: bool = True
    """
    Include transaction receipts in the fixture output.
    """

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        BlockchainFixture,
        BlockchainEngineFixture,
        BlockchainEngineSyncFixture,
        BlockchainEngineXFixture,
        BlockchainEngineStatefulFixture,
    ]
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "blockchain_test",
            "An execute test derived from a blockchain test",
        ),
    ]

    supported_markers: ClassVar[Dict[str, str]] = {
        "blockchain_test_engine_only": (
            "Only generate a blockchain test engine fixture"
        ),
        "blockchain_test_only": "Only generate a blockchain test fixture",
    }

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the appropriate marker is
        used.
        """
        del fork

        marker_names = [m.name for m in markers]
        if (
            fixture_format != BlockchainFixture
            and "blockchain_test_only" in marker_names
        ):
            return True
        if (
            fixture_format
            not in [BlockchainEngineFixture, BlockchainEngineXFixture]
            and "blockchain_test_engine_only" in marker_names
        ):
            return True
        return False

    def get_genesis_environment(self) -> Environment:
        """Get the genesis environment for pre-allocation groups."""
        modified_values = self.genesis_environment.set_fork_requirements(
            self.fork.transitions_from()
        ).model_dump(exclude_unset=True)
        return Environment(**(GENESIS_ENVIRONMENT_DEFAULTS | modified_values))

    def make_genesis(
        self, *, apply_pre_allocation_blockchain: bool
    ) -> Tuple[Alloc, FixtureBlock]:
        """Create a genesis block from the blockchain test definition."""
        env = self.get_genesis_environment()
        assert env.withdrawals is None or len(env.withdrawals) == 0, (
            "withdrawals must be empty at genesis"
        )
        assert (
            env.parent_beacon_block_root is None
            or env.parent_beacon_block_root == Hash(0)
        ), "parent_beacon_block_root must be empty at genesis"

        pre_alloc = self.pre
        if apply_pre_allocation_blockchain:
            pre_alloc = Alloc.merge(
                Alloc.model_validate(
                    self.fork.transitions_to().pre_allocation_blockchain()
                ),
                pre_alloc,
            )
        if empty_accounts := pre_alloc.empty_accounts():
            raise Exception(f"Empty accounts in pre state: {empty_accounts}")
        state_root = pre_alloc.state_root()
        genesis = FixtureHeader.genesis(
            self.fork.transitions_from(), env, state_root
        )

        return (
            pre_alloc,
            FixtureBlockBase(
                header=genesis,
                withdrawals=None if env.withdrawals is None else [],
            ).with_rlp(txs=[]),
        )

    def generate_block_data(
        self,
        t8n: FillerBackend,
        block: Block,
        previous_env: Environment,
        previous_alloc: Alloc | LazyAlloc,
    ) -> BuiltBlock:
        """
        Generate common block data for both make_fixture and make_hive_fixture.

        ``t8n`` is any backend satisfying ``FillerBackend``. The
        default compute path passes a concrete ``TransitionTool``; stateful
        filling will pass an ``ClientBackend`` that drives
        ``testing_buildBlockV1`` against a live client.
        """
        env = block.set_environment(previous_env)
        fork = self.fork.fork_at(
            block_number=env.number, timestamp=env.timestamp
        )
        env = env.set_fork_requirements(fork)
        txs = [tx.with_signature_and_sender() for tx in block.txs]

        if failing_tx_count := len([tx for tx in txs if tx.error]) > 0:
            if failing_tx_count > 1:
                raise Exception(
                    "test correctness: only one transaction can produce "
                    "an exception in a block"
                )
            if not txs[-1].error:
                raise Exception(
                    "test correctness: the transaction that produces an "
                    "exception must be the last transaction in the block"
                )

        transition_tool_output = t8n.evaluate(
            transition_tool_data=TransitionTool.TransitionToolData(
                alloc=previous_alloc,
                txs=txs,
                env=env,
                fork=fork,
                chain_id=self.chain_id,
                reward=fork.get_reward(),
                blob_schedule=fork.blob_schedule(),
            ),
            slow_request=self.is_tx_gas_heavy_test,
        )

        # One special case of the invalid transactions is the blob gas used,
        # since this value is not included in the transition tool result, but
        # it is included in the block header, and some clients check it before
        # executing the block by simply counting the type-3 txs, we need to set
        # the correct value by default.
        blob_gas_used: int | None = None
        if fork.supports_blobs():
            if (blob_gas_per_blob := fork.blob_gas_per_blob()) > 0:
                blob_gas_used = blob_gas_per_blob * count_blobs(txs)

        # Prepare slot_number for header initialization
        slot_number_value: ZeroPaddedHexNumber | None = None
        if fork.header_slot_number_required():
            slot_number_value = ZeroPaddedHexNumber(
                int(env.slot_number) if env.slot_number is not None else 0
            )

        header = FixtureHeader(
            **(
                transition_tool_output.result.model_dump(
                    exclude_none=True,
                    exclude={"blob_gas_used", "transactions_trie"},
                )
                | env.model_dump(
                    exclude_none=True,
                    exclude={"blob_gas_used", "slot_number"},
                )
            ),
            blob_gas_used=blob_gas_used,
            transactions_trie=Transaction.list_root(txs),
            extra_data=(
                block.extra_data if block.extra_data is not None else b""
            ),
            slot_number=slot_number_value,
            fork=fork,
        )

        if block.header_verify is not None:
            # Verify the header after transition tool processing.
            try:
                block.header_verify.verify(header)
            except Exception as e:
                raise Exception(
                    f"Verification of block {int(env.number)} failed"
                ) from e

        if block.expected_gas_used is not None:
            gas_used = int(transition_tool_output.result.gas_used)
            assert gas_used == block.expected_gas_used, (
                f"gas_used ({gas_used}) does not match expected_gas_used "
                f"({block.expected_gas_used})"
                f", difference: {gas_used - block.expected_gas_used}"
            )

        requests_list: List[Bytes] | None = None
        if fork.header_requests_required():
            assert transition_tool_output.result.requests is not None, (
                "Requests are required for this block"
            )
            requests = Requests(
                requests_lists=list(transition_tool_output.result.requests)
            )

            if Hash(requests) != header.requests_hash:
                raise Exception(
                    "Requests root in header does not match the requests "
                    "root in the transition tool output: "
                    f"{header.requests_hash} != {Hash(requests)}"
                )

            requests_list = requests.requests_list

        if block.requests is not None:
            header.requests_hash = Hash(
                Requests(requests_lists=list(block.requests))
            )
            requests_list = block.requests

        # Decode BAL from RLP bytes provided by the transition tool.
        t8n_bal_rlp = transition_tool_output.result.block_access_list
        t8n_bal: BlockAccessList | None = None
        if t8n_bal_rlp is not None:
            t8n_bal = BlockAccessList.from_rlp(t8n_bal_rlp)

        if fork.header_bal_hash_required():
            assert t8n_bal is not None, (
                "Block access list is required for this block but was not "
                "provided by the transition tool"
            )

            computed_block_access_list_hash = Hash(t8n_bal.rlp.keccak256())
            assert (
                computed_block_access_list_hash
                == header.block_access_list_hash
            ), (
                "Block access list hash in header does not match the "
                f"computed hash from BAL: {header.block_access_list_hash} "
                f"!= {computed_block_access_list_hash}"
            )

        if block.rlp_modifier is not None:
            # Modify any parameter specified in the `rlp_modifier` after
            # transition tool processing.
            header = block.rlp_modifier.apply(header)
            header.fork = fork  # Deleted during `apply` because `exclude=True`

        # Process block access list - apply transformer if present for invalid
        # tests
        bal = t8n_bal

        # Always validate BAL structural integrity (ordering, duplicates)
        # if present
        if t8n_bal is not None:
            t8n_bal.validate_structure()

        # If expected BAL is defined, verify against it
        if (
            block.expected_block_access_list is not None
            and t8n_bal is not None
        ):
            block.expected_block_access_list.verify_against(t8n_bal)

            bal = block.expected_block_access_list.modify_if_invalid_test(
                t8n_bal
            )
            if bal != t8n_bal:
                # If the BAL was modified and the fork requires it, update the
                # header hash
                header.block_access_list_hash = Hash(bal.rlp.keccak256())

        built_block_kwargs: Dict[str, Any] = dict(
            header=header,
            alloc=transition_tool_output.alloc,
            state_root=transition_tool_output.result.state_root,
            env=env,
            txs=txs,
            ommers=[],
            withdrawals=env.withdrawals,
            requests=requests_list,
            result=transition_tool_output.result,
            expected_exception=block.exception,
            engine_api_error_code=block.engine_api_error_code,
            rlp_modifier=block.rlp_modifier,
            fork=fork,
            block_access_list=bal,
        )
        built_block: BuiltBlock
        if transition_tool_output.engine_payload is not None:
            built_block = TestingBuildBlock(
                **built_block_kwargs,
                engine_payload=transition_tool_output.engine_payload,
            )
        else:
            built_block = BuiltBlock(**built_block_kwargs)

        try:
            rejected_txs = built_block.verify_transactions(
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
            if (
                not rejected_txs
                and block.rlp_modifier is None
                and block.requests is None
                and not block.skip_exception_verification
                and not (
                    block.expected_block_access_list is not None
                    and block.expected_block_access_list._modifier is not None
                )
            ):
                # Only verify block level exception if: - No transaction
                # exception was raised, because these are not reported as block
                # exceptions. - No RLP modifier was specified, because the
                # modifier is what normally produces the block exception. - No
                # requests were specified, because modified requests are also
                # what normally produces the block exception. - No BAL modifier
                # was specified, because modified BAL also produces block
                # exceptions.
                built_block.verify_block_exception(
                    transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
                )
            verify_result(transition_tool_output.result, env)
        except Exception as e:
            print_traces(t8n.get_traces())
            pprint(transition_tool_output.result)
            pprint(previous_alloc)
            pprint(transition_tool_output.alloc.get())
            raise e

        if len(rejected_txs) > 0 and block.exception is None:
            print_traces(t8n.get_traces())
            raise Exception(
                "one or more transactions in `BlockchainTest` are "
                + "intrinsically invalid, but the block was not expected "
                + "to be invalid. Please verify whether the transaction "
                + "was indeed expected to fail and add the proper "
                + "`block.exception`"
            )

        return built_block

    def verify_post_state(
        self,
        t8n: FillerBackend,
        t8n_state: Alloc,
        expected_state: Alloc | None = None,
    ) -> None:
        """Verify post alloc after all block/s or payload/s are generated."""
        try:
            if expected_state:
                expected_state.verify_post_alloc(t8n_state)
            else:
                self.post.verify_post_alloc(t8n_state)
        except Exception as e:
            print_traces(t8n.get_traces())
            raise e

    def make_fixture(
        self,
        t8n: FillerBackend,
    ) -> FillResult:
        """Create a fixture from the blockchain test definition."""
        fixture_blocks: List[FixtureBlock | InvalidFixtureBlock] = []

        pre, genesis = self.make_genesis(apply_pre_allocation_blockchain=True)

        alloc: Alloc | LazyAlloc = pre
        state_root = genesis.header.state_root
        env = environment_from_parent_header(genesis.header)
        head = genesis.header.block_hash
        invalid_blocks = 0
        benchmark_gas_used: int | None = None
        benchmark_opcode_count: OpcodeCount | None = None
        for block in self.blocks:
            # This is the most common case, the RLP needs to be constructed
            # based on the transactions to be included in the block.
            # Set the environment according to the block to execute.
            built_block = self.generate_block_data(
                t8n=t8n,
                block=block,
                previous_env=env,
                previous_alloc=alloc,
            )
            block_number = int(built_block.header.number)
            is_last_block = block is self.blocks[-1]
            if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
                benchmark_gas_used = int(built_block.result.gas_used)
                benchmark_opcode_count = built_block.result.opcode_count
            if built_block.result.receipts:
                self.validate_receipt_status(
                    receipts=built_block.result.receipts,
                    block_number=block_number,
                )
            include_receipts = (
                block.include_receipts_in_output
                if block.include_receipts_in_output is not None
                else self.include_tx_receipts_in_output
            )
            fixture_blocks.append(
                built_block.get_fixture_block(
                    include_receipts=include_receipts
                )
            )

            # BAL verification already done in to_fixture_bal() if
            # expected_block_access_list set

            if block.exception is None:
                # Update env, alloc and last block hash for the next block.
                alloc = built_block.alloc
                state_root = built_block.state_root
                env = apply_new_parent(built_block.env, built_block.header)
                head = built_block.header.block_hash
            else:
                invalid_blocks += 1

            if block.expected_post_state:
                self.verify_post_state(
                    t8n,
                    t8n_state=alloc.get()
                    if isinstance(alloc, LazyAlloc)
                    else alloc,
                    expected_state=block.expected_post_state,
                )
        self.check_exception_test(exception=invalid_blocks > 0)
        alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
        self.verify_post_state(t8n, t8n_state=alloc)
        fixture = BlockchainFixture(
            fork=self.fork,
            genesis=genesis.header,
            genesis_rlp=genesis.rlp,
            blocks=fixture_blocks,
            last_block_hash=head,
            pre=pre,
            post_state=alloc
            if self.include_full_post_state_in_output
            else None,
            post_state_hash=state_root
            if not self.include_full_post_state_in_output
            else None,
            config=FixtureConfig(
                fork=self.fork,
                blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                    self.fork.transitions_to().blob_schedule()
                ),
                chain_id=self.chain_id,
            ),
        )
        return FillResult(
            fixture=fixture,
            gas_optimization=None,
            benchmark_gas_used=benchmark_gas_used,
            benchmark_opcode_count=benchmark_opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def make_hive_fixture(
        self,
        t8n: FillerBackend,
        fixture_format: FixtureFormat = BlockchainEngineFixture,
    ) -> FillResult:
        """Create a hive fixture from the blocktest definition."""
        fixture_payloads: List[FixtureEngineNewPayload] = []

        pre, genesis = self.make_genesis(
            apply_pre_allocation_blockchain=fixture_format
            != BlockchainEngineXFixture,
        )
        alloc: Alloc | LazyAlloc = pre
        state_root = genesis.header.state_root
        env = environment_from_parent_header(genesis.header)
        head_hash = genesis.header.block_hash
        invalid_blocks = 0
        benchmark_gas_used: int | None = None
        benchmark_opcode_count: OpcodeCount | None = None
        for block in self.blocks:
            built_block = self.generate_block_data(
                t8n=t8n,
                block=block,
                previous_env=env,
                previous_alloc=alloc,
            )
            block_number = int(built_block.header.number)
            is_last_block = block is self.blocks[-1]
            if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
                benchmark_gas_used = int(built_block.result.gas_used)
                benchmark_opcode_count = built_block.result.opcode_count
            if built_block.result.receipts:
                self.validate_receipt_status(
                    receipts=built_block.result.receipts,
                    block_number=block_number,
                )
            fixture_payloads.append(
                built_block.get_fixture_engine_new_payload()
            )
            if block.exception is None:
                alloc = built_block.alloc
                state_root = built_block.state_root
                env = apply_new_parent(built_block.env, built_block.header)
                head_hash = built_block.header.block_hash
            else:
                invalid_blocks += 1

            if block.expected_post_state:
                self.verify_post_state(
                    t8n,
                    t8n_state=alloc.get()
                    if isinstance(alloc, LazyAlloc)
                    else alloc,
                    expected_state=block.expected_post_state,
                )
        self.check_exception_test(exception=invalid_blocks > 0)
        fcu_version = (
            self.fork.transitions_from().engine_forkchoice_updated_version()
        )
        assert fcu_version is not None, (
            "A hive fixture was requested but no forkchoice update is defined."
            " The framework should never try to execute this test case."
        )

        alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
        self.verify_post_state(t8n, t8n_state=alloc)

        # Create base fixture data, common to all fixture formats
        fixture_data: Dict[str, Any] = {
            "fork": self.fork,
            "genesis": genesis.header,
            "payloads": fixture_payloads,
            "last_block_hash": head_hash,
            "post_state_hash": state_root
            if not self.include_full_post_state_in_output
            else None,
            "config": FixtureConfig(
                fork=self.fork,
                chain_id=self.chain_id,
                blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                    self.fork.transitions_to().blob_schedule()
                ),
            ),
        }

        # Add format-specific fields
        fixture: BaseFixture
        if fixture_format == BlockchainEngineXFixture:
            # For Engine X format, exclude pre (will be provided via shared
            # state) and prepare for state diff optimization
            fixture_data.update(
                {
                    "post_state": alloc
                    if self.include_full_post_state_in_output
                    else None,
                    "pre_hash": "",  # Will be set by BaseTestWrapper
                }
            )
            fixture = BlockchainEngineXFixture(**fixture_data)
        elif fixture_format == BlockchainEngineSyncFixture:
            # Sync fixture format
            assert genesis.header.block_hash != head_hash, (
                "Invalid payload tests negative test via sync is not "
                "supported yet."
            )
            # Most clients require the header to start the sync process, so we
            # create an empty block on top of the last block of the test to
            # send it as new payload and trigger the sync process.
            sync_built_block = self.generate_block_data(
                t8n=t8n,
                block=Block(),
                previous_env=env,
                previous_alloc=alloc,
            )
            fixture_data.update(
                {
                    "sync_payload": (
                        sync_built_block.get_fixture_engine_new_payload()
                    ),
                    "pre": pre,
                    "post_state": alloc
                    if self.include_full_post_state_in_output
                    else None,
                }
            )
            fixture = BlockchainEngineSyncFixture(**fixture_data)
        else:
            # Standard engine fixture
            fixture_data.update(
                {
                    "pre": pre,
                    "post_state": alloc
                    if self.include_full_post_state_in_output
                    else None,
                }
            )
            fixture = BlockchainEngineFixture(**fixture_data)

        return FillResult(
            fixture=fixture,
            gas_optimization=None,
            benchmark_gas_used=benchmark_gas_used,
            benchmark_opcode_count=benchmark_opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def make_stateful_fixture(
        self,
        t8n: FillerBackend,
    ) -> FillResult:
        """
        Create a ``BlockchainEngineStatefulFixture`` against a live client.

        Differs from ``make_hive_fixture``:

        - No genesis building: the client already has warm state from a
          snapshot. The backend (typically ``ClientBackend``) owns
          ``snapshot_block`` / ``start_block`` captured at session start.
        - ``pre.fund_eoa`` / ``pre.deploy_contract`` calls are materialised
          into a synthetic setup block prepended to ``self.blocks``,
          instead of being baked into genesis alloc.
        - Payloads are partitioned by ``FixtureEngineNewPayload.phase``
          into ``setup_payloads`` (setup-phase txs) and ``payloads``
          (execution-phase txs).
        - ``verify_post_state`` is skipped: the client is the oracle.
        """
        if not isinstance(t8n, ClientBackend):
            raise RuntimeError(
                "make_stateful_fixture requires a ClientBackend; got "
                f"{type(t8n).__name__}."
            )
        if t8n.snapshot_block is None or t8n.start_block is None:
            raise RuntimeError(
                "ClientBackend.snapshot_block / .start_block must be "
                "captured by the fill-stateful pre-run before fill."
            )
        snapshot_block = t8n.snapshot_block
        start_block = t8n.start_block

        # Mirror execute.py's pre-send flow so pending-tx funding amounts
        # materialise before we drain the queue: required-balances →
        # resolve deferred deploys/stubs/fund_addresses → run
        # minimum_balance_for_pending_transactions. Tests whose Alloc
        # does not expose ``pending_transactions`` skip this entirely.
        pending_getter = getattr(self.pre, "pending_transactions", None)
        resolve_deferred = getattr(self.pre, "resolve_deferred_checks", None)
        min_balance = getattr(
            self.pre, "minimum_balance_for_pending_transactions", None
        )
        if (
            callable(pending_getter)
            and callable(resolve_deferred)
            and callable(min_balance)
        ):
            execute_plan = self.execute(execute_format=TransactionPost)
            session_fork = self.fork.fork_at(block_number=0, timestamp=0)
            # Session fees pinned on the backend by the fill-stateful plugin.
            gas_price = t8n.gas_price
            max_fee_per_gas = t8n.max_fee_per_gas
            max_priority_fee_per_gas = t8n.max_priority_fee_per_gas
            max_fee_per_blob_gas = t8n.max_fee_per_blob_gas
            if not all(
                [
                    gas_price,
                    max_fee_per_gas,
                    max_priority_fee_per_gas,
                    max_fee_per_blob_gas,
                ]
            ):
                raise RuntimeError(
                    "make_stateful_fixture requires the backend to carry "
                    f"non-zero session fees; got gas_price={gas_price}, "
                    f"max_fee_per_gas={max_fee_per_gas}, "
                    f"max_priority_fee_per_gas={max_priority_fee_per_gas}, "
                    f"max_fee_per_blob_gas={max_fee_per_blob_gas}."
                )
            required_balances = execute_plan.get_required_sender_balances(
                gas_price=gas_price,
                max_fee_per_gas=max_fee_per_gas,
                max_priority_fee_per_gas=max_priority_fee_per_gas,
                max_fee_per_blob_gas=max_fee_per_blob_gas,
                fork=session_fork,
            )
            resolve_deferred()
            min_balance(
                required_balances,
                gas_price=gas_price,
                max_fee_per_gas=max_fee_per_gas,
                max_priority_fee_per_gas=max_priority_fee_per_gas,
                max_fee_per_blob_gas=max_fee_per_blob_gas,
            )

        # Materialise queued pre-alloc txs into a synthetic setup block.
        blocks_to_process: List[Block] = []
        if callable(pending_getter):
            setup_txs = pending_getter()
            if setup_txs:
                blocks_to_process.append(Block(txs=setup_txs))
        # Each block must be single-phase (Block.phase asserts otherwise);
        # mixed blocks (e.g. EIP-7702 authorization + benchmark exec) are
        # split into contiguous phase runs so benchmark gas isn't
        # swallowed into ``setupEngineNewPayloads``.
        blocks_to_process.extend(_split_blocks_by_phase(self.blocks))

        # Chain off the session start_block. We pull parent_* from a
        # FixtureHeader-validated copy of the client's block dict, but
        # seed block_hashes with the client's hash directly — FixtureHeader
        # recomputes block_hash from RLP and that diverges from the
        # client's authoritative hash unless every header byte is
        # reproduced exactly.
        start_block_number = int(HexNumber(start_block["number"]))
        start_block_hash = Hash(start_block["hash"])
        parent_header = FixtureHeader.model_validate(start_block)
        env = Environment(
            parent_difficulty=parent_header.difficulty,
            parent_timestamp=parent_header.timestamp,
            parent_base_fee_per_gas=parent_header.base_fee_per_gas,
            parent_blob_gas_used=parent_header.blob_gas_used,
            parent_excess_blob_gas=parent_header.excess_blob_gas,
            parent_gas_used=parent_header.gas_used,
            parent_gas_limit=parent_header.gas_limit,
            parent_ommers_hash=parent_header.ommers_hash,
            block_hashes={
                HexNumber(start_block_number): start_block_hash,
            },
        )

        setup_payloads: List[FixtureEngineNewPayload] = []
        execution_payloads: List[FixtureEngineNewPayload] = []
        head_hash = start_block_hash
        benchmark_gas_used: int | None = None
        benchmark_opcode_count: OpcodeCount | None = None
        # Alloc is not authoritative in stateful mode; pass self.pre as a
        # placeholder — ClientBackend ignores it.
        alloc: Alloc | LazyAlloc = self.pre
        for block in blocks_to_process:
            built_block = self.generate_block_data(
                t8n=t8n,
                block=block,
                previous_env=env,
                previous_alloc=alloc,
            )
            assert isinstance(built_block, TestingBuildBlock), (
                "ClientBackend must return TestingBuildBlock; got "
                f"{type(built_block).__name__}"
            )
            payload = payload_metadata_to_fixture(
                built_block.engine_payload, phase=block.phase
            )
            if payload.phase == TestPhase.SETUP:
                setup_payloads.append(payload)
            else:
                execution_payloads.append(payload)
                if self.operation_mode == OpMode.BENCHMARKING:
                    benchmark_gas_used = int(built_block.result.gas_used)
                    benchmark_opcode_count = built_block.result.opcode_count
            # Overwrite the block_hash apply_new_parent just recorded —
            # it's the FixtureHeader-recomputed RLP hash, which diverges
            # from the client's authoritative hash (client picks fields
            # like gas_limit). The next block's parent_hash must point at
            # what the client actually built.
            client_hash = Hash(
                built_block.engine_payload.payload_response.execution_payload.block_hash
            )
            env = apply_new_parent(built_block.env, built_block.header)
            env = env.copy(
                block_hashes={
                    **env.block_hashes,
                    HexNumber(int(env.number)): client_hash,
                },
            )
            head_hash = client_hash

        fixture = BlockchainEngineStatefulFixture(
            fork=self.fork,
            last_block_hash=head_hash,
            config=FixtureConfig(fork=self.fork),
            snapshot_block_number=HexNumber(snapshot_block["number"]),
            snapshot_block_hash=Hash(snapshot_block["hash"]),
            start_block_number=HexNumber(start_block_number),
            start_block_hash=start_block_hash,
            setup_payloads=setup_payloads,
            payloads=execution_payloads,
            benchmark_gas_used=(
                HexNumber(benchmark_gas_used)
                if benchmark_gas_used is not None
                else None
            ),
        )
        return FillResult(
            fixture=fixture,
            gas_optimization=None,
            benchmark_gas_used=benchmark_gas_used,
            benchmark_opcode_count=benchmark_opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def generate(
        self,
        t8n: FillerBackend,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the BlockchainTest fixture."""
        if fixture_format == BlockchainEngineStatefulFixture:
            return self.make_stateful_fixture(t8n)
        if fixture_format in [
            BlockchainEngineFixture,
            BlockchainEngineXFixture,
            BlockchainEngineSyncFixture,
        ]:
            return self.make_hive_fixture(t8n, fixture_format)
        elif fixture_format == BlockchainFixture:
            return self.make_fixture(t8n)

        raise Exception(f"Unknown fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """Generate the list of test fixtures."""
        if execute_format == TransactionPost:
            blocks: List[List[Transaction]] = []
            for block in self.blocks:
                blocks += [block.txs]
            # Pass gas validation params for benchmark tests
            # If not benchmark mode, skip gas used validation
            if self.operation_mode != OpMode.BENCHMARKING:
                self.skip_gas_used_validation = True

            benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
            return TransactionPost(
                blocks=blocks,
                post=self.post,
                benchmark_mode=benchmark_mode,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

include_full_post_state_in_output = True class-attribute instance-attribute

Include the post state in the fixture output. Otherwise, the state verification is only performed based on the state root.

include_tx_receipts_in_output = True class-attribute instance-attribute

Include transaction receipts in the fixture output.

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Discard a fixture format from filling if the appropriate marker is
    used.
    """
    del fork

    marker_names = [m.name for m in markers]
    if (
        fixture_format != BlockchainFixture
        and "blockchain_test_only" in marker_names
    ):
        return True
    if (
        fixture_format
        not in [BlockchainEngineFixture, BlockchainEngineXFixture]
        and "blockchain_test_engine_only" in marker_names
    ):
        return True
    return False

get_genesis_environment()

Get the genesis environment for pre-allocation groups.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
763
764
765
766
767
768
def get_genesis_environment(self) -> Environment:
    """Get the genesis environment for pre-allocation groups."""
    modified_values = self.genesis_environment.set_fork_requirements(
        self.fork.transitions_from()
    ).model_dump(exclude_unset=True)
    return Environment(**(GENESIS_ENVIRONMENT_DEFAULTS | modified_values))

make_genesis(*, apply_pre_allocation_blockchain)

Create a genesis block from the blockchain test definition.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
def make_genesis(
    self, *, apply_pre_allocation_blockchain: bool
) -> Tuple[Alloc, FixtureBlock]:
    """Create a genesis block from the blockchain test definition."""
    env = self.get_genesis_environment()
    assert env.withdrawals is None or len(env.withdrawals) == 0, (
        "withdrawals must be empty at genesis"
    )
    assert (
        env.parent_beacon_block_root is None
        or env.parent_beacon_block_root == Hash(0)
    ), "parent_beacon_block_root must be empty at genesis"

    pre_alloc = self.pre
    if apply_pre_allocation_blockchain:
        pre_alloc = Alloc.merge(
            Alloc.model_validate(
                self.fork.transitions_to().pre_allocation_blockchain()
            ),
            pre_alloc,
        )
    if empty_accounts := pre_alloc.empty_accounts():
        raise Exception(f"Empty accounts in pre state: {empty_accounts}")
    state_root = pre_alloc.state_root()
    genesis = FixtureHeader.genesis(
        self.fork.transitions_from(), env, state_root
    )

    return (
        pre_alloc,
        FixtureBlockBase(
            header=genesis,
            withdrawals=None if env.withdrawals is None else [],
        ).with_rlp(txs=[]),
    )

generate_block_data(t8n, block, previous_env, previous_alloc)

Generate common block data for both make_fixture and make_hive_fixture.

t8n is any backend satisfying FillerBackend. The default compute path passes a concrete TransitionTool; stateful filling will pass an ClientBackend that drives testing_buildBlockV1 against a live client.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
def generate_block_data(
    self,
    t8n: FillerBackend,
    block: Block,
    previous_env: Environment,
    previous_alloc: Alloc | LazyAlloc,
) -> BuiltBlock:
    """
    Generate common block data for both make_fixture and make_hive_fixture.

    ``t8n`` is any backend satisfying ``FillerBackend``. The
    default compute path passes a concrete ``TransitionTool``; stateful
    filling will pass an ``ClientBackend`` that drives
    ``testing_buildBlockV1`` against a live client.
    """
    env = block.set_environment(previous_env)
    fork = self.fork.fork_at(
        block_number=env.number, timestamp=env.timestamp
    )
    env = env.set_fork_requirements(fork)
    txs = [tx.with_signature_and_sender() for tx in block.txs]

    if failing_tx_count := len([tx for tx in txs if tx.error]) > 0:
        if failing_tx_count > 1:
            raise Exception(
                "test correctness: only one transaction can produce "
                "an exception in a block"
            )
        if not txs[-1].error:
            raise Exception(
                "test correctness: the transaction that produces an "
                "exception must be the last transaction in the block"
            )

    transition_tool_output = t8n.evaluate(
        transition_tool_data=TransitionTool.TransitionToolData(
            alloc=previous_alloc,
            txs=txs,
            env=env,
            fork=fork,
            chain_id=self.chain_id,
            reward=fork.get_reward(),
            blob_schedule=fork.blob_schedule(),
        ),
        slow_request=self.is_tx_gas_heavy_test,
    )

    # One special case of the invalid transactions is the blob gas used,
    # since this value is not included in the transition tool result, but
    # it is included in the block header, and some clients check it before
    # executing the block by simply counting the type-3 txs, we need to set
    # the correct value by default.
    blob_gas_used: int | None = None
    if fork.supports_blobs():
        if (blob_gas_per_blob := fork.blob_gas_per_blob()) > 0:
            blob_gas_used = blob_gas_per_blob * count_blobs(txs)

    # Prepare slot_number for header initialization
    slot_number_value: ZeroPaddedHexNumber | None = None
    if fork.header_slot_number_required():
        slot_number_value = ZeroPaddedHexNumber(
            int(env.slot_number) if env.slot_number is not None else 0
        )

    header = FixtureHeader(
        **(
            transition_tool_output.result.model_dump(
                exclude_none=True,
                exclude={"blob_gas_used", "transactions_trie"},
            )
            | env.model_dump(
                exclude_none=True,
                exclude={"blob_gas_used", "slot_number"},
            )
        ),
        blob_gas_used=blob_gas_used,
        transactions_trie=Transaction.list_root(txs),
        extra_data=(
            block.extra_data if block.extra_data is not None else b""
        ),
        slot_number=slot_number_value,
        fork=fork,
    )

    if block.header_verify is not None:
        # Verify the header after transition tool processing.
        try:
            block.header_verify.verify(header)
        except Exception as e:
            raise Exception(
                f"Verification of block {int(env.number)} failed"
            ) from e

    if block.expected_gas_used is not None:
        gas_used = int(transition_tool_output.result.gas_used)
        assert gas_used == block.expected_gas_used, (
            f"gas_used ({gas_used}) does not match expected_gas_used "
            f"({block.expected_gas_used})"
            f", difference: {gas_used - block.expected_gas_used}"
        )

    requests_list: List[Bytes] | None = None
    if fork.header_requests_required():
        assert transition_tool_output.result.requests is not None, (
            "Requests are required for this block"
        )
        requests = Requests(
            requests_lists=list(transition_tool_output.result.requests)
        )

        if Hash(requests) != header.requests_hash:
            raise Exception(
                "Requests root in header does not match the requests "
                "root in the transition tool output: "
                f"{header.requests_hash} != {Hash(requests)}"
            )

        requests_list = requests.requests_list

    if block.requests is not None:
        header.requests_hash = Hash(
            Requests(requests_lists=list(block.requests))
        )
        requests_list = block.requests

    # Decode BAL from RLP bytes provided by the transition tool.
    t8n_bal_rlp = transition_tool_output.result.block_access_list
    t8n_bal: BlockAccessList | None = None
    if t8n_bal_rlp is not None:
        t8n_bal = BlockAccessList.from_rlp(t8n_bal_rlp)

    if fork.header_bal_hash_required():
        assert t8n_bal is not None, (
            "Block access list is required for this block but was not "
            "provided by the transition tool"
        )

        computed_block_access_list_hash = Hash(t8n_bal.rlp.keccak256())
        assert (
            computed_block_access_list_hash
            == header.block_access_list_hash
        ), (
            "Block access list hash in header does not match the "
            f"computed hash from BAL: {header.block_access_list_hash} "
            f"!= {computed_block_access_list_hash}"
        )

    if block.rlp_modifier is not None:
        # Modify any parameter specified in the `rlp_modifier` after
        # transition tool processing.
        header = block.rlp_modifier.apply(header)
        header.fork = fork  # Deleted during `apply` because `exclude=True`

    # Process block access list - apply transformer if present for invalid
    # tests
    bal = t8n_bal

    # Always validate BAL structural integrity (ordering, duplicates)
    # if present
    if t8n_bal is not None:
        t8n_bal.validate_structure()

    # If expected BAL is defined, verify against it
    if (
        block.expected_block_access_list is not None
        and t8n_bal is not None
    ):
        block.expected_block_access_list.verify_against(t8n_bal)

        bal = block.expected_block_access_list.modify_if_invalid_test(
            t8n_bal
        )
        if bal != t8n_bal:
            # If the BAL was modified and the fork requires it, update the
            # header hash
            header.block_access_list_hash = Hash(bal.rlp.keccak256())

    built_block_kwargs: Dict[str, Any] = dict(
        header=header,
        alloc=transition_tool_output.alloc,
        state_root=transition_tool_output.result.state_root,
        env=env,
        txs=txs,
        ommers=[],
        withdrawals=env.withdrawals,
        requests=requests_list,
        result=transition_tool_output.result,
        expected_exception=block.exception,
        engine_api_error_code=block.engine_api_error_code,
        rlp_modifier=block.rlp_modifier,
        fork=fork,
        block_access_list=bal,
    )
    built_block: BuiltBlock
    if transition_tool_output.engine_payload is not None:
        built_block = TestingBuildBlock(
            **built_block_kwargs,
            engine_payload=transition_tool_output.engine_payload,
        )
    else:
        built_block = BuiltBlock(**built_block_kwargs)

    try:
        rejected_txs = built_block.verify_transactions(
            transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
        )
        if (
            not rejected_txs
            and block.rlp_modifier is None
            and block.requests is None
            and not block.skip_exception_verification
            and not (
                block.expected_block_access_list is not None
                and block.expected_block_access_list._modifier is not None
            )
        ):
            # Only verify block level exception if: - No transaction
            # exception was raised, because these are not reported as block
            # exceptions. - No RLP modifier was specified, because the
            # modifier is what normally produces the block exception. - No
            # requests were specified, because modified requests are also
            # what normally produces the block exception. - No BAL modifier
            # was specified, because modified BAL also produces block
            # exceptions.
            built_block.verify_block_exception(
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
        verify_result(transition_tool_output.result, env)
    except Exception as e:
        print_traces(t8n.get_traces())
        pprint(transition_tool_output.result)
        pprint(previous_alloc)
        pprint(transition_tool_output.alloc.get())
        raise e

    if len(rejected_txs) > 0 and block.exception is None:
        print_traces(t8n.get_traces())
        raise Exception(
            "one or more transactions in `BlockchainTest` are "
            + "intrinsically invalid, but the block was not expected "
            + "to be invalid. Please verify whether the transaction "
            + "was indeed expected to fail and add the proper "
            + "`block.exception`"
        )

    return built_block

verify_post_state(t8n, t8n_state, expected_state=None)

Verify post alloc after all block/s or payload/s are generated.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
def verify_post_state(
    self,
    t8n: FillerBackend,
    t8n_state: Alloc,
    expected_state: Alloc | None = None,
) -> None:
    """Verify post alloc after all block/s or payload/s are generated."""
    try:
        if expected_state:
            expected_state.verify_post_alloc(t8n_state)
        else:
            self.post.verify_post_alloc(t8n_state)
    except Exception as e:
        print_traces(t8n.get_traces())
        raise e

make_fixture(t8n)

Create a fixture from the blockchain test definition.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
def make_fixture(
    self,
    t8n: FillerBackend,
) -> FillResult:
    """Create a fixture from the blockchain test definition."""
    fixture_blocks: List[FixtureBlock | InvalidFixtureBlock] = []

    pre, genesis = self.make_genesis(apply_pre_allocation_blockchain=True)

    alloc: Alloc | LazyAlloc = pre
    state_root = genesis.header.state_root
    env = environment_from_parent_header(genesis.header)
    head = genesis.header.block_hash
    invalid_blocks = 0
    benchmark_gas_used: int | None = None
    benchmark_opcode_count: OpcodeCount | None = None
    for block in self.blocks:
        # This is the most common case, the RLP needs to be constructed
        # based on the transactions to be included in the block.
        # Set the environment according to the block to execute.
        built_block = self.generate_block_data(
            t8n=t8n,
            block=block,
            previous_env=env,
            previous_alloc=alloc,
        )
        block_number = int(built_block.header.number)
        is_last_block = block is self.blocks[-1]
        if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
            benchmark_gas_used = int(built_block.result.gas_used)
            benchmark_opcode_count = built_block.result.opcode_count
        if built_block.result.receipts:
            self.validate_receipt_status(
                receipts=built_block.result.receipts,
                block_number=block_number,
            )
        include_receipts = (
            block.include_receipts_in_output
            if block.include_receipts_in_output is not None
            else self.include_tx_receipts_in_output
        )
        fixture_blocks.append(
            built_block.get_fixture_block(
                include_receipts=include_receipts
            )
        )

        # BAL verification already done in to_fixture_bal() if
        # expected_block_access_list set

        if block.exception is None:
            # Update env, alloc and last block hash for the next block.
            alloc = built_block.alloc
            state_root = built_block.state_root
            env = apply_new_parent(built_block.env, built_block.header)
            head = built_block.header.block_hash
        else:
            invalid_blocks += 1

        if block.expected_post_state:
            self.verify_post_state(
                t8n,
                t8n_state=alloc.get()
                if isinstance(alloc, LazyAlloc)
                else alloc,
                expected_state=block.expected_post_state,
            )
    self.check_exception_test(exception=invalid_blocks > 0)
    alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
    self.verify_post_state(t8n, t8n_state=alloc)
    fixture = BlockchainFixture(
        fork=self.fork,
        genesis=genesis.header,
        genesis_rlp=genesis.rlp,
        blocks=fixture_blocks,
        last_block_hash=head,
        pre=pre,
        post_state=alloc
        if self.include_full_post_state_in_output
        else None,
        post_state_hash=state_root
        if not self.include_full_post_state_in_output
        else None,
        config=FixtureConfig(
            fork=self.fork,
            blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                self.fork.transitions_to().blob_schedule()
            ),
            chain_id=self.chain_id,
        ),
    )
    return FillResult(
        fixture=fixture,
        gas_optimization=None,
        benchmark_gas_used=benchmark_gas_used,
        benchmark_opcode_count=benchmark_opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

make_hive_fixture(t8n, fixture_format=BlockchainEngineFixture)

Create a hive fixture from the blocktest definition.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
def make_hive_fixture(
    self,
    t8n: FillerBackend,
    fixture_format: FixtureFormat = BlockchainEngineFixture,
) -> FillResult:
    """Create a hive fixture from the blocktest definition."""
    fixture_payloads: List[FixtureEngineNewPayload] = []

    pre, genesis = self.make_genesis(
        apply_pre_allocation_blockchain=fixture_format
        != BlockchainEngineXFixture,
    )
    alloc: Alloc | LazyAlloc = pre
    state_root = genesis.header.state_root
    env = environment_from_parent_header(genesis.header)
    head_hash = genesis.header.block_hash
    invalid_blocks = 0
    benchmark_gas_used: int | None = None
    benchmark_opcode_count: OpcodeCount | None = None
    for block in self.blocks:
        built_block = self.generate_block_data(
            t8n=t8n,
            block=block,
            previous_env=env,
            previous_alloc=alloc,
        )
        block_number = int(built_block.header.number)
        is_last_block = block is self.blocks[-1]
        if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
            benchmark_gas_used = int(built_block.result.gas_used)
            benchmark_opcode_count = built_block.result.opcode_count
        if built_block.result.receipts:
            self.validate_receipt_status(
                receipts=built_block.result.receipts,
                block_number=block_number,
            )
        fixture_payloads.append(
            built_block.get_fixture_engine_new_payload()
        )
        if block.exception is None:
            alloc = built_block.alloc
            state_root = built_block.state_root
            env = apply_new_parent(built_block.env, built_block.header)
            head_hash = built_block.header.block_hash
        else:
            invalid_blocks += 1

        if block.expected_post_state:
            self.verify_post_state(
                t8n,
                t8n_state=alloc.get()
                if isinstance(alloc, LazyAlloc)
                else alloc,
                expected_state=block.expected_post_state,
            )
    self.check_exception_test(exception=invalid_blocks > 0)
    fcu_version = (
        self.fork.transitions_from().engine_forkchoice_updated_version()
    )
    assert fcu_version is not None, (
        "A hive fixture was requested but no forkchoice update is defined."
        " The framework should never try to execute this test case."
    )

    alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
    self.verify_post_state(t8n, t8n_state=alloc)

    # Create base fixture data, common to all fixture formats
    fixture_data: Dict[str, Any] = {
        "fork": self.fork,
        "genesis": genesis.header,
        "payloads": fixture_payloads,
        "last_block_hash": head_hash,
        "post_state_hash": state_root
        if not self.include_full_post_state_in_output
        else None,
        "config": FixtureConfig(
            fork=self.fork,
            chain_id=self.chain_id,
            blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                self.fork.transitions_to().blob_schedule()
            ),
        ),
    }

    # Add format-specific fields
    fixture: BaseFixture
    if fixture_format == BlockchainEngineXFixture:
        # For Engine X format, exclude pre (will be provided via shared
        # state) and prepare for state diff optimization
        fixture_data.update(
            {
                "post_state": alloc
                if self.include_full_post_state_in_output
                else None,
                "pre_hash": "",  # Will be set by BaseTestWrapper
            }
        )
        fixture = BlockchainEngineXFixture(**fixture_data)
    elif fixture_format == BlockchainEngineSyncFixture:
        # Sync fixture format
        assert genesis.header.block_hash != head_hash, (
            "Invalid payload tests negative test via sync is not "
            "supported yet."
        )
        # Most clients require the header to start the sync process, so we
        # create an empty block on top of the last block of the test to
        # send it as new payload and trigger the sync process.
        sync_built_block = self.generate_block_data(
            t8n=t8n,
            block=Block(),
            previous_env=env,
            previous_alloc=alloc,
        )
        fixture_data.update(
            {
                "sync_payload": (
                    sync_built_block.get_fixture_engine_new_payload()
                ),
                "pre": pre,
                "post_state": alloc
                if self.include_full_post_state_in_output
                else None,
            }
        )
        fixture = BlockchainEngineSyncFixture(**fixture_data)
    else:
        # Standard engine fixture
        fixture_data.update(
            {
                "pre": pre,
                "post_state": alloc
                if self.include_full_post_state_in_output
                else None,
            }
        )
        fixture = BlockchainEngineFixture(**fixture_data)

    return FillResult(
        fixture=fixture,
        gas_optimization=None,
        benchmark_gas_used=benchmark_gas_used,
        benchmark_opcode_count=benchmark_opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

make_stateful_fixture(t8n)

Create a BlockchainEngineStatefulFixture against a live client.

Differs from make_hive_fixture:

  • No genesis building: the client already has warm state from a snapshot. The backend (typically ClientBackend) owns snapshot_block / start_block captured at session start.
  • pre.fund_eoa / pre.deploy_contract calls are materialised into a synthetic setup block prepended to self.blocks, instead of being baked into genesis alloc.
  • Payloads are partitioned by FixtureEngineNewPayload.phase into setup_payloads (setup-phase txs) and payloads (execution-phase txs).
  • verify_post_state is skipped: the client is the oracle.
Source code in packages/testing/src/execution_testing/specs/blockchain.py
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
def make_stateful_fixture(
    self,
    t8n: FillerBackend,
) -> FillResult:
    """
    Create a ``BlockchainEngineStatefulFixture`` against a live client.

    Differs from ``make_hive_fixture``:

    - No genesis building: the client already has warm state from a
      snapshot. The backend (typically ``ClientBackend``) owns
      ``snapshot_block`` / ``start_block`` captured at session start.
    - ``pre.fund_eoa`` / ``pre.deploy_contract`` calls are materialised
      into a synthetic setup block prepended to ``self.blocks``,
      instead of being baked into genesis alloc.
    - Payloads are partitioned by ``FixtureEngineNewPayload.phase``
      into ``setup_payloads`` (setup-phase txs) and ``payloads``
      (execution-phase txs).
    - ``verify_post_state`` is skipped: the client is the oracle.
    """
    if not isinstance(t8n, ClientBackend):
        raise RuntimeError(
            "make_stateful_fixture requires a ClientBackend; got "
            f"{type(t8n).__name__}."
        )
    if t8n.snapshot_block is None or t8n.start_block is None:
        raise RuntimeError(
            "ClientBackend.snapshot_block / .start_block must be "
            "captured by the fill-stateful pre-run before fill."
        )
    snapshot_block = t8n.snapshot_block
    start_block = t8n.start_block

    # Mirror execute.py's pre-send flow so pending-tx funding amounts
    # materialise before we drain the queue: required-balances →
    # resolve deferred deploys/stubs/fund_addresses → run
    # minimum_balance_for_pending_transactions. Tests whose Alloc
    # does not expose ``pending_transactions`` skip this entirely.
    pending_getter = getattr(self.pre, "pending_transactions", None)
    resolve_deferred = getattr(self.pre, "resolve_deferred_checks", None)
    min_balance = getattr(
        self.pre, "minimum_balance_for_pending_transactions", None
    )
    if (
        callable(pending_getter)
        and callable(resolve_deferred)
        and callable(min_balance)
    ):
        execute_plan = self.execute(execute_format=TransactionPost)
        session_fork = self.fork.fork_at(block_number=0, timestamp=0)
        # Session fees pinned on the backend by the fill-stateful plugin.
        gas_price = t8n.gas_price
        max_fee_per_gas = t8n.max_fee_per_gas
        max_priority_fee_per_gas = t8n.max_priority_fee_per_gas
        max_fee_per_blob_gas = t8n.max_fee_per_blob_gas
        if not all(
            [
                gas_price,
                max_fee_per_gas,
                max_priority_fee_per_gas,
                max_fee_per_blob_gas,
            ]
        ):
            raise RuntimeError(
                "make_stateful_fixture requires the backend to carry "
                f"non-zero session fees; got gas_price={gas_price}, "
                f"max_fee_per_gas={max_fee_per_gas}, "
                f"max_priority_fee_per_gas={max_priority_fee_per_gas}, "
                f"max_fee_per_blob_gas={max_fee_per_blob_gas}."
            )
        required_balances = execute_plan.get_required_sender_balances(
            gas_price=gas_price,
            max_fee_per_gas=max_fee_per_gas,
            max_priority_fee_per_gas=max_priority_fee_per_gas,
            max_fee_per_blob_gas=max_fee_per_blob_gas,
            fork=session_fork,
        )
        resolve_deferred()
        min_balance(
            required_balances,
            gas_price=gas_price,
            max_fee_per_gas=max_fee_per_gas,
            max_priority_fee_per_gas=max_priority_fee_per_gas,
            max_fee_per_blob_gas=max_fee_per_blob_gas,
        )

    # Materialise queued pre-alloc txs into a synthetic setup block.
    blocks_to_process: List[Block] = []
    if callable(pending_getter):
        setup_txs = pending_getter()
        if setup_txs:
            blocks_to_process.append(Block(txs=setup_txs))
    # Each block must be single-phase (Block.phase asserts otherwise);
    # mixed blocks (e.g. EIP-7702 authorization + benchmark exec) are
    # split into contiguous phase runs so benchmark gas isn't
    # swallowed into ``setupEngineNewPayloads``.
    blocks_to_process.extend(_split_blocks_by_phase(self.blocks))

    # Chain off the session start_block. We pull parent_* from a
    # FixtureHeader-validated copy of the client's block dict, but
    # seed block_hashes with the client's hash directly — FixtureHeader
    # recomputes block_hash from RLP and that diverges from the
    # client's authoritative hash unless every header byte is
    # reproduced exactly.
    start_block_number = int(HexNumber(start_block["number"]))
    start_block_hash = Hash(start_block["hash"])
    parent_header = FixtureHeader.model_validate(start_block)
    env = Environment(
        parent_difficulty=parent_header.difficulty,
        parent_timestamp=parent_header.timestamp,
        parent_base_fee_per_gas=parent_header.base_fee_per_gas,
        parent_blob_gas_used=parent_header.blob_gas_used,
        parent_excess_blob_gas=parent_header.excess_blob_gas,
        parent_gas_used=parent_header.gas_used,
        parent_gas_limit=parent_header.gas_limit,
        parent_ommers_hash=parent_header.ommers_hash,
        block_hashes={
            HexNumber(start_block_number): start_block_hash,
        },
    )

    setup_payloads: List[FixtureEngineNewPayload] = []
    execution_payloads: List[FixtureEngineNewPayload] = []
    head_hash = start_block_hash
    benchmark_gas_used: int | None = None
    benchmark_opcode_count: OpcodeCount | None = None
    # Alloc is not authoritative in stateful mode; pass self.pre as a
    # placeholder — ClientBackend ignores it.
    alloc: Alloc | LazyAlloc = self.pre
    for block in blocks_to_process:
        built_block = self.generate_block_data(
            t8n=t8n,
            block=block,
            previous_env=env,
            previous_alloc=alloc,
        )
        assert isinstance(built_block, TestingBuildBlock), (
            "ClientBackend must return TestingBuildBlock; got "
            f"{type(built_block).__name__}"
        )
        payload = payload_metadata_to_fixture(
            built_block.engine_payload, phase=block.phase
        )
        if payload.phase == TestPhase.SETUP:
            setup_payloads.append(payload)
        else:
            execution_payloads.append(payload)
            if self.operation_mode == OpMode.BENCHMARKING:
                benchmark_gas_used = int(built_block.result.gas_used)
                benchmark_opcode_count = built_block.result.opcode_count
        # Overwrite the block_hash apply_new_parent just recorded —
        # it's the FixtureHeader-recomputed RLP hash, which diverges
        # from the client's authoritative hash (client picks fields
        # like gas_limit). The next block's parent_hash must point at
        # what the client actually built.
        client_hash = Hash(
            built_block.engine_payload.payload_response.execution_payload.block_hash
        )
        env = apply_new_parent(built_block.env, built_block.header)
        env = env.copy(
            block_hashes={
                **env.block_hashes,
                HexNumber(int(env.number)): client_hash,
            },
        )
        head_hash = client_hash

    fixture = BlockchainEngineStatefulFixture(
        fork=self.fork,
        last_block_hash=head_hash,
        config=FixtureConfig(fork=self.fork),
        snapshot_block_number=HexNumber(snapshot_block["number"]),
        snapshot_block_hash=Hash(snapshot_block["hash"]),
        start_block_number=HexNumber(start_block_number),
        start_block_hash=start_block_hash,
        setup_payloads=setup_payloads,
        payloads=execution_payloads,
        benchmark_gas_used=(
            HexNumber(benchmark_gas_used)
            if benchmark_gas_used is not None
            else None
        ),
    )
    return FillResult(
        fixture=fixture,
        gas_optimization=None,
        benchmark_gas_used=benchmark_gas_used,
        benchmark_opcode_count=benchmark_opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

generate(t8n, fixture_format)

Generate the BlockchainTest fixture.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
def generate(
    self,
    t8n: FillerBackend,
    fixture_format: FixtureFormat,
) -> FillResult:
    """Generate the BlockchainTest fixture."""
    if fixture_format == BlockchainEngineStatefulFixture:
        return self.make_stateful_fixture(t8n)
    if fixture_format in [
        BlockchainEngineFixture,
        BlockchainEngineXFixture,
        BlockchainEngineSyncFixture,
    ]:
        return self.make_hive_fixture(t8n, fixture_format)
    elif fixture_format == BlockchainFixture:
        return self.make_fixture(t8n)

    raise Exception(f"Unknown fixture format: {fixture_format}")

execute(*, execute_format)

Generate the list of test fixtures.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """Generate the list of test fixtures."""
    if execute_format == TransactionPost:
        blocks: List[List[Transaction]] = []
        for block in self.blocks:
            blocks += [block.txs]
        # Pass gas validation params for benchmark tests
        # If not benchmark mode, skip gas used validation
        if self.operation_mode != OpMode.BENCHMARKING:
            self.skip_gas_used_validation = True

        benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
        return TransactionPost(
            blocks=blocks,
            post=self.post,
            benchmark_mode=benchmark_mode,
        )
    raise Exception(f"Unsupported execute format: {execute_format}")

Header

Bases: CamelModel

Header type used to describe block header properties in test specs.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class Header(CamelModel):
    """Header type used to describe block header properties in test specs."""

    parent_hash: Hash | None = None
    ommers_hash: Hash | None = None
    fee_recipient: Address | None = None
    state_root: Hash | None = None
    transactions_trie: Hash | None = None
    receipts_root: Hash | None = None
    logs_bloom: Bloom | None = None
    difficulty: HexNumber | None = None
    number: HexNumber | None = None
    gas_limit: HexNumber | None = None
    gas_used: HexNumber | None = None
    timestamp: HexNumber | None = None
    extra_data: Bytes | None = None
    prev_randao: Hash | None = None
    nonce: HeaderNonce | None = None
    base_fee_per_gas: Removable | HexNumber | None = None
    withdrawals_root: Removable | Hash | None = None
    blob_gas_used: Removable | HexNumber | None = None
    excess_blob_gas: Removable | HexNumber | None = None
    parent_beacon_block_root: Removable | Hash | None = None
    requests_hash: Removable | Hash | None = None
    block_access_list_hash: Removable | Hash | None = None
    slot_number: Removable | HexNumber | None = None

    REMOVE_FIELD: ClassVar[Removable] = Removable()
    """
    Sentinel object used to specify that a header field should be removed.
    """
    EMPTY_FIELD: ClassVar[Removable] = Removable()
    """
    Sentinel object used to specify that a header field must be empty during
    verification.

    This can be used in a test to explicitly skip a field in a block's RLP
    encoding that would otherwise be included in the (json) output when the
    model is serialized. For example:

    ```
    header_modifier = Header(
        excess_blob_gas=Header.REMOVE_FIELD,
    )
    block = Block(
        timestamp=TIMESTAMP,
        rlp_modifier=header_modifier,
        exception=BlockException.INCORRECT_BLOCK_FORMAT,
        engine_api_error_code=EngineAPIError.InvalidParams,
    )
    ```
    """

    model_config = ConfigDict(
        **CamelModel.model_config,
        arbitrary_types_allowed=True,
    )

    @model_serializer(mode="wrap", when_used="json")
    def _serialize_model(self, serializer: Any, info: Any) -> Dict[str, Any]:
        """Exclude Removable fields from serialization."""
        del info
        data = serializer(self)
        return {k: v for k, v in data.items() if not isinstance(v, Removable)}

    @field_validator("withdrawals_root", mode="before")
    @classmethod
    def validate_withdrawals_root(cls, value: Any) -> Any:
        """Convert a list of withdrawals into the withdrawals root hash."""
        if isinstance(value, list):
            return Withdrawal.list_root(value)
        return value

    def apply(self, target: FixtureHeader) -> FixtureHeader:
        """
        Produce a fixture header copy with the set values from the modifier.
        """
        overrides = {
            k: (v if v is not Header.REMOVE_FIELD else None)
            for k, v in self.model_dump(exclude_none=True).items()
        }
        unknown = overrides.keys() - target.__class__.model_fields.keys()
        if unknown:
            raise ValueError(
                f"Header fields {unknown} do not exist on "
                f"{target.__class__.__name__}. Check for field name "
                f"mismatches between Header and {target.__class__.__name__}."
            )
        return target.copy(**overrides)

    def verify(self, target: FixtureHeader) -> None:
        """Verify that the header fields from self are as expected."""
        for field_name in self.__class__.model_fields:
            baseline_value = getattr(self, field_name)
            if baseline_value is not None:
                assert baseline_value is not Header.REMOVE_FIELD, (
                    "invalid header"
                )
                value = getattr(target, field_name)
                if baseline_value is Header.EMPTY_FIELD:
                    assert value is None, (
                        f"invalid header field {field_name}, "
                        f"got {value}, want None"
                    )
                    continue
                assert value == baseline_value, (
                    f"invalid header field ({field_name}) value, "
                    + f"got {value}, want {baseline_value}"
                )

REMOVE_FIELD = Removable() class-attribute

Sentinel object used to specify that a header field should be removed.

EMPTY_FIELD = Removable() class-attribute

Sentinel object used to specify that a header field must be empty during verification.

This can be used in a test to explicitly skip a field in a block's RLP encoding that would otherwise be included in the (json) output when the model is serialized. For example:

header_modifier = Header(
    excess_blob_gas=Header.REMOVE_FIELD,
)
block = Block(
    timestamp=TIMESTAMP,
    rlp_modifier=header_modifier,
    exception=BlockException.INCORRECT_BLOCK_FORMAT,
    engine_api_error_code=EngineAPIError.InvalidParams,
)

validate_withdrawals_root(value) classmethod

Convert a list of withdrawals into the withdrawals root hash.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
250
251
252
253
254
255
256
@field_validator("withdrawals_root", mode="before")
@classmethod
def validate_withdrawals_root(cls, value: Any) -> Any:
    """Convert a list of withdrawals into the withdrawals root hash."""
    if isinstance(value, list):
        return Withdrawal.list_root(value)
    return value

apply(target)

Produce a fixture header copy with the set values from the modifier.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def apply(self, target: FixtureHeader) -> FixtureHeader:
    """
    Produce a fixture header copy with the set values from the modifier.
    """
    overrides = {
        k: (v if v is not Header.REMOVE_FIELD else None)
        for k, v in self.model_dump(exclude_none=True).items()
    }
    unknown = overrides.keys() - target.__class__.model_fields.keys()
    if unknown:
        raise ValueError(
            f"Header fields {unknown} do not exist on "
            f"{target.__class__.__name__}. Check for field name "
            f"mismatches between Header and {target.__class__.__name__}."
        )
    return target.copy(**overrides)

verify(target)

Verify that the header fields from self are as expected.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def verify(self, target: FixtureHeader) -> None:
    """Verify that the header fields from self are as expected."""
    for field_name in self.__class__.model_fields:
        baseline_value = getattr(self, field_name)
        if baseline_value is not None:
            assert baseline_value is not Header.REMOVE_FIELD, (
                "invalid header"
            )
            value = getattr(target, field_name)
            if baseline_value is Header.EMPTY_FIELD:
                assert value is None, (
                    f"invalid header field {field_name}, "
                    f"got {value}, want None"
                )
                continue
            assert value == baseline_value, (
                f"invalid header field ({field_name}) value, "
                + f"got {value}, want {baseline_value}"
            )

StateTest

Bases: BaseTest

Filler type that tests transactions over the period of a single block.

Source code in packages/testing/src/execution_testing/specs/state.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
class StateTest(BaseTest):
    """
    Filler type that tests transactions over the period of a single block.
    """

    env: Environment = Field(default_factory=Environment)
    pre: Alloc
    post: Alloc
    tx: Transaction
    block_exception: (
        List[TransactionException | BlockException]
        | TransactionException
        | BlockException
        | None
    ) = None
    engine_api_error_code: Optional[EngineAPIError] = None
    blockchain_test_header_verify: Optional[Header] = None
    blockchain_test_rlp_modifier: Optional[Header] = None
    expected_block_access_list: Optional[BlockAccessListExpectation] = None
    chain_id: int = 1

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        StateFixture,
    ] + [
        LabeledFixtureFormat(
            fixture_format,
            f"{fixture_format.format_name}_from_state_test",
            f"A {fixture_format.format_name} generated from a state_test",
        )
        for fixture_format in BlockchainTest.supported_fixture_formats
        # Exclude sync fixtures from state tests - they don't make sense for
        # state tests
        if not (
            (
                hasattr(fixture_format, "__name__")
                and "Sync" in fixture_format.__name__
            )
            or (
                hasattr(fixture_format, "format")
                and "Sync" in fixture_format.format.__name__
            )
        )
    ]
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "state_test",
            "An execute test derived from a state test",
        ),
    ]

    supported_markers: ClassVar[Dict[str, str]] = {
        "state_test_only": "Only generate a state test fixture",
    }

    def verify_modified_gas_limit(
        self,
        *,
        t8n: TransitionTool,
        base_tool_result: Result,
        base_tool_alloc: Alloc,
        fork: Fork,
        current_gas_limit: int,
        pre_alloc: Alloc,
        env: Environment,
        ignore_gas_differences: bool,
    ) -> bool:
        """Verify a new lower gas limit yields the same transaction outcome."""
        base_traces = base_tool_result.traces
        assert base_traces is not None, (
            "Traces not collected for gas optimization"
        )
        new_tx = self.tx.copy(
            gas_limit=current_gas_limit
        ).with_signature_and_sender()
        modified_tool_output = t8n.evaluate(
            transition_tool_data=TransitionTool.TransitionToolData(
                alloc=pre_alloc,
                txs=[new_tx],
                env=env,
                fork=fork,
                chain_id=self.chain_id,
                reward=0,  # Reward on state tests is always zero
                blob_schedule=fork.blob_schedule(),
                state_test=True,
            ),
            slow_request=self.is_tx_gas_heavy_test,
        )
        modified_traces = modified_tool_output.result.traces
        assert modified_traces is not None, (
            "Traces not collected for gas optimization"
        )
        if not base_traces.are_equivalent(
            modified_tool_output.result.traces,
            ignore_gas_differences,
        ):
            logger.debug(
                f"Traces are not equivalent (gas_limit={current_gas_limit})"
            )
            return False
        modified_tool_alloc = modified_tool_output.alloc.get()
        try:
            self.post.verify_post_alloc(modified_tool_alloc)
        except Exception as e:
            logger.debug(
                f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
            )
            logger.debug(e)
            return False
        try:
            verify_transactions(
                txs=[new_tx],
                result=modified_tool_output.result,
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
        except Exception as e:
            logger.debug(
                "Transactions are not equivalent "
                f"(gas_limit={current_gas_limit})"
            )
            logger.debug(e)
            return False
        if len(base_tool_alloc.root) != len(modified_tool_alloc.root):
            logger.debug(
                f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
            )
            return False
        if base_tool_alloc.root.keys() != modified_tool_alloc.root.keys():
            logger.debug(
                f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
            )
            return False
        for k in base_tool_alloc.root.keys():
            if k not in modified_tool_alloc:
                logger.debug(
                    "Post alloc is not equivalent "
                    f"(gas_limit={current_gas_limit})"
                )
                return False
            base_account = base_tool_alloc[k]
            modified_account = modified_tool_alloc[k]
            if (modified_account is None) != (base_account is None):
                logger.debug(
                    "Post alloc is not equivalent "
                    f"(gas_limit={current_gas_limit})"
                )
                return False
            if (
                modified_account is not None
                and base_account is not None
                and base_account.nonce != modified_account.nonce
            ):
                logger.debug(
                    "Post alloc is not equivalent "
                    f"(gas_limit={current_gas_limit})"
                )
                return False
        logger.debug(
            f"Gas limit is equivalent (gas_limit={current_gas_limit})"
        )
        return True

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the appropriate marker is
        used.
        """
        del fork

        if "state_test_only" in [m.name for m in markers]:
            return fixture_format != StateFixture
        return False

    def _generate_blockchain_genesis_environment(self) -> Environment:
        """
        Generate the genesis environment for the BlockchainTest formatted test.
        """
        assert self.env.number >= 1, (
            "genesis block number cannot be negative, set state test "
            "env.number to at least 1"
        )
        assert self.env.timestamp >= 1, (
            "genesis timestamp cannot be negative, set state test "
            "env.timestamp to at least 1"
        )
        # There's only a handful of values that we need to set in the genesis
        # for the environment values at block 1 to make sense:
        # - Number: Needs to be N minus 1
        # - Timestamp: Needs to be zero, because the subsequent
        #              block can come at any time.
        # - Gas Limit: Changes from parent to child, needs to be set in genesis
        # - Base Fee Per Gas: Block's base fee depends on the parent's value
        # - Excess Blob Gas: Block's excess blob gas value depends on
        #                    the parent's value
        genesis_block_number = self.env.number - 1
        genesis_timestamp = 0
        genesis_fork = self.fork.fork_at(
            block_number=genesis_block_number, timestamp=genesis_timestamp
        )
        kwargs: Dict[str, Any] = {
            "number": genesis_block_number,
            "timestamp": genesis_timestamp,
        }

        if "gas_limit" in self.env.model_fields_set:
            kwargs["gas_limit"] = self.env.gas_limit

        if self.env.base_fee_per_gas:
            # Calculate genesis base fee per gas from state test's block#1 env
            kwargs["base_fee_per_gas"] = HexNumber(
                int(int(str(self.env.base_fee_per_gas), 0) * 8 / 7)
            )

        if self.env.excess_blob_gas:
            # The excess blob gas environment value means the value of the
            # context (block header) where the transaction is executed. In a
            # blockchain test, we need to indirectly set the excess blob gas by
            # setting the excess blob gas of the genesis block to the expected
            # value plus the BLOB_TARGET_GAS_PER_BLOCK, which is the value that
            # will be subtracted from the excess blob gas when the first block
            # is mined.
            kwargs["excess_blob_gas"] = self.env.excess_blob_gas + (
                genesis_fork.target_blobs_per_block()
                * genesis_fork.blob_gas_per_blob()
            )

        return Environment(**kwargs)

    def _generate_blockchain_blocks(self) -> List[Block]:
        """
        Generate the single block that represents this state test in a
        BlockchainTest format.
        """
        number = self.env.number
        timestamp = self.env.timestamp
        fork = self.fork.fork_at(block_number=number, timestamp=timestamp)
        kwargs = {
            "number": self.env.number,
            "timestamp": self.env.timestamp,
            "prev_randao": self.env.prev_randao,
            "fee_recipient": self.env.fee_recipient,
            "gas_limit": self.env.gas_limit,
            "extra_data": self.env.extra_data,
            "withdrawals": self.env.withdrawals,
            "parent_beacon_block_root": self.env.parent_beacon_block_root,
            "slot_number": self.env.slot_number,
            "txs": [self.tx],
            "ommers": [],
            "header_verify": self.blockchain_test_header_verify,
            "rlp_modifier": self.blockchain_test_rlp_modifier,
            "expected_block_access_list": self.expected_block_access_list,
        }
        if not fork.header_prev_randao_required():
            kwargs["difficulty"] = self.env.difficulty
        if "block_exception" in self.model_fields_set:
            kwargs["exception"] = self.block_exception  # type: ignore
        elif "error" in self.tx.model_fields_set:
            kwargs["exception"] = self.tx.error  # type: ignore
        return [Block(**kwargs)]

    def generate_blockchain_test(self) -> BlockchainTest:
        """Generate a BlockchainTest fixture from this StateTest fixture."""
        return BlockchainTest.from_test(
            base_test=self,
            genesis_environment=self._generate_blockchain_genesis_environment(),
            pre=self.pre,
            post=self.post,
            blocks=self._generate_blockchain_blocks(),
        )

    def make_state_test_fixture(
        self,
        t8n: TransitionTool,
    ) -> FillResult:
        """Create a fixture from the state test definition."""
        # We can't generate a state test fixture that names a transition fork,
        # so we get the fork at the block number and timestamp of the state
        # test
        fork = self.fork.fork_at(
            block_number=self.env.number, timestamp=self.env.timestamp
        )

        env = self.env.set_fork_requirements(fork)
        tx = self.tx.with_signature_and_sender(keep_secret_key=True)
        pre_alloc = Alloc.merge(
            Alloc.model_validate(fork.pre_allocation()),
            self.pre,
        )
        if empty_accounts := pre_alloc.empty_accounts():
            raise Exception(f"Empty accounts in pre state: {empty_accounts}")

        transition_tool_output = t8n.evaluate(
            transition_tool_data=TransitionTool.TransitionToolData(
                alloc=pre_alloc,
                txs=[tx],
                env=env,
                fork=fork,
                chain_id=self.chain_id,
                reward=0,  # Reward on state tests is always zero
                blob_schedule=fork.blob_schedule(),
                state_test=True,
            ),
            slow_request=self.is_tx_gas_heavy_test,
        )
        output_alloc = transition_tool_output.alloc.get()

        try:
            self.post.verify_post_alloc(output_alloc)
        except Exception as e:
            print_traces(t8n.get_traces())
            raise e

        try:
            verify_transactions(
                txs=[tx],
                result=transition_tool_output.result,
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
        except Exception as e:
            print_traces(t8n.get_traces())
            pprint(transition_tool_output.result)
            pprint(output_alloc)
            raise e

        gas_optimization: int | None = None

        if (
            self.operation_mode == OpMode.OPTIMIZE_GAS
            or self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
        ):
            ignore_gas_differences = (
                self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
            )
            base_tool_output = transition_tool_output
            base_tool_alloc = base_tool_output.alloc.get()
            base_tool_result = base_tool_output.result

            assert base_tool_result.traces is not None, "Traces not found."

            # First try reducing the gas limit only by one, if the validation
            # fails, it means that the traces change even with the slightest
            # modification to the gas.
            if self.verify_modified_gas_limit(
                t8n=t8n,
                base_tool_result=base_tool_result,
                base_tool_alloc=base_tool_alloc,
                fork=fork,
                current_gas_limit=self.tx.gas_limit - 1,
                pre_alloc=pre_alloc,
                env=env,
                ignore_gas_differences=ignore_gas_differences,
            ):
                minimum_gas_limit = 0
                maximum_gas_limit = int(self.tx.gas_limit)
                while minimum_gas_limit < maximum_gas_limit:
                    current_gas_limit = (
                        maximum_gas_limit + minimum_gas_limit
                    ) // 2
                    if self.verify_modified_gas_limit(
                        t8n=t8n,
                        base_tool_result=base_tool_result,
                        base_tool_alloc=base_tool_alloc,
                        fork=fork,
                        current_gas_limit=current_gas_limit,
                        pre_alloc=pre_alloc,
                        env=env,
                        ignore_gas_differences=ignore_gas_differences,
                    ):
                        maximum_gas_limit = current_gas_limit
                    else:
                        minimum_gas_limit = current_gas_limit + 1
                        if (
                            self.gas_optimization_max_gas_limit is not None
                            and minimum_gas_limit
                            > self.gas_optimization_max_gas_limit
                        ):
                            raise Exception(
                                "Requires more than the minimum "
                                f"{self.gas_optimization_max_gas_limit} "
                                "wanted."
                            )

                assert self.verify_modified_gas_limit(
                    t8n=t8n,
                    base_tool_result=base_tool_result,
                    base_tool_alloc=base_tool_alloc,
                    fork=fork,
                    current_gas_limit=minimum_gas_limit,
                    pre_alloc=pre_alloc,
                    env=env,
                    ignore_gas_differences=ignore_gas_differences,
                )
                gas_optimization = current_gas_limit
            else:
                raise Exception("Impossible to compare.")

        if len(transition_tool_output.result.receipts) == 1:
            receipt = FixtureTransactionReceipt.from_transaction_receipt(
                transition_tool_output.result.receipts[0], tx
            )
            receipt_root = FixtureTransactionReceipt.list_root([receipt])
            assert (
                transition_tool_output.result.receipts_root == receipt_root
            ), (
                f"Receipts root mismatch: "
                f"{transition_tool_output.result.receipts_root} != "
                f"{receipt_root.hex()}"
                f"Receipt: {receipt.rlp()}"
            )
        else:
            receipt = None

        fixture = StateFixture(
            env=FixtureEnvironment(**env.model_dump(exclude_none=True)),
            pre=pre_alloc,
            post={
                fork: [
                    FixtureForkPost(
                        state_root=transition_tool_output.result.state_root,
                        logs_hash=transition_tool_output.result.logs_hash,
                        receipt=receipt,
                        tx_bytes=tx.rlp(),
                        expect_exception=tx.error,
                        state=output_alloc,
                    )
                ]
            },
            transaction=FixtureTransaction.from_transaction(tx),
            config=FixtureConfig(
                blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                    fork.blob_schedule()
                ),
                chain_id=self.chain_id,
            ),
        )
        return FillResult(
            fixture=fixture,
            gas_optimization=gas_optimization,
            benchmark_gas_used=transition_tool_output.result.gas_used,
            benchmark_opcode_count=transition_tool_output.result.opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def get_genesis_environment(self) -> Environment:
        """Get the genesis environment for pre-allocation groups."""
        return self.generate_blockchain_test().get_genesis_environment()

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the BlockchainTest fixture."""
        self.check_exception_test(exception=self.tx.error is not None)
        if fixture_format in BlockchainTest.supported_fixture_formats:
            return self.generate_blockchain_test().generate(
                t8n=t8n, fixture_format=fixture_format
            )
        elif fixture_format == StateFixture:
            return self.make_state_test_fixture(t8n)

        raise Exception(f"Unknown fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """Generate the list of test fixtures."""
        if execute_format == TransactionPost:
            # Pass gas validation params for benchmark tests
            # If not benchmark mode, skip gas used validation
            if self.operation_mode != OpMode.BENCHMARKING:
                self.skip_gas_used_validation = True
            benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
            return TransactionPost(
                blocks=[[self.tx]],
                post=self.post,
                benchmark_mode=benchmark_mode,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

verify_modified_gas_limit(*, t8n, base_tool_result, base_tool_alloc, fork, current_gas_limit, pre_alloc, env, ignore_gas_differences)

Verify a new lower gas limit yields the same transaction outcome.

Source code in packages/testing/src/execution_testing/specs/state.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def verify_modified_gas_limit(
    self,
    *,
    t8n: TransitionTool,
    base_tool_result: Result,
    base_tool_alloc: Alloc,
    fork: Fork,
    current_gas_limit: int,
    pre_alloc: Alloc,
    env: Environment,
    ignore_gas_differences: bool,
) -> bool:
    """Verify a new lower gas limit yields the same transaction outcome."""
    base_traces = base_tool_result.traces
    assert base_traces is not None, (
        "Traces not collected for gas optimization"
    )
    new_tx = self.tx.copy(
        gas_limit=current_gas_limit
    ).with_signature_and_sender()
    modified_tool_output = t8n.evaluate(
        transition_tool_data=TransitionTool.TransitionToolData(
            alloc=pre_alloc,
            txs=[new_tx],
            env=env,
            fork=fork,
            chain_id=self.chain_id,
            reward=0,  # Reward on state tests is always zero
            blob_schedule=fork.blob_schedule(),
            state_test=True,
        ),
        slow_request=self.is_tx_gas_heavy_test,
    )
    modified_traces = modified_tool_output.result.traces
    assert modified_traces is not None, (
        "Traces not collected for gas optimization"
    )
    if not base_traces.are_equivalent(
        modified_tool_output.result.traces,
        ignore_gas_differences,
    ):
        logger.debug(
            f"Traces are not equivalent (gas_limit={current_gas_limit})"
        )
        return False
    modified_tool_alloc = modified_tool_output.alloc.get()
    try:
        self.post.verify_post_alloc(modified_tool_alloc)
    except Exception as e:
        logger.debug(
            f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
        )
        logger.debug(e)
        return False
    try:
        verify_transactions(
            txs=[new_tx],
            result=modified_tool_output.result,
            transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
        )
    except Exception as e:
        logger.debug(
            "Transactions are not equivalent "
            f"(gas_limit={current_gas_limit})"
        )
        logger.debug(e)
        return False
    if len(base_tool_alloc.root) != len(modified_tool_alloc.root):
        logger.debug(
            f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
        )
        return False
    if base_tool_alloc.root.keys() != modified_tool_alloc.root.keys():
        logger.debug(
            f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
        )
        return False
    for k in base_tool_alloc.root.keys():
        if k not in modified_tool_alloc:
            logger.debug(
                "Post alloc is not equivalent "
                f"(gas_limit={current_gas_limit})"
            )
            return False
        base_account = base_tool_alloc[k]
        modified_account = modified_tool_alloc[k]
        if (modified_account is None) != (base_account is None):
            logger.debug(
                "Post alloc is not equivalent "
                f"(gas_limit={current_gas_limit})"
            )
            return False
        if (
            modified_account is not None
            and base_account is not None
            and base_account.nonce != modified_account.nonce
        ):
            logger.debug(
                "Post alloc is not equivalent "
                f"(gas_limit={current_gas_limit})"
            )
            return False
    logger.debug(
        f"Gas limit is equivalent (gas_limit={current_gas_limit})"
    )
    return True

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/state.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Discard a fixture format from filling if the appropriate marker is
    used.
    """
    del fork

    if "state_test_only" in [m.name for m in markers]:
        return fixture_format != StateFixture
    return False

generate_blockchain_test()

Generate a BlockchainTest fixture from this StateTest fixture.

Source code in packages/testing/src/execution_testing/specs/state.py
336
337
338
339
340
341
342
343
344
def generate_blockchain_test(self) -> BlockchainTest:
    """Generate a BlockchainTest fixture from this StateTest fixture."""
    return BlockchainTest.from_test(
        base_test=self,
        genesis_environment=self._generate_blockchain_genesis_environment(),
        pre=self.pre,
        post=self.post,
        blocks=self._generate_blockchain_blocks(),
    )

make_state_test_fixture(t8n)

Create a fixture from the state test definition.

Source code in packages/testing/src/execution_testing/specs/state.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def make_state_test_fixture(
    self,
    t8n: TransitionTool,
) -> FillResult:
    """Create a fixture from the state test definition."""
    # We can't generate a state test fixture that names a transition fork,
    # so we get the fork at the block number and timestamp of the state
    # test
    fork = self.fork.fork_at(
        block_number=self.env.number, timestamp=self.env.timestamp
    )

    env = self.env.set_fork_requirements(fork)
    tx = self.tx.with_signature_and_sender(keep_secret_key=True)
    pre_alloc = Alloc.merge(
        Alloc.model_validate(fork.pre_allocation()),
        self.pre,
    )
    if empty_accounts := pre_alloc.empty_accounts():
        raise Exception(f"Empty accounts in pre state: {empty_accounts}")

    transition_tool_output = t8n.evaluate(
        transition_tool_data=TransitionTool.TransitionToolData(
            alloc=pre_alloc,
            txs=[tx],
            env=env,
            fork=fork,
            chain_id=self.chain_id,
            reward=0,  # Reward on state tests is always zero
            blob_schedule=fork.blob_schedule(),
            state_test=True,
        ),
        slow_request=self.is_tx_gas_heavy_test,
    )
    output_alloc = transition_tool_output.alloc.get()

    try:
        self.post.verify_post_alloc(output_alloc)
    except Exception as e:
        print_traces(t8n.get_traces())
        raise e

    try:
        verify_transactions(
            txs=[tx],
            result=transition_tool_output.result,
            transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
        )
    except Exception as e:
        print_traces(t8n.get_traces())
        pprint(transition_tool_output.result)
        pprint(output_alloc)
        raise e

    gas_optimization: int | None = None

    if (
        self.operation_mode == OpMode.OPTIMIZE_GAS
        or self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
    ):
        ignore_gas_differences = (
            self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
        )
        base_tool_output = transition_tool_output
        base_tool_alloc = base_tool_output.alloc.get()
        base_tool_result = base_tool_output.result

        assert base_tool_result.traces is not None, "Traces not found."

        # First try reducing the gas limit only by one, if the validation
        # fails, it means that the traces change even with the slightest
        # modification to the gas.
        if self.verify_modified_gas_limit(
            t8n=t8n,
            base_tool_result=base_tool_result,
            base_tool_alloc=base_tool_alloc,
            fork=fork,
            current_gas_limit=self.tx.gas_limit - 1,
            pre_alloc=pre_alloc,
            env=env,
            ignore_gas_differences=ignore_gas_differences,
        ):
            minimum_gas_limit = 0
            maximum_gas_limit = int(self.tx.gas_limit)
            while minimum_gas_limit < maximum_gas_limit:
                current_gas_limit = (
                    maximum_gas_limit + minimum_gas_limit
                ) // 2
                if self.verify_modified_gas_limit(
                    t8n=t8n,
                    base_tool_result=base_tool_result,
                    base_tool_alloc=base_tool_alloc,
                    fork=fork,
                    current_gas_limit=current_gas_limit,
                    pre_alloc=pre_alloc,
                    env=env,
                    ignore_gas_differences=ignore_gas_differences,
                ):
                    maximum_gas_limit = current_gas_limit
                else:
                    minimum_gas_limit = current_gas_limit + 1
                    if (
                        self.gas_optimization_max_gas_limit is not None
                        and minimum_gas_limit
                        > self.gas_optimization_max_gas_limit
                    ):
                        raise Exception(
                            "Requires more than the minimum "
                            f"{self.gas_optimization_max_gas_limit} "
                            "wanted."
                        )

            assert self.verify_modified_gas_limit(
                t8n=t8n,
                base_tool_result=base_tool_result,
                base_tool_alloc=base_tool_alloc,
                fork=fork,
                current_gas_limit=minimum_gas_limit,
                pre_alloc=pre_alloc,
                env=env,
                ignore_gas_differences=ignore_gas_differences,
            )
            gas_optimization = current_gas_limit
        else:
            raise Exception("Impossible to compare.")

    if len(transition_tool_output.result.receipts) == 1:
        receipt = FixtureTransactionReceipt.from_transaction_receipt(
            transition_tool_output.result.receipts[0], tx
        )
        receipt_root = FixtureTransactionReceipt.list_root([receipt])
        assert (
            transition_tool_output.result.receipts_root == receipt_root
        ), (
            f"Receipts root mismatch: "
            f"{transition_tool_output.result.receipts_root} != "
            f"{receipt_root.hex()}"
            f"Receipt: {receipt.rlp()}"
        )
    else:
        receipt = None

    fixture = StateFixture(
        env=FixtureEnvironment(**env.model_dump(exclude_none=True)),
        pre=pre_alloc,
        post={
            fork: [
                FixtureForkPost(
                    state_root=transition_tool_output.result.state_root,
                    logs_hash=transition_tool_output.result.logs_hash,
                    receipt=receipt,
                    tx_bytes=tx.rlp(),
                    expect_exception=tx.error,
                    state=output_alloc,
                )
            ]
        },
        transaction=FixtureTransaction.from_transaction(tx),
        config=FixtureConfig(
            blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                fork.blob_schedule()
            ),
            chain_id=self.chain_id,
        ),
    )
    return FillResult(
        fixture=fixture,
        gas_optimization=gas_optimization,
        benchmark_gas_used=transition_tool_output.result.gas_used,
        benchmark_opcode_count=transition_tool_output.result.opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

get_genesis_environment()

Get the genesis environment for pre-allocation groups.

Source code in packages/testing/src/execution_testing/specs/state.py
519
520
521
def get_genesis_environment(self) -> Environment:
    """Get the genesis environment for pre-allocation groups."""
    return self.generate_blockchain_test().get_genesis_environment()

generate(t8n, fixture_format)

Generate the BlockchainTest fixture.

Source code in packages/testing/src/execution_testing/specs/state.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """Generate the BlockchainTest fixture."""
    self.check_exception_test(exception=self.tx.error is not None)
    if fixture_format in BlockchainTest.supported_fixture_formats:
        return self.generate_blockchain_test().generate(
            t8n=t8n, fixture_format=fixture_format
        )
    elif fixture_format == StateFixture:
        return self.make_state_test_fixture(t8n)

    raise Exception(f"Unknown fixture format: {fixture_format}")

execute(*, execute_format)

Generate the list of test fixtures.

Source code in packages/testing/src/execution_testing/specs/state.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """Generate the list of test fixtures."""
    if execute_format == TransactionPost:
        # Pass gas validation params for benchmark tests
        # If not benchmark mode, skip gas used validation
        if self.operation_mode != OpMode.BENCHMARKING:
            self.skip_gas_used_validation = True
        benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
        return TransactionPost(
            blocks=[[self.tx]],
            post=self.post,
            benchmark_mode=benchmark_mode,
        )
    raise Exception(f"Unsupported execute format: {execute_format}")

StateStaticTest

Bases: BaseStaticTest

General State Test static filler from ethereum/tests.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class StateStaticTest(BaseStaticTest):
    """General State Test static filler from ethereum/tests."""

    test_name: str = ""
    format_name: ClassVar[str] = "state_test"

    info: Info | None = Field(None, alias="_info")
    env: EnvironmentInStateTestFiller
    pre: PreInFiller
    transaction: GeneralTransactionInFiller
    expect: List[ExpectSectionInStateTestFiller]

    model_config = ConfigDict(extra="forbid")

    def model_post_init(self, context: Any) -> None:
        """Initialize StateStaticTest."""
        super().model_post_init(context)

    @model_validator(mode="after")
    def match_labels(self) -> Self:
        """Replace labels in expect section with corresponding tx.d indexes."""

        def parse_string_indexes(indexes: str) -> List[int]:
            """Parse index that are string in to list of int."""
            if ":label" in indexes:
                # Parse labels in data
                indexes = indexes.replace(":label ", "")
                tx_matches: List[int] = []
                for idx in self.transaction.data:
                    if indexes == idx.label:
                        tx_matches.append(idx.index)
                return tx_matches
            else:
                # Parse ranges in data
                start, end = map(int, indexes.lstrip().split("-"))
                return list(range(start, end + 1))

        def parse_indexes(
            indexes: Union[
                int, str, list[Union[int, str]], list[str], list[int]
            ],
            do_hint: bool = False,
        ) -> List[int] | int:
            """
            Parse indexes and replace all ranges and labels into tx indexes.
            """
            result: List[int] | int = []

            if do_hint:
                print("Before: " + str(indexes))

            if isinstance(indexes, int):
                result = indexes
            if isinstance(indexes, str):
                result = parse_string_indexes(indexes)
            if isinstance(indexes, list):
                result = []
                for element in indexes:
                    parsed = parse_indexes(element)
                    if isinstance(parsed, int):
                        result.append(parsed)
                    else:
                        result.extend(parsed)
                result = list(set(result))

            if do_hint:
                print("After: " + str(result))
            return result

        for expect_section in self.expect:
            expect_section.indexes.data = parse_indexes(
                expect_section.indexes.data
            )
            expect_section.indexes.gas = parse_indexes(
                expect_section.indexes.gas
            )
            expect_section.indexes.value = parse_indexes(
                expect_section.indexes.value
            )

        return self

    def fill_function(self) -> Callable:
        """Return a StateTest spec from a static file."""
        # Check if this test uses tags
        has_tags = False
        tx_tag_dependencies = self.transaction.tag_dependencies()
        if tx_tag_dependencies:
            has_tags = True
        else:
            # Check expect sections for tags
            for expect in self.expect:
                result_tag_dependencies = expect.result.tag_dependencies()
                if result_tag_dependencies:
                    has_tags = True
                    break

        fully_tagged = True
        for address in self.pre.root:
            if not isinstance(address, Tag):
                fully_tagged = False
                break

        d_g_v_parameters: List[ParameterSet] = []
        for d in self.transaction.data:
            for g in range(len(self.transaction.gas_limit)):
                for v in range(len(self.transaction.value)):
                    exception_test = False
                    for expect in self.expect:
                        if (
                            expect.has_index(d.index, g, v)
                            and expect.expect_exception is not None
                        ):
                            exception_test = True
                    # TODO: This does not take into account exceptions that
                    # only happen on specific forks, but this requires a
                    # covariant parametrize
                    marks = (
                        [pytest.mark.exception_test] if exception_test else []
                    )
                    id_label = ""
                    if len(self.transaction.data) > 1 or d.label is not None:
                        if d.label is not None:
                            id_label = f"{d}"
                        else:
                            id_label = f"d{d}"
                    if len(self.transaction.gas_limit) > 1:
                        id_label += f"-g{g}"
                    if len(self.transaction.value) > 1:
                        id_label += f"-v{v}"
                    d_g_v_parameters.append(
                        pytest.param(d.index, g, v, marks=marks, id=id_label)
                    )

        @pytest.mark.valid_at(*self.get_valid_at_forks())
        @pytest.mark.parametrize("d,g,v", d_g_v_parameters)
        def test_state_vectors(
            state_test: StateTestFiller,
            pre: Alloc,
            fork: Fork,
            d: int,
            g: int,
            v: int,
        ) -> None:
            for expect in self.expect:
                if expect.has_index(d, g, v):
                    if fork in expect.network:
                        tx_tag_dependencies = (
                            self.transaction.tag_dependencies()
                        )
                        result_tag_dependencies = (
                            expect.result.tag_dependencies()
                        )
                        all_dependencies = {
                            **tx_tag_dependencies,
                            **result_tag_dependencies,
                        }
                        tags = self.pre.setup(pre, all_dependencies)
                        env = self.env.get_environment(tags)
                        exception = (
                            None
                            if expect.expect_exception is None
                            else expect.expect_exception[fork]
                        )
                        tx = self.transaction.get_transaction(
                            tags, d, g, v, exception
                        )
                        post = expect.result.resolve(tags)
                        state_test(
                            env=env,
                            pre=pre,
                            post=post,
                            tx=tx,
                        )
                        return
            pytest.fail(
                f"Expectation not found for d={d}, g={g}, v={v}, fork={fork}"
            )

        if self.info and self.info.pytest_marks:
            for mark in self.info.pytest_marks:
                apply_mark = getattr(pytest.mark, mark)
                test_state_vectors = apply_mark(test_state_vectors)

        if has_tags:
            test_state_vectors = pytest.mark.tagged(test_state_vectors)
            if fully_tagged:
                test_state_vectors = pytest.mark.fully_tagged(
                    test_state_vectors
                )
        else:
            test_state_vectors = pytest.mark.untagged(test_state_vectors)

        # All static tests are mutable since we do `pre[0x123...] = Account()`
        test_state_vectors = pytest.mark.pre_alloc_mutable(test_state_vectors)

        return test_state_vectors

    def get_valid_at_forks(self) -> List[str]:
        """Return list of forks that are valid for this test."""
        fork_set: Set[Fork] = set()
        for expect in self.expect:
            fork_set.update(expect.network)
        return sorted([str(f) for f in fork_set])

model_post_init(context)

Initialize StateStaticTest.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
42
43
44
def model_post_init(self, context: Any) -> None:
    """Initialize StateStaticTest."""
    super().model_post_init(context)

match_labels()

Replace labels in expect section with corresponding tx.d indexes.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@model_validator(mode="after")
def match_labels(self) -> Self:
    """Replace labels in expect section with corresponding tx.d indexes."""

    def parse_string_indexes(indexes: str) -> List[int]:
        """Parse index that are string in to list of int."""
        if ":label" in indexes:
            # Parse labels in data
            indexes = indexes.replace(":label ", "")
            tx_matches: List[int] = []
            for idx in self.transaction.data:
                if indexes == idx.label:
                    tx_matches.append(idx.index)
            return tx_matches
        else:
            # Parse ranges in data
            start, end = map(int, indexes.lstrip().split("-"))
            return list(range(start, end + 1))

    def parse_indexes(
        indexes: Union[
            int, str, list[Union[int, str]], list[str], list[int]
        ],
        do_hint: bool = False,
    ) -> List[int] | int:
        """
        Parse indexes and replace all ranges and labels into tx indexes.
        """
        result: List[int] | int = []

        if do_hint:
            print("Before: " + str(indexes))

        if isinstance(indexes, int):
            result = indexes
        if isinstance(indexes, str):
            result = parse_string_indexes(indexes)
        if isinstance(indexes, list):
            result = []
            for element in indexes:
                parsed = parse_indexes(element)
                if isinstance(parsed, int):
                    result.append(parsed)
                else:
                    result.extend(parsed)
            result = list(set(result))

        if do_hint:
            print("After: " + str(result))
        return result

    for expect_section in self.expect:
        expect_section.indexes.data = parse_indexes(
            expect_section.indexes.data
        )
        expect_section.indexes.gas = parse_indexes(
            expect_section.indexes.gas
        )
        expect_section.indexes.value = parse_indexes(
            expect_section.indexes.value
        )

    return self

fill_function()

Return a StateTest spec from a static file.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def fill_function(self) -> Callable:
    """Return a StateTest spec from a static file."""
    # Check if this test uses tags
    has_tags = False
    tx_tag_dependencies = self.transaction.tag_dependencies()
    if tx_tag_dependencies:
        has_tags = True
    else:
        # Check expect sections for tags
        for expect in self.expect:
            result_tag_dependencies = expect.result.tag_dependencies()
            if result_tag_dependencies:
                has_tags = True
                break

    fully_tagged = True
    for address in self.pre.root:
        if not isinstance(address, Tag):
            fully_tagged = False
            break

    d_g_v_parameters: List[ParameterSet] = []
    for d in self.transaction.data:
        for g in range(len(self.transaction.gas_limit)):
            for v in range(len(self.transaction.value)):
                exception_test = False
                for expect in self.expect:
                    if (
                        expect.has_index(d.index, g, v)
                        and expect.expect_exception is not None
                    ):
                        exception_test = True
                # TODO: This does not take into account exceptions that
                # only happen on specific forks, but this requires a
                # covariant parametrize
                marks = (
                    [pytest.mark.exception_test] if exception_test else []
                )
                id_label = ""
                if len(self.transaction.data) > 1 or d.label is not None:
                    if d.label is not None:
                        id_label = f"{d}"
                    else:
                        id_label = f"d{d}"
                if len(self.transaction.gas_limit) > 1:
                    id_label += f"-g{g}"
                if len(self.transaction.value) > 1:
                    id_label += f"-v{v}"
                d_g_v_parameters.append(
                    pytest.param(d.index, g, v, marks=marks, id=id_label)
                )

    @pytest.mark.valid_at(*self.get_valid_at_forks())
    @pytest.mark.parametrize("d,g,v", d_g_v_parameters)
    def test_state_vectors(
        state_test: StateTestFiller,
        pre: Alloc,
        fork: Fork,
        d: int,
        g: int,
        v: int,
    ) -> None:
        for expect in self.expect:
            if expect.has_index(d, g, v):
                if fork in expect.network:
                    tx_tag_dependencies = (
                        self.transaction.tag_dependencies()
                    )
                    result_tag_dependencies = (
                        expect.result.tag_dependencies()
                    )
                    all_dependencies = {
                        **tx_tag_dependencies,
                        **result_tag_dependencies,
                    }
                    tags = self.pre.setup(pre, all_dependencies)
                    env = self.env.get_environment(tags)
                    exception = (
                        None
                        if expect.expect_exception is None
                        else expect.expect_exception[fork]
                    )
                    tx = self.transaction.get_transaction(
                        tags, d, g, v, exception
                    )
                    post = expect.result.resolve(tags)
                    state_test(
                        env=env,
                        pre=pre,
                        post=post,
                        tx=tx,
                    )
                    return
        pytest.fail(
            f"Expectation not found for d={d}, g={g}, v={v}, fork={fork}"
        )

    if self.info and self.info.pytest_marks:
        for mark in self.info.pytest_marks:
            apply_mark = getattr(pytest.mark, mark)
            test_state_vectors = apply_mark(test_state_vectors)

    if has_tags:
        test_state_vectors = pytest.mark.tagged(test_state_vectors)
        if fully_tagged:
            test_state_vectors = pytest.mark.fully_tagged(
                test_state_vectors
            )
    else:
        test_state_vectors = pytest.mark.untagged(test_state_vectors)

    # All static tests are mutable since we do `pre[0x123...] = Account()`
    test_state_vectors = pytest.mark.pre_alloc_mutable(test_state_vectors)

    return test_state_vectors

get_valid_at_forks()

Return list of forks that are valid for this test.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
226
227
228
229
230
231
def get_valid_at_forks(self) -> List[str]:
    """Return list of forks that are valid for this test."""
    fork_set: Set[Fork] = set()
    for expect in self.expect:
        fork_set.update(expect.network)
    return sorted([str(f) for f in fork_set])

TransactionTest

Bases: BaseTest

Filler type that tests the transaction over the period of a single block.

Source code in packages/testing/src/execution_testing/specs/transaction.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class TransactionTest(BaseTest):
    """
    Filler type that tests the transaction over the period of a single block.
    """

    tx: Transaction
    pre: Alloc | None = None

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        TransactionFixture,
    ]
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "transaction_test",
            "An execute test derived from a transaction test",
        ),
    ]

    def make_transaction_test_fixture(
        self,
    ) -> FillResult:
        """Create a fixture from the transaction test definition."""
        fork = self.fork.transitions_from()
        if self.tx.error is not None:
            result = FixtureResult(
                exception=self.tx.error,
                hash=None,
                intrinsic_gas=0,
                sender=None,
            )
        else:
            intrinsic_gas_cost_calculator = (
                fork.transitions_from().transaction_intrinsic_cost_calculator()
            )
            intrinsic_gas = intrinsic_gas_cost_calculator(
                calldata=self.tx.data,
                contract_creation=self.tx.to is None,
                access_list=self.tx.access_list,
                authorization_list_or_count=self.tx.authorization_list,
            )
            result = FixtureResult(
                exception=None,
                hash=self.tx.hash,
                intrinsic_gas=intrinsic_gas,
                sender=self.tx.sender,
            )

        fixture = TransactionFixture(
            result={
                fork: result,
            },
            transaction=self.tx.with_signature_and_sender().rlp(),
        )
        return FillResult(
            fixture=fixture,
            gas_optimization=None,
            benchmark_gas_used=None,
            benchmark_opcode_count=None,
        )

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the TransactionTest fixture."""
        del t8n

        self.check_exception_test(exception=self.tx.error is not None)
        if fixture_format == TransactionFixture:
            return self.make_transaction_test_fixture()

        raise Exception(f"Unknown fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """Execute the transaction test by sending it to the live network."""
        if execute_format == TransactionPost:
            benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
            return TransactionPost(
                blocks=[[self.tx]],
                post={},
                benchmark_mode=benchmark_mode,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

make_transaction_test_fixture()

Create a fixture from the transaction test definition.

Source code in packages/testing/src/execution_testing/specs/transaction.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def make_transaction_test_fixture(
    self,
) -> FillResult:
    """Create a fixture from the transaction test definition."""
    fork = self.fork.transitions_from()
    if self.tx.error is not None:
        result = FixtureResult(
            exception=self.tx.error,
            hash=None,
            intrinsic_gas=0,
            sender=None,
        )
    else:
        intrinsic_gas_cost_calculator = (
            fork.transitions_from().transaction_intrinsic_cost_calculator()
        )
        intrinsic_gas = intrinsic_gas_cost_calculator(
            calldata=self.tx.data,
            contract_creation=self.tx.to is None,
            access_list=self.tx.access_list,
            authorization_list_or_count=self.tx.authorization_list,
        )
        result = FixtureResult(
            exception=None,
            hash=self.tx.hash,
            intrinsic_gas=intrinsic_gas,
            sender=self.tx.sender,
        )

    fixture = TransactionFixture(
        result={
            fork: result,
        },
        transaction=self.tx.with_signature_and_sender().rlp(),
    )
    return FillResult(
        fixture=fixture,
        gas_optimization=None,
        benchmark_gas_used=None,
        benchmark_opcode_count=None,
    )

generate(t8n, fixture_format)

Generate the TransactionTest fixture.

Source code in packages/testing/src/execution_testing/specs/transaction.py
86
87
88
89
90
91
92
93
94
95
96
97
98
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """Generate the TransactionTest fixture."""
    del t8n

    self.check_exception_test(exception=self.tx.error is not None)
    if fixture_format == TransactionFixture:
        return self.make_transaction_test_fixture()

    raise Exception(f"Unknown fixture format: {fixture_format}")

execute(*, execute_format)

Execute the transaction test by sending it to the live network.

Source code in packages/testing/src/execution_testing/specs/transaction.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """Execute the transaction test by sending it to the live network."""
    if execute_format == TransactionPost:
        benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
        return TransactionPost(
            blocks=[[self.tx]],
            post={},
            benchmark_mode=benchmark_mode,
        )
    raise Exception(f"Unsupported execute format: {execute_format}")