Skip to content

Commit

Permalink
fix: fix subscription clean during recovery (#18866)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored and yezizp2012 committed Oct 11, 2024
1 parent 0c99de2 commit 90b80e8
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbRelation, PbRelationGro
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::FragmentTypeFlag;
use risingwave_pb::user::PbUserInfo;
use sea_orm::sea_query::{Expr, SimpleExpr};
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::ActiveValue::Set;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
Expand Down Expand Up @@ -609,10 +609,21 @@ impl CatalogController {
pub async fn clean_dirty_subscription(&self) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let _res = Subscription::delete_many()

Object::delete_many()
.filter(
subscription::Column::SubscriptionState
.eq(Into::<i32>::into(SubscriptionState::Init)),
object::Column::ObjType.eq(ObjectType::Subscription).and(
object::Column::Oid.not_in_subquery(
Query::select()
.column(subscription::Column::SubscriptionId)
.from(Subscription)
.and_where(
subscription::Column::SubscriptionState
.eq(SubscriptionState::Created as i32),
)
.take(),
),
),
)
.exec(&txn)
.await?;
Expand Down

0 comments on commit 90b80e8

Please sign in to comment.