Skip to content

Ethereum Test Tools Package

Module containing tools for generating cross-client Ethereum execution layer tests.

CalldataCase

Bases: Case

Small helper class to represent a single case whose condition depends on the value of the contract's calldata in a Switch case statement.

By default the calldata is read from position zero, but this can be overridden using position.

The condition is generated automatically based on the value (and optionally position) and may not be set directly.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class CalldataCase(Case):
    """
    A `Switch` case whose condition compares a word of the contract's
    calldata against a given value.

    The word is loaded from calldata position zero unless a different
    `position` is supplied.

    The `condition` is derived automatically from `value` (and, optionally,
    `position`) and may not be set directly.
    """

    def __init__(
        self, value: int | str | Bytecode, position: int = 0, **kwargs: Any
    ) -> None:
        """Generate the condition based on `value` and `position`."""
        # The condition is built inline: load the calldata word at
        # `position` and test it for equality with `value`.
        super().__init__(
            condition=Op.EQ(Op.CALLDATALOAD(position), value),
            **kwargs,
        )

__init__(value, position=0, **kwargs)

Generate the condition based on value and position.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
311
312
313
314
315
316
def __init__(
    self, value: int | str | Bytecode, position: int = 0, **kwargs: Any
) -> None:
    """Generate the condition based on `value` and `position`."""
    # The condition is built inline: load the calldata word at `position`
    # and test it for equality with `value`.
    super().__init__(
        condition=Op.EQ(Op.CALLDATALOAD(position), value),
        **kwargs,
    )

Case dataclass

Small helper class to represent a single, generic case in a Switch cases list.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
@dataclass(kw_only=True, slots=True)
class Case:
    """
    Small helper class to represent a single, generic case in a `Switch` cases
    list.
    """

    condition: Bytecode | Op
    action: Bytecode | Op
    terminating: bool | None = None

    @property
    def is_terminating(self) -> bool:
        """Returns whether the case is terminating."""
        return (
            self.terminating
            if self.terminating is not None
            else self.action.terminating
        )

is_terminating property

Returns whether the case is terminating.

CodeGasMeasure

Bases: Bytecode

Helper class used to generate bytecode that measures gas usage of a bytecode, taking into account and subtracting any extra overhead gas costs required to execute. By default, the result gas calculation is saved to storage key 0.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class CodeGasMeasure(Bytecode):
    """
    Helper class used to generate bytecode that measures gas usage of a
    bytecode, taking into account and subtracting any extra overhead gas costs
    required to execute. By default, the result gas calculation is saved to
    storage key 0.

    The generated bytecode places a `GAS` opcode before and after `code`,
    discards any stack items `code` left behind, adjusts the difference by
    the overhead and writes the result with `SSTORE`.
    """

    code: Bytecode
    """
    Bytecode to be executed to measure the gas usage.
    """
    overhead_cost: int
    """
    Extra gas cost to be subtracted from extra operations.
    """
    extra_stack_items: int
    """
    Extra stack items that remain at the end of the execution.
    To be considered when subtracting the value of the previous GAS operation,
    and to be popped at the end of the execution.
    """
    sstore_key: int | Bytes
    """
    Storage key to save the gas used.
    """

    def __new__(
        cls,
        *,
        code: Bytecode,
        overhead_cost: int = 0,
        extra_stack_items: int = 0,
        sstore_key: int | Bytes = 0,
        stop: bool = True,
    ) -> Self:
        """
        Assemble the bytecode that measures gas usage.

        Args:
            code: Bytecode whose gas usage is measured.
            overhead_cost: Extra gas to subtract from the measurement.
            extra_stack_items: Number of stack items `code` leaves behind;
                one `SWAP1 + POP` pair is emitted for each of them.
            sstore_key: Storage key the result is written to.
            stop: When true, a `STOP` opcode is appended after the `SSTORE`.

        Returns:
            The new instance, with the constructor arguments (except `stop`)
            stored as attributes.

        """
        # Read the remaining gas immediately before and after `code`.
        res = Op.GAS + code + Op.GAS
        # We need to swap and pop for each extra stack item that remained from
        # the execution of the code
        res += (Op.SWAP1 + Op.POP) * extra_stack_items
        # NOTE(review): assuming standard EVM opcode semantics, the sequence
        # below takes the difference between the two GAS readings, measures
        # the cost of a single GAS opcode at runtime (two back-to-back GAS
        # readings), adds it to `overhead_cost` and subtracts that sum from
        # the difference before storing it — confirm against the argument
        # ordering rules of `Op`.
        # NOTE(review): PUSH1 carries a one-byte immediate, so an
        # `overhead_cost` above 255 presumably does not fit — confirm how
        # `Op.PUSH1[...]` handles larger values.
        res += (
            Op.SWAP1
            + Op.SUB
            + Op.PUSH1[overhead_cost]
            + Op.GAS
            + Op.GAS
            + Op.SWAP1
            + Op.SUB
            + Op.ADD
            + Op.SWAP1
            + Op.SSTORE(sstore_key, Op.SUB)
        )
        if stop:
            res += Op.STOP

        instance = super().__new__(cls, res)
        # Keep the constructor arguments available on the instance.
        instance.code = code
        instance.overhead_cost = overhead_cost
        instance.extra_stack_items = extra_stack_items
        instance.sstore_key = sstore_key
        return instance

code instance-attribute

Bytecode to be executed to measure the gas usage.

overhead_cost instance-attribute

Extra gas cost to be subtracted from extra operations.

extra_stack_items instance-attribute

Extra stack items that remain at the end of the execution. To be considered when subtracting the value of the previous GAS operation, and to be popped at the end of the execution.

sstore_key instance-attribute

Storage key to save the gas used.

__new__(*, code, overhead_cost=0, extra_stack_items=0, sstore_key=0, stop=True)

Assemble the bytecode that measures gas usage.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def __new__(
    cls,
    *,
    code: Bytecode,
    overhead_cost: int = 0,
    extra_stack_items: int = 0,
    sstore_key: int | Bytes = 0,
    stop: bool = True,
) -> Self:
    """
    Assemble the bytecode that measures gas usage.

    Args:
        code: Bytecode whose gas usage is measured.
        overhead_cost: Extra gas to subtract from the measurement.
        extra_stack_items: Number of stack items `code` leaves behind; one
            `SWAP1 + POP` pair is emitted for each of them.
        sstore_key: Storage key the result is written to.
        stop: When true, a `STOP` opcode is appended after the `SSTORE`.

    Returns:
        The new instance, with the constructor arguments (except `stop`)
        stored as attributes.

    """
    # Read the remaining gas immediately before and after `code`.
    res = Op.GAS + code + Op.GAS
    # We need to swap and pop for each extra stack item that remained from
    # the execution of the code
    res += (Op.SWAP1 + Op.POP) * extra_stack_items
    # NOTE(review): assuming standard EVM opcode semantics, the sequence
    # below takes the difference between the two GAS readings, measures the
    # cost of a single GAS opcode at runtime (two back-to-back GAS readings),
    # adds it to `overhead_cost` and subtracts that sum from the difference
    # before storing it — confirm against the argument ordering rules of `Op`.
    # NOTE(review): PUSH1 carries a one-byte immediate, so an `overhead_cost`
    # above 255 presumably does not fit — confirm how `Op.PUSH1[...]` handles
    # larger values.
    res += (
        Op.SWAP1
        + Op.SUB
        + Op.PUSH1[overhead_cost]
        + Op.GAS
        + Op.GAS
        + Op.SWAP1
        + Op.SUB
        + Op.ADD
        + Op.SWAP1
        + Op.SSTORE(sstore_key, Op.SUB)
    )
    if stop:
        res += Op.STOP

    instance = super().__new__(cls, res)
    # Keep the constructor arguments available on the instance.
    instance.code = code
    instance.overhead_cost = overhead_cost
    instance.extra_stack_items = extra_stack_items
    instance.sstore_key = sstore_key
    return instance

Conditional

Bases: Bytecode

Helper class used to generate conditional bytecode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class Conditional(Bytecode):
    """
    Helper class used to generate conditional bytecode.

    The emitted layout is: conditional jump, false branch (ending in an
    unconditional jump), `JUMPDEST`, true branch, final `JUMPDEST`.
    """

    def __new__(
        cls,
        *,
        condition: Bytecode | Op,
        if_true: Bytecode | Op | None = None,
        if_false: Bytecode | Op | None = None,
    ) -> Self:
        """
        Assemble the conditional bytecode by generating the necessary jump and
        jumpdest opcodes surrounding the condition and the two possible
        execution paths.

        In the future, PC usage should be replaced by using RJUMP and RJUMPI

        Args:
            condition: Bytecode used as the condition of the `JUMPI`.
            if_true: Bytecode reached when the conditional jump is taken.
                Defaults to empty bytecode.
            if_false: Bytecode executed when the conditional jump falls
                through. Defaults to empty bytecode.

        Returns:
            The new instance containing the assembled bytecode.

        """
        if if_true is None:
            if_true = Bytecode()
        if if_false is None:
            if_false = Bytecode()

        # First we prepend a jumpdest to the start of the true branch; it is
        # the landing point of the conditional jump
        if_true = Op.JUMPDEST + if_true

        # Then we append the unconditional jump to the end of the false
        # branch, used to skip the true branch.
        # NOTE(review): the jump target is relative to the PC opcode; the
        # `+ 3` presumably accounts for the PC, ADD and JUMP opcodes that
        # sit between the PC reading and the start of the true branch —
        # confirm against the opcode emission order of `Op`.
        if_false += Op.JUMP(Op.ADD(Op.PC, len(if_true) + 3))

        # Then we need to do the conditional jump by skipping the false
        # branch (which at this point already includes its trailing jump)
        condition = Op.JUMPI(Op.ADD(Op.PC, len(if_false) + 3), condition)

        # Finally we append the condition, false and true branches, plus
        # the jumpdest at the very end
        bytecode = condition + if_false + if_true + Op.JUMPDEST

        return super().__new__(cls, bytecode)

__new__(*, condition, if_true=None, if_false=None)

Assemble the conditional bytecode by generating the necessary jump and jumpdest opcodes surrounding the condition and the two possible execution paths.

In the future, PC usage should be replaced by using RJUMP and RJUMPI

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def __new__(
    cls,
    *,
    condition: Bytecode | Op,
    if_true: Bytecode | Op | None = None,
    if_false: Bytecode | Op | None = None,
) -> Self:
    """
    Assemble the conditional bytecode by generating the necessary jump and
    jumpdest opcodes surrounding the condition and the two possible
    execution paths.

    In the future, PC usage should be replaced by using RJUMP and RJUMPI

    Args:
        condition: Bytecode used as the condition of the `JUMPI`.
        if_true: Bytecode reached when the conditional jump is taken.
            Defaults to empty bytecode.
        if_false: Bytecode executed when the conditional jump falls
            through. Defaults to empty bytecode.

    Returns:
        The new instance containing the assembled bytecode.

    """
    if if_true is None:
        if_true = Bytecode()
    if if_false is None:
        if_false = Bytecode()

    # First we prepend a jumpdest to the start of the true branch; it is the
    # landing point of the conditional jump
    if_true = Op.JUMPDEST + if_true

    # Then we append the unconditional jump to the end of the false
    # branch, used to skip the true branch.
    # NOTE(review): the jump target is relative to the PC opcode; the `+ 3`
    # presumably accounts for the PC, ADD and JUMP opcodes that sit between
    # the PC reading and the start of the true branch — confirm against the
    # opcode emission order of `Op`.
    if_false += Op.JUMP(Op.ADD(Op.PC, len(if_true) + 3))

    # Then we need to do the conditional jump by skipping the false
    # branch (which at this point already includes its trailing jump)
    condition = Op.JUMPI(Op.ADD(Op.PC, len(if_false) + 3), condition)

    # Finally we append the condition, false and true branches, plus
    # the jumpdest at the very end
    bytecode = condition + if_false + if_true + Op.JUMPDEST

    return super().__new__(cls, bytecode)

Create2PreimageLayout

Bases: Bytecode

Set up the preimage in memory for CREATE2 address computation.

Creates the standard memory layout required to compute a CREATE2 address using keccak256(0xFF ++ factory_address ++ salt ++ init_code_hash).

Memory layout after execution:

- MEM[offset + 0: offset + 32] = zero padding + factory_address (20 bytes)
- MEM[offset + 11] = 0xFF prefix byte
- MEM[offset + 32: offset + 64] = salt (32 bytes)
- MEM[offset + 64: offset + 96] = init_code_hash (32 bytes)

