Async Support #171

Closed
t-mart opened this issue Nov 21, 2019 · 10 comments
Labels
feature Request for adding a new feature

Comments

@t-mart
Contributor

t-mart commented Nov 21, 2019

I would like to see async support in loguru. In my otherwise asynchronous application, I was a bit frustrated that I had to resort to threading because I couldn't call my sinks asynchronously (I'm calling an HTTP API with log records in batches, which would kill a synchronous application for no good reason).

To me, the cleanest approach would be to add a whole 'nother set of logging functions on the Logger that are coroutines (async def), like trace_async, debug_async, etc. I'm not totally sure how this would work with the concurrency mechanisms in this project (like those the enqueue flag provides) without more investigation.

I've done a bit of asynchronous programming in Python. From 20,000 feet, I don't think this would be a huge addition.

@Delgan, would you have an appetite for this?

@Delgan
Owner

Delgan commented Nov 22, 2019

Yeah, out-of-the-box support for async functions is definitely something I would love to see integrated in Loguru! Actually, it's something I've briefly investigated, but I didn't really have a use case in mind. I decided to wait for someone to open an issue on this subject so we could discuss it. Here you are! 😄

Precisely, about your use case: isn't it something which can be solved using the enqueue=True argument of Loguru? One of its purposes is to allow non-blocking logging calls, saving you the hassle of implementing the threading mechanisms needed to do so. What would logging functions using asyncio bring in addition? What would you do with logger.debug_async() that you can't with logger.debug()? I have a hard time picturing how logging coroutines would interact with the rest of the app. If they are added to the event loop, they need to be awaited and hence become blocking anyway.

I'm not a big fan of adding xxx_async() logging methods... 😅
Isn't there a way to add support for coroutine sinks within the current API? That would be very nice; as I guess async functions will become more and more common, it would avoid issues like #119. Actually, I think I like the asyncio.ensure_future() solution. This automatically adds the coroutine to the event loop, effectively transforming the logging method into a non-blocking call. That's what most users want, right?
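
To illustrate the idea (just a sketch, not an actual implementation; http_sink() and non_blocking_log() are made-up names):

import asyncio

async def http_sink(message):
    # Hypothetical coroutine sink; a real one would POST the record somewhere.
    print("sent:", message)

def non_blocking_log(message):
    # What a logging method could do internally: schedule the sink, don't await it.
    asyncio.ensure_future(http_sink(message))

async def main():
    non_blocking_log("something happened")
    await asyncio.sleep(0.1)  # give the scheduled task a chance to run

asyncio.run(main())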

@t-mart
Contributor Author

t-mart commented Nov 22, 2019

@Delgan

enqueue=True

Ah, I didn't know that's what that was doing. Ok.

However, I don't think I have any control over how the queue gets processed, right?

For example, in my code, my sink puts messages into a SimpleQueue (I think you also use one in Handler, heh) and my worker thread drains it every 5 seconds and batches all those messages into one HTTP API call to StackDriver. Reducing HTTP calls by batching is important in my use case. I'm not sure I could get that kind of functionality exactly with enqueue alone.
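
Roughly, the pattern I have in mind looks like this (just a sketch; send_batch() is a stand-in for the actual StackDriver call):

import threading
from queue import SimpleQueue, Empty

class BatchingSink:

    def __init__(self, interval=5.0):
        self._queue = SimpleQueue()
        self._interval = interval
        self._stopped = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def write(self, message):
        # Called by Loguru for each record; just hand it to the worker thread.
        self._queue.put(message)

    def _run(self):
        while not self._stopped.wait(self._interval):
            self._flush()
        self._flush()  # final drain on shutdown

    def _flush(self):
        batch = []
        while True:
            try:
                batch.append(self._queue.get_nowait())
            except Empty:
                break
        if batch:
            send_batch(batch)  # hypothetical: one HTTP call for the whole batch

    def stop(self):
        self._stopped.set()
        self._worker.join()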

Maybe what I really need is batching support. Batching would help for any sink that uses a resource that requires overhead to access. For example, it looks like FileSink keeps an open file object as long as the handler exists? I'm not sure, but that doesn't feel right (e.g. "Why should I close files in Python?"). Batching might help here?

I'm not sure how the API would look if batching was possible. Or if this is worth it. Just thinking out loud. I think I can manage as-is.

asyncio.ensure_future()

Yes, I think this would be a step in the right direction. (I think asyncio.create_task() is preferred nowadays, by the way.) But without an async def chain of methods all the way from the debug()/info()/etc method down to the sink call, I'm not sure how you could run that code asynchronously.

Maybe loguru would manage its own loop (asyncio.new_event_loop())? I dunno. I'm out of my depth with this suggestion.

@Delgan
Owner

Delgan commented Nov 22, 2019

Yes, Loguru uses a SimpleQueue internally when enqueue=True. You don't have much control over it (its main purpose is for non-blocking calls, not for batches), but I think you can achieve batching quite easily:

class BatcherHTTP:

    def __init__(self, limit):
        self._limit = limit
        self._batch = []

    def write(self, message):
        self._batch.append(message)
        if len(self._batch) == self._limit:
            # batched_request() is the user-provided function making the single HTTP call
            batched_request(self._batch)
            self._batch.clear()

logger.add(BatcherHTTP(10), enqueue=True)

That way you benefit from both asynchronous and batched logging.

Integrating a batching option into Loguru is an interesting idea, but thinking quickly about it, it seems complicated. The sinks are generally callables which expect just one message. If Loguru aims to manage batches, it can't just call for message in self._batch: sink(message) as that would defeat the purpose. So that requires the sink to have a method which can accept multiple messages at once, and such a sink would break as soon as batching=True/False is toggled. For now, I think it's better if batches are handled by the sink itself.

Logging to a file supports some kind of batching through the buffering argument of the open() function. The FileSink keeps the file object open but cleanly closes it when the handler is stopped. I think this is a sane choice as long as the file lifetime is properly managed (I don't rely on the Python garbage collector to implicitly close it). It would be very expensive to open and close the file every time a log needs to be written. The standard logging library does the same actually.
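
For example, something like this should work, assuming extra keyword arguments given for a file sink are forwarded to the built-in open() (the app.log path is just for illustration, and exact forwarding may depend on the Loguru version):

from loguru import logger

# Tune the write buffer through open()'s buffering parameter.
logger.add("app.log", buffering=1)  # line-buffered: flushed at each newline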

Back to asyncio: I think create_task() would best suit our needs, but it requires Python 3.7 while Loguru also supports Python 3.5. That's why I proposed ensure_future(), as it's recommended for backward compatibility. Also, I will add a loop argument to logger.add() (only available if the sink is an async function), so that users can control where the tasks are executed and circumvent the potential problems you evoked.

@t-mart
Contributor Author

t-mart commented Nov 22, 2019

Nice, thanks for the BatcherHTTP code. I'll experiment with that, but probably also add a stop() method to write the final batch if it's non-empty. One tradeoff with that approach is that you get less interactivity if messages hitting that sink are in a lull for some period of time. For example, you could be stuck waiting at 9 messages for a while. Conversely, a thread on a sleep schedule would consistently maintain interactivity.
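
Something like this sketch is what I have in mind (batched_request() still being the hypothetical HTTP call):

import threading

class TimedBatcherHTTP:

    def __init__(self, limit, interval=5.0):
        self._limit = limit
        self._interval = interval
        self._batch = []
        self._lock = threading.Lock()
        self._start_timer()

    def _start_timer(self):
        # Periodic flush so a lull in messages cannot hold back a partial batch.
        self._timer = threading.Timer(self._interval, self._on_timer)
        self._timer.daemon = True
        self._timer.start()

    def write(self, message):
        with self._lock:
            self._batch.append(message)
            if len(self._batch) >= self._limit:
                self._flush()

    def _on_timer(self):
        with self._lock:
            self._flush()
        self._start_timer()

    def _flush(self):
        if self._batch:
            batched_request(self._batch)  # hypothetical single HTTP call
            self._batch = []

    def stop(self):
        self._timer.cancel()
        with self._lock:
            self._flush()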

(FWIW, a reason I'm interested in async for this use case is because asyncio has its own queue, sleep, and worker concepts.)

You make solid points about the FileSink, especially if that's how the standard library does it. Touché.

I didn't realize create_task was a newer thing. All ahead with ensure_future then!

@Delgan Delgan added the feature Request for adding a new feature label Nov 22, 2019
@Delgan
Owner

Delgan commented Nov 25, 2019

Ok, I realized it's a little bit more complicated than just wrapping the coroutine with ensure_future()... It will work, but as you said, something needs to be awaited, otherwise some logs might not be processed:

import asyncio

async def log():
    await asyncio.sleep(0.1)
    print("Done")

async def main():
    asyncio.ensure_future(log())  # Called by logger.debug() for example

asyncio.run(main())  # Nothing is printed

That means I need to add a new awaitable method to the logger, so that users can end the loop by calling something like await logger.complete(). This bothers me a little, because it will only concern (possibly) asynchronous sinks. But I don't think there's any other choice... In any case, I could possibly extend its use in the future if features for other types of sinks are needed.

I looked a little bit at what is being done by other libraries. aiolog and aiologstash use some kind of "asynchronous worker loop" started with ensure_future(). aiologger makes use of loop.create_task() to automatically schedule the logging call. The former requires a loop to start the handler and the worker, which isn't very convenient, and it also prevents the handler from being re-used across multiple different loops. That's why calling ensure_future() around each coroutine log seems more appropriate, in my opinion. The only downside is that if the scheduled tasks need to be awaited (with logger.complete()), I need to store them somewhere. I think I can use a WeakSet just as the standard library does: asyncio/tasks.py#L917
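
Roughly, the bookkeeping could look like this (a sketch only, with made-up schedule() and complete() helpers, not the actual Loguru code):

import asyncio
import weakref

# Scheduled coroutine calls are remembered in a WeakSet so they can be awaited later.
_tasks = weakref.WeakSet()

def schedule(coro):
    task = asyncio.ensure_future(coro)
    _tasks.add(task)
    return task

async def complete():
    loop = asyncio.get_running_loop()
    # Only tasks scheduled on the current loop can be awaited here
    # (Task.get_loop() requires Python 3.8+).
    pending = [t for t in list(_tasks) if t.get_loop() is loop and not t.done()]
    if pending:
        await asyncio.gather(*pending)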

As I'm not very familiar with asynchronous programming, it's possible that I'm overlooking some things, though. 😄

Delgan added a commit that referenced this issue Nov 30, 2019
This also adds the new "complete()" method so that scheduled tasks can
be awaited before leaving the event loop.

There were some subtleties to be taken into account:
- We use "get_event_loop()" instead of "get_running_loop()" so that it
does not raise error if called outside of the event loop
- It not possible to wait for a task created in a different loop, so
"complete()" must filter the scheduled ones
- Before Python 3.5.3, coroutines did not have access to the running
loop, so "get_event_loop()" do not return the "expected" loop
- If "enqueue=True" and "loop=None", the worker thread dot not have
access to the running loop, so we use the global one at the time of
"add()" so user can `await` it
- If "enqueue=True", there must be a way to notify the awaiting
"complete()" that all tasks has been scheduled and can be awaited
@Delgan
Owner

Delgan commented Nov 30, 2019

Merged in master: asynchronous sinks are now possible by passing a coroutine sink to the .add() method. It accepts an optional loop parameter, otherwise the event loop returned by asyncio.get_event_loop() is used, and then logging calls are scheduled using loop.create_task(). Scheduled tasks can and should be awaited with await logger.complete() before leaving the event loop.

import asyncio
from loguru import logger

async def sink(msg):
    print(msg)

async def main():
    logger.info("Start...")
    res = await your_function()
    logger.info("Result: {}", res)
    await logger.complete()

logger.add(sink)
asyncio.run(main())

There were some corner cases to resolve (interoperability with enqueue, using different event loops, support for earlier Python versions, etc.) but I think I've managed to keep things roughly consistent. I tried to test it extensively, but it is not always easy. It should be fine for the main use cases anyway. 👍

@Delgan Delgan closed this as completed Nov 30, 2019
@lsabi lsabi mentioned this issue Mar 30, 2020
@PieceOfGood

Hello!
Do I understand correctly that when specifying enqueue=True in the following example:

logger.add(
    os.path.join(LOGS_DIR, "ERROR", "log_{time}.log"),
    rotation="100 MB",
    compression="zip",
    enqueue=True,
    level="ERROR"
)

Will logging to a file be done asynchronously?

And is it also sufficient to call logger.remove() and then logger.add(sys.stderr, enqueue=True) so that output to the console is also carried out asynchronously, or does that not make much sense in the latter case?

And does the presence of enqueue=True in any number of logger.add(...) calls mean that the last line before the script ends should necessarily be logger.complete()?

And also, please tell me, what is the best way to log exceptions thrown inside asynchronous tasks? For example, in the following code, the exception inside test() is not caught, and if you call test() using await everything happens as expected.

import asyncio
from loguru import logger

async def test() -> None:
    raise ValueError("Oops!")

@logger.catch
async def main() -> None:
    asyncio.create_task(test())
    # await test()
    await asyncio.sleep(1)
    await logger.complete()

asyncio.run(main())

Does this mean that all functions that will be called as tasks should be wrapped with the @logger.catch decorator, or is there some less verbose way to indicate that these errors should also be caught, in case there are quite a lot of such functions?

It's just that only a couple of lines were written about asynchrony, from which it is not at all obvious whether I am approaching this correctly, or whether code that seems to keep executing correctly is actually an asynchronous illusion.

@Delgan
Owner

Delgan commented Feb 15, 2023

Hello @PieceOfGood. :)

Will logging to a file be done asynchronously?

The logging will be made asynchronous in the sense that calling a logging function won't wait for the message to be written on disk. Using enqueue=True spawns a background thread which handles logged messages, thus reducing the time spent in the main thread.
This is not related to asyncio at all, though.

And is it also sufficient to call logger.remove() and then logger.add(sys.stderr, enqueue=True) so that output to the console is also carried out asynchronously, or does that not make much sense in the latter case?

Yes, it makes total sense. This is the way to do it if you want logging to sys.stderr to not be executed in the main thread. However, because sys.stderr.write() is already very fast, using enqueue=True will likely not make logging calls faster.
Still, it can be very useful when using multiprocessing to avoid logs overlapping each other on the console if multiple processes write logs without synchronization.
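
For illustration, here is a rough sketch of that multiprocessing case, assuming the "fork" start method (the Linux default) so the children inherit the configured handler:

import multiprocessing
import sys
from loguru import logger

def worker(n):
    logger.info("Hello from worker {}", n)

if __name__ == "__main__":
    logger.remove()
    # enqueue=True routes every record through a multiprocess-safe queue,
    # so lines from different workers do not interleave on the console.
    logger.add(sys.stderr, enqueue=True)
    processes = [multiprocessing.Process(target=worker, args=(n,)) for n in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()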

And does the presence of enqueue=True in any number of logger.add(...) calls mean that the last line before the script ends should necessarily be logger.complete()?

The logger.complete() is only required for logging handlers relying on asyncio and will wait for all scheduled tasks to finish. It can also be useful when using multiprocessing, to make sure all messages were properly sent to the main thread.
However, if you're using enqueue=True with only one process, calling logger.complete() is not useful. Loguru will wait for all messages to be processed when logger.remove() is called, which happens automatically at the end of the program.

And also, please tell me, what is the best way to log exceptions thrown inside asynchronous tasks? For example, in the following code, the exception inside test() is not caught, and if you call test() using await everything happens as expected.

Hum, I copy/pasted your snippet and it seems the exception is properly caught by Loguru.

Basically, logger.catch() is just syntactic sugar for a try / except block. Anything inside asyncio that works with standard try / except control flow can be handled by logger.catch(), but if the tasks' exceptions are handled differently, I don't think it will work.
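
To make the equivalence concrete, the decorated coroutine behaves roughly like this hand-written version (a sketch only):

import asyncio
from loguru import logger

async def test():
    raise ValueError("Oops!")

async def main():
    # Roughly what @logger.catch does around the function body.
    try:
        await test()
    except Exception:
        logger.exception("An error has been caught in main()")

asyncio.run(main())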

@PieceOfGood

Thanks for your reply.
For the following code:

import asyncio
from loguru import logger

async def test() -> None:
    raise ValueError("Oops!")

@logger.catch
async def main() -> None:
    asyncio.create_task(test())
    # await test()
    await asyncio.sleep(1)

asyncio.run(main())

my console output in PyCharm looks like this:

[screenshot of console output]

But if I swap the comment on lines 9 and 10:

import asyncio
from loguru import logger

async def test() -> None:
    raise ValueError("Oops!")

@logger.catch
async def main() -> None:
    # asyncio.create_task(test())
    await test()
    await asyncio.sleep(1)

asyncio.run(main())

Output looks like this:

[screenshot of console output]

As the documentation says, to get an exception that may have been thrown by a task, you need to call task.exception() on it; re-raising that result in the main coroutine, which is wrapped by @logger.catch, then lets it be logged. But to do this you must wait for the task to complete, otherwise you raise an InvalidStateError instead of getting the possible exception. And because of this, the whole point of running an asynchronous function as a task is lost.
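
In code, the pattern I mean looks roughly like this (which indeed forces waiting for the task):

import asyncio
from loguru import logger

async def test() -> None:
    raise ValueError("Oops!")

@logger.catch
async def main() -> None:
    task = asyncio.create_task(test())
    await asyncio.sleep(1)   # must wait for the task to finish first...
    exc = task.exception()   # ...otherwise this raises InvalidStateError
    if exc is not None:
        raise exc            # re-raise so @logger.catch records it

asyncio.run(main())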

I'm talking more about various handlers, or "fire and forget" tasks, whose result does not interest me and which can be generated in very large numbers.

For such a case, it is necessary to wrap each such function with the @logger.catch decorator, but the additional complexity lies in the fact that there can be quite a lot of such functions.

Of course, I may be reasoning incorrectly, but my experience so far suggests that for convenience such functions should be kept in a separate file, and everything imported from it should be wrapped in the way mentioned earlier.

And since this solution still looks cumbersome, I thought maybe someone would share their wisdom and suggest the best way to do this.

And thanks for your hard work. Loguru is really a good tool!

@Delgan
Owner

Delgan commented Apr 7, 2023

Sorry for the late answer @PieceOfGood, but I'm not sure of the best practices in your case.

What bothers me in your 1st example is that the task is never awaited. This is a problem, in my opinion, because it means you can't be sure the function will be entirely and properly executed. Try adding await asyncio.sleep(2) at the beginning of the test() function and you will see the exception is never raised.

Also, the documentation of asyncio.create_task() explicitly warns about this and suggests saving the returned value into a variable:

Important: Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks. A task that isn’t referenced elsewhere may get garbage collected at any time, even before it’s done.

I think the best practice is to explicitly await the created tasks somewhere in your program. You may save them to a set() and call await asyncio.gather() at the end of your program.
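
A rough sketch of what I mean (spawn() is just an illustrative helper, not part of Loguru):

import asyncio
from loguru import logger

background_tasks = set()

def spawn(coro):
    # Keep a strong reference so the task cannot be garbage collected mid-execution.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task

async def handler(n):
    if n == 2:
        raise ValueError("Oops!")

@logger.catch
async def main():
    for n in range(3):
        spawn(handler(n))
    # gather() re-raises the first exception, which @logger.catch then logs.
    await asyncio.gather(*background_tasks)

asyncio.run(main())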
