Source code for cubicweb.dataimport.pgstore

# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Postgres specific store"""
from __future__ import print_function

import warnings
import os.path as osp
from io import StringIO
from time import asctime
from datetime import date, datetime, time
from collections import defaultdict
from base64 import b64encode

from six import (string_types, integer_types, text_type, binary_type,
                 add_metaclass)
from six.moves import cPickle as pickle, range

from logilab.common.deprecation import class_deprecated

from cubicweb.utils import make_uid
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.dataimport.stores import NoHookRQLObjectStore


def _execmany_thread_not_copy_from(cu, statement, data, table=None,
                                   columns=None, encoding='utf-8'):
    """ Execute thread without copy from
    """
    cu.executemany(statement, data)


def _execmany_thread_copy_from(cu, statement, data, table,
                               columns, encoding='utf-8'):
    """ Execute thread with copy from
    """
    try:
        buf = _create_copyfrom_buffer(data, columns, encoding=encoding)
    except ValueError:
        _execmany_thread_not_copy_from(cu, statement, data)
    else:
        if columns is None:
            cu.copy_from(buf, table, null=u'NULL')
        else:
            cu.copy_from(buf, table, null=u'NULL', columns=columns)


def _execmany_thread(sql_connect, statements, dump_output_dir=None,
                     support_copy_from=True, encoding='utf-8'):
    """
    Execute sql statement. If 'INSERT INTO', try to use 'COPY FROM' command,
    or fallback to execute_many.
    """
    if support_copy_from:
        execmany_func = _execmany_thread_copy_from
    else:
        execmany_func = _execmany_thread_not_copy_from
    cnx = sql_connect()
    cu = cnx.cursor()
    try:
        for statement, data in statements:
            table = None
            columns = None
            try:
                if not statement.startswith('INSERT INTO'):
                    cu.executemany(statement, data)
                    continue
                table = statement.split()[2]
                if isinstance(data[0], (tuple, list)):
                    columns = None
                else:
                    columns = list(data[0])
                execmany_func(cu, statement, data, table, columns, encoding)
            except Exception:
                print('unable to copy data into table %s' % table)
                # Error in import statement, save data in dump_output_dir
                if dump_output_dir is not None:
                    pdata = {'data': data, 'statement': statement,
                             'time': asctime(), 'columns': columns}
                    filename = make_uid()
                    try:
                        with open(osp.join(dump_output_dir,
                                           '%s.pickle' % filename), 'wb') as fobj:
                            pickle.dump(pdata, fobj)
                    except IOError:
                        print('ERROR while pickling in', dump_output_dir, filename+'.pickle')
                cnx.rollback()
                raise
    finally:
        cnx.commit()
        cu.close()


def _copyfrom_buffer_convert_None(value, **opts):
    '''Convert None value to "NULL"'''
    return u'NULL'

def _copyfrom_buffer_convert_number(value, **opts):
    '''Convert a number into its string representation'''
    return text_type(value)

def _copyfrom_buffer_convert_string(value, **opts):
    '''Convert string value.
    '''
    escape_chars = ((u'\\', u'\\\\'), (u'\t', u'\\t'), (u'\r', u'\\r'),
                    (u'\n', u'\\n'))
    for char, replace in escape_chars:
        value = value.replace(char, replace)
    return value

def _copyfrom_buffer_convert_date(value, **opts):
    '''Convert date into "YYYY-MM-DD"'''
    # Do not use strftime, as it yields issue with date < 1900
    # (http://bugs.python.org/issue1777412)
    return u'%04d-%02d-%02d' % (value.year, value.month, value.day)

def _copyfrom_buffer_convert_datetime(value, **opts):
    '''Convert date into "YYYY-MM-DD HH:MM:SS.UUUUUU"'''
    # Do not use strftime, as it yields issue with date < 1900
    # (http://bugs.python.org/issue1777412)
    return u'%s %s' % (_copyfrom_buffer_convert_date(value, **opts),
                       _copyfrom_buffer_convert_time(value, **opts))

def _copyfrom_buffer_convert_time(value, **opts):
    '''Convert time into "HH:MM:SS.UUUUUU"'''
    return u'%02d:%02d:%02d.%06d' % (value.hour, value.minute,
                                     value.second, value.microsecond)

# (types, converter) list.
_COPYFROM_BUFFER_CONVERTERS = [
    (type(None), _copyfrom_buffer_convert_None),
    (integer_types + (float,), _copyfrom_buffer_convert_number),
    (string_types, _copyfrom_buffer_convert_string),
    (datetime, _copyfrom_buffer_convert_datetime),
    (date, _copyfrom_buffer_convert_date),
    (time, _copyfrom_buffer_convert_time),
]

