// NOTE(review): the four system imports in the original lost their header names
// (angle-bracket contents were stripped). The list below is reconstructed from the
// APIs this file actually uses — confirm against the project's original imports.
#import <Foundation/Foundation.h>
#import <arpa/inet.h>
#import <errno.h>
#import <fcntl.h>
#import <netinet/in.h>
#import <sys/socket.h>
#import <unistd.h>

#import "XUDPServer.h"
#import "UDPHandler.h"

// GCDAsyncUdpSocket is presumably already visible through XUDPServer.h; importing it
// again is harmless (#import is idempotent) and only happens if the header is found.
#if __has_include(<CocoaAsyncSocket/GCDAsyncUdpSocket.h>)
#import <CocoaAsyncSocket/GCDAsyncUdpSocket.h>
#elif __has_include("GCDAsyncUdpSocket.h")
#import "GCDAsyncUdpSocket.h"
#endif

#define FALLBACK_PORT_START 6001
#define FALLBACK_PORT_END 7000
#define PORT 6001
#define SEND_TIMEOUT 5.0

/// Largest payload that fits into a single IPv4 UDP datagram (65535 - 20 - 8).
static const NSUInteger kXUDPMaxDatagramPayload = 65507;

/// Key used with dispatch_queue_set_specific / dispatch_get_specific to detect
/// whether the caller is already running on `serverQueue`.
static const void *const kXUDPServerQueueKey = &kXUDPServerQueueKey;

@interface XUDPServer () {
@private
    GCDAsyncUdpSocket *serverSocket;
    dispatch_queue_t serverQueue;
    dispatch_source_t restartTimer;
    dispatch_source_t healthCheckTimer;  // dispatch_source based timer (no run loop needed)
    NSUInteger restartAttempts;
    NSUInteger lifecycleGeneration;      // bumped by every stop; invalidates delayed starts
    uint16_t currentPort;
    BOOL isStarting;
    BOOL isStopping;
}
@property (nonatomic, strong) NSMutableDictionary *pendingSends;
@property (nonatomic, assign) long currentTag;
@end

@implementation XUDPServer

#pragma mark - Singleton

/// Returns the process-wide server instance, creating it on first use.
+ (instancetype)sharedInstance {
    static XUDPServer *_sharedInstance = nil;
    static dispatch_once_t oncePredicate;
    dispatch_once(&oncePredicate, ^{
        _sharedInstance = [[super allocWithZone:NULL] init];
    });
    return _sharedInstance;
}

/// Routes every allocation to the shared instance.
+ (instancetype)allocWithZone:(struct _NSZone *)zone {
    return [XUDPServer sharedInstance];
}

/// Initializes the server state exactly once.
///
/// Because +allocWithZone: hands back the shared instance, `[[XUDPServer alloc] init]`
/// re-enters this method on an already configured object; the `serverQueue == nil`
/// guard keeps such a call from replacing the queue, port and pending-send table.
- (instancetype)init {
    self = [super init];
    if (self && serverQueue == nil) {
        restartAttempts = 0;
        lifecycleGeneration = 0;
        currentPort = PORT;
        _currentTag = 0;
        isStarting = NO;
        isStopping = NO;
        _pendingSends = [NSMutableDictionary dictionary];
        serverQueue = dispatch_queue_create("com.xudpserver.queue", DISPATCH_QUEUE_SERIAL);
        // Tag the queue so -udp_port can tell when it is already running on it.
        dispatch_queue_set_specific(serverQueue, kXUDPServerQueueKey,
                                    (void *)kXUDPServerQueueKey, NULL);
    }
    return self;
}

#pragma mark - Public API

/// Asynchronously starts the server on `serverQueue`.
- (void)start {
    dispatch_async(serverQueue, ^{
        [self _startInternal];
    });
}

/// Asynchronously stops the server on `serverQueue`.
- (void)stop {
    dispatch_async(serverQueue, ^{
        [self _stopInternal];
    });
}

/// Returns the port the server is currently configured to use.
///
/// Safe to call from any queue, including `serverQueue` itself (delegate callbacks run
/// there): the queue-specific key registered in -init is used to avoid a dispatch_sync
/// onto the current queue.
- (uint16_t)udp_port {
    if (dispatch_get_specific(kXUDPServerQueueKey) != NULL) {
        return currentPort;
    }
    __block uint16_t port = 0;
    dispatch_sync(serverQueue, ^{
        port = self->currentPort;
    });
    return port;
}

