--永恒的烦恼

Bilibili 自动做升级任务以及参加天选

仅供个人学习,不得用于牟利,仅供学习参考。定时运行脚本为本人开发,其它脚本是他人的,源作者地址:https://github.com/wangquanfugui233/Bilibili_Python

一、脚本说明

定时任务是cron 表达式,不懂的可以百度搜索 cron 在线生成器,可以帮助你有效定时运行。
定时任务的脚本只需要修改定时运行的时间,也可以自己不修改,使用默认的是没有什么问题的(参加天选到现在写脚本都没有中过奖,主要是每天帮我做一下任务升级一下)。
以下是脚本文件说明:
定时运行脚本:Blilibili_Run.py
检查配置文件:Bilibili_Config.py
参加天选时刻:Bilibili_CTime.py
运行日常任务:Bilibili_Daily.py
取消关注up主:Bilibili_Unfollows.py
中奖通知:Bilibili_Prize.py
关注源作者和其他:Bilibili_User.py

二、配置相关

1.需要安装两个python模块:

pip install requests apscheduler

2.运行定时脚本(Blilibili_Run.py),第一次运行或配置文件不存在自动生成配置文件。
3.获取b站的cookie,如图所示:
(1)登录,点击个人中心

(2)在网页空白处,鼠标右键,点击检查

(3)随便找一个链接,查看是否存在cookie

4.修改配置(Bilibili_config.json),添加自己的cookie,
具体如图:

配置文件说明:
coin:投币, 做任务时投币的数量。
max_pageL: 扫描天选页数。
max_thread: 扫描线程数。
black_list:黑名单, 就是你不想关注的up主mid放在这里,一定要是英文逗号。
white_list:白名单, 就是你不想取关的,和中奖的up主mid会存在这里,一定要是英文逗号。
favorite:指定给up主投币,做日常任务时投币的任务。
follow_author:关注源作者的账号(源作者的账号好像被封了)。不想关注就写False。
多账号格式:—》:{"Cookie": ""},{"Cookie": ""},{"Cookie": ""}….. 一定要是英文逗号。

三、修改定时时间(可不用修改)

定时任务是cron 表达式,不懂的可以百度搜索 cron 在线生成器,可以帮助你有效定时运行。
控制运行时间的四个变量如图所示:

四、脚本

我自己的仓库地址:https://github.com/jiubanyipeng/single/tree/main/Bilibili
源作者的:https://github.com/wangquanfugui233/Bilibili_Python
或者你在线复制也可以,注意文件名一定要对应。
定时运行脚本:Blilibili_Run.py

'''
### 定时脚本
### 其它脚本的使用别人的,源作者地址:https://github.com/wangquanfugui233/Bilibili_Python
### 第一次运行会自己生成 Bilibili_config.json 配置文件
### 配置文件说明:
# coin:投币, 做任务时投币的数量
# max_pageL: 扫描天选页数
# max_thread: 扫描线程数
# black_list:黑名单, 就是你不想关注的up主mid放在这里,一定要是英文逗号
# white_list:白名单, 就是你不想取关的,和中奖的up主mid会存在这里,一定要是英文逗号
# favorite:指定给up主投币,做日常任务时投币的任务
# follow_author:关注源作者的账号(源作者的账号好像被封了)。不想关注就写False
# 多账号格式:---》:{"Cookie": ""},{"Cookie": ""},{"Cookie": ""}..... 一定要是英文逗号
仅供个人学习,不得用于牟利,仅供学习参考
'''

import Bilibili_Config    # 运行前检查配置文件
from apscheduler.schedulers.blocking import BlockingScheduler  # 定时任务框架
import json
import sys

# 参加天选时刻
def Bilibili_CTime():
    import Bilibili_CTime

# 运行日常任务
def Bilibili_Dayly():
    import Bilibili_Daily

# 取消关注
def Bilibili_Unfollows():
    import Bilibili_Unfollows

# 中奖通知
def Bilibili_Prize():
    import Bilibili_Prize

# 运行
def func(tx_cron, rc_cron, qx_cron, zj_cron):
    scheduler = BlockingScheduler()
    try:
        scheduler.add_job(Bilibili_CTime, 'cron',  year='*', month='*', day=f'{tx_cron[2]}', hour=f'{tx_cron[1]}', minute=f'{tx_cron[0]}', id='run1', name='参加天选时刻', jitter=120)
        scheduler.add_job(Bilibili_Dayly, 'cron',  day=f'{rc_cron[2]}', hour=f'{rc_cron[1]}', minute=f'{rc_cron[0]}', id='run2', name='运行日常任务', jitter=120)
        scheduler.add_job(Bilibili_Unfollows, 'cron', day=f'{qx_cron[2]}', hour=f'{qx_cron[1]}', minute=f'{qx_cron[0]}', id='run3', name='运行取消关注任务', jitter=120)
        scheduler.add_job(Bilibili_Prize, 'cron',  day=f'{zj_cron[2]}', hour=f'{zj_cron[1]}', minute=f'{zj_cron[0]}', id='run4', name='中奖通知', jitter=120)
        scheduler.start()

    except Exception as e:
        print(e)