To compute the CREATE2 address, call .address_op() or use Op.SHA3(offset + 11, 85). The lower 20 bytes (bytes 12-31) of the resulting hash form the address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
class Create2PreimageLayout(Bytecode):
    """
    Write the preimage used for CREATE2 address computation into memory.

    Produces the standard memory layout needed to derive a CREATE2 address
    as keccak256(0xFF ++ factory_address ++ salt ++ init_code_hash).

    Memory layout after execution:
    - MEM[offset + 0: offset + 32] = zero padding + factory_address (20 bytes)
    - MEM[offset + 11] = 0xFF prefix byte
    - MEM[offset + 32: offset + 64] = salt (32 bytes)
    - MEM[offset + 64: offset + 96] = init_code_hash (32 bytes)

    To compute the CREATE2 address, call `.address_op()` or use
    `Op.SHA3(offset + 11, 85)`.
    The lower 20 bytes (bytes 12-31) of the resulting hash form the address.
    """

    offset: int = 0

    def __new__(
        cls,
        *,
        factory_address: int | bytes | Bytecode,
        salt: int | bytes | Bytecode,
        init_code_hash: int | bytes | Bytecode,
        offset: int = 0,
        old_memory_size: int = 0,
    ) -> Self:
        """
        Build the bytecode that lays out the CREATE2 preimage in memory,
        starting at `offset`.
        """
        # The layout ends 96 bytes past `offset`; the memory size reported
        # for gas accounting is the larger of that and the previous size.
        new_memory_size = max(old_memory_size, offset + 96)

        store_factory = Op.MSTORE(offset=offset, value=factory_address)
        store_prefix = Op.MSTORE8(offset=offset + 11, value=0xFF)
        store_salt = Op.MSTORE(offset=offset + 32, value=salt)
        store_hash = Op.MSTORE(
            offset=offset + 64,
            value=init_code_hash,
            # Gas accounting
            old_memory_size=old_memory_size,
            new_memory_size=new_memory_size,
        )
        layout = store_factory + store_prefix + store_salt + store_hash

        instance = super().__new__(cls, layout)
        instance.offset = offset
        return instance

    @property
    def salt_offset(self) -> int:
        """Memory offset at which the salt of the preimage is stored."""
        return self.offset + 32

    def address_op(self) -> Bytecode:
        """Return the bytecode that hashes the preimage into the address."""
        # The preimage starts at the 0xFF prefix byte and spans 85 bytes.
        preimage_start = self.offset + 11
        preimage_size = 85
        return Op.SHA3(
            offset=preimage_start,
            size=preimage_size,
            # Gas accounting
            data_size=preimage_size,
        )

    def increment_salt_op(self, increment: int = 1) -> Bytecode:
        """Return the bytecode that adds `increment` to the stored salt."""
        salt_at = self.salt_offset
        bumped_salt = Op.ADD(Op.MLOAD(salt_at), increment)
        return Op.MSTORE(salt_at, bumped_salt)

__new__(*, factory_address, salt, init_code_hash, offset=0, old_memory_size=0)

Assemble the bytecode that sets up the memory layout for CREATE2 address computation.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def __new__(
    cls,
    *,
    factory_address: int | bytes | Bytecode,
    salt: int | bytes | Bytecode,
    init_code_hash: int | bytes | Bytecode,
    offset: int = 0,
    old_memory_size: int = 0,
) -> Self:
    """
    Build the bytecode that lays out the CREATE2 preimage in memory,
    starting at `offset`.
    """
    # The layout ends 96 bytes past `offset`; the memory size reported for
    # gas accounting is the larger of that and the previous size.
    new_memory_size = max(old_memory_size, offset + 96)

    store_factory = Op.MSTORE(offset=offset, value=factory_address)
    store_prefix = Op.MSTORE8(offset=offset + 11, value=0xFF)
    store_salt = Op.MSTORE(offset=offset + 32, value=salt)
    store_hash = Op.MSTORE(
        offset=offset + 64,
        value=init_code_hash,
        # Gas accounting
        old_memory_size=old_memory_size,
        new_memory_size=new_memory_size,
    )
    layout = store_factory + store_prefix + store_salt + store_hash

    instance = super().__new__(cls, layout)
    instance.offset = offset
    return instance

salt_offset property

Return the salt memory offset of the preimage.

address_op()

Return the bytecode that computes the CREATE2 address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
473
474
475
476
477
478
479
480
481
482
def address_op(self) -> Bytecode:
    """Return the bytecode that hashes the preimage into the address."""
    # The preimage starts at the 0xFF prefix byte and spans 85 bytes.
    preimage_start = self.offset + 11
    preimage_size = 85
    return Op.SHA3(
        offset=preimage_start,
        size=preimage_size,
        # Gas accounting
        data_size=preimage_size,
    )

increment_salt_op(increment=1)

Return the bytecode that increments the current salt.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
484
485
486
487
488
489
def increment_salt_op(self, increment: int = 1) -> Bytecode:
    """Return the bytecode that adds `increment` to the stored salt."""
    salt_at = self.salt_offset
    bumped_salt = Op.ADD(Op.MLOAD(salt_at), increment)
    return Op.MSTORE(salt_at, bumped_salt)

FixedIterationsBytecode

Bases: IteratingBytecode

Bytecode that contains a setup phase, an iterating phase, and a cleanup phase, with a fixed number of iterations.

This type can be used in place of a normal Bytecode and will return the appropriate gas cost for the given number of iterations.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
class FixedIterationsBytecode(IteratingBytecode):
    """
    Bytecode made of a setup phase, an iterating phase and a cleanup phase,
    where the iterating phase runs a fixed number of times.

    It can stand in for a normal Bytecode; its gas cost is the one that
    corresponds to the configured number of iterations.
    """

    iteration_count: int
    """The fixed number of times the iterating bytecode will be executed."""

    def __new__(
        cls,
        *,
        setup: Bytecode,
        iterating: Bytecode,
        cleanup: Bytecode,
        iteration_count: int,
        warm_iterating: Bytecode | None = None,
        iterating_subcall: Bytecode | int | None = None,
    ) -> Self:
        """
        Build a FixedIterationsBytecode.

        Args:
            setup: Bytecode run once, before the iterations begin.
            iterating: Bytecode run in the first iteration.
            cleanup: Bytecode run once, after every iteration has
                completed.
            iteration_count: Fixed number of times the iterating bytecode
                is executed.
            warm_iterating: Bytecode run in every iteration after the
                first. When None, the `iterating` bytecode is used.
            iterating_subcall: Analytical bytecode standing for a subcall
                made during each iteration. It is _not_ part of the final
                bytecode and only feeds the gas calculation. An integer is
                also accepted, in which case it is the gas cost of the
                subcall (e.g. the subcall is a precompiled contract).

        Returns:
            The newly created FixedIterationsBytecode.

        """
        fixed = super(FixedIterationsBytecode, cls).__new__(
            cls,
            setup=setup,
            iterating=iterating,
            cleanup=cleanup,
            warm_iterating=warm_iterating,
            iterating_subcall=iterating_subcall,
        )
        fixed.iteration_count = iteration_count
        return fixed

    def gas_cost(
        self,
        fork: Type[ForkOpcodeInterface],
        *,
        block_number: int = 0,
        timestamp: int = 0,
    ) -> int:
        """Return the gas cost of running the bytecode `iteration_count` times."""
        # `block_number` and `timestamp` are intentionally unused here.
        del block_number, timestamp
        iterations = self.iteration_count
        return self.gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iterations,
        )

iteration_count instance-attribute

The fixed number of times the iterating bytecode will be executed.

__new__(*, setup, iterating, cleanup, iteration_count, warm_iterating=None, iterating_subcall=None)

Create a new FixedIterationsBytecode instance.

Parameters:

Name Type Description Default
setup Bytecode

Bytecode executed once at the beginning before iterations start.

required
iterating Bytecode

Bytecode executed in the first iteration.

required
cleanup Bytecode

Bytecode executed once at the end after all iterations complete.

required
iteration_count int

The fixed number of times the iterating bytecode will be executed.

required
warm_iterating Bytecode | None

Bytecode executed in subsequent iterations after the first. If None, uses the same bytecode as iterating.

None
iterating_subcall Bytecode | int | None

Analytical bytecode representing a subcall performed during each iteration. This bytecode is not included in the final bytecode, and it's only used for gas calculation. The value can also be an integer, in which case it represents the gas cost of the subcall (e.g. the subcall is a precompiled contract).

None

Returns:

Type Description
Self

A new FixedIterationsBytecode instance.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
def __new__(
    cls,
    *,
    setup: Bytecode,
    iterating: Bytecode,
    cleanup: Bytecode,
    iteration_count: int,
    warm_iterating: Bytecode | None = None,
    iterating_subcall: Bytecode | int | None = None,
) -> Self:
    """
    Build a FixedIterationsBytecode.

    Args:
        setup: Bytecode run once, before the iterations begin.
        iterating: Bytecode run in the first iteration.
        cleanup: Bytecode run once, after every iteration has completed.
        iteration_count: Fixed number of times the iterating bytecode is
            executed.
        warm_iterating: Bytecode run in every iteration after the first.
            When None, the `iterating` bytecode is used.
        iterating_subcall: Analytical bytecode standing for a subcall made
            during each iteration. It is _not_ part of the final bytecode
            and only feeds the gas calculation. An integer is also
            accepted, in which case it is the gas cost of the subcall
            (e.g. the subcall is a precompiled contract).

    Returns:
        The newly created FixedIterationsBytecode.

    """
    fixed = super(FixedIterationsBytecode, cls).__new__(
        cls,
        setup=setup,
        iterating=iterating,
        cleanup=cleanup,
        warm_iterating=warm_iterating,
        iterating_subcall=iterating_subcall,
    )
    fixed.iteration_count = iteration_count
    return fixed

gas_cost(fork, *, block_number=0, timestamp=0)

Return the cost of iterating through the bytecode N times.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
def gas_cost(
    self,
    fork: Type[ForkOpcodeInterface],
    *,
    block_number: int = 0,
    timestamp: int = 0,
) -> int:
    """Return the gas cost of running the bytecode `iteration_count` times."""
    # `block_number` and `timestamp` are intentionally unused here.
    del block_number, timestamp
    iterations = self.iteration_count
    return self.gas_cost_by_iteration_count(
        fork=fork,
        iteration_count=iterations,
    )

Initcode

Bases: Bytecode

Helper class used to generate initcode for the specified deployment code.

The execution gas cost of the initcode is calculated, and also the deployment gas costs for the deployed code.

The initcode can be padded to a certain length if necessary, which does not affect the deployed code.

Other costs such as the CREATE2 hashing costs or the initcode_word_cost of EIP-3860 are not taken into account by any of these calculated costs.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class Initcode(Bytecode):
    """
    Helper class used to generate initcode for the specified deployment code.

    The execution gas cost of the initcode is calculated, and also the
    deployment gas costs for the deployed code.

    The initcode can be padded to a certain length if necessary, which does not
    affect the deployed code.

    Other costs such as the CREATE2 hashing costs or the initcode_word_cost of
    EIP-3860 are *not* taken into account by any of these calculated costs.

    The resulting bytes are: optional prefix, a short copy-and-return stub,
    the deploy code, and optional padding.
    """

    deploy_code: Bytes | Bytecode
    """
    Bytecode to be deployed by the initcode.
    """

    def __new__(
        cls,
        *,
        deploy_code: Bytecode | SupportsBytes | None = None,
        initcode_length: int | None = None,
        initcode_prefix: Bytecode | None = None,
        padding_byte: int = 0x00,
        name: str = "",
    ) -> Self:
        """
        Generate an initcode that returns a contract with the specified code.
        The initcode can be padded to a specified length for testing purposes.

        Args:
            deploy_code: Code returned by the initcode for deployment. `None`
                means empty bytecode; values that are not `Bytecode` are
                converted with `Bytes(...)`.
            initcode_length: When given, total length the initcode is padded
                to. Must not be smaller than the unpadded initcode plus the
                deploy code.
            initcode_prefix: Bytecode placed before the generated stub.
                `None` means no prefix.
            padding_byte: Byte value used to fill the padding.
            name: Name forwarded to the `Bytecode` constructor.

        Returns:
            The new instance, with `deploy_code` stored as an attribute.

        Raises:
            AssertionError: If the prefix makes the stub offset exceed 0xFF,
                or if `initcode_length` is shorter than the generated code.

        """
        if deploy_code is None:
            deploy_code = Bytecode()
        elif not isinstance(deploy_code, Bytecode):
            deploy_code = Bytes(deploy_code)
        if initcode_prefix is None:
            initcode_prefix = Bytecode()

        initcode = initcode_prefix
        code_length = len(deploy_code)

        # PUSH2: length=<bytecode length>
        # NOTE(review): PUSH2 carries a two-byte immediate, so a deploy code
        # longer than 0xFFFF bytes presumably does not fit — confirm how
        # `Op.PUSH2(...)` handles larger values.
        initcode += Op.PUSH2(code_length)

        # PUSH1: offset=0
        initcode += Op.PUSH1(0)

        # DUP2
        initcode += Op.DUP2

        # PUSH1: initcode_length=11 + len(initcode_prefix_bytes) (constant)
        # This is the code offset at which the deploy code starts. The 11
        # presumably is the byte length of the stub itself: PUSH2 (3) +
        # PUSH1 (2) + DUP2 (1) + PUSH1 (2) + DUP3 (1) + CODECOPY (1) +
        # RETURN (1) — TODO confirm against the `Op` encodings.
        no_prefix_length = 0x0B
        # The offset is emitted with PUSH1, hence the one-byte upper bound.
        assert no_prefix_length + len(initcode_prefix) <= 0xFF, (
            "initcode prefix too long"
        )
        initcode += Op.PUSH1(no_prefix_length + len(initcode_prefix))

        # DUP3
        initcode += Op.DUP3

        # CODECOPY: destinationOffset=0, offset=<start of deploy code>, length
        # (assuming standard EVM stack order, the code offset is the value
        # pushed by the PUSH1 above, not zero). The keyword arguments are
        # presumably metadata for gas accounting — confirm.
        initcode += Op.CODECOPY(
            data_size=code_length, new_memory_size=code_length
        )

        # RETURN: offset=0, length
        initcode += Op.RETURN(code_deposit_size=len(deploy_code))

        initcode_plus_deploy_code = bytes(initcode) + bytes(deploy_code)
        padding_bytes = bytes()

        if initcode_length is not None:
            assert initcode_length >= len(initcode_plus_deploy_code), (
                "specified invalid length for initcode"
            )

            # Fill up to the requested total length with `padding_byte`.
            padding_bytes = bytes(
                [padding_byte]
                * (initcode_length - len(initcode_plus_deploy_code))
            )

        initcode_bytes = initcode_plus_deploy_code + padding_bytes
        # The stack and opcode metadata come from the executable part
        # (prefix + stub) only; deploy code and padding are appended as raw
        # bytes.
        instance = super().__new__(
            cls,
            initcode_bytes,
            popped_stack_items=initcode.popped_stack_items,
            pushed_stack_items=initcode.pushed_stack_items,
            max_stack_height=initcode.max_stack_height,
            min_stack_height=initcode.min_stack_height,
            name=name,
            opcode_list=initcode.opcode_list,
        )
        instance.deploy_code = deploy_code

        return instance

    def execution_gas(
        self,
        fork: Type[ForkOpcodeInterface],
        *,
        block_number: int = 0,
        timestamp: int = 0,
    ) -> int:
        """
        Gas cost of executing the initcode, charged before the code
        deposit fee.

        Computed as `gas_cost(...)` minus `deployment_gas(...)` for the same
        fork, block number and timestamp.
        """
        return self.gas_cost(
            fork,
            block_number=block_number,
            timestamp=timestamp,
        ) - self.deployment_gas(
            fork,
            block_number=block_number,
            timestamp=timestamp,
        )

    def deployment_gas(
        self,
        fork: Type[ForkOpcodeInterface],
        *,
        block_number: int = 0,
        timestamp: int = 0,
    ) -> int:
        """
        Gas cost of deploying the contract.

        Computed as the gas cost of a `RETURN` whose `code_deposit_size` is
        the length of `deploy_code`.
        """
        return Op.RETURN(code_deposit_size=len(self.deploy_code)).gas_cost(
            fork, block_number=block_number, timestamp=timestamp
        )

