fs_store.py (965B)
1 from pathlib import Path 2 from urllib.parse import urlparse, urlunparse 3 from .base_store import UnganaBaseStore 4 5 6 class UnganaFSStore(UnganaBaseStore): 7 def _resolve_base_path(self, uri: str) -> Path: 8 parsed = urlparse(uri) 9 path = parsed.path if parsed.scheme else uri 10 return Path(path).expanduser().resolve() 11 12 def put(self, base_uri: str, key: str, data: bytes) -> str: 13 base_path = self._resolve_base_path(base_uri) 14 target = base_path.joinpath(key) 15 target.parent.mkdir(parents=True, exist_ok=True) 16 17 with open(target, "wb") as f: 18 f.write(data) 19 20 return urlunparse(("file", "", str(target), "", "", "")) 21 22 def get(self, base_uri: str, key: str) -> bytes: 23 base_path = self._resolve_base_path(base_uri) 24 target = base_path.joinpath(key) 25 26 if not target.exists(): 27 raise FileNotFoundError(f"{target} does not exist") 28 29 return target.read_bytes()