安装Docker

这里用的是Ubuntu的系统,就直接使用APT来安装docker.io。

apt update
apt install docker.io

安装青龙面板

cd /opt/
docker run -dit   -v $PWD/ql/config:/ql/config   -v $PWD/ql/log:/ql/log   -v $PWD/ql/db:/ql/db   -v $PWD/ql/repo:/ql/repo   -v $PWD/ql/raw:/ql/raw   -v $PWD/ql/scripts:/ql/scripts   -p 5700:5700   --name qinglong   --hostname qinglong   --restart unless-stopped   whyour/qinglong:latest

这里就已经安装完成了,访问服务器地址5700端口就可以访问青龙面板了。

限制面板CPU使用

实际使用的时候碰到过几次青龙面板占用CPU过高,其他服务无法访问,可以通过以下命令限制面板cpu使用。我的CPU是4核,配置使用2个核心,如果是单核这里可以配置为小数。

docker update --cpus=2 qinglong

配置青龙环境

Shell下运行以下命令,进入docker

docker exec -it qinglong bash

安装依赖

npm install -g npm
pip3 install requests
pip3 install pytz
npm install -g download
pnpm install jsdom
apk add --no-cache build-base g++ cairo-dev pango-dev giflib-dev && cd scripts && npm install canvas png-js md5 date-fns axios crypto-js tslib ts-md5 @types/node --build-from-source

其他的依赖就可以在依赖管理里面手动添加,目前需要安装的依赖如下
NODEJS:

png-js
crypto-js
typescript
canvas
js-base64
xml
tslib
json
axios
md5
jsdom
download
ts-md5

PYTHON3:

pytz
requests

后续脚本运行时候,如果有提示Cannot find module './xxxx'就可能是依赖安装不完整,可以自行百度,并在这里安装对应的依赖。

拉库

面板安装完成之后需要拉脚本,这里直接配置计划任务,定时更新脚本并禁用重复脚本,在脚本管理页面新建脚本update.sh,disable.py
update.sh脚本内容:

ql repo https://github.com/zero205/JD_tencent_scf.git "jd_|jx_|getJDCookie" "backUp|icon" "^jd[^_]|USER|sendNotify|sign_graphics_validate|JDJR|JDSign" "main"
ql repo https://github.com/JDHelloWorld/jd_scripts.git "jd_|jx_|getCookie" "activity|backUp|Coupon|enen|update" "^jd[^_]|USER|tools"
ql repo https://github.com/shufflewzc/faker2.git "jd_|jx_|gua_|jddj_|getJDCookie" "activity|backUp" "^jd[^_]|USER|function|utils|sendnotify|ZooFaker_Necklace.js|JDJRValidator_|sign_graphics_validate|ql"
ql repo https://github.com/KingRan/JDJB.git "jd_|jx_|jdCookie" "activity|backUp" "^jd[^_]|USER|utils|function|sign|sendNotify|ql|JDJR"
ql repo https://github.com/smiek2221/scripts.git "jd_|gua_" "" "ZooFaker_Necklace.js|JDJRValidator_Pure.js|sign_graphics_validate.js"
ql repo https://github.com/ccwav/QLScript2.git "jd_" "NoUsed" "ql|utils|USER_AGENTS|jdCookie|JS_USER_AGENTS"
ql repo https://github.com/Tsukasa007/my_script.git "" "jdCookie|USER_AGENTS|sendNotify|backup" "" "master"
python3 disable.py

disable.py脚本内容:

# -*- coding:utf-8 -*-


import json
import logging
import os
import sys
import time
import traceback
import requests

logger = logging.getLogger(name=None)  # module logger (root logger of this script)
formatter = logging.Formatter("%(message)s")  # message-only log format
logger.setLevel(logging.INFO)  # log at INFO and above
stream_handler = logging.StreamHandler()  # console output
# BUG FIX: the formatter was created but never attached to any handler,
# so it had no effect; attach it explicitly.
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
# logger.addHandler(logging.FileHandler(filename="text.log", mode="w"))  # optional file log


