327 lines
12 KiB
Python
327 lines
12 KiB
Python
|
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
|
||
|
# For details: https://github.com/pylint-dev/pylint/blob/main/LICENSE
|
||
|
# Copyright (c) https://github.com/pylint-dev/pylint/blob/main/CONTRIBUTORS.txt
|
||
|
|
||
|
from __future__ import annotations
|
||
|
|
||
|
import csv
|
||
|
import operator
|
||
|
import platform
|
||
|
import sys
|
||
|
from collections import Counter
|
||
|
from io import StringIO
|
||
|
from pathlib import Path
|
||
|
from typing import Counter as CounterType
|
||
|
from typing import TextIO, Tuple
|
||
|
|
||
|
import pytest
|
||
|
from _pytest.config import Config
|
||
|
|
||
|
from pylint import checkers
|
||
|
from pylint.config.config_initialization import _config_initialization
|
||
|
from pylint.constants import IS_PYPY
|
||
|
from pylint.lint import PyLinter
|
||
|
from pylint.message.message import Message
|
||
|
from pylint.testutils.constants import _EXPECTED_RE, _OPERATORS, UPDATE_OPTION
|
||
|
|
||
|
# need to import from functional.test_file to avoid cyclic import
|
||
|
from pylint.testutils.functional.test_file import (
|
||
|
FunctionalTestFile,
|
||
|
NoFileError,
|
||
|
parse_python_version,
|
||
|
)
|
||
|
from pylint.testutils.output_line import OutputLine
|
||
|
from pylint.testutils.reporter_for_tests import FunctionalTestReporter
|
||
|
|
||
|
# Multiset of expected/received messages: maps (line number, message symbol)
# to the number of times that message occurs on that line.
MessageCounter = CounterType[Tuple[int, str]]

