416 lines
14 KiB
Python
416 lines
14 KiB
Python
|
"""Finders try to find right section for passed module name"""
|
||
|
import importlib.machinery
|
||
|
import inspect
|
||
|
import os
|
||
|
import os.path
|
||
|
import re
|
||
|
import sys
|
||
|
import sysconfig
|
||
|
from abc import ABCMeta, abstractmethod
|
||
|
from contextlib import contextmanager
|
||
|
from fnmatch import fnmatch
|
||
|
from functools import lru_cache
|
||
|
from glob import glob
|
||
|
from pathlib import Path
|
||
|
from typing import Dict, Iterable, Iterator, List, Optional, Pattern, Sequence, Tuple, Type
|
||
|
|
||
|
from isort import sections
|
||
|
from isort.settings import KNOWN_SECTION_MAPPING, Config
|
||
|
from isort.utils import exists_case_sensitive
|
||
|
|
||
|
try:
|
||
|
from pipreqs import pipreqs # type: ignore
|
||
|
|
||
|
except ImportError:
|
||
|
pipreqs = None
|
||
|
|
||
|
try:
|
||
|
from pip_api import parse_requirements # type: ignore
|
||
|
|
||
|
except ImportError:
|
||
|
parse_requirements = None
|
||
|
|
||
|
try:
|
||
|
from requirementslib import Pipfile # type: ignore
|
||
|
|
||
|
except ImportError:
|
||
|
Pipfile = None
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def chdir(path: str) -> Iterator[None]:
|
||
|
"""Context manager for changing dir and restoring previous workdir after exit."""
|
||
|
curdir = os.getcwd()
|
||
|
os.chdir(path)
|
||
|
try:
|
||
|
yield
|
||
|
finally:
|
||
|
os.chdir(curdir)
|
||
|
|
||
|
|
||
|
class BaseFinder(metaclass=ABCMeta):
|
||
|
def __init__(self, config: Config) -> None:
|
||
|
self.config = config
|
||
|
|
||
|
@abstractmethod
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
raise NotImplementedError
|
||
|
|
||
|
|
||
|
class ForcedSeparateFinder(BaseFinder):
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
for forced_separate in self.config.forced_separate:
|
||
|
# Ensure all forced_separate patterns will match to end of string
|
||
|
path_glob = forced_separate
|
||
|
if not forced_separate.endswith("*"):
|
||
|
path_glob = f"{forced_separate}*"
|
||
|
|
||
|
if fnmatch(module_name, path_glob) or fnmatch(module_name, "." + path_glob):
|
||
|
return forced_separate
|
||
|
return None
|
||
|
|
||
|
|
||
|
class LocalFinder(BaseFinder):
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
if module_name.startswith("."):
|
||
|
return "LOCALFOLDER"
|
||
|
return None
|
||
|
|
||
|
|
||
|
class KnownPatternFinder(BaseFinder):
|
||
|
def __init__(self, config: Config) -> None:
|
||
|
super().__init__(config)
|
||
|
|
||
|
self.known_patterns: List[Tuple[Pattern[str], str]] = []
|
||
|
for placement in reversed(config.sections):
|
||
|
known_placement = KNOWN_SECTION_MAPPING.get(placement, placement).lower()
|
||
|
config_key = f"known_{known_placement}"
|
||
|
known_patterns = list(
|
||
|
getattr(self.config, config_key, self.config.known_other.get(known_placement, []))
|
||
|
)
|
||
|
known_patterns = [
|
||
|
pattern
|
||
|
for known_pattern in known_patterns
|
||
|
for pattern in self._parse_known_pattern(known_pattern)
|
||
|
]
|
||
|
for known_pattern in known_patterns:
|
||
|
regexp = "^" + known_pattern.replace("*", ".*").replace("?", ".?") + "$"
|
||
|
self.known_patterns.append((re.compile(regexp), placement))
|
||
|
|
||
|
def _parse_known_pattern(self, pattern: str) -> List[str]:
|
||
|
"""Expand pattern if identified as a directory and return found sub packages"""
|
||
|
if pattern.endswith(os.path.sep):
|
||
|
patterns = [
|
||
|
filename
|
||
|
for filename in os.listdir(os.path.join(self.config.directory, pattern))
|
||
|
if os.path.isdir(os.path.join(self.config.directory, pattern, filename))
|
||
|
]
|
||
|
else:
|
||
|
patterns = [pattern]
|
||
|
|
||
|
return patterns
|
||
|
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
# Try to find most specific placement instruction match (if any)
|
||
|
parts = module_name.split(".")
|
||
|
module_names_to_check = (".".join(parts[:first_k]) for first_k in range(len(parts), 0, -1))
|
||
|
for module_name_to_check in module_names_to_check:
|
||
|
for pattern, placement in self.known_patterns:
|
||
|
if pattern.match(module_name_to_check):
|
||
|
return placement
|
||
|
return None
|
||
|
|
||
|
|
||
|
class PathFinder(BaseFinder):
|
||
|
def __init__(self, config: Config, path: str = ".") -> None:
|
||
|
super().__init__(config)
|
||
|
|
||
|
# restore the original import path (i.e. not the path to bin/isort)
|
||
|
root_dir = os.path.abspath(path)
|
||
|
src_dir = f"{root_dir}/src"
|
||
|
self.paths = [root_dir, src_dir]
|
||
|
|
||
|
# virtual env
|
||
|
self.virtual_env = self.config.virtual_env or os.environ.get("VIRTUAL_ENV")
|
||
|
if self.virtual_env:
|
||
|
self.virtual_env = os.path.realpath(self.virtual_env)
|
||
|
self.virtual_env_src = ""
|
||
|
if self.virtual_env:
|
||
|
self.virtual_env_src = f"{self.virtual_env}/src/"
|
||
|
for venv_path in glob(f"{self.virtual_env}/lib/python*/site-packages"):
|
||
|
if venv_path not in self.paths:
|
||
|
self.paths.append(venv_path)
|
||
|
for nested_venv_path in glob(f"{self.virtual_env}/lib/python*/*/site-packages"):
|
||
|
if nested_venv_path not in self.paths:
|
||
|
self.paths.append(nested_venv_path)
|
||
|
for venv_src_path in glob(f"{self.virtual_env}/src/*"):
|
||
|
if os.path.isdir(venv_src_path):
|
||
|
self.paths.append(venv_src_path)
|
||
|
|
||
|
# conda
|
||
|
self.conda_env = self.config.conda_env or os.environ.get("CONDA_PREFIX") or ""
|
||
|
if self.conda_env:
|
||
|
self.conda_env = os.path.realpath(self.conda_env)
|
||
|
for conda_path in glob(f"{self.conda_env}/lib/python*/site-packages"):
|
||
|
if conda_path not in self.paths:
|
||
|
self.paths.append(conda_path)
|
||
|
for nested_conda_path in glob(f"{self.conda_env}/lib/python*/*/site-packages"):
|
||
|
if nested_conda_path not in self.paths:
|
||
|
self.paths.append(nested_conda_path)
|
||
|
|
||
|
# handle case-insensitive paths on windows
|
||
|
self.stdlib_lib_prefix = os.path.normcase(sysconfig.get_paths()["stdlib"])
|
||
|
if self.stdlib_lib_prefix not in self.paths:
|
||
|
self.paths.append(self.stdlib_lib_prefix)
|
||
|
|
||
|
# add system paths
|
||
|
for system_path in sys.path[1:]:
|
||
|
if system_path not in self.paths:
|
||
|
self.paths.append(system_path)
|
||
|
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
for prefix in self.paths:
|
||
|
package_path = "/".join((prefix, module_name.split(".")[0]))
|
||
|
path_obj = Path(package_path).resolve()
|
||
|
is_module = (
|
||
|
exists_case_sensitive(package_path + ".py")
|
||
|
or any(
|
||
|
exists_case_sensitive(package_path + ext_suffix)
|
||
|
for ext_suffix in importlib.machinery.EXTENSION_SUFFIXES
|
||
|
)
|
||
|
or exists_case_sensitive(package_path + "/__init__.py")
|
||
|
)
|
||
|
is_package = exists_case_sensitive(package_path) and os.path.isdir(package_path)
|
||
|
if is_module or is_package:
|
||
|
if (
|
||
|
"site-packages" in prefix
|
||
|
or "dist-packages" in prefix
|
||
|
or (self.virtual_env and self.virtual_env_src in prefix)
|
||
|
):
|
||
|
return sections.THIRDPARTY
|
||
|
if os.path.normcase(prefix) == self.stdlib_lib_prefix:
|
||
|
return sections.STDLIB
|
||
|
if self.conda_env and self.conda_env in prefix:
|
||
|
return sections.THIRDPARTY
|
||
|
for src_path in self.config.src_paths:
|
||
|
if src_path in path_obj.parents and not self.config.is_skipped(path_obj):
|
||
|
return sections.FIRSTPARTY
|
||
|
|
||
|
if os.path.normcase(prefix).startswith(self.stdlib_lib_prefix):
|
||
|
return sections.STDLIB # pragma: no cover - edge case for one OS. Hard to test.
|
||
|
|
||
|
return self.config.default_section
|
||
|
return None
|
||
|
|
||
|
|
||
|
class ReqsBaseFinder(BaseFinder):
|
||
|
enabled = False
|
||
|
|
||
|
def __init__(self, config: Config, path: str = ".") -> None:
|
||
|
super().__init__(config)
|
||
|
self.path = path
|
||
|
if self.enabled:
|
||
|
self.mapping = self._load_mapping()
|
||
|
self.names = self._load_names()
|
||
|
|
||
|
@abstractmethod
|
||
|
def _get_names(self, path: str) -> Iterator[str]:
|
||
|
raise NotImplementedError
|
||
|
|
||
|
@abstractmethod
|
||
|
def _get_files_from_dir(self, path: str) -> Iterator[str]:
|
||
|
raise NotImplementedError
|
||
|
|
||
|
@staticmethod
|
||
|
def _load_mapping() -> Optional[Dict[str, str]]:
|
||
|
"""Return list of mappings `package_name -> module_name`
|
||
|
|
||
|
Example:
|
||
|
django-haystack -> haystack
|
||
|
"""
|
||
|
if not pipreqs:
|
||
|
return None
|
||
|
path = os.path.dirname(inspect.getfile(pipreqs))
|
||
|
path = os.path.join(path, "mapping")
|
||
|
with open(path) as f:
|
||
|
mappings: Dict[str, str] = {} # pypi_name: import_name
|
||
|
for line in f:
|
||
|
import_name, _, pypi_name = line.strip().partition(":")
|
||
|
mappings[pypi_name] = import_name
|
||
|
return mappings
|
||
|
# return dict(tuple(line.strip().split(":")[::-1]) for line in f)
|
||
|
|
||
|
def _load_names(self) -> List[str]:
|
||
|
"""Return list of thirdparty modules from requirements"""
|
||
|
names = []
|
||
|
for path in self._get_files():
|
||
|
for name in self._get_names(path):
|
||
|
names.append(self._normalize_name(name))
|
||
|
return names
|
||
|
|
||
|
@staticmethod
|
||
|
def _get_parents(path: str) -> Iterator[str]:
|
||
|
prev = ""
|
||
|
while path != prev:
|
||
|
prev = path
|
||
|
yield path
|
||
|
path = os.path.dirname(path)
|
||
|
|
||
|
def _get_files(self) -> Iterator[str]:
|
||
|
"""Return paths to all requirements files"""
|
||
|
path = os.path.abspath(self.path)
|
||
|
if os.path.isfile(path):
|
||
|
path = os.path.dirname(path)
|
||
|
|
||
|
for path in self._get_parents(path): # noqa
|
||
|
yield from self._get_files_from_dir(path)
|
||
|
|
||
|
def _normalize_name(self, name: str) -> str:
|
||
|
"""Convert package name to module name
|
||
|
|
||
|
Examples:
|
||
|
Django -> django
|
||
|
django-haystack -> django_haystack
|
||
|
Flask-RESTFul -> flask_restful
|
||
|
"""
|
||
|
if self.mapping:
|
||
|
name = self.mapping.get(name.replace("-", "_"), name)
|
||
|
return name.lower().replace("-", "_")
|
||
|
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
# required lib not installed yet
|
||
|
if not self.enabled:
|
||
|
return None
|
||
|
|
||
|
module_name, _sep, _submodules = module_name.partition(".")
|
||
|
module_name = module_name.lower()
|
||
|
if not module_name:
|
||
|
return None
|
||
|
|
||
|
for name in self.names:
|
||
|
if module_name == name:
|
||
|
return sections.THIRDPARTY
|
||
|
return None
|
||
|
|
||
|
|
||
|
class RequirementsFinder(ReqsBaseFinder):
|
||
|
exts = (".txt", ".in")
|
||
|
enabled = bool(parse_requirements)
|
||
|
|
||
|
def _get_files_from_dir(self, path: str) -> Iterator[str]:
|
||
|
"""Return paths to requirements files from passed dir."""
|
||
|
yield from self._get_files_from_dir_cached(path)
|
||
|
|
||
|
@classmethod
|
||
|
@lru_cache(maxsize=16)
|
||
|
def _get_files_from_dir_cached(cls, path: str) -> List[str]:
|
||
|
results = []
|
||
|
|
||
|
for fname in os.listdir(path):
|
||
|
if "requirements" not in fname:
|
||
|
continue
|
||
|
full_path = os.path.join(path, fname)
|
||
|
|
||
|
# *requirements*/*.{txt,in}
|
||
|
if os.path.isdir(full_path):
|
||
|
for subfile_name in os.listdir(full_path):
|
||
|
for ext in cls.exts:
|
||
|
if subfile_name.endswith(ext):
|
||
|
results.append(os.path.join(full_path, subfile_name))
|
||
|
continue
|
||
|
|
||
|
# *requirements*.{txt,in}
|
||
|
if os.path.isfile(full_path):
|
||
|
for ext in cls.exts:
|
||
|
if fname.endswith(ext):
|
||
|
results.append(full_path)
|
||
|
break
|
||
|
|
||
|
return results
|
||
|
|
||
|
def _get_names(self, path: str) -> Iterator[str]:
|
||
|
"""Load required packages from path to requirements file"""
|
||
|
yield from self._get_names_cached(path)
|
||
|
|
||
|
@classmethod
|
||
|
@lru_cache(maxsize=16)
|
||
|
def _get_names_cached(cls, path: str) -> List[str]:
|
||
|
result = []
|
||
|
|
||
|
with chdir(os.path.dirname(path)):
|
||
|
requirements = parse_requirements(path)
|
||
|
for req in requirements.values():
|
||
|
if req.name:
|
||
|
result.append(req.name)
|
||
|
|
||
|
return result
|
||
|
|
||
|
|
||
|
class PipfileFinder(ReqsBaseFinder):
|
||
|
enabled = bool(Pipfile)
|
||
|
|
||
|
def _get_names(self, path: str) -> Iterator[str]:
|
||
|
with chdir(path):
|
||
|
project = Pipfile.load(path)
|
||
|
for req in project.packages:
|
||
|
yield req.name
|
||
|
|
||
|
def _get_files_from_dir(self, path: str) -> Iterator[str]:
|
||
|
if "Pipfile" in os.listdir(path):
|
||
|
yield path
|
||
|
|
||
|
|
||
|
class DefaultFinder(BaseFinder):
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
return self.config.default_section
|
||
|
|
||
|
|
||
|
class FindersManager:
|
||
|
_default_finders_classes: Sequence[Type[BaseFinder]] = (
|
||
|
ForcedSeparateFinder,
|
||
|
LocalFinder,
|
||
|
KnownPatternFinder,
|
||
|
PathFinder,
|
||
|
PipfileFinder,
|
||
|
RequirementsFinder,
|
||
|
DefaultFinder,
|
||
|
)
|
||
|
|
||
|
def __init__(
|
||
|
self, config: Config, finder_classes: Optional[Iterable[Type[BaseFinder]]] = None
|
||
|
) -> None:
|
||
|
self.verbose: bool = config.verbose
|
||
|
|
||
|
if finder_classes is None:
|
||
|
finder_classes = self._default_finders_classes
|
||
|
finders: List[BaseFinder] = []
|
||
|
for finder_cls in finder_classes:
|
||
|
try:
|
||
|
finders.append(finder_cls(config))
|
||
|
except Exception as exception:
|
||
|
# if one finder fails to instantiate isort can continue using the rest
|
||
|
if self.verbose:
|
||
|
print(
|
||
|
(
|
||
|
f"{finder_cls.__name__} encountered an error ({exception}) during "
|
||
|
"instantiation and cannot be used"
|
||
|
)
|
||
|
)
|
||
|
self.finders: Tuple[BaseFinder, ...] = tuple(finders)
|
||
|
|
||
|
def find(self, module_name: str) -> Optional[str]:
|
||
|
for finder in self.finders:
|
||
|
try:
|
||
|
section = finder.find(module_name)
|
||
|
if section is not None:
|
||
|
return section
|
||
|
except Exception as exception:
|
||
|
# isort has to be able to keep trying to identify the correct
|
||
|
# import section even if one approach fails
|
||
|
if self.verbose:
|
||
|
print(
|
||
|
f"{finder.__class__.__name__} encountered an error ({exception}) while "
|
||
|
f"trying to identify the {module_name} module"
|
||
|
)
|
||
|
return None
|