Source code for cubicweb.dataimport.csv

# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <https://www.gnu.org/licenses/>.
"""Functions to help importing CSV data"""
import codecs
import csv as csvmod

from logilab.common import shellutils


def count_lines(stream_or_filename):
    """Return the number of lines found in a file.

    :param stream_or_filename: either a path (``str``) or an already opened,
      seekable file-like object.
    :return: the number of lines. For backward compatibility an empty input
      yields ``1`` rather than ``0``.

    When a path is given, the file is opened here and closed before
    returning. When a stream is given, it is rewound to its beginning both
    before and after counting and is left open for the caller.
    """
    if isinstance(stream_or_filename, str):
        # We own this handle: the context manager guarantees it is closed,
        # even if reading fails half-way through.
        with open(stream_or_filename) as f:
            nb_lines = sum(1 for _ in f)
    else:
        f = stream_or_filename
        f.seek(0)
        nb_lines = sum(1 for _ in f)
        # Rewind so that the caller can read the stream from the start.
        f.seek(0)
    # Historical behaviour: an empty file is reported as one line.
    return max(nb_lines, 1)
def ucsvreader_pb(
    stream_or_path,
    encoding="utf-8",
    delimiter=",",
    quotechar='"',
    skipfirst=False,
    withpb=True,
    skip_empty=True,
):
    """Same as :func:`ucsvreader` but a progress bar is displayed as we iterate
    on rows.

    :param stream_or_path: either a path (``str``), opened here in binary
      mode, or an already opened binary stream.
    :param encoding: encoding forwarded to :func:`ucsvreader`.
    :param delimiter: field delimiter forwarded to :func:`ucsvreader`.
    :param quotechar: quote character forwarded to :func:`ucsvreader`.
    :param skipfirst: if true, the first row is skipped and not counted.
    :param withpb: if true, a progress bar is created and updated after each
      yielded row.
    :param skip_empty: forwarded to :func:`ucsvreader`.

    The total used for the progress bar and for the final message is the
    number of lines reported by :func:`count_lines` (minus one if
    `skipfirst`), not the number of rows actually yielded; the two differ
    when rows are skipped.

    A file opened here from a path is closed once the generator finishes, is
    closed explicitly, or fails. A stream supplied by the caller is left open.
    """
    opened_here = isinstance(stream_or_path, str)
    stream = open(stream_or_path, "rb") if opened_here else stream_or_path
    try:
        rowcount = count_lines(stream)
        if skipfirst:
            rowcount -= 1
        if withpb:
            # NOTE(review): second argument is presumably the bar width --
            # confirm against logilab.common.shellutils.ProgressBar.
            pb = shellutils.ProgressBar(rowcount, 50)
        for urow in ucsvreader(
            stream,
            encoding,
            delimiter,
            quotechar,
            skipfirst=skipfirst,
            skip_empty=skip_empty,
        ):
            yield urow
            if withpb:
                pb.update()
        print(" %s rows imported" % rowcount)
    finally:
        # Only close what we opened ourselves; the caller keeps ownership of
        # any stream it handed in.
        if opened_here:
            stream.close()
def ucsvreader(
    stream,
    encoding="utf-8",
    delimiter=",",
    quotechar='"',
    skipfirst=False,
    ignore_errors=False,
    skip_empty=True,
):
    """A csv reader that accepts files with any encoding and outputs unicode
    strings.

    :param stream: binary file-like object holding the CSV data.
    :param encoding: encoding used to decode `stream`.
    :param delimiter: field delimiter given to the csv reader.
    :param quotechar: quote character given to the csv reader.
    :param skipfirst: if true, the first row (typically a header) is consumed
      and not yielded. An empty stream simply yields nothing.
    :param ignore_errors: if true, rows for which the csv module raises
      ``csv.Error`` are dropped and reading goes on with the next row;
      otherwise the error is propagated to the caller.
    :param skip_empty: if true (the default), lines without any values
      specified (only separators) will be skipped. This is useful for Excel
      exports which may be full of such lines.
    :return: a generator of rows, each one a list of strings.
    """
    decoded = codecs.getreader(encoding)(stream)
    rows = csvmod.reader(decoded, delimiter=delimiter, quotechar=quotechar)
    if skipfirst:
        try:
            next(rows)
        except StopIteration:
            # Empty input: nothing to skip and nothing to yield. Letting
            # StopIteration escape a generator would be turned into a
            # RuntimeError (PEP 479).
            return
        except csvmod.Error:
            # A malformed first row still counts as the skipped one when
            # errors are ignored.
            if not ignore_errors:
                raise
    while True:
        try:
            row = next(rows)
        except StopIteration:
            # End of CSV.
            return
        except csvmod.Error:
            if ignore_errors:
                # Error in CSV, ignore line and continue.
                continue
            raise
        if not skip_empty or any(row):
            yield row