【实用github项目】Go语言开发的端口转发工具 for port data forward
本文最后更新于 2024-05-09,
若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益, 请联系我 删除。
本站只有Telegram群组为唯一交流群组, 点击加入
文章内容有误?申请成为本站文章修订者或作者? 向站长提出申请
官网地址: https://gitee.com/tavenli/port-forward
开发语言:GO
控制台框架:beego
数据库:sqlite3
最近更新
v1.3.6 发布,重新增加 TCP端口数据分发功能
v1.3.5 发布,增加批量导入、批量添加规则
v1.3.3 发布,增加 一键开启所有转发 和 一键关闭所有转发
v1.3.2 发布,服务稳定性已经过长时间的验证
v1.3.1 发布,增加程序启动自动开启转发
v1.2.9_beta 发布,完善点对点转发的稳定性,支持TCP和UDP协议转发
最新编译好的版本下载:
https://gitee.com/tavenli/port-forward/releases
功能介绍
支持Web控制台添加端口映射
支持对每条端口映射进行开启和关闭控制
支持 RestfulAPI 接口,方便被其它系统集成
支持每条端口转发的同时,再分发给多个端口,满足某些测试场景
类似企业交换机的功能,即软交换机,主要是方便企业网络维护人员或开发人员
快速安装说明
- 下载编译好的程序包,并解压程序包
- 执行 start.sh (Linux)或 start.bat (Win)命令
- 打开浏览器,进入控制台,打开 http://127.0.0.1:8080/login
- 输入用户 admin 密码 123456 进入控制台
转发中界面
WEB控制台
压测脚本(python3)
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import time
def load_proxies(file_path):
""" 从指定文件加载代理列表,并为每个代理添加认证信息 """
with open(file_path, 'r') as file:
proxies = [line.strip() for line in file if line.strip()]
# 添加认证信息
proxies = [
f"http://统一用户:统一密码@{proxy}" for proxy in proxies
]
return proxies
def fetch_url(url, proxy):
""" 使用指定的代理访问 URL """
try:
response = requests.get(url, proxies={'http': proxy, 'https': proxy}, timeout=5)
return f"Success: {proxy}", response.status_code
except requests.exceptions.RequestException as e:
return f"Error: {proxy} {e}", None
def main():
start_time = time.time()
url = 'https://baidu.com'
proxies = load_proxies('/root/proxy.txt') # 加载代理
results = []
# 使用 ThreadPoolExecutor 管理并发请求
with ThreadPoolExecutor(max_workers=1000) as executor:
future_to_proxy = {executor.submit(fetch_url, url, proxy): proxy for proxy in proxies}
try:
for future in as_completed(future_to_proxy, timeout=5): # 设置总体超时时间
proxy = future_to_proxy[future]
try:
result = future.result()
results.append(result)
except Exception as exc:
results.append(f"Error with {proxy}: {str(exc)}")
except TimeoutError:
print("Some tasks did not complete in time")
# 打印结果
for result in results:
print(result)
print("Elapsed Time:", time.time() - start_time)
if __name__ == '__main__':
main()
该脚本用户测试并发1000条IP使用代理服务器获取baidu.com内容时的可用性。实测该软件可并发性能在300条左右。
定时替换IP池脚本(python3)
# coding:utf-8
import sqlite3
import random
def read_ips(filename, section):
"""
读取指定区域的 IP 地址列表。
"""
full_path = filename
with open(full_path, 'r') as file:
lines = file.readlines()
section_start = False
ips = []
for line in lines:
line = line.strip()
if line == section:
section_start = True
elif section_start and line.isalpha():
break
elif section_start and not line.isalpha():
ips.append(line)
return ips
def update_database(ips, dbname=r"D:\forward-server\data\data.db"):
"""
连接数据库并更新 `t_port_forward` 表中 name 为 "beijing" 的记录。
"""
conn = sqlite3.connect(dbname)
cursor = conn.cursor()
# 获取当前所有beijing条目的ID
cursor.execute("SELECT id FROM t_port_forward WHERE name = 'beijing'")
ids = cursor.fetchall()
# 检查是否有足够的 IP 地址
if len(ips) < len(ids):
print("Error: Not enough IPs for the number of 'beijing' entries in the database.")
return
# 打乱IP列表以随机分配
random.shuffle(ips)
# 准备更新操作
try:
for i, id_tuple in enumerate(ids):
ip = ips[i]
cursor.execute("UPDATE t_port_forward SET targetAddr = ? WHERE id = ?", (ip, id_tuple[0]))
conn.commit()
print(f"Updated targetAddr for all entries with name 'beijing'.")
except sqlite3.Error as e:
print(f"An error occurred: {e}")
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
ips = read_ips(r"D:\forward-server\data\ip2.txt", "beijing")
if ips:
update_database(ips)
else:
print("Error: No IPs available for 'beijing'. Check the 'ip.txt' file and ensure it is formatted correctly.")
数据库使用sqlite3存储信息,可以使用该脚本读取ip.txt内的ip池信息,用于定时自动替换软件的ip,达到按时刷新ip的效果。
定时启动脚本(SHELL)
cd D:\forward-server
taskkill /F /IM forward-server.exe
python proxy.py
taskkill /F /IM forward-server.exe
start D:\forward-server\start.bat
定时任务选择该脚本,会先杀掉原线的进程。然后执行IP替换任务,最后自动启动程序做IP端口转发工作。
评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果