"""Watch parts of the file system for changes.""" from __future__ import annotations from typing import AbstractSet, Iterable, NamedTuple from mypy.fscache import FileSystemCache class FileData(NamedTuple): st_mtime: float st_size: int hash: str class FileSystemWatcher: """Watcher for file system changes among specific paths. All file system access is performed using FileSystemCache. We detect changed files by stat()ing them all and comparing hashes of potentially changed files. If a file has both size and mtime unmodified, the file is assumed to be unchanged. An important goal of this class is to make it easier to eventually use file system events to detect file changes. Note: This class doesn't flush the file system cache. If you don't manually flush it, changes won't be seen. """ # TODO: Watching directories? # TODO: Handle non-files def __init__(self, fs: FileSystemCache) -> None: self.fs = fs self._paths: set[str] = set() self._file_data: dict[str, FileData | None] = {} def dump_file_data(self) -> dict[str, tuple[float, int, str]]: return {k: v for k, v in self._file_data.items() if v is not None} def set_file_data(self, path: str, data: FileData) -> None: self._file_data[path] = data def add_watched_paths(self, paths: Iterable[str]) -> None: for path in paths: if path not in self._paths: # By storing None this path will get reported as changed by # find_changed if it exists. self._file_data[path] = None self._paths |= set(paths) def remove_watched_paths(self, paths: Iterable[str]) -> None: for path in paths: if path in self._file_data: del self._file_data[path] self._paths -= set(paths) def _update(self, path: str) -> None: st = self.fs.stat(path) hash_digest = self.fs.hash_digest(path) self._file_data[path] = FileData(st.st_mtime, st.st_size, hash_digest) def _find_changed(self, paths: Iterable[str]) -> AbstractSet[str]: changed = set() for path in paths: old = self._file_data[path] try: st = self.fs.stat(path) except FileNotFoundError: if old is not None: # File was deleted. changed.add(path) self._file_data[path] = None else: if old is None: # File is new. changed.add(path) self._update(path) # Round mtimes down, to match the mtimes we write to meta files elif st.st_size != old.st_size or int(st.st_mtime) != int(old.st_mtime): # Only look for changes if size or mtime has changed as an # optimization, since calculating hash is expensive. new_hash = self.fs.hash_digest(path) self._update(path) if st.st_size != old.st_size or new_hash != old.hash: # Changed file. changed.add(path) return changed def find_changed(self) -> AbstractSet[str]: """Return paths that have changes since the last call, in the watched set.""" return self._find_changed(self._paths) def update_changed(self, remove: list[str], update: list[str]) -> AbstractSet[str]: """Alternative to find_changed() given explicit changes. This only calls self.fs.stat() on added or updated files, not on all files. It believes all other files are unchanged! Implies add_watched_paths() for add and update, and remove_watched_paths() for remove. """ self.remove_watched_paths(remove) self.add_watched_paths(update) return self._find_changed(update)