最近在做一个关于测试实验,特地记录一下关键代码
代码如下
(注意,ip_commitlimit.json文件要有数据)
# 读文件
def readfile(pathname):
try:
with open(pathname, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
print(e)
# 写入文件:默认将数据转化为json并换行
def writfile(pathname,data,writtype='a',datatype='json',line='y'):
try:
with open(pathname, writtype, encoding='utf-8') as f:
if datatype == 'json':
if line == 'y':
f.write(json.dumps(data, ensure_ascii=False,indent=2) + '\n')
else:
f.write(json.dumps(data, ensure_ascii=False,indent=2))
else:
if line == 'y':
f.write(data + '\n')
else:
f.write(data)
except Exception as e:
print(e)
# 限制ip访问次数 使用文件,后期数据会越来越大需要迁移到数据库中
def ip_commitlimit(ip,number=10,pathname='../ip_commitlimit.json'):
currenttime = time.strftime('%Y-%m-%d %H', time.localtime(time.time())).split(' ')
datajson = json.loads(readfile(pathname))
if datajson.get(ip):
if datajson.get(ip).get(currenttime[0]):
if datajson.get(ip).get(currenttime[0]).get(currenttime[1]):
visittimes = datajson.get(ip).get(currenttime[0]).get(currenttime[1]) # 获取访问次数
if visittimes > number:
return {'verify': False, 'data': '访问次数上限!请稍后再访问'}
else:
update = datajson.get(ip).get(currenttime[0])
update.update({currenttime[1]:visittimes+1}) # 更新ip访问次数
writfile(pathname,datajson,'w') # 重写更新字典
return {'verify': True, 'data': f'ip小时内访问次数为:{visittimes}'}
else:
update = datajson.get(ip).get(currenttime[0])
update.update({currenttime[1]:1})
writfile(pathname, datajson, 'w')
return {'verify': True, 'data': 'ip小时内无访问次数'}
else:
update = datajson.get(ip)
update.update({currenttime[0]: {currenttime[1]:1}})
writfile(pathname, datajson, 'w')
return {'verify': True, 'data': 'ip当天无访问次数'}
else:
update = datajson
update.update({ip:{currenttime[0]: {currenttime[1]: 1}}})
writfile(pathname, datajson, 'w')
return {'verify': True, 'data': 'ip未访问过'}
这三个函数分别是读文件、写文件、限制IP的主要功能函数。
限制IP函数 ip_commitlimit 传参:ip(如何获取IP参考下面)、限制访问次数(默认一个小时内10次,注意这里的小时是按照本地的小时时间,并不是访问时间间隔)、文件路径
我觉得这种方式太水了,但目前还能用。(注意,json文件要有数据)
使用方式
# 验证ip次数
ip_commitlimit(ip).get('verify'):
print('验证通过')
else:
print('ip已经访问10次了')
获取用户IP
# 判断是否使用代理,如果使用代理尝试获取源IP,注意返回值应该是列表,注意修改
if request.META.get('HTTP_X_FORWARDED_FOR'):
ip = request.META.get('HTTP_X_FORWARDED_FOR')
else:
ip = request.META.get('REMOTE_ADDR')
我使用的方式
from django.shortcuts import render
# Create your views here.
from django.http import HttpResponse,HttpResponseRedirect
from django.contrib import messages
from .verify import verify,ip_commitlimit # 这两个是我自己写的函数
def index(request):
if request.META.get('HTTP_X_FORWARDED_FOR'):
ip = request.META.get('HTTP_X_FORWARDED_FOR')
else:
ip = request.META.get('REMOTE_ADDR')
if ip_commitlimit(str(ip)).get('verify'):
return render(request, 'stepFrequency/index.html', {'data': 'json数据'}, )
else:
return HttpResponseRedirect('/404')
文件格式:ip_commitlimit.json
ip地址{日期:{小时:访问次数}}
{
"127.0.0.1": {
"2023-02-05": {
"17": 1
}
}
}