news 2026/6/23 19:18:44

anyrouter CDN 测速工具 v2.1 - 测试多个服务地址的连接速度

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
anyrouter CDN 测速工具 v2.1 - 测试多个服务地址的连接速度
#!/usr/bin/env python3 """ CDN 测速工具 v2.1 - 测试多个服务地址的连接速度 支持: HTTP延迟、TCP Ping、Traceroute、下载速度测试、JSON导出 """ import urllib.request import urllib.error import time import ssl import socket import statistics import subprocess import struct import sys import json import os from datetime import datetime from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor, as_completed # 测试地址列表 ENDPOINTS = [ { "name": "大陆优化CDN", "url": "https://c.cspok.cn", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "主站直连", "url": "https://anyrouter.top", "desc": "主站后端直连服务" }, { "name": "大陆网络优化-宁波", "url": "https://pmpjfbhq.cn-nb1.rainapp.top", "desc": "针对中国大陆地区优化的后端服务" }, { "name": "大陆网络优化-上海", "url": "https://a-ocnfniawgw.cn-shanghai.fcapp.run", "desc": "针对中国大陆地区优化的后端服务" } ] # 测试参数 TEST_COUNT = 3 TCP_PING_COUNT = 5 TIMEOUT = 10 TRACEROUTE_MAX_HOPS = 20 def create_ssl_context(): """创建SSL上下文""" ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return ctx def parse_url(url): """解析URL获取主机名和端口""" parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) return host, port def resolve_host(hostname): """DNS解析""" try: ip = socket.gethostbyname(hostname) return {"success": True, "ip": ip} except socket.gaierror as e: return {"success": False, "error": str(e)} def tcp_ping(host, port, timeout=5): """TCP Ping - 测试TCP连接延迟""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) start = time.time() sock.connect((host, port)) elapsed = (time.time() - start) * 1000 # ms sock.close() return {"success": True, "latency": elapsed} except socket.timeout: return {"success": False, "error": "连接超时"} except socket.error as e: return {"success": False, "error": str(e)} def tcp_ping_test(host, port, count=TCP_PING_COUNT): """执行多次TCP Ping测试""" results = [] for i in range(count): result = tcp_ping(host, port) results.append(result) time.sleep(0.1) # 短暂间隔 return results def traceroute_tcp(host, port=443, max_hops=TRACEROUTE_MAX_HOPS, timeout=2): """ TCP Traceroute 实现 使用递增的TTL值来追踪路由路径 """ results = [] # 先解析目标IP try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪路由到 {host} ({dest_ip}), 最大 {max_hops} 跳:") print(f" {'跳数':<4} {'IP地址':<18} {'延迟':<12} {'主机名'}") print(" " + "-" * 55) for ttl in range(1, max_hops + 1): # 创建原始socket(需要root权限)或使用UDP hop_result = {"hop": ttl, "ip": None, "latency": None, "hostname": None} try: # 使用UDP进行traceroute(不需要root) recv_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) recv_socket.settimeout(timeout) send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl) start = time.time() send_socket.sendto(b'', (dest_ip, 33434 + ttl)) try: data, addr = recv_socket.recvfrom(1024) elapsed = (time.time() - start) * 1000 hop_ip = addr[0] # 尝试反向DNS解析 try: hostname = socket.gethostbyaddr(hop_ip)[0] except: hostname = hop_ip hop_result["ip"] = hop_ip hop_result["latency"] = elapsed hop_result["hostname"] = hostname print(f" {ttl:<4} {hop_ip:<18} {elapsed:.2f} ms {hostname}") # 如果到达目标 if hop_ip == dest_ip: results.append(hop_result) break except socket.timeout: hop_result["error"] = "超时" print(f" {ttl:<4} {'*':<18} {'*':<12} 请求超时") recv_socket.close() send_socket.close() except PermissionError: # 没有root权限,使用系统traceroute命令 return traceroute_system(host, max_hops) except Exception as e: hop_result["error"] = str(e) # 尝试使用系统命令 return traceroute_system(host, max_hops) results.append(hop_result) return results def traceroute_system(host, max_hops=TRACEROUTE_MAX_HOPS): """使用系统traceroute命令""" results = [] # 检测可用的traceroute命令 traceroute_cmd = None for cmd in ['traceroute', 'tracepath', 'mtr']: try: subprocess.run([cmd, '--version'], capture_output=True, timeout=2) traceroute_cmd = cmd break except: continue if not traceroute_cmd: print(" 未找到traceroute工具,尝试安装...") return [{"hop": 0, "error": "未找到traceroute工具"}] try: if traceroute_cmd == 'traceroute': cmd = ['traceroute', '-n', '-m', str(max_hops), '-w', '2', host] elif traceroute_cmd == 'tracepath': cmd = ['tracepath', '-n', '-m', str(max_hops), host] else: # mtr cmd = ['mtr', '-n', '-c', '1', '--report', host] print(f" 执行: {' '.join(cmd)}") print() process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 实时输出 for line in process.stdout: print(f" {line.rstrip()}") # 解析输出 parts = line.split() if parts and parts[0].isdigit(): hop_num = int(parts[0]) results.append({"hop": hop_num, "raw": line.strip()}) process.wait(timeout=60) except subprocess.TimeoutExpired: print(" traceroute 超时") except Exception as e: print(f" traceroute 错误: {e}") return results def traceroute_simple(host, max_hops=TRACEROUTE_MAX_HOPS): """ 简化版TCP traceroute,使用connect超时方式 """ results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) # 尝试直接连接测试 for ttl in range(1, max_hops + 1): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "到达", "latency": elapsed}) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") results.append({"hop": ttl, "status": "超时"}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: # Connection refused - 通常表示到达 print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "中间节点", "latency": elapsed}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") results.append({"hop": ttl, "status": "TTL过期", "latency": elapsed}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") results.append({"hop": ttl, "error": str(e)}) return results def test_latency(url, timeout=TIMEOUT): """测试HTTP延迟""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url, method='HEAD') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: status = response.status elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": status} except urllib.error.HTTPError as e: elapsed = (time.time() - start) * 1000 return {"success": True, "latency": elapsed, "status": e.code} except Exception as e: return {"success": False, "error": str(e)} def test_download_speed(url, timeout=TIMEOUT): """测试下载速度""" ctx = create_ssl_context() start = time.time() try: req = urllib.request.Request(url) req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') with urllib.request.urlopen(req, timeout=timeout, context=ctx) as response: data = response.read() size = len(data) elapsed = time.time() - start if elapsed > 0: speed = size / elapsed return {"success": True, "size": size, "time": elapsed, "speed": speed} return {"success": True, "size": size, "time": 0, "speed": 0} except Exception as e: return {"success": False, "error": str(e)} def format_speed(bps): """格式化速度""" if bps >= 1024 * 1024: return f"{bps / (1024 * 1024):.2f} MB/s" elif bps >= 1024: return f"{bps / 1024:.2f} KB/s" else: return f"{bps:.2f} B/s" def format_size(bytes_size): """格式化大小""" if bytes_size >= 1024 * 1024: return f"{bytes_size / (1024 * 1024):.2f} MB" elif bytes_size >= 1024: return f"{bytes_size / 1024:.2f} KB" else: return f"{bytes_size} B" def test_endpoint(endpoint, enable_traceroute=True): """测试单个端点""" name = endpoint["name"] url = endpoint["url"] desc = endpoint["desc"] host, port = parse_url(url) print(f"\n{'='*65}") print(f"📡 {name}") print(f" URL: {url}") print(f" 描述: {desc}") print(f"{'='*65}") # 结果数据结构 result_data = { "name": name, "url": url, "description": desc, "host": host, "port": port, "test_time": datetime.now().isoformat(), "dns": {}, "tcp_ping": {}, "traceroute": [], "http": {}, "download": {}, "summary": {} } # DNS 解析 print(f"\n🔍 DNS解析:") dns_result = resolve_host(host) if dns_result["success"]: ip = dns_result["ip"] print(f" {host} -> {ip}") result_data["dns"] = {"success": True, "ip": ip, "hostname": host} else: print(f" ❌ DNS解析失败: {dns_result['error']}") result_data["dns"] = {"success": False, "error": dns_result['error']} result_data["summary"] = { "avg_latency": None, "tcp_latency": None, "speed": None, "status": "DNS解析失败" } return result_data # TCP Ping 测试 print(f"\n🏓 TCP Ping 测试 (端口 {port}, 共{TCP_PING_COUNT}次):") tcp_results = tcp_ping_test(ip, port, TCP_PING_COUNT) tcp_latencies = [] tcp_details = [] for i, result in enumerate(tcp_results, 1): if result["success"]: lat = result["latency"] tcp_latencies.append(lat) tcp_details.append({"seq": i, "latency_ms": round(lat, 2), "success": True}) print(f" 第{i}次: {lat:.2f} ms") else: tcp_details.append({"seq": i, "success": False, "error": result['error']}) print(f" 第{i}次: 失败 - {result['error']}") result_data["tcp_ping"]["details"] = tcp_details if tcp_latencies: tcp_avg = statistics.mean(tcp_latencies) tcp_min = min(tcp_latencies) tcp_max = max(tcp_latencies) tcp_jitter = statistics.stdev(tcp_latencies) if len(tcp_latencies) > 1 else 0 tcp_loss = (TCP_PING_COUNT - len(tcp_latencies)) / TCP_PING_COUNT * 100 print(f"\n 📊 TCP Ping 统计:") print(f" 平均: {tcp_avg:.2f} ms") print(f" 最小: {tcp_min:.2f} ms") print(f" 最大: {tcp_max:.2f} ms") print(f" 抖动: {tcp_jitter:.2f} ms") print(f" 丢包: {tcp_loss:.1f}%") result_data["tcp_ping"]["statistics"] = { "avg_ms": round(tcp_avg, 2), "min_ms": round(tcp_min, 2), "max_ms": round(tcp_max, 2), "jitter_ms": round(tcp_jitter, 2), "packet_loss_percent": round(tcp_loss, 1), "success_count": len(tcp_latencies), "total_count": TCP_PING_COUNT } else: tcp_avg = None print(f"\n ❌ TCP Ping 全部失败") result_data["tcp_ping"]["statistics"] = {"success": False, "packet_loss_percent": 100} # Traceroute 测试 if enable_traceroute: print(f"\n🛤️ Traceroute 测试:") trace_results = traceroute_simple_with_data(ip, max_hops=15) result_data["traceroute"] = trace_results # HTTP 延迟测试 print(f"\n⏱️ HTTP 延迟测试 (共{TEST_COUNT}次):") latencies = [] http_details = [] http_status = None for i in range(TEST_COUNT): result = test_latency(url) if result["success"]: latency = result["latency"] latencies.append(latency) http_status = result.get("status", "N/A") http_details.append({"seq": i+1, "latency_ms": round(latency, 2), "status": http_status, "success": True}) print(f" 第{i+1}次: {latency:.2f} ms (HTTP {http_status})") else: http_details.append({"seq": i+1, "success": False, "error": result['error']}) print(f" 第{i+1}次: 失败 - {result['error']}") result_data["http"]["details"] = http_details result_data["http"]["status_code"] = http_status if latencies: avg_latency = statistics.mean(latencies) min_latency = min(latencies) max_latency = max(latencies) print(f"\n 📊 HTTP延迟统计:") print(f" 平均: {avg_latency:.2f} ms") print(f" 最小: {min_latency:.2f} ms") print(f" 最大: {max_latency:.2f} ms") result_data["http"]["statistics"] = { "avg_ms": round(avg_latency, 2), "min_ms": round(min_latency, 2), "max_ms": round(max_latency, 2), "success_count": len(latencies), "total_count": TEST_COUNT } else: avg_latency = None print(f"\n ❌ HTTP延迟测试失败") result_data["http"]["statistics"] = {"success": False} # 下载测试 print(f"\n📥 下载测试:") dl_result = test_download_speed(url) if dl_result["success"]: size = dl_result["size"] dl_time = dl_result["time"] speed = dl_result["speed"] print(f" 下载大小: {format_size(size)}") print(f" 下载用时: {dl_time:.3f} 秒") print(f" 下载速度: {format_speed(speed)}") result_data["download"] = { "success": True, "size_bytes": size, "time_seconds": round(dl_time, 3), "speed_bps": round(speed, 2), "speed_formatted": format_speed(speed) } else: speed = None print(f" ❌ 下载失败 - {dl_result['error']}") result_data["download"] = {"success": False, "error": dl_result['error']} # 汇总 result_data["summary"] = { "avg_latency": round(avg_latency, 2) if avg_latency else None, "tcp_latency": round(tcp_avg, 2) if tcp_avg else None, "speed": round(speed, 2) if speed else None, "status": "可用" if tcp_avg else "不可用" } return result_data def traceroute_simple_with_data(host, max_hops=TRACEROUTE_MAX_HOPS): """简化版TCP traceroute,返回数据""" results = [] try: dest_ip = socket.gethostbyname(host) except socket.gaierror: return [{"hop": 1, "error": "DNS解析失败"}] print(f" 追踪到 {host} ({dest_ip}):") print(f" {'跳数':<4} {'状态':<15} {'延迟'}") print(" " + "-" * 40) for ttl in range(1, max_hops + 1): hop_data = {"hop": ttl} try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) sock.settimeout(2) start = time.time() try: sock.connect((dest_ip, 443)) elapsed = (time.time() - start) * 1000 print(f" {ttl:<4} {'到达目标':<15} {elapsed:.2f} ms") hop_data.update({"status": "到达目标", "latency_ms": round(elapsed, 2), "reached": True}) results.append(hop_data) sock.close() break except socket.timeout: print(f" {ttl:<4} {'超时':<15} *") hop_data.update({"status": "超时", "timeout": True}) except socket.error as e: elapsed = (time.time() - start) * 1000 if e.errno == 111: print(f" {ttl:<4} {'中间节点':<15} {elapsed:.2f} ms") hop_data.update({"status": "中间节点", "latency_ms": round(elapsed, 2)}) else: print(f" {ttl:<4} {'TTL过期':<15} {elapsed:.2f} ms") hop_data.update({"status": "TTL过期", "latency_ms": round(elapsed, 2)}) sock.close() except Exception as e: print(f" {ttl:<4} 错误: {e}") hop_data.update({"error": str(e)}) results.append(hop_data) return results def print_summary(results): """打印汇总结果""" print("\n\n" + "="*70) print("📈 测速结果汇总") print("="*70) # 按TCP延迟排序 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) print(f"\n{'排名':<4} {'名称':<22} {'TCP延迟':<12} {'HTTP延迟':<12} {'速度':<12}") print("-" * 70) for i, r in enumerate(valid_results, 1): tcp_str = f"{r['summary']['tcp_latency']:.2f} ms" if r['summary']['tcp_latency'] else "N/A" http_str = f"{r['summary']['avg_latency']:.2f} ms" if r['summary']['avg_latency'] else "N/A" speed_str = format_speed(r['summary']['speed']) if r['summary']['speed'] else "N/A" print(f"{i:<4} {r['name']:<22} {tcp_str:<12} {http_str:<12} {speed_str:<12}") # 显示失败的 failed = [r for r in results if r["summary"].get("tcp_latency") is None] if failed: print("\n❌ 无法连接的节点:") for r in failed: print(f" - {r['name']} ({r['url']})") # 推荐 if valid_results: best = valid_results[0] print(f"\n" + "="*70) print(f"✅ 推荐使用: {best['name']}") print(f" URL: {best['url']}") if best['summary']['tcp_latency']: print(f" TCP延迟: {best['summary']['tcp_latency']:.2f} ms") if best['summary']['avg_latency']: print(f" HTTP延迟: {best['summary']['avg_latency']:.2f} ms") def export_json(results, output_file=None): """导出测试结果到JSON文件""" if output_file is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"speedtest_result_{timestamp}.json" # 构建完整报告 report = { "report_info": { "tool": "CDN 测速工具", "version": "2.1", "generated_at": datetime.now().isoformat(), "test_parameters": { "tcp_ping_count": TCP_PING_COUNT, "http_test_count": TEST_COUNT, "timeout_seconds": TIMEOUT, "traceroute_max_hops": TRACEROUTE_MAX_HOPS } }, "endpoints": results, "ranking": [], "recommendation": None } # 生成排名 valid_results = [r for r in results if r["summary"].get("tcp_latency") is not None] valid_results.sort(key=lambda x: x["summary"]["tcp_latency"]) for i, r in enumerate(valid_results, 1): report["ranking"].append({ "rank": i, "name": r["name"], "url": r["url"], "tcp_latency_ms": r["summary"]["tcp_latency"], "http_latency_ms": r["summary"]["avg_latency"], "speed_bps": r["summary"]["speed"] }) # 推荐 if valid_results: best = valid_results[0] report["recommendation"] = { "name": best["name"], "url": best["url"], "tcp_latency_ms": best["summary"]["tcp_latency"], "http_latency_ms": best["summary"]["avg_latency"], "reason": "TCP延迟最低" } # 写入文件 with open(output_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) return output_file def main(): import argparse parser = argparse.ArgumentParser(description='CDN 测速工具 v2.1') parser.add_argument('--no-traceroute', '-t', action='store_true', help='禁用 traceroute 测试') parser.add_argument('--quick', '-q', action='store_true', help='快速模式(减少测试次数)') parser.add_argument('--json', '-j', nargs='?', const='auto', default=None, metavar='FILE', help='导出结果到JSON文件(不指定文件名则自动生成)') parser.add_argument('--output', '-o', type=str, default=None, help='指定JSON输出文件路径') args = parser.parse_args() global TEST_COUNT, TCP_PING_COUNT if args.quick: TEST_COUNT = 1 TCP_PING_COUNT = 3 print("\n" + "="*70) print(" 🚀 CDN 测速工具 v2.1") print(" 支持: TCP Ping | Traceroute | HTTP延迟 | 下载速度 | JSON导出") print(" 测试中国大陆优化节点") print("="*70) results = [] enable_traceroute = not args.no_traceroute for endpoint in ENDPOINTS: result = test_endpoint(endpoint, enable_traceroute=enable_traceroute) results.append(result) print_summary(results) # 导出JSON json_file = args.output or args.json if json_file: if json_file == 'auto': json_file = None # 让export_json自动生成文件名 output_path = export_json(results, json_file) print(f"\n📄 测试结果已导出到: {output_path}") print("\n" + "="*70) print("测速完成!") print("="*70 + "\n") if __name__ == "__main__": main()
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 19:53:53

