起因

由于之前的天翼云cdn到期,根据其官网文档

2024年2月20日起,所有天翼云用户只要新开通CDN加速业务或订购静态HTTPS请求包,且客户的域名开启HTTPS或QUIC功能后,将根据访问CDN产生的静态HTTPS请求数或静态QUIC请求数计费,即会产生额外的增值服务费用。
新开通的cdn资源包将收取https费用。然后博主就想起腾讯云之前推出的EdgeOne,只需要36元就可以拥有每个月50Gcdn流量和300万次https请求:

edgeone新人优惠

果断下单付款。ps:现在eo免费版出来了,可以去直接领
开通后发现有安全防护-源站防护可以获取到回源节点信息,就想着把回源节点加入可信ip,防止有人伪造真实ip请求头对网站进行攻击。发现这个IP段信息有可能会有改动,每次手动同步麻烦又要时刻盯着,所以想到用云api来获取相关信息,在此分享下方法.

一、设置CAM

对于腾讯云而言,虽然主账号API的AK和SK也是能用于请求云api获取信息,博主不建议在长期的应用中使用,尤其需要格外提防由于密钥泄露造成的高额损失。腾讯云CAM位于【云资源管理】-【访问管理】下,可点击快速前往,在用户列表中新建一个CAM子账户,可选择编程访问,权限选择【QcloudTEOFullAccess】,则创建完成后会自动给出云api所需的认证信息:认证信息

二、找到云api文档,写自动脚本

1.找到腾讯云云api

在腾讯云文档里找到关于【查询源站防护详情】【确认回源 IP 网段更新】的文档,这就是我们要找的接口了,使用 API Explorer可以查看接口返回数据和示例代码,发现必要的参数就一个ZoneId,ZoneId可以在【EdgeOne服务总览】里找到

2.找到宝塔的文件操作接口

由于宝塔的接口文档实在是不好找,所以我直接F12抓取了一个/files?action=SaveFileBody,发现能用
具体代码

#https://1024code.com/embed-ide/@xss/qot7bgs
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
import hmac
import json
import sys
import time
from datetime import datetime
import http.client
from urllib.parse import urlparse, urlencode
from http.cookiejar import CookieJar, Cookie

# 核心配置
CONFIG_FILE = "/www/server/panel/vhost/nginx/real_cdn_ip.conf"  # 目标配置文件
BT_PANEL = "http://127.0.0.1:8888"  # 宝塔面板地址
BT_KEY = ""  # 宝塔接口密钥
ZONE_ID = "zone-xxx"  # 腾讯云ZoneId



