Skip to content

Commit

Permalink
Refactor configuration parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
geigerzaehler committed Nov 16, 2024
1 parent 15aebf6 commit d06724d
Showing 1 changed file with 106 additions and 68 deletions.
174 changes: 106 additions & 68 deletions beetsplug/alternatives.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import os.path
import shutil
import threading
from collections.abc import Callable, Iterable, Iterator, Sequence
from collections.abc import Callable, Iterator, Sequence
from concurrent import futures
from enum import Enum
from pathlib import Path
from typing import Literal

import beets
import confuse
Expand Down Expand Up @@ -68,19 +69,18 @@ def list_tracks(self, lib: Library, options: argparse.Namespace):
print_(format(item))

def alternative(self, name: str, lib: Library):
conf = self.config[name]
if not conf.exists():
config_raw = self.config[name]
if not config_raw.exists():
raise KeyError(name)

if conf["formats"].exists():
fmt = conf["formats"].as_str()
assert isinstance(fmt, str)
if fmt == "link":
return SymlinkView(self._log, name, lib, conf)
else:
return ExternalConvert(self._log, name, fmt.split(), lib, conf)
config = Config(name, config_raw, lib)

if config.type == "link":
return SymlinkView(self._log, lib, config)
elif config.formats:
return ExternalConvert(self._log, lib, config)
else:
return External(self._log, name, lib, conf)
return External(self._log, lib, config)


class AlternativesCommand(Subcommand):
Expand Down Expand Up @@ -152,46 +152,102 @@ def _get_all_options(self) -> Sequence[Never]:
return []


class Action(Enum):
ADD = 1
REMOVE = 2
WRITE = 3
MOVE = 4
SYNC_ART = 5
class Config:
collection_id: str

type: Literal["copy_convert"] | Literal["link"]
"""Determines whether item files are copied and/or converted or symlinked"""

class External:
def __init__(
self, log: logging.Logger, name: str, lib: Library, config: confuse.ConfigView
):
self._log = log
self.name = name
self.lib = lib
self.path_key = f"alt.{name}"
self.max_workers = int(str(beets.config["convert"]["threads"]))
self.parse_config(config)
directory: Path
"""Directory under which items in the collection are located."""

path_formats: Sequence[tuple[str, str]]
"""Formats that determine the path of items in the collection. See
<https://beets.readthedocs.io/en/stable/reference/pathformat.html>.
"""

formats: Sequence[str]
"""List of acceptable formats for the collection. If an item’s format is not
in this list the item is transcoded to the first format in the list."""

removable: bool
"""If true, the user is asked to confirm root directory creation."""

album_art_maxwidth: int | None
"""Maximum width of embedded album art. Larger art is resized."""

def __init__(self, collection_id: str, config: confuse.ConfigView, lib: Library):
self.collection_id = collection_id

if "formats" in config:
fmt = config["formats"].as_str()
assert isinstance(fmt, str)
if fmt == "link":
self.type = "link"
else:
self.type = "copy_convert"
self.formats = tuple(f.lower() for f in fmt.split())
else:
self.type = "copy_convert"
self.formats = ()

def parse_config(self, config: confuse.ConfigView):
if "paths" in config:
path_config = config["paths"]
else:
path_config = beets.config["paths"]
self.path_formats = get_path_formats(path_config)
query = config["query"].as_str()

query = config["query"].get(confuse.Optional(confuse.String(), default=""))
self.query, _ = parse_query_string(query, Item)

self.removable = config.get(dict).get("removable", True) # type: ignore
self.album_art_maxwidth = config.get(dict).get("album_art_maxwidth", None) # type: ignore
removable = config["removable"].get(confuse.TypeTemplate(bool, default=True))
assert isinstance(removable, bool)
self.removable = removable

album_art_maxwidth = config["album_art_maxwidth"].get(
confuse.Optional(confuse.Integer())
)
assert album_art_maxwidth is None or isinstance(album_art_maxwidth, int)
self.album_art_maxwidth = album_art_maxwidth

if "directory" in config:
dir = config["directory"].as_path()
assert isinstance(dir, Path)
else:
dir = Path(self.name)
dir = Path(collection_id)
if not dir.is_absolute():
dir = Path(str(self.lib.directory, "utf8")) / dir # type: ignore
dir = Path(str(lib.directory, "utf8")) / dir
self.directory = dir

link_type = config["link_type"].get(
confuse.Choice(
{
"relative": SymlinkType.RELATIVE,
"absolute": SymlinkType.ABSOLUTE,
},
default=SymlinkType.ABSOLUTE,
)
)
assert isinstance(link_type, SymlinkType)
self.link_type = link_type


class Action(Enum):
ADD = 1
REMOVE = 2
WRITE = 3
MOVE = 4
SYNC_ART = 5


class External:
def __init__(self, log: logging.Logger, lib: Library, config: Config):
self._log = log
self._config = config
self.lib = lib
self.path_key = f"alt.{config.collection_id}"
self.max_workers = int(str(beets.config["convert"]["threads"]))

