仅供个人学习,不得用于牟利,仅供学习参考。定时运行脚本为本人开发,其它脚本是他人的,源作者地址:https://github.com/wangquanfugui233/Bilibili_Python
一、脚本说明
定时任务是cron 表达式,不懂的可以百度搜索 cron 在线生成器,可以帮助你有效定时运行。
定时任务的脚本只需要修改定时运行的时间,也可以自己不修改,使用默认的是没有什么问题的(参加天选到现在写脚本都没有中过奖,主要是每天帮我做一下任务升级一下)。
以下是脚本文件说明:
定时运行脚本:Blilibili_Run.py
检查配置文件:Bilibili_Config.py
参加天选时刻:Bilibili_CTime.py
运行日常任务:Bilibili_Daily.py
取消关注up主:Bilibili_Unfollows.py
中奖通知:Bilibili_Prize.py
关注源作者和其他:Bilibili_User.py
二、配置相关
1.需要安装两个python模块:
pip install requests apscheduler
2.运行定时脚本(Blilibili_Run.py),第一次运行或配置文件不存在自动生成配置文件。
3.获取b站的cookie,如图所示:
(1)登录,点击个人中心
(2)在网页空白处,鼠标右键,点击检查
(3)随便找一个链接,查看是否存在cookie
4.修改配置(Bilibili_config.json),添加自己的cookie,
具体如图:
配置文件说明:
coin:投币, 做任务时投币的数量。
max_pageL: 扫描天选页数。
max_thread: 扫描线程数。
black_list:黑名单, 就是你不想关注的up主mid放在这里,一定要是英文逗号。
white_list:白名单, 就是你不想取关的,和中奖的up主mid会存在这里,一定要是英文逗号。
favorite:指定给up主投币,做日常任务时投币的任务。
follow_author:关注源作者的账号(源作者的账号好像被封了)。不想关注就写False。
多账号格式:—》:{"Cookie": ""},{"Cookie": ""},{"Cookie": ""}….. 一定要是英文逗号。
三、修改定时时间(可不用修改)
定时任务是cron 表达式,不懂的可以百度搜索 cron 在线生成器,可以帮助你有效定时运行。
控制运行时间的四个变量如图所示:
四、脚本
我自己的仓库地址:https://github.com/jiubanyipeng/single/tree/main/Bilibili
源作者的:https://github.com/wangquanfugui233/Bilibili_Python
或者你在线复制也可以,注意文件名一定要对应。
定时运行脚本:Blilibili_Run.py
'''
### 定时脚本
### 其它脚本的使用别人的,源作者地址:https://github.com/wangquanfugui233/Bilibili_Python
### 第一次运行会自己生成 Bilibili_config.json 配置文件
### 配置文件说明:
# coin:投币, 做任务时投币的数量
# max_pageL: 扫描天选页数
# max_thread: 扫描线程数
# black_list:黑名单, 就是你不想关注的up主mid放在这里,一定要是英文逗号
# white_list:白名单, 就是你不想取关的,和中奖的up主mid会存在这里,一定要是英文逗号
# favorite:指定给up主投币,做日常任务时投币的任务
# follow_author:关注源作者的账号(源作者的账号好像被封了)。不想关注就写False
# 多账号格式:---》:{"Cookie": ""},{"Cookie": ""},{"Cookie": ""}..... 一定要是英文逗号
仅供个人学习,不得用于牟利,仅供学习参考
'''
import Bilibili_Config # 运行前检查配置文件
from apscheduler.schedulers.blocking import BlockingScheduler # 定时任务框架
import json
import sys
# 参加天选时刻
def Bilibili_CTime():
import Bilibili_CTime
# 运行日常任务
def Bilibili_Dayly():
import Bilibili_Daily
# 取消关注
def Bilibili_Unfollows():
import Bilibili_Unfollows
# 中奖通知
def Bilibili_Prize():
import Bilibili_Prize
# 运行
def func(tx_cron, rc_cron, qx_cron, zj_cron):
scheduler = BlockingScheduler()
try:
scheduler.add_job(Bilibili_CTime, 'cron', year='*', month='*', day=f'{tx_cron[2]}', hour=f'{tx_cron[1]}', minute=f'{tx_cron[0]}', id='run1', name='参加天选时刻', jitter=120)
scheduler.add_job(Bilibili_Dayly, 'cron', day=f'{rc_cron[2]}', hour=f'{rc_cron[1]}', minute=f'{rc_cron[0]}', id='run2', name='运行日常任务', jitter=120)
scheduler.add_job(Bilibili_Unfollows, 'cron', day=f'{qx_cron[2]}', hour=f'{qx_cron[1]}', minute=f'{qx_cron[0]}', id='run3', name='运行取消关注任务', jitter=120)
scheduler.add_job(Bilibili_Prize, 'cron', day=f'{zj_cron[2]}', hour=f'{zj_cron[1]}', minute=f'{zj_cron[0]}', id='run4', name='中奖通知', jitter=120)
scheduler.start()
except Exception as e:
print(e)
if __name__ == '__main__':
try:
with open('Bilibili_config.json') as f:
if len(json.loads(f.read())['Users'][0]['Cookie']) < 1:
print('cookie为空或没有设置好,先设置好cookie!')
sys.exit()
# 下面的是cron 表达式,不懂的可以百度搜索 cron 在线生成器,可以有效帮助
# 分 时 日 ,为了兼容,必须三个数据,且空格分开,不要多个空格
tx_cron = '12 12-14 *' # 天选时刻,每天运行三次,分别12点12分、13点12分、14点12分开始运行,一天最多只能运行四次,超过cookie过期
rc_cron = '12 20 *' # 日常任务,每天运行1次,每天的15点12分开始运行
qx_cron = '25 20 *' # 取消关注,每天运行1次,每天的15点25分开始运行
zj_cron = '30 20 *' # 中奖通知,每天运行1次,每天的15点30分开始运行
if len(tx_cron.split(' ')) == 3 and len(rc_cron.split(' ')) == 3 and len(qx_cron.split(' ')) == 3 and len(
zj_cron.split(' ')) == 3:
func(tx_cron.split(' '), rc_cron.split(' '), qx_cron.split(' '), zj_cron.split(' '))
else:
print('你的cron值错了,请检查!')
except Exception as e:
print('cookie为空或没有设置好,先设置好cookie!')
检查配置文件:Bilibili_Config.py
"""
cron: 1 1 1 1 *
new Env('哔哩哔哩-【配置】')
定时随意,每次更新运行两下
"""
import json
import logging
import os
class Config:
def __init__(self):
self.headers = {
"authority": "api.bilibili.com",
"method": "GET",
"scheme": "https",
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.6",
"cookie": "",
"dnt": "1",
"origin": "https://www.bilibili.com",
"sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="101", "Microsoft Edge";v="101"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.47 ",
"Connection": "close"
}
self.url = "https://api.bilibili.com/x/web-interface/nav"
self.url1 = "https://api.bilibili.com/x/relation/modify"
self.url2 = "https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/all"
self.url3 = "https://api.bilibili.com/x/web-interface/coin/add"
self.url4 = "https://api.bilibili.com/x/web-interface/share/add"
self.url5 = "https://api.bilibili.com/x/web-interface/archive/related"
self.url6 = "https://api.bilibili.com/x/click-interface/web/heartbeat"
self.url7 = "https://api.live.bilibili.com/room/v1/Area/getList"
self.url8 = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign"
self.url_re = "https://api.bilibili.com/x/relation?fid=%s"
self.url9 = "https://api.bilibili.com/x/relation/tags"
self.url10 = "https://api.bilibili.com/x/relation/tag"
self.url11 = "https://api.bilibili.com/x/space/arc/search?mid=%s"
self.url_all = "https://api.live.bilibili.com/xlive/web-interface/v1/second/getList?platform=web" \
"&parent_area_id=%s&area_id=%s&page=%s "
self.url_check = "https://api.live.bilibili.com/xlive/lottery-interface/v1/Anchor/Check?roomid=%s"
self.url_tx = "https://api.live.bilibili.com/xlive/lottery-interface/v1/Anchor/Join"
self.url_relationship = "https://api.bilibili.com/x/relation/tags/moveUsers"
self.url_group = "https://api.bilibili.com/x/relation/tags"
self.create_url = "https://api.bilibili.com/x/relation/tag/create"
self.prize = "https://api.live.bilibili.com/xlive/lottery-interface/v1/Anchor/AwardRecord"
self.send = "https://api.live.bilibili.com/msg/send"
self.clockin_url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn"
self.ua_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.47 ",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 "
"Safari/537.36 Edg/103.0.1264.37",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 "
"Safari/537.36 "
]
logging.basicConfig(level=logging.INFO, format='%(message)s')
self.logger = logging.getLogger(__name__)
def check_json(self):
config = os.getcwd() + '/Bilibili_config.json'
if os.path.exists(config):
self.logger.info("配置文件存在")
return True
else:
self.logger.info("配置文件不存在")
return False
def create_file(self):
with open('./Bilibili_config.json', 'w', encoding='utf-8') as f:
json.dump({"Users": [{"Cookie": ""}], "Drop_coin": [{"coin": 1}], "max_page": 50, "max_thread": 7,
"black_list": [], "white_list": [], "favorite": []}, f, ensure_ascii=False, indent=4)
self.logger.info("Bilibili_config.json文件已经生成,请在文件中添加Cookie 如:")
self.logger.info('"Users": [{"Cookie": "这里是你的cookie"}]')
self.logger.info('默认生成单帐号,投币数量为1,天选页数为50,投币线程数为7,黑名单为空,白名单为空,指定的投币up主为空')
self.logger.info('如需多账号,则如 "Users": [{"Cookie": "这里是你的cookie"}, {"Cookie": "这里是你的cookie"},{}...]')
def update_config(self):
coin = {'coin': 1}
with open("./Bilibili_config.json", 'r', encoding='utf-8') as f:
data = json.load(f)
for i in data['Drop_coin']:
if i.keys() != coin.keys():
self.logger.info("检查到你的投币配置有误,自动为你更新")
data['Drop_coin'].remove(i)
data['Drop_coin'].append(coin)
self.update_json(data)
if "black_list" in data:
self.logger.info("黑名单文件已存在")
else:
self.logger.info("黑名单不存在,创建")
data.update({"black_list": []})
self.update_json(data)
if "white_list" in data:
self.logger.info("白名单文件已存在")
else:
self.logger.info("白名单不存在,创建")
data.update({"white_list": []})
self.update_json(data)
if "Unfollows" in data:
del data['Unfollows']
self.update_json(data)
if "favorite" in data:
self.logger.info("给指定up主投币名单配置存在")
else:
self.logger.info("给指定up主投币名单配置不存在,创建")
data.update({"favorite": []})
self.update_json(data)
if "follow_author" in data:
self.logger.info("关注作者开关存在")
else:
self.logger.info("关注作者开关不存在,默认开启")
data.update({"follow_author": True})
self.update_json(data)
def insert_data(self, num):
for i in range(num):
with open("./Bilibili_config.json", 'r', encoding='utf-8') as f:
data = json.load(f)
data['Drop_coin'].append({'coin': 1})
self.update_json(data)
def update_json(self, data):
with open("./Bilibili_config.json", 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
self.logger.info("更新配置文件完成")
def fetch_cookies(self):
cookies = []
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
ck = json.load(f)
for i in ck['Users']:
if i['Cookie'] != "":
cookies.append(i['Cookie'])
else:
self.logger.info("Cookie为空")
return cookies
except Exception as e:
self.logger.error("请检查config文件路径是否存在或者文件是否正确,文件应该与当前文件在同一目录下:" + str(e))
return None
def fetch_csrf(self, cookies):
element = []
csrfs = []
try:
for i in cookies:
str1 = i.split('; ')
for j in str1:
str2 = j.split('=')
element.append(str2)
csrf_dict = dict(element)
csrfs.append(csrf_dict['bili_jct'])
element.clear()
return csrfs
except Exception as e:
self.logger.info("请检查你的cookie是否正确 " + str(e))
return None
def fetch_page(self):
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
page = json.load(f)
return page['max_page']
except Exception as e:
self.logger.info(e)
return 0
def fetch_thread(self):
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
thread = json.load(f)
if thread['max_thread'] <= 12:
return thread['max_thread']
else:
return 12
except Exception as e:
self.logger.info(e)
return 0
def fetch_drop_coin(self):
coin = []
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
drop_coin = json.load(f)
for i in drop_coin['Drop_coin']:
coin.append(i['coin'])
return coin
except Exception as e:
self.logger.info(e)
return 0
def fetch_black_list(self):
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
black_list = json.load(f)
return black_list['black_list']
except Exception as e:
self.logger.error(e)
return 0
def fetch_white_list(self):
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
white_list = json.load(f)
return white_list['white_list']
except Exception as e:
self.logger.error(e)
return 0
def fetch_favorite(self):
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
white_list = json.load(f)
return white_list['favorite']
except Exception as e:
self.logger.error(e)
return None
def fetch_follow(self):
try:
with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
follow = json.load(f)
return follow['follow_author']
except Exception as e:
self.logger.error(e)
return False
def check_config(self):
cookies = self.fetch_cookies()
csrfs = self.fetch_csrf(cookies)
coins = self.fetch_drop_coin()
if cookies:
if len(cookies) == len(coins) == len(csrfs):
self.logger.info("基础配置文件正确")
self.update_config()
else:
if len(cookies) > len(coins):
self.insert_data(len(cookies) - len(coins))
else:
self.logger.info("投币配置貌似多于帐号数量,不影响程序运行")
else:
self.logger.info("检查一下cookie吧")
def basic_info(self):
self.logger.info("json文件路径为:" + str(os.getcwd()) + "\Bilibili_config.json")
self.logger.info("【这是提醒不是错误】:cookie不能有大括号!!!!!!")
self.logger.info("配置文件说明BV号:BV1SB4y1e7Vr")
self.logger.info('author: github@wangquanfugui233')
if self.check_json():
self.logger.info("开始检查配置文件")
self.check_config()
else:
self.logger.info("开始创建配置文件")
self.create_file()
if __name__ == '__main__':
a = Config()
a.basic_info()
else:
a = Config()
a.basic_info()
参加天选时刻:Bilibili_CTime.py
"""
cron: 1 8-12 * * *
new Env('哔哩哔哩-【天选时刻】')
"""
import logging
import os
import random
import signal
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from Bilibili_User import *
logging.basicConfig(level=logging.INFO, format='%(message)s')
# logger = logging.getLogger()
class Bilibili_CTime(Basic):
def __init__(self):
super().__init__()
self.max_page = self.fetch_page()
self.max_thread = self.fetch_thread()
self.black_list = self.fetch_black_list()
def check_group(self, csrf):
try:
group = self.get_requests(self.url9)
tag_id = self.cope_group(group)
if tag_id is not None:
self.logger.info(f'{"**" * 5}存在天选时刻分组{"**" * 5}')
time.sleep(15)
self.Scan_all_live(tag_id, csrf)
else:
self.logger.info('不存在分组,开始创建')
tag_id = self.create_group(csrf)
self.Scan_all_live(tag_id, csrf)
except Exception as e:
self.logger.info(e)
os.kill(os.getpid(), signal.SIGKILL)
# os._exit(0)
@staticmethod
def cope_group(group):
for i in group['data']:
if i['name'] == '天选时刻':
return i['tagid']
else:
pass
def create_group(self, csrf):
data = {'tag': '天选时刻', 'csrf': csrf}
group = self.post_requests(self.create_url, data)
if group['code'] == 0:
self.logger.info('创建成功')
return group['data']['tagid']
else:
self.logger.info('创建失败,后续关注的up在默认分组')
return 0
def Scan_all_live(self, tag_id, csrf):
all_live = self.get_requests(self.url7)
if all_live['code'] == 0:
self.cope_all_live(all_live, tag_id, csrf)
else:
self.logger.info('扫描全部分区失败')
os.kill(os.getpid(), signal.SIGKILL)
sys.exit(0)
def cope_all_live(self, all_live, tag_id, csrf):
for i in all_live['data']:
min_id = [i['id'] for i in i['list']]
min_name = [i['name'] for i in i['list']]
self.logger.info(f'扫描【{i["name"]}】分区')
self.cycle_live(i['id'], min_id, tag_id, csrf)
def cycle_live(self, parent_id, min_live_id, tag_id, csrf):
with ThreadPoolExecutor(max_workers=self.max_thread) as executor:
task_list = []
for i in min_live_id:
task_list.append(executor.submit(self.cycle_page, parent_id, i, tag_id, csrf))
def cycle_page(self, parent_id, child_id, tag_id, csrf):
for i in range(1, self.max_page + 1):
live_page = self.url_all % (parent_id, child_id, i)
page_info = self.get_requests(live_page)
if page_info['code'] == 0:
if page_info['data']['list']:
self.scan_live_page(page_info, tag_id, csrf)
else:
break
if page_info['code'] == -412:
self.logger.info("检测到被拦截,结束程序,一小时后再试")
os.kill(os.getpid(), signal.SIGKILL)
# os._exit(0)
def scan_live_page(self, page_info, tag_id, csrf):
for i in page_info['data']['list']:
room_info = self.screen_ct_room(i)
if room_info:
self.check_room(room_info[0], room_info[1], tag_id, csrf)
else:
continue
@staticmethod
def screen_ct_room(i):
if '2' in i['pendant_info']:
if i['pendant_info']['2']['content'] == '天选时刻':
return i['roomid'], i['uname']
else:
pass
def check_room(self, room_id, uname, tag_id, csrf):
url = self.url_check % room_id
tx_info = self.get_requests(url)
if tx_info['code'] == 0:
award = self.screen_condition(tx_info['data']['award_name'])
require = self.screen_condition(tx_info['data']['require_text'])
if award or require:
pass
else:
if tx_info['data']['gift_id'] == 0:
self.logger.info(f"【{uname}】--【{tx_info['data']['award_name']}】--【{tx_info['data']['require_text']}】")
self.tx_join(tx_info['data']['ruid'], tx_info['data']['id'], uname, room_id, tag_id, csrf)
else:
pass
@staticmethod
def screen_condition(condition):
pattern = re.compile(r'大航海|舰长|.?车车?|手照|代金券|优惠券|勋章|提督|男')
if pattern.findall(condition):
return True
else:
return False
def tx_join(self, uid, tid, uname, room_id, tag_id, csrf):
if self.black_list_check(uid):
self.logger.info(f"【{uname}】已在黑名单中")
else:
data = {'id': tid, 'platfrom': 'pc', 'roomid': room_id, 'csrf': csrf}
join_info = self.post_requests(self.url_tx, data)
if join_info['code'] == 0:
self.logger.info(f'【成功参加"{uname}"的天选】')
self.send_danmu(room_id, csrf)
self.relationship_check(uid, uname, tag_id, csrf)
else:
self.logger.info(join_info['message'])
def black_list_check(self, uid):
if self.black_list:
for i in self.black_list:
if i == uid:
return True
else:
return False
else:
return False
def relationship_check(self, uid, uname, tag_id, csrf):
url = self.url_re % uid
relationship = self.get_requests(url)
if relationship['code'] == 0:
if relationship['data']['attribute'] == 1 or relationship['data']['attribute'] == 2:
if relationship['data']['tag'] is None:
self.Move_User(tag_id, uid, uname, csrf)
else:
pass
else:
pass
def Move_User(self, gid, uid, uname, csrf):
data = {'beforeTagids': 0, 'afterTagids': gid, 'fids': uid, 'csrf': csrf}
Move_info = self.post_requests(self.url_relationship, data)
if Move_info['code'] == 0:
self.logger.info(f'{uname}->【改变分组成功】')
else:
self.logger.info(Move_info['message'])
def send_danmu(self, room_id, csrf):
emotion_list = ['official_147', 'official_109', 'official_113', 'official_120', 'official_150', 'official_103', 'official_128', 'official_133', 'official_149', 'official_124', 'official_146', 'official_148', 'official_102', 'official_121', 'official_137', 'official_118', 'official_129', 'official_108', 'official_104', 'official_105', 'official_106', 'official_114', 'official_107', 'official_110', 'official_111', 'official_136', 'official_115', 'official_116', 'official_117', 'official_119', 'official_122', 'official_123', 'official_125', 'official_126', 'official_127', 'official_134', 'official_135', 'official_138']
rnd = int(time.time())
emotion = random.choice(emotion_list)
data = {'bubble': 0, 'msg': emotion, 'color': 16777215, 'fontsize': 25, 'mode': 1, 'rnd': rnd, 'dm_type': 1,
'roomid': room_id, 'csrf': csrf}
danmu_info = self.post_requests(self.send, data)
if danmu_info['code'] == 0:
self.logger.info(f'------->发送弹幕成功')
else:
self.logger.info(danmu_info['message'])
def decorate(self):
self.logger.info("脚本由GitHub@王权富贵233提供")
self.logger.info("该脚本仅供学习交流,仅供学习参考,仅供学习参考")
self.logger.info("脚本现已支持黑名单,前往Bilibili_config.json添加黑名单")
self.logger.info("格式:black_list = [uid1,uid2,uid3,...,...] uid为数字,逗号为英文逗号")
def run(self):
self.decorate()
for i in range(len(self.cookies)):
self.headers['Cookie'] = self.cookies[i]
self.headers['user-agent'] = random.choice(self.ua_list)
self.headers['referer'] = "https://live.bilibili.com/"
if self.cope_info(self.get_requests(self.url)):
self.logger.info("检查帐号有效性成功")
else:
self.logger.info("帐号cookie过期,跳过该账号")
continue
self.check_group(self.csrfs[i])
self.logger.info(f"{'*' * 5}第{i + 1}帐号结束{'*' * 5}")
if __name__ == '__main__':
ctime = Bilibili_CTime()
ctime.run()
else:
ctime = Bilibili_CTime()
ctime.run()
运行日常任务:Bilibili_Daily.py
"""
cron : 5 1 * * *
new Env("哔哩哔哩-【日常】")
"""
from Bilibili_User import *
class Daily(Basic):
def __init__(self):
super().__init__()
self.specify = self.fetch_favorite()
def check_group(self):
follows = self.get_requests(self.url9)
if follows['code'] == 0:
for i in follows['data']:
if i['tagid'] == 0:
return i['count']
else:
return None
def uid_info(self, coin, csrf):
count = self.check_group()
if count == 0:
self.logger.info("默认分组为空")
elif 55 >= count > 0:
url = self.url10 + '?tagid=0'
self.fetch_uid(url, coin, csrf)
else:
page = random.randint(1, int(count / 50) + 1)
url = self.url10 + '?tagid=0&pn=%s' % page
self.fetch_uid(url, coin, csrf)
def fetch_uid(self, url, coin, csrf):
if len(self.specify) > 0:
num = 0
while True:
mid = random.choice(self.specify)
self.logger.info("你选择了指定用户投币,uid:%s" % mid)
bvid_list, video_title = self.cyc_search_uid(mid)
if bvid_list:
if self.cope_video(bvid_list, video_title, coin, csrf):
num += 1
else:
pass
else:
if len(self.specify) == 1:
break
if num == 5:
break
if num < 5:
self.logger.info("指定up没有可投币的视频,随机投币")
self.cycle_uid(url, coin, csrf)
else:
self.cycle_uid(url, coin, csrf)
def other_task(self, csrf):
url = self.url10 + '?tagid=0'
group_info = self.get_requests(url)
mid_list, uname_list = self.list_uid(group_info['data'])
mid = random.choice(mid_list)
self.logger.info("随机选择了用户:" + uname_list[mid_list.index(mid)])
bvid_list, video_title = self.cyc_search_uid(mid)
if bvid_list:
video = random.choice(bvid_list)
self.play_video(video, video_title[bvid_list.index(video)])
self.share_dynamic(video_title[bvid_list.index(video)], video, csrf)
return True
else:
return False
def cycle_uid(self, url, coin, csrf):
group_info = self.get_requests(url)
mid_list, uname_list = self.list_uid(group_info['data'])
num = 0
while True:
mid = random.choice(mid_list)
self.logger.info("本次投币对象:"+uname_list[mid_list.index(mid)])
bvid_list, video_title = self.cyc_search_uid(mid)
if bvid_list:
if self.cope_video(bvid_list, video_title, coin, csrf):
num += 1
else:
pass
else:
pass
if num == 5:
break
def cope_video(self, bvid_list, video_title, coin, csrf):
choice_video = []
num = 0
for i in range(len(bvid_list)):
video = random.choice(bvid_list)
self.logger.info("视频:%s" % video_title[bvid_list.index(video)])
if self.c_v_d_i(video, coin, csrf) and video is not choice_video:
num += 1
break
else:
choice_video.append(video)
if num == 1:
return True
else:
return False
@staticmethod
def list_uid(data):
mid_list = []
uname_list = []
for i in data:
mid_list.append(i['mid'])
uname_list.append(i['uname'])
return mid_list, uname_list
def search_uid(self, mid):
url = self.url11 % mid
video_num = self.get_requests(url)
if video_num['code'] == 0:
return video_num['data']['page']['count']
else:
return None
def cyc_search_uid(self, mid):
num = self.search_uid(mid)
if num > 50:
page = random.randint(1, int(num / 50) + 1)
bvid_list, video_title = self.bvid_page(mid, page)
return bvid_list, video_title
elif 0 < num <= 50:
bvid_list, video_title = self.bvid_page(mid, 1)
return bvid_list, video_title
else:
return None, None
def bvid_page(self, mid, page):
url = self.url11 % mid + "&ps=50&pn=%s" % page
bvid_data = self.get_requests(url)
if bvid_data['code'] == 0:
bvid_list, video_title = self.bvid_list(bvid_data['data']['list']['vlist'])
return bvid_list, video_title
else:
self.logger.error(bvid_data)
@staticmethod
def bvid_list(data):
bvid_list = []
video_title = []
for i in data:
bvid_list.append(i['bvid'])
video_title.append(i['title'])
return bvid_list, video_title
def c_v_d_i(self, bvid, coin, csrf):
url = "https://api.bilibili.com/x/web-interface/archive/coins?bvid=%s" % bvid
data = self.get_requests(url)
if data['code'] == 0:
if data['data']['multiply'] == 0:
self.drop_coin(bvid, coin, csrf)
return True
else:
self.logger.info("该视频已经投币过,不再投币")
return False
else:
self.logger.error('检查投币情况失败,错误码:%s' % data['code'])
return False
def recommend_video(self, bvid):
url = self.url5 + "?bvid=" + bvid
data = self.get_requests(url)
if data['code'] == 0:
return self.cope_recommend(data['data'])
else:
self.logger.error(data['message'])
@staticmethod
def cope_recommend(data):
title = []
bv = []
for i in data:
title.append(i['title'])
bv.append(i['bvid'])
return title, bv
def drop_coin(self, bv, coin, csrf):
data = {
'bvid': bv,
'multiply': coin,
'csrf': csrf
}
drop = self.post_requests(self.url3, data)
if drop['code'] == 0:
self.logger.info("【投币成功】")
else:
self.logger.error(drop['message'])
def sign_live(self):
self.logger.info('开始直播签到')
sign = self.get_requests(self.url8)
if sign['code'] == 0:
self.logger.info('【直播签到成功】')
else:
self.logger.info(sign['message'])
def share_dynamic(self, title, bv, csrf):
data = {
"bvid": bv,
"csrf": csrf
}
self.logger.info('开始分享动态,标题%s' % title)
share = self.post_requests(self.url4, data)
if share['code'] == 0:
self.logger.info('【分享动态成功】')
else:
self.logger.info(share['message'])
def play_video(self, bv, title):
data = {
"bvid": bv,
"play_time": random.randint(30, 45),
"realtime": random.randint(30, 45)
}
self.logger.info('开始播放视频,标题%s' % title)
play = self.post_requests(self.url6, data)
if play['code'] == 0:
self.logger.info('【播放视频成功】')
else:
self.logger.info(play['message'])
def clockin(self):
data = {'platform': 'android'}
clock_info = self.post_requests(self.clockin_url, data)
if clock_info['code'] == 0:
self.logger.info("漫画签到成功")
else:
self.logger.info(clock_info['msg'])
def run(self):
self.manual()
self.logger.info('目的是随机给我回点硬币,是随机')
self.logger.info('本次更新支持不投币,coin设置为0即可')
cookies = self.fetch_cookies()
coins = self.fetch_drop_coin()
csrfs = self.fetch_csrf(cookies)
for i in cookies:
self.headers['Cookie'] = i
self.headers['user-agent'] = random.choice(self.ua_list)
self.headers['Referer'] = 'https://www.bilibili.com/'
user_info = self.get_requests(self.url)
if self.cope_info(user_info):
self.logger.info('--》帐号有效《--')
else:
self.logger.info('--》帐号无效,跳出该帐号《--')
continue
self.mao_san(csrfs[cookies.index(i)])
self.clockin()
self.sign_live()
if self.other_task(csrfs[cookies.index(i)]):
pass
else:
self.other_task(csrfs[cookies.index(i)])
if coins[cookies.index(i)] == 0:
self.logger.info('设置了不投币,跳过投币任务')
continue
else:
self.uid_info(coins[cookies.index(i)], csrfs[cookies.index(i)])
self.logger.info("= "*8+"任务完成"+"= "*8)
if __name__ == '__main__':
Daily = Daily()
Daily.run()
else:
Daily = Daily()
Daily.run()
取消关注up主:Bilibili_Unfollows.py
"""
cron : 2 1 */15 * *
new Env("哔哩哔哩-取关")
"""
from Bilibili_User import Basic
class Unfollows(Basic):
def __init__(self):
super().__init__()
self.white_list = self.fetch_white_list()
def check_group(self, csrf):
self.logger.info('检查是否有天选时刻分组')
group = self.get_requests(self.url9)
self.cope_group(group, csrf)
def cope_group(self, group, csrf):
for i in group['data']:
if i['name'] == '天选时刻':
self.logger.info('有天选时刻分组,开始检查关注人数')
if i['count'] > 0:
self.logger.info('天选时刻分组关注人数: %s ***>开始执行取关任务' % i['count'])
self.fetch_mid(i['tagid'], i['count'], csrf)
else:
self.logger.info('天选时刻分组关注人数: %s ***>无需取关' % i['count'])
break
else:
self.logger.info('没有天选时刻分组,结束检查')
return None
def fetch_mid(self, group_id, count, csrf):
url = self.url10 + '?tagid=%s' % group_id
if count <= 50:
group_info = self.get_requests(url)
userid, uname = self.cope_User(group_info)
self.cyc_unfollow(userid, uname, csrf)
else:
for i in range(int(count / 50) + 1):
group_info = self.get_requests(url)
userid, uname = self.cope_User(group_info)
self.cyc_unfollow(userid, uname, csrf)
def screen_white_list(self, userid):
if self.white_list is None:
return False
if userid in self.white_list:
return True
else:
return False
@staticmethod
def cope_User(group_info):
mid = []
uname = []
for i in group_info['data']:
mid.append(i['mid'])
uname.append(i['uname'])
return mid, uname
def cyc_unfollow(self, mid, uname, csrf):
for i in range(len(mid)):
if self.screen_white_list(mid[i]):
self.logger.info('【%s】在白名单中,跳过' % uname[i])
continue
else:
self.unfollow(mid[i], csrf)
self.logger.info('取关: 【%s】' % uname[i])
def unfollow(self, mid, csrf):
data = {'fid': mid, 'act': 2, 're_src': 11, 'csrf': csrf}
unfollow = self.post_requests(self.url1, data)
self.unfollow_info(unfollow)
def unfollow_info(self, unfollow):
if unfollow['code'] == 0:
self.logger.info('取关成功')
else:
self.logger.info('取关失败')
def run(self):
self.logger.info('脚本支持白名单,可以在Bilibili_config.json设置不取关的用户ID')
self.logger.info('白名单格式: [uid1, uid2, uid3, ...] uid为数字,逗号为英文逗号')
self.logger.info('脚本作者为github@wangquanfugui233')
self.logger.info("*" * 6 + "开始取关" + "*" * 6)
for i in range(len(self.cookies)):
self.headers['cookie'] = self.cookies[i]
self.headers['referer'] = "https://www.bilibili.com/"
self.check_group(self.csrfs[i])
self.logger.info("=============》结束《============")
if __name__ == '__main__':
Unfollow = Unfollows()
Unfollow.run()
else:
Unfollow = Unfollows()
Unfollow.run()
中奖通知:Bilibili_Prize.py
"""
cron 1 23 * * *
new Env("哔哩哔哩-天选中奖通知")
"""
from Bilibili_User import *
# from notify import send # 暂时不对接
import datetime
class prize(Basic):
def __init__(self):
super().__init__()
self.msg = '详细信息:\n'
def get_prize(self):
data = self.get_requests(self.prize)
if data['code'] == 0:
self.cope_prize(data['data']['list'])
else:
self.logger.info(data)
def cope_prize(self, data):
dt = time.strftime("%Y-%m-%d ", time.localtime())
for i in data:
date_time = datetime.datetime.strptime(i['end_time'], '%Y-%m-%d %H:%M:%S')
if date_time.strftime('%Y-%m-%d ') == dt:
self.logger.info('恭喜你中奖,奖品是:%s --- 兑奖up主是:%s' % (i['award_name'], i['anchor_name']))
self.logger.info('中将时间是:%s' % i['end_time'])
self.msg += '\n 中将时间:【%s】 今天的天选中奖,奖品是:%s --- 兑奖up主是:%s' % (
i['end_time'], i['award_name'], i['anchor_name'])
self.logger.info("检查白名单")
self.check_white_list(i['anchor_uid'])
else:
pass
if len(data) == 0:
self.msg += '毛也没有中'
def check_white_list(self, uid):
num = 0
with open('./Bilibili_config.json', 'r') as f:
data = json.load(f)
if data['white_list']:
for i in data['white_list']:
if i == uid:
self.logger.info('已存在于白名单')
num += 1
break # 如果存在于白名单,则跳出循环
else:
self.logger.info('白名单为空')
self.update_white_list(data, uid)
if num == 0:
self.update_white_list(data, uid)
@staticmethod
def update_white_list(data, uid):
data['white_list'].append(uid)
with open('./Bilibili_config.json', 'w', encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print('加入白名单成功')
def ql_send(self):
# send("Bilibili天选通知", self.msg) # 暂时不用
print(self.msg)
def run(self):
cookies = self.fetch_cookies()
for i in cookies:
self.headers['Cookie'] = i
user = self.get_requests(self.url)
self.msg += "用户:%s \n" % user['data']['uname']
self.cope_info(user)
self.get_prize()
try:
self.ql_send()
except Exception as e:
self.logger.info('推送失败')
print(e)
if __name__ == '__main__':
prize = prize()
prize.run()
else:
prize = prize()
prize.run()
关注源作者和其他:Bilibili_User.py
"""
cron : 2 1 * * *
new Env("哔哩哔哩-【basic】")
"""
import json
import random
import time
from Bilibili_Config import Config
import requests
class Basic(Config):
def __init__(self):
super().__init__()
self.cookies = self.fetch_cookies()
self.csrfs = self.fetch_csrf(self.cookies)
self.coin = self.fetch_drop_coin()
def get_requests(self, url):
try:
time.sleep(1)
self.headers['user-agent'] = random.choice(self.ua_list)
with requests.session() as s:
# 代理
proxies = {'http': 'http://202.55.5.209:8090'}
s.keep_alive = False
s.adapters.DEFAULT_RETRIES = 2
r = s.get(url, headers=self.headers, proxies=proxies)
if r.status_code == 200:
data = json.loads(r.text)
return data
else:
data_error = json.loads(r.text)
return data_error
except Exception as e:
self.logger.error('请求失败,错误信息:{}'.format(e))
def post_requests(self, url, data):
try:
self.headers['method'] = 'POST'
self.headers['Content-Type'] = 'application/x-www-form-urlencoded'
time.sleep(1)
with requests.session() as s:
s.keep_alive = False
r = s.post(url, headers=self.headers, data=data)
if r.status_code == 200:
post_data = json.loads(r.text)
return post_data
else:
data_error = json.loads(r.text)
return data_error
except Exception as e:
self.logger.error('请求失败,错误信息:{}'.format(e))
def check_author(self):
url = self.url_re % 289549318
author = self.get_requests(url)
if author['code'] == 0:
if author['data']['attribute'] != 0:
return True
else:
return False
else:
return False
def follow_author(self, csrf):
data = {'fid': 289549318, 'act': 1, 're_src': 11, 'csrf': csrf}
follow = self.post_requests(self.url1, data)
if follow['code'] == 0:
self.logger.info("关注作者成功")
else:
self.logger.info(follow['message'])
def cope_info(self, data):
if data['code'] == 0:
self.logger.info("**********" + data['data']['uname'] + "**********")
self.logger.info("当前经验值:" + str(data['data']['level_info']['current_exp']))
if data['data']['level_info']['current_level'] == 6:
self.logger.info("你已经是lv6的大佬了")
else:
level_day = (data['data']['level_info']['next_exp'] - data['data']['level_info']['current_exp']) / 65
self.logger.info('当前硬币数:' + str(data['data']['money']) + ",下一等级升级天数约" + str(int(level_day)))
return True
elif data['code'] == -101:
self.logger.info(data['message'] + "请检查cookie")
return False
elif data['code'] == -111:
self.logger.info(data['message'] + "请检查csrf")
else:
self.logger.info(data['message'])
def mao_san(self, csrf):
if self.fetch_follow():
if self.check_author():
pass
else:
self.follow_author(csrf)
else:
pass
def manual(self):
self.logger.info("author: github@wangquanfugui233")
self.logger.info("本次更新加入关注作者B站号,将随机为作者提供0.1个硬币")
self.logger.info("这让你感到不适,请删除脚本或者配置文件将follow_author设置为小写false")
def user_info(self):
self.manual()
for i in range(len(self.cookies)):
self.headers['Cookie'] = self.cookies[i]
user = self.get_requests(self.url)
self.cope_info(user)
if __name__ == '__main__':
cope = Basic()
cope.user_info()