Skip to content

Ethereum Test Tools Package

Module containing tools for generating cross-client Ethereum execution layer tests.

CalldataCase

Bases: Case

Small helper class to represent a single case whose condition depends on the value of the contract's calldata in a Switch case statement.

By default the calldata is read from position zero, but this can be overridden using position.

The condition is generated automatically based on the value (and optionally position) and may not be set directly.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class CalldataCase(Case):
    """
    Small helper class to represent a single case whose condition depends on
    the value of the contract's calldata in a Switch case statement.

    By default the calldata is read from position zero, but this can be
    overridden using `position`.

    The `condition` is generated automatically based on the `value` (and
    optionally `position`) and may not be set directly.
    """

    def __init__(
        self, value: int | str | Bytecode, position: int = 0, **kwargs: Any
    ) -> None:
        """Generate the condition base on `value` and `position`."""
        condition = Op.EQ(Op.CALLDATALOAD(position), value)
        super().__init__(condition=condition, **kwargs)

__init__(value, position=0, **kwargs)

Generate the condition base on value and position.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
334
335
336
337
338
339
def __init__(
    self, value: int | str | Bytecode, position: int = 0, **kwargs: Any
) -> None:
    """Generate the condition base on `value` and `position`."""
    condition = Op.EQ(Op.CALLDATALOAD(position), value)
    super().__init__(condition=condition, **kwargs)

Case dataclass

Small helper class to represent a single, generic case in a Switch cases list.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
@dataclass(kw_only=True, slots=True)
class Case:
    """
    Small helper class to represent a single, generic case in a `Switch` cases
    list.
    """

    condition: Bytecode | Op
    action: Bytecode | Op
    terminating: bool | None = None

    @property
    def is_terminating(self) -> bool:
        """Returns whether the case is terminating."""
        return (
            self.terminating
            if self.terminating is not None
            else self.action.terminating
        )

is_terminating property

Returns whether the case is terminating.

CodeGasMeasure

Bases: Bytecode

Helper class used to generate bytecode that measures gas usage of a bytecode, taking into account and subtracting any extra overhead gas costs required to execute. By default, the result gas calculation is saved to storage key 0.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class CodeGasMeasure(Bytecode):
    """
    Helper class used to generate bytecode that measures gas usage of a
    bytecode, taking into account and subtracting any extra overhead gas costs
    required to execute. By default, the result gas calculation is saved to
    storage key 0.
    """

    code: Bytecode
    """
    Bytecode to be executed to measure the gas usage.
    """
    overhead_cost: int
    """
    Extra gas cost to be subtracted from extra operations.
    """
    extra_stack_items: int
    """
    Extra stack items that remain at the end of the execution.
    To be considered when subtracting the value of the previous GAS operation,
    and to be popped at the end of the execution.
    """
    sstore_key: int | Bytes
    """
    Storage key to save the gas used.
    """

    def __new__(
        cls,
        *,
        code: Bytecode,
        overhead_cost: int = 0,
        extra_stack_items: int = 0,
        sstore_key: int | Bytes = 0,
        stop: bool = True,
    ) -> Self:
        """Assemble the bytecode that measures gas usage."""
        res = Op.GAS + code + Op.GAS
        # We need to swap and pop for each extra stack item that remained from
        # the execution of the code
        res += (Op.SWAP1 + Op.POP) * extra_stack_items
        res += (
            Op.SWAP1
            + Op.SUB
            + Op.PUSH1[overhead_cost]
            + Op.GAS
            + Op.GAS
            + Op.SWAP1
            + Op.SUB
            + Op.ADD
            + Op.SWAP1
            + Op.SSTORE(sstore_key, Op.SUB)
        )
        if stop:
            res += Op.STOP

        instance = super().__new__(cls, res)
        instance.code = code
        instance.overhead_cost = overhead_cost
        instance.extra_stack_items = extra_stack_items
        instance.sstore_key = sstore_key
        return instance

code instance-attribute

Bytecode to be executed to measure the gas usage.

overhead_cost instance-attribute

Extra gas cost to be subtracted from extra operations.

extra_stack_items instance-attribute

Extra stack items that remain at the end of the execution. To be considered when subtracting the value of the previous GAS operation, and to be popped at the end of the execution.

sstore_key instance-attribute

Storage key to save the gas used.

__new__(*, code, overhead_cost=0, extra_stack_items=0, sstore_key=0, stop=True)

Assemble the bytecode that measures gas usage.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def __new__(
    cls,
    *,
    code: Bytecode,
    overhead_cost: int = 0,
    extra_stack_items: int = 0,
    sstore_key: int | Bytes = 0,
    stop: bool = True,
) -> Self:
    """Assemble the bytecode that measures gas usage."""
    res = Op.GAS + code + Op.GAS
    # We need to swap and pop for each extra stack item that remained from
    # the execution of the code
    res += (Op.SWAP1 + Op.POP) * extra_stack_items
    res += (
        Op.SWAP1
        + Op.SUB
        + Op.PUSH1[overhead_cost]
        + Op.GAS
        + Op.GAS
        + Op.SWAP1
        + Op.SUB
        + Op.ADD
        + Op.SWAP1
        + Op.SSTORE(sstore_key, Op.SUB)
    )
    if stop:
        res += Op.STOP

    instance = super().__new__(cls, res)
    instance.code = code
    instance.overhead_cost = overhead_cost
    instance.extra_stack_items = extra_stack_items
    instance.sstore_key = sstore_key
    return instance

Conditional

Bases: Bytecode

Helper class used to generate conditional bytecode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class Conditional(Bytecode):
    """Helper class used to generate conditional bytecode."""

    def __new__(
        cls,
        *,
        condition: Bytecode | Op,
        if_true: Bytecode | Op | None = None,
        if_false: Bytecode | Op | None = None,
    ) -> Self:
        """
        Assemble the conditional bytecode by generating the necessary jump and
        jumpdest opcodes surrounding the condition and the two possible
        execution paths.

        In the future, PC usage should be replaced by using RJUMP and RJUMPI
        """
        if if_true is None:
            if_true = Bytecode()
        if if_false is None:
            if_false = Bytecode()

        # First we append a jumpdest to the start of the true branch
        if_true = Op.JUMPDEST + if_true

        # Then we append the unconditional jump to the end of the false
        # branch, used to skip the true branch
        if_false += Op.JUMP(Op.ADD(Op.PC, len(if_true) + 3))

        # Then we need to do the conditional jump by skipping the false
        # branch
        condition = Op.JUMPI(Op.ADD(Op.PC, len(if_false) + 3), condition)

        # Finally we append the condition, false and true branches, plus
        # the jumpdest at the very end
        bytecode = condition + if_false + if_true + Op.JUMPDEST

        return super().__new__(cls, bytecode)

__new__(*, condition, if_true=None, if_false=None)

Assemble the conditional bytecode by generating the necessary jump and jumpdest opcodes surrounding the condition and the two possible execution paths.

In the future, PC usage should be replaced by using RJUMP and RJUMPI

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def __new__(
    cls,
    *,
    condition: Bytecode | Op,
    if_true: Bytecode | Op | None = None,
    if_false: Bytecode | Op | None = None,
) -> Self:
    """
    Assemble the conditional bytecode by generating the necessary jump and
    jumpdest opcodes surrounding the condition and the two possible
    execution paths.

    In the future, PC usage should be replaced by using RJUMP and RJUMPI
    """
    if if_true is None:
        if_true = Bytecode()
    if if_false is None:
        if_false = Bytecode()

    # First we append a jumpdest to the start of the true branch
    if_true = Op.JUMPDEST + if_true

    # Then we append the unconditional jump to the end of the false
    # branch, used to skip the true branch
    if_false += Op.JUMP(Op.ADD(Op.PC, len(if_true) + 3))

    # Then we need to do the conditional jump by skipping the false
    # branch
    condition = Op.JUMPI(Op.ADD(Op.PC, len(if_false) + 3), condition)

    # Finally we append the condition, false and true branches, plus
    # the jumpdest at the very end
    bytecode = condition + if_false + if_true + Op.JUMPDEST

    return super().__new__(cls, bytecode)

Create2PreimageLayout

Bases: Bytecode

Set up the preimage in memory for CREATE2 address computation.

Creates the standard memory layout required to compute a CREATE2 address using keccak256(0xFF ++ factory_address ++ salt ++ init_code_hash).

Memory layout after execution: - MEM[offset + 0: offset + 32] = zero padding + factory_address (20 bytes) - MEM[offset + 11] = 0xFF prefix byte - MEM[offset + 32: offset + 64] = salt (32 bytes) - MEM[offset + 64: offset + 96] = init_code_hash (32 bytes)

To compute the CREATE2 address, use: .address_op or Op.SHA3(offset + 11, 85). The resulting hash's lower 20 bytes (bytes 12-31) form the address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class Create2PreimageLayout(Bytecode):
    """
    Set up the preimage in memory for CREATE2 address computation.

    Creates the standard memory layout required to compute a CREATE2 address
    using keccak256(0xFF ++ factory_address ++ salt ++ init_code_hash).

    Memory layout after execution:
    - MEM[offset + 0: offset + 32] = zero padding + factory_address (20 bytes)
    - MEM[offset + 11] = 0xFF prefix byte
    - MEM[offset + 32: offset + 64] = salt (32 bytes)
    - MEM[offset + 64: offset + 96] = init_code_hash (32 bytes)

    To compute the CREATE2 address, use: `.address_op` or
    `Op.SHA3(offset + 11, 85)`.
    The resulting hash's lower 20 bytes (bytes 12-31) form the address.
    """

    offset: int = 0

    def __new__(
        cls,
        *,
        factory_address: int | bytes | Bytecode,
        salt: int | bytes | Bytecode,
        init_code_hash: int | bytes | Bytecode,
        offset: int = 0,
        old_memory_size: int = 0,
    ) -> Self:
        """
        Assemble the bytecode that sets up the memory layout for CREATE2
        address computation.
        """
        required_size = offset + 96
        new_memory_size = max(old_memory_size, required_size)
        bytecode = (
            Op.MSTORE(offset=offset, value=factory_address)
            + Op.MSTORE8(offset=offset + 11, value=0xFF)
            + Op.MSTORE(offset=offset + 32, value=salt)
            + Op.MSTORE(
                offset=offset + 64,
                value=init_code_hash,
                # Gas accounting
                old_memory_size=old_memory_size,
                new_memory_size=new_memory_size,
            )
        )
        instance = super().__new__(cls, bytecode)
        instance.offset = offset
        return instance

    @property
    def salt_offset(self) -> int:
        """
        Return the salt memory offset of the preimage.
        """
        return self.offset + 32

    def address_op(self) -> Bytecode:
        """
        Return the bytecode that computes the CREATE2 address.
        """
        return Op.SHA3(
            offset=self.offset + 11,
            size=85,
            # Gas accounting
            data_size=85,
        )

    def increment_salt_op(self, increment: int = 1) -> Bytecode:
        """Return the bytecode that increments the current salt."""
        return Op.MSTORE(
            self.salt_offset,
            Op.ADD(Op.MLOAD(self.salt_offset), increment),
        )

__new__(*, factory_address, salt, init_code_hash, offset=0, old_memory_size=0)

Assemble the bytecode that sets up the memory layout for CREATE2 address computation.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __new__(
    cls,
    *,
    factory_address: int | bytes | Bytecode,
    salt: int | bytes | Bytecode,
    init_code_hash: int | bytes | Bytecode,
    offset: int = 0,
    old_memory_size: int = 0,
) -> Self:
    """
    Assemble the bytecode that sets up the memory layout for CREATE2
    address computation.
    """
    required_size = offset + 96
    new_memory_size = max(old_memory_size, required_size)
    bytecode = (
        Op.MSTORE(offset=offset, value=factory_address)
        + Op.MSTORE8(offset=offset + 11, value=0xFF)
        + Op.MSTORE(offset=offset + 32, value=salt)
        + Op.MSTORE(
            offset=offset + 64,
            value=init_code_hash,
            # Gas accounting
            old_memory_size=old_memory_size,
            new_memory_size=new_memory_size,
        )
    )
    instance = super().__new__(cls, bytecode)
    instance.offset = offset
    return instance

salt_offset property

Return the salt memory offset of the preimage.

address_op()

Return the bytecode that computes the CREATE2 address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
496
497
498
499
500
501
502
503
504
505
def address_op(self) -> Bytecode:
    """
    Return the bytecode that computes the CREATE2 address.
    """
    return Op.SHA3(
        offset=self.offset + 11,
        size=85,
        # Gas accounting
        data_size=85,
    )

increment_salt_op(increment=1)

Return the bytecode that increments the current salt.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
507
508
509
510
511
512
def increment_salt_op(self, increment: int = 1) -> Bytecode:
    """Return the bytecode that increments the current salt."""
    return Op.MSTORE(
        self.salt_offset,
        Op.ADD(Op.MLOAD(self.salt_offset), increment),
    )

CreatePreimageLayout

Bases: Bytecode

Set up the preimage in memory for CREATE address computation.

Create the standard memory layout required to compute a CREATE address using keccak256(rlp.encode([sender_address, nonce])).