# Default configuration file, located next to this module; used when a
# functional test does not provide its own option file.
PYLINTRC = Path(__file__).parent / "testing_pylintrc"
|
||
|
|
||
|
|
||
|
class LintModuleTest:
    """Lint one functional test file and compare the result with expectations.

    The expectations come from two places: the annotations found in the test's
    source file (parsed by :meth:`get_expected_messages`) and the rows of the
    test's expected-output file (parsed with the ``"test"`` csv dialect).
    """

    # NOTE(review): not read anywhere in this class; presumably kept for
    # unittest-style consumers -- confirm before removing.
    maxDiff = None

    def __init__(
        self, test_file: FunctionalTestFile, config: Config | None = None
    ) -> None:
        """Create and configure a dedicated linter for ``test_file``.

        :param test_file: The functional test to run.
        :param config: The pytest configuration, if any; it is queried for the
            ``minimal_messages_config`` and ``verbose`` options.
        """
        _test_reporter = FunctionalTestReporter()
        self._linter = PyLinter()
        self._linter.config.persistent = 0
        checkers.initialize(self._linter)

        # See if test has its own .rc file, if so we use that one
        rc_file: Path | str = PYLINTRC
        try:
            rc_file = test_file.option_file
            self._linter.disable("suppressed-message")
            self._linter.disable("locally-disabled")
            self._linter.disable("useless-suppression")
        except NoFileError:
            # No dedicated option file: keep the default rc file.
            pass

        self._test_file = test_file
        try:
            args = [test_file.source]
        except NoFileError:
            # If we're still raising NoFileError the actual source file doesn't exist
            args = [""]
        if config and config.getoption("minimal_messages_config"):
            with self._open_source_file() as f:
                messages_to_enable = {msg[1] for msg in self.get_expected_messages(f)}
            # Always enable fatal errors
            messages_to_enable.update(("astroid-error", "fatal", "syntax-error"))
            # Sorted so that the generated command line is reproducible
            # regardless of set iteration order.
            args.extend(
                ["--disable=all", f"--enable={','.join(sorted(messages_to_enable))}"]
            )

        # Add testoptions
        add_option = self._linter._arg_parser.add_argument
        add_option("--min_pyver", type=parse_python_version, default=(2, 5))
        add_option("--max_pyver", type=parse_python_version, default=(4, 0))
        add_option(
            "--min_pyver_end_position", type=parse_python_version, default=(3, 8)
        )
        add_option("--requires", type=self._split_comma_list, default=[])
        add_option(
            "--except_implementations", type=self._split_comma_list, default=[]
        )
        add_option("--exclude_platforms", type=self._split_comma_list, default=[])
        # NOTE: no ``type`` is given, so any value supplied for this option is
        # stored as a string and every non-empty string is truthy.
        add_option("--exclude_from_minimal_messages_config", default=False)

        _config_initialization(
            self._linter, args_list=args, config_file=rc_file, reporter=_test_reporter
        )

        self._check_end_position = (
            sys.version_info >= self._linter.config.min_pyver_end_position
        )
        # TODO: PY3.9: PyPy supports end_lineno from 3.9 and above
        if self._check_end_position and IS_PYPY:
            self._check_end_position = sys.version_info >= (3, 9)  # pragma: no cover

        self._config = config

    @staticmethod
    def _split_comma_list(value: str) -> list[str]:
        """Split a comma separated option value into stripped items."""
        return [item.strip() for item in value.split(",")]

    def setUp(self) -> None:
        """Skip the test (via ``pytest.skip``) when its requirements are not met.

        Checked in order: Python version bounds, importable requirements,
        excluded Python implementations, excluded platforms and exclusion from
        the minimal messages configuration.
        """
        if self._should_be_skipped_due_to_version():
            pytest.skip(
                f"Test cannot run with Python {sys.version.split(' ', maxsplit=1)[0]}."
            )

        missing = []
        for requirement in self._linter.config.requires:
            try:
                __import__(requirement)
            except ImportError:
                missing.append(requirement)
        if missing:
            pytest.skip(f"Requires {','.join(missing)} to be present.")

        except_implementations = self._linter.config.except_implementations
        if except_implementations:
            implementation = platform.python_implementation()
            if implementation in except_implementations:
                pytest.skip(
                    f"Test cannot run with Python implementation {implementation!r}"
                )

        excluded_platforms = self._linter.config.exclude_platforms
        if excluded_platforms and sys.platform.lower() in excluded_platforms:
            pytest.skip(f"Test cannot run on platform {sys.platform!r}")

        if (
            self._config
            and self._config.getoption("minimal_messages_config")
            and self._linter.config.exclude_from_minimal_messages_config
        ):
            pytest.skip("Test excluded from --minimal-messages-config")

    def runTest(self) -> None:
        """Public entry point; delegates to :meth:`_runTest`."""
        self._runTest()

    def _should_be_skipped_due_to_version(self) -> bool:
        """Tell whether the running Python is outside ``min_pyver``/``max_pyver``."""
        return (  # type: ignore[no-any-return]
            sys.version_info < self._linter.config.min_pyver
            or sys.version_info > self._linter.config.max_pyver
        )

    def __str__(self) -> str:
        """Return the test's base name followed by this class' qualified name."""
        return f"{self._test_file.base} ({self.__class__.__module__}.{self.__class__.__name__})"

    @staticmethod
    def get_expected_messages(stream: TextIO) -> MessageCounter:
        """Parses a file and get expected messages.

        :param stream: File-like input stream.
        :type stream: enumerable
        :returns: A dict mapping line,msg-symbol tuples to the count on this line.
        :rtype: dict
        """
        messages: MessageCounter = Counter()
        # ``current`` is the 1-based number of the line being scanned.
        for current, text in enumerate(stream, start=1):
            match = _EXPECTED_RE.search(text)
            if match is None:
                continue

            line_spec = match.group("line")
            if line_spec is None:
                # No explicit line: the message is expected on this very line.
                lineno = current
            elif line_spec.startswith(("+", "-")):
                # Signed value: offset relative to the annotated line.
                lineno = current + int(line_spec)
            else:
                # Unsigned value: absolute line number.
                lineno = int(line_spec)

            version = match.group("version")
            op = match.group("op")
            if version:
                required = parse_python_version(version)
                # Ignore annotations whose version constraint does not hold
                # for the running interpreter.
                if not _OPERATORS[op](sys.version_info, required):
                    continue

            for msg_id in match.group("msgs").split(","):
                messages[lineno, msg_id.strip()] += 1
        return messages

    @staticmethod
    def multiset_difference(
        expected_entries: MessageCounter,
        actual_entries: MessageCounter,
    ) -> tuple[MessageCounter, dict[tuple[int, str], int]]:
        """Takes two multisets and compares them.

        A multiset is a dict with the cardinality of the key as the value.

        :param expected_entries: The entries that should be present.
        :param actual_entries: The entries that are present.
        :returns: A ``(missing, unexpected)`` pair: entries that occur more
            often in ``expected_entries`` and entries that occur more often in
            ``actual_entries``, each with the size of the surplus.
        """
        # Work on a copy so that neither argument is modified.
        difference = expected_entries.copy()
        difference.subtract(actual_entries)

        unexpected = {key: -count for key, count in difference.items() if count < 0}
        # Keep only the strictly positive counts: those are the missing entries.
        for key in [key for key, count in difference.items() if count <= 0]:
            del difference[key]
        return difference, unexpected

    def _open_expected_file(self) -> TextIO:
        """Open the expected-output file, or an empty stream if it is absent."""
        try:
            return open(self._test_file.expected_output, encoding="utf-8")
        except FileNotFoundError:
            return StringIO("")

    def _open_source_file(self) -> TextIO:
        """Open the test's source file with the encoding implied by its name.

        Tests whose base name contains ``latin1`` are decoded as latin1, every
        other test (including ``invalid_encoded_data``) as UTF-8.
        """
        encoding = "latin1" if "latin1" in self._test_file.base else "utf8"
        return open(self._test_file.source, encoding=encoding)

    def _get_expected(self) -> tuple[MessageCounter, list[OutputLine]]:
        """Return the expected message counts and the expected output lines."""
        with self._open_source_file() as f:
            expected_msgs = self.get_expected_messages(f) or Counter()
        with self._open_expected_file() as f:
            expected_output_lines = [
                OutputLine.from_csv(row, self._check_end_position)
                for row in csv.reader(f, "test")
            ]
        return expected_msgs, expected_output_lines

    def _get_actual(self) -> tuple[MessageCounter, list[OutputLine]]:
        """Return the received message counts and the received output lines.

        The reporter's message list is sorted in place by line, symbol and
        message text. A ``fatal`` message makes the test fail immediately.
        """
        messages: list[Message] = self._linter.reporter.messages
        messages.sort(key=lambda m: (m.line, m.symbol, m.msg))
        received_msgs: MessageCounter = Counter()
        received_output_lines = []
        for msg in messages:
            assert (
                msg.symbol != "fatal"
            ), f"Pylint analysis failed because of '{msg.msg}'"
            received_msgs[msg.line, msg.symbol] += 1
            received_output_lines.append(
                OutputLine.from_msg(msg, self._check_end_position)
            )
        return received_msgs, received_output_lines

    def _runTest(self) -> None:
        """Lint the source file, then check the messages and the output text."""
        __tracebackhide__ = True  # pylint: disable=unused-variable
        modules_to_check = [self._test_file.source]
        self._linter.check(modules_to_check)
        expected_messages, expected_output = self._get_expected()
        actual_messages, actual_output = self._get_actual()
        assert (
            expected_messages == actual_messages
        ), self.error_msg_for_unequal_messages(
            actual_messages, expected_messages, actual_output
        )
        self._check_output_text(expected_messages, expected_output, actual_output)

    def error_msg_for_unequal_messages(
        self,
        actual_messages: MessageCounter,
        expected_messages: MessageCounter,
        actual_output: list[OutputLine],
    ) -> str:
        """Build the failure message used when message counts differ.

        :param actual_messages: The messages pylint emitted.
        :param expected_messages: The messages annotated in the test data.
        :param actual_output: The emitted output lines; only appended to the
            message when the pytest ``verbose`` option is above zero.
        :returns: The text listing missing and unexpected messages.
        """
        report = [f'Wrong message(s) raised for "{Path(self._test_file.source).name}":']
        missing, unexpected = self.multiset_difference(
            expected_messages, actual_messages
        )
        if missing:
            report.append("\nExpected in testdata:")
            report.extend(f" {line:3}: {symbol}" for line, symbol in sorted(missing))
        if unexpected:
            report.append("\nUnexpected in testdata:")
            report.extend(
                f" {line:3}: {symbol}" for line, symbol in sorted(unexpected)
            )
        error_msg = "\n".join(report)
        if self._config and self._config.getoption("verbose") > 0:
            error_msg += "\n\nActual pylint output for this file:\n"
            error_msg += "\n".join(str(o) for o in actual_output)
        return error_msg

    def error_msg_for_unequal_output(
        self,
        expected_lines: list[OutputLine],
        received_lines: list[OutputLine],
    ) -> str:
        """Build the failure message used when the output lines differ.

        :param expected_lines: The lines read from the expected-output file.
        :param received_lines: The lines built from the emitted messages.
        :returns: The text listing missing and unexpected lines, the command to
            regenerate the expected output, and the regenerated csv content.
        """
        expected_set = set(expected_lines)
        received_set = set(received_lines)
        missing = expected_set - received_set
        unexpected = received_set - expected_set
        sort_by_line_number = operator.attrgetter("lineno")

        parts = [f'Wrong output for "{Path(self._test_file.expected_output).name}":']
        if missing:
            parts.append("\n- Missing lines:\n")
            parts.extend(
                f"{line}\n" for line in sorted(missing, key=sort_by_line_number)
            )
        if unexpected:
            parts.append("\n- Unexpected lines:\n")
            parts.extend(
                f"{line}\n" for line in sorted(unexpected, key=sort_by_line_number)
            )
        parts.append(
            "\nYou can update the expected output automatically with:\n'"
            f"python tests/test_functional.py {UPDATE_OPTION} -k "
            f'"test_functional[{self._test_file.base}]"\'\n\n'
            "Here's the update text in case you can't:\n"
        )

        # Render the received lines exactly as they would be written to the
        # expected-output file.
        expected_csv = StringIO()
        writer = csv.writer(expected_csv, dialect="test")
        for line in sorted(received_lines, key=sort_by_line_number):
            writer.writerow(line.to_csv())
        parts.append(expected_csv.getvalue())
        return "".join(parts)

    def _check_output_text(
        self,
        _: MessageCounter,
        expected_output: list[OutputLine],
        actual_output: list[OutputLine],
    ) -> None:
        """This is a function because we want to be able to update the text in
        LintModuleOutputUpdate.
        """
        assert expected_output == actual_output, self.error_msg_for_unequal_output(
            expected_output, actual_output
        )
|