/// Schedules a start attempt five seconds from now. The attempt is dropped if -stop
/// runs on `serverQueue` in the meantime.
- (void)scheduleRestart {
    dispatch_async(serverQueue, ^{
        NSUInteger generation = self->lifecycleGeneration;
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(5.0 * NSEC_PER_SEC)),
                       self->serverQueue, ^{
            if (self->lifecycleGeneration != generation) {
                return;  // a stop happened after this restart was scheduled
            }
            [self _startInternal];
        });
    });
}

#pragma mark - Start / Stop (serverQueue only)

/// First half of the start sequence: validates state, tears down any stale socket and
/// defers the actual bind by 100 ms without blocking the queue.
- (void)_startInternal {
    if (isStarting) {
        NSLog(@"⚠️ Server is already starting");
        return;
    }
    if (serverSocket && !serverSocket.isClosed) {
        NSLog(@"⚠️ UDP server already running on port %d", currentPort);
        return;
    }

    isStarting = YES;
    NSLog(@"XS- Starting UDP server on port %d (PID: %d)", currentPort, getpid());
    [self _forceCloseSocket];

    // dispatch_after instead of usleep so the serial queue is never blocked.
    NSUInteger generation = lifecycleGeneration;
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1 * NSEC_PER_SEC)),
                   serverQueue, ^{
        if (self->lifecycleGeneration != generation) {
            return;  // -stop ran during the delay; it already cleared isStarting
        }
        [self _startInternalContinue];
    });
}

/// Second half of the start sequence: creates the socket, binds it, applies socket
/// options and begins receiving. Falls back to another port on EADDRINUSE and to an
/// exponential-backoff retry on any other failure.
- (void)_startInternalContinue {
    serverSocket = [[GCDAsyncUdpSocket alloc] initWithDelegate:self delegateQueue:serverQueue];
    NSError *error = nil;

    // IPv4 only.
    [serverSocket setIPv4Enabled:YES];
    [serverSocket setIPv6Enabled:NO];

    if (![serverSocket enableReusePort:YES error:&error]) {
        NSLog(@"❌ Error enabling reuse port: %@", error);
    }

    if (![serverSocket bindToPort:currentPort error:&error]) {
        NSLog(@"❌ Error binding to port %d: %@", currentPort, error);
        [self _forceCloseSocket];
        isStarting = NO;

        if (error.code == EADDRINUSE) {
            NSLog(@"⚠️ Port %d is in use", currentPort);
            // Run the cleanup off-queue; the port is captured by value so the
            // background block never reads the ivar.
            // NOTE(review): the cleanup briefly binds the same port the fallback scan
            // probes first, so the two can interfere — confirm the cleanup is wanted.
            uint16_t busyPort = currentPort;
            dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
                [self _cleanupZombieSocketsOnPort:busyPort];
            });
            [self _tryFallbackPorts];
            return;
        }

        [self _scheduleRestartWithBackoff];
        return;
    }

    [self _configureSocketOptions];

    if (![serverSocket beginReceiving:&error]) {
        NSLog(@"❌ Error starting server (recv): %@", error);
        [self _forceCloseSocket];
        isStarting = NO;
        [self _scheduleRestartWithBackoff];
        return;
    }

    isStarting = NO;
    restartAttempts = 0;
    [self _logSocketInfo];
    NSLog(@"✅ UDP server started successfully on port %d", currentPort);
    [self _startHealthCheck];
}

/// Closes the current socket (if any) and starts again. Used by recovery paths where
/// the socket object is still open, in which case a plain _startInternal would return
/// early with "already running".
- (void)_restartInternal {
    [self _forceCloseSocket];
    [self _startInternal];
}

/// Stops timers, closes the socket and drops pending sends. Also invalidates any
/// delayed start / restart that was scheduled before this call.
- (void)_stopInternal {
    if (isStopping) {
        NSLog(@"⚠️ Server is already stopping");
        return;
    }

    isStopping = YES;
    lifecycleGeneration++;  // cancels delayed blocks captured with the old generation
    isStarting = NO;
    restartAttempts = 0;    // a later -start gets a fresh retry budget
    NSLog(@"XS- Stopping UDP server on port %d", currentPort);

    [self _cancelRestartTimer];
    [self _stopHealthCheck];
    [self _forceCloseSocket];
    [self.pendingSends removeAllObjects];

    // dispatch_after instead of usleep so the serial queue is never blocked.
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(0.1 * NSEC_PER_SEC)),
                   serverQueue, ^{
        self->isStopping = NO;
        NSLog(@"✅ UDP server stopped");
    });
}

