From d311fe5e996d43316989daf8a67ded22a6a567e2 Mon Sep 17 00:00:00 2001 From: Edward Date: Tue, 19 Apr 2022 19:42:47 +0800 Subject: [PATCH] Support for digest auth with qop=auth --- examples/digest_auth.py | 36 ++++++ src/flask_httpauth.py | 25 +++- tests/test_digest_custom_realm.py | 2 +- tests/test_digest_get_password.py | 31 +++-- tests/test_digest_no_qop.py | 198 ++++++++++++++++++++++++++++++ 5 files changed, 274 insertions(+), 18 deletions(-) create mode 100644 examples/digest_auth.py create mode 100644 tests/test_digest_no_qop.py diff --git a/examples/digest_auth.py b/examples/digest_auth.py new file mode 100644 index 0000000..c0a2fac --- /dev/null +++ b/examples/digest_auth.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +"""Digest authentication example + +This example demonstrates how to protect Flask endpoints with digest +authentication. + +After running this example, visit http://localhost:5000 in your browser. To +gain access, you can use (username=john, password=hello) or +(username=susan, password=bye). +""" +from flask import Flask +from flask_httpauth import HTTPDigestAuth + +app = Flask(__name__) +app.secret_key = 'this-is-a-secret-key' +auth = HTTPDigestAuth(qop='auth') + +users = { + "john": "hello", + "susan": "bye", +} + + +@auth.get_password +def get_password(username): + return users.get(username) + + +@app.route('/') +@auth.login_required +def index(): + return "Hello, %s!" % auth.current_user() + + +if __name__ == '__main__': + app.run(debug=True, host='0.0.0.0') diff --git a/src/flask_httpauth.py b/src/flask_httpauth.py index 22c4486..f33f6c8 100644 --- a/src/flask_httpauth.py +++ b/src/flask_httpauth.py @@ -254,9 +254,13 @@ def authenticate(self, auth, stored_password): class HTTPDigestAuth(HTTPAuth): - def __init__(self, scheme=None, realm=None, use_ha1_pw=False): + def __init__(self, scheme=None, realm=None, use_ha1_pw=False, qop='auth'): super(HTTPDigestAuth, self).__init__(scheme or 'Digest', realm) self.use_ha1_pw = use_ha1_pw + if isinstance(qop, str): + self.qop = [v.strip() for v in qop.split(',')] + else: + self.qop = qop self.random = SystemRandom() try: self.random.random() @@ -326,9 +330,14 @@ def generate_ha1(self, username, password): def authenticate_header(self): nonce = self.get_nonce() opaque = self.get_opaque() - return '{0} realm="{1}",nonce="{2}",opaque="{3}"'.format( - self.scheme, self.realm, nonce, - opaque) + if self.qop: + return '{0} realm="{1}",nonce="{2}",opaque="{3}",qop="{4}"'.format( + self.scheme, self.realm, nonce, + opaque, ','.join(self.qop)) + else: + return '{0} realm="{1}",nonce="{2}",opaque="{3}"'.format( + self.scheme, self.realm, nonce, + opaque) def authenticate(self, auth, stored_password_or_ha1): if not auth or not auth.username or not auth.realm or not auth.uri \ @@ -338,6 +347,8 @@ def authenticate(self, auth, stored_password_or_ha1): if not(self.verify_nonce_callback(auth.nonce)) or \ not(self.verify_opaque_callback(auth.opaque)): return False + if auth.qop and auth.qop not in self.qop: # pragma: no cover + return False if self.use_ha1_pw: ha1 = stored_password_or_ha1 else: @@ -346,7 +357,11 @@ def authenticate(self, auth, stored_password_or_ha1): ha1 = md5(a1.encode('utf-8')).hexdigest() a2 = request.method + ":" + auth.uri ha2 = md5(a2.encode('utf-8')).hexdigest() - a3 = ha1 + ":" + auth.nonce + ":" + ha2 + if auth.qop == 'auth': + a3 = ha1 + ":" + auth.nonce + ":" + auth.nc + ":" + \ + auth.cnonce + ":auth:" + ha2 + else: + a3 = ha1 + ":" + auth.nonce + ":" + ha2 response = md5(a3.encode('utf-8')).hexdigest() return hmac.compare_digest(response, auth.response) diff --git a/tests/test_digest_custom_realm.py b/tests/test_digest_custom_realm.py index a987c89..2fac03d 100644 --- a/tests/test_digest_custom_realm.py +++ b/tests/test_digest_custom_realm.py @@ -9,7 +9,7 @@ def setUp(self): app = Flask(__name__) app.config['SECRET_KEY'] = 'my secret' - digest_auth_my_realm = HTTPDigestAuth(realm='My Realm') + digest_auth_my_realm = HTTPDigestAuth(realm='My Realm', qop=None) @digest_auth_my_realm.get_password def get_digest_password_3(username): diff --git a/tests/test_digest_get_password.py b/tests/test_digest_get_password.py index d57e2ec..8f6b4bf 100644 --- a/tests/test_digest_get_password.py +++ b/tests/test_digest_get_password.py @@ -51,7 +51,8 @@ def test_digest_auth_prompt(self): self.assertEqual(response.status_code, 401) self.assertTrue('WWW-Authenticate' in response.headers) self.assertTrue(re.match(r'^Digest realm="Authentication Required",' - r'nonce="[0-9a-f]+",opaque="[0-9a-f]+"$', + r'nonce="[0-9a-f]+",opaque="[0-9a-f]+",' + r'qop="auth"$', response.headers['WWW-Authenticate'])) def test_digest_auth_ignore_options(self): @@ -70,13 +71,14 @@ def test_digest_auth_login_valid(self): ha1 = md5(a1).hexdigest() a2 = 'GET:/digest' ha2 = md5(a2).hexdigest() - a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + a3 = ha1 + ':' + d['nonce'] + ':00000001:foobar:auth:' + ha2 auth_response = md5(a3).hexdigest() response = self.client.get( '/digest', headers={ 'Authorization': 'Digest username="john",realm="{0}",' - 'nonce="{1}",uri="/digest",response="{2}",' + 'nonce="{1}",uri="/digest",qop=auth,' + 'nc=00000001,cnonce="foobar",response="{2}",' 'opaque="{3}"'.format(d['realm'], d['nonce'], auth_response, @@ -94,13 +96,14 @@ def test_digest_auth_login_bad_realm(self): ha1 = md5(a1).hexdigest() a2 = 'GET:/digest' ha2 = md5(a2).hexdigest() - a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + a3 = ha1 + ':' + d['nonce'] + ':00000001:foobar:auth:' + ha2 auth_response = md5(a3).hexdigest() response = self.client.get( '/digest', headers={ 'Authorization': 'Digest username="john",realm="{0}",' - 'nonce="{1}",uri="/digest",response="{2}",' + 'nonce="{1}",uri="/digest",qop=auth,' + 'nc=00000001,cnonce="foobar",response="{2}",' 'opaque="{3}"'.format(d['realm'], d['nonce'], auth_response, @@ -108,7 +111,8 @@ def test_digest_auth_login_bad_realm(self): self.assertEqual(response.status_code, 401) self.assertTrue('WWW-Authenticate' in response.headers) self.assertTrue(re.match(r'^Digest realm="Authentication Required",' - r'nonce="[0-9a-f]+",opaque="[0-9a-f]+"$', + r'nonce="[0-9a-f]+",opaque="[0-9a-f]+",' + r'qop="auth"$', response.headers['WWW-Authenticate'])) def test_digest_auth_login_invalid2(self): @@ -122,13 +126,14 @@ def test_digest_auth_login_invalid2(self): ha1 = md5(a1).hexdigest() a2 = 'GET:/digest' ha2 = md5(a2).hexdigest() - a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + a3 = ha1 + ':' + d['nonce'] + ':00000001:foobar:auth:' + ha2 auth_response = md5(a3).hexdigest() response = self.client.get( '/digest', headers={ - 'Authorization': 'Digest username="david",realm="{0}",' - 'nonce="{1}",uri="/digest",response="{2}",' + 'Authorization': 'Digest username="john",realm="{0}",' + 'nonce="{1}",uri="/digest",qop=auth,' + 'nc=00000001,cnonce="foobar",response="{2}",' 'opaque="{3}"'.format(d['realm'], d['nonce'], auth_response, @@ -136,7 +141,8 @@ def test_digest_auth_login_invalid2(self): self.assertEqual(response.status_code, 401) self.assertTrue('WWW-Authenticate' in response.headers) self.assertTrue(re.match(r'^Digest realm="Authentication Required",' - r'nonce="[0-9a-f]+",opaque="[0-9a-f]+"$', + r'nonce="[0-9a-f]+",opaque="[0-9a-f]+",' + r'qop="auth"$', response.headers['WWW-Authenticate'])) def test_digest_generate_ha1(self): @@ -180,13 +186,14 @@ def verify_opaque(provided_opaque): ha1 = md5(a1).hexdigest() a2 = 'GET:/digest' ha2 = md5(a2).hexdigest() - a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + a3 = ha1 + ':' + d['nonce'] + ':00000001:foobar:auth:' + ha2 auth_response = md5(a3).hexdigest() response = self.client.get( '/digest', headers={ 'Authorization': 'Digest username="john",realm="{0}",' - 'nonce="{1}",uri="/digest",response="{2}",' + 'nonce="{1}",uri="/digest",qop=auth,' + 'nc=00000001,cnonce="foobar",response="{2}",' 'opaque="{3}"'.format(d['realm'], d['nonce'], auth_response, diff --git a/tests/test_digest_no_qop.py b/tests/test_digest_no_qop.py new file mode 100644 index 0000000..195b4f1 --- /dev/null +++ b/tests/test_digest_no_qop.py @@ -0,0 +1,198 @@ +import unittest +import re +from hashlib import md5 as basic_md5 +from flask import Flask +from flask_httpauth import HTTPDigestAuth +from werkzeug.http import parse_dict_header + + +def md5(str): + if type(str).__name__ == 'str': + str = str.encode('utf-8') + return basic_md5(str) + + +def get_ha1(user, pw, realm): + a1 = user + ":" + realm + ":" + pw + return md5(a1).hexdigest() + + +class HTTPAuthTestCase(unittest.TestCase): + def setUp(self): + app = Flask(__name__) + app.config['SECRET_KEY'] = 'my secret' + + digest_auth = HTTPDigestAuth(qop=None) + + @digest_auth.get_password + def get_digest_password_2(username): + if username == 'susan': + return 'hello' + elif username == 'john': + return 'bye' + else: + return None + + @app.route('/') + def index(): + return 'index' + + @app.route('/digest') + @digest_auth.login_required + def digest_auth_route(): + return 'digest_auth:' + digest_auth.username() + + self.app = app + self.digest_auth = digest_auth + self.client = app.test_client() + + def test_digest_auth_prompt(self): + response = self.client.get('/digest') + self.assertEqual(response.status_code, 401) + self.assertTrue('WWW-Authenticate' in response.headers) + self.assertTrue(re.match(r'^Digest realm="Authentication Required",' + r'nonce="[0-9a-f]+",opaque="[0-9a-f]+"$', + response.headers['WWW-Authenticate'])) + + def test_digest_auth_ignore_options(self): + response = self.client.options('/digest') + self.assertEqual(response.status_code, 200) + self.assertTrue('WWW-Authenticate' not in response.headers) + + def test_digest_auth_login_valid(self): + response = self.client.get('/digest') + self.assertTrue(response.status_code == 401) + header = response.headers.get('WWW-Authenticate') + auth_type, auth_info = header.split(None, 1) + d = parse_dict_header(auth_info) + + a1 = 'john:' + d['realm'] + ':bye' + ha1 = md5(a1).hexdigest() + a2 = 'GET:/digest' + ha2 = md5(a2).hexdigest() + a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + auth_response = md5(a3).hexdigest() + + response = self.client.get( + '/digest', headers={ + 'Authorization': 'Digest username="john",realm="{0}",' + 'nonce="{1}",uri="/digest",response="{2}",' + 'opaque="{3}"'.format(d['realm'], + d['nonce'], + auth_response, + d['opaque'])}) + self.assertEqual(response.data, b'digest_auth:john') + + def test_digest_auth_login_bad_realm(self): + response = self.client.get('/digest') + self.assertTrue(response.status_code == 401) + header = response.headers.get('WWW-Authenticate') + auth_type, auth_info = header.split(None, 1) + d = parse_dict_header(auth_info) + + a1 = 'john:' + 'Wrong Realm' + ':bye' + ha1 = md5(a1).hexdigest() + a2 = 'GET:/digest' + ha2 = md5(a2).hexdigest() + a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + auth_response = md5(a3).hexdigest() + + response = self.client.get( + '/digest', headers={ + 'Authorization': 'Digest username="john",realm="{0}",' + 'nonce="{1}",uri="/digest",response="{2}",' + 'opaque="{3}"'.format(d['realm'], + d['nonce'], + auth_response, + d['opaque'])}) + self.assertEqual(response.status_code, 401) + self.assertTrue('WWW-Authenticate' in response.headers) + self.assertTrue(re.match(r'^Digest realm="Authentication Required",' + r'nonce="[0-9a-f]+",opaque="[0-9a-f]+"$', + response.headers['WWW-Authenticate'])) + + def test_digest_auth_login_invalid2(self): + response = self.client.get('/digest') + self.assertEqual(response.status_code, 401) + header = response.headers.get('WWW-Authenticate') + auth_type, auth_info = header.split(None, 1) + d = parse_dict_header(auth_info) + + a1 = 'david:' + 'Authentication Required' + ':bye' + ha1 = md5(a1).hexdigest() + a2 = 'GET:/digest' + ha2 = md5(a2).hexdigest() + a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + auth_response = md5(a3).hexdigest() + + response = self.client.get( + '/digest', headers={ + 'Authorization': 'Digest username="david",realm="{0}",' + 'nonce="{1}",uri="/digest",response="{2}",' + 'opaque="{3}"'.format(d['realm'], + d['nonce'], + auth_response, + d['opaque'])}) + self.assertEqual(response.status_code, 401) + self.assertTrue('WWW-Authenticate' in response.headers) + self.assertTrue(re.match(r'^Digest realm="Authentication Required",' + r'nonce="[0-9a-f]+",opaque="[0-9a-f]+"$', + response.headers['WWW-Authenticate'])) + + def test_digest_generate_ha1(self): + ha1 = self.digest_auth.generate_ha1('pawel', 'test') + ha1_expected = get_ha1('pawel', 'test', self.digest_auth.realm) + self.assertEqual(ha1, ha1_expected) + + def test_digest_custom_nonce_checker(self): + @self.digest_auth.generate_nonce + def noncemaker(): + return 'not a good nonce' + + @self.digest_auth.generate_opaque + def opaquemaker(): + return 'some opaque' + + verify_nonce_called = [] + + @self.digest_auth.verify_nonce + def verify_nonce(provided_nonce): + verify_nonce_called.append(provided_nonce) + return True + + verify_opaque_called = [] + + @self.digest_auth.verify_opaque + def verify_opaque(provided_opaque): + verify_opaque_called.append(provided_opaque) + return True + + response = self.client.get('/digest') + self.assertEqual(response.status_code, 401) + header = response.headers.get('WWW-Authenticate') + auth_type, auth_info = header.split(None, 1) + d = parse_dict_header(auth_info) + + self.assertEqual(d['nonce'], 'not a good nonce') + self.assertEqual(d['opaque'], 'some opaque') + + a1 = 'john:' + d['realm'] + ':bye' + ha1 = md5(a1).hexdigest() + a2 = 'GET:/digest' + ha2 = md5(a2).hexdigest() + a3 = ha1 + ':' + d['nonce'] + ':' + ha2 + auth_response = md5(a3).hexdigest() + + response = self.client.get( + '/digest', headers={ + 'Authorization': 'Digest username="john",realm="{0}",' + 'nonce="{1}",uri="/digest",response="{2}",' + 'opaque="{3}"'.format(d['realm'], + d['nonce'], + auth_response, + d['opaque'])}) + self.assertEqual(response.data, b'digest_auth:john') + self.assertEqual(verify_nonce_called, ['not a good nonce'], + "Should have verified the nonce.") + self.assertEqual(verify_opaque_called, ['some opaque'], + "Should have verified the opaque.")