Source code for ged2doc.input

"""Module which handles input files.

This module is responsible for locating all files (GEDCOM data and images)
given the application inputs. Currently it handles two cases:

  - Input is specified as path to GEDCOM file, that file can contain names
    of image files that are either absolute or relative to directory
    containing GEDCOM file or some other directory. Program options can
    specify directory where images are located.
  - Input file is a ZIP archive that includes both GEDCOM file and files
    with images. Depending on how GEDCOM file and archive were prepared
    names of image files in GEDCOM file can be specified as absolute paths
    to their original location or relative paths to their common directory.

Additional issue to consider is that files can be prepared on a system
which is different from the system where the file is parsed. For example
GEDCOM file could be prepared on Windows machine and names of image files
could be given using Windows path convention (either absolute as
``C:\\Users\\JosephSmith\\Documents\\Pictures\\Family\\Tree\\Me.BMP``
or relative as ``Pictures\\Family\\Tree\\Me.BMP``) and later this GEDCOM
file could be copied to Linux host and processed using ``ged2doc`` package.
Files on Linux machine will have different absolute and possibly relative
paths (and definitely different path separator character).

In case of ZIP archive the names of images in GEDCOM file could be different
from the names in the archive (e.g. image path in GEDCOM file
``C:\\Users\\JosephSmith\\Documents\\Pictures\\Family\\Tree\\Me.BMP`` could
be stored in ZIP archive as ``Pictures/Family/Tree/Me.BMP``).

Logic in this module is supposed to handle all those possible cases where
names of files in GEDCOM file could be different from their location on a
target storage system.

Typical use case for GEDCOM file returned by this module is to be passed to
methods in :py:mod:`ged4py` package and that package expects true
filesystem-backed file which supports ``seek()`` and ``tell()`` methods.
Image files do not typically need support for these methods and are usually
read as a byte stream using ``read()`` method. This module returns seek-able
file object open in binary mode for GEDCOM file (meaning that temporary file
on disk may need to be created in some cases) and a "simple" binary stream
for images.
"""

__all__ = ["make_file_locator", "FileLocator", "MultipleMatchesError"]

import abc
import errno
import fnmatch
import io
import logging
import os
import shutil
import tempfile
import zipfile


_log = logging.getLogger(__name__)


class MultipleMatchesError(RuntimeError):
    """Error signalling an ambiguous file lookup.

    Raised when a search for a file (GEDCOM data or an image) finds more
    than one candidate matching the requested name or pattern.
    """
