Add StreamProvider for configuring StreamTable #10600

Merged
merged 8 commits into from
Jun 6, 2024

matthewmturner
Contributor

Which issue does this PR close?

Closes #10599

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Not yet, but I will add tests if there is agreement that the API is going in the right direction.

Are there any user-facing changes?

Yes, new interface for StreamTable

@github-actions github-actions bot added the core Core DataFusion crate label May 21, 2024
@matthewmturner
Contributor Author

@metesynnada @mustafasrepo I believe you were both involved in the StreamTable implementation, so I'm interested in your views on whether this is going in the right direction towards a more generic StreamTable. (There's plenty of cleanup still to do, but the general API is there.)

Contributor

@alamb alamb left a comment

Thanks @matthewmturner -- this looks pretty neat.

I wonder if there is some example we could add to datafusion-examples that would show this new API in action? That might help motivate what the API changes are for / help this PR review along as well as help others see the feature

@@ -103,19 +105,29 @@ impl FromStr for StreamEncoding {
}
}

/// The configuration for a [`StreamTable`]
pub trait StreamSource: std::fmt::Debug + Send + Sync {
Contributor

Perhaps some documentation about what this trait is meant to represent would help

@matthewmturner
Contributor Author

@alamb I would be happy to add an example. For this PR it would likely just be copied from https://github.com/apache/datafusion/blob/main/datafusion/core/tests/fifo.rs. I will get to that shortly.

Ultimately, the motivation here is to start working towards a more generic interface where StreamTable can be used for multiple stream types (such as websockets, Kafka, etc.) as opposed to just files. Or maybe that's a non-goal, in which case I can just close this. I've seen that implementing streaming tables can require lots of custom code (formats and providers), and I'm hoping to simplify that.

@matthewmturner
Contributor Author

matthewmturner commented May 22, 2024

I'm hoping to get to an API similar to ListingTable's.

ListingTable => ListingTableConfig => FileFormat

Here ListingTable and ListingTableConfig are provided by DataFusion, and if you want to extend them you can just implement a FileFormat and benefit from the ListingTable and ListingTableConfig machinery.

In the case of streams, I had in mind:

StreamTable => StreamTableConfig => StreamProvider (maybe I rename it to StreamFormat for consistency?)

where there are different StreamProvider implementations, such as websocket, Kafka, pcap, etc.
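
The layering proposed above can be sketched in plain Rust with no external crates. This is a simplified illustration under stated assumptions, not the DataFusion API: `Batch`, the `batch_limit` field, `InMemoryProvider`, and `demo_scan` are hypothetical stand-ins, and the real trait would hand out Arrow record batch readers rather than vectors of strings.

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Hypothetical stand-in for an Arrow RecordBatch: a batch is a list of rows.
type Batch = Vec<String>;

/// Provider layer: the only piece a new stream source has to implement.
trait StreamProvider: Debug + Send + Sync {
    /// Returns an iterator over batches read from the underlying source.
    fn reader(&self) -> Box<dyn Iterator<Item = Batch>>;
}

/// Config layer: owns the provider plus source-independent options.
/// `batch_limit` is an illustrative option, not a real DataFusion setting.
#[derive(Debug)]
struct StreamTableConfig {
    provider: Arc<dyn StreamProvider>,
    batch_limit: usize,
}

/// Table layer: shared machinery that works with any provider.
#[derive(Debug)]
struct StreamTable {
    config: StreamTableConfig,
}

impl StreamTable {
    /// Pulls at most `batch_limit` batches from whichever provider is configured.
    fn scan(&self) -> Vec<Batch> {
        self.config
            .provider
            .reader()
            .take(self.config.batch_limit)
            .collect()
    }
}

/// Toy provider backed by in-memory lines, standing in for a FIFO, socket, or topic.
#[derive(Debug)]
struct InMemoryProvider {
    lines: Vec<String>,
}

impl StreamProvider for InMemoryProvider {
    fn reader(&self) -> Box<dyn Iterator<Item = Batch>> {
        // Clone so the returned iterator owns its data and does not borrow `self`.
        let lines = self.lines.clone();
        Box::new(lines.into_iter().map(|line| vec![line]))
    }
}

/// Builds a table over the toy provider and scans it.
fn demo_scan() -> Vec<Batch> {
    let provider = InMemoryProvider {
        lines: vec!["a,1".to_string(), "b,2".to_string(), "c,3".to_string()],
    };
    let table = StreamTable {
        config: StreamTableConfig {
            provider: Arc::new(provider),
            batch_limit: 2,
        },
    };
    table.scan()
}

fn main() {
    println!("{:?}", demo_scan());
}
```

In this sketch, supporting a new source means writing one more `impl StreamProvider`; the table and config types stay untouched, which is the same division of labor the comment describes for ListingTable and FileFormat.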

@matthewmturner
Contributor Author

@alamb I have a working example now. I have an idea to update it to show more of the streaming nature (i.e. write to the FIFO and get batches multiple times), but I won't have time for that today. Do you have thoughts in general on whether this type of API could be supported?

Contributor

@alamb alamb left a comment

I think this API looks pretty good @matthewmturner thank you

@metesynnada or @ozankabak or @devinjdangelo do you have any thoughts / suggestions on these patterns?

@ozankabak
Contributor

@berkaysynnada PTAL

@berkaysynnada
Contributor

I will review it in detail tomorrow

/// The StreamProvider trait is used as a generic interface for reading and writing from streaming
/// data sources (such as FIFO, Websocket, Kafka, etc.). Implementations of the provider are
/// responsible for providing a `RecordBatchReader` and optionally a `RecordBatchWriter`.
pub trait StreamProvider: std::fmt::Debug + Send + Sync {
Contributor Author

StreamFormat might be a better name here, to align with FileFormat

Contributor

I am not sure people associate location with 'Format'. FileFormat does not deal with location. I think StreamProvider is descriptive enough.

Contributor Author

I viewed location as specific to the FileStreamProvider and Format as a concept that provides reading and writing capabilities, similar to FileFormat. But I don't feel strongly either way, so I'm fine to leave it as StreamProvider.
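
The "reader, and optionally a writer" contract from the trait's doc comment can be illustrated with a default method. The sketch below is an assumption-laden simplification rather than the trait as merged: `Batch`, `BatchWriter`, `ReadOnlyProvider`, `BufferProvider`, `BufferWriter`, and `round_trip` are hypothetical names, and errors are plain strings instead of DataFusion's error type.

```rust
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for an Arrow RecordBatch.
type Batch = Vec<String>;

/// Hypothetical stand-in for a record batch writer.
trait BatchWriter {
    fn write(&mut self, batch: &Batch) -> Result<(), String>;
}

trait StreamProvider: Debug + Send + Sync {
    /// Every provider must be readable.
    fn reader(&self) -> Box<dyn Iterator<Item = Batch>>;

    /// Writing is optional: the default implementation reports a read-only source.
    fn writer(&self) -> Result<Box<dyn BatchWriter>, String> {
        Err("this source is read-only".to_string())
    }
}

/// A source that only implements the required `reader` method.
#[derive(Debug)]
struct ReadOnlyProvider;

impl StreamProvider for ReadOnlyProvider {
    fn reader(&self) -> Box<dyn Iterator<Item = Batch>> {
        Box::new(std::iter::once(vec!["tick".to_string()]))
    }
}

/// A source that supports both directions, backed by a shared in-memory buffer.
#[derive(Debug, Default)]
struct BufferProvider {
    buffer: Arc<Mutex<Vec<Batch>>>,
}

struct BufferWriter {
    buffer: Arc<Mutex<Vec<Batch>>>,
}

impl BatchWriter for BufferWriter {
    fn write(&mut self, batch: &Batch) -> Result<(), String> {
        self.buffer
            .lock()
            .map_err(|e| e.to_string())?
            .push(batch.clone());
        Ok(())
    }
}

impl StreamProvider for BufferProvider {
    fn reader(&self) -> Box<dyn Iterator<Item = Batch>> {
        // Snapshot the buffer so the iterator does not hold the lock.
        let snapshot = self.buffer.lock().expect("buffer lock poisoned").clone();
        Box::new(snapshot.into_iter())
    }

    fn writer(&self) -> Result<Box<dyn BatchWriter>, String> {
        Ok(Box::new(BufferWriter {
            buffer: Arc::clone(&self.buffer),
        }))
    }
}

/// Returns whether the read-only source rejected writes, plus what the
/// writable source yields after one batch is written to it.
fn round_trip() -> (bool, Vec<Batch>) {
    let read_only_rejects = ReadOnlyProvider.writer().is_err();
    let provider = BufferProvider::default();
    let mut writer = provider.writer().expect("buffer provider is writable");
    writer.write(&vec!["x".to_string()]).expect("write succeeds");
    (read_only_rejects, provider.reader().collect())
}

fn main() {
    println!("{:?}", round_trip());
}
```

Whether the trait is called a provider or a format, the shape is the same in this sketch: reading is mandatory, writing is opt-in per source.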

Contributor

@berkaysynnada berkaysynnada left a comment

After addressing @alamb's feedback, this PR looks good and separates responsibilities with an additional layer. However, FileStreamProvider currently handles only the CSV format. I suggest focusing more on file format abstraction. Thanks for the example; it makes understanding and usage easier.

@alamb alamb changed the title Start setting up new StreamTable config Add StreamProvider for configuring StreamTable Jun 4, 2024
Contributor

@alamb alamb left a comment

This PR looks good to me. Are we waiting on anything else? Otherwise I think it is good to go from my perspective.

Thank you @matthewmturner and @berkaysynnada for your code and review

Contributor

@ozankabak ozankabak left a comment

LGTM as well

@matthewmturner
Contributor Author

thank you @alamb @berkaysynnada @ozankabak

@alamb alamb merged commit fca1df9 into apache:main Jun 6, 2024
26 checks passed
@alamb
Contributor

alamb commented Jun 6, 2024

Thanks again everyone

findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
* Start setting up new StreamTable config

* Cleanup

* Cleanup

* Fix some tests

* Cleanup

* Start adding example

* Feedback
Labels
core Core DataFusion crate
Development

Successfully merging this pull request may close these issues.

Make the configuration for StreamTable more generic to support more stream sources
4 participants