# copyright 2017 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2014-2016 UNLISH S.A.S. (Montpellier, FRANCE), all rights reserved.
#
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <https://www.gnu.org/licenses/>.
"""Binding of CubicWeb connection to Pyramid request."""
import itertools
import logging
import warnings
from cgi import FieldStorage
from contextlib import contextmanager
import rql
from pyramid import httpexceptions
import cubicweb_web
from cubicweb.pyramid import tools
from cubicweb.server import session as cwsession
from cubicweb_web.request import CubicWebRequestBase
# Module-level logger; get_principals uses it to report a failed session setup.
log = logging.getLogger(__name__)
class Connection(cwsession.Connection):
    """Connection whose ``_session_data`` is a live view on its session's data.

    ``_session_data`` is exposed as a property that reads ``session.data`` on
    every access, so the actual session data is not loaded until something
    really asks for it.
    """

    def __init__(self, session, *args, **kw):
        super().__init__(session._repo, session._user, *args, **kw)
        # The language was cached on the session by whoever built it.
        self.lang = session._cached_lang
        self.session = session

    def _get_session_data(self):
        # Always delegate to the owning session; nothing is stored here.
        return self.session.data

    def _set_session_data(self, data):
        # Assignments are accepted but deliberately ignored.
        pass

    _session_data = property(_get_session_data, _set_session_data)
class Session:
    """Session exposing the pyramid session as its ``data`` property.

    Together with :class:`Connection`, it avoids loading the pyramid session
    data until it is actually accessed.
    """

    def __init__(self, pyramid_request, user, repo):
        self._pyramid_request = pyramid_request
        self._user = user
        self._repo = repo

    @property
    def anonymous_session(self):
        """Whether this session's user is the configured anonymous user."""
        # XXX for now, anonymous_user only exists in webconfig (and
        # testconfig), i.e. only inside an all-in-one instance. There is a
        # plan to move it down to the global config.
        if hasattr(self._repo.config, "anonymous_user"):
            return self._user.login == self._repo.config.anonymous_user()[0]
        # not a web or test config, no anonymous user
        return False

    def get_data(self):
        """Return the pyramid session, recording that it has been accessed.

        The access is not recorded while ``_protect_data_access`` is set.
        """
        protected = getattr(self, "_protect_data_access", False)
        if not protected:
            self._data_accessed = True
        return self._pyramid_request.session

    def set_data(self, data):
        """Replace the pyramid session content with ``data``.

        Nothing happens when the data has never been accessed through
        :meth:`get_data`.
        """
        if not getattr(self, "_data_accessed", False):
            return
        self._pyramid_request.session.clear()
        self._pyramid_request.session.update(data)

    data = property(get_data, set_data)

    def new_cnx(self):
        """Build a :class:`Connection` bound to this session.

        Data access tracking is suspended while the connection is being
        constructed, and re-enabled afterwards even if construction fails.
        """
        self._protect_data_access = True
        try:
            connection = Connection(self)
        finally:
            self._protect_data_access = False
        return connection
def cw_headers(request):
    """Return the outgoing CubicWeb headers as an iterable of (name, value).

    Each raw header yields one pair per value it carries. The raw headers
    are read when this function is called, not when the result is consumed.

    :param request: A pyramid request exposing ``cw_request``
    """
    raw_headers = request.cw_request.headers_out.getAllRawHeaders()
    pairs_by_header = [
        [(name, value) for value in values] for name, values in raw_headers
    ]
    return itertools.chain.from_iterable(pairs_by_header)
@contextmanager
def cw_to_pyramid(request):
    """Context manager wrapping a call to the cubicweb API.

    CubicWeb ``Redirect``, ``Unauthorized`` and ``Forbidden`` exceptions
    raised inside the block are re-raised as pyramid HTTP exceptions that
    carry the outgoing CubicWeb headers. ``rql.BadRQLQuery`` and
    ``RequestError`` are propagated unchanged.

    :param request: A pyramid request
    """
    try:
        yield
    except cubicweb_web.Redirect as redirect:
        assert 300 <= redirect.status < 400
        # Pick the pyramid exception class matching the redirect status.
        http_redirect = httpexceptions.status_map[redirect.status]
        raise http_redirect(redirect.location, headers=cw_headers(request))
    except cubicweb_web.Unauthorized:
        detail = request.cw_request._(
            "You're not authorized to access this page. "
            "If you think you should, please contact the site "
            "administrator."
        )
        raise httpexceptions.HTTPForbidden(detail, headers=cw_headers(request))
    except cubicweb_web.Forbidden:
        detail = request.cw_request._(
            "This action is forbidden. "
            "If you think it should be allowed, please contact the site "
            "administrator."
        )
        raise httpexceptions.HTTPForbidden(detail, headers=cw_headers(request))
    except (rql.BadRQLQuery, cubicweb_web.RequestError):
        # Explicitly let these through untouched.
        raise