if __name__ == '__main__':
    try:
        with open('Bilibili_config.json') as f:
            if len(json.loads(f.read())['Users'][0]['Cookie']) < 1:
                print('cookie为空或没有设置好,先设置好cookie!')
                sys.exit()
        # 下面的是cron 表达式,不懂的可以百度搜索 cron 在线生成器,可以有效帮助
        # 分 时 日  ,为了兼容,必须三个数据,且空格分开,不要多个空格
        tx_cron = '12 12-14 *'    # 天选时刻,每天运行三次,分别12点12分、13点12分、14点12分开始运行,一天最多只能运行四次,超过cookie过期
        rc_cron = '12 20 *'    # 日常任务,每天运行1次,每天的15点12分开始运行
        qx_cron = '25 20 *'    # 取消关注,每天运行1次,每天的15点25分开始运行
        zj_cron = '30 20 *'    # 中奖通知,每天运行1次,每天的15点30分开始运行

        if len(tx_cron.split(' ')) == 3 and len(rc_cron.split(' ')) == 3 and len(qx_cron.split(' ')) == 3 and len(
                zj_cron.split(' ')) == 3:
            func(tx_cron.split(' '), rc_cron.split(' '), qx_cron.split(' '), zj_cron.split(' '))
        else:
            print('你的cron值错了,请检查!')

    except Exception as e:
        print('cookie为空或没有设置好,先设置好cookie!')

检查配置文件:Bilibili_Config.py

"""
cron: 1 1 1 1 *
new Env('哔哩哔哩-【配置】')
定时随意,每次更新运行两下
"""

import json
import logging
import os

