# copyright 2017-2024 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# copyright 2014-2016 UNLISH S.A.S. (Montpellier, FRANCE), all rights reserved.
#
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <https://www.gnu.org/licenses/>.
"""Backward compatibility layer for CubicWeb to run as a Pyramid application."""
import os
import inspect
import itertools
import logging
import sys
import traceback
import warnings
from cgi import FieldStorage
from contextlib import contextmanager
from datetime import datetime
from urllib.parse import quote
import pyramid
from pyramid import httpexceptions
from pyramid import security
from pyramid import tweens
from pyramid.csrf import (
check_csrf_token,
check_csrf_origin,
get_csrf_token,
new_csrf_token,
)
from pyramid.httpexceptions import HTTPSeeOther, HTTPException
from pyramid.settings import asbool
import yams
import cubicweb
from cubicweb.debug import emit_to_debug_channel
from rql import BadRQLQuery
from cubicweb.devtools import BASE_URL
from cubicweb_web.application import CubicWebPublisher
from cubicweb_web.request import CubicWebRequestBase
from cubicweb_web.view import inject_html_generating_call_on_w
from cubicweb_web._exceptions import (
PublishException,
LogOut,
Redirect,
RequestError,
RemoteCallFailed,
NotFound,
)
# Module-level logger used by the handlers and the request wrapper below.
log = logging.getLogger(__name__)

# Message logged by the request wrapper whenever accessing the pyramid
# session raises ``AttributeError`` (i.e. no session is available).
NO_SESSION_ERROR_MSG = (
    "No session factory registered, you can use pyramid_session_redis or "
    "include in pyramid.ini cubicweb.pyramid.session (which is not recommended "
    "for production instance). You can also see the Sessions chapter of the Pyramid documentation, "
    "if you want to define your own session factory."
)
class PyramidSessionHandler:
    """A CW session handler that relies on the pyramid API to fetch the
    needed information.

    It implements the :class:`cubicweb.web.application.CookieSessionHandler`
    API.

    :param appli: the application object handed over at construction time;
        it is only stored on ``self.appli``.
    """

    def __init__(self, appli):
        self.appli = appli

    def get_session(self, req):
        """Return the session attached to the pyramid request wrapped by ``req``.

        :param req: a CubicWeb request exposing the pyramid request as
            ``req._request``
        :returns: the value of ``req._request.cw_session``
        """
        return req._request.cw_session

    def logout(self, req, goto_url):
        """Signal a logout by raising :exc:`LogOut`.

        :param req: unused
        :param goto_url: URL carried by the raised exception (``url`` argument)
        :raises LogOut: always
        """
        raise LogOut(url=goto_url)
def _cw_headers(request):
    """Flatten the CubicWeb outgoing headers into ``(name, value)`` pairs.

    :param request: a pyramid request exposing the CubicWeb request as
        ``request.cw_request``
    :returns: an iterator yielding one ``(name, value)`` tuple per header
        value, in the order given by ``headers_out.getAllRawHeaders()``
    """
    raw_headers = request.cw_request.headers_out.getAllRawHeaders()
    # Built eagerly so the headers are read when this function is called,
    # not when the returned iterator is eventually consumed.
    pairs_per_header = [
        [(name, value) for value in values] for name, values in raw_headers
    ]
    return itertools.chain.from_iterable(pairs_per_header)