class CubicWebPyramidRequest(CubicWebRequestBase):
    """CubicWeb request implemented as a thin wrapper around a pyramid one.

    :param request: A pyramid request
    """

    def __init__(self, request):
        self._request = request
        self.path = request.upath_info
        super().__init__(
            request.registry["cubicweb.registry"],
            request.params.mixed(),
            headers=request.headers,
        )
        self.content = request.body_file_seekable

    def setup_params(self, params):
        """Populate the raw (uncleaned) form from the request parameters.

        Uploaded files become ``(filename, file)`` tuples and the special
        ``_cwmsgid`` parameter is consumed as a message id instead of being
        stored in the form.
        """
        self._uncleaned_form = {}
        for name, value in params.items():
            # Keep the membership test first: the value's truthiness is only
            # evaluated for parameters that need script filtering.
            if name in self.no_script_form_params and value:
                value = self.no_script_form_param(name, value)
            if isinstance(value, FieldStorage) and value.file:
                value = (value.filename, value.file)
            if name == "_cwmsgid":
                self.set_message_id(value)
                continue
            self._uncleaned_form[name] = value

    @property
    def form(self):
        """Deprecated access to the raw form; emits a warning."""
        class_name = self.__class__.__name__
        notice = (
            f"{class_name}.form is deprecated and will be remove, uses "
            f"{class_name}.get_cleaned_form using a form_validator instead"
        )
        warnings.warn(notice)
        return self._uncleaned_form

    @form.setter
    def form(self, new_form):
        class_name = self.__class__.__name__
        notice = (
            f"{class_name}.form is deprecated and will be remove, you won't be able to set it "
            "explictely, better options for specific cases where modifying it is needed"
        )
        warnings.warn(notice)
        self._uncleaned_form = new_form

    def get_cleaned_form(self, form_validator):
        """Return the raw form as validated by ``form_validator``."""
        return form_validator.validate(self._uncleaned_form)

    def relative_path(self, includeparams=True):
        """Return the path info without its first character.

        The query string is appended when ``includeparams`` is true and the
        request has one.
        """
        relative = self._request.path_info[1:]
        if not (includeparams and self._request.query_string):
            return relative
        return f"{relative}?{self._request.query_string}"

    def instance_uri(self):
        """Return the application URL of the wrapped request."""
        return self._request.application_url

    def get_full_path(self):
        """Return the request path, followed by the query string if any."""
        full = self._request.path
        if not self._request.query_string:
            return full
        return f"{full}?{self._request.query_string}"

    def http_method(self):
        """Return the HTTP method of the wrapped request."""
        return self._request.method

    def _set_status_out(self, value):
        self._request.response.status_int = value

    def _get_status_out(self):
        return self._request.response.status_int

    # The outgoing status is read from and written to the pyramid response.
    status_out = property(_get_status_out, _set_status_out)

    @property
    def message(self):
        """Returns a '<br>' joined list of the default pyramid flash queue
        messages and the cubicweb current message.

        Reading it pops both flash queues.
        """
        flash_session = self._request.session
        default_queue = flash_session.pop_flash()
        cubicweb_queue = flash_session.pop_flash("cubicweb")
        return "\n<br>\n".join(default_queue + cubicweb_queue)

    def set_message(self, msg):
        """Replace the pending cubicweb flash message with ``msg``."""
        self.reset_message()
        self._request.session.flash(msg, "cubicweb")

    def set_message_id(self, msgid):
        """Use the value popped from the session under ``msgid`` as message.

        An empty string is used when the session has no such key.
        """
        self.reset_message()
        stored = self._request.session.pop(msgid, "")
        self.set_message(stored)

    def reset_message(self):
        """Drop whatever is pending in the cubicweb flash queue."""
        self._request.session.pop_flash("cubicweb")
def render_view(request, vid, **kwargs):
    """Helper function to render a CubicWeb view.

    :param request: A pyramid request
    :param vid: A CubicWeb view id
    :param kwargs: Keyword arguments to select and instanciate the view

    :returns: The rendered view content
    """
    registry = request.registry["cubicweb.registry"]
    # XXX select() could learn how to handle a pyramid request and feed it
    # directly to the views that support it. Alternatively, the View concept
    # could be refined to work with a cnx only, never with a WebRequest.
    with cw_to_pyramid(request):
        selected = registry["views"].select(vid, request.cw_request, **kwargs)
        selected.set_stream()
        selected.render()
        content = selected._stream.getvalue()
    return content
