diff --git a/tests/test_mfa.py b/tests/test_mfa.py index b42a20ae11ed9eea103b013f5e4b40ab4fe057b5..d52912a44b1ff27f338d9070eb0ce1dde1af1463 100644 --- a/tests/test_mfa.py +++ b/tests/test_mfa.py @@ -401,7 +401,6 @@ class TestMfaViews(UffdTestCase): self.assertEqual(r.status_code, 200) self.assertFalse(is_valid_session()) - @unittest.skip('Not implemented, see #10') def test_auth_ratelimit(self): self.add_recovery_codes() self.add_totp() diff --git a/tests/test_ratelimit.py b/tests/test_ratelimit.py new file mode 100644 index 0000000000000000000000000000000000000000..cf7501fa99e752d3d549f528191a501a997eb8b4 --- /dev/null +++ b/tests/test_ratelimit.py @@ -0,0 +1,66 @@ +import time + +from flask import Flask, Blueprint, session, url_for + +from uffd.ratelimit import get_addrkey, format_delay, Ratelimit, RatelimitEvent + +from utils import UffdTestCase + +class TestRatelimit(UffdTestCase): + def test_limiting(self): + cases = [ + (1*60, 3), + (1*60*60, 3), + (1*60*60, 25), + ] + for index, case in enumerate(cases): + interval, limit = case + key = str(index) + ratelimit = Ratelimit('test', interval, limit) + for i in range(limit): + ratelimit.log(key) + self.assertLessEqual(ratelimit.get_delay(key), interval) + ratelimit.log(key) + self.assertGreater(ratelimit.get_delay(key), interval) + + def test_addrkey(self): + self.assertEqual(get_addrkey('192.168.0.1'), get_addrkey('192.168.0.99')) + self.assertNotEqual(get_addrkey('192.168.0.1'), get_addrkey('192.168.1.1')) + self.assertEqual(get_addrkey('fdee:707a:f38a:c369::'), get_addrkey('fdee:707a:f38a:ffff::')) + self.assertNotEqual(get_addrkey('fdee:707a:f38a:c369::'), get_addrkey('fdee:707a:f38b:c369::')) + cases = [ + '', + '192.168.0.', + ':', + '::', + '192.168.0.1/24', + '192.168.0.1/24', + 'host.example.com', + ] + for case in cases: + self.assertIsInstance(get_addrkey(case), str) + + def test_format_delay(self): + self.assertIsInstance(format_delay(0), str) + self.assertIsInstance(format_delay(1), str) + self.assertIsInstance(format_delay(30), str) + self.assertIsInstance(format_delay(60), str) + self.assertIsInstance(format_delay(120), str) + self.assertIsInstance(format_delay(3600), str) + self.assertIsInstance(format_delay(4000), str) + + def test_cleanup(self): + ratelimit = Ratelimit('test', 1, 1) + ratelimit.log('') + ratelimit.log('1') + ratelimit.log('2') + ratelimit.log('3') + ratelimit.log('4') + time.sleep(1) + ratelimit.log('5') + self.assertEqual(RatelimitEvent.query.filter(RatelimitEvent.name == 'test').count(), 6) + ratelimit.cleanup() + self.assertEqual(RatelimitEvent.query.filter(RatelimitEvent.name == 'test').count(), 1) + time.sleep(1) + ratelimit.cleanup() + self.assertEqual(RatelimitEvent.query.filter(RatelimitEvent.name == 'test').count(), 0) diff --git a/tests/test_session.py b/tests/test_session.py index fb66d2f21ea7fe7565dc9b151b6a5c07711a5de5..d3e00d7acbb189c7a6ec3016e9ea860ca792913f 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -124,7 +124,6 @@ class TestSession(UffdTestCase): time.sleep(3) self.assertLogout() - @unittest.skip('Not implemented, see #10') def test_ratelimit(self): for i in range(20): self.client.post(path=url_for('session.login'), diff --git a/uffd/mfa/views.py b/uffd/mfa/views.py index 7255ae026ec9c3992d9fdc88b917b76cb5edd8d4..d3eed7d304c1c972c86ad9e79711d9f6827099ee 100644 --- a/uffd/mfa/views.py +++ b/uffd/mfa/views.py @@ -9,9 +9,12 @@ from uffd.session.views import get_current_user, login_required, pre_mfa_login_r from uffd.ldap import uid_to_dn from uffd.user.models import User from uffd.csrf import csrf_protect +from uffd.ratelimit import Ratelimit, format_delay bp = Blueprint('mfa', __name__, template_folder='templates', url_prefix='/mfa/') +mfa_ratelimit = Ratelimit('mfa', 1*60, 3) + @bp.route('/', methods=['GET']) @login_required() def setup(): @@ -223,6 +226,10 @@ def auth(): @pre_mfa_login_required() def auth_finish(): user = get_current_user() + delay = mfa_ratelimit.get_delay(user.dn) + if delay: + flash('We received too many invalid attempts! Please wait at least %s.'%format_delay(delay)) + return redirect(url_for('mfa.auth', ref=request.values.get('ref'))) recovery_methods = RecoveryCodeMethod.query.filter_by(dn=user.dn).all() totp_methods = TOTPMethod.query.filter_by(dn=user.dn).all() for method in totp_methods: @@ -241,5 +248,6 @@ def auth_finish(): flash('You only have a few recovery codes remaining. Make sure to generate new ones before they run out.') return redirect(url_for('mfa.setup')) return redirect(request.values.get('ref', url_for('index'))) + mfa_ratelimit.log(user.dn) flash('Two-factor authentication failed') return redirect(url_for('mfa.auth', ref=request.values.get('ref'))) diff --git a/uffd/ratelimit.py b/uffd/ratelimit.py new file mode 100644 index 0000000000000000000000000000000000000000..86bc2f0b92145228136635c690dc38babf0f1d4d --- /dev/null +++ b/uffd/ratelimit.py @@ -0,0 +1,81 @@ +import datetime +import ipaddress +import math + +from flask import request +from sqlalchemy import Column, Integer, String, DateTime + +from uffd.database import db + +class RatelimitEvent(db.Model): + __tablename__ = 'ratelimit_event' + id = Column(Integer(), primary_key=True, autoincrement=True) + timestamp = Column(DateTime(), default=datetime.datetime.now) + name = Column(String(128)) + key = Column(String(128)) + +class Ratelimit: + def __init__(self, name, interval, limit): + self.name = name + self.interval = interval + self.limit = limit + self.base = interval**(1/limit) + + def cleanup(self): + limit = datetime.datetime.now() - datetime.timedelta(seconds=self.interval) + RatelimitEvent.query.filter(RatelimitEvent.name == self.name, RatelimitEvent.timestamp <= limit).delete() + db.session.commit() + + def log(self, key=None): + db.session.add(RatelimitEvent(name=self.name, key=key)) + db.session.commit() + + def get_delay(self, key=None): + self.cleanup() + events = RatelimitEvent.query.filter(RatelimitEvent.name == self.name, RatelimitEvent.key == key).all() + if not events: + return 0 + delay = math.ceil(self.base**len(events)) + if delay < 5: + delay = 0 + delay = min(delay, 365*24*60*60) # prevent overflow of datetime objetcs + remaining = events[0].timestamp + datetime.timedelta(seconds=delay) - datetime.datetime.now() + return max(0, math.ceil(remaining.total_seconds())) + +def get_addrkey(addr=None): + if addr is None: + addr = request.remote_addr + try: + addr = ipaddress.ip_address(addr) + except ValueError: + return '"'+addr+'"' + if isinstance(addr, ipaddress.IPv4Address): + net = ipaddress.IPv4Network((addr, '24'), strict=False) + elif isinstance(addr, ipaddress.IPv6Address): + net = ipaddress.IPv6Network((addr, '48'), strict=False) + else: + net = ipaddress.ip_network(addr) + return net.network_address.compressed + +class HostRatelimit(Ratelimit): + def log(self, key=None): + super().log(get_addrkey(key)) + + def get_delay(self, key=None): + return super().get_delay(get_addrkey(key)) + +def format_delay(seconds): + if seconds <= 15: + return 'a few seconds' + if seconds <= 30: + return '30 seconds' + if seconds <= 60: + return 'one minute' + if seconds < 3000: + return '%d minutes'%(math.ceil(seconds/60)+1) + if seconds <= 3600: + return 'one hour' + return '%d hours'%math.ceil(seconds/3600) + +# Global host-based ratelimit +host_ratelimit = HostRatelimit('host', 1*60*60, 25) diff --git a/uffd/selfservice/views.py b/uffd/selfservice/views.py index 4770c950f0d8aaca587be67ea127e6d56d854230..9b1e88908727073ff74b9b6e8950c6e15d62b76d 100644 --- a/uffd/selfservice/views.py +++ b/uffd/selfservice/views.py @@ -13,9 +13,12 @@ from uffd.session import get_current_user, login_required, is_valid_session from uffd.ldap import loginname_to_dn from uffd.selfservice.models import PasswordToken, MailToken from uffd.database import db +from uffd.ratelimit import host_ratelimit, Ratelimit, format_delay bp = Blueprint("selfservice", __name__, template_folder='templates', url_prefix='/self/') +reset_ratelimit = Ratelimit('passwordreset', 1*60*60, 3) + @bp.route("/") @register_navbar('Selfservice', icon='portrait', blueprint=bp, visible=is_valid_session) @login_required() @@ -53,6 +56,16 @@ def forgot_password(): loginname = request.values['loginname'] mail = request.values['mail'] + reset_delay = reset_ratelimit.get_delay(loginname+'/'+mail) + host_delay = host_ratelimit.get_delay() + if reset_delay or host_delay: + if reset_delay > host_delay: + flash('We received too many password reset requests for this user! Please wait at least %s.'%format_delay(reset_delay)) + else: + flash('We received too many requests from your ip address/network! Please wait at least %s.'%format_delay(host_delay)) + return redirect(url_for('.forgot_password')) + reset_ratelimit.log(loginname+'/'+mail) + host_ratelimit.log() flash("We sent a mail to this users mail address if you entered the correct mail and login name combination") user = User.from_ldap_dn(loginname_to_dn(loginname)) if user and user.mail == mail: diff --git a/uffd/session/views.py b/uffd/session/views.py index 2178055cac75294855382d04c0ced951cd1f8aa9..50eaecea1301d745fb42f41fe3f949ad0d2a0585 100644 --- a/uffd/session/views.py +++ b/uffd/session/views.py @@ -6,9 +6,12 @@ from flask import Blueprint, render_template, request, url_for, redirect, flash, from uffd.user.models import User from uffd.ldap import user_conn, uid_to_dn +from uffd.ratelimit import Ratelimit, host_ratelimit, format_delay bp = Blueprint("session", __name__, template_folder='templates', url_prefix='/') +login_ratelimit = Ratelimit('login', 1*60, 3) + @bp.route("/logout") def logout(): # The oauth2 module takes data from `session` and injects it into the url, @@ -24,10 +27,20 @@ def login(): username = request.form['loginname'] password = request.form['password'] + login_delay = login_ratelimit.get_delay(username) + host_delay = host_ratelimit.get_delay() + if login_delay or host_delay: + if login_delay > host_delay: + flash('We received too many invalid login attempts for this user! Please wait at least %s.'%format_delay(login_delay)) + else: + flash('We received too many requests from your ip address/network! Please wait at least %s.'%format_delay(host_delay)) + return render_template('login.html', ref=request.values.get('ref')) conn = user_conn(username, password) if conn: conn.search(conn.user, '(objectClass=person)') if not conn or len(conn.entries) != 1: + login_ratelimit.log(username) + host_ratelimit.log() flash('Login name or password is wrong') return render_template('login.html', ref=request.values.get('ref')) user = User.from_ldap(conn.entries[0])