Source code for webdav4.client

"""Client for the webdav."""
import locale
import shutil
import threading
from contextlib import contextmanager, suppress
from http import HTTPStatus
from io import TextIOWrapper
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Set,
    TextIO,
    TypeVar,
    Union,
    cast,
)

from .callback import wrap_file_like
from .fs_utils import peek_filelike_length
from .func_utils import wrap_fn
from .http import Client as HTTPClient
from .http import HTTPStatusError
from .http import Method as HTTPMethod
from .multistatus import (
    MultiStatusResponseError,
    Response,
    parse_multistatus_response,
    prepare_propfind_request_data,
)
from .retry import retry as _retry
from .stream import IterStream, read_chunks
from .urls import URL, join_url

if TYPE_CHECKING:
    from datetime import datetime
    from os import PathLike
    from typing import AnyStr

    from .multistatus import DAVProperties, MultiStatusResponse
    from .types import AuthTypes, HeaderTypes, HTTPResponse, URLTypes

_T = TypeVar("_T")


DEFAULT_CHUNK_SIZE = 2**22


def _prepare_result_info(
    response: Response, base_url: URL, detail: bool = True
) -> Union[str, Dict[str, Any]]:
    """Build the entry that info/ls report for a single response.

    Args:
        response: the response to describe.
        base_url: url that the reported name is made relative to.
        detail: when False, only the relative path is returned.

    Returns:
        The path relative to ``base_url`` when ``detail`` is False,
        otherwise a dictionary with ``name``, ``href`` and every property
        of the response.
    """
    name = response.path_relative_to(base_url)
    if detail:
        info: Dict[str, Any] = {"name": name, "href": response.href}
        # properties are merged last, so they win on a key clash
        info.update(response.properties.as_dict())
        return info
    return name


class ClientError(Exception):
    """Base exception for the errors raised by the Client."""

    def __init__(self, msg: str) -> None:
        """Keep ``msg`` on the instance and hand it over to ``Exception``."""
        super().__init__(msg)
        self.msg: str = msg

    def __str__(self) -> str:
        """Return the message the exception was created with."""
        return self.msg
class ResourceConflict(ClientError):
    """Signals that the operation ran into a conflict (HTTP 409)."""
class ForbiddenOperation(ClientError):
    """Signals that the server forbade the operation (HTTP 403)."""
class ResourceAlreadyExists(ClientError):
    """Raised when the resource is already present."""

    def __init__(self, path: str) -> None:
        """Remember the path that already exists and build the message."""
        self.path = path
        message = f"The resource {path} already exists"
        super().__init__(message)
class InsufficientStorage(ClientError):
    """Raised when the server reports that it has insufficient storage."""

    def __init__(self, path: str) -> None:
        """Remember the path the failed request was made for."""
        self.path = path
        # the message is fixed; the path is only kept as an attribute
        message = "Insufficient Storage on the server"
        super().__init__(message)
class BadGatewayError(ClientError):
    """Raised when the server answers with a bad gateway error (502)."""

    def __init__(self) -> None:
        """Instantiate the exception with its fixed message."""
        super().__init__(
            "The destination server may have refused to accept the resource"
        )
class ResourceLocked(ClientError):
    """Signals that a resource involved in the operation is locked."""
class ResourceNotFound(ClientError):
    """Raised when the resource is missing on the server."""

    def __init__(self, path: str) -> None:
        """Remember the missing path and build the message from it."""
        self.path = path
        message = f"The resource {path} could not be found in the server"
        super().__init__(message)
class HTTPError(ClientError):
    """ClientError that carries the failed HTTP response."""

    def __init__(self, response: "HTTPResponse") -> None:
        """Keep the response, its status code and its request around."""
        status_code = response.status_code
        self.response = response
        self.status_code = status_code
        self.request = response.request
        reason = response.reason_phrase
        super().__init__(f"received {status_code} ({reason})")