#pragma mark - Socket helpers

/// Shuts down and closes `serverSocket`, then clears the ivar.
///
/// The descriptor is only touched inside -performBlock: (GCDAsyncUdpSocket returns -1
/// from -socketFD elsewhere), and shutdown() runs before -close so it can never hit a
/// descriptor number that has been closed and reused.
- (void)_forceCloseSocket {
    GCDAsyncUdpSocket *socket = serverSocket;
    if (!socket) {
        return;
    }

    __block int fd = -1;
    [socket performBlock:^{
        fd = [socket socketFD];
        if (fd == -1) {
            return;
        }
        // Non-blocking + zero linger so neither shutdown nor close can stall.
        int flags = fcntl(fd, F_GETFL, 0);
        if (flags != -1) {
            fcntl(fd, F_SETFL, flags | O_NONBLOCK);
        }
        struct linger lingerOption = {1, 0};
        setsockopt(fd, SOL_SOCKET, SO_LINGER, &lingerOption, sizeof(lingerOption));
        shutdown(fd, SHUT_RDWR);
    }];

    [socket close];
    serverSocket = nil;
    NSLog(@"🔒 Socket closed (fd: %d)", fd);
}

/// Convenience wrapper that runs the cleanup for the current port.
- (void)_cleanupZombieSockets {
    [self _cleanupZombieSocketsOnPort:currentPort];
}

/// Creates a throw-away UDP socket, tries to bind it to `port` with SO_REUSEADDR and a
/// zero linger, and closes it again.
/// @param port Port to probe, passed by value so this can run off `serverQueue`.
- (void)_cleanupZombieSocketsOnPort:(uint16_t)port {
    NSLog(@"🧹 Cleaning up zombie sockets on port %d", port);

    int testSocket = socket(AF_INET, SOCK_DGRAM, 0);
    if (testSocket < 0) {
        return;
    }

    int flags = fcntl(testSocket, F_GETFL, 0);
    if (flags != -1) {
        fcntl(testSocket, F_SETFL, flags | O_NONBLOCK);
    }

    int reuseAddr = 1;
    setsockopt(testSocket, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));

    struct linger lingerOption = {1, 0};
    setsockopt(testSocket, SOL_SOCKET, SO_LINGER, &lingerOption, sizeof(lingerOption));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(testSocket, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
        shutdown(testSocket, SHUT_RDWR);
        NSLog(@"✅ Cleaned up zombie socket on port %d", port);
    }
    close(testSocket);
}

/// Logs the bound address and the kernel send/receive buffer sizes of the socket.
- (void)_logSocketInfo {
    GCDAsyncUdpSocket *socket = serverSocket;
    if (!socket) {
        return;
    }

    [socket performBlock:^{
        int fd = [socket socketFD];
        if (fd == -1) {
            return;
        }

        struct sockaddr_in addr;
        socklen_t addrLen = sizeof(addr);
        if (getsockname(fd, (struct sockaddr *)&addr, &addrLen) == 0) {
            char ipStr[INET_ADDRSTRLEN];
            inet_ntop(AF_INET, &addr.sin_addr, ipStr, sizeof(ipStr));
            NSLog(@"📊 Socket - FD: %d, Addr: %s:%d, PID: %d",
                  fd, ipStr, ntohs(addr.sin_port), getpid());
        }

        // Initialised so a failing getsockopt logs 0 instead of stack garbage.
        int recvBuf = 0;
        int sendBuf = 0;
        socklen_t optLen = sizeof(recvBuf);
        getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvBuf, &optLen);
        optLen = sizeof(sendBuf);
        getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendBuf, &optLen);
        NSLog(@"📊 Buffers - Recv: %d, Send: %d", recvBuf, sendBuf);
    }];
}