Memory layout after execution: - MEM[offset + 10] = RLP list prefix byte - MEM[offset + 11] = 0x94 (RLP prefix for 20-byte string) - MEM[offset + 12: offset + 32] = sender_address (20 bytes) - MEM[offset + 32:] = RLP-encoded nonce bytes - MEM[offset + 64: offset + 96] = preimage_size - MEM[offset + 96: offset + 128] = raw nonce (scratch)

Supported nonce range: 1 to 2^64 - 1. Requires Osaka (CLZ opcode).

To compute the CREATE address, use .address_op(). The resulting hash's lower 20 bytes form the address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
class CreatePreimageLayout(Bytecode):
    """
    Set up the preimage in memory for CREATE address computation.

    Create the standard memory layout required to compute a CREATE
    address using keccak256(rlp.encode([sender_address, nonce])).

    Memory layout after execution:
    - MEM[offset + 10] = RLP list prefix byte
    - MEM[offset + 11] = 0x94 (RLP prefix for 20-byte string)
    - MEM[offset + 12: offset + 32] = sender_address (20 bytes)
    - MEM[offset + 32:] = RLP-encoded nonce bytes
    - MEM[offset + 64: offset + 96] = preimage_size
    - MEM[offset + 96: offset + 128] = raw nonce (scratch)

    Supported nonce range: 1 to 2^64 - 1. Requires Osaka
    (CLZ opcode).

    To compute the CREATE address, use `.address_op()`.
    The resulting hash's lower 20 bytes form the address.
    """

    offset: int = 0
    _preimage_size_offset: int = 0
    _nonce_scratch_offset: int = 0

    def __new__(
        cls,
        *,
        sender_address: int | bytes | Bytecode,
        nonce: int | bytes | Bytecode,
        offset: int = 0,
        old_memory_size: int = 0,
    ) -> Self:
        """
        Assemble the bytecode that sets up the memory layout for
        CREATE address computation.
        """
        required_size = offset + 128
        new_memory_size = max(old_memory_size, required_size)

        bytecode = Op.MSTORE(offset=offset, value=sender_address)
        bytecode += Op.MSTORE8(offset=offset + 11, value=0x94)
        bytecode += _dynamic_nonce_encode_bytecode(
            nonce,
            offset,
            old_memory_size=old_memory_size,
            new_memory_size=new_memory_size,
        )

        instance = super().__new__(cls, bytecode)
        instance.offset = offset
        instance._preimage_size_offset = offset + 64
        instance._nonce_scratch_offset = offset + 96
        return instance

    @property
    def nonce_offset(self) -> int:
        """Return the nonce memory offset of the preimage."""
        return self.offset + 32

    def address_op(self) -> Bytecode:
        """Return bytecode that computes the CREATE address."""
        address_mask = (1 << 160) - 1
        return Op.AND(
            address_mask,
            Op.SHA3(
                offset=self.offset + 10,
                size=Op.MLOAD(self._preimage_size_offset),
                data_size=25,
            ),
        )

    def set_nonce_op(self, nonce: int | Bytecode) -> Bytecode:
        """
        Re-encode a nonce and update the memory layout.

        Update the RLP list prefix, encoded nonce, and
        preimage_size in memory. The sender address and 0x94
        prefix are unchanged.
        """
        return _dynamic_nonce_encode_bytecode(nonce, self.offset)

    def increment_nonce_op(self, increment: int = 1) -> Bytecode:
        """
        Increment the nonce, re-encode, and update memory.

        Read the raw nonce from scratch memory, add the
        increment, and re-encode.
        """
        new_nonce = Op.ADD(Op.MLOAD(self._nonce_scratch_offset), increment)
        return _dynamic_nonce_encode_bytecode(new_nonce, self.offset)

__new__(*, sender_address, nonce, offset=0, old_memory_size=0)

Assemble the bytecode that sets up the memory layout for CREATE address computation.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
def __new__(
    cls,
    *,
    sender_address: int | bytes | Bytecode,
    nonce: int | bytes | Bytecode,
    offset: int = 0,
    old_memory_size: int = 0,
) -> Self:
    """
    Assemble the bytecode that sets up the memory layout for
    CREATE address computation.
    """
    required_size = offset + 128
    new_memory_size = max(old_memory_size, required_size)

    bytecode = Op.MSTORE(offset=offset, value=sender_address)
    bytecode += Op.MSTORE8(offset=offset + 11, value=0x94)
    bytecode += _dynamic_nonce_encode_bytecode(
        nonce,
        offset,
        old_memory_size=old_memory_size,
        new_memory_size=new_memory_size,
    )

    instance = super().__new__(cls, bytecode)
    instance.offset = offset
    instance._preimage_size_offset = offset + 64
    instance._nonce_scratch_offset = offset + 96
    return instance

nonce_offset property

Return the nonce memory offset of the preimage.

address_op()

Return bytecode that computes the CREATE address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
682
683
684
685
686
687
688
689
690
691
692
def address_op(self) -> Bytecode:
    """Return bytecode that computes the CREATE address."""
    address_mask = (1 << 160) - 1
    return Op.AND(
        address_mask,
        Op.SHA3(
            offset=self.offset + 10,
            size=Op.MLOAD(self._preimage_size_offset),
            data_size=25,
        ),
    )

set_nonce_op(nonce)

Re-encode a nonce and update the memory layout.

Update the RLP list prefix, encoded nonce, and preimage_size in memory. The sender address and 0x94 prefix are unchanged.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
694
695
696
697
698
699
700
701
702
def set_nonce_op(self, nonce: int | Bytecode) -> Bytecode:
    """
    Re-encode a nonce and update the memory layout.

    Update the RLP list prefix, encoded nonce, and
    preimage_size in memory. The sender address and 0x94
    prefix are unchanged.
    """
    return _dynamic_nonce_encode_bytecode(nonce, self.offset)

increment_nonce_op(increment=1)

Increment the nonce, re-encode, and update memory.

Read the raw nonce from scratch memory, add the increment, and re-encode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
704
705
706
707
708
709
710
711
712
def increment_nonce_op(self, increment: int = 1) -> Bytecode:
    """
    Increment the nonce, re-encode, and update memory.

    Read the raw nonce from scratch memory, add the
    increment, and re-encode.
    """
    new_nonce = Op.ADD(Op.MLOAD(self._nonce_scratch_offset), increment)
    return _dynamic_nonce_encode_bytecode(new_nonce, self.offset)

FixedIterationsBytecode

Bases: IteratingBytecode

Bytecode that contains a setup phase, an iterating phase, and a cleanup phase, with a fixed number of iterations.

This type can be used in place of a normal Bytecode and will return the appropriate gas cost for the given number of iterations.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
class FixedIterationsBytecode(IteratingBytecode):
    """
    Bytecode that contains a setup phase, an iterating phase, and a cleanup
    phase, with a fixed number of iterations.

    This type can be used in place of a normal Bytecode and will return the
    appropriate gas cost for the given number of iterations.
    """

    iteration_count: int
    """The fixed number of times the iterating bytecode will be executed."""

    def __new__(
        cls,
        *,
        setup: Bytecode,
        iterating: Bytecode,
        cleanup: Bytecode,
        iteration_count: int,
        warm_iterating: Bytecode | None = None,
        iterating_subcall: Bytecode | int | None = None,
    ) -> Self:
        """
        Create a new FixedIterationsBytecode instance.

        Args:
            setup: Bytecode executed once at the beginning before
                iterations start.
            iterating: Bytecode executed in the first iteration.
            cleanup: Bytecode executed once at the end after all
                iterations complete.
            iteration_count: The fixed number of times the iterating
                bytecode will be executed.
            warm_iterating: Bytecode executed in subsequent iterations
                after the first. If None, uses the same bytecode as
                iterating.
            iterating_subcall: Analytical bytecode representing a subcall
                performed during each iteration. This bytecode is _not_
                included in the final bytecode, and it's only used for gas
                calculation. The value can also be an integer, in which case it
                represents the gas cost of the subcall (e.g. the subcall is a
                precompiled contract).

        Returns:
            A new FixedIterationsBytecode instance.

        """
        instance = super(FixedIterationsBytecode, cls).__new__(
            cls,
            setup=setup,
            iterating=iterating,
            cleanup=cleanup,
            warm_iterating=warm_iterating,
            iterating_subcall=iterating_subcall,
        )
        instance.iteration_count = iteration_count
        return instance

    def gas_cost(self, fork: Type[ForkOpcodeInterface]) -> int:
        """Return the cost of iterating through the bytecode N times."""
        return self.gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=self.iteration_count,
        )

iteration_count instance-attribute

The fixed number of times the iterating bytecode will be executed.

__new__(*, setup, iterating, cleanup, iteration_count, warm_iterating=None, iterating_subcall=None)

Create a new FixedIterationsBytecode instance.

Parameters:

Name Type Description Default
setup Bytecode

Bytecode executed once at the beginning before iterations start.

required
iterating Bytecode

Bytecode executed in the first iteration.

required
cleanup Bytecode

Bytecode executed once at the end after all iterations complete.

required
iteration_count int

The fixed number of times the iterating bytecode will be executed.

required
warm_iterating Bytecode | None

Bytecode executed in subsequent iterations after the first. If None, uses the same bytecode as iterating.

None
iterating_subcall Bytecode | int | None

Analytical bytecode representing a subcall performed during each iteration. This bytecode is not included in the final bytecode, and it's only used for gas calculation. The value can also be an integer, in which case it represents the gas cost of the subcall (e.g. the subcall is a precompiled contract).

None

Returns:

Type Description
Self

A new FixedIterationsBytecode instance.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
def __new__(
    cls,
    *,
    setup: Bytecode,
    iterating: Bytecode,
    cleanup: Bytecode,
    iteration_count: int,
    warm_iterating: Bytecode | None = None,
    iterating_subcall: Bytecode | int | None = None,
) -> Self:
    """
    Create a new FixedIterationsBytecode instance.

    Args:
        setup: Bytecode executed once at the beginning before
            iterations start.
        iterating: Bytecode executed in the first iteration.
        cleanup: Bytecode executed once at the end after all
            iterations complete.
        iteration_count: The fixed number of times the iterating
            bytecode will be executed.
        warm_iterating: Bytecode executed in subsequent iterations
            after the first. If None, uses the same bytecode as
            iterating.
        iterating_subcall: Analytical bytecode representing a subcall
            performed during each iteration. This bytecode is _not_
            included in the final bytecode, and it's only used for gas
            calculation. The value can also be an integer, in which case it
            represents the gas cost of the subcall (e.g. the subcall is a
            precompiled contract).

    Returns:
        A new FixedIterationsBytecode instance.

    """
    instance = super(FixedIterationsBytecode, cls).__new__(
        cls,
        setup=setup,
        iterating=iterating,
        cleanup=cleanup,
        warm_iterating=warm_iterating,
        iterating_subcall=iterating_subcall,
    )
    instance.iteration_count = iteration_count
    return instance

gas_cost(fork)

Return the cost of iterating through the bytecode N times.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1354
1355
1356
1357
1358
1359
def gas_cost(self, fork: Type[ForkOpcodeInterface]) -> int:
    """Return the cost of iterating through the bytecode N times."""
    return self.gas_cost_by_iteration_count(
        fork=fork,
        iteration_count=self.iteration_count,
    )

Initcode

Bases: Bytecode

Helper class used to generate initcode for the specified deployment code.

The execution gas cost of the initcode is calculated, and also the deployment gas costs for the deployed code.

The initcode can be padded to a certain length if necessary, which does not affect the deployed code.