class IsAResourceError(ClientError):
    """Exception thrown when the path is a resource (non-collection).

    Could be thrown when a collection is expected.
    """

    def __init__(self, path: str, msg: str = "") -> None:
        """Initialize with the path and the appropriate message.

        Args:
            path: path that turned out to be a resource.
            msg: extra detail appended after the generic message.
        """
        self.path: str = path
        # the message used to claim that the path "is a collection", which
        # is the opposite of what this error reports
        super().__init__(f"{path} is a resource. {msg}")
class IsACollectionError(ClientError):
    """Raised when the path turns out to be a collection.

    Typically used where a resource/non-collection was expected instead.
    """

    def __init__(self, path: str, msg: str = "") -> None:
        """Remember the path and compose the message with the extra detail."""
        self.path: str = path
        message = f"{path} is a collection. {msg}"
        super().__init__(message)
class MultiStatusError(ClientError):
    """ClientError flavour of MultiStatusResponseError."""
class FeatureDetection:
    """Detect features in the webdav resources.

    Mostly used for detecting support for Accept-Ranges as ownCloud/NextCloud
    don't advertise support for it in GET requests.
    """

    # True when the response had an ``accept-ranges`` header equal to "bytes"
    supports_ranges: bool
    # compliance classes listed in the ``dav`` header of the response
    dav_compliances: Set[str]

    def __init__(
        self, options_response: Optional["HTTPResponse"] = None
    ) -> None:
        """Initialize with the given response.

        Args:
            options_response: response to read the ``dav`` and
                ``accept-ranges`` headers from. Without a (truthy) response
                no feature is considered to be supported.
        """
        dav_compliances: Set[str] = set()
        supports_ranges = False
        if options_response:
            dav_header = options_response.headers.get("dav", "")
            # "".split(",") gives [""], so empty tokens are dropped to keep
            # a missing header (or a stray comma) from adding "" to the set
            tokens = (token.strip() for token in dav_header.split(","))
            dav_compliances = {token for token in tokens if token}
            supports_ranges = (
                options_response.headers.get("accept-ranges") == "bytes"
            )

        self.dav_compliances = dav_compliances
        self.supports_ranges = supports_ranges
