Skip to content

Ethereum CLIs Package

Library of Python wrappers for the different implementations of transition tools.

BlockExceptionWithMessage

Bases: ExceptionWithMessage[BlockException]

Block exception with message.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
59
60
61
62
class BlockExceptionWithMessage(ExceptionWithMessage[BlockException]):
    """A ``BlockException`` combined with the client's error message."""

LazyAlloc dataclass

Bases: Generic[TRaw]

Allocation that is lazily loaded from the transition tool response.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
@dataclass(kw_only=True)
class LazyAlloc(Generic[TRaw]):
    """Allocation that is lazily loaded from the transition tool response."""

    # Unparsed allocation payload exactly as returned by the tool.
    raw: TRaw
    # State root reported alongside the allocation.
    _state_root: Hash
    # Parsed allocation; filled in on the first call to ``get``.
    alloc: Alloc | None = None

    def validate(self) -> Alloc:
        """Parse ``raw`` into an ``Alloc``; subclasses must override."""
        raise NotImplementedError("validate method not implemented.")

    def get(self) -> Alloc:
        """Return the parsed allocation, validating it on first access."""
        cached = self.alloc
        if cached is None:
            cached = self.validate()
            self.alloc = cached
        return cached

    def state_root(self) -> Hash:
        """Return the state root recorded for this allocation."""
        return self._state_root

validate()

Validate the alloc.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
419
420
421
def validate(self) -> Alloc:
    """Parse the raw allocation; concrete subclasses must override this."""
    raise NotImplementedError("validate method not implemented.")

get()

Model validate the allocation and return it.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
423
424
425
426
427
def get(self) -> Alloc:
    """Return the parsed allocation, validating it on first access."""
    cached = self.alloc
    if cached is None:
        cached = self.validate()
        self.alloc = cached
    return cached

state_root()

Return state root of the allocation.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
429
430
431
def state_root(self) -> Hash:
    """Return the state root recorded for this allocation."""
    return self._state_root

Result

Bases: CamelModel

Result of a transition tool output.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
class Result(CamelModel):
    """Result of a transition tool output."""

    # Post-execution state root.
    state_root: Hash
    # Header fields computed by the tool; aliases match t8n's JSON keys.
    ommers_hash: Hash | None = Field(None, validation_alias="sha3Uncles")
    transactions_trie: Hash = Field(..., alias="txRoot")
    receipts_root: Hash
    logs_hash: Hash
    logs_bloom: Bloom
    # Receipts for all accepted transactions.
    receipts: List[TransactionReceipt]
    # Transactions the tool refused to include ("rejected" in JSON).
    rejected_transactions: List[RejectedTransaction] = Field(
        default_factory=list, alias="rejected"
    )
    difficulty: HexNumber | None = Field(None, alias="currentDifficulty")
    gas_used: HexNumber
    base_fee_per_gas: HexNumber | None = Field(None, alias="currentBaseFee")
    withdrawals_root: Hash | None = None
    # Optional blob-gas accounting fields (absent when not reported).
    excess_blob_gas: HexNumber | None = Field(
        None, alias="currentExcessBlobGas"
    )
    blob_gas_used: HexNumber | None = None
    requests_hash: Hash | None = None
    requests: List[Bytes] | None = None
    block_access_list: Bytes | None = None
    block_access_list_hash: Hash | None = None
    # Block-level failure reported by the tool; mapped via the
    # exception mapper supplied in the validation context.
    block_exception: Annotated[
        BlockExceptionWithMessage | UndefinedException | None,
        ExceptionMapperValidator,
    ] = None
    traces: Traces | None = None
    opcode_count: OpcodeCount | None = None

TraceFieldDiff

Bases: NamedTuple

A single diff entry from TransactionTraces.compare().

line_index is None for structural diffs (trace_length, output, gas_used). Field dicts map field name to string value.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
134
135
136
137
138
139
140
141
142
143
144
class TraceFieldDiff(NamedTuple):
    """
    A single diff entry from TransactionTraces.compare().

    line_index is None for structural diffs (trace_length, output,
    gas_used). Field dicts map field name to string value.
    """

    # Trace line the diff refers to; None for structural diffs.
    line_index: int | None
    # Differing fields in the baseline trace (field name -> value).
    baseline_fields: dict[str, str]
    # Differing fields in the current trace (field name -> value).
    current_fields: dict[str, str]

Traces

Bases: EthereumTestRootModel

Traces returned from the transition tool for all transactions executed.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class Traces(EthereumTestRootModel):
    """Traces returned from the transition tool for all transactions executed."""

    root: List[TransactionTraces]

    def append(self, item: TransactionTraces) -> None:
        """Add *item* at the end of the collected transaction traces."""
        self.root.append(item)

    def are_equivalent(
        self, other: Self | None, ignore_gas_differences: bool
    ) -> bool:
        """Return True if the only difference is the gas counter."""
        if other is None:
            return False
        if len(self.root) != len(other.root):
            return False
        # Lengths match, so a pairwise walk covers every trace file.
        for i, (mine, theirs) in enumerate(zip(self.root, other.root)):
            if mine.are_equivalent(theirs, ignore_gas_differences):
                logger.debug(f"Trace file {i} is equivalent.")
            else:
                logger.debug(f"Trace file {i} is not equivalent.")
                return False
        logger.debug("All traces are equivalent.")
        return True

    def print(self) -> None:
        """Print the traces in a readable format."""
        for tx_number, tx in enumerate(self.root):
            header = f"Transaction {tx_number}:"
            print(header)
            tx.print()

append(item)

Append the transaction traces to the current list.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
293
294
295
def append(self, item: TransactionTraces) -> None:
    """Add *item* at the end of the collected transaction traces."""
    self.root.append(item)

are_equivalent(other, ignore_gas_differences)

Return True if the only difference is the gas counter.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def are_equivalent(
    self, other: Self | None, ignore_gas_differences: bool
) -> bool:
    """Return True if the only difference is the gas counter."""
    if other is None:
        return False
    if len(self.root) != len(other.root):
        return False
    # Lengths match, so a pairwise walk covers every trace file.
    for i, (mine, theirs) in enumerate(zip(self.root, other.root)):
        if mine.are_equivalent(theirs, ignore_gas_differences):
            logger.debug(f"Trace file {i} is equivalent.")
        else:
            logger.debug(f"Trace file {i} is not equivalent.")
            return False
    logger.debug("All traces are equivalent.")
    return True

print()

Print the traces in a readable format.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
316
317
318
319
320
def print(self) -> None:
    """Print the traces in a readable format."""
    for tx_number, tx in enumerate(self.root):
        header = f"Transaction {tx_number}:"
        print(header)
        tx.print()

TransactionExceptionWithMessage

Bases: ExceptionWithMessage[TransactionException]

Transaction exception with message.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
51
52
53
54
55
56
class TransactionExceptionWithMessage(
    ExceptionWithMessage[TransactionException]
):
    """A ``TransactionException`` combined with the client's error message."""

TransitionToolOutput dataclass

Transition tool output.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
@dataclass
class TransitionToolOutput:
    """Transition tool output."""

    # Lazily-parsed post-state allocation.
    alloc: LazyAlloc
    # Fully-validated transition result.
    result: Result
    # Optional encoded transaction list, when the tool provides one.
    body: Bytes | None = None

    @classmethod
    def model_validate_files(
        cls, directory_path: Path, *, context: Any | None = None
    ) -> "Self":
        """
        Validate the model from the file system where each key is a
        different JSON file.
        """
        alloc_data = (directory_path / "alloc.json").read_text()
        result_data = (directory_path / "result.json").read_text()
        result = Result.model_validate_json(
            json_data=result_data, context=context
        )
        # Defer alloc parsing: keep the raw JSON string until first access.
        alloc = LazyAllocStr(raw=alloc_data, _state_root=result.state_root)
        output = cls(result=result, alloc=alloc)
        return output

    @classmethod
    def model_validate(
        cls, response_json: Dict, *, context: Any | None = None
    ) -> "Self":
        """
        Validate the model from an already-parsed JSON response
        dictionary.
        """
        result = Result.model_validate(
            obj=response_json["result"], context=context
        )
        # The alloc sub-dict is stored unparsed and validated lazily.
        alloc = LazyAllocJson(
            raw=response_json["alloc"], _state_root=result.state_root
        )
        output = cls(result=result, alloc=alloc)
        return output

    @classmethod
    def model_validate_json(
        cls, response_json: str | bytes, *, context: Any | None = None
    ) -> "Self":
        """
        Validate the model from a JSON string.
        """
        # Manually parsing from a JSON string is tricky.
        # We parse using json.loads and then validate.
        parsed_json = json.loads(response_json)
        result = Result.model_validate(
            obj=parsed_json["result"], context=context
        )
        # Re-serialize the alloc portion so the lazy wrapper holds a string.
        alloc = LazyAllocStr(
            raw=json.dumps(parsed_json["alloc"]), _state_root=result.state_root
        )
        output = cls(result=result, alloc=alloc)
        return output

model_validate_files(directory_path, *, context=None) classmethod

Validate the model from the file system where each key is a different JSON file.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
@classmethod
def model_validate_files(
    cls, directory_path: Path, *, context: Any | None = None
) -> "Self":
    """
    Validate the model from the file system where each key is a
    different JSON file.
    """
    result_text = (directory_path / "result.json").read_text()
    alloc_text = (directory_path / "alloc.json").read_text()
    validated_result = Result.model_validate_json(
        json_data=result_text, context=context
    )
    # Keep the alloc JSON as a raw string; it is parsed on first access.
    lazy_alloc = LazyAllocStr(
        raw=alloc_text, _state_root=validated_result.state_root
    )
    return cls(result=validated_result, alloc=lazy_alloc)

model_validate(response_json, *, context=None) classmethod

Validate the model from an already-parsed JSON response dictionary.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
@classmethod
def model_validate(
    cls, response_json: Dict, *, context: Any | None = None
) -> "Self":
    """
    Validate the model from an already-parsed JSON response
    dictionary.
    """
    result = Result.model_validate(
        obj=response_json["result"], context=context
    )
    # The alloc sub-dict is stored unparsed and validated lazily.
    alloc = LazyAllocJson(
        raw=response_json["alloc"], _state_root=result.state_root
    )
    output = cls(result=result, alloc=alloc)
    return output

model_validate_json(response_json, *, context=None) classmethod

Validate the model from a JSON string.

Source code in packages/testing/src/execution_testing/client_clis/cli_types.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
@classmethod
def model_validate_json(
    cls, response_json: str | bytes, *, context: Any | None = None
) -> "Self":
    """
    Validate the model from a JSON string.
    """
    # Parse the whole payload once, then validate each part separately.
    payload = json.loads(response_json)
    validated_result = Result.model_validate(
        obj=payload["result"], context=context
    )
    # Re-serialize the alloc portion so the lazy wrapper holds a string.
    lazy_alloc = LazyAllocStr(
        raw=json.dumps(payload["alloc"]),
        _state_root=validated_result.state_root,
    )
    return cls(result=validated_result, alloc=lazy_alloc)

BesuFixtureConsumer

Bases: BesuEvmTool, FixtureConsumerTool

