ios-hooks/AppRunMan/server/XUDPServer.m
2025-11-11 14:32:45 +08:00

622 lines
17 KiB
Objective-C

#import <Foundation/Foundation.h>
#import <sys/socket.h>
#import <netinet/in.h>
#import <arpa/inet.h>
#import "XUDPServer.h"
#import "UDPHandler.h"
// First port probed when the preferred port is already in use.
#define FALLBACK_PORT_START 6001
// Upper bound of the fallback range (the scan itself is additionally capped
// at FALLBACK_PORT_START + 100 in -_tryFallbackPorts).
#define FALLBACK_PORT_END 7000
// Port the server tries to bind to first.
#define PORT 6001
// Timeout, in seconds, handed to the socket for every outgoing datagram.
#define SEND_TIMEOUT 5.0
// Private class extension holding the server's internal state.
@interface XUDPServer() {
@private
// Listening socket; set to nil whenever the socket is closed.
GCDAsyncUdpSocket *serverSocket;
// Serial queue used as the socket delegate queue and as the target of both timers.
dispatch_queue_t serverQueue;
// One-shot timer that triggers a delayed restart after a failed start.
dispatch_source_t restartTimer;
dispatch_source_t healthCheckTimer; // switched to a dispatch_source timer
// Number of restarts scheduled since the last successful start.
NSUInteger restartAttempts;
// Port the server is bound to, or will try to bind to next.
uint16_t currentPort;
// YES from -_startInternal until -_startInternalContinue finishes.
BOOL isStarting;
// YES from -_stopInternal until its delayed completion block runs.
BOOL isStopping;
}
// Outgoing datagrams awaiting a send callback, keyed by send tag. Each value
// holds the "response", "address" and "timestamp" of the send.
@property (nonatomic, strong) NSMutableDictionary<NSNumber *, NSDictionary *> *pendingSends;
// Last tag handed to the socket; incremented before every send.
@property (nonatomic, assign) long currentTag;
@end
@implementation XUDPServer
#pragma mark - Singleton
/// Returns the single process-wide server object, creating it on first use.
+ (instancetype)sharedInstance {
    static dispatch_once_t onceToken;
    static XUDPServer *instance = nil;
    dispatch_once(&onceToken, ^{
        // Allocate through super: this class's own +allocWithZone: forwards
        // back to +sharedInstance and must not be used here.
        instance = [[super allocWithZone:NULL] init];
    });
    return instance;
}

/// Forwards every allocation request to the shared instance, so
/// `[XUDPServer alloc]` never produces a second object.
+ (instancetype)allocWithZone:(struct _NSZone *)zone {
    return [XUDPServer sharedInstance];
}
/// Sets up the initial server state and the private serial queue.
///
/// +allocWithZone: always returns the shared instance, so
/// `[[XUDPServer alloc] init]` runs this method again on an object that is
/// already configured. The `!serverQueue` guard makes that second call a
/// no-op instead of replacing the queue and discarding live state.
- (instancetype)init {
    self = [super init];
    if (self && !serverQueue) {
        restartAttempts = 0;
        currentPort = PORT;
        _currentTag = 0;
        isStarting = NO;
        isStopping = NO;
        _pendingSends = [NSMutableDictionary dictionary];
        serverQueue = dispatch_queue_create("com.xudpserver.queue", DISPATCH_QUEUE_SERIAL);
        // Tag the queue with itself. -udp_port looks this key up with
        // dispatch_get_specific() to detect that it is already running on
        // serverQueue; without the tag that lookup always returns NULL.
        dispatch_queue_set_specific(serverQueue,
                                    (__bridge const void *)serverQueue,
                                    (__bridge void *)serverQueue,
                                    NULL);
    }
    return self;
}

/// Asynchronously starts the server on the private queue.
- (void)start {
    dispatch_async(serverQueue, ^{
        [self _startInternal];
    });
}