Other costs such as the CREATE2 hashing costs or the initcode_word_cost of EIP-3860 are not taken into account by any of these calculated costs.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class Initcode(Bytecode):
    """
    Helper class used to generate initcode for the specified deployment code.

    The execution gas cost of the initcode is calculated, and also the
    deployment gas costs for the deployed code.

    The initcode can be padded to a certain length if necessary, which does not
    affect the deployed code.

    Other costs such as the CREATE2 hashing costs or the initcode_word_cost of
    EIP-3860 are *not* taken into account by any of these calculated costs.
    """

    deploy_code: Bytes | Bytecode
    """
    Bytecode to be deployed by the initcode.
    """

    def __new__(
        cls,
        *,
        deploy_code: Bytecode | SupportsBytes | None = None,
        initcode_length: int | None = None,
        initcode_prefix: Bytecode | None = None,
        padding_byte: int = 0x00,
        name: str = "",
    ) -> Self:
        """
        Generate an initcode that returns a contract with the specified code.
        The initcode can be padded to a specified length for testing purposes.
        """
        if deploy_code is None:
            deploy_code = Bytecode()
        elif not isinstance(deploy_code, Bytecode):
            deploy_code = Bytes(deploy_code)
        if initcode_prefix is None:
            initcode_prefix = Bytecode()

        initcode = initcode_prefix
        code_length = len(deploy_code)

        # PUSH2: length=<bytecode length>
        initcode += Op.PUSH2(code_length)

        # PUSH1: offset=0
        initcode += Op.PUSH1(0)

        # DUP2
        initcode += Op.DUP2

        # PUSH1: initcode_length=11 + len(initcode_prefix_bytes) (constant)
        no_prefix_length = 0x0B
        assert no_prefix_length + len(initcode_prefix) <= 0xFF, (
            "initcode prefix too long"
        )
        initcode += Op.PUSH1(no_prefix_length + len(initcode_prefix))

        # DUP3
        initcode += Op.DUP3

        # CODECOPY: destinationOffset=0, offset=0, length
        initcode += Op.CODECOPY(
            data_size=code_length, new_memory_size=code_length
        )

        # RETURN: offset=0, length
        initcode += Op.RETURN(code_deposit_size=len(deploy_code))

        initcode_plus_deploy_code = bytes(initcode) + bytes(deploy_code)
        padding_bytes = bytes()

        if initcode_length is not None:
            assert initcode_length >= len(initcode_plus_deploy_code), (
                "specified invalid length for initcode"
            )

            padding_bytes = bytes(
                [padding_byte]
                * (initcode_length - len(initcode_plus_deploy_code))
            )

        initcode_bytes = initcode_plus_deploy_code + padding_bytes
        instance = super().__new__(
            cls,
            initcode_bytes,
            popped_stack_items=initcode.popped_stack_items,
            pushed_stack_items=initcode.pushed_stack_items,
            max_stack_height=initcode.max_stack_height,
            min_stack_height=initcode.min_stack_height,
            name=name,
            opcode_list=initcode.opcode_list,
        )
        instance.deploy_code = deploy_code

        return instance

    def execution_gas(self, fork: Type[ForkOpcodeInterface]) -> int:
        """
        Gas cost of executing the initcode, charged before the code
        deposit fee.
        """
        return self.gas_cost(fork) - self.deployment_gas(fork)

    def deployment_gas(self, fork: Type[ForkOpcodeInterface]) -> int:
        """
        Gas cost of deploying the contract.
        """
        return Op.RETURN(code_deposit_size=len(self.deploy_code)).gas_cost(
            fork
        )

deploy_code instance-attribute

Bytecode to be deployed by the initcode.

__new__(*, deploy_code=None, initcode_length=None, initcode_prefix=None, padding_byte=0, name='')

Generate an initcode that returns a contract with the specified code. The initcode can be padded to a specified length for testing purposes.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def __new__(
    cls,
    *,
    deploy_code: Bytecode | SupportsBytes | None = None,
    initcode_length: int | None = None,
    initcode_prefix: Bytecode | None = None,
    padding_byte: int = 0x00,
    name: str = "",
) -> Self:
    """
    Generate an initcode that returns a contract with the specified code.
    The initcode can be padded to a specified length for testing purposes.
    """
    if deploy_code is None:
        deploy_code = Bytecode()
    elif not isinstance(deploy_code, Bytecode):
        deploy_code = Bytes(deploy_code)
    if initcode_prefix is None:
        initcode_prefix = Bytecode()

    initcode = initcode_prefix
    code_length = len(deploy_code)

    # PUSH2: length=<bytecode length>
    initcode += Op.PUSH2(code_length)

    # PUSH1: offset=0
    initcode += Op.PUSH1(0)

    # DUP2
    initcode += Op.DUP2

    # PUSH1: initcode_length=11 + len(initcode_prefix_bytes) (constant)
    no_prefix_length = 0x0B
    assert no_prefix_length + len(initcode_prefix) <= 0xFF, (
        "initcode prefix too long"
    )
    initcode += Op.PUSH1(no_prefix_length + len(initcode_prefix))

    # DUP3
    initcode += Op.DUP3

    # CODECOPY: destinationOffset=0, offset=0, length
    initcode += Op.CODECOPY(
        data_size=code_length, new_memory_size=code_length
    )

    # RETURN: offset=0, length
    initcode += Op.RETURN(code_deposit_size=len(deploy_code))

    initcode_plus_deploy_code = bytes(initcode) + bytes(deploy_code)
    padding_bytes = bytes()

    if initcode_length is not None:
        assert initcode_length >= len(initcode_plus_deploy_code), (
            "specified invalid length for initcode"
        )

        padding_bytes = bytes(
            [padding_byte]
            * (initcode_length - len(initcode_plus_deploy_code))
        )

    initcode_bytes = initcode_plus_deploy_code + padding_bytes
    instance = super().__new__(
        cls,
        initcode_bytes,
        popped_stack_items=initcode.popped_stack_items,
        pushed_stack_items=initcode.pushed_stack_items,
        max_stack_height=initcode.max_stack_height,
        min_stack_height=initcode.min_stack_height,
        name=name,
        opcode_list=initcode.opcode_list,
    )
    instance.deploy_code = deploy_code

    return instance

execution_gas(fork)

Gas cost of executing the initcode, charged before the code deposit fee.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
111
112
113
114
115
116
def execution_gas(self, fork: Type[ForkOpcodeInterface]) -> int:
    """
    Gas cost of executing the initcode, charged before the code
    deposit fee.
    """
    return self.gas_cost(fork) - self.deployment_gas(fork)

deployment_gas(fork)

Gas cost of deploying the contract.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
118
119
120
121
122
123
124
def deployment_gas(self, fork: Type[ForkOpcodeInterface]) -> int:
    """
    Gas cost of deploying the contract.
    """
    return Op.RETURN(code_deposit_size=len(self.deploy_code)).gas_cost(
        fork
    )

IteratingBytecode

Bases: Bytecode

Bytecode composed of distinct execution phases: setup, iteration, and cleanup.