/// Applies SO_REUSEADDR, 256 KiB send/receive buffers, SO_NOSIGPIPE (where available),
/// zero linger and O_NONBLOCK to the socket's descriptor.
- (void)_configureSocketOptions {
    GCDAsyncUdpSocket *socket = serverSocket;
    if (!socket) {
        return;
    }

    [socket performBlock:^{
        int fd = [socket socketFD];
        if (fd == -1) {
            return;
        }

        int reuseAddr = 1;
        setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));

        // Buffers
        int recvBufferSize = 256 * 1024;
        setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvBufferSize, sizeof(recvBufferSize));
        int sendBufferSize = 256 * 1024;
        setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendBufferSize, sizeof(sendBufferSize));

#ifdef SO_NOSIGPIPE
        int noSigpipe = 1;
        setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &noSigpipe, sizeof(noSigpipe));
#endif

        struct linger lingerOption = {1, 0};
        setsockopt(fd, SOL_SOCKET, SO_LINGER, &lingerOption, sizeof(lingerOption));

        // Non-blocking
        int flags = fcntl(fd, F_GETFL, 0);
        if (flags != -1) {
            fcntl(fd, F_SETFL, flags | O_NONBLOCK);
        }
    }];
}

#pragma mark - Health Check

/// (Re)creates the 30-second repeating health-check timer on `serverQueue`.
- (void)_startHealthCheck {
    [self _stopHealthCheck];

    healthCheckTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, serverQueue);
    dispatch_source_set_timer(healthCheckTimer,
                              dispatch_time(DISPATCH_TIME_NOW, 30 * NSEC_PER_SEC),
                              30 * NSEC_PER_SEC,
                              1 * NSEC_PER_SEC);
    // Weak capture: self owns the timer, so a strong capture would form a cycle.
    __weak __typeof__(self) weakSelf = self;
    dispatch_source_set_event_handler(healthCheckTimer, ^{
        [weakSelf _performHealthCheck];
    });
    dispatch_resume(healthCheckTimer);
}

/// Cancels and releases the health-check timer, if any.
- (void)_stopHealthCheck {
    if (healthCheckTimer) {
        dispatch_source_cancel(healthCheckTimer);
        healthCheckTimer = nil;
    }
}

/// Verifies that the socket exists, is open, has a valid descriptor and is bound;
/// restarts the server otherwise. On success, kicks off the duplicate-socket scan on a
/// background queue.
- (void)_performHealthCheck {
    if (isStarting || isStopping) {
        return;
    }

    GCDAsyncUdpSocket *socket = serverSocket;
    if (!socket || socket.isClosed) {
        NSLog(@"⚠️ Health check failed: socket is closed");
        [self _startInternal];
        return;
    }

    __block int fd = -1;
    __block BOOL bound = NO;
    [socket performBlock:^{
        fd = [socket socketFD];
        if (fd == -1) {
            return;
        }
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        socklen_t addrLen = sizeof(addr);
        // getsockname succeeds on unbound sockets too, so also require a real port.
        bound = (getsockname(fd, (struct sockaddr *)&addr, &addrLen) == 0) &&
                (addr.sin_port != 0);
    }];

    if (fd == -1) {
        NSLog(@"⚠️ Health check failed: invalid socket");
        [self _restartInternal];
        return;
    }
    if (!bound) {
        NSLog(@"⚠️ Health check failed: socket not bound");
        [self _restartInternal];
        return;
    }

    // The scan shells out, so run it off-queue with the port captured by value.
    uint16_t port = currentPort;
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
        [self _checkDuplicateSocketsOnPort:port];
    });

    NSLog(@"✅ Health check passed (Port: %d, FD: %d)", currentPort, fd);
}

/// Convenience wrapper that runs the duplicate scan for the current port.
- (void)_checkDuplicateSockets {
    [self _checkDuplicateSocketsOnPort:currentPort];
}

/// Counts the `lsof` output lines for UDP `port` and warns when more than one socket
/// is listed.
/// @param port Port to inspect, passed by value so this can run off `serverQueue`.
///
/// NOTE(review): this relies on `timeout` and `lsof` being on PATH. If `timeout` is
/// missing, or `lsof` exits non-zero because nothing matched, the `|| echo timeout`
/// fallback makes the scan report "timed out" — confirm on the target platform.
- (void)_checkDuplicateSocketsOnPort:(uint16_t)port {
    NSString *command = [NSString stringWithFormat:
        @"timeout 2 lsof -i UDP:%d -n -P 2>/dev/null || echo timeout", port];
    FILE *pipe = popen([command UTF8String], "r");
    if (!pipe) {
        return;
    }

    char buffer[256];
    int count = 0;
    BOOL timedOut = NO;
    while (fgets(buffer, sizeof(buffer), pipe) != NULL) {
        if (strstr(buffer, "timeout") != NULL) {
            timedOut = YES;
            break;
        }
        count++;
    }
    pclose(pipe);

    if (timedOut) {
        NSLog(@"⚠️ Socket check timed out");
        return;
    }

    count = MAX(0, count - 1);  // drop the lsof header line
    if (count > 1) {
        NSLog(@"⚠️ WARNING: Found %d sockets on port %d!", count, port);
    }
}

