Initial implementations

commit 4f97224ab9
2023-12-29 18:43:54 +06:00
26 changed files with 1190 additions and 0 deletions


@@ -0,0 +1,269 @@
package backup
import aws.sdk.kotlin.services.s3.*
import aws.sdk.kotlin.services.s3.model.CompletedMultipartUpload
import aws.sdk.kotlin.services.s3.model.CompletedPart
import aws.sdk.kotlin.services.s3.model.GetObjectRequest
import aws.sdk.kotlin.services.s3.model.UploadPartResponse
import aws.smithy.kotlin.runtime.content.ByteStream
import aws.smithy.kotlin.runtime.content.toByteArray
import aws.smithy.kotlin.runtime.content.toInputStream
import kotlinx.coroutines.*
import ziputils.*
import java.io.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.attribute.BasicFileAttributeView
import java.time.Instant
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream
import kotlin.io.path.createDirectory
class BackupClient(
private val s3: S3Client,
private val bucketName: String,
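// Buffer/part size in bytes (100 MiB by default). Note: S3 multipart parts must be at least
// 5 MiB (except the last part), so bufSize should not be set below that.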
private val bufSize: Int = 1024 * 1024 * 100
) {
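/**
* Compress [file] (a file or a directory) into a ZIP archive and upload it to S3,
* using a multipart upload when the archive exceeds one buffer.
* @return the S3 key of the new backup.
*/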
suspend fun upload(file: File) = coroutineScope {
val backupKey = "${file.name}/${Instant.now()}.zip"
PipedInputStream().use { inputStream ->
val outputStream = PipedOutputStream(inputStream)
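// Compression runs on an IO thread, writing into the pipe; the code below reads from the
// pipe and uploads concurrently, so the archive is never fully materialized in memory.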
val zipper = launch(Dispatchers.IO) {
file.compressToZip(outputStream)
}
val data = ByteArray(bufSize)
val initialRead = inputStream.readNBytes(data, 0, bufSize)
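// A full first read means the archive may not fit in a single part.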
if (initialRead == bufSize) {
// Large upload, use multipart
// TODO: the parts could be uploaded concurrently, which would improve
// performance a bit for big uploads.
val upload = s3.createMultipartUpload {
bucket = bucketName
key = backupKey
}
try {
val uploadParts = mutableListOf<CompletedPart>()
var number = 1
var bytesRead = initialRead
while (bytesRead > 0) {
val part = s3.uploadPart {
bucket = bucketName
key = backupKey
partNumber = number
uploadId = upload.uploadId
body = ByteStream.fromBytes(data.take(bytesRead))
}.asCompletedPart(number)
uploadParts.add(part)
number++
bytesRead = inputStream.readNBytes(data, 0, bufSize)
}
s3.completeMultipartUpload {
bucket = bucketName
key = backupKey
uploadId = upload.uploadId
multipartUpload = CompletedMultipartUpload {
parts = uploadParts
}
}
} catch (e: Exception) {
s3.abortMultipartUpload {
bucket = bucketName
key = backupKey
uploadId = upload.uploadId
}
throw e
}
} else {
// Small upload, use single request
s3.putObject {
bucket = bucketName
key = backupKey
body = ByteStream.fromBytes(data.take(initialRead))
}
}
zipper.join() // Should return immediately: the reads above have already drained the pipe
}
backupKey
}
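/**
* Download the backup at [backupKey] and decompress it into [destination],
* streaming straight from the S3 response body.
*/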
suspend fun restore(destination: Path, backupKey: String) = coroutineScope {
val req = GetObjectRequest {
bucket = bucketName
key = backupKey
}
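// The Kotlin SDK scopes streaming responses to this lambda; the body must be consumed inside it.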
s3.getObject(req) { resp ->
ZipInputStream(
resp.body?.toInputStream()
?: throw IOException("S3 response is missing body")
).use { zipStream ->
zipStream.decompress { destination.resolve(it) }
}
}
}
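/**
* Restore the single entry [fileName] (its full path inside the archive) from the backup at
* [backupKey] into [destination], without downloading the whole archive: the EOCD, optional
* ZIP64 records, and central directory are fetched via ranged GETs to locate the entry's bytes.
*/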
suspend fun restoreFile(destination: Path, backupKey: String, fileName: String) = coroutineScope {
// For byte ranges refer to https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.9.TXT
val eocdReq = GetObjectRequest {
bucket = bucketName
key = backupKey
// Assumption: EOCD has an empty comment
// Assumption: backups are at least 22 + 20 (= 42) bytes. Only a completely empty backup can be
// smaller, in which case this function would fail anyway, so this edge case is acceptable.
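// "bytes=-N" is an HTTP suffix range: the last N bytes of the object (RFC 7233).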
range = "bytes=-${EndOfCentralDirectoryRecord.SIZE + EndOfCentralDirectoryLocator.SIZE}"
}
val eocdBytes = s3.getObject(eocdReq) { resp ->
val bytes = resp.body?.toByteArray() ?: throw IOException("S3 response is missing body")
bytes
}
val eocd = EndOfCentralDirectoryRecord.fromByteArray(eocdBytes, EndOfCentralDirectoryLocator.SIZE)
val eocd64 = if (eocd.eocd64Required()) {
val locator = EndOfCentralDirectoryLocator.fromByteArray(eocdBytes, 0)
val eocd64Req = GetObjectRequest {
bucket = bucketName
key = backupKey
range = "bytes=${locator.endOfCentralDirectory64Offset}-"
}
s3.getObject(eocd64Req) { resp ->
val bytes = resp.body?.toByteArray() ?: throw IOException("S3 response is missing body")
EndOfCentralDirectoryRecord64.fromByteArray(bytes, 0)
}
} else null
val cenOffset = if (eocd.centralDirectoryOffset == 0xffffffffU && eocd64 != null) {
eocd64.centralDirectoryOffset
} else eocd.centralDirectoryOffset.toULong()
val censReq = GetObjectRequest {
bucket = bucketName
key = backupKey
// We only know where the central directory ends if we've also fetched the EOCD64 (which isn't
// always the case), so just over-fetch a little; these headers aren't that big anyway.
range = "bytes=${cenOffset}-"
}
val cen = s3.getObject(censReq) { resp ->
val bytes = resp.body?.toByteArray() ?: throw IOException("S3 response is missing body")
var p = 0
while (p < bytes.size) {
try {
val cen = CentralDirectoryFileHeader.fromByteArray(bytes, p)
p += cen.size
if (cen.fileName == fileName) return@getObject cen
} catch (_: InvalidSignatureException) {
return@getObject null
}
}
null
} ?: throw FileNotFoundException("File '${fileName}' not found in backup")
val localHeaderOffset = cen.extraFieldRecords.filterIsInstance<Zip64ExtraFieldRecord>()
.firstNotNullOfOrNull { it.localHeaderOffset } ?: cen.localHeaderOffset.toULong()
val compressedSize = cen.extraFieldRecords.filterIsInstance<Zip64ExtraFieldRecord>()
.firstNotNullOfOrNull { it.compressedSize } ?: cen.compressedSize.toULong()
val req = GetObjectRequest {
bucket = bucketName
key = backupKey
range = "bytes=${localHeaderOffset}-${
// Add CEN min size (46 bytes) so that the next CEN / LOC header is seen by the ZipInputStream
// and so it can see the current entry has stopped.
// Note: ZipInputStream should know the exact content length from the LOC, but it was still throwing
// EOF errors, perhaps due to fetching multiples of a power of two, or something else. This helps.
localHeaderOffset + cen.size.toULong() + compressedSize + CentralDirectoryFileHeader.SIZE.toULong()
}"
}
s3.getObject(req) { resp ->
ZipInputStream(
resp.body?.toInputStream()
?: throw IOException("S3 response is missing body")
).use { zipStream ->
zipStream.decompress { name -> destination.resolve(name.takeLastWhile { it != '/' }) }
}
}
}
}
private fun UploadPartResponse.asCompletedPart(number: Int): CompletedPart {
val part = this
return CompletedPart {
partNumber = number
eTag = part.eTag
checksumSha256 = part.checksumSha256
checksumSha1 = part.checksumSha1
checksumCrc32 = part.checksumCrc32
checksumCrc32C = part.checksumCrc32C
}
}
private fun ByteArray.take(n: Int) =
if (n == size) this // No copy
else copyOf(n) // One copy; ByteStream.fromBytes needs a full ByteArray, so a zero-copy slice isn't possible here
private fun File.compressToZip(outputStream: OutputStream) = ZipOutputStream(outputStream).use { zipStream ->
val parentDir = this.absoluteFile.parent + "/"
// Breadth-first traversal. Use an explicit loop rather than forEach: appending to an
// ArrayDeque while iterating it can throw ConcurrentModificationException.
val fileQueue = ArrayDeque<File>()
fileQueue.add(this)
while (fileQueue.isNotEmpty()) {
val subFile = fileQueue.removeFirst()
val path = subFile.absolutePath.removePrefix(parentDir)
val subFiles = subFile.listFiles()
if (subFiles != null) { // Is a directory
val entry = ZipEntry("$path/")
setZipAttributes(entry, subFile.toPath())
zipStream.putNextEntry(entry)
fileQueue.addAll(subFiles)
} else { // Otherwise, treat it as a file
BufferedInputStream(subFile.inputStream()).use { origin ->
val entry = ZipEntry(path)
setZipAttributes(entry, subFile.toPath())
zipStream.putNextEntry(entry)
origin.copyTo(zipStream)
}
}
}
}
private fun ZipInputStream.decompress(
bufSize: Int = 1024 * 1024,
entryNameToPath: (String) -> Path
) {
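// Note: entryNameToPath is trusted to map entry names inside the destination; a hostile
// archive with "../" components could otherwise escape it (zip-slip).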
var entry = this.nextEntry
while (entry != null) {
val path = entryNameToPath(entry.name)
if (entry.isDirectory) {
path.createDirectory()
} else {
val buf = ByteArray(bufSize)
path.toFile().outputStream().use { fileStream ->
var bytesRead = this.read(buf)
while (bytesRead > 0) {
fileStream.write(buf, 0, bytesRead)
bytesRead = this.read(buf)
}
}
}
applyZipAttributes(entry, path)
entry = this.nextEntry
}
}
private fun setZipAttributes(entry: ZipEntry, path: Path) {
try {
val attrs = Files.getFileAttributeView(path, BasicFileAttributeView::class.java).readAttributes()
entry.setCreationTime(attrs.creationTime())
entry.setLastModifiedTime(attrs.lastModifiedTime())
entry.setLastAccessTime(attrs.lastAccessTime())
} catch (_: IOException) {
}
}
private fun applyZipAttributes(entry: ZipEntry, path: Path) {
try {
val attrs = Files.getFileAttributeView(path, BasicFileAttributeView::class.java)
attrs.setTimes(entry.lastModifiedTime, entry.lastAccessTime, entry.creationTime)
} catch (_: IOException) {
}
}


@@ -0,0 +1,13 @@
package backup
import aws.sdk.kotlin.services.s3.S3Client
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import java.io.File
import kotlin.io.path.Path
fun main() = runBlocking {
S3Client.fromEnvironment().use { s3 ->
val backupClient = BackupClient(s3, "teamcity-executors-test-task", 1024 * 1024 * 10)
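// Hypothetical usage (paths and entry names are illustrative):
// val key = backupClient.upload(File("/data/service"))
// backupClient.restore(Path("/tmp/restored"), key)
// backupClient.restoreFile(Path("/tmp/restored"), key, "service/config.yml")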
}
}


@@ -0,0 +1,93 @@
package ziputils
import java.nio.ByteBuffer
import java.nio.ByteOrder
internal class CentralDirectoryFileHeader(
val compressedSize: UInt,
val uncompressedSize: UInt,
val nameLength: UShort,
val extraFieldLength: UShort,
val commentLength: UShort,
val disk: UShort,
val localHeaderOffset: UInt,
val fileName: String,
val extraFieldRecords: List<ExtraFieldRecord>
) {
val size: Int
get() = SIZE + nameLength.toInt() + extraFieldLength.toInt() + commentLength.toInt()
companion object {
const val SIGNATURE = 0x02014b50U
const val SIZE = 46
/**
* Create CentralDirectoryFileHeader from raw byte data.
* @throws InvalidDataException if the provided ByteArray does not contain a supported CEN.
*/
@Throws(InvalidDataException::class)
fun fromByteArray(data: ByteArray, offset: Int): CentralDirectoryFileHeader {
if (data.size - offset < SIZE) {
throw InvalidDataException("CEN must be at least 46 bytes")
}
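// Fixed CEN field offsets (APPNOTE 4.3.12): 20 = compressed size, 24 = uncompressed size,
// 28 = name length, 30 = extra field length, 32 = comment length, 34 = disk number,
// 42 = local header offset.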
val buf = ByteBuffer.wrap(data, offset, SIZE).order(ByteOrder.LITTLE_ENDIAN)
if (buf.getInt().toUInt() != SIGNATURE) {
throw InvalidSignatureException("Invalid signature")
}
val extraFieldRecords = mutableListOf<ExtraFieldRecord>()
val nameLength = buf.getShort(offset + 28).toUShort()
buf.position(offset + 20)
val cen = CentralDirectoryFileHeader(
compressedSize = buf.getInt().toUInt(),
uncompressedSize = buf.getInt().toUInt(),
nameLength = nameLength
.also { buf.position(offset + 30) },
extraFieldLength = buf.getShort().toUShort(),
commentLength = buf.getShort().toUShort(),
disk = buf.getShort().toUShort()
.also { buf.position(offset + 42) },
localHeaderOffset = buf.getInt().toUInt(),
fileName = String(data.sliceArray(offset + SIZE..<offset + SIZE + nameLength.toInt())),
extraFieldRecords = extraFieldRecords
)
if (data.size - offset < cen.size) {
throw InvalidDataException("CEN is too short")
}
// Parse extra field records
val extraFieldsBuf = ByteBuffer.wrap(
data, offset + SIZE + cen.nameLength.toInt(), cen.extraFieldLength.toInt()
).order(ByteOrder.LITTLE_ENDIAN)
while (extraFieldsBuf.remaining() > 0) {
val id = extraFieldsBuf.getShort().toUShort()
val size = extraFieldsBuf.getShort().toUShort()
extraFieldRecords.add(when (id) {
Zip64ExtraFieldRecord.ID -> {
Zip64ExtraFieldRecord(
size,
if (cen.uncompressedSize == 0xffffffffU) {
extraFieldsBuf.getLong().toULong()
} else null,
if (cen.compressedSize == 0xffffffffU) {
extraFieldsBuf.getLong().toULong()
} else null,
if (cen.localHeaderOffset == 0xffffffffU) {
extraFieldsBuf.getLong().toULong()
} else null,
if (cen.disk == 0xffffU.toUShort()) {
extraFieldsBuf.getInt().toUInt()
} else null
)
}
else -> {
extraFieldsBuf.position(extraFieldsBuf.position() + size.toInt())
ExtraFieldRecord(id, size)
}
})
}
return cen
}
}
}


@@ -0,0 +1,25 @@
package ziputils
import java.nio.ByteBuffer
import java.nio.ByteOrder
internal class EndOfCentralDirectoryLocator(
val endOfCentralDirectory64Offset: ULong
) {
companion object {
const val SIGNATURE = 0x07064b50U
const val SIZE = 20
@Throws(InvalidDataException::class)
fun fromByteArray(data: ByteArray, offset: Int): EndOfCentralDirectoryLocator {
if (data.size - offset < SIZE) {
throw InvalidDataException("EOCD64 locator must be at least 20 bytes")
}
val buf = ByteBuffer.wrap(data, offset, SIZE).order(ByteOrder.LITTLE_ENDIAN)
if (buf.getInt().toUInt() != SIGNATURE) {
throw InvalidSignatureException("Invalid signature")
}
buf.position(offset + 8)
return EndOfCentralDirectoryLocator(buf.getLong().toULong())
}
}
}


@@ -0,0 +1,38 @@
package ziputils
import java.nio.ByteBuffer
import java.nio.ByteOrder
/**
* Partial End of Central Directory record class.
* Only supports data required by the backup tool.
*/
internal class EndOfCentralDirectoryRecord(
val centralDirectoryOffset: UInt
) {
fun eocd64Required(): Boolean =
centralDirectoryOffset == 0xffffffffU
companion object {
const val SIGNATURE = 0x06054b50U
const val SIZE = 22
/**
* Create EndOfCentralDirectoryRecord from raw byte data.
* @throws InvalidDataException if the provided ByteArray does not contain a supported EOCD.
*/
@Throws(InvalidDataException::class)
fun fromByteArray(data: ByteArray, offset: Int): EndOfCentralDirectoryRecord {
if (data.size - offset < SIZE) {
throw InvalidDataException("EOCD must be at least 22 bytes")
}
val buf = ByteBuffer.wrap(data, offset, SIZE).order(ByteOrder.LITTLE_ENDIAN)
if (buf.getInt().toUInt() != SIGNATURE) {
throw InvalidSignatureException("Invalid signature")
}
buf.position(offset + 16)
return EndOfCentralDirectoryRecord(
centralDirectoryOffset = buf.getInt().toUInt()
)
}
}
}


@@ -0,0 +1,35 @@
package ziputils
import java.nio.ByteBuffer
import java.nio.ByteOrder
/**
* Partial End of Central Directory record (ZIP64) class.
* Only supports data required by the backup tool.
*/
internal class EndOfCentralDirectoryRecord64(
val centralDirectoryOffset: ULong
) {
companion object {
const val SIGNATURE = 0x06064b50U
const val SIZE = 56
/**
* Create EndOfCentralDirectoryRecord64 from raw byte data.
* @throws InvalidDataException if the provided ByteArray does not contain a supported EOCD64.
*/
@Throws(InvalidDataException::class)
fun fromByteArray(data: ByteArray, offset: Int): EndOfCentralDirectoryRecord64 {
if (data.size - offset < SIZE) {
throw InvalidDataException("EOCD64 must be at least 56 bytes")
}
val buf = ByteBuffer.wrap(data, offset, SIZE).order(ByteOrder.LITTLE_ENDIAN)
if (buf.getInt().toUInt() != SIGNATURE) {
throw InvalidSignatureException("Invalid signature")
}
buf.position(offset + 48)
return EndOfCentralDirectoryRecord64(
centralDirectoryOffset = buf.getLong().toULong()
)
}
}
}


@@ -0,0 +1,4 @@
package ziputils
// InvalidSignatureException extends InvalidDataException so that @Throws(InvalidDataException::class)
// on the parsers covers both failure modes.
open class InvalidDataException(message: String): Exception(message)
class InvalidSignatureException(message: String): InvalidDataException(message)


@@ -0,0 +1,6 @@
package ziputils
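/**
* A generic extra field record: a 2-byte ID followed by a 2-byte data size (APPNOTE 4.5.1).
*/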
internal open class ExtraFieldRecord(
val id: UShort,
val size: UShort
)


@@ -0,0 +1,6 @@
# ZipUtils
These are **internal** utility classes for reading zip file metadata. They implement only what the
backup tool requires, and no more.
Specifically, they follow the [ZIP v6.3.9 specification](https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.9.TXT).
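For example, locating the central directory of an archive looks roughly like this (a minimal sketch;
`readCentralDirectoryOffset` is a hypothetical helper, the archive comment is assumed empty, and
ZIP64 handling is elided):

```kotlin
import java.io.RandomAccessFile

internal fun readCentralDirectoryOffset(zipPath: String): ULong {
    // The EOCD record occupies the last 22 bytes when the archive comment is empty.
    val tail = ByteArray(EndOfCentralDirectoryRecord.SIZE)
    RandomAccessFile(zipPath, "r").use { f ->
        f.seek(f.length() - tail.size)
        f.readFully(tail)
    }
    val eocd = EndOfCentralDirectoryRecord.fromByteArray(tail, 0)
    // A saturated offset means the real value lives in the ZIP64 EOCD record.
    require(!eocd.eocd64Required()) { "ZIP64 archive; fetch the EOCD64 via the locator instead" }
    return eocd.centralDirectoryOffset.toULong()
}
```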


@@ -0,0 +1,13 @@
package ziputils
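/**
* ZIP64 extended information extra field (ID 0x0001). Each value is non-null only when the
* corresponding CEN field is saturated (0xFFFFFFFF / 0xFFFF) and the real value is stored here.
*/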
internal class Zip64ExtraFieldRecord(
size: UShort,
val uncompressedSize: ULong?,
val compressedSize: ULong?,
val localHeaderOffset: ULong?,
val disk: UInt?
): ExtraFieldRecord(ID, size) {
companion object {
const val ID: UShort = 0x0001U
}
}