ip = "localhost"  # Qinglong panel host (script runs inside the container)
# "&"-separated repo-name prefixes whose tasks should be reserved.
sub_str = os.getenv("RES_SUB", "shufflewzc_faker2")
sub_list = sub_str.split("&")
# BUG FIX: os.getenv returns a *string* when the variable is set, and any
# non-empty string (including "False") is truthy; parse it explicitly so
# RES_ONLY=False actually disables the reserve-only behavior.
res_only = str(os.getenv("RES_ONLY", True)).lower() not in ("false", "0", "no")
headers = {
    "Accept": "application/json",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36",
}


def load_send() -> None:
    """Bind the module-global `send` to notify.py's push helper if present.

    Looks for notify.py in this script's directory; on any failure
    `send` is left as None so callers can guard with `if send:`.
    """
    logger.info("加载推送功能中...")
    global send
    send = None
    script_dir = os.path.abspath(os.path.dirname(__file__))
    sys.path.append(script_dir)
    notify_file = os.path.join(script_dir, "notify.py")
    if not os.path.exists(notify_file):
        return
    try:
        from notify import send
    except Exception:
        send = None
        logger.info(f"❌加载通知服务失败!!!\n{traceback.format_exc()}")


def get_tasklist() -> list:
    """Fetch the full cron-task list from the panel's /api/crons endpoint.

    Returns the `data` payload when the API answers with code 200,
    otherwise an empty list.
    """
    timestamp = round(time.time() * 1000)  # cache-busting query param
    url = f"http://{ip}:5700/api/crons?searchValue=&t={timestamp}"
    response = requests.get(url=url, headers=headers)
    payload = json.loads(response.content.decode("utf-8"))
    if payload.get("code") == 200:
        return payload.get("data")
    return []


def filter_res_sub(tasklist: list) -> tuple:
    """Split tasks into (others, reserved) by the configured prefixes.

    A task whose command contains any entry of the module-level
    `sub_list` is reserved; everything else goes to the filtered list.
    """
    other_tasks = []
    reserved_tasks = []
    for task in tasklist:
        command = task.get("command")
        if any(command.find(prefix) != -1 for prefix in sub_list):
            reserved_tasks.append(task)
        else:
            other_tasks.append(task)
    return other_tasks, reserved_tasks


def get_index(lst: list, item: str) -> list:
    """Return every index at which `item` occurs in `lst`, in order."""
    positions = []
    for idx, value in enumerate(lst):
        if value == item:
            positions.append(idx)
    return positions


def get_duplicate_list(tasklist: list) -> tuple:
    """First-pass duplicate detection, grouped by task name.

    For each distinct task name (in first-seen order) the first
    occurrence is kept and every later occurrence is marked for
    disabling.

    Returns (kept ids, kept tasks, duplicate ids).
    """
    logger.info("\n=== 第一轮初筛开始 ===")

    ids = [task.get("_id") for task in tasklist]
    names = [task.get("name") for task in tasklist]
    cmds = [task.get("command") for task in tasklist]

    # dict.fromkeys keeps first-occurrence order and replaces the original
    # O(n^2) `name not in list` membership scan with an O(n) dedupe.
    name_list = list(dict.fromkeys(names))

    tem_tasks = []
    tem_ids = []
    dup_ids = []
    for name in name_list:
        name_index = get_index(names, name)
        # Keep the first task bearing this name; disable the rest.
        for i, idx in enumerate(name_index):
            if i == 0:
                logger.info(f"【✅保留】{cmds[idx]}")
                tem_tasks.append(tasklist[idx])
                tem_ids.append(ids[idx])
            else:
                logger.info(f"【?禁用】{cmds[idx]}")
                dup_ids.append(ids[idx])
        logger.info("")

    logger.info("=== 第一轮初筛结束 ===")

    return tem_ids, tem_tasks, dup_ids


def reserve_task_only(
    tem_ids: list, tem_tasks: list, dup_ids: list, res_list: list
) -> list:
    """Second pass: prefer the reserved-prefix copy of each task.

    For every kept task whose name also appears in the reserved list,
    the kept task's id is moved onto the disable list so that the
    reserved-prefix version survives instead.
    """
    if not tem_ids:
        return tem_ids

    logger.info("\n=== 最终筛选开始 ===")
    for kept_task in tem_tasks:
        matched = None
        for reserved_task in res_list:
            if kept_task.get("name") == reserved_task.get("name"):
                dup_ids.append(kept_task.get("_id"))
                logger.info(f"【✅保留】{reserved_task.get('command')}")
                matched = kept_task
        if matched:
            logger.info(f"【?禁用】{matched.get('command')}\n")
    logger.info("=== 最终筛选结束 ===")
    return dup_ids