class Client:
    """Provides higher level APIs for interacting with Webdav server."""

    def __init__(
        self,
        base_url: "URLTypes",
        auth: Optional["AuthTypes"] = None,
        http_client: Optional["HTTPClient"] = None,
        retry: Union[Callable[[Callable[[], _T]], _T], bool] = True,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        **client_opts: Any,
    ) -> None:
        """Instantiate client for webdav.

        Examples:
            >>> client = Client("https://webdav.example.org")
            >>> client.ls("/")

        Args:
            base_url: base url of the Webdav server
            auth:  Auth for the webdav. Auth can be any of the following:

                - a tuple of (user, password)
                - None if no auth is required.

                Refer to `Customizing Authentication
                <https://www.python-httpx.org/advanced/#customizing-authentication>`_
                for more options.
            http_client: http client to use instead, useful in mocking
                (when extending, it is expected to have implemented
                additional verbs from webdav)
            retry: disable or enable retry on client. Can also pass a
                callable to handle it there. Some well-known errors are
                handled and retried a few times with the backoff.
            chunk_size: default chunk size, used by ``open`` and
                ``upload_fileobj`` (and the helpers built on them) whenever
                they are not given a ``chunk_size`` of their own.

        All of the following keyword arguments are passed along to the
        `httpx <https://www.python-httpx.org/api/#client>`_, the http library
        this client is built on.

        Keyword Args:
            headers: Dict. of HTTP headers to include when sending requests
            cookies: Dict. of Cookie items to include when sending requests
            verify: SSL certificates used to verify the identity of requested
                hosts. Can be any of:

                - True (uses default CA bundle),
                - a path to an SSL certificate file,
                - False (disable verification), or
                - a :py:class:`ssl.SSLContext`

            cert: An SSL certificate used by the requested host to
                authenticate the client.
                Either a path to an SSL certificate file, or two-tuple of
                (certificate file, key file), or a three-tuple of
                (certificate file, key file, password).
            proxies: A dictionary mapping proxy keys to proxy URLs
            timeout: The timeout configuration to use when sending requests
            limits: The limits configuration to use
            max_redirects: The maximum number of redirect responses
                that should be followed
            trust_env: Enables or disables usage of environment variables
                for configuration
        """
        client_opts.update({"base_url": base_url, "auth": auth})
        # client_opts are only used when no http_client is passed in
        self.http: HTTPClient = http_client or HTTPClient(**client_opts)
        self.base_url = URL(base_url)
        self.with_retry = retry if callable(retry) else _retry(retry)
        self._detected_features: Optional[FeatureDetection] = None
        self._detect_feature_lock = threading.RLock()
        self.chunk_size = chunk_size

    @property
    def detected_features(self) -> FeatureDetection:
        """Feature detection for the server.

        The detection is done with an OPTIONS request to the base url on
        first access and is kept on the client afterwards. Any exception
        raised by that request is suppressed, in which case no feature is
        reported as supported.
        """
        if not self._detected_features:
            with self._detect_feature_lock:
                # a lot of threads might be stuck on thread lock
                # and if one is done with it, it means we already
                # have it set, so we should look for it rather than
                # sending out a request.
                if self._detected_features:  # pragma: no cover
                    return self._detected_features

                resp = None
                with suppress(Exception):
                    resp = self.http.options(self.base_url)
                self._detected_features = FeatureDetection(resp)
        return self._detected_features

    def options(self, path: str = "") -> Set[str]:
        """Returns features detected in the webdav server.

        Sends an OPTIONS request for ``path`` and returns the compliance
        classes found in the response.
        """
        resp = self.http.options(path)
        detected_features = FeatureDetection(resp)
        return detected_features.dav_compliances

    def join_url(self, path: str, add_trailing_slash: bool = False) -> URL:
        """Join resource path with base url of the webdav server."""
        return join_url(
            self.base_url, path, add_trailing_slash=add_trailing_slash
        )

    def propfind(
        self,
        path: str,
        data: Optional[str] = None,
        headers: Optional["HeaderTypes"] = None,
        follow_redirects: bool = False,
    ) -> "MultiStatusResponse":
        """Returns properties of the specific resource by propfind request.

        Args:
            path: path to the resource
            data: body of the request, if any
            headers: headers to send along with the request
            follow_redirects: forwarded as-is to the http request

        Returns:
            The parsed multistatus response.
        """
        call = wrap_fn(
            self._request,
            HTTPMethod.PROPFIND,
            path,
            content=data,
            headers=headers,
            follow_redirects=follow_redirects,
        )
        http_resp = self.with_retry(call)
        return parse_multistatus_response(http_resp)

    def get_props(
        self,
        path: str,
        name: Optional[str] = None,
        namespace: Optional[str] = None,
        data: Optional[str] = None,
    ) -> "DAVProperties":
        """Returns properties of a resource by doing a propfind request.

        Can also selectively request the properties by passing name or data.
        """
        data = data or prepare_propfind_request_data(name, namespace)
        # the content type is only sent when there is a body to describe
        headers = {"Content-Type": "application/xml"} if data else {}
        result = self.propfind(path, headers=headers, data=data)
        response = result.get_response_for_path(self.base_url.path, path)
        return response.properties

    def get_property(
        self, path: str, name: str, namespace: Optional[str] = None
    ) -> Any:
        """Returns appropriate property from the propfind response.

        Also supports getting named properties
        (for now restricted to a single string with the given namespace)

        An empty string is returned when the properties have no attribute
        with the given name.
        """
        props = self.get_props(path, name=name, namespace=namespace)
        return getattr(props, name, "")

    def set_property(self) -> None:
        """Setting additional property to a resource.

        This method has no implementation yet, calling it does nothing.
        """

    def _request(
        self,
        method: str,
        path: str,
        add_trailing_slash: bool = False,
        **kwargs: Any,
    ) -> "HTTPResponse":
        """Internal method for sending request to the server.

        It handles joining path correctly and checks for common http errors.

        Raises:
            ResourceNotFound: on a 404 response
            InsufficientStorage: on a 507 response
            BadGatewayError: on a 502 response
            HTTPError: on any other response that fails ``raise_for_status``
        """
        url = self.join_url(path, add_trailing_slash=add_trailing_slash)
        http_resp = self.http.request(method, url, **kwargs)

        if http_resp.status_code == HTTPStatus.NOT_FOUND:
            raise ResourceNotFound(path)
        if http_resp.status_code == HTTPStatus.INSUFFICIENT_STORAGE:
            raise InsufficientStorage(path)
        if http_resp.status_code == HTTPStatus.BAD_GATEWAY:
            raise BadGatewayError()

        try:
            http_resp.raise_for_status()
        except HTTPStatusError as exc:
            raise HTTPError(http_resp) from exc

        return http_resp

    def request(self, method: str, path: str, **kwargs: Any) -> "HTTPResponse":
        """Sends request to a server with given method and path.

        Also checks for Multistatus response and other http errors.

        Raises:
            MultiStatusError: when a 207 response carries an error
        """
        http_resp = self._request(method, path, **kwargs)
        if http_resp.status_code == HTTPStatus.MULTI_STATUS:
            # if it's 207, it's most likely an error
            # or, a partial success (however you see it).
            # except for the propfind, for which we use `_request` directly)
            # in the above `propfind` function.
            result = parse_multistatus_response(http_resp)
            try:
                result.raise_for_status()
            except MultiStatusResponseError as exc:
                raise MultiStatusError(exc.msg) from exc

        return http_resp

    def move(
        self, from_path: str, to_path: str, overwrite: bool = False
    ) -> None:
        """Move resource to a new destination (with or without overwriting)."""
        return self._transfer(
            HTTPMethod.MOVE, from_path, to_path, overwrite=overwrite
        )

    def _transfer(
        self,
        operation: str,
        from_path: str,
        to_path: str,
        overwrite: bool,
        depth: Union[int, str] = "infinity",
    ) -> None:
        """Transfer a resource by copying/moving from a path to the other.

        Raises:
            ForbiddenOperation: on a 403 response
            ResourceConflict: on a 409 response
            ResourceAlreadyExists: on a 412 response
            ResourceLocked: on a 423 response
            HTTPError: on any other failed response
        """
        assert operation in {HTTPMethod.MOVE, HTTPMethod.COPY}

        to_url = self.join_url(to_path)
        headers = {
            "Destination": str(to_url),
            "Overwrite": "T" if overwrite else "F",
            "Depth": str(depth),
        }
        call = wrap_fn(self.request, operation, from_path, headers=headers)
        try:
            self.with_retry(call)
        except HTTPError as exc:
            if exc.status_code == HTTPStatus.FORBIDDEN:
                msg = "the source and the destination could be the same"
                raise ForbiddenOperation(msg) from exc
            if exc.status_code == HTTPStatus.CONFLICT:
                msg = (
                    "there was a conflict when trying to "
                    f"{operation.lower()} the resource"
                )
                raise ResourceConflict(msg) from exc
            if exc.status_code == HTTPStatus.PRECONDITION_FAILED:
                raise ResourceAlreadyExists(to_path) from exc
            if exc.status_code == HTTPStatus.LOCKED:
                msg = "the source or the destination resource is locked"
                raise ResourceLocked(msg) from exc
            raise

    def copy(
        self,
        from_path: str,
        to_path: str,
        depth: Union[int, str] = "infinity",
        overwrite: bool = False,
    ) -> None:
        """Copy resource."""
        return self._transfer(
            HTTPMethod.COPY,
            from_path,
            to_path,
            depth=depth,
            overwrite=overwrite,
        )

    def mkdir(self, path: str) -> None:
        """Create a collection.

        Raises:
            ResourceAlreadyExists: on a 405 response
            ForbiddenOperation: on a 403 response
            ResourceConflict: on a 409 response
            HTTPError: on any other failed response
        """
        call = wrap_fn(
            self.request, HTTPMethod.MKCOL, path, add_trailing_slash=True
        )
        try:
            http_resp = self.with_retry(call)
        except HTTPError as exc:
            if exc.status_code == HTTPStatus.METHOD_NOT_ALLOWED:
                raise ResourceAlreadyExists(path) from exc
            if exc.status_code == HTTPStatus.FORBIDDEN:
                # note the trailing space: the two literals are concatenated
                msg = (
                    "the server does not allow creation in the namespace "
                    "or cannot accept members"
                )
                raise ForbiddenOperation(msg) from exc
            if exc.status_code == HTTPStatus.CONFLICT:
                msg = "parent of the collection does not exist"
                raise ResourceConflict(msg) from exc
            raise

        assert http_resp.status_code in (HTTPStatus.OK, HTTPStatus.CREATED)

    def remove(self, path: str) -> None:
        """Remove a resource.

        Raises:
            ResourceLocked: on a 423 response
            HTTPError: on any other failed response
        """
        call = wrap_fn(self.request, HTTPMethod.DELETE, path)
        try:
            self.with_retry(call)
        except HTTPError as exc:
            if exc.status_code == HTTPStatus.LOCKED:
                raise ResourceLocked("the resource is locked") from exc
            raise

    def ls(  # pylint: disable=invalid-name
        self,
        path: str,
        detail: bool = True,
        allow_listing_resource: bool = True,
    ) -> List[Union[str, Dict[str, Any]]]:
        """List items in a resource/collection.

        Args:
            path: Path to the resource
            detail: If detail=True, additional information is returned
                in a dictionary
            allow_listing_resource: If True and path is a resource
                (non-collection), ls will return the file entry/details.
                Otherwise, it will raise an error.

        Raises:
            IsAResourceError: when path is a resource and listing a resource
                is not allowed
        """
        result = self.propfind(
            path, headers={"Depth": "1"}, follow_redirects=True
        )
        responses = result.responses

        url = self.join_url(path)
        response = responses.get(url.path)

        if response:
            typ = response.properties.resource_type
            if typ == "file" and not allow_listing_resource:
                raise IsAResourceError(
                    path, "cannot list from a resource itself"
                )
            if typ == "directory":
                # the collection itself is not one of its own items
                responses.pop(url.path)
        else:  # pragma: no cover
            assert not response  # response should always be there

        return [
            _prepare_result_info(resp, self.base_url, detail)
            for resp in responses.values()
        ]

    def info(self, path: str) -> Dict[str, Any]:
        """Returns information about the path itself."""
        # Depth 0 asks for the resource alone; only its own entry is used
        # below, so there is no need to have the members of a collection
        # listed as well.
        result = self.propfind(path, headers={"Depth": "0"})
        responses = result.responses

        url = self.join_url(path)
        details = _prepare_result_info(
            responses[url.path], self.base_url, detail=True
        )
        assert not isinstance(details, str)
        return details

    def exists(self, path: str) -> bool:
        """Checks whether the resource with the given path exists or not."""
        try:
            self.propfind(path)
        except ResourceNotFound:
            return False
        return True

    def isdir(self, path: str) -> bool:
        """Checks whether the resource with the given path is a directory."""
        return bool(self.get_props(path).collection)

    def isfile(self, path: str) -> bool:
        """Checks whether the resource with the given path is a file."""
        return not self.isdir(path)

    def content_length(self, path: str) -> Optional[int]:
        """Returns content-length of the resource with the given path."""
        return self.get_props(path, "content_length").content_length

    def created(self, path: str) -> Optional["datetime"]:
        """Returns creationdate of the resource with the given path."""
        return self.get_props(path, "created").created

    def modified(self, path: str) -> Optional["datetime"]:
        """Returns getlastmodified of the resource with the given path."""
        return self.get_props(path, "modified").modified

    def etag(self, path: str) -> Optional[str]:
        """Returns etag of the resource with the given path."""
        return self.get_props(path, "etag").etag

    def content_type(self, path: str) -> Optional[str]:
        """Returns content type of the resource with the given path."""
        return self.get_props(path, "content_type").content_type

    def content_language(self, path: str) -> Optional[str]:
        """Returns content language of the resource with the given path."""
        return self.get_props(path, "content_language").content_language

    @contextmanager
    def open(
        self,
        path: str,
        mode: str = "r",
        encoding: Optional[str] = None,
        chunk_size: Optional[int] = None,
    ) -> Iterator[Union[TextIO, BinaryIO]]:
        """Returns file-like object to a resource.

        Args:
            path: path to the resource
            mode: one of "r", "rt" or "rb"
            encoding: encoding to decode with in the text modes. Falls back
                to the encoding of the stream and then to the preferred
                encoding of the locale.
            chunk_size: chunk size for the stream, defaults to the chunk
                size of the client

        Raises:
            IsACollectionError: when path is a collection
        """
        if self.isdir(path):
            raise IsACollectionError(path, "Cannot open a collection")
        assert mode in {"r", "rt", "rb"}

        with IterStream(
            self,
            self.join_url(path),
            chunk_size=chunk_size or self.chunk_size,
        ) as buffer:
            buff = cast(BinaryIO, buffer)

            if mode == "rb":
                yield buff
            else:
                encoding = (
                    encoding
                    or buffer.encoding
                    or locale.getpreferredencoding(False)
                )
                yield TextIOWrapper(buff, encoding=encoding)

    def download_fileobj(
        self,
        from_path: str,
        file_obj: BinaryIO,
        callback: Optional[Callable[[int], Any]] = None,
        chunk_size: Optional[int] = None,
    ) -> None:
        """Write stream from path to given file object.

        ``file_obj`` is wrapped together with ``callback`` around its
        ``write`` method before the remote content is copied into it.
        """
        with self.open(
            from_path, mode="rb", chunk_size=chunk_size
        ) as remote_obj:
            # TODO: fix typings for open to always return BinaryIO on mode=rb
            remote_obj = cast(BinaryIO, remote_obj)
            wrapped = wrap_file_like(file_obj, callback, method="write")
            shutil.copyfileobj(remote_obj, wrapped)

    def download_file(
        self,
        from_path: str,
        to_path: "PathLike[AnyStr]",
        chunk_size: Optional[int] = None,
        callback: Optional[Callable[[int], Any]] = None,
    ) -> None:
        """Download file from remote path to local path.

        The local file is opened for writing before the download starts.
        """
        with open(to_path, mode="wb") as fobj:
            self.download_fileobj(
                from_path, fobj, callback=callback, chunk_size=chunk_size
            )

    def upload_file(
        self,
        from_path: "PathLike[AnyStr]",
        to_path: str,
        overwrite: bool = False,
        chunk_size: Optional[int] = None,
        callback: Optional[Callable[[int], Any]] = None,
    ) -> None:
        """Upload file from local path to a given remote path."""
        with open(from_path, mode="rb") as fobj:
            self.upload_fileobj(
                fobj,
                to_path,
                overwrite=overwrite,
                chunk_size=chunk_size,
                callback=callback,
            )

    def upload_fileobj(
        self,
        file_obj: BinaryIO,
        to_path: str,
        overwrite: bool = False,
        callback: Optional[Callable[[int], Any]] = None,
        chunk_size: Optional[int] = None,
        size: Optional[int] = None,
    ) -> None:
        """Upload file from file object to given path.

        Args:
            file_obj: file object to read the content from
            to_path: remote path to upload to
            overwrite: without it, the upload is refused when the remote
                path already exists
            callback: passed along with ``file_obj`` to ``wrap_file_like``
            chunk_size: chunk size to read with, defaults to the chunk size
                of the client
            size: size of the content, used for the Content-Length header

        Raises:
            ResourceAlreadyExists: when the remote path exists and
                ``overwrite`` is not set
        """
        # we try to avoid chunked transfer as much as possible
        # so we try to use size as a hint if provided.
        # else, we will try to find that out from the file object
        # if we are not successful in that, we gracefully fallback
        # to the chunked encoding.
        if size is None:
            size = peek_filelike_length(file_obj)

        headers = {"Content-Length": str(size)} if size is not None else None
        if not overwrite and self.exists(to_path):
            raise ResourceAlreadyExists(to_path)

        wrapped = wrap_file_like(file_obj, callback)
        content = read_chunks(
            wrapped, chunk_size=chunk_size or self.chunk_size
        )

        http_resp = self.request(
            HTTPMethod.PUT, to_path, content=content, headers=headers
        )
        http_resp.raise_for_status()