Source code for cubicweb.pyramid.core

# copyright 2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2014-2016 UNLISH S.A.S. (Montpellier, FRANCE), all rights reserved.
#
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <https://www.gnu.org/licenses/>.

"""Binding of CubicWeb connection to Pyramid request."""

import itertools
import logging
import warnings
from cgi import FieldStorage
from contextlib import contextmanager

import rql
from pyramid import httpexceptions

import cubicweb_web
from cubicweb.pyramid import tools
from cubicweb.server import session as cwsession
from cubicweb_web.request import CubicWebRequestBase

log = logging.getLogger(__name__)


class Connection(cwsession.Connection):
    """A specialised Connection that access the session data through a
    property.

    This behavior makes sure the actual session data is not loaded until
    actually accessed.
    """

    def __init__(self, session, *args, **kw):
        super().__init__(session._repo, session._user, *args, **kw)
        self.session = session
        self.lang = session._cached_lang

    def _get_session_data(self):
        return self.session.data

    def _set_session_data(self, data):
        pass

    _session_data = property(_get_session_data, _set_session_data)


class Session:
    """A Session that access the session data through a property.

    Along with :class:`Connection`, it avoid any load of the pyramid session
    data until it is actually accessed.
    """

    def __init__(self, pyramid_request, user, repo):
        self._pyramid_request = pyramid_request
        self._user = user
        self._repo = repo

    @property
    def anonymous_session(self):
        # XXX for now, anonymous_user only exists in webconfig (and testconfig).
        # It will only be present inside all-in-one instance.
        # there is plan to move it down to global config.
        if not hasattr(self._repo.config, "anonymous_user"):
            # not a web or test config, no anonymous user
            return False
        return self._user.login == self._repo.config.anonymous_user()[0]

    def get_data(self):
        if not getattr(self, "_protect_data_access", False):
            self._data_accessed = True
            return self._pyramid_request.session

    def set_data(self, data):
        if getattr(self, "_data_accessed", False):
            self._pyramid_request.session.clear()
            self._pyramid_request.session.update(data)

    data = property(get_data, set_data)

    def new_cnx(self):
        self._protect_data_access = True
        try:
            return Connection(self)
        finally:
            self._protect_data_access = False


def cw_headers(request):
    return itertools.chain(
        *[
            [(k, item) for item in v]
            for k, v in request.cw_request.headers_out.getAllRawHeaders()
        ]
    )