def item_change_actions(
self, item: Item, actual: Path, dest: Path
) -> Sequence[Action]:
Expand Down Expand Up @@ -233,32 +289,32 @@ def _matched_item_action(self, item: Item) -> Sequence[Action]:
def _items_actions(self) -> Iterator[tuple[Item, Sequence[Action]]]:
matched_ids = set()
for album in self.lib.albums():
if self.query.match(album):
if self._config.query.match(album):
matched_items = album.items()
matched_ids.update(item.id for item in matched_items)

for item in self.lib.items():
if item.id in matched_ids or self.query.match(item):
if item.id in matched_ids or self._config.query.match(item):
yield (item, self._matched_item_action(item))
elif self._get_stored_path(item):
yield (item, [Action.REMOVE])

def ask_create(self, create: bool | None = None) -> bool:
if not self.removable:
if not self._config.removable:
return True
if create is not None:
return create

msg = (
f"Collection at '{self.directory}' does not exists. "
f"Collection at '{self._config.directory}' does not exists. "
"Maybe you forgot to mount it.\n"
"Do you want to create the collection? (y/n)"
)
return input_yn(msg, require=True)

def update(self, create: bool | None = None):
if not self.directory.is_dir() and not self.ask_create(create):
print_(f"Skipping creation of {self.directory}")
if not self._config.directory.is_dir() and not self.ask_create(create):
print_(f"Skipping creation of {self._config.directory}")
return

converter = self._converter()
Expand All @@ -272,7 +328,7 @@ def update(self, create: bool | None = None):
dest.parent.mkdir(parents=True, exist_ok=True)
path.rename(dest)
# beets types are confusing
util.prune_dirs(str(path.parent), root=str(self.directory)) # pyright: ignore
util.prune_dirs(str(path.parent), root=str(self._config.directory)) # pyright: ignore
self._set_stored_path(item, dest)
item.store()
path = dest
Expand Down Expand Up @@ -300,11 +356,11 @@ def update(self, create: bool | None = None):

def destination(self, item: Item) -> Path:
"""Returns the path for `item` in the external collection."""
path = item.destination(path_formats=self.path_formats, fragment=True)
path = item.destination(path_formats=self._config.path_formats, fragment=True)
# When using `fragment=True` the returned path is guaranteed to be a
# string.
assert isinstance(path, str)
return self.directory / path
return self._config.directory / path

def _set_stored_path(self, item: Item, path: Path):
item[self.path_key] = str(path)
Expand All @@ -326,7 +382,7 @@ def _remove_file(self, item: Item):
if path:
path.unlink(missing_ok=True)
# beets types are confusing
util.prune_dirs(str(path), root=str(self.directory)) # pyright: ignore
util.prune_dirs(str(path), root=str(self._config.directory)) # pyright: ignore
del item[self.path_key]

def _converter(self) -> "Worker":
Expand All @@ -348,7 +404,7 @@ def _sync_art(self, item: Item, path: Path):
self._log,
item,
album.artpath,
maxwidth=self.album_art_maxwidth,
maxwidth=self._config.album_art_maxwidth,
itempath=bytes(path),
)

Expand All @@ -357,18 +413,15 @@ class ExternalConvert(External):
def __init__(
self,
log: logging.Logger,
name: str,
formats: Iterable[str],
lib: Library,
config: confuse.ConfigView,
config: Config,
):
super().__init__(log, name, lib, config)
super().__init__(log, lib, config)
convert_plugin = convert.ConvertPlugin()
self._encode = convert_plugin.encode
self._embed = convert_plugin.config["embed"].get(bool)
formats = [f.lower() for f in formats]
self.formats = [convert.ALIASES.get(f, f) for f in formats]
self.convert_cmd, self.ext = convert.get_format(self.formats[0])
self._formats = [convert.ALIASES.get(f, f) for f in config.formats]
self.convert_cmd, self.ext = convert.get_format(self._formats[0])

@override
def _converter(self) -> "Worker":
Expand Down Expand Up @@ -401,7 +454,7 @@ def destination(self, item: Item) -> Path:
return dest

def _should_transcode(self, item: Item):
return item.format.lower() not in self.formats
return item.format.lower() not in self._formats


class SymlinkType(Enum):
Expand All @@ -410,21 +463,6 @@ class SymlinkType(Enum):


class SymlinkView(External):
@override
def parse_config(self, config: confuse.ConfigView):
if "query" not in config:
config["query"] = "" # This is a TrueQuery()
if "link_type" not in config:
# Default as absolute so it doesn't break previous implementation
config["link_type"] = "absolute"

self.relativelinks = config["link_type"].as_choice({
"relative": SymlinkType.RELATIVE,
"absolute": SymlinkType.ABSOLUTE,
})

super().parse_config(config)

@override
def item_change_actions(
self, item: Item, actual: Path, dest: Path
Expand Down Expand Up @@ -472,7 +510,7 @@ def _create_symlink(self, item: Item):
item_path = Path(str(item.path, "utf8"))
link = (
os.path.relpath(item_path, dest.parent)
if self.relativelinks == SymlinkType.RELATIVE
if self._config.link_type == SymlinkType.RELATIVE
else item_path
)
dest.symlink_to(link)
Expand Down

0 comments on commit d06724d

Please sign in to comment.