[docs]def _cw_cnx(request):
"""Obtains a cw session from a pyramid request
The connection will be commited or rolled-back in a request finish
callback (this is temporary, we should make use of the transaction manager
in a later version).
Not meant for direct use, use ``request.cw_cnx`` instead.
:param request: A pyramid request
:returns type: :class:`cubicweb.server.session.Connection`
"""
session = request.cw_session
if session is None:
return None
cnx = session.new_cnx()
def commit_state(cnx):
return cnx.commit_state
def cleanup(request):
try:
if request.exception is not None and not isinstance(
request.exception,
(httpexceptions.HTTPSuccessful, httpexceptions.HTTPRedirection),
):
cnx.rollback()
elif commit_state(cnx) == "uncommitable":
cnx.rollback()
else:
cnx.commit()
finally:
cnx.__exit__(None, None, None)
request.add_finished_callback(cleanup)
cnx.__enter__()
return cnx
def repo_connect(request, repo, eid):
    """Build a :class:`Session` for the user identified by ``eid``.

    A lightweight version of
    :meth:`cubicweb.server.repository.Repository.connect` that does not keep
    track of opened sessions, removing the need of closing them.

    :param request: A pyramid request
    :param repo: The cubicweb repository
    :param eid: The eid of the user to connect as

    :returns: The new session, with the user's cached language attached
    """
    cw_user, language = tools.cached_build_user(repo, eid)
    new_session = Session(request, cw_user, repo)
    # Connection.__init__ reads this attribute back from the session.
    new_session._cached_lang = language
    tools.cnx_attach_entity(new_session, cw_user)
    return new_session
[docs]def _cw_session(request):
"""Obtains a cw session from a pyramid request
:param request: A pyramid request
:returns type: :class:`cubicweb.server.session.Session`
Not meant for direct use, use ``request.cw_session`` instead.
"""
repo = request.registry["cubicweb.repository"]
if not request.authenticated_userid:
eid = request.registry.get("cubicweb.anonymous_eid")
if eid is None:
return None
session = repo_connect(request, repo, eid=eid)
else:
session = request._cw_cached_session
return session
def _cw_request(request):
    """Obtains a CubicWeb request wrapper for the pyramid request.

    The wrapper is bound to ``request.cw_cnx`` when that is not ``None``.

    :param request: A pyramid request
    :return: A CubicWeb request
    :returns type: :class:`CubicWebPyramidRequest`

    Not meant for direct use, use ``request.cw_request`` instead.
    """
    cw_req = CubicWebPyramidRequest(request)
    if request.cw_cnx is not None:
        cw_req.set_cnx(request.cw_cnx)
    return cw_req
def get_principals(login, request):
    """Returns the group names of the authenticated user.

    This function is meant to be used as an authentication policy callback.

    It also pre-opens the cubicweb session and puts it in
    request._cw_cached_session for later usage by :func:`_cw_session`.

    .. note::

        If the default authentication policy is not used, make sure this
        function gets called by the active authentication policy.

    :param login: A cubicweb user eid
    :param request: A pyramid request
    :returns: A set of group names
    """
    repo = request.registry["cubicweb.repository"]

    try:
        session = repo_connect(request, repo, eid=login)
        request._cw_cached_session = session
    except Exception:
        # Log with traceback, then let the caller deal with the failure.
        log.exception("Failed")
        raise

    groups_rql = "Any GN WHERE U in_group G, G name GN, U eid %(userid)s"
    # The group lookup runs with read security disabled.
    with session.new_cnx() as cnx, cnx.security_enabled(read=False):
        rows = cnx.execute(groups_rql, {"userid": login})
        return {name for (name,) in rows}
def includeme(config):
    """Enables the core features of Pyramid CubicWeb.

    Automatically called by the 'pyramid' command, or by including this
    module with ``config.include()``. In the latter case, the following
    registry entries must be defined first:

    'cubicweb.config'
        A cubicweb 'config' instance.

    'cubicweb.repository'
        The corresponding cubicweb repository.

    'cubicweb.registry'
        The vreg.
    """
    registry = config.registry
    repo = registry["cubicweb.repository"]

    # Resolve the anonymous user's eid once and publish it in the registry.
    with repo.internal_cnx() as cnx:
        anonymous_login = registry["cubicweb.config"].anonymous_user()[0]
        if anonymous_login is not None:
            anonymous = cnx.find("CWUser", login=anonymous_login).one()
            registry["cubicweb.anonymous_eid"] = anonymous.eid

    # Expose the CubicWeb objects as reified properties of the request.
    request_properties = (
        ("cw_session", _cw_session),
        ("cw_cnx", _cw_cnx),
        ("cw_request", _cw_request),
    )
    for attribute, factory in request_properties:
        config.add_request_method(factory, name=attribute, property=True, reify=True)

    # Include every cube package that provides its own ``includeme``.
    for cube in registry["cubicweb.config"].cubes():
        package = f"cubicweb_{cube}"
        if hasattr(__import__(package), "includeme"):
            config.include(package)