/// Returns the port the server is currently using (or will try next).
///
/// Safe to call from any queue: when the caller is already on serverQueue the
/// value is read directly, otherwise the read is synchronized via dispatch_sync.
- (uint16_t)udp_port {
    // Already on serverQueue: a dispatch_sync here would deadlock.
    if (dispatch_get_specific((__bridge const void *)serverQueue) != NULL) {
        return currentPort;
    }
    __block uint16_t port = 0;
    dispatch_sync(serverQueue, ^{
        port = self->currentPort;
    });
    return port;
}
/// First half of the start sequence; expected to run on serverQueue.
///
/// Bails out when a start is already in flight or a socket is open. Otherwise
/// marks the server as starting, closes any leftover socket and schedules
/// -_startInternalContinue 0.1 s later on serverQueue.
- (void)_startInternal {
    if (isStarting) {
        NSLog(@"⚠️ Server is already starting");
        return;
    }

    BOOL socketIsOpen = (serverSocket != nil) && !serverSocket.isClosed;
    if (socketIsOpen) {
        NSLog(@"⚠️ UDP server already running on port %d", currentPort);
        return;
    }

    isStarting = YES;
    NSLog(@"XS- Starting UDP server on port %d (PID: %d)", currentPort, getpid());
    [self _forceCloseSocket];

    // A delayed block is used instead of sleeping so the queue is not blocked.
    dispatch_time_t resumeAt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1 * NSEC_PER_SEC));
    dispatch_after(resumeAt, serverQueue, ^{
        [self _startInternalContinue];
    });
}
/// Second half of the start sequence: creates the socket, binds it and
/// starts receiving.
///
/// On a bind failure with code 48 (EADDRINUSE) a cleanup pass is dispatched
/// to a background queue and a fallback port is searched. Any other bind or
/// receive failure schedules a restart with backoff. On success the restart
/// counter is reset and the health check is started.
- (void)_startInternalContinue {
    serverSocket = [[GCDAsyncUdpSocket alloc] initWithDelegate:self
                                                 delegateQueue:serverQueue];

    // IPv4 only.
    [serverSocket setIPv4Enabled:YES];
    [serverSocket setIPv6Enabled:NO];

    NSError *lastError = nil;
    BOOL reusePortEnabled = [serverSocket enableReusePort:YES error:&lastError];
    if (!reusePortEnabled) {
        // Not fatal: binding is still attempted.
        NSLog(@"❌ Error enabling reuse port: %@", lastError);
    }

    BOOL bound = [serverSocket bindToPort:currentPort error:&lastError];
    if (!bound) {
        NSLog(@"❌ Error binding to port %d: %@", currentPort, lastError);
        [self _forceCloseSocket];
        isStarting = NO;

        if (lastError.code != 48) { // anything other than EADDRINUSE
            [self _scheduleRestartWithBackoff];
            return;
        }

        NSLog(@"⚠️ Port %d is in use", currentPort);
        // The cleanup runs on a background queue so this queue is not blocked.
        dispatch_queue_t background = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0);
        dispatch_async(background, ^{
            [self _cleanupZombieSockets];
        });
        [self _tryFallbackPorts];
        return;
    }

    [self _configureSocketOptions];

    BOOL receiving = [serverSocket beginReceiving:&lastError];
    if (!receiving) {
        NSLog(@"❌ Error starting server (recv): %@", lastError);
        [self _forceCloseSocket];
        isStarting = NO;
        [self _scheduleRestartWithBackoff];
        return;
    }

    isStarting = NO;
    restartAttempts = 0;
    [self _logSocketInfo];
    NSLog(@"✅ UDP server started successfully on port %d", currentPort);
    [self _startHealthCheck];
}
/// Closes the server socket immediately and clears the ivar.
///
/// The descriptor is switched to non-blocking mode, given a zero-timeout
/// SO_LINGER and shut down before the socket object is closed. Does nothing
/// when there is no socket.
- (void)_forceCloseSocket {
    if (!serverSocket) {
        return;
    }

    int fd = [serverSocket socketFD];
    if (fd != -1) {
        // Only modify the flags when they could actually be read; OR-ing
        // O_NONBLOCK into a -1 error result would set every flag bit.
        int flags = fcntl(fd, F_GETFL, 0);
        if (flags != -1) {
            fcntl(fd, F_SETFL, flags | O_NONBLOCK);
        }

        struct linger lingerOption = {1, 0};
        setsockopt(fd, SOL_SOCKET, SO_LINGER, &lingerOption, sizeof(lingerOption));

        // Shut down synchronously, while serverSocket still owns the
        // descriptor. Deferring this to another queue races with the close
        // below: once the fd number is released it can be reused by an
        // unrelated file or socket, which the late shutdown() would then hit.
        shutdown(fd, SHUT_RDWR);
    }

    [serverSocket close];
    serverSocket = nil;
    NSLog(@"🔒 Socket closed (fd: %d)", fd);
}
/// Opens a throw-away UDP socket, tries to bind it to currentPort with
/// SO_REUSEADDR and a zero-timeout SO_LINGER, then closes it again.
///
/// NOTE(review): it is not evident from this file that binding and closing a
/// fresh socket releases a port held by a different socket — confirm that
/// this step has the intended effect.
- (void)_cleanupZombieSockets {
    NSLog(@"🧹 Cleaning up zombie sockets on port %d", currentPort);

    int probe = socket(AF_INET, SOCK_DGRAM, 0);
    if (probe < 0) {
        return;
    }

    // Non-blocking mode.
    int currentFlags = fcntl(probe, F_GETFL, 0);
    fcntl(probe, F_SETFL, currentFlags | O_NONBLOCK);

    int enable = 1;
    setsockopt(probe, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));

    struct linger noLinger = {1, 0};
    setsockopt(probe, SOL_SOCKET, SO_LINGER, &noLinger, sizeof(noLinger));

    struct sockaddr_in anyAddress;
    memset(&anyAddress, 0, sizeof(anyAddress));
    anyAddress.sin_family = AF_INET;
    anyAddress.sin_port = htons(currentPort);
    anyAddress.sin_addr.s_addr = INADDR_ANY;

    int bindResult = bind(probe, (struct sockaddr *)&anyAddress, sizeof(anyAddress));
    if (bindResult == 0) {
        shutdown(probe, SHUT_RDWR);
        NSLog(@"✅ Cleaned up zombie socket on port %d", currentPort);
    }

    close(probe);
}
/// Logs the bound address, descriptor, PID and kernel buffer sizes of the
/// server socket. Does nothing when there is no socket or no descriptor.
///
/// A buffer size that cannot be queried is logged as -1.
- (void)_logSocketInfo {
    if (!serverSocket) return;

    int fd = [serverSocket socketFD];
    if (fd == -1) return;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    socklen_t addrLen = sizeof(addr);
    if (getsockname(fd, (struct sockaddr *)&addr, &addrLen) == 0) {
        // Zero-initialized so the log stays well defined if inet_ntop fails.
        char ipStr[INET_ADDRSTRLEN] = "";
        inet_ntop(AF_INET, &addr.sin_addr, ipStr, sizeof(ipStr));
        NSLog(@"📊 Socket - FD: %d, Addr: %s:%d, PID: %d",
              fd, ipStr, ntohs(addr.sin_port), getpid());
    }

    // Initialize both values and check each call: getsockopt leaves its
    // output untouched on failure, which previously meant logging
    // indeterminate stack contents.
    int recvBuf = -1;
    int sendBuf = -1;
    socklen_t optLen = sizeof(recvBuf);
    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvBuf, &optLen) != 0) {
        recvBuf = -1;
    }
    // optLen is an in/out argument, so reset it before the second query.
    optLen = sizeof(sendBuf);
    if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendBuf, &optLen) != 0) {
        sendBuf = -1;
    }
    NSLog(@"📊 Buffers - Recv: %d, Send: %d", recvBuf, sendBuf);
}
/// Applies low-level options to the server socket's descriptor: address
/// reuse, 256 KiB receive/send buffers, SO_NOSIGPIPE where available, a
/// zero-timeout SO_LINGER and non-blocking mode.
///
/// Every option is best effort; failures of individual calls are ignored.
- (void)_configureSocketOptions {
    if (!serverSocket) return;

    int fd = [serverSocket socketFD];
    if (fd == -1) return;

    // Helper for the integer-valued SOL_SOCKET options set below.
    void (^setIntOption)(int, int) = ^(int optionName, int optionValue) {
        setsockopt(fd, SOL_SOCKET, optionName, &optionValue, sizeof(optionValue));
    };

    // Address reuse.
    setIntOption(SO_REUSEADDR, 1);

    // Kernel buffer sizes.
    setIntOption(SO_RCVBUF, 256 * 1024);
    setIntOption(SO_SNDBUF, 256 * 1024);

    // Suppress SIGPIPE on platforms that define the option.
#ifdef SO_NOSIGPIPE
    setIntOption(SO_NOSIGPIPE, 1);
#endif

    // Linger enabled with a zero timeout.
    struct linger noLinger = {1, 0};
    setsockopt(fd, SOL_SOCKET, SO_LINGER, &noLinger, sizeof(noLinger));

    // Non-blocking mode, applied only when the current flags can be read.
    int currentFlags = fcntl(fd, F_GETFL, 0);
    if (currentFlags != -1) {
        fcntl(fd, F_SETFL, currentFlags | O_NONBLOCK);
    }
}
/// Asynchronously stops the server on the private queue.
- (void)stop {
    dispatch_block_t stopWork = ^{
        [self _stopInternal];
    };
    dispatch_async(serverQueue, stopWork);
}
/// Stops the server: cancels both timers, closes the socket and drops all
/// pending sends. Expected to run on serverQueue.
///
/// isStopping stays YES for a further 0.1 s and is cleared by a delayed block
/// on serverQueue; a second stop request inside that window is ignored.
- (void)_stopInternal {
    if (isStopping) {
        NSLog(@"⚠️ Server is already stopping");
        return;
    }
    isStopping = YES;

    NSLog(@"XS- Stopping UDP server on port %d", currentPort);

    [self _cancelRestartTimer];
    [self _stopHealthCheck];
    [self _forceCloseSocket];
    [self.pendingSends removeAllObjects];

    // A delayed block is used instead of sleeping so the queue is not blocked.
    dispatch_time_t finishAt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1 * NSEC_PER_SEC));
    dispatch_after(finishAt, serverQueue, ^{
        self->isStopping = NO;
        NSLog(@"✅ UDP server stopped");
    });
}
#pragma mark - Health Check
/// (Re)starts the periodic health check.
///
/// Any existing timer is cancelled first. The new timer runs on serverQueue,
/// fires for the first time after 30 s and then every 30 s with 1 s leeway.
/// A dispatch source is used rather than NSTimer so nothing depends on the
/// main thread.
- (void)_startHealthCheck {
    if (healthCheckTimer) {
        dispatch_source_cancel(healthCheckTimer);
        healthCheckTimer = nil;
    }

    const uint64_t interval = 30 * NSEC_PER_SEC;
    const uint64_t leeway = 1 * NSEC_PER_SEC;

    dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, serverQueue);
    healthCheckTimer = timer;
    dispatch_source_set_timer(timer,
                              dispatch_time(DISPATCH_TIME_NOW, 30 * NSEC_PER_SEC),
                              interval,
                              leeway);
    dispatch_source_set_event_handler(timer, ^{
        [self _performHealthCheck];
    });
    dispatch_resume(timer);
}
/// Cancels the periodic health check timer, if one is active.
- (void)_stopHealthCheck {
    if (!healthCheckTimer) {
        return;
    }
    dispatch_source_cancel(healthCheckTimer);
    healthCheckTimer = nil;
}
/// Verifies that the server socket is still usable and restarts the server
/// when it is not.
///
/// Skipped entirely while a start or stop is in progress. The checks are, in
/// order: a socket exists and is open, it has a valid descriptor, and
/// getsockname() succeeds on it. When all pass, a duplicate-socket scan is
/// dispatched to a background queue.
- (void)_performHealthCheck {
    if (isStarting || isStopping) {
        return;
    }

    NSString *failure = nil;
    int fd = -1;

    if (!serverSocket || serverSocket.isClosed) {
        failure = @"⚠️ Health check failed: socket is closed";
    } else {
        fd = [serverSocket socketFD];
        if (fd == -1) {
            failure = @"⚠️ Health check failed: invalid socket";
        } else {
            struct sockaddr_in boundAddress;
            socklen_t boundLength = sizeof(boundAddress);
            if (getsockname(fd, (struct sockaddr *)&boundAddress, &boundLength) == -1) {
                failure = @"⚠️ Health check failed: socket not bound";
            }
        }
    }

    if (failure) {
        NSLog(@"%@", failure);
        [self _startInternal];
        return;
    }

    // The duplicate scan shells out, so it is kept off this queue.
    dispatch_queue_t background = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0);
    dispatch_async(background, ^{
        [self _checkDuplicateSockets];
    });
    NSLog(@"✅ Health check passed (Port: %d, FD: %d)", currentPort, fd);
}
/// Runs `lsof` through popen() to count UDP sockets on currentPort and logs a
/// warning when more than one is found.
///
/// The command is wrapped in `timeout 2` and prints the word "timeout" when
/// the pipeline exits non-zero; any output line containing "timeout" aborts
/// the count. The first counted line is treated as lsof's header and is
/// subtracted.
///
/// NOTE(review): the fallback `echo timeout` runs for every non-zero exit,
/// not only for an actual timeout — confirm this is intended.
- (void)_checkDuplicateSockets {
    NSString *command = [NSString stringWithFormat:@"timeout 2 lsof -i UDP:%d -n -P 2>/dev/null || echo timeout", currentPort];
    FILE *stream = popen([command UTF8String], "r");
    if (stream == NULL) {
        return;
    }

    char line[256];
    int lineCount = 0;
    BOOL sawTimeoutMarker = NO;
    while (!sawTimeoutMarker && fgets(line, sizeof(line), stream) != NULL) {
        if (strstr(line, "timeout") != NULL) {
            sawTimeoutMarker = YES;
        } else {
            lineCount++;
        }
    }
    pclose(stream);

    if (sawTimeoutMarker) {
        NSLog(@"⚠️ Socket check timed out");
        return;
    }

    // Drop the header line, never going below zero.
    int socketCount = MAX(0, lineCount - 1);
    if (socketCount > 1) {
        NSLog(@"⚠️ WARNING: Found %d sockets on port %d!", socketCount, currentPort);
    }
}
#pragma mark - Restart Logic
/// Cancels the pending restart timer, if any, and clears the ivar.
- (void)_cancelRestartTimer {
    if (!restartTimer) {
        return;
    }
    dispatch_source_cancel(restartTimer);
    restartTimer = nil;
}
/// Schedules a single restart attempt with exponential backoff.
///
/// The delay is 2^(attempt - 1) seconds, capped at 60 s. After 10 attempts
/// nothing further is scheduled. Any previously pending restart timer is
/// cancelled first; the new one-shot timer runs on serverQueue.
- (void)_scheduleRestartWithBackoff {
    [self _cancelRestartTimer];

    const NSUInteger attemptLimit = 10;
    if (restartAttempts >= attemptLimit) {
        NSLog(@"❌ Maximum restart attempts reached");
        return;
    }
    restartAttempts++;

    NSTimeInterval delay = MIN(pow(2, restartAttempts - 1), 60.0);
    NSLog(@"⏰ Scheduling restart in %.1f seconds", delay);

    dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, serverQueue);
    restartTimer = timer;
    dispatch_time_t fireAt = dispatch_time(DISPATCH_TIME_NOW, delay * NSEC_PER_SEC);
    // DISPATCH_TIME_FOREVER as the interval makes the timer fire only once.
    dispatch_source_set_timer(timer, fireAt, DISPATCH_TIME_FOREVER, 0.1 * NSEC_PER_SEC);
    dispatch_source_set_event_handler(timer, ^{
        [self _startInternal];
    });
    dispatch_resume(timer);
}
/// Searches the fallback range for a free port and restarts the server on
/// the first one found.
///
/// Called after binding currentPort failed because the address is in use.
/// At most the first 101 ports of the range are probed so the scan stays
/// short. When nothing is free, a restart with backoff is scheduled instead.
- (void)_tryFallbackPorts {
    NSLog(@"🔍 Searching for fallback port...");

    // The port that just failed to bind. The range starts at the default
    // port, and the probe in -_isPortAvailable: uses different socket options
    // than the real bind, so without this exclusion the scan can pick the
    // same port again and loop through start -> bind failure -> scan with no
    // backoff.
    const uint16_t failedPort = currentPort;

    // Cap the scan range to avoid a long stall.
    const uint16_t scanLimit = MIN(FALLBACK_PORT_END, FALLBACK_PORT_START + 100);
    for (uint16_t port = FALLBACK_PORT_START; port <= scanLimit; port++) {
        if (port == failedPort) {
            continue;
        }
        if ([self _isPortAvailable:port]) {
            currentPort = port;
            NSLog(@"✅ Found available port: %d", port);
            [self _startInternal];
            return;
        }
    }

    NSLog(@"❌ No available fallback ports");
    [self _scheduleRestartWithBackoff];
}
/// Reports whether a throw-away IPv4 UDP socket can be bound to `port`.
///
/// @param port Port number to probe, in host byte order.
/// @return YES when bind() succeeds; NO when it fails or no socket could be
///         created.
- (BOOL)_isPortAvailable:(uint16_t)port {
    int probe = socket(AF_INET, SOCK_DGRAM, 0);
    if (probe < 0) {
        return NO;
    }

    // Non-blocking mode plus 1 s receive/send timeouts.
    int currentFlags = fcntl(probe, F_GETFL, 0);
    fcntl(probe, F_SETFL, currentFlags | O_NONBLOCK);

    struct timeval oneSecond = {1, 0};
    setsockopt(probe, SOL_SOCKET, SO_RCVTIMEO, &oneSecond, sizeof(oneSecond));
    setsockopt(probe, SOL_SOCKET, SO_SNDTIMEO, &oneSecond, sizeof(oneSecond));

    int enable = 1;
    setsockopt(probe, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));

    struct sockaddr_in anyAddress;
    memset(&anyAddress, 0, sizeof(anyAddress));
    anyAddress.sin_family = AF_INET;
    anyAddress.sin_port = htons(port);
    anyAddress.sin_addr.s_addr = INADDR_ANY;

    BOOL bound = (bind(probe, (struct sockaddr *)&anyAddress, sizeof(anyAddress)) == 0);
    close(probe);
    return bound;
}
/// Requests a server start 5 seconds from now; the delayed block runs on
/// serverQueue.
- (void)scheduleRestart {
    dispatch_time_t fireAt = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5.0 * NSEC_PER_SEC));
    dispatch_block_t restart = ^{
        [self start];
    };
    dispatch_after(fireAt, self->serverQueue, restart);
}
#pragma mark - GCDAsyncUdpSocket Delegate
/// Socket delegate callback for a completed connect; only logs the event.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock didConnectToAddress:(NSData *)address {
NSLog(@"✅ Connected");
}
/// Socket delegate callback for an incoming datagram.
///
/// Datagrams longer than 65507 bytes or not decodable as UTF-8 are dropped
/// with a log line. Everything else is passed to the shared UDPHandler on a
/// global queue so receiving is not blocked; a non-nil result is sent back to
/// the sender through the same socket.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock
   didReceiveData:(NSData *)data
      fromAddress:(NSData *)address