@contextmanager
def cw_to_pyramid(request):
    """Context manager to wrap a call to the cubicweb API.

    All CW exceptions will be transformed into their pyramid equivalent.
    When needed, some CW response bits may be converted too (mainly headers).

    Mapping applied to exceptions raised inside the ``with`` body:

    * ``Redirect`` -> the ``httpexceptions.status_map`` entry for
      ``ex.status``, built with ``ex.location`` and the CW outgoing headers;
    * ``cubicweb.Unauthorized`` / ``cubicweb.Forbidden`` ->
      ``HTTPForbidden`` with a translated message and the CW outgoing
      headers;
    * ``BadRQLQuery`` / ``RequestError`` -> re-raised unchanged.

    Any other exception propagates untouched.

    :param request: a pyramid request exposing ``cw_request``
    """
    try:
        yield
    except Redirect as ex:
        assert 300 <= ex.status < 400
        raise httpexceptions.status_map[ex.status](
            ex.location, headers=_cw_headers(request)
        )
    except cubicweb.Unauthorized:
        raise httpexceptions.HTTPForbidden(
            request.cw_request._(
                "You're not authorized to access this page. "
                "If you think you should, please contact the site "
                "administrator."
            ),
            headers=_cw_headers(request),
        )
    except cubicweb.Forbidden:
        raise httpexceptions.HTTPForbidden(
            request.cw_request._(
                "This action is forbidden. "
                "If you think it should be allowed, please contact the site "
                "administrator."
            ),
            headers=_cw_headers(request),
        )
    except (BadRQLQuery, RequestError):
        # Explicitly let these through without any conversion.
        raise