def _create_copyfrom_buffer(data, columns=None, **convert_opts):
    """
    Create a StringIO buffer for 'COPY FROM' command.
    Deals with Unicode, Int, Float, Date... (see ``converters``)

    :data: a sequence/dict of tuples
    :columns: list of columns to consider (default to all columns)
    :converter_opts: keyword arguements given to converters
    """
    # Create a list rather than directly create a StringIO
    # to correctly write lines separated by '\n' in a single step
    rows = []
    if columns is None:
        if isinstance(data[0], (tuple, list)):
            columns = list(range(len(data[0])))
        elif isinstance(data[0], dict):
            columns = data[0].keys()
        else:
            raise ValueError('Could not get columns: you must provide columns.')
    for row in data:
        # Iterate over the different columns and the different values
        # and try to convert them to a correct datatype.
        # If an error is raised, do not continue.
        formatted_row = []
        for col in columns:
            try:
                value = row[col]
            except KeyError:
                warnings.warn(u"Column %s is not accessible in row %s"
                              % (col, row), RuntimeWarning)
                # XXX 'value' set to None so that the import does not end in
                # error.
                # Instead, the extra keys are set to NULL from the
                # database point of view.
                value = None
            for types, converter in _COPYFROM_BUFFER_CONVERTERS:
                if isinstance(value, types):
                    value = converter(value, **convert_opts)
                    assert isinstance(value, text_type)
                    break
            else:
                raise ValueError("Unsupported value type %s" % type(value))
            # We push the value to the new formatted row
            # if the value is not None and could be converted to a string.
            formatted_row.append(value)
        rows.append('\t'.join(formatted_row))
    return StringIO('\n'.join(rows))