7、Windows应用开发中的用户界面控件使用指南

Windows应用开发中的用户界面控件使用指南 在Windows应用开发中,合理选择和使用各种用户界面控件对于提供良好的用户体验至关重要。以下将详细介绍一些常用控件的使用方法和最佳实践。 1. CheckBox和ToggleSwitch 使用场景区分 根据Windows Store应用指南,如果操作代表状…

作者头像 李华
网站建设 2026/6/23 19:53:28

18、Windows 应用数据管理全解析

Windows 应用数据管理全解析 1. 本地数据存储 在 Windows 应用开发中,API 允许存储数据和文件。数据存储在容器中,容器可根据需要创建,最多可嵌套 32 层,有助于隐藏和简化对系统注册表的访问。 以下是使用 LocalSettings 对象检索或存储本地信息的代码示例: C# 代码…

作者头像 李华
网站建设 2026/6/22 21:33:29

汇编语言全接触-34.RichEdit 控件:更多的正文操作

你将会了解到关于的RichEdit更多的正文操作。特别是你将会学习到如何搜索/替换正文&#xff0c;定位到某一指定的行号。 下载 例子程序. Theory Searching for Text RichEdit 控件具有几种正文操作&#xff0c;搜索指定正文就是其中的一种。搜索正文是通过发送 EM_FINDTEXT…

作者头像 李华
网站建设 2026/6/23 20:10:35

汇编语言全接触-35.RichEdit 控件:语法高亮显示

在读这篇教程之前先提醒你&#xff0c;这是一个复杂的主题&#xff1a;不适合初学者。这是最后一篇RichEdit 控件教程。 下载 例子程序. Theory 语法高亮显示对那些编写文本编辑器的人来说是一个热点主题。最好的解决方法&#xff08;我自己认为的&#xff09;是编写一个定制…

作者头像 李华
网站建设 2026/6/23 20:10:35

自养号测评:跳出“隐形工具”定位,筑牢品牌增长核心基建

在亚马逊的竞争生态中&#xff0c;早期评论与订单对新品至关重要&#xff0c;当外部测评风险与日俱增&#xff0c;一种更为自主的模式——“自养号测评”&#xff0c;正成为深度运营者的战略选择。它绝非简单的刷评&#xff0c;而是一项融合了技术、数据与合规管理的系统工程。…

作者头像 李华