class CubicWebPyramidHandler:
    """A Pyramid request handler that relies on a cubicweb instance to do the
    whole job.

    :param appli: A CubicWeb 'Application' object.
    :param cubicweb_config: configuration supporting item access; when its
        ``"query-log-file"`` entry is truthy, request traces are appended to
        that file, otherwise they are sent to the module logger.
    """

    # HTTP methods for which the CSRF token/origin checks are skipped.
    _CSRF_SAFE_METHODS = frozenset(["GET", "HEAD", "OPTIONS", "TRACE"])

    def __init__(self, appli, cubicweb_config):
        self.appli = appli
        query_log_path = cubicweb_config["query-log-file"]
        if query_log_path:
            # The file stays open for the lifetime of the handler.  The
            # encoding is explicit so that non-ASCII request paths can be
            # written whatever the process locale is.
            self._query_log = open(query_log_path, "a", encoding="utf-8")
            self._write_to_log = self._write_to_log_file
        else:
            self._write_to_log = self._write_to_logger

    def _write_to_log_file(self, text):
        """Append ``text`` to the query log file and flush immediately."""
        # NOTE(review): no line separator is added here and the caller's
        # text does not end with one -- confirm this is intended.
        self._query_log.write(text)
        self._query_log.flush()

    def _write_to_logger(self, text):
        """Send ``text`` to the module logger at INFO level."""
        log.info(text)

    @staticmethod
    def _controller_source(controller):
        """Return ``(source file, first line)`` of the controller's class.

        Each element is ``None`` when ``inspect`` cannot determine it, so
        that building a trace line never makes the request fail.
        """
        controller_class = controller.__class__
        try:
            source_file = inspect.getsourcefile(controller_class)
        except TypeError:
            source_file = None
        try:
            first_line = inspect.getsourcelines(controller_class)[1]
        except (OSError, TypeError):
            first_line = None
        return source_file, first_line

    @staticmethod
    def _login_redirect(request):
        """Build the redirection to ``/login`` for ``request``.

        The requested path and query string are passed as the quoted
        ``postlogin_path`` parameter, except when the request targets ``/``.
        """
        requested_path = request.path_qs
        params = ""
        if requested_path != "/":
            params = f"?postlogin_path={quote(requested_path)}"
        return HTTPSeeOther(f"/login{params}")

    def __call__(self, request):
        """
        Handler that mimics what CubicWebPublisher.main_handle_request and
        CubicWebPublisher.core_handle do

        Steps: resolve the URL to a controller id and result set, select the
        controller, enforce CSRF checks on unsafe methods, publish, commit
        the connection, then copy the CubicWeb outgoing headers onto the
        pyramid response.

        :param request: a pyramid request exposing ``cw_request``
        :returns: ``request.response``
        :raises HTTPBadRequest: when no controller can be selected
        :raises HTTPSeeOther: on logout, or to redirect to the login page
        """
        cubicweb_request = request.cw_request
        cubicweb_registry = request.registry["cubicweb.registry"]
        try:
            content = None
            try:
                with cw_to_pyramid(request):
                    controller_id, rset = self.appli.url_resolver.process(
                        cubicweb_request, cubicweb_request.path
                    )
                    try:
                        controller = cubicweb_registry["controllers"].select(
                            controller_id, cubicweb_request, appli=self.appli
                        )
                    except cubicweb.NoSelectableObject as ex:
                        warning_message = (
                            "failed to select a controller for this request "
                            f"{cubicweb_request.path} {request.method}."
                        )
                        if ex.objects:
                            candidates = "\n * ".join(repr(x) for x in ex.objects)
                            warning_message += (
                                " Here were the candidates controllers (but none matched): \n * "
                                f"{candidates}"
                            )
                        else:
                            warning_message += " There was no candidate controller."
                        log.warning(warning_message)
                        raise httpexceptions.HTTPBadRequest(
                            cubicweb_request._(
                                "couldn't handle this request as it is either badly formed or is "
                                "lacking the correct authorizations"
                            )
                        )

                    # ensure that we have a CSRF token on all requests
                    get_csrf_token(request)
                    if request.method not in self._CSRF_SAFE_METHODS and getattr(
                        controller, "require_csrf", True
                    ):
                        check_csrf_token(request)
                        check_csrf_origin(request)

                    source_file, first_line = self._controller_source(controller)
                    self._write_to_log(
                        "REQUEST [%s] '%s' selected controller %s at %s:%s"
                        % (
                            controller_id,
                            cubicweb_request.path,
                            controller,
                            source_file,
                            first_line,
                        )
                    )
                    emit_to_debug_channel(
                        "vreg",
                        {
                            "vreg": cubicweb_registry,
                        },
                    )
                    emit_to_debug_channel(
                        "controller",
                        {
                            "kind": controller_id,
                            "request": cubicweb_request,
                            "path": cubicweb_request.path,
                            "controller": controller,
                            "config": self.appli.repo.config,
                        },
                    )

                    cubicweb_request.update_search_state()
                    content = controller.publish(rset=rset)

                    # XXX this auto-commit should be handled by the cw_request
                    # cleanup or the pyramid transaction manager.
                    # It is kept here to have the ValidationError handling bw
                    # compatible
                    if cubicweb_request.cnx:
                        transaction_uuid = cubicweb_request.cnx.commit()
                        if transaction_uuid is not None:
                            cubicweb_request.data["last_undoable_transaction"] = (
                                transaction_uuid
                            )
            except yams.ValidationError as ex:
                # XXX The validation_error_handler implementation is light, we
                # should redo it better in cw_to_pyramid, so it can be properly
                # handled when raised from a cubicweb view.
                # BUT the real handling of validation errors should be done
                # earlier in the controllers, not here. In the end, the
                # ValidationError should never be handled here.
                content = self.appli.validation_error_handler(cubicweb_request, ex)
            except RemoteCallFailed as exception:
                raise pyramid.httpexceptions.exception_response(
                    exception.status,
                    body=exception.dumps(),
                    content_type="application/json",
                    charset="utf-8",
                )

            if content is not None:
                request.response.body = content

        except LogOut as ex:
            # The actual 'logging out' logic should be in separated function
            # that is accessible by the pyramid views
            headers = security.forget(request)
            new_csrf_token(request)
            raise HTTPSeeOther(ex.url, headers=headers)
        except cubicweb.AuthenticationError:
            # Will occur upon access to cubicweb_request.cnx which is a
            # cubicweb.dbapi._NeedAuthAccessMock.
            if not content:
                raise self._login_redirect(request)
        except NotFound as ex:
            if not cubicweb_request.cnx:
                # No connection on the request: send the user to the login page.
                raise self._login_redirect(request)
            view = cubicweb_registry["views"].select("404", cubicweb_request)
            content = cubicweb_registry["views"].main_template(
                cubicweb_request, view=view
            )
            request.response.status_code = ex.status
            request.response.body = content
        finally:
            # XXX CubicWebPyramidRequest.headers_out should
            # access directly the pyramid response headers.
            request.response.headers.clear()
            for (
                header_name,
                header_values,
            ) in cubicweb_request.headers_out.getAllRawHeaders():
                for item in header_values:
                    request.response.headers.add(header_name, item)

        return request.response

    def error_handler(self, exc, request):
        """Turn ``exc`` into a response rendered by CubicWeb error views.

        The response object is ``exc`` itself when it is a pyramid HTTP
        exception, the pyramid response matching ``exc.status`` for a
        ``PublishException`` carrying a status, and an internal server error
        otherwise.  Its body is produced by the ajax error handler for ajax
        requests, else by the ``error`` view (falling back on the
        ``error-template`` template if rendering that view fails).

        :param exc: the exception being handled
        :param request: a pyramid request exposing ``cw_request``
        :returns: ``request.response``
        """
        req = request.cw_request
        if isinstance(exc, httpexceptions.HTTPException):
            request.response = exc
        elif isinstance(exc, PublishException) and exc.status is not None:
            request.response = httpexceptions.exception_response(exc.status)
        else:
            request.response = httpexceptions.HTTPInternalServerError()
        request.response.cache_control = "no-cache"
        vreg = request.registry["cubicweb.registry"]
        excinfo = sys.exc_info()
        req.reset_message()
        if req.ajax_request:
            content = self.appli.ajax_error_handler(req, exc)
        else:
            try:
                req.data["ex"] = exc
                req.data["excinfo"] = excinfo
                errview = vreg["views"].select("error", req)
                template = self.appli.main_template_id(req)
                content = vreg["views"].main_template(req, template, view=errview)
            except Exception:
                # Rendering the regular error view failed: use the minimal
                # error template instead.
                content = vreg["views"].main_template(req, "error-template")
        log.exception(exc)
        request.response.body = content
        return request.response