withFilterContext:(id)filterContext {
    @autoreleasepool {
        const NSUInteger maxPayload = 65507;
        if (data.length > maxPayload) {
            NSLog(@"⚠️ Oversized packet: %lu bytes", (unsigned long)data.length);
            return;
        }

        NSString *message = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
        if (message == nil) {
            NSLog(@"⚠️ Failed to decode data");
            return;
        }

        dispatch_queue_t workers = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
        dispatch_async(workers, ^{
            @autoreleasepool {
                NSString *reply = [[UDPHandler sharedInstance] handle:message];
                if (reply) {
                    [self _sendResponse:reply toAddress:address fromSocket:sock];
                }
            }
        });
    }
}
/// Sends `response` to `address` through `sock`; the work is hopped onto
/// serverQueue.
///
/// Nothing is sent when the socket is gone or closed, or when the UTF-8
/// payload is missing or longer than 65507 bytes. Each send is recorded in
/// pendingSends under a fresh tag, and a cleanup for that tag is scheduled
/// SEND_TIMEOUT + 1 seconds later.
///
/// @param response Text to send.
/// @param address  Destination address data as delivered by the socket.
/// @param sock     Socket the request arrived on.
- (void)_sendResponse:(NSString *)response
            toAddress:(NSData *)address
           fromSocket:(GCDAsyncUdpSocket *)sock {
    dispatch_async(serverQueue, ^{
        BOOL socketUsable = (sock != nil) && !sock.isClosed;
        if (!socketUsable) {
            return;
        }

        NSData *payload = [response dataUsingEncoding:NSUTF8StringEncoding];
        if (payload == nil || payload.length > 65507) {
            return;
        }

        long tag = ++self->_currentTag;
        NSNumber *sentAt = @([NSDate date].timeIntervalSince1970);
        NSDictionary *record = @{
            @"response": response,
            @"address": address,
            @"timestamp": sentAt
        };
        self->_pendingSends[@(tag)] = record;

        [sock sendData:payload
             toAddress:address
           withTimeout:SEND_TIMEOUT
                   tag:tag];

        dispatch_time_t deadline = dispatch_time(DISPATCH_TIME_NOW,
                                                 (int64_t)((SEND_TIMEOUT + 1.0) * NSEC_PER_SEC));
        dispatch_after(deadline, self->serverQueue, ^{
            [self _checkSendTimeout:tag];
        });
    });
}
/// Drops the pending-send record for `tag` if it is still present, i.e. when
/// neither send callback removed it before the timeout check ran.
///
/// @param tag Tag the datagram was sent with.
- (void)_checkSendTimeout:(long)tag {
    NSNumber *key = @(tag);
    if (_pendingSends[key] == nil) {
        return;
    }
    [_pendingSends removeObjectForKey:key];
}
/// Socket delegate callback for a failed connect; only logs the error.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock didNotConnect:(NSError *)error {
NSLog(@"❌ Did not connect: %@", error);
}
/// Socket delegate callback for a completed send; removes the matching
/// pending-send record.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock didSendDataWithTag:(long)tag {
    NSNumber *key = @(tag);
    [self.pendingSends removeObjectForKey:key];
}
/// Socket delegate callback for a failed send.
///
/// Removes the matching pending-send record and, when the error code is 57
/// (ENOTCONN), asks for the server to be started again.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock
didNotSendDataWithTag:(long)tag
       dueToError:(NSError *)error {
    NSNumber *key = @(tag);
    [self.pendingSends removeObjectForKey:key];

    BOOL notConnected = (error.code == 57); // ENOTCONN
    if (!notConnected) {
        return;
    }
    [self _startInternal];
}
/// Socket delegate callback for a closed socket.
///
/// Clears the ivar when the closed socket is the current server socket. A
/// restart is scheduled only when the close carried an error and the server
/// is not being stopped.
- (void)udpSocketDidClose:(GCDAsyncUdpSocket *)sock withError:(NSError *)error {
    NSLog(@"⚠️ Socket closed: %@", error);

    if (serverSocket == sock) {
        serverSocket = nil;
    }

    BOOL closedUnexpectedly = (error != nil) && !isStopping;
    if (closedUnexpectedly) {
        [self scheduleRestart];
    }
}
/// Formats a socket address wrapped in NSData as a printable string.
///
/// @param addressData Raw `struct sockaddr_in` or `struct sockaddr_in6` bytes.
/// @return "a.b.c.d:port" for IPv4, "[v6-address]:port" for IPv6, and
///         @"unknown" when the data is too short, the address family is
///         neither of the two, or the address cannot be converted.
- (NSString *)_addressToString:(NSData *)addressData {
    if (addressData.length < sizeof(struct sockaddr_in)) {
        return @"unknown";
    }

    // Copy into a properly aligned local instead of casting the NSData bytes,
    // whose alignment is not guaranteed.
    struct sockaddr_in v4;
    memcpy(&v4, addressData.bytes, sizeof(v4));

    // The family field sits at the same offset in every sockaddr variant, so
    // it can be read from the IPv4 view before the real layout is known.
    if (v4.sin_family == AF_INET) {
        char ip[INET_ADDRSTRLEN] = "";
        if (inet_ntop(AF_INET, &v4.sin_addr, ip, sizeof(ip)) == NULL) {
            return @"unknown";
        }
        return [NSString stringWithFormat:@"%s:%d", ip, ntohs(v4.sin_port)];
    }

    if (v4.sin_family == AF_INET6 && addressData.length >= sizeof(struct sockaddr_in6)) {
        struct sockaddr_in6 v6;
        memcpy(&v6, addressData.bytes, sizeof(v6));
        char ip[INET6_ADDRSTRLEN] = "";
        if (inet_ntop(AF_INET6, &v6.sin6_addr, ip, sizeof(ip)) == NULL) {
            return @"unknown";
        }
        return [NSString stringWithFormat:@"[%s]:%d", ip, ntohs(v6.sin6_port)];
    }

    // Anything else would previously have been misread as an IPv4 address.
    return @"unknown";
}
/// Releases the non-memory resources: both timers and the socket.
///
/// The teardown is done inline rather than through -_stopInternal, because
/// that method schedules a delayed block that captures `self`. Capturing an
/// object that is being deallocated leaves the block with a dangling
/// reference by the time it runs.
- (void)dealloc {
    [self _cancelRestartTimer];
    [self _stopHealthCheck];
    [self _forceCloseSocket];
}
@end