Skip to content

test_consolidation_requests_during_fork()

Documentation for tests/prague/eip7251_consolidations/test_consolidations_during_fork.py::test_consolidation_requests_during_fork@20373115.

Generate fixtures for these test cases for Prague with:

fill -v tests/prague/eip7251_consolidations/test_consolidations_during_fork.py::test_consolidation_requests_during_fork --fork Prague

Test making a consolidation request to the beacon chain at the time of the fork.

Source code in tests/prague/eip7251_consolidations/test_consolidations_during_fork.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@pytest.mark.parametrize(
    "blocks_consolidation_requests",
    [
        pytest.param(
            [
                [],  # No consolidation requests, but we deploy the contract
                [
                    ConsolidationRequestTransaction(
                        requests=[
                            ConsolidationRequest(
                                source_pubkey=0x01,
                                target_pubkey=0x02,
                                fee=Spec.get_fee(10),
                                # Pre-fork consolidation request
                                valid=False,
                            )
                        ],
                    ),
                ],
                [
                    ConsolidationRequestTransaction(
                        requests=[
                            ConsolidationRequest(
                                source_pubkey=0x03,
                                target_pubkey=0x04,
                                fee=Spec.get_fee(10),
                                # First post-fork consolidation request, will
                                # not be included because the inhibitor is
                                # cleared at the end of the block
                                valid=False,
                            )
                        ],
                    ),
                ],
                [
                    ConsolidationRequestTransaction(
                        requests=[
                            ConsolidationRequest(
                                source_pubkey=0x05,
                                target_pubkey=0x06,
                                fee=Spec.get_fee(0),
                                # First consolidation that is valid
                                valid=True,
                            )
                        ],
                    ),
                ],
            ],
            id="one_valid_request_second_block_after_fork",
        ),
    ],
)
# NOTE(review): the start timestamp is offset by BLOCKS_BEFORE_FORK from
# 15_000, presumably so the fork activates after that many blocks; confirm
# against the fork definition used by this module.
@pytest.mark.parametrize("timestamp", [15_000 - BLOCKS_BEFORE_FORK], ids=[""])
@pytest.mark.pre_alloc_mutable
def test_consolidation_requests_during_fork(
    blockchain_test: BlockchainTestFiller,
    blocks: List[Block],
    pre: Alloc,
) -> None:
    """
    Test making a consolidation request to the beacon chain at the time of the
    fork.

    The consolidation request contract is removed from the pre-state and is
    instead deployed by a transaction appended to the first block. The
    parametrized requests mark only the last block's request as valid.

    Args:
        blockchain_test: Filler invoked with the final pre-state and blocks.
        blocks: Blocks to execute; presumably built by a fixture from
            ``blocks_consolidation_requests`` (not visible here). The first
            block is mutated in place to carry the deployment transaction.
        pre: Mutable pre-allocation; the predeploy account is overwritten
            and the deployer account is funded.

    """
    # We need to delete the deployed contract that comes by default in the pre
    # state.
    pre[Spec.CONSOLIDATION_REQUEST_PREDEPLOY_ADDRESS] = Account(
        balance=0,
        code=bytes(),
        nonce=0,
        storage={},
    )

    # The deployment transaction is stored as JSON next to this test module.
    # Read it with an explicit encoding so that decoding does not depend on
    # the locale of the machine filling the tests.
    deploy_tx_path = (
        Path(realpath(__file__)).parent / "contract_deploy_tx.json"
    )
    with open(deploy_tx_path, mode="r", encoding="utf-8") as f:
        deploy_tx = Transaction.model_validate_json(
            f.read()
        ).with_signature_and_sender()

    # Sanity check: the transaction's sender must be the expected predeploy
    # sender, since that is the account funded below.
    deployer_address = deploy_tx.sender
    assert deployer_address is not None
    assert (
        Address(deployer_address)
        == Spec.CONSOLIDATION_REQUEST_PREDEPLOY_SENDER
    )

    # Fund the deployer with gas_limit * gas_price, the maximum gas cost of
    # the deployment transaction.
    tx_gas_price = deploy_tx.gas_price
    assert tx_gas_price is not None
    deployer_required_balance = deploy_tx.gas_limit * tx_gas_price

    pre.fund_address(
        Spec.CONSOLIDATION_REQUEST_PREDEPLOY_SENDER, deployer_required_balance
    )

    # Append the deployment transaction to the first block
    blocks[0].txs.append(deploy_tx)

    blockchain_test(
        genesis_environment=Environment(),
        pre=pre,
        post={},
        blocks=blocks,
    )

Parametrized Test Cases

This test generates 1 parametrized test case across 1 fork.