GBA001/Delta/Database/InputStreamOutputWriter.swift

149 lines
4.3 KiB
Swift

//
// InputStreamOutputWriter.swift
// Delta
//
// Created by Riley Testut on 12/25/16.
// Copyright © 2016 Riley Testut. All rights reserved.
//
import Foundation
private let MaximumBufferLength = 4 * 1024 // 4 KB
class InputStreamOutputWriter: NSObject
{
let inputStream: InputStream
let outputStream: OutputStream
fileprivate var completion: ((Error?) -> Void)?
fileprivate var dataBuffer = Data(capacity: MaximumBufferLength * 2)
init(inputStream: InputStream, outputStream: OutputStream)
{
self.inputStream = inputStream
self.outputStream = outputStream
super.init()
self.inputStream.delegate = self
self.outputStream.delegate = self
}
func start(with completion: @escaping ((Error?) -> Void))
{
guard self.completion == nil else { return }
self.completion = completion
let writingQueue = DispatchQueue(label: "com.rileytestut.InputStreamOutputWriter.writingQueue", qos: .userInitiated)
writingQueue.async {
self.inputStream.schedule(in: .current, forMode: .defaultRunLoopMode)
self.outputStream.schedule(in: .current, forMode: .defaultRunLoopMode)
self.outputStream.open()
self.inputStream.open()
RunLoop.current.run()
}
}
}
private extension InputStreamOutputWriter
{
func writeDataBuffer()
{
while self.outputStream.hasSpaceAvailable && self.dataBuffer.count > 0
{
self.dataBuffer.withUnsafeMutableBytes { (buffer: UnsafeMutablePointer<UInt8>) -> Void in
let writtenBytesCount = self.outputStream.write(buffer, maxLength: self.dataBuffer.count)
if writtenBytesCount >= 0
{
self.dataBuffer.removeSubrange(0 ..< writtenBytesCount)
}
}
}
}
func finishWriting()
{
self.inputStream.close()
self.outputStream.close()
self.inputStream.remove(from: .current, forMode: .commonModes)
self.outputStream.remove(from: .current, forMode: .commonModes)
self.completion?(self.inputStream.streamError ?? self.outputStream.streamError)
CFRunLoopStop(CFRunLoopGetCurrent())
}
}
extension InputStreamOutputWriter: StreamDelegate
{
func stream(_ aStream: Stream, handle eventCode: Stream.Event)
{
if let inputStream = aStream as? InputStream
{
self.inputStream(inputStream, handle: eventCode)
}
else if let outputStream = aStream as? OutputStream
{
self.outputStream(outputStream, handle: eventCode)
}
}
private func inputStream(_ inputStream: InputStream, handle eventCode: Stream.Event)
{
switch eventCode
{
case Stream.Event.hasBytesAvailable:
guard inputStream.streamError == nil else { return }
while inputStream.hasBytesAvailable
{
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: MaximumBufferLength)
let readBytesCount = inputStream.read(buffer, maxLength: MaximumBufferLength)
guard readBytesCount >= 0 else { break }
self.dataBuffer.append(buffer, count: readBytesCount)
buffer.deallocate(capacity: MaximumBufferLength)
self.writeDataBuffer()
}
case Stream.Event.endEncountered:
if self.dataBuffer.count == 0
{
self.finishWriting()
}
case Stream.Event.errorOccurred: self.finishWriting()
default: break
}
}
private func outputStream(_ outputStream: OutputStream, handle eventCode: Stream.Event)
{
switch eventCode
{
case Stream.Event.hasSpaceAvailable:
self.writeDataBuffer()
if self.inputStream.streamStatus == .atEnd
{
self.finishWriting()
}
case Stream.Event.errorOccurred: self.finishWriting()
default: break
}
}
}