class Config:
    def __init__(self):
        self.headers = {
            "authority": "api.bilibili.com",
            "method": "GET",
            "scheme": "https",
            "accept": "*/*",
            "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.6",
            "cookie": "",
            "dnt": "1",
            "origin": "https://www.bilibili.com",
            "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="101", "Microsoft Edge";v="101"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.47 ",
            "Connection": "close"
        }
        self.url = "https://api.bilibili.com/x/web-interface/nav"
        self.url1 = "https://api.bilibili.com/x/relation/modify"
        self.url2 = "https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/all"
        self.url3 = "https://api.bilibili.com/x/web-interface/coin/add"
        self.url4 = "https://api.bilibili.com/x/web-interface/share/add"
        self.url5 = "https://api.bilibili.com/x/web-interface/archive/related"
        self.url6 = "https://api.bilibili.com/x/click-interface/web/heartbeat"
        self.url7 = "https://api.live.bilibili.com/room/v1/Area/getList"
        self.url8 = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign"
        self.url_re = "https://api.bilibili.com/x/relation?fid=%s"
        self.url9 = "https://api.bilibili.com/x/relation/tags"
        self.url10 = "https://api.bilibili.com/x/relation/tag"
        self.url11 = "https://api.bilibili.com/x/space/arc/search?mid=%s"
        self.url_all = "https://api.live.bilibili.com/xlive/web-interface/v1/second/getList?platform=web" \
                       "&parent_area_id=%s&area_id=%s&page=%s "
        self.url_check = "https://api.live.bilibili.com/xlive/lottery-interface/v1/Anchor/Check?roomid=%s"
        self.url_tx = "https://api.live.bilibili.com/xlive/lottery-interface/v1/Anchor/Join"
        self.url_relationship = "https://api.bilibili.com/x/relation/tags/moveUsers"
        self.url_group = "https://api.bilibili.com/x/relation/tags"
        self.create_url = "https://api.bilibili.com/x/relation/tag/create"
        self.prize = "https://api.live.bilibili.com/xlive/lottery-interface/v1/Anchor/AwardRecord"
        self.send = "https://api.live.bilibili.com/msg/send"
        self.clockin_url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn"
        self.ua_list = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.47 ",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 "
            "Safari/537.36 Edg/103.0.1264.37",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 "
            "Safari/537.36 "
        ]
        logging.basicConfig(level=logging.INFO, format='%(message)s')
        self.logger = logging.getLogger(__name__)

    def check_json(self):
        config = os.getcwd() + '/Bilibili_config.json'
        if os.path.exists(config):
            self.logger.info("配置文件存在")
            return True
        else:
            self.logger.info("配置文件不存在")
            return False

    def create_file(self):
        with open('./Bilibili_config.json', 'w', encoding='utf-8') as f:
            json.dump({"Users": [{"Cookie": ""}], "Drop_coin": [{"coin": 1}], "max_page": 50, "max_thread": 7,
                       "black_list": [], "white_list": [], "favorite": []}, f, ensure_ascii=False, indent=4)
        self.logger.info("Bilibili_config.json文件已经生成,请在文件中添加Cookie 如:")
        self.logger.info('"Users": [{"Cookie": "这里是你的cookie"}]')
        self.logger.info('默认生成单帐号,投币数量为1,天选页数为50,投币线程数为7,黑名单为空,白名单为空,指定的投币up主为空')
        self.logger.info('如需多账号,则如 "Users": [{"Cookie": "这里是你的cookie"}, {"Cookie": "这里是你的cookie"},{}...]')

    def update_config(self):
        coin = {'coin': 1}
        with open("./Bilibili_config.json", 'r', encoding='utf-8') as f:
            data = json.load(f)
            for i in data['Drop_coin']:
                if i.keys() != coin.keys():
                    self.logger.info("检查到你的投币配置有误,自动为你更新")
                    data['Drop_coin'].remove(i)
                    data['Drop_coin'].append(coin)
                    self.update_json(data)
            if "black_list" in data:
                self.logger.info("黑名单文件已存在")
            else:
                self.logger.info("黑名单不存在,创建")
                data.update({"black_list": []})
                self.update_json(data)
            if "white_list" in data:
                self.logger.info("白名单文件已存在")
            else:
                self.logger.info("白名单不存在,创建")
                data.update({"white_list": []})
                self.update_json(data)
            if "Unfollows" in data:
                del data['Unfollows']
                self.update_json(data)
            if "favorite" in data:
                self.logger.info("给指定up主投币名单配置存在")
            else:
                self.logger.info("给指定up主投币名单配置不存在,创建")
                data.update({"favorite": []})
                self.update_json(data)
            if "follow_author" in data:
                self.logger.info("关注作者开关存在")
            else:
                self.logger.info("关注作者开关不存在,默认开启")
                data.update({"follow_author": True})
                self.update_json(data)

    def insert_data(self, num):
        for i in range(num):
            with open("./Bilibili_config.json", 'r', encoding='utf-8') as f:
                data = json.load(f)
                data['Drop_coin'].append({'coin': 1})
            self.update_json(data)

    def update_json(self, data):
        with open("./Bilibili_config.json", 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        self.logger.info("更新配置文件完成")

    def fetch_cookies(self):
        cookies = []
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                ck = json.load(f)
                for i in ck['Users']:
                    if i['Cookie'] != "":
                        cookies.append(i['Cookie'])
                    else:
                        self.logger.info("Cookie为空")
                return cookies
        except Exception as e:
            self.logger.error("请检查config文件路径是否存在或者文件是否正确,文件应该与当前文件在同一目录下:" + str(e))
            return None

    def fetch_csrf(self, cookies):
        element = []
        csrfs = []
        try:
            for i in cookies:
                str1 = i.split('; ')
                for j in str1:
                    str2 = j.split('=')
                    element.append(str2)
                csrf_dict = dict(element)
                csrfs.append(csrf_dict['bili_jct'])
                element.clear()
            return csrfs
        except Exception as e:
            self.logger.info("请检查你的cookie是否正确 " + str(e))
            return None

    def fetch_page(self):
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                page = json.load(f)
                return page['max_page']
        except Exception as e:
            self.logger.info(e)
            return 0

    def fetch_thread(self):
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                thread = json.load(f)
                if thread['max_thread'] <= 12:
                    return thread['max_thread']
                else:
                    return 12
        except Exception as e:
            self.logger.info(e)
            return 0

    def fetch_drop_coin(self):
        coin = []
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                drop_coin = json.load(f)
                for i in drop_coin['Drop_coin']:
                    coin.append(i['coin'])
                return coin
        except Exception as e:
            self.logger.info(e)
            return 0

    def fetch_black_list(self):
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                black_list = json.load(f)
                return black_list['black_list']
        except Exception as e:
            self.logger.error(e)
            return 0

    def fetch_white_list(self):
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                white_list = json.load(f)
                return white_list['white_list']
        except Exception as e:
            self.logger.error(e)
            return 0

    def fetch_favorite(self):
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                white_list = json.load(f)
                return white_list['favorite']
        except Exception as e:
            self.logger.error(e)
            return None

    def fetch_follow(self):
        try:
            with open('./Bilibili_config.json', 'r', encoding='utf-8') as f:
                follow = json.load(f)
                return follow['follow_author']
        except Exception as e:
            self.logger.error(e)
            return False

    def check_config(self):
        cookies = self.fetch_cookies()
        csrfs = self.fetch_csrf(cookies)
        coins = self.fetch_drop_coin()
        if cookies:
            if len(cookies) == len(coins) == len(csrfs):
                self.logger.info("基础配置文件正确")
                self.update_config()
            else:
                if len(cookies) > len(coins):
                    self.insert_data(len(cookies) - len(coins))
                else:
                    self.logger.info("投币配置貌似多于帐号数量,不影响程序运行")
        else:
            self.logger.info("检查一下cookie吧")

    def basic_info(self):
        self.logger.info("json文件路径为:" + str(os.getcwd()) + "\Bilibili_config.json")
        self.logger.info("【这是提醒不是错误】:cookie不能有大括号!!!!!!")
        self.logger.info("配置文件说明BV号:BV1SB4y1e7Vr")
        self.logger.info('author: github@wangquanfugui233')
        if self.check_json():
            self.logger.info("开始检查配置文件")
            self.check_config()
        else:
            self.logger.info("开始创建配置文件")
            self.create_file()

if __name__ == '__main__':
    a = Config()
    a.basic_info()
else:
   a = Config()
   a.basic_info()

参加天选时刻:Bilibili_CTime.py

"""
cron: 1 8-12 * * *
new Env('哔哩哔哩-【天选时刻】')
"""
import logging
import os
import random
import signal
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from Bilibili_User import *

logging.basicConfig(level=logging.INFO, format='%(message)s')

# logger = logging.getLogger()

class Bilibili_CTime(Basic):
    def __init__(self):
        super().__init__()
        self.max_page = self.fetch_page()
        self.max_thread = self.fetch_thread()
        self.black_list = self.fetch_black_list()

    def check_group(self, csrf):
        try:
            group = self.get_requests(self.url9)
            tag_id = self.cope_group(group)
            if tag_id is not None:
                self.logger.info(f'{"**" * 5}存在天选时刻分组{"**" * 5}')
                time.sleep(15)
                self.Scan_all_live(tag_id, csrf)
            else:
                self.logger.info('不存在分组,开始创建')
                tag_id = self.create_group(csrf)
                self.Scan_all_live(tag_id, csrf)
        except Exception as e:
            self.logger.info(e)
            os.kill(os.getpid(), signal.SIGKILL)
            # os._exit(0)

    @staticmethod
    def cope_group(group):
        for i in group['data']:
            if i['name'] == '天选时刻':
                return i['tagid']
            else:
                pass

    def create_group(self, csrf):
        data = {'tag': '天选时刻', 'csrf': csrf}
        group = self.post_requests(self.create_url, data)
        if group['code'] == 0:
            self.logger.info('创建成功')
            return group['data']['tagid']
        else:
            self.logger.info('创建失败,后续关注的up在默认分组')
            return 0

    def Scan_all_live(self, tag_id, csrf):
        all_live = self.get_requests(self.url7)
        if all_live['code'] == 0:
            self.cope_all_live(all_live, tag_id, csrf)
        else:
            self.logger.info('扫描全部分区失败')
            os.kill(os.getpid(), signal.SIGKILL)
            sys.exit(0)

    def cope_all_live(self, all_live, tag_id, csrf):
        for i in all_live['data']:
            min_id = [i['id'] for i in i['list']]
            min_name = [i['name'] for i in i['list']]
            self.logger.info(f'扫描【{i["name"]}】分区')
            self.cycle_live(i['id'], min_id, tag_id, csrf)

    def cycle_live(self, parent_id, min_live_id, tag_id, csrf):
        with ThreadPoolExecutor(max_workers=self.max_thread) as executor:
            task_list = []
            for i in min_live_id:
                task_list.append(executor.submit(self.cycle_page, parent_id, i, tag_id, csrf))

    def cycle_page(self, parent_id, child_id, tag_id, csrf):
        for i in range(1, self.max_page + 1):
            live_page = self.url_all % (parent_id, child_id, i)
            page_info = self.get_requests(live_page)
            if page_info['code'] == 0:
                if page_info['data']['list']:
                    self.scan_live_page(page_info, tag_id, csrf)
                else:
                    break
            if page_info['code'] == -412:
                self.logger.info("检测到被拦截,结束程序,一小时后再试")
                os.kill(os.getpid(), signal.SIGKILL)
                # os._exit(0)

    def scan_live_page(self, page_info, tag_id, csrf):
        for i in page_info['data']['list']:
            room_info = self.screen_ct_room(i)
            if room_info:
                self.check_room(room_info[0], room_info[1], tag_id, csrf)
            else:
                continue

    @staticmethod
    def screen_ct_room(i):
        if '2' in i['pendant_info']:
            if i['pendant_info']['2']['content'] == '天选时刻':
                return i['roomid'], i['uname']
            else:
                pass

    def check_room(self, room_id, uname, tag_id, csrf):
        url = self.url_check % room_id
        tx_info = self.get_requests(url)
        if tx_info['code'] == 0:
            award = self.screen_condition(tx_info['data']['award_name'])
            require = self.screen_condition(tx_info['data']['require_text'])
            if award or require:
                pass
            else:
                if tx_info['data']['gift_id'] == 0:
                    self.logger.info(f"【{uname}】--【{tx_info['data']['award_name']}】--【{tx_info['data']['require_text']}】")
                    self.tx_join(tx_info['data']['ruid'], tx_info['data']['id'], uname, room_id, tag_id, csrf)
                else:
                    pass

    @staticmethod
    def screen_condition(condition):
        pattern = re.compile(r'大航海|舰长|.?车车?|手照|代金券|优惠券|勋章|提督|男')
        if pattern.findall(condition):
            return True
        else:
            return False

    def tx_join(self, uid, tid, uname, room_id, tag_id, csrf):
        if self.black_list_check(uid):
            self.logger.info(f"【{uname}】已在黑名单中")
        else:
            data = {'id': tid, 'platfrom': 'pc', 'roomid': room_id, 'csrf': csrf}
            join_info = self.post_requests(self.url_tx, data)
            if join_info['code'] == 0:
                self.logger.info(f'【成功参加"{uname}"的天选】')
                self.send_danmu(room_id, csrf)
                self.relationship_check(uid, uname, tag_id, csrf)
            else:
                self.logger.info(join_info['message'])

    def black_list_check(self, uid):
        if self.black_list:
            for i in self.black_list:
                if i == uid:
                    return True
                else:
                    return False
        else:
            return False

    def relationship_check(self, uid, uname, tag_id, csrf):
        url = self.url_re % uid
        relationship = self.get_requests(url)
        if relationship['code'] == 0:
            if relationship['data']['attribute'] == 1 or relationship['data']['attribute'] == 2:
                if relationship['data']['tag'] is None:
                    self.Move_User(tag_id, uid, uname, csrf)
                else:
                    pass
            else:
                pass

    def Move_User(self, gid, uid, uname, csrf):
        data = {'beforeTagids': 0, 'afterTagids': gid, 'fids': uid, 'csrf': csrf}
        Move_info = self.post_requests(self.url_relationship, data)
        if Move_info['code'] == 0:
            self.logger.info(f'{uname}->【改变分组成功】')
        else:
            self.logger.info(Move_info['message'])

    def send_danmu(self, room_id, csrf):
        emotion_list = ['official_147', 'official_109', 'official_113', 'official_120', 'official_150', 'official_103', 'official_128', 'official_133', 'official_149', 'official_124', 'official_146', 'official_148', 'official_102', 'official_121', 'official_137', 'official_118', 'official_129', 'official_108', 'official_104', 'official_105', 'official_106', 'official_114', 'official_107', 'official_110', 'official_111', 'official_136', 'official_115', 'official_116', 'official_117', 'official_119', 'official_122', 'official_123', 'official_125', 'official_126', 'official_127', 'official_134', 'official_135', 'official_138']
        rnd = int(time.time())
        emotion = random.choice(emotion_list)
        data = {'bubble': 0, 'msg': emotion, 'color': 16777215, 'fontsize': 25, 'mode': 1, 'rnd': rnd, 'dm_type': 1,
                'roomid': room_id, 'csrf': csrf}
        danmu_info = self.post_requests(self.send, data)
        if danmu_info['code'] == 0:
            self.logger.info(f'------->发送弹幕成功')
        else:
            self.logger.info(danmu_info['message'])

    def decorate(self):
        self.logger.info("脚本由GitHub@王权富贵233提供")
        self.logger.info("该脚本仅供学习交流,仅供学习参考,仅供学习参考")
        self.logger.info("脚本现已支持黑名单,前往Bilibili_config.json添加黑名单")
        self.logger.info("格式:black_list = [uid1,uid2,uid3,...,...] uid为数字,逗号为英文逗号")

    def run(self):
        self.decorate()
        for i in range(len(self.cookies)):
            self.headers['Cookie'] = self.cookies[i]
            self.headers['user-agent'] = random.choice(self.ua_list)
            self.headers['referer'] = "https://live.bilibili.com/"
            if self.cope_info(self.get_requests(self.url)):
                self.logger.info("检查帐号有效性成功")
            else:
                self.logger.info("帐号cookie过期,跳过该账号")
                continue
            self.check_group(self.csrfs[i])
            self.logger.info(f"{'*' * 5}第{i + 1}帐号结束{'*' * 5}")

if __name__ == '__main__':
    ctime = Bilibili_CTime()
    ctime.run()
else:
    ctime = Bilibili_CTime()
    ctime.run()

运行日常任务:Bilibili_Daily.py

"""
cron : 5 1 * * *
new Env("哔哩哔哩-【日常】")
"""
from Bilibili_User import *

class Daily(Basic):
    def __init__(self):
        super().__init__()
        self.specify = self.fetch_favorite()

    def check_group(self):
        follows = self.get_requests(self.url9)
        if follows['code'] == 0:
            for i in follows['data']:
                if i['tagid'] == 0:
                    return i['count']
        else:
            return None

    def uid_info(self, coin, csrf):
        count = self.check_group()
        if count == 0:
            self.logger.info("默认分组为空")
        elif 55 >= count > 0:
            url = self.url10 + '?tagid=0'
            self.fetch_uid(url, coin, csrf)
        else:
            page = random.randint(1, int(count / 50) + 1)
            url = self.url10 + '?tagid=0&pn=%s' % page
            self.fetch_uid(url, coin, csrf)

    def fetch_uid(self, url, coin, csrf):
        if len(self.specify) > 0:
            num = 0
            while True:
                mid = random.choice(self.specify)
                self.logger.info("你选择了指定用户投币,uid:%s" % mid)
                bvid_list, video_title = self.cyc_search_uid(mid)
                if bvid_list:
                    if self.cope_video(bvid_list, video_title, coin, csrf):
                        num += 1
                    else:
                        pass
                else:
                    if len(self.specify) == 1:
                        break
                if num == 5:
                    break
            if num < 5:
                self.logger.info("指定up没有可投币的视频,随机投币")
                self.cycle_uid(url, coin, csrf)
        else:
            self.cycle_uid(url, coin, csrf)

    def other_task(self, csrf):
        url = self.url10 + '?tagid=0'
        group_info = self.get_requests(url)
        mid_list, uname_list = self.list_uid(group_info['data'])
        mid = random.choice(mid_list)
        self.logger.info("随机选择了用户:" + uname_list[mid_list.index(mid)])
        bvid_list, video_title = self.cyc_search_uid(mid)
        if bvid_list:
            video = random.choice(bvid_list)
            self.play_video(video, video_title[bvid_list.index(video)])
            self.share_dynamic(video_title[bvid_list.index(video)], video, csrf)
            return True
        else:
            return False

    def cycle_uid(self, url, coin, csrf):
        group_info = self.get_requests(url)
        mid_list, uname_list = self.list_uid(group_info['data'])
        num = 0
        while True:
            mid = random.choice(mid_list)
            self.logger.info("本次投币对象:"+uname_list[mid_list.index(mid)])
            bvid_list, video_title = self.cyc_search_uid(mid)
            if bvid_list:
                if self.cope_video(bvid_list, video_title, coin, csrf):
                    num += 1
                else:
                    pass
            else:
                pass
            if num == 5:
                break

    def cope_video(self, bvid_list, video_title, coin, csrf):
        choice_video = []
        num = 0
        for i in range(len(bvid_list)):
            video = random.choice(bvid_list)
            self.logger.info("视频:%s" % video_title[bvid_list.index(video)])
            if self.c_v_d_i(video, coin, csrf) and video is not choice_video:
                num += 1
                break
            else:
                choice_video.append(video)
        if num == 1:
            return True
        else:
            return False

    @staticmethod
    def list_uid(data):
        mid_list = []
        uname_list = []
        for i in data:
            mid_list.append(i['mid'])
            uname_list.append(i['uname'])
        return mid_list, uname_list

    def search_uid(self, mid):
        url = self.url11 % mid
        video_num = self.get_requests(url)
        if video_num['code'] == 0:
            return video_num['data']['page']['count']
        else:
            return None

    def cyc_search_uid(self, mid):
        num = self.search_uid(mid)
        if num > 50:
            page = random.randint(1, int(num / 50) + 1)
            bvid_list, video_title = self.bvid_page(mid, page)
            return bvid_list, video_title
        elif 0 < num <= 50:
            bvid_list, video_title = self.bvid_page(mid, 1)
            return bvid_list, video_title
        else:
            return None, None

    def bvid_page(self, mid, page):
        url = self.url11 % mid + "&ps=50&pn=%s" % page
        bvid_data = self.get_requests(url)
        if bvid_data['code'] == 0:
            bvid_list, video_title = self.bvid_list(bvid_data['data']['list']['vlist'])
            return bvid_list, video_title
        else:
            self.logger.error(bvid_data)

    @staticmethod
    def bvid_list(data):
        bvid_list = []
        video_title = []
        for i in data:
            bvid_list.append(i['bvid'])
            video_title.append(i['title'])
        return bvid_list, video_title

    def c_v_d_i(self, bvid, coin, csrf):
        url = "https://api.bilibili.com/x/web-interface/archive/coins?bvid=%s" % bvid
        data = self.get_requests(url)
        if data['code'] == 0:
            if data['data']['multiply'] == 0:
                self.drop_coin(bvid, coin, csrf)
                return True
            else:
                self.logger.info("该视频已经投币过,不再投币")
                return False
        else:
            self.logger.error('检查投币情况失败,错误码:%s' % data['code'])
            return False

    def recommend_video(self, bvid):
        url = self.url5 + "?bvid=" + bvid
        data = self.get_requests(url)
        if data['code'] == 0:
            return self.cope_recommend(data['data'])
        else:
            self.logger.error(data['message'])

    @staticmethod
    def cope_recommend(data):
        title = []
        bv = []
        for i in data:
            title.append(i['title'])
            bv.append(i['bvid'])
        return title, bv

    def drop_coin(self, bv, coin, csrf):
        data = {
            'bvid': bv,
            'multiply': coin,
            'csrf': csrf
        }
        drop = self.post_requests(self.url3, data)
        if drop['code'] == 0:
            self.logger.info("【投币成功】")
        else:
            self.logger.error(drop['message'])

    def sign_live(self):
        self.logger.info('开始直播签到')
        sign = self.get_requests(self.url8)
        if sign['code'] == 0:
            self.logger.info('【直播签到成功】')
        else:
            self.logger.info(sign['message'])

    def share_dynamic(self, title, bv, csrf):
        data = {
            "bvid": bv,
            "csrf": csrf
        }
        self.logger.info('开始分享动态,标题%s' % title)
        share = self.post_requests(self.url4, data)
        if share['code'] == 0:
            self.logger.info('【分享动态成功】')
        else:
            self.logger.info(share['message'])

    def play_video(self, bv, title):
        data = {
            "bvid": bv,
            "play_time": random.randint(30, 45),
            "realtime": random.randint(30, 45)
        }
        self.logger.info('开始播放视频,标题%s' % title)
        play = self.post_requests(self.url6, data)
        if play['code'] == 0:
            self.logger.info('【播放视频成功】')
        else:
            self.logger.info(play['message'])

    def clockin(self):
        data = {'platform': 'android'}
        clock_info = self.post_requests(self.clockin_url, data)
        if clock_info['code'] == 0:
            self.logger.info("漫画签到成功")
        else:
            self.logger.info(clock_info['msg'])

    def run(self):
        self.manual()
        self.logger.info('目的是随机给我回点硬币,是随机')
        self.logger.info('本次更新支持不投币,coin设置为0即可')
        cookies = self.fetch_cookies()
        coins = self.fetch_drop_coin()
        csrfs = self.fetch_csrf(cookies)
        for i in cookies:
            self.headers['Cookie'] = i
            self.headers['user-agent'] = random.choice(self.ua_list)
            self.headers['Referer'] = 'https://www.bilibili.com/'
            user_info = self.get_requests(self.url)
            if self.cope_info(user_info):
                self.logger.info('--》帐号有效《--')
            else:
                self.logger.info('--》帐号无效,跳出该帐号《--')
                continue
            self.mao_san(csrfs[cookies.index(i)])
            self.clockin()
            self.sign_live()
            if self.other_task(csrfs[cookies.index(i)]):
                pass
            else:
                self.other_task(csrfs[cookies.index(i)])
            if coins[cookies.index(i)] == 0:
                self.logger.info('设置了不投币,跳过投币任务')
                continue
            else:
                self.uid_info(coins[cookies.index(i)], csrfs[cookies.index(i)])
        self.logger.info("= "*8+"任务完成"+"= "*8)

if __name__ == '__main__':
    Daily = Daily()
    Daily.run()
else:
    Daily = Daily()
    Daily.run()

取消关注up主:Bilibili_Unfollows.py

"""
cron : 2 1 */15 * *
new Env("哔哩哔哩-取关")
"""
from Bilibili_User import Basic

class Unfollows(Basic):
    def __init__(self):
        super().__init__()
        self.white_list = self.fetch_white_list()

    def check_group(self, csrf):
        self.logger.info('检查是否有天选时刻分组')
        group = self.get_requests(self.url9)
        self.cope_group(group, csrf)

    def cope_group(self, group, csrf):
        for i in group['data']:
            if i['name'] == '天选时刻':
                self.logger.info('有天选时刻分组,开始检查关注人数')
                if i['count'] > 0:
                    self.logger.info('天选时刻分组关注人数: %s ***>开始执行取关任务' % i['count'])
                    self.fetch_mid(i['tagid'], i['count'], csrf)
                else:
                    self.logger.info('天选时刻分组关注人数: %s ***>无需取关' % i['count'])
                break
        else:
            self.logger.info('没有天选时刻分组,结束检查')
            return None

    def fetch_mid(self, group_id, count, csrf):
        url = self.url10 + '?tagid=%s' % group_id
        if count <= 50:
            group_info = self.get_requests(url)
            userid, uname = self.cope_User(group_info)
            self.cyc_unfollow(userid, uname, csrf)
        else:
            for i in range(int(count / 50) + 1):
                group_info = self.get_requests(url)
                userid, uname = self.cope_User(group_info)
                self.cyc_unfollow(userid, uname, csrf)

    def screen_white_list(self, userid):
        if self.white_list is None:
            return False
        if userid in self.white_list:
            return True
        else:
            return False

    @staticmethod
    def cope_User(group_info):
        mid = []
        uname = []
        for i in group_info['data']:
            mid.append(i['mid'])
            uname.append(i['uname'])
        return mid, uname

    def cyc_unfollow(self, mid, uname, csrf):
        for i in range(len(mid)):
            if self.screen_white_list(mid[i]):
                self.logger.info('【%s】在白名单中,跳过' % uname[i])
                continue
            else:
                self.unfollow(mid[i], csrf)
                self.logger.info('取关: 【%s】' % uname[i])

    def unfollow(self, mid, csrf):
        data = {'fid': mid, 'act': 2, 're_src': 11, 'csrf': csrf}
        unfollow = self.post_requests(self.url1, data)
        self.unfollow_info(unfollow)

    def unfollow_info(self, unfollow):
        if unfollow['code'] == 0:
            self.logger.info('取关成功')
        else:
            self.logger.info('取关失败')

    def run(self):
        self.logger.info('脚本支持白名单,可以在Bilibili_config.json设置不取关的用户ID')
        self.logger.info('白名单格式: [uid1, uid2, uid3, ...] uid为数字,逗号为英文逗号')
        self.logger.info('脚本作者为github@wangquanfugui233')
        self.logger.info("*" * 6 + "开始取关" + "*" * 6)
        for i in range(len(self.cookies)):
            self.headers['cookie'] = self.cookies[i]
            self.headers['referer'] = "https://www.bilibili.com/"
            self.check_group(self.csrfs[i])
        self.logger.info("=============》结束《============")

if __name__ == '__main__':
    Unfollow = Unfollows()
    Unfollow.run()
else:
    Unfollow = Unfollows()
    Unfollow.run()

中奖通知:Bilibili_Prize.py

"""
cron 1 23 * * *
new Env("哔哩哔哩-天选中奖通知")

"""
from Bilibili_User import *
# from notify import send  # 暂时不对接
import datetime

class prize(Basic):
    def __init__(self):
        super().__init__()
        self.msg = '详细信息:\n'

    def get_prize(self):
        data = self.get_requests(self.prize)
        if data['code'] == 0:
            self.cope_prize(data['data']['list'])
        else:
            self.logger.info(data)

    def cope_prize(self, data):
        dt = time.strftime("%Y-%m-%d ", time.localtime())
        for i in data:
            date_time = datetime.datetime.strptime(i['end_time'], '%Y-%m-%d %H:%M:%S')
            if date_time.strftime('%Y-%m-%d ') == dt:
                self.logger.info('恭喜你中奖,奖品是:%s --- 兑奖up主是:%s' % (i['award_name'], i['anchor_name']))
                self.logger.info('中将时间是:%s' % i['end_time'])
                self.msg += '\n 中将时间:【%s】 今天的天选中奖,奖品是:%s --- 兑奖up主是:%s' % (
                    i['end_time'], i['award_name'], i['anchor_name'])
                self.logger.info("检查白名单")
                self.check_white_list(i['anchor_uid'])
            else:
                pass
        if len(data) == 0:
            self.msg += '毛也没有中'

    def check_white_list(self, uid):
        num = 0
        with open('./Bilibili_config.json', 'r') as f:
            data = json.load(f)
            if data['white_list']:
                for i in data['white_list']:
                    if i == uid:
                        self.logger.info('已存在于白名单')
                        num += 1
                        break  # 如果存在于白名单,则跳出循环
            else:
                self.logger.info('白名单为空')
                self.update_white_list(data, uid)
        if num == 0:
            self.update_white_list(data, uid)

    @staticmethod
    def update_white_list(data, uid):
        data['white_list'].append(uid)
        with open('./Bilibili_config.json', 'w', encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        print('加入白名单成功')

    def ql_send(self):
        # send("Bilibili天选通知", self.msg)   # 暂时不用
        print(self.msg)

    def run(self):
        cookies = self.fetch_cookies()
        for i in cookies:
            self.headers['Cookie'] = i
            user = self.get_requests(self.url)
            self.msg += "用户:%s \n" % user['data']['uname']
            self.cope_info(user)
            self.get_prize()
        try:
            self.ql_send()
        except Exception as e:
            self.logger.info('推送失败')
            print(e)

if __name__ == '__main__':
    prize = prize()
    prize.run()
else:
    prize = prize()
    prize.run()

关注源作者和其他:Bilibili_User.py

"""
cron : 2 1 * * *
new Env("哔哩哔哩-【basic】")
"""

import json
import random
import time
from Bilibili_Config import Config
import requests

class Basic(Config):
    def __init__(self):
        super().__init__()
        self.cookies = self.fetch_cookies()
        self.csrfs = self.fetch_csrf(self.cookies)
        self.coin = self.fetch_drop_coin()

    def get_requests(self, url):
        try:
            time.sleep(1)
            self.headers['user-agent'] = random.choice(self.ua_list)
            with requests.session() as s:
                # 代理
                proxies = {'http': 'http://202.55.5.209:8090'}
                s.keep_alive = False
                s.adapters.DEFAULT_RETRIES = 2
                r = s.get(url, headers=self.headers, proxies=proxies)
                if r.status_code == 200:
                    data = json.loads(r.text)
                    return data
                else:
                    data_error = json.loads(r.text)
                    return data_error
        except Exception as e:
            self.logger.error('请求失败,错误信息:{}'.format(e))

    def post_requests(self, url, data):
        try:
            self.headers['method'] = 'POST'
            self.headers['Content-Type'] = 'application/x-www-form-urlencoded'
            time.sleep(1)
            with requests.session() as s:
                s.keep_alive = False
                r = s.post(url, headers=self.headers, data=data)
                if r.status_code == 200:
                    post_data = json.loads(r.text)
                    return post_data
                else:
                    data_error = json.loads(r.text)
                    return data_error
        except Exception as e:
            self.logger.error('请求失败,错误信息:{}'.format(e))

    def check_author(self):
        url = self.url_re % 289549318
        author = self.get_requests(url)
        if author['code'] == 0:
            if author['data']['attribute'] != 0:
                return True
            else:
                return False
        else:
            return False

    def follow_author(self, csrf):
        data = {'fid': 289549318, 'act': 1, 're_src': 11, 'csrf': csrf}
        follow = self.post_requests(self.url1, data)
        if follow['code'] == 0:
            self.logger.info("关注作者成功")
        else:
            self.logger.info(follow['message'])

    def cope_info(self, data):
        if data['code'] == 0:
            self.logger.info("**********" + data['data']['uname'] + "**********")
            self.logger.info("当前经验值:" + str(data['data']['level_info']['current_exp']))
            if data['data']['level_info']['current_level'] == 6:
                self.logger.info("你已经是lv6的大佬了")
            else:
                level_day = (data['data']['level_info']['next_exp'] - data['data']['level_info']['current_exp']) / 65
                self.logger.info('当前硬币数:' + str(data['data']['money']) + ",下一等级升级天数约" + str(int(level_day)))
            return True
        elif data['code'] == -101:
            self.logger.info(data['message'] + "请检查cookie")
            return False
        elif data['code'] == -111:
            self.logger.info(data['message'] + "请检查csrf")
        else:
            self.logger.info(data['message'])

    def mao_san(self, csrf):
        if self.fetch_follow():
            if self.check_author():
                pass
            else:
                self.follow_author(csrf)
        else:
            pass

    def manual(self):
        self.logger.info("author: github@wangquanfugui233")
        self.logger.info("本次更新加入关注作者B站号,将随机为作者提供0.1个硬币")
        self.logger.info("这让你感到不适,请删除脚本或者配置文件将follow_author设置为小写false")

    def user_info(self):
        self.manual()
        for i in range(len(self.cookies)):
            self.headers['Cookie'] = self.cookies[i]
            user = self.get_requests(self.url)
            self.cope_info(user)

if __name__ == '__main__':
    cope = Basic()
    cope.user_info()
赞(4) 打赏
转载请带上源站链接:玖伴一鹏 » Bilibili 自动做升级任务以及参加天选

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