Some phases (warm_iterating and iterating_subcall) are analytical only and exist solely to model gas costs; they are not emitted in the final bytecode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
class IteratingBytecode(Bytecode):
    """
    Bytecode composed of distinct execution phases: setup, iteration, and
    cleanup.

    Some phases (warm_iterating and iterating_subcall) are analytical only and
    exist solely to model gas costs; they are not emitted in the final
    bytecode.
    """

    setup: Bytecode
    """Bytecode executed once at the beginning before iterations start."""
    iterating: Bytecode
    """Bytecode executed in the first iteration."""
    warm_iterating: Bytecode
    """
    Analytical bytecode representing subsequent iterations after the first
    (warm state).
    This bytecode is _not_ included in the final bytecode, and it's only
    used for the gas accounting properties of its opcodes and therefore gas
    calculation.
    """
    iterating_subcall: Bytecode | int
    """
    Analytical bytecode representing a subcall performed during each iteration.
    This bytecode is _not_ included in the final bytecode, and it's only
    used for gas calculation.

    The value can also be an integer, in which case it represents the gas cost
    of the subcall (e.g. the subcall is a precompiled contract)
    """
    cleanup: Bytecode
    """Bytecode executed once at the end after all iterations complete."""

    def __new__(
        cls,
        *,
        setup: Bytecode | None = None,
        iterating: Bytecode,
        cleanup: Bytecode | None = None,
        warm_iterating: Bytecode | None = None,
        iterating_subcall: Bytecode | int | None = None,
    ) -> Self:
        """
        Create a new iterating bytecode.

        Args:
            setup: Bytecode executed once at the beginning before
                iterations start.
            iterating: Bytecode executed in the first iteration.
            cleanup: Bytecode executed once at the end after all
                iterations complete.
            warm_iterating: Analytical bytecode representing subsequent
                iterations after the first (warm state).
            iterating_subcall: Analytical bytecode representing a subcall
                performed during each iteration. This bytecode is _not_
                included in the final bytecode, and it's only used for gas
                calculation. The value can also be an integer, in which case it
                represents the gas cost of the subcall (e.g. the subcall is a
                precompiled contract).

        Returns:
            A new IteratingBytecode instance.

        """
        instance = super(IteratingBytecode, cls).__new__(
            cls,
            setup + iterating + cleanup,
        )
        if setup is None:
            setup = Bytecode()
        instance.setup = setup
        instance.iterating = iterating
        if warm_iterating is None:
            instance.warm_iterating = iterating
        else:
            assert bytes(iterating) == bytes(warm_iterating), (
                "iterating and warm_iterating must have the same bytecode"
            )
            instance.warm_iterating = warm_iterating
        if iterating_subcall is None:
            instance.iterating_subcall = Bytecode()
        else:
            instance.iterating_subcall = iterating_subcall
        if cleanup is None:
            cleanup = Bytecode()
        instance.cleanup = cleanup
        return instance

    def iterating_subcall_gas_cost(
        self, *, fork: Type[ForkOpcodeInterface]
    ) -> int:
        """Return the gas cost of the iterating subcall."""
        if isinstance(self.iterating_subcall, int):
            return self.iterating_subcall
        return self.iterating_subcall.gas_cost(fork=fork)

    def iterating_subcall_reserve(
        self, *, fork: Type[ForkOpcodeInterface]
    ) -> int:
        """
        Return the gas reserve needed so that the last iterating subcall does
        not fail due to the 63/64 rule.
        """
        iterating_subcall_gas_cost = self.iterating_subcall_gas_cost(fork=fork)
        return (
            iterating_subcall_gas_cost * 64 // 63
        ) - iterating_subcall_gas_cost

    def gas_cost_by_iteration_count(
        self, *, fork: Type[ForkOpcodeInterface], iteration_count: int
    ) -> int:
        """Return the cost of iterating through the bytecode N times."""
        loop_gas_cost = 0
        if iteration_count > 0:
            # Cold cost is just charged for the first iteration
            loop_gas_cost = self.iterating.gas_cost(fork=fork)
            # Warm cost is charged for all iterations except the first
            loop_gas_cost += self.warm_iterating.gas_cost(fork=fork) * (
                iteration_count - 1
            )
            # Subcall cost is charged for all iterations.
            loop_gas_cost += (
                self.iterating_subcall_gas_cost(fork=fork) * iteration_count
            )
        return (
            self.setup.gas_cost(fork=fork)
            + loop_gas_cost
            + self.cleanup.gas_cost(fork=fork)
        )

    def with_fixed_iteration_count(
        self, *, iteration_count: int
    ) -> "FixedIterationsBytecode":
        """
        Return a new FixedIterationsBytecode with the iteration count fixed.
        """
        return FixedIterationsBytecode(
            setup=self.setup,
            iterating=self.iterating,
            cleanup=self.cleanup,
            warm_iterating=self.warm_iterating,
            iterating_subcall=self.iterating_subcall,
            iteration_count=iteration_count,
        )

    # Methods to calculate transactions that call a contract containing the
    # iterating bytecode.

    def tx_gas_cost_by_iteration_count(
        self,
        *,
        fork: Fork,
        iteration_count: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> int:
        """
        Calculate the exact gas cost of a transaction calling the bytecode
        for a given number of iterations.

        The method accepts intrinsic gas cost kwargs to allow for the
        calculation of the intrinsic gas cost of the transaction.

        If any of the intrinsic gas cost kwarg is callable, it will be called
        with iteration_count and start_iteration as keyword arguments.
        """
        intrinsic_gas_cost_calc = fork.transaction_intrinsic_cost_calculator()
        if "data" in intrinsic_cost_kwargs:
            intrinsic_cost_kwargs["calldata"] = intrinsic_cost_kwargs.pop(
                "data"
            )
        if "authorization_list" in intrinsic_cost_kwargs:
            intrinsic_cost_kwargs["authorization_list_or_count"] = len(
                intrinsic_cost_kwargs.pop("authorization_list")
            )
        if "return_cost_deducted_prior_execution" not in intrinsic_cost_kwargs:
            intrinsic_cost_kwargs["return_cost_deducted_prior_execution"] = (
                True
            )
        for key, value in intrinsic_cost_kwargs.items():
            if callable(value):
                intrinsic_cost_kwargs[key] = value(
                    iteration_count=iteration_count,
                    start_iteration=start_iteration,
                )
        return self.gas_cost_by_iteration_count(
            fork=fork, iteration_count=iteration_count
        ) + intrinsic_gas_cost_calc(**intrinsic_cost_kwargs)

    def tx_gas_limit_by_iteration_count(
        self,
        *,
        fork: Fork,
        iteration_count: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> int:
        """
        Calculate the minimum gas limit of a transaction calling the bytecode
        for a given number of iterations.

        The gas limit is calculated by adding the required extra gas for the
        last iteration due to the 63/64 rule.
        """
        return self.tx_gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        ) + self.iterating_subcall_reserve(fork=fork)

    def _binary_search_iterations(
        self,
        *,
        fork: Fork,
        gas_limit: int,
        start_iteration: int,
        **intrinsic_cost_kwargs: Any,
    ) -> Tuple[int, int]:
        """
        Binary search for the maximum iterations that fit within a gas limit.
        """
        single_iteration_gas = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=1,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        if single_iteration_gas > gas_limit:
            raise ValueError(
                "Single iteration gas cost is greater than gas limit."
            )
        low = 1
        high = 2

        # Exponential search to find upper bound
        high_gas_cost = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=high,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        while high_gas_cost < gas_limit:
            low = high
            high *= 2
            high_gas_cost = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=high,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )

        # Binary search for exact fit
        best_iterations = 0
        while low < high:
            mid = (low + high) // 2

            if (
                self.tx_gas_limit_by_iteration_count(
                    fork=fork,
                    iteration_count=mid,
                    start_iteration=start_iteration,
                    **intrinsic_cost_kwargs,
                )
                > gas_limit
            ):
                high = mid
            else:
                low = mid + 1

        best_iterations = low - 1
        best_iterations_gas = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=best_iterations,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        return best_iterations, best_iterations_gas

    def tx_iterations_by_gas_limit(
        self,
        *,
        fork: Fork,
        gas_limit: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> Generator[int, None, None]:
        """
        Calculate the number of iterations needed to reach the given a
        gas-to-be-used value.

        Each element of the returned list represents the number of iterations
        for a single transaction.

        If the fork's transaction gas limit cap is not `None`, the returned
        list will contain one item per transaction that represents the
        iteration count for that transaction, and no transaction will exceed
        the gas limit cap.
        """
        gas_limit_cap = fork.transaction_gas_limit_cap()
        remaining_gas = gas_limit

        while remaining_gas >= self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=1,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        ):
            # Binary search for the maximum number of iterations that fits
            # within remaining_gas
            max_gas_limit = (
                min(remaining_gas, gas_limit_cap)
                if gas_limit_cap is not None
                else remaining_gas
            )
            best_iterations, best_iterations_gas = (
                self._binary_search_iterations(
                    fork=fork,
                    gas_limit=max_gas_limit,
                    start_iteration=start_iteration,
                    **intrinsic_cost_kwargs,
                )
            )
            yield best_iterations
            remaining_gas -= best_iterations_gas
            start_iteration += best_iterations

    def _intrinsic_cost_is_constant(
        self,
        intrinsic_cost_kwargs: Dict[str, Any],
    ) -> bool:
        """If none of the kwarg values is callable, return True."""
        for _, value in intrinsic_cost_kwargs.items():
            if callable(value):
                return False
        return True

    def tx_iterations_by_total_iteration_count(
        self,
        *,
        fork: Fork,
        total_iterations: int,
        start_iteration: int = 0,
        **intrinsic_cost_kwargs: Any,
    ) -> Generator[int, None, None]:
        """
        Calculate how to split a total number of iterations across multiple
        transactions so that each transaction fits within the gas limit cap.

        Returns a list where each element represents the number of iterations
        for that transaction, and the sum equals total_iterations.
        """
        gas_limit_cap = fork.transaction_gas_limit_cap()
        if gas_limit_cap is None:
            # No limit, all iterations fit in a single transaction.
            yield total_iterations
            return
        remaining_iterations = total_iterations
        best_iterations: int | None = None
        constant_intrinsic_gas_cost = self._intrinsic_cost_is_constant(
            intrinsic_cost_kwargs
        )

        while remaining_iterations > 0:
            if best_iterations is None or not constant_intrinsic_gas_cost:
                best_iterations, _ = self._binary_search_iterations(
                    fork=fork,
                    gas_limit=gas_limit_cap,
                    start_iteration=start_iteration,
                    **intrinsic_cost_kwargs,
                )
            if best_iterations >= remaining_iterations:
                yield remaining_iterations
                return
            else:
                yield best_iterations
                remaining_iterations -= best_iterations
                start_iteration += best_iterations

    # Transaction generators that call the iterating bytecode with given
    # limits.

    def transactions_by_gas_limit(
        self,
        *,
        fork: Fork,
        gas_limit: int,
        start_iteration: int = 0,
        sender: EOA,
        to: Address | None,
        tx_gas_limit_delta: int = 0,
        **tx_kwargs: Any,
    ) -> Generator[TransactionWithCost, None, None]:
        """
        Generate a list of transactions calling the bytecode with a given gas
        limit.

        The method accepts all keyword arguments that can be passed to the
        `Transaction` constructor.

        If any of the keyword arguments is callable, it will be called with
        iteration_count and start_iteration as keyword arguments.
        E.g. when the calldata that needs to be passed to the iterating
        bytecode changes with each iteration, the calldata can be generated
        dynamically by passing a callable to the calldata keyword argument.

        The returned object also contains an extra field with the expected
        gas cost of the transaction by the end of execution.
        """
        intrinsic_cost_kwargs = tx_kwargs.copy()

        if "calldata" in tx_kwargs:
            tx_kwargs["data"] = tx_kwargs.pop("calldata")
        if "return_cost_deducted_prior_execution" in tx_kwargs:
            tx_kwargs.pop("return_cost_deducted_prior_execution")
        for iteration_count in self.tx_iterations_by_gas_limit(
            fork=fork,
            gas_limit=gas_limit,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        ):
            tx_gas_limit = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            tx_gas_cost = self.tx_gas_cost_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            current_tx_kwargs = tx_kwargs.copy()

            for key, value in current_tx_kwargs.items():
                if callable(value):
                    current_tx_kwargs[key] = value(
                        iteration_count=iteration_count,
                        start_iteration=start_iteration,
                    )
            yield TransactionWithCost(
                to=to,
                gas_limit=tx_gas_limit + tx_gas_limit_delta,
                sender=sender,
                gas_cost=tx_gas_cost,
                **current_tx_kwargs,
            )
            start_iteration += iteration_count

    def transactions_by_total_iteration_count(
        self,
        *,
        fork: Fork,
        total_iterations: int,
        start_iteration: int = 0,
        sender: EOA,
        to: Address | None,
        tx_gas_limit_delta: int = 0,
        **tx_kwargs: Any,
    ) -> Generator[TransactionWithCost, None, None]:
        """
        Generate a list of transactions calling the bytecode with a given
        total iteration count.

        The method accepts all keyword arguments that can be passed to the
        `Transaction` constructor.

        If any of the keyword arguments is callable, it will be called with
        iteration_count and start_iteration as keyword arguments.
        E.g. when the calldata that needs to be passed to the iterating
        bytecode changes with each iteration, the calldata can be generated
        dynamically by passing a callable to the calldata keyword argument.

        The returned object also contains an extra field with the expected
        gas cost of the transaction by the end of execution.
        """
        intrinsic_cost_kwargs = tx_kwargs.copy()

        if "calldata" in tx_kwargs:
            tx_kwargs["data"] = tx_kwargs.pop("calldata")
        if "return_cost_deducted_prior_execution" in tx_kwargs:
            tx_kwargs.pop("return_cost_deducted_prior_execution")
        for iteration_count in self.tx_iterations_by_total_iteration_count(
            fork=fork,
            total_iterations=total_iterations,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        ):
            tx_gas_limit = self.tx_gas_limit_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            tx_gas_cost = self.tx_gas_cost_by_iteration_count(
                fork=fork,
                iteration_count=iteration_count,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
            current_tx_kwargs = tx_kwargs.copy()

            for key, value in current_tx_kwargs.items():
                if callable(value):
                    current_tx_kwargs[key] = value(
                        iteration_count=iteration_count,
                        start_iteration=start_iteration,
                    )
            yield TransactionWithCost(
                to=to,
                gas_limit=tx_gas_limit + tx_gas_limit_delta,
                sender=sender,
                gas_cost=tx_gas_cost,
                **current_tx_kwargs,
            )
            start_iteration += iteration_count

setup instance-attribute

Bytecode executed once at the beginning before iterations start.

iterating instance-attribute

Bytecode executed in the first iteration.

warm_iterating instance-attribute

Analytical bytecode representing subsequent iterations after the first (warm state). This bytecode is not included in the final bytecode, and it's only used for the gas accounting properties of its opcodes and therefore gas calculation.

iterating_subcall instance-attribute

Analytical bytecode representing a subcall performed during each iteration. This bytecode is not included in the final bytecode, and it's only used for gas calculation.

The value can also be an integer, in which case it represents the gas cost of the subcall (e.g. the subcall is a precompiled contract)

cleanup instance-attribute

Bytecode executed once at the end after all iterations complete.

__new__(*, setup=None, iterating, cleanup=None, warm_iterating=None, iterating_subcall=None)

Create a new iterating bytecode.

Parameters:

Name Type Description Default
setup Bytecode | None

Bytecode executed once at the beginning before iterations start.

None
iterating Bytecode

Bytecode executed in the first iteration.

required
cleanup Bytecode | None

Bytecode executed once at the end after all iterations complete.

None
warm_iterating Bytecode | None

Analytical bytecode representing subsequent iterations after the first (warm state).

None
iterating_subcall Bytecode | int | None

Analytical bytecode representing a subcall performed during each iteration. This bytecode is not included in the final bytecode, and it's only used for gas calculation. The value can also be an integer, in which case it represents the gas cost of the subcall (e.g. the subcall is a precompiled contract).

None

Returns:

Type Description
Self

A new IteratingBytecode instance.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
def __new__(
    cls,
    *,
    setup: Bytecode | None = None,
    iterating: Bytecode,
    cleanup: Bytecode | None = None,
    warm_iterating: Bytecode | None = None,
    iterating_subcall: Bytecode | int | None = None,
) -> Self:
    """
    Create a new iterating bytecode.

    Args:
        setup: Bytecode executed once at the beginning before
            iterations start.
        iterating: Bytecode executed in the first iteration.
        cleanup: Bytecode executed once at the end after all
            iterations complete.
        warm_iterating: Analytical bytecode representing subsequent
            iterations after the first (warm state).
        iterating_subcall: Analytical bytecode representing a subcall
            performed during each iteration. This bytecode is _not_
            included in the final bytecode, and it's only used for gas
            calculation. The value can also be an integer, in which case it
            represents the gas cost of the subcall (e.g. the subcall is a
            precompiled contract).

    Returns:
        A new IteratingBytecode instance.

    """
    instance = super(IteratingBytecode, cls).__new__(
        cls,
        setup + iterating + cleanup,
    )
    if setup is None:
        setup = Bytecode()
    instance.setup = setup
    instance.iterating = iterating
    if warm_iterating is None:
        instance.warm_iterating = iterating
    else:
        assert bytes(iterating) == bytes(warm_iterating), (
            "iterating and warm_iterating must have the same bytecode"
        )
        instance.warm_iterating = warm_iterating
    if iterating_subcall is None:
        instance.iterating_subcall = Bytecode()
    else:
        instance.iterating_subcall = iterating_subcall
    if cleanup is None:
        cleanup = Bytecode()
    instance.cleanup = cleanup
    return instance

iterating_subcall_gas_cost(*, fork)

Return the gas cost of the iterating subcall.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
865
866
867
868
869
870
871
def iterating_subcall_gas_cost(
    self, *, fork: Type[ForkOpcodeInterface]
) -> int:
    """Return the gas cost of the iterating subcall."""
    if isinstance(self.iterating_subcall, int):
        return self.iterating_subcall
    return self.iterating_subcall.gas_cost(fork=fork)

iterating_subcall_reserve(*, fork)

Return the gas reserve needed so that the last iterating subcall does not fail due to the 63/64 rule.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
873
874
875
876
877
878
879
880
881
882
883
def iterating_subcall_reserve(
    self, *, fork: Type[ForkOpcodeInterface]
) -> int:
    """
    Return the gas reserve needed so that the last iterating subcall does
    not fail due to the 63/64 rule.
    """
    iterating_subcall_gas_cost = self.iterating_subcall_gas_cost(fork=fork)
    return (
        iterating_subcall_gas_cost * 64 // 63
    ) - iterating_subcall_gas_cost

gas_cost_by_iteration_count(*, fork, iteration_count)

Return the cost of iterating through the bytecode N times.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
def gas_cost_by_iteration_count(
    self, *, fork: Type[ForkOpcodeInterface], iteration_count: int
) -> int:
    """Return the cost of iterating through the bytecode N times."""
    loop_gas_cost = 0
    if iteration_count > 0:
        # Cold cost is just charged for the first iteration
        loop_gas_cost = self.iterating.gas_cost(fork=fork)
        # Warm cost is charged for all iterations except the first
        loop_gas_cost += self.warm_iterating.gas_cost(fork=fork) * (
            iteration_count - 1
        )
        # Subcall cost is charged for all iterations.
        loop_gas_cost += (
            self.iterating_subcall_gas_cost(fork=fork) * iteration_count
        )
    return (
        self.setup.gas_cost(fork=fork)
        + loop_gas_cost
        + self.cleanup.gas_cost(fork=fork)
    )

with_fixed_iteration_count(*, iteration_count)

Return a new FixedIterationsBytecode with the iteration count fixed.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
907
908
909
910
911
912
913
914
915
916
917
918
919
920
def with_fixed_iteration_count(
    self, *, iteration_count: int
) -> "FixedIterationsBytecode":
    """
    Return a new FixedIterationsBytecode with the iteration count fixed.
    """
    return FixedIterationsBytecode(
        setup=self.setup,
        iterating=self.iterating,
        cleanup=self.cleanup,
        warm_iterating=self.warm_iterating,
        iterating_subcall=self.iterating_subcall,
        iteration_count=iteration_count,
    )

tx_gas_cost_by_iteration_count(*, fork, iteration_count, start_iteration=0, **intrinsic_cost_kwargs)

Calculate the exact gas cost of a transaction calling the bytecode for a given number of iterations.

The method accepts intrinsic gas cost kwargs to allow for the calculation of the intrinsic gas cost of the transaction.

If any of the intrinsic gas cost kwarg is callable, it will be called with iteration_count and start_iteration as keyword arguments.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
def tx_gas_cost_by_iteration_count(
    self,
    *,
    fork: Fork,
    iteration_count: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> int:
    """
    Calculate the exact gas cost of a transaction calling the bytecode
    for a given number of iterations.

    The method accepts intrinsic gas cost kwargs to allow for the
    calculation of the intrinsic gas cost of the transaction.

    If any of the intrinsic gas cost kwarg is callable, it will be called
    with iteration_count and start_iteration as keyword arguments.
    """
    intrinsic_gas_cost_calc = fork.transaction_intrinsic_cost_calculator()
    if "data" in intrinsic_cost_kwargs:
        intrinsic_cost_kwargs["calldata"] = intrinsic_cost_kwargs.pop(
            "data"
        )
    if "authorization_list" in intrinsic_cost_kwargs:
        intrinsic_cost_kwargs["authorization_list_or_count"] = len(
            intrinsic_cost_kwargs.pop("authorization_list")
        )
    if "return_cost_deducted_prior_execution" not in intrinsic_cost_kwargs:
        intrinsic_cost_kwargs["return_cost_deducted_prior_execution"] = (
            True
        )
    for key, value in intrinsic_cost_kwargs.items():
        if callable(value):
            intrinsic_cost_kwargs[key] = value(
                iteration_count=iteration_count,
                start_iteration=start_iteration,
            )
    return self.gas_cost_by_iteration_count(
        fork=fork, iteration_count=iteration_count
    ) + intrinsic_gas_cost_calc(**intrinsic_cost_kwargs)

tx_gas_limit_by_iteration_count(*, fork, iteration_count, start_iteration=0, **intrinsic_cost_kwargs)

Calculate the minimum gas limit of a transaction calling the bytecode for a given number of iterations.

The gas limit is calculated by adding the required extra gas for the last iteration due to the 63/64 rule.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
def tx_gas_limit_by_iteration_count(
    self,
    *,
    fork: Fork,
    iteration_count: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> int:
    """
    Calculate the minimum gas limit of a transaction calling the bytecode
    for a given number of iterations.

    The gas limit is calculated by adding the required extra gas for the
    last iteration due to the 63/64 rule.
    """
    return self.tx_gas_cost_by_iteration_count(
        fork=fork,
        iteration_count=iteration_count,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    ) + self.iterating_subcall_reserve(fork=fork)

tx_iterations_by_gas_limit(*, fork, gas_limit, start_iteration=0, **intrinsic_cost_kwargs)

Calculate the number of iterations needed to reach the given a gas-to-be-used value.

Each element of the returned list represents the number of iterations for a single transaction.

If the fork's transaction gas limit cap is not None, the returned list will contain one item per transaction that represents the iteration count for that transaction, and no transaction will exceed the gas limit cap.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
def tx_iterations_by_gas_limit(
    self,
    *,
    fork: Fork,
    gas_limit: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> Generator[int, None, None]:
    """
    Calculate the number of iterations needed to reach the given a
    gas-to-be-used value.

    Each element of the returned list represents the number of iterations
    for a single transaction.

    If the fork's transaction gas limit cap is not `None`, the returned
    list will contain one item per transaction that represents the
    iteration count for that transaction, and no transaction will exceed
    the gas limit cap.
    """
    gas_limit_cap = fork.transaction_gas_limit_cap()
    remaining_gas = gas_limit

    while remaining_gas >= self.tx_gas_limit_by_iteration_count(
        fork=fork,
        iteration_count=1,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    ):
        # Binary search for the maximum number of iterations that fits
        # within remaining_gas
        max_gas_limit = (
            min(remaining_gas, gas_limit_cap)
            if gas_limit_cap is not None
            else remaining_gas
        )
        best_iterations, best_iterations_gas = (
            self._binary_search_iterations(
                fork=fork,
                gas_limit=max_gas_limit,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
        )
        yield best_iterations
        remaining_gas -= best_iterations_gas
        start_iteration += best_iterations

tx_iterations_by_total_iteration_count(*, fork, total_iterations, start_iteration=0, **intrinsic_cost_kwargs)

Calculate how to split a total number of iterations across multiple transactions so that each transaction fits within the gas limit cap.

Returns a list where each element represents the number of iterations for that transaction, and the sum equals total_iterations.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
def tx_iterations_by_total_iteration_count(
    self,
    *,
    fork: Fork,
    total_iterations: int,
    start_iteration: int = 0,
    **intrinsic_cost_kwargs: Any,
) -> Generator[int, None, None]:
    """
    Calculate how to split a total number of iterations across multiple
    transactions so that each transaction fits within the gas limit cap.

    Returns a list where each element represents the number of iterations
    for that transaction, and the sum equals total_iterations.
    """
    gas_limit_cap = fork.transaction_gas_limit_cap()
    if gas_limit_cap is None:
        # No limit, all iterations fit in a single transaction.
        yield total_iterations
        return
    remaining_iterations = total_iterations
    best_iterations: int | None = None
    constant_intrinsic_gas_cost = self._intrinsic_cost_is_constant(
        intrinsic_cost_kwargs
    )

    while remaining_iterations > 0:
        if best_iterations is None or not constant_intrinsic_gas_cost:
            best_iterations, _ = self._binary_search_iterations(
                fork=fork,
                gas_limit=gas_limit_cap,
                start_iteration=start_iteration,
                **intrinsic_cost_kwargs,
            )
        if best_iterations >= remaining_iterations:
            yield remaining_iterations
            return
        else:
            yield best_iterations
            remaining_iterations -= best_iterations
            start_iteration += best_iterations

transactions_by_gas_limit(*, fork, gas_limit, start_iteration=0, sender, to, tx_gas_limit_delta=0, **tx_kwargs)

Generate a list of transactions calling the bytecode with a given gas limit.

The method accepts all keyword arguments that can be passed to the Transaction constructor.

If any of the keyword arguments is callable, it will be called with iteration_count and start_iteration as keyword arguments. E.g. when the calldata that needs to be passed to the iterating bytecode changes with each iteration, the calldata can be generated dynamically by passing a callable to the calldata keyword argument.

The returned object also contains an extra field with the expected gas cost of the transaction by the end of execution.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
def transactions_by_gas_limit(
    self,
    *,
    fork: Fork,
    gas_limit: int,
    start_iteration: int = 0,
    sender: EOA,
    to: Address | None,
    tx_gas_limit_delta: int = 0,
    **tx_kwargs: Any,
) -> Generator[TransactionWithCost, None, None]:
    """
    Generate a list of transactions calling the bytecode with a given gas
    limit.

    The method accepts all keyword arguments that can be passed to the
    `Transaction` constructor.

    If any of the keyword arguments is callable, it will be called with
    iteration_count and start_iteration as keyword arguments.
    E.g. when the calldata that needs to be passed to the iterating
    bytecode changes with each iteration, the calldata can be generated
    dynamically by passing a callable to the calldata keyword argument.

    The returned object also contains an extra field with the expected
    gas cost of the transaction by the end of execution.
    """
    intrinsic_cost_kwargs = tx_kwargs.copy()

    if "calldata" in tx_kwargs:
        tx_kwargs["data"] = tx_kwargs.pop("calldata")
    if "return_cost_deducted_prior_execution" in tx_kwargs:
        tx_kwargs.pop("return_cost_deducted_prior_execution")
    for iteration_count in self.tx_iterations_by_gas_limit(
        fork=fork,
        gas_limit=gas_limit,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    ):
        tx_gas_limit = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        tx_gas_cost = self.tx_gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        current_tx_kwargs = tx_kwargs.copy()

        for key, value in current_tx_kwargs.items():
            if callable(value):
                current_tx_kwargs[key] = value(
                    iteration_count=iteration_count,
                    start_iteration=start_iteration,
                )
        yield TransactionWithCost(
            to=to,
            gas_limit=tx_gas_limit + tx_gas_limit_delta,
            sender=sender,
            gas_cost=tx_gas_cost,
            **current_tx_kwargs,
        )
        start_iteration += iteration_count

transactions_by_total_iteration_count(*, fork, total_iterations, start_iteration=0, sender, to, tx_gas_limit_delta=0, **tx_kwargs)

Generate a list of transactions calling the bytecode with a given total iteration count.

The method accepts all keyword arguments that can be passed to the Transaction constructor.

If any of the keyword arguments is callable, it will be called with iteration_count and start_iteration as keyword arguments. E.g. when the calldata that needs to be passed to the iterating bytecode changes with each iteration, the calldata can be generated dynamically by passing a callable to the calldata keyword argument.

The returned object also contains an extra field with the expected gas cost of the transaction by the end of execution.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
def transactions_by_total_iteration_count(
    self,
    *,
    fork: Fork,
    total_iterations: int,
    start_iteration: int = 0,
    sender: EOA,
    to: Address | None,
    tx_gas_limit_delta: int = 0,
    **tx_kwargs: Any,
) -> Generator[TransactionWithCost, None, None]:
    """
    Generate a list of transactions calling the bytecode with a given
    total iteration count.

    The method accepts all keyword arguments that can be passed to the
    `Transaction` constructor.

    If any of the keyword arguments is callable, it will be called with
    iteration_count and start_iteration as keyword arguments.
    E.g. when the calldata that needs to be passed to the iterating
    bytecode changes with each iteration, the calldata can be generated
    dynamically by passing a callable to the calldata keyword argument.

    The returned object also contains an extra field with the expected
    gas cost of the transaction by the end of execution.
    """
    intrinsic_cost_kwargs = tx_kwargs.copy()

    if "calldata" in tx_kwargs:
        tx_kwargs["data"] = tx_kwargs.pop("calldata")
    if "return_cost_deducted_prior_execution" in tx_kwargs:
        tx_kwargs.pop("return_cost_deducted_prior_execution")
    for iteration_count in self.tx_iterations_by_total_iteration_count(
        fork=fork,
        total_iterations=total_iterations,
        start_iteration=start_iteration,
        **intrinsic_cost_kwargs,
    ):
        tx_gas_limit = self.tx_gas_limit_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        tx_gas_cost = self.tx_gas_cost_by_iteration_count(
            fork=fork,
            iteration_count=iteration_count,
            start_iteration=start_iteration,
            **intrinsic_cost_kwargs,
        )
        current_tx_kwargs = tx_kwargs.copy()

        for key, value in current_tx_kwargs.items():
            if callable(value):
                current_tx_kwargs[key] = value(
                    iteration_count=iteration_count,
                    start_iteration=start_iteration,
                )
        yield TransactionWithCost(
            to=to,
            gas_limit=tx_gas_limit + tx_gas_limit_delta,
            sender=sender,
            gas_cost=tx_gas_cost,
            **current_tx_kwargs,
        )
        start_iteration += iteration_count

SequentialAddressLayout

Bases: Bytecode

Set up sequential address iteration in memory.

Store a starting address in a single memory word and provide methods to read the current address and advance to the next one.

Memory layout after execution: - MEM[offset: offset + 32] = current address

To obtain the address, use .address_op() which returns Op.MLOAD(offset).

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
class SequentialAddressLayout(Bytecode):
    """
    Set up sequential address iteration in memory.

    Store a starting address in a single memory word and provide methods
    to read the current address and advance to the next one.

    Memory layout after execution:
    - MEM[offset: offset + 32] = current address

    To obtain the address, use `.address_op()` which returns
    `Op.MLOAD(offset)`.
    """

    offset: int = 0
    _increment: int = 1

    def __new__(
        cls,
        *,
        starting_address: int | bytes | Bytecode = 0x1000,
        offset: int = 0,
        old_memory_size: int = 0,
        increment: int = 1,
    ) -> Self:
        """
        Assemble the bytecode that stores the starting address in memory.
        """
        required_size = offset + 32
        new_memory_size = max(old_memory_size, required_size)
        bytecode = Op.MSTORE(
            offset=offset,
            value=starting_address,
            # Gas accounting
            old_memory_size=old_memory_size,
            new_memory_size=new_memory_size,
        )
        instance = super().__new__(cls, bytecode)
        instance.offset = offset
        instance._increment = increment
        return instance

    def address_op(self) -> Bytecode:
        """Return bytecode that loads the current address from memory."""
        return Op.MLOAD(self.offset)

    def increment_address_op(self, increment: int | None = None) -> Bytecode:
        """Return bytecode that advances to the next address."""
        inc = increment if increment is not None else self._increment
        return Op.MSTORE(
            self.offset,
            Op.ADD(Op.MLOAD(self.offset), inc),
        )

__new__(*, starting_address=4096, offset=0, old_memory_size=0, increment=1)

Assemble the bytecode that stores the starting address in memory.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
def __new__(
    cls,
    *,
    starting_address: int | bytes | Bytecode = 0x1000,
    offset: int = 0,
    old_memory_size: int = 0,
    increment: int = 1,
) -> Self:
    """
    Assemble the bytecode that stores the starting address in memory.
    """
    required_size = offset + 32
    new_memory_size = max(old_memory_size, required_size)
    bytecode = Op.MSTORE(
        offset=offset,
        value=starting_address,
        # Gas accounting
        old_memory_size=old_memory_size,
        new_memory_size=new_memory_size,
    )
    instance = super().__new__(cls, bytecode)
    instance.offset = offset
    instance._increment = increment
    return instance

address_op()

Return bytecode that loads the current address from memory.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
757
758
759
def address_op(self) -> Bytecode:
    """Return bytecode that loads the current address from memory."""
    return Op.MLOAD(self.offset)

increment_address_op(increment=None)

Return bytecode that advances to the next address.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
761
762
763
764
765
766
767
def increment_address_op(self, increment: int | None = None) -> Bytecode:
    """Return bytecode that advances to the next address."""
    inc = increment if increment is not None else self._increment
    return Op.MSTORE(
        self.offset,
        Op.ADD(Op.MLOAD(self.offset), inc),
    )

Switch

Bases: Bytecode

Helper class used to generate switch-case expressions in EVM bytecode.

Switch-case behavior
  • If no condition is met in the list of BytecodeCases conditions, the default_action bytecode is executed.

  • If multiple conditions are met, the action from the first valid condition is the only one executed.

  • There is no fall through; it is not possible to execute multiple actions.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
class Switch(Bytecode):
    """
    Helper class used to generate switch-case expressions in EVM bytecode.

    Switch-case behavior:
      - If no condition is met in the list of BytecodeCases
        conditions, the `default_action` bytecode is executed.

      - If multiple conditions are met, the action from the first valid
        condition is the only one executed.

      - There is no fall through; it is not possible to execute
        multiple actions.
    """

    default_action: Bytecode | Op | None
    """
    The default bytecode to execute; if no condition is met, this bytecode is
    executed.
    """

    cases: List[Case]
    """
    A list of Cases: The first element with a condition that
    evaluates to a non-zero value is the one that is executed.
    """

    def __new__(
        cls,
        *,
        default_action: Bytecode | Op | None = None,
        cases: List[Case],
    ) -> Self:
        """
        Assemble the bytecode by looping over the list of cases and adding the
        necessary [R]JUMPI and JUMPDEST opcodes in order to replicate
        switch-case behavior.
        """
        # The length required to jump over subsequent actions to the final
        # JUMPDEST at the end of the switch-case block:
        #   - add 6 per case for the length of the JUMPDEST and
        #     JUMP(ADD(PC, action_jump_length)) bytecode
        #
        #   - add 3 to the total to account for this action's JUMP;
        #     the PC within the call requires a "correction" of 3.

        bytecode = Bytecode()

        # All conditions get prepended to this bytecode; if none are met, we
        # reach the default
        action_jump_length = sum(len(case.action) + 6 for case in cases) + 3
        bytecode = default_action + Op.JUMP(Op.ADD(Op.PC, action_jump_length))
        # The length required to jump over the default action and its JUMP
        # bytecode
        condition_jump_length = len(bytecode) + 3

        # Reversed: first case in the list has priority; it will become the
        # outer-most onion layer. We build up layers around the default_action,
        # after 1 iteration of the loop, a simplified representation of the
        # bytecode is:
        #
        # JUMPI(case[n-1].condition)
        # + default_action + JUMP()
        # + JUMPDEST + case[n-1].action + JUMP()
        #
        # and after n=len(cases) iterations:
        #
        # JUMPI(case[0].condition)
        # + JUMPI(case[1].condition)
        # ...
        # + JUMPI(case[n-1].condition) + default_action + JUMP() + JUMPDEST +
        # case[n-1].action + JUMP() + ... + JUMPDEST + case[1].action + JUMP()
        # + JUMPDEST + case[0].action + JUMP()
        for case in reversed(cases):
            action = case.action
            action_jump_length -= len(action) + 6
            action = (
                Op.JUMPDEST
                + action
                + Op.JUMP(Op.ADD(Op.PC, action_jump_length))
            )
            condition = Op.JUMPI(
                Op.ADD(Op.PC, condition_jump_length), case.condition
            )
            # wrap the current case around the onion as its next layer
            bytecode = condition + bytecode + action
            condition_jump_length += len(condition) + len(action)

        bytecode += Op.JUMPDEST

        instance = super().__new__(cls, bytecode)
        instance.default_action = default_action
        instance.cases = cases
        return instance

default_action instance-attribute

The default bytecode to execute; if no condition is met, this bytecode is executed.

cases instance-attribute

A list of Cases: The first element with a condition that evaluates to a non-zero value is the one that is executed.

__new__(*, default_action=None, cases)

Assemble the bytecode by looping over the list of cases and adding the necessary [R]JUMPI and JUMPDEST opcodes in order to replicate switch-case behavior.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def __new__(
    cls,
    *,
    default_action: Bytecode | Op | None = None,
    cases: List[Case],
) -> Self:
    """
    Assemble the bytecode by looping over the list of cases and adding the
    necessary [R]JUMPI and JUMPDEST opcodes in order to replicate
    switch-case behavior.
    """
    # The length required to jump over subsequent actions to the final
    # JUMPDEST at the end of the switch-case block:
    #   - add 6 per case for the length of the JUMPDEST and
    #     JUMP(ADD(PC, action_jump_length)) bytecode
    #
    #   - add 3 to the total to account for this action's JUMP;
    #     the PC within the call requires a "correction" of 3.

    bytecode = Bytecode()

    # All conditions get prepended to this bytecode; if none are met, we
    # reach the default
    action_jump_length = sum(len(case.action) + 6 for case in cases) + 3
    bytecode = default_action + Op.JUMP(Op.ADD(Op.PC, action_jump_length))
    # The length required to jump over the default action and its JUMP
    # bytecode
    condition_jump_length = len(bytecode) + 3

    # Reversed: first case in the list has priority; it will become the
    # outer-most onion layer. We build up layers around the default_action,
    # after 1 iteration of the loop, a simplified representation of the
    # bytecode is:
    #
    # JUMPI(case[n-1].condition)
    # + default_action + JUMP()
    # + JUMPDEST + case[n-1].action + JUMP()
    #
    # and after n=len(cases) iterations:
    #
    # JUMPI(case[0].condition)
    # + JUMPI(case[1].condition)
    # ...
    # + JUMPI(case[n-1].condition) + default_action + JUMP() + JUMPDEST +
    # case[n-1].action + JUMP() + ... + JUMPDEST + case[1].action + JUMP()
    # + JUMPDEST + case[0].action + JUMP()
    for case in reversed(cases):
        action = case.action
        action_jump_length -= len(action) + 6
        action = (
            Op.JUMPDEST
            + action
            + Op.JUMP(Op.ADD(Op.PC, action_jump_length))
        )
        condition = Op.JUMPI(
            Op.ADD(Op.PC, condition_jump_length), case.condition
        )
        # wrap the current case around the onion as its next layer
        bytecode = condition + bytecode + action
        condition_jump_length += len(condition) + len(action)

    bytecode += Op.JUMPDEST

    instance = super().__new__(cls, bytecode)
    instance.default_action = default_action
    instance.cases = cases
    return instance

TransactionWithCost

Bases: Transaction

Transaction object that can include the expected gas to be consumed.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
770
771
772
773
class TransactionWithCost(Transaction):
    """Transaction object that can include the expected gas to be consumed."""

    gas_cost: int = Field(..., exclude=True)

While

Bases: Bytecode

Helper class used to generate while-loop bytecode.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class While(Bytecode):
    """Helper class used to generate while-loop bytecode."""

    def __new__(
        cls,
        *,
        body: Bytecode | Op,
        condition: Bytecode | Op | None = None,
    ) -> Self:
        """
        Assemble the loop bytecode.

        The condition nor the body can leave a stack item on the stack.
        """
        bytecode = Bytecode()
        bytecode += Op.JUMPDEST
        bytecode += body
        if condition is not None:
            bytecode += Op.JUMPI(
                Op.SUB(Op.PC, Op.PUSH4[len(body) + len(condition) + 6]),
                condition,
            )
        else:
            bytecode += Op.JUMP(Op.SUB(Op.PC, Op.PUSH4[len(body) + 6]))
        return super().__new__(cls, bytecode)

__new__(*, body, condition=None)

Assemble the loop bytecode.

The condition nor the body can leave a stack item on the stack.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def __new__(
    cls,
    *,
    body: Bytecode | Op,
    condition: Bytecode | Op | None = None,
) -> Self:
    """
    Assemble the loop bytecode.

    The condition nor the body can leave a stack item on the stack.
    """
    bytecode = Bytecode()
    bytecode += Op.JUMPDEST
    bytecode += body
    if condition is not None:
        bytecode += Op.JUMPI(
            Op.SUB(Op.PC, Op.PUSH4[len(body) + len(condition) + 6]),
            condition,
        )
    else:
        bytecode += Op.JUMP(Op.SUB(Op.PC, Op.PUSH4[len(body) + 6]))
    return super().__new__(cls, bytecode)

WhileGas

Bases: Bytecode

Helper class to generate a gas-bounded while-loop.

Similar to While but automatically generates a condition that checks whether there is sufficient remaining gas for another iteration.

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
class WhileGas(Bytecode):
    """
    Helper class to generate a gas-bounded while-loop.

    Similar to While but automatically generates a condition that checks
    whether there is sufficient remaining gas for another iteration.
    """

    _iteration_cost: int

    def __new__(
        cls,
        *,
        body: Bytecode | Op,
        fork: Type[ForkOpcodeInterface],
        extra_gas: int = 0,
    ) -> Self:
        """
        Assemble the loop bytecode with an automatic gas check condition.

        The loop continues while the remaining gas exceeds the cost of one
        full iteration (body + loop overhead + extra_gas).
        """
        # Build a temporary While with a dummy condition of the same byte
        # length to calculate the real per-iteration gas cost.
        dummy_condition = Op.GT(Op.GAS, Op.PUSH4[0])
        dummy_loop = While(body=body, condition=dummy_condition)

        # After GAS reads the remaining gas, the exit path still executes
        # GT + PUSH4[offset] + PC + SUB + JUMPI. We must ensure enough gas
        # remains for this exit path after a full iteration, otherwise the
        # loop OOGs on exit instead of terminating cleanly.
        exit_overhead = (
            Op.GT + Op.PUSH4[0] + Op.PC + Op.SUB + Op.JUMPI
        ).gas_cost(fork)
        minimum_gas = dummy_loop.gas_cost(fork) + extra_gas + exit_overhead

        # Now build the real condition with the actual cost.
        condition = Op.GT(Op.GAS, Op.PUSH4[minimum_gas])
        bytecode = While(body=body, condition=condition)
        return super().__new__(cls, bytecode)

__new__(*, body, fork, extra_gas=0)

Assemble the loop bytecode with an automatic gas check condition.

The loop continues while the remaining gas exceeds the cost of one full iteration (body + loop overhead + extra_gas).

Source code in packages/testing/src/execution_testing/tools/tools_code/generators.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def __new__(
    cls,
    *,
    body: Bytecode | Op,
    fork: Type[ForkOpcodeInterface],
    extra_gas: int = 0,
) -> Self:
    """
    Assemble the loop bytecode with an automatic gas check condition.

    The loop continues while the remaining gas exceeds the cost of one
    full iteration (body + loop overhead + extra_gas).
    """
    # Build a temporary While with a dummy condition of the same byte
    # length to calculate the real per-iteration gas cost.
    dummy_condition = Op.GT(Op.GAS, Op.PUSH4[0])
    dummy_loop = While(body=body, condition=dummy_condition)

    # After GAS reads the remaining gas, the exit path still executes
    # GT + PUSH4[offset] + PC + SUB + JUMPI. We must ensure enough gas
    # remains for this exit path after a full iteration, otherwise the
    # loop OOGs on exit instead of terminating cleanly.
    exit_overhead = (
        Op.GT + Op.PUSH4[0] + Op.PC + Op.SUB + Op.JUMPI
    ).gas_cost(fork)
    minimum_gas = dummy_loop.gas_cost(fork) + extra_gas + exit_overhead

    # Now build the real condition with the actual cost.
    condition = Op.GT(Op.GAS, Op.PUSH4[minimum_gas])
    bytecode = While(body=body, condition=condition)
    return super().__new__(cls, bytecode)

DeploymentTestType

Bases: StrEnum

Represents the type of deployment test.

Source code in packages/testing/src/execution_testing/tools/utility/generators.py
21
22
23
24
25
26
class DeploymentTestType(StrEnum):
    """Represents the type of deployment test."""

    DEPLOY_BEFORE_FORK = "deploy_before_fork"
    DEPLOY_ON_FORK_BLOCK = "deploy_on_fork_block"
    DEPLOY_AFTER_FORK = "deploy_after_fork"

gas_test(*, fork, state_test, pre, setup_code, subject_code, subject_code_warm=None, tear_down_code=None, cold_gas=None, warm_gas=None, subject_address=None, subject_balance=0, oog_difference=1, out_of_gas_testing=True, prelude_code=None, tx_gas=None)

Create State Test to check the gas cost of a sequence of code.

setup_code and tear_down_code are called multiple times during the test, and MUST NOT have any side-effects which persist across message calls, and in particular, any effects on the gas usage of subject_code.

Source code in packages/testing/src/execution_testing/tools/utility/generators.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
def gas_test(
    *,
    fork: Fork,
    state_test: StateTestFiller,
    pre: Alloc,
    setup_code: Bytecode,
    subject_code: Bytecode,
    subject_code_warm: Bytecode | None = None,
    tear_down_code: Bytecode | None = None,
    cold_gas: int | None = None,
    warm_gas: int | None = None,
    subject_address: Address | None = None,
    subject_balance: int = 0,
    oog_difference: int = 1,
    out_of_gas_testing: bool = True,
    prelude_code: Bytecode | None = None,
    tx_gas: int | None = None,
) -> None:
    """
    Create State Test to check the gas cost of a sequence of code.

    `setup_code` and `tear_down_code` are called multiple times during the
    test, and MUST NOT have any side-effects which persist across message
    calls, and in particular, any effects on the gas usage of `subject_code`.
    """
    if fork < Berlin:
        raise ValueError(
            "Gas tests before Berlin are not supported due to CALL gas changes"
        )

    if cold_gas is None:
        cold_gas = subject_code.gas_cost(fork)

    if cold_gas <= 0:
        raise ValueError(
            f"Target gas allocations (cold_gas) must be > 0, got {cold_gas}"
        )
    if warm_gas is None:
        if subject_code_warm is not None:
            warm_gas = subject_code_warm.gas_cost(fork)
        else:
            warm_gas = cold_gas

    sender = pre.fund_eoa()
    if tear_down_code is None:
        tear_down_code = Op.STOP
    address_baseline = pre.deploy_contract(setup_code + tear_down_code)
    code_subject = setup_code + subject_code + tear_down_code
    address_subject = pre.deploy_contract(
        code_subject,
        balance=subject_balance,
        address=subject_address,
    )
    # 2 times GAS, POP, CALL, 6 times PUSH1 - instructions charged for at every
    # gas run
    gas_costs = fork.gas_costs()
    opcode_gas_cost = gas_costs.BASE
    opcode_pop_cost = gas_costs.BASE
    opcode_push_cost = gas_costs.VERY_LOW
    gas_single_gas_run = (
        2 * opcode_gas_cost
        + opcode_pop_cost
        + gas_costs.WARM_ACCESS
        + 6 * opcode_push_cost
    )
    address_legacy_harness = pre.deploy_contract(
        code=(
            # warm subject and baseline without executing
            (
                Op.BALANCE(address_subject)
                + Op.POP
                + Op.BALANCE(address_baseline)
                + Op.POP
            )
            # run any "prelude" code that may have universal side effects
            + prelude_code
            # Baseline gas run
            + (
                Op.GAS
                + Op.CALL(address=address_baseline, gas=Op.GAS)
                + Op.POP
                + Op.GAS
                + Op.SWAP1
                + Op.SUB
            )
            # cold gas run
            + (
                Op.GAS
                + Op.CALL(address=address_subject, gas=Op.GAS)
                + Op.POP
                + Op.GAS
                + Op.SWAP1
                + Op.SUB
            )
            # warm gas run
            + (
                Op.GAS
                + Op.CALL(address=address_subject, gas=Op.GAS)
                + Op.POP
                + Op.GAS
                + Op.SWAP1
                + Op.SUB
            )
            # Store warm gas: DUP3 is the gas of the baseline gas run
            + (
                Op.DUP3
                + Op.SWAP1
                + Op.SUB
                + Op.PUSH2(slot_warm_gas)
                + Op.SSTORE
            )
            # store cold gas: DUP2 is the gas of the baseline gas run
            + (
                Op.DUP2
                + Op.SWAP1
                + Op.SUB
                + Op.PUSH2(slot_cold_gas)
                + Op.SSTORE
            )
            + (
                (
                    # do an oog gas run, unless skipped with
                    # `out_of_gas_testing=False`:
                    #
                    # - DUP7 is the gas of the baseline gas run, after other
                    #   CALL args were pushed
                    # - subtract the gas charged by the harness
                    # - add warm gas charged by the subject
                    # - subtract `oog_difference` to cause OOG exception
                    #   (1 by default)
                    Op.SSTORE(
                        slot_oog_call_result,
                        Op.CALL(
                            gas=Op.ADD(
                                warm_gas - gas_single_gas_run - oog_difference,
                                Op.DUP7,
                            ),
                            address=address_subject,
                        ),
                    )
                    # sanity gas run: not subtracting 1 to see if enough gas
                    # makes the call succeed
                    + Op.SSTORE(
                        slot_sanity_call_result,
                        Op.CALL(
                            gas=Op.ADD(warm_gas - gas_single_gas_run, Op.DUP7),
                            address=address_subject,
                        ),
                    )
                    + Op.STOP
                )
                if out_of_gas_testing
                else Op.STOP
            )
        ),
    )

    post = {
        address_legacy_harness: Account(
            storage={
                slot_warm_gas: warm_gas,
                slot_cold_gas: cold_gas,
            },
        ),
    }

    if out_of_gas_testing:
        post[address_legacy_harness].storage[slot_oog_call_result] = (
            LEGACY_CALL_FAILURE
        )
        post[address_legacy_harness].storage[slot_sanity_call_result] = (
            LEGACY_CALL_SUCCESS
        )

    if tx_gas is None:
        tx_gas = gas_single_gas_run + cold_gas + 500_000
    tx = Transaction(
        to=address_legacy_harness, gas_limit=tx_gas, sender=sender
    )

    state_test(pre=pre, tx=tx, post=post)

generate_system_contract_deploy_test(*, fork, tx_json_path, expected_deploy_address, fail_on_empty_code, expected_system_contract_storage=None)

Generate a test that verifies the correct deployment of a system contract.

Generates following test cases:

                | before/after fork | fail on      | invalid block  |
                                      empty block  |                |

--------------------|-------------------|--------------|----------------| deploy_before_fork-| before | False | False | nonzero_balance

deploy_before_fork-| before | True | False | zero_balance

deploy_on_fork_ | on fork block | False | False | block-nonzero_ balance

deploy_on_fork_ | on fork block | True | False | block-zero_balance

deploy_after_fork | after | False | False | -nonzero_balance

deploy_after_fork | after | True | True | -zero_balance

The has balance parametrization does not have an effect on the expectation of the test.

Parameters:

Name Type Description Default
fork Fork

The fork to test.

required
tx_json_path Path

Path to the JSON file with the transaction to deploy the system contract. Providing a JSON file is useful to copy-paste the transaction from the EIP.

required
expected_deploy_address Address

The expected address of the deployed contract.

required
fail_on_empty_code bool

If True, the test is expected to fail on empty code.

required
expected_system_contract_storage Dict | None

The expected storage of the system contract.

None
Source code in packages/testing/src/execution_testing/tools/utility/generators.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def generate_system_contract_deploy_test(
    *,
    fork: Fork,
    tx_json_path: Path,
    expected_deploy_address: Address,
    fail_on_empty_code: bool,
    expected_system_contract_storage: Dict | None = None,
) -> Callable[[SystemContractDeployTestFunction], Callable]:
    """
    Generate a test that verifies the correct deployment of a system contract.

    Generates following test cases:

                        | before/after fork | fail on      | invalid block  |
                                              empty block  |                |
    --------------------|-------------------|--------------|----------------|
    `deploy_before_fork-| before            | False        | False          |
    nonzero_balance`

    `deploy_before_fork-| before            | True         | False          |
    zero_balance`

    `deploy_on_fork_    | on fork block     | False        | False          |
    block-nonzero_
    balance`

    `deploy_on_fork_    | on fork block     | True         | False          |
    block-zero_balance`

    `deploy_after_fork  | after             | False        | False          |
    -nonzero_balance`

    `deploy_after_fork  | after             | True         | True           |
    -zero_balance`


    The `has balance` parametrization does not have an effect on the
    expectation of the test.

    Arguments:
      fork (Fork): The fork to test.
      tx_json_path (Path): Path to the JSON file with the transaction to
                           deploy the system contract. Providing a JSON
                           file is useful to copy-paste the transaction
                           from the EIP.
      expected_deploy_address (Address): The expected address of the deployed
                                         contract.
      fail_on_empty_code (bool): If True, the test is expected to fail
                                 on empty code.
      expected_system_contract_storage (Dict | None): The expected storage of
                                                      the system contract.

    """
    if not issubclass(fork, BaseFork):
        raise TypeError(
            "fork parameter of generate_system_contract_deploy_test must be "
            "a subclass of Fork"
        )
    with open(tx_json_path, mode="r") as f:
        tx_json = json.loads(f.read())
    if "gasLimit" not in tx_json and "gas" in tx_json:
        tx_json["gasLimit"] = tx_json["gas"]
        del tx_json["gas"]
    if "protected" not in tx_json:
        tx_json["protected"] = False
    deploy_tx = Transaction.model_validate(tx_json).with_signature_and_sender()
    gas_price = deploy_tx.gas_price
    assert gas_price is not None
    deployer_required_balance = deploy_tx.gas_limit * gas_price
    deployer_address = deploy_tx.sender
    if "hash" in tx_json:
        assert deploy_tx.hash == Hash(tx_json["hash"])
    if "sender" in tx_json:
        assert deploy_tx.sender == Address(tx_json["sender"])

    def decorator(func: SystemContractDeployTestFunction) -> Callable:
        @pytest.mark.parametrize(
            "has_balance",
            [
                pytest.param(ContractAddressHasBalance.NONZERO_BALANCE),
                pytest.param(ContractAddressHasBalance.ZERO_BALANCE),
            ],
            ids=lambda x: x.name.lower(),
        )
        @pytest.mark.parametrize(
            "test_type",
            [
                pytest.param(DeploymentTestType.DEPLOY_BEFORE_FORK),
                pytest.param(DeploymentTestType.DEPLOY_ON_FORK_BLOCK),
                pytest.param(
                    DeploymentTestType.DEPLOY_AFTER_FORK,
                    marks=[pytest.mark.exception_test]
                    if fail_on_empty_code
                    else [],
                ),
            ],
            ids=lambda x: x.name.lower(),
        )
        @pytest.mark.pre_alloc_mutable
        @pytest.mark.valid_at_transition_to(fork.name())
        def wrapper(
            blockchain_test: BlockchainTestFiller,
            has_balance: ContractAddressHasBalance,
            pre: Alloc,
            test_type: DeploymentTestType,
            fork: TransitionFork,
        ) -> None:
            assert fork.at_block == 0, (
                "Block number based transition forks are not supported by "
                "generate_system_contract_deploy_test"
            )
            assert deployer_address is not None
            assert deploy_tx.created_contract == expected_deploy_address
            blocks: List[Block] = []

            if test_type == DeploymentTestType.DEPLOY_BEFORE_FORK:
                blocks = [
                    Block(  # Deployment block
                        txs=[deploy_tx],
                        timestamp=fork.at_timestamp - 1,
                    ),
                    Block(  # Empty block on fork
                        txs=[],
                        timestamp=fork.at_timestamp,
                    ),
                ]
            elif test_type == DeploymentTestType.DEPLOY_ON_FORK_BLOCK:
                blocks = [
                    Block(  # Deployment on fork block
                        txs=[deploy_tx],
                        timestamp=fork.at_timestamp,
                    ),
                    Block(  # Empty block after fork
                        txs=[],
                        timestamp=fork.at_timestamp + 1,
                    ),
                ]
            elif test_type == DeploymentTestType.DEPLOY_AFTER_FORK:
                blocks = [
                    Block(  # Empty block on fork
                        txs=[],
                        timestamp=fork.at_timestamp,
                        exception=BlockException.SYSTEM_CONTRACT_EMPTY
                        if fail_on_empty_code
                        else None,
                    )
                ]
                if not fail_on_empty_code:
                    blocks.append(
                        Block(  # Deployment after fork block
                            txs=[deploy_tx],
                            timestamp=fork.at_timestamp + 1,
                        )
                    )
                    blocks.append(
                        Block(  # Empty block after deployment
                            txs=[],
                            timestamp=fork.at_timestamp + 2,
                        ),
                    )
            balance = (
                1
                if has_balance == ContractAddressHasBalance.NONZERO_BALANCE
                else 0
            )
            pre[expected_deploy_address] = Account(
                code=b"",  # Remove the code that is automatically allocated on
                # the fork
                nonce=0,
                balance=balance,
            )
            pre.fund_address(deployer_address, deployer_required_balance)

            expected_deploy_address_int = int.from_bytes(
                expected_deploy_address, "big"
            )

            post = Alloc()
            fork_pre_allocation = (
                fork.transitions_to().pre_allocation_blockchain()
            )
            assert expected_deploy_address_int in fork_pre_allocation
            expected_code = fork_pre_allocation[expected_deploy_address_int][
                "code"
            ]
            # Note: balance check is omitted; it may be modified by the
            # underlying, decorated test
            account_kwargs = {
                "code": expected_code,
                "nonce": 1,
            }
            if expected_system_contract_storage:
                account_kwargs["storage"] = expected_system_contract_storage
            if (
                test_type != DeploymentTestType.DEPLOY_AFTER_FORK
                or not fail_on_empty_code
            ):
                post[expected_deploy_address] = Account(**account_kwargs)
                post[deployer_address] = Account(
                    nonce=1,
                )

            # Extra blocks (if any) returned by the decorated function to add
            # after the contract is deployed.
            if (
                test_type != DeploymentTestType.DEPLOY_AFTER_FORK
                or not fail_on_empty_code
            ):
                # Only fill more blocks if the deploy block does not fail.
                blocks += list(
                    func(fork=fork, pre=pre, post=post, test_type=test_type)
                )

            blockchain_test(
                pre=pre,
                blocks=blocks,
                post=post,
            )

        wrapper.__name__ = func.__name__  # type: ignore
        wrapper.__doc__ = func.__doc__

        return wrapper

    return decorator

generate_system_contract_error_test(*, max_gas_limit)

Generate a test that verifies the correct behavior when a system contract fails execution.

Parametrizations required: - system_contract (Address): The address of the system contract to deploy. - valid_from (Fork): The fork from which the test is valid.

Parameters:

Name Type Description Default
max_gas_limit int

The maximum gas limit for the system transaction.

required
Source code in packages/testing/src/execution_testing/tools/utility/generators.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def generate_system_contract_error_test(
    *,
    max_gas_limit: int,
) -> Callable[[SystemContractDeployTestFunction], Callable]:
    """
    Generate a test that verifies the correct behavior when a system contract
    fails execution.

    Parametrizations required:
    - system_contract (Address): The address of the system contract to deploy.
    - valid_from (Fork): The fork from which the test is valid.

    Arguments:
      max_gas_limit (int): The maximum gas limit for the system transaction.

    """

    def decorator(func: SystemContractDeployTestFunction) -> Callable:
        @pytest.mark.parametrize(
            "test_type", [v.param() for v in SystemContractTestType]
        )
        @pytest.mark.execute(pytest.mark.skip(reason="modifies pre-alloc"))
        def wrapper(
            blockchain_test: BlockchainTestFiller,
            pre: Alloc,
            test_type: SystemContractTestType,
            system_contract: Address,
            fork: Fork,
        ) -> None:
            modified_system_contract_code = Bytecode()

            # Depending on the test case, we need to modify the system contract
            # code accordingly.
            if (
                test_type == SystemContractTestType.GAS_LIMIT
                or test_type == SystemContractTestType.OUT_OF_GAS_ERROR
            ):
                # Run code so that it reaches the gas limit.
                gas_costs = fork.gas_costs()
                # The code works by storing N values to storage, and N is
                # calculated based on the gas costs for the given fork. This
                # code will only work once, so if the system contract is re-
                # executed in a subsequent block, it will consume less gas.
                gas_used_per_storage = (
                    gas_costs.STORAGE_SET
                    + gas_costs.COLD_STORAGE_ACCESS
                    + (gas_costs.VERY_LOW * 2)
                )
                modified_system_contract_code += sum(
                    Op.SSTORE(i, 1)
                    for i in range(max_gas_limit // gas_used_per_storage)
                )
                # If the gas limit is not divisible by the gas used per
                # storage, we need to add some NO-OP (JUMPDEST) to the code
                # that each consume 1 gas.
                assert gas_costs.OPCODE_JUMPDEST == 1, (
                    "JUMPDEST gas cost should be 1, but got "
                    f"{gas_costs.OPCODE_JUMPDEST}. Generator "
                    "`generate_system_contract_error_test` needs updating."
                )
                modified_system_contract_code += sum(
                    Op.JUMPDEST
                    for _ in range(max_gas_limit % gas_used_per_storage)
                )

                if test_type == SystemContractTestType.OUT_OF_GAS_ERROR:
                    # If the test type is OUT_OF_GAS_ERROR, we need to add a
                    # JUMPDEST to the code to ensure that we go over the limit
                    # by one gas.
                    modified_system_contract_code += Op.JUMPDEST
                modified_system_contract_code += Op.STOP
            elif test_type == SystemContractTestType.REVERT_ERROR:
                # Run a simple revert.
                modified_system_contract_code = Op.REVERT(0, 0)
            elif test_type == SystemContractTestType.EXCEPTION_ERROR:
                # Run a simple exception.
                modified_system_contract_code = Op.INVALID()
            else:
                raise ValueError(f"Invalid test type: {test_type}")

            pre[system_contract] = Account(
                code=modified_system_contract_code,
                nonce=1,
                balance=0,
            )

            # Simple test transaction to verify the block failed to modify the
            # state.
            value_receiver = pre.fund_eoa(amount=0)
            test_tx = Transaction(
                to=value_receiver,
                value=1,
                gas_limit=100_000,
                sender=pre.fund_eoa(),
            )
            post = Alloc()
            post[value_receiver] = (
                Account.NONEXISTENT
                if test_type != SystemContractTestType.GAS_LIMIT
                else Account(
                    balance=1,
                )
            )

            blockchain_test(
                pre=pre,
                blocks=[
                    Block(  # Deployment block
                        txs=[test_tx],
                        exception=BlockException.SYSTEM_CONTRACT_CALL_FAILED
                        if test_type != SystemContractTestType.GAS_LIMIT
                        else None,
                    )
                ],
                post=post,
            )

        wrapper.__name__ = func.__name__  # type: ignore
        wrapper.__doc__ = func.__doc__

        return wrapper

    return decorator

extend_with_defaults(defaults, cases, **parametrize_kwargs)

Extend test cases with default parameter values.

This utility function extends test case parameters by adding default values from the defaults dictionary to each case in the cases list. If a case already specifies a value for a parameter, its default is ignored.

This function is particularly useful in scenarios where you want to define a common set of default values but allow individual test cases to override them as needed.

The function returns a dictionary that can be directly unpacked and passed to the @pytest.mark.parametrize decorator.

Parameters:

Name Type Description Default
defaults Dict[str, Any]

A dictionary of default parameter names and their values. These values will be added to each case unless the case already defines a value for each parameter.

required
cases List[ParameterSet]

A list of pytest.param objects representing different test cases. Its first argument must be a dictionary defining parameter names and values.

required
parametrize_kwargs Any

Additional keyword arguments to be passed to @pytest.mark.parametrize. These arguments are not modified by this function and are passed through unchanged.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary with the following structure: argnames: A list of parameter names. argvalues: A list of test cases with modified parameter values. parametrize_kwargs: Additional keyword arguments passed through unchanged.

Example:

@pytest.mark.parametrize(**extend_with_defaults(
    defaults=dict(
        min_value=0,  # default minimum value is 0
        max_value=100, # default maximum value is 100
        average=50,  # default average value is 50
    ),

    cases=[
        pytest.param(
            dict(),  # use default
            values id='default_case',
        ),

        pytest.param(
            dict(min_value=10),  # override with min_value=10
            id='min_value_10',
        ),

        pytest.param(
            dict(max_value=200),  # override with max_value=200
            id='max_value_200',
        ),

        pytest.param(
            dict(min_value=-10, max_value=50),  # override both min_value
                                                # and max_value
            id='min_-10_max_50',
        ),

        pytest.param(
            # all defaults are overridden
            dict(min_value=20, max_value=80, average=50),
            id="min_20_max_80_avg_50",
        ),

        pytest.param(
            dict(min_value=100, max_value=0),  # invalid range
            id='invalid_range',
            marks=pytest.mark.xfail(reason='invalid range'),
        )
    ],
))
def test_range(min_value, max_value, average):
    assert min_value <= max_value
    assert min_value <= average <= max_value

The above test will execute with the following sets of parameters:

"default_case": {"min_value": 0, "max_value": 100, "average": 50}
"min_value_10": {"min_value": 10, "max_value": 100, "average": 50}
"max_value_200": {"min_value": 0, "max_value": 200, "average": 50}
"min_-10_max_50": {"min_value": -10, "max_value": 50, "average": 50}
"min_20_max_80_avg_50": {"min_value": 20, "max_value": 80, "average": 50}
# expected to fail
"invalid_range": {"min_value": 100, "max_value": 0, "average": 50}

Notes: - Each case in cases must contain exactly one value, which is a dictionary of parameter values. - The function performs an in-place update of the cases list, so the original cases list is modified.

Source code in packages/testing/src/execution_testing/tools/utility/pytest.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def extend_with_defaults(
    defaults: Dict[str, Any],
    cases: List[ParameterSet],
    **parametrize_kwargs: Any,
) -> Dict[str, Any]:
    """
    Extend test cases with default parameter values.

    This utility function extends test case parameters by adding default values
    from the `defaults` dictionary to each case in the `cases` list. If a case
    already specifies a value for a parameter, its default is ignored.

    This function is particularly useful in scenarios where you want to define
    a common set of default values but allow individual test cases to override
    them as needed.

    The function returns a dictionary that can be directly unpacked and passed
    to the `@pytest.mark.parametrize` decorator.

    Arguments:
      defaults (Dict[str, Any]): A dictionary of default parameter names
                                 and their values. These values will be added
                                 to each case unless the case already defines
                                 a value for each parameter.
      cases (List[ParameterSet]): A list of `pytest.param` objects
                                  representing different test cases.
                                  Its first argument must be a dictionary
                                  defining parameter names and values.
      parametrize_kwargs (Any): Additional keyword arguments to be passed to
                              `@pytest.mark.parametrize`. These arguments are
                              not modified by this function and are passed
                              through unchanged.

    Returns:
      Dict[str, Any]: A dictionary with the following structure:
          `argnames`: A list of parameter names.
          `argvalues`: A list of test cases with modified parameter values.
          `parametrize_kwargs`: Additional keyword arguments passed
                                through unchanged.


    Example:
    ```python
    @pytest.mark.parametrize(**extend_with_defaults(
        defaults=dict(
            min_value=0,  # default minimum value is 0
            max_value=100, # default maximum value is 100
            average=50,  # default average value is 50
        ),

        cases=[
            pytest.param(
                dict(),  # use default
                values id='default_case',
            ),

            pytest.param(
                dict(min_value=10),  # override with min_value=10
                id='min_value_10',
            ),

            pytest.param(
                dict(max_value=200),  # override with max_value=200
                id='max_value_200',
            ),

            pytest.param(
                dict(min_value=-10, max_value=50),  # override both min_value
                                                    # and max_value
                id='min_-10_max_50',
            ),

            pytest.param(
                # all defaults are overridden
                dict(min_value=20, max_value=80, average=50),
                id="min_20_max_80_avg_50",
            ),

            pytest.param(
                dict(min_value=100, max_value=0),  # invalid range
                id='invalid_range',
                marks=pytest.mark.xfail(reason='invalid range'),
            )
        ],
    ))
    def test_range(min_value, max_value, average):
        assert min_value <= max_value
        assert min_value <= average <= max_value
    ```

    The above test will execute with the following sets of parameters:

    ```python
    "default_case": {"min_value": 0, "max_value": 100, "average": 50}
    "min_value_10": {"min_value": 10, "max_value": 100, "average": 50}
    "max_value_200": {"min_value": 0, "max_value": 200, "average": 50}
    "min_-10_max_50": {"min_value": -10, "max_value": 50, "average": 50}
    "min_20_max_80_avg_50": {"min_value": 20, "max_value": 80, "average": 50}
    # expected to fail
    "invalid_range": {"min_value": 100, "max_value": 0, "average": 50}
    ```

    Notes:
    - Each case in `cases` must contain exactly one value, which is a
      dictionary of parameter values.
    - The function performs an in-place update of the `cases` list, so
      the original `cases` list is modified.

    """
    for i, case in enumerate(cases):
        if not (len(case.values) == 1 and isinstance(case.values[0], dict)):
            raise ValueError(
                "each case must contain exactly one value; "
                "a dict of parameter values"
            )
        if set(case.values[0].keys()) - set(defaults.keys()):
            raise UnknownParameterInCasesError()
        # Overwrite values in defaults if the parameter is present in the test
        # case values
        merged_params = {**defaults, **case.values[0]}
        cases[i] = pytest.param(
            *merged_params.values(), id=case.id, marks=case.marks
        )

    return {
        "argnames": list(defaults),
        "argvalues": cases,
        **parametrize_kwargs,
    }

get_current_commit_hash_or_tag(repo_path='.', shorten_hash=False)

Get the latest commit tag or commit hash from the repository.

If a tag points to the current commit, return the tag name. If no tag exists: - If shorten_hash is True, return the first 8 characters of the commit hash. - Otherwise, return the full commit hash.

Source code in packages/testing/src/execution_testing/tools/utility/versioning.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def get_current_commit_hash_or_tag(
    repo_path: str = ".", shorten_hash: bool = False
) -> str:
    """
    Get the latest commit tag or commit hash from the repository.

    If a tag points to the current commit, return the tag name. If no tag
    exists:
      - If shorten_hash is True, return the first 8 characters of the
        commit hash.
      - Otherwise, return the full commit hash.
    """
    try:
        repo = Repo(repo_path)
        current_commit = repo.head.commit
        # Check if current commit has a tag using lookup
        for tag in repo.tags:
            if tag.commit == current_commit:
                return tag.name
        # No tag found, return commit hash
        return (
            current_commit.hexsha[:8]
            if shorten_hash
            else current_commit.hexsha
        )
    except InvalidGitRepositoryError:
        # Handle the case where the repository is not a valid Git repository
        return "Not a git repository; only seen in framework tests."