Skip to content

Ethereum Test Specs package

Test spec definitions and utilities.

BaseTest

Bases: BaseModel

Represents a base Ethereum test which must return a single test fixture.

Source code in packages/testing/src/execution_testing/specs/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
class BaseTest(BaseModel):
    """
    Represents a base Ethereum test which must return a single test fixture.

    Subclasses are recorded in `spec_types` when they are defined, keyed by
    the value returned from `pytest_parameter_name()`.
    """

    model_config = ConfigDict(extra="forbid")

    fork: Fork | TransitionFork = (
        BaseFork
        # default to BaseFork to allow the filler to set it,
        # instead of each test having to set it
    )
    operation_mode: OpMode | None = None
    gas_optimization_max_gas_limit: int | None = None
    expected_benchmark_gas_used: int | None = None
    skip_gas_used_validation: bool = False
    expected_receipt_status: int | None = None
    is_tx_gas_heavy_test: bool = False
    is_exception_test: bool = False

    # Class variables, to be set by subclasses
    spec_types: ClassVar[Dict[str, Type["BaseTest"]]] = {}
    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = []
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = []

    supported_markers: ClassVar[Dict[str, str]] = {}

    def model_post_init(self, __context: Any, /) -> None:
        """
        Model post-init hook asserting that a concrete fork was provided.

        The `fork` field defaults to `BaseFork` so that the filler/executor
        can set it; a test instance still carrying that default is rejected.

        Raises:
            AssertionError: If `fork` is still equal to `BaseFork`.

        """
        super().model_post_init(__context)
        assert self.fork != BaseFork, (
            "Fork was not provided by the filler/executor."
        )

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the appropriate marker is
        used.

        The base implementation ignores its arguments and never discards.
        """
        del fork, fixture_format, markers
        return False

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        """
        Register every subclass of BaseTest that has a non-empty pytest
        parameter name as a possible spec type.

        Classes flagged with `__is_base_test_wrapper__` are not registered.
        """
        super().__pydantic_init_subclass__(**kwargs)

        # Don't register dynamically generated wrapper classes
        if getattr(cls, "__is_base_test_wrapper__", False):
            return

        # Resolve the name once so the check and the key cannot diverge.
        parameter_name = cls.pytest_parameter_name()
        if parameter_name:
            # Register the new spec type
            BaseTest.spec_types[parameter_name] = cls

    @classmethod
    def from_test(
        cls: Type[Self],
        *,
        base_test: "BaseTest",
        **kwargs: Any,
    ) -> Self:
        """
        Create a test in a different format from a base test.

        Every `BaseTest` field that was explicitly set on `base_test` and is
        not overridden through `kwargs` is copied to the new instance.
        """
        for field_name in BaseTest.model_fields:
            if (
                field_name not in kwargs
                and field_name in base_test.model_fields_set
            ):
                kwargs[field_name] = getattr(base_test, field_name)
        return cls(**kwargs)

    @classmethod
    def discard_execute_format_by_marks(
        cls,
        execute_format: ExecuteFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard an execute format from executing if the appropriate marker is
        used.

        The base implementation ignores its arguments and never discards.
        """
        del execute_format, fork, markers
        return False

    @abstractmethod
    def generate(
        self,
        *,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """Generate the test fixture using the given fixture format."""
        pass

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """
        Produce the execute object for the given execute format.

        The base implementation supports no execute format and always
        raises.

        Raises:
            Exception: Always, naming the unsupported `execute_format`.

        """
        raise Exception(f"Unsupported execute format: {execute_format}")

    @classmethod
    def pytest_parameter_name(cls) -> str:
        """
        Must return the name of the parameter used in pytest to select this
        spec type as filler for the test.

        By default, it returns the underscore separated name of the class
        (an underscore is inserted before every uppercase character except
        the first one, then the result is lowercased). `BaseTest` itself
        returns an empty string.
        """
        if cls == BaseTest:
            return ""
        return reduce(
            lambda x, y: x + ("_" if y.isupper() else "") + y, cls.__name__
        ).lower()

    def check_exception_test(
        self,
        *,
        exception: bool,
    ) -> None:
        """
        Compare the test marker against the outcome of the test.

        Raises:
            Exception: If `exception` disagrees with `is_exception_test`;
                the message says whether the marker must be added or
                removed.

        """
        if self.is_exception_test == exception:
            return
        if exception:
            raise Exception(
                "Test produced an invalid block or transaction but was "
                "not marked with the `exception_test` marker. Add the "
                "`@pytest.mark.exception_test` decorator to the test."
            )
        raise Exception(
            "Test didn't produce an invalid block or transaction but "
            "was marked with the `exception_test` marker. Remove the "
            "`@pytest.mark.exception_test` decorator from the test."
        )

    def get_genesis_environment(self) -> Environment:
        """
        Get the genesis environment for pre-allocation groups.

        Must be implemented by subclasses to provide the appropriate
        environment.

        Raises:
            NotImplementedError: Always, in this base implementation.

        """
        raise NotImplementedError(
            f"{self.__class__.__name__} must implement genesis environment "
            "access for use with pre-allocation groups."
        )

    def validate_benchmark_gas(
        self, *, benchmark_gas_used: int | None, gas_benchmark_value: int
    ) -> None:
        """
        Validates the total consumed gas of the last block in the test matches
        the expectation of the benchmark test.

        Nothing is checked unless `operation_mode` is
        `OpMode.BENCHMARKING`. When `expected_benchmark_gas_used` is unset,
        `gas_benchmark_value` is used as the expected value.

        Raises:
            AssertionError: If `benchmark_gas_used` is None, differs from
                the expected value (unless `skip_gas_used_validation` is
                set), or exceeds `gas_benchmark_value`.

        """
        if self.operation_mode != OpMode.BENCHMARKING:
            return
        assert benchmark_gas_used is not None, "_benchmark_gas_used is not set"
        # Perform gas validation if required for benchmarking.
        # Ensures benchmark tests consume exactly the expected gas.
        if not self.skip_gas_used_validation:
            # Verify that the total gas consumed in the last block
            # matches expectations
            expected_benchmark_gas_used = self.expected_benchmark_gas_used
            if expected_benchmark_gas_used is None:
                expected_benchmark_gas_used = gas_benchmark_value
            diff = benchmark_gas_used - expected_benchmark_gas_used
            assert benchmark_gas_used == expected_benchmark_gas_used, (
                f"Total gas used ({benchmark_gas_used}) does not "
                "match expected benchmark gas "
                f"({expected_benchmark_gas_used}), "
                f"difference: {diff}"
            )
        # Gas used should never exceed the maximum benchmark gas allowed.
        assert benchmark_gas_used <= gas_benchmark_value, (
            f"benchmark_gas_used ({benchmark_gas_used}) exceeds maximum "
            "benchmark gas allowed for this configuration: "
            f"{gas_benchmark_value}"
        )

    def validate_receipt_status(
        self,
        *,
        receipts: List[TransactionReceipt],
        block_number: int,
    ) -> None:
        """
        Validate receipt status for every transaction in a block.

        When expected_receipt_status is set, verify that all
        receipts match. Catches silent OOG failures that roll
        back state and invalidate benchmarks.

        Receipts whose status is None are skipped. An expectation that was
        never set, or that was explicitly set to None, disables the check.

        Raises:
            Exception: On the first receipt whose status differs from
                `expected_receipt_status`.

        """
        if "expected_receipt_status" not in self.model_fields_set:
            return
        expected_status = self.expected_receipt_status
        if expected_status is None:
            # An explicit None carries no expectation; comparing against it
            # would reject every receipt with an "expected None" error.
            return
        for i, receipt in enumerate(receipts):
            if receipt.status is None:
                continue
            actual_status = int(receipt.status)
            if actual_status != expected_status:
                raise Exception(
                    f"Transaction {i} in block "
                    f"{block_number} has receipt "
                    f"status {actual_status}, "
                    f"expected "
                    f"{expected_status}."
                )

model_post_init(__context)

Model post-init hook asserting that a concrete fork was provided by the filler/executor and the BaseFork default was not left in place.

Source code in packages/testing/src/execution_testing/specs/base.py
131
132
133
134
135
136
137
138
139
def model_post_init(self, __context: Any, /) -> None:
    """
    Model post-init hook asserting that a concrete fork was provided.

    The check fails when `fork` still compares equal to `BaseFork`, i.e.
    when the filler/executor did not supply a fork for this test.

    Raises:
        AssertionError: If `fork` is equal to `BaseFork`.

    """
    super().model_post_init(__context)
    assert self.fork != BaseFork, (
        "Fork was not provided by the filler/executor."
    )

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/base.py
141
142
143
144
145
146
147
148
149
150
151
152
153
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Tell whether `fixture_format` must be dropped from filling because of
    the markers applied to the test.

    This base implementation looks at none of its arguments and never
    discards a format.
    """
    # Nothing is inspected here; drop the references explicitly.
    del markers, fork, fixture_format
    keep_format = True
    return not keep_format

__pydantic_init_subclass__(**kwargs) classmethod

Register every subclass of BaseTest that has a non-empty pytest parameter name as a possible spec type.

Source code in packages/testing/src/execution_testing/specs/base.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """
    Register every subclass of BaseTest that has a non-empty pytest
    parameter name as a possible spec type.

    Classes flagged with `__is_base_test_wrapper__` are not registered.
    """
    super().__pydantic_init_subclass__(**kwargs)

    # Don't register dynamically generated wrapper classes
    if getattr(cls, "__is_base_test_wrapper__", False):
        return

    # Resolve the name once so the check and the key cannot diverge.
    parameter_name = cls.pytest_parameter_name()
    if parameter_name:
        # Register the new spec type
        BaseTest.spec_types[parameter_name] = cls

from_test(*, base_test, **kwargs) classmethod

Create a test in a different format from a base test.

Source code in packages/testing/src/execution_testing/specs/base.py
171
172
173
174
175
176
177
178
179
180
181
182
@classmethod
def from_test(
    cls: Type[Self],
    *,
    base_test: "BaseTest",
    **kwargs: Any,
) -> Self:
    """
    Build an instance of this test type out of another test.

    Each `BaseTest` field that was explicitly set on `base_test` is carried
    over, unless the caller already supplied a value for it in `kwargs`.
    """
    carried_over = {
        field_name: getattr(base_test, field_name)
        for field_name in BaseTest.model_fields.keys()
        if field_name not in kwargs
        and field_name in base_test.model_fields_set
    }
    # Caller-supplied values come first, inherited ones are appended.
    return cls(**kwargs, **carried_over)

discard_execute_format_by_marks(execute_format, fork, markers) classmethod

Discard an execute format from executing if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/base.py
184
185
186
187
188
189
190
191
192
193
194
195
196
@classmethod
def discard_execute_format_by_marks(
    cls,
    execute_format: ExecuteFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Tell whether `execute_format` must be dropped from executing because of
    the markers applied to the test.

    This base implementation looks at none of its arguments and never
    discards a format.
    """
    # Nothing is inspected here; drop the references explicitly.
    del markers, fork, execute_format
    keep_format = True
    return not keep_format

generate(*, t8n, fixture_format) abstractmethod

Generate the test fixture using the given fixture format.

Source code in packages/testing/src/execution_testing/specs/base.py
198
199
200
201
202
203
204
205
206
@abstractmethod
def generate(
    self,
    *,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """
    Generate the test fixture using the given fixture format.

    Declared abstract; the body here is intentionally empty.
    """

execute(*, execute_format)

Produce the execute object for the given execute format. The base implementation supports no execute format and always raises.

Source code in packages/testing/src/execution_testing/specs/base.py
208
209
210
211
212
213
214
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """
    Produce the execute object for the given execute format.

    The base implementation supports no execute format and always raises
    an `Exception` naming the requested format.
    """
    message = f"Unsupported execute format: {execute_format}"
    raise Exception(message)

pytest_parameter_name() classmethod

Must return the name of the parameter used in pytest to select this spec type as filler for the test.

By default, it returns the underscore separated name of the class.

Source code in packages/testing/src/execution_testing/specs/base.py
216
217
218
219
220
221
222
223
224
225
226
227
228
@classmethod
def pytest_parameter_name(cls) -> str:
    """
    Return the pytest parameter name that selects this spec type as the
    filler for a test.

    The default is the class name in lowercase with an underscore inserted
    before every uppercase character except the first one. `BaseTest`
    itself yields an empty string.
    """
    if cls == BaseTest:
        return ""
    class_name = cls.__name__
    separated = "".join(
        f"_{char}" if position and char.isupper() else char
        for position, char in enumerate(class_name)
    )
    return separated.lower()

check_exception_test(*, exception)

Compare the test marker against the outcome of the test.

Source code in packages/testing/src/execution_testing/specs/base.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def check_exception_test(
    self,
    *,
    exception: bool,
) -> None:
    """
    Check that the `exception_test` marker agrees with the test outcome.

    Returns silently when `exception` equals `is_exception_test`;
    otherwise raises an `Exception` stating whether the marker has to be
    added or removed.
    """
    if self.is_exception_test == exception:
        return
    if not exception:
        raise Exception(
            "Test didn't produce an invalid block or transaction but "
            "was marked with the `exception_test` marker. Remove the "
            "`@pytest.mark.exception_test` decorator from the test."
        )
    raise Exception(
        "Test produced an invalid block or transaction but was "
        "not marked with the `exception_test` marker. Add the "
        "`@pytest.mark.exception_test` decorator to the test."
    )

get_genesis_environment()

Get the genesis environment for pre-allocation groups.

Must be implemented by subclasses to provide the appropriate environment.

Source code in packages/testing/src/execution_testing/specs/base.py
250
251
252
253
254
255
256
257
258
259
260
def get_genesis_environment(self) -> Environment:
    """
    Return the genesis environment used for pre-allocation groups.

    The base implementation provides none and always raises
    `NotImplementedError`, naming the concrete class of `self`.
    """
    owner_name = self.__class__.__name__
    message = (
        f"{owner_name} must implement genesis environment "
        "access for use with pre-allocation groups."
    )
    raise NotImplementedError(message)

validate_benchmark_gas(*, benchmark_gas_used, gas_benchmark_value)

Validates the total consumed gas of the last block in the test matches the expectation of the benchmark test.

Requires the following fields to be set:

- expected_benchmark_gas_used
- operation_mode

Source code in packages/testing/src/execution_testing/specs/base.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def validate_benchmark_gas(
    self, *, benchmark_gas_used: int | None, gas_benchmark_value: int
) -> None:
    """
    Check the gas consumed by a benchmark test against its expectation.

    Nothing is checked unless `operation_mode` is `OpMode.BENCHMARKING`.
    Unless `skip_gas_used_validation` is set, `benchmark_gas_used` must
    equal `expected_benchmark_gas_used`, falling back to
    `gas_benchmark_value` when no expectation is configured. In every
    case it must not exceed `gas_benchmark_value`.

    All violations are reported through `AssertionError`.
    """
    if self.operation_mode != OpMode.BENCHMARKING:
        return
    assert benchmark_gas_used is not None, "_benchmark_gas_used is not set"

    if not self.skip_gas_used_validation:
        # Exact-match check against the configured (or default) target.
        configured_gas = self.expected_benchmark_gas_used
        target_gas = (
            gas_benchmark_value if configured_gas is None else configured_gas
        )
        gas_delta = benchmark_gas_used - target_gas
        assert benchmark_gas_used == target_gas, (
            f"Total gas used ({benchmark_gas_used}) does not "
            "match expected benchmark gas "
            f"({target_gas}), "
            f"difference: {gas_delta}"
        )

    # Upper bound that applies even when exact validation is skipped.
    assert benchmark_gas_used <= gas_benchmark_value, (
        f"benchmark_gas_used ({benchmark_gas_used}) exceeds maximum "
        "benchmark gas allowed for this configuration: "
        f"{gas_benchmark_value}"
    )

validate_receipt_status(*, receipts, block_number)

Validate receipt status for every transaction in a block.

When expected_receipt_status is set, verify that all receipts match. Catches silent OOG failures that roll back state and invalidate benchmarks.

Source code in packages/testing/src/execution_testing/specs/base.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def validate_receipt_status(
    self,
    *,
    receipts: List[TransactionReceipt],
    block_number: int,
) -> None:
    """
    Validate receipt status for every transaction in a block.

    When expected_receipt_status is set, verify that all
    receipts match. Catches silent OOG failures that roll
    back state and invalidate benchmarks.

    Receipts whose status is None are skipped. An expectation that was
    never set, or that was explicitly set to None, disables the check.

    Raises:
        Exception: On the first receipt whose status differs from
            `expected_receipt_status`.

    """
    if "expected_receipt_status" not in self.model_fields_set:
        return
    expected_status = self.expected_receipt_status
    if expected_status is None:
        # An explicit None carries no expectation; comparing against it
        # would reject every receipt with an "expected None" error.
        return
    for i, receipt in enumerate(receipts):
        if receipt.status is None:
            continue
        actual_status = int(receipt.status)
        if actual_status != expected_status:
            raise Exception(
                f"Transaction {i} in block "
                f"{block_number} has receipt "
                f"status {actual_status}, "
                f"expected "
                f"{expected_status}."
            )

BaseStaticTest

Bases: BaseModel

Represents a base class that reads cases from static files.

Source code in packages/testing/src/execution_testing/specs/base_static.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class BaseStaticTest(BaseModel):
    """Represents a base class that reads cases from static files."""

    # Registered static test formats: subclasses with a non-empty
    # `format_name`, in definition order.
    formats: ClassVar[List[Type["BaseStaticTest"]]] = []
    # Adapter validating against the union of all registered formats. It is
    # only assigned once the first format has been registered.
    formats_type_adapter: ClassVar[TypeAdapter]

    format_name: ClassVar[str] = ""

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        """
        Register all subclasses of BaseStaticTest with a static test format
        name set as possible static test format.

        After each registration the shared type adapter is rebuilt so that
        it covers every format registered so far.
        """
        # Keep the hook cooperative for classes with several bases.
        super().__pydantic_init_subclass__(**kwargs)
        if not cls.format_name:
            return
        # Register the new static test format
        BaseStaticTest.formats.append(cls)
        # A `Union` with a single member collapses to that member, so no
        # special case is needed for the first registered format.
        BaseStaticTest.formats_type_adapter = TypeAdapter(
            Union[tuple(BaseStaticTest.formats)],
        )

    @model_validator(mode="wrap")
    @classmethod
    def _parse_into_subclass(
        cls, v: Any, handler: ValidatorFunctionWrapHandler
    ) -> "BaseStaticTest":
        """
        Parse the static test into the correct subclass.

        Validation requested on `BaseStaticTest` itself is delegated to the
        adapter over all registered formats; subclasses validate normally
        through `handler`.
        """
        if cls is BaseStaticTest:
            return BaseStaticTest.formats_type_adapter.validate_python(v)
        return handler(v)

    @abstractmethod
    def fill_function(self) -> Callable:
        """
        Return the test function that can be used to fill the test.

        This method should be implemented by the subclasses.

        The function returned can be optionally decorated with the
        `@pytest.mark.parametrize` decorator to parametrize the test with the
        number of sub test cases.

        Example:
        ```
        @pytest.mark.parametrize("n", [1])
        @pytest.mark.parametrize("m", [1, 2])
        @pytest.mark.valid_from("Homestead")
        def test_state_filler(
            state_test: StateTestFiller,
            fork: Fork,
            pre: Alloc,
            n: int,
            m: int
        ):
            \"\"\"Generate a test from a static state filler.\"\"\"
            assert n == 1
            assert m in [1, 2]
            env = Environment(**self.env.model_dump())
            sender = pre.fund_eoa()
            tx = Transaction(
                ty=0x0,
                nonce=0,
                to=Address(0x1000),
                gas_limit=500000,
                protected=False if fork in [Frontier, Homestead] else True,
                data="",
                sender=sender,
            )
            state_test(env=env, pre=pre, post={}, tx=tx)
        ```

        To aid the generation of the test, the function can be defined and then
        the decorator be applied after defining the function:

        ```
        def test_state_filler(
            state_test: StateTestFiller,
            fork: Fork,
            pre: Alloc,
            n: int,
            m: int,
        ):
            ...

        test_state_filler = pytest.mark.parametrize(
            "n", [1]
        )(test_state_filler)
        test_state_filler = pytest.mark.parametrize(
            "m", [1, 2]
        )(test_state_filler)

        if self.valid_from:
            test_state_filler = pytest.mark.valid_from(
                self.valid_from
            )(test_state_filler)

        if self.valid_until:
            test_state_filler = pytest.mark.valid_until(
                self.valid_until
            )(test_state_filler)

        return test_state_filler
        ```

        The function can contain the following parameters on top of the spec
        type parameter (`state_test` in the example above):

        - `fork`: The fork for which the test is currently being filled.
        - `pre`: The pre-state of the test.

        """
        raise NotImplementedError

    @staticmethod
    def remove_comments(data: Dict) -> Dict:
        """
        Remove comments from a dictionary.

        A comment is any entry whose key is a string starting with `//`.
        Such entries are dropped from `data` and from every dictionary
        nested inside it, including dictionaries held in lists at any
        depth. The input is not modified; a new dictionary is returned.
        """

        def strip(value: Any) -> Any:
            """Return `value` with comment entries removed recursively."""
            if isinstance(value, dict):
                return BaseStaticTest.remove_comments(value)
            if isinstance(value, list):
                # Recurse so dictionaries inside nested lists are cleaned.
                return [strip(item) for item in value]
            return value

        return {
            key: strip(value)
            for key, value in data.items()
            if not (isinstance(key, str) and key.startswith("//"))
        }

    @model_validator(mode="before")
    @classmethod
    def remove_comments_from_model(cls, data: Any) -> Any:
        """
        Remove comments from the static file loaded, if any.

        Only dictionary input is processed; anything else is passed through
        unchanged.
        """
        if isinstance(data, dict):
            return BaseStaticTest.remove_comments(data)
        return data

__pydantic_init_subclass__(**kwargs) classmethod

Register all subclasses of BaseStaticTest with a static test format name set as possible static test format.

Source code in packages/testing/src/execution_testing/specs/base_static.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
    """
    Register all subclasses of BaseStaticTest with a static test format
    name set as possible static test format.

    After each registration the shared type adapter is rebuilt so that it
    covers every format registered so far.
    """
    # Keep the hook cooperative for classes with several bases.
    super().__pydantic_init_subclass__(**kwargs)
    if not cls.format_name:
        return
    # Register the new static test format
    BaseStaticTest.formats.append(cls)
    # A `Union` with a single member collapses to that member, so no
    # special case is needed for the first registered format.
    BaseStaticTest.formats_type_adapter = TypeAdapter(
        Union[tuple(BaseStaticTest.formats)],
    )

fill_function() abstractmethod

Return the test function that can be used to fill the test.

This method should be implemented by the subclasses.

The function returned can be optionally decorated with the @pytest.mark.parametrize decorator to parametrize the test with the number of sub test cases.

Example:

@pytest.mark.parametrize("n", [1])
@pytest.mark.parametrize("m", [1, 2])
@pytest.mark.valid_from("Homestead")
def test_state_filler(
    state_test: StateTestFiller,
    fork: Fork,
    pre: Alloc,
    n: int,
    m: int
):
    """Generate a test from a static state filler."""
    assert n == 1
    assert m in [1, 2]
    env = Environment(**self.env.model_dump())
    sender = pre.fund_eoa()
    tx = Transaction(
        ty=0x0,
        nonce=0,
        to=Address(0x1000),
        gas_limit=500000,
        protected=False if fork in [Frontier, Homestead] else True,
        data="",
        sender=sender,
    )
    state_test(env=env, pre=pre, post={}, tx=tx)

To aid the generation of the test, the function can be defined and then the decorator be applied after defining the function:

def test_state_filler(
    state_test: StateTestFiller,
    fork: Fork,
    pre: Alloc,
    n: int,
    m: int,
):
    ...

test_state_filler = pytest.mark.parametrize("n",
    [1])(test_state_filler
)
test_state_filler = pytest.mark.parametrize("m",
    [1, 2])(test_state_filler
)

if self.valid_from:
    test_state_filler = pytest.mark.valid_from(
        self.valid_from
    )(test_state_filler)

if self.valid_until:
    test_state_filler = pytest.mark.valid_until(
        self.valid_until
    )(test_state_filler)

return test_state_filler

The function can contain the following parameters on top of the spec type parameter (state_test in the example above):

- fork: The fork for which the test is currently being filled.
- pre: The pre-state of the test.

Source code in packages/testing/src/execution_testing/specs/base_static.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@abstractmethod
def fill_function(self) -> Callable:
    """
    Return the test function that can be used to fill the test.

    This method should be implemented by the subclasses; the base
    implementation raises `NotImplementedError`.

    The function returned can be optionally decorated with the
    `@pytest.mark.parametrize` decorator to parametrize the test with the
    number of sub test cases.

    Example:
    ```
    @pytest.mark.parametrize("n", [1])
    @pytest.mark.parametrize("m", [1, 2])
    @pytest.mark.valid_from("Homestead")
    def test_state_filler(
        state_test: StateTestFiller,
        fork: Fork,
        pre: Alloc,
        n: int,
        m: int
    ):
        \"\"\"Generate a test from a static state filler.\"\"\"
        assert n == 1
        assert m in [1, 2]
        env = Environment(**self.env.model_dump())
        sender = pre.fund_eoa()
        tx = Transaction(
            ty=0x0,
            nonce=0,
            to=Address(0x1000),
            gas_limit=500000,
            protected=False if fork in [Frontier, Homestead] else True,
            data="",
            sender=sender,
        )
        state_test(env=env, pre=pre, post={}, tx=tx)
    ```

    To aid the generation of the test, the function can be defined and then
    the decorator be applied after defining the function:

    ```
    def test_state_filler(
        state_test: StateTestFiller,
        fork: Fork,
        pre: Alloc,
        n: int,
        m: int,
    ):
        ...

    test_state_filler = pytest.mark.parametrize(
        "n", [1]
    )(test_state_filler)
    test_state_filler = pytest.mark.parametrize(
        "m", [1, 2]
    )(test_state_filler)

    if self.valid_from:
        test_state_filler = pytest.mark.valid_from(
            self.valid_from
        )(test_state_filler)

    if self.valid_until:
        test_state_filler = pytest.mark.valid_until(
            self.valid_until
        )(test_state_filler)

    return test_state_filler
    ```

    The function can contain the following parameters on top of the spec
    type parameter (`state_test` in the example above):

    - `fork`: The fork for which the test is currently being filled.
    - `pre`: The pre-state of the test.

    """
    raise NotImplementedError

remove_comments(data) staticmethod

Remove comments from a dictionary.

Source code in packages/testing/src/execution_testing/specs/base_static.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@staticmethod
def remove_comments(data: Dict) -> Dict:
    """
    Remove comments from a dictionary.

    A comment is any entry whose key is a string starting with `//`. Such
    entries are dropped from `data` and from every dictionary nested inside
    it, including dictionaries held in lists at any depth. The input is
    not modified; a new dictionary is returned.
    """

    def without_comment_keys(mapping: Dict) -> Dict:
        """Copy `mapping`, dropping comment keys and cleaning the values."""
        return {
            key: strip(value)
            for key, value in mapping.items()
            if not (isinstance(key, str) and key.startswith("//"))
        }

    def strip(value: Any) -> Any:
        """Return `value` with comment entries removed recursively."""
        if isinstance(value, dict):
            return without_comment_keys(value)
        if isinstance(value, list):
            # Recurse so dictionaries inside nested lists are cleaned too.
            return [strip(item) for item in value]
        return value

    return without_comment_keys(data)

remove_comments_from_model(data) classmethod

Remove comments from the static file loaded, if any.

Source code in packages/testing/src/execution_testing/specs/base_static.py
154
155
156
157
158
159
160
@model_validator(mode="before")
@classmethod
def remove_comments_from_model(cls, data: Any) -> Any:
    """
    Strip comment entries from the raw static file data before validation.

    Only dictionary input is processed; any other input is handed back
    untouched.
    """
    if not isinstance(data, dict):
        return data
    cleaned = BaseStaticTest.remove_comments(data)
    return cleaned

BenchmarkTest

Bases: BaseTest

Test type designed specifically for benchmark test cases.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
class BenchmarkTest(BaseTest):
    """
    Test type designed specifically for benchmark test cases.

    Exactly one of `code_generator`, `blocks` or `tx` must be set. After
    validation, `model_post_init` converts that input into a list of blocks,
    prefixed with `setup_blocks`, and stores the result in `blocks`.
    """

    model_config = ConfigDict(extra="forbid")

    pre: Alloc = Field(default_factory=Alloc)
    post: Alloc = Field(default_factory=Alloc)
    # Benchmark payload: exactly one of `tx`, `blocks` or `code_generator`
    # may be set (enforced in `model_post_init`).
    tx: Transaction | None = None
    # Blocks placed in front of the benchmark blocks.
    setup_blocks: List[Block] = Field(default_factory=list)
    # Overwritten in `model_post_init` with setup blocks + benchmark blocks.
    blocks: List[Block] | None = None
    block_exception: (
        List[TransactionException | BlockException]
        | TransactionException
        | BlockException
        | None
    ) = None
    env: Environment = Field(default_factory=Environment)
    expected_benchmark_gas_used: int | None = None
    # Defaults to the gas limit of a default-constructed `Environment`.
    gas_benchmark_value: int = Field(
        default_factory=lambda: int(Environment().gas_limit)
    )
    # Expressed in thousands of opcode executions (see `generate`).
    fixed_opcode_count: float | None = None
    expected_opcode_count: int | None = None
    target_opcode: Op | OpcodeTarget | None = None
    code_generator: BenchmarkCodeGenerator | None = None
    # By default, benchmark tests require neither of these
    include_full_post_state_in_output: bool = False
    include_tx_receipts_in_output: bool = False

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        BlockchainFixture,
        BlockchainEngineFixture,
        BlockchainEngineXFixture,
    ]

    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "benchmark_test",
            "An execute test derived from a benchmark test",
        ),
    ]

    supported_markers: ClassVar[Dict[str, str]] = {
        "blockchain_test_engine_only": (
            "Only generate a blockchain test engine fixture"
        ),
        "blockchain_test_only": "Only generate a blockchain test fixture",
        "repricing": "Mark test as reference test for gas repricing analysis",
    }

    def model_post_init(self, __context: Any, /) -> None:
        """
        Validate the benchmark inputs and build the final list of blocks.

        Asserts that a custom pre-allocation was provided (the default was
        not used) and checks that exactly one of `code_generator`, `blocks`
        or `tx` is set. The selected input is converted into blocks that are
        appended after a copy of `setup_blocks`; the combined list is stored
        in `self.blocks`.

        Raises:
            AssertionError: If `pre` was not explicitly provided.
            ValueError: If zero or more than one of `code_generator`,
                `blocks` and `tx` is set.

        """
        super().model_post_init(__context)
        assert "pre" in self.model_fields_set, (
            "pre allocation was not provided"
        )

        set_props = [
            name
            for name, val in [
                ("code_generator", self.code_generator),
                ("blocks", self.blocks),
                ("tx", self.tx),
            ]
            if val is not None
        ]

        if len(set_props) != 1:
            raise ValueError(
                f"Exactly one must be set, but got {len(set_props)}: "
                f"{', '.join(set_props)}"
            )

        # Work on a copy: extending `self.setup_blocks` in place would leave
        # it holding the benchmark blocks too (and aliased to `self.blocks`).
        blocks: List[Block] = list(self.setup_blocks)

        if self.fixed_opcode_count is not None and self.code_generator is None:
            pytest.skip(
                "Cannot run fixed opcode count tests without a code generator"
            )

        if self.code_generator is not None:
            # Inject fixed_opcode_count into the code generator if provided
            self.code_generator.fixed_opcode_count = self.fixed_opcode_count

            # In fixed opcode count mode, skip gas validation since we're
            # measuring performance by operation count, not gas usage
            if self.fixed_opcode_count is not None:
                self.skip_gas_used_validation = True
                generated_blocks = (
                    self.generate_fixed_opcode_count_transactions()
                )
            else:
                generated_blocks = self.generate_blocks_from_code_generator()
            blocks += generated_blocks

        elif self.blocks is not None:
            blocks += self.blocks

        elif self.tx is not None:
            # Use the fork's per-transaction gas cap when it has one,
            # otherwise the full benchmark gas value.
            gas_limit = (
                self.fork.fork_at(
                    block_number=0, timestamp=0
                ).transaction_gas_limit_cap()
                or self.gas_benchmark_value
            )

            transactions = self.split_transaction(self.tx, gas_limit)

            blocks.append(Block(txs=transactions))

        else:
            # Defensive: the "exactly one" check above already rules this out.
            raise ValueError(
                "Cannot create BlockchainTest without a code generator, "
                "transactions, or blocks"
            )

        self.blocks = blocks

    @classmethod
    def pytest_parameter_name(cls) -> str:
        """
        Return the parameter name used in pytest
        to select this spec type.
        """
        return "benchmark_test"

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the
        appropriate marker is used.

        Returns `True` when `fixture_format` must be discarded:
        `blockchain_test_only` keeps only `BlockchainFixture`, and
        `blockchain_test_engine_only` keeps only `BlockchainEngineFixture`.
        When both markers are present, `blockchain_test_only` wins.
        """
        del fork

        # Collect the marker names once instead of once per check.
        marker_names = [m.name for m in markers]
        if "blockchain_test_only" in marker_names:
            return fixture_format != BlockchainFixture
        if "blockchain_test_engine_only" in marker_names:
            return fixture_format != BlockchainEngineFixture
        return False

    def get_genesis_environment(self) -> Environment:
        """Get the genesis environment for this benchmark test."""
        return self.generate_blockchain_test().get_genesis_environment()

    def split_transaction(
        self, tx: Transaction, gas_limit_cap: int | None
    ) -> List[Transaction]:
        """
        Split a transaction that exceeds the gas
        limit cap into multiple transactions.

        When there is no cap, or the cap is at least `gas_benchmark_value`,
        `tx` itself is updated in place to use `gas_benchmark_value` as its
        gas limit and returned as the only element. Otherwise copies of `tx`
        are returned with consecutive nonces starting at `tx.nonce`; every
        copy uses the cap as gas limit except the last one, which receives
        the remaining gas.
        """
        if gas_limit_cap is None:
            tx.gas_limit = HexNumber(self.gas_benchmark_value)
            return [tx]

        if gas_limit_cap >= self.gas_benchmark_value:
            tx.gas_limit = HexNumber(self.gas_benchmark_value)
            return [tx]

        num_splits = math.ceil(self.gas_benchmark_value / gas_limit_cap)
        remaining_gas = self.gas_benchmark_value

        split_transactions = []
        for i in range(num_splits):
            split_tx = tx.model_copy()
            split_tx.gas_limit = HexNumber(
                remaining_gas if i == num_splits - 1 else gas_limit_cap
            )
            remaining_gas -= gas_limit_cap
            split_tx.nonce = HexNumber(tx.nonce + i)
            split_transactions.append(split_tx)

        return split_transactions

    def generate_blocks_from_code_generator(self) -> List[Block]:
        """
        Generate blocks using the code generator.

        Deploys the generator's contracts into `pre`, asks it for a
        benchmark transaction and wraps the (possibly split) transaction in
        a single block.

        Raises:
            Exception: If no code generator is set.

        """
        if self.code_generator is None:
            raise Exception("Code generator is not set")
        self.code_generator.deploy_contracts(
            pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
        )
        gas_limit = (
            self.fork.fork_at(
                block_number=0, timestamp=0
            ).transaction_gas_limit_cap()
            or self.gas_benchmark_value
        )
        benchmark_tx = self.code_generator.generate_transaction(
            pre=self.pre, gas_benchmark_value=gas_limit
        )

        execution_txs = self.split_transaction(benchmark_tx, gas_limit)
        execution_block = Block(txs=execution_txs)

        return [execution_block]

    def generate_fixed_opcode_count_transactions(self) -> List[Block]:
        """
        Generate transactions with a fixed opcode count.

        Deploys the generator's fixed-count contracts into `pre` and wraps
        the generated transaction, unsplit, in a single block.

        Raises:
            Exception: If no code generator is set.

        """
        if self.code_generator is None:
            raise Exception("Code generator is not set")
        self.code_generator.deploy_fix_count_contracts(
            pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
        )
        gas_limit = (
            self.fork.fork_at(
                block_number=0, timestamp=0
            ).transaction_gas_limit_cap()
            or self.gas_benchmark_value
        )
        benchmark_tx = self.code_generator.generate_transaction(
            pre=self.pre, gas_benchmark_value=gas_limit
        )
        execution_block = Block(txs=[benchmark_tx])
        return [execution_block]

    def generate_blockchain_test(self) -> BlockchainTest:
        """Create a BlockchainTest from this BenchmarkTest."""
        return BlockchainTest.from_test(
            base_test=self,
            genesis_environment=self.env,
            pre=self.pre,
            post=self.post,
            blocks=self.blocks,
            include_full_post_state_in_output=self.include_full_post_state_in_output,
            include_tx_receipts_in_output=self.include_tx_receipts_in_output,
        )

    @staticmethod
    def _verify_target_opcode_count(
        *,
        target_opcode: Op | OpcodeTarget,
        opcode_count: OpcodeCount,
        expected_opcode_count: int,
        tolerance: float = 0.05,
    ) -> None:
        """
        Verify target opcode was executed the expected number of times.

        `tolerance` is the allowed deviation expressed as a fraction of
        `expected_opcode_count` (default 0.05). An opcode missing from
        `opcode_count` is counted as zero executions.

        Raises:
            ValueError: If the actual count deviates from the expected
                count by more than the tolerance.

        """
        # An `OpcodeTarget` carries a display name; count its real opcode.
        count_opcode = (
            target_opcode.opcode
            if isinstance(target_opcode, OpcodeTarget)
            else target_opcode
        )
        actual = opcode_count.root.get(count_opcode, 0)
        # Allowed absolute deviation derived from the relative tolerance.
        max_difference = expected_opcode_count * tolerance

        if abs(actual - expected_opcode_count) > max_difference:
            raise ValueError(
                f"Target opcode {target_opcode} count mismatch: "
                f"expected ~{expected_opcode_count} "
                f"(±{tolerance * 100}%), got {actual}"
            )

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """
        Generate the blockchain test fixture.

        Delegates filling to the derived `BlockchainTest`. When
        `fixed_opcode_count` or `expected_opcode_count` is set and the fill
        result reports opcode counts, the target opcode count is verified.
        The target opcode, if any, is recorded in the result metadata.

        Raises:
            Exception: If `fixture_format` is not a blockchain test format.

        """
        self.check_exception_test(
            exception=self.tx.error is not None if self.tx else False
        )
        if fixture_format in BlockchainTest.supported_fixture_formats:
            fill_result = self.generate_blockchain_test().generate(
                t8n=t8n, fixture_format=fixture_format
            )

            # Verify target opcode count if specified
            if (
                self.fixed_opcode_count is not None
                or self.expected_opcode_count is not None
            ):
                target_opcode = self.target_opcode
                assert target_opcode is not None, "target_opcode is not set"
                opcode_count = fill_result.benchmark_opcode_count
                # Verification is skipped when no opcode counts are reported.
                if opcode_count is not None:
                    if self.fixed_opcode_count is not None:
                        self._verify_target_opcode_count(
                            target_opcode=target_opcode,
                            opcode_count=opcode_count,
                            expected_opcode_count=int(
                                # fixed_opcode_count is in thousands units
                                self.fixed_opcode_count * 1000
                            ),
                        )
                    elif self.expected_opcode_count is not None:
                        self._verify_target_opcode_count(
                            target_opcode=target_opcode,
                            opcode_count=opcode_count,
                            expected_opcode_count=self.expected_opcode_count,
                        )
                    else:
                        raise Exception(
                            "Opcode verification needs either "
                            "`fixed_opcode_count` or `expected_opcode_count` "
                            "to be set."
                        )

            if self.target_opcode is not None:
                fill_result.metadata["target_opcode"] = str(self.target_opcode)

            return fill_result
        else:
            raise Exception(f"Unsupported fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """
        Execute the benchmark test by sending it to the live network.

        Returns a `TransactionPost` in benchmark mode holding the
        transactions of every block.

        Raises:
            Exception: If `execute_format` is not `TransactionPost`.

        """
        if execute_format == TransactionPost:
            assert self.blocks is not None
            return TransactionPost(
                blocks=[block.txs for block in self.blocks],
                post=self.post,
                benchmark_mode=True,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

model_post_init(__context)

Model post-init to assert that the custom pre-allocation was provided and the default was not used.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def model_post_init(self, __context: Any, /) -> None:
    """
    Validate the benchmark inputs and build the final list of blocks.

    Asserts that a custom pre-allocation was provided (the default was
    not used) and checks that exactly one of `code_generator`, `blocks`
    or `tx` is set. The selected input is converted into blocks that are
    appended after a copy of `setup_blocks`; the combined list is stored
    in `self.blocks`.

    Raises:
        AssertionError: If `pre` was not explicitly provided.
        ValueError: If zero or more than one of `code_generator`,
            `blocks` and `tx` is set.

    """
    super().model_post_init(__context)
    assert "pre" in self.model_fields_set, (
        "pre allocation was not provided"
    )

    set_props = [
        name
        for name, val in [
            ("code_generator", self.code_generator),
            ("blocks", self.blocks),
            ("tx", self.tx),
        ]
        if val is not None
    ]

    if len(set_props) != 1:
        raise ValueError(
            f"Exactly one must be set, but got {len(set_props)}: "
            f"{', '.join(set_props)}"
        )

    # Work on a copy: extending `self.setup_blocks` in place would leave
    # it holding the benchmark blocks too (and aliased to `self.blocks`).
    blocks: List[Block] = list(self.setup_blocks)

    if self.fixed_opcode_count is not None and self.code_generator is None:
        pytest.skip(
            "Cannot run fixed opcode count tests without a code generator"
        )

    if self.code_generator is not None:
        # Inject fixed_opcode_count into the code generator if provided
        self.code_generator.fixed_opcode_count = self.fixed_opcode_count

        # In fixed opcode count mode, skip gas validation since we're
        # measuring performance by operation count, not gas usage
        if self.fixed_opcode_count is not None:
            self.skip_gas_used_validation = True
            generated_blocks = (
                self.generate_fixed_opcode_count_transactions()
            )
        else:
            generated_blocks = self.generate_blocks_from_code_generator()
        blocks += generated_blocks

    elif self.blocks is not None:
        blocks += self.blocks

    elif self.tx is not None:
        # Use the fork's per-transaction gas cap when it has one,
        # otherwise the full benchmark gas value.
        gas_limit = (
            self.fork.fork_at(
                block_number=0, timestamp=0
            ).transaction_gas_limit_cap()
            or self.gas_benchmark_value
        )

        transactions = self.split_transaction(self.tx, gas_limit)

        blocks.append(Block(txs=transactions))

    else:
        # Defensive: the "exactly one" check above already rules this out.
        raise ValueError(
            "Cannot create BlockchainTest without a code generator, "
            "transactions, or blocks"
        )

    self.blocks = blocks

pytest_parameter_name() classmethod

Return the parameter name used in pytest to select this spec type.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
410
411
412
413
414
415
416
@classmethod
def pytest_parameter_name(cls) -> str:
    """Return the pytest parameter name that selects this spec type."""
    parameter_name = "benchmark_test"
    return parameter_name

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Tell whether a fixture format must be skipped for the given markers.

    With the `blockchain_test_only` marker every format except
    `BlockchainFixture` is discarded; with `blockchain_test_engine_only`
    every format except `BlockchainEngineFixture` is discarded. The first
    marker takes precedence when both are present. Without either marker
    nothing is discarded.
    """
    del fork

    marker_names = [marker.name for marker in markers]

    if "blockchain_test_only" in marker_names:
        keep_format = BlockchainFixture
    elif "blockchain_test_engine_only" in marker_names:
        keep_format = BlockchainEngineFixture
    else:
        return False
    return fixture_format != keep_format

get_genesis_environment()

Get the genesis environment for this benchmark test.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
437
438
439
def get_genesis_environment(self) -> Environment:
    """Return the genesis environment of the derived blockchain test."""
    blockchain_test = self.generate_blockchain_test()
    return blockchain_test.get_genesis_environment()

split_transaction(tx, gas_limit_cap)

Split a transaction that exceeds the gas limit cap into multiple transactions.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def split_transaction(
    self, tx: Transaction, gas_limit_cap: int | None
) -> List[Transaction]:
    """
    Break a transaction into several when it exceeds the gas limit cap.

    Without a cap, or when the cap is at least `gas_benchmark_value`, the
    given `tx` is updated in place to use `gas_benchmark_value` as its gas
    limit and returned as the only element. Otherwise copies of `tx` (made
    with `model_copy`) are returned with consecutive nonces starting at
    `tx.nonce`; each copy uses the cap as its gas limit, except the last
    one, which receives whatever gas is left.
    """
    if gas_limit_cap is None or gas_limit_cap >= self.gas_benchmark_value:
        tx.gas_limit = HexNumber(self.gas_benchmark_value)
        return [tx]

    chunk_count = math.ceil(self.gas_benchmark_value / gas_limit_cap)
    last_index = chunk_count - 1
    # Gas left for the final chunk once every earlier chunk took the cap.
    last_chunk_gas = self.gas_benchmark_value - gas_limit_cap * last_index

    pieces = []
    for index in range(chunk_count):
        piece = tx.model_copy()
        if index == last_index:
            piece.gas_limit = HexNumber(last_chunk_gas)
        else:
            piece.gas_limit = HexNumber(gas_limit_cap)
        piece.nonce = HexNumber(tx.nonce + index)
        pieces.append(piece)

    return pieces

generate_blocks_from_code_generator()

Generate blocks using the code generator.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def generate_blocks_from_code_generator(self) -> List[Block]:
    """
    Build the benchmark block from the code generator.

    The generator's contracts are deployed into `pre`, a benchmark
    transaction is generated and then split if needed; the resulting
    transactions are returned inside a single block. An `Exception` is
    raised when no code generator is set.
    """
    generator = self.code_generator
    if generator is None:
        raise Exception("Code generator is not set")

    generator.deploy_contracts(
        pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
    )

    # Prefer the fork's transaction gas cap; fall back to the benchmark gas.
    fork_cap = self.fork.fork_at(
        block_number=0, timestamp=0
    ).transaction_gas_limit_cap()
    tx_gas_limit = fork_cap or self.gas_benchmark_value

    generated_tx = generator.generate_transaction(
        pre=self.pre, gas_benchmark_value=tx_gas_limit
    )
    return [Block(txs=self.split_transaction(generated_tx, tx_gas_limit))]

generate_fixed_opcode_count_transactions()

Generate transactions with a fixed opcode count.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
def generate_fixed_opcode_count_transactions(self) -> List[Block]:
    """
    Build the benchmark block for fixed opcode count mode.

    The generator's fixed-count contracts are deployed into `pre` and the
    generated transaction is returned, unsplit, inside a single block. An
    `Exception` is raised when no code generator is set.
    """
    generator = self.code_generator
    if generator is None:
        raise Exception("Code generator is not set")

    generator.deploy_fix_count_contracts(
        pre=self.pre, fork=self.fork.fork_at(block_number=0, timestamp=0)
    )

    # Prefer the fork's transaction gas cap; fall back to the benchmark gas.
    fork_cap = self.fork.fork_at(
        block_number=0, timestamp=0
    ).transaction_gas_limit_cap()
    tx_gas_limit = fork_cap or self.gas_benchmark_value

    generated_tx = generator.generate_transaction(
        pre=self.pre, gas_benchmark_value=tx_gas_limit
    )
    return [Block(txs=[generated_tx])]

generate_blockchain_test()

Create a BlockchainTest from this BenchmarkTest.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
512
513
514
515
516
517
518
519
520
521
522
def generate_blockchain_test(self) -> BlockchainTest:
    """
    Derive a BlockchainTest from this BenchmarkTest.

    The environment, pre/post allocations, blocks and the two output
    flags of this test are forwarded to `BlockchainTest.from_test`.
    """
    full_post_state = self.include_full_post_state_in_output
    tx_receipts = self.include_tx_receipts_in_output
    blockchain_test = BlockchainTest.from_test(
        base_test=self,
        genesis_environment=self.env,
        pre=self.pre,
        post=self.post,
        blocks=self.blocks,
        include_full_post_state_in_output=full_post_state,
        include_tx_receipts_in_output=tx_receipts,
    )
    return blockchain_test

generate(t8n, fixture_format)

Generate the blockchain test fixture.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """
    Fill the benchmark as a blockchain test fixture.

    Filling is delegated to the derived `BlockchainTest`. If
    `fixed_opcode_count` or `expected_opcode_count` is set and the fill
    result reports opcode counts, the target opcode count is checked.
    The target opcode, when set, is stored in the result metadata. An
    `Exception` is raised for fixture formats that `BlockchainTest` does
    not support.
    """
    tx_expects_error = self.tx.error is not None if self.tx else False
    self.check_exception_test(exception=tx_expects_error)

    if fixture_format not in BlockchainTest.supported_fixture_formats:
        raise Exception(f"Unsupported fixture format: {fixture_format}")

    fill_result = self.generate_blockchain_test().generate(
        t8n=t8n, fixture_format=fixture_format
    )

    # Check the target opcode count when an expectation was given
    fixed_count = self.fixed_opcode_count
    expected_count = self.expected_opcode_count
    if fixed_count is not None or expected_count is not None:
        target_opcode = self.target_opcode
        assert target_opcode is not None, "target_opcode is not set"
        opcode_count = fill_result.benchmark_opcode_count
        # Nothing to verify when the fill result carries no opcode counts
        if opcode_count is not None:
            if fixed_count is not None:
                # fixed_opcode_count is in thousands units
                expected_total = int(fixed_count * 1000)
            elif expected_count is not None:
                expected_total = expected_count
            else:
                raise Exception(
                    "Opcode verification needs either "
                    "`fixed_opcode_count` or `expected_opcode_count` "
                    "to be set."
                )
            self._verify_target_opcode_count(
                target_opcode=target_opcode,
                opcode_count=opcode_count,
                expected_opcode_count=expected_total,
            )

    if self.target_opcode is not None:
        fill_result.metadata["target_opcode"] = str(self.target_opcode)

    return fill_result

execute(*, execute_format)

Execute the benchmark test by sending it to the live network.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
600
601
602
603
604
605
606
607
608
609
610
611
612
613
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """
    Build the executor that sends this benchmark to the live network.

    Only `TransactionPost` is supported: the result carries the
    transactions of every block, the expected post state, and has
    benchmark mode enabled. Any other format raises an `Exception`.
    """
    if not execute_format == TransactionPost:
        raise Exception(f"Unsupported execute format: {execute_format}")

    assert self.blocks is not None
    txs_per_block = [block.txs for block in self.blocks]
    return TransactionPost(
        blocks=txs_per_block,
        post=self.post,
        benchmark_mode=True,
    )

OpcodeTarget dataclass

Map a display name to an underlying opcode for count validation.

Use when the fixture metadata should show a descriptive label (e.g. a precompile name) while opcode-count validation targets the real EVM opcode that gets executed (e.g. STATICCALL).

Source code in packages/testing/src/execution_testing/specs/benchmark.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@dataclass(frozen=True)
class OpcodeTarget:
    """
    Pair a display name with the opcode used for count validation.

    Useful when the fixture metadata should carry a descriptive label
    (for example a precompile name) while the opcode-count validation
    has to look at the real EVM opcode being executed (for example
    STATICCALL).
    """

    # Label returned by `str()`.
    name: str
    # Opcode exposed for count validation.
    opcode: Op

    def __str__(self) -> str:
        """Return the display name."""
        label = self.name
        return label

__str__()

Return the display name.

Source code in packages/testing/src/execution_testing/specs/benchmark.py
61
62
63
def __str__(self) -> str:
    """Return the display name."""
    label = self.name
    return label

BlobsTest

Bases: BaseTest

Test specification for blob tests.

Source code in packages/testing/src/execution_testing/specs/blobs.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class BlobsTest(BaseTest):
    """Test specification for blob tests."""

    pre: Alloc
    txs: List[NetworkWrappedTransaction | Transaction]
    nonexisting_blob_hashes: List[Hash] | None = None
    get_blobs_version: int | None = None

    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            BlobTransaction,
            "blob_transaction_test",
            "A test that executes a blob transaction",
        ),
    ]

    def generate(
        self,
        *,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """
        Reject fixture generation.

        This method always raises an `Exception` naming the requested
        fixture format; the transition tool is not used.
        """
        del t8n
        message = f"Unknown fixture format: {fixture_format}"
        raise Exception(message)

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """
        Build the blob-transaction executor for this test.

        Only `BlobTransaction` is supported; any other execute format
        raises an `Exception`.
        """
        if not execute_format == BlobTransaction:
            raise Exception(f"Unsupported execute format: {execute_format}")
        return BlobTransaction(
            txs=self.txs,
            nonexisting_blob_hashes=self.nonexisting_blob_hashes,
            get_blobs_version=self.get_blobs_version,
        )

generate(*, t8n, fixture_format)

Fixture generation entry point. This implementation always raises an exception naming the requested fixture format.

Source code in packages/testing/src/execution_testing/specs/blobs.py
36
37
38
39
40
41
42
43
44
def generate(
    self,
    *,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """
    Reject fixture generation.

    This method always raises an `Exception` naming the requested
    fixture format; the transition tool is not used.
    """
    del t8n
    message = f"Unknown fixture format: {fixture_format}"
    raise Exception(message)

execute(*, execute_format)

Build the `BlobTransaction` execute object for the requested execute format; any other execute format raises an exception.

Source code in packages/testing/src/execution_testing/specs/blobs.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """
    Build the blob-transaction executor for this test.

    Only `BlobTransaction` is supported; any other execute format
    raises an `Exception`.
    """
    if not execute_format == BlobTransaction:
        raise Exception(f"Unsupported execute format: {execute_format}")
    return BlobTransaction(
        txs=self.txs,
        nonexisting_blob_hashes=self.nonexisting_blob_hashes,
        get_blobs_version=self.get_blobs_version,
    )

Block

Bases: Header

Block type used to describe block properties in test specs.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
class Block(Header):
    """Block type used to describe block properties in test specs."""

    header_verify: Header | None = None
    # If set, the block header will be verified against the specified values.
    rlp_modifier: Header | None = None
    """
    An RLP modifying header which values would be used to override the ones
    returned by the `ethereum_clis.TransitionTool`.
    """
    expected_block_access_list: BlockAccessListExpectation | None = None
    """
    If set, the block access list will be verified and potentially corrupted
    for invalid tests.
    """
    exception: BLOCK_EXCEPTION_TYPE = None
    # If set, the block is expected to be rejected by the client.
    skip_exception_verification: bool = False
    """
    Skip verifying that the exception is returned by the transition tool. This
    could be because the exception is inserted in the block after the
    transition tool evaluates it.
    """
    engine_api_error_code: EngineAPIError | None = None
    """
    If set, the block is expected to produce an error response from the Engine
    API.
    """
    include_receipts_in_output: bool | None = None
    """
    If set to `True`, the block’s output fixture representation will include
    full transaction receipts. If unset, the test-level value is used.
    """
    txs: List[Transaction] = Field(default_factory=list)
    """List of transactions included in the block."""
    ommers: List[Header] | None = None
    """List of ommer headers included in the block."""
    withdrawals: List[Withdrawal] | None = None
    """List of withdrawals to perform for this block."""
    requests: List[Bytes] | None = None
    """Custom list of requests to embed in this block."""
    expected_post_state: Alloc | None = None
    """Post state for verification after block execution in BlockchainTest"""
    block_access_list: Bytes | None = Field(None)
    """EIP-7928: Block-level access lists (serialized)."""

    def set_environment(self, env: Environment) -> Environment:
        """
        Derive this block's environment from the previous environment.

        Returns `env.copy(...)` with the values of this block applied; `env`
        is only read here.
        """
        # Values forwarded as-is, even when they are `None` on this block.
        overrides: Dict[str, Any] = {
            "difficulty": self.difficulty,
            "prev_randao": self.prev_randao,
        }

        # Values that fall back to a default when unset on this block.
        if self.fee_recipient is None:
            overrides["fee_recipient"] = Environment().fee_recipient
        else:
            overrides["fee_recipient"] = self.fee_recipient
        overrides["gas_limit"] = (
            self.gas_limit or env.parent_gas_limit or Environment().gas_limit
        )

        # Header fields marked `Removable` are not forwarded at all.
        if not isinstance(self.base_fee_per_gas, Removable):
            overrides["base_fee_per_gas"] = self.base_fee_per_gas
        overrides["withdrawals"] = self.withdrawals
        for field_name in (
            "excess_blob_gas",
            "blob_gas_used",
            "parent_beacon_block_root",
        ):
            field_value = getattr(self, field_name)
            if not isinstance(field_value, Removable):
                overrides[field_name] = field_value

        bal = self.block_access_list
        if bal is not None:
            # NOTE(review): the hash is gated on `requests_hash`, not on a
            # BAL-specific header field; this looks like a copy-paste slip.
            # Confirm the intended field before changing it.
            if not isinstance(self.requests_hash, Removable):
                overrides["block_access_list_hash"] = bal.keccak256()
                overrides["block_access_list"] = bal
            if not isinstance(bal, Removable):
                overrides["block_access_list"] = bal

        # Required values that depend on the previous environment.
        if self.number is not None:
            overrides["number"] = self.number
        elif len(env.block_hashes) > 0:
            # Next block number: one past the highest known block hash.
            highest_known = max(Number(n) for n in env.block_hashes.keys())
            overrides["number"] = highest_known + 1
        else:
            overrides["number"] = 0

        if self.timestamp is None:
            assert env.parent_timestamp is not None
            # Default spacing of 12 after the parent timestamp; presumably
            # the slot time in seconds (TODO confirm).
            overrides["timestamp"] = int(Number(env.parent_timestamp) + 12)
        else:
            overrides["timestamp"] = self.timestamp

        return env.copy(**overrides)

rlp_modifier = None class-attribute instance-attribute

An RLP-modifying header whose values are used to override the ones returned by the ethereum_clis.TransitionTool.

expected_block_access_list = None class-attribute instance-attribute

If set, the block access list will be verified and potentially corrupted for invalid tests.

skip_exception_verification = False class-attribute instance-attribute

Skip verifying that the exception is returned by the transition tool. This could be because the exception is inserted in the block after the transition tool evaluates it.

engine_api_error_code = None class-attribute instance-attribute

If set, the block is expected to produce an error response from the Engine API.

include_receipts_in_output = None class-attribute instance-attribute

If set to True, the block’s output fixture representation will include full transaction receipts. If unset, the test-level value is used.

txs = Field(default_factory=list) class-attribute instance-attribute

List of transactions included in the block.

ommers = None class-attribute instance-attribute

List of ommer headers included in the block.

withdrawals = None class-attribute instance-attribute

List of withdrawals to perform for this block.

requests = None class-attribute instance-attribute

Custom list of requests to embed in this block.

expected_post_state = None class-attribute instance-attribute

Post state for verification after block execution in BlockchainTest.

block_access_list = Field(None) class-attribute instance-attribute

EIP-7928: Block-level access lists (serialized).

set_environment(env)

Create copy of the environment with the characteristics of this specific block.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def set_environment(self, env: Environment) -> Environment:
    """
    Derive this block's environment from the previous environment.

    Returns `env.copy(...)` with the values of this block applied; `env` is
    only read here.
    """
    # Values forwarded as-is, even when they are `None` on this block.
    overrides: Dict[str, Any] = {
        "difficulty": self.difficulty,
        "prev_randao": self.prev_randao,
    }

    # Values that fall back to a default when unset on this block.
    if self.fee_recipient is None:
        overrides["fee_recipient"] = Environment().fee_recipient
    else:
        overrides["fee_recipient"] = self.fee_recipient
    overrides["gas_limit"] = (
        self.gas_limit or env.parent_gas_limit or Environment().gas_limit
    )

    # Header fields marked `Removable` are not forwarded at all.
    if not isinstance(self.base_fee_per_gas, Removable):
        overrides["base_fee_per_gas"] = self.base_fee_per_gas
    overrides["withdrawals"] = self.withdrawals
    for field_name in (
        "excess_blob_gas",
        "blob_gas_used",
        "parent_beacon_block_root",
    ):
        field_value = getattr(self, field_name)
        if not isinstance(field_value, Removable):
            overrides[field_name] = field_value

    bal = self.block_access_list
    if bal is not None:
        # NOTE(review): the hash is gated on `requests_hash`, not on a
        # BAL-specific header field; this looks like a copy-paste slip.
        # Confirm the intended field before changing it.
        if not isinstance(self.requests_hash, Removable):
            overrides["block_access_list_hash"] = bal.keccak256()
            overrides["block_access_list"] = bal
        if not isinstance(bal, Removable):
            overrides["block_access_list"] = bal

    # Required values that depend on the previous environment.
    if self.number is not None:
        overrides["number"] = self.number
    elif len(env.block_hashes) > 0:
        # Next block number: one past the highest known block hash.
        highest_known = max(Number(n) for n in env.block_hashes.keys())
        overrides["number"] = highest_known + 1
    else:
        overrides["number"] = 0

    if self.timestamp is None:
        assert env.parent_timestamp is not None
        # Default spacing of 12 after the parent timestamp; presumably the
        # slot time in seconds (TODO confirm).
        overrides["timestamp"] = int(Number(env.parent_timestamp) + 12)
    else:
        overrides["timestamp"] = self.timestamp

    return env.copy(**overrides)

BlockchainTest

Bases: BaseTest

Filler type that tests multiple blocks (valid or invalid) in a chain.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
class BlockchainTest(BaseTest):
    """Filler type that tests multiple blocks (valid or invalid) in a chain."""

    pre: Alloc
    post: Alloc
    blocks: List[Block]
    genesis_environment: Environment = Field(default_factory=Environment)
    chain_id: int = Field(
        default_factory=lambda: ChainConfigDefaults.chain_id,
        validate_default=True,
    )
    include_full_post_state_in_output: bool = True
    """
    Include the post state in the fixture output. Otherwise, the state
    verification is only performed based on the state root.
    """
    include_tx_receipts_in_output: bool = True
    """
    Include transaction receipts in the fixture output.
    """

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        BlockchainFixture,
        BlockchainEngineFixture,
        BlockchainEngineXFixture,
        BlockchainEngineSyncFixture,
    ]
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "blockchain_test",
            "An execute test derived from a blockchain test",
        ),
    ]

    supported_markers: ClassVar[Dict[str, str]] = {
        "blockchain_test_engine_only": (
            "Only generate a blockchain test engine fixture"
        ),
        "blockchain_test_only": "Only generate a blockchain test fixture",
    }

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Tell whether a fixture format must be skipped for the given markers.

        Returns `True` when a "*_only" marker is present and
        `fixture_format` is not one of the formats that marker keeps.
        """
        del fork  # not needed for this decision

        names = [mark.name for mark in markers]

        # "blockchain_test_only" keeps nothing but the plain fixture.
        if (
            "blockchain_test_only" in names
            and fixture_format != BlockchainFixture
        ):
            return True

        # "blockchain_test_engine_only" keeps the two engine formats below.
        engine_formats = [BlockchainEngineFixture, BlockchainEngineXFixture]
        return (
            "blockchain_test_engine_only" in names
            and fixture_format not in engine_formats
        )

    def get_genesis_environment(self) -> Environment:
        """
        Build the genesis environment used for pre-allocation groups.

        Fields explicitly set on the fork-adjusted `genesis_environment`
        take precedence over `GENESIS_ENVIRONMENT_DEFAULTS`.
        """
        genesis_env = self.genesis_environment
        fork_adjusted = genesis_env.set_fork_requirements(
            self.fork.transitions_from()
        )
        # Only explicitly set fields are dumped, so defaults stay in place.
        explicit_fields = fork_adjusted.model_dump(exclude_unset=True)
        merged_fields = GENESIS_ENVIRONMENT_DEFAULTS | explicit_fields
        return Environment(**merged_fields)

    def make_genesis(
        self, *, apply_pre_allocation_blockchain: bool
    ) -> Tuple[Alloc, FixtureBlock]:
        """
        Create the genesis pre-state and block for this blockchain test.

        Returns the pre-allocation actually used (optionally merged with the
        fork's blockchain pre-allocation) together with the genesis block.
        Raises `Exception` if the pre-state contains empty accounts.
        """
        genesis_env = self.get_genesis_environment()
        genesis_withdrawals = genesis_env.withdrawals
        assert genesis_withdrawals is None or len(genesis_withdrawals) == 0, (
            "withdrawals must be empty at genesis"
        )
        beacon_root = genesis_env.parent_beacon_block_root
        assert beacon_root is None or beacon_root == Hash(0), (
            "parent_beacon_block_root must be empty at genesis"
        )

        pre_alloc = self.pre
        if apply_pre_allocation_blockchain:
            # The test's own pre-state is passed second to the merge.
            fork_alloc = Alloc.model_validate(
                self.fork.transitions_to().pre_allocation_blockchain()
            )
            pre_alloc = Alloc.merge(fork_alloc, pre_alloc)

        empty_accounts = pre_alloc.empty_accounts()
        if empty_accounts:
            raise Exception(f"Empty accounts in pre state: {empty_accounts}")

        state_root = pre_alloc.state_root()
        genesis_header = FixtureHeader.genesis(
            self.fork.transitions_from(), genesis_env, state_root
        )
        genesis_block = FixtureBlockBase(
            header=genesis_header,
            # Keep "no withdrawals field" distinct from "empty list".
            withdrawals=[] if genesis_withdrawals is not None else None,
        ).with_rlp(txs=[])
        return pre_alloc, genesis_block

    def generate_block_data(
        self,
        t8n: TransitionTool,
        block: Block,
        previous_env: Environment,
        previous_alloc: Alloc | LazyAlloc,
    ) -> BuiltBlock:
        """
        Generate common block data for both make_fixture and make_hive_fixture.

        Runs the block's transactions through the transition tool on top of
        `previous_alloc`, assembles the resulting header, applies the
        block-level overrides (`requests`, `rlp_modifier`, expected block
        access list) and verifies transaction and block exceptions.

        Raises `Exception` when the test definition is inconsistent: more
        than one failing transaction, a failing transaction that is not the
        last one, a header or requests mismatch, or rejected transactions in
        a block that was not expected to be invalid.
        """
        env = block.set_environment(previous_env)
        fork = self.fork.fork_at(
            block_number=env.number, timestamp=env.timestamp
        )
        env = env.set_fork_requirements(fork)
        txs = [tx.with_signature_and_sender() for tx in block.txs]

        # Count first, compare afterwards: binding the result of
        # `len(...) > 0` with `:=` would store a bool and make the "more
        # than one" check unreachable.
        failing_tx_count = sum(1 for tx in txs if tx.error)
        if failing_tx_count > 1:
            raise Exception(
                "test correctness: only one transaction can produce "
                "an exception in a block"
            )
        if failing_tx_count == 1 and not txs[-1].error:
            raise Exception(
                "test correctness: the transaction that produces an "
                "exception must be the last transaction in the block"
            )

        transition_tool_output = t8n.evaluate(
            transition_tool_data=TransitionTool.TransitionToolData(
                alloc=previous_alloc,
                txs=txs,
                env=env,
                fork=fork,
                chain_id=self.chain_id,
                reward=fork.get_reward(),
                blob_schedule=fork.blob_schedule(),
            ),
            slow_request=self.is_tx_gas_heavy_test,
        )

        # One special case of the invalid transactions is the blob gas used,
        # since this value is not included in the transition tool result, but
        # it is included in the block header, and some clients check it before
        # executing the block by simply counting the type-3 txs, we need to set
        # the correct value by default.
        blob_gas_used: int | None = None
        if fork.supports_blobs():
            if (blob_gas_per_blob := fork.blob_gas_per_blob()) > 0:
                blob_gas_used = blob_gas_per_blob * count_blobs(txs)

        # Environment values take precedence over the transition tool result
        # (right-hand side of the dict union wins).
        header = FixtureHeader(
            **(
                transition_tool_output.result.model_dump(
                    exclude_none=True,
                    exclude={"blob_gas_used", "transactions_trie"},
                )
                | env.model_dump(exclude_none=True, exclude={"blob_gas_used"})
            ),
            blob_gas_used=blob_gas_used,
            transactions_trie=Transaction.list_root(txs),
            extra_data=block.extra_data
            if block.extra_data is not None
            else b"",
            fork=fork,
        )

        if block.header_verify is not None:
            # Verify the header after transition tool processing.
            try:
                block.header_verify.verify(header)
            except Exception as e:
                raise Exception(
                    f"Verification of block {int(env.number)} failed"
                ) from e

        requests_list: List[Bytes] | None = None
        if fork.header_requests_required():
            assert transition_tool_output.result.requests is not None, (
                "Requests are required for this block"
            )
            requests = Requests(
                requests_lists=list(transition_tool_output.result.requests)
            )

            if Hash(requests) != header.requests_hash:
                raise Exception(
                    "Requests root in header does not match the requests "
                    "root in the transition tool output: "
                    f"{header.requests_hash} != {Hash(requests)}"
                )

            requests_list = requests.requests_list

        if block.requests is not None:
            # Custom requests replace both the header hash and the list.
            header.requests_hash = Hash(
                Requests(requests_lists=list(block.requests))
            )
            requests_list = block.requests

        # Decode BAL from RLP bytes provided by the transition tool.
        t8n_bal_rlp = transition_tool_output.result.block_access_list
        t8n_bal: BlockAccessList | None = None
        if t8n_bal_rlp is not None:
            t8n_bal = BlockAccessList.from_rlp(t8n_bal_rlp)

        if fork.header_bal_hash_required():
            assert t8n_bal is not None, (
                "Block access list is required for this block but was not "
                "provided by the transition tool"
            )

            computed_block_access_list_hash = Hash(t8n_bal.rlp.keccak256())
            assert (
                computed_block_access_list_hash
                == header.block_access_list_hash
            ), (
                "Block access list hash in header does not match the "
                f"computed hash from BAL: {header.block_access_list_hash} "
                f"!= {computed_block_access_list_hash}"
            )

        if block.rlp_modifier is not None:
            # Modify any parameter specified in the `rlp_modifier` after
            # transition tool processing.
            header = block.rlp_modifier.apply(header)
            header.fork = fork  # Deleted during `apply` because `exclude=True`

        # Process block access list - apply transformer if present for invalid
        # tests
        bal = t8n_bal

        # Always validate BAL structural integrity (ordering, duplicates)
        # if present
        if t8n_bal is not None:
            t8n_bal.validate_structure()

        # If expected BAL is defined, verify against it
        if (
            block.expected_block_access_list is not None
            and t8n_bal is not None
        ):
            block.expected_block_access_list.verify_against(t8n_bal)

            bal = block.expected_block_access_list.modify_if_invalid_test(
                t8n_bal
            )
            if bal != t8n_bal:
                # If the BAL was modified, update the header hash
                header.block_access_list_hash = Hash(bal.rlp.keccak256())

        built_block = BuiltBlock(
            header=header,
            alloc=transition_tool_output.alloc,
            state_root=transition_tool_output.result.state_root,
            env=env,
            txs=txs,
            ommers=[],
            withdrawals=env.withdrawals,
            requests=requests_list,
            result=transition_tool_output.result,
            expected_exception=block.exception,
            engine_api_error_code=block.engine_api_error_code,
            fork=fork,
            block_access_list=bal,
        )

        try:
            rejected_txs = built_block.verify_transactions(
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
            if (
                not rejected_txs
                and block.rlp_modifier is None
                and block.requests is None
                and not block.skip_exception_verification
                and not (
                    block.expected_block_access_list is not None
                    and block.expected_block_access_list._modifier is not None
                )
            ):
                # Only verify block level exception if:
                # - No transaction exception was raised, because these are
                #   not reported as block exceptions.
                # - No RLP modifier was specified, because the modifier is
                #   what normally produces the block exception.
                # - No requests were specified, because modified requests
                #   are also what normally produces the block exception.
                # - No BAL modifier was specified, because modified BAL also
                #   produces block exceptions.
                built_block.verify_block_exception(
                    transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
                )
            verify_result(transition_tool_output.result, env)
        except Exception:
            # Dump debugging context, then re-raise the original exception.
            print_traces(t8n.get_traces())
            pprint(transition_tool_output.result)
            pprint(previous_alloc)
            pprint(transition_tool_output.alloc.get())
            raise

        if len(rejected_txs) > 0 and block.exception is None:
            print_traces(t8n.get_traces())
            raise Exception(
                "one or more transactions in `BlockchainTest` are "
                + "intrinsically invalid, but the block was not expected "
                + "to be invalid. Please verify whether the transaction "
                + "was indeed expected to fail and add the proper "
                + "`block.exception`"
            )

        return built_block

    def verify_post_state(
        self,
        t8n: TransitionTool,
        t8n_state: Alloc,
        expected_state: Alloc | None = None,
    ) -> None:
        """
        Check the transition tool state against the expected post state.

        Uses `expected_state` when it is truthy and the test-level `post`
        otherwise. On failure the transition tool traces are printed before
        the exception is re-raised.
        """
        try:
            reference = expected_state if expected_state else self.post
            reference.verify_post_alloc(t8n_state)
        except Exception as e:
            print_traces(t8n.get_traces())
            raise e

    def make_fixture(
        self,
        t8n: TransitionTool,
    ) -> FillResult:
        """
        Create a fixture from the blockchain test definition.

        Builds every block in `self.blocks` on top of the genesis state,
        verifies the expected post state and returns a `FillResult` wrapping
        a `BlockchainFixture`.
        """
        fixture_blocks: List[FixtureBlock | InvalidFixtureBlock] = []

        pre, genesis = self.make_genesis(apply_pre_allocation_blockchain=True)

        # Chain state carried from one block to the next; it only advances
        # when a block is expected to be valid.
        alloc: Alloc | LazyAlloc = pre
        state_root = genesis.header.state_root
        env = environment_from_parent_header(genesis.header)
        head = genesis.header.block_hash
        invalid_blocks = 0
        benchmark_gas_used: int | None = None
        benchmark_opcode_count: OpcodeCount | None = None
        for block in self.blocks:
            # This is the most common case, the RLP needs to be constructed
            # based on the transactions to be included in the block.
            # Set the environment according to the block to execute.
            built_block = self.generate_block_data(
                t8n=t8n,
                block=block,
                previous_env=env,
                previous_alloc=alloc,
            )
            block_number = int(built_block.header.number)
            # Identity comparison against the last list element.
            is_last_block = block is self.blocks[-1]
            if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
                # Benchmark metrics are taken from the last block only.
                benchmark_gas_used = int(built_block.result.gas_used)
                benchmark_opcode_count = built_block.result.opcode_count
            if built_block.result.receipts:
                self.validate_receipt_status(
                    receipts=built_block.result.receipts,
                    block_number=block_number,
                )
            # The per-block setting wins over the test-level setting.
            include_receipts = (
                block.include_receipts_in_output
                if block.include_receipts_in_output is not None
                else self.include_tx_receipts_in_output
            )
            fixture_blocks.append(
                built_block.get_fixture_block(
                    include_receipts=include_receipts
                )
            )

            # BAL verification already done in to_fixture_bal() if
            # expected_block_access_list set
            # NOTE(review): no `to_fixture_bal()` is visible in this class;
            # the expected BAL check seen here is in `generate_block_data`
            # - confirm.

            if block.exception is None:
                # Update env, alloc and last block hash for the next block.
                alloc = built_block.alloc
                state_root = built_block.state_root
                env = apply_new_parent(built_block.env, built_block.header)
                head = built_block.header.block_hash
            else:
                # An invalid block leaves the chain state untouched.
                invalid_blocks += 1

            if block.expected_post_state:
                # Per-block check against the state accumulated so far.
                self.verify_post_state(
                    t8n,
                    t8n_state=alloc.get()
                    if isinstance(alloc, LazyAlloc)
                    else alloc,
                    expected_state=block.expected_post_state,
                )
        self.check_exception_test(exception=invalid_blocks > 0)
        # Resolve a lazy alloc before the final verification and output.
        alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
        self.verify_post_state(t8n, t8n_state=alloc)
        fixture = BlockchainFixture(
            fork=self.fork,
            genesis=genesis.header,
            genesis_rlp=genesis.rlp,
            blocks=fixture_blocks,
            last_block_hash=head,
            pre=pre,
            # Exactly one of `post_state` / `post_state_hash` is set.
            post_state=alloc
            if self.include_full_post_state_in_output
            else None,
            post_state_hash=state_root
            if not self.include_full_post_state_in_output
            else None,
            config=FixtureConfig(
                fork=self.fork,
                blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                    self.fork.transitions_to().blob_schedule()
                ),
                chain_id=self.chain_id,
            ),
        )
        return FillResult(
            fixture=fixture,
            gas_optimization=None,
            benchmark_gas_used=benchmark_gas_used,
            benchmark_opcode_count=benchmark_opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def make_hive_fixture(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat = BlockchainEngineFixture,
    ) -> FillResult:
        """Create a hive fixture from the blocktest definition."""
        fixture_payloads: List[FixtureEngineNewPayload] = []

        pre, genesis = self.make_genesis(
            apply_pre_allocation_blockchain=fixture_format
            != BlockchainEngineXFixture,
        )
        alloc: Alloc | LazyAlloc = pre
        state_root = genesis.header.state_root
        env = environment_from_parent_header(genesis.header)
        head_hash = genesis.header.block_hash
        invalid_blocks = 0
        benchmark_gas_used: int | None = None
        benchmark_opcode_count: OpcodeCount | None = None
        for block in self.blocks:
            built_block = self.generate_block_data(
                t8n=t8n,
                block=block,
                previous_env=env,
                previous_alloc=alloc,
            )
            block_number = int(built_block.header.number)
            is_last_block = block is self.blocks[-1]
            if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
                benchmark_gas_used = int(built_block.result.gas_used)
                benchmark_opcode_count = built_block.result.opcode_count
            if built_block.result.receipts:
                self.validate_receipt_status(
                    receipts=built_block.result.receipts,
                    block_number=block_number,
                )
            fixture_payloads.append(
                built_block.get_fixture_engine_new_payload()
            )
            if block.exception is None:
                alloc = built_block.alloc
                state_root = built_block.state_root
                env = apply_new_parent(built_block.env, built_block.header)
                head_hash = built_block.header.block_hash
            else:
                invalid_blocks += 1

            if block.expected_post_state:
                self.verify_post_state(
                    t8n,
                    t8n_state=alloc.get()
                    if isinstance(alloc, LazyAlloc)
                    else alloc,
                    expected_state=block.expected_post_state,
                )
        self.check_exception_test(exception=invalid_blocks > 0)
        fcu_version = (
            self.fork.transitions_from().engine_forkchoice_updated_version()
        )
        assert fcu_version is not None, (
            "A hive fixture was requested but no forkchoice update is defined."
            " The framework should never try to execute this test case."
        )

        alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
        self.verify_post_state(t8n, t8n_state=alloc)

        # Create base fixture data, common to all fixture formats
        fixture_data: Dict[str, Any] = {
            "fork": self.fork,
            "genesis": genesis.header,
            "payloads": fixture_payloads,
            "last_block_hash": head_hash,
            "post_state_hash": state_root
            if not self.include_full_post_state_in_output
            else None,
            "config": FixtureConfig(
                fork=self.fork,
                chain_id=self.chain_id,
                blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                    self.fork.transitions_to().blob_schedule()
                ),
            ),
        }

        # Add format-specific fields
        fixture: BaseFixture
        if fixture_format == BlockchainEngineXFixture:
            # For Engine X format, exclude pre (will be provided via shared
            # state) and prepare for state diff optimization
            fixture_data.update(
                {
                    "post_state": alloc
                    if self.include_full_post_state_in_output
                    else None,
                    "pre_hash": "",  # Will be set by BaseTestWrapper
                }
            )
            fixture = BlockchainEngineXFixture(**fixture_data)
        elif fixture_format == BlockchainEngineSyncFixture:
            # Sync fixture format
            assert genesis.header.block_hash != head_hash, (
                "Invalid payload tests negative test via sync is not "
                "supported yet."
            )
            # Most clients require the header to start the sync process, so we
            # create an empty block on top of the last block of the test to
            # send it as new payload and trigger the sync process.
            sync_built_block = self.generate_block_data(
                t8n=t8n,
                block=Block(),
                previous_env=env,
                previous_alloc=alloc,
            )
            fixture_data.update(
                {
                    "sync_payload": (
                        sync_built_block.get_fixture_engine_new_payload()
                    ),
                    "pre": pre,
                    "post_state": alloc
                    if self.include_full_post_state_in_output
                    else None,
                }
            )
            fixture = BlockchainEngineSyncFixture(**fixture_data)
        else:
            # Standard engine fixture
            fixture_data.update(
                {
                    "pre": pre,
                    "post_state": alloc
                    if self.include_full_post_state_in_output
                    else None,
                }
            )
            fixture = BlockchainEngineFixture(**fixture_data)

        return FillResult(
            fixture=fixture,
            gas_optimization=None,
            benchmark_gas_used=benchmark_gas_used,
            benchmark_opcode_count=benchmark_opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """
        Fill this test with `t8n` and return the result for `fixture_format`.

        The three engine formats are delegated to `make_hive_fixture`, the
        plain blockchain format to `make_fixture`. Any other format raises
        an `Exception`.
        """
        engine_formats = (
            BlockchainEngineFixture,
            BlockchainEngineXFixture,
            BlockchainEngineSyncFixture,
        )
        if fixture_format in engine_formats:
            return self.make_hive_fixture(t8n, fixture_format)
        if fixture_format == BlockchainFixture:
            return self.make_fixture(t8n)
        raise Exception(f"Unknown fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """
        Build the execute object for `execute_format`.

        Only `TransactionPost` is supported; any other format raises an
        `Exception`. Outside of benchmarking mode this also sets
        `skip_gas_used_validation` on the test instance.
        """
        if execute_format != TransactionPost:
            raise Exception(f"Unsupported execute format: {execute_format}")

        # One transaction list per block, in block order.
        txs_per_block: List[List[Transaction]] = [
            block.txs for block in self.blocks
        ]

        # Gas-used validation is only meaningful for benchmark tests.
        is_benchmark = self.operation_mode == OpMode.BENCHMARKING
        if not is_benchmark:
            self.skip_gas_used_validation = True

        return TransactionPost(
            blocks=txs_per_block,
            post=self.post,
            benchmark_mode=is_benchmark,
        )

include_full_post_state_in_output = True class-attribute instance-attribute

Include the post state in the fixture output. Otherwise, the state verification is only performed based on the state root.

include_tx_receipts_in_output = True class-attribute instance-attribute

Include transaction receipts in the fixture output.

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Tell whether `fixture_format` must be skipped given the test markers.

    Returns True when `blockchain_test_only` is present and the format is
    not `BlockchainFixture`, or when `blockchain_test_engine_only` is
    present and the format is neither `BlockchainEngineFixture` nor
    `BlockchainEngineXFixture`. The `fork` argument is not used.
    """
    del fork

    names = [mark.name for mark in markers]
    if "blockchain_test_only" in names and fixture_format != BlockchainFixture:
        return True

    engine_formats = [BlockchainEngineFixture, BlockchainEngineXFixture]
    return (
        "blockchain_test_engine_only" in names
        and fixture_format not in engine_formats
    )

get_genesis_environment()

Get the genesis environment for pre-allocation groups.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
578
579
580
581
582
583
def get_genesis_environment(self) -> Environment:
    """
    Return the genesis environment used for pre-allocation groups.

    The test's `genesis_environment` is adjusted for the fork the test
    transitions from, and its explicitly set fields are layered on top of
    `GENESIS_ENVIRONMENT_DEFAULTS`.
    """
    fork_adjusted_env = self.genesis_environment.set_fork_requirements(
        self.fork.transitions_from()
    )
    # Only explicitly set fields override the defaults.
    overrides = fork_adjusted_env.model_dump(exclude_unset=True)
    merged_values = GENESIS_ENVIRONMENT_DEFAULTS | overrides
    return Environment(**merged_values)

make_genesis(*, apply_pre_allocation_blockchain)

Create a genesis block from the blockchain test definition.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
def make_genesis(
    self, *, apply_pre_allocation_blockchain: bool
) -> Tuple[Alloc, FixtureBlock]:
    """
    Build the genesis pre-state and genesis block for this test.

    When `apply_pre_allocation_blockchain` is set, the pre-allocation of
    the fork the test transitions to is merged with the test's own `pre`.
    Returns the resulting allocation together with the genesis block.
    Raises an `Exception` if the pre-state contains empty accounts.
    """
    genesis_env = self.get_genesis_environment()
    assert (
        genesis_env.withdrawals is None or len(genesis_env.withdrawals) == 0
    ), "withdrawals must be empty at genesis"
    assert (
        genesis_env.parent_beacon_block_root is None
        or genesis_env.parent_beacon_block_root == Hash(0)
    ), "parent_beacon_block_root must be empty at genesis"

    genesis_alloc = self.pre
    if apply_pre_allocation_blockchain:
        fork_pre_alloc = Alloc.model_validate(
            self.fork.transitions_to().pre_allocation_blockchain()
        )
        genesis_alloc = Alloc.merge(fork_pre_alloc, genesis_alloc)

    empty_accounts = genesis_alloc.empty_accounts()
    if empty_accounts:
        raise Exception(f"Empty accounts in pre state: {empty_accounts}")

    genesis_state_root = genesis_alloc.state_root()
    genesis_header = FixtureHeader.genesis(
        self.fork.transitions_from(), genesis_env, genesis_state_root
    )
    # Keep `None` when the environment has no withdrawals field at all,
    # otherwise the genesis block carries an empty withdrawals list.
    genesis_withdrawals = None if genesis_env.withdrawals is None else []
    genesis_block = FixtureBlockBase(
        header=genesis_header,
        withdrawals=genesis_withdrawals,
    ).with_rlp(txs=[])

    return genesis_alloc, genesis_block

generate_block_data(t8n, block, previous_env, previous_alloc)

Generate common block data for both make_fixture and make_hive_fixture.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
def generate_block_data(
    self,
    t8n: TransitionTool,
    block: Block,
    previous_env: Environment,
    previous_alloc: Alloc | LazyAlloc,
) -> BuiltBlock:
    """
    Generate common block data for both make_fixture and make_hive_fixture.

    Evaluates `block` with the transition tool on top of `previous_env`
    and `previous_alloc`, assembles the block header from the tool result
    and the environment, applies the test-specified overrides (requests,
    RLP modifier, block access list) and verifies the outcome.

    Args:
        t8n: Transition tool used to evaluate the block.
        block: Block definition from the test.
        previous_env: Environment derived from the parent block.
        previous_alloc: State allocation before this block.

    Returns:
        The `BuiltBlock` holding the header, resulting allocation,
        environment, transactions and transition tool result.

    Raises:
        Exception: If the block's failing transactions are malformed
            (more than one, or not last), header verification fails, the
            requests root does not match the header, or transactions were
            rejected while the block was not expected to be invalid.
    """
    env = block.set_environment(previous_env)
    fork = self.fork.fork_at(
        block_number=env.number, timestamp=env.timestamp
    )
    env = env.set_fork_requirements(fork)
    txs = [tx.with_signature_and_sender() for tx in block.txs]

    # Count first, compare afterwards: `x := len(...) > 0` would bind the
    # boolean result of the comparison instead of the count, which makes
    # the "more than one failing transaction" check unreachable.
    failing_tx_count = len([tx for tx in txs if tx.error])
    if failing_tx_count > 0:
        if failing_tx_count > 1:
            raise Exception(
                "test correctness: only one transaction can produce "
                "an exception in a block"
            )
        if not txs[-1].error:
            raise Exception(
                "test correctness: the transaction that produces an "
                "exception must be the last transaction in the block"
            )

    transition_tool_output = t8n.evaluate(
        transition_tool_data=TransitionTool.TransitionToolData(
            alloc=previous_alloc,
            txs=txs,
            env=env,
            fork=fork,
            chain_id=self.chain_id,
            reward=fork.get_reward(),
            blob_schedule=fork.blob_schedule(),
        ),
        slow_request=self.is_tx_gas_heavy_test,
    )

    # One special case of the invalid transactions is the blob gas used,
    # since this value is not included in the transition tool result, but
    # it is included in the block header, and some clients check it before
    # executing the block by simply counting the type-3 txs, we need to set
    # the correct value by default.
    blob_gas_used: int | None = None
    if fork.supports_blobs():
        if (blob_gas_per_blob := fork.blob_gas_per_blob()) > 0:
            blob_gas_used = blob_gas_per_blob * count_blobs(txs)

    # Environment values take precedence over the transition tool result
    # where both define the same field (right-hand side of `|` wins).
    header = FixtureHeader(
        **(
            transition_tool_output.result.model_dump(
                exclude_none=True,
                exclude={"blob_gas_used", "transactions_trie"},
            )
            | env.model_dump(exclude_none=True, exclude={"blob_gas_used"})
        ),
        blob_gas_used=blob_gas_used,
        transactions_trie=Transaction.list_root(txs),
        extra_data=block.extra_data
        if block.extra_data is not None
        else b"",
        fork=fork,
    )

    if block.header_verify is not None:
        # Verify the header after transition tool processing.
        try:
            block.header_verify.verify(header)
        except Exception as e:
            raise Exception(
                f"Verification of block {int(env.number)} failed"
            ) from e

    requests_list: List[Bytes] | None = None
    if fork.header_requests_required():
        assert transition_tool_output.result.requests is not None, (
            "Requests are required for this block"
        )
        requests = Requests(
            requests_lists=list(transition_tool_output.result.requests)
        )

        if Hash(requests) != header.requests_hash:
            raise Exception(
                "Requests root in header does not match the requests "
                "root in the transition tool output: "
                f"{header.requests_hash} != {Hash(requests)}"
            )

        requests_list = requests.requests_list

    # Requests given explicitly by the test override the ones produced by
    # the transition tool, both in the header hash and in the block body.
    if block.requests is not None:
        header.requests_hash = Hash(
            Requests(requests_lists=list(block.requests))
        )
        requests_list = block.requests

    # Decode BAL from RLP bytes provided by the transition tool.
    t8n_bal_rlp = transition_tool_output.result.block_access_list
    t8n_bal: BlockAccessList | None = None
    if t8n_bal_rlp is not None:
        t8n_bal = BlockAccessList.from_rlp(t8n_bal_rlp)

    if fork.header_bal_hash_required():
        assert t8n_bal is not None, (
            "Block access list is required for this block but was not "
            "provided by the transition tool"
        )

        computed_block_access_list_hash = Hash(t8n_bal.rlp.keccak256())
        assert (
            computed_block_access_list_hash
            == header.block_access_list_hash
        ), (
            "Block access list hash in header does not match the "
            f"computed hash from BAL: {header.block_access_list_hash} "
            f"!= {computed_block_access_list_hash}"
        )

    if block.rlp_modifier is not None:
        # Modify any parameter specified in the `rlp_modifier` after
        # transition tool processing.
        header = block.rlp_modifier.apply(header)
        header.fork = fork  # Deleted during `apply` because `exclude=True`

    # Process block access list - apply transformer if present for invalid
    # tests
    bal = t8n_bal

    # Always validate BAL structural integrity (ordering, duplicates)
    # if present
    if t8n_bal is not None:
        t8n_bal.validate_structure()

    # If expected BAL is defined, verify against it
    if (
        block.expected_block_access_list is not None
        and t8n_bal is not None
    ):
        block.expected_block_access_list.verify_against(t8n_bal)

        bal = block.expected_block_access_list.modify_if_invalid_test(
            t8n_bal
        )
        if bal != t8n_bal:
            # If the BAL was modified, update the header hash
            header.block_access_list_hash = Hash(bal.rlp.keccak256())

    built_block = BuiltBlock(
        header=header,
        alloc=transition_tool_output.alloc,
        state_root=transition_tool_output.result.state_root,
        env=env,
        txs=txs,
        ommers=[],
        withdrawals=env.withdrawals,
        requests=requests_list,
        result=transition_tool_output.result,
        expected_exception=block.exception,
        engine_api_error_code=block.engine_api_error_code,
        fork=fork,
        block_access_list=bal,
    )

    try:
        rejected_txs = built_block.verify_transactions(
            transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
        )
        if (
            not rejected_txs
            and block.rlp_modifier is None
            and block.requests is None
            and not block.skip_exception_verification
            and not (
                block.expected_block_access_list is not None
                and block.expected_block_access_list._modifier is not None
            )
        ):
            # Only verify block level exception if: - No transaction
            # exception was raised, because these are not reported as block
            # exceptions. - No RLP modifier was specified, because the
            # modifier is what normally produces the block exception. - No
            # requests were specified, because modified requests are also
            # what normally produces the block exception. - No BAL modifier
            # was specified, because modified BAL also produces block
            # exceptions.
            built_block.verify_block_exception(
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
        verify_result(transition_tool_output.result, env)
    except Exception as e:
        # Dump traces and the involved state to help debugging, then
        # propagate the original error unchanged.
        print_traces(t8n.get_traces())
        pprint(transition_tool_output.result)
        pprint(previous_alloc)
        pprint(transition_tool_output.alloc.get())
        raise e

    if len(rejected_txs) > 0 and block.exception is None:
        print_traces(t8n.get_traces())
        raise Exception(
            "one or more transactions in `BlockchainTest` are "
            + "intrinsically invalid, but the block was not expected "
            + "to be invalid. Please verify whether the transaction "
            + "was indeed expected to fail and add the proper "
            + "`block.exception`"
        )

    return built_block

verify_post_state(t8n, t8n_state, expected_state=None)

Verify the post-state allocation after all blocks or payloads are generated.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
def verify_post_state(
    self,
    t8n: TransitionTool,
    t8n_state: Alloc,
    expected_state: Alloc | None = None,
) -> None:
    """
    Check `t8n_state` against the expected post-state allocation.

    Uses `expected_state` when it is truthy and the test's own `post`
    otherwise. On failure the transition tool traces are printed before
    the error is re-raised.
    """
    try:
        reference_alloc = expected_state if expected_state else self.post
        reference_alloc.verify_post_alloc(t8n_state)
    except Exception as error:
        print_traces(t8n.get_traces())
        raise error

make_fixture(t8n)

Create a fixture from the blockchain test definition.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
def make_fixture(
    self,
    t8n: TransitionTool,
) -> FillResult:
    """
    Build a `BlockchainFixture` by running every block through `t8n`.

    Each block is built on top of the last valid block. Blocks that
    declare an exception are recorded in the fixture but do not advance
    the chain state. The final state is verified against the test's
    `post` before the fixture is assembled.
    """
    pre, genesis = self.make_genesis(apply_pre_allocation_blockchain=True)

    # Running chain state, advanced only by valid blocks.
    fixture_blocks: List[FixtureBlock | InvalidFixtureBlock] = []
    current_alloc: Alloc | LazyAlloc = pre
    current_state_root = genesis.header.state_root
    parent_env = environment_from_parent_header(genesis.header)
    head_hash = genesis.header.block_hash
    invalid_block_count = 0
    benchmark_gas_used: int | None = None
    benchmark_opcode_count: OpcodeCount | None = None

    for block in self.blocks:
        built = self.generate_block_data(
            t8n=t8n,
            block=block,
            previous_env=parent_env,
            previous_alloc=current_alloc,
        )
        block_number = int(built.header.number)

        # Benchmark figures are taken from the last block only.
        if (
            block is self.blocks[-1]
            and self.operation_mode == OpMode.BENCHMARKING
        ):
            benchmark_gas_used = int(built.result.gas_used)
            benchmark_opcode_count = built.result.opcode_count

        if built.result.receipts:
            self.validate_receipt_status(
                receipts=built.result.receipts,
                block_number=block_number,
            )

        # A per-block setting wins over the test-wide default.
        include_receipts = block.include_receipts_in_output
        if include_receipts is None:
            include_receipts = self.include_tx_receipts_in_output
        fixture_blocks.append(
            built.get_fixture_block(include_receipts=include_receipts)
        )

        if block.exception is not None:
            invalid_block_count += 1
        else:
            # Valid block: it becomes the parent of the next one.
            current_alloc = built.alloc
            current_state_root = built.state_root
            parent_env = apply_new_parent(built.env, built.header)
            head_hash = built.header.block_hash

        if block.expected_post_state:
            if isinstance(current_alloc, LazyAlloc):
                state_after_block = current_alloc.get()
            else:
                state_after_block = current_alloc
            self.verify_post_state(
                t8n,
                t8n_state=state_after_block,
                expected_state=block.expected_post_state,
            )

    self.check_exception_test(exception=invalid_block_count > 0)

    if isinstance(current_alloc, LazyAlloc):
        final_alloc = current_alloc.get()
    else:
        final_alloc = current_alloc
    self.verify_post_state(t8n, t8n_state=final_alloc)

    # The fixture carries either the full post state or only its root.
    if self.include_full_post_state_in_output:
        post_state, post_state_hash = final_alloc, None
    else:
        post_state, post_state_hash = None, current_state_root

    fixture = BlockchainFixture(
        fork=self.fork,
        genesis=genesis.header,
        genesis_rlp=genesis.rlp,
        blocks=fixture_blocks,
        last_block_hash=head_hash,
        pre=pre,
        post_state=post_state,
        post_state_hash=post_state_hash,
        config=FixtureConfig(
            fork=self.fork,
            blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                self.fork.transitions_to().blob_schedule()
            ),
            chain_id=self.chain_id,
        ),
    )
    return FillResult(
        fixture=fixture,
        gas_optimization=None,
        benchmark_gas_used=benchmark_gas_used,
        benchmark_opcode_count=benchmark_opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

make_hive_fixture(t8n, fixture_format=BlockchainEngineFixture)

Create a hive fixture from the blockchain test definition.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
def make_hive_fixture(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat = BlockchainEngineFixture,
) -> FillResult:
    """
    Create a hive (engine) fixture from the blockchain test definition.

    Every block is evaluated with `t8n` and turned into an engine new
    payload. Blocks that declare an exception are still added as payloads
    but do not advance the chain state. Depending on `fixture_format` the
    result is a `BlockchainEngineXFixture`, a `BlockchainEngineSyncFixture`
    or a `BlockchainEngineFixture` (the fallback), wrapped in a
    `FillResult`.
    """
    fixture_payloads: List[FixtureEngineNewPayload] = []

    # The fork pre-allocation is applied for every format except Engine X.
    pre, genesis = self.make_genesis(
        apply_pre_allocation_blockchain=fixture_format
        != BlockchainEngineXFixture,
    )
    # Running chain state, only advanced by blocks without an exception.
    alloc: Alloc | LazyAlloc = pre
    state_root = genesis.header.state_root
    env = environment_from_parent_header(genesis.header)
    head_hash = genesis.header.block_hash
    invalid_blocks = 0
    benchmark_gas_used: int | None = None
    benchmark_opcode_count: OpcodeCount | None = None
    for block in self.blocks:
        built_block = self.generate_block_data(
            t8n=t8n,
            block=block,
            previous_env=env,
            previous_alloc=alloc,
        )
        block_number = int(built_block.header.number)
        # Benchmark figures are recorded from the last block only.
        is_last_block = block is self.blocks[-1]
        if is_last_block and self.operation_mode == OpMode.BENCHMARKING:
            benchmark_gas_used = int(built_block.result.gas_used)
            benchmark_opcode_count = built_block.result.opcode_count
        if built_block.result.receipts:
            self.validate_receipt_status(
                receipts=built_block.result.receipts,
                block_number=block_number,
            )
        # The payload is recorded whether or not the block is valid.
        fixture_payloads.append(
            built_block.get_fixture_engine_new_payload()
        )
        if block.exception is None:
            # Valid block: it becomes the parent of the next block.
            alloc = built_block.alloc
            state_root = built_block.state_root
            env = apply_new_parent(built_block.env, built_block.header)
            head_hash = built_block.header.block_hash
        else:
            invalid_blocks += 1

        # Optional per-block check against the current (last valid) state.
        if block.expected_post_state:
            self.verify_post_state(
                t8n,
                t8n_state=alloc.get()
                if isinstance(alloc, LazyAlloc)
                else alloc,
                expected_state=block.expected_post_state,
            )
    self.check_exception_test(exception=invalid_blocks > 0)
    # The version itself is not used below; it is only checked to exist.
    fcu_version = (
        self.fork.transitions_from().engine_forkchoice_updated_version()
    )
    assert fcu_version is not None, (
        "A hive fixture was requested but no forkchoice update is defined."
        " The framework should never try to execute this test case."
    )

    # Resolve a lazy allocation before the final post-state verification.
    alloc = alloc.get() if isinstance(alloc, LazyAlloc) else alloc
    self.verify_post_state(t8n, t8n_state=alloc)

    # Create base fixture data, common to all fixture formats
    fixture_data: Dict[str, Any] = {
        "fork": self.fork,
        "genesis": genesis.header,
        "payloads": fixture_payloads,
        "last_block_hash": head_hash,
        # Only the state root is stored when the full post state is omitted.
        "post_state_hash": state_root
        if not self.include_full_post_state_in_output
        else None,
        "config": FixtureConfig(
            fork=self.fork,
            chain_id=self.chain_id,
            blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                self.fork.transitions_to().blob_schedule()
            ),
        ),
    }

    # Add format-specific fields
    fixture: BaseFixture
    if fixture_format == BlockchainEngineXFixture:
        # For Engine X format, exclude pre (will be provided via shared
        # state) and prepare for state diff optimization
        fixture_data.update(
            {
                "post_state": alloc
                if self.include_full_post_state_in_output
                else None,
                "pre_hash": "",  # Will be set by BaseTestWrapper
            }
        )
        fixture = BlockchainEngineXFixture(**fixture_data)
    elif fixture_format == BlockchainEngineSyncFixture:
        # Sync fixture format
        # The head must have moved past genesis, i.e. at least one block
        # of the test was valid.
        assert genesis.header.block_hash != head_hash, (
            "Invalid payload tests negative test via sync is not "
            "supported yet."
        )
        # Most clients require the header to start the sync process, so we
        # create an empty block on top of the last block of the test to
        # send it as new payload and trigger the sync process.
        sync_built_block = self.generate_block_data(
            t8n=t8n,
            block=Block(),
            previous_env=env,
            previous_alloc=alloc,
        )
        fixture_data.update(
            {
                "sync_payload": (
                    sync_built_block.get_fixture_engine_new_payload()
                ),
                "pre": pre,
                "post_state": alloc
                if self.include_full_post_state_in_output
                else None,
            }
        )
        fixture = BlockchainEngineSyncFixture(**fixture_data)
    else:
        # Standard engine fixture
        fixture_data.update(
            {
                "pre": pre,
                "post_state": alloc
                if self.include_full_post_state_in_output
                else None,
            }
        )
        fixture = BlockchainEngineFixture(**fixture_data)

    return FillResult(
        fixture=fixture,
        gas_optimization=None,
        benchmark_gas_used=benchmark_gas_used,
        benchmark_opcode_count=benchmark_opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

generate(t8n, fixture_format)

Generate the BlockchainTest fixture.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """
    Fill this test with `t8n` and return the result for `fixture_format`.

    The three engine formats are delegated to `make_hive_fixture`, the
    plain blockchain format to `make_fixture`. Any other format raises an
    `Exception`.
    """
    engine_formats = (
        BlockchainEngineFixture,
        BlockchainEngineXFixture,
        BlockchainEngineSyncFixture,
    )
    if fixture_format in engine_formats:
        return self.make_hive_fixture(t8n, fixture_format)
    if fixture_format == BlockchainFixture:
        return self.make_fixture(t8n)
    raise Exception(f"Unknown fixture format: {fixture_format}")

execute(*, execute_format)

Build the execute object for the given execute format (only `TransactionPost` is supported).

Source code in packages/testing/src/execution_testing/specs/blockchain.py
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """
    Build the execute object for `execute_format`.

    Only `TransactionPost` is supported; any other format raises an
    `Exception`. Outside of benchmarking mode this also sets
    `skip_gas_used_validation` on the test instance.
    """
    if execute_format != TransactionPost:
        raise Exception(f"Unsupported execute format: {execute_format}")

    # One transaction list per block, in block order.
    txs_per_block: List[List[Transaction]] = [
        block.txs for block in self.blocks
    ]

    # Gas-used validation is only meaningful for benchmark tests.
    is_benchmark = self.operation_mode == OpMode.BENCHMARKING
    if not is_benchmark:
        self.skip_gas_used_validation = True

    return TransactionPost(
        blocks=txs_per_block,
        post=self.post,
        benchmark_mode=is_benchmark,
    )

Header

Bases: CamelModel

Header type used to describe block header properties in test specs.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class Header(CamelModel):
    """
    Header type used to describe block header properties in test specs.

    Every field defaults to `None`, which means "not specified": such
    fields are skipped by both `apply` and `verify`. The `REMOVE_FIELD` and
    `EMPTY_FIELD` sentinels can be assigned to the fields typed as
    `Removable` to request removal or emptiness respectively.
    """

    parent_hash: Hash | None = None
    ommers_hash: Hash | None = None
    fee_recipient: Address | None = None
    state_root: Hash | None = None
    transactions_trie: Hash | None = None
    receipts_root: Hash | None = None
    logs_bloom: Bloom | None = None
    difficulty: HexNumber | None = None
    number: HexNumber | None = None
    gas_limit: HexNumber | None = None
    gas_used: HexNumber | None = None
    timestamp: HexNumber | None = None
    extra_data: Bytes | None = None
    prev_randao: Hash | None = None
    nonce: HeaderNonce | None = None
    base_fee_per_gas: Removable | HexNumber | None = None
    withdrawals_root: Removable | Hash | None = None
    blob_gas_used: Removable | HexNumber | None = None
    excess_blob_gas: Removable | HexNumber | None = None
    parent_beacon_block_root: Removable | Hash | None = None
    requests_hash: Removable | Hash | None = None
    block_access_list_hash: Removable | Hash | None = None

    REMOVE_FIELD: ClassVar[Removable] = Removable()
    """
    Sentinel object used to specify that a header field should be removed.

    This can be used in a test to explicitly skip a field in a block's RLP
    encoding that would otherwise be included in the (json) output when the
    model is serialized. For example:

    ```
    header_modifier = Header(
        excess_blob_gas=Header.REMOVE_FIELD,
    )
    block = Block(
        timestamp=TIMESTAMP,
        rlp_modifier=header_modifier,
        exception=BlockException.INCORRECT_BLOCK_FORMAT,
        engine_api_error_code=EngineAPIError.InvalidParams,
    )
    ```
    """
    EMPTY_FIELD: ClassVar[Removable] = Removable()
    """
    Sentinel object used to specify that a header field must be empty during
    verification, i.e. `verify` asserts that the target header holds `None`
    for that field.
    """

    model_config = ConfigDict(
        **CamelModel.model_config,
        arbitrary_types_allowed=True,
    )

    @model_serializer(mode="wrap", when_used="json")
    def _serialize_model(self, serializer: Any, info: Any) -> Dict[str, Any]:
        """Exclude Removable fields from serialization."""
        del info
        data = serializer(self)
        return {k: v for k, v in data.items() if not isinstance(v, Removable)}

    @field_validator("withdrawals_root", mode="before")
    @classmethod
    def validate_withdrawals_root(cls, value: Any) -> Any:
        """Convert a list of withdrawals into the withdrawals root hash."""
        if isinstance(value, list):
            return Withdrawal.list_root(value)
        return value

    def apply(self, target: FixtureHeader) -> FixtureHeader:
        """
        Produce a fixture header copy with the set values from the modifier.

        Only fields that are not `None` on this modifier are used as
        overrides; a field set to `REMOVE_FIELD` overrides the target value
        with `None`.

        Raises `ValueError` if this modifier sets a field that does not
        exist on the target's class.
        """
        overrides = {
            k: (v if v is not Header.REMOVE_FIELD else None)
            for k, v in self.model_dump(exclude_none=True).items()
        }
        # Fail loudly on name drift between Header and the fixture header
        # instead of handing unknown keyword arguments to `copy`.
        unknown = overrides.keys() - target.__class__.model_fields.keys()
        if unknown:
            raise ValueError(
                f"Header fields {unknown} do not exist on "
                f"{target.__class__.__name__}. Check for field name "
                f"mismatches between Header and {target.__class__.__name__}."
            )
        return target.copy(**overrides)

    def verify(self, target: FixtureHeader) -> None:
        """
        Verify that the header fields from self are as expected.

        Fields left as `None` are not checked. A field set to `EMPTY_FIELD`
        requires the target value to be `None`; any other value must compare
        equal to the target value. `REMOVE_FIELD` is not a valid expectation
        and fails the verification. Failures surface as `AssertionError`.
        """
        for field_name in self.__class__.model_fields:
            baseline_value = getattr(self, field_name)
            if baseline_value is not None:
                # REMOVE_FIELD only makes sense for `apply`.
                assert baseline_value is not Header.REMOVE_FIELD, (
                    "invalid header"
                )
                value = getattr(target, field_name)
                if baseline_value is Header.EMPTY_FIELD:
                    assert value is None, (
                        f"invalid header field {field_name}, "
                        f"got {value}, want None"
                    )
                    continue
                assert value == baseline_value, (
                    f"invalid header field ({field_name}) value, "
                    + f"got {value}, want {baseline_value}"
                )

REMOVE_FIELD = Removable() class-attribute

Sentinel object used to specify that a header field should be removed.

EMPTY_FIELD = Removable() class-attribute

Sentinel object used to specify that a header field must be empty during verification.

`REMOVE_FIELD` (not `EMPTY_FIELD`) is the sentinel that can be used in a test to explicitly skip a field in a block's RLP encoding that would otherwise be included in the (json) output when the model is serialized. For example:

header_modifier = Header(
    excess_blob_gas=Header.REMOVE_FIELD,
)
block = Block(
    timestamp=TIMESTAMP,
    rlp_modifier=header_modifier,
    exception=BlockException.INCORRECT_BLOCK_FORMAT,
    engine_api_error_code=EngineAPIError.InvalidParams,
)

validate_withdrawals_root(value) classmethod

Convert a list of withdrawals into the withdrawals root hash.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
207
208
209
210
211
212
213
@field_validator("withdrawals_root", mode="before")
@classmethod
def validate_withdrawals_root(cls, value: Any) -> Any:
    """
    Accept a list of withdrawals in place of the withdrawals root hash.

    A list is replaced by `Withdrawal.list_root(value)`; every other input
    is returned untouched for regular field validation.
    """
    return Withdrawal.list_root(value) if isinstance(value, list) else value

apply(target)

Produce a fixture header copy with the set values from the modifier.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def apply(self, target: FixtureHeader) -> FixtureHeader:
    """
    Return a copy of `target` with this modifier's set fields applied.

    Fields that are `None` on the modifier are left out; a field holding
    `Header.REMOVE_FIELD` is applied as `None`. A `ValueError` is raised
    when the modifier sets a field the target's class does not define.
    """
    target_type = target.__class__
    overrides = {}
    for name, value in self.model_dump(exclude_none=True).items():
        overrides[name] = None if value is Header.REMOVE_FIELD else value

    unknown = overrides.keys() - target_type.model_fields.keys()
    if not unknown:
        return target.copy(**overrides)

    target_name = target_type.__name__
    raise ValueError(
        f"Header fields {unknown} do not exist on "
        f"{target_name}. Check for field name "
        f"mismatches between Header and {target_name}."
    )

verify(target)

Verify that the header fields from self are as expected.

Source code in packages/testing/src/execution_testing/specs/blockchain.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def verify(self, target: FixtureHeader) -> None:
    """
    Assert that `target` matches every field set on this header.

    Unset (`None`) fields are ignored, `Header.EMPTY_FIELD` demands a
    `None` value on the target, and `Header.REMOVE_FIELD` is rejected as an
    expectation. Mismatches raise `AssertionError`.
    """
    for field_name in self.__class__.model_fields:
        expected = getattr(self, field_name)
        if expected is None:
            # Nothing was specified for this field.
            continue
        assert expected is not Header.REMOVE_FIELD, "invalid header"
        actual = getattr(target, field_name)
        if expected is Header.EMPTY_FIELD:
            assert actual is None, (
                f"invalid header field {field_name}, got {actual}, want None"
            )
        else:
            assert actual == expected, (
                f"invalid header field ({field_name}) value, "
                f"got {actual}, want {expected}"
            )

StateTest

Bases: BaseTest

Filler type that tests transactions over the period of a single block.

Source code in packages/testing/src/execution_testing/specs/state.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
class StateTest(BaseTest):
    """
    Filler type that tests transactions over the period of a single block.

    The test is described by an environment, a pre-state, one transaction
    and the expected post-state. It is filled either directly as a
    `StateFixture` or, after conversion to a single-block `BlockchainTest`,
    as any of the blockchain fixture formats except the sync ones.
    """

    env: Environment = Field(default_factory=Environment)
    pre: Alloc
    post: Alloc
    tx: Transaction
    block_exception: (
        List[TransactionException | BlockException]
        | TransactionException
        | BlockException
        | None
    ) = None
    engine_api_error_code: Optional[EngineAPIError] = None
    blockchain_test_header_verify: Optional[Header] = None
    blockchain_test_rlp_modifier: Optional[Header] = None
    expected_block_access_list: Optional[BlockAccessListExpectation] = None
    chain_id: int = 1

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        StateFixture,
    ] + [
        LabeledFixtureFormat(
            fixture_format,
            f"{fixture_format.format_name}_from_state_test",
            f"A {fixture_format.format_name} generated from a state_test",
        )
        for fixture_format in BlockchainTest.supported_fixture_formats
        # Exclude sync fixtures from state tests - they don't make sense for
        # state tests
        if not (
            (
                hasattr(fixture_format, "__name__")
                and "Sync" in fixture_format.__name__
            )
            or (
                hasattr(fixture_format, "format")
                and "Sync" in fixture_format.format.__name__
            )
        )
    ]
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "state_test",
            "An execute test derived from a state test",
        ),
    ]

    supported_markers: ClassVar[Dict[str, str]] = {
        "state_test_only": "Only generate a state test fixture",
    }

    def verify_modified_gas_limit(
        self,
        *,
        t8n: TransitionTool,
        base_tool_result: Result,
        base_tool_alloc: Alloc,
        fork: Fork,
        current_gas_limit: int,
        pre_alloc: Alloc,
        env: Environment,
        ignore_gas_differences: bool,
    ) -> bool:
        """
        Verify a new lower gas limit yields the same transaction outcome.

        The transaction is re-signed with `current_gas_limit` and evaluated
        again with the transition tool. The candidate is accepted only if
        its traces are equivalent to the base traces, the expected post
        state and the transaction expectations still verify, and the
        resulting alloc has the same accounts with the same nonces as the
        base alloc.

        Returns `True` when the candidate gas limit is equivalent, `False`
        otherwise (the reason is logged at debug level). Raises
        `AssertionError` if traces were not collected.
        """
        base_traces = base_tool_result.traces
        assert base_traces is not None, (
            "Traces not collected for gas optimization"
        )
        new_tx = self.tx.copy(
            gas_limit=current_gas_limit
        ).with_signature_and_sender()
        modified_tool_output = t8n.evaluate(
            transition_tool_data=TransitionTool.TransitionToolData(
                alloc=pre_alloc,
                txs=[new_tx],
                env=env,
                fork=fork,
                chain_id=self.chain_id,
                reward=0,  # Reward on state tests is always zero
                blob_schedule=fork.blob_schedule(),
                state_test=True,
            ),
            slow_request=self.is_tx_gas_heavy_test,
        )
        modified_traces = modified_tool_output.result.traces
        assert modified_traces is not None, (
            "Traces not collected for gas optimization"
        )
        if not base_traces.are_equivalent(
            modified_traces,
            ignore_gas_differences,
        ):
            logger.debug(
                f"Traces are not equivalent (gas_limit={current_gas_limit})"
            )
            return False
        modified_tool_alloc = modified_tool_output.alloc.get()
        # Any verification failure means "not equivalent", not a test
        # failure, so exceptions are logged and turned into `False`.
        try:
            self.post.verify_post_alloc(modified_tool_alloc)
        except Exception as e:
            logger.debug(
                f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
            )
            logger.debug(e)
            return False
        try:
            verify_transactions(
                txs=[new_tx],
                result=modified_tool_output.result,
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
        except Exception as e:
            logger.debug(
                "Transactions are not equivalent "
                f"(gas_limit={current_gas_limit})"
            )
            logger.debug(e)
            return False
        if len(base_tool_alloc.root) != len(modified_tool_alloc.root):
            logger.debug(
                f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
            )
            return False
        if base_tool_alloc.root.keys() != modified_tool_alloc.root.keys():
            logger.debug(
                f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
            )
            return False
        for k in base_tool_alloc.root.keys():
            if k not in modified_tool_alloc:
                logger.debug(
                    "Post alloc is not equivalent "
                    f"(gas_limit={current_gas_limit})"
                )
                return False
            base_account = base_tool_alloc[k]
            modified_account = modified_tool_alloc[k]
            if (modified_account is None) != (base_account is None):
                logger.debug(
                    "Post alloc is not equivalent "
                    f"(gas_limit={current_gas_limit})"
                )
                return False
            # Only the nonce is compared between the two accounts.
            if (
                modified_account is not None
                and base_account is not None
                and base_account.nonce != modified_account.nonce
            ):
                logger.debug(
                    "Post alloc is not equivalent "
                    f"(gas_limit={current_gas_limit})"
                )
                return False
        logger.debug(
            f"Gas limit is equivalent (gas_limit={current_gas_limit})"
        )
        return True

    @classmethod
    def discard_fixture_format_by_marks(
        cls,
        fixture_format: FixtureFormat,
        fork: Fork | TransitionFork,
        markers: List[pytest.Mark],
    ) -> bool:
        """
        Discard a fixture format from filling if the appropriate marker is
        used.

        With the `state_test_only` marker present, every format other than
        `StateFixture` is discarded. The `fork` argument is not used.
        """
        del fork

        if "state_test_only" in [m.name for m in markers]:
            return fixture_format != StateFixture
        return False

    def _generate_blockchain_genesis_environment(self) -> Environment:
        """
        Generate the genesis environment for the BlockchainTest formatted test.

        Raises `AssertionError` if the state test's `env.number` or
        `env.timestamp` is lower than 1.
        """
        assert self.env.number >= 1, (
            "genesis block number cannot be negative, set state test "
            "env.number to at least 1"
        )
        assert self.env.timestamp >= 1, (
            "genesis timestamp cannot be negative, set state test "
            "env.timestamp to at least 1"
        )
        # There's only a handful of values that we need to set in the genesis
        # for the environment values at block 1 to make sense:
        # - Number: Needs to be N minus 1
        # - Timestamp: Needs to be zero, because the subsequent
        #              block can come at any time.
        # - Gas Limit: Changes from parent to child, needs to be set in genesis
        # - Base Fee Per Gas: Block's base fee depends on the parent's value
        # - Excess Blob Gas: Block's excess blob gas value depends on
        #                    the parent's value
        genesis_block_number = self.env.number - 1
        genesis_timestamp = 0
        genesis_fork = self.fork.fork_at(
            block_number=genesis_block_number, timestamp=genesis_timestamp
        )
        kwargs: Dict[str, Any] = {
            "number": genesis_block_number,
            "timestamp": genesis_timestamp,
        }

        if "gas_limit" in self.env.model_fields_set:
            kwargs["gas_limit"] = self.env.gas_limit

        if self.env.base_fee_per_gas:
            # Calculate genesis base fee per gas from state test's block#1 env.
            # Integer floor division keeps the result exact for arbitrarily
            # large values; float division would silently lose precision.
            kwargs["base_fee_per_gas"] = HexNumber(
                int(str(self.env.base_fee_per_gas), 0) * 8 // 7
            )

        if self.env.excess_blob_gas:
            # The excess blob gas environment value means the value of the
            # context (block header) where the transaction is executed. In a
            # blockchain test, we need to indirectly set the excess blob gas by
            # setting the excess blob gas of the genesis block to the expected
            # value plus the BLOB_TARGET_GAS_PER_BLOCK, which is the value that
            # will be subtracted from the excess blob gas when the first block
            # is mined.
            kwargs["excess_blob_gas"] = self.env.excess_blob_gas + (
                genesis_fork.target_blobs_per_block()
                * genesis_fork.blob_gas_per_blob()
            )

        return Environment(**kwargs)

    def _generate_blockchain_blocks(self) -> List[Block]:
        """
        Generate the single block that represents this state test in a
        BlockchainTest format.

        The block's expected exception is taken from `block_exception` when
        that field was explicitly set, otherwise from the transaction's
        `error` when that was explicitly set.
        """
        number = self.env.number
        timestamp = self.env.timestamp
        fork = self.fork.fork_at(block_number=number, timestamp=timestamp)
        kwargs = {
            "number": self.env.number,
            "timestamp": self.env.timestamp,
            "prev_randao": self.env.prev_randao,
            "fee_recipient": self.env.fee_recipient,
            "gas_limit": self.env.gas_limit,
            "extra_data": self.env.extra_data,
            "withdrawals": self.env.withdrawals,
            "parent_beacon_block_root": self.env.parent_beacon_block_root,
            "txs": [self.tx],
            "ommers": [],
            "header_verify": self.blockchain_test_header_verify,
            "rlp_modifier": self.blockchain_test_rlp_modifier,
            "expected_block_access_list": self.expected_block_access_list,
        }
        if not fork.header_prev_randao_required():
            kwargs["difficulty"] = self.env.difficulty
        if "block_exception" in self.model_fields_set:
            kwargs["exception"] = self.block_exception  # type: ignore
        elif "error" in self.tx.model_fields_set:
            kwargs["exception"] = self.tx.error  # type: ignore
        return [Block(**kwargs)]

    def generate_blockchain_test(self) -> BlockchainTest:
        """Generate a BlockchainTest fixture from this StateTest fixture."""
        return BlockchainTest.from_test(
            base_test=self,
            genesis_environment=self._generate_blockchain_genesis_environment(),
            pre=self.pre,
            post=self.post,
            blocks=self._generate_blockchain_blocks(),
        )

    def make_state_test_fixture(
        self,
        t8n: TransitionTool,
    ) -> FillResult:
        """
        Create a fixture from the state test definition.

        The transaction is evaluated with the transition tool on top of the
        merged pre-allocation, then the expected post state and transaction
        expectations are verified. In the gas optimization modes a binary
        search looks for the lowest gas limit that
        `verify_modified_gas_limit` still accepts and reports it as
        `gas_optimization` in the result.

        Raises `Exception` when the pre state contains empty accounts, when
        lowering the gas limit by one already changes the outcome, or when
        the search needs more gas than `gas_optimization_max_gas_limit`.
        Verification errors are re-raised after printing the traces.
        """
        # We can't generate a state test fixture that names a transition fork,
        # so we get the fork at the block number and timestamp of the state
        # test
        fork = self.fork.fork_at(
            block_number=self.env.number, timestamp=self.env.timestamp
        )

        env = self.env.set_fork_requirements(fork)
        tx = self.tx.with_signature_and_sender(keep_secret_key=True)
        pre_alloc = Alloc.merge(
            Alloc.model_validate(fork.pre_allocation()),
            self.pre,
        )
        if empty_accounts := pre_alloc.empty_accounts():
            raise Exception(f"Empty accounts in pre state: {empty_accounts}")

        transition_tool_output = t8n.evaluate(
            transition_tool_data=TransitionTool.TransitionToolData(
                alloc=pre_alloc,
                txs=[tx],
                env=env,
                fork=fork,
                chain_id=self.chain_id,
                reward=0,  # Reward on state tests is always zero
                blob_schedule=fork.blob_schedule(),
                state_test=True,
            ),
            slow_request=self.is_tx_gas_heavy_test,
        )
        output_alloc = transition_tool_output.alloc.get()

        try:
            self.post.verify_post_alloc(output_alloc)
        except Exception as e:
            print_traces(t8n.get_traces())
            raise e

        try:
            verify_transactions(
                txs=[tx],
                result=transition_tool_output.result,
                transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
            )
        except Exception as e:
            print_traces(t8n.get_traces())
            pprint(transition_tool_output.result)
            pprint(output_alloc)
            raise e

        gas_optimization: int | None = None

        if (
            self.operation_mode == OpMode.OPTIMIZE_GAS
            or self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
        ):
            ignore_gas_differences = (
                self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
            )
            base_tool_output = transition_tool_output
            base_tool_alloc = base_tool_output.alloc.get()
            base_tool_result = base_tool_output.result

            assert base_tool_result.traces is not None, "Traces not found."

            # First try reducing the gas limit only by one, if the validation
            # fails, it means that the traces change even with the slightest
            # modification to the gas.
            if self.verify_modified_gas_limit(
                t8n=t8n,
                base_tool_result=base_tool_result,
                base_tool_alloc=base_tool_alloc,
                fork=fork,
                current_gas_limit=self.tx.gas_limit - 1,
                pre_alloc=pre_alloc,
                env=env,
                ignore_gas_differences=ignore_gas_differences,
            ):
                # Binary search for the lowest equivalent gas limit in
                # [minimum_gas_limit, maximum_gas_limit].
                minimum_gas_limit = 0
                maximum_gas_limit = int(self.tx.gas_limit)
                while minimum_gas_limit < maximum_gas_limit:
                    current_gas_limit = (
                        maximum_gas_limit + minimum_gas_limit
                    ) // 2
                    if self.verify_modified_gas_limit(
                        t8n=t8n,
                        base_tool_result=base_tool_result,
                        base_tool_alloc=base_tool_alloc,
                        fork=fork,
                        current_gas_limit=current_gas_limit,
                        pre_alloc=pre_alloc,
                        env=env,
                        ignore_gas_differences=ignore_gas_differences,
                    ):
                        maximum_gas_limit = current_gas_limit
                    else:
                        minimum_gas_limit = current_gas_limit + 1
                        if (
                            self.gas_optimization_max_gas_limit is not None
                            and minimum_gas_limit
                            > self.gas_optimization_max_gas_limit
                        ):
                            raise Exception(
                                "Requires more than the maximum "
                                f"{self.gas_optimization_max_gas_limit} "
                                "wanted."
                            )

                assert self.verify_modified_gas_limit(
                    t8n=t8n,
                    base_tool_result=base_tool_result,
                    base_tool_alloc=base_tool_alloc,
                    fork=fork,
                    current_gas_limit=minimum_gas_limit,
                    pre_alloc=pre_alloc,
                    env=env,
                    ignore_gas_differences=ignore_gas_differences,
                )
                # Report the value that was just validated. The last probed
                # `current_gas_limit` is one below it whenever the final
                # probe of the search failed.
                gas_optimization = minimum_gas_limit
            else:
                raise Exception("Impossible to compare.")

        if len(transition_tool_output.result.receipts) == 1:
            receipt = FixtureTransactionReceipt.from_transaction_receipt(
                transition_tool_output.result.receipts[0], tx
            )
            receipt_root = FixtureTransactionReceipt.list_root([receipt])
            assert (
                transition_tool_output.result.receipts_root == receipt_root
            ), (
                f"Receipts root mismatch: "
                f"{transition_tool_output.result.receipts_root} != "
                f"{receipt_root.hex()}, "
                f"Receipt: {receipt.rlp()}"
            )
        else:
            receipt = None

        fixture = StateFixture(
            env=FixtureEnvironment(**env.model_dump(exclude_none=True)),
            pre=pre_alloc,
            post={
                fork: [
                    FixtureForkPost(
                        state_root=transition_tool_output.result.state_root,
                        logs_hash=transition_tool_output.result.logs_hash,
                        receipt=receipt,
                        tx_bytes=tx.rlp(),
                        expect_exception=tx.error,
                        state=output_alloc,
                    )
                ]
            },
            transaction=FixtureTransaction.from_transaction(tx),
            config=FixtureConfig(
                blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                    fork.blob_schedule()
                ),
                chain_id=self.chain_id,
            ),
        )
        return FillResult(
            fixture=fixture,
            gas_optimization=gas_optimization,
            benchmark_gas_used=transition_tool_output.result.gas_used,
            benchmark_opcode_count=transition_tool_output.result.opcode_count,
            post_verifications=PostVerifications.from_alloc(self.post),
        )

    def get_genesis_environment(self) -> Environment:
        """Get the genesis environment for pre-allocation groups."""
        return self.generate_blockchain_test().get_genesis_environment()

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """
        Generate the fixture for the requested fixture format.

        Blockchain formats are filled through the derived `BlockchainTest`;
        `StateFixture` is filled directly. Raises `Exception` for any other
        format.
        """
        self.check_exception_test(exception=self.tx.error is not None)
        if fixture_format in BlockchainTest.supported_fixture_formats:
            return self.generate_blockchain_test().generate(
                t8n=t8n, fixture_format=fixture_format
            )
        elif fixture_format == StateFixture:
            return self.make_state_test_fixture(t8n)

        raise Exception(f"Unknown fixture format: {fixture_format}")

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """
        Build the execute object for the requested execute format.

        Only `TransactionPost` is supported; it wraps the single transaction
        in one block. Outside of benchmarking mode
        `skip_gas_used_validation` is switched on as a side effect. Raises
        `Exception` for any other execute format.
        """
        if execute_format == TransactionPost:
            # Pass gas validation params for benchmark tests
            # If not benchmark mode, skip gas used validation
            if self.operation_mode != OpMode.BENCHMARKING:
                self.skip_gas_used_validation = True
            benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
            return TransactionPost(
                blocks=[[self.tx]],
                post=self.post,
                benchmark_mode=benchmark_mode,
            )
        raise Exception(f"Unsupported execute format: {execute_format}")

verify_modified_gas_limit(*, t8n, base_tool_result, base_tool_alloc, fork, current_gas_limit, pre_alloc, env, ignore_gas_differences)

Verify a new lower gas limit yields the same transaction outcome.

Source code in packages/testing/src/execution_testing/specs/state.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def verify_modified_gas_limit(
    self,
    *,
    t8n: TransitionTool,
    base_tool_result: Result,
    base_tool_alloc: Alloc,
    fork: Fork,
    current_gas_limit: int,
    pre_alloc: Alloc,
    env: Environment,
    ignore_gas_differences: bool,
) -> bool:
    """
    Check whether the transaction behaves the same with another gas limit.

    The transaction is re-signed with `current_gas_limit`, evaluated again
    and compared with the base run: traces must be equivalent, the expected
    post state and transaction expectations must verify, and the resulting
    alloc must hold the same accounts with the same nonces.

    Returns `True` if everything matches and `False` otherwise, logging the
    reason at debug level. Raises `AssertionError` if either run has no
    traces.
    """
    reference_traces = base_tool_result.traces
    assert reference_traces is not None, (
        "Traces not collected for gas optimization"
    )

    candidate_tx = self.tx.copy(gas_limit=current_gas_limit)
    candidate_tx = candidate_tx.with_signature_and_sender()
    tool_data = TransitionTool.TransitionToolData(
        alloc=pre_alloc,
        txs=[candidate_tx],
        env=env,
        fork=fork,
        chain_id=self.chain_id,
        reward=0,  # Reward on state tests is always zero
        blob_schedule=fork.blob_schedule(),
        state_test=True,
    )
    candidate_output = t8n.evaluate(
        transition_tool_data=tool_data,
        slow_request=self.is_tx_gas_heavy_test,
    )
    candidate_traces = candidate_output.result.traces
    assert candidate_traces is not None, (
        "Traces not collected for gas optimization"
    )

    # Shared debug message for every post-alloc mismatch below.
    alloc_mismatch = (
        f"Post alloc is not equivalent (gas_limit={current_gas_limit})"
    )

    traces_match = reference_traces.are_equivalent(
        candidate_traces,
        ignore_gas_differences,
    )
    if not traces_match:
        logger.debug(
            f"Traces are not equivalent (gas_limit={current_gas_limit})"
        )
        return False

    candidate_alloc = candidate_output.alloc.get()
    try:
        self.post.verify_post_alloc(candidate_alloc)
    except Exception as e:
        logger.debug(alloc_mismatch)
        logger.debug(e)
        return False

    try:
        verify_transactions(
            txs=[candidate_tx],
            result=candidate_output.result,
            transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
        )
    except Exception as e:
        logger.debug(
            f"Transactions are not equivalent (gas_limit={current_gas_limit})"
        )
        logger.debug(e)
        return False

    base_accounts = base_tool_alloc.root
    candidate_accounts = candidate_alloc.root
    if (
        len(base_accounts) != len(candidate_accounts)
        or base_accounts.keys() != candidate_accounts.keys()
    ):
        logger.debug(alloc_mismatch)
        return False

    for address in base_accounts.keys():
        if address not in candidate_alloc:
            logger.debug(alloc_mismatch)
            return False
        base_account = base_tool_alloc[address]
        candidate_account = candidate_alloc[address]
        if (candidate_account is None) != (base_account is None):
            logger.debug(alloc_mismatch)
            return False
        if candidate_account is None or base_account is None:
            # Both are None at this point; there is no nonce to compare.
            continue
        if base_account.nonce != candidate_account.nonce:
            logger.debug(alloc_mismatch)
            return False

    logger.debug(f"Gas limit is equivalent (gas_limit={current_gas_limit})")
    return True

discard_fixture_format_by_marks(fixture_format, fork, markers) classmethod

Discard a fixture format from filling if the appropriate marker is used.

Source code in packages/testing/src/execution_testing/specs/state.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
@classmethod
def discard_fixture_format_by_marks(
    cls,
    fixture_format: FixtureFormat,
    fork: Fork | TransitionFork,
    markers: List[pytest.Mark],
) -> bool:
    """
    Decide whether a fixture format must be skipped because of markers.

    When the `state_test_only` marker is present, every format other than
    `StateFixture` is discarded. Without that marker nothing is discarded.
    The fork plays no role in the decision.
    """
    del fork

    marker_names = [marker.name for marker in markers]
    if "state_test_only" not in marker_names:
        return False
    return fixture_format != StateFixture

generate_blockchain_test()

Generate a BlockchainTest fixture from this StateTest fixture.

Source code in packages/testing/src/execution_testing/specs/state.py
335
336
337
338
339
340
341
342
343
def generate_blockchain_test(self) -> BlockchainTest:
    """
    Build the BlockchainTest counterpart of this StateTest.

    The genesis environment and the block list come from the private
    `_generate_blockchain_*` helpers; the pre and post allocations are
    handed over unchanged.
    """
    genesis_environment = self._generate_blockchain_genesis_environment()
    pre = self.pre
    post = self.post
    blocks = self._generate_blockchain_blocks()
    return BlockchainTest.from_test(
        base_test=self,
        genesis_environment=genesis_environment,
        pre=pre,
        post=post,
        blocks=blocks,
    )

make_state_test_fixture(t8n)

Create a fixture from the state test definition.

Source code in packages/testing/src/execution_testing/specs/state.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
def make_state_test_fixture(
    self,
    t8n: TransitionTool,
) -> FillResult:
    """
    Create a fixture from the state test definition.

    The signed transaction is evaluated once by the transition tool and the
    resulting allocation and transaction outcome are verified against the
    expectations of the test. In the `OPTIMIZE_GAS` and
    `OPTIMIZE_GAS_POST_PROCESSING` operation modes a binary search then
    looks for the smallest gas limit that `verify_modified_gas_limit`
    still accepts, and that value is reported as `gas_optimization`
    (`None` in every other mode).

    Raises an `Exception` when the pre-state contains empty accounts, when
    lowering the gas limit by one already changes the outcome, or when the
    search needs more gas than `gas_optimization_max_gas_limit`. Errors
    from the post-state and transaction verification are re-raised after
    the traces have been printed.
    """
    # We can't generate a state test fixture that names a transition fork,
    # so we get the fork at the block number and timestamp of the state
    # test
    fork = self.fork.fork_at(
        block_number=self.env.number, timestamp=self.env.timestamp
    )

    env = self.env.set_fork_requirements(fork)
    tx = self.tx.with_signature_and_sender(keep_secret_key=True)
    pre_alloc = Alloc.merge(
        Alloc.model_validate(fork.pre_allocation()),
        self.pre,
    )
    if empty_accounts := pre_alloc.empty_accounts():
        raise Exception(f"Empty accounts in pre state: {empty_accounts}")

    transition_tool_output = t8n.evaluate(
        transition_tool_data=TransitionTool.TransitionToolData(
            alloc=pre_alloc,
            txs=[tx],
            env=env,
            fork=fork,
            chain_id=self.chain_id,
            reward=0,  # Reward on state tests is always zero
            blob_schedule=fork.blob_schedule(),
            state_test=True,
        ),
        slow_request=self.is_tx_gas_heavy_test,
    )
    output_alloc = transition_tool_output.alloc.get()

    try:
        self.post.verify_post_alloc(output_alloc)
    except Exception:
        # Print the traces to help debugging, then let the error propagate
        print_traces(t8n.get_traces())
        raise

    try:
        verify_transactions(
            txs=[tx],
            result=transition_tool_output.result,
            transition_tool_exceptions_reliable=t8n.exception_mapper.reliable,
        )
    except Exception:
        print_traces(t8n.get_traces())
        pprint(transition_tool_output.result)
        pprint(output_alloc)
        raise

    gas_optimization: int | None = None

    if self.operation_mode in (
        OpMode.OPTIMIZE_GAS,
        OpMode.OPTIMIZE_GAS_POST_PROCESSING,
    ):
        ignore_gas_differences = (
            self.operation_mode == OpMode.OPTIMIZE_GAS_POST_PROCESSING
        )
        base_tool_alloc = transition_tool_output.alloc.get()
        base_tool_result = transition_tool_output.result

        assert base_tool_result.traces is not None, "Traces not found."

        def gas_limit_is_equivalent(gas_limit: int) -> bool:
            """Check `gas_limit` against the base run of this test."""
            return self.verify_modified_gas_limit(
                t8n=t8n,
                base_tool_result=base_tool_result,
                base_tool_alloc=base_tool_alloc,
                fork=fork,
                current_gas_limit=gas_limit,
                pre_alloc=pre_alloc,
                env=env,
                ignore_gas_differences=ignore_gas_differences,
            )

        # First try reducing the gas limit only by one, if the validation
        # fails, it means that the traces change even with the slightest
        # modification to the gas.
        if not gas_limit_is_equivalent(self.tx.gas_limit - 1):
            raise Exception("Impossible to compare.")

        # Binary search for the smallest accepted gas limit. Invariant:
        # every limit below `minimum_gas_limit` has been rejected and
        # `maximum_gas_limit` is accepted (or is the original limit).
        minimum_gas_limit = 0
        maximum_gas_limit = int(self.tx.gas_limit)
        while minimum_gas_limit < maximum_gas_limit:
            current_gas_limit = (
                maximum_gas_limit + minimum_gas_limit
            ) // 2
            if gas_limit_is_equivalent(current_gas_limit):
                maximum_gas_limit = current_gas_limit
                continue
            minimum_gas_limit = current_gas_limit + 1
            if (
                self.gas_optimization_max_gas_limit is not None
                and minimum_gas_limit
                > self.gas_optimization_max_gas_limit
            ):
                raise Exception(
                    "Requires more than the minimum "
                    f"{self.gas_optimization_max_gas_limit} "
                    "wanted."
                )

        assert gas_limit_is_equivalent(minimum_gas_limit)
        # Report the limit that was just verified. The last probed value
        # (`current_gas_limit`) is one lower than this whenever the final
        # probe of the search was rejected.
        gas_optimization = minimum_gas_limit

    if len(transition_tool_output.result.receipts) == 1:
        receipt = FixtureTransactionReceipt.from_transaction_receipt(
            transition_tool_output.result.receipts[0], tx
        )
        receipt_root = FixtureTransactionReceipt.list_root([receipt])
        assert (
            transition_tool_output.result.receipts_root == receipt_root
        ), (
            f"Receipts root mismatch: "
            f"{transition_tool_output.result.receipts_root} != "
            f"{receipt_root.hex()}"
            f"Receipt: {receipt.rlp()}"
        )
    else:
        receipt = None

    fixture = StateFixture(
        env=FixtureEnvironment(**env.model_dump(exclude_none=True)),
        pre=pre_alloc,
        post={
            fork: [
                FixtureForkPost(
                    state_root=transition_tool_output.result.state_root,
                    logs_hash=transition_tool_output.result.logs_hash,
                    receipt=receipt,
                    tx_bytes=tx.rlp(),
                    expect_exception=tx.error,
                    state=output_alloc,
                )
            ]
        },
        transaction=FixtureTransaction.from_transaction(tx),
        config=FixtureConfig(
            blob_schedule=FixtureBlobSchedule.from_blob_schedule(
                fork.blob_schedule()
            ),
            chain_id=self.chain_id,
        ),
    )
    return FillResult(
        fixture=fixture,
        gas_optimization=gas_optimization,
        benchmark_gas_used=transition_tool_output.result.gas_used,
        benchmark_opcode_count=transition_tool_output.result.opcode_count,
        post_verifications=PostVerifications.from_alloc(self.post),
    )

get_genesis_environment()

Get the genesis environment for pre-allocation groups.

Source code in packages/testing/src/execution_testing/specs/state.py
518
519
520
def get_genesis_environment(self) -> Environment:
    """
    Get the genesis environment for pre-allocation groups.

    The value is taken from the BlockchainTest built by
    `generate_blockchain_test()`, so it is whatever that blockchain test
    reports as its genesis environment.
    """
    return self.generate_blockchain_test().get_genesis_environment()

generate(t8n, fixture_format)

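Generate the fixture in the requested format: blockchain formats are filled through the derived BlockchainTest, and StateFixture is filled directly.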

Source code in packages/testing/src/execution_testing/specs/state.py
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """
    Fill this state test in the requested fixture format.

    Formats supported by BlockchainTest are produced through the
    blockchain test derived from this state test; `StateFixture` is
    produced directly. Any other format raises an exception.
    """
    tx_expects_error = self.tx.error is not None
    self.check_exception_test(exception=tx_expects_error)

    if fixture_format in BlockchainTest.supported_fixture_formats:
        blockchain_test = self.generate_blockchain_test()
        return blockchain_test.generate(
            t8n=t8n, fixture_format=fixture_format
        )
    if fixture_format == StateFixture:
        return self.make_state_test_fixture(t8n)

    raise Exception(f"Unknown fixture format: {fixture_format}")

execute(*, execute_format)

Build the execute-mode object (a TransactionPost) for this state test.

Source code in packages/testing/src/execution_testing/specs/state.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """
    Build the execute object for this state test.

    Only `TransactionPost` is supported; any other execute format raises
    an exception.
    """
    if execute_format != TransactionPost:
        raise Exception(f"Unsupported execute format: {execute_format}")

    benchmark_mode = self.operation_mode == OpMode.BENCHMARKING
    if self.operation_mode != OpMode.BENCHMARKING:
        # Outside benchmarking mode, gas-used validation is switched off
        # on this test instance
        self.skip_gas_used_validation = True
    return TransactionPost(
        blocks=[[self.tx]],
        post=self.post,
        benchmark_mode=benchmark_mode,
    )

StateStaticTest

Bases: BaseStaticTest

General State Test static filler from ethereum/tests.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class StateStaticTest(BaseStaticTest):
    """
    Static filler for General State Tests coming from ethereum/tests.

    An instance holds one parsed filler file and can turn it into a
    parametrized pytest function through `fill_function`.
    """

    test_name: str = ""
    format_name: ClassVar[str] = "state_test"

    # Sections of the filler file
    info: Info | None = Field(None, alias="_info")
    env: EnvironmentInStateTestFiller
    pre: PreInFiller
    transaction: GeneralTransactionInFiller
    expect: List[ExpectSectionInStateTestFiller]

    model_config = ConfigDict(extra="forbid")

    def model_post_init(self, context: Any) -> None:
        """Initialize StateStaticTest by deferring to the parent class."""
        super().model_post_init(context)

    @model_validator(mode="after")
    def match_labels(self) -> Self:
        """
        Resolve labels and ranges of every expect section into tx indexes.
        """

        def parse_string_indexes(indexes: str) -> List[int]:
            """Turn a `:label` reference or an `a-b` range into indexes."""
            if ":label" not in indexes:
                # Inclusive range written as "start-end"
                first, last = (
                    int(part) for part in indexes.lstrip().split("-")
                )
                return list(range(first, last + 1))

            # Collect the index of every tx data entry carrying the label
            label = indexes.replace(":label ", "")
            return [
                entry.index
                for entry in self.transaction.data
                if label == entry.label
            ]

        def parse_indexes(
            indexes: Union[
                int, str, list[Union[int, str]], list[str], list[int]
            ],
            do_hint: bool = False,
        ) -> List[int] | int:
            """
            Expand ranges and labels found in `indexes` into tx indexes.

            A plain int is returned unchanged, a string is expanded into a
            list, and a list is flattened and de-duplicated.
            """
            result: List[int] | int = []

            if do_hint:
                print("Before: " + str(indexes))

            if isinstance(indexes, int):
                result = indexes
            elif isinstance(indexes, str):
                result = parse_string_indexes(indexes)
            elif isinstance(indexes, list):
                collected: List[int] = []
                for element in indexes:
                    parsed = parse_indexes(element)
                    if isinstance(parsed, int):
                        collected.append(parsed)
                    else:
                        collected.extend(parsed)
                result = list(set(collected))

            if do_hint:
                print("After: " + str(result))
            return result

        for expect_section in self.expect:
            section_indexes = expect_section.indexes
            for field_name in ("data", "gas", "value"):
                setattr(
                    section_indexes,
                    field_name,
                    parse_indexes(getattr(section_indexes, field_name)),
                )

        return self

    def fill_function(self) -> Callable:
        """
        Build the parametrized pytest function for this static test.

        The function is parametrized over every (data, gas, value) index
        combination of the transaction and carries the fork validity and
        tagging marks derived from the filler.
        """
        # The test uses tags when the transaction or any expected result
        # depends on one
        has_tags = bool(self.transaction.tag_dependencies()) or any(
            expect.result.tag_dependencies() for expect in self.expect
        )

        fully_tagged = all(
            isinstance(address, Tag) for address in self.pre.root
        )

        data_count = len(self.transaction.data)
        gas_limit_count = len(self.transaction.gas_limit)
        value_count = len(self.transaction.value)

        d_g_v_parameters: List[ParameterSet] = []
        for data_entry in self.transaction.data:
            for g in range(gas_limit_count):
                for v in range(value_count):
                    # TODO: This does not take into account exceptions that
                    # only happen on specific forks, but this requires a
                    # covariant parametrize
                    exception_test = any(
                        expect.has_index(data_entry.index, g, v)
                        and expect.expect_exception is not None
                        for expect in self.expect
                    )
                    if exception_test:
                        marks = [pytest.mark.exception_test]
                    else:
                        marks = []

                    if data_entry.label is not None:
                        id_label = f"{data_entry}"
                    elif data_count > 1:
                        id_label = f"d{data_entry}"
                    else:
                        id_label = ""
                    if gas_limit_count > 1:
                        id_label += f"-g{g}"
                    if value_count > 1:
                        id_label += f"-v{v}"

                    d_g_v_parameters.append(
                        pytest.param(
                            data_entry.index, g, v, marks=marks, id=id_label
                        )
                    )

        @pytest.mark.valid_at(*self.get_valid_at_forks())
        @pytest.mark.parametrize("d,g,v", d_g_v_parameters)
        def test_state_vectors(
            state_test: StateTestFiller,
            pre: Alloc,
            fork: Fork,
            d: int,
            g: int,
            v: int,
        ) -> None:
            # Use the first expect section matching both indexes and fork
            for expect in self.expect:
                if not expect.has_index(d, g, v):
                    continue
                if fork not in expect.network:
                    continue

                all_dependencies = {
                    **self.transaction.tag_dependencies(),
                    **expect.result.tag_dependencies(),
                }
                tags = self.pre.setup(pre, all_dependencies)
                env = self.env.get_environment(tags)
                if expect.expect_exception is None:
                    exception = None
                else:
                    exception = expect.expect_exception[fork]
                tx = self.transaction.get_transaction(
                    tags, d, g, v, exception
                )
                post = expect.result.resolve(tags)
                state_test(
                    env=env,
                    pre=pre,
                    post=post,
                    tx=tx,
                )
                return
            pytest.fail(
                f"Expectation not found for d={d}, g={g}, v={v}, fork={fork}"
            )

        # Marks requested by the filler's info section
        if self.info and self.info.pytest_marks:
            for mark_name in self.info.pytest_marks:
                test_state_vectors = getattr(pytest.mark, mark_name)(
                    test_state_vectors
                )

        if not has_tags:
            test_state_vectors = pytest.mark.untagged(test_state_vectors)
        else:
            test_state_vectors = pytest.mark.tagged(test_state_vectors)
            if fully_tagged:
                test_state_vectors = pytest.mark.fully_tagged(
                    test_state_vectors
                )

        # All static tests are mutable since we do `pre[0x123...] = Account()`
        return pytest.mark.pre_alloc_mutable(test_state_vectors)

    def get_valid_at_forks(self) -> List[str]:
        """Return the sorted string forms of all forks in expect sections."""
        forks: Set[Fork] = set().union(
            *(expect.network for expect in self.expect)
        )
        return sorted(str(fork) for fork in forks)

model_post_init(context)

Initialize StateStaticTest.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
42
43
44
def model_post_init(self, context: Any) -> None:
    """
    Initialize StateStaticTest.

    Only forwards `context` to the parent class's `model_post_init`; no
    additional initialization is performed here.
    """
    super().model_post_init(context)

match_labels()

Replace labels in expect section with corresponding tx.d indexes.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@model_validator(mode="after")
def match_labels(self) -> Self:
    """
    Resolve labels and ranges of every expect section into tx indexes.
    """

    def parse_string_indexes(indexes: str) -> List[int]:
        """Turn a `:label` reference or an `a-b` range into indexes."""
        if ":label" not in indexes:
            # Inclusive range written as "start-end"
            first, last = (
                int(part) for part in indexes.lstrip().split("-")
            )
            return list(range(first, last + 1))

        # Collect the index of every tx data entry carrying the label
        label = indexes.replace(":label ", "")
        return [
            entry.index
            for entry in self.transaction.data
            if label == entry.label
        ]

    def parse_indexes(
        indexes: Union[
            int, str, list[Union[int, str]], list[str], list[int]
        ],
        do_hint: bool = False,
    ) -> List[int] | int:
        """
        Expand ranges and labels found in `indexes` into tx indexes.

        A plain int is returned unchanged, a string is expanded into a
        list, and a list is flattened and de-duplicated.
        """
        result: List[int] | int = []

        if do_hint:
            print("Before: " + str(indexes))

        if isinstance(indexes, int):
            result = indexes
        elif isinstance(indexes, str):
            result = parse_string_indexes(indexes)
        elif isinstance(indexes, list):
            collected: List[int] = []
            for element in indexes:
                parsed = parse_indexes(element)
                if isinstance(parsed, int):
                    collected.append(parsed)
                else:
                    collected.extend(parsed)
            result = list(set(collected))

        if do_hint:
            print("After: " + str(result))
        return result

    for expect_section in self.expect:
        section_indexes = expect_section.indexes
        for field_name in ("data", "gas", "value"):
            setattr(
                section_indexes,
                field_name,
                parse_indexes(getattr(section_indexes, field_name)),
            )

    return self

fill_function()

Return a StateTest spec from a static file.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def fill_function(self) -> Callable:
    """
    Build the parametrized pytest function for this static test.

    The function is parametrized over every (data, gas, value) index
    combination of the transaction and carries the fork validity and
    tagging marks derived from the filler.
    """
    # The test uses tags when the transaction or any expected result
    # depends on one
    has_tags = bool(self.transaction.tag_dependencies()) or any(
        expect.result.tag_dependencies() for expect in self.expect
    )

    fully_tagged = all(
        isinstance(address, Tag) for address in self.pre.root
    )

    data_count = len(self.transaction.data)
    gas_limit_count = len(self.transaction.gas_limit)
    value_count = len(self.transaction.value)

    d_g_v_parameters: List[ParameterSet] = []
    for data_entry in self.transaction.data:
        for g in range(gas_limit_count):
            for v in range(value_count):
                # TODO: This does not take into account exceptions that
                # only happen on specific forks, but this requires a
                # covariant parametrize
                exception_test = any(
                    expect.has_index(data_entry.index, g, v)
                    and expect.expect_exception is not None
                    for expect in self.expect
                )
                if exception_test:
                    marks = [pytest.mark.exception_test]
                else:
                    marks = []

                if data_entry.label is not None:
                    id_label = f"{data_entry}"
                elif data_count > 1:
                    id_label = f"d{data_entry}"
                else:
                    id_label = ""
                if gas_limit_count > 1:
                    id_label += f"-g{g}"
                if value_count > 1:
                    id_label += f"-v{v}"

                d_g_v_parameters.append(
                    pytest.param(
                        data_entry.index, g, v, marks=marks, id=id_label
                    )
                )

    @pytest.mark.valid_at(*self.get_valid_at_forks())
    @pytest.mark.parametrize("d,g,v", d_g_v_parameters)
    def test_state_vectors(
        state_test: StateTestFiller,
        pre: Alloc,
        fork: Fork,
        d: int,
        g: int,
        v: int,
    ) -> None:
        # Use the first expect section matching both indexes and fork
        for expect in self.expect:
            if not expect.has_index(d, g, v):
                continue
            if fork not in expect.network:
                continue

            all_dependencies = {
                **self.transaction.tag_dependencies(),
                **expect.result.tag_dependencies(),
            }
            tags = self.pre.setup(pre, all_dependencies)
            env = self.env.get_environment(tags)
            if expect.expect_exception is None:
                exception = None
            else:
                exception = expect.expect_exception[fork]
            tx = self.transaction.get_transaction(
                tags, d, g, v, exception
            )
            post = expect.result.resolve(tags)
            state_test(
                env=env,
                pre=pre,
                post=post,
                tx=tx,
            )
            return
        pytest.fail(
            f"Expectation not found for d={d}, g={g}, v={v}, fork={fork}"
        )

    # Marks requested by the filler's info section
    if self.info and self.info.pytest_marks:
        for mark_name in self.info.pytest_marks:
            test_state_vectors = getattr(pytest.mark, mark_name)(
                test_state_vectors
            )

    if not has_tags:
        test_state_vectors = pytest.mark.untagged(test_state_vectors)
    else:
        test_state_vectors = pytest.mark.tagged(test_state_vectors)
        if fully_tagged:
            test_state_vectors = pytest.mark.fully_tagged(
                test_state_vectors
            )

    # All static tests are mutable since we do `pre[0x123...] = Account()`
    return pytest.mark.pre_alloc_mutable(test_state_vectors)

get_valid_at_forks()

Return list of forks that are valid for this test.

Source code in packages/testing/src/execution_testing/specs/static_state/state_static.py
226
227
228
229
230
231
def get_valid_at_forks(self) -> List[str]:
    """Return the sorted string forms of all forks in expect sections."""
    forks: Set[Fork] = set().union(
        *(expect.network for expect in self.expect)
    )
    return sorted(str(fork) for fork in forks)

TransactionTest

Bases: BaseTest

Filler type that tests the transaction over the period of a single block.

Source code in packages/testing/src/execution_testing/specs/transaction.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class TransactionTest(BaseTest):
    """
    Filler type that checks a single transaction within a single block.
    """

    # Transaction under test
    tx: Transaction
    # Optional pre-state; not read by the methods defined in this class
    pre: Alloc | None = None

    supported_fixture_formats: ClassVar[
        Sequence[FixtureFormat | LabeledFixtureFormat]
    ] = [
        TransactionFixture,
    ]
    supported_execute_formats: ClassVar[Sequence[LabeledExecuteFormat]] = [
        LabeledExecuteFormat(
            TransactionPost,
            "transaction_test",
            "An execute test derived from a transaction test",
        ),
    ]

    def make_transaction_test_fixture(self) -> FillResult:
        """
        Build the TransactionFixture for this test.

        A transaction that expects an error yields a result carrying only
        the exception, with zero intrinsic gas. Otherwise the result holds
        the transaction hash, sender and computed intrinsic gas.
        """
        fork = self.fork.transitions_from()
        if self.tx.error is None:
            calculate_intrinsic_gas = (
                fork.transitions_from().transaction_intrinsic_cost_calculator()
            )
            intrinsic_gas = calculate_intrinsic_gas(
                calldata=self.tx.data,
                contract_creation=self.tx.to is None,
                access_list=self.tx.access_list,
                authorization_list_or_count=self.tx.authorization_list,
            )
            result = FixtureResult(
                exception=None,
                hash=self.tx.hash,
                intrinsic_gas=intrinsic_gas,
                sender=self.tx.sender,
            )
        else:
            result = FixtureResult(
                exception=self.tx.error,
                hash=None,
                intrinsic_gas=0,
                sender=None,
            )

        signed_tx_rlp = self.tx.with_signature_and_sender().rlp()
        return FillResult(
            fixture=TransactionFixture(
                result={fork: result},
                transaction=signed_tx_rlp,
            ),
            gas_optimization=None,
            benchmark_gas_used=None,
            benchmark_opcode_count=None,
        )

    def generate(
        self,
        t8n: TransitionTool,
        fixture_format: FixtureFormat,
    ) -> FillResult:
        """
        Fill this test as a TransactionFixture.

        The transition tool is not used. Any format other than
        `TransactionFixture` raises an exception.
        """
        del t8n

        self.check_exception_test(exception=self.tx.error is not None)
        if fixture_format != TransactionFixture:
            raise Exception(f"Unknown fixture format: {fixture_format}")
        return self.make_transaction_test_fixture()

    def execute(
        self,
        *,
        execute_format: ExecuteFormat,
    ) -> BaseExecute:
        """
        Build the execute object that sends the transaction to a network.

        Only `TransactionPost` is supported; any other execute format
        raises an exception.
        """
        if execute_format != TransactionPost:
            raise Exception(f"Unsupported execute format: {execute_format}")

        return TransactionPost(
            blocks=[[self.tx]],
            post={},
            benchmark_mode=self.operation_mode == OpMode.BENCHMARKING,
        )

make_transaction_test_fixture()

Create a fixture from the transaction test definition.

Source code in packages/testing/src/execution_testing/specs/transaction.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def make_transaction_test_fixture(self) -> FillResult:
    """
    Build the TransactionFixture for this test.

    A transaction that expects an error yields a result carrying only the
    exception, with zero intrinsic gas. Otherwise the result holds the
    transaction hash, sender and computed intrinsic gas.
    """
    fork = self.fork.transitions_from()
    if self.tx.error is None:
        calculate_intrinsic_gas = (
            fork.transitions_from().transaction_intrinsic_cost_calculator()
        )
        intrinsic_gas = calculate_intrinsic_gas(
            calldata=self.tx.data,
            contract_creation=self.tx.to is None,
            access_list=self.tx.access_list,
            authorization_list_or_count=self.tx.authorization_list,
        )
        result = FixtureResult(
            exception=None,
            hash=self.tx.hash,
            intrinsic_gas=intrinsic_gas,
            sender=self.tx.sender,
        )
    else:
        result = FixtureResult(
            exception=self.tx.error,
            hash=None,
            intrinsic_gas=0,
            sender=None,
        )

    signed_tx_rlp = self.tx.with_signature_and_sender().rlp()
    return FillResult(
        fixture=TransactionFixture(
            result={fork: result},
            transaction=signed_tx_rlp,
        ),
        gas_optimization=None,
        benchmark_gas_used=None,
        benchmark_opcode_count=None,
    )

generate(t8n, fixture_format)

Generate the TransactionTest fixture.

Source code in packages/testing/src/execution_testing/specs/transaction.py
86
87
88
89
90
91
92
93
94
95
96
97
98
def generate(
    self,
    t8n: TransitionTool,
    fixture_format: FixtureFormat,
) -> FillResult:
    """
    Fill this test as a TransactionFixture.

    The transition tool is not used. Any format other than
    `TransactionFixture` raises an exception.
    """
    del t8n

    self.check_exception_test(exception=self.tx.error is not None)
    if fixture_format != TransactionFixture:
        raise Exception(f"Unknown fixture format: {fixture_format}")
    return self.make_transaction_test_fixture()

execute(*, execute_format)

Execute the transaction test by sending it to the live network.

Source code in packages/testing/src/execution_testing/specs/transaction.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def execute(
    self,
    *,
    execute_format: ExecuteFormat,
) -> BaseExecute:
    """
    Build the execute object that sends the transaction to a network.

    Only `TransactionPost` is supported; any other execute format raises
    an exception.
    """
    if execute_format != TransactionPost:
        raise Exception(f"Unsupported execute format: {execute_format}")

    return TransactionPost(
        blocks=[[self.tx]],
        post={},
        benchmark_mode=self.operation_mode == OpMode.BENCHMARKING,
    )