deploy_code instance-attribute

Bytecode to be deployed by the initcode.

__new__(*, deploy_code=None, initcode_length=None, initcode_prefix=None, padding_byte=0, name='')

Generate an initcode that returns a contract with the specified code. The initcode can be padded to a specified length for testing purposes.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def __new__(
    cls,
    *,
    deploy_code: Bytecode | SupportsBytes | None = None,
    initcode_length: int | None = None,
    initcode_prefix: Bytecode | None = None,
    padding_byte: int = 0x00,
    name: str = "",
) -> Self:
    """
    Generate an initcode that returns a contract with the specified code.
    The initcode can be padded to a specified length for testing purposes.

    Args:
        deploy_code: Code returned by the initcode for deployment. `None`
            means empty bytecode; values that are not `Bytecode` are
            converted with `Bytes(...)`.
        initcode_length: When given, total length the initcode is padded to.
            Must not be smaller than the unpadded initcode plus the deploy
            code.
        initcode_prefix: Bytecode placed before the generated stub. `None`
            means no prefix.
        padding_byte: Byte value used to fill the padding.
        name: Name forwarded to the `Bytecode` constructor.

    Returns:
        The new instance, with `deploy_code` stored as an attribute.

    Raises:
        AssertionError: If the prefix makes the stub offset exceed 0xFF, or
            if `initcode_length` is shorter than the generated code.

    """
    if deploy_code is None:
        deploy_code = Bytecode()
    elif not isinstance(deploy_code, Bytecode):
        deploy_code = Bytes(deploy_code)
    if initcode_prefix is None:
        initcode_prefix = Bytecode()

    initcode = initcode_prefix
    code_length = len(deploy_code)

    # PUSH2: length=<bytecode length>
    # NOTE(review): PUSH2 carries a two-byte immediate, so a deploy code
    # longer than 0xFFFF bytes presumably does not fit — confirm how
    # `Op.PUSH2(...)` handles larger values.
    initcode += Op.PUSH2(code_length)

    # PUSH1: offset=0
    initcode += Op.PUSH1(0)

    # DUP2
    initcode += Op.DUP2

    # PUSH1: initcode_length=11 + len(initcode_prefix_bytes) (constant)
    # This is the code offset at which the deploy code starts. The 11
    # presumably is the byte length of the stub itself: PUSH2 (3) +
    # PUSH1 (2) + DUP2 (1) + PUSH1 (2) + DUP3 (1) + CODECOPY (1) +
    # RETURN (1) — TODO confirm against the `Op` encodings.
    no_prefix_length = 0x0B
    # The offset is emitted with PUSH1, hence the one-byte upper bound.
    assert no_prefix_length + len(initcode_prefix) <= 0xFF, (
        "initcode prefix too long"
    )
    initcode += Op.PUSH1(no_prefix_length + len(initcode_prefix))

    # DUP3
    initcode += Op.DUP3

    # CODECOPY: destinationOffset=0, offset=<start of deploy code>, length
    # (assuming standard EVM stack order, the code offset is the value
    # pushed by the PUSH1 above, not zero). The keyword arguments are
    # presumably metadata for gas accounting — confirm.
    initcode += Op.CODECOPY(
        data_size=code_length, new_memory_size=code_length
    )

    # RETURN: offset=0, length
    initcode += Op.RETURN(code_deposit_size=len(deploy_code))

    initcode_plus_deploy_code = bytes(initcode) + bytes(deploy_code)
    padding_bytes = bytes()

    if initcode_length is not None:
        assert initcode_length >= len(initcode_plus_deploy_code), (
            "specified invalid length for initcode"
        )

        # Fill up to the requested total length with `padding_byte`.
        padding_bytes = bytes(
            [padding_byte]
            * (initcode_length - len(initcode_plus_deploy_code))
        )

    initcode_bytes = initcode_plus_deploy_code + padding_bytes
    # The stack and opcode metadata come from the executable part
    # (prefix + stub) only; deploy code and padding are appended as raw
    # bytes.
    instance = super().__new__(
        cls,
        initcode_bytes,
        popped_stack_items=initcode.popped_stack_items,
        pushed_stack_items=initcode.pushed_stack_items,
        max_stack_height=initcode.max_stack_height,
        min_stack_height=initcode.min_stack_height,
        name=name,
        opcode_list=initcode.opcode_list,
    )
    instance.deploy_code = deploy_code

    return instance

execution_gas(fork, *, block_number=0, timestamp=0)

Gas cost of executing the initcode, charged before the code deposit fee.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def execution_gas(
    self,
    fork: Type[ForkOpcodeInterface],
    *,
    block_number: int = 0,
    timestamp: int = 0,
) -> int:
    """
    Return the gas spent executing the initcode, i.e. what is charged
    before the code deposit fee.
    """
    # Total cost of the initcode, which includes the code deposit fee.
    total_gas = self.gas_cost(
        fork,
        block_number=block_number,
        timestamp=timestamp,
    )
    # Code deposit fee alone, evaluated for the same fork/block/timestamp.
    deposit_gas = self.deployment_gas(
        fork,
        block_number=block_number,
        timestamp=timestamp,
    )
    return total_gas - deposit_gas

deployment_gas(fork, *, block_number=0, timestamp=0)

Gas cost of deploying the contract.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
132
133
134
135
136
137
138
139
140
141
142
143
144
def deployment_gas(
    self,
    fork: Type[ForkOpcodeInterface],
    *,
    block_number: int = 0,
    timestamp: int = 0,
) -> int:
    """
    Gas cost of deploying the contract.

    This is the gas cost of a `RETURN` opcode whose `code_deposit_size` is
    the length of `self.deploy_code`.
    """
    deposit_return = Op.RETURN(code_deposit_size=len(self.deploy_code))
    return deposit_return.gas_cost(
        fork,
        block_number=block_number,
        timestamp=timestamp,
    )

IteratingBytecode

Bases: Bytecode

Bytecode composed of distinct execution phases: setup, iteration, and cleanup.