class BtFileOperator:
    """宝塔面板文件操作工具类"""
    def __init__(self, panel_url=BT_PANEL, api_key=BT_KEY):
        self.panel_url = panel_url
        self.api_key = api_key
        self.parsed_url = urlparse(panel_url)
        self.cookie_jar = CookieJar()
        self.cookie_file = f"./{hashlib.md5(panel_url.encode()).hexdigest()}.cookie"
        self._load_cookies()

    def _load_cookies(self):
        """加载已保存的cookie"""
        if os.path.exists(self.cookie_file):
            with open(self.cookie_file, 'r') as f:
                cookie_data = f.read()
            # 解析cookie字符串并添加到cookie jar
            for cookie in cookie_data.split(';'):
                if '=' in cookie:
                    name, value = cookie.strip().split('=', 1)
                    self.cookie_jar.set_cookie(
                        Cookie(
                            version=0, name=name, value=value,
                            port=None, port_specified=False,
                            domain=self.parsed_url.hostname, domain_specified=True, domain_initial_dot=False,
                            path='/', path_specified=True,
                            secure=False, expires=None,
                            discard=False, comment=None, comment_url=None,
                            rest={'HttpOnly': None}, rfc2109=False
                        )
                    )

    def _save_cookies(self, headers):
        """保存cookie到文件"""
        if 'Set-Cookie' in headers:
            with open(self.cookie_file, 'w') as f:
                f.write(headers['Set-Cookie'])

    def _get_sign_data(self):
        """生成宝塔API签名数据"""
        now_time = int(time.time())
        return {
            'request_token': hashlib.md5(f"{now_time}{hashlib.md5(self.api_key.encode()).hexdigest()}".encode()).hexdigest(),
            'request_time': now_time
        }

    def _http_post(self, path, data):
        """使用Python 3内置库发送POST请求"""
        # 准备POST数据
        post_data = urlencode(data).encode('utf-8')
        
        # 准备headers
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Cookie': '; '.join([f"{c.name}={c.value}" for c in self.cookie_jar])
        }
        
        # 建立连接
        if self.parsed_url.scheme == 'https':
            conn = http.client.HTTPSConnection(self.parsed_url.hostname, self.parsed_url.port or 443)
        else:
            conn = http.client.HTTPConnection(self.parsed_url.hostname, self.parsed_url.port or 80)
        
        try:
            conn.request("POST", path, post_data, headers)
            response = conn.getresponse()
            self._save_cookies(response.headers)
            return response.read().decode('utf-8')
        finally:
            conn.close()

    def write_file(self, file_path, content):
        """通过宝塔API写入文件(覆盖原有内容)"""
        path = "/files?action=SaveFileBody"
        data = self._get_sign_data()
        data.update({
            'path': file_path,
            'data': content,
            'encoding': 'utf-8'
        })

        try:
            response_text = self._http_post(path, data)
            return json.loads(response_text)
        except Exception as e:
            print(f"通过宝塔API写入文件失败: {str(e)}")
            return None

    def reload_nginx(self):
        """通过宝塔API重载Nginx服务"""
        path = "/system?action=ServiceReload"
        data = self._get_sign_data()
        data['name'] = 'nginx'

        try:
            response_text = self._http_post(path, data)
            result = json.loads(response_text)
            return result if result.get('status') else False
        except Exception as e:
            print(f"通过宝塔API重载Nginx失败: {str(e)}")
            return False


def sign(key, msg):
    """计算腾讯云API签名"""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def get_tencent_teo_ips(secret_id, secret_key, zone_id):
    """从腾讯云TEO API同时获取IPv4和IPv6列表"""
    service = "teo"
    host = "teo.tencentcloudapi.com"
    version = "2022-09-01"
    action = "DescribeOriginACL"
    payload = json.dumps({"ZoneId": zone_id})
    algorithm = "TC3-HMAC-SHA256"
    timestamp = int(time.time())
    date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")

    # 构建规范请求串
    http_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    ct = "application/json; charset=utf-8"
    canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:{action.lower()}\n"
    signed_headers = "content-type;host;x-tc-action"
    hashed_payload = hashlib.sha256(payload.encode()).hexdigest()
    canonical_request = f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{hashed_payload}"

    # 构建签名串
    credential_scope = f"{date}/{service}/tc3_request"
    hashed_canonical = hashlib.sha256(canonical_request.encode()).hexdigest()
    string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical}"

    # 计算签名
    secret_date = sign(("TC3" + secret_key).encode(), date)
    secret_service = sign(secret_date, service)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()

    # 构建请求头
    headers = {
        "Authorization": f"{algorithm} Credential={secret_id}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}",
        "Content-Type": ct,
        "Host": host,
        "X-TC-Action": action,
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": version
    }

    # 发起请求并解析
    try:
        conn = http.client.HTTPSConnection(host)
        conn.request(http_method, "/", body=payload.encode(), headers=headers)
        resp = conn.getresponse()
        resp_data = json.loads(resp.read().decode('utf-8'))
        conn.close()
        
        # 同时提取IPv4和IPv6地址列表
        entire_addresses = resp_data["Response"]["OriginACLInfo"]["CurrentOriginACL"]["EntireAddresses"]
        return {
            "ipv4": entire_addresses.get("IPv4", []),
            "ipv6": entire_addresses.get("IPv6", [])
        }
    except Exception as e:
        print(f"获取腾讯云IP列表失败: {str(e)}")
        return None


def check_need_update(secret_id, secret_key, zone_id):
    """检查是否需要更新IP"""
    payload = json.dumps({"ZoneId": zone_id})
    resp = api_request(secret_id, secret_key, "ConfirmOriginACLUpdate", payload)
    
    if not resp:
        return True  # 接口异常默认需要更新
    
    return not ("Response" in resp and "Error" in resp["Response"] and 
                resp["Response"]["Error"].get("Code") == "OperationDenied.LatestVersionNow")


