Source code for cubicweb.devtools

# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <https://www.gnu.org/licenses/>.
"""Test tools for cubicweb"""

import atexit
import os
import sys
import errno
import logging
import shutil
import glob
import shlex
import subprocess
import tempfile
import getpass
from hashlib import sha1  # pylint: disable=E0611
from os.path import abspath, join, exists, split, isdir, dirname
from functools import partial
import pickle

import filelock

from logilab.common.decorators import cached, clear_cache

from cubicweb import ExecutionError
from cubicweb import schema, cwconfig
from cubicweb.server.serverconfig import ServerConfiguration
from cubicweb.web.webconfig import WebConfigurationBase

# db auto-population configuration #############################################

SYSTEM_ENTITIES = (
    schema.SCHEMA_TYPES
    | schema.INTERNAL_TYPES
    | schema.WORKFLOW_TYPES
    | set(
        (
            "CWGroup",
            "CWUser",
        )
    )
)
SYSTEM_RELATIONS = (
    schema.META_RTYPES
    | schema.WORKFLOW_RTYPES
    | schema.WORKFLOW_DEF_RTYPES
    | schema.SYSTEM_RTYPES
    | schema.SCHEMA_TYPES
    | set(("primary_email",))  # deducted from other relations
)

# content validation configuration #############################################

# validators are used to validate (XML, DTD, whatever) a view's content
# available validators are:
#  'dtd' : validates XML + declared DTD
#  'xml' : guarantees XML is well formed
#  None : do not try to validate anything

# {'vid': validator}
VIEW_VALIDATORS = {}
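
# A test may, for instance, request well-formedness checking for a given view
# id by filling this mapping (a sketch, using the validator names documented
# above):
#
#   VIEW_VALIDATORS['myview'] = 'xml'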


# cubicweb test configuration ##################################################

BASE_URL = "http://testing.fr/cubicweb/"

DEFAULT_SOURCES = {
    "system": {
        "adapter": "native",
        "db-encoding": "UTF-8",
        "db-user": "admin",
        "db-password": "gingkow",
        "db-name": "tmpdb",
        "db-driver": "sqlite",
        "db-host": None,
    },
    "admin": {
        "login": "admin",
        "password": "gingkow",
    },
}
DEFAULT_PSQL_SOURCES = DEFAULT_SOURCES.copy()
DEFAULT_PSQL_SOURCES["system"] = DEFAULT_SOURCES["system"].copy()
DEFAULT_PSQL_SOURCES["system"]["db-driver"] = "postgres"
DEFAULT_PSQL_SOURCES["system"]["db-user"] = getpass.getuser()
DEFAULT_PSQL_SOURCES["system"]["db-password"] = None
# insert a dummy value as db-host to avoid unexpected connections to a local server
DEFAULT_PSQL_SOURCES["system"]["db-host"] = "REPLACEME"


def turn_repo_off(repo):
    """Idea: this is less costly than a full re-creation of the repo object.
    off:
    * sessions are closed,
    * cnxsets are closed,
    * the system source is shut down
    """
    if not repo._needs_refresh:
        for cnxset in repo.cnxsets:
            cnxset.close(True)
        repo.system_source.shutdown()
        repo._needs_refresh = True
        repo._has_started = False


def turn_repo_on(repo):
    """Idea: this is less costly than a full re-creation of the repo object.
    on:
    * cnxsets are connected
    * caches are cleared
    """
    if repo._needs_refresh:
        for cnxset in repo.cnxsets:
            cnxset.reconnect()
        repo.clear_caches()
        repo._needs_refresh = False


class TestServerConfiguration(ServerConfiguration):
    mode = "test"
    read_instance_schema = False
    init_repository = True
    skip_db_create_and_restore = False
    default_sources = DEFAULT_SOURCES

    def __init__(self, appid, test_module_file, log_threshold=logging.CRITICAL + 10):
        # must be set before calling parent __init__
        apphome = abspath(join(dirname(test_module_file), appid))
        self._apphome = apphome
        super(TestServerConfiguration, self).__init__(appid)
        self.init_log(log_threshold, force=True)
        # need this, usually triggered by cubicweb-ctl
        self.load_cwctl_plugins()
        self.test_module_file = test_module_file

    # By default anonymous login is allowed, but some tests need to deny it or
    # to change the default user. Set it to None to prevent anonymous login.
    anonymous_credential = ("anon", "anon")

    def anonymous_user(self):
        if not self.anonymous_credential:
            return None, None
        return self.anonymous_credential

    def set_anonymous_allowed(self, allowed, anonuser="anon"):
        if allowed:
            self.anonymous_credential = (anonuser, anonuser)
        else:
            self.anonymous_credential = None

    @property
    def apphome(self):
        return self._apphome

    appdatahome = apphome

    def load_configuration(self, **kw):
        super(TestServerConfiguration, self).load_configuration(**kw)
        # no undo support in tests
        self.global_set_option("undo-enabled", "n")

    def main_config_file(self):
        """return instance's control configuration file"""
        return join(self.apphome, "%s.conf" % self.name)

    def bootstrap_cubes(self):
        try:
            super(TestServerConfiguration, self).bootstrap_cubes()
        except IOError:
            # no cubes
            self.init_cubes(())

    def read_sources_file(self):
        """By default, we run tests with the sqlite DB backend.  One may use its
        own configuration by just creating a 'sources' file in the test
        directory from which tests are launched or by specifying an alternative
        sources file using self.sourcefile.
        """
        if getattr(self, "sourcefile", None):
            raise Exception(
                "sourcefile isn't supported anymore, specify your database "
                "configuration using proper configuration class (e.g. "
                "PostgresApptestConfiguration)"
            )
        try:
            super(TestServerConfiguration, self).read_sources_file()
            raise Exception(
                "test configuration shouldn't provide a sources file, specify your "
                "database configuration using proper configuration class (e.g. "
                "PostgresApptestConfiguration)"
            )
        except ExecutionError:
            pass
        return self.default_sources

    # web config methods needed here for cases when we use this config as a web
    # config

    def default_base_url(self):
        return BASE_URL


class BaseApptestConfiguration(TestServerConfiguration, WebConfigurationBase):
    name = "all-in-one"  # so it search for all-in-one.conf, not repository.conf
    options = cwconfig.merge_options(
        TestServerConfiguration.options + WebConfigurationBase.options
    )
    cubicweb_appobject_path = (
        TestServerConfiguration.cubicweb_appobject_path
        | WebConfigurationBase.cubicweb_appobject_path
    )
    cube_appobject_path = (
        TestServerConfiguration.cube_appobject_path
        | WebConfigurationBase.cube_appobject_path
    )

    def available_languages(self, *args):
        return self.cw_languages()


# XXX merge with BaseApptestConfiguration ?
class ApptestConfiguration(BaseApptestConfiguration):
    # `skip_db_create_and_restore` controls whether or not the test database
    # should be created / backed up / restored. If set to True, those
    # steps are completely skipped, the database is used as is and is
    # considered initialized
    skip_db_create_and_restore = False
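

# Typical usage in a test module (a sketch; ``configcls`` is the class
# attribute honored by ``cubicweb.devtools.testlib.CubicWebTC``):
#
#   from cubicweb.devtools.testlib import CubicWebTC
#
#   class MyTests(CubicWebTC):
#       configcls = ApptestConfiguration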


class PostgresApptestConfiguration(ApptestConfiguration):
    default_sources = DEFAULT_PSQL_SOURCES
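

# Running tests against a throwaway postgres cluster usually combines this
# configuration with ``startpgcluster`` / ``stoppgcluster`` defined below
# (a sketch):
#
#   from cubicweb.devtools import (PostgresApptestConfiguration,
#                                  startpgcluster, stoppgcluster)
#
#   def setUpModule():
#       startpgcluster(__file__)
#
#   def tearDownModule():
#       stoppgcluster(__file__)
#
#   class MyPGTests(CubicWebTC):
#       configcls = PostgresApptestConfiguration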


class RealDatabaseConfiguration(ApptestConfiguration):
    """configuration class for tests to run on a real database.

    The initialization is done by specifying a source file path.

    Important note: init_test_database / reset_test_database steps are
    skipped. It's thus up to the test developer to implement setUp/tearDown
    accordingly.

    Example usage::

      class MyTests(CubicWebTC):
          _config = RealDatabaseConfiguration('myapp',
                                              sourcefile='/path/to/sources')

          def test_something(self):
              with self.admin_access.web_request() as req:
                  rset = req.execute('Any X WHERE X is CWUser')
                  self.view('foaf', rset, req=req)

    """

    skip_db_create_and_restore = True
    read_instance_schema = True  # read schema from database


# test database handling #######################################################

DEFAULT_EMPTY_DB_ID = "__default_empty_db__"


class TestDataBaseHandler(object):
    DRIVER = None

    db_cache = {}
    explored_glob = set()

    def __init__(self, config, init_config=None):
        self.config = config
        self.init_config = init_config
        self._repo = None
        # pure consistency check
        assert self.system_source["db-driver"] == self.DRIVER

        # some handlers want to store info here, avoid a warning
        from cubicweb.server.sources.native import NativeSQLSource

        NativeSQLSource.options += (
            ("global-db-name", {"type": "string", "help": "for internal use only"}),
        )

    def _ensure_test_backup_db_dir(self):
        """Return path of directory for database backup.

        The function create it if necessary"""
        backupdir = join(self.config.apphome, "database")
        try:
            os.makedirs(backupdir)
        except OSError:
            if not isdir(backupdir):
                raise
        # Add a .nobackup file in the database directory
        # so that backup utilities, like Borg, do not backup this directory
        open(join(backupdir, ".nobackup"), "w").close()
        return backupdir

    def config_path(self, db_id):
        """Path for config backup of a given database id"""
        return self.absolute_backup_file(db_id, "config")

    def absolute_backup_file(self, db_id, suffix):
        """Path for config backup of a given database id"""
        # in case db name is an absolute path, we don't want to replace anything
        # in parent directories
        directory, basename = split(self.dbname)
        dbname = basename.replace("-", "_")
        assert "." not in db_id
        filename = join(directory, "%s-%s.%s" % (dbname, db_id, suffix))
        return join(self._ensure_test_backup_db_dir(), filename)

    def db_cache_key(self, db_id, dbname=None):
        """Build a database cache key for a db_id with the current config

        This key is meant to be used in the cls.db_cache mapping"""
        if dbname is None:
            dbname = self.dbname
        dbname = os.path.basename(dbname)
        dbname = dbname.replace("-", "_")
        return (self.config.apphome, dbname, db_id)

    def backup_database(self, db_id):
        """Store the content of the current database as <db_id>

        The config used is also stored."""
        backup_data = self._backup_database(db_id)
        config_path = self.config_path(db_id)
        # XXX we dump a dict of the config
        # This is experimental, intended to help config-dependent setups
        # (like BFSS) be properly restored
        pdir = os.path.dirname(config_path)
        with tempfile.NamedTemporaryFile(dir=pdir, delete=False) as conf_file:
            conf_file.write(pickle.dumps(dict(self.config)))
        os.rename(conf_file.name, config_path)
        self.db_cache[self.db_cache_key(db_id)] = (backup_data, config_path)

    def _backup_database(self, db_id):
        """Actual backup the current database.

        return a value to be stored in db_cache to allow restoration"""
        raise NotImplementedError()

    def restore_database(self, db_id):
        """Restore a database.

        Uses the value stored in db_cache by self._backup_database."""
        # XXX set a clearer error message ???
        backup_coordinates, config_path = self.db_cache[self.db_cache_key(db_id)]
        # reload the config used to create the database.
        with open(config_path, "rb") as f:
            config = pickle.load(f)
        # shutdown repo before changing database content
        if self._repo is not None:
            self._repo.turn_repo_off()
        self._restore_database(backup_coordinates, config)

    def _restore_database(self, backup_coordinates, config):
        """Actual restore of the current database.

        Use the value stored in db_cache as input"""
        raise NotImplementedError()

    def get_repo(self, startup=False):
        """return Repository object on the current database.

        (turn the current repo object "on" if there is one or recreate one)
        if startup is True, server startup hooks will be called if needed
        """
        if self._repo is None:
            self._repo = self._new_repo(self.config)
        # config has now been bootstrapped, call init_config if specified
        if self.init_config is not None:
            self.init_config(self.config)
        repo = self._repo
        repo.turn_repo_on()
        if startup and not repo._has_started:
            repo.hm.call_hooks("server_startup", repo=repo)
            repo._has_started = True
        return repo

    def _new_repo(self, config):
        """Factory method to create a new Repository Instance"""
        config._cubes = None
        repo = config.repository()
        config.repository = lambda vreg=None: repo
        # extending Repository class
        repo._has_started = False
        repo._needs_refresh = False
        repo.turn_repo_on = partial(turn_repo_on, repo)
        repo.turn_repo_off = partial(turn_repo_off, repo)
        return repo

    def get_cnx(self):
        """return Connection object on the current repository"""
        from cubicweb.repoapi import connect

        repo = self.get_repo()
        sources = self.config.read_sources_file()
        login = sources["admin"]["login"]
        password = sources["admin"]["password"] or "xxx"
        cnx = connect(repo, login, password=password)
        return cnx

    def get_repo_and_cnx(self, db_id=DEFAULT_EMPTY_DB_ID):
        """Reset database with the current db_id and return (repo, cnx)

        A database *MUST* have been built with the current <db_id> prior to
        calling this method. See the ``build_db_cache`` method. The returned
        repository has its startup hooks called and the connection is
        established as admin."""

        self.restore_database(db_id)
        repo = self.get_repo(startup=True)
        cnx = self.get_cnx()
        return repo, cnx

    @property
    def system_source(self):
        return self.config.system_source_config

    @property
    def dbname(self):
        return self.system_source["db-name"]

    def init_test_database(self):
        """actual initialisation of the database"""
        raise ValueError("no initialization function for driver %r" % self.DRIVER)

    def has_cache(self, db_id):
        """Check if a given database id exist in db cache for the current config"""
        key = self.db_cache_key(db_id)
        if key in self.db_cache:
            return True
        self.discover_cached_db()
        return key in self.db_cache

    def discover_cached_db(self):
        """Search available db_if for the current config"""
        cache_glob = self.absolute_backup_file("*", "*")
        directory = os.path.dirname(cache_glob)
        entries = {}
        candidates = glob.glob(cache_glob)
        for filepath in candidates:
            data = os.path.basename(filepath)
            # database backups are in the form <dbname>-<db_id>.<filetype>
            dbname, data = data.split("-", 1)
            db_id, filetype = data.split(".", 1)
            entries.setdefault((dbname, db_id), {})[filetype] = filepath
        for (dbname, db_id), entry in entries.items():
            # apply necessary transformation from the driver
            value = self.process_cache_entry(directory, dbname, db_id, entry)
            assert "config" in entry
            if value is not None:  # None value means "not handled by this driver"
                key = self.db_cache_key(db_id, dbname=dbname)
                self.db_cache[key] = value, entry["config"]
        self.explored_glob.add(cache_glob)

    def process_cache_entry(self, directory, dbname, db_id, entry):
        """Transforms potential cache entry to proper backup coordinate

        entry argument is a "filetype" -> "filepath" mapping
        Return None if an entry should be ignored."""
        return None

    def build_db_cache(self, test_db_id=DEFAULT_EMPTY_DB_ID, pre_setup_func=None):
        """Build Database cache for ``test_db_id`` if a cache doesn't exist

        if ``test_db_id is DEFAULT_EMPTY_DB_ID`` self.init_test_database is
        called. otherwise, DEFAULT_EMPTY_DB_ID is build/restored and
        ``pre_setup_func`` to setup the database.

        This function backup any database it build"""
        lockfile = join(self._ensure_test_backup_db_dir(), "{}.lock".format(test_db_id))
        with filelock.FileLock(lockfile):
            if self.has_cache(test_db_id):
                return  # test_db_id, 'already in cache'
            if test_db_id is DEFAULT_EMPTY_DB_ID:
                self.init_test_database()
            else:
                print("Building %s for database %s" % (test_db_id, self.dbname))
                self.build_db_cache(DEFAULT_EMPTY_DB_ID)
                self.restore_database(DEFAULT_EMPTY_DB_ID)
                self.get_repo(startup=True)
                cnx = self.get_cnx()
                with cnx:
                    pre_setup_func(cnx, self.config)
                    cnx.commit()
            self.backup_database(test_db_id)


class NoCreateDropDatabaseHandler(TestDataBaseHandler):
    """This handler is used if config.skip_db_create_and_restore is True

    This is typically the case with RealDatabaseConfiguration. In that case,
    we explicitly want to skip the init / backup / restore phases.

    This handler redefines the three corresponding methods and delegates
    to the original handler for any other method / attribute.
    """

    def __init__(self, base_handler):
        self.base_handler = base_handler

    # override init / backup / restore methods
    def init_test_database(self):
        pass

    def backup_database(self, db_id):
        pass

    def restore_database(self, db_id):
        pass

    # delegate to original handler in all other cases
    def __getattr__(self, attrname):
        return getattr(self.base_handler, attrname)


### postgres test database handling ############################################


def startpgcluster(pyfile):
    """Start a postgresql cluster next to pyfile"""
    datadir = join(
        os.path.dirname(pyfile),
        "data",
        "database",
        "pgdb-%s" % os.path.splitext(os.path.basename(pyfile))[0],
    )
    if not exists(datadir):
        try:
            subprocess.check_call(
                ["initdb", "-D", datadir, "-E", "utf-8", "--locale=C"]
            )

        except OSError as err:
            if err.errno == errno.ENOENT:
                raise OSError(
                    '"initdb" could not be found. '
                    "You should add the postgresql bin folder to your PATH "
                    "(/usr/lib/postgresql/9.1/bin for example)."
                )
            raise
    datadir = os.path.abspath(datadir)
    pgport = "5432"
    env = os.environ.copy()
    sockdir = tempfile.mkdtemp(prefix="cwpg")
    DEFAULT_PSQL_SOURCES["system"]["db-host"] = sockdir
    DEFAULT_PSQL_SOURCES["system"]["db-port"] = pgport
    options = '-h "" -k %s -p %s' % (sockdir, pgport)
    options += " -c fsync=off -c full_page_writes=off"
    options += " -c synchronous_commit=off"

    command = ["pg_ctl", "start", "-w", "-D", datadir, "-o", options]
    try:
        # don't use check_output or run() here, it will deadlock
        p = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
        )
        p.wait()

        if p.returncode != 0:
            printable_command = " ".join(shlex.quote(arg) for arg in command)
            raise Exception(
                f'"pg_ctl" command failed with the return code '
                f'"{p.returncode}".\n\nFull command:\n\n    {printable_command}\n\n'
                f'Output:\n\n{p.stdout.read().decode("Utf-8")}'
            )

    except OSError as err:
        try:
            os.rmdir(sockdir)
        except OSError:
            pass
        if err.errno == errno.ENOENT:
            raise OSError(
                '"pg_ctl" could not be found. '
                "You should add the postgresql bin folder to your PATH "
                "(/usr/lib/postgresql/9.1/bin for example)."
            )
        raise


def stoppgcluster(pyfile):
    """Kill the postgresql cluster running next to pyfile"""
    datadir = join(
        os.path.dirname(pyfile),
        "data",
        "database",
        "pgdb-%s" % os.path.splitext(os.path.basename(pyfile))[0],
    )
    subprocess.call(["pg_ctl", "stop", "-D", datadir, "-m", "fast"])
    try:
        os.rmdir(DEFAULT_PSQL_SOURCES["system"]["db-host"])
    except OSError:
        pass


class PostgresTestDataBaseHandler(TestDataBaseHandler):
    DRIVER = "postgres"

    # Separate db_cache for PG databases, to avoid collisions with sqlite dbs
    db_cache = {}
    explored_glob = set()

    __CTL = set()

    def __init__(self, *args, **kwargs):
        super(PostgresTestDataBaseHandler, self).__init__(*args, **kwargs)
        if "global-db-name" not in self.system_source:
            self.system_source["global-db-name"] = self.system_source["db-name"]
            self.system_source["db-name"] = self.system_source["db-name"] + str(
                os.getpid()
            )

    @property
    @cached
    def helper(self):
        from logilab.database import get_db_helper

        return get_db_helper("postgres")

    @property
    def dbname(self):
        return self.system_source["global-db-name"]

    @property
    def dbcnx(self):
        try:
            return self._cnx
        except AttributeError:
            from cubicweb.server.serverctl import _db_sys_cnx

            try:
                self._cnx = _db_sys_cnx(
                    self.system_source,
                    "CREATE DATABASE and / or USER",
                    interactive=False,
                )
                return self._cnx
            except Exception:
                self._cnx = None
                raise

    @property
    @cached
    def cursor(self):
        return self.dbcnx.cursor()

    def process_cache_entry(self, directory, dbname, db_id, entry):
        backup_name = self._backup_name(db_id)
        if backup_name in self.helper.list_databases(self.cursor):
            return backup_name
        return None

    def has_cache(self, db_id):
        backup_name = self._backup_name(db_id)
        return super(PostgresTestDataBaseHandler, self).has_cache(
            db_id
        ) and backup_name in self.helper.list_databases(self.cursor)

    def init_test_database(self):
        """initialize a fresh postgresql database used for testing purpose"""
        from cubicweb.server import init_repository
        from cubicweb.server.serverctl import source_cnx, createdb

        # connect on the dbms system base to create our base
        try:
            self._drop(self.system_source["db-name"])
            createdb(self.helper, self.system_source, self.dbcnx, self.cursor)
            self.dbcnx.commit()
            cnx = source_cnx(
                self.system_source, special_privs="LANGUAGE C", interactive=False
            )
            templcursor = cnx.cursor()
            try:
                # XXX factorize with db-create code
                self.helper.init_fti_extensions(templcursor)
                # install the plpgsql language if not installed by the cube
                langs = ("plpgsql",)
                for extlang in langs:
                    self.helper.create_language(templcursor, extlang)
                cnx.commit()
            finally:
                templcursor.close()
                cnx.close()
            init_repository(
                self.config, interactive=False, init_config=self.init_config
            )
        except BaseException:
            if self.dbcnx is not None:
                self.dbcnx.rollback()
            sys.stderr.write("building %s failed\n" % self.dbname)
            raise

    def helper_clear_cache(self):
        # clear_cache might be unavailable during process termination, see:
        # https://docs.python.org/3/reference/datamodel.html#object.__del__

        if self.dbcnx is not None:
            self.dbcnx.commit()
            self.dbcnx.close()
            del self._cnx
            if clear_cache is not None:
                clear_cache(self, "cursor")

        if clear_cache is not None:
            clear_cache(self, "helper")

    def __del__(self):
        self.helper_clear_cache()

    @property
    def _config_id(self):
        return sha1(self.config.apphome.encode("utf-8")).hexdigest()[:10]

    def _backup_name(self, db_id):  # merge me with parent
        backup_name = "_".join(("cache", self._config_id, self.dbname, db_id))
        return backup_name.lower()

    def _drop(self, db_name):
        if db_name in self.helper.list_databases(self.cursor):
            self.cursor.execute("DROP DATABASE %s" % db_name)
            self.dbcnx.commit()

    def _backup_database(self, db_id):
        """Actual backup the current database.

        return a value to be stored in db_cache to allow restoration
        """
        from cubicweb.server.serverctl import createdb

        orig_name = self.system_source["db-name"]
        try:
            backup_name = self._backup_name(db_id)
            self._drop(backup_name)
            self.system_source["db-name"] = backup_name
            if self._repo:
                self._repo.turn_repo_off()
            try:
                createdb(
                    self.helper,
                    self.system_source,
                    self.dbcnx,
                    self.cursor,
                    template=orig_name,
                )
                self.dbcnx.commit()
            finally:
                if self._repo:
                    self._repo.turn_repo_on()
            return backup_name
        finally:
            self.system_source["db-name"] = orig_name

    def _restore_database(self, backup_coordinates, config):
        """Actual restore of the current database.

        Use the value stored in db_cache as input."""
        from cubicweb.server.serverctl import createdb

        self._drop(self.system_source["db-name"])
        createdb(
            self.helper,
            self.system_source,
            self.dbcnx,
            self.cursor,
            template=backup_coordinates,
        )
        self.dbcnx.commit()


# sqlite test database handling ################################################


class SQLiteTestDataBaseHandler(TestDataBaseHandler):
    DRIVER = "sqlite"

    __TMPDB = set()

    @classmethod
    def _cleanup_all_tmpdb(cls):
        for dbpath in cls.__TMPDB:
            cls._cleanup_database(dbpath)

    def __init__(self, *args, **kwargs):
        super(SQLiteTestDataBaseHandler, self).__init__(*args, **kwargs)
        # use a dedicated base for each process.
        if "global-db-name" not in self.system_source:
            self.system_source["global-db-name"] = self.system_source["db-name"]
            process_db = self.system_source["db-name"] + str(os.getpid())
            self.system_source["db-name"] = process_db
        process_db = self.absolute_dbfile()  # update db-name to absolute path
        self.__TMPDB.add(process_db)

    @staticmethod
    def _cleanup_database(dbfile):
        try:
            os.remove(dbfile)
            os.remove("%s-journal" % dbfile)
        except OSError:
            pass

    @property
    def dbname(self):
        return self.system_source["global-db-name"]

    def absolute_dbfile(self):
        """absolute path of current database file"""
        dbfile = join(self._ensure_test_backup_db_dir(), self.system_source["db-name"])
        self.system_source["db-name"] = dbfile
        return dbfile

    def process_cache_entry(self, directory, dbname, db_id, entry):
        return entry.get("sqlite")

    def _backup_database(self, db_id=DEFAULT_EMPTY_DB_ID):
        # XXX remove database file if it exists ???
        dbfile = self.absolute_dbfile()
        backup_file = self.absolute_backup_file(db_id, "sqlite")
        shutil.copy(dbfile, backup_file)
        return backup_file

    def _restore_database(self, backup_coordinates, _config):
        # remove database file if it exists ?
        dbfile = self.absolute_dbfile()
        self._cleanup_database(dbfile)
        shutil.copy(backup_coordinates, dbfile)
        self.get_repo()

    def init_test_database(self):
        """initialize a fresh sqlite databse used for testing purpose"""
        # initialize the database
        from cubicweb.server import init_repository

        self._cleanup_database(self.absolute_dbfile())
        init_repository(self.config, interactive=False, init_config=self.init_config)


atexit.register(SQLiteTestDataBaseHandler._cleanup_all_tmpdb)


HANDLERS = {}


def register_handler(handlerkls, overwrite=False):
    assert handlerkls is not None
    if overwrite or handlerkls.DRIVER not in HANDLERS:
        HANDLERS[handlerkls.DRIVER] = handlerkls
    else:
        msg = (
            "%s: Handler already exists use overwrite if it's intended\n"
            "(existing handler class is %r)"
        )
        raise ValueError(msg % (handlerkls.DRIVER, HANDLERS[handlerkls.DRIVER]))


register_handler(PostgresTestDataBaseHandler)
register_handler(SQLiteTestDataBaseHandler)
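

# A handler for another backend can be registered the same way (a sketch;
# ``MyDriverTestDataBaseHandler`` and its driver name are hypothetical):
#
#   class MyDriverTestDataBaseHandler(TestDataBaseHandler):
#       DRIVER = 'mydriver'
#
#       def init_test_database(self):
#           ...
#
#   register_handler(MyDriverTestDataBaseHandler)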


class HCache(object):
    """Handler cache object: store database handler for a given configuration.

    We only keep one handler in cache to prevent too many objects from staying
    alive (a database handler holds a reference to a repository). Since a new
    handler is created for each TestCase class and all test methods are executed
    sequentially within this class, there should not be more cache misses than
    with a wider cache: once a handler stops being used, it won't be used
    again.
    """

    def __init__(self):
        self.config = None
        self.handler = None

    def get(self, config):
        if config is self.config:
            return self.handler
        else:
            return None

    def set(self, config, handler):
        self.config = config
        self.handler = handler


HCACHE = HCache()

# XXX a class method on Test ?
_CONFIG = None


def get_test_db_handler(config, init_config=None):
    global _CONFIG
    if _CONFIG is not None and config is not _CONFIG:
        from logilab.common.modutils import cleanup_sys_modules

        # cleanup all dynamically loaded modules and everything in the instance
        # directory
        apphome = _CONFIG.apphome
        if apphome:  # may be unset in tests
            cleanup_sys_modules([apphome])
        # also cleanup sys.path
        if apphome in sys.path:
            sys.path.remove(apphome)
    _CONFIG = config
    config.adjust_sys_path()
    handler = HCACHE.get(config)
    if handler is not None:
        return handler
    driver = config.system_source_config["db-driver"]
    handlerkls = HANDLERS.get(driver, None)
    if handlerkls is not None:
        handler = handlerkls(config, init_config)
        if config.skip_db_create_and_restore:
            handler = NoCreateDropDatabaseHandler(handler)
        HCACHE.set(config, handler)
        return handler
    else:
        raise ValueError("no initialization function for driver %r" % driver)