Some phases (warm_iterating and iterating_subcall) are analytical only and exist solely to model gas costs; they are not emitted in the final bytecode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
class IteratingBytecode(Bytecode):
    """
    Bytecode composed of distinct execution phases: setup, iteration, and
    cleanup.

    Some phases (warm_iterating and iterating_subcall) are analytical only and
    exist solely to model gas costs; they are not emitted in the final
    bytecode.
    """

    setup: Bytecode
    """Bytecode executed once at the beginning before iterations start."""
    iterating: Bytecode
    """Bytecode executed in the first iteration."""
    warm_iterating: Bytecode
    """
    Analytical bytecode representing subsequent iterations after the first
    (warm state).
    This bytecode is _not_ included in the final bytecode, and it's only
    used for the gas accounting properties of its opcodes and therefore gas
    calculation.
    """
    iterating_subcall: Bytecode | int
    """
    Analytical bytecode representing a subcall performed during each iteration.
    This bytecode is _not_ included in the final bytecode, and it's only
    used for gas calculation.

    The value can also be an integer, in which case it represents the gas cost
    of the subcall (e.g. the subcall is a precompiled contract)
    """
    cleanup: Bytecode
    """Bytecode executed once at the end after all iterations complete."""

    def __new__(
        cls,
        *,
        setup: Bytecode | None = None,
        iterating: Bytecode,
        cleanup: Bytecode | None = None,
        warm_iterating: Bytecode | None = None,
        iterating_subcall: Bytecode | int | None = None,
    ) -> Self:
        """
        Build an iterating bytecode out of its individual phases.

        The underlying bytecode is created from `setup + iterating + cleanup`;
        `warm_iterating` and `iterating_subcall` are only stored on the
        instance.

        Args:
            setup: Bytecode executed once at the beginning before
                iterations start. Stored as an empty `Bytecode` when omitted.
            iterating: Bytecode executed in the first iteration.
            cleanup: Bytecode executed once at the end after all
                iterations complete. Stored as an empty `Bytecode` when
                omitted.
            warm_iterating: Analytical bytecode representing subsequent
                iterations after the first (warm state). Defaults to
                `iterating`; when given, it must serialize to the same bytes
                as `iterating`.
            iterating_subcall: Analytical bytecode representing a subcall
                performed during each iteration. This bytecode is _not_
                included in the final bytecode, and it's only used for gas
                calculation. The value can also be an integer, in which case it
                represents the gas cost of the subcall (e.g. the subcall is a
                precompiled contract). Stored as an empty `Bytecode` when
                omitted.

        Returns:
            A new IteratingBytecode instance.

        Raises:
            AssertionError: If `warm_iterating` is given and its bytes differ
                from those of `iterating`.

        """
        # The phases are concatenated exactly as received, i.e. before the
        # `None` defaults further down are applied.
        # NOTE(review): presumably `Bytecode` addition tolerates `None`
        # operands - confirm against the `Bytecode` implementation.
        emitted = setup + iterating + cleanup
        instance = super(IteratingBytecode, cls).__new__(cls, emitted)

        instance.setup = Bytecode() if setup is None else setup
        instance.iterating = iterating

        if warm_iterating is not None:
            assert bytes(iterating) == bytes(warm_iterating), (
                "iterating and warm_iterating must have the same bytecode"
            )
            instance.warm_iterating = warm_iterating
        else:
            instance.warm_iterating = iterating

        instance.iterating_subcall = (
            Bytecode() if iterating_subcall is None else iterating_subcall
        )
        instance.cleanup = Bytecode() if cleanup is None else cleanup
        return instance

    def iterating_subcall_gas_cost(
        self, *, fork: Type[ForkOpcodeInterface]
    ) -> int:
        """
        Return the gas cost of the iterating subcall.

        An integer `iterating_subcall` is returned as-is; otherwise its
        `gas_cost` is evaluated for `fork`.
        """
        subcall = self.iterating_subcall
        if not isinstance(subcall, int):
            return subcall.gas_cost(fork=fork)
        return subcall

    def iterating_subcall_reserve(
        self, *, fork: Type[ForkOpcodeInterface]
    ) -> int:
        """
        Return the gas reserve needed so that the last iterating subcall does
        not fail due to the 63/64 rule.

        The reserve is `cost * 64 // 63 - cost`, where `cost` is the gas cost
        of the iterating subcall.
        """
        subcall_cost = self.iterating_subcall_gas_cost(fork=fork)
        required_before_call = subcall_cost * 64 // 63
        return required_before_call - subcall_cost

    def gas_cost_by_iteration_count(
        self, *, fork: Type[ForkOpcodeInterface], iteration_count: int
    ) -> int:
        """
        Return the cost of iterating through the bytecode N times.

        Setup and cleanup are always charged once. When `iteration_count` is
        not positive, the loop contributes nothing.
        """
        if iteration_count > 0:
            # The first iteration is charged at the cold (`iterating`) cost.
            first_pass = self.iterating.gas_cost(fork=fork)
            # Every following iteration is charged at the warm cost.
            later_passes = self.warm_iterating.gas_cost(fork=fork) * (
                iteration_count - 1
            )
            # The subcall is charged on every iteration, the first included.
            subcalls = (
                self.iterating_subcall_gas_cost(fork=fork) * iteration_count
            )
            loop_total = first_pass + later_passes + subcalls
        else:
            loop_total = 0

        setup_cost = self.setup.gas_cost(fork=fork)
        cleanup_cost = self.cleanup.gas_cost(fork=fork)
        return setup_cost + loop_total + cleanup_cost

    def with_fixed_iteration_count(
        self, *, iteration_count: int
    ) -> "FixedIterationsBytecode":
        """
        Return a new FixedIterationsBytecode with the iteration count fixed.

        All phases of this instance are forwarded unchanged to the new
        object.
        """
        phases = {
            "setup": self.setup,
            "iterating": self.iterating,
            "cleanup": self.cleanup,
            "warm_iterating": self.warm_iterating,
            "iterating_subcall": self.iterating_subcall,
        }
        return FixedIterationsBytecode(
            **phases, iteration_count=iteration_count
        )

    # Methods to calculate transactions that call a contract containing the
    # iterating bytecode.

    def tx_gas_cost_by_iteration_count(
        self,
        *,
        fork: Fork,
        iteration_count: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> int:
        """
        Calculate the exact gas cost of a transaction calling the bytecode
        for a given number of iterations.

        The method accepts intrinsic gas cost kwargs to allow for the
        calculation of the intrinsic gas cost of the transaction.

        If any of the intrinsic gas cost kwarg is callable, it will be called
        with iteration_count and start_iteration as keyword arguments.

        Before calling the fork's intrinsic cost calculator the kwargs are
        normalized: `data` is renamed to `calldata`, `authorization_list` is
        replaced by its length under `authorization_list_or_count`, and
        `return_cost_deducted_prior_execution` defaults to `True`.

        Args:
            fork: Fork providing the intrinsic cost calculator.
            iteration_count: Number of iterations executed by the
                transaction.
            start_iteration: Iteration index at which the transaction
                starts; only forwarded to callable kwargs.
            **intrinsic_cost_kwargs: Values (or callables producing them)
                forwarded to the intrinsic cost calculator.

        Returns:
            Execution gas for `iteration_count` iterations plus the
            intrinsic gas cost of the transaction.

        """
        intrinsic_gas_cost_calc = fork.transaction_intrinsic_cost_calculator()

        # Resolve callables first so that the normalization below always
        # works on concrete values (e.g. `len()` of a dynamically generated
        # authorization list).
        calc_kwargs = {
            key: (
                value(
                    iteration_count=iteration_count,
                    start_iteration=start_iteration,
                )
                if callable(value)
                else value
            )
            for key, value in intrinsic_cost_kwargs.items()
        }

        if "data" in calc_kwargs:
            calc_kwargs["calldata"] = calc_kwargs.pop("data")
        if "authorization_list" in calc_kwargs:
            calc_kwargs["authorization_list_or_count"] = len(
                calc_kwargs.pop("authorization_list")
            )
        calc_kwargs.setdefault("return_cost_deducted_prior_execution", True)

        return self.gas_cost_by_iteration_count(
            fork=fork, iteration_count=iteration_count
        ) + intrinsic_gas_cost_calc(**calc_kwargs)

    def tx_gas_limit_by_iteration_count(
        self,
        *,
        fork: Fork,
        iteration_count: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> int:
        """
        Calculate the minimum gas limit of a transaction calling the bytecode
        for a given number of iterations.

        The result is the exact transaction gas cost plus the reserve
        required for the last iterating subcall due to the 63/64 rule.
        """
        exact_cost = self.tx_gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        subcall_reserve = self.iterating_subcall_reserve(fork=fork)
        return exact_cost + subcall_reserve

    def _binary_search_iterations(
        self,
        *,
        fork: Fork,
        gas_limit: int,
        start_iteration: int,
        **intrinsic_cost_kwargs: Any,
    ) -> Tuple[int, int]:
        """
        Binary search for the maximum iterations that fit within a gas limit.
        """
        single_iteration_gas = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=1,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        if single_iteration_gas > gas_limit:
            raise ValueError(
                "Single iteration gas cost is greater than gas limit."
            )
        low = 1
        high = 2

        # Exponential search to find upper bound
        high_gas_cost = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=high,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        while high_gas_cost < gas_limit:
            low = high
            high *= 2
            high_gas_cost = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=high,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )

        # Binary search for exact fit
        best_iterations = 0
        while low < high:
            mid = (low + high) // 2

            if (
                self.tx_gas_limit_by_iteration_count(
                    fork=fork,
                    iteration_count=mid,
                    start_iteration=start_iteration,
                    **intrinsic_cost_kwargs,
                )
                > gas_limit
            ):
                high = mid
            else:
                low = mid + 1

        best_iterations = low - 1
        best_iterations_gas = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=best_iterations,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        return best_iterations, best_iterations_gas

    def tx_iterations_by_gas_limit(
        self,
        *,
        fork: Fork,
        gas_limit: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> Generator[int, None, None]:
        """
        Calculate the number of iterations needed to reach the given
        gas-to-be-used value.

        Each yielded element represents the number of iterations for a
        single transaction.

        If the fork's transaction gas limit cap is not `None`, one item is
        yielded per transaction, representing the iteration count for that
        transaction, and no transaction will exceed the gas limit cap.

        Generation stops once the remaining gas is lower than the gas limit
        required by a transaction with a single iteration.
        """
        cap = fork.transaction_gas_limit_cap()
        gas_left = gas_limit

        while True:
            single_iteration_gas = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=1,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            if gas_left < single_iteration_gas:
                return

            # Budget of the next transaction: whatever is left, clamped to
            # the fork's per-transaction cap when there is one.
            tx_budget = gas_left if cap is None else min(gas_left, cap)
            tx_iterations, tx_gas = self._binary_search_iterations(
                fork=fork,
                gas_limit=tx_budget,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            yield tx_iterations
            gas_left -= tx_gas
            start_iteration += tx_iterations

    def _intrinsic_cost_is_constant(
        self,
        intrinsic_cost_kwargs: Dict[str, Any],
    ) -> bool:
        """If none of the kwarg values is callable, return True."""
        for _, value in intrinsic_cost_kwargs.items():
            if callable(value):
                return False
        return True

    def tx_iterations_by_total_iteration_count(
        self,
        *,
        fork: Fork,
        total_iterations: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> Generator[int, None, None]:
        """
        Calculate how to split a total number of iterations across multiple
        transactions so that each transaction fits within the gas limit cap.

        Yields the number of iterations of each transaction; the yielded
        values add up to total_iterations.

        When the fork has no transaction gas limit cap, a single value equal
        to total_iterations is yielded.
        """
        cap = fork.transaction_gas_limit_cap()
        if cap is None:
            # No limit, all iterations fit in a single transaction.
            yield total_iterations
            return

        pending = total_iterations
        per_tx: int | None = None
        cost_is_constant = self._intrinsic_cost_is_constant(
            intrinsic_cost_kwargs
        )

        while pending > 0:
            # With a constant intrinsic cost the per-transaction maximum is
            # computed once and reused; otherwise it is recomputed for every
            # transaction.
            if not (cost_is_constant and per_tx is not None):
                per_tx, _ = self._binary_search_iterations(
                    fork=fork,
                    gas_limit=cap,
                    start_iteration=start_iteration,
                    **intrinsic_cost_kwargs,
                )
            # The last transaction only takes what is still pending.
            chunk = min(per_tx, pending)
            yield chunk
            pending -= chunk
            start_iteration += chunk

    # Transaction generators that call the iterating bytecode with given
    # limits.

    def transactions_by_gas_limit(
        self,
        *,
        fork: Fork,
        gas_limit: int,
        start_iteration: int = 0,
        sender: EOA,
        to: Address | None,
        tx_gas_limit_delta: int = 0,
        **tx_kwargs: Any,
    ) -> Generator[TransactionWithCost, None, None]:
        """
        Generate the transactions calling the bytecode with a given gas
        limit.

        The method accepts all keyword arguments that can be passed to the
        `Transaction` constructor.

        If any of the keyword arguments is callable, it will be called with
        iteration_count and start_iteration as keyword arguments.
        E.g. when the calldata that needs to be passed to the iterating
        bytecode changes with each iteration, the calldata can be generated
        dynamically by passing a callable to the calldata keyword argument.

        Each yielded object also carries the expected gas cost of the
        transaction by the end of execution, and its gas limit is the
        calculated minimum plus `tx_gas_limit_delta`.
        """
        # Snapshot taken before the adjustments below: the gas helpers
        # receive the keyword arguments as the caller supplied them.
        intrinsic_cost_kwargs = tx_kwargs.copy()

        # Adjust the kwargs that end up in the transaction itself.
        if "calldata" in tx_kwargs:
            tx_kwargs["data"] = tx_kwargs.pop("calldata")
        tx_kwargs.pop("return_cost_deducted_prior_execution", None)

        iteration_counts = self.tx_iterations_by_gas_limit(
            fork=fork,
            gas_limit=gas_limit,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        for iteration_count in iteration_counts:
            minimum_gas_limit = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            expected_gas_cost = self.tx_gas_cost_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            # Evaluate callable kwargs for this particular transaction.
            resolved_tx_kwargs = {
                key: (
                    value(
                        iteration_count=iteration_count,
                        start_iteration=start_iteration,
                    )
                    if callable(value)
                    else value
                )
                for key, value in tx_kwargs.items()
            }
            yield TransactionWithCost(
                to=to,
                gas_limit=minimum_gas_limit + tx_gas_limit_delta,
                sender=sender,
                gas_cost=expected_gas_cost,
                **resolved_tx_kwargs,
            )
            start_iteration += iteration_count
            start_iteration += iteration_count

    def transactions_by_total_iteration_count(
        self,
        *,
        fork: Fork,
        total_iterations: int,
        start_iteration: int = 0,
        sender: EOA,
        to: Address | None,
        tx_gas_limit_delta: int = 0,
        **tx_kwargs: Any,
    ) -> Generator[TransactionWithCost, None, None]:
        """
        Generate the transactions calling the bytecode with a given total
        iteration count.

        The method accepts all keyword arguments that can be passed to the
        `Transaction` constructor.

        If any of the keyword arguments is callable, it will be called with
        iteration_count and start_iteration as keyword arguments.
        E.g. when the calldata that needs to be passed to the iterating
        bytecode changes with each iteration, the calldata can be generated
        dynamically by passing a callable to the calldata keyword argument.

        Each yielded object also carries the expected gas cost of the
        transaction by the end of execution, and its gas limit is the
        calculated minimum plus `tx_gas_limit_delta`.
        """
        # Snapshot taken before the adjustments below: the gas helpers
        # receive the keyword arguments as the caller supplied them.
        intrinsic_cost_kwargs = tx_kwargs.copy()

        # Adjust the kwargs that end up in the transaction itself.
        if "calldata" in tx_kwargs:
            tx_kwargs["data"] = tx_kwargs.pop("calldata")
        tx_kwargs.pop("return_cost_deducted_prior_execution", None)

        iteration_counts = self.tx_iterations_by_total_iteration_count(
            fork=fork,
            total_iterations=total_iterations,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        for iteration_count in iteration_counts:
            minimum_gas_limit = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            expected_gas_cost = self.tx_gas_cost_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            # Evaluate callable kwargs for this particular transaction.
            resolved_tx_kwargs = {
                key: (
                    value(
                        iteration_count=iteration_count,
                        start_iteration=start_iteration,
                    )
                    if callable(value)
                    else value
                )
                for key, value in tx_kwargs.items()
            }
            yield TransactionWithCost(
                to=to,
                gas_limit=minimum_gas_limit + tx_gas_limit_delta,
                sender=sender,
                gas_cost=expected_gas_cost,
                **resolved_tx_kwargs,
            )
            start_iteration += iteration_count

setup instance-attribute

Bytecode executed once at the beginning before iterations start.

iterating instance-attribute

Bytecode executed in the first iteration.

warm_iterating instance-attribute

Analytical bytecode representing subsequent iterations after the first (warm state). This bytecode is not included in the final bytecode, and it's only used for the gas accounting properties of its opcodes and therefore gas calculation.

iterating_subcall instance-attribute

Analytical bytecode representing a subcall performed during each iteration. This bytecode is not included in the final bytecode, and it's only used for gas calculation.

The value can also be an integer, in which case it represents the gas cost of the subcall (e.g. the subcall is a precompiled contract)

cleanup instance-attribute

Bytecode executed once at the end after all iterations complete.

__new__(*, setup=None, iterating, cleanup=None, warm_iterating=None, iterating_subcall=None)

Create a new iterating bytecode.

Parameters:

Name Type Description Default
setup Bytecode | None

Bytecode executed once at the beginning before iterations start.

None
iterating Bytecode

Bytecode executed in the first iteration.

required
cleanup Bytecode | None

Bytecode executed once at the end after all iterations complete.

None
warm_iterating Bytecode | None

Analytical bytecode representing subsequent iterations after the first (warm state).

None
iterating_subcall Bytecode | int | None

Analytical bytecode representing a subcall performed during each iteration. This bytecode is not included in the final bytecode, and it's only used for gas calculation. The value can also be an integer, in which case it represents the gas cost of the subcall (e.g. the subcall is a precompiled contract).

None

Returns:

Type Description
Self

A new IteratingBytecode instance.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
def __new__(
    cls,
    *,
    setup: Bytecode | None = None,
    iterating: Bytecode,
    cleanup: Bytecode | None = None,
    warm_iterating: Bytecode | None = None,
    iterating_subcall: Bytecode | int | None = None,
) -> Self:
    """
    Build an iterating bytecode out of its individual phases.

    The underlying bytecode is created from `setup + iterating + cleanup`;
    `warm_iterating` and `iterating_subcall` are only stored on the
    instance.

    Args:
        setup: Bytecode executed once at the beginning before
            iterations start. Stored as an empty `Bytecode` when omitted.
        iterating: Bytecode executed in the first iteration.
        cleanup: Bytecode executed once at the end after all
            iterations complete. Stored as an empty `Bytecode` when
            omitted.
        warm_iterating: Analytical bytecode representing subsequent
            iterations after the first (warm state). Defaults to
            `iterating`; when given, it must serialize to the same bytes
            as `iterating`.
        iterating_subcall: Analytical bytecode representing a subcall
            performed during each iteration. This bytecode is _not_
            included in the final bytecode, and it's only used for gas
            calculation. The value can also be an integer, in which case it
            represents the gas cost of the subcall (e.g. the subcall is a
            precompiled contract). Stored as an empty `Bytecode` when
            omitted.

    Returns:
        A new IteratingBytecode instance.

    Raises:
        AssertionError: If `warm_iterating` is given and its bytes differ
            from those of `iterating`.

    """
    # The phases are concatenated exactly as received, i.e. before the
    # `None` defaults further down are applied.
    # NOTE(review): presumably `Bytecode` addition tolerates `None`
    # operands - confirm against the `Bytecode` implementation.
    emitted = setup + iterating + cleanup
    instance = super(IteratingBytecode, cls).__new__(cls, emitted)

    instance.setup = Bytecode() if setup is None else setup
    instance.iterating = iterating

    if warm_iterating is not None:
        assert bytes(iterating) == bytes(warm_iterating), (
            "iterating and warm_iterating must have the same bytecode"
        )
        instance.warm_iterating = warm_iterating
    else:
        instance.warm_iterating = iterating

    instance.iterating_subcall = (
        Bytecode() if iterating_subcall is None else iterating_subcall
    )
    instance.cleanup = Bytecode() if cleanup is None else cleanup
    return instance

iterating_subcall_gas_cost(*, fork)

Return the gas cost of the iterating subcall.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
587
588
589
590
591
592
593
def iterating_subcall_gas_cost(
    self, *, fork: Type[ForkOpcodeInterface]
) -> int:
    """
    Return the gas cost of the iterating subcall.

    An integer `iterating_subcall` is returned as-is; otherwise its
    `gas_cost` is evaluated for `fork`.
    """
    subcall = self.iterating_subcall
    if not isinstance(subcall, int):
        return subcall.gas_cost(fork=fork)
    return subcall

iterating_subcall_reserve(*, fork)

Return the gas reserve needed so that the last iterating subcall does not fail due to the 63/64 rule.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
595
596
597
598
599
600
601
602
603
604
605
def iterating_subcall_reserve(
    self, *, fork: Type[ForkOpcodeInterface]
) -> int:
    """
    Return the gas reserve needed so that the last iterating subcall does
    not fail due to the 63/64 rule.

    The reserve is `cost * 64 // 63 - cost`, where `cost` is the gas cost
    of the iterating subcall.
    """
    subcall_cost = self.iterating_subcall_gas_cost(fork=fork)
    required_before_call = subcall_cost * 64 // 63
    return required_before_call - subcall_cost

gas_cost_by_iteration_count(*, fork, iteration_count)

Return the cost of iterating through the bytecode N times.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
def gas_cost_by_iteration_count(
    self, *, fork: Type[ForkOpcodeInterface], iteration_count: int
) -> int:
    """
    Return the cost of iterating through the bytecode N times.

    Setup and cleanup are always charged once. When `iteration_count` is
    not positive, the loop contributes nothing.
    """
    if iteration_count > 0:
        # The first iteration is charged at the cold (`iterating`) cost.
        first_pass = self.iterating.gas_cost(fork=fork)
        # Every following iteration is charged at the warm cost.
        later_passes = self.warm_iterating.gas_cost(fork=fork) * (
            iteration_count - 1
        )
        # The subcall is charged on every iteration, the first included.
        subcalls = (
            self.iterating_subcall_gas_cost(fork=fork) * iteration_count
        )
        loop_total = first_pass + later_passes + subcalls
    else:
        loop_total = 0

    setup_cost = self.setup.gas_cost(fork=fork)
    cleanup_cost = self.cleanup.gas_cost(fork=fork)
    return setup_cost + loop_total + cleanup_cost

with_fixed_iteration_count(*, iteration_count)

Return a new FixedIterationsBytecode with the iteration count fixed.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
629
630
631
632
633
634
635
636
637
638
639
640
641
642
def with_fixed_iteration_count(
    self, *, iteration_count: int
) -> "FixedIterationsBytecode":
    """
    Return a new FixedIterationsBytecode with the iteration count fixed.

    All phases of this instance are forwarded unchanged to the new
    object.
    """
    phases = {
        "setup": self.setup,
        "iterating": self.iterating,
        "cleanup": self.cleanup,
        "warm_iterating": self.warm_iterating,
        "iterating_subcall": self.iterating_subcall,
    }
    return FixedIterationsBytecode(
        **phases, iteration_count=iteration_count
    )

tx_gas_cost_by_iteration_count(*, fork, iteration_count, start_iteration=0, **intrinsic_cost_kwargs)

Calculate the exact gas cost of a transaction calling the bytecode for a given number of iterations.

The method accepts intrinsic gas cost kwargs to allow for the calculation of the intrinsic gas cost of the transaction.

If any of the intrinsic gas cost kwarg is callable, it will be called with iteration_count and start_iteration as keyword arguments.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def tx_gas_cost_by_iteration_count(
    self,
    *,
    fork: Fork,
    iteration_count: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> int:
    """
    Calculate the exact gas cost of a transaction calling the bytecode
    for a given number of iterations.

    The method accepts intrinsic gas cost kwargs to allow for the
    calculation of the intrinsic gas cost of the transaction.

    If any of the intrinsic gas cost kwarg is callable, it will be called
    with iteration_count and start_iteration as keyword arguments.

    Before calling the fork's intrinsic cost calculator the kwargs are
    normalized: `data` is renamed to `calldata`, `authorization_list` is
    replaced by its length under `authorization_list_or_count`, and
    `return_cost_deducted_prior_execution` defaults to `True`.

    Args:
        fork: Fork providing the intrinsic cost calculator.
        iteration_count: Number of iterations executed by the
            transaction.
        start_iteration: Iteration index at which the transaction
            starts; only forwarded to callable kwargs.
        **intrinsic_cost_kwargs: Values (or callables producing them)
            forwarded to the intrinsic cost calculator.

    Returns:
        Execution gas for `iteration_count` iterations plus the
        intrinsic gas cost of the transaction.

    """
    intrinsic_gas_cost_calc = fork.transaction_intrinsic_cost_calculator()

    # Resolve callables first so that the normalization below always
    # works on concrete values (e.g. `len()` of a dynamically generated
    # authorization list).
    calc_kwargs = {
        key: (
            value(
                iteration_count=iteration_count,
                start_iteration=start_iteration,
            )
            if callable(value)
            else value
        )
        for key, value in intrinsic_cost_kwargs.items()
    }

    if "data" in calc_kwargs:
        calc_kwargs["calldata"] = calc_kwargs.pop("data")
    if "authorization_list" in calc_kwargs:
        calc_kwargs["authorization_list_or_count"] = len(
            calc_kwargs.pop("authorization_list")
        )
    calc_kwargs.setdefault("return_cost_deducted_prior_execution", True)

    return self.gas_cost_by_iteration_count(
        fork=fork, iteration_count=iteration_count
    ) + intrinsic_gas_cost_calc(**calc_kwargs)

tx_gas_limit_by_iteration_count(*, fork, iteration_count, start_iteration=0, **intrinsic_cost_kwargs)

Calculate the minimum gas limit of a transaction calling the bytecode for a given number of iterations.

The gas limit is calculated by adding the required extra gas for the last iteration due to the 63/64 rule.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
def tx_gas_limit_by_iteration_count(
    self,
    *,
    fork: Fork,
    iteration_count: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> int:
    """
    Return the minimum gas limit for a transaction that runs the bytecode
    for `iteration_count` iterations.

    This is the exact transaction gas cost (see
    `tx_gas_cost_by_iteration_count`, which receives all arguments
    unchanged) plus the value of `iterating_subcall_reserve`, i.e. the
    extra gas required by the last iteration due to the 63/64 rule.
    """
    exact_gas_cost = self.tx_gas_cost_by_iteration_count(
        fork=fork,
        iteration_count=iteration_count,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    )
    subcall_reserve = self.iterating_subcall_reserve(fork=fork)
    return exact_gas_cost + subcall_reserve

tx_iterations_by_gas_limit(*, fork, gas_limit, start_iteration=0, **intrinsic_cost_kwargs)

Calculate the number of iterations needed to reach the given gas-to-be-used value.

Each element of the returned list represents the number of iterations for a single transaction.

If the fork's transaction gas limit cap is not None, the returned list will contain one item per transaction that represents the iteration count for that transaction, and no transaction will exceed the gas limit cap.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
def tx_iterations_by_gas_limit(
    self,
    *,
    fork: Fork,
    gas_limit: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> Generator[int, None, None]:
    """
    Yield, one transaction at a time, the number of iterations needed to
    reach the given gas-to-be-used value.

    Each yielded value is the iteration count of a single transaction.
    Transactions keep being produced for as long as the gas that is still
    unassigned covers the gas limit of a one-iteration transaction.

    If the fork's transaction gas limit cap is not `None`, the budget
    handed to each transaction is additionally bounded by that cap, so no
    transaction exceeds it.
    """
    tx_gas_cap = fork.transaction_gas_limit_cap()
    gas_left = gas_limit

    while True:
        # Stop once not even a single-iteration transaction fits.
        single_iteration_gas_limit = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=1,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        if gas_left < single_iteration_gas_limit:
            break

        # Budget for this transaction: whatever is left, bounded by the
        # cap when the fork defines one.
        if tx_gas_cap is None:
            tx_budget = gas_left
        else:
            tx_budget = min(gas_left, tx_gas_cap)

        # Search for the largest iteration count fitting the budget.
        fitted_iterations, fitted_gas = self._binary_search_iterations(
            fork=fork,
            gas_limit=tx_budget,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        yield fitted_iterations
        gas_left -= fitted_gas
        start_iteration += fitted_iterations

tx_iterations_by_total_iteration_count(*, fork, total_iterations, start_iteration=0, **intrinsic_cost_kwargs)

Calculate how to split a total number of iterations across multiple transactions so that each transaction fits within the gas limit cap.

Returns a list where each element represents the number of iterations for that transaction, and the sum equals total_iterations.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
def tx_iterations_by_total_iteration_count(
    self,
    *,
    fork: Fork,
    total_iterations: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> Generator[int, None, None]:
    """
    Split `total_iterations` across transactions so that each transaction
    fits within the fork's transaction gas limit cap.

    Yields the iteration count of each transaction in order; the yielded
    values add up to `total_iterations`. When the fork has no gas limit
    cap, a single value (`total_iterations`) is yielded.

    The per-transaction capacity is searched once and reused when
    `_intrinsic_cost_is_constant` reports a constant intrinsic cost;
    otherwise it is searched again for every transaction.
    """
    tx_gas_cap = fork.transaction_gas_limit_cap()
    if tx_gas_cap is None:
        # Nothing bounds the transaction: one transaction takes it all.
        yield total_iterations
        return

    pending_iterations = total_iterations
    per_tx_iterations: int | None = None
    cost_is_constant = self._intrinsic_cost_is_constant(
        intrinsic_cost_kwargs
    )

    while pending_iterations > 0:
        must_search = per_tx_iterations is None or not cost_is_constant
        if must_search:
            per_tx_iterations, _ = self._binary_search_iterations(
                fork=fork,
                gas_limit=tx_gas_cap,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
        if per_tx_iterations >= pending_iterations:
            # The remainder fits into this last transaction.
            yield pending_iterations
            return
        yield per_tx_iterations
        pending_iterations -= per_tx_iterations
        start_iteration += per_tx_iterations

transactions_by_gas_limit(*, fork, gas_limit, start_iteration=0, sender, to, tx_gas_limit_delta=0, **tx_kwargs)

Generate a list of transactions calling the bytecode with a given gas limit.

The method accepts all keyword arguments that can be passed to the Transaction constructor.

If any of the keyword arguments is callable, it will be called with iteration_count and start_iteration as keyword arguments. E.g. when the calldata that needs to be passed to the iterating bytecode changes with each iteration, the calldata can be generated dynamically by passing a callable to the calldata keyword argument.

The returned object also contains an extra field with the expected gas cost of the transaction by the end of execution.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
def transactions_by_gas_limit(
    self,
    *,
    fork: Fork,
    gas_limit: int,
    start_iteration: int = 0,
    sender: EOA,
    to: Address | None,
    tx_gas_limit_delta: int = 0,
    **tx_kwargs: Any,
) -> Generator[TransactionWithCost, None, None]:
    """
    Yield the transactions that call the bytecode until the given gas
    limit is used up.

    Any keyword argument accepted by the `Transaction` constructor may be
    passed. `calldata` is accepted as an alias and forwarded to the
    transaction as `data`; `return_cost_deducted_prior_execution` is only
    used for the gas calculations and is not forwarded to the transaction.

    Callable keyword arguments are called with `iteration_count` and
    `start_iteration` as keyword arguments, once per transaction. E.g.
    when the calldata passed to the iterating bytecode changes with each
    iteration, it can be generated dynamically by passing a callable as
    the calldata keyword argument.

    Each yielded object carries, in `gas_cost`, the expected gas cost of
    the transaction by the end of execution. Its `gas_limit` is the
    computed minimum gas limit plus `tx_gas_limit_delta`.
    """
    # The gas calculations receive the keyword arguments as the caller
    # passed them; only the transaction copy is adjusted below.
    intrinsic_cost_kwargs = tx_kwargs.copy()

    if "calldata" in tx_kwargs:
        tx_kwargs["data"] = tx_kwargs.pop("calldata")
    tx_kwargs.pop("return_cost_deducted_prior_execution", None)

    iteration_counts = self.tx_iterations_by_gas_limit(
        fork=fork,
        gas_limit=gas_limit,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    )
    for iteration_count in iteration_counts:
        minimum_gas_limit = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        expected_gas_cost = self.tx_gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        # Turn callable arguments into concrete per-transaction values.
        resolved_tx_kwargs = {
            name: (
                argument(
                    iteration_count=iteration_count,
                    start_iteration=start_iteration,
                )
                if callable(argument)
                else argument
            )
            for name, argument in tx_kwargs.items()
        }
        yield TransactionWithCost(
            to=to,
            gas_limit=minimum_gas_limit + tx_gas_limit_delta,
            sender=sender,
            gas_cost=expected_gas_cost,
            **resolved_tx_kwargs,
        )
        start_iteration += iteration_count

transactions_by_total_iteration_count(*, fork, total_iterations, start_iteration=0, sender, to, tx_gas_limit_delta=0, **tx_kwargs)

Generate a list of transactions calling the bytecode with a given total iteration count.

The method accepts all keyword arguments that can be passed to the Transaction constructor.

If any of the keyword arguments is callable, it will be called with iteration_count and start_iteration as keyword arguments. E.g. when the calldata that needs to be passed to the iterating bytecode changes with each iteration, the calldata can be generated dynamically by passing a callable to the calldata keyword argument.

The returned object also contains an extra field with the expected gas cost of the transaction by the end of execution.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
def transactions_by_total_iteration_count(
    self,
    *,
    fork: Fork,
    total_iterations: int,
    start_iteration: int = 0,
    sender: EOA,
    to: Address | None,
    tx_gas_limit_delta: int = 0,
    **tx_kwargs: Any,
) -> Generator[TransactionWithCost, None, None]:
    """
    Yield the transactions that call the bytecode for a given total
    number of iterations.

    Any keyword argument accepted by the `Transaction` constructor may be
    passed. `calldata` is accepted as an alias and forwarded to the
    transaction as `data`; `return_cost_deducted_prior_execution` is only
    used for the gas calculations and is not forwarded to the transaction.

    Callable keyword arguments are called with `iteration_count` and
    `start_iteration` as keyword arguments, once per transaction. E.g.
    when the calldata passed to the iterating bytecode changes with each
    iteration, it can be generated dynamically by passing a callable as
    the calldata keyword argument.

    Each yielded object carries, in `gas_cost`, the expected gas cost of
    the transaction by the end of execution. Its `gas_limit` is the
    computed minimum gas limit plus `tx_gas_limit_delta`.
    """
    # The gas calculations receive the keyword arguments as the caller
    # passed them; only the transaction copy is adjusted below.
    intrinsic_cost_kwargs = tx_kwargs.copy()

    if "calldata" in tx_kwargs:
        tx_kwargs["data"] = tx_kwargs.pop("calldata")
    tx_kwargs.pop("return_cost_deducted_prior_execution", None)

    iteration_counts = self.tx_iterations_by_total_iteration_count(
        fork=fork,
        total_iterations=total_iterations,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    )
    for iteration_count in iteration_counts:
        minimum_gas_limit = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        expected_gas_cost = self.tx_gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        # Turn callable arguments into concrete per-transaction values.
        resolved_tx_kwargs = {
            name: (
                argument(
                    iteration_count=iteration_count,
                    start_iteration=start_iteration,
                )
                if callable(argument)
                else argument
            )
            for name, argument in tx_kwargs.items()
        }
        yield TransactionWithCost(
            to=to,
            gas_limit=minimum_gas_limit + tx_gas_limit_delta,
            sender=sender,
            gas_cost=expected_gas_cost,
            **resolved_tx_kwargs,
        )
        start_iteration += iteration_count

Switch

Bases: Bytecode

Helper class used to generate switch-case expressions in EVM bytecode.

Switch-case behavior
  • If no condition is met in the list of BytecodeCases conditions, the default_action bytecode is executed.

  • If multiple conditions are met, the action from the first valid condition is the only one executed.

  • There is no fall through; it is not possible to execute multiple actions.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class Switch(Bytecode):
    """
    Bytecode generator for switch-case expressions in EVM bytecode.

    Switch-case behavior:
      - When none of the conditions in `cases` is met, the
        `default_action` bytecode is executed.

      - When several conditions are met, only the action of the first
        case whose condition holds is executed.

      - There is no fall through: at most one action is ever executed.
    """

    default_action: Bytecode | Op | None
    """
    The default bytecode to execute; if no condition is met, this bytecode is
    executed.
    """

    cases: List[Case]
    """
    A list of Cases: The first element with a condition that
    evaluates to a non-zero value is the one that is executed.
    """

    def __new__(
        cls,
        *,
        default_action: Bytecode | Op | None = None,
        cases: List[Case],
    ) -> Self:
        """
        Assemble the switch-case bytecode.

        The cases are walked in reverse order and each one is wrapped
        around the code assembled so far, adding the [R]JUMPI and JUMPDEST
        opcodes needed to replicate switch-case behavior.
        """
        # Offset needed to jump over all subsequent actions to the final
        # JUMPDEST that closes the switch-case block:
        #   - 6 per case, for the length of the JUMPDEST and of the
        #     JUMP(ADD(PC, offset)) bytecode
        #   - 3 for the action's own JUMP; the PC read within the jump
        #     expression requires a "correction" of 3.
        exit_offset = 3 + sum(len(case.action) + 6 for case in cases)

        # Innermost layer: the default action followed by its jump to the
        # end of the block. It is reached only when no condition is met,
        # since every condition gets prepended to it.
        assembled = default_action + Op.JUMP(Op.ADD(Op.PC, exit_offset))

        # Offset needed to jump over the default action and its JUMP.
        entry_offset = len(assembled) + 3

        # The first case in the list has priority, so it must become the
        # outer-most layer of the onion; hence the reversed iteration.
        # After one pass, a simplified picture of the bytecode is:
        #
        #   JUMPI(case[n-1].condition)
        #   + default_action + JUMP()
        #   + JUMPDEST + case[n-1].action + JUMP()
        #
        # and after all n = len(cases) passes:
        #
        #   JUMPI(case[0].condition)
        #   + JUMPI(case[1].condition)
        #   ...
        #   + JUMPI(case[n-1].condition) + default_action + JUMP()
        #   + JUMPDEST + case[n-1].action + JUMP()
        #   + ...
        #   + JUMPDEST + case[1].action + JUMP()
        #   + JUMPDEST + case[0].action + JUMP()
        for case in reversed(cases):
            case_action = case.action
            exit_offset -= len(case_action) + 6
            branch = (
                Op.JUMPDEST
                + case_action
                + Op.JUMP(Op.ADD(Op.PC, exit_offset))
            )
            guard = Op.JUMPI(Op.ADD(Op.PC, entry_offset), case.condition)
            # Add this case as the next layer around the onion.
            assembled = guard + assembled + branch
            entry_offset += len(guard) + len(branch)

        # Landing point shared by every exit jump.
        assembled += Op.JUMPDEST

        instance = super().__new__(cls, assembled)
        instance.default_action = default_action
        instance.cases = cases
        return instance

default_action instance-attribute

The default bytecode to execute; if no condition is met, this bytecode is executed.

cases instance-attribute

A list of Cases: The first element with a condition that evaluates to a non-zero value is the one that is executed.

__new__(*, default_action=None, cases)

Assemble the bytecode by looping over the list of cases and adding the necessary [R]JUMPI and JUMPDEST opcodes in order to replicate switch-case behavior.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def __new__(
    cls,
    *,
    default_action: Bytecode | Op | None = None,
    cases: List[Case],
) -> Self:
    """
    Assemble the switch-case bytecode.

    The cases are walked in reverse order and each one is wrapped around
    the code assembled so far, adding the [R]JUMPI and JUMPDEST opcodes
    needed to replicate switch-case behavior.
    """
    # Offset needed to jump over all subsequent actions to the final
    # JUMPDEST that closes the switch-case block:
    #   - 6 per case, for the length of the JUMPDEST and of the
    #     JUMP(ADD(PC, offset)) bytecode
    #   - 3 for the action's own JUMP; the PC read within the jump
    #     expression requires a "correction" of 3.
    exit_offset = 3 + sum(len(case.action) + 6 for case in cases)

    # Innermost layer: the default action followed by its jump to the end
    # of the block. It is reached only when no condition is met, since
    # every condition gets prepended to it.
    assembled = default_action + Op.JUMP(Op.ADD(Op.PC, exit_offset))

    # Offset needed to jump over the default action and its JUMP.
    entry_offset = len(assembled) + 3

    # The first case in the list has priority, so it must become the
    # outer-most layer of the onion; hence the reversed iteration.
    # After one pass, a simplified picture of the bytecode is:
    #
    #   JUMPI(case[n-1].condition)
    #   + default_action + JUMP()
    #   + JUMPDEST + case[n-1].action + JUMP()
    #
    # and after all n = len(cases) passes:
    #
    #   JUMPI(case[0].condition)
    #   + JUMPI(case[1].condition)
    #   ...
    #   + JUMPI(case[n-1].condition) + default_action + JUMP()
    #   + JUMPDEST + case[n-1].action + JUMP()
    #   + ...
    #   + JUMPDEST + case[1].action + JUMP()
    #   + JUMPDEST + case[0].action + JUMP()
    for case in reversed(cases):
        case_action = case.action
        exit_offset -= len(case_action) + 6
        branch = (
            Op.JUMPDEST
            + case_action
            + Op.JUMP(Op.ADD(Op.PC, exit_offset))
        )
        guard = Op.JUMPI(Op.ADD(Op.PC, entry_offset), case.condition)
        # Add this case as the next layer around the onion.
        assembled = guard + assembled + branch
        entry_offset += len(guard) + len(branch)

    # Landing point shared by every exit jump.
    assembled += Op.JUMPDEST

    instance = super().__new__(cls, assembled)
    instance.default_action = default_action
    instance.cases = cases
    return instance

TransactionWithCost

Bases: Transaction

Transaction object that can include the expected gas to be consumed.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
492
493
494
495
class TransactionWithCost(Transaction):
    """
    Transaction object that can include the expected gas to be consumed.

    Adds a single field, `gas_cost`, on top of `Transaction`.
    """

    # Expected gas consumed by the transaction. The field has no default
    # (`...`), so it must always be provided.
    # NOTE(review): assuming `Field` is pydantic's, `exclude=True` keeps
    # this value out of the serialized transaction - confirm.
    gas_cost: int = Field(..., exclude=True)

While

Bases: Bytecode

Helper class used to generate while-loop bytecode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
class While(Bytecode):
    """Bytecode generator for while-loops."""

    def __new__(
        cls,
        *,
        body: Bytecode | Op,
        condition: Bytecode | Op | None = None,
    ) -> Self:
        """
        Assemble the loop bytecode.

        The loop is a JUMPDEST, followed by `body`, followed by a jump back
        to that JUMPDEST: a JUMPI on `condition` when one is given, an
        unconditional JUMP otherwise.

        Neither the condition nor the body may leave a stack item on the
        stack.
        """
        loop_code = Bytecode() + Op.JUMPDEST + body

        # The jump target is computed relative to PC: the distance back to
        # the JUMPDEST is the body (plus the condition, when present) plus
        # 6 bytes of loop overhead.
        if condition is None:
            back_jump = Op.JUMP(Op.SUB(Op.PC, Op.PUSH4[len(body) + 6]))
        else:
            back_jump = Op.JUMPI(
                Op.SUB(Op.PC, Op.PUSH4[len(body) + len(condition) + 6]),
                condition,
            )
        loop_code += back_jump
        return super().__new__(cls, loop_code)

__new__(*, body, condition=None)

Assemble the loop bytecode.

Neither the condition nor the body may leave a stack item on the stack.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def __new__(
    cls,
    *,
    body: Bytecode | Op,
    condition: Bytecode | Op | None = None,
) -> Self:
    """
    Assemble the loop bytecode.

    The loop is a JUMPDEST, followed by `body`, followed by a jump back to
    that JUMPDEST: a JUMPI on `condition` when one is given, an
    unconditional JUMP otherwise.

    Neither the condition nor the body may leave a stack item on the
    stack.
    """
    loop_code = Bytecode() + Op.JUMPDEST + body

    # The jump target is computed relative to PC: the distance back to the
    # JUMPDEST is the body (plus the condition, when present) plus 6 bytes
    # of loop overhead.
    if condition is None:
        back_jump = Op.JUMP(Op.SUB(Op.PC, Op.PUSH4[len(body) + 6]))
    else:
        back_jump = Op.JUMPI(
            Op.SUB(Op.PC, Op.PUSH4[len(body) + len(condition) + 6]),
            condition,
        )
    loop_code += back_jump
    return super().__new__(cls, loop_code)

DeploymentTestType

Bases: StrEnum

Represents the type of deployment test.

Source code in packages/testing/src/execution_testing/tools/utility/generators.py
20
21
22
23
24
25
class DeploymentTestType(StrEnum):
    """
    Represents the type of deployment test.

    Each member's value is the lower-case form of its name.
    """

    # NOTE(review): presumably these name when the deployment happens
    # relative to the fork block (before / on / after) - confirm against
    # the code that consumes this enum.
    DEPLOY_BEFORE_FORK = "deploy_before_fork"
    DEPLOY_ON_FORK_BLOCK = "deploy_on_fork_block"
    DEPLOY_AFTER_FORK = "deploy_after_fork"

gas_test(*, fork, state_test, pre, setup_code, subject_code, subject_code_warm=None, tear_down_code=None, cold_gas=None, warm_gas=None, subject_address=None, subject_balance=0, oog_difference=1, out_of_gas_testing=True, prelude_code=None, tx_gas=None)

Create State Test to check the gas cost of a sequence of code.

setup_code and tear_down_code are called multiple times during the test, and MUST NOT have any side-effects which persist across message calls, and in particular, any effects on the gas usage of subject_code.

Source code in packages/testing/src/execution_testing/tools/utility/generators.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def gas_test(
    *,
    fork: Fork,
    state_test: StateTestFiller,
    pre: Alloc,
    setup_code: Bytecode,
    subject_code: Bytecode,
    subject_code_warm: Bytecode | None = None,
    tear_down_code: Bytecode | None = None,
    cold_gas: int | None = None,
    warm_gas: int | None = None,
    subject_address: Address | None = None,
    subject_balance: int = 0,
    oog_difference: int = 1,
    out_of_gas_testing: bool = True,
    prelude_code: Bytecode | None = None,
    tx_gas: int | None = None,
) -> None:
    """
    Create State Test to check the gas cost of a sequence of code.

    `setup_code` and `tear_down_code` are called multiple times during the
    test, and MUST NOT have any side-effects which persist across message
    calls, and in particular, any effects on the gas usage of `subject_code`.

    Two contracts are deployed: a baseline (`setup_code + tear_down_code`)
    and a subject (`setup_code + subject_code + tear_down_code`). A harness
    contract calls the baseline once and the subject twice, measures the
    gas used by each call, and stores the differences between the subject
    runs and the baseline run in storage, where they are checked against
    `cold_gas` and `warm_gas`.

    Arguments:
      fork: The fork to test; must not be older than Berlin.
      state_test: Filler invoked at the end with the built `pre`, `tx`
                  and `post`.
      pre: Pre-state allocation used to fund the sender and deploy the
           contracts.
      setup_code: Code that runs before `subject_code`.
      subject_code: Code whose gas cost is measured.
      subject_code_warm: Only used to derive `warm_gas` (via its
                         `gas_cost(fork)`) when `warm_gas` is not given.
      tear_down_code: Code that runs after `subject_code`; defaults to
                      `Op.STOP`.
      cold_gas: Expected gas of the first subject run; defaults to
                `subject_code.gas_cost(fork)`. Must be greater than zero.
      warm_gas: Expected gas of the second subject run; defaults to the
                gas cost of `subject_code_warm` if given, otherwise to
                `cold_gas`.
      subject_address: Address passed to the deployment of the subject
                       contract.
      subject_balance: Balance passed to the deployment of the subject
                       contract.
      oog_difference: Amount of gas withheld from the out-of-gas run.
      out_of_gas_testing: When `True`, the harness also performs an
                          out-of-gas call and a sanity call.
      prelude_code: Code the harness runs after warming the contracts and
                    before the measured calls.
      tx_gas: Gas limit of the transaction; defaults to
              `gas_single_gas_run + cold_gas + 500_000`.

    Raises:
      ValueError: If `fork` is older than Berlin, or if `cold_gas` is not
                  greater than zero.
    """
    if fork < Berlin:
        raise ValueError(
            "Gas tests before Berlin are not supported due to CALL gas changes"
        )

    if cold_gas is None:
        cold_gas = subject_code.gas_cost(fork)

    if cold_gas <= 0:
        raise ValueError(
            f"Target gas allocations (cold_gas) must be > 0, got {cold_gas}"
        )
    # Warm gas falls back to the warm variant's cost, then to cold_gas.
    if warm_gas is None:
        if subject_code_warm is not None:
            warm_gas = subject_code_warm.gas_cost(fork)
        else:
            warm_gas = cold_gas

    sender = pre.fund_eoa()
    if tear_down_code is None:
        tear_down_code = Op.STOP
    # The baseline contract is the subject contract without the subject
    # code; the harness subtracts its measured gas from the subject runs.
    address_baseline = pre.deploy_contract(setup_code + tear_down_code)
    code_subject = setup_code + subject_code + tear_down_code
    address_subject = pre.deploy_contract(
        code_subject,
        balance=subject_balance,
        address=subject_address,
    )
    # 2 times GAS, POP, CALL, 6 times PUSH1 - instructions charged for at every
    # gas run
    gas_costs = fork.gas_costs()
    opcode_gas_cost = gas_costs.G_BASE
    opcode_pop_cost = gas_costs.G_BASE
    opcode_push_cost = gas_costs.G_VERY_LOW
    gas_single_gas_run = (
        2 * opcode_gas_cost
        + opcode_pop_cost
        + gas_costs.G_WARM_ACCOUNT_ACCESS
        + 6 * opcode_push_cost
    )
    address_legacy_harness = pre.deploy_contract(
        code=(
            # warm subject and baseline without executing
            (
                Op.BALANCE(address_subject)
                + Op.POP
                + Op.BALANCE(address_baseline)
                + Op.POP
            )
            # run any "prelude" code that may have universal side effects
            + prelude_code
            # Baseline gas run
            + (
                Op.GAS
                + Op.CALL(address=address_baseline, gas=Op.GAS)
                + Op.POP
                + Op.GAS
                + Op.SWAP1
                + Op.SUB
            )
            # cold gas run
            + (
                Op.GAS
                + Op.CALL(address=address_subject, gas=Op.GAS)
                + Op.POP
                + Op.GAS
                + Op.SWAP1
                + Op.SUB
            )
            # warm gas run
            + (
                Op.GAS
                + Op.CALL(address=address_subject, gas=Op.GAS)
                + Op.POP
                + Op.GAS
                + Op.SWAP1
                + Op.SUB
            )
            # Store warm gas: DUP3 is the gas of the baseline gas run
            + (
                Op.DUP3
                + Op.SWAP1
                + Op.SUB
                + Op.PUSH2(slot_warm_gas)
                + Op.SSTORE
            )
            # store cold gas: DUP2 is the gas of the baseline gas run
            + (
                Op.DUP2
                + Op.SWAP1
                + Op.SUB
                + Op.PUSH2(slot_cold_gas)
                + Op.SSTORE
            )
            + (
                (
                    # do an oog gas run, unless skipped with
                    # `out_of_gas_testing=False`:
                    #
                    # - DUP7 is the gas of the baseline gas run, after other
                    #   CALL args were pushed
                    # - subtract the gas charged by the harness
                    # - add warm gas charged by the subject
                    # - subtract `oog_difference` to cause OOG exception
                    #   (1 by default)
                    Op.SSTORE(
                        slot_oog_call_result,
                        Op.CALL(
                            gas=Op.ADD(
                                warm_gas - gas_single_gas_run - oog_difference,
                                Op.DUP7,
                            ),
                            address=address_subject,
                        ),
                    )
                    # sanity gas run: not subtracting 1 to see if enough gas
                    # makes the call succeed
                    + Op.SSTORE(
                        slot_sanity_call_result,
                        Op.CALL(
                            gas=Op.ADD(warm_gas - gas_single_gas_run, Op.DUP7),
                            address=address_subject,
                        ),
                    )
                    + Op.STOP
                )
                if out_of_gas_testing
                else Op.STOP
            )
        ),
    )

    # Expected harness storage: the measured warm and cold gas values.
    post = {
        address_legacy_harness: Account(
            storage={
                slot_warm_gas: warm_gas,
                slot_cold_gas: cold_gas,
            },
        ),
    }

    # With out-of-gas testing enabled, the under-funded call must fail and
    # the exactly-funded sanity call must succeed.
    if out_of_gas_testing:
        post[address_legacy_harness].storage[slot_oog_call_result] = (
            LEGACY_CALL_FAILURE
        )
        post[address_legacy_harness].storage[slot_sanity_call_result] = (
            LEGACY_CALL_SUCCESS
        )

    if tx_gas is None:
        tx_gas = gas_single_gas_run + cold_gas + 500_000
    tx = Transaction(
        to=address_legacy_harness, gas_limit=tx_gas, sender=sender
    )

    state_test(pre=pre, tx=tx, post=post)

generate_system_contract_deploy_test(*, fork, tx_json_path, expected_deploy_address, fail_on_empty_code, expected_system_contract_storage=None)

Generate a test that verifies the correct deployment of a system contract.

Generates following test cases:

| Test case | before/after fork | fail on empty block | invalid block |
|---|---|---|---|
| `deploy_before_fork-nonzero_balance` | before | False | False |
| `deploy_before_fork-zero_balance` | before | True | False |
| `deploy_on_fork_block-nonzero_balance` | on fork block | False | False |
| `deploy_on_fork_block-zero_balance` | on fork block | True | False |
| `deploy_after_fork-nonzero_balance` | after | False | False |
| `deploy_after_fork-zero_balance` | after | True | True |

The has balance parametrization does not have an effect on the expectation of the test.

Parameters:

Name Type Description Default
fork Fork

The fork to test.

required
tx_json_path Path

Path to the JSON file with the transaction to deploy the system contract. Providing a JSON file is useful to copy-paste the transaction from the EIP.

required
expected_deploy_address Address

The expected address of the deployed contract.

required
fail_on_empty_code bool

If True, the test is expected to fail on empty code.

required
expected_system_contract_storage Dict | None

The expected storage of the system contract.

None
Source code in packages/testing/src/execution_testing/tools/utility/generators.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def generate_system_contract_deploy_test(
    *,
    fork: Fork,
    tx_json_path: Path,
    expected_deploy_address: Address,
    fail_on_empty_code: bool,
    expected_system_contract_storage: Dict | None = None,
) -> Callable[[SystemContractDeployTestFunction], Callable]:
    """
    Build a decorator that generates system contract deployment tests.

    The deployment transaction is read from `tx_json_path`, signed, and then
    placed in a block whose position relative to the fork block depends on
    the `test_type` parametrization:

    - `deploy_before_fork`: deployment block (timestamp 14_999) followed by
      an empty block on the fork (timestamp 15_000).
    - `deploy_on_fork_block`: deployment on the fork block (timestamp
      15_000) followed by an empty block (timestamp 15_001).
    - `deploy_after_fork`: empty block on the fork (timestamp 15_000). If
      `fail_on_empty_code` is True this block carries
      `BlockException.SYSTEM_CONTRACT_EMPTY` and nothing else is added;
      otherwise it is followed by the deployment block (timestamp 15_001)
      and one more empty block (timestamp 15_002).

    Every test type is combined with the `has_balance` parametrization
    (`nonzero_balance` and `zero_balance`), which only selects the pre-state
    balance (1 or 0) of `expected_deploy_address` and does not have an
    effect on the expectation of the test.

    Unless the deployment is expected to fail, the decorated function is
    called with `fork`, `pre`, `post` and `test_type`, and the blocks it
    returns are appended after the deployment blocks.

    Arguments:
      fork (Fork): The fork to test.
      tx_json_path (Path): Path to the JSON file with the transaction to
                           deploy the system contract. Providing a JSON
                           file is useful to copy-paste the transaction
                           from the EIP.
      expected_deploy_address (Address): The expected address of the deployed
                                         contract.
      fail_on_empty_code (bool): If True, the test is expected to fail
                                 on empty code.
      expected_system_contract_storage (Dict | None): The expected storage of
                                                      the system contract.

    """
    with open(tx_json_path, mode="r") as tx_file:
        tx_json = json.load(tx_file)

    # Normalize the JSON so that it validates as a `Transaction`: the gas
    # limit may be given as "gas", and "protected" defaults to False.
    if "gasLimit" not in tx_json and "gas" in tx_json:
        tx_json["gasLimit"] = tx_json.pop("gas")
    tx_json.setdefault("protected", False)

    deploy_tx = Transaction.model_validate(tx_json).with_signature_and_sender()
    gas_price = deploy_tx.gas_price
    assert gas_price is not None
    deployer_required_balance = deploy_tx.gas_limit * gas_price
    deployer_address = deploy_tx.sender

    # Cross-check the signed transaction against the values in the JSON file,
    # when they are provided.
    if "hash" in tx_json:
        assert deploy_tx.hash == Hash(tx_json["hash"])
    if "sender" in tx_json:
        assert deploy_tx.sender == Address(tx_json["sender"])

    def decorator(func: SystemContractDeployTestFunction) -> Callable:
        # Deploying after the fork is an exception test only when empty code
        # at the system contract address invalidates the fork block.
        after_fork_marks = (
            [pytest.mark.exception_test] if fail_on_empty_code else []
        )

        @pytest.mark.parametrize(
            "has_balance",
            [
                pytest.param(ContractAddressHasBalance.NONZERO_BALANCE),
                pytest.param(ContractAddressHasBalance.ZERO_BALANCE),
            ],
            ids=lambda param: param.name.lower(),
        )
        @pytest.mark.parametrize(
            "test_type",
            [
                pytest.param(DeploymentTestType.DEPLOY_BEFORE_FORK),
                pytest.param(DeploymentTestType.DEPLOY_ON_FORK_BLOCK),
                pytest.param(
                    DeploymentTestType.DEPLOY_AFTER_FORK,
                    marks=after_fork_marks,
                ),
            ],
            ids=lambda param: param.name.lower(),
        )
        @pytest.mark.pre_alloc_mutable
        @pytest.mark.valid_at_transition_to(fork.name())
        def wrapper(
            blockchain_test: BlockchainTestFiller,
            has_balance: ContractAddressHasBalance,
            pre: Alloc,
            test_type: DeploymentTestType,
            fork: Fork,
        ) -> None:
            assert deployer_address is not None
            assert deploy_tx.created_contract == expected_deploy_address

            # The deployment is only expected to fail when it would happen
            # after a fork block that rejects empty system contract code.
            deployment_succeeds = (
                test_type != DeploymentTestType.DEPLOY_AFTER_FORK
                or not fail_on_empty_code
            )

            blocks: List[Block] = []
            if test_type == DeploymentTestType.DEPLOY_BEFORE_FORK:
                blocks = [
                    # Deployment block
                    Block(txs=[deploy_tx], timestamp=14_999),
                    # Empty block on fork
                    Block(txs=[], timestamp=15_000),
                ]
            elif test_type == DeploymentTestType.DEPLOY_ON_FORK_BLOCK:
                blocks = [
                    # Deployment on fork block
                    Block(txs=[deploy_tx], timestamp=15_000),
                    # Empty block after fork
                    Block(txs=[], timestamp=15_001),
                ]
            elif test_type == DeploymentTestType.DEPLOY_AFTER_FORK:
                if fail_on_empty_code:
                    blocks = [
                        # Empty block on fork, expected to be invalid
                        Block(
                            txs=[],
                            timestamp=15_000,
                            exception=BlockException.SYSTEM_CONTRACT_EMPTY,
                        ),
                    ]
                else:
                    blocks = [
                        # Empty block on fork
                        Block(txs=[], timestamp=15_000, exception=None),
                        # Deployment after fork block
                        Block(txs=[deploy_tx], timestamp=15_001),
                        # Empty block after deployment
                        Block(txs=[], timestamp=15_002),
                    ]

            if has_balance == ContractAddressHasBalance.NONZERO_BALANCE:
                initial_balance = 1
            else:
                initial_balance = 0

            # Remove the code that is automatically allocated on the fork.
            pre[expected_deploy_address] = Account(
                code=b"",
                nonce=0,
                balance=initial_balance,
            )
            pre.fund_address(deployer_address, deployer_required_balance)

            # The code expected after deployment is the one the fork
            # pre-allocates at the system contract address.
            fork_pre_allocation = fork.pre_allocation_blockchain()
            expected_deploy_address_int = int.from_bytes(
                expected_deploy_address, "big"
            )
            assert expected_deploy_address_int in fork_pre_allocation
            expected_code = fork_pre_allocation[expected_deploy_address_int][
                "code"
            ]

            # Note: balance check is omitted; it may be modified by the
            # underlying, decorated test
            account_kwargs = {"code": expected_code, "nonce": 1}
            if expected_system_contract_storage:
                account_kwargs["storage"] = expected_system_contract_storage

            post = Alloc()
            if deployment_succeeds:
                post[expected_deploy_address] = Account(**account_kwargs)
                post[deployer_address] = Account(nonce=1)
                # Extra blocks (if any) returned by the decorated function
                # are added after the contract is deployed. More blocks are
                # only filled if the deploy block does not fail.
                blocks.extend(
                    func(fork=fork, pre=pre, post=post, test_type=test_type)
                )

            blockchain_test(pre=pre, blocks=blocks, post=post)

        # Expose the decorated function's identity on the generated test.
        wrapper.__name__ = func.__name__  # type: ignore
        wrapper.__doc__ = func.__doc__

        return wrapper

    return decorator

generate_system_contract_error_test(*, max_gas_limit)

Generate a test that verifies the correct behavior when a system contract fails execution.

Parametrizations required: - system_contract (Address): The address of the system contract to deploy. - valid_from (Fork): The fork from which the test is valid.

Parameters:

Name Type Description Default
max_gas_limit int

The maximum gas limit for the system transaction.

required
Source code in packages/testing/src/execution_testing/tools/utility/generators.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def generate_system_contract_error_test(
    *,
    max_gas_limit: int,
) -> Callable[[SystemContractDeployTestFunction], Callable]:
    """
    Build a decorator that generates tests for a failing system contract.

    The code at the `system_contract` address is replaced in the pre-state
    with code chosen by the `test_type` parametrization (one case per member
    of `SystemContractTestType`): code that consumes exactly `max_gas_limit`
    gas, code that consumes one gas more than that, a revert, or an invalid
    opcode. A single block with one value transfer is then filled; except
    for the exact gas limit case, the block is expected to fail with
    `BlockException.SYSTEM_CONTRACT_CALL_FAILED` and the transfer must not
    reach the state.

    Only the name and docstring of the decorated function are used; its body
    is not called by the generated test.

    Parametrizations required:
    - system_contract (Address): The address of the system contract to deploy.
    - valid_from (Fork): The fork from which the test is valid.

    Arguments:
      max_gas_limit (int): The maximum gas limit for the system transaction.

    """

    def decorator(func: SystemContractDeployTestFunction) -> Callable:
        @pytest.mark.parametrize(
            "test_type", [v.param() for v in SystemContractTestType]
        )
        @pytest.mark.execute(pytest.mark.skip(reason="modifies pre-alloc"))
        def wrapper(
            blockchain_test: BlockchainTestFiller,
            pre: Alloc,
            test_type: SystemContractTestType,
            system_contract: Address,
            fork: Fork,
        ) -> None:
            system_code = Bytecode()

            # Pick the replacement system contract code for this test case.
            if test_type in (
                SystemContractTestType.GAS_LIMIT,
                SystemContractTestType.OUT_OF_GAS_ERROR,
            ):
                # Burn gas by storing N values to storage, where N is derived
                # from the gas costs of the given fork. This code only works
                # once: if the system contract is re-executed in a subsequent
                # block, it will consume less gas.
                gas_costs = fork.gas_costs()
                gas_used_per_storage = (
                    gas_costs.G_STORAGE_SET
                    + gas_costs.G_COLD_SLOAD
                    + (gas_costs.G_VERY_LOW * 2)
                )
                storage_writes, leftover_gas = divmod(
                    max_gas_limit, gas_used_per_storage
                )
                system_code += sum(
                    Op.SSTORE(slot, 1) for slot in range(storage_writes)
                )
                # Gas left over when the limit is not divisible by the cost
                # per storage write is spent on NO-OPs (JUMPDEST) that
                # consume 1 gas each.
                assert gas_costs.G_JUMPDEST == 1, (
                    "JUMPDEST gas cost should be 1, but got "
                    f"{gas_costs.G_JUMPDEST}. Generator "
                    "`generate_system_contract_error_test` needs updating."
                )
                system_code += sum(Op.JUMPDEST for _ in range(leftover_gas))

                if test_type == SystemContractTestType.OUT_OF_GAS_ERROR:
                    # One extra JUMPDEST goes over the limit by one gas.
                    system_code += Op.JUMPDEST
                system_code += Op.STOP
            elif test_type == SystemContractTestType.REVERT_ERROR:
                # Run a simple revert.
                system_code = Op.REVERT(0, 0)
            elif test_type == SystemContractTestType.EXCEPTION_ERROR:
                # Run a simple exception.
                system_code = Op.INVALID()
            else:
                raise ValueError(f"Invalid test type: {test_type}")

            pre[system_contract] = Account(
                code=system_code,
                nonce=1,
                balance=0,
            )

            # Simple value transfer used to verify that a failed block did
            # not modify the state.
            value_receiver = pre.fund_eoa(amount=0)
            value_sender = pre.fund_eoa()
            test_tx = Transaction(
                to=value_receiver,
                value=1,
                gas_limit=100_000,
                sender=value_sender,
            )

            # Consuming exactly the gas limit is the only case in which the
            # block is expected to be valid.
            expect_failure = test_type != SystemContractTestType.GAS_LIMIT

            post = Alloc()
            if expect_failure:
                post[value_receiver] = Account.NONEXISTENT
            else:
                post[value_receiver] = Account(balance=1)

            # Single block carrying the test transaction.
            test_block = Block(
                txs=[test_tx],
                exception=BlockException.SYSTEM_CONTRACT_CALL_FAILED
                if expect_failure
                else None,
            )

            blockchain_test(pre=pre, blocks=[test_block], post=post)

        # Expose the decorated function's identity on the generated test.
        wrapper.__name__ = func.__name__  # type: ignore
        wrapper.__doc__ = func.__doc__

        return wrapper

    return decorator

extend_with_defaults(defaults, cases, **parametrize_kwargs)

Extend test cases with default parameter values.

This utility function extends test case parameters by adding default values from the defaults dictionary to each case in the cases list. If a case already specifies a value for a parameter, its default is ignored.

This function is particularly useful in scenarios where you want to define a common set of default values but allow individual test cases to override them as needed.

The function returns a dictionary that can be directly unpacked and passed to the @pytest.mark.parametrize decorator.

Parameters:

Name Type Description Default
defaults Dict[str, Any]

A dictionary of default parameter names and their values. These values will be added to each case unless the case already defines a value for each parameter.

required
cases List[ParameterSet]

A list of pytest.param objects representing different test cases. Its first argument must be a dictionary defining parameter names and values.

required
parametrize_kwargs Any

Additional keyword arguments to be passed to @pytest.mark.parametrize. These arguments are not modified by this function and are passed through unchanged.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary with the following structure: argnames: A list of parameter names. argvalues: A list of test cases with modified parameter values. Any additional keyword arguments given in parametrize_kwargs are included as top-level entries, passed through unchanged.

Example:

@pytest.mark.parametrize(**extend_with_defaults(
    defaults=dict(
        min_value=0,  # default minimum value is 0
        max_value=100, # default maximum value is 100
        average=50,  # default average value is 50
    ),

    cases=[
        pytest.param(
            dict(),  # use default values
            id='default_case',
        ),

        pytest.param(
            dict(min_value=10),  # override with min_value=10
            id='min_value_10',
        ),

        pytest.param(
            dict(max_value=200),  # override with max_value=200
            id='max_value_200',
        ),

        pytest.param(
            dict(min_value=-10, max_value=50),  # override both min_value
                                                # and max_value
            id='min_-10_max_50',
        ),

        pytest.param(
            # all defaults are overridden
            dict(min_value=20, max_value=80, average=50),
            id="min_20_max_80_avg_50",
        ),

        pytest.param(
            dict(min_value=100, max_value=0),  # invalid range
            id='invalid_range',
            marks=pytest.mark.xfail(reason='invalid range'),
        )
    ],
))
def test_range(min_value, max_value, average):
    assert min_value <= max_value
    assert min_value <= average <= max_value

The above test will execute with the following sets of parameters:

"default_case": {"min_value": 0, "max_value": 100, "average": 50}
"min_value_10": {"min_value": 10, "max_value": 100, "average": 50}
"max_value_200": {"min_value": 0, "max_value": 200, "average": 50}
"min_-10_max_50": {"min_value": -10, "max_value": 50, "average": 50}
"min_20_max_80_avg_50": {"min_value": 20, "max_value": 80, "average": 50}
# expected to fail
"invalid_range": {"min_value": 100, "max_value": 0, "average": 50}

Notes: - Each case in cases must contain exactly one value, which is a dictionary of parameter values. - The function performs an in-place update of the cases list, so the original cases list is modified.

Source code in packages/testing/src/execution_testing/tools/utility/pytest.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def extend_with_defaults(
    defaults: Dict[str, Any],
    cases: List[ParameterSet],
    **parametrize_kwargs: Any,
) -> Dict[str, Any]:
    """
    Fill in default parameter values for a list of test cases.

    Each case in `cases` carries a single dictionary of parameter values.
    That dictionary is merged on top of `defaults`, so parameters the case
    specifies win over the defaults and every other parameter takes its
    default value. This allows a common set of defaults to be defined once
    while individual test cases override only what they need.

    The returned dictionary can be directly unpacked and passed to the
    `@pytest.mark.parametrize` decorator.

    Arguments:
      defaults (Dict[str, Any]): A dictionary of default parameter names
                                 and their values. These values are added
                                 to each case unless the case already defines
                                 a value for the parameter.
      cases (List[ParameterSet]): A list of `pytest.param` objects
                                  representing different test cases.
                                  Its first argument must be a dictionary
                                  defining parameter names and values.
      parametrize_kwargs (Any): Additional keyword arguments to be passed to
                              `@pytest.mark.parametrize`. These arguments are
                              not modified by this function and are passed
                              through unchanged.

    Returns:
      Dict[str, Any]: A dictionary with the following entries:
          `argnames`: A list of parameter names (the keys of `defaults`).
          `argvalues`: The list of test cases with merged parameter values.
          Any entries of `parametrize_kwargs`, added as top-level keys.

    Raises:
      ValueError: If a case does not contain exactly one value that is a
                  dictionary.
      UnknownParameterInCasesError: If a case specifies a parameter that is
                                    not present in `defaults`.

    Example:
    ```python
    @pytest.mark.parametrize(**extend_with_defaults(
        defaults=dict(
            min_value=0,  # default minimum value is 0
            max_value=100, # default maximum value is 100
            average=50,  # default average value is 50
        ),

        cases=[
            pytest.param(
                dict(),  # use default values
                id='default_case',
            ),

            pytest.param(
                dict(min_value=10),  # override with min_value=10
                id='min_value_10',
            ),

            pytest.param(
                dict(max_value=200),  # override with max_value=200
                id='max_value_200',
            ),

            pytest.param(
                dict(min_value=-10, max_value=50),  # override both min_value
                                                    # and max_value
                id='min_-10_max_50',
            ),

            pytest.param(
                # all defaults are overridden
                dict(min_value=20, max_value=80, average=50),
                id="min_20_max_80_avg_50",
            ),

            pytest.param(
                dict(min_value=100, max_value=0),  # invalid range
                id='invalid_range',
                marks=pytest.mark.xfail(reason='invalid range'),
            )
        ],
    ))
    def test_range(min_value, max_value, average):
        assert min_value <= max_value
        assert min_value <= average <= max_value
    ```

    The above test will execute with the following sets of parameters:

    ```python
    "default_case": {"min_value": 0, "max_value": 100, "average": 50}
    "min_value_10": {"min_value": 10, "max_value": 100, "average": 50}
    "max_value_200": {"min_value": 0, "max_value": 200, "average": 50}
    "min_-10_max_50": {"min_value": -10, "max_value": 50, "average": 50}
    "min_20_max_80_avg_50": {"min_value": 20, "max_value": 80, "average": 50}
    # expected to fail
    "invalid_range": {"min_value": 100, "max_value": 0, "average": 50}
    ```

    Notes:
    - Each case in `cases` must contain exactly one value, which is a
      dictionary of parameter values.
    - The function performs an in-place update of the `cases` list, so
      the original `cases` list is modified.

    """
    known_names = set(defaults.keys())

    for index, case in enumerate(cases):
        values = case.values
        if len(values) != 1 or not isinstance(values[0], dict):
            raise ValueError(
                "each case must contain exactly one value; "
                "a dict of parameter values"
            )
        overrides = values[0]
        if set(overrides.keys()) - known_names:
            raise UnknownParameterInCasesError()

        # Start from the defaults (which fixes the parameter order) and let
        # the values given by the test case take precedence.
        merged_params = dict(defaults)
        merged_params.update(overrides)

        cases[index] = pytest.param(
            *merged_params.values(), id=case.id, marks=case.marks
        )

    return {
        "argnames": list(defaults),
        "argvalues": cases,
        **parametrize_kwargs,
    }

get_current_commit_hash_or_tag(repo_path='.', shorten_hash=False)

Get the latest commit tag or commit hash from the repository.

If a tag points to the current commit, return the tag name. If no tag exists: - If shorten_hash is True, return the first 8 characters of the commit hash. - Otherwise, return the full commit hash.

Source code in packages/testing/src/execution_testing/tools/utility/versioning.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def get_current_commit_hash_or_tag(
    repo_path: str = ".", shorten_hash: bool = False
) -> str:
    """
    Describe the current commit of a repository by tag name or commit hash.

    The name of the first tag found that points to the current commit is
    returned. If no tag points to it:
      - If shorten_hash is True, return the first 8 characters of the
        commit hash.
      - Otherwise, return the full commit hash.

    If `repo_path` is not a valid Git repository, a fixed placeholder
    message is returned instead.
    """
    try:
        repo = Repo(repo_path)
        head_commit = repo.head.commit

        # Lazily scan the tags and stop at the first one on the head commit.
        tag_names_on_head = (
            tag.name for tag in repo.tags if tag.commit == head_commit
        )
        tag_name = next(tag_names_on_head, None)
        if tag_name is not None:
            return tag_name

        # No tag found, fall back to the commit hash.
        commit_hash = head_commit.hexsha
        if shorten_hash:
            return commit_hash[:8]
        return commit_hash
    except InvalidGitRepositoryError:
        # Handle the case where the repository is not a valid Git repository
        return "Not a git repository; only seen in framework tests."