#pragma mark - Restart Logic

/// Cancels and releases the pending backoff timer, if any.
- (void)_cancelRestartTimer {
    if (restartTimer) {
        dispatch_source_cancel(restartTimer);
        restartTimer = nil;
    }
}

/// Schedules one start attempt after 2^(attempt-1) seconds (capped at 60 s); gives up
/// after ten consecutive attempts.
- (void)_scheduleRestartWithBackoff {
    [self _cancelRestartTimer];

    const NSUInteger maxAttempts = 10;
    if (restartAttempts >= maxAttempts) {
        NSLog(@"❌ Maximum restart attempts reached");
        return;
    }

    restartAttempts++;
    NSTimeInterval delay = MIN(pow(2, restartAttempts - 1), 60.0);
    NSLog(@"⏰ Scheduling restart in %.1f seconds", delay);

    restartTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, serverQueue);
    dispatch_source_set_timer(restartTimer,
                              dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC)),
                              DISPATCH_TIME_FOREVER,
                              (uint64_t)(0.1 * NSEC_PER_SEC));
    // Weak capture: self owns the timer, so a strong capture would form a cycle.
    __weak __typeof__(self) weakSelf = self;
    dispatch_source_set_event_handler(restartTimer, ^{
        __strong __typeof__(weakSelf) strongSelf = weakSelf;
        if (!strongSelf) {
            return;
        }
        [strongSelf _cancelRestartTimer];  // one-shot: release the source once fired
        [strongSelf _startInternal];
    });
    dispatch_resume(restartTimer);
}

/// Scans at most 101 ports starting at FALLBACK_PORT_START for one that can be bound,
/// switches `currentPort` to it and starts; otherwise falls back to the backoff retry.
- (void)_tryFallbackPorts {
    NSLog(@"🔍 Searching for fallback port...");

    // Bounded scan so the queue is never tied up for long.
    uint16_t scanLimit = MIN(FALLBACK_PORT_END, FALLBACK_PORT_START + 100);
    for (uint16_t port = FALLBACK_PORT_START; port <= scanLimit; port++) {
        if ([self _isPortAvailable:port]) {
            currentPort = port;
            NSLog(@"✅ Found available port: %d", port);
            [self _startInternal];
            return;
        }
    }

    NSLog(@"❌ No available fallback ports");
    [self _scheduleRestartWithBackoff];
}

/// Returns YES when a throw-away IPv4 UDP socket can be bound to `port` on INADDR_ANY.
- (BOOL)_isPortAvailable:(uint16_t)port {
    int testSocket = socket(AF_INET, SOCK_DGRAM, 0);
    if (testSocket < 0) {
        return NO;
    }

    // Non-blocking with 1-second I/O timeouts.
    int flags = fcntl(testSocket, F_GETFL, 0);
    if (flags != -1) {
        fcntl(testSocket, F_SETFL, flags | O_NONBLOCK);
    }
    struct timeval timeout = {1, 0};
    setsockopt(testSocket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
    setsockopt(testSocket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));

    int reuseAddr = 1;
    setsockopt(testSocket, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;

    int result = bind(testSocket, (struct sockaddr *)&addr, sizeof(addr));
    close(testSocket);
    return (result == 0);
}

#pragma mark - GCDAsyncUdpSocket Delegate

- (void)udpSocket:(GCDAsyncUdpSocket *)sock didConnectToAddress:(NSData *)address {
    NSLog(@"✅ Connected");
}

- (void)udpSocket:(GCDAsyncUdpSocket *)sock didNotConnect:(NSError *)error {
    NSLog(@"❌ Did not connect: %@", error);
}

/// Decodes an incoming datagram as UTF-8, hands it to UDPHandler on a background queue
/// and sends the handler's reply (if any) back to the sender.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock
   didReceiveData:(NSData *)data
      fromAddress:(NSData *)address
withFilterContext:(id)filterContext {
    @autoreleasepool {
        if (data.length > kXUDPMaxDatagramPayload) {
            NSLog(@"⚠️ Oversized packet: %lu bytes", (unsigned long)data.length);
            return;
        }

        NSString *datastr = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
        if (!datastr) {
            NSLog(@"⚠️ Failed to decode data");
            return;
        }

        // Handle off-queue so a slow handler never delays receiving.
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
            @autoreleasepool {
                UDPHandler *handle = [UDPHandler sharedInstance];
                NSString *res = [handle handle:datastr];
                if (res) {
                    [self _sendResponse:res toAddress:address fromSocket:sock];
                }
            }
        });
    }
}