class CubicWebPyramidRequest(CubicWebRequestBase):
    """A CubicWeb request that only wraps a pyramid request.

    :param request: A pyramid request
    """

    def __init__(self, request):
        self._request = request
        vreg = request.registry["cubicweb.registry"]
        base_url = vreg.config["base-url"]
        # Only enforced while pytest is running a test (pytest exports
        # PYTEST_CURRENT_TEST in the environment).
        if "PYTEST_CURRENT_TEST" in os.environ:
            assert request.url.startswith(base_url), (
                f"Error: request {request} must start with the BASE_URL {BASE_URL} when running "
                "tests. Please prefix it with cubicweb.devtools.BASE_URL"
            )
        self.path = request.upath_info
        post = request.params.mixed()
        headers_in = request.headers
        super().__init__(vreg, post, headers=headers_in)
        self.content = request.body_file_seekable

    def __repr__(self):
        """
        Will render something like:
        <cubicweb_web.bwcompat.CubicWebPyramidRequest object at 0x.... GET https://example.com/path/>
        """
        return f"{super().__repr__()[:-1]} {self._request.method} {self._request.url}>"

    def setup_params(self, params):
        """Populate the raw (uncleaned) form from the request parameters.

        Uploaded files (``FieldStorage`` values with a file) are stored as
        ``(filename, file)`` tuples.  The special ``_cwmsgid`` parameter is
        not stored: it is forwarded to :meth:`set_message_id`.

        :param params: mapping of parameter names to values
        """
        self._uncleaned_form = {}
        for param, val in params.items():
            if param in self.no_script_form_params and val:
                val = self.no_script_form_param(param, val)
            if isinstance(val, FieldStorage) and val.file:
                val = (val.filename, val.file)
            if param == "_cwmsgid":
                self.set_message_id(val)
            else:
                self._uncleaned_form[param] = val

    @property
    def form(self):
        """Deprecated access to the raw form; emits a warning on each read."""
        class_name = self.__class__.__name__
        # stacklevel=2 attributes the warning to the code reading ``.form``.
        warnings.warn(
            f"{class_name}.form is deprecated and will be remove, uses "
            f"{class_name}.get_cleaned_form using a form_validator instead",
            stacklevel=2,
        )
        return self._uncleaned_form

    @form.setter
    def form(self, new_form):
        class_name = self.__class__.__name__
        # stacklevel=2 attributes the warning to the code assigning ``.form``.
        warnings.warn(
            f"{class_name}.form is deprecated and will be remove, you won't be able to set it "
            "explictely, better options for specific cases where modifying it is needed",
            stacklevel=2,
        )
        self._uncleaned_form = new_form

    def get_cleaned_form(self, form_validator):
        """Return ``form_validator.validate()`` applied to the raw form.

        :param form_validator: object exposing a ``validate(form)`` method
        """
        return form_validator.validate(self._uncleaned_form)

    def relative_path(self, includeparams=True):
        """Return the request path without its first character.

        :param includeparams: when true and the request has a query string,
            append ``?<query string>`` to the path
        """
        path = self._request.path_info[1:]
        if includeparams and self._request.query_string:
            return f"{path}?{self._request.query_string}"
        return path

    def instance_uri(self):
        """Return the pyramid request's ``application_url``."""
        return self._request.application_url

    def get_full_path(self):
        """Return the request path, followed by ``?<query string>`` if any."""
        path = self._request.path
        if self._request.query_string:
            return f"{path}?{self._request.query_string}"
        return path

    def http_method(self):
        """Return the HTTP method of the wrapped pyramid request."""
        return self._request.method

    def _set_status_out(self, value):
        self._request.response.status_int = value

    def _get_status_out(self):
        return self._request.response.status_int

    # Response status code, read from / written to the pyramid response.
    status_out = property(_get_status_out, _set_status_out)

    @property
    def message(self):
        """Returns a '<br>' joined list of the cubicweb current message and the
        default pyramid flash queue messages.

        Both flash queues are emptied by this access.  When the session is
        unavailable (``AttributeError``), the problem is logged and an empty
        string is returned.
        """
        try:
            return "\n<br>\n".join(
                self._request.session.pop_flash()
                + self._request.session.pop_flash("cubicweb")
            )
        except AttributeError:
            log.exception(NO_SESSION_ERROR_MSG)
            return ""

    def set_message(self, msg):
        """Replace the content of the ``cubicweb`` flash queue with ``msg``.

        A missing session (``AttributeError``) is logged, not raised.
        """
        try:
            self.reset_message()
            self._request.session.flash(msg, "cubicweb")
        except AttributeError:
            log.exception(NO_SESSION_ERROR_MSG)

    def set_message_id(self, msgid):
        """Set the current message from the session entry named ``msgid``.

        The entry is popped from the session; an empty message is used when
        it does not exist.  A missing session (``AttributeError``) is
        logged, not raised.
        """
        try:
            self.reset_message()
            self.set_message(self._request.session.pop(msgid, ""))
        except AttributeError:
            log.exception(NO_SESSION_ERROR_MSG)

    def reset_message(self):
        """Empty the ``cubicweb`` flash queue.

        A missing session (``AttributeError``) is logged, not raised.
        """
        try:
            self._request.session.pop_flash("cubicweb")
        except AttributeError:
            log.exception(NO_SESSION_ERROR_MSG)