Besu's implementation of the fixture consumer.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
class BesuFixtureConsumer(
    BesuEvmTool,
    FixtureConsumerTool,
    # Registers the formats this consumer can handle with the tool base.
    fixture_formats=[StateFixture, BlockchainFixture],
):
    """Besu's implementation of the fixture consumer."""

    def consume_blockchain_test(
        self,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Consume a single blockchain test.

        Besu's ``evmtool block-test`` accepts ``--test-name`` to
        select a specific fixture from the file.
        """
        subcommand = "block-test"
        subcommand_options: List[str] = []
        # --json is only requested when a debug dump will be written.
        if debug_output_path:
            subcommand_options += ["--json"]

        if fixture_name:
            subcommand_options += [
                "--test-name",
                fixture_name,
            ]

        command = (
            [str(self.binary)]
            + [subcommand]
            + subcommand_options
            + [str(fixture_path)]
        )

        result = self._run_command(command)

        if debug_output_path:
            self._consume_debug_dump(
                command, result, fixture_path, debug_output_path
            )

        if result.returncode != 0:
            raise Exception(
                f"Unexpected exit code:\n{' '.join(command)}\n\n"
                f"Error:\n{result.stderr}"
            )

        # Parse text output for failures
        # The "Failed: <n>" summary on stdout is checked in addition to
        # the exit code above.
        stdout = result.stdout
        if "Failed:" in stdout:
            failed_match = re.search(r"Failed:\s+(\d+)", stdout)
            if failed_match and int(failed_match.group(1)) > 0:
                raise Exception(f"Blockchain test failed:\n{stdout}")

    # NOTE(review): functools.cache on an instance method keys on
    # ``self`` and keeps the instance alive for the cache's lifetime
    # (ruff B019) -- confirm consumer instances are long-lived.
    @cache  # noqa
    def consume_state_test_file(
        self,
        fixture_path: Path,
        debug_output_path: Optional[Path] = None,
    ) -> List[Dict[str, Any]]:
        """
        Consume an entire state test file.

        Besu's ``evmtool state-test`` outputs one JSON object per
        line (NDJSON) with a ``test`` field instead of ``name``.
        This method normalizes the output to match the expected
        format.
        """
        subcommand = "state-test"
        subcommand_options: List[str] = []
        if debug_output_path:
            subcommand_options += ["--json"]

        command = (
            [str(self.binary)]
            + [subcommand]
            + subcommand_options
            + [str(fixture_path)]
        )
        result = self._run_command(command)

        if debug_output_path:
            self._consume_debug_dump(
                command, result, fixture_path, debug_output_path
            )

        if result.returncode != 0:
            raise Exception(
                f"Unexpected exit code:\n{' '.join(command)}\n\n"
                f"Error:\n{result.stderr}"
            )

        # Parse NDJSON output, normalize "test" -> "name"
        results: List[Dict[str, Any]] = []
        for line in result.stdout.strip().splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
                if "test" in entry and "name" not in entry:
                    entry["name"] = entry["test"]
                results.append(entry)
            except json.JSONDecodeError as e:
                raise Exception(
                    f"Failed to parse Besu state-test output as JSON.\n"
                    f"Offending line:\n{line}\n\n"
                    f"Error: {e}"
                ) from e
        return results

    def consume_state_test(
        self,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Consume a single state test.

        Uses the cached result from ``consume_state_test_file``
        and selects the requested fixture by name.
        """
        file_results = self.consume_state_test_file(
            fixture_path=fixture_path,
            debug_output_path=debug_output_path,
        )
        if fixture_name:
            test_result = [
                r for r in file_results if r["name"] == fixture_name
            ]
            # NOTE(review): asserts are stripped under python -O;
            # presumably acceptable for a test harness -- confirm.
            assert len(test_result) < 2, (
                f"Multiple test results for {fixture_name}"
            )
            assert len(test_result) == 1, (
                f"Test result for {fixture_name} missing"
            )
            assert test_result[0]["pass"], (
                f"State test failed: "
                f"{test_result[0].get('error', 'unknown error')}"
            )
        else:
            # No fixture name given: the whole file must pass.
            if any(not r["pass"] for r in file_results):
                exception_text = "State test failed: \n" + "\n".join(
                    f"{r['name']}: " + r.get("error", "unknown error")
                    for r in file_results
                    if not r["pass"]
                )
                raise Exception(exception_text)

    def consume_fixture(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Execute the appropriate Besu fixture consumer for the
        fixture at ``fixture_path``.
        """
        if fixture_format == BlockchainFixture:
            self.consume_blockchain_test(
                fixture_path=fixture_path,
                fixture_name=fixture_name,
                debug_output_path=debug_output_path,
            )
        elif fixture_format == StateFixture:
            self.consume_state_test(
                fixture_path=fixture_path,
                fixture_name=fixture_name,
                debug_output_path=debug_output_path,
            )
        else:
            raise Exception(
                f"Fixture format {fixture_format.format_name} "
                f"not supported by {self.binary}"
            )

consume_blockchain_test(fixture_path, fixture_name=None, debug_output_path=None)

Consume a single blockchain test.

Besu's evmtool block-test accepts --test-name to select a specific fixture from the file.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
def consume_blockchain_test(
    self,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Consume a single blockchain test.

    Besu's ``evmtool block-test`` accepts ``--test-name`` to select a
    specific fixture from the file.
    """
    options: List[str] = []
    # --json is only requested when a debug dump will be written.
    if debug_output_path:
        options.append("--json")
    if fixture_name:
        options.extend(["--test-name", fixture_name])

    command = [str(self.binary), "block-test", *options, str(fixture_path)]

    result = self._run_command(command)

    if debug_output_path:
        self._consume_debug_dump(
            command, result, fixture_path, debug_output_path
        )

    if result.returncode != 0:
        raise Exception(
            f"Unexpected exit code:\n{' '.join(command)}\n\n"
            f"Error:\n{result.stderr}"
        )

    # The "Failed: <n>" summary on stdout is checked in addition to
    # the exit code above.
    stdout = result.stdout
    if "Failed:" in stdout:
        failed_match = re.search(r"Failed:\s+(\d+)", stdout)
        if failed_match and int(failed_match.group(1)) > 0:
            raise Exception(f"Blockchain test failed:\n{stdout}")

consume_state_test_file(fixture_path, debug_output_path=None) cached

Consume an entire state test file.

Besu's evmtool state-test outputs one JSON object per line (NDJSON) with a test field instead of name. This method normalizes the output to match the expected format.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
@cache  # noqa
def consume_state_test_file(
    self,
    fixture_path: Path,
    debug_output_path: Optional[Path] = None,
) -> List[Dict[str, Any]]:
    """
    Consume an entire state test file.

    Besu's ``evmtool state-test`` outputs one JSON object per
    line (NDJSON) with a ``test`` field instead of ``name``.
    This method normalizes the output to match the expected
    format.
    """
    options: List[str] = []
    # --json is only requested when a debug dump will be written.
    if debug_output_path:
        options.append("--json")

    command = [str(self.binary), "state-test", *options, str(fixture_path)]
    result = self._run_command(command)

    if debug_output_path:
        self._consume_debug_dump(
            command, result, fixture_path, debug_output_path
        )

    if result.returncode != 0:
        raise Exception(
            f"Unexpected exit code:\n{' '.join(command)}\n\n"
            f"Error:\n{result.stderr}"
        )

    # Decode each non-empty NDJSON line, renaming "test" -> "name".
    parsed: List[Dict[str, Any]] = []
    for raw_line in result.stdout.strip().splitlines():
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError as e:
            raise Exception(
                f"Failed to parse Besu state-test output as JSON.\n"
                f"Offending line:\n{raw_line}\n\n"
                f"Error: {e}"
            ) from e
        if "test" in entry and "name" not in entry:
            entry["name"] = entry["test"]
        parsed.append(entry)
    return parsed

consume_state_test(fixture_path, fixture_name=None, debug_output_path=None)

Consume a single state test.

Uses the cached result from consume_state_test_file and selects the requested fixture by name.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
def consume_state_test(
    self,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Consume a single state test.

    Relies on the cached output of ``consume_state_test_file`` and
    picks out the requested fixture by its name.
    """
    file_results = self.consume_state_test_file(
        fixture_path=fixture_path,
        debug_output_path=debug_output_path,
    )
    if not fixture_name:
        # No fixture name given: the whole file must pass.
        failures = [r for r in file_results if not r["pass"]]
        if failures:
            exception_text = "State test failed: \n" + "\n".join(
                f"{r['name']}: " + r.get("error", "unknown error")
                for r in failures
            )
            raise Exception(exception_text)
        return
    matches = [r for r in file_results if r["name"] == fixture_name]
    assert len(matches) < 2, (
        f"Multiple test results for {fixture_name}"
    )
    assert len(matches) == 1, (
        f"Test result for {fixture_name} missing"
    )
    assert matches[0]["pass"], (
        f"State test failed: "
        f"{matches[0].get('error', 'unknown error')}"
    )

consume_fixture(fixture_format, fixture_path, fixture_name=None, debug_output_path=None)

Execute the appropriate Besu fixture consumer for the fixture at fixture_path.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
def consume_fixture(
    self,
    fixture_format: FixtureFormat,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Execute the appropriate Besu fixture consumer for the
    fixture at ``fixture_path``.
    """
    # Select the consumer matching the fixture format, failing fast
    # on formats this tool does not handle.
    if fixture_format == StateFixture:
        consumer = self.consume_state_test
    elif fixture_format == BlockchainFixture:
        consumer = self.consume_blockchain_test
    else:
        raise Exception(
            f"Fixture format {fixture_format.format_name} "
            f"not supported by {self.binary}"
        )
    consumer(
        fixture_path=fixture_path,
        fixture_name=fixture_name,
        debug_output_path=debug_output_path,
    )

BesuTransitionTool

Bases: TransitionTool

Besu EvmTool Transition tool frontend wrapper class.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
class BesuTransitionTool(TransitionTool):
    """Besu EvmTool Transition tool frontend wrapper class."""

    default_binary = Path("evm")
    detect_binary_pattern = BESU_BIN_DETECT_PATTERN
    # Resolved path to the Besu evmtool binary.
    binary: Path
    cached_version: Optional[str] = None
    trace: bool
    # Long-lived ``t8n-server`` subprocess, started lazily on first use.
    process: Optional[subprocess.Popen] = None
    server_url: str
    # Temporary directory Besu writes trace files into (tracing only).
    besu_trace_dir: Optional[tempfile.TemporaryDirectory]

    supports_xdist: ClassVar[bool] = False

    def __init__(
        self,
        *,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Initialize the BesuTransitionTool class."""
        super().__init__(
            exception_mapper=BesuExceptionMapper(), binary=binary, trace=trace
        )
        # Probe the binary's t8n help text; used by is_fork_supported.
        args = [str(self.binary), "t8n", "--help"]
        try:
            result = subprocess.run(args, capture_output=True, text=True)
        # NOTE(review): subprocess.run is called without check=True, so
        # CalledProcessError is never raised and this branch looks dead
        # -- confirm whether a non-zero exit should fail here.
        except subprocess.CalledProcessError as e:
            raise Exception(
                "evm process unexpectedly returned a non-zero status "
                f"code: {e}."
            ) from e
        except Exception as e:
            raise Exception(
                f"Unexpected exception calling evm tool: {e}."
            ) from e
        self.help_string = result.stdout
        self.besu_trace_dir = (
            tempfile.TemporaryDirectory() if self.trace else None
        )

    def start_server(self) -> None:
        """
        Start the t8n-server process, extract the port, and leave it
        running for future reuse.
        """
        args = [
            str(self.binary),
            "t8n-server",
            "--port=0",  # OS assigned server port
        ]

        if self.trace:
            args.append("--trace")
            if self.besu_trace_dir:
                args.append(f"--output.basedir={self.besu_trace_dir.name}")

        self.process = subprocess.Popen(
            args=args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

        # Block until the server prints its listening port (or fails).
        while True:
            if self.process.stdout is None:
                raise Exception("Failed starting Besu subprocess")
            # NOTE(review): stdout is a bytes pipe, so str(...) yields a
            # "b'...'" repr; the substring checks below still match --
            # confirm this is intentional.
            line = str(self.process.stdout.readline())

            if not line or "Failed to start transition server" in line:
                raise Exception("Failed starting Besu subprocess\n" + line)
            if "Transition server listening on" in line:
                match = re.search(
                    "Transition server listening on (\\d+)", line
                )
                if match:
                    port = match.group(1)
                    self.server_url = f"http://localhost:{port}/"
                    break

    def shutdown(self) -> None:
        """Stop the t8n-server process if it was started."""
        if self.process:
            self.process.kill()
        if self.besu_trace_dir:
            self.besu_trace_dir.cleanup()

    def _evaluate(
        self,
        *,
        transition_tool_data: TransitionTool.TransitionToolData,
        debug_output_path: Path | None,
        slow_request: bool,
        profiler: Profiler,
    ) -> TransitionToolOutput:
        """Execute `evm t8n` with the specified arguments."""
        # Unused by the server-based implementation.
        del slow_request, profiler

        if not self.process:
            self.start_server()

        input_json = transition_tool_data.to_input().model_dump(
            mode="json", **model_dump_config
        )

        state_json = {
            "fork": transition_tool_data.fork_name,
            "chainid": transition_tool_data.chain_id,
            "reward": transition_tool_data.reward,
        }

        post_data = {"state": state_json, "input": input_json}

        if debug_output_path:
            # Write a re-runnable curl script plus the raw inputs so the
            # request can be reproduced outside the test run.
            post_data_string = json.dumps(post_data, indent=4)
            additional_indent = " " * 16  # for pretty indentation in t8n.sh
            indented_post_data_string = "{\n" + "\n".join(
                additional_indent + line
                for line in post_data_string[1:].splitlines()
            )
            t8n_script = textwrap.dedent(
                f"""\
                #!/bin/bash
                # Use $1 as t8n-server port if provided, else default to 3000
                PORT=${{1:-3000}}
                curl http://localhost:${{PORT}}/ -X POST \\
                -H "Content-Type: application/json" \\
                --data '{indented_post_data_string}'
                """
            )
            dump_files_to_directory(
                debug_output_path,
                {
                    "state.json": state_json,
                    "input/alloc.json": input_json["alloc"],
                    "input/env.json": input_json["env"],
                    "input/txs.json": input_json["txs"],
                    "t8n.sh+x": t8n_script,
                },
            )

        response = requests.post(self.server_url, json=post_data, timeout=5)
        # exception visible in pytest failure output
        response.raise_for_status()
        output: TransitionToolOutput = TransitionToolOutput.model_validate(
            response.json(),
            context={"exception_mapper": self.exception_mapper},
        )

        if debug_output_path:
            dump_files_to_directory(
                debug_output_path,
                {
                    "response.txt": response.text,
                    "status_code.txt": response.status_code,
                    "time_elapsed_seconds.txt": (
                        response.elapsed.total_seconds()
                    ),
                },
            )

        # NOTE(review): raise_for_status() above already raises for
        # 4xx/5xx, so this only triggers for non-200 success codes.
        if response.status_code != 200:
            raise Exception(
                f"t8n-server returned status code {response.status_code}, "
                f"response: {response.text}"
            )

        if debug_output_path:
            dump_files_to_directory(
                debug_output_path,
                {
                    "output/alloc.json": output.alloc.raw,
                    "output/result.json": output.result.model_dump(
                        mode="json", **model_dump_config
                    ),
                    "output/txs.rlp": str(output.body),
                },
            )

        if self.trace and self.besu_trace_dir:
            # Collect the per-transaction trace files Besu wrote, then
            # delete them so the directory stays clean between runs.
            self.collect_traces(
                output.result.receipts, self.besu_trace_dir, debug_output_path
            )
            for i, r in enumerate(output.result.receipts):
                trace_file_name = f"trace-{i}-{r.transaction_hash}.jsonl"
                os.remove(
                    os.path.join(self.besu_trace_dir.name, trace_file_name)
                )

        return output

    def is_fork_supported(self, fork: Fork) -> bool:
        """Return True if the fork is supported by the tool."""
        return fork.transition_tool_name() in self.help_string

__init__(*, binary=None, trace=False)

Initialize the BesuTransitionTool class.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def __init__(
    self,
    *,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """
    Initialize the BesuTransitionTool class.

    Probes the binary by running `<binary> t8n --help` and stores its
    stdout for later fork-support checks; creates a temporary trace
    directory when tracing is enabled.

    Args:
        binary: Path to the Besu evm binary; resolved by the parent
            class when None.
        trace: Whether EVM traces should be collected.

    Raises:
        Exception: If the help probe exits with a non-zero status code
            or cannot be executed at all.
    """
    super().__init__(
        exception_mapper=BesuExceptionMapper(), binary=binary, trace=trace
    )
    args = [str(self.binary), "t8n", "--help"]
    try:
        # check=True is required for CalledProcessError to actually be
        # raised on a non-zero exit status; without it the handler
        # below is dead code and probe failures go unnoticed.
        result = subprocess.run(
            args, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        raise Exception(
            "evm process unexpectedly returned a non-zero status "
            f"code: {e}."
        ) from e
    except Exception as e:
        raise Exception(
            f"Unexpected exception calling evm tool: {e}."
        ) from e
    self.help_string = result.stdout
    self.besu_trace_dir = (
        tempfile.TemporaryDirectory() if self.trace else None
    )

start_server()

Start the t8n-server process, extract the port, and leave it running for future reuse.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def start_server(self) -> None:
    """
    Start the t8n-server process, extract the port, and leave it
    running for future reuse.

    Blocks until the server logs the port it is listening on (parsed
    from its stdout) or reports a startup failure, in which case an
    exception is raised.
    """
    args = [
        str(self.binary),
        "t8n-server",
        "--port=0",  # OS assigned server port
    ]

    if self.trace:
        args.append("--trace")
        if self.besu_trace_dir:
            args.append(f"--output.basedir={self.besu_trace_dir.name}")

    # stderr is merged into stdout so the single readline() loop below
    # sees both startup messages and error output.
    self.process = subprocess.Popen(
        args=args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    while True:
        if self.process.stdout is None:
            raise Exception("Failed starting Besu subprocess")
        # stdout is a bytes stream (no text=True above), so str() yields
        # a "b'...'" repr; the substring checks below still match inside
        # that repr.
        line = str(self.process.stdout.readline())

        # An empty read means the stream closed: the process died before
        # announcing a port.
        if not line or "Failed to start transition server" in line:
            raise Exception("Failed starting Besu subprocess\n" + line)
        if "Transition server listening on" in line:
            match = re.search(
                "Transition server listening on (\\d+)", line
            )
            if match:
                port = match.group(1)
                self.server_url = f"http://localhost:{port}/"
                break

shutdown()

Stop the t8n-server process if it was started.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
190
191
192
193
194
195
def shutdown(self) -> None:
    """Terminate the t8n-server subprocess and remove trace artifacts."""
    process = self.process
    if process:
        process.kill()
    trace_dir = self.besu_trace_dir
    if trace_dir:
        trace_dir.cleanup()

is_fork_supported(fork)

Return True if the fork is supported by the tool.

Source code in packages/testing/src/execution_testing/client_clis/clis/besu.py
301
302
303
def is_fork_supported(self, fork: Fork) -> bool:
    """Check the fork's t8n name against the captured `--help` text."""
    tool_name = fork.transition_tool_name()
    return tool_name in self.help_string

EthereumJSTransitionTool

Bases: TransitionTool

EthereumJS Transition tool interface wrapper class.

Source code in packages/testing/src/execution_testing/client_clis/clis/ethereumjs.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class EthereumJSTransitionTool(TransitionTool):
    """Wrapper around the EthereumJS `t8n` transition tool."""

    default_binary = Path("ethereumjs-t8ntool.sh")
    detect_binary_pattern = re.compile(r"^ethereumjs t8n\b")
    version_flag: str = "--version"
    t8n_use_stream = False

    binary: Path
    cached_version: Optional[str] = None
    trace: bool

    def __init__(
        self,
        *,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Set up the wrapper, wiring in the EthereumJS exception mapper."""
        super().__init__(
            binary=binary,
            trace=trace,
            exception_mapper=EthereumJSExceptionMapper(),
        )

    def is_fork_supported(self, fork: Fork) -> bool:
        """
        Report fork support.

        EthereumJS-t8n exposes no way to query supported forks, so every
        fork is assumed to be supported.
        """
        del fork
        return True

__init__(*, binary=None, trace=False)

Initialize the EthereumJS Transition tool interface.

Source code in packages/testing/src/execution_testing/client_clis/clis/ethereumjs.py
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self,
    *,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Set up the wrapper, wiring in the EthereumJS exception mapper."""
    super().__init__(
        binary=binary,
        trace=trace,
        exception_mapper=EthereumJSExceptionMapper(),
    )

is_fork_supported(fork)

Return True if the fork is supported by the tool.

Currently, EthereumJS-t8n provides no way to determine supported forks.

Source code in packages/testing/src/execution_testing/client_clis/clis/ethereumjs.py
43
44
45
46
47
48
49
50
def is_fork_supported(self, fork: Fork) -> bool:
    """
    Report fork support.

    EthereumJS-t8n exposes no way to query supported forks, so every
    fork is assumed to be supported.
    """
    del fork
    return True

EvmOneBlockchainFixtureConsumer

Bases: EvmoneFixtureConsumerCommon, FixtureConsumerTool

Evmone's implementation of the fixture consumer for blockchain tests.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class EvmOneBlockchainFixtureConsumer(
    EvmoneFixtureConsumerCommon,
    FixtureConsumerTool,
    fixture_formats=[BlockchainFixture],
):
    """Blockchain-test fixture consumer backed by `evmone-blockchaintest`."""

    default_binary = Path("evmone-blockchaintest")
    detect_binary_pattern = re.compile(r"^evmone-blockchaintest\b")

    def __init__(
        self,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Record the binary to invoke and initialize the common base."""
        self.binary = binary or self.default_binary
        super().__init__(trace=trace)

    def consume_fixture(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Run the consumer on the fixture at `fixture_path`, skipping any
        format other than `BlockchainFixture`.
        """
        # pytest.skip raises, so execution never reaches consume_test
        # for unsupported formats.
        if fixture_format != BlockchainFixture:
            pytest.skip(self._skip_message(fixture_format))
        self.consume_test(
            fixture_path=fixture_path,
            fixture_name=fixture_name,
            debug_output_path=debug_output_path,
        )

__init__(binary=None, trace=False)

Initialize the EvmOneBlockchainFixtureConsumer class.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
299
300
301
302
303
304
305
306
def __init__(
    self,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Record the binary to invoke and initialize the common base."""
    self.binary = binary or self.default_binary
    super().__init__(trace=trace)

consume_fixture(fixture_format, fixture_path, fixture_name=None, debug_output_path=None)

Execute the appropriate fixture consumer for the fixture at fixture_path.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def consume_fixture(
    self,
    fixture_format: FixtureFormat,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Run the consumer on the fixture at `fixture_path`, skipping any
    format other than `BlockchainFixture`.
    """
    # pytest.skip raises, so execution never reaches consume_test for
    # unsupported formats.
    if fixture_format != BlockchainFixture:
        pytest.skip(self._skip_message(fixture_format))
    self.consume_test(
        fixture_path=fixture_path,
        fixture_name=fixture_name,
        debug_output_path=debug_output_path,
    )

EvmoneExceptionMapper

Bases: ExceptionMapper

Translate between EEST exceptions and error strings returned by Evmone.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class EvmoneExceptionMapper(ExceptionMapper):
    """
    Translate between EEST exceptions and error strings returned by Evmone.
    """

    # Maps each EEST exception to a substring expected in evmone's error
    # output. Several distinct exceptions deliberately share one message
    # because evmone does not distinguish them (e.g. all
    # TYPE_*_TX_PRE_FORK map to "transaction type not supported", and
    # both intrinsic-gas exceptions map to "intrinsic gas too low").
    mapping_substring: ClassVar[Dict[ExceptionBase, str]] = {
        TransactionException.SENDER_NOT_EOA: "sender not an eoa:",
        TransactionException.GAS_ALLOWANCE_EXCEEDED: "gas limit reached",
        TransactionException.PRIORITY_GREATER_THAN_MAX_FEE_PER_GAS: (
            "max priority fee per gas higher than max fee per gas"
        ),
        TransactionException.NONCE_IS_MAX: "nonce has max value:",
        TransactionException.TYPE_4_TX_CONTRACT_CREATION: (
            "set code transaction must "
        ),
        TransactionException.TYPE_4_INVALID_AUTHORITY_SIGNATURE: (
            "invalid authorization signature"
        ),
        TransactionException.TYPE_4_INVALID_AUTHORITY_SIGNATURE_S_TOO_HIGH: (
            "authorization signature s value too high"
        ),
        TransactionException.TYPE_4_EMPTY_AUTHORIZATION_LIST: (
            "empty authorization list"
        ),
        TransactionException.INTRINSIC_GAS_TOO_LOW: "intrinsic gas too low",
        TransactionException.INTRINSIC_GAS_BELOW_FLOOR_GAS_COST: (
            "intrinsic gas too low"
        ),
        TransactionException.TYPE_3_TX_MAX_BLOB_GAS_ALLOWANCE_EXCEEDED: (
            "blob gas limit exceeded"
        ),
        TransactionException.INITCODE_SIZE_EXCEEDED: (
            "max initcode size exceeded"
        ),
        TransactionException.INSUFFICIENT_ACCOUNT_FUNDS: (
            "insufficient funds for gas * price + value"
        ),
        TransactionException.INSUFFICIENT_MAX_FEE_PER_GAS: (
            "max fee per gas less than block base fee"
        ),
        TransactionException.INSUFFICIENT_MAX_FEE_PER_BLOB_GAS: (
            "max blob fee per gas less than block base fee"
        ),
        TransactionException.TYPE_4_TX_PRE_FORK: (
            "transaction type not supported"
        ),
        TransactionException.TYPE_3_TX_PRE_FORK: (
            "transaction type not supported"
        ),
        TransactionException.TYPE_2_TX_PRE_FORK: (
            "transaction type not supported"
        ),
        TransactionException.TYPE_1_TX_PRE_FORK: (
            "transaction type not supported"
        ),
        TransactionException.TYPE_3_TX_INVALID_BLOB_VERSIONED_HASH: (
            "invalid blob hash version"
        ),
        TransactionException.TYPE_3_TX_BLOB_COUNT_EXCEEDED: (
            "blob gas limit exceeded"
        ),
        TransactionException.TYPE_3_TX_ZERO_BLOBS: "empty blob hashes list",
        TransactionException.TYPE_3_TX_CONTRACT_CREATION: (
            "blob transaction must not be a create transaction"
        ),
        TransactionException.NONCE_MISMATCH_TOO_LOW: "nonce too low",
        TransactionException.NONCE_MISMATCH_TOO_HIGH: "nonce too high",
        TransactionException.GAS_LIMIT_EXCEEDS_MAXIMUM: (
            "max gas limit exceeded"
        ),
        BlockException.INVALID_DEPOSIT_EVENT_LAYOUT: (
            "invalid deposit event layout"
        ),
        # TODO EVMONE needs to differentiate when the system contract is
        # missing or failing
        BlockException.SYSTEM_CONTRACT_EMPTY: (
            "system contract empty or failed"
        ),
        BlockException.SYSTEM_CONTRACT_CALL_FAILED: (
            "system contract empty or failed"
        ),
    }
    # No regex-based mappings are needed for evmone.
    mapping_regex: ClassVar[Dict[ExceptionBase, str]] = {}

EvmOneStateFixtureConsumer

Bases: EvmoneFixtureConsumerCommon, FixtureConsumerTool

Evmone's implementation of the fixture consumer for state tests.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class EvmOneStateFixtureConsumer(
    EvmoneFixtureConsumerCommon,
    FixtureConsumerTool,
    fixture_formats=[StateFixture],
):
    """State-test fixture consumer backed by `evmone-statetest`."""

    default_binary = Path("evmone-statetest")
    detect_binary_pattern = re.compile(r"^evmone-statetest\b")

    def __init__(
        self,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Record the binary to invoke and initialize the common base."""
        self.binary = binary or self.default_binary
        super().__init__(trace=trace)

    def consume_fixture(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Run the consumer on the fixture at `fixture_path`, skipping any
        format other than `StateFixture`.
        """
        # pytest.skip raises, so execution never reaches consume_test
        # for unsupported formats.
        if fixture_format != StateFixture:
            pytest.skip(self._skip_message(fixture_format))
        self.consume_test(
            fixture_path=fixture_path,
            fixture_name=fixture_name,
            debug_output_path=debug_output_path,
        )

__init__(binary=None, trace=False)

Initialize the EvmOneStateFixtureConsumer class.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
259
260
261
262
263
264
265
266
def __init__(
    self,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Record the binary to invoke and initialize the common base."""
    self.binary = binary or self.default_binary
    super().__init__(trace=trace)

consume_fixture(fixture_format, fixture_path, fixture_name=None, debug_output_path=None)

Execute the appropriate fixture consumer for the fixture at fixture_path.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def consume_fixture(
    self,
    fixture_format: FixtureFormat,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Run the consumer on the fixture at `fixture_path`, skipping any
    format other than `StateFixture`.
    """
    # pytest.skip raises, so execution never reaches consume_test for
    # unsupported formats.
    if fixture_format != StateFixture:
        pytest.skip(self._skip_message(fixture_format))
    self.consume_test(
        fixture_path=fixture_path,
        fixture_name=fixture_name,
        debug_output_path=debug_output_path,
    )

EvmOneTransitionTool

Bases: TransitionTool

Evmone evmone-t8n Transition tool interface wrapper class.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class EvmOneTransitionTool(TransitionTool):
    """Wrapper around the evmone `evmone-t8n` transition tool."""

    default_binary = Path("evmone-t8n")
    detect_binary_pattern = re.compile(r"^evmone-t8n\b")
    t8n_use_stream = False

    binary: Path
    cached_version: Optional[str] = None
    trace: bool
    supports_opcode_count: ClassVar[bool] = True
    supports_blob_params: ClassVar[bool] = True

    # evmone uses space-separated fork names for some forks
    fork_name_map: ClassVar[Dict[str, str]] = {
        "TangerineWhistle": "Tangerine Whistle",
        "SpuriousDragon": "Spurious Dragon",
    }

    def __init__(
        self,
        *,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Set up the wrapper, wiring in the evmone exception mapper."""
        super().__init__(
            binary=binary,
            trace=trace,
            exception_mapper=EvmoneExceptionMapper(),
        )

    def is_fork_supported(self, fork: Fork) -> bool:
        """
        Report fork support; evmone-t8n exposes no way to query supported
        forks, so every fork is assumed to be supported.
        """
        del fork
        return True

__init__(*, binary=None, trace=False)

Initialize the Evmone Transition tool interface.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    *,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Set up the wrapper, wiring in the evmone exception mapper."""
    super().__init__(
        binary=binary,
        trace=trace,
        exception_mapper=EvmoneExceptionMapper(),
    )

is_fork_supported(fork)

Return True if the fork is supported by the tool. Currently, evmone-t8n provides no way to determine supported forks.

Source code in packages/testing/src/execution_testing/client_clis/clis/evmone.py
68
69
70
71
72
73
74
def is_fork_supported(self, fork: Fork) -> bool:
    """
    Report fork support; evmone-t8n exposes no way to query supported
    forks, so every fork is assumed to be supported.
    """
    del fork
    return True

ExecutionSpecsTransitionTool

Bases: TransitionTool

Implementation of the EELS T8N for execution-spec-tests.

Source code in packages/testing/src/execution_testing/client_clis/clis/execution_specs.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class ExecutionSpecsTransitionTool(TransitionTool):
    """
    Implementation of the EELS T8N for execution-spec-tests.

    Runs the EELS t8n entry point in-process (no external binary),
    feeding inputs and collecting outputs via in-memory streams.
    """

    supports_opcode_count: ClassVar[bool] = True
    supports_blob_params: ClassVar[bool] = True

    def __init__(
        self,
        *,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Initialize the EELS Transition Tool interface."""
        del binary  # EELS doesn't use an external binary
        self.exception_mapper = ExecutionSpecsExceptionMapper()
        self.trace = trace
        self._info_metadata: Optional[Dict[str, Any]] = {}
        self.fork_cache = ForkCache()

    @override
    def shutdown(self) -> None:
        """Release resources held by the fork cache."""
        # NOTE(review): __exit__ is called without the usual
        # (exc_type, exc, tb) arguments — presumably ForkCache defines a
        # no-argument __exit__; confirm against its definition.
        self.fork_cache.__exit__()

    def version(self) -> str:
        """Version of the t8n tool."""
        return ethereum.__version__

    def is_fork_supported(self, fork: Fork) -> bool:
        """Return True if the fork is supported by the tool."""
        return fork.transition_tool_name() in get_supported_forks()

    def _evaluate(
        self,
        *,
        transition_tool_data: TransitionTool.TransitionToolData,
        debug_output_path: Path | None,
        slow_request: bool,
        profiler: Profiler,
    ) -> TransitionToolOutput:
        """
        Evaluate using the EELS T8N entry point.

        Builds a t8n argument list, runs the in-process T8N with
        stdin/stdout emulated by StringIO objects, validates the JSON
        output, and optionally dumps debug artifacts and traces.
        """
        # Unused: EELS runs in-process, so there is no slow remote
        # request and no external process to profile.
        del slow_request, profiler
        request_data = transition_tool_data.get_request_data()
        request_data_json = request_data.model_dump(
            mode="json", **model_dump_config
        )

        # Scratch directory for t8n output artifacts (e.g. trace files).
        temp_dir = tempfile.TemporaryDirectory()
        t8n_args = [
            "t8n",
            "--input.alloc=stdin",
            "--input.env=stdin",
            "--input.txs=stdin",
            "--output.result=stdout",
            "--output.body=stdout",
            "--output.alloc=stdout",
            f"--output.basedir={temp_dir.name}",
            f"--state.fork={request_data_json['state']['fork']}",
            f"--state.chainid={request_data_json['state']['chainid']}",
            f"--state.reward={request_data_json['state']['reward']}",
        ]

        if transition_tool_data.state_test:
            t8n_args.append("--state-test")

        if transition_tool_data.blob_params:
            fork = transition_tool_data.fork
            if fork.bpo_fork() and fork != fork.non_bpo_ancestor():
                # Only send this information for BPO forks.
                # TODO: This should be optimized by the t8n tool instead.
                t8n_args.append("--input.blobParams=stdin")

        if self.trace:
            t8n_args.extend(
                [
                    "--trace",
                    "--trace.memory",
                    "--trace.returndata",
                ]
            )

        parser = create_parser()
        t8n_options = parser.parse_args(t8n_args)

        # The in-process t8n's stdout/stdin are emulated with StringIO.
        out_stream = StringIO()

        in_stream = StringIO(json.dumps(request_data_json["input"]))

        t8n = T8N(t8n_options, out_stream, in_stream, self.fork_cache)
        t8n.run()

        output_dict = json.loads(out_stream.getvalue())
        output: TransitionToolOutput = TransitionToolOutput.model_validate(
            output_dict, context={"exception_mapper": self.exception_mapper}
        )

        if debug_output_path:
            dump_files_to_directory(
                debug_output_path,
                {
                    "input/alloc.json": request_data.input.alloc,
                    "input/env.json": request_data.input.env,
                    "input/txs.json": [
                        tx.model_dump(mode="json", **model_dump_config)
                        for tx in request_data.input.txs
                    ],
                },
            )

            dump_files_to_directory(
                debug_output_path,
                {
                    "output/alloc.json": output.alloc,
                    "output/result.json": output.result,
                },
            )

        if self.trace:
            self.collect_traces(
                output.result.receipts, temp_dir, debug_output_path
            )
        # NOTE(review): cleanup is not inside a try/finally, so the temp
        # directory leaks if t8n.run() or validation raises — confirm
        # whether that is acceptable here.
        temp_dir.cleanup()

        return output

    @classmethod
    def is_installed(cls, binary_path: Optional[Path] = None) -> bool:
        """ExecutionSpecs is always installed."""
        del binary_path
        return True

__init__(*, binary=None, trace=False)

Initialize the EELS Transition Tool interface.

Source code in packages/testing/src/execution_testing/client_clis/clis/execution_specs.py
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    *,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Initialize the in-process EELS transition tool."""
    del binary  # EELS doesn't use an external binary
    self.trace = trace
    self.exception_mapper = ExecutionSpecsExceptionMapper()
    self.fork_cache = ForkCache()
    self._info_metadata: Optional[Dict[str, Any]] = {}

version()

Version of the t8n tool.

Source code in packages/testing/src/execution_testing/client_clis/clis/execution_specs.py
58
59
60
def version(self) -> str:
    """Return the version string of the EELS package itself."""
    return ethereum.__version__

is_fork_supported(fork)

Return True if the fork is supported by the tool.

Source code in packages/testing/src/execution_testing/client_clis/clis/execution_specs.py
62
63
64
def is_fork_supported(self, fork: Fork) -> bool:
    """True when EELS lists the fork's t8n name among supported forks."""
    return fork.transition_tool_name() in get_supported_forks()

is_installed(binary_path=None) classmethod

ExecutionSpecs is always installed.

Source code in packages/testing/src/execution_testing/client_clis/clis/execution_specs.py
161
162
163
164
165
@classmethod
def is_installed(cls, binary_path: Optional[Path] = None) -> bool:
    """Always True: EELS runs in-process and needs no external binary."""
    del binary_path
    return True

GethFixtureConsumer

Bases: GethEvm, FixtureConsumerTool

Geth's implementation of the fixture consumer.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
class GethFixtureConsumer(
    GethEvm,
    FixtureConsumerTool,
    fixture_formats=[StateFixture, BlockchainFixture],
):
    """Geth's implementation of the fixture consumer."""

    def consume_blockchain_test(
        self,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Consume a single blockchain test.

        The `evm blocktest` command takes the `--run` argument which can be
        used to select a specific fixture from the fixture file when executing.

        Raises:
            Exception: If the command exits non-zero, prints anything
                other than a JSON list, or reports any failing test.
        """
        subcommand = "blocktest"
        global_options: List[str] = []
        subcommand_options: List[str] = []
        if debug_output_path:
            # Maximum verbosity plus tracing so the debug dump captures
            # as much information as possible.
            global_options += ["--verbosity", "100"]
            subcommand_options += ["--trace"]

        if fixture_name:
            # `--run` takes a regex; escape the name so it matches
            # literally.
            subcommand_options += ["--run", re.escape(fixture_name)]

        command = (
            [str(self.binary)]
            + global_options
            + [subcommand]
            + subcommand_options
            + [str(fixture_path)]
        )

        result = self._run_command(command)

        if debug_output_path:
            self._consume_debug_dump(
                command, result, fixture_path, debug_output_path
            )

        if result.returncode != 0:
            raise Exception(
                f"Unexpected exit code:\n{' '.join(command)}\n\n"
                f"Error:\n{result.stderr}"
            )

        result_json = json.loads(result.stdout)
        if not isinstance(result_json, list):
            raise Exception(
                f"Unexpected result from evm blocktest: {result_json}"
            )

        # Aggregate every failure into a single exception message.
        if any(not test_result["pass"] for test_result in result_json):
            exception_text = "Blockchain test failed: \n" + "\n".join(
                f"{test_result['name']}: " + test_result["error"]
                for test_result in result_json
                if not test_result["pass"]
            )
            raise Exception(exception_text)

    # NOTE(review): `@cache` on an instance method keys on `self` and
    # keeps the consumer instance and all cached results alive for the
    # life of the process (ruff B019); acceptable only if few fixture
    # files are consumed per process — confirm.
    @cache  # noqa
    def consume_state_test_file(
        self,
        fixture_path: Path,
        debug_output_path: Optional[Path] = None,
    ) -> List[Dict[str, Any]]:
        """
        Consume an entire state test file.

        The `evm statetest` will always execute all the tests contained in a
        file without the possibility of selecting a single test, so this
        function is cached in order to only call the command once and
        `consume_state_test` can simply select the result that was requested.
        """
        subcommand = "statetest"
        global_options: List[str] = []
        subcommand_options: List[str] = []
        if debug_output_path:
            global_options += ["--verbosity", "100"]
            subcommand_options += ["--trace"]

        command = (
            [str(self.binary)]
            + global_options
            + [subcommand]
            + subcommand_options
            + [str(fixture_path)]
        )
        result = self._run_command(command)

        if debug_output_path:
            self._consume_debug_dump(
                command, result, fixture_path, debug_output_path
            )

        if result.returncode != 0:
            raise Exception(
                f"Unexpected exit code:\n{' '.join(command)}\n\n"
                f"Error:\n{result.stderr}"
            )

        result_json = json.loads(result.stdout)
        if not isinstance(result_json, list):
            raise Exception(
                f"Unexpected result from evm statetest: {result_json}"
            )
        return result_json

    def consume_state_test(
        self,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Consume a single state test.

        Uses the cached result from `consume_state_test_file` in order to not
        call the command every time and select a single result from there.
        """
        file_results = self.consume_state_test_file(
            fixture_path=fixture_path,
            debug_output_path=debug_output_path,
        )
        if fixture_name:
            test_result = [
                test_result
                for test_result in file_results
                if test_result["name"] == fixture_name
            ]
            # The < 2 check runs first so a duplicate name is reported
            # as such rather than as "missing".
            # NOTE(review): asserts are stripped under `python -O`;
            # presumably this only ever runs under pytest — confirm.
            assert len(test_result) < 2, (
                f"Multiple test results for {fixture_name}"
            )
            assert len(test_result) == 1, (
                f"Test result for {fixture_name} missing"
            )
            assert test_result[0]["pass"], (
                f"State test failed: {test_result[0]['error']}"
            )
        else:
            # No name given: every result in the file must pass.
            if any(not test_result["pass"] for test_result in file_results):
                exception_text = "State test failed: \n" + "\n".join(
                    f"{test_result['name']}: " + test_result["error"]
                    for test_result in file_results
                    if not test_result["pass"]
                )
                raise Exception(exception_text)

    def consume_fixture(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Execute the appropriate geth fixture consumer for the fixture at
        `fixture_path`.
        """
        if fixture_format == BlockchainFixture:
            self.consume_blockchain_test(
                fixture_path=fixture_path,
                fixture_name=fixture_name,
                debug_output_path=debug_output_path,
            )
        elif fixture_format == StateFixture:
            self.consume_state_test(
                fixture_path=fixture_path,
                fixture_name=fixture_name,
                debug_output_path=debug_output_path,
            )
        else:
            raise Exception(
                f"Fixture format {fixture_format.format_name} "
                f"not supported by {self.binary}"
            )

consume_blockchain_test(fixture_path, fixture_name=None, debug_output_path=None)

Consume a single blockchain test.

The evm blocktest command takes the --run argument which can be used to select a specific fixture from the fixture file when executing.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def consume_blockchain_test(
    self,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Consume a single blockchain test.

    The `evm blocktest` command takes the `--run` argument which can be
    used to select a specific fixture from the fixture file when executing.

    Args:
        fixture_path: Path to the fixture file to consume.
        fixture_name: If set, only the fixture with this name is run.
        debug_output_path: If set, enable tracing/verbose output and dump
            debug information to this directory.

    Raises:
        Exception: If the command exits non-zero, its output is not valid
            JSON, is not a list, or any executed test did not pass.
    """
    subcommand = "blocktest"
    global_options = []
    subcommand_options = []
    if debug_output_path:
        global_options += ["--verbosity", "100"]
        subcommand_options += ["--trace"]

    if fixture_name:
        # `--run` accepts a regex; escape so the name matches literally.
        subcommand_options += ["--run", re.escape(fixture_name)]

    command = (
        [str(self.binary)]
        + global_options
        + [subcommand]
        + subcommand_options
        + [str(fixture_path)]
    )

    result = self._run_command(command)

    if debug_output_path:
        self._consume_debug_dump(
            command, result, fixture_path, debug_output_path
        )

    if result.returncode != 0:
        raise Exception(
            f"Unexpected exit code:\n{' '.join(command)}\n\n"
            f"Error:\n{result.stderr}"
        )

    # Wrap JSON parsing for a helpful message on malformed output,
    # consistent with the Nethermind fixture consumer in this package.
    try:
        result_json = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        raise Exception(
            f"Failed to parse JSON output on stdout from evm blocktest:\n"
            f"{result.stdout}"
        ) from e

    if not isinstance(result_json, list):
        raise Exception(
            f"Unexpected result from evm blocktest: {result_json}"
        )

    if any(not test_result["pass"] for test_result in result_json):
        exception_text = "Blockchain test failed: \n" + "\n".join(
            f"{test_result['name']}: " + test_result["error"]
            for test_result in result_json
            if not test_result["pass"]
        )
        raise Exception(exception_text)

consume_state_test_file(fixture_path, debug_output_path=None) cached

Consume an entire state test file.

The evm statetest will always execute all the tests contained in a file without the possibility of selecting a single test, so this function is cached in order to only call the command once and consume_state_test can simply select the result that was requested.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
@cache  # noqa
def consume_state_test_file(
    self,
    fixture_path: Path,
    debug_output_path: Optional[Path] = None,
) -> List[Dict[str, Any]]:
    """
    Consume an entire state test file.

    The `evm statetest` will always execute all the tests contained in a
    file without the possibility of selecting a single test, so this
    function is cached in order to only call the command once and
    `consume_state_test` can simply select the result that was requested.

    Args:
        fixture_path: Path to the state test fixture file.
        debug_output_path: If set, enable tracing/verbose output and dump
            debug information to this directory.

    Returns:
        The list of per-test result dictionaries reported by the tool.

    Raises:
        Exception: If the command exits non-zero, its output is not valid
            JSON, or the parsed output is not a list.
    """
    subcommand = "statetest"
    global_options: List[str] = []
    subcommand_options: List[str] = []
    if debug_output_path:
        global_options += ["--verbosity", "100"]
        subcommand_options += ["--trace"]

    command = (
        [str(self.binary)]
        + global_options
        + [subcommand]
        + subcommand_options
        + [str(fixture_path)]
    )
    result = self._run_command(command)

    if debug_output_path:
        self._consume_debug_dump(
            command, result, fixture_path, debug_output_path
        )

    if result.returncode != 0:
        raise Exception(
            f"Unexpected exit code:\n{' '.join(command)}\n\n"
            f"Error:\n{result.stderr}"
        )

    # Wrap JSON parsing for a helpful message on malformed output,
    # consistent with the Nethermind fixture consumer in this package.
    try:
        result_json = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        raise Exception(
            f"Failed to parse JSON output on stdout from evm statetest:\n"
            f"{result.stdout}"
        ) from e

    if not isinstance(result_json, list):
        raise Exception(
            f"Unexpected result from evm statetest: {result_json}"
        )
    return result_json

consume_state_test(fixture_path, fixture_name=None, debug_output_path=None)

Consume a single state test.

Uses the cached result from consume_state_test_file in order to not call the command every time and select a single result from there.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def consume_state_test(
    self,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Consume a single state test.

    Uses the cached result from `consume_state_test_file` in order to not
    call the command every time and select a single result from there.
    """
    file_results = self.consume_state_test_file(
        fixture_path=fixture_path,
        debug_output_path=debug_output_path,
    )
    if not fixture_name:
        # No specific test requested: report every failing test at once.
        failures = [r for r in file_results if not r["pass"]]
        if failures:
            raise Exception(
                "State test failed: \n"
                + "\n".join(f"{r['name']}: " + r["error"] for r in failures)
            )
        return

    # A specific test was requested: it must exist exactly once and pass.
    matches = [r for r in file_results if r["name"] == fixture_name]
    assert len(matches) < 2, f"Multiple test results for {fixture_name}"
    assert len(matches) == 1, f"Test result for {fixture_name} missing"
    assert matches[0]["pass"], f"State test failed: {matches[0]['error']}"

consume_fixture(fixture_format, fixture_path, fixture_name=None, debug_output_path=None)

Execute the appropriate geth fixture consumer for the fixture at fixture_path.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def consume_fixture(
    self,
    fixture_format: FixtureFormat,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Execute the appropriate geth fixture consumer for the fixture at
    `fixture_path`.

    Args:
        fixture_format: Fixture format; only `BlockchainFixture` and
            `StateFixture` are supported.
        fixture_path: Path to the fixture file to consume.
        fixture_name: If set, only the named fixture is run/selected.
        debug_output_path: If set, debug information is dumped there.

    Raises:
        Exception: If the fixture format is not supported.
    """
    if fixture_format == BlockchainFixture:
        self.consume_blockchain_test(
            fixture_path=fixture_path,
            fixture_name=fixture_name,
            debug_output_path=debug_output_path,
        )
    elif fixture_format == StateFixture:
        self.consume_state_test(
            fixture_path=fixture_path,
            fixture_name=fixture_name,
            debug_output_path=debug_output_path,
        )
    else:
        raise Exception(
            f"Fixture format {fixture_format.format_name} "
            f"not supported by {self.binary}"
        )

GethTransitionTool

Bases: GethEvm, TransitionTool

go-ethereum evm Transition tool interface wrapper class.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
class GethTransitionTool(GethEvm, TransitionTool):
    """go-ethereum `evm` Transition tool interface wrapper class."""

    # `t8n` is the go-ethereum state-transition subcommand.
    subcommand: Optional[str] = "t8n"
    trace: bool
    t8n_use_stream = True
    supports_opcode_count: ClassVar[bool] = True

    def __init__(
        self,
        *,
        exception_mapper: Optional[ExceptionMapper] = None,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Initialize the GethTransitionTool class.

        Runs `<binary> t8n --help` once and caches its output so that
        `is_fork_supported` can check fork names against it.
        """
        if not exception_mapper:
            exception_mapper = GethExceptionMapper()
        # Both bases are initialized explicitly (not via `super()`)
        # because each takes a different set of arguments.
        GethEvm.__init__(self, binary=binary, trace=trace)
        TransitionTool.__init__(
            self, binary=binary, exception_mapper=exception_mapper, trace=trace
        )
        help_command = [str(self.binary), str(self.subcommand), "--help"]
        result = self._run_command(help_command)
        self.help_string = result.stdout

    def is_fork_supported(self, fork: Fork) -> bool:
        """
        Return True if the fork is supported by the tool.

        If the fork is a transition fork, we want to check the fork it
        transitions to.
        """
        # Substring match against the cached `--help` output.
        return fork.transition_tool_name() in self.help_string

__init__(*, exception_mapper=None, binary=None, trace=False)

Initialize the GethTransitionTool class.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def __init__(
    self,
    *,
    exception_mapper: Optional[ExceptionMapper] = None,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Initialize the GethTransitionTool class.

    Args:
        exception_mapper: Mapper from client error strings to framework
            exceptions; defaults to `GethExceptionMapper`.
        binary: Path to the `evm` binary.
        trace: Whether tracing should be enabled.
    """
    if not exception_mapper:
        exception_mapper = GethExceptionMapper()
    # Both bases are initialized explicitly; each takes different
    # arguments.
    GethEvm.__init__(self, binary=binary, trace=trace)
    TransitionTool.__init__(
        self, binary=binary, exception_mapper=exception_mapper, trace=trace
    )
    # Cache `evm t8n --help` output for later fork-support checks.
    help_command = [str(self.binary), str(self.subcommand), "--help"]
    result = self._run_command(help_command)
    self.help_string = result.stdout

is_fork_supported(fork)

Return True if the fork is supported by the tool.

If the fork is a transition fork, we want to check the fork it transitions to.

Source code in packages/testing/src/execution_testing/client_clis/clis/geth.py
282
283
284
285
286
287
288
289
def is_fork_supported(self, fork: Fork) -> bool:
    """
    Return True if the fork is supported by the tool.

    If the fork is a transition fork, we want to check the fork it
    transitions to.
    """
    # Look the fork's t8n name up in the cached `--help` output.
    fork_name = fork.transition_tool_name()
    return fork_name in self.help_string

Nethtest

Bases: EthereumCLI

Nethermind nethtest binary base class.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class Nethtest(EthereumCLI):
    """Nethermind `nethtest` binary base class."""

    default_binary = Path("nethtest")
    # Version pattern also accepts plain releases such as "1.2.3"
    # (optionally followed by a pre-release tag and/or a 40-hex-digit
    # commit hash); plain releases were previously rejected.
    detect_binary_pattern = re.compile(
        r"^\d+\.\d+\.\d+(-[a-zA-Z0-9]+)?(\+[a-f0-9]{40})?$"
    )
    version_flag: str = "--version"
    cached_version: Optional[str] = None

    def __init__(
        self,
        binary: Path,
        trace: bool = False,
        exception_mapper: ExceptionMapper | None = None,
    ):
        """Initialize the Nethtest class.

        Args:
            binary: Path to the `nethtest` executable.
            trace: Whether tracing should be enabled.
            exception_mapper: Optional mapper from client error strings
                to framework exceptions.
        """
        self.binary = binary
        self.trace = trace
        # TODO: Implement NethermindExceptionMapper
        # Plain assignment: `x if x else None` was redundant, since None
        # is the only falsy value an `ExceptionMapper | None` can take.
        self.exception_mapper = exception_mapper

    def _run_command(self, command: List[str]) -> subprocess.CompletedProcess:
        """Run `command`, capturing stdout/stderr as text."""
        try:
            return subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            # NOTE(review): unreachable unless `check=True` is added to
            # the call above; kept as defensive code because callers
            # inspect `returncode` themselves.
            raise Exception("Command failed with non-zero status.") from e
        except Exception as e:
            raise Exception("Unexpected exception calling evm tool.") from e

    def _consume_debug_dump(
        self,
        command: Tuple[str, ...],
        result: subprocess.CompletedProcess,
        debug_output_path: Path,
    ) -> None:
        """Dump the command, its output and a repro script for debugging."""
        # our assumption is that each command element is a string
        assert all(isinstance(x, str) for x in command), (
            f"Not all elements of 'command' list are strings: {command}"
        )

        # ensure that flags with spaces are wrapped in double-quotes
        consume_direct_call = " ".join(shlex.quote(arg) for arg in command)

        consume_direct_script = textwrap.dedent(
            f"""\
            #!/bin/bash
            {consume_direct_call}
            """
        )

        dump_files_to_directory(
            debug_output_path,
            {
                "consume_direct_args.py": command,
                "consume_direct_returncode.txt": result.returncode,
                "consume_direct_stdout.txt": result.stdout,
                "consume_direct_stderr.txt": result.stderr,
                "consume_direct.sh+x": consume_direct_script,
            },
        )

    @cache  # noqa
    def help(self, subcommand: str | None = None) -> str:
        """Return the help string, optionally for a subcommand."""
        help_command = [str(self.binary)]
        if subcommand:
            help_command.append(subcommand)
        help_command.append("--help")
        return self._run_command(help_command).stdout

__init__(binary, trace=False, exception_mapper=None)

Initialize the Nethtest class.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    binary: Path,
    trace: bool = False,
    exception_mapper: ExceptionMapper | None = None,
):
    """Initialize the Nethtest class.

    Args:
        binary: Path to the `nethtest` executable.
        trace: Whether tracing should be enabled.
        exception_mapper: Optional mapper from client error strings to
            framework exceptions.
    """
    self.binary = binary
    self.trace = trace
    # TODO: Implement NethermindExceptionMapper
    # Plain assignment: `x if x else None` was redundant, since None is
    # the only falsy value an `ExceptionMapper | None` can take.
    self.exception_mapper = exception_mapper

help(subcommand=None) cached

Return the help string, optionally for a subcommand.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
 96
 97
 98
 99
100
101
102
103
@cache  # noqa
def help(self, subcommand: str | None = None) -> str:
    """Return the help string, optionally for a subcommand."""
    help_command = [str(self.binary)]
    if subcommand:
        help_command.append(subcommand)
    help_command.append("--help")
    return self._run_command(help_command).stdout

NethtestFixtureConsumer

Bases: Nethtest, FixtureConsumerTool

Nethermind implementation of the fixture consumer.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
class NethtestFixtureConsumer(
    Nethtest,
    FixtureConsumerTool,
    fixture_formats=[StateFixture, BlockchainFixture],
):
    """Nethermind implementation of the fixture consumer."""

    def _build_command_with_options(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> Tuple[str, ...]:
        """Build the `nethtest` command line for the given fixture.

        Raises:
            Exception: If the fixture format is not supported.
        """
        assert fixture_name, "Fixture name must be provided for nethtest."
        command = [str(self.binary)]
        if fixture_format is BlockchainFixture:
            # `--filter` takes a pattern; escape the name so it is
            # matched literally.
            command += [
                "--blockTest",
                "--filter",
                f"{re.escape(fixture_name)}",
            ]
        elif fixture_format is StateFixture:
            # TODO: consider using `--filter` here to readily access traces
            # from the output
            pass  # no additional options needed
        else:
            raise Exception(
                f"Fixture format {fixture_format.format_name} "
                f"not supported by {self.binary}"
            )
        command += ["--input", str(fixture_path)]
        if debug_output_path:
            command += ["--trace"]
        return tuple(command)

    @cache  # noqa
    def consume_state_test_file(
        self,
        fixture_path: Path,
        command: Tuple[str, ...],
        debug_output_path: Optional[Path] = None,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Consume an entire state test file.

        The command always executes all the tests contained in a file
        without the possibility of selecting a single test, so this
        function is cached in order to only call the command once and
        `consume_state_test` can simply select the result that was
        requested.
        """
        # The fixture path is already baked into `command`; the
        # parameter only participates in the cache key.
        del fixture_path
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )

        if debug_output_path:
            self._consume_debug_dump(command, result, debug_output_path)

        if result.returncode != 0:
            raise Exception(
                f"Unexpected exit code:\n{' '.join(command)}\n\n"
                f"Error:\n{result.stderr}"
            )

        try:
            result_json = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            raise Exception(
                f"Failed to parse JSON output on stdout from nethtest:\n"
                f"{result.stdout}"
            ) from e

        if not isinstance(result_json, list):
            raise Exception(
                f"Unexpected result from evm statetest: {result_json}"
            )
        return result_json, result.stderr

    def consume_state_test(
        self,
        command: Tuple[str, ...],
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Consume a single state test.

        Uses the cached result from `consume_state_test_file` in order to
        not call the command every time and select a single result from
        there.
        """
        file_results, stderr = self.consume_state_test_file(
            fixture_path=fixture_path,
            command=command,
            debug_output_path=debug_output_path,
        )

        if fixture_name:
            # TODO: this check is too fragile; extend for ethereum/tests?
            nethtest_suffix = "_d0g0v0_"
            assert all(
                test_result["name"].endswith(nethtest_suffix)
                for test_result in file_results
            ), (
                "consume direct with nethtest doesn't support the "
                "multi-data statetest format used in ethereum/tests (yet)"
            )
            # The requested fixture must match exactly one reported
            # result (after stripping the nethtest name suffix).
            test_result = [
                test_result
                for test_result in file_results
                if test_result["name"].removesuffix(nethtest_suffix)
                == f"{fixture_name.split('/')[-1]}"
            ]
            assert len(test_result) < 2, (
                f"Multiple test results for {fixture_name}"
            )
            assert len(test_result) == 1, (
                f"Test result for {fixture_name} missing"
            )
            assert test_result[0]["pass"], (
                f"State test '{fixture_name}' failed, "
                f"available stderr:\n {stderr}"
            )
        else:
            # No specific test requested: report every failing test.
            if any(not test_result["pass"] for test_result in file_results):
                exception_text = "State test failed: \n" + "\n".join(
                    f"{test_result['name']}: " + test_result["error"]
                    for test_result in file_results
                    if not test_result["pass"]
                )
                raise Exception(exception_text)

    def consume_blockchain_test(
        self,
        command: Tuple[str, ...],
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """Execute the fixture at `fixture_path` via `nethtest`."""
        # Both are already encoded in `command`; accepted only for
        # interface symmetry with the state-test consumer.
        del fixture_path
        del fixture_name
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )

        if debug_output_path:
            self._consume_debug_dump(command, result, debug_output_path)

        if result.returncode != 0:
            raise Exception(
                f"nethtest exited with non-zero exit code "
                f"({result.returncode}).\n"
                f"stdout:\n{result.stdout}\n"
                f"stderr:\n{result.stderr}\n"
                f"{' '.join(command)}"
            )

    def consume_fixture(
        self,
        fixture_format: FixtureFormat,
        fixture_path: Path,
        fixture_name: Optional[str] = None,
        debug_output_path: Optional[Path] = None,
    ) -> None:
        """
        Execute the appropriate nethtest fixture consumer for the fixture
        at `fixture_path`.

        Raises:
            Exception: If the fixture format is not supported.
        """
        command = self._build_command_with_options(
            fixture_format, fixture_path, fixture_name, debug_output_path
        )
        if fixture_format == BlockchainFixture:
            self.consume_blockchain_test(
                command=command,
                fixture_path=fixture_path,
                fixture_name=fixture_name,
                debug_output_path=debug_output_path,
            )
        elif fixture_format == StateFixture:
            self.consume_state_test(
                command=command,
                fixture_path=fixture_path,
                fixture_name=fixture_name,
                debug_output_path=debug_output_path,
            )
        else:
            raise Exception(
                f"Fixture format {fixture_format.format_name} "
                f"not supported by {self.binary}"
            )

consume_state_test_file(fixture_path, command, debug_output_path=None) cached

Consume an entire state test file.

The evm statetest will always execute all the tests contained in a file without the possibility of selecting a single test, so this function is cached in order to only call the command once and consume_state_test can simply select the result that was requested.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@cache  # noqa
def consume_state_test_file(
    self,
    fixture_path: Path,
    command: Tuple[str, ...],
    debug_output_path: Optional[Path] = None,
) -> Tuple[List[Dict[str, Any]], str]:
    """
    Run `command` once and return all state-test results it reports.

    The tool always executes every test contained in a file, with no way
    to select a single one, so the call is cached: the command runs once
    and `consume_state_test` picks the entry it needs from the returned
    list. The captured stderr is returned alongside for diagnostics.
    """
    # The fixture path is already baked into `command`; the parameter
    # only participates in the cache key.
    del fixture_path
    completed = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )

    if debug_output_path:
        self._consume_debug_dump(command, completed, debug_output_path)

    if completed.returncode != 0:
        raise Exception(
            f"Unexpected exit code:\n{' '.join(command)}\n\n"
            f"Error:\n{completed.stderr}"
        )

    try:
        parsed = json.loads(completed.stdout)
    except json.JSONDecodeError as e:
        raise Exception(
            f"Failed to parse JSON output on stdout from nethtest:\n"
            f"{completed.stdout}"
        ) from e

    if not isinstance(parsed, list):
        raise Exception(
            f"Unexpected result from evm statetest: {parsed}"
        )
    return parsed, completed.stderr

consume_state_test(command, fixture_path, fixture_name=None, debug_output_path=None)

Consume a single state test.

Uses the cached result from consume_state_test_file in order to not call the command every time and select a single result from there.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def consume_state_test(
    self,
    command: Tuple[str, ...],
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Check the result of a single state test.

    Relies on the cached `consume_state_test_file` so the command is only
    executed once per file; the relevant entry is then selected from its
    result list.
    """
    file_results, stderr = self.consume_state_test_file(
        fixture_path=fixture_path,
        command=command,
        debug_output_path=debug_output_path,
    )

    if not fixture_name:
        # No specific test requested: report every failing test at once.
        failing = [r for r in file_results if not r["pass"]]
        if failing:
            raise Exception(
                "State test failed: \n"
                + "\n".join(f"{r['name']}: " + r["error"] for r in failing)
            )
        return

    # TODO: this check is too fragile; extend for ethereum/tests?
    nethtest_suffix = "_d0g0v0_"
    assert all(
        r["name"].endswith(nethtest_suffix) for r in file_results
    ), (
        "consume direct with nethtest doesn't support the "
        "multi-data statetest format used in ethereum/tests (yet)"
    )
    wanted = f"{fixture_name.split('/')[-1]}"
    matches = [
        r
        for r in file_results
        if r["name"].removesuffix(nethtest_suffix) == wanted
    ]
    assert len(matches) < 2, (
        f"Multiple test results for {fixture_name}"
    )
    assert len(matches) == 1, (
        f"Test result for {fixture_name} missing"
    )
    assert matches[0]["pass"], (
        f"State test '{fixture_name}' failed, "
        f"available stderr:\n {stderr}"
    )

consume_blockchain_test(command, fixture_path, fixture_name=None, debug_output_path=None)

Execute the fixture at fixture_path via nethtest.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def consume_blockchain_test(
    self,
    command: Tuple[str, ...],
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """Execute the fixture via `nethtest` using the prebuilt `command`."""
    # Both are already encoded in `command`; accepted only for interface
    # symmetry with the state-test consumer.
    del fixture_path
    del fixture_name
    completed = subprocess.run(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )

    if debug_output_path:
        self._consume_debug_dump(command, completed, debug_output_path)

    if completed.returncode == 0:
        return

    raise Exception(
        f"nethtest exited with non-zero exit code "
        f"({completed.returncode}).\n"
        f"stdout:\n{completed.stdout}\n"
        f"stderr:\n{completed.stderr}\n"
        f"{' '.join(command)}"
    )

consume_fixture(fixture_format, fixture_path, fixture_name=None, debug_output_path=None)

Execute the appropriate nethtest fixture consumer for the fixture at fixture_path.

Source code in packages/testing/src/execution_testing/client_clis/clis/nethermind.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def consume_fixture(
    self,
    fixture_format: FixtureFormat,
    fixture_path: Path,
    fixture_name: Optional[str] = None,
    debug_output_path: Optional[Path] = None,
) -> None:
    """
    Execute the appropriate nethtest fixture consumer for the fixture at
    `fixture_path`.

    Args:
        fixture_format: Fixture format; only `BlockchainFixture` and
            `StateFixture` are supported.
        fixture_path: Path to the fixture file to consume.
        fixture_name: If set, only the named fixture is run/selected.
        debug_output_path: If set, debug information is dumped there.

    Raises:
        Exception: If the fixture format is not supported.
    """
    command = self._build_command_with_options(
        fixture_format, fixture_path, fixture_name, debug_output_path
    )
    if fixture_format == BlockchainFixture:
        self.consume_blockchain_test(
            command=command,
            fixture_path=fixture_path,
            fixture_name=fixture_name,
            debug_output_path=debug_output_path,
        )
    elif fixture_format == StateFixture:
        self.consume_state_test(
            command=command,
            fixture_path=fixture_path,
            fixture_name=fixture_name,
            debug_output_path=debug_output_path,
        )
    else:
        raise Exception(
            f"Fixture format {fixture_format.format_name} "
            f"not supported by {self.binary}"
        )

NimbusTransitionTool

Bases: TransitionTool

Nimbus evm Transition tool interface wrapper class.

Source code in packages/testing/src/execution_testing/client_clis/clis/nimbus.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class NimbusTransitionTool(TransitionTool):
    """Nimbus `evm` Transition tool interface wrapper class."""

    default_binary = Path("t8n")
    detect_binary_pattern = re.compile(r"^Nimbus-t8n\b")
    version_flag: str = "--version"

    binary: Path
    cached_version: Optional[str] = None
    trace: bool

    def __init__(
        self,
        *,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """Initialize the Nimbus Transition tool interface.

        Raises:
            Exception: If running `<binary> --help` fails or exits with a
                non-zero status code.
        """
        super().__init__(
            exception_mapper=NimbusExceptionMapper(),
            binary=binary,
            trace=trace,
        )
        args = [str(self.binary), "--help"]
        try:
            # `check=True` makes a non-zero exit raise
            # CalledProcessError; without it the handler below was
            # unreachable and a failing `--help` was silently ignored.
            result = subprocess.run(
                args, capture_output=True, text=True, check=True
            )
        except subprocess.CalledProcessError as e:
            raise Exception(
                f"evm process unexpectedly returned "
                f"a non-zero status code: {e}."
            ) from e
        except Exception as e:
            raise Exception(
                f"Unexpected exception calling evm tool: {e}."
            ) from e
        self.help_string = result.stdout

    def version(self) -> str:
        """Get `evm` binary version."""
        if self.cached_version is None:
            # Nimbus colorizes its version output; drop the ANSI reset
            # escape and surrounding whitespace before caching.
            self.cached_version = re.sub(
                r"\x1b\[0m", "", super().version()
            ).strip()

        return self.cached_version

    def is_fork_supported(self, fork: Fork) -> bool:
        """
        Return True if the fork is supported by the tool.

        If the fork is a transition fork, we want to check the fork it
        transitions to.
        """
        # Substring match against the cached `--help` output.
        return fork.transition_tool_name() in self.help_string

__init__(*, binary=None, trace=False)

Initialize the Nimbus Transition tool interface.

Source code in packages/testing/src/execution_testing/client_clis/clis/nimbus.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    *,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """Initialize the Nimbus Transition tool interface.

    Raises:
        Exception: If running `<binary> --help` fails or exits with a
            non-zero status code.
    """
    super().__init__(
        exception_mapper=NimbusExceptionMapper(),
        binary=binary,
        trace=trace,
    )
    args = [str(self.binary), "--help"]
    try:
        # `check=True` makes a non-zero exit raise CalledProcessError;
        # without it the handler below was unreachable and a failing
        # `--help` was silently ignored.
        result = subprocess.run(
            args, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        raise Exception(
            f"evm process unexpectedly returned "
            f"a non-zero status code: {e}."
        ) from e
    except Exception as e:
        raise Exception(
            f"Unexpected exception calling evm tool: {e}."
        ) from e
    self.help_string = result.stdout

version()

Get evm binary version.

Source code in packages/testing/src/execution_testing/client_clis/clis/nimbus.py
56
57
58
59
60
61
62
63
def version(self) -> str:
    """Get `evm` binary version, cached after the first lookup."""
    if self.cached_version is None:
        raw_version = super().version()
        # The pattern is a literal ANSI reset sequence, so a plain
        # string replacement is equivalent to the regex substitution.
        self.cached_version = raw_version.replace("\x1b[0m", "").strip()
    return self.cached_version

is_fork_supported(fork)

Return True if the fork is supported by the tool.

If the fork is a transition fork, we want to check the fork it transitions to.

Source code in packages/testing/src/execution_testing/client_clis/clis/nimbus.py
65
66
67
68
69
70
71
72
def is_fork_supported(self, fork: Fork) -> bool:
    """
    Return True if the fork is supported by the tool.

    Transition forks are checked via the fork they transition to,
    by searching the tool's ``--help`` output for its name.
    """
    tool_fork_name = fork.transition_tool_name()
    return tool_fork_name in self.help_string

CLINotFoundInPathError

Bases: Exception

Exception raised if the specified CLI binary isn't found in the path.

Source code in packages/testing/src/execution_testing/client_clis/ethereum_cli.py
24
25
26
27
28
29
30
31
32
33
34
35
class CLINotFoundInPathError(Exception):
    """Exception raised if the specified CLI binary isn't found in the path."""

    def __init__(
        self,
        message: str = "The CLI binary was not found in the path",
        binary: Path | None = None,
    ) -> None:
        """Initialize the exception."""
        if binary:
            message = f"{message} ({binary})"
        super().__init__(message)

__init__(message='The CLI binary was not found in the path', binary=None)

Initialize the exception.

Source code in packages/testing/src/execution_testing/client_clis/ethereum_cli.py
27
28
29
30
31
32
33
34
35
def __init__(
    self,
    message: str = "The CLI binary was not found in the path",
    binary: Path | None = None,
) -> None:
    """
    Initialize the exception.

    Args:
        message: Human-readable error description.
        binary: Offending binary path; when given, it is appended to
            the message in parentheses.
    """
    if binary:
        message = f"{message} ({binary})"
    super().__init__(message)

UnknownCLIError

Bases: Exception

Exception raised if an unknown CLI is encountered.

Source code in packages/testing/src/execution_testing/client_clis/ethereum_cli.py
18
19
20
21
class UnknownCLIError(Exception):
    """Exception raised if an unknown CLI is encountered."""

FixtureConsumerTool

Bases: FixtureConsumer, EthereumCLI

Fixture consumer tool abstract base class which should be inherited by all fixture consumer tool implementations.

Source code in packages/testing/src/execution_testing/client_clis/fixture_consumer_tool.py
10
11
12
13
14
15
16
17
18
19
20
21
22
class FixtureConsumerTool(FixtureConsumer, EthereumCLI):
    """
    Fixture consumer tool abstract base class which should be inherited by all
    fixture consumer tool implementations.
    """

    # Subclasses registered via __init_subclass__ / register_tool.
    registered_tools: List[Type["FixtureConsumerTool"]] = []
    # Tool used when no specific one is selected, if any.
    default_tool: Type["FixtureConsumerTool"] | None = None

    def __init_subclass__(cls, *, fixture_formats: List[FixtureFormat]):
        """Register all subclasses of FixtureConsumerTool as possible tools."""
        FixtureConsumerTool.register_tool(cls)
        # Each subclass declares which fixture formats it can consume via
        # the required class keyword argument.
        cls.fixture_formats = fixture_formats

__init_subclass__(*, fixture_formats)

Register all subclasses of FixtureConsumerTool as possible tools.

Source code in packages/testing/src/execution_testing/client_clis/fixture_consumer_tool.py
19
20
21
22
def __init_subclass__(cls, *, fixture_formats: List[FixtureFormat]):
    """Register all subclasses of FixtureConsumerTool as possible tools."""
    FixtureConsumerTool.register_tool(cls)
    # The required keyword argument declares which fixture formats the
    # subclass can consume.
    cls.fixture_formats = fixture_formats

FieldExclusionTraceComparator

Bases: TraceComparator

Compare traces field-by-field, optionally excluding fields.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class FieldExclusionTraceComparator(TraceComparator):
    """Compare traces field-by-field, optionally excluding fields."""

    def __init__(
        self,
        comparator_name: str,
        exclude_fields: set[str] | None = None,
        ignore_gas_differences: bool = False,
    ) -> None:
        """Store the comparator name and its exclusion configuration."""
        self._name, self._exclude_fields, self._ignore_gas_differences = (
            comparator_name,
            exclude_fields,
            ignore_gas_differences,
        )

    @property
    def name(self) -> str:
        """Return the comparator's name."""
        return self._name

    def compare_transaction_traces(
        self,
        baseline: TransactionTraces,
        current: TransactionTraces,
        transaction_index: int,
    ) -> TraceComparisonResult:
        """Compare trace fields, excluding configured fields."""
        # Delegate to the shared field-compare helper with this
        # comparator's configuration.
        return _build_result_from_compare(
            baseline,
            current,
            transaction_index,
            exclude_fields=self._exclude_fields,
            ignore_gas_differences=self._ignore_gas_differences,
        )

name property

Return the comparator's name.

compare_transaction_traces(baseline, current, transaction_index)

Compare trace fields, excluding configured fields.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def compare_transaction_traces(
    self,
    baseline: TransactionTraces,
    current: TransactionTraces,
    transaction_index: int,
) -> TraceComparisonResult:
    """
    Compare trace fields, excluding configured fields.

    Delegates to the shared field-compare helper, passing along this
    comparator's exclusion set and gas-difference setting.
    """
    return _build_result_from_compare(
        baseline,
        current,
        transaction_index,
        exclude_fields=self._exclude_fields,
        ignore_gas_differences=self._ignore_gas_differences,
    )

GasExhaustionTraceComparator

Bases: TraceComparator

Detect differences in gas exhaustion between traces.

Equivalent when both sides have no out-of-gas errors or when both run out of gas at the same trace line(s). Different when the out-of-gas points diverge.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
class GasExhaustionTraceComparator(TraceComparator):
    """
    Detect differences in gas exhaustion between traces.

    Equivalent when both sides have no out-of-gas errors or when
    both run out of gas at the same trace line(s). Different when
    the out-of-gas points diverge.
    """

    @property
    def name(self) -> str:
        """Return the comparator's name."""
        return "gas-exhaustion"

    def compare_transaction_traces(
        self,
        baseline: TransactionTraces,
        current: TransactionTraces,
        transaction_index: int,
    ) -> TraceComparisonResult:
        """Compare gas exhaustion points between two transaction traces."""
        baseline_points = set(_find_gas_exhaustion_points(baseline))
        current_points = set(_find_gas_exhaustion_points(current))

        if baseline_points == current_points:
            return TraceComparisonResult(equivalent=True)

        # Lines where only the baseline trace ran out of gas...
        baseline_only = [
            TraceDifference(
                transaction_index=transaction_index,
                trace_line_index=idx,
                baseline=_format_oog_trace_line(baseline, idx),
                current="no out-of-gas",
            )
            for idx in sorted(baseline_points - current_points)
        ]
        # ...and lines where only the current trace did.
        current_only = [
            TraceDifference(
                transaction_index=transaction_index,
                trace_line_index=idx,
                baseline="no out-of-gas",
                current=_format_oog_trace_line(current, idx),
            )
            for idx in sorted(current_points - baseline_points)
        ]

        return TraceComparisonResult(
            equivalent=False,
            differences=baseline_only + current_only,
        )

name property

Return the comparator's name.

compare_transaction_traces(baseline, current, transaction_index)

Compare gas exhaustion points between two transaction traces.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def compare_transaction_traces(
    self,
    baseline: TransactionTraces,
    current: TransactionTraces,
    transaction_index: int,
) -> TraceComparisonResult:
    """
    Compare gas exhaustion points between two transaction traces.

    Traces are equivalent when their sets of out-of-gas trace-line
    indices match (including both being empty); otherwise a difference
    is reported per diverging line.
    """
    b_set = set(_find_gas_exhaustion_points(baseline))
    c_set = set(_find_gas_exhaustion_points(current))

    if b_set == c_set:
        return TraceComparisonResult(equivalent=True)

    differences: list[TraceDifference] = []
    # Lines where only the baseline ran out of gas.
    for line_index in sorted(b_set - c_set):
        differences.append(
            TraceDifference(
                transaction_index=transaction_index,
                trace_line_index=line_index,
                baseline=_format_oog_trace_line(baseline, line_index),
                current="no out-of-gas",
            )
        )
    # Lines where only the current trace ran out of gas.
    for line_index in sorted(c_set - b_set):
        differences.append(
            TraceDifference(
                transaction_index=transaction_index,
                trace_line_index=line_index,
                baseline="no out-of-gas",
                current=_format_oog_trace_line(current, line_index),
            )
        )

    return TraceComparisonResult(
        equivalent=False,
        differences=differences,
    )

TraceComparator

Bases: ABC

Abstract base class for trace comparison strategies.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class TraceComparator(ABC):
    """Abstract base class for trace comparison strategies."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the comparator's name."""
        ...

    @abstractmethod
    def compare_transaction_traces(
        self,
        baseline: TransactionTraces,
        current: TransactionTraces,
        transaction_index: int,
    ) -> TraceComparisonResult:
        """Compare a single transaction's traces."""
        ...

    def compare_traces(
        self,
        baseline: Traces,
        current: Traces,
    ) -> TraceComparisonResult:
        """Compare two Traces objects by iterating transaction pairs."""
        baseline_count = len(baseline.root)
        current_count = len(current.root)
        if baseline_count != current_count:
            # Structural mismatch: per-transaction comparison is
            # meaningless, report the count difference and stop.
            mismatch = TransactionCountMismatch(
                baseline_count=baseline_count,
                current_count=current_count,
            )
            return TraceComparisonResult(
                equivalent=False,
                differences=[mismatch],
            )

        collected: list[TraceDifference] = []
        pairs = zip(baseline.root, current.root, strict=False)
        for index, (baseline_tx, current_tx) in enumerate(pairs):
            tx_result = self.compare_transaction_traces(
                baseline_tx, current_tx, index
            )
            collected.extend(tx_result.differences)

        return TraceComparisonResult(
            equivalent=not collected,
            differences=collected,
        )

name abstractmethod property

Return the comparator's name.

compare_transaction_traces(baseline, current, transaction_index) abstractmethod

Compare a single transaction's traces.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
 93
 94
 95
 96
 97
 98
 99
100
101
@abstractmethod
def compare_transaction_traces(
    self,
    baseline: TransactionTraces,
    current: TransactionTraces,
    transaction_index: int,
) -> TraceComparisonResult:
    """
    Compare a single transaction's traces.

    Concrete subclasses define what "equivalent" means (exact match,
    field exclusions, gas-exhaustion points, ...).
    """
    ...

compare_traces(baseline, current)

Compare two Traces objects by iterating transaction pairs.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def compare_traces(
    self,
    baseline: Traces,
    current: Traces,
) -> TraceComparisonResult:
    """
    Compare two Traces objects by iterating transaction pairs.

    A transaction-count mismatch short-circuits the comparison; otherwise
    per-transaction differences are accumulated and the result is
    equivalent only when no differences were found.
    """
    if len(baseline.root) != len(current.root):
        return TraceComparisonResult(
            equivalent=False,
            differences=[
                TransactionCountMismatch(
                    baseline_count=len(baseline.root),
                    current_count=len(current.root),
                )
            ],
        )

    all_differences: list[TraceDifference] = []
    # strict=False is safe: lengths were checked above.
    for i, (b_tx, c_tx) in enumerate(
        zip(baseline.root, current.root, strict=False)
    ):
        result = self.compare_transaction_traces(b_tx, c_tx, i)
        all_differences.extend(result.differences)

    return TraceComparisonResult(
        equivalent=len(all_differences) == 0,
        differences=all_differences,
    )

TraceComparatorType

Bases: StrEnum

Supported trace comparator strategies.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
17
18
19
20
21
22
23
24
25
class TraceComparatorType(StrEnum):
    """Supported trace comparator strategies."""

    # Field-by-field comparators with progressively more fields excluded.
    EXACT = "exact"
    EXACT_NO_GAS = "exact-no-gas"
    EXACT_NO_STACK = "exact-no-stack"
    EXACT_NO_STACK_NO_GAS = "exact-no-stack-no-gas"
    EXACT_NO_STACK_NO_GAS_NO_GAS_COST = "exact-no-stack-no-gas-no-gas-cost"
    # Compares only the points where traces run out of gas.
    GAS_EXHAUSTION = "gas-exhaustion"

TraceComparisonResult

Bases: EthereumTestBaseModel

Result of comparing two Traces objects.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
77
78
79
80
81
class TraceComparisonResult(EthereumTestBaseModel):
    """Result of comparing two Traces objects."""

    # True when no differences were found between the two traces.
    equivalent: bool
    # Concrete differences; empty when equivalent is True.
    differences: list[AnyTraceDifference] = Field(default_factory=list)

TraceDifference

Bases: EthereumTestBaseModel

A difference between baseline and current trace at a specific line.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
45
46
47
48
49
50
51
52
53
54
55
56
class TraceDifference(EthereumTestBaseModel):
    """A difference between baseline and current trace at a specific line."""

    # Tag used by the discriminated union in ``TraceComparisonResult`` so
    # that serialization (``model_dump``) and deserialization
    # (``model_validate``) pick the correct concrete subclass when
    # differences are round-tripped — e.g. across xdist workers.
    kind: Literal["trace_difference"] = "trace_difference"
    # Index of the transaction whose traces diverged.
    transaction_index: int
    # Line index within that transaction's trace.
    trace_line_index: int
    # Human-readable rendering of the diverging baseline/current lines.
    baseline: str
    current: str

TransactionCountMismatch

Bases: TraceDifference

Structural mismatch: different number of transactions.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
59
60
61
62
63
64
65
66
67
68
class TransactionCountMismatch(TraceDifference):
    """Structural mismatch: different number of transactions."""

    kind: Literal["transaction_count_mismatch"] = "transaction_count_mismatch"  # type: ignore[assignment]
    # The base-class positional fields are meaningless here; dummy
    # defaults keep the model constructible from counts alone.
    transaction_index: int = 0
    trace_line_index: int = -1
    baseline: str = ""
    current: str = ""
    # The actual payload: how many transactions each side had.
    baseline_count: int = 0
    current_count: int = 0

create_comparator(comparator_type)

Create a comparator instance from the given type.

Source code in packages/testing/src/execution_testing/client_clis/trace_comparators.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def create_comparator(
    comparator_type: TraceComparatorType,
) -> TraceComparator:
    """Create a comparator instance from the given type."""
    if comparator_type == TraceComparatorType.GAS_EXHAUSTION:
        return GasExhaustionTraceComparator()
    # All remaining known types are field-exclusion comparators whose
    # configuration lives in _FIELD_EXCLUSION_CONFIGS.
    config = _FIELD_EXCLUSION_CONFIGS.get(comparator_type)
    if config is not None:
        excluded, ignore_gas = config
        return FieldExclusionTraceComparator(
            comparator_type.value,
            exclude_fields=excluded,
            ignore_gas_differences=ignore_gas,
        )
    raise ValueError(f"Unknown comparator type: {comparator_type}")

TransitionTool

Bases: EthereumCLI

Transition tool abstract base class which should be inherited by all transition tool implementations.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
class TransitionTool(EthereumCLI):
    """
    Transition tool abstract base class which should be inherited by all
    transition tool implementations.
    """

    traces: List[Traces] | None = None

    registered_tools: List[Type["TransitionTool"]] = []
    default_tool: Optional[Type["TransitionTool"]] = None

    exception_mapper: ExceptionMapper

    subcommand: Optional[str] = None
    cached_version: Optional[str] = None
    t8n_use_stream: bool = False
    t8n_use_server: bool = False
    server_url: str | None = None
    process: Optional[subprocess.Popen] = None
    output_cache: OutputCache | None = None
    debug_dump_dir: Path | None = None
    call_counter: int = 0
    opcode_count: OpcodeCount | None = None

    supports_opcode_count: ClassVar[bool] = False
    supports_xdist: ClassVar[bool] = True
    supports_blob_params: ClassVar[bool] = False
    fork_name_map: ClassVar[Dict[str, str]] = {}

    @abstractmethod
    def __init__(
        self,
        *,
        exception_mapper: Optional[ExceptionMapper] = None,
        binary: Optional[Path] = None,
        trace: bool = False,
    ):
        """
        Abstract initialization method that all subclasses must implement.

        Subclasses call this via ``super().__init__`` with a concrete
        exception mapper; passing ``None`` is a programming error
        (enforced by the assert below).
        """
        assert exception_mapper is not None
        self.exception_mapper = exception_mapper
        super().__init__(binary=binary)
        self.trace = trace
        # Tool-reported metadata; starts empty and is filled in elsewhere.
        self._info_metadata: Optional[Dict[str, Any]] = {}

    def __init_subclass__(cls) -> None:
        """Register all subclasses of TransitionTool as possible tools."""
        # Runs automatically at subclass-definition time.
        TransitionTool.register_tool(cls)

    @abstractmethod
    def is_fork_supported(self, fork: Fork) -> bool:
        """
        Return True if the fork is supported by the tool.

        Implementations typically check ``fork.transition_tool_name()``
        against tool-specific capability information.
        """
        pass

    def start_server(self) -> None:
        """
        Start the t8n-server process, extract the port, and leave it
        running for future reuse.

        Base implementation is a no-op; tools with server support
        override this.
        """
        pass

    def shutdown(self) -> None:
        """
        Perform any cleanup tasks related to the tested tool.

        Base implementation is a no-op; tools holding resources
        (e.g. a server process) override this.
        """
        pass

    def reset_traces(self) -> None:
        """Reset the internal trace storage for a new test to begin."""
        # Replaces any previous accumulation (or the initial None).
        self.traces = []

    def append_traces(self, new_traces: Traces) -> None:
        """
        Append a list of traces of a state transition to the current
        list.

        Requires reset_traces() to have been called first (traces must
        not be None).
        """
        assert self.traces is not None
        self.traces.append(new_traces)

    def get_traces(self) -> List[Traces] | None:
        """Return the accumulated traces (None if tracing never started)."""
        return self.traces

    def collect_traces(
        self,
        receipts: List[TransactionReceipt],
        temp_dir: tempfile.TemporaryDirectory,
        debug_output_path: Path | None,
    ) -> Traces:
        """
        Collect the traces from the t8n tool output and store them in the
        traces list.

        One trace file per receipt is expected, named
        ``trace-<index>-<tx_hash>.jsonl`` inside ``temp_dir``. When
        ``debug_output_path`` is set, each trace file is also copied there.
        Returns the collected traces (also appended to ``self.traces``).
        """
        traces: Traces = Traces(root=[])
        temp_dir_path = Path(temp_dir.name)
        for i, r in enumerate(receipts):
            trace_file_name = f"trace-{i}-{r.transaction_hash}.jsonl"
            trace_file_path = temp_dir_path / trace_file_name
            if not trace_file_path.exists():
                # Transaction was rejected mid-processing (e.g. EIP-3607
                # collision): the receipt exists but the tracer's
                # TransactionEnd event never fired, so no trace file was
                # written. Record an empty trace for this tx.
                traces.append(TransactionTraces(traces=[]))
                continue
            if debug_output_path:
                shutil.copy(
                    trace_file_path,
                    Path(debug_output_path) / trace_file_name,
                )
            traces.append(TransactionTraces.from_file(trace_file_path))
        self.append_traces(traces)
        return traces

    def set_cache(self, *, key: str) -> bool:
        """
        Set the current cache key.

        Creates the cache on first call, then reuses it for single-key
        eviction.
        Returns True if the key was already in the cache (hit).
        """
        # Lazily construct the cache on first use.
        if self.output_cache is None:
            self.output_cache = OutputCache()
        return self.output_cache.set_key(key)

    def remove_cache(self) -> None:
        """Clear the cache (test doesn't use caching)."""
        # No-op when the cache was never created.
        if self.output_cache is not None:
            self.output_cache.clear()

    def reset_opcode_count(self) -> None:
        """
        Reset the opcode count to zero.

        Also acts as the opt-in switch: a non-None opcode_count makes
        the filesystem evaluator request opcode counting from the tool.
        """
        self.opcode_count = OpcodeCount({})

    @dataclass
    class TransitionToolData:
        """Transition tool files and data to pass between methods."""

        alloc: Alloc | LazyAlloc
        txs: List[Transaction]
        env: Environment
        fork: Fork
        chain_id: int
        reward: int
        blob_schedule: BlobSchedule | None
        state_test: bool = False

        @property
        def fork_name(self) -> str:
            """Return the fork name."""
            return self.fork.transition_tool_name()

        @property
        def fork_name_if_supports_blob_params(self) -> str:
            """
            Return the fork name, mapping BPO forks to their non-BPO
            ancestor (for tools that take blob params separately).
            """
            # NOTE(review): self.fork() calls the fork object, while the
            # else-branch below uses self.fork directly — confirm whether
            # the call is intentional.
            fork = self.fork()

            # For tools that support blob_params, return base fork for BPO
            # forks.
            if fork.bpo_fork():
                return fork.non_bpo_ancestor().transition_tool_name()
            else:
                return self.fork.transition_tool_name()

        @property
        def blob_params(self) -> ForkBlobSchedule | None:
            """Return the blob parameters for the current fork."""
            if self.blob_schedule:
                fork_name = self.fork.name()
                # Only return blob params if this fork has them
                if fork_name in self.blob_schedule.root:
                    return self.blob_schedule[fork_name]
            return None

        def __post_init__(self) -> None:
            """Modify the reward if the environment number is 0."""
            # Presumably -1 disables the block reward at genesis
            # (t8n convention) — confirm against the tool's docs.
            if self.env.number == 0:
                self.reward = -1

        def to_input(self) -> TransitionToolInput:
            """Convert the data to a TransitionToolInput object."""
            return TransitionToolInput(
                alloc=self.alloc,
                txs=self.txs,
                env=self.env,
                blob_params=self.blob_params,
            )

        def get_request_data(self) -> TransitionToolRequest:
            """Convert the data to a TransitionToolRequest object."""
            return TransitionToolRequest(
                state=TransitionToolContext(
                    fork=self.fork_name,
                    chain_id=self.chain_id,
                    reward=self.reward,
                ),
                input=self.to_input(),
            )

    def _evaluate_filesystem(
        self,
        *,
        t8n_data: TransitionToolData,
        debug_output_path: Path | None,
        profiler: Profiler,
    ) -> TransitionToolOutput:
        """
        Execute a transition tool using the filesystem for its inputs and
        outputs.

        Writes the t8n inputs into a temporary directory, invokes the
        binary via subprocess, validates the produced output files into a
        TransitionToolOutput, and optionally dumps a reproducible debug
        bundle (inputs, outputs, and a rerunnable t8n.sh script) to
        ``debug_output_path``. Raises on a non-zero tool exit code.
        """
        temp_dir = tempfile.TemporaryDirectory()
        temp_dir_path = Path(temp_dir.name)
        os.mkdir(os.path.join(temp_dir.name, "input"))
        os.mkdir(os.path.join(temp_dir.name, "output"))

        # Serialize alloc/env/txs (and blobParams if any) as input files.
        input_paths = t8n_data.to_input().to_files(
            temp_dir_path / "input", **model_dump_config
        )

        # Output paths are relative to --output.basedir (temp_dir).
        output_paths = {
            output: os.path.join("output", f"{output}.json")
            for output in ["alloc", "result"]
        }
        output_paths["body"] = os.path.join("output", "txs.rlp")

        # Get fork name and apply any tool-specific mapping
        fork_name = (
            t8n_data.fork_name_if_supports_blob_params
            if self.supports_blob_params
            else t8n_data.fork_name
        )
        fork_name = self.fork_name_map.get(fork_name, fork_name)

        # Construct args for evmone-t8n binary
        args = [
            str(self.binary),
            "--state.fork",
            fork_name,
            "--input.alloc",
            input_paths["alloc"],
            "--input.env",
            input_paths["env"],
            "--input.txs",
            input_paths["txs"],
            "--output.basedir",
            temp_dir.name,
            "--output.result",
            output_paths["result"],
            "--output.alloc",
            output_paths["alloc"],
            "--output.body",
            output_paths["body"],
            "--state.reward",
            str(t8n_data.reward),
            "--state.chainid",
            str(t8n_data.chain_id),
        ]
        # Opcode counting is requested only when enabled for this tool
        # and armed via reset_opcode_count().
        if self.supports_opcode_count and self.opcode_count is not None:
            args.extend(
                [
                    "--opcode.count",
                    "opcodes.json",
                ]
            )
        if self.supports_blob_params and input_paths.get("blobParams"):
            args.extend(
                [
                    "--input.blobParams",
                    input_paths["blobParams"],
                ]
            )

        if self.trace:
            args.append("--trace")

        result = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        if debug_output_path:
            # Pause profiling while dumping debug artifacts so the dump
            # cost is not attributed to the tool.
            with profiler.pause():
                if os.path.exists(debug_output_path):
                    shutil.rmtree(debug_output_path)
                shutil.copytree(temp_dir.name, debug_output_path)
                t8n_output_base_dir = os.path.join(
                    debug_output_path, "t8n.sh.out"
                )
                # Rewrite the recorded command line so it can be rerun
                # against the copied inputs/outputs.
                t8n_call = " ".join(args)
                for file_path in input_paths.values():  # update input paths
                    t8n_call = t8n_call.replace(
                        os.path.dirname(file_path),
                        os.path.join(debug_output_path, "input"),
                    )
                # use a new output path for basedir and outputs
                t8n_call = t8n_call.replace(
                    temp_dir.name,
                    t8n_output_base_dir,
                )
                t8n_script = textwrap.dedent(
                    f"""\
                    #!/bin/bash
                    # hard-coded to avoid surprises
                    rm -rf {debug_output_path}/t8n.sh.out
                    mkdir -p {debug_output_path}/t8n.sh.out/output
                    {t8n_call}
                    """
                )
                dump_files_to_directory(
                    debug_output_path,
                    {
                        "args.py": args,
                        "returncode.txt": result.returncode,
                        "stdout.txt": result.stdout.decode(),
                        "stderr.txt": result.stderr.decode(),
                        "t8n.sh+x": t8n_script,
                    },
                )

        if result.returncode != 0:
            raise Exception("failed to evaluate: " + result.stderr.decode())

        # Validate alloc/result/body files into a typed output object;
        # the exception mapper translates tool error strings.
        output = TransitionToolOutput.model_validate_files(
            temp_dir_path / "output",
            context={"exception_mapper": self.exception_mapper},
        )
        if self.supports_opcode_count and self.opcode_count is not None:
            opcode_count_file_path = Path(temp_dir.name) / "opcodes.json"
            if opcode_count_file_path.exists():
                opcode_count = OpcodeCount.model_validate_json(
                    opcode_count_file_path.read_text()
                )
                output.result.opcode_count = opcode_count

                if debug_output_path:
                    with profiler.pause():
                        dump_files_to_directory(
                            debug_output_path,
                            {
                                "opcodes.json": opcode_count.model_dump(),
                            },
                        )

        if self.trace:
            output.result.traces = self.collect_traces(
                output.result.receipts, temp_dir, debug_output_path
            )

        temp_dir.cleanup()

        return output

    def _restart_server(self) -> None:
        """Check if server is still responsive and restart if needed."""
        self.shutdown()
        # Brief pause to let the old process release its resources.
        time.sleep(0.1)
        self.start_server()

    def _server_post(
        self,
        data: Dict[str, Any],
        timeout: int,
        url_args: Optional[Dict[str, List[str] | str]] = None,
        retries: int = 5,
    ) -> Response:
        """
        Send a POST request to the t8n-server and return the response.

        On connection errors or read timeouts the server is restarted and
        the request retried with exponential backoff, up to ``retries``
        attempts.

        Args:
            data: JSON body of the request.
            timeout: Per-request timeout in seconds.
            url_args: Optional query-string parameters.
            retries: Maximum number of attempts before giving up.

        Returns:
            The successful (HTTP 200) response.

        Raises:
            RequestsConnectionError | ReadTimeout: If all retries are
                exhausted.
            Exception: If the server responds with a non-200 status code.
        """
        if url_args is None:
            url_args = {}
        post_delay = 0.1

        while True:
            try:
                response = Session().post(
                    f"{self.server_url}?{urlencode(url_args, doseq=True)}",
                    json=data,
                    timeout=timeout,
                )
                break
            except (RequestsConnectionError, ReadTimeout) as e:
                self._restart_server()
                retries -= 1
                # Use `<=` so a non-positive initial `retries` value cannot
                # loop forever (the previous `== 0` check skipped past zero
                # when retries started at 0 or below).
                if retries <= 0:
                    raise e
                time.sleep(post_delay)
                post_delay *= 2
        response.raise_for_status()
        if response.status_code != 200:
            # raise_for_status only covers >= 400; reject any other
            # non-200 code explicitly.
            raise Exception(
                f"t8n-server returned status code {response.status_code}, "
                f"response: {response.text}"
            )
        return response

    def _generate_post_args(
        self, t8n_data: TransitionToolData
    ) -> Dict[str, List[str] | str]:
        """Generate the arguments for the POST request to the t8n-server."""
        del t8n_data
        return {}

    def _evaluate_server(
        self,
        *,
        t8n_data: TransitionToolData,
        debug_output_path: Path | None,
        timeout: int,
        profiler: Profiler,
    ) -> TransitionToolOutput:
        """
        Execute the transition tool sending inputs and outputs via a server.

        Args:
            t8n_data: Inputs (alloc, txs, env, fork context) for the tool.
            debug_output_path: If set, request/response artifacts are
                dumped here (profiling is paused while dumping).
            timeout: Request timeout in seconds for the POST.
            profiler: Active profiler; paused during debug-file dumps.

        Returns:
            The validated transition tool output.
        """
        request_data = t8n_data.get_request_data()
        request_data_json = request_data.model_dump(
            mode="json", **model_dump_config
        )

        temp_dir = tempfile.TemporaryDirectory()
        # Ensure the temporary trace directory is removed even if the
        # request or response validation raises; it previously leaked on
        # every exception path.
        try:
            request_data_json["trace"] = self.trace
            if self.trace:
                request_data_json["output-basedir"] = temp_dir.name

            if debug_output_path:
                with profiler.pause():
                    request_data_str = json.dumps(request_data_json, indent=2)
                    request_info = (
                        f"Server URL: {self.server_url}\n\n"
                        f"Request Data:\n{request_data_str}\n"
                    )
                    dump_files_to_directory(
                        debug_output_path,
                        {
                            "input/alloc.json": request_data.input.alloc.raw
                            if isinstance(request_data.input.alloc, LazyAlloc)
                            else request_data.input.alloc.model_dump(
                                mode="json", **model_dump_config
                            ),
                            "input/env.json": request_data.input.env,
                            "input/txs.json": [
                                tx.model_dump(mode="json", **model_dump_config)
                                for tx in request_data.input.txs
                            ],
                            "input/blob_params.json": (
                                request_data.input.blob_params
                            ),
                            "request_info.txt": request_info,
                        },
                    )

            response = self._server_post(
                data=request_data_json,
                url_args=self._generate_post_args(t8n_data),
                timeout=timeout,
            )
            response_json = response.json()

            # pop optional test ``_info`` metadata from response, if present
            self._info_metadata = response_json.pop("_info_metadata", {})

            output: TransitionToolOutput = TransitionToolOutput.model_validate(
                response_json,
                context={"exception_mapper": self.exception_mapper},
            )

            if self.trace:
                output.result.traces = self.collect_traces(
                    output.result.receipts, temp_dir, debug_output_path
                )
        finally:
            temp_dir.cleanup()

        if debug_output_path:
            with profiler.pause():
                headers_str = json.dumps(dict(response.headers), indent=2)
                response_info = (
                    f"Status Code: {response.status_code}\n\n"
                    f"Headers:\n{headers_str}\n\n"
                    f"Content:\n{response.text}\n"
                )
                dump_files_to_directory(
                    debug_output_path,
                    {
                        "output/alloc.json": output.alloc.raw,
                        "output/result.json": output.result,
                        "output/txs.rlp": str(output.body),
                        "response_info.txt": response_info,
                    },
                )

        return output

    def _evaluate_stream(
        self,
        *,
        t8n_data: TransitionToolData,
        debug_output_path: Path | None,
        profiler: Profiler,
    ) -> TransitionToolOutput:
        """
        Execute a transition tool using stdin and stdout for its inputs and
        outputs.

        Args:
            t8n_data: Inputs (alloc, txs, env, fork context) for the tool.
            debug_output_path: If set, debug artifacts are dumped here.
            profiler: Active profiler; paused during debug-file dumps.

        Returns:
            The validated transition tool output.

        Raises:
            Exception: If the subprocess exits with a non-zero return code.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Clean up the temporary directory even when the subprocess fails
        # or output validation raises; it previously leaked on those paths.
        try:
            args = self.construct_args_stream(t8n_data, temp_dir)

            stdin = t8n_data.to_input()
            encoded_process_input = stdin.model_dump_json(
                **model_dump_config
            ).encode()

            result = subprocess.run(
                args,
                input=encoded_process_input,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

            if debug_output_path:
                with profiler.pause():
                    self.dump_debug_stream(
                        debug_output_path, temp_dir, stdin, args, result
                    )

            if result.returncode != 0:
                raise Exception(
                    "failed to evaluate: " + result.stderr.decode()
                )

            output: TransitionToolOutput = (
                TransitionToolOutput.model_validate_json(
                    result.stdout,
                    context={"exception_mapper": self.exception_mapper},
                )
            )

            if debug_output_path:
                with profiler.pause():
                    dump_files_to_directory(
                        debug_output_path,
                        {
                            "output/alloc.json": output.alloc.raw,
                            "output/result.json": output.result,
                            "output/txs.rlp": str(output.body),
                        },
                    )

            if self.supports_opcode_count:
                opcode_count_file_path = Path(temp_dir.name) / "opcodes.json"
                if opcode_count_file_path.exists():
                    opcode_count = OpcodeCount.model_validate_json(
                        opcode_count_file_path.read_text()
                    )
                    output.result.opcode_count = opcode_count

                    if debug_output_path:
                        with profiler.pause():
                            dump_files_to_directory(
                                debug_output_path,
                                {
                                    "opcodes.json": opcode_count.model_dump(),
                                },
                            )

            if self.trace:
                output.result.traces = self.collect_traces(
                    output.result.receipts, temp_dir, debug_output_path
                )
        finally:
            temp_dir.cleanup()
        return output

    def safe_t8n_args(
        self,
        fork_name: str,
        chain_id: int,
        reward: int,
        temp_dir: tempfile.TemporaryDirectory | None = None,
    ) -> List[str]:
        """
        Build a validated t8n argument list.

        Inputs are checked before being interpolated into command-line
        flags so untrusted values cannot inject arbitrary arguments.

        Args:
            fork_name: Transition-tool fork name; must be a known fork.
            chain_id: Positive chain identifier.
            reward: Non-negative block mining reward.
            temp_dir: Optional output directory for traces/opcode counts.

        Returns:
            The list of command-line arguments.

        Raises:
            ValueError: On an unknown fork name, non-positive chain id, or
                negative reward.
        """
        if fork_name not in get_valid_transition_tool_names():
            raise ValueError(f"Invalid fork name: {fork_name}")
        if not isinstance(chain_id, int) or chain_id <= 0:
            raise ValueError(f"Invalid chain ID: {chain_id}")
        if not isinstance(reward, int) or reward < 0:
            raise ValueError(f"Invalid reward: {reward}")

        # Fixed literal flags; only the validated values are interpolated.
        stream_flags: List[LiteralString] = [
            "--input.alloc=stdin",
            "--input.txs=stdin",
            "--input.env=stdin",
            "--output.result=stdout",
            "--output.alloc=stdout",
            "--output.body=stdout",
        ]
        args: List[str] = [
            *stream_flags,
            f"--state.fork={fork_name}",
            f"--state.chainid={chain_id}",
            f"--state.reward={reward}",
        ]

        if temp_dir and (self.trace or self.supports_opcode_count):
            args.append(f"--output.basedir={temp_dir.name}")
        if self.trace:
            args.append("--trace")
        if self.supports_opcode_count and temp_dir:
            args.extend(["--opcode.count", "opcodes.json"])

        return args

    def construct_args_stream(
        self,
        t8n_data: TransitionToolData,
        temp_dir: tempfile.TemporaryDirectory,
    ) -> List[str]:
        """Build the full command line for stream-based t8n invocation."""
        prefix: list[str] = [str(self.binary)]
        if self.subcommand:
            prefix.append(self.subcommand)
        return prefix + self.safe_t8n_args(
            t8n_data.fork_name,
            t8n_data.chain_id,
            t8n_data.reward,
            temp_dir,
        )

    def dump_debug_stream(
        self,
        debug_output_path: Path,
        temp_dir: tempfile.TemporaryDirectory,
        stdin: TransitionToolInput,
        args: List[str],
        result: subprocess.CompletedProcess,
    ) -> None:
        """
        Export debug files if requested when interacting with t8n via streams.

        Writes a reproduction script (`t8n.sh`), the raw process streams,
        and the JSON inputs under `debug_output_path`.
        """
        command_line = " ".join(args)
        script_output_dir = os.path.join(debug_output_path, "t8n.sh.out")
        if self.trace:
            # Point trace output at a stable directory instead of the
            # soon-to-be-deleted temporary one.
            command_line = command_line.replace(
                temp_dir.name, script_output_dir
            )
        reproduction_script = textwrap.dedent(
            f"""\
            #!/bin/bash
            # hard-coded to avoid surprises
            rm -rf {debug_output_path}/t8n.sh.out

            # unused if tracing is not enabled
            mkdir {debug_output_path}/t8n.sh.out
            {command_line} < {debug_output_path}/stdin.txt
            """
        )
        debug_files = {
            "args.py": args,
            "input/alloc.json": stdin.alloc,
            "input/env.json": stdin.env,
            "input/txs.json": [
                tx.model_dump(mode="json", **model_dump_config)
                for tx in stdin.txs
            ],
            "returncode.txt": result.returncode,
            "stdin.txt": stdin,
            "stdout.txt": result.stdout.decode(),
            "stderr.txt": result.stderr.decode(),
            "t8n.sh+x": reproduction_script,
        }
        dump_files_to_directory(debug_output_path, debug_files)

    def _evaluate(
        self,
        *,
        transition_tool_data: TransitionToolData,
        debug_output_path: Path | None,
        slow_request: bool,
        profiler: Profiler,
    ) -> TransitionToolOutput:
        """
        Execute the relevant evaluate method as required by the `t8n` tool.

        If a client's `t8n` tool varies from the default behavior, this method
        can be overridden.
        """
        if self.t8n_use_server:
            if not self.server_url:
                self.start_server()
            return self._evaluate_server(
                t8n_data=transition_tool_data,
                debug_output_path=debug_output_path,
                timeout=SLOW_REQUEST_TIMEOUT
                if slow_request
                else NORMAL_SERVER_TIMEOUT,
                profiler=profiler,
            )

        elif self.t8n_use_stream:
            return self._evaluate_stream(
                t8n_data=transition_tool_data,
                debug_output_path=debug_output_path,
                profiler=profiler,
            )
        else:
            return self._evaluate_filesystem(
                t8n_data=transition_tool_data,
                debug_output_path=debug_output_path,
                profiler=profiler,
            )

    def get_next_transition_tool_output_path(
        self, call_id: int
    ) -> Path | None:
        """Return path to the next transition tool output file."""
        base_dir = self.debug_dump_dir
        # No debug dump directory configured means no per-call output path.
        return None if base_dir is None else base_dir / str(call_id)

    def increment_call_counter(self) -> int:
        """Increment the call counter by one and return the previous value."""
        current, self.call_counter = self.call_counter, self.call_counter + 1
        return current

    def process_result(
        self, result: TransitionToolOutput
    ) -> TransitionToolOutput:
        """
        Process the result of the transition tool evaluation performing the
        following operations:
        - Add opcode count to the result if available.
        """
        incoming = result.result.opcode_count
        # Accumulate only when both sides are tracking opcode counts.
        if incoming is not None and self.opcode_count is not None:
            self.opcode_count += incoming
        return result

    def evaluate(
        self,
        *,
        transition_tool_data: TransitionToolData,
        slow_request: bool = False,
    ) -> TransitionToolOutput:
        """
        Execute the relevant evaluate method as required by the `t8n` tool.

        If a client's `t8n` tool varies from the default behavior, this method
        can be overridden.

        Args:
            transition_tool_data: Inputs for the transition tool run.
            slow_request: If True, `_evaluate` selects a longer server
                timeout (server mode only).

        Returns:
            The (possibly cached) transition tool output, post-processed
            via `process_result`.
        """
        # Each call gets a unique id, used both as the cache key and as the
        # per-call debug output sub-directory name.
        current_call_id = self.increment_call_counter()
        if self.output_cache is not None:
            cached_result = self.output_cache.get(current_call_id)
            if cached_result is not None:
                # Replay the cached traces so trace accumulation matches a
                # non-cached run.
                if self.trace and cached_result.result.traces is not None:
                    self.append_traces(cached_result.result.traces)
                return self.process_result(cached_result)
        debug_output_path = self.get_next_transition_tool_output_path(
            current_call_id
        )
        # Profiling is only enabled when a debug output path exists; the
        # profile is written alongside the other per-call debug artifacts.
        with Profiler(
            enabled=debug_output_path is not None,
            filename=debug_output_path / "profile.out"
            if debug_output_path
            else None,
        ) as profiler:
            result = self._evaluate(
                transition_tool_data=transition_tool_data,
                debug_output_path=debug_output_path,
                slow_request=slow_request,
                profiler=profiler,
            )
        if self.output_cache is not None:
            self.output_cache.set(current_call_id, result)
        return self.process_result(result)

__init__(*, exception_mapper=None, binary=None, trace=False) abstractmethod

Abstract initialization method that all subclasses must implement.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@abstractmethod
def __init__(
    self,
    *,
    exception_mapper: Optional[ExceptionMapper] = None,
    binary: Optional[Path] = None,
    trace: bool = False,
):
    """
    Abstract initialization method that all subclasses must implement.
    """
    assert exception_mapper is not None
    self.exception_mapper = exception_mapper
    super().__init__(binary=binary)
    self.trace = trace
    self._info_metadata: Optional[Dict[str, Any]] = {}

__init_subclass__()

Register all subclasses of TransitionTool as possible tools.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
218
219
220
def __init_subclass__(cls) -> None:
    """Register all subclasses of TransitionTool as possible tools."""
    TransitionTool.register_tool(cls)

is_fork_supported(fork) abstractmethod

Return True if the fork is supported by the tool.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
222
223
224
225
@abstractmethod
def is_fork_supported(self, fork: Fork) -> bool:
    """Return True if the fork is supported by the tool."""
    pass

start_server()

Start the t8n-server process, extract the port, and leave it running for future reuse.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
227
228
229
230
231
232
def start_server(self) -> None:
    """
    Start the t8n-server process, extract the port, and leave it
    running for future reuse.
    """
    pass

shutdown()

Perform any cleanup tasks related to the tested tool.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
234
235
236
def shutdown(self) -> None:
    """Perform any cleanup tasks related to the tested tool."""
    pass

reset_traces()

Reset the internal trace storage for a new test to begin.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
238
239
240
def reset_traces(self) -> None:
    """Reset the internal trace storage for a new test to begin."""
    self.traces = []

append_traces(new_traces)

Append a list of traces of a state transition to the current list.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
242
243
244
245
246
247
248
def append_traces(self, new_traces: Traces) -> None:
    """
    Append a list of traces of a state transition to the current
    list.
    """
    assert self.traces is not None
    self.traces.append(new_traces)

get_traces()

Return the accumulated traces.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
250
251
252
def get_traces(self) -> List[Traces] | None:
    """Return the accumulated traces."""
    return self.traces

collect_traces(receipts, temp_dir, debug_output_path)

Collect the traces from the t8n tool output and store them in the traces list.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def collect_traces(
    self,
    receipts: List[TransactionReceipt],
    temp_dir: tempfile.TemporaryDirectory,
    debug_output_path: Path | None,
) -> Traces:
    """
    Collect the traces from the t8n tool output and store them in the
    traces list.
    """
    traces: Traces = Traces(root=[])
    temp_dir_path = Path(temp_dir.name)
    for i, r in enumerate(receipts):
        trace_file_name = f"trace-{i}-{r.transaction_hash}.jsonl"
        trace_file_path = temp_dir_path / trace_file_name
        if not trace_file_path.exists():
            # Transaction was rejected mid-processing (e.g. EIP-3607
            # collision): the receipt exists but the tracer's
            # TransactionEnd event never fired, so no trace file was
            # written. Record an empty trace for this tx.
            traces.append(TransactionTraces(traces=[]))
            continue
        if debug_output_path:
            shutil.copy(
                trace_file_path,
                Path(debug_output_path) / trace_file_name,
            )
        traces.append(TransactionTraces.from_file(trace_file_path))
    self.append_traces(traces)
    return traces

set_cache(*, key)

Set the current cache key.

Creates the cache on first call, then reuses it for single-key eviction. Returns True if the key was already in the cache (hit).

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
285
286
287
288
289
290
291
292
293
294
295
def set_cache(self, *, key: str) -> bool:
    """
    Set the current cache key.

    Creates the cache on first call, then reuses it for single-key
    eviction.
    Returns True if the key was already in the cache (hit).
    """
    if self.output_cache is None:
        self.output_cache = OutputCache()
    return self.output_cache.set_key(key)

remove_cache()

Clear the cache (test doesn't use caching).

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
297
298
299
300
def remove_cache(self) -> None:
    """Clear the cache (test doesn't use caching)."""
    if self.output_cache is not None:
        self.output_cache.clear()

reset_opcode_count()

Reset the opcode count to zero.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
302
303
304
305
306
def reset_opcode_count(self) -> None:
    """
    Reset the opcode count to zero.
    """
    self.opcode_count = OpcodeCount({})

TransitionToolData dataclass

Transition tool files and data to pass between methods.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
@dataclass
class TransitionToolData:
    """Transition tool files and data to pass between methods."""

    alloc: Alloc | LazyAlloc
    txs: List[Transaction]
    env: Environment
    fork: Fork
    chain_id: int
    reward: int
    blob_schedule: BlobSchedule | None
    state_test: bool = False

    @property
    def fork_name(self) -> str:
        """Return the fork name."""
        return self.fork.transition_tool_name()

    @property
    def fork_name_if_supports_blob_params(self) -> str:
        """Return the fork name."""
        fork = self.fork()

        # For tools that support blob_params, return base fork for BPO
        # forks.
        if fork.bpo_fork():
            return fork.non_bpo_ancestor().transition_tool_name()
        else:
            return self.fork.transition_tool_name()

    @property
    def blob_params(self) -> ForkBlobSchedule | None:
        """Return the blob parameters for the current fork."""
        if self.blob_schedule:
            fork_name = self.fork.name()
            # Only return blob params if this fork has them
            if fork_name in self.blob_schedule.root:
                return self.blob_schedule[fork_name]
        return None

    def __post_init__(self) -> None:
        """Modify the reward if the environment number is 0."""
        if self.env.number == 0:
            self.reward = -1

    def to_input(self) -> TransitionToolInput:
        """Convert the data to a TransactionToolInput object."""
        return TransitionToolInput(
            alloc=self.alloc,
            txs=self.txs,
            env=self.env,
            blob_params=self.blob_params,
        )

    def get_request_data(self) -> TransitionToolRequest:
        """Convert the data to a TransitionToolRequest object."""
        return TransitionToolRequest(
            state=TransitionToolContext(
                fork=self.fork_name,
                chain_id=self.chain_id,
                reward=self.reward,
            ),
            input=self.to_input(),
        )

fork_name property

Return the fork name.

fork_name_if_supports_blob_params property

Return fork name; BPO forks map to their non-BPO ancestor.

blob_params property

Return the blob parameters for the current fork.

__post_init__()

Modify the reward if the environment number is 0.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
348
349
350
351
def __post_init__(self) -> None:
    """Modify the reward if the environment number is 0."""
    if self.env.number == 0:
        self.reward = -1

to_input()

Convert the data to a TransitionToolInput object.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
353
354
355
356
357
358
359
360
def to_input(self) -> TransitionToolInput:
    """Convert the data to a TransactionToolInput object."""
    return TransitionToolInput(
        alloc=self.alloc,
        txs=self.txs,
        env=self.env,
        blob_params=self.blob_params,
    )

get_request_data()

Convert the data to a TransitionToolRequest object.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
362
363
364
365
366
367
368
369
370
371
def get_request_data(self) -> TransitionToolRequest:
    """Convert the data to a TransitionToolRequest object."""
    return TransitionToolRequest(
        state=TransitionToolContext(
            fork=self.fork_name,
            chain_id=self.chain_id,
            reward=self.reward,
        ),
        input=self.to_input(),
    )

safe_t8n_args(fork_name, chain_id, reward, temp_dir=None)

Safely construct t8n arguments with validated inputs.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
def safe_t8n_args(
    self,
    fork_name: str,
    chain_id: int,
    reward: int,
    temp_dir: tempfile.TemporaryDirectory | None = None,
) -> List[str]:
    """Safely construct t8n arguments with validated inputs."""
    # Validate fork name against actual transition tool names from all
    # available forks
    valid_forks = get_valid_transition_tool_names()
    if fork_name not in valid_forks:
        raise ValueError(f"Invalid fork name: {fork_name}")

    # Validate chain ID (should be positive integer)
    if not isinstance(chain_id, int) or chain_id <= 0:
        raise ValueError(f"Invalid chain ID: {chain_id}")

    # Validate reward (should be non-negative integer)
    if not isinstance(reward, int) or reward < 0:
        raise ValueError(f"Invalid reward: {reward}")

    # Use literal strings for command flags
    input_alloc: LiteralString = "--input.alloc=stdin"
    input_txs: LiteralString = "--input.txs=stdin"
    input_env: LiteralString = "--input.env=stdin"
    output_result: LiteralString = "--output.result=stdout"
    output_alloc: LiteralString = "--output.alloc=stdout"
    output_body: LiteralString = "--output.body=stdout"
    trace_flag: LiteralString = "--trace"

    args = [
        input_alloc,
        input_txs,
        input_env,
        output_result,
        output_alloc,
        output_body,
        f"--state.fork={fork_name}",
        f"--state.chainid={chain_id}",
        f"--state.reward={reward}",
    ]

    if temp_dir and (self.trace or self.supports_opcode_count):
        args.append(f"--output.basedir={temp_dir.name}")
    if self.trace:
        args.append(trace_flag)
    if self.supports_opcode_count and temp_dir:
        args.extend(["--opcode.count", "opcodes.json"])

    return args

construct_args_stream(t8n_data, temp_dir)

Construct arguments for t8n interaction via streams.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
793
794
795
796
797
798
799
800
801
802
803
804
805
806
def construct_args_stream(
    self,
    t8n_data: TransitionToolData,
    temp_dir: tempfile.TemporaryDirectory,
) -> List[str]:
    """Construct arguments for t8n interaction via streams."""
    command: list[str] = [str(self.binary)]
    if self.subcommand:
        command.append(self.subcommand)

    safe_args = self.safe_t8n_args(
        t8n_data.fork_name, t8n_data.chain_id, t8n_data.reward, temp_dir
    )
    return command + safe_args

dump_debug_stream(debug_output_path, temp_dir, stdin, args, result)

Export debug files if requested when interacting with t8n via streams.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
def dump_debug_stream(
    self,
    debug_output_path: Path,
    temp_dir: tempfile.TemporaryDirectory,
    stdin: TransitionToolInput,
    args: List[str],
    result: subprocess.CompletedProcess,
) -> None:
    """
    Export debug files if requested when interacting with t8n via streams.
    """
    t8n_call = " ".join(args)
    t8n_output_base_dir = os.path.join(debug_output_path, "t8n.sh.out")
    if self.trace:
        t8n_call = t8n_call.replace(temp_dir.name, t8n_output_base_dir)
    t8n_script = textwrap.dedent(
        f"""\
        #!/bin/bash
        # hard-coded to avoid surprises
        rm -rf {debug_output_path}/t8n.sh.out

        # unused if tracing is not enabled
        mkdir {debug_output_path}/t8n.sh.out
        {t8n_call} < {debug_output_path}/stdin.txt
        """
    )
    dump_files_to_directory(
        debug_output_path,
        {
            "args.py": args,
            "input/alloc.json": stdin.alloc,
            "input/env.json": stdin.env,
            "input/txs.json": [
                tx.model_dump(mode="json", **model_dump_config)
                for tx in stdin.txs
            ],
            "returncode.txt": result.returncode,
            "stdin.txt": stdin,
            "stdout.txt": result.stdout.decode(),
            "stderr.txt": result.stderr.decode(),
            "t8n.sh+x": t8n_script,
        },
    )

get_next_transition_tool_output_path(call_id)

Return path to the next transition tool output file.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
891
892
893
894
895
896
897
898
def get_next_transition_tool_output_path(
    self, call_id: int
) -> Path | None:
    """Return path to the next transition tool output file."""
    debug_dump_dir = self.debug_dump_dir
    if debug_dump_dir is None:
        return None
    return debug_dump_dir / str(call_id)

increment_call_counter()

Increment the call counter by one and return the previous value.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
900
901
902
903
904
def increment_call_counter(self) -> int:
    """Increment the call counter by one and return the previous value."""
    previous_value = self.call_counter
    self.call_counter += 1
    return previous_value

process_result(result)

Process the result of the transition tool evaluation performing the following operations: - Add opcode count to the result if available.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
906
907
908
909
910
911
912
913
914
915
916
917
918
919
def process_result(
    self, result: TransitionToolOutput
) -> TransitionToolOutput:
    """
    Process the result of the transition tool evaluation performing the
    following operations:
    - Add opcode count to the result if available.
    """
    if (
        result.result.opcode_count is not None
        and self.opcode_count is not None
    ):
        self.opcode_count += result.result.opcode_count
    return result

evaluate(*, transition_tool_data, slow_request=False)

Execute the relevant evaluate method as required by the t8n tool.

If a client's t8n tool varies from the default behavior, this method can be overridden.

Source code in packages/testing/src/execution_testing/client_clis/transition_tool.py
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
def evaluate(
    self,
    *,
    transition_tool_data: TransitionToolData,
    slow_request: bool = False,
) -> TransitionToolOutput:
    """
    Execute the relevant evaluate method as required by the `t8n` tool.

    If a client's `t8n` tool varies from the default behavior, this method
    can be overridden.
    """
    current_call_id = self.increment_call_counter()
    if self.output_cache is not None:
        cached_result = self.output_cache.get(current_call_id)
        if cached_result is not None:
            if self.trace and cached_result.result.traces is not None:
                self.append_traces(cached_result.result.traces)
            return self.process_result(cached_result)
    debug_output_path = self.get_next_transition_tool_output_path(
        current_call_id
    )
    with Profiler(
        enabled=debug_output_path is not None,
        filename=debug_output_path / "profile.out"
        if debug_output_path
        else None,
    ) as profiler:
        result = self._evaluate(
            transition_tool_data=transition_tool_data,
            debug_output_path=debug_output_path,
            slow_request=slow_request,
            profiler=profiler,
        )
    if self.output_cache is not None:
        self.output_cache.set(current_call_id, result)
    return self.process_result(result)