def api_request(secret_id, secret_key, action, payload, token=""):
    """腾讯云API通用请求函数"""
    service = "teo"
    host = "teo.tencentcloudapi.com"
    region = ""
    version = "2022-09-01"
    algorithm = "TC3-HMAC-SHA256"
    timestamp = int(time.time())
    date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")

    # 构建签名
    http_method = "POST"
    canonical_uri = "/"
    canonical_querystring = ""
    ct = "application/json; charset=utf-8"
    canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:{action.lower()}\n"
    signed_headers = "content-type;host;x-tc-action"
    hashed_payload = hashlib.sha256(payload.encode()).hexdigest()
    canonical_request = f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{hashed_payload}"

    credential_scope = f"{date}/{service}/tc3_request"
    hashed_canonical = hashlib.sha256(canonical_request.encode()).hexdigest()
    string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical}"

    secret_date = sign(("TC3" + secret_key).encode(), date)
    secret_service = sign(secret_date, service)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()

    headers = {
        "Authorization": f"{algorithm} Credential={secret_id}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}",
        "Content-Type": ct,
        "Host": host,
        "X-TC-Action": action,
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": version
    }
    if region:
        headers["X-TC-Region"] = region
    if token:
        headers["X-TC-Token"] = token

    try:
        conn = http.client.HTTPSConnection(host)
        conn.request(http_method, "/", body=payload.encode(), headers=headers)
        resp = conn.getresponse()
        resp_data = json.loads(resp.read().decode('utf-8'))
        conn.close()
        return resp_data
    except Exception as e:
        print(f"API请求失败: {str(e)}")
        return None


def main():
    # 腾讯云密钥
    secret_id = ""
    secret_key = ""

    # 初始化宝塔操作工具
    bt = BtFileOperator()

    # 步骤1:检查是否需要更新
    if not check_need_update(secret_id, secret_key, ZONE_ID):
        print("IP列表已是最新,无需更新")
        return

    # 步骤2:获取最新IP列表(同时包含IPv4和IPv6)
    ip_data = get_tencent_teo_ips(secret_id, secret_key, ZONE_ID)
    if not ip_data:
        print("未获取到有效IP列表,终止操作")
        return

    # 统计IP数量
    ipv4_count = len(ip_data["ipv4"])
    ipv6_count = len(ip_data["ipv6"])
    print(f"获取到 {ipv4_count} 条IPv4地址和 {ipv6_count} 条IPv6地址")

    # 步骤3:生成配置内容(同时包含IPv4和IPv6)
    config_content = ["real_ip_header X-Forwarded-For;\n"]
    
    # 添加IPv4规则
    if ipv4_count > 0:
        config_content.append("\n# IPv4地址段\n")
        config_content.extend([f"set_real_ip_from {ip};\n" for ip in ip_data["ipv4"]])
    
    # 添加IPv6规则
    if ipv6_count > 0:
        config_content.append("\n# IPv6地址段\n")
        config_content.extend([f"set_real_ip_from {ip};\n" for ip in ip_data["ipv6"]])

    # 步骤4:通过宝塔API写入配置文件
    print(f"准备写入配置文件: {CONFIG_FILE}")
    write_result = bt.write_file(CONFIG_FILE, ''.join(config_content))
    if not write_result or not write_result.get('status'):
        print(f"文件写入失败: {write_result.get('msg', '未知错误')}")
        return

    # 步骤5:通过宝塔API重载Nginx
    reload_result = bt.reload_nginx()
    if reload_result:
        print(f"配置更新完成,共写入 {ipv4_count + ipv6_count} 条IP规则,Nginx已重载")
    else:
        print(f"配置已写入,共 {ipv4_count + ipv6_count} 条IP规则,但Nginx重载失败,请手动重启")


if __name__ == "__main__":
    main()

将上述代码设置cron一天运行一次即可实现自动更新回源ip到宝塔real-ip.conf可信ip配置

Last modification:July 25, 2025