起因
由于之前的天翼云cdn到期,根据其官网文档
2024年2月20日起,所有天翼云用户只要新开通CDN加速业务或订购静态HTTPS请求包,且客户的域名开启HTTPS或QUIC功能后,将根据访问CDN产生的静态HTTPS请求数或静态QUIC请求数计费,即会产生额外的增值服务费用。
新开通的cdn资源包将收取https费用。然后博主就想起腾讯云之前推出的EdgeOne,只需要36元就可以拥有每个月50Gcdn流量和300万次https请求:
果断下单付款。ps:现在eo免费版出来了,可以去直接领
开通后发现有安全防护-源站防护可以获取到回源节点信息,就想着把回源节点加入可信ip,防止有人伪造真实ip请求头对网站进行攻击。发现这个IP段信息有可能会有改动,每次手动同步麻烦又要时刻盯着,所以想到用云api来获取相关信息,在此分享下方法.
一、设置CAM
对于腾讯云而言,虽然主账号API的AK和SK也是能用于请求云api获取信息,博主不建议在长期的应用中使用,尤其需要格外提防由于密钥泄露造成的高额损失。腾讯云CAM位于【云资源管理】-【访问管理】下,可点击快速前往,在用户列表中新建一个CAM子账户,可选择编程访问,权限选择【QcloudTEOFullAccess】,则创建完成后会自动给出云api所需的认证信息:
二、找到云api文档,写自动脚本
1.找到腾讯云云api
在腾讯云文档里找到关于【查询源站防护详情】和【确认回源 IP 网段更新】的文档,这就是我们要找的接口了,使用 API Explorer可以查看接口返回数据和示例代码,发现必要的参数就一个ZoneId,ZoneId可以在【EdgeOne服务总览】里找到
2.找到宝塔的文件操作接口
由于宝塔的接口文档实在是不好找,所以我直接F12抓取了一个/files?action=SaveFileBody,发现能用
具体代码
#https://1024code.com/embed-ide/@xss/qot7bgs
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
import hmac
import json
import sys
import time
from datetime import datetime
import http.client
from urllib.parse import urlparse, urlencode
from http.cookiejar import CookieJar, Cookie
# 核心配置
CONFIG_FILE = "/www/server/panel/vhost/nginx/real_cdn_ip.conf" # 目标配置文件
BT_PANEL = "http://127.0.0.1:8888" # 宝塔面板地址
BT_KEY = "" # 宝塔接口密钥
ZONE_ID = "zone-xxx" # 腾讯云ZoneId
class BtFileOperator:
"""宝塔面板文件操作工具类"""
def __init__(self, panel_url=BT_PANEL, api_key=BT_KEY):
self.panel_url = panel_url
self.api_key = api_key
self.parsed_url = urlparse(panel_url)
self.cookie_jar = CookieJar()
self.cookie_file = f"./{hashlib.md5(panel_url.encode()).hexdigest()}.cookie"
self._load_cookies()
def _load_cookies(self):
"""加载已保存的cookie"""
if os.path.exists(self.cookie_file):
with open(self.cookie_file, 'r') as f:
cookie_data = f.read()
# 解析cookie字符串并添加到cookie jar
for cookie in cookie_data.split(';'):
if '=' in cookie:
name, value = cookie.strip().split('=', 1)
self.cookie_jar.set_cookie(
Cookie(
version=0, name=name, value=value,
port=None, port_specified=False,
domain=self.parsed_url.hostname, domain_specified=True, domain_initial_dot=False,
path='/', path_specified=True,
secure=False, expires=None,
discard=False, comment=None, comment_url=None,
rest={'HttpOnly': None}, rfc2109=False
)
)
def _save_cookies(self, headers):
"""保存cookie到文件"""
if 'Set-Cookie' in headers:
with open(self.cookie_file, 'w') as f:
f.write(headers['Set-Cookie'])
def _get_sign_data(self):
"""生成宝塔API签名数据"""
now_time = int(time.time())
return {
'request_token': hashlib.md5(f"{now_time}{hashlib.md5(self.api_key.encode()).hexdigest()}".encode()).hexdigest(),
'request_time': now_time
}
def _http_post(self, path, data):
"""使用Python 3内置库发送POST请求"""
# 准备POST数据
post_data = urlencode(data).encode('utf-8')
# 准备headers
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Cookie': '; '.join([f"{c.name}={c.value}" for c in self.cookie_jar])
}
# 建立连接
if self.parsed_url.scheme == 'https':
conn = http.client.HTTPSConnection(self.parsed_url.hostname, self.parsed_url.port or 443)
else:
conn = http.client.HTTPConnection(self.parsed_url.hostname, self.parsed_url.port or 80)
try:
conn.request("POST", path, post_data, headers)
response = conn.getresponse()
self._save_cookies(response.headers)
return response.read().decode('utf-8')
finally:
conn.close()
def write_file(self, file_path, content):
"""通过宝塔API写入文件(覆盖原有内容)"""
path = "/files?action=SaveFileBody"
data = self._get_sign_data()
data.update({
'path': file_path,
'data': content,
'encoding': 'utf-8'
})
try:
response_text = self._http_post(path, data)
return json.loads(response_text)
except Exception as e:
print(f"通过宝塔API写入文件失败: {str(e)}")
return None
def reload_nginx(self):
"""通过宝塔API重载Nginx服务"""
path = "/system?action=ServiceReload"
data = self._get_sign_data()
data['name'] = 'nginx'
try:
response_text = self._http_post(path, data)
result = json.loads(response_text)
return result if result.get('status') else False
except Exception as e:
print(f"通过宝塔API重载Nginx失败: {str(e)}")
return False
def sign(key, msg):
"""计算腾讯云API签名"""
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def get_tencent_teo_ips(secret_id, secret_key, zone_id):
"""从腾讯云TEO API同时获取IPv4和IPv6列表"""
service = "teo"
host = "teo.tencentcloudapi.com"
version = "2022-09-01"
action = "DescribeOriginACL"
payload = json.dumps({"ZoneId": zone_id})
algorithm = "TC3-HMAC-SHA256"
timestamp = int(time.time())
date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
# 构建规范请求串
http_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:{action.lower()}\n"
signed_headers = "content-type;host;x-tc-action"
hashed_payload = hashlib.sha256(payload.encode()).hexdigest()
canonical_request = f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{hashed_payload}"
# 构建签名串
credential_scope = f"{date}/{service}/tc3_request"
hashed_canonical = hashlib.sha256(canonical_request.encode()).hexdigest()
string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical}"
# 计算签名
secret_date = sign(("TC3" + secret_key).encode(), date)
secret_service = sign(secret_date, service)
secret_signing = sign(secret_service, "tc3_request")
signature = hmac.new(secret_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()
# 构建请求头
headers = {
"Authorization": f"{algorithm} Credential={secret_id}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}",
"Content-Type": ct,
"Host": host,
"X-TC-Action": action,
"X-TC-Timestamp": str(timestamp),
"X-TC-Version": version
}
# 发起请求并解析
try:
conn = http.client.HTTPSConnection(host)
conn.request(http_method, "/", body=payload.encode(), headers=headers)
resp = conn.getresponse()
resp_data = json.loads(resp.read().decode('utf-8'))
conn.close()
# 同时提取IPv4和IPv6地址列表
entire_addresses = resp_data["Response"]["OriginACLInfo"]["CurrentOriginACL"]["EntireAddresses"]
return {
"ipv4": entire_addresses.get("IPv4", []),
"ipv6": entire_addresses.get("IPv6", [])
}
except Exception as e:
print(f"获取腾讯云IP列表失败: {str(e)}")
return None
def check_need_update(secret_id, secret_key, zone_id):
"""检查是否需要更新IP"""
payload = json.dumps({"ZoneId": zone_id})
resp = api_request(secret_id, secret_key, "ConfirmOriginACLUpdate", payload)
if not resp:
return True # 接口异常默认需要更新
return not ("Response" in resp and "Error" in resp["Response"] and
resp["Response"]["Error"].get("Code") == "OperationDenied.LatestVersionNow")
def api_request(secret_id, secret_key, action, payload, token=""):
"""腾讯云API通用请求函数"""
service = "teo"
host = "teo.tencentcloudapi.com"
region = ""
version = "2022-09-01"
algorithm = "TC3-HMAC-SHA256"
timestamp = int(time.time())
date = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
# 构建签名
http_method = "POST"
canonical_uri = "/"
canonical_querystring = ""
ct = "application/json; charset=utf-8"
canonical_headers = f"content-type:{ct}\nhost:{host}\nx-tc-action:{action.lower()}\n"
signed_headers = "content-type;host;x-tc-action"
hashed_payload = hashlib.sha256(payload.encode()).hexdigest()
canonical_request = f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{hashed_payload}"
credential_scope = f"{date}/{service}/tc3_request"
hashed_canonical = hashlib.sha256(canonical_request.encode()).hexdigest()
string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical}"
secret_date = sign(("TC3" + secret_key).encode(), date)
secret_service = sign(secret_date, service)
secret_signing = sign(secret_service, "tc3_request")
signature = hmac.new(secret_signing, string_to_sign.encode(), hashlib.sha256).hexdigest()
headers = {
"Authorization": f"{algorithm} Credential={secret_id}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}",
"Content-Type": ct,
"Host": host,
"X-TC-Action": action,
"X-TC-Timestamp": str(timestamp),
"X-TC-Version": version
}
if region:
headers["X-TC-Region"] = region
if token:
headers["X-TC-Token"] = token
try:
conn = http.client.HTTPSConnection(host)
conn.request(http_method, "/", body=payload.encode(), headers=headers)
resp = conn.getresponse()
resp_data = json.loads(resp.read().decode('utf-8'))
conn.close()
return resp_data
except Exception as e:
print(f"API请求失败: {str(e)}")
return None
def main():
# 腾讯云密钥
secret_id = ""
secret_key = ""
# 初始化宝塔操作工具
bt = BtFileOperator()
# 步骤1:检查是否需要更新
if not check_need_update(secret_id, secret_key, ZONE_ID):
print("IP列表已是最新,无需更新")
return
# 步骤2:获取最新IP列表(同时包含IPv4和IPv6)
ip_data = get_tencent_teo_ips(secret_id, secret_key, ZONE_ID)
if not ip_data:
print("未获取到有效IP列表,终止操作")
return
# 统计IP数量
ipv4_count = len(ip_data["ipv4"])
ipv6_count = len(ip_data["ipv6"])
print(f"获取到 {ipv4_count} 条IPv4地址和 {ipv6_count} 条IPv6地址")
# 步骤3:生成配置内容(同时包含IPv4和IPv6)
config_content = ["real_ip_header X-Forwarded-For;\n"]
# 添加IPv4规则
if ipv4_count > 0:
config_content.append("\n# IPv4地址段\n")
config_content.extend([f"set_real_ip_from {ip};\n" for ip in ip_data["ipv4"]])
# 添加IPv6规则
if ipv6_count > 0:
config_content.append("\n# IPv6地址段\n")
config_content.extend([f"set_real_ip_from {ip};\n" for ip in ip_data["ipv6"]])
# 步骤4:通过宝塔API写入配置文件
print(f"准备写入配置文件: {CONFIG_FILE}")
write_result = bt.write_file(CONFIG_FILE, ''.join(config_content))
if not write_result or not write_result.get('status'):
print(f"文件写入失败: {write_result.get('msg', '未知错误')}")
return
# 步骤5:通过宝塔API重载Nginx
reload_result = bt.reload_nginx()
if reload_result:
print(f"配置更新完成,共写入 {ipv4_count + ipv6_count} 条IP规则,Nginx已重载")
else:
print(f"配置已写入,共 {ipv4_count + ipv6_count} 条IP规则,但Nginx重载失败,请手动重启")
if __name__ == "__main__":
main()
将上述代码设置cron一天运行一次即可实现自动更新回源ip到宝塔real-ip.conf可信ip配置