def _cw_request(request):
    """Obtains a CubicWeb request wrapper for the pyramid request.

    :param request: A pyramid request
    :return: A CubicWeb request
    :returns type: :class:`CubicWebPyramidRequest`

    Not meant for direct use, use ``request.cw_request`` instead.
    """
    req = CubicWebPyramidRequest(request)
    cnx = request.cw_cnx
    if cnx is not None:
        # Reuse the value just read instead of evaluating the attribute again.
        req.set_cnx(cnx)
    return req
class TweenHandler:
    """A Pyramid tween handler that submits unhandled requests to a CubicWeb
    handler.

    The CubicWeb handler to use is expected to be in the pyramid registry, at
    key ``'cubicweb.handler'``.

    :param handler: the next handler of the tween chain
    :param registry: the pyramid registry
    """

    def __init__(self, handler, registry):
        self.handler = handler
        self.cwhandler = registry["cubicweb.handler"]

    @staticmethod
    def _print_access_line(now, view_kind, request, status, body):
        """Print one access line for ``request`` on the standard output."""
        print(
            f'{now} - ({view_kind} view) "{request.method} {request.path}'
            f' {request.http_version}" {status} {len(body)}'
        )

    def __call__(self, request):
        """Handle ``request`` with pyramid first, then with CubicWeb.

        The CubicWeb handler is only called when the pyramid handler raises
        ``HTTPNotFound``.  One access line is printed per request, whether
        it ends with a response or with an HTTP exception.

        :param request: a pyramid request
        :returns: the response produced by either handler
        :raises HTTPException: re-raised after being reported
        """
        view_kind = "pyramid"
        # Current local time with the fractional seconds dropped.
        now = str(datetime.now()).split(".")[0]
        try:
            try:
                response = self.handler(request)
            except httpexceptions.HTTPNotFound:
                view_kind = "cubicweb"
                response = self.cwhandler(request)
            self._print_access_line(
                now, view_kind, request, response.status_code, response.body
            )
        except HTTPException as e:
            self._print_access_line(now, view_kind, request, e.code, e.body)
            # we don't want a traceback for a redirection, only for errors or others
            if not (300 <= e.code < 400):
                traceback.print_exc()
            raise
        return response