[docs]@contextmanager def cw_to_pyramid(request): """Context manager to wrap a call to the cubicweb API. All CW exceptions will be transformed into their pyramid equivalent. When needed, some CW reponse bits may be converted too (mainly headers)""" try: yield except cubicweb_web.Redirect as ex: assert 300 <= ex.status < 400 raise httpexceptions.status_map[ex.status]( ex.location, headers=cw_headers(request) ) except cubicweb_web.Unauthorized: raise httpexceptions.HTTPForbidden( request.cw_request._( "You're not authorized to access this page. " "If you think you should, please contact the site " "administrator." ), headers=cw_headers(request), ) except cubicweb_web.Forbidden: raise httpexceptions.HTTPForbidden( request.cw_request._( "This action is forbidden. " "If you think it should be allowed, please contact the site " "administrator." ), headers=cw_headers(request), ) except (rql.BadRQLQuery, cubicweb_web.RequestError): raise
[docs]class CubicWebPyramidRequest(CubicWebRequestBase): """A CubicWeb request that only wraps a pyramid request. :param request: A pyramid request """ def __init__(self, request): self._request = request self.path = request.upath_info vreg = request.registry["cubicweb.registry"] post = request.params.mixed() headers_in = request.headers super().__init__(vreg, post, headers=headers_in) self.content = request.body_file_seekable
[docs] def setup_params(self, params): self._uncleaned_form = {} for param, val in params.items(): if param in self.no_script_form_params and val: val = self.no_script_form_param(param, val) if isinstance(val, FieldStorage) and val.file: val = (val.filename, val.file) if param == "_cwmsgid": self.set_message_id(val) else: self._uncleaned_form[param] = val
@property def form(self): class_name = self.__class__.__name__ warnings.warn( f"{class_name}.form is deprecated and will be remove, uses " f"{class_name}.get_cleaned_form using a form_validator instead" ) return self._uncleaned_form @form.setter def form(self, new_form): class_name = self.__class__.__name__ warnings.warn( f"{class_name}.form is deprecated and will be remove, you won't be able to set it " "explictely, better options for specific cases where modifying it is needed" ) self._uncleaned_form = new_form def get_cleaned_form(self, form_validator): return form_validator.validate(self._uncleaned_form)
[docs] def relative_path(self, includeparams=True): path = self._request.path_info[1:] if includeparams and self._request.query_string: return f"{path}?{self._request.query_string}" return path
def instance_uri(self): return self._request.application_url def get_full_path(self): path = self._request.path if self._request.query_string: return f"{path}?{self._request.query_string}" return path
[docs] def http_method(self): return self._request.method
def _set_status_out(self, value): self._request.response.status_int = value def _get_status_out(self): return self._request.response.status_int status_out = property(_get_status_out, _set_status_out) @property def message(self): """Returns a '<br>' joined list of the cubicweb current message and the default pyramid flash queue messages. """ return "\n<br>\n".join( self._request.session.pop_flash() + self._request.session.pop_flash("cubicweb") ) def set_message(self, msg): self.reset_message() self._request.session.flash(msg, "cubicweb") def set_message_id(self, msgid): self.reset_message() self.set_message(self._request.session.pop(msgid, "")) def reset_message(self): self._request.session.pop_flash("cubicweb")
[docs]def render_view(request, vid, **kwargs): """Helper function to render a CubicWeb view. :param request: A pyramid request :param vid: A CubicWeb view id :param kwargs: Keyword arguments to select and instanciate the view :returns: The rendered view content """ vreg = request.registry["cubicweb.registry"] # XXX The select() function could, know how to handle a pyramid # request, and feed it directly to the views that supports it. # On the other hand, we could refine the View concept and decide it works # with a cnx, and never with a WebRequest with cw_to_pyramid(request): view = vreg["views"].select(vid, request.cw_request, **kwargs) view.set_stream() view.render() return view._stream.getvalue()
[docs]def _cw_cnx(request): """Obtains a cw session from a pyramid request The connection will be commited or rolled-back in a request finish callback (this is temporary, we should make use of the transaction manager in a later version). Not meant for direct use, use ``request.cw_cnx`` instead. :param request: A pyramid request :returns type: :class:`cubicweb.server.session.Connection` """ session = request.cw_session if session is None: return None cnx = session.new_cnx() def commit_state(cnx): return cnx.commit_state def cleanup(request): try: if request.exception is not None and not isinstance( request.exception, (httpexceptions.HTTPSuccessful, httpexceptions.HTTPRedirection), ): cnx.rollback() elif commit_state(cnx) == "uncommitable": cnx.rollback() else: cnx.commit() finally: cnx.__exit__(None, None, None) request.add_finished_callback(cleanup) cnx.__enter__() return cnx
[docs]def repo_connect(request, repo, eid): """A lightweight version of :meth:`cubicweb.server.repository.Repository.connect` that does not keep track of opened sessions, removing the need of closing them""" user, lang = tools.cached_build_user(repo, eid) session = Session(request, user, repo) session._cached_lang = lang tools.cnx_attach_entity(session, user) return session
[docs]def _cw_session(request): """Obtains a cw session from a pyramid request :param request: A pyramid request :returns type: :class:`cubicweb.server.session.Session` Not meant for direct use, use ``request.cw_session`` instead. """ repo = request.registry["cubicweb.repository"] if not request.authenticated_userid: eid = request.registry.get("cubicweb.anonymous_eid") if eid is None: return None session = repo_connect(request, repo, eid=eid) else: session = request._cw_cached_session return session
[docs]def _cw_request(request): """Obtains a CubicWeb request wrapper for the pyramid request. :param request: A pyramid request :return: A CubicWeb request :returns type: :class:`CubicWebPyramidRequest` Not meant for direct use, use ``request.cw_request`` instead. """ req = CubicWebPyramidRequest(request) cnx = request.cw_cnx if cnx is not None: req.set_cnx(request.cw_cnx) return req
[docs]def get_principals(login, request): """Returns the group names of the authenticated user. This function is meant to be used as an authentication policy callback. It also pre-open the cubicweb session and put it in request._cw_cached_session for later usage by :func:`_cw_session`. .. note:: If the default authentication policy is not used, make sure this function gets called by the active authentication policy. :param login: A cubicweb user eid :param request: A pyramid request :returns: A list of group names """ repo = request.registry["cubicweb.repository"] try: session = repo_connect(request, repo, eid=login) request._cw_cached_session = session except Exception: log.exception("Failed") raise with session.new_cnx() as cnx: with cnx.security_enabled(read=False): return { group for group, in cnx.execute( "Any GN WHERE U in_group G, G name GN, U eid %(userid)s", {"userid": login}, ) }
[docs]def includeme(config): """Enables the core features of Pyramid CubicWeb. Automatically called by the 'pyramid' command, or via ``config.include('cubicweb.pyramid.code')``. In the later case, the following registry entries must be defined first: 'cubicweb.config' A cubicweb 'config' instance. 'cubicweb.repository' The correponding cubicweb repository. 'cubicweb.registry' The vreg. """ repo = config.registry["cubicweb.repository"] with repo.internal_cnx() as cnx: login = config.registry["cubicweb.config"].anonymous_user()[0] if login is not None: config.registry["cubicweb.anonymous_eid"] = ( cnx.find("CWUser", login=login).one().eid ) config.add_request_method(_cw_session, name="cw_session", property=True, reify=True) config.add_request_method(_cw_cnx, name="cw_cnx", property=True, reify=True) config.add_request_method(_cw_request, name="cw_request", property=True, reify=True) cwcfg = config.registry["cubicweb.config"] for cube in cwcfg.cubes(): pkgname = f"cubicweb_{cube}" mod = __import__(pkgname) if hasattr(mod, "includeme"): config.include(pkgname)