
Reusing AsyncOpenAI client results in openai.APIConnectionError #1059

Closed
1 task done
robert-foley opened this issue Jan 9, 2024 · 17 comments
Labels: bug (Something isn't working)

Comments

@robert-foley

robert-foley commented Jan 9, 2024

Confirm this is an issue with the Python library and not an underlying OpenAI API

  • This is an issue with the Python library

Describe the bug

Reusing an instance of the AsyncOpenAI client across multiple calls to asyncio.gather results in an openai.APIConnectionError. Retried requests (whether via the openai library's built-in retries or a backoff decorator) succeed, but the first try of the second use of the client always fails.

I suspect that this usage of AsyncOpenAI is not ideal, but the behavior nonetheless feels buggy. Even if reusing the client this way is expected to fail, I'm confused about why retries succeed.

Bizarrely, in my application all retries after the initial openai.APIConnectionError result in unending openai.APITimeoutError instead of success, but I am unable to repro this outside of the application. However, I strongly suspect the issues are related, as creating a new client for each batch resolves both the initial error and the timeouts.

To Reproduce

  1. Create an instance of AsyncOpenAI with no retries enabled
  2. Use AsyncOpenAI().chat.completions.create to create a list of Future objects (any number will do)
  3. Use asyncio.gather to get the results of the API calls
  4. Execute steps 2 and 3 again - this causes the error

Code snippets

import asyncio
import openai
from openai import AsyncOpenAI
import httpx
import backoff

print(f"OpenAI version: {openai.__version__}")

OPENAI_API_KEY = "redacted"

api_params = {
    "temperature": 0.2,
    "max_tokens": 500,
    "model": "gpt-3.5-turbo-1106",
}

messages = [{"role": "user", "content": "What is the capital of Quebec?"}]


@backoff.on_exception(
    backoff.expo,
    (
        openai.RateLimitError,
        openai.APIStatusError,
        openai.APIConnectionError,
        openai.APIError,
        openai.APITimeoutError,
        openai.InternalServerError,
    ),
)
async def create_request_retry(client, messages, api_params):
    return await client.chat.completions.create(messages=messages, **api_params)


async def create_request_no_retry(client, messages, api_params):
    return await client.chat.completions.create(messages=messages, **api_params)


# No retries, new client for each set of requests - succeeds
def succeed1():
    for i in range(2):
        client = AsyncOpenAI(
            api_key=OPENAI_API_KEY,
            timeout=10.0,
            http_client=httpx.AsyncClient(limits=httpx.Limits(max_keepalive_connections=500, max_connections=100)),
            max_retries=0,
        )
        arequests = []
        for _ in range(5):
            arequests.append(create_request_no_retry(client, messages, api_params))
        responses = asyncio.run(asyncio.gather(*arequests))
        results = [response.choices[0].message.content for response in responses]
        print(f"{i}: {results}")


# Retry using backoff decorator, reuse client - succeeds
def succeed2():
    client = AsyncOpenAI(
        api_key=OPENAI_API_KEY,
        timeout=10.0,
        http_client=httpx.AsyncClient(limits=httpx.Limits(max_keepalive_connections=500, max_connections=100)),
        max_retries=0,
    )
    for i in range(2):
        arequests = []
        for _ in range(5):
            arequests.append(create_request_retry(client, messages, api_params))
        responses = asyncio.run(asyncio.gather(*arequests))
        results = [response.choices[0].message.content for response in responses]
        print(f"{i}: {results}")


# Retry using openai library, reuse client - succeeds
def succeed3():
    client = AsyncOpenAI(
        api_key=OPENAI_API_KEY,
        timeout=10.0,
        http_client=httpx.AsyncClient(limits=httpx.Limits(max_keepalive_connections=500, max_connections=100)),
        max_retries=2,
    )
    for i in range(2):
        arequests = []
        for _ in range(5):
            arequests.append(create_request_no_retry(client, messages, api_params))
        responses = asyncio.run(asyncio.gather(*arequests))
        results = [response.choices[0].message.content for response in responses]
        print(f"{i}: {results}")


# No retries, reuse client - fails
def error():
    client = AsyncOpenAI(
        api_key=OPENAI_API_KEY,
        timeout=10.0,
        http_client=httpx.AsyncClient(limits=httpx.Limits(max_keepalive_connections=500, max_connections=100)),
        max_retries=0,
    )
    for i in range(2):
        arequests = []
        for _ in range(5):
            arequests.append(create_request_no_retry(client, messages, api_params))
        responses = asyncio.run(asyncio.gather(*arequests))
        results = [response.choices[0].message.content for response in responses]
        print(f"{i}: {results}")

OS

macOS

Python version

Python 3.11.5

Library version

openai v1.6.1

@robert-foley added the bug (Something isn't working) label on Jan 9, 2024
@rattrayalex
Collaborator

Thanks for the detailed report! @RobertCraigie can you take a look?

@DamianB-BitFlipper

I can report the exact same issue. I instantiate a global AsyncOpenAI client, issue a number of calls asynchronously, and wait for the responses with a gather. Then I do the same thing again and the requests hang forever, never coming back. I have a timeout built into my code, but retries are futile.

@robert-foley
Author

@DamianB-BitFlipper I'm relieved to hear I'm not the only one facing the hanging retry/timeout issue. Were you able to get a code snippet to reproduce the hanging retries? Unfortunately I was only able to repro the initial request failure outside of my application.

@DamianB-BitFlipper

I'm also unable to replicate this reliably outside of my application... Are you calling the AsyncClient within a Docker container?

@robert-foley
Author

@DamianB-BitFlipper I see the timeout issue even when running a CLI version of my application outside of a Docker container.

@arnaud-secondlayer

Do you need anything to help with this issue? It's blocking us from moving to version 1.0; our unit tests fail after a few steps. I can reproduce the issue systematically on macOS 14.3.

@rattrayalex
Collaborator

@arnaud-secondlayer (or others) can you share your repro?

@rattrayalex
Collaborator

rattrayalex commented Feb 3, 2024

I believe this relates to encode/httpcore#830, which httpx maintainer @tomchristie is currently working on.
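
In the meantime, a hedged mitigation sketch, assuming stale keep-alive connections are the trigger: hand the client an httpx.AsyncClient that never reuses idle connections.

import httpx
from openai import AsyncOpenAI

# Assumption: the APIConnectionError comes from reusing a keep-alive socket
# that has already gone stale. Disabling keep-alive forces a fresh connection
# per request - slower, but it avoids the stale-socket path entirely.
client = AsyncOpenAI(
    http_client=httpx.AsyncClient(
        limits=httpx.Limits(max_keepalive_connections=0, keepalive_expiry=0.0),
    ),
)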

@dumbPy

dumbPy commented Feb 5, 2024

Here's a simple pytest-based repro from me and @arnaud-secondlayer:

import pytest
from langchain_openai.embeddings import OpenAIEmbeddings


@pytest.fixture(scope="module")  # initialized only once in the module
def embeddings():
    return OpenAIEmbeddings(timeout=5)


TEST_STRINGS = [
    "This is a test string.",
    "This is another test string.",
    "This is a third test string.",
    "This is a fourth test string.",
    "This is a fifth test string.",
]


@pytest.mark.parametrize("string", TEST_STRINGS)
async def test_embedding(embeddings: OpenAIEmbeddings, string):
    embedding = await embeddings.aembed_query(string)
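
A possible workaround sketch, assuming each test gets its own event loop and the module-scoped fixture is what lets pooled connections span two loops: give the client the same (function) scope as the loop.

import pytest
from langchain_openai.embeddings import OpenAIEmbeddings


# Assumption: the default function-scoped fixture recreates the client (and
# its connection pool) for every test, so no connection outlives the event
# loop it was opened on.
@pytest.fixture
def embeddings():
    return OpenAIEmbeddings(timeout=5)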

@RobertCraigie
Collaborator

@dumbPy can you share a full stack trace?

@dumbPy

dumbPy commented Feb 5, 2024

Sure, here's what I could grab.

Note that this happens only if I add a timeout; without a timeout it just waits forever, so there's no stack trace for that.

Stack Trace
) -> ResponseT | _AsyncStreamT:
    cast_to = self._maybe_override_cast_to(cast_to, options)
    await self._prepare_options(options)

    retries = self._remaining_retries(remaining_retries, options)
    request = self._build_request(options)
    await self._prepare_request(request)

    kwargs: HttpxSendArgs = {}
    if self.custom_auth is not None:
        kwargs["auth"] = self.custom_auth

    try:
      response = await self._client.send(
            request,
            stream=stream or self._should_stream_response_body(request=request),
            **kwargs,
        )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1437:


self = <openai._base_client.AsyncHttpxClientWrapper object at 0x105c08a90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

async def send(
    self,
    request: Request,
    *,
    stream: bool = False,
    auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
    follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
) -> Response:
    """
    Send a request.

    The request is sent as-is, unmodified.

    Typically you'll want to build one with `AsyncClient.build_request()`
    so that any client-level configuration is merged into the request,
    but passing an explicit `httpx.Request()` is supported as well.

    See also: [Request instances][0]

    [0]: /advanced/#request-instances
    """
    if self._state == ClientState.CLOSED:
        raise RuntimeError("Cannot send a request, as the client has been closed.")

    self._state = ClientState.OPENED
    follow_redirects = (
        self.follow_redirects
        if isinstance(follow_redirects, UseClientDefault)
        else follow_redirects
    )

    auth = self._build_request_auth(request, auth)
  response = await self._send_handling_auth(
        request,
        auth=auth,
        follow_redirects=follow_redirects,
        history=[],
    )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1646:


self = <openai._base_client.AsyncHttpxClientWrapper object at 0x105c08a90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>, auth = <httpx.Auth object at 0x1105c7950>
follow_redirects = True, history = []

async def _send_handling_auth(
    self,
    request: Request,
    auth: Auth,
    follow_redirects: bool,
    history: typing.List[Response],
) -> Response:
    auth_flow = auth.async_auth_flow(request)
    try:
        request = await auth_flow.__anext__()

        while True:
          response = await self._send_handling_redirects(
                request,
                follow_redirects=follow_redirects,
                history=history,
            )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1674:


self = <openai._base_client.AsyncHttpxClientWrapper object at 0x105c08a90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>, follow_redirects = True, history = []

async def _send_handling_redirects(
    self,
    request: Request,
    follow_redirects: bool,
    history: typing.List[Response],
) -> Response:
    while True:
        if len(history) > self.max_redirects:
            raise TooManyRedirects(
                "Exceeded maximum allowed redirects.", request=request
            )

        for hook in self._event_hooks["request"]:
            await hook(request)
      response = await self._send_single_request(request)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1711:


self = <openai._base_client.AsyncHttpxClientWrapper object at 0x105c08a90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

async def _send_single_request(self, request: Request) -> Response:
    """
    Sends a single request, without handling any redirections.
    """
    transport = self._transport_for_url(request.url)
    timer = Timer()
    await timer.async_start()

    if not isinstance(request.stream, AsyncByteStream):
        raise RuntimeError(
            "Attempted to send an sync request with an AsyncClient instance."
        )

    with request_context(request=request):
      response = await transport.handle_async_request(request)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1748:


self = <httpx.AsyncHTTPTransport object at 0x1100868d0>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

async def handle_async_request(
    self,
    request: Request,
) -> Response:
    assert isinstance(request.stream, AsyncByteStream)

    req = httpcore.Request(
        method=request.method,
        url=httpcore.URL(
            scheme=request.url.raw_scheme,
            host=request.url.raw_host,
            port=request.url.port,
            target=request.url.raw_path,
        ),
        headers=request.headers.raw,
        content=request.stream,
        extensions=request.extensions,
    )
    with map_httpcore_exceptions():
      resp = await self._pool.handle_async_request(req)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_transports/default.py:371:


self = <httpcore.AsyncConnectionPool object at 0x110086890>, request = <Request [b'POST']>

async def handle_async_request(self, request: Request) -> Response:
    """
    Send an HTTP request, and return an HTTP response.

    This is the core implementation that is called into by `.request()` or `.stream()`.
    """
    scheme = request.url.scheme.decode()
    if scheme == "":
        raise UnsupportedProtocol(
            "Request URL is missing an 'http://' or 'https://' protocol."
        )
    if scheme not in ("http", "https", "ws", "wss"):
        raise UnsupportedProtocol(
            f"Request URL has an unsupported protocol '{scheme}://'."
        )

    status = RequestStatus(request)
    timeouts = request.extensions.get("timeout", {})
    timeout = timeouts.get("pool", None)

    if timeout is not None:
        deadline = time.monotonic() + timeout
    else:
        deadline = float("inf")

    async with self._pool_lock:
        self._requests.append(status)
      await self._close_expired_connections()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection_pool.py:234:


self = <httpcore.AsyncConnectionPool object at 0x110086890>

async def _close_expired_connections(self) -> None:
    """
    Clean up the connection pool by closing off any connections that have expired.
    """
    # Close any connections that have expired their keep-alive time.
    for idx, connection in reversed(list(enumerate(self._pool))):
        if connection.has_expired():
          await connection.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection_pool.py:195:


self = <AsyncHTTPConnection ['https://api.openai.com:443', HTTP/1.1, CLOSED, Request Count: 1]>

async def aclose(self) -> None:
    if self._connection is not None:
        async with Trace("close", logger, None, {}):
          await self._connection.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection.py:173:


self = <AsyncHTTP11Connection ['https://api.openai.com:443', CLOSED, Request Count: 1]>

async def aclose(self) -> None:
    # Note that this method unilaterally closes the connection, and does
    # not have any kind of locking in place around it.
    self._state = HTTPConnectionState.CLOSED
  await self._network_stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/http11.py:253:


self = <httpcore._backends.anyio.AnyIOStream object at 0x110057cd0>

async def aclose(self) -> None:
  await self._stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_backends/anyio.py:54:


self = TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x110a9a6d0>, standard_compatible=False, _...t at 0x1072c4590>, _read_bio=<_ssl.MemoryBIO object at 0x110a9dff0>, _write_bio=<_ssl.MemoryBIO object at 0x110a9dfc0>)

async def aclose(self) -> None:
    if self.standard_compatible:
        try:
            await self.unwrap()
        except BaseException:
            await aclose_forcefully(self.transport_stream)
            raise
  await self.transport_stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/anyio/streams/tls.py:193:


self = <anyio._backends._asyncio.SocketStream object at 0x110a9a6d0>

async def aclose(self) -> None:
    if not self._transport.is_closing():
        self._closed = True
        try:
          self._transport.write_eof()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/anyio/_backends/_asyncio.py:1257:


???

uvloop/handles/stream.pyx:699:


???
E RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x103d8ba40>; the handler is closed

uvloop/handles/handle.pyx:159: RuntimeError

During handling of the above exception, another exception occurred:

[The same traceback repeats twice more, once per retry, each time ending in the same RuntimeError from uvloop/handles/handle.pyx:159.]

The above exception was the direct cause of the following exception:

embeddings = OpenAIEmbeddings(client=<openai.resources.embeddings.Embeddings object at 0x11006aa50>, async_client=<openai.resources...kip_empty=False, default_headers=None, default_query=None, retry_min_seconds=4, retry_max_seconds=20, http_client=None)
string = 'This is a fifth test string.'

@pytest.mark.parametrize("string", TEST_STRINGS)
async def test_embedding(embeddings: OpenAIEmbeddings, string):
  embedding = await embeddings.aembed_query(string)

pkg_tests/test_openai_embeddings.py:21:


../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/langchain_openai/embeddings/base.py:522: in aembed_query
embeddings = await self.aembed_documents([text])
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/langchain_openai/embeddings/base.py:500: in aembed_documents
return await self._aget_len_safe_embeddings(texts, engine=engine)
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/langchain_openai/embeddings/base.py:436: in _aget_len_safe_embeddings
response = await self.async_client.create(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/resources/embeddings.py:214: in create
return await self._post(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1705: in post
return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1408: in request
return await self._request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1461: in _request
return await self._retry_request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1530: in _retry_request
return await self._request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1461: in _request
return await self._retry_request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1530: in _retry_request
return await self._request(


self = <openai.AsyncOpenAI object at 0x1100688d0>
cast_to = <class 'openai.types.create_embedding_response.CreateEmbeddingResponse'>
options = FinalRequestOptions(method='post', url='/embeddings', params={}, headers=NOT_GIVEN, max_retries=NOT_GIVEN, timeout=NOT...28, 374, 264, 18172, 1296, 925, 13]], 'model': 'text-embedding-ada-002', 'encoding_format': 'base64'}, extra_json=None)

async def _request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    *,
    stream: bool,
    stream_cls: type[_AsyncStreamT] | None,
    remaining_retries: int | None,
) -> ResponseT | _AsyncStreamT:
    cast_to = self._maybe_override_cast_to(cast_to, options)
    await self._prepare_options(options)

    retries = self._remaining_retries(remaining_retries, options)
    request = self._build_request(options)
    await self._prepare_request(request)

    kwargs: HttpxSendArgs = {}
    if self.custom_auth is not None:
        kwargs["auth"] = self.custom_auth

    try:
        response = await self._client.send(
            request,
            stream=stream or self._should_stream_response_body(request=request),
            **kwargs,
        )
    except httpx.TimeoutException as err:
        log.debug("Encountered httpx.TimeoutException", exc_info=True)

        if retries > 0:
            return await self._retry_request(
                options,
                cast_to,
                retries,
                stream=stream,
                stream_cls=stream_cls,
                response_headers=None,
            )

        log.debug("Raising timeout error")
        raise APITimeoutError(request=request) from err
    except Exception as err:
        log.debug("Encountered Exception", exc_info=True)

        if retries > 0:
            return await self._retry_request(
                options,
                cast_to,
                retries,
                stream=stream,
                stream_cls=stream_cls,
                response_headers=None,
            )

        log.debug("Raising connection error")
      raise APIConnectionError(request=request) from err

E openai.APIConnectionError: Connection error.

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1471: APIConnectionError
============================================ short test summary info ============================================
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is another test string.] - openai.APIConnectionError: Connection error.
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a third test string.] - openai.APIConnectionError: Connection error.
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a fourth test string.] - openai.APIConnectionError: Connection error.
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a fifth test string.] - openai.APIConnectionError: Connection error.
========================================= 4 failed, 1 passed in 17.26s ==========================================

╰─ python3 -m pytest pkg_tests/test_openai_embeddings.py
============================================== test session starts ==============================================
platform darwin -- Python 3.11.0, pytest-7.3.1, pluggy-1.4.0
rootdir: /Users/sufiyan/secondlayerco/secondlayer/nlp-cloud-server
plugins: aio-1.5.0, anyio-3.7.1
collected 5 items

pkg_tests/test_openai_embeddings.py ..... [100%]

=============================================== warnings summary ================================================
pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a test string.]
pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is another test string.]
pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a third test string.]
pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a fourth test string.]
pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a fifth test string.]
/Users/sufiyan/micromamba/envs/nlp-test/lib/python3.11/site-packages/pytest_aio/plugin.py:62: RuntimeWarning: coroutine 'AsyncEmbeddings.create' was never awaited
runner.run(testfunc, **kwargs)
Enable tracemalloc to get traceback where the object was allocated.
See https://docs.pytest.org/en/stable/how-to/capture-warnings.html#resource-warnings for more info.

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========================================= 5 passed, 5 warnings in 0.29s =========================================

@RobertCraigie
Collaborator

RobertCraigie commented Feb 5, 2024

This appears to be because of your pytest setup; can you reproduce without using pytest?

@dumbPy

dumbPy commented Feb 5, 2024

@RobertCraigie maybe pytest expedites the underlying issue (the first reported issue isn't pytest-related).

It works fine without pytest for my simple example above with just 5 concurrent requests:
import asyncio
from openai import AsyncOpenAI

TEST_STRINGS = [
    "This is a test string.",
    "This is another test string.",
    "This is a third test string.",
    "This is a fourth test string.",
    "This is a fifth test string.",
]


client = AsyncOpenAI()

async def get_embedding(string):
    embedding = await client.embeddings.create(input=string, model="text-embedding-ada-002")
    print("Embedding received for string:", string)

async def main():
    return await asyncio.gather(*[get_embedding(string) for string in TEST_STRINGS])

asyncio.run(main())
But it fails when run inside pytest:
import pytest
from openai import AsyncOpenAI

TEST_STRINGS = [
    "This is a test string.",
    "This is another test string.",
    "This is a third test string.",
    "This is a fourth test string.",
    "This is a fifth test string.",
]


@pytest.fixture(scope="module")  # initialized only once in the module
def client():
    return AsyncOpenAI()


@pytest.mark.parametrize("string", TEST_STRINGS)
async def test_embedding(client: AsyncOpenAI, string):
    return await client.embeddings.create(input=string, model="text-embedding-ada-002")
with this stack trace:
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
        remaining_retries: int | None,
    ) -> ResponseT | _AsyncStreamT:
        cast_to = self._maybe_override_cast_to(cast_to, options)
        await self._prepare_options(options)
    
        retries = self._remaining_retries(remaining_retries, options)
        request = self._build_request(options)
        await self._prepare_request(request)
    
        kwargs: HttpxSendArgs = {}
        if self.custom_auth is not None:
            kwargs["auth"] = self.custom_auth
    
        try:
>           response = await self._client.send(
                request,
                stream=stream or self._should_stream_response_body(request=request),
                **kwargs,
            )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1437: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

    async def send(
        self,
        request: Request,
        *,
        stream: bool = False,
        auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
        follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
    ) -> Response:
        """
        Send a request.
    
        The request is sent as-is, unmodified.
    
        Typically you'll want to build one with `AsyncClient.build_request()`
        so that any client-level configuration is merged into the request,
        but passing an explicit `httpx.Request()` is supported as well.
    
        See also: [Request instances][0]
    
        [0]: /advanced/#request-instances
        """
        if self._state == ClientState.CLOSED:
            raise RuntimeError("Cannot send a request, as the client has been closed.")
    
        self._state = ClientState.OPENED
        follow_redirects = (
            self.follow_redirects
            if isinstance(follow_redirects, UseClientDefault)
            else follow_redirects
        )
    
        auth = self._build_request_auth(request, auth)
    
>       response = await self._send_handling_auth(
            request,
            auth=auth,
            follow_redirects=follow_redirects,
            history=[],
        )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1646: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>
auth = <httpx.Auth object at 0x109732fd0>
follow_redirects = True, history = []

    async def _send_handling_auth(
        self,
        request: Request,
        auth: Auth,
        follow_redirects: bool,
        history: typing.List[Response],
    ) -> Response:
        auth_flow = auth.async_auth_flow(request)
        try:
            request = await auth_flow.__anext__()
    
            while True:
>               response = await self._send_handling_redirects(
                    request,
                    follow_redirects=follow_redirects,
                    history=history,
                )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1674: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>
follow_redirects = True, history = []

    async def _send_handling_redirects(
        self,
        request: Request,
        follow_redirects: bool,
        history: typing.List[Response],
    ) -> Response:
        while True:
            if len(history) > self.max_redirects:
                raise TooManyRedirects(
                    "Exceeded maximum allowed redirects.", request=request
                )
    
            for hook in self._event_hooks["request"]:
                await hook(request)
    
>           response = await self._send_single_request(request)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1711: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

    async def _send_single_request(self, request: Request) -> Response:
        """
        Sends a single request, without handling any redirections.
        """
        transport = self._transport_for_url(request.url)
        timer = Timer()
        await timer.async_start()
    
        if not isinstance(request.stream, AsyncByteStream):
            raise RuntimeError(
                "Attempted to send an sync request with an AsyncClient instance."
            )
    
        with request_context(request=request):
>           response = await transport.handle_async_request(request)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1748: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncHTTPTransport object at 0x10320eb50>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        assert isinstance(request.stream, AsyncByteStream)
    
        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
>           resp = await self._pool.handle_async_request(req)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_transports/default.py:371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore.AsyncConnectionPool object at 0x1031fff50>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.
    
        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )
    
        status = RequestStatus(request)
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("pool", None)
    
        if timeout is not None:
            deadline = time.monotonic() + timeout
        else:
            deadline = float("inf")
    
        async with self._pool_lock:
            self._requests.append(status)
>           await self._close_expired_connections()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection_pool.py:234: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore.AsyncConnectionPool object at 0x1031fff50>

    async def _close_expired_connections(self) -> None:
        """
        Clean up the connection pool by closing off any connections that have expired.
        """
        # Close any connections that have expired their keep-alive time.
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.has_expired():
>               await connection.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection_pool.py:195: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTPConnection ['https://api.openai.com:443', HTTP/1.1, CLOSED, Request Count: 1]>

    async def aclose(self) -> None:
        if self._connection is not None:
            async with Trace("close", logger, None, {}):
>               await self._connection.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection.py:173: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTP11Connection ['https://api.openai.com:443', CLOSED, Request Count: 1]>

    async def aclose(self) -> None:
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
>       await self._network_stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/http11.py:253: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore._backends.anyio.AnyIOStream object at 0x1035dcad0>

    async def aclose(self) -> None:
>       await self._stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_backends/anyio.py:54: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x103481210>, standard_compatible=False, _...t at 0x10320e3d0>, _read_bio=<_ssl.MemoryBIO object at 0x10946e200>, _write_bio=<_ssl.MemoryBIO object at 0x10946e230>)

    async def aclose(self) -> None:
        if self.standard_compatible:
            try:
                await self.unwrap()
            except BaseException:
                await aclose_forcefully(self.transport_stream)
                raise
    
>       await self.transport_stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/anyio/streams/tls.py:193: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <anyio._backends._asyncio.SocketStream object at 0x103481210>

    async def aclose(self) -> None:
        if not self._transport.is_closing():
            self._closed = True
            try:
>               self._transport.write_eof()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/anyio/_backends/_asyncio.py:1257: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???

uvloop/handles/stream.pyx:699: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x1300201e0>; the handler is closed

uvloop/handles/handle.pyx:159: RuntimeError

During handling of the above exception, another exception occurred:

self = <openai.AsyncOpenAI object at 0x10320e2d0>
cast_to = <class 'openai.types.create_embedding_response.CreateEmbeddingResponse'>
options = FinalRequestOptions(method='post', url='/embeddings', params={}, headers=NOT_GIVEN, max_retries=NOT_GIVEN, timeout=NOT...put': 'This is a fifth test string.', 'model': 'text-embedding-ada-002', 'encoding_format': 'base64'}, extra_json=None)

    async def _request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
        remaining_retries: int | None,
    ) -> ResponseT | _AsyncStreamT:
        cast_to = self._maybe_override_cast_to(cast_to, options)
        await self._prepare_options(options)
    
        retries = self._remaining_retries(remaining_retries, options)
        request = self._build_request(options)
        await self._prepare_request(request)
    
        kwargs: HttpxSendArgs = {}
        if self.custom_auth is not None:
            kwargs["auth"] = self.custom_auth
    
        try:
>           response = await self._client.send(
                request,
                stream=stream or self._should_stream_response_body(request=request),
                **kwargs,
            )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1437: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

    async def send(
        self,
        request: Request,
        *,
        stream: bool = False,
        auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
        follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
    ) -> Response:
        """
        Send a request.
    
        The request is sent as-is, unmodified.
    
        Typically you'll want to build one with `AsyncClient.build_request()`
        so that any client-level configuration is merged into the request,
        but passing an explicit `httpx.Request()` is supported as well.
    
        See also: [Request instances][0]
    
        [0]: /advanced/#request-instances
        """
        if self._state == ClientState.CLOSED:
            raise RuntimeError("Cannot send a request, as the client has been closed.")
    
        self._state = ClientState.OPENED
        follow_redirects = (
            self.follow_redirects
            if isinstance(follow_redirects, UseClientDefault)
            else follow_redirects
        )
    
        auth = self._build_request_auth(request, auth)
    
>       response = await self._send_handling_auth(
            request,
            auth=auth,
            follow_redirects=follow_redirects,
            history=[],
        )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1646: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>
auth = <httpx.Auth object at 0x109faf590>
follow_redirects = True, history = []

    async def _send_handling_auth(
        self,
        request: Request,
        auth: Auth,
        follow_redirects: bool,
        history: typing.List[Response],
    ) -> Response:
        auth_flow = auth.async_auth_flow(request)
        try:
            request = await auth_flow.__anext__()
    
            while True:
>               response = await self._send_handling_redirects(
                    request,
                    follow_redirects=follow_redirects,
                    history=history,
                )

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1674: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>
follow_redirects = True, history = []

    async def _send_handling_redirects(
        self,
        request: Request,
        follow_redirects: bool,
        history: typing.List[Response],
    ) -> Response:
        while True:
            if len(history) > self.max_redirects:
                raise TooManyRedirects(
                    "Exceeded maximum allowed redirects.", request=request
                )
    
            for hook in self._event_hooks["request"]:
                await hook(request)
    
>           response = await self._send_single_request(request)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1711: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai._base_client.AsyncHttpxClientWrapper object at 0x1024e2e90>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

    async def _send_single_request(self, request: Request) -> Response:
        """
        Sends a single request, without handling any redirections.
        """
        transport = self._transport_for_url(request.url)
        timer = Timer()
        await timer.async_start()
    
        if not isinstance(request.stream, AsyncByteStream):
            raise RuntimeError(
                "Attempted to send an sync request with an AsyncClient instance."
            )
    
        with request_context(request=request):
>           response = await transport.handle_async_request(request)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_client.py:1748: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncHTTPTransport object at 0x10320eb50>
request = <Request('POST', 'https://api.openai.com/v1/embeddings')>

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        assert isinstance(request.stream, AsyncByteStream)
    
        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
>           resp = await self._pool.handle_async_request(req)

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpx/_transports/default.py:371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore.AsyncConnectionPool object at 0x1031fff50>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.
    
        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )
    
        status = RequestStatus(request)
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("pool", None)
    
        if timeout is not None:
            deadline = time.monotonic() + timeout
        else:
            deadline = float("inf")
    
        async with self._pool_lock:
            self._requests.append(status)
>           await self._close_expired_connections()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection_pool.py:234: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore.AsyncConnectionPool object at 0x1031fff50>

    async def _close_expired_connections(self) -> None:
        """
        Clean up the connection pool by closing off any connections that have expired.
        """
        # Close any connections that have expired their keep-alive time.
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.has_expired():
>               await connection.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection_pool.py:195: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTPConnection ['https://api.openai.com:443', HTTP/1.1, CLOSED, Request Count: 1]>

    async def aclose(self) -> None:
        if self._connection is not None:
            async with Trace("close", logger, None, {}):
>               await self._connection.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/connection.py:173: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTP11Connection ['https://api.openai.com:443', CLOSED, Request Count: 1]>

    async def aclose(self) -> None:
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
>       await self._network_stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_async/http11.py:253: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore._backends.anyio.AnyIOStream object at 0x1035dcad0>

    async def aclose(self) -> None:
>       await self._stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/httpcore/_backends/anyio.py:54: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x103481210>, standard_compatible=False, _...t at 0x10320e3d0>, _read_bio=<_ssl.MemoryBIO object at 0x10946e200>, _write_bio=<_ssl.MemoryBIO object at 0x10946e230>)

    async def aclose(self) -> None:
        if self.standard_compatible:
            try:
                await self.unwrap()
            except BaseException:
                await aclose_forcefully(self.transport_stream)
                raise
    
>       await self.transport_stream.aclose()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/anyio/streams/tls.py:193: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <anyio._backends._asyncio.SocketStream object at 0x103481210>

    async def aclose(self) -> None:
        if not self._transport.is_closing():
            self._closed = True
            try:
>               self._transport.write_eof()

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/anyio/_backends/_asyncio.py:1257: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???

uvloop/handles/stream.pyx:699: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   RuntimeError: unable to perform operation on <TCPTransport closed=True reading=False 0x1300201e0>; the handler is closed

uvloop/handles/handle.pyx:159: RuntimeError

The above exception was the direct cause of the following exception:

client = <openai.AsyncOpenAI object at 0x10320e2d0>
string = 'This is a fifth test string.'

    @pytest.mark.parametrize("string", TEST_STRINGS)
    async def test_embedding(client: AsyncOpenAI, string):
>       return await client.embeddings.create(input=string, model="text-embedding-ada-002")

pkg_tests/test_openai_embeddings.py:20: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/resources/embeddings.py:214: in create
    return await self._post(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1705: in post
    return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1408: in request
    return await self._request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1461: in _request
    return await self._retry_request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1530: in _retry_request
    return await self._request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1461: in _request
    return await self._retry_request(
../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1530: in _retry_request
    return await self._request(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <openai.AsyncOpenAI object at 0x10320e2d0>
cast_to = <class 'openai.types.create_embedding_response.CreateEmbeddingResponse'>
options = FinalRequestOptions(method='post', url='/embeddings', params={}, headers=NOT_GIVEN, max_retries=NOT_GIVEN, timeout=NOT...put': 'This is a fifth test string.', 'model': 'text-embedding-ada-002', 'encoding_format': 'base64'}, extra_json=None)

    async def _request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
        remaining_retries: int | None,
    ) -> ResponseT | _AsyncStreamT:
        cast_to = self._maybe_override_cast_to(cast_to, options)
        await self._prepare_options(options)
    
        retries = self._remaining_retries(remaining_retries, options)
        request = self._build_request(options)
        await self._prepare_request(request)
    
        kwargs: HttpxSendArgs = {}
        if self.custom_auth is not None:
            kwargs["auth"] = self.custom_auth
    
        try:
            response = await self._client.send(
                request,
                stream=stream or self._should_stream_response_body(request=request),
                **kwargs,
            )
        except httpx.TimeoutException as err:
            log.debug("Encountered httpx.TimeoutException", exc_info=True)
    
            if retries > 0:
                return await self._retry_request(
                    options,
                    cast_to,
                    retries,
                    stream=stream,
                    stream_cls=stream_cls,
                    response_headers=None,
                )
    
            log.debug("Raising timeout error")
            raise APITimeoutError(request=request) from err
        except Exception as err:
            log.debug("Encountered Exception", exc_info=True)
    
            if retries > 0:
                return await self._retry_request(
                    options,
                    cast_to,
                    retries,
                    stream=stream,
                    stream_cls=stream_cls,
                    response_headers=None,
                )
    
            log.debug("Raising connection error")
>           raise APIConnectionError(request=request) from err
E           openai.APIConnectionError: Connection error.

../../../micromamba/envs/nlp-test/lib/python3.11/site-packages/openai/_base_client.py:1471: APIConnectionError
======== short test summary info ========
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is another test string.] - openai.APIConnectionError: Connectio...
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a third test string.] - openai.APIConnectionError: Connectio...
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a fourth test string.] - openai.APIConnectionError: Connectio...
FAILED pkg_tests/test_openai_embeddings.py::test_embedding[asyncio-This is a fifth test string.] - openai.APIConnectionError: Connectio...
===== 4 failed, 1 passed in 17.11s ======

The trace bottoms out in httpcore's connection pool: _close_expired_connections() tries to aclose() a pooled connection whose underlying uvloop transport is already closed, which raises the RuntimeError that the openai client then wraps in APIConnectionError. From a user's perspective, though, since the auth is simply a token without expiry, the error shouldn't happen either way as far as the client is concerned.
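To that point, the same failure mode should, hypothetically, be reproducible with plain httpx and no openai code at all, since the traceback never leaves the httpx/httpcore layer. A minimal sketch, assuming the trigger is pooled keep-alive connections outliving the event loop they were created on (the URL is an arbitrary placeholder):

import asyncio
import httpx

client = httpx.AsyncClient()  # shared across both asyncio.run() calls

async def fetch():
    # Any completed request leaves a keep-alive connection in the pool.
    return await client.get("https://api.openai.com/v1/models")

asyncio.run(fetch())  # pools a connection bound to this (soon-closed) loop
# The second asyncio.run() creates a fresh loop; when the pool touches the
# stale connection, this is roughly where the RuntimeError above originates.
asyncio.run(fetch())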

@antont

antont commented Feb 5, 2024

@dumbPy Async is sometimes tricky with pytest, so it would indeed be nice to have this reproduced without pytest, just to have fewer moving parts and a more straightforward failing case.
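For reference, a minimal sketch of what a pytest-free repro might look like, assuming the trigger is a single client shared across the separate event loops that back-to-back asyncio.run() calls create (the API key, model, and input strings are placeholders):

import asyncio
from openai import AsyncOpenAI

# One client shared across both batches; max_retries=0 so the first
# failure surfaces instead of being hidden by an automatic retry.
client = AsyncOpenAI(api_key="redacted", max_retries=0)

STRINGS = ["first test string", "second test string", "third test string"]

async def embed_batch():
    return await asyncio.gather(
        *(client.embeddings.create(input=s, model="text-embedding-ada-002")
          for s in STRINGS)
    )

# Each asyncio.run() creates and tears down its own event loop. The pooled
# connections from batch 0 are bound to the first, now-closed loop, so the
# second batch is expected to fail with openai.APIConnectionError.
for i in range(2):
    responses = asyncio.run(embed_batch())
    print(f"batch {i}: {len(responses)} embeddings")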

@dumbPy

dumbPy commented Feb 9, 2024

Can confirm that encode/httpcore#880 fixes the issue.
To verify the fix:

pip3 uninstall httpcore && \
    pip3 install git+https://github.com/encode/httpcore.git@clean-state-cancellations

Then re-run the above pytest tests; they pass.
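Before re-running, it may also be worth double-checking which httpcore the test environment actually picked up, e.g.:

python3 -c "import httpcore; print(httpcore.__version__, httpcore.__file__)"

(the exact version string a branch install reports is an assumption; the module path at least confirms it resolved from the expected environment).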

@rattrayalex
Collaborator

Terrific – thank you so much for investigating & sharing your results, @dumbPy!

@RobertCraigie
Collaborator

Please try installing httpcore==1.0.3, which includes this fix!
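For pinned environments, bumping the floor should be enough to pull in the fixed release, e.g.:

pip3 install --upgrade 'httpcore>=1.0.3'

No openai upgrade should be needed, since httpcore is pulled in transitively via httpx.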