@add_metaclass(class_deprecated)
[docs]class SQLGenObjectStore(NoHookRQLObjectStore): """Controller of the data import process. This version is based on direct insertions throught SQL command (COPY FROM or execute many). >>> store = SQLGenObjectStore(cnx) >>> store.create_entity('Person', ...) >>> store.flush() """ __deprecation_warning__ = '[3.23] this class is deprecated, use MassiveObjectStore instead' def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1): """ Initialize a SQLGenObjectStore. Parameters: - cnx: connection on the cubicweb instance - dump_output_dir: a directory to dump failed statements for easier recovery. Default is None (no dump). """ super(SQLGenObjectStore, self).__init__(cnx) ### hijack default source self._system_source = SQLGenSourceWrapper( self._system_source, cnx.vreg.schema, dump_output_dir=dump_output_dir) ### XXX This is done in super().__init__(), but should be ### redone here to link to the correct source self._add_relation = self._system_source.add_relation self.indexes_etypes = {} if nb_threads_statement != 1: warnings.warn('[3.21] SQLGenObjectStore is no longer threaded', DeprecationWarning)
[docs] def flush(self): """Flush data to the database""" self._system_source.flush()
def relate(self, subj_eid, rtype, obj_eid, **kwargs): if subj_eid is None or obj_eid is None: return # XXX Could subjtype be inferred ? self._add_relation(self._cnx, subj_eid, rtype, obj_eid, self.rschema(rtype).inlined, **kwargs) if self.rschema(rtype).symmetric: self._add_relation(self._cnx, obj_eid, rtype, subj_eid, self.rschema(rtype).inlined, **kwargs)
[docs] def drop_indexes(self, etype): """Drop indexes for a given entity type""" if etype not in self.indexes_etypes: cu = self._cnx.cnxset.cu def index_to_attr(index): """turn an index name to (database) attribute name""" return index.replace(etype.lower(), '').replace('idx', '').strip('_') indices = [(index, index_to_attr(index)) for index in self._system_source.dbhelper.list_indices(cu, etype) # Do not consider 'cw_etype_pkey' index if not index.endswith('key')] self.indexes_etypes[etype] = indices for index, attr in self.indexes_etypes[etype]: self._cnx.system_sql('DROP INDEX %s' % index)
[docs] def create_indexes(self, etype): """Recreate indexes for a given entity type""" for index, attr in self.indexes_etypes.get(etype, []): sql = 'CREATE INDEX %s ON cw_%s(%s)' % (index, etype, attr) self._cnx.system_sql(sql)
########################################################################### ## SQL Source ############################################################# ########################################################################### class SQLGenSourceWrapper(object): def __init__(self, system_source, schema, dump_output_dir=None): self.system_source = system_source # Explicitely backport attributes from system source self._storage_handler = self.system_source._storage_handler self.preprocess_entity = self.system_source.preprocess_entity self.sqlgen = self.system_source.sqlgen self.uri = self.system_source.uri self.eid = self.system_source.eid # Directory to write temporary files self.dump_output_dir = dump_output_dir # Allow to execute code with SQLite backend that does # not support (yet...) copy_from # XXX Should be dealt with in logilab.database spcfrom = system_source.dbhelper.dbapi_module.support_copy_from self.support_copy_from = spcfrom self.dbencoding = system_source.dbhelper.dbencoding self.init_statement_lists() self._inlined_rtypes_cache = {} self._fill_inlined_rtypes_cache(schema) self.schema = schema self.do_fti = False def _fill_inlined_rtypes_cache(self, schema): cache = self._inlined_rtypes_cache for eschema in schema.entities(): for rschema in eschema.ordered_relations(): if rschema.inlined: cache[eschema.type] = SQL_PREFIX + rschema.type def init_statement_lists(self): self._sql_entities = defaultdict(list) self._sql_relations = {} self._sql_inlined_relations = {} self._sql_eids = defaultdict(list) # keep track, for each eid of the corresponding data dict self._sql_eid_insertdicts = {} def flush(self): print('starting flush') _entities_sql = self._sql_entities _relations_sql = self._sql_relations _inlined_relations_sql = self._sql_inlined_relations _insertdicts = self._sql_eid_insertdicts try: # try, for each inlined_relation, to find if we're also creating # the host entity (i.e. the subject of the relation). # In that case, simply update the insert dict and remove # the need to make the # UPDATE statement for statement, datalist in _inlined_relations_sql.items(): new_datalist = [] # for a given inlined relation, # browse each couple to be inserted for data in datalist: keys = list(data) # For inlined relations, it exists only two case: # (rtype, cw_eid) or (cw_eid, rtype) if keys[0] == 'cw_eid': rtype = keys[1] else: rtype = keys[0] updated_eid = data['cw_eid'] if updated_eid in _insertdicts: _insertdicts[updated_eid][rtype] = data[rtype] else: # could not find corresponding insert dict, keep the # UPDATE query new_datalist.append(data) _inlined_relations_sql[statement] = new_datalist _execmany_thread(self.system_source.get_connection, list(self._sql_eids.items()) + list(_entities_sql.items()) + list(_relations_sql.items()) + list(_inlined_relations_sql.items()), dump_output_dir=self.dump_output_dir, support_copy_from=self.support_copy_from, encoding=self.dbencoding) finally: _entities_sql.clear() _relations_sql.clear() _insertdicts.clear() _inlined_relations_sql.clear() def add_relation(self, cnx, subject, rtype, object, inlined=False, **kwargs): if inlined: _sql = self._sql_inlined_relations data = {'cw_eid': subject, SQL_PREFIX + rtype: object} subjtype = kwargs.get('subjtype') if subjtype is None: # Try to infer it targets = [t.type for t in self.schema.rschema(rtype).subjects()] if len(targets) == 1: subjtype = targets[0] else: raise ValueError('You should give the subject etype for ' 'inlined relation %s' ', as it cannot be inferred: ' 'this type is given as keyword argument ' '``subjtype``' % rtype) statement = self.sqlgen.update(SQL_PREFIX + subjtype, data, ['cw_eid']) else: _sql = self._sql_relations data = {'eid_from': subject, 'eid_to': object} statement = self.sqlgen.insert('%s_relation' % rtype, data) if statement in _sql: _sql[statement].append(data) else: _sql[statement] = [data] def add_entity(self, cnx, entity): with self._storage_handler(cnx, entity, 'added'): attrs = self.preprocess_entity(entity) rtypes = self._inlined_rtypes_cache.get(entity.cw_etype, ()) if isinstance(rtypes, str): rtypes = (rtypes,) for rtype in rtypes: if rtype not in attrs: attrs[rtype] = None sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs) self._sql_eid_insertdicts[entity.eid] = attrs self._append_to_entities(sql, attrs) def _append_to_entities(self, sql, attrs): self._sql_entities[sql].append(attrs) def _handle_insert_entity_sql(self, cnx, sql, attrs): # We have to overwrite the source given in parameters # as here, we directly use the system source attrs['asource'] = self.system_source.uri self._sql_eids[sql].append(attrs) def _handle_is_relation_sql(self, cnx, sql, attrs): self._append_to_entities(sql, attrs) def _handle_is_instance_of_sql(self, cnx, sql, attrs): self._append_to_entities(sql, attrs) def _handle_source_relation_sql(self, cnx, sql, attrs): self._append_to_entities(sql, attrs) # add_info is _copypasted_ from the one in NativeSQLSource. We want it # there because it will use the _handlers of the SQLGenSourceWrapper, which # are not like the ones in the native source. def add_info(self, cnx, entity, source, extid): """add type and source info for an eid into the system table""" # begin by inserting eid/type/source/extid into the entities table if extid is not None: assert isinstance(extid, binary_type) extid = b64encode(extid).decode('ascii') attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, 'asource': source.uri} self._handle_insert_entity_sql(cnx, self.sqlgen.insert('entities', attrs), attrs) # insert core relations: is, is_instance_of and cw_source self._handle_is_relation_sql(cnx, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)', (entity.eid, entity.e_schema.eid)) for eschema in entity.e_schema.ancestors() + [entity.e_schema]: self._handle_is_relation_sql(cnx, 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)', (entity.eid, eschema.eid)) self._handle_is_relation_sql(cnx, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)', (entity.eid, source.eid)) # now we can update the full text index if self.do_fti and self.need_fti_indexation(entity.cw_etype): self.index_entity(cnx, entity=entity)