Skip to content

Commit

Permalink
Merge branch 'main' into bump
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Sep 20, 2022
2 parents 852b671 + e59bed1 commit 06a34cc
Show file tree
Hide file tree
Showing 12 changed files with 8 additions and 142 deletions.
2 changes: 1 addition & 1 deletion src/query/functions/src/scalars/logics/and.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Function for LogicAndFiltersFunction {
input_rows: usize,
) -> Result<ColumnRef> {
if columns.len() == 1 {
return Ok(columns[1].column().clone());
return Ok(columns[0].column().clone());
}

let mut validity = None;
Expand Down
1 change: 0 additions & 1 deletion src/query/pipeline/core/src/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use futures::FutureExt;
use petgraph::graph::node_index;
use petgraph::prelude::NodeIndex;

#[derive(Debug)]
pub enum Event {
NeedData,
NeedConsume,
Expand Down
10 changes: 1 addition & 9 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,7 @@ impl ExecutingGraph {
state_guard_cache = Some(node.state.lock().unwrap());
}

let event = node.processor.event()?;
tracing::debug!(
"node id:{:?}, name:{:?}, event: {:?}",
node.processor.id(),
node.processor.name(),
event
);

let processor_state = match event {
let processor_state = match node.processor.event()? {
Event::Finished => State::Finished,
Event::NeedData | Event::NeedConsume => State::Idle,
Event::Sync => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Instant;

use common_base::base::TrySpawn;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -80,19 +79,7 @@ impl ExecutorWorkerContext {
}

unsafe fn execute_sync_task(&mut self, processor: ProcessorPtr) -> Result<Option<NodeIndex>> {
if tracing::enabled!(tracing::Level::DEBUG) {
let start = Instant::now();
processor.process()?;
tracing::debug!(
"sync processor, node id:{:?}, name:{:?}, event: {:?}",
processor.id(),
processor.name(),
start.elapsed()
);
} else {
processor.process()?;
}

processor.process()?;
Ok(Some(processor.id()))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ impl ProcessorAsyncTask {
);
}
Either::Right((res, _)) => {
tracing::debug!(
"async processor, node id {:?} name: {:?}, elapsed:{:?}",
wraning_processor.id(),
wraning_processor.name(),
start.elapsed()
);
return res;
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn execute(
.start()
.await
.map_err(|e| error!("interpreter.start.error: {:?}", e));
let data_stream: SendableDataBlockStream = {
let mut data_stream: SendableDataBlockStream = {
let output_port = OutputPort::create();
let stream_source = StreamSource::create(ctx.clone(), input_stream, output_port.clone())?;
let mut source_pipe_builder = SourcePipeBuilder::create();
Expand All @@ -122,7 +122,6 @@ async fn execute(
interpreter.execute(ctx.clone()).await?
};

let mut data_stream = ctx.try_create_abortable(data_stream)?;
let format_setting = ctx.get_format_settings()?;
let mut output_format = format.create_format(schema, format_setting);
let prefix = Ok(output_format.serialize_prefix()?);
Expand Down
16 changes: 3 additions & 13 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use common_exception::Result;
use common_legacy_planners::PlanNode;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::StreamExt;
use futures_util::FutureExt;
use serde::Deserialize;
Expand Down Expand Up @@ -322,8 +320,7 @@ async fn execute(
block_buffer: Arc<BlockBuffer>,
executor: Arc<RwLock<Executor>>,
) -> Result<()> {
let data_stream = interpreter.execute(ctx.clone()).await?;
let mut data_stream = ctx.try_create_abortable(data_stream)?;
let mut data_stream = interpreter.execute(ctx.clone()).await?;
let use_result_cache = !ctx.get_config().query.management_mode;

match data_stream.next().await {
Expand Down Expand Up @@ -425,17 +422,10 @@ impl HttpQueryHandle {
};

let (error_sender, mut error_receiver) = mpsc::channel::<Result<()>>(1);
let (abort_handle, abort_registration) = AbortHandle::new_pair();

GlobalIORuntime::instance().spawn(async move {
let error_receiver = Abortable::new(error_receiver.recv(), abort_registration);
ctx.add_source_abort_handle(abort_handle);
match error_receiver.await {
Err(_) => {
Executor::stop(&executor, Err(ErrorCode::AbortedQuery("")), false).await;
block_buffer.stop_push().await.unwrap();
}
Ok(Some(Err(e))) => {
match error_receiver.recv().await {
Some(Err(e)) => {
Executor::stop(&executor, Err(e), false).await;
block_buffer.stop_push().await.unwrap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,7 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
}
};

let abortable_stream = ctx
.try_create_abortable(intercepted_stream.boxed())?
.boxed();
Ok::<_, ErrorCode>(abortable_stream)
Ok::<_, ErrorCode>(intercepted_stream.boxed())
}
.in_current_span()
})?;
Expand Down
13 changes: 0 additions & 13 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ use common_legacy_planners::SourceInfo;
use common_legacy_planners::StageTableInfo;
use common_meta_app::schema::TableInfo;
use common_meta_types::UserInfo;
use common_streams::AbortStream;
use common_streams::SendableDataBlockStream;
use futures::future::AbortHandle;
use opendal::Operator;
use parking_lot::Mutex;
use parking_lot::RwLock;
Expand Down Expand Up @@ -144,16 +141,6 @@ impl QueryContext {
DataExchangeManager::instance()
}

pub fn try_create_abortable(&self, input: SendableDataBlockStream) -> Result<AbortStream> {
let (abort_handle, abort_stream) = AbortStream::try_create(input)?;
self.shared.add_source_abort_handle(abort_handle);
Ok(abort_stream)
}

pub fn add_source_abort_handle(&self, abort_handle: AbortHandle) {
self.shared.add_source_abort_handle(abort_handle);
}

pub fn attach_http_query(&self, handle: HttpQueryHandle) {
self.shared.attach_http_query_handle(handle);
}
Expand Down
13 changes: 0 additions & 13 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::UserInfo;
use common_storage::StorageOperator;
use futures::future::AbortHandle;
use opendal::Operator;
use parking_lot::Mutex;
use parking_lot::RwLock;
Expand Down Expand Up @@ -68,7 +67,6 @@ pub struct QueryContextShared {
pub(in crate::sessions) runtime: Arc<RwLock<Option<Arc<Runtime>>>>,
pub(in crate::sessions) init_query_id: Arc<RwLock<String>>,
pub(in crate::sessions) cluster_cache: Arc<Cluster>,
pub(in crate::sessions) sources_abort_handle: Arc<RwLock<Vec<AbortHandle>>>,
pub(in crate::sessions) subquery_index: Arc<AtomicUsize>,
pub(in crate::sessions) running_query: Arc<RwLock<Option<String>>>,
pub(in crate::sessions) http_query: Arc<RwLock<Option<HttpQueryHandle>>>,
Expand Down Expand Up @@ -99,7 +97,6 @@ impl QueryContextShared {
write_progress: Arc::new(Progress::create()),
error: Arc::new(Mutex::new(None)),
runtime: Arc::new(RwLock::new(None)),
sources_abort_handle: Arc::new(RwLock::new(Vec::new())),
subquery_index: Arc::new(AtomicUsize::new(1)),
running_query: Arc::new(RwLock::new(None)),
http_query: Arc::new(RwLock::new(None)),
Expand All @@ -123,11 +120,6 @@ impl QueryContextShared {
executor.finish(Some(cause));
}

let mut sources_abort_handle = self.sources_abort_handle.write();

while let Some(source_abort_handle) = sources_abort_handle.pop() {
source_abort_handle.abort();
}
// TODO: Wait for the query to be processed (write out the last error)
}

Expand Down Expand Up @@ -256,11 +248,6 @@ impl QueryContextShared {
running_query.as_ref().unwrap_or(&"".to_string()).clone()
}

pub fn add_source_abort_handle(&self, handle: AbortHandle) {
let mut sources_abort_handle = self.sources_abort_handle.write();
sources_abort_handle.push(handle);
}

pub fn get_config(&self) -> Config {
self.config.clone()
}
Expand Down
2 changes: 0 additions & 2 deletions src/query/streams/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

mod sources;
mod stream;
mod stream_abort;
mod stream_datablock;
mod stream_error;
mod stream_progress;
mod stream_take;

pub use sources::*;
pub use stream::*;
pub use stream_abort::AbortStream;
pub use stream_datablock::DataBlockStream;
pub use stream_error::ErrorStream;
pub use stream_progress::ProgressStream;
Expand Down
64 changes: 0 additions & 64 deletions src/query/streams/src/stream_abort.rs

This file was deleted.

0 comments on commit 06a34cc

Please sign in to comment.