Commit 6faf4db5 authored by Laurent Peuch's avatar Laurent Peuch
Browse files

feat(security)!: protect against CSRF attacks using pyramid CookieCSRFStoragePolicy

This is very likely a breaking change and will force all code bases that
haven't already do so to migrate to pyramid as web server.

90% of the job has been porting and adapting a lot of tests code.

See pyramid documentation as a reference:

* https://docs.pylonsproject.org/projects/pyramid/en/2.0-branch/narr/security.html#preventing-cross-site-request-forgery-attacks
* https://docs.pylonsproject.org/projects/pyramid/en/2.0-branch/narr/security.html#checking-csrf-tokens-automatically
* https://docs.pylonsproject.org/projects/pyramid/en/latest/api/csrf.html#pyramid.csrf.CookieCSRFStoragePolicy
parent 605cf3b16bb6
......@@ -19,6 +19,9 @@
"""
from contextlib import contextmanager
from pyramid.csrf import LegacySessionCSRFStoragePolicy
from pyramid.testing import DummyRequest
from pyramid.interfaces import ICSRFStoragePolicy
from logilab.database import get_db_helper
......@@ -71,6 +74,11 @@ class FakeRequest(ConnectionCubicWebRequestBase):
self._url = "view?rql=Blop&vid=blop"
super(FakeRequest, self).__init__(*args, **kwargs)
self._session_data = {}
self._request = DummyRequest()
self._csrf_storage = LegacySessionCSRFStoragePolicy()
self._request.registry.registerUtility(
self._csrf_storage, provided=ICSRFStoragePolicy
)
def set_cookie(
self, name, value, maxage=300, expires=None, secure=False, httponly=False
......
......@@ -19,13 +19,8 @@
http server
"""
import http.client
import random
import threading
import socket
from urllib.parse import urlparse
from cubicweb.devtools.testlib import CubicWebTC
def get_available_port(ports_scan):
......@@ -56,128 +51,3 @@ def get_available_port(ports_scan):
raise RuntimeError(
"get_available_port([ports_range]) cannot find an available port"
)
class _CubicWebServerTC(CubicWebTC):
"""Base class for running a test web server."""
ports_range = range(7000, 8000)
def start_server(self):
raise NotImplementedError
def stop_server(self, timeout=15):
"""Stop the webserver, waiting for the thread to return"""
raise NotImplementedError
def web_login(self, user=None, passwd=None):
"""Log the current http session for the provided credential
If no user is provided, admin connection are used.
"""
if user is None:
user = self.admlogin
passwd = self.admpassword
if passwd is None:
passwd = user
response = self.web_get("login?__login=%s&__password=%s" % (user, passwd))
assert response.status == http.client.SEE_OTHER, response.status
self._ident_cookie = response.getheader("Set-Cookie")
assert self._ident_cookie
return True
def web_logout(self, user="admin", pwd=None):
"""Log out current http user"""
if self._ident_cookie is not None:
self.web_get("logout")
self._ident_cookie = None
def web_request(self, path="", method="GET", body=None, headers=None):
"""Return an http.client.HTTPResponse object for the specified path
Use available credential if available.
"""
if headers is None:
headers = {}
if self._ident_cookie is not None:
assert "Cookie" not in headers
headers["Cookie"] = self._ident_cookie
self._web_test_cnx.request(method, "/" + path, headers=headers, body=body)
response = self._web_test_cnx.getresponse()
response.body = response.read() # to chain request
response.read = lambda: response.body
return response
def web_get(self, path="", body=None, headers=None):
return self.web_request(path=path, body=body, headers=headers)
def setUp(self):
super(_CubicWebServerTC, self).setUp()
port = self.config["port"] or get_available_port(self.ports_range)
self.config.global_set_option("port", port) # force rewrite here
self.config.global_set_option("base-url", "http://127.0.0.1:%d/" % port)
# call load_configuration again to let the config reset its datadir_url
self.config.load_configuration()
self.start_server()
def tearDown(self):
self.stop_server()
super(_CubicWebServerTC, self).tearDown()
class CubicWebServerTC(_CubicWebServerTC):
def start_server(self):
from cubicweb.wsgi.handler import CubicWebWSGIApplication
from wsgiref import simple_server
import queue
config = self.config
port = config["port"] or 8080
interface = config["interface"]
handler_cls = simple_server.WSGIRequestHandler
app = CubicWebWSGIApplication(config)
start_flag = queue.Queue()
def run(config, *args, **kwargs):
try:
self.httpd = simple_server.WSGIServer((interface, port), handler_cls)
self.httpd.set_app(app)
except Exception as exc:
start_flag.put(False)
start_flag.put(exc)
raise
else:
start_flag.put(True)
try:
self.httpd.serve_forever()
finally:
self.httpd.server_close()
t = threading.Thread(
target=run,
name="cubicweb_test_web_server",
args=(self.config, True),
kwargs={"repo": self.repo},
)
self.web_thread = t
t.start()
flag = start_flag.get()
if not flag:
t.join()
self.fail(start_flag.get())
parseurl = urlparse(self.config["base-url"])
assert parseurl.port == self.config["port"], (
self.config["base-url"],
self.config["port"],
)
self._web_test_cnx = http.client.HTTPConnection(
parseurl.hostname, parseurl.port
)
self._ident_cookie = None
def stop_server(self, timeout=15):
if self._web_test_cnx is None:
self.web_logout()
self._web_test_cnx.close()
self.httpd.shutdown()
self.web_thread.join(timeout)
# copyright 2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact https://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <https://www.gnu.org/licenses/>.
"""unittest for cubicweb.devtools.httptest module"""
import http.client
from logilab.common.testlib import Tags
from cubicweb.devtools.httptest import CubicWebServerTC
class WsgiCWAnonTC(CubicWebServerTC):
def test_response(self):
try:
response = self.web_get()
except http.client.NotConnected as ex:
self.fail("Can't connection to test server: %s" % ex)
def test_response_anon(self):
response = self.web_get()
self.assertEqual(response.status, http.client.OK)
def test_base_url(self):
if self.config["base-url"] not in self.web_get().read().decode("ascii"):
self.fail("no mention of base url in retrieved page")
class WsgiCWIdentTC(CubicWebServerTC):
test_db_id = "httptest-cwident"
anonymous_allowed = False
tags = CubicWebServerTC.tags | Tags(("auth",))
def test_response_denied(self):
response = self.web_get()
self.assertEqual(response.status, http.client.FORBIDDEN)
def test_login(self):
response = self.web_get()
if response.status != http.client.FORBIDDEN:
self.skipTest(
'Already authenticated, "test_response_denied" must have failed'
)
# login
self.web_login(self.admlogin, self.admpassword)
response = self.web_get()
self.assertEqual(response.status, http.client.OK, response.body)
# logout
self.web_logout()
response = self.web_get()
self.assertEqual(response.status, http.client.FORBIDDEN, response.body)
if __name__ == "__main__":
from logilab.common.testlib import unittest_main
unittest_main()
import http.client
from logilab.common.testlib import Tags
from cubicweb.devtools.webtest import CubicWebTestTC
from cubicweb.pyramid.test import PyramidCWTest
class CWTTC(CubicWebTestTC):
class CWTTC(PyramidCWTest):
settings = {"cubicweb.bwcompat": True}
def test_response(self):
response = self.webapp.get("/")
self.assertEqual(200, response.status_int)
......@@ -14,10 +16,11 @@ class CWTTC(CubicWebTestTC):
self.fail("no mention of base url in retrieved page")
class CWTIdentTC(CubicWebTestTC):
class CWTIdentTC(PyramidCWTest):
settings = {"cubicweb.bwcompat": True}
test_db_id = "webtest-ident"
anonymous_allowed = False
tags = CubicWebTestTC.tags | Tags(("auth",))
tags = PyramidCWTest.tags | Tags(("auth",))
def test_reponse_denied(self):
res = self.webapp.get("/", expect_errors=True)
......
......@@ -33,6 +33,8 @@ from pyramid import tweens
from pyramid.httpexceptions import HTTPSeeOther, HTTPException
from pyramid import httpexceptions
from pyramid.settings import asbool
from pyramid.csrf import CookieCSRFStoragePolicy
from pyramid.csrf import check_csrf_token
import cubicweb
import cubicweb.web
......@@ -72,15 +74,32 @@ class CubicWebPyramidHandler(object):
:param appli: A CubicWeb 'Application' object.
"""
def __init__(self, appli):
def __init__(self, appli, cubicweb_config):
self.appli = appli
if cubicweb_config["query-log-file"]:
self._query_log = open(cubicweb_config["query-log-file"], "a")
self._write_to_log = self._write_to_log_file
else:
self._write_to_log = self._write_to_logger
def _write_to_log_file(self, text):
self._query_log.write(text)
self._query_log.flush()
def _write_to_logger(self, text):
log.info(text)
def __call__(self, request):
"""
Handler that mimics what CubicWebPublisher.main_handle_request and
CubicWebPublisher.core_handle do
"""
safe_methods = frozenset(["GET", "HEAD", "OPTIONS", "TRACE"])
if request.method not in safe_methods:
check_csrf_token(request)
req = request.cw_request
vreg = request.registry["cubicweb.registry"]
......@@ -94,13 +113,15 @@ class CubicWebPyramidHandler(object):
controller = vreg["controllers"].select(
ctrlid, req, appli=self.appli
)
log.info(
"REQUEST [%s] '%s' selected controller %s at %s:%s",
ctrlid,
req.path,
controller,
inspect.getsourcefile(controller.__class__),
inspect.getsourcelines(controller.__class__)[1],
self._write_to_log(
"REQUEST [%s] '%s' selected controller %s at %s:%s"
% (
ctrlid,
req.path,
controller,
inspect.getsourcefile(controller.__class__),
inspect.getsourcelines(controller.__class__)[1],
)
)
emit_to_debug_channel(
"vreg",
......@@ -275,12 +296,16 @@ def includeme(config):
cwappli = CubicWebPublisher(
repository, cwconfig, session_handler_fact=PyramidSessionHandler
)
cwhandler = CubicWebPyramidHandler(cwappli)
cwhandler = CubicWebPyramidHandler(cwappli, cwconfig)
config.registry["cubicweb.appli"] = cwappli
config.registry["cubicweb.handler"] = cwhandler
config.add_tween("cubicweb.pyramid.bwcompat.TweenHandler", under=tweens.EXCVIEW)
config.set_default_csrf_options(require_csrf=True)
config.set_csrf_storage_policy(CookieCSRFStoragePolicy())
if asbool(config.registry.settings.get("cubicweb.bwcompat.errorhandler", True)):
config.add_view(cwhandler.error_handler, context=Exception)
# XXX why do i need this?
......
......@@ -4,6 +4,109 @@ from pyramid.config import Configurator
from cubicweb.devtools.webtest import CubicWebTestTC
ACCEPTED_ORIGINS = ["example.com"]
class TestApp(webtest.TestApp):
def __init__(self, *args, admin_login, admin_password, **kwargs):
super().__init__(*args, **kwargs)
self.admin_login = admin_login
self.admin_password = admin_password
self._ident_cookie = None
def post(
self,
route,
params,
do_not_grab_the_crsf_token=False,
do_not_inject_origin=False,
form_with_csrf_token_url="/login",
form_number=0,
**kwargs,
):
if self._ident_cookie:
if "headers" in kwargs and "Cookie" not in kwargs["headers"]:
kwargs["headers"]["Cookie"] = self._ident_cookie
elif "headers" not in kwargs:
kwargs["headers"] = {"Cookie": self._ident_cookie}
if (
isinstance(params, dict)
and not do_not_grab_the_crsf_token
and "csrf_token" not in params
):
if form_with_csrf_token_url is None:
form_with_csrf_token_url = route
csrf_token = self.get_csrf_token(
form_with_csrf_token_url=form_with_csrf_token_url,
form_number=form_number,
)
# "application/json" doesn't submit token in form params but as header value
if kwargs.get("headers", {}).get("Content-Type") != "application/json":
if "csrf_token" not in params:
params["csrf_token"] = csrf_token
else:
if "headers" in kwargs:
kwargs["headers"]["X-CSRF-Token"] = csrf_token
else:
kwargs["headers"] = {"X-CSRF-Token": csrf_token}
if not do_not_inject_origin:
if "headers" in kwargs and "Origin" not in kwargs["headers"]:
kwargs["headers"]["Origin"] = "https://" + ACCEPTED_ORIGINS[0]
elif "headers" not in kwargs:
kwargs["headers"] = {"Origin": "https://" + ACCEPTED_ORIGINS[0]}
return super().post(route, params, **kwargs)
def get_csrf_token(self, form_with_csrf_token_url="/login", form_number=0):
get_form = self.get(form_with_csrf_token_url)
if "html" not in get_form.content_type:
raise Exception(
f"Error while trying to get the form at url {form_with_csrf_token_url}, it "
f"returns a response with a content type of {get_form.content_type} while a "
"content type with 'html' in it is expected.\n\n"
"Maybe you need to use this function parameters 'form_with_csrf_token_url' or "
"'do_not_grab_the_crsf_token'?"
)
form = get_form.forms[form_number]
return form.fields["csrf_token"][0].value
def get(self, route, *args, **kwargs):
if self._ident_cookie:
if "headers" in kwargs and "Cookie" not in kwargs["headers"]:
kwargs["headers"]["Cookie"] = self._ident_cookie
elif "headers" not in kwargs:
kwargs["headers"] = {"Cookie": self._ident_cookie}
return super().get(route, *args, **kwargs)
def login(self, user=None, password=None):
"""Log the current http session for the provided credential
If no user is provided, admin connection are used.
"""
if user is None:
user = self.admin_login
password = self.admin_password
if password is None:
password = user
response = self.post("/login", {"__login": user, "__password": password})
assert response.status_int == 303
self._ident_cookie = response.headers["Set-Cookie"]
assert self._ident_cookie
return response
class PyramidCWTest(CubicWebTestTC):
settings = {}
......@@ -20,14 +123,19 @@ class PyramidCWTest(CubicWebTestTC):
"cubicweb.session.secret": "test",
}
settings.update(self.settings)
config = Configurator(settings=settings)
config.registry["cubicweb.repository"] = self.repo
config.include("cubicweb.pyramid")
self.includeme(config)
self.pyr_registry = config.registry
self.webapp = webtest.TestApp(
config.make_wsgi_app(), extra_environ={"wsgi.url_scheme": "https"}
pyramid_config = Configurator(settings=settings)
pyramid_config.registry["cubicweb.repository"] = self.repo
pyramid_config.include("cubicweb.pyramid")
self.includeme(pyramid_config)
self.pyr_registry = pyramid_config.registry
self.webapp = TestApp(
pyramid_config.make_wsgi_app(),
extra_environ={"wsgi.url_scheme": "https"},
admin_login=self.admlogin,
admin_password=self.admpassword,
)
def includeme(self, config):
pass
config.registry.settings["pyramid.csrf_trusted_origins"] = ACCEPTED_ORIGINS
......@@ -59,7 +59,9 @@ class WSGIAppTest(PyramidCWTest):
def test_post(self):
self.webapp.post(
"/", params={"__login": self.admlogin, "__password": self.admpassword}
"/",
params={"__login": self.admlogin, "__password": self.admpassword},
form_with_csrf_token_url="/login",
)
def test_get_multiple_variables(self):
......
from cubicweb.pyramid.test import PyramidCWTest
class CSRFTest(PyramidCWTest):
settings = {"cubicweb.bwcompat": True}
def test_pyramid_route_csrf_token_is_present(self):
res = self.webapp.get("/login")
self.assertIn("csrf_token", res.form.fields)
def test_pyramid_route_csrf_bad_token(self):
self.webapp.post(
"/login",
{
"__login": self.admlogin,
"__password": self.admpassword,
"csrf_token": "bad_token",
},
status=400,
)
def test_pyramid_route_csrf_no_token(self):
self.webapp.post(
"/login",
{
"__login": self.admlogin,
"__password": self.admpassword,
"csrf_token": None,
},
status=400,
)
def test_pyramid_route_csrf_bad_origin(self):
self.webapp.post(
"/login",
{"__login": self.admlogin, "__password": self.admpassword},
headers={"Origin": "bad_origin.net"},
status=400,
)
def test_pyramid_route_csrf_no_origin(self):
self.webapp.post(
"/login",
{"__login": self.admlogin, "__password": self.admpassword},
do_not_inject_origin=True,
status=400,
)
def test_cubicweb_route_csrf_token_is_present(self):
self.webapp.post(
"/validateform",
{
"__form_id": "edition",
"__type:6": "CWUser",
"eid": 6,
"firstname-subject:6": "loutre",
},
)
def test_cubicweb_route_no_csrf_token(self):
self.webapp.post(
"/validateform",
{
"__form_id": "edition",
"__type:6": "CWUser",
"eid": 6,
"firstname-subject:6": "loutre",
"csrf_token": None,
},
status=400,
)
......@@ -39,37 +39,56 @@ class SessionSyncHoooksTC(PyramidCWTest):
config.add_route(view.__name__, "/" + view.__name__)
config.add_view(view, route_name=view.__name__)
super().includeme(config)
def setUp(self):
super(SessionSyncHoooksTC, self).setUp()
with self.admin_access.repo_cnx() as cnx:
self.admin_eid = cnx.user.eid
def test_sync_props(self):
# grab the crsf token before login
response = self.webapp.get("/login")
csrf_token = response.forms[0].fields["csrf_token"][0].value
# initialize a pyramid session using admin credentials
res = self.webapp.post(
"/login", {"__login": self.admlogin, "__password": self.admpassword}
)
self.assertEqual(res.status_int, 303)
# new property
res = self.webapp.post("/set_language", {"lang": "fr"})
res = self.webapp.post(
"/set_language", {"lang": "fr", "csrf_token": csrf_token}
)
self.assertEqual(res.text, "fr")
# updated property
res = self.webapp.post("/set_language", {"lang": "en"})
res = self.webapp.post(
"/set_language", {"lang": "en", "csrf_token": csrf_token}
)