class FileLocator(metaclass=abc.ABCMeta):
    """Abstract interface implemented by all file locators.

    A locator knows how to open the input GEDCOM file and the image files
    that the GEDCOM data refers to.
    """

    @abc.abstractmethod
    def open_gedcom(self):
        """Open the input GEDCOM file.

        The returned file object is open in binary mode and supports
        ``seek()`` and ``tell()`` methods. It may be a temporary file which
        is deleted once it is closed. Errors other than the one listed below
        can be raised too, e.g. when the file cannot be opened.

        Returns
        -------
        file
            File object open in binary mode supporting ``seek()`` and
            ``tell()`` methods, or `None` if no GEDCOM file is found.

        Raises
        ------
        MultipleMatchesError
            Raised if more than one file is found.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def open_image(self, name):
        """Open the named image file.

        The returned object may not support the full file protocol (it can
        be a member of a ZIP archive, for example); copy its contents if
        anything beyond ``read()`` is needed. Errors other than the one
        listed below can be raised too, e.g. when the file cannot be opened.

        Parameters
        ----------
        name : `str`
            Name of the image file to open, as a relative or absolute path.
            Usually this is the name stored in the GEDCOM file, so it may
            use a path separator different from the one of the system
            reading the file.

        Returns
        -------
        image
            File object open in binary mode, only ``read()`` method is
            guaranteed to work; `None` if the image is not found.

        Raises
        ------
        MultipleMatchesError
            Raised if more than one file is found.
        """
        raise NotImplementedError("Method open_image() is not implemented")
class _Path:
    """Internal representation of the (relative) file path.

    In this representation path is just a sequence of path components -
    zero or more folders and a file name.

    Parameters
    ----------
    components : `list` [ `str` ]
        List of (unicode) strings representing path.
    dirname : `str`, optional
        Optional prefix directory.
    """

    def __init__(self, components, dirname=None):
        # copy the list so that later changes by the caller do not leak in
        self.components = components[:]
        self.dirname = dirname

    @classmethod
    def from_path(cls, path, dirname=None):
        """Construct instance of this type from full path name.

        Parameters
        ----------
        path : `str`
            String representing path.
        dirname : `str`, optional
            Optional prefix directory.

        Returns
        -------
        path : `_Path`
            New instance whose components are the pieces of ``path``.
        """
        # Trouble here is that GEDCOM file can be prepared on different type
        # of system with different path separator. First try to convert path
        # into canonical form using slashes as separators and stripping
        # Windows drive.
        if len(path) > 2 and path[0].isalpha() and path[1] == ':':
            # strip windows drive name
            path = path[2:]
        path = path.replace('\\', '/').lstrip('/')

        # split file name into components
        return cls(path.split('/'), dirname)

    def match_rank(self, other):
        """Return match "rank" with the other path.

        Rank is a count of identical matching components at the end of
        paths.

        Parameters
        ----------
        other : `_Path`
            Path instance to match.

        Returns
        -------
        rank : `int`
            Match rank, zero if file names (last components) differ or if
            either path has no components.
        """
        # a path without components cannot match anything
        if not self.components or not other.components:
            return 0
        rank = 0
        for comp1, comp2 in zip(reversed(self.components),
                                reversed(other.components)):
            if comp1 != comp2:
                break
            rank += 1
        return rank

    def os_path(self):
        """Return full path of the file as a string.

        Returns
        -------
        path : `str`
            Components joined with the separator of the current OS, prefixed
            with ``dirname`` if it is set.
        """
        if self.dirname:
            return os.path.join(self.dirname, *self.components)
        else:
            return os.path.join(*self.components)

    def __str__(self):
        return "/".join(self.components)

    def __repr__(self):
        # readable form for debug messages that log lists of candidates
        return "_Path(%r, dirname=%r)" % (self.components, self.dirname)


class _FileSearch(metaclass=abc.ABCMeta):
    """Implementation of recursive file search in a folder tree.

    This is an abstract class which can match files but does not know how to
    build folder tree. Sub classes must implement `_paths()` method which
    returns the list of file "paths" to match.
    """

    # lazily filled list of candidate paths, see `paths` property
    _path_cache = None

    @staticmethod
    def _enc(name):
        """If string is Unicode encode it into UTF-8"""
        if isinstance(name, str):
            name = name.encode("utf_8")
        return name

    @staticmethod
    def _enc_list(path):
        """If strings are Unicode encode them into UTF-8"""
        return [_FileSearch._enc(comp) for comp in path]

    def find_file(self, name):
        """Return file path for the named file.

        The candidate sharing the longest run of trailing path components
        with ``name`` wins; at least the file name itself has to match.

        Parameters
        ----------
        name : `str`
            File name to search, this is usually the path as it comes
            directly from GEDCOM file.

        Returns
        -------
        path : `_Path` or `None`
            Matching path, `None` if nothing matches.

        Raises
        ------
        MultipleMatchesError
            Raised if several candidates match equally well.
        """
        path = _Path.from_path(name)

        # for each match assign its rank
        matches = []
        max_rank = 1   # need at least basename match
        for cand in self.paths:
            rank = path.match_rank(cand)
            if rank > max_rank:
                # better match found, forget all previous candidates
                matches = [cand]
                max_rank = rank
            elif rank == max_rank:
                matches.append(cand)

        if not matches:
            _log.debug("_FileSearch.find_file: nothing found")
            return None
        elif len(matches) > 1:
            _log.debug("_FileSearch.find_file: many files found: %s",
                       matches)
            raise MultipleMatchesError('More than one file matches name ' +
                                       str(path) + ": " +
                                       ', '.join(str(m) for m in matches))
        else:
            _log.debug("_FileSearch.find_file: found: %s", matches[0])
            return matches[0]

    @property
    def paths(self):
        """The list of all path names (_Path instances) to use for matching.
        """
        if self._path_cache is None:
            self._path_cache = self._paths()
        return self._path_cache

    @abc.abstractmethod
    def _paths(self):
        """Return list of file paths (_Path instances), must be implemented
        in a subclass.

        Returns
        -------
        paths : `list` [ `_Path` ]
        """
        raise NotImplementedError()


class _FSFileSearch(_FileSearch):
    """Implementation of recursive file search on file system.

    One complication here is encoding, `os.listdir` is returning
    strings/bytes of the same type as its argument (self._path). To avoid
    complications we convert self._path to unicode using UTF-8 encoding.
    This could fail in some cases.

    Parameters
    ----------
    path : `str`
        Directory on a file system to search for files. `None` disables
        the search, empty string means current directory.
    """

    def __init__(self, path):
        if path is not None and not isinstance(path, str):
            path = path.decode("utf_8")
        self._path = path

    def _paths(self):
        # docstring inherited from _FileSearch class
        _log.debug("_FSFileSearch.find_file: recursively scan "
                   "directory %r", self._path)
        if self._path is None:
            # do not search
            return []
        return list(self._scan(self._path))

    def _scan(self, path, current=None):
        """Recursively scan folder, return each file path as a list of its
        components.

        Parameters
        ----------
        path : `str`
            Filesystem directory to scan, empty string means current
            directory.
        current : `list` [ `str` ], optional
            Current context, to support recursion.

        Yields
        ------
        path : `_Path`
        """
        # os.listdir("") raises FileNotFoundError, so an empty path has to
        # be spelled as the current directory explicitly
        for fname in os.listdir(path or os.curdir):
            fpath = os.path.join(path, fname)
            components = (current or []) + [fname]
            if os.path.isdir(fpath):
                # scan recursively
                for p in self._scan(fpath, components):
                    yield p
            elif os.path.isfile(fpath):
                yield _Path(components, self._path)


class _ZIPFileSearch(_FileSearch):
    """Implementation of file search among the entries of ZIP archive.

    Parameters
    ----------
    toc : `list` [ `str` ]
        List of entries in ZIP archive.
    """

    def __init__(self, toc):
        self._toc = toc

    def _paths(self):
        # docstring inherited from _FileSearch class
        paths = []
        for entry in self._toc:
            if entry.endswith('/'):
                # directory entry, it can never be opened as an image
                continue
            components = [comp for comp in entry.split('/') if comp]
            if components:
                paths.append(_Path(components))
        return paths


class _FSLocator(FileLocator):
    """Implementation of `FileLocator` interface which can find files
    located on a regular file system.

    Parameters
    ----------
    input_file : `str`
        Path of the input GEDCOM file or file object. If argument is a file
        object then it must support ``seek()`` method and be open in a
        binary mode.
    image_path : `str`
        Directory on a file system where images are found. Images could be
        located in sub-directories of the given path. If ``image_path`` is
        ``None`` then the directory containing the GEDCOM file is used; if
        that directory cannot be determined (file object without a usable
        name) then file system is not searched for files. If ``image_path``
        is an empty string then current directory is searched.
    """

    def __init__(self, input_file, image_path=None):
        self._input_file = input_file
        if image_path is None:
            # use parent folder of GEDCOM file for image search
            if hasattr(input_file, 'read'):
                # it's probably a file
                image_path = getattr(input_file, "name", None)
                if isinstance(image_path, int):
                    # files opened from a descriptor (for example
                    # tempfile.TemporaryFile) report the descriptor number
                    # as their name, which is not a usable path
                    image_path = None
            else:
                image_path = input_file
            if image_path:
                image_path = os.path.dirname(os.path.abspath(image_path))
        _log.debug("_FSLocator: use image folder: %r", image_path)
        self._image_path = image_path
        self._fsearch = _FSFileSearch(image_path)

    def open_gedcom(self):
        # docstring inherited from base class
        _log.debug("_FSLocator.open_gedcom")
        if hasattr(self._input_file, 'read'):
            # it's likely a file
            return self._input_file
        return io.open(self._input_file, 'rb')

    def open_image(self, name):
        # docstring inherited from base class

        # `name` could be an absolute or relative path name, usually this is
        # the name given in GEDCOM file. GEDCOM file can be prepared on a
        # a different type of system where file names can use different
        # separators. This method first tries to open the file using argument
        # as a file name, if that does not succeed then it searches
        # recursively for the best matching file in the configured folder.

        _log.debug("_FSLocator.open_image: find image %s", name)

        if os.path.isabs(name):
            # file name looks like absolute path (on current OS), try
            # unmodified name
            try:
                _log.debug('_FSLocator.open_image: Trying FS path %s', name)
                return open(name, 'rb')
            except OSError:
                pass
        elif self._image_path is not None:
            # path looks like relative path, try to open it relative to
            # image search path (empty string means current directory)
            path = os.path.join(self._image_path, name)
            try:
                _log.debug('_FSLocator.open_image: Trying FS path %s', path)
                return open(path, 'rb')
            except OSError:
                pass

        # Otherwise try to search in the image folder.
        fname = self._fsearch.find_file(name)
        if fname is not None:
            return open(fname.os_path(), 'rb')
        return None


class _ZipLocator(FileLocator):
    """Implementation of `FileLocator` interface which can find files
    located in zip archive.

    Parameters
    ----------
    input_file : `str`
        Path of the input ZIP file or file object.
    file_name_pattern : `str`
        Name pattern (in ``fnmatch`` syntax) to search for a GEDCOM file.
    image_path : `str`
        Directory on a filesystem where images are found. Images could be
        located in sub-directories of the given path. Images are searched
        inside ZIP archive and then in ``image_path``. If ``image_path`` is
        ``None`` then filesystem is not searched for files. If
        ``image_path`` is an empty string then current directory is
        searched.
    """

    def __init__(self, input_file, file_name_pattern, image_path):
        self._zip = zipfile.ZipFile(input_file, 'r')
        self._toc = self._zip.namelist()
        self._pattern = file_name_pattern
        self._zipsearch = _ZIPFileSearch(self._toc)
        self._fsearch = _FSFileSearch(image_path)

    def open_gedcom(self):
        # docstring inherited from base class
        matches = [f for f in self._toc if fnmatch.fnmatch(f, self._pattern)]
        if not matches:
            return None
        if len(matches) > 1:
            raise MultipleMatchesError('Multiple matching files found in '
                                       'archive: ' + ' '.join(matches))

        member = matches[0]
        _log.debug("_ZipLocator.open_gedcom: %r", member)

        # we need a file on disk which supports seek, open in binary mode
        fobj = tempfile.NamedTemporaryFile("w+b",
                                           suffix=os.path.basename(member))
        try:
            with self._zip.open(member, 'r') as src:
                shutil.copyfileobj(src, fobj)
            fobj.seek(0)
        except BaseException:
            # do not leave the temporary file behind if extraction fails
            fobj.close()
            raise
        return fobj

    def open_image(self, name):
        # docstring inherited from base class
        _log.debug("_ZipLocator.open_image: find image %s", name)

        # archive contents take priority over the file system
        _log.debug('_ZipLocator.open_image: Trying archive name %r', name)
        fname = self._zipsearch.find_file(name)
        if fname is not None:
            _log.debug("_ZipLocator.open_image: found in ZIP: %r", fname)
            return self._zip.open(str(fname), 'r')

        # if file name looks like absolute path (on current OS)
        # try unmodified name
        if os.path.isabs(name):
            try:
                _log.debug('_ZipLocator.open_image: Trying FS path %s', name)
                return open(name, 'rb')
            except OSError:
                pass

        # search on filesystem
        _log.debug('_ZipLocator.open_image: Trying FS name %s', name)
        fname = self._fsearch.find_file(name)
        if fname is not None:
            return open(fname.os_path(), 'rb')
        return None
def make_file_locator(input_file, file_name_pattern, image_path):
    """Create and return file locator instance.

    For a given input file (which can be GEDCOM file or ZIP archive) return
    corresponding file locator object (instance of :py:class:`FileLocator`
    type).

    Parameters
    ----------
    input_file
        Path of the input file or file object, can be a ZIP archive or a
        GEDCOM file. If argument is a file object then it must support
        ``seek()`` method and be open in a binary mode.
    file_name_pattern : `str`
        If input file is a ZIP archive then this pattern is used to search
        for a GEDCOM file in archive. Could be ``"*.ged"`` for example or
        can include more specific pattern.
    image_path : `str`
        Directory on a filesystem where images are found. Images could be
        located in sub-directories of the given path. If ``input_file`` is a
        ZIP archive then images are searched inside ZIP archive and then in
        ``image_path``. If ``image_path`` is ``None`` then filesystem is not
        searched for files. If ``image_path`` is an empty string then
        current directory is searched.

    Returns
    -------
    locator : `FileLocator`
        File locator instance.

    Raises
    ------
    OSError
        Raised if file is not found; ``errno`` is ``ENOENT`` and
        ``filename`` is set to ``input_file``.
    AttributeError
        Raised if file object is given as input file but it does not
        support ``seek()`` method.
    """
    is_file_object = hasattr(input_file, 'read')

    # Validate file objects before anything else: ZIP detection below needs
    # seek() itself and would otherwise fail with an unrelated message.
    if is_file_object and not hasattr(input_file, 'seek'):
        raise AttributeError('File object has no `seek` attribute')

    if zipfile.is_zipfile(input_file):
        return _ZipLocator(input_file, file_name_pattern, image_path)

    if is_file_object:
        # ZIP detection moved the file position, rewind before parsing
        input_file.seek(0)
        return _FSLocator(input_file, image_path)

    if os.path.exists(input_file):
        return _FSLocator(input_file, image_path)

    # three-argument form keeps the file name in ``filename`` attribute
    # instead of misplacing it as the error description
    raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), input_file)