/// Sends `response` to `address` through `sock` on `serverQueue`, tracking the send in
/// `pendingSends` until the socket reports the outcome or the timeout sweep runs.
- (void)_sendResponse:(NSString *)response
            toAddress:(NSData *)address
           fromSocket:(GCDAsyncUdpSocket *)sock {
    dispatch_async(serverQueue, ^{
        // A nil address would crash the dictionary literal below.
        if (!sock || sock.isClosed || !address) {
            return;
        }

        NSData *responseData = [response dataUsingEncoding:NSUTF8StringEncoding];
        if (!responseData || responseData.length > kXUDPMaxDatagramPayload) {
            return;
        }

        long tag = ++self->_currentTag;
        self->_pendingSends[@(tag)] = @{
            @"response": response,
            @"address": address,
            @"timestamp": @([[NSDate date] timeIntervalSince1970])
        };

        [sock sendData:responseData toAddress:address withTimeout:SEND_TIMEOUT tag:tag];

        // Sweep the entry one second after the send timeout in case no callback arrives.
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW,
                                     (int64_t)((SEND_TIMEOUT + 1.0) * NSEC_PER_SEC)),
                       self->serverQueue, ^{
            [self _checkSendTimeout:tag];
        });
    });
}

/// Drops the bookkeeping entry for `tag` if it is still pending.
- (void)_checkSendTimeout:(long)tag {
    [self.pendingSends removeObjectForKey:@(tag)];
}

- (void)udpSocket:(GCDAsyncUdpSocket *)sock didSendDataWithTag:(long)tag {
    [self.pendingSends removeObjectForKey:@(tag)];
}

/// Clears the pending entry and restarts the server when the send failed with ENOTCONN.
- (void)udpSocket:(GCDAsyncUdpSocket *)sock
    didNotSendDataWithTag:(long)tag
               dueToError:(NSError *)error {
    [self.pendingSends removeObjectForKey:@(tag)];
    if (error.code == ENOTCONN) {
        // The socket object is still open here, so it must be closed before restarting.
        [self _restartInternal];
    }
}

/// Forgets the socket if it is the current one and schedules a restart when it closed
/// with an error outside of a stop.
- (void)udpSocketDidClose:(GCDAsyncUdpSocket *)sock withError:(NSError *)error {
    NSLog(@"⚠️ Socket closed: %@", error);
    if (sock == serverSocket) {
        serverSocket = nil;
    }
    if (error && !isStopping) {
        [self scheduleRestart];
    }
}

#pragma mark - Utilities

/// Formats an IPv4 `sockaddr_in` wrapped in NSData as "a.b.c.d:port".
/// @return @"unknown" when the data is too short or is not an AF_INET address.
- (NSString *)_addressToString:(NSData *)addressData {
    if (addressData.length < sizeof(struct sockaddr_in)) {
        return @"unknown";
    }

    const struct sockaddr_in *addr = (const struct sockaddr_in *)addressData.bytes;
    if (addr->sin_family != AF_INET) {
        return @"unknown";
    }

    char ipStr[INET_ADDRSTRLEN];
    if (inet_ntop(AF_INET, &addr->sin_addr, ipStr, sizeof(ipStr)) == NULL) {
        return @"unknown";
    }
    return [NSString stringWithFormat:@"%s:%d", ipStr, ntohs(addr->sin_port)];
}

#pragma mark - Lifecycle

/// Synchronous teardown only: no block may capture `self` while it is deallocating,
/// so this does not go through _stopInternal (which schedules a dispatch_after).
- (void)dealloc {
    [self _cancelRestartTimer];
    [self _stopHealthCheck];
    [serverSocket synchronouslySetDelegate:nil];
    [serverSocket close];
}

@end