Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IHP-31 Minio asset paths #34

Merged
merged 6 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ params.conf
.bsp
converted-to-torchscript.pt
nsfw_model.pt
pics

target/
!.mvn/wrapper/maven-wrapper.jar
Expand Down
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ lazy val domain = (project in file("domain"))
.settings(
libraryDependencies ++= Seq(
newtype,
enumeratum
enumeratum,
chimney
) ++ codecs
)
.settings(
Expand Down Expand Up @@ -70,7 +71,9 @@ lazy val resizer = (project in file("resizer"))
assembly / mainClass := Some("com.github.baklanovsoft.imagehosting.resizer.Main")
)
.settings(
libraryDependencies ++= Seq(imgscalr) ++ Seq(
libraryDependencies ++= Seq(
imgscalr
) ++ Seq(
pureconfig,
logging
).flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@ package com.github.baklanovsoft.imagehosting.s3

import cats.effect.kernel.Sync
import cats.implicits._
import com.github.baklanovsoft.imagehosting.BucketId
import com.github.baklanovsoft.imagehosting.{BucketId, ImageMeta}
import io.minio.{GetObjectArgs, MakeBucketArgs, MinioClient => MinioClientJava, PutObjectArgs, RemoveBucketArgs}

import java.io.InputStream

trait MinioClient[F[_]] {
def makeBucket(bucketId: BucketId): F[Unit]
def dropBucket(bucketId: BucketId): F[Unit]

def putObject(
bucketId: BucketId,
objectName: String,
def putImage(
imageMeta: ImageMeta,
stream: InputStream,
contentType: String,
folder: Option[String] = None
contentType: String
): F[Unit]

// todo ensure streams are closed
def getObject(bucketId: BucketId, objectName: String, folder: Option[String] = None): F[InputStream]
def getImage(imageMeta: ImageMeta): F[InputStream]

// those are not really used since storage app manages the buckets
def makeBucket(bucketId: BucketId): F[Unit]
def dropBucket(bucketId: BucketId): F[Unit]
}

object MinioClient {
Expand All @@ -34,56 +33,47 @@ object MinioClient {
.credentials(username, password)
.build()

override def makeBucket(bucketId: BucketId): F[Unit] =
Sync[F].delay {
client.makeBucket(MakeBucketArgs.builder().bucket(bucketId.value.toString).build())
}

override def putObject(
bucketId: BucketId,
objectName: String,
override def putImage(
imageMeta: ImageMeta,
stream: InputStream,
contentType: String,
folder: Option[String] = None
contentType: String
): F[Unit] =
Sync[F].delay {

val path = folder.fold(objectName)(f => s"$f/$objectName")

client
.putObject(
PutObjectArgs
.builder()
.bucket(bucketId.value.toString)
.`object`(path)
.bucket(imageMeta.bucket.value.toString)
.`object`(imageMeta.path)
.stream(stream, -1, 1024 * 1024 * 5)
.contentType(contentType)
.build()
)

}.void

override def getObject(
bucketId: BucketId,
objectName: String,
folder: Option[String] = None
override def getImage(
imageMeta: ImageMeta
): F[InputStream] =
Sync[F].delay {
val path = folder.fold(objectName)(f => s"$f/$objectName")

val inputStream: InputStream =
client
.getObject(
GetObjectArgs
.builder()
.bucket(bucketId.value.toString)
.`object`(path)
.bucket(imageMeta.bucket.value.toString)
.`object`(imageMeta.path)
.build()
)

inputStream
}

override def makeBucket(bucketId: BucketId): F[Unit] =
Sync[F].delay {
client.makeBucket(MakeBucketArgs.builder().bucket(bucketId.value.toString).build())
}

override def dropBucket(bucketId: BucketId): F[Unit] =
Sync[F].delay {
client.removeBucket(RemoveBucketArgs.builder().bucket(bucketId.value.toString).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import io.circe.generic.AutoDerivation
import io.circe.generic.semiauto.deriveCodec

final case class Categories(
bucketId: BucketId,
imageId: ImageId,
image: ImageMeta,
categories: Map[Category, Score]
) {
def isEmpty: Boolean = categories.isEmpty
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.github.baklanovsoft.imagehosting

import com.github.baklanovsoft.imagehosting.common.NewtypeCodecs
import io.circe.Codec
import io.circe.generic.AutoDerivation
import io.circe.generic.semiauto.deriveCodec
import io.scalaland.chimney.Transformer
import io.scalaland.chimney.dsl._

/** Information for s3 on how to extract/put image
*/
/** Location of a single image object in S3-compatible storage: the bucket it
  * lives in, the folder-like prefix, and the object's file name.
  */
final case class ImageMeta(
    bucket: BucketId,
    prefix: Prefix,
    name: ImageName
) {

  /** Object key inside the bucket, i.e. "prefix/name". */
  val path: String = s"$prefix/$name"

  /** Fully qualified "bucket/prefix/name" form, convenient for logging. */
  override def toString: String = s"$bucket/$path"

  /** Same bucket and prefix, different file name. */
  def rename(newName: ImageName): ImageMeta = copy(name = newName)
}

object ImageMeta extends NewtypeCodecs with AutoDerivation {

  /** Circe codec for [[ImageMeta]] itself.
    *
    * Fix: this was previously declared as `Codec[NewImage]` — a codec for a
    * foreign type placed in this companion is never found by implicit search
    * for `NewImage` (which derives its own codec in its own companion), and it
    * left `ImageMeta` without the explicit codec this declaration intended.
    */
  implicit val codec: Codec[ImageMeta] = deriveCodec

  /** Builds an [[ImageMeta]] from a [[NewImage]] kafka notification; the
    * notification's `image` field becomes the stored object's `name`, the
    * remaining fields (`bucket`, `prefix`) map by name.
    */
  implicit val transformer: Transformer[NewImage, ImageMeta] =
    (src: NewImage) =>
      src
        .into[ImageMeta]
        .withFieldRenamed(_.image, _.name)
        .transform
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import io.circe.Codec
import io.circe.generic.AutoDerivation
import io.circe.generic.semiauto.deriveCodec

/** New image upload notification received from kafka
*/
final case class NewImage(
bucketId: BucketId,
imageId: ImageId
bucket: BucketId,
prefix: Prefix,
image: ImageName
)

object NewImage extends NewtypeCodecs with AutoDerivation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import java.util.UUID
package object imagehosting {
@newtype final case class BucketId(value: UUID)

@newtype final case class ImageId(value: UUID)
@newtype final case class Prefix(value: UUID)

@newtype final case class ImageName(value: String)

@newtype final case class Category(value: String)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ class CategoriesCodecSpec extends AnyFunSuite with Matchers with EitherValues {
.parse(
"""
|{
| "bucketId" : "95fb5f81-9280-4a8f-850f-2f3438bcfe24",
| "imageId" : "22c403e8-3093-485a-834f-542504601e88",
| "categories" : {
| "person" : 0.9329947,
| "dog" : 0.83488,
| "nsfw" : 0.9342
| "image": {
| "bucket": "00000000-0000-0000-0000-000000000000",
| "prefix": "557b036f-c61f-40b6-ba13-4708519a566f",
| "name": "original.jpg"
| },
| "categories": {
| "person": 0.9329947,
| "dog": 0.83488,
| "nsfw": 0.9342
| }
|}
|""".stripMargin
Expand All @@ -30,8 +33,11 @@ class CategoriesCodecSpec extends AnyFunSuite with Matchers with EitherValues {

val expected =
Categories(
BucketId(UUID.fromString("95fb5f81-9280-4a8f-850f-2f3438bcfe24")),
ImageId(UUID.fromString("22c403e8-3093-485a-834f-542504601e88")),
ImageMeta(
BucketId(UUID.fromString("00000000-0000-0000-0000-000000000000")),
Prefix(UUID.fromString("557b036f-c61f-40b6-ba13-4708519a566f")),
ImageName("original.jpg")
),
Map(
Category("person") -> Score(0.9329947),
Category("dog") -> Score(0.83488),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ class NewImageCodecSpec extends AnyFunSuite with Matchers with EitherValues {
.parse(
"""
|{
| "bucketId": "95fb5f81-9280-4a8f-850f-2f3438bcfe24",
| "imageId": "22c403e8-3093-485a-834f-542504601e88"
| "bucket": "00000000-0000-0000-0000-000000000000",
| "prefix": "557b036f-c61f-40b6-ba13-4708519a566f",
| "image": "original.jpg"
|}
|""".stripMargin
)
.value

val expected =
NewImage(
BucketId(UUID.fromString("95fb5f81-9280-4a8f-850f-2f3438bcfe24")),
ImageId(UUID.fromString("22c403e8-3093-485a-834f-542504601e88"))
BucketId(UUID.fromString("00000000-0000-0000-0000-000000000000")),
Prefix(UUID.fromString("557b036f-c61f-40b6-ba13-4708519a566f")),
ImageName("original.jpg")
)

json.as[NewImage].value mustBe expected: Unit
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ object Dependencies {
private object Versions {
val cats = "2.12.0"
val catsEffect = "3.5.4"
val chimney = "1.4.0"
val circe = "0.14.10"

val djl = "0.30.0"
Expand Down Expand Up @@ -37,6 +38,7 @@ object Dependencies {

val cats = "org.typelevel" %% "cats-core" % Versions.cats
val catsEffect = "org.typelevel" %% "cats-effect" % Versions.catsEffect
val chimney = "io.scalaland" %% "chimney" % Versions.chimney

val codecs = Seq(
"io.circe" %% "circe-core" % Versions.circe,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import ai.djl.modality.cv.ImageFactory
import cats.effect.Temporal
import cats.effect.kernel.{Async, Resource}
import cats.implicits._
import com.github.baklanovsoft.imagehosting.{Categories, NewImage}
import com.github.baklanovsoft.imagehosting.{Categories, ImageMeta, NewImage}
import com.github.baklanovsoft.imagehosting.kafka.{KafkaConsumer, KafkaJsonSerializer}
import com.github.baklanovsoft.imagehosting.s3.MinioClient
import fs2.Stream
import fs2.kafka._
import org.apache.kafka.common.TopicPartition
import org.typelevel.log4cats.{Logger, LoggerFactory}
import io.scalaland.chimney.dsl._

import scala.concurrent.duration._

Expand All @@ -26,12 +27,12 @@ class CategorizationStream[F[_]: Async: Logger](

private val imageFactory = ImageFactory.getInstance()

private def processRecord(record: NewImage) = for {
is <- minioClient.getObject(record.bucketId, record.imageId.value.toString)
private def processRecord(imageMeta: ImageMeta) = for {
is <- minioClient.getImage(imageMeta)
image <- Async[F].delay(imageFactory.fromInputStream(is))
categories <- detection.detect(image, record.bucketId, record.imageId)
nsfw0 <- nsfw.detect(image, record.bucketId, record.imageId)
} yield Categories(bucketId = record.bucketId, imageId = record.imageId, categories = categories ++ nsfw0)
categories <- detection.detect(image, imageMeta)
nsfw0 <- nsfw.detect(image, imageMeta)
} yield Categories(imageMeta, categories = categories ++ nsfw0)

/** Transactional fs2kafka stream
*/
Expand Down Expand Up @@ -63,8 +64,10 @@ class CategorizationStream[F[_]: Async: Logger](
val consumerOffset = commitable.offset

for {
_ <- Logger[F].info(s"Kafka read [$p:$o] --- $msg")
categories <- processRecord(msg)
_ <- Logger[F].info(s"Kafka read [$p:$o] --- $msg")

s3Image = msg.transformInto[ImageMeta]
categories <- processRecord(s3Image)
record = ProducerRecord(categoriesTopic, (), categories)
} yield
if (categories.isEmpty) // don't send empty categories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import ai.djl.translate.Translator
import cats.Monad
import cats.effect.kernel.{Resource, Sync}
import cats.implicits._
import com.github.baklanovsoft.imagehosting.{BucketId, Category, ImageId, Score}
import com.github.baklanovsoft.imagehosting.{Category, ImageMeta, Score}
import org.typelevel.log4cats.{Logger, LoggerFactory}

import java.nio.file.{Files, Paths}
Expand All @@ -20,13 +20,13 @@ trait NsfwDetection[F[_]] {

/** Will return nsfw category with score if nsfw detected
*/
def detect(image: Image, bucketId: BucketId, imageId: ImageId): F[Option[(Category, Score)]]
def detect(image: Image, imageMeta: ImageMeta): F[Option[(Category, Score)]]
}

object NsfwDetection {

def dummy[F[_]: Monad]: NsfwDetection[F] = new NsfwDetection[F] {
override def detect(image: Image, bucketId: BucketId, imageId: ImageId): F[Option[(Category, Score)]] =
override def detect(image: Image, imageMeta: ImageMeta): F[Option[(Category, Score)]] =
Monad[F].pure(None)
}

Expand Down Expand Up @@ -95,10 +95,10 @@ object NsfwDetection {
(_, predictor) <- acquireModelPredictor[F](modelPath, synsetPath)
} yield new NsfwDetection[F] {

override def detect(image: Image, bucketId: BucketId, imageId: ImageId): F[Option[(Category, Score)]] =
override def detect(image: Image, imageMeta: ImageMeta): F[Option[(Category, Score)]] =
for {
detected <- Sync[F].delay(predictor.predict(image))
_ <- logger.info(s"NSFW detection result for image $bucketId:$imageId: $detected")
_ <- logger.info(s"NSFW detection result for image $imageMeta: $detected")
} yield detected.getClassNames.asScala
.zip(detected.getProbabilities.asScala)
.toMap
Expand Down
Loading
Loading