From c05b70bef1e4e19e9b872d9a4fb9f5a0b9a05fbc Mon Sep 17 00:00:00 2001
From: Jincheng Miao
Date: Mon, 17 Jun 2024 09:44:23 +0800
Subject: [PATCH] Add a new embedding MosecEmbedding

Signed-off-by: Jincheng Miao
---
 comps/embeddings/langchain-mosec/README.md    |  23 ++++
 comps/embeddings/langchain-mosec/__init__.py  |   2 +
 .../langchain-mosec/docker/Dockerfile         |  15 +++
 .../langchain-mosec/docker/README.md          |  38 +++++
 .../langchain-mosec/docker/server-ipex.py     | 126 ++++++++++++++++++
 .../langchain-mosec/docker/test-embedding.py  |  16 +++
 .../langchain-mosec/embedding_mosec.py        |  76 +++++++++++
 .../langchain-mosec/requirements.txt          |   9 ++
 8 files changed, 305 insertions(+)
 create mode 100644 comps/embeddings/langchain-mosec/README.md
 create mode 100644 comps/embeddings/langchain-mosec/__init__.py
 create mode 100644 comps/embeddings/langchain-mosec/docker/Dockerfile
 create mode 100644 comps/embeddings/langchain-mosec/docker/README.md
 create mode 100644 comps/embeddings/langchain-mosec/docker/server-ipex.py
 create mode 100644 comps/embeddings/langchain-mosec/docker/test-embedding.py
 create mode 100644 comps/embeddings/langchain-mosec/embedding_mosec.py
 create mode 100644 comps/embeddings/langchain-mosec/requirements.txt

diff --git a/comps/embeddings/langchain-mosec/README.md b/comps/embeddings/langchain-mosec/README.md
new file mode 100644
index 000000000..aa6b40c0e
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/README.md
@@ -0,0 +1,23 @@
+# Build the Mosec Docker image
+```
+cd docker
+docker build -t mosec:latest .
+```
+
+# Launch the Mosec Docker container
+```
+docker run -d -p $your_port:8000 -v ./data:/data --name mosec_server mosec:latest
+```
+
+# Launch the microservice (port 6000 by default)
+```
+MOSEC_API_BASE=http://localhost:$your_port python embedding_mosec.py
+```
+
+# Run a client test
+```
+curl localhost:6000/v1/embeddings \
+  -X POST \
+  -d '{"text":"Hello, world!"}' \
+  -H 'Content-Type: application/json'
+```
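For reference, a minimal Python equivalent of the client test above — a sketch only, assuming the `requests` package is available and the microservice is listening on port 6000; the response field names (`text`, `embedding`) follow the `EmbedDoc768` document returned by the microservice code later in this patch.

```python
# Sketch of the client test in Python (assumes `pip install requests`).
import requests

resp = requests.post(
    "http://localhost:6000/v1/embeddings",
    json={"text": "Hello, world!"},  # TextDoc payload expected by the microservice
    headers={"Content-Type": "application/json"},
    timeout=30,
)
resp.raise_for_status()
doc = resp.json()
# EmbedDoc768 carries the original text plus a 768-element embedding vector.
print(doc["text"])
print(len(doc["embedding"]))
```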
diff --git a/comps/embeddings/langchain-mosec/__init__.py b/comps/embeddings/langchain-mosec/__init__.py
new file mode 100644
index 000000000..916f3a44b
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/__init__.py
@@ -0,0 +1,2 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
diff --git a/comps/embeddings/langchain-mosec/docker/Dockerfile b/comps/embeddings/langchain-mosec/docker/Dockerfile
new file mode 100644
index 000000000..c89c62093
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/docker/Dockerfile
@@ -0,0 +1,15 @@
+FROM ubuntu:22.04
+ARG DEBIAN_FRONTEND=noninteractive
+
+COPY ./server-ipex.py /root/
+
+RUN apt update && apt install -y python3 python3-pip
+RUN pip3 install torch==2.2.2 torchvision --index-url https://download.pytorch.org/whl/cpu
+RUN pip3 install intel-extension-for-pytorch==2.2.0
+RUN pip3 install transformers huggingface_hub
+RUN pip3 install llmspec mosec
+
+RUN cd /root/ && export HF_ENDPOINT=https://hf-mirror.com && huggingface-cli download --resume-download BAAI/bge-large-zh --local-dir /root/bge-large-zh
+
+ENV EMB_MODEL="/root/bge-large-zh/"
+CMD ["python3", "/root/server-ipex.py"]
diff --git a/comps/embeddings/langchain-mosec/docker/README.md b/comps/embeddings/langchain-mosec/docker/README.md
new file mode 100644
index 000000000..5f87b9a69
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/docker/README.md
@@ -0,0 +1,38 @@
+# Embedding Server
+
+## 1. Introduction
+This service provides an OpenAI-compatible RESTful API for extracting text embeddings.
+It is designed to run on Intel Xeon processors to accelerate embedding model serving.
+Currently the local model is BAAI/bge-large-zh.
+
+## 2. Quick Start
+### 2.1 Build Docker image
+```shell
+docker build -t embedding:latest .
+```
+
+### 2.2 Launch server
+```shell
+docker run -itd -p 8000:8000 embedding:latest
+```
+
+### 2.3 Client test
+- RESTful API via curl
+```shell
+curl -X POST http://127.0.0.1:8000/v1/embeddings -H "Content-Type: application/json" -d '{ "model": "/root/bge-large-zh/", "input": "hello world"}'
+```
+
+- Generate embeddings from Python
+```python
+from openai import Client
+
+DEFAULT_MODEL = "/root/bge-large-zh/"
+SERVICE_URL="http://127.0.0.1:8000"
+INPUT_STR="Hello world!"
+
+client = Client(api_key="fake", base_url=SERVICE_URL)
+emb = client.embeddings.create(
+    model=DEFAULT_MODEL,
+    input=INPUT_STR,
+)
+```
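The OpenAI-compatible endpoint also accepts a list of strings per request (the server's `forward()` below flattens list inputs), so a client can embed a small batch in one call. A sketch building on the README snippet above, under the same assumptions (`openai` client installed, server reachable at 127.0.0.1:8000):

```python
# Batched request sketch: the Mosec server accepts a list of strings as `input`.
from openai import Client

client = Client(api_key="fake", base_url="http://127.0.0.1:8000")
batch = ["hello world", "你好，世界", "embedding servers like batches"]
resp = client.embeddings.create(model="/root/bge-large-zh/", input=batch)

# One EmbeddingData entry per input, in the original order.
for item in resp.data:
    print(item.index, len(item.embedding))
print(resp.usage.prompt_tokens)  # token count summed over the batch
```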
diff --git a/comps/embeddings/langchain-mosec/docker/server-ipex.py b/comps/embeddings/langchain-mosec/docker/server-ipex.py
new file mode 100644
index 000000000..f8eb3981b
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/docker/server-ipex.py
@@ -0,0 +1,126 @@
+import base64
+import os
+from typing import List, Union
+
+import numpy as np
+import torch  # type: ignore
+import torch.nn.functional as F  # type: ignore
+import transformers  # type: ignore
+from llmspec import EmbeddingData, EmbeddingRequest, EmbeddingResponse, TokenUsage
+
+from mosec import ClientError, Runtime, Server, Worker
+
+import intel_extension_for_pytorch as ipex
+
+DEFAULT_MODEL = "/root/bge-large-zh/"
+
+
+class Embedding(Worker):
+    def __init__(self):
+        self.model_name = os.environ.get("EMB_MODEL", DEFAULT_MODEL)
+        self.tokenizer = transformers.AutoTokenizer.from_pretrained(self.model_name)
+        self.model = transformers.AutoModel.from_pretrained(self.model_name)
+        self.device = (
+            torch.cuda.current_device() if torch.cuda.is_available() else "cpu"
+        )
+
+        self.model = self.model.to(self.device)
+        self.model.eval()
+
+        # jit trace model
+        self.model = ipex.optimize(self.model, dtype=torch.bfloat16)
+        vocab_size = self.model.config.vocab_size
+        batch_size = 16
+        seq_length = 512
+        d = torch.randint(vocab_size, size=[batch_size, seq_length])
+        t = torch.randint(0, 1, size=[batch_size, seq_length])
+        m = torch.randint(1, 2, size=[batch_size, seq_length])
+        self.model = torch.jit.trace(self.model, [d, t, m], check_trace=False, strict=False)
+        self.model = torch.jit.freeze(self.model)
+        self.model(d, t, m)
+
+    def get_embedding_with_token_count(
+        self, sentences: Union[str, List[Union[str, List[int]]]]
+    ):
+        # Mean Pooling - Take attention mask into account for correct averaging
+        def mean_pooling(model_output, attention_mask):
+            # First element of model_output contains all token embeddings
+            token_embeddings = model_output['last_hidden_state']
+            input_mask_expanded = (
+                attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
+            )
+            return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
+                input_mask_expanded.sum(1), min=1e-9
+            )
+
+        # Tokenize sentences
+        # TODO: support `List[List[int]]` input
+        encoded_input = self.tokenizer(
+            sentences, padding=True, truncation=True, return_tensors="pt"
+        )
+        inputs = encoded_input.to(self.device)
+        token_count = inputs["attention_mask"].sum(dim=1).tolist()
+        # Compute token embeddings
+        model_output = self.model(**inputs)
+        # Perform pooling
+        sentence_embeddings = mean_pooling(model_output, inputs["attention_mask"])
+        # Normalize embeddings
+        sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
+
+        return token_count, sentence_embeddings
+
+    def deserialize(self, data: bytes) -> EmbeddingRequest:
+        return EmbeddingRequest.from_bytes(data)
+
+    def serialize(self, data: EmbeddingResponse) -> bytes:
+        return data.to_json()
+
+    def forward(self, data: List[EmbeddingRequest]) -> List[EmbeddingResponse]:
+        inputs = []
+        inputs_lens = []
+        for d in data:
+            inputs.extend(d.input if isinstance(d.input, list) else [d.input])
+            inputs_lens.append(len(d.input) if isinstance(d.input, list) else 1)
+        token_cnt, embeddings = self.get_embedding_with_token_count(inputs)
+
+        embeddings = embeddings.detach()
+        if self.device != "cpu":
+            embeddings = embeddings.cpu()
+        embeddings = embeddings.numpy()
+        embeddings = [emb.tolist() for emb in embeddings]
+
+        resp = []
+        emb_idx = 0
+        for lens in inputs_lens:
+            token_count = sum(token_cnt[emb_idx:emb_idx+lens])
+            resp.append(
+                EmbeddingResponse(
+                    data=[
+                        EmbeddingData(embedding=emb, index=i)
+                        for i, emb in enumerate(embeddings[emb_idx:emb_idx+lens])
+                    ],
+                    model=self.model_name,
+                    usage=TokenUsage(
+                        prompt_tokens=token_count,
+                        # No completions performed, only embeddings generated.
+                        completion_tokens=0,
+                        total_tokens=token_count,
+                    ),
+                )
+            )
+            emb_idx += lens
+        return resp
+
+
+if __name__ == "__main__":
+    MAX_BATCH_SIZE = int(os.environ.get("MAX_BATCH_SIZE", 128))
+    MAX_WAIT_TIME = int(os.environ.get("MAX_WAIT_TIME", 10))
+    server = Server()
+    emb = Runtime(Embedding, max_batch_size=MAX_BATCH_SIZE, max_wait_time=MAX_WAIT_TIME)
+    server.register_runtime(
+        {
+            "/v1/embeddings": [emb],
+            "/embeddings": [emb],
+        }
+    )
+    server.run()
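To make the pooling step concrete, here is a tiny standalone illustration (not part of the patch) of the same attention-masked mean pooling followed by L2 normalization that `get_embedding_with_token_count()` performs; the tensor shapes are made up for the example.

```python
# Toy illustration of the pooling used in server-ipex.py above.
import torch
import torch.nn.functional as F

batch, seq_len, hidden = 2, 4, 8
token_embeddings = torch.randn(batch, seq_len, hidden)   # stand-in for last_hidden_state
attention_mask = torch.tensor([[1, 1, 1, 0],              # first sentence: 3 real tokens
                               [1, 1, 0, 0]])             # second sentence: 2 real tokens

# Average only over non-padding positions, exactly as mean_pooling() does.
mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
pooled = (token_embeddings * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)

# Unit-length sentence embeddings, as returned by get_embedding_with_token_count().
sentence_embeddings = F.normalize(pooled, p=2, dim=1)
print(sentence_embeddings.shape)        # torch.Size([2, 8])
print(sentence_embeddings.norm(dim=1))  # ~1.0 for each sentence
```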
diff --git a/comps/embeddings/langchain-mosec/docker/test-embedding.py b/comps/embeddings/langchain-mosec/docker/test-embedding.py
new file mode 100644
index 000000000..0edca7766
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/docker/test-embedding.py
@@ -0,0 +1,16 @@
+"""OpenAI embedding client example."""
+
+from openai import Client
+
+DEFAULT_MODEL = "/root/bge-large-zh/"
+SERVICE_URL="http://127.0.0.1:8000"
+INPUT_STR="Hello world!"
+
+client = Client(api_key="fake", base_url=SERVICE_URL)
+emb = client.embeddings.create(
+    model=DEFAULT_MODEL,
+    input=INPUT_STR,
+)
+
+print(len(emb.data))  # type: ignore
+print(emb.data[0].embedding)  # type: ignore
diff --git a/comps/embeddings/langchain-mosec/embedding_mosec.py b/comps/embeddings/langchain-mosec/embedding_mosec.py
new file mode 100644
index 000000000..876381fd7
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/embedding_mosec.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+import time
+
+from typing import List, Optional
+from langsmith import traceable
+from langchain_community.embeddings import OpenAIEmbeddings
+
+from comps import (
+    EmbedDoc768,
+    ServiceType,
+    TextDoc,
+    opea_microservices,
+    register_microservice,
+    register_statistics,
+    statistics_dict,
+)
+
+class MosecEmbeddings(OpenAIEmbeddings):
+    def _get_len_safe_embeddings(
+        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
+    ) -> List[List[float]]:
+        _chunk_size = chunk_size or self.chunk_size
+        batched_embeddings: List[List[float]] = []
+        response = self.client.create(
+            input=texts, **self._invocation_params
+        )
+        if not isinstance(response, dict):
+            response = response.model_dump()
+        batched_embeddings.extend(r["embedding"] for r in response["data"])
+
+        _cached_empty_embedding: Optional[List[float]] = None
+
+        def empty_embedding() -> List[float]:
+            nonlocal _cached_empty_embedding
+            if _cached_empty_embedding is None:
+                average_embedded = self.client.create(
+                    input="", **self._invocation_params
+                )
+                if not isinstance(average_embedded, dict):
+                    average_embedded = average_embedded.model_dump()
+                _cached_empty_embedding = average_embedded["data"][0]["embedding"]
+            return _cached_empty_embedding
+
+        return [e if e is not None else empty_embedding() for e in batched_embeddings]
+
+@register_microservice(
+    name="opea_service@embedding_mosec",
+    service_type=ServiceType.EMBEDDING,
+    endpoint="/v1/embeddings",
+    host="0.0.0.0",
+    port=6000,
+    input_datatype=TextDoc,
+    output_datatype=EmbedDoc768,
+)
+@traceable(run_type="embedding")
+@register_statistics(names=["opea_service@embedding_mosec"])
+def embedding(input: TextDoc) -> EmbedDoc768:
+    start = time.time()
+    embed_vector = embeddings.embed_query(input.text)
+    embed_vector = embed_vector[:768]  # Keep only the first 768 elements
+    res = EmbedDoc768(text=input.text, embedding=embed_vector)
+    statistics_dict["opea_service@embedding_mosec"].append_latency(time.time() - start, None)
+    return res
+
+
+if __name__ == "__main__":
+    MOSEC_API_BASE = os.environ.get("MOSEC_API_BASE", "http://127.0.0.1:8080")
+    os.environ["OPENAI_API_BASE"] = MOSEC_API_BASE
+    os.environ["OPENAI_API_KEY"] = "Dummy key"
+    MODEL_ID="/root/bge-large-zh"
+    embeddings = MosecEmbeddings(model=MODEL_ID)
+    print("Mosec Embedding initialized.")
+    opea_microservices["opea_service@embedding_mosec"].start()
diff --git a/comps/embeddings/langchain-mosec/requirements.txt b/comps/embeddings/langchain-mosec/requirements.txt
new file mode 100644
index 000000000..65c79959e
--- /dev/null
+++ b/comps/embeddings/langchain-mosec/requirements.txt
@@ -0,0 +1,9 @@
+docarray[full]
+fastapi
+langchain
+langchain_community
+openai
+opentelemetry-api
+opentelemetry-exporter-otlp
+opentelemetry-sdk
+shortuuid
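Finally, a short sketch of exercising the new `MosecEmbeddings` wrapper on its own, assuming the Mosec container from the Dockerfile is reachable at 127.0.0.1:8000, the packages in requirements.txt are installed, and the script is run from the `comps/embeddings/langchain-mosec/` directory so that `embedding_mosec` is importable; the 1024-dimension figure is the usual output size of bge-large-zh, not something checked by this patch.

```python
# Standalone sketch: drive the MosecEmbeddings wrapper without starting the microservice.
import os

os.environ["OPENAI_API_BASE"] = "http://127.0.0.1:8000"  # Mosec server from the Dockerfile
os.environ["OPENAI_API_KEY"] = "Dummy key"               # any non-empty value works locally

from embedding_mosec import MosecEmbeddings

embeddings = MosecEmbeddings(model="/root/bge-large-zh")
vec = embeddings.embed_query("What is Deep Learning?")
print(len(vec))  # full model output (typically 1024); the microservice keeps the first 768
docs = embeddings.embed_documents(["hello", "world"])
print(len(docs), len(docs[0]))
```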