diff --git a/comps/embeddings/mosec/langchain/embedding_mosec.py b/comps/embeddings/mosec/langchain/embedding_mosec.py
index 61a3db7f24..bab254ae4e 100644
--- a/comps/embeddings/mosec/langchain/embedding_mosec.py
+++ b/comps/embeddings/mosec/langchain/embedding_mosec.py
@@ -57,11 +57,11 @@ def empty_embedding() -> List[float]:
     output_datatype=EmbedDoc,
 )
 @register_statistics(names=["opea_service@embedding_mosec"])
-def embedding(input: TextDoc) -> EmbedDoc:
+async def embedding(input: TextDoc) -> EmbedDoc:
     if logflag:
         logger.info(input)
     start = time.time()
-    embed_vector = embeddings.embed_query(input.text)
+    embed_vector = await embeddings.aembed_query(input.text)
     res = EmbedDoc(text=input.text, embedding=embed_vector)
     statistics_dict["opea_service@embedding_mosec"].append_latency(time.time() - start, None)
     if logflag:
diff --git a/comps/intent_detection/langchain/intent_detection.py b/comps/intent_detection/langchain/intent_detection.py
index 3abd6974ca..8aff7fbdc4 100644
--- a/comps/intent_detection/langchain/intent_detection.py
+++ b/comps/intent_detection/langchain/intent_detection.py
@@ -16,7 +16,7 @@
     host="0.0.0.0",
     port=9000,
 )
-def llm_generate(input: LLMParamsDoc):
+async def llm_generate(input: LLMParamsDoc):
     llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
     llm = HuggingFaceEndpoint(
         endpoint_url=llm_endpoint,
@@ -35,7 +35,7 @@ def llm_generate(input: LLMParamsDoc):
 
     llm_chain = LLMChain(prompt=prompt, llm=llm)
 
-    response = llm_chain.invoke(input.query)
+    response = await llm_chain.ainvoke(input.query)
     response = response["text"]
     print("response", response)
     return GeneratedDoc(text=response, prompt=input.query)
diff --git a/comps/llms/faq-generation/tgi/langchain/llm.py b/comps/llms/faq-generation/tgi/langchain/llm.py
index 4d54438c55..81b102f018 100644
--- a/comps/llms/faq-generation/tgi/langchain/llm.py
+++ b/comps/llms/faq-generation/tgi/langchain/llm.py
@@ -34,10 +34,9 @@ def post_process_text(text: str):
     host="0.0.0.0",
     port=9000,
 )
-def llm_generate(input: LLMParamsDoc):
+async def llm_generate(input: LLMParamsDoc):
     if logflag:
         logger.info(input)
-    llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
     llm = HuggingFaceEndpoint(
         endpoint_url=llm_endpoint,
         max_new_tokens=input.max_tokens,
@@ -54,9 +53,6 @@ def llm_generate(input: LLMParamsDoc):
     """
     PROMPT = PromptTemplate.from_template(templ)
     llm_chain = load_summarize_chain(llm=llm, prompt=PROMPT)
-
-    # Split text
-    text_splitter = CharacterTextSplitter()
     texts = text_splitter.split_text(input.query)
 
     # Create multiple documents
@@ -77,7 +73,7 @@ async def stream_generator():
 
         return StreamingResponse(stream_generator(), media_type="text/event-stream")
     else:
-        response = llm_chain.invoke(docs)
+        response = await llm_chain.ainvoke(docs)
         response = response["output_text"]
         if logflag:
             logger.info(response)
@@ -85,4 +81,7 @@ async def stream_generator():
 
 
 if __name__ == "__main__":
+    llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
+    # Split text
+    text_splitter = CharacterTextSplitter()
     opea_microservices["opea_service@llm_faqgen"].start()
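The pattern in the hunks above repeats throughout this diff: handlers registered with `@register_microservice` become `async def`, and blocking client calls (`embed_query`, `invoke`) give way to their awaitable counterparts (`aembed_query`, `ainvoke`). A minimal, runnable sketch of why that matters, using toy stand-ins rather than the OPEA classes:

```python
# Toy demonstration (not OPEA code): a blocking call inside an async web
# framework stalls the event loop; awaiting a coroutine lets other
# requests make progress during network waits.
import asyncio
import time


async def aembed_query(text: str) -> list[float]:
    # Hypothetical stand-in for an async embedding client call;
    # the sleep mimics the network round trip to the embedding server.
    await asyncio.sleep(0.5)
    return [0.0] * 8


async def handle_request(i: int) -> None:
    start = time.time()
    await aembed_query(f"query {i}")
    print(f"request {i} served in {time.time() - start:.2f}s")


async def main() -> None:
    # Ten concurrent requests finish in ~0.5s total instead of ~5s,
    # because each handler yields the loop while it waits.
    await asyncio.gather(*(handle_request(i) for i in range(10)))


asyncio.run(main())
```

FastAPI-style frameworks typically run sync `def` handlers on a threadpool; `async def` plus `await` keeps everything on one event loop without per-request thread overhead.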
diff --git a/comps/llms/summarization/tgi/langchain/llm.py b/comps/llms/summarization/tgi/langchain/llm.py
index ff380ce167..e9f85cb829 100644
--- a/comps/llms/summarization/tgi/langchain/llm.py
+++ b/comps/llms/summarization/tgi/langchain/llm.py
@@ -33,10 +33,10 @@ def post_process_text(text: str):
     host="0.0.0.0",
     port=9000,
 )
-def llm_generate(input: LLMParamsDoc):
+async def llm_generate(input: LLMParamsDoc):
     if logflag:
         logger.info(input)
-    llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
+
     llm = HuggingFaceEndpoint(
         endpoint_url=llm_endpoint,
         max_new_tokens=input.max_tokens,
@@ -48,9 +48,6 @@ def llm_generate(input: LLMParamsDoc):
         streaming=input.streaming,
     )
     llm_chain = load_summarize_chain(llm=llm, chain_type="map_reduce")
-
-    # Split text
-    text_splitter = CharacterTextSplitter()
     texts = text_splitter.split_text(input.query)
 
     # Create multiple documents
@@ -71,7 +68,7 @@ async def stream_generator():
 
         return StreamingResponse(stream_generator(), media_type="text/event-stream")
     else:
-        response = llm_chain.invoke(docs)
+        response = await llm_chain.ainvoke(docs)
         response = response["output_text"]
         if logflag:
             logger.info(response)
@@ -79,4 +76,7 @@ async def stream_generator():
 
 
 if __name__ == "__main__":
+    llm_endpoint = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
+    # Split text
+    text_splitter = CharacterTextSplitter()
     opea_microservices["opea_service@llm_docsum"].start()
diff --git a/comps/llms/text-generation/ollama/langchain/llm.py b/comps/llms/text-generation/ollama/langchain/llm.py
index 9830cca159..331f80724d 100644
--- a/comps/llms/text-generation/ollama/langchain/llm.py
+++ b/comps/llms/text-generation/ollama/langchain/llm.py
@@ -19,7 +19,7 @@
     host="0.0.0.0",
     port=9000,
 )
-def llm_generate(input: LLMParamsDoc):
+async def llm_generate(input: LLMParamsDoc):
     if logflag:
         logger.info(input)
     ollama = Ollama(
@@ -48,7 +48,7 @@ async def stream_generator():
         return StreamingResponse(stream_generator(), media_type="text/event-stream")
     else:
-        response = ollama.invoke(input.query)
+        response = await ollama.ainvoke(input.query)
         if logflag:
             logger.info(response)
         return GeneratedDoc(text=response, prompt=input.query)
diff --git a/comps/llms/text-generation/ray_serve/llm.py b/comps/llms/text-generation/ray_serve/llm.py
index 1203794cdc..c09bb45848 100644
--- a/comps/llms/text-generation/ray_serve/llm.py
+++ b/comps/llms/text-generation/ray_serve/llm.py
@@ -38,7 +38,7 @@ def post_process_text(text: str):
     host="0.0.0.0",
     port=9000,
 )
-def llm_generate(input: LLMParamsDoc):
+async def llm_generate(input: LLMParamsDoc):
     llm_endpoint = os.getenv("RAY_Serve_ENDPOINT", "http://localhost:8080")
     llm_model = os.getenv("LLM_MODEL", "Llama-2-7b-chat-hf")
     if "/" in llm_model:
@@ -73,7 +73,7 @@ async def stream_generator():
 
         return StreamingResponse(stream_generator(), media_type="text/event-stream")
     else:
-        response = llm.invoke(input.query)
+        response = await llm.ainvoke(input.query)
         response = response.content
         return GeneratedDoc(text=response, prompt=input.query)
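The streaming branches above keep the same shape after conversion: an inner `async def stream_generator()` feeds `StreamingResponse`, which forwards chunks as Server-Sent Events. A runnable sketch of that shape with a fake token source standing in for `llm.astream(...)` (names here are illustrative, not the OPEA service code):

```python
# Minimal SSE streaming sketch; run with: uvicorn sketch:app
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def fake_astream(query: str):
    # Hypothetical stand-in for an async LLM token stream.
    for token in ["To", "ken", " by", " token"]:
        await asyncio.sleep(0.1)
        yield token


@app.get("/stream")
async def stream(query: str = "hello"):
    async def stream_generator():
        # StreamingResponse consumes this async generator and flushes
        # each yielded chunk to the client as it arrives.
        async for text in fake_astream(query):
            yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream_generator(), media_type="text/event-stream")
```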
media_type="text/event-stream") else: - response = llm.invoke(prompt, **parameters) + response = await llm.ainvoke(prompt, **parameters) if logflag: logger.info(response) diff --git a/comps/llms/text-generation/vllm/llama_index/llm.py b/comps/llms/text-generation/vllm/llama_index/llm.py index 4c3957bae7..55bcec7dc6 100644 --- a/comps/llms/text-generation/vllm/llama_index/llm.py +++ b/comps/llms/text-generation/vllm/llama_index/llm.py @@ -39,7 +39,7 @@ def post_process_text(text: str): host="0.0.0.0", port=9000, ) -def llm_generate(input: LLMParamsDoc): +async def llm_generate(input: LLMParamsDoc): if logflag: logger.info(input) llm_endpoint = os.getenv("vLLM_ENDPOINT", "http://localhost:8008") @@ -56,8 +56,8 @@ def llm_generate(input: LLMParamsDoc): if input.streaming: - def stream_generator(): - for text in llm.stream_complete(input.query): + async def stream_generator(): + async for text in llm.astream_complete(input.query): output = text.text yield f"data: {output}\n\n" if logflag: @@ -66,7 +66,7 @@ def stream_generator(): return StreamingResponse(stream_generator(), media_type="text/event-stream") else: - response = llm.complete(input.query).text + response = await llm.acomplete(input.query).text if logflag: logger.info(response) return GeneratedDoc(text=response, prompt=input.query) diff --git a/comps/llms/text-generation/vllm/ray/llm.py b/comps/llms/text-generation/vllm/ray/llm.py index b11b45fb7f..cd19cb13b3 100644 --- a/comps/llms/text-generation/vllm/ray/llm.py +++ b/comps/llms/text-generation/vllm/ray/llm.py @@ -30,7 +30,7 @@ host="0.0.0.0", port=9000, ) -def llm_generate(input: LLMParamsDoc): +async def llm_generate(input: LLMParamsDoc): if logflag: logger.info(input) llm_endpoint = os.getenv("vLLM_RAY_ENDPOINT", "http://localhost:8006") @@ -50,9 +50,9 @@ def llm_generate(input: LLMParamsDoc): if input.streaming: - def stream_generator(): + async def stream_generator(): chat_response = "" - for text in llm.stream(input.query): + for text in llm.astream(input.query): text = text.content chat_response += text chunk_repr = repr(text.encode("utf-8")) @@ -63,7 +63,7 @@ def stream_generator(): return StreamingResponse(stream_generator(), media_type="text/event-stream") else: - response = llm.invoke(input.query) + response = await llm.ainvoke(input.query) response = response.content if logflag: logger.info(response) diff --git a/comps/lvms/predictionguard/lvm.py b/comps/lvms/predictionguard/lvm.py index 1a452c457c..0c8290be1a 100644 --- a/comps/lvms/predictionguard/lvm.py +++ b/comps/lvms/predictionguard/lvm.py @@ -28,7 +28,7 @@ class LVMDoc(BaseDoc): output_datatype=TextDoc, ) @register_statistics(names=["opea_service@lvm_predictionguard"]) -async def lvm(request: LVMDoc) -> TextDoc: +def lvm(request: LVMDoc) -> TextDoc: start = time.time() # make a request to the Prediction Guard API using the LlaVa model diff --git a/comps/retrievers/milvus/langchain/retriever_milvus.py b/comps/retrievers/milvus/langchain/retriever_milvus.py index fb8fb64b2c..5ab2f34853 100644 --- a/comps/retrievers/milvus/langchain/retriever_milvus.py +++ b/comps/retrievers/milvus/langchain/retriever_milvus.py @@ -67,7 +67,7 @@ def empty_embedding() -> List[float]: port=7000, ) @register_statistics(names=["opea_service@retriever_milvus"]) -def retrieve(input: EmbedDoc) -> SearchedDoc: +async def retrieve(input: EmbedDoc) -> SearchedDoc: if logflag: logger.info(input) vector_db = Milvus( @@ -77,20 +77,20 @@ def retrieve(input: EmbedDoc) -> SearchedDoc: ) start = time.time() if input.search_type == "similarity": - 
diff --git a/comps/retrievers/milvus/langchain/retriever_milvus.py b/comps/retrievers/milvus/langchain/retriever_milvus.py
index fb8fb64b2c..5ab2f34853 100644
--- a/comps/retrievers/milvus/langchain/retriever_milvus.py
+++ b/comps/retrievers/milvus/langchain/retriever_milvus.py
@@ -67,7 +67,7 @@ def empty_embedding() -> List[float]:
     port=7000,
 )
 @register_statistics(names=["opea_service@retriever_milvus"])
-def retrieve(input: EmbedDoc) -> SearchedDoc:
+async def retrieve(input: EmbedDoc) -> SearchedDoc:
     if logflag:
         logger.info(input)
     vector_db = Milvus(
@@ -77,20 +77,20 @@ def retrieve(input: EmbedDoc) -> SearchedDoc:
     )
     start = time.time()
     if input.search_type == "similarity":
-        search_res = vector_db.similarity_search_by_vector(embedding=input.embedding, k=input.k)
+        search_res = await vector_db.asimilarity_search_by_vector(embedding=input.embedding, k=input.k)
     elif input.search_type == "similarity_distance_threshold":
         if input.distance_threshold is None:
             raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever")
-        search_res = vector_db.similarity_search_by_vector(
+        search_res = await vector_db.asimilarity_search_by_vector(
             embedding=input.embedding, k=input.k, distance_threshold=input.distance_threshold
         )
     elif input.search_type == "similarity_score_threshold":
-        docs_and_similarities = vector_db.similarity_search_with_relevance_scores(
+        docs_and_similarities = await vector_db.asimilarity_search_with_relevance_scores(
             query=input.text, k=input.k, score_threshold=input.score_threshold
         )
         search_res = [doc for doc, _ in docs_and_similarities]
     elif input.search_type == "mmr":
-        search_res = vector_db.max_marginal_relevance_search(
+        search_res = await vector_db.amax_marginal_relevance_search(
             query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
         )
     searched_docs = []
diff --git a/comps/retrievers/multimodal/redis/langchain/retriever_redis.py b/comps/retrievers/multimodal/redis/langchain/retriever_redis.py
index c0d4dadbdf..a01b3e20c4 100644
--- a/comps/retrievers/multimodal/redis/langchain/retriever_redis.py
+++ b/comps/retrievers/multimodal/redis/langchain/retriever_redis.py
@@ -34,7 +34,7 @@
     port=7000,
 )
 @register_statistics(names=["opea_service@multimodal_retriever_redis"])
-def retrieve(
+async def retrieve(
     input: Union[EmbedMultimodalDoc, RetrievalRequest, ChatCompletionRequest]
 ) -> Union[SearchedMultimodalDoc, RetrievalResponse, ChatCompletionRequest]:
 
@@ -45,20 +45,20 @@ def retrieve(
     else:
         # if the Redis index has data, perform the search
        if input.search_type == "similarity":
-            search_res = vector_db.similarity_search_by_vector(embedding=input.embedding, k=input.k)
+            search_res = await vector_db.asimilarity_search_by_vector(embedding=input.embedding, k=input.k)
         elif input.search_type == "similarity_distance_threshold":
             if input.distance_threshold is None:
                 raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever")
-            search_res = vector_db.similarity_search_by_vector(
+            search_res = await vector_db.asimilarity_search_by_vector(
                 embedding=input.embedding, k=input.k, distance_threshold=input.distance_threshold
             )
         elif input.search_type == "similarity_score_threshold":
-            docs_and_similarities = vector_db.similarity_search_with_relevance_scores(
+            docs_and_similarities = await vector_db.asimilarity_search_with_relevance_scores(
                 query=input.text, k=input.k, score_threshold=input.score_threshold
             )
             search_res = [doc for doc, _ in docs_and_similarities]
         elif input.search_type == "mmr":
-            search_res = vector_db.max_marginal_relevance_search(
+            search_res = await vector_db.amax_marginal_relevance_search(
                 query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
             )
         else:
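Every retriever in this diff converts the same four-way `search_type` dispatch; the `a`-prefixed methods are LangChain's async counterparts of the sync search calls being replaced. Condensed into one illustrative helper (the `Query` dataclass is a stand-in for OPEA's `EmbedDoc`, not its actual definition):

```python
# `vector_db` can be any LangChain vector store (Milvus, Redis, ...).
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class Query:  # illustrative stand-in for EmbedDoc
    text: str
    embedding: List[float]
    search_type: str = "similarity"
    k: int = 4
    distance_threshold: Optional[float] = None
    score_threshold: float = 0.2
    fetch_k: int = 20
    lambda_mult: float = 0.5


async def search(vector_db: Any, q: Query):
    if q.search_type == "similarity":
        return await vector_db.asimilarity_search_by_vector(embedding=q.embedding, k=q.k)
    if q.search_type == "similarity_distance_threshold":
        if q.distance_threshold is None:
            raise ValueError("distance_threshold must be provided")
        return await vector_db.asimilarity_search_by_vector(
            embedding=q.embedding, k=q.k, distance_threshold=q.distance_threshold
        )
    if q.search_type == "similarity_score_threshold":
        # Returns (doc, score) pairs; keep only the docs.
        docs_and_scores = await vector_db.asimilarity_search_with_relevance_scores(
            query=q.text, k=q.k, score_threshold=q.score_threshold
        )
        return [doc for doc, _ in docs_and_scores]
    if q.search_type == "mmr":
        return await vector_db.amax_marginal_relevance_search(
            query=q.text, k=q.k, fetch_k=q.fetch_k, lambda_mult=q.lambda_mult
        )
    raise ValueError(f"unsupported search_type: {q.search_type}")
```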
diff --git a/comps/retrievers/neo4j/langchain/retriever_neo4j.py b/comps/retrievers/neo4j/langchain/retriever_neo4j.py
index 47ce4a5442..9025ac5a74 100644
--- a/comps/retrievers/neo4j/langchain/retriever_neo4j.py
+++ b/comps/retrievers/neo4j/langchain/retriever_neo4j.py
@@ -40,7 +40,7 @@
     port=7000,
 )
 @register_statistics(names=["opea_service@retriever_neo4j"])
-def retrieve(
+async def retrieve(
     input: Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest]
 ) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]:
     if logflag:
@@ -54,20 +54,22 @@ def retrieve(
         query = input.input
 
     if input.search_type == "similarity":
-        search_res = vector_db.similarity_search_by_vector(embedding=input.embedding, query=input.text, k=input.k)
+        search_res = await vector_db.asimilarity_search_by_vector(
+            embedding=input.embedding, query=input.text, k=input.k
+        )
     elif input.search_type == "similarity_distance_threshold":
         if input.distance_threshold is None:
             raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever")
-        search_res = vector_db.similarity_search_by_vector(
+        search_res = await vector_db.asimilarity_search_by_vector(
             embedding=input.embedding, query=input.text, k=input.k, distance_threshold=input.distance_threshold
         )
     elif input.search_type == "similarity_score_threshold":
-        docs_and_similarities = vector_db.similarity_search_with_relevance_scores(
+        docs_and_similarities = await vector_db.asimilarity_search_with_relevance_scores(
             query=input.text, k=input.k, score_threshold=input.score_threshold
         )
         search_res = [doc for doc, _ in docs_and_similarities]
     elif input.search_type == "mmr":
-        search_res = vector_db.max_marginal_relevance_search(
+        search_res = await vector_db.amax_marginal_relevance_search(
             query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
         )
     else:
diff --git a/comps/retrievers/pgvector/langchain/retriever_pgvector.py b/comps/retrievers/pgvector/langchain/retriever_pgvector.py
index d33a9f197a..b9db75c1cf 100644
--- a/comps/retrievers/pgvector/langchain/retriever_pgvector.py
+++ b/comps/retrievers/pgvector/langchain/retriever_pgvector.py
@@ -34,11 +34,11 @@
     port=PORT,
 )
 @register_statistics(names=["opea_service@retriever_pgvector"])
-def retrieve(input: EmbedDoc) -> SearchedDoc:
+async def retrieve(input: EmbedDoc) -> SearchedDoc:
     if logflag:
         logger.info(input)
     start = time.time()
-    search_res = vector_db.similarity_search_by_vector(embedding=input.embedding)
+    search_res = await vector_db.asimilarity_search_by_vector(embedding=input.embedding)
     searched_docs = []
     for r in search_res:
         searched_docs.append(TextDoc(text=r.page_content))
diff --git a/comps/retrievers/redis/langchain/requirements.txt b/comps/retrievers/redis/langchain/requirements.txt
index c68c3d2748..f29317dd98 100644
--- a/comps/retrievers/redis/langchain/requirements.txt
+++ b/comps/retrievers/redis/langchain/requirements.txt
@@ -2,6 +2,7 @@ docarray[full]
 easyocr
 fastapi
 langchain_community
+langchain_huggingface
 opentelemetry-api
 opentelemetry-exporter-otlp
 opentelemetry-sdk
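The new `langchain_huggingface` requirement supports the import swap in the retriever diff that follows: the deprecated `HuggingFaceHubEmbeddings` from `langchain_community` gives way to `HuggingFaceEndpointEmbeddings`. A sketch of the resulting selection logic; the env var name and the default model name here are assumptions for illustration, not taken from this diff:

```python
import os

from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_huggingface import HuggingFaceEndpointEmbeddings

# Assumed env var; OPEA configs pass the TEI service URL this way.
tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT")
if tei_embedding_endpoint:
    # Remote TEI service: `model` accepts the endpoint URL here,
    # mirroring the call site in the retriever below.
    embeddings = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint)
else:
    # Local fallback; model name is an assumed default.
    embeddings = HuggingFaceBgeEmbeddings(model_name="BAAI/bge-base-en-v1.5")
```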
diff --git a/comps/retrievers/redis/langchain/retriever_redis.py b/comps/retrievers/redis/langchain/retriever_redis.py
index b4c901cb30..d46e792f07 100644
--- a/comps/retrievers/redis/langchain/retriever_redis.py
+++ b/comps/retrievers/redis/langchain/retriever_redis.py
@@ -5,8 +5,9 @@
 import time
 from typing import Union
 
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
+from langchain_community.embeddings import HuggingFaceBgeEmbeddings
 from langchain_community.vectorstores import Redis
+from langchain_huggingface import HuggingFaceEndpointEmbeddings
 from redis_config import EMBED_MODEL, INDEX_NAME, REDIS_URL
 
 from comps import (
@@ -41,7 +42,7 @@
     port=7000,
 )
 @register_statistics(names=["opea_service@retriever_redis"])
-def retrieve(
+async def retrieve(
     input: Union[EmbedDoc, RetrievalRequest, ChatCompletionRequest]
 ) -> Union[SearchedDoc, RetrievalResponse, ChatCompletionRequest]:
     if logflag:
@@ -58,20 +59,20 @@ def retrieve(
         query = input.input
     # if the Redis index has data, perform the search
     if input.search_type == "similarity":
-        search_res = vector_db.similarity_search_by_vector(embedding=input.embedding, k=input.k)
+        search_res = await vector_db.asimilarity_search_by_vector(embedding=input.embedding, k=input.k)
     elif input.search_type == "similarity_distance_threshold":
         if input.distance_threshold is None:
             raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever")
-        search_res = vector_db.similarity_search_by_vector(
+        search_res = await vector_db.asimilarity_search_by_vector(
             embedding=input.embedding, k=input.k, distance_threshold=input.distance_threshold
         )
     elif input.search_type == "similarity_score_threshold":
-        docs_and_similarities = vector_db.similarity_search_with_relevance_scores(
+        docs_and_similarities = await vector_db.asimilarity_search_with_relevance_scores(
             query=input.text, k=input.k, score_threshold=input.score_threshold
         )
         search_res = [doc for doc, _ in docs_and_similarities]
     elif input.search_type == "mmr":
-        search_res = vector_db.max_marginal_relevance_search(
+        search_res = await vector_db.amax_marginal_relevance_search(
             query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult
         )
     else:
@@ -103,7 +104,7 @@ def retrieve(
     # Create vectorstore
     if tei_embedding_endpoint:
         # create embeddings using TEI endpoint service
-        embeddings = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
+        embeddings = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint)
     else:
         # create embeddings using local embedding model
         embeddings = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
diff --git a/comps/retrievers/redis/llama_index/retriever_redis.py b/comps/retrievers/redis/llama_index/retriever_redis.py
index 8c20e36c9e..7c74def54a 100644
--- a/comps/retrievers/redis/llama_index/retriever_redis.py
+++ b/comps/retrievers/redis/llama_index/retriever_redis.py
@@ -22,11 +22,11 @@
     host="0.0.0.0",
     port=7000,
 )
-def retrieve(input: EmbedDoc) -> SearchedDoc:
+async def retrieve(input: EmbedDoc) -> SearchedDoc:
     if logflag:
         logger.info(input)
     vector_store_query = VectorStoreQuery(query_embedding=input.embedding)
-    search_res = vector_store.query(query=vector_store_query)
+    search_res = await vector_store.aquery(query=vector_store_query)
     searched_docs = []
     for node, id, similarity in zip(search_res.nodes, search_res.ids, search_res.similarities):
         searched_docs.append(TextDoc(text=node.get_content()))
diff --git a/comps/retrievers/vdms/langchain/retriever_vdms.py b/comps/retrievers/vdms/langchain/retriever_vdms.py
index 5eaa29ad65..3d6a0e99e9 100644
--- a/comps/retrievers/vdms/langchain/retriever_vdms.py
+++ b/comps/retrievers/vdms/langchain/retriever_vdms.py
@@ -4,7 +4,7 @@
 import os
 import time
 
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
+from langchain_community.embeddings import HuggingFaceBgeEmbeddings
 from langchain_community.vectorstores.vdms import VDMS, VDMS_Client
 from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings
 from vdms_config import DEBUG, DISTANCE_STRATEGY, EMBED_MODEL, INDEX_NAME, SEARCH_ENGINE, VDMS_HOST, VDMS_PORT
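On the llama_index side, `aquery` is the async counterpart of `VectorStore.query`; both return a result object whose parallel `nodes`/`ids`/`similarities` lists get zipped together, as in the hunk above. A runnable sketch of that shape using toy stand-ins (not the real Redis vector store):

```python
import asyncio
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeNode:
    content: str

    def get_content(self) -> str:
        return self.content


@dataclass
class FakeQueryResult:
    # Mirrors the parallel lists of llama_index's VectorStoreQueryResult.
    nodes: List[FakeNode] = field(default_factory=list)
    ids: List[str] = field(default_factory=list)
    similarities: List[float] = field(default_factory=list)


class FakeVectorStore:
    async def aquery(self, query_embedding: List[float]) -> FakeQueryResult:
        await asyncio.sleep(0)  # stands in for the Redis round trip
        return FakeQueryResult(
            [FakeNode("doc a"), FakeNode("doc b")], ["1", "2"], [0.9, 0.7]
        )


async def main() -> None:
    res = await FakeVectorStore().aquery([0.1, 0.2])
    for node, id_, similarity in zip(res.nodes, res.ids, res.similarities):
        print(id_, similarity, node.get_content())


asyncio.run(main())
```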
diff --git a/comps/web_retrievers/chroma/langchain/retriever_chroma.py b/comps/web_retrievers/chroma/langchain/retriever_chroma.py
index 53d9f8c363..da12c5a702 100644
--- a/comps/web_retrievers/chroma/langchain/retriever_chroma.py
+++ b/comps/web_retrievers/chroma/langchain/retriever_chroma.py
@@ -63,7 +63,7 @@ def dump_docs(docs):
     port=7077,
 )
 @register_statistics(names=["opea_service@web_retriever_chroma", "opea_service@search"])
-def web_retrieve(input: EmbedDoc) -> SearchedDoc:
+async def web_retrieve(input: EmbedDoc) -> SearchedDoc:
     if logflag:
         logger.info(input)
     start = time.time()
@@ -92,7 +92,7 @@ def web_retrieve(input: EmbedDoc) -> SearchedDoc:
     dump_docs(unique_documents)
 
     # Do the retrieval
-    search_res = vector_db.similarity_search_by_vector(embedding=embedding)
+    search_res = await vector_db.asimilarity_search_by_vector(embedding=embedding)
     searched_docs = []
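With the handlers async end to end, a single worker can overlap many in-flight requests. A quick client-side sanity check (a sketch only: `httpx` is assumed installed, and the URL and payload shape are illustrative, adjust them to the deployed service):

```python
import asyncio
import time

import httpx


async def one_call(client: httpx.AsyncClient, i: int) -> None:
    # Hypothetical retriever request; field names and the embedding
    # length (768) are placeholders for the real schema.
    payload = {"text": f"query {i}", "embedding": [0.0] * 768}
    r = await client.post("http://localhost:7000/v1/retrieval", json=payload)
    r.raise_for_status()


async def main() -> None:
    start = time.time()
    async with httpx.AsyncClient(timeout=30) as client:
        # Fire 16 requests concurrently; with async handlers their
        # wall-clock time should approach that of a single request.
        await asyncio.gather(*(one_call(client, i) for i in range(16)))
    print(f"16 concurrent requests in {time.time() - start:.2f}s")


asyncio.run(main())
```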