def disable_duplicate_tasks(ids: list) -> None:
    """PUT the given task ids to the panel's disable endpoint and log the result."""
    timestamp = round(time.time() * 1000)  # cache-busting query param
    url = f"http://{ip}:5700/api/crons/disable?t={timestamp}"
    headers["Content-Type"] = "application/json;charset=UTF-8"
    body = json.dumps(ids)
    response = requests.put(url=url, headers=headers, data=body)
    result = json.loads(response.content.decode("utf-8"))
    if result.get("code") == 200:
        logger.info("?成功禁用重复任务~")
    else:
        logger.info(f"❌出错!!!错误信息为:{result}")


def get_token() -> "str | None":
    """Read the panel auth token from /ql/config/auth.json.

    Exits the process when the file cannot be read, sending a failure
    notification first when the notify helper is available.

    NOTE: the original annotation `-> str or None` evaluated to just
    `str`; a string annotation spells the union portably.
    """
    try:
        with open("/ql/config/auth.json", "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        logger.info(f"❌无法获取 token!!!\n{traceback.format_exc()}")
        # BUG FIX: `send` is None when notify.py failed to load; calling it
        # unconditionally raised TypeError instead of exiting cleanly.
        if send:
            send("?禁用重复任务失败", "无法获取 token!!!")
        exit(1)
    return data.get("token")


if __name__ == "__main__":
    logger.info("===> 禁用重复任务开始 <===")
    load_send()
    token = get_token()
    headers["Authorization"] = f"Bearer {token}"

    # Fetch the task list and split it by the configured prefixes.
    sub_str = "\n".join(sub_list)
    logger.info(f"\n=== 你选择过滤的任务前缀为 ===\n{sub_str}")
    tasklist = get_tasklist()
    if len(tasklist) == 0:
        logger.info("❌无法获取 tasklist!!!")
        exit(1)
    filter_list, res_list = filter_res_sub(tasklist)

    tem_ids, tem_tasks, dup_ids = get_duplicate_list(filter_list)
    # Whether duplicates should be resolved in favor of the reserved prefixes.
    if res_only:
        ids = reserve_task_only(tem_ids, tem_tasks, dup_ids, res_list)
    else:
        ids = dup_ids
        logger.info("你选择保留除了设置的前缀以外的其他任务")

    # Renamed from `sum`/`filter`/`disable`: the first two shadowed builtins.
    total_msg = f"所有任务数量为:{len(tasklist)}"
    filter_msg = f"过滤的任务数量为:{len(res_list)}"
    disable_msg = f"禁用的任务数量为:{len(ids)}"
    # BUG FIX: this line used logging.info (the unconfigured root logger at
    # WARNING level), so the statistics were silently dropped; use the
    # configured module logger instead.
    logger.info(
        "\n=== 禁用数量统计 ===\n" + total_msg + "\n" + filter_msg + "\n" + disable_msg
    )

    if len(ids) == 0:
        logger.info("?没有重复任务~")
    else:
        disable_duplicate_tasks(ids)
    if send:
        send("?禁用重复任务成功", f"\n{total_msg}\n{filter_msg}\n{disable_msg}")

然后在定时任务页面添加定时任务,名称随意,命令task update.sh,定时规则33 */6 * * *,每隔6小时的33分钟运行脚本。更新脚本,禁用重复任务。
请输入图片描述

添加京东Cookie

环境变量里添加变量 名称JD_COOKIE
值: 一行一个,可以配置多个账号

pt_key=*******;pt_pin=*****;

获取cookie教程网上很多,这里讲一个简单的,浏览器安装EditThisCookie,InPrivate窗口打开https://m.jd.com/,登录完成到首页后,打开EditThisCookie搜索pt_key,pt_pin,注意,退出账号后cookie会失效,如果需要登录多账号,可以开启多个InPrivate窗口登录账号,获取完成后关闭窗口再开启一个窗口登录就可以。

到这里所有的就已经配置完成了,手动执行下定时任务下的拉库任务,检查更新日志,脚本就已经拉取成功了。

如果觉得我的文章对你有用,请随意赞赏