finish porting tests to pytest.
* removed httpretty dependency. * some small python3-related fixes.
This commit is contained in:
parent
4ef1e042c5
commit
4d281c94e1
1
Pipfile
1
Pipfile
|
@ -4,7 +4,6 @@ name = "pypi"
|
|||
url = "https://pypi.org/simple"
|
||||
|
||||
[dev-packages]
|
||||
httpretty = { version = "*" }
|
||||
python-dotenv = { version = "*" }
|
||||
pytest = { version = "*" }
|
||||
pytest-mock = "*"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{
|
||||
"_meta": {
|
||||
"hash": {
|
||||
"sha256": "95cfcbf0c126943643c31cbabfe6003099eeeda2529e09ff7ec65cd7f8ef8384"
|
||||
"sha256": "e2a63901824adba9432cad8d98ca86858e058d5755a34670868512d3ee4bac88"
|
||||
},
|
||||
"pipfile-spec": 6,
|
||||
"requires": {
|
||||
|
@ -334,13 +334,6 @@
|
|||
],
|
||||
"version": "==18.1.0"
|
||||
},
|
||||
"httpretty": {
|
||||
"hashes": [
|
||||
"sha256:69259e22addf5ab5f25bd00c6568e16fc2e54efdd4da69eb0950718dba3b2dab"
|
||||
],
|
||||
"index": "pypi",
|
||||
"version": "==0.9.4"
|
||||
},
|
||||
"more-itertools": {
|
||||
"hashes": [
|
||||
"sha256:0dd8f72eeab0d2c3bd489025bb2f6a1b8342f9b198f6fc37b52d15cfa4531fea",
|
||||
|
|
|
@ -148,7 +148,7 @@ def get_temp_hostname(nonce):
|
|||
if value is None:
|
||||
raise KeyError("no temp_hostname stored.")
|
||||
redis_store.delete(key)
|
||||
values = value.split(',')
|
||||
values = value.decode('utf-8').split(',')
|
||||
if len(values) != 2:
|
||||
raise ValueError("temp_hostname value is invalid: " + value)
|
||||
else:
|
||||
|
@ -165,7 +165,7 @@ def fetch_first_submission(nonce):
|
|||
key = REDIS_FIRSTSUBMISSION_KEY(nonce=nonce)
|
||||
jsondata = redis_store.get(key)
|
||||
try:
|
||||
return json.loads(jsondata)
|
||||
return json.loads(jsondata.decode('utf-8'))
|
||||
except:
|
||||
return None
|
||||
|
||||
|
|
|
@ -46,7 +46,10 @@ def client(app):
|
|||
|
||||
with app.app_context():
|
||||
DB.create_all()
|
||||
yield app.test_client()
|
||||
|
||||
with app.test_request_context():
|
||||
yield app.test_client()
|
||||
|
||||
DB.session.remove()
|
||||
DB.drop_all()
|
||||
|
||||
|
|
|
@ -0,0 +1,216 @@
|
|||
import json
|
||||
|
||||
from formspree import settings
|
||||
from formspree.stuff import DB
|
||||
from formspree.forms.helpers import HASH
|
||||
from formspree.users.models import User
|
||||
from formspree.forms.models import Form, Submission
|
||||
|
||||
def test_automatically_created_forms(client, msend):
|
||||
# submit a form
|
||||
client.post('/alice@example.com',
|
||||
headers = {'referer': 'http://somewhere.com'},
|
||||
data={'name': 'john'}
|
||||
)
|
||||
query = Form.query.filter_by(host='somewhere.com',
|
||||
email='alice@example.com')
|
||||
assert query.count() == 1
|
||||
form = query.first()
|
||||
|
||||
# this form wasn't confirmed, so it still has no submissions
|
||||
assert form.submissions.count() == 0
|
||||
|
||||
# confirm form
|
||||
form.confirmed = True
|
||||
DB.session.add(form)
|
||||
DB.session.commit()
|
||||
|
||||
# submit again
|
||||
client.post('/alice@example.com',
|
||||
headers = {'referer': 'http://somewhere.com'},
|
||||
data={'_replyto': 'johann@gmail.com', 'name': 'johann'}
|
||||
)
|
||||
|
||||
# submissions now must be 1
|
||||
form = query.first()
|
||||
assert form.submissions.count() == 1
|
||||
|
||||
# submit again
|
||||
client.post('/alice@example.com',
|
||||
headers = {'referer': 'http://somewhere.com'},
|
||||
data={'_replyto': 'joh@ann.es', '_next': 'http://google.com',
|
||||
'name': 'johannes', 'message': 'salve!'}
|
||||
)
|
||||
|
||||
# submissions now must be 2
|
||||
form = query.first()
|
||||
assert form.submissions.count() == 2
|
||||
|
||||
# check archived values
|
||||
submissions = form.submissions.all()
|
||||
|
||||
assert 2 == len(submissions)
|
||||
assert 'message' not in submissions[1].data
|
||||
assert '_next' not in submissions[1].data
|
||||
assert '_next' in submissions[0].data
|
||||
assert 'johann@gmail.com' == submissions[1].data['_replyto']
|
||||
assert 'joh@ann.es' == submissions[0].data['_replyto']
|
||||
assert 'johann' == submissions[1].data['name']
|
||||
assert 'johannes' == submissions[0].data['name']
|
||||
assert 'salve!' == submissions[0].data['message']
|
||||
|
||||
# check if submissions over the limit are correctly deleted
|
||||
assert settings.ARCHIVED_SUBMISSIONS_LIMIT == 2
|
||||
|
||||
client.post('/alice@example.com',
|
||||
headers = {'referer': 'http://somewhere.com'},
|
||||
data={'which-submission-is-this': 'the third!'}
|
||||
)
|
||||
assert 2 == form.submissions.count()
|
||||
newest = form.submissions.first() # first should be the newest
|
||||
assert newest.data['which-submission-is-this'] == 'the third!'
|
||||
|
||||
client.post('/alice@example.com',
|
||||
headers = {'referer': 'http://somewhere.com'},
|
||||
data={'which-submission-is-this': 'the fourth!'}
|
||||
)
|
||||
assert 2 == form.submissions.count()
|
||||
newest, last = form.submissions.all()
|
||||
assert newest.data['which-submission-is-this'] == 'the fourth!'
|
||||
assert last.data['which-submission-is-this'] == 'the third!'
|
||||
|
||||
#
|
||||
# try another form (to ensure that a form is not deleting wrong submissions)
|
||||
client.post('/sokratis@example.com',
|
||||
headers = {'referer': 'http://here.com'},
|
||||
data={'name': 'send me the confirmation!'}
|
||||
)
|
||||
query = Form.query.filter_by(host='here.com',
|
||||
email='sokratis@example.com')
|
||||
assert query.count() == 1
|
||||
secondform = query.first()
|
||||
|
||||
# this form wasn't confirmed, so it still has no submissions
|
||||
assert secondform.submissions.count() == 0
|
||||
|
||||
# confirm
|
||||
secondform.confirmed = True
|
||||
DB.session.add(form)
|
||||
DB.session.commit()
|
||||
|
||||
# submit more times and test
|
||||
client.post('/sokratis@example.com',
|
||||
headers = {'referer': 'http://here.com'},
|
||||
data={'name': 'leibniz'}
|
||||
)
|
||||
|
||||
assert 1 == secondform.submissions.count()
|
||||
assert secondform.submissions.first().data['name'] == 'leibniz'
|
||||
|
||||
client.post('/sokratis@example.com',
|
||||
headers = {'referer': 'http://here.com'},
|
||||
data={'name': 'schelling'}
|
||||
)
|
||||
|
||||
assert 2 == secondform.submissions.count()
|
||||
newest, last = secondform.submissions.all()
|
||||
assert newest.data['name'] == 'schelling'
|
||||
assert last.data['name'] == 'leibniz'
|
||||
|
||||
client.post('/sokratis@example.com',
|
||||
headers = {'referer': 'http://here.com'},
|
||||
data={'name': 'husserl'}
|
||||
)
|
||||
|
||||
assert 2 == secondform.submissions.count()
|
||||
newest, last = secondform.submissions.all()
|
||||
assert newest.data['name'] == 'husserl'
|
||||
assert last.data['name'] == 'schelling'
|
||||
|
||||
# now check the previous form again
|
||||
newest, last = form.submissions.all()
|
||||
assert newest.data['which-submission-is-this'] == 'the fourth!'
|
||||
assert last.data['which-submission-is-this'] == 'the third!'
|
||||
|
||||
client.post('/alice@example.com',
|
||||
headers = {'referer': 'http://somewhere.com'},
|
||||
data={'which-submission-is-this': 'the fifth!'}
|
||||
)
|
||||
assert 2 == form.submissions.count()
|
||||
newest, last = form.submissions.all()
|
||||
assert newest.data['which-submission-is-this'] == 'the fifth!'
|
||||
assert last.data['which-submission-is-this'] == 'the fourth!'
|
||||
|
||||
# just one more time the second form
|
||||
assert 2 == secondform.submissions.count()
|
||||
newest, last = secondform.submissions.all()
|
||||
assert newest.data['name'] == 'husserl'
|
||||
assert last.data['name'] == 'schelling'
|
||||
|
||||
def test_upgraded_user_access(client, msend):
|
||||
# register user
|
||||
r = client.post('/register',
|
||||
data={'email': 'colorado@springs.com',
|
||||
'password': 'banana'}
|
||||
)
|
||||
|
||||
# upgrade user manually
|
||||
user = User.query.filter_by(email='colorado@springs.com').first()
|
||||
user.upgraded = True
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# create form
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json',
|
||||
'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'hope@springs.com'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
form_endpoint = resp['hashid']
|
||||
|
||||
# manually confirm the form
|
||||
form = Form.get_with_hashid(form_endpoint)
|
||||
form.confirmed = True
|
||||
DB.session.add(form)
|
||||
DB.session.commit()
|
||||
|
||||
# submit form
|
||||
r = client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'formspree.io'},
|
||||
data={'name': 'bruce', 'message': 'hi, my name is bruce!'}
|
||||
)
|
||||
|
||||
# test submissions endpoint (/forms/<hashid>/)
|
||||
r = client.get('/forms/' + form_endpoint + '/',
|
||||
headers={'Accept': 'application/json'}
|
||||
)
|
||||
submissions = json.loads(r.data.decode('utf-8'))['submissions']
|
||||
assert len(submissions) == 1
|
||||
assert submissions[0]['name'] == 'bruce'
|
||||
assert submissions[0]['message'] == 'hi, my name is bruce!'
|
||||
|
||||
# test exporting feature (both json and csv file downloads)
|
||||
r = client.get('/forms/' + form_endpoint + '.json')
|
||||
submissions = json.loads(r.data.decode('utf-8'))['submissions']
|
||||
assert len(submissions) == 1
|
||||
assert submissions[0]['name'] == 'bruce'
|
||||
assert submissions[0]['message'] == 'hi, my name is bruce!'
|
||||
|
||||
r = client.get('/forms/' + form_endpoint + '.csv')
|
||||
lines = r.data.decode('utf-8').splitlines()
|
||||
assert len(lines) == 2
|
||||
assert lines[0] == 'date,message,name'
|
||||
assert '"hi in my name is bruce!"', lines[1]
|
||||
|
||||
# test submissions endpoint with the user downgraded
|
||||
user.upgraded = False
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
r = client.get('/forms/' + form_endpoint + '/')
|
||||
assert r.status_code == 402 # it should fail
|
||||
|
||||
# test submissions endpoint without a logged user
|
||||
client.get('/logout')
|
||||
r = client.get('/forms/' + form_endpoint + '/')
|
||||
assert r.status_code == 302 # it should return a redirect (via @user_required
|
|
@ -0,0 +1,85 @@
|
|||
import json
|
||||
|
||||
from formspree.forms.models import Form
|
||||
from formspree.stuff import DB
|
||||
from formspree import settings
|
||||
|
||||
def test_various_content_types(client, msend):
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers = {'Referer': 'http://testwebsite.com'},
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
def isjson(res):
|
||||
try:
|
||||
d = json.loads(res.data.decode('utf-8'))
|
||||
assert isinstance(d, dict)
|
||||
assert 'success' in d
|
||||
assert res.mimetype == 'application/json'
|
||||
except ValueError as e:
|
||||
assert not e
|
||||
|
||||
def ishtml(res):
|
||||
try:
|
||||
d = json.loads(res.data.decode('utf-8'))
|
||||
assert not isinstance(d, dict)
|
||||
except ValueError:
|
||||
assert res.mimetype == 'text/html'
|
||||
assert res.status_code == 302
|
||||
|
||||
types = [
|
||||
# content-type # accept # check
|
||||
(None, None, ishtml),
|
||||
('application/json', 'text/json', isjson),
|
||||
('application/json', 'application/json', isjson),
|
||||
(None, 'application/json', isjson),
|
||||
(None, 'application/json, text/javascript, */*; q=0.01', isjson),
|
||||
('application/json', None, isjson),
|
||||
('application/json', 'application/json, text/plain, */*', isjson),
|
||||
('application/x-www-form-urlencoded', 'application/json', isjson),
|
||||
('application/x-www-form-urlencoded', 'application/json, text/plain, */*', isjson),
|
||||
('application/x-www-form-urlencoded', None, ishtml),
|
||||
(None, 'text/html', ishtml),
|
||||
('application/json', 'text/html', ishtml),
|
||||
('application/json', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', ishtml),
|
||||
('application/x-www-form-urlencoded', 'text/html, */*; q=0.01', ishtml),
|
||||
(None, 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', ishtml)
|
||||
]
|
||||
|
||||
# for this test only we will relax this limit.
|
||||
default_limit = settings.MONTHLY_SUBMISSIONS_LIMIT
|
||||
settings.MONTHLY_SUBMISSIONS_LIMIT = len(types)
|
||||
|
||||
for ct, acc, check in types:
|
||||
headers = {'Referer': 'http://testwebsite.com'}
|
||||
if ct:
|
||||
headers['Content-Type'] = ct
|
||||
if acc:
|
||||
headers['Accept'] = acc
|
||||
|
||||
data = {'name': 'bob'}
|
||||
data = json.dumps(data) if ct and 'json' in ct else data
|
||||
|
||||
res = client.post('/bob@testwebsite.com',
|
||||
headers=headers,
|
||||
data=data
|
||||
)
|
||||
check(res)
|
||||
|
||||
# test all combinations again, but with X-Requested-With header
|
||||
# and expect json in all of them
|
||||
headers['X-Requested-With'] = 'XMLHttpRequest'
|
||||
|
||||
res = client.post('/bob@testwebsite.com',
|
||||
headers=headers,
|
||||
data=data
|
||||
)
|
||||
isjson(res)
|
||||
|
||||
# then we put the default limit back
|
||||
settings.MONTHLY_SUBMISSIONS_LIMIT = default_limit
|
|
@ -0,0 +1,136 @@
|
|||
import json
|
||||
|
||||
from formspree import settings
|
||||
from formspree.stuff import DB
|
||||
from formspree.users.models import User, Email
|
||||
from formspree.forms.models import Form
|
||||
|
||||
from .conftest import parse_confirmation_link_sent
|
||||
|
||||
def test_user_registers_and_adds_emails(client, msend):
|
||||
# register
|
||||
r = client.post('/register',
|
||||
data={'email': 'alice@springs.com',
|
||||
'password': 'canada'}
|
||||
)
|
||||
assert r.status_code == 302
|
||||
assert r.location.endswith('/account')
|
||||
assert 1 == User.query.count()
|
||||
|
||||
# add more emails
|
||||
user = User.query.filter_by(email='alice@springs.com').first()
|
||||
emails = ['alice@example.com', 'team@alice.com', 'extra@email.io']
|
||||
for i, addr in enumerate(emails):
|
||||
client.post('/account/add-email', data={'address': addr})
|
||||
|
||||
link, qs = parse_confirmation_link_sent(msend.call_args[1]['text'])
|
||||
client.get(link, query_string=qs)
|
||||
|
||||
email = Email.query.get([addr, user.id])
|
||||
assert Email.query.count() == i+1 # do not count alice@springs.com
|
||||
assert email is not None
|
||||
assert email.owner_id == user.id
|
||||
|
||||
def test_user_gets_previous_forms_assigned_to_him(client, msend):
|
||||
# verify a form for márkö@example.com
|
||||
client.post(u'/márkö@example.com',
|
||||
headers = {'Referer': 'tomatoes.com'},
|
||||
data={'name': 'alice'}
|
||||
)
|
||||
f = Form.query.filter_by(host='tomatoes.com', email=u'márkö@example.com').first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
# register márkö@example.com
|
||||
r = client.post('/register',
|
||||
data={'email': u'márkö@example.com',
|
||||
'password': 'russia'}
|
||||
)
|
||||
|
||||
# confirm that the user account doesn't have access to the form
|
||||
r = client.get('/forms',
|
||||
headers={'Accept': 'application/json'},
|
||||
)
|
||||
forms = json.loads(r.data.decode('utf-8'))['forms']
|
||||
assert 0 == len(forms)
|
||||
|
||||
# verify user email
|
||||
link, qs = parse_confirmation_link_sent(msend.call_args[1]['text'])
|
||||
client.get(link, query_string=qs)
|
||||
|
||||
# confirm that the user has no access to the form since he is not upgraded
|
||||
r = client.get('/forms',
|
||||
headers={'Accept': 'application/json'},
|
||||
)
|
||||
forms = json.loads(r.data.decode('utf-8'))['forms']
|
||||
assert 0 == len(forms)
|
||||
|
||||
# upgrade user
|
||||
user = User.query.filter_by(email=u'márkö@example.com').first()
|
||||
user.upgraded = True
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# confirm that the user account has access to the form
|
||||
r = client.get('/forms',
|
||||
headers={'Accept': 'application/json'},
|
||||
)
|
||||
forms = json.loads(r.data.decode('utf-8'))['forms']
|
||||
assert 1 == len(forms)
|
||||
assert forms[0]['email'] == u'márkö@example.com'
|
||||
assert forms[0]['host'] == 'tomatoes.com'
|
||||
|
||||
# verify a form for another address
|
||||
r = client.post('/contact@mark.com',
|
||||
headers = {'Referer': 'mark.com'},
|
||||
data={'name': 'luke'}
|
||||
)
|
||||
f = Form.query.filter_by(host='mark.com', email='contact@mark.com').first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
# confirm that the user account doesn't have access to the form
|
||||
r = client.get('/forms',
|
||||
headers={'Accept': 'application/json'},
|
||||
)
|
||||
forms = json.loads(r.data.decode('utf-8'))['forms']
|
||||
assert 1 == len(forms)
|
||||
|
||||
# add this other email address to user account
|
||||
client.post('/account/add-email', data={'address': 'contact@mark.com'})
|
||||
|
||||
link, qs = parse_confirmation_link_sent(msend.call_args[1]['text'])
|
||||
client.get(link, query_string=qs)
|
||||
|
||||
# confirm that the user account now has access to the form
|
||||
r = client.get('/forms',
|
||||
headers={'Accept': 'application/json'},
|
||||
)
|
||||
forms = json.loads(r.data.decode('utf-8'))['forms']
|
||||
assert 2 == len(forms)
|
||||
assert forms[0]['email'] == 'contact@mark.com' # forms are sorted by -id, so the newer comes first
|
||||
assert forms[0]['host'] == 'mark.com'
|
||||
|
||||
# create a new form spontaneously with an email already verified
|
||||
r = client.post(u'/márkö@example.com',
|
||||
headers = {'Referer': 'elsewhere.com'},
|
||||
data={'name': 'luke'}
|
||||
)
|
||||
f = Form.query.filter_by(host='elsewhere.com', email=u'márkö@example.com').first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
# confirm that the user has already accessto that form
|
||||
r = client.get('/forms',
|
||||
headers={'Accept': 'application/json'},
|
||||
)
|
||||
forms = json.loads(r.data.decode('utf-8'))['forms']
|
||||
assert 3 == len(forms)
|
||||
assert forms[0]['email'] == u'márkö@example.com'
|
||||
assert forms[0]['host'] == 'elsewhere.com'
|
|
@ -0,0 +1,335 @@
|
|||
import json
|
||||
|
||||
from formspree import settings
|
||||
from formspree.stuff import DB
|
||||
from formspree.forms.helpers import HASH
|
||||
from formspree.users.models import User, Email
|
||||
from formspree.forms.models import Form, Submission
|
||||
|
||||
def test_form_creation(client, msend):
|
||||
# register user
|
||||
r = client.post('/register',
|
||||
data={'email': 'colorado@springs.com',
|
||||
'password': 'banana'}
|
||||
)
|
||||
assert r.status_code == 302
|
||||
assert 1 == User.query.count()
|
||||
|
||||
# fail to create form
|
||||
r = client.post('/forms',
|
||||
headers={'Content-type': 'application/json'},
|
||||
data={'email': 'hope@springs.com'}
|
||||
)
|
||||
assert r.status_code == 402
|
||||
assert 'error' in json.loads(r.data.decode('utf-8'))
|
||||
assert 0 == Form.query.count()
|
||||
|
||||
# upgrade user manually
|
||||
user = User.query.filter_by(email='colorado@springs.com').first()
|
||||
user.upgraded = True
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# successfully create form
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'hope@springs.com'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
assert r.status_code == 200
|
||||
assert 'submission_url' in resp
|
||||
assert 'hashid' in resp
|
||||
form_endpoint = resp['hashid']
|
||||
assert resp['hashid'] in resp['submission_url']
|
||||
assert 1 == Form.query.count()
|
||||
assert Form.query.first().id == Form.get_with_hashid(resp['hashid']).id
|
||||
|
||||
# post to form
|
||||
r = client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'http://testsite.com'},
|
||||
data={'name': 'bruce'}
|
||||
)
|
||||
assert 'sent an email confirmation' in r.data.decode('utf-8')
|
||||
assert 'confirm your email' in msend.call_args[1]['text']
|
||||
assert 1 == Form.query.count()
|
||||
|
||||
# confirm form
|
||||
form = Form.query.first()
|
||||
client.get('/confirm/%s:%s' % (HASH(form.email, str(form.id)), form.hashid))
|
||||
assert Form.query.first().confirmed
|
||||
|
||||
# Make sure that it marks the first form as AJAX
|
||||
assert Form.query.first().uses_ajax
|
||||
|
||||
# send 5 forms (monthly limits should not apply to the upgraded user)
|
||||
assert settings.MONTHLY_SUBMISSIONS_LIMIT == 2
|
||||
for i in range(5):
|
||||
r = client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'testsite.com'},
|
||||
data={'name': 'ana',
|
||||
'submission': '__%s__' % i}
|
||||
)
|
||||
form = Form.query.first()
|
||||
assert form.counter == 5
|
||||
assert form.get_monthly_counter() == 5
|
||||
assert 'ana' in msend.call_args[1]['text']
|
||||
assert '__4__' in msend.call_args[1]['text']
|
||||
assert 'You are past our limit' not in msend.call_args[1]['text']
|
||||
|
||||
# try (and fail) to submit from a different host
|
||||
r = client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'bad.com'},
|
||||
data={'name': 'usurper'}
|
||||
)
|
||||
assert r.status_code == 403
|
||||
assert 'ana' in msend.call_args[1]['text'] # no more data is sent to sendgrid
|
||||
assert '__4__' in msend.call_args[1]['text']
|
||||
|
||||
def test_form_creation_with_a_registered_email(client, msend):
|
||||
# register user
|
||||
r = client.post('/register',
|
||||
data={'email': 'user@testsite.com', 'password': 'banana'})
|
||||
# upgrade user manually
|
||||
user = User.query.filter_by(email='user@testsite.com').first()
|
||||
user.upgraded = True
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# creating a form without providing an url should not send verification email
|
||||
msend.reset_mock()
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'email@testsite.com'})
|
||||
)
|
||||
assert not msend.called
|
||||
|
||||
# create form without a confirmed email should send a verification email
|
||||
msend.reset_mock()
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'email@testsite.com',
|
||||
'url': 'https://www.testsite.com/contact.html'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
assert resp['confirmed'] == False
|
||||
assert msend.called
|
||||
assert 'Confirm email for' in msend.call_args[1]['subject']
|
||||
assert 'www.testsite.com/contact.html' in msend.call_args[1]['text']
|
||||
|
||||
# manually verify an email
|
||||
email = Email()
|
||||
email.address = 'owned-by@testsite.com'
|
||||
email.owner_id = user.id
|
||||
DB.session.add(email)
|
||||
DB.session.commit()
|
||||
|
||||
# create a form with the verified email address
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'owned-by@testsite.com',
|
||||
'url': 'https://www.testsite.com/about.html'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
assert resp['confirmed'] == True
|
||||
assert 'www.testsite.com/contact.html' in msend.call_args[1]['text'] # same as the last, means no new request was made
|
||||
|
||||
# should have three created forms in the end
|
||||
assert Form.query.count() == 3
|
||||
|
||||
def test_sitewide_forms(client, msend, mocker):
|
||||
m_sitewidecheck = mocker.patch(
|
||||
'formspree.forms.views.sitewide_file_check',
|
||||
side_effect=[True, True, True]
|
||||
)
|
||||
|
||||
# register user
|
||||
r = client.post('/register',
|
||||
data={'email': 'user@testsite.com',
|
||||
'password': 'banana'}
|
||||
)
|
||||
# upgrade user manually
|
||||
user = User.query.filter_by(email='user@testsite.com').first()
|
||||
user.upgraded = True
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# manually verify an email
|
||||
email = Email()
|
||||
email.address = 'myüñìćõð€email@email.com'
|
||||
email.owner_id = user.id
|
||||
DB.session.add(email)
|
||||
DB.session.commit()
|
||||
|
||||
# create a sitewide form with the verified email address
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'myüñìćõð€email@email.com',
|
||||
'url': 'http://mysite.com',
|
||||
'sitewide': 'true'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
|
||||
assert m_sitewidecheck.called
|
||||
assert m_sitewidecheck.call_args[0][1] == 'myüñìćõð€email@email.com'
|
||||
assert resp['confirmed']
|
||||
m_sitewidecheck.reset_mock()
|
||||
|
||||
assert 1 == Form.query.count()
|
||||
forms = Form.query.all()
|
||||
form = forms[0]
|
||||
assert form.sitewide
|
||||
assert form.host == 'mysite.com'
|
||||
|
||||
# submit form
|
||||
r = client.post('/' + form.hashid,
|
||||
headers = {'Referer': 'http://www.mysite.com/hipopotamo', 'content-type': 'application/json'},
|
||||
data=json.dumps({'name': 'alice'})
|
||||
)
|
||||
assert 'alice' in msend.call_args[1]['text']
|
||||
|
||||
client.post('/' + form.hashid,
|
||||
headers = {'Referer': 'http://mysite.com/baleia/urso?w=2', 'content-type': 'application/json'},
|
||||
data=json.dumps({'name': 'maria'})
|
||||
)
|
||||
assert 'maria' in msend.call_args[1]['text']
|
||||
|
||||
client.post('/' + form.hashid,
|
||||
headers = {'Referer': 'http://mysite.com/', 'content-type': 'application/json'},
|
||||
data=json.dumps({'name': 'laura'})
|
||||
)
|
||||
assert 'laura' in msend.call_args[1]['text']
|
||||
|
||||
# another form, now with a www prefix that will be stripped
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'myüñìćõð€email@email.com',
|
||||
'url': 'http://www.naive.com',
|
||||
'sitewide': 'true'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
|
||||
assert m_sitewidecheck.called
|
||||
assert m_sitewidecheck.call_args[0][0] == 'http://www.naive.com'
|
||||
assert resp['confirmed']
|
||||
|
||||
assert 2 == Form.query.count()
|
||||
forms = Form.query.all()
|
||||
form = forms[1]
|
||||
assert form.sitewide
|
||||
assert form.host == 'naive.com'
|
||||
|
||||
# submit form
|
||||
r = client.post('/' + form.hashid,
|
||||
headers={'Referer': 'http://naive.com/hipopotamo', 'content-type': 'application/json'},
|
||||
data=json.dumps({'name': 'alice'})
|
||||
)
|
||||
assert 'alice' in msend.call_args[1]['text']
|
||||
|
||||
client.post('/' + form.hashid,
|
||||
headers={'Referer': 'http://www.naive.com/baleia/urso?w=2', 'content-type': 'application/json'},
|
||||
data=json.dumps({'name': 'maria'})
|
||||
)
|
||||
assert 'maria' in msend.call_args[1]['text']
|
||||
|
||||
client.post('/' + form.hashid,
|
||||
headers={'Referer': 'http://www.naive.com/', 'content-type': 'application/json'},
|
||||
data=json.dumps({'name': 'laura'})
|
||||
)
|
||||
assert 'laura' in msend.call_args[1]['text']
|
||||
|
||||
# create a different form with the same email address, now using unprefixed url
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'myüñìćõð€email@email.com',
|
||||
'url': 'mysite.com',
|
||||
'sitewide': 'true'})
|
||||
)
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
|
||||
def test_form_settings(client, msend):
|
||||
# register and upgrade user
|
||||
client.post('/register', data={'email': 'texas@springs.com', 'password': 'water'})
|
||||
user = User.query.filter_by(email='texas@springs.com').first()
|
||||
user.upgraded = True
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# create and confirm form
|
||||
r = client.post('/forms',
|
||||
headers={'Accept': 'application/json', 'Content-type': 'application/json'},
|
||||
data=json.dumps({'email': 'texas@springs.com'}))
|
||||
resp = json.loads(r.data.decode('utf-8'))
|
||||
form = Form.query.first()
|
||||
form.confirmed = True
|
||||
DB.session.add(form)
|
||||
DB.session.commit()
|
||||
form_endpoint = resp['hashid']
|
||||
|
||||
# disable email notifications on this form
|
||||
client.post('/forms/' + form_endpoint + '/toggle-emails',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
content_type='application/json',
|
||||
data=json.dumps({'checked': False}))
|
||||
assert Form.query.first().disable_email
|
||||
|
||||
# post to form
|
||||
client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'http://testsite.com'},
|
||||
data={'name': 'bruce'})
|
||||
# make sure it doesn't send the email
|
||||
assert 'Someone just submitted your form' not in msend.call_args[1]['text']
|
||||
|
||||
# disable archive storage on this form
|
||||
client.post('/forms/' + form_endpoint + '/toggle-storage',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
content_type='application/json',
|
||||
data=json.dumps({'checked': False}))
|
||||
assert Form.query.first().disable_storage
|
||||
|
||||
# make sure that we know there's one submission in database from first submission
|
||||
assert 1 == Submission.query.count()
|
||||
|
||||
# make sure that the submission wasn't stored in the database
|
||||
# post to form
|
||||
client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'http://testsite.com'},
|
||||
data={'name': 'wayne'})
|
||||
assert 1 == Submission.query.count()
|
||||
|
||||
# enable email notifications on this form
|
||||
client.post('/forms/' + form_endpoint + '/toggle-emails',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
content_type='application/json',
|
||||
data=json.dumps({'checked': True}))
|
||||
assert not Form.query.first().disable_email
|
||||
|
||||
# make sure that our form still isn't storing submissions
|
||||
assert 1 == Submission.query.count()
|
||||
|
||||
# enable archive storage again
|
||||
client.post('/forms/' + form_endpoint + '/toggle-storage',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
content_type='application/json',
|
||||
data=json.dumps({'checked': True}))
|
||||
assert not Form.query.first().disable_storage
|
||||
|
||||
# post to form again this time it should store the submission
|
||||
client.post('/' + form_endpoint,
|
||||
headers={'Referer': 'http://testsite.com'},
|
||||
data={'name': 'luke'})
|
||||
assert 2 == Submission.query.filter_by(form_id=form.id).count()
|
||||
|
||||
# check captcha disabling
|
||||
assert not Form.query.first().captcha_disabled
|
||||
|
||||
client.post('/forms/' + form_endpoint + '/toggle-recaptcha',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
content_type='application/json',
|
||||
data=json.dumps({'checked': False}))
|
||||
assert Form.query.first().captcha_disabled
|
||||
|
||||
client.post('/forms/' + form_endpoint + '/toggle-recaptcha',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
content_type='application/json',
|
||||
data=json.dumps({'checked': True}))
|
||||
assert not Form.query.first().captcha_disabled
|
|
@ -0,0 +1,278 @@
|
|||
from formspree import settings
|
||||
from formspree.stuff import DB
|
||||
from formspree.forms.models import Form
|
||||
from formspree.users.models import User, Email
|
||||
|
||||
http_headers = {
|
||||
'Referer': 'testwebsite.com'
|
||||
}
|
||||
|
||||
def test_index_page(client):
|
||||
r = client.get('/')
|
||||
assert 200 == r.status_code
|
||||
|
||||
def test_thanks_page(client):
|
||||
r = client.get('/thanks')
|
||||
assert r.status_code == 200
|
||||
|
||||
# test XSS
|
||||
r = client.get('/thanks?next=javascript:alert(document.domain)')
|
||||
assert r.status_code == 400
|
||||
|
||||
r = client.get('/thanks?next=https%3A%2F%2Fformspree.io')
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_submit_form(client, msend):
|
||||
client.post('/alice@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'alice', '_subject': 'my-nice-subject'}
|
||||
)
|
||||
assert 1 == Form.query.count()
|
||||
f = Form.query.first()
|
||||
f.confirmed = True
|
||||
|
||||
client.post('/alice@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'alice',
|
||||
'_subject': 'my-nice-subject',
|
||||
'_format': 'plain'}
|
||||
)
|
||||
assert 'my-nice-subject' in msend.call_args[1]['subject']
|
||||
assert '_subject' not in msend.call_args[1]['text']
|
||||
assert '_format' not in msend.call_args[1]['text']
|
||||
assert 'plain' not in msend.call_args[1]['text']
|
||||
|
||||
def test_fail_form_without_header(client, msend):
|
||||
msend.reset_mock()
|
||||
no_referer = http_headers.copy()
|
||||
del no_referer['Referer']
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers = no_referer,
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
assert 200 != r.status_code
|
||||
assert not msend.called
|
||||
assert 0 == Form.query.count()
|
||||
|
||||
def test_fail_form_spoof_formspree(client, msend):
|
||||
msend.reset_mock()
|
||||
r = client.post('/alice@testwebsite.com',
|
||||
headers={'Referer': settings.SERVICE_URL},
|
||||
data={'name': 'alice', '_subject': 'my-nice-subject'}
|
||||
)
|
||||
assert "Unable to submit form" in r.data.decode('utf-8')
|
||||
assert 200 != r.status_code
|
||||
assert not msend.called
|
||||
assert 0 == Form.query.count()
|
||||
|
||||
def test_fail_but_appears_to_have_succeeded_with_gotcha(client, msend):
|
||||
# manually confirm
|
||||
r = client.post('/carlitos@testwebsite.com',
|
||||
headers = {'Referer': 'http://carlitos.net/'},
|
||||
data={'name': 'carlitos'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
msend.reset_mock()
|
||||
|
||||
r = client.post('/carlitos@testwebsite.com',
|
||||
headers = {'Referer': 'http://carlitos.net/'},
|
||||
data={'name': 'Real Stock', '_gotcha': 'The best offers.'}
|
||||
)
|
||||
assert not msend.called
|
||||
assert 302 == r.status_code
|
||||
assert 0 == Form.query.first().counter
|
||||
|
||||
def test_fail_with_invalid_reply_to(client, msend):
|
||||
# manually confirm
|
||||
r = client.post('/carlitos@testwebsite.com',
|
||||
headers = {'Referer': 'http://carlitos.net/'},
|
||||
data={'name': 'carlitos'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
# fail with an invalid '_replyto'
|
||||
msend.reset_mock()
|
||||
|
||||
r = client.post('/carlitos@testwebsite.com',
|
||||
headers = {'Referer': 'http://carlitos.net/'},
|
||||
data={'name': 'Real Stock', '_replyto': 'The best offers.'}
|
||||
)
|
||||
assert not msend.called
|
||||
assert 400 == r.status_code
|
||||
assert 0 == Form.query.first().counter
|
||||
|
||||
# fail with an invalid 'email'
|
||||
r = client.post('/carlitos@testwebsite.com',
|
||||
headers = {'Referer': 'http://carlitos.net/'},
|
||||
data={'name': 'Real Stock', 'email': 'The best offers.'}
|
||||
)
|
||||
assert not msend.called
|
||||
assert 400 == r.status_code
|
||||
assert 0 == Form.query.first().counter
|
||||
|
||||
def test_fail_ajax_form(client, msend):
|
||||
msend.reset_mock()
|
||||
|
||||
ajax_headers = http_headers.copy()
|
||||
ajax_headers['X_REQUESTED_WITH'] = 'xmlhttprequest'
|
||||
r = client.post('/bob@example.com',
|
||||
headers = ajax_headers,
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
assert not msend.called
|
||||
assert 200 != r.status_code
|
||||
|
||||
def test_activation_workflow(client, msend):
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
assert f.email == 'bob@testwebsite.com'
|
||||
assert f.host == 'testwebsite.com'
|
||||
assert f.confirm_sent == True
|
||||
assert f.counter == 0 # the counter shows zero submissions
|
||||
assert f.owner_id == None
|
||||
assert f.get_monthly_counter() == 0 # monthly submissions also 0
|
||||
|
||||
# form has another submission, number of forms in the table should increase?
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
number_of_forms = Form.query.count()
|
||||
assert number_of_forms == 1 # still only one form
|
||||
|
||||
# assert form data is still the same
|
||||
f = Form.query.first()
|
||||
assert f.email == 'bob@testwebsite.com'
|
||||
assert f.host == 'testwebsite.com'
|
||||
assert f.confirm_sent == True
|
||||
assert f.counter == 0 # still zero submissions
|
||||
assert f.owner_id == None
|
||||
|
||||
# test clicking of activation link
|
||||
r = client.get('/confirm/%s' % (f.hash,))
|
||||
|
||||
f = Form.query.first()
|
||||
assert f.confirmed == True
|
||||
assert f.counter == 1 # counter has increased
|
||||
assert f.get_monthly_counter() == 1 # monthly submissions also
|
||||
|
||||
# a third submission should now increase the counter
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
number_of_forms = Form.query.count()
|
||||
assert number_of_forms == 1 # still only one form
|
||||
|
||||
f = Form.query.first()
|
||||
assert f.email == 'bob@testwebsite.com'
|
||||
assert f.host == 'testwebsite.com'
|
||||
assert f.confirm_sent == True
|
||||
assert f.owner_id == None
|
||||
assert f.counter == 2 # counter has increased
|
||||
assert f.get_monthly_counter() == 2 # monthly submissions also
|
||||
|
||||
def test_monthly_limits(client, msend):
|
||||
# monthly limit is set to 2 during tests
|
||||
assert settings.MONTHLY_SUBMISSIONS_LIMIT == 2
|
||||
|
||||
# manually verify luke@example.com
|
||||
r = client.post('/luke@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'luke'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
f.confirm_sent = True
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
# first submission
|
||||
r = client.post('/luke@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'peter'}
|
||||
)
|
||||
assert r.status_code == 302
|
||||
assert 'peter' in msend.call_args[1]['text']
|
||||
|
||||
# second submission
|
||||
r = client.post('/luke@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'ana'}
|
||||
)
|
||||
assert r.status_code == 302
|
||||
assert 'ana' in msend.call_args[1]['text']
|
||||
|
||||
# third submission, now we're over the limit
|
||||
r = client.post('/luke@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'maria'}
|
||||
)
|
||||
assert r.status_code == 302 # the response to the user is the same
|
||||
# being the form over the limits or not
|
||||
|
||||
# but the mocked sendgrid should never receive this last form
|
||||
assert 'maria' not in msend.call_args[1]['text']
|
||||
assert 'You are past our limit' in msend.call_args[1]['text']
|
||||
|
||||
# all the other variables are ok:
|
||||
assert 1 == Form.query.count()
|
||||
f = Form.query.first()
|
||||
assert f.counter == 3
|
||||
assert f.get_monthly_counter() == 3 # the counters mark 4
|
||||
|
||||
# the user pays and becomes upgraded
|
||||
r = client.post('/register',
|
||||
data={'email': 'luke@testwebsite.com',
|
||||
'password': 'banana'}
|
||||
)
|
||||
user = User.query.filter_by(email='luke@testwebsite.com').first()
|
||||
user.upgraded = True
|
||||
user.emails = [Email(address='luke@testwebsite.com')]
|
||||
DB.session.add(user)
|
||||
DB.session.commit()
|
||||
|
||||
# the user should receive form posts again
|
||||
r = client.post('/luke@testwebsite.com',
|
||||
headers=http_headers,
|
||||
data={'name': 'noah'}
|
||||
)
|
||||
assert r.status_code == 302
|
||||
assert 'noah' in msend.call_args[1]['text']
|
||||
|
||||
def test_first_submission_is_stored(client, msend):
|
||||
r = client.post('/what@firstsubmissed.com',
|
||||
headers=http_headers,
|
||||
data={'missed': 'this was important'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
assert f.email == 'what@firstsubmissed.com'
|
||||
assert f.confirm_sent == True
|
||||
assert f.counter == 0 # the counter shows zero submissions
|
||||
assert f.get_monthly_counter() == 0 # monthly submissions also 0
|
||||
|
||||
# got a confirmation email
|
||||
assert 'one step away' in msend.call_args[1]['text']
|
||||
|
||||
# clicking of activation link
|
||||
client.get('/confirm/%s' % (f.hash,))
|
||||
|
||||
f = Form.query.first()
|
||||
assert f.confirmed == True
|
||||
assert f.counter == 1 # counter has increased
|
||||
assert f.get_monthly_counter() == 1 # monthly submissions also
|
||||
|
||||
# got the first (missed) submission
|
||||
assert 'this was important' in msend.call_args[1]['text']
|
|
@ -0,0 +1,36 @@
|
|||
from urllib.parse import unquote
|
||||
|
||||
from formspree.forms.models import Form
|
||||
from formspree import settings
|
||||
from formspree.stuff import DB
|
||||
|
||||
def test_list_unsubscribe(client, msend):
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers = {'Referer': 'http://testwebsite.com'},
|
||||
data={'name': 'bob'}
|
||||
)
|
||||
f = Form.query.first()
|
||||
|
||||
# List-Unsubscribe header is sent (it is surrounded by brackets <>)
|
||||
list_unsubscribe_url = msend.call_args[1]['headers']['List-Unsubscribe'][1:-1]
|
||||
|
||||
f.confirmed = True
|
||||
DB.session.add(f)
|
||||
DB.session.commit()
|
||||
|
||||
r = client.post('/bob@testwebsite.com',
|
||||
headers = {'Referer': 'http://testwebsite.com'},
|
||||
data={'name': 'carol'}
|
||||
)
|
||||
|
||||
assert r.status_code == 302
|
||||
|
||||
# List-Unsubscribe is present on normal submission
|
||||
assert msend.call_args[1]['headers']['List-Unsubscribe'][1:-1] == list_unsubscribe_url
|
||||
|
||||
r = client.post(list_unsubscribe_url)
|
||||
assert r.status_code == 200
|
||||
|
||||
f = Form.query.first()
|
||||
assert f.confirm_sent == True
|
||||
assert f.confirmed == False
|
|
@ -0,0 +1,33 @@
|
|||
from formspree import settings
|
||||
from formspree.stuff import DB
|
||||
from formspree.forms.models import Form
|
||||
|
||||
def test_rate_limiting_on_form_posts(client, msend):
|
||||
# confirm a form
|
||||
client.post('/alice@example.com',
|
||||
headers={'referer': 'http://somewhere.com'},
|
||||
data={'name': 'john'}
|
||||
)
|
||||
form = Form.query.filter_by(host='somewhere.com', email='alice@example.com').first()
|
||||
form.confirmed = True
|
||||
DB.session.add(form)
|
||||
DB.session.commit()
|
||||
|
||||
# submit form many times
|
||||
replies = []
|
||||
for _ in range(1000):
|
||||
r = client.post('/alice@example.com',
|
||||
headers={'referer': 'http://somewhere.com'},
|
||||
data={'name': 'attacker'}
|
||||
)
|
||||
replies.append(r.status_code)
|
||||
|
||||
limit = int(settings.RATE_LIMIT.split(' ')[0])
|
||||
|
||||
# the number of submissions should not be more than the rate limit
|
||||
form = Form.query.filter_by(host='somewhere.com', email='alice@example.com').first()
|
||||
assert form.counter < limit
|
||||
|
||||
# should have gotten some 302 and then many 429 responses
|
||||
assert replies.count(302) <= limit
|
||||
assert replies.count(429) >= 900 - limit
|
|
@ -0,0 +1,20 @@
|
|||
from formspree.utils import next_url
|
||||
|
||||
def test_next_url(client):
|
||||
# thanks route should have the referrer as its 'next'
|
||||
assert '/thanks?next=http%3A%2F%2Ffun.io' == next_url(referrer='http://fun.io')
|
||||
|
||||
# No referrer and relative next url should result in proper relative next url.
|
||||
assert '/thank-you' == next_url(next='/thank-you')
|
||||
|
||||
# No referrer and absolute next url should result in proper absolute next url.
|
||||
assert 'http://somesite.org/thank-you' == next_url(next='http://somesite.org/thank-you')
|
||||
|
||||
# Referrer set and relative next url should result in proper absolute next url.
|
||||
assert 'http://fun.io/' == next_url(referrer='http://fun.io', next='/')
|
||||
assert 'http://fun.io/thanks.html' == next_url(referrer='http://fun.io', next='thanks.html')
|
||||
assert 'http://fun.io/thanks.html' == next_url(referrer='http://fun.io', next='/thanks.html')
|
||||
|
||||
# Referrer set and absolute next url should result in proper absolute next url.
|
||||
assert 'https://morefun.net/awesome.php' == next_url(referrer='https://fun.io', next='//morefun.net/awesome.php')
|
||||
assert 'http://morefun.net/awesome.php' == next_url(referrer='http://fun.io', next='//morefun.net/awesome.php')
|
Loading…
Reference in New Issue