Source code for cubicweb.req

# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <https://www.gnu.org/licenses/>.
"""Base class for request/session"""

from datetime import time, datetime, timedelta
from urllib.parse import (
    parse_qs,
    parse_qsl,
    quote as urlquote,
    unquote as urlunquote,
    urlsplit,
    urlunsplit,
)

from logilab.common.date import ustrftime, todate, todatetime
from logilab.common.decorators import cached
from rql.utils import rqlvar_maker

from cubicweb import Unauthorized, NoSelectableObject, uilib
from cubicweb.rset import ResultSet

ONESECOND = timedelta(0, 1, 0)
CACHE_REGISTRY = {}


class Cache(dict):
    def __init__(self):
        super(Cache, self).__init__()
        _now = datetime.now()
        self.cache_creation_date = _now
        self.latest_cache_lookup = _now


[docs]class RequestSessionBase(object): """base class containing stuff shared by server session and web request request/session is the main resources accessor, mainly through it's vreg attribute: :attribute vreg: the instance's registry :attribute vreg.schema: the instance's schema :attribute vreg.config: the instance's configuration """ is_request = True # False for repository session def __init__(self, vreg): self.vreg = vreg try: encoding = vreg.property_value("ui.encoding") except Exception: # no vreg or property not registered encoding = "utf-8" self.encoding = encoding # cache result of execution for (rql expr / eids), # should be emptied on commit/rollback of the server session / web # connection self.user = None self.lang = None self.local_perm_cache = {} self._ = str def _set_user(self, orig_user): """set the user for this req_session_base A special method is needed to ensure the linked user is linked to the connection too. """ rset = self.eid_rset(orig_user.eid, "CWUser") user = self.vreg["etypes"].etype_class("CWUser")(self, rset, row=0) user.cw_attr_cache["login"] = orig_user.login # cache login self.user = user self.set_entity_cache(user)
[docs] def set_language(self, lang): """install i18n configuration for `lang` translation. Raises :exc:`KeyError` if translation doesn't exist. """ self.lang = lang try: gettext, pgettext = self.vreg.config.translations[lang] except KeyError: assert self.vreg.config.mode == "test" gettext = str def pgettext(x, y): return str(y) # use _cw.__ to translate a message without registering it to the catalog self._ = self.__ = gettext self.pgettext = pgettext
def get_option_value(self, option): raise NotImplementedError
[docs] def property_value(self, key): """return value of the property with the given key, giving priority to user specific value if any, else using site value """ if self.user: val = self.user.property_value(key) if val is not None: return val return self.vreg.property_value(key)
[docs] def etype_rset(self, etype, size=1): """return a fake result set for a particular entity type""" rset = ResultSet([("A",)] * size, "%s X" % etype, description=[(etype,)] * size) def get_entity(row, col=0, etype=etype, req=self, rset=rset): return req.vreg["etypes"].etype_class(etype)(req, rset, row, col) rset.get_entity = get_entity rset.req = self return rset
[docs] def eid_rset(self, eid, etype=None): """return a result set for the given eid without doing actual query (we have the eid, we can suppose it exists and user has access to the entity) """ eid = int(eid) if etype is None: etype = self.entity_type(eid) rset = ResultSet([(eid,)], "Any X WHERE X eid %(x)s", {"x": eid}, [(etype,)]) rset.req = self return rset
[docs] def empty_rset(self): """return a guaranteed empty result""" rset = ResultSet([], "Any X WHERE X eid -1") rset.req = self return rset
[docs] def entity_from_eid(self, eid, etype=None): """return an entity instance for the given eid. No query is done""" try: return self.entity_cache(eid) except KeyError: rset = self.eid_rset(eid, etype) entity = rset.get_entity(0, 0) self.set_entity_cache(entity) return entity
def entity_cache(self, eid): raise KeyError def set_entity_cache(self, entity): pass
[docs] def create_entity(self, etype, **kwargs): """add a new entity of the given type Example (in a shell session): >>> c = create_entity('Company', name=u'Logilab') >>> create_entity('Person', firstname=u'John', surname=u'Doe', ... works_for=c) """ cls = self.vreg["etypes"].etype_class(etype) return cls.cw_instantiate(self.execute, **kwargs)
def _build_rql_request(self, start, etype, **kwargs): parts = ["{:s} WHERE X is {:s}".format(start, etype)] varmaker = rqlvar_maker(defined="X") eschema = self.vreg.schema.entity_schema_for(etype) for attr, value in kwargs.items(): if isinstance(value, (list, tuple)): raise NotImplementedError( "{}: list of values are not supported".format(attr) ) if hasattr(value, "eid"): kwargs[attr] = value.eid if attr.startswith("reverse_"): attr = attr[8:] if attr not in eschema.object_relations: raise KeyError( "{:s} not in {} object relations".format(attr, eschema) ) parts.append( "{var} {attr} X, {var} eid %(reverse_{attr})s".format( var=next(varmaker), attr=attr ) ) else: rel = eschema.subject_relations.get(attr) if rel is None: raise KeyError( "{} not in {} subject relations".format(attr, eschema) ) if rel.final: parts.append("X {attr} %({attr})s".format(attr=attr)) else: parts.append( "X {attr} {var}, {var} eid %({attr})s".format( attr=attr, var=next(varmaker) ) ) rql = ", ".join(parts) return rql, kwargs
[docs] def exists(self, etype, **kwargs): """return if it exists at least one entity of the given type and attribute values. >>> is_stallman_there = cnx.exists('CWUser', login=u"rms") >>> # more optimized than >>> is_stallman_there = bool(cnx.find('CWUser', login=u"rms")) >>> # it did a full scan table :-/ """ try: del kwargs["limit"] except KeyError: pass return bool(self.find(etype, limit=1, **kwargs))
[docs] def find(self, etype, limit=None, **kwargs): """find entities of the given type and attribute values. :param etype: the type of entities to return. :param limit: the max number of entities to return. :returns: A :class:`ResultSet` >>> users = find('CWGroup', name=u"users").one() >>> groups = find('CWGroup').entities() """ base_rql = "Any X" if limit is not None: base_rql = f"{base_rql} LIMIT {limit}" rql, kwargs = self._build_rql_request(base_rql, etype, **kwargs) return self.execute(rql, kwargs)
[docs] def ensure_ro_rql(self, rql): """raise an exception if the given rql is not a select query""" first = rql.split(None, 1)[0].lower() if first in ("insert", "set", "delete"): raise Unauthorized(self._("only select queries are authorized"))
# url generation methods ##################################################
[docs] def build_url(self, *args, **kwargs): """return an absolute URL using params dictionary key/values as URL parameters. Values are automatically URL quoted, and the publishing method to use may be specified or will be guessed. raises :exc:`ValueError` if None is found in arguments """ # use *args since we don't want first argument to be "anonymous" to # avoid potential clash with kwargs method = None if args: assert len(args) == 1, "only 0 or 1 non-named-argument expected" method = args[0] if method is None: method = "view" # XXX I (adim) think that if method is passed explicitly, we should # not try to process it and directly call req.build_url() base_url = kwargs.pop("base_url", None) if base_url is None: base_url = self.base_url() path = self.build_url_path(method, kwargs) if not kwargs: return "%s%s" % (base_url, path) return "%s%s?%s" % (base_url, path, self.build_url_params(**kwargs))
[docs] def build_url_path(self, method, kwargs): """return the "path" part of an URL""" if "_restpath" in kwargs: assert method == "view", repr(method) path = kwargs.pop("_restpath") else: path = method return path
[docs] def build_url_params(self, **kwargs): """return encoded params to incorporate them in a URL""" args = [] for param, values in kwargs.items(): if not isinstance(values, (list, tuple)): values = (values,) for value in values: assert value is not None args.append("%s=%s" % (param, self.url_quote(value))) return "&".join(args)
[docs] def url_quote(self, value, safe=""): """urllib.quote is not unicode safe, use this method to do the necessary encoding / decoding. Also it's designed to quote each part of a url path and so the '/' character will be encoded as well. """ return urlquote(str(value), safe=safe)
[docs] def url_unquote(self, quoted): """returns a unicode unquoted string decoding is based on `self.encoding` which is the encoding used in `url_quote` """ return urlunquote(quoted)
[docs] def url_parse_qsl(self, querystring): """return a list of (key, val) found in the url quoted query string""" for key, val in parse_qsl(querystring): yield key, val return
[docs] def rebuild_url(self, url, **newparams): """return the given url with newparams inserted. If any new params is already specified in the url, it's overriden by the new value newparams may only be mono-valued. """ schema, netloc, path, query, fragment = urlsplit(url) query = parse_qs(query) # sort for testing predictability for key, val in sorted(newparams.items()): query[key] = (self.url_quote(val),) query = "&".join( "%s=%s" % (param, value) for param, values in sorted(query.items()) for value in values ) return urlunsplit((schema, netloc, path, query, fragment))
# bound user related methods ############################################### @cached def user_data(self): """returns a dictionary with this user's information. The keys are : login The user login name The user name, returned by user.name() email The user principal email """ userinfo = {} user = self.user userinfo["login"] = user.login userinfo["name"] = user.name() userinfo["email"] = user.cw_adapt_to("IEmailable").get_email() return userinfo # formating methods #######################################################
[docs] def view( self, __vid, rset=None, __fallback_oid=None, __registry="views", initargs=None, w=None, **kwargs, ): """Select object with the given id (`__oid`) then render it. If the object isn't selectable, try to select fallback object if `__fallback_oid` is specified. If specified `initargs` is expected to be a dictionary containing arguments that should be given to selection (hence to object's __init__ as well), but not to render(). Other arbitrary keyword arguments will be given to selection *and* to render(), and so should be handled by object's call or cell_call method.. """ if initargs is None: initargs = kwargs else: initargs.update(kwargs) try: view = self.vreg[__registry].select(__vid, self, rset=rset, **initargs) except NoSelectableObject: if __fallback_oid is None: raise view = self.vreg[__registry].select( __fallback_oid, self, rset=rset, **initargs ) return view.render(w=w, **kwargs)
[docs] def printable_value( self, attrtype, value, props=None, displaytime=True, formatters=uilib.PRINTERS ): """return a displayablye value (i.e. unicode string)""" if value is None: return "" try: as_string = formatters[attrtype] except KeyError: self.error("given bad attrtype %s", attrtype) return str(value) return as_string(value, self, props, displaytime)
[docs] def format_date(self, date, date_format=None, time=False): """return a string for a date time according to instance's configuration """ if date is not None: if date_format is None: if time: date_format = self.property_value("ui.datetime-format") else: date_format = self.property_value("ui.date-format") return ustrftime(date, date_format) return ""
[docs] def format_time(self, time): """return a string for a time according to instance's configuration """ if time is not None: return ustrftime(time, self.property_value("ui.time-format")) return ""
[docs] def format_float(self, num): """return a string for floating point number according to instance's configuration """ if num is not None: return self.property_value("ui.float-format") % num return ""
[docs] def parse_datetime(self, value, etype="Datetime"): """get a datetime or time from a string (according to etype) Datetime formatted as Date are accepted """ assert etype in ("Datetime", "Date", "Time"), etype # XXX raise proper validation error if etype == "Datetime": format = self.property_value("ui.datetime-format") try: return todatetime(datetime.strptime(value, format)) except ValueError: pass elif etype == "Time": format = self.property_value("ui.time-format") try: # (adim) I can't find a way to parse a time with a custom format date = datetime.strptime(value, format) # this returns a datetime return time(date.hour, date.minute, date.second) except ValueError: raise ValueError( self._("can't parse %(value)r (expected %(format)s)") % {"value": value, "format": format} ) try: format = self.property_value("ui.date-format") dt = datetime.strptime(value, format) if etype == "Datetime": return todatetime(dt) return todate(dt) except ValueError: raise ValueError( self._("can't parse %(value)r (expected %(format)s)") % {"value": value, "format": format} )
[docs] def base_url(self): """Return the root url of the instance.""" url = self.vreg.config["base-url"] return url if url is None else url.rstrip("/") + "/"