def render_view(request, vid, **kwargs):
    """Helper function to render a CubicWeb view.

    :param request: A pyramid request
    :param vid: A CubicWeb view id
    :param kwargs: Keyword arguments to select and instantiate the view
    :returns: The rendered view content
    """
    vreg = request.registry["cubicweb.registry"]
    # XXX The select() function could, know how to handle a pyramid
    # request, and feed it directly to the views that supports it.
    # On the other hand, we could refine the View concept and decide it works
    # with a cnx, and never with a WebRequest
    with cw_to_pyramid(request):
        # Selection and rendering both happen inside the wrapper so that CW
        # exceptions raised by either step are converted.
        view = vreg["views"].select(vid, request.cw_request, **kwargs)
        view.set_stream()
        view.render()
        return view._stream.getvalue()
def includeme(config):
    """Set up a tween app that will handle the request if the main application
    raises a HTTPNotFound exception.

    This is to keep legacy compatibility for cubes that make use of the
    cubicweb urlresolvers.

    It provides, for now, support for cubicweb controllers, but this feature
    will be reimplemented separately in a less compatible way.

    It is automatically included by the configuration system, but can be
    disabled in the :ref:`pyramid_settings`:

    .. code-block:: ini

        cubicweb.bwcompat = no

    :param config: the pyramid configurator; its registry must already hold
        the ``cubicweb.config`` and ``cubicweb.repository`` entries
    """
    cwconfig = config.registry["cubicweb.config"]
    repository = config.registry["cubicweb.repository"]

    # Expose ``request.cw_request`` as a reified property of pyramid requests.
    config.add_request_method(_cw_request, name="cw_request", property=True, reify=True)

    cwappli = CubicWebPublisher(
        repository, cwconfig, session_handler_fact=PyramidSessionHandler
    )
    cwhandler = CubicWebPyramidHandler(cwappli, cwconfig)
    config.registry["cubicweb.appli"] = cwappli
    config.registry["cubicweb.handler"] = cwhandler

    config.add_tween("cubicweb_web.bwcompat.TweenHandler", under=tweens.EXCVIEW)

    # The CubicWeb error views are registered unless the setting disables them.
    if asbool(config.registry.settings.get("cubicweb.bwcompat.errorhandler", True)):
        config.add_view(cwhandler.error_handler, context=Exception)
        # XXX why do i need this?
        config.add_view(cwhandler.error_handler, context=httpexceptions.HTTPForbidden)

    if cwconfig.in_debug_mode:
        # this is for injecting those into generated html:
        # > cubicweb-generated-by="module.Class" cubicweb-from-source="/path/to/file.py:42"
        inject_html_generating_call_on_w()