Skip to content

Commit

Permalink
Implement support for deleting from base nodes
Browse files Browse the repository at this point in the history
This changes required a couple smaller changes:
* Base nodes now indicate key column
* Record enum can now be `DeleteRequest`
* Streamers send `Vec<StreamUpdate>`'s instead of Records
  • Loading branch information
fintelia committed Feb 8, 2017
1 parent 731c2d3 commit 76170fc
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 64 deletions.
2 changes: 1 addition & 1 deletion benchmarks/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn setup(num_putters: usize) -> Box<Bank> {
let mut mig = g.start_migration();

// add transfers base table
transfers = mig.add_ingredient("transfers", &["src_acct", "dst_acct", "amount"], Base {});
transfers = mig.add_ingredient("transfers", &["src_acct", "dst_acct", "amount"], Base::default());

// add all debits
debits = mig.add_ingredient("debits",
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/targets/soup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ pub fn make(_: &str, _: usize) -> SoupTarget {
let mut mig = g.start_migration();

// add article base node
article = mig.add_ingredient("article", &["id", "title"], Base {});
article = mig.add_ingredient("article", &["id", "title"], Base::default());

// add vote base table
vote = mig.add_ingredient("vote", &["user", "id"], Base {});
vote = mig.add_ingredient("vote", &["user", "id"], Base::default());

// add vote count
vc = mig.add_ingredient("votecount",
Expand Down Expand Up @@ -113,7 +113,7 @@ impl Backend for SoupTarget {
let mut mig = self._g.start_migration();

// add new "ratings" base table
let rating = mig.add_ingredient("rating", &["user", "id", "stars"], Base {});
let rating = mig.add_ingredient("rating", &["user", "id", "stars"], Base::default());

// add sum of ratings
let rs = mig.add_ingredient("rsum",
Expand Down
2 changes: 1 addition & 1 deletion src/backlog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl WriteHandle {
Record::Negative(r) => {
self.handle.remove(key, r);
}

Record::DeleteRequest(..) => unreachable!(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/flow/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ impl Domain {
match r {
ops::Record::Positive(r) => state.insert(r),
ops::Record::Negative(ref r) => state.remove(r),
ops::Record::DeleteRequest(..) => unreachable!(),
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/flow/domain/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ impl NodeDescriptor {
txs.retain(|tx| {
left -= 1;
if left == 0 {
tx.send(data.take().unwrap())
tx.send(data.take().unwrap().into_iter().map(|r| r.into()).collect())
} else {
tx.send(data.clone().unwrap())
tx.send(data.clone().unwrap().into_iter().map(|r| r.into()).collect())
}
.is_ok()
});
Expand Down Expand Up @@ -160,6 +160,7 @@ pub fn materialize(rs: &Records, state: Option<&mut State>) {
match *r {
ops::Record::Positive(ref r) => state.insert(r.clone()),
ops::Record::Negative(ref r) => state.remove(r),
ops::Record::DeleteRequest(..) => unreachable!(),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/flow/migrate/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ pub fn index(graph: &Graph,
// this materialization doesn't have any primary key,
// so we assume it's not in use.

if graph[map[&n]].is_base() {
let ref node = graph[map[&n]];
if node.is_internal() && node.is_base() {
// but it's a base nodes!
// we must *always* materialize base nodes
// so, just make up some column to index on
Expand Down
27 changes: 26 additions & 1 deletion src/flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,31 @@ impl Mutator {
}).unwrap();
recv.recv().unwrap()
}

/// Perform a non-transactional delete frome the base node this Mutator was generated for.
pub fn delete(&self, col: usize, key: query::DataType) {

This comment has been minimized.

Copy link
@jonhoo

jonhoo Feb 10, 2017

Contributor

Shouldn't need to specify col

self.tx.send(Message {
from: self.src,
to: self.addr,
data: vec![prelude::Record::DeleteRequest(col, key)].into(),
ts: None,
token: None,
}).unwrap()
}

/// Perform a transactional delete from the base node this Mutator was generated for.
pub fn transactional_delete(&self, col: usize, key: query::DataType, t: checktable::Token)
-> checktable::TransactionResult {
let (send, recv) = mpsc::channel();
self.tx.send(Message {
from: self.src,
to: self.addr,
data: vec![prelude::Record::DeleteRequest(col, key)].into(),
ts: None,
token: Some((t, send)),
}).unwrap();
recv.recv().unwrap()
}
}

/// `Blender` is the core component of the alternate Soup implementation.
Expand Down Expand Up @@ -672,7 +697,7 @@ impl<'a> Migration<'a> {
/// As new updates are processed by the given node, its outputs will be streamed to the
/// returned channel. Node that this channel is *not* bounded, and thus a receiver that is
/// slower than the system as a hole will accumulate a large buffer over time.
pub fn stream(&mut self, n: NodeAddress) -> mpsc::Receiver<prelude::Records> {
pub fn stream(&mut self, n: NodeAddress) -> mpsc::Receiver<Vec<node::StreamUpdate>> {
self.ensure_reader_for(n);
let (tx, rx) = mpsc::channel();
self.reader_for(n).streamers.lock().unwrap().push(tx);
Expand Down
29 changes: 27 additions & 2 deletions src/flow/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,40 @@ use std::ops::{Deref, DerefMut};
use checktable;

use query::DataType;
use ops::{Records, Datas};
use ops::{Record, Datas};
use flow::domain;
use flow::{Message, Ingredient, NodeAddress, Edge};

use backlog;

/// A StreamUpdate reflects the addition or deletion of a row from a reader node.
#[derive(Clone, Debug, PartialEq)]
pub enum StreamUpdate {
/// Indicates the addition of a new row
AddRow(sync::Arc<Vec<DataType>>),
/// Indicates the removal of an existing row
DeleteRow(sync::Arc<Vec<DataType>>),
}

impl From<Record> for StreamUpdate {
fn from(other: Record) -> Self {
match other {
Record::Positive(u) => StreamUpdate::AddRow(u),
Record::Negative(u) => StreamUpdate::DeleteRow(u),
Record::DeleteRequest(..) => unreachable!(),
}
}
}

impl From<Vec<DataType>> for StreamUpdate {
fn from(other: Vec<DataType>) -> Self {
StreamUpdate::AddRow(sync::Arc::new(other))
}
}

#[derive(Clone)]
pub struct Reader {
pub streamers: sync::Arc<sync::Mutex<Vec<mpsc::Sender<Records>>>>,
pub streamers: sync::Arc<sync::Mutex<Vec<mpsc::Sender<Vec<StreamUpdate>>>>>,
pub state: Option<backlog::ReadHandle>,
pub token_generator: Option<checktable::TokenGenerator>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/flow/sql_to_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl SqlIncorporator {
let fields = Vec::from_iter(cols.iter().map(|c| c.name.clone()));

// make the new base node and record its information
let na = mig.add_ingredient(st.table.name.clone(), fields.as_slice(), Base {});
let na = mig.add_ingredient(st.table.name.clone(), fields.as_slice(), Base::default());
self.node_addresses.insert(name, na);
self.node_fields.insert(na, fields);
na
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@
//! let mut mig = g.start_migration();
//!
//! // base types
//! let article = mig.add_ingredient("article", &["id", "title"], Base {});
//! let vote = mig.add_ingredient("vote", &["user", "id"], Base {});
//! let article = mig.add_ingredient("article", &["id", "title"], Base::default());
//! let vote = mig.add_ingredient("vote", &["user", "id"], Base::default());
//!
//! // vote count is an aggregation over vote where we group by the second field ([1])
//! let vc = mig.add_ingredient("vc", &["id", "votes"], Aggregation::COUNT.over(vote, 0, &[1]));
Expand Down Expand Up @@ -176,7 +176,7 @@
//! # let mut g = Blender::new();
//! # let article = {
//! # let mut mig = g.start_migration();
//! # let article = mig.add_ingredient("article", &["id", "title"], Base {});
//! # let article = mig.add_ingredient("article", &["id", "title"], Base::default());
//! # mig.commit();
//! # article
//! # };
Expand Down Expand Up @@ -239,7 +239,7 @@
//! # let mut g = Blender::new();
//! # let vote = {
//! # let mut mig = g.start_migration();
//! # let vote = mig.add_ingredient("vote", &["user", "id"], Base {});
//! # let vote = mig.add_ingredient("vote", &["user", "id"], Base::default());
//! # mig.commit();
//! # vote
//! # };
Expand Down Expand Up @@ -356,6 +356,7 @@ mod backlog;

pub use checktable::{Token, TransactionResult};
pub use flow::{Blender, Migration, NodeAddress, Mutator};
pub use flow::node::StreamUpdate;
pub use flow::sql_to_flow::{SqlIncorporator, ToFlowParts};
pub use ops::Datas;
pub use ops::base::Base;
Expand Down
55 changes: 48 additions & 7 deletions src/ops/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,29 @@ use std::collections::HashMap;
/// forward them to interested downstream operators. A base node should only be sent updates of the
/// type corresponding to the node's type.
#[derive(Debug, Clone)]
pub struct Base {}
pub struct Base {
key_column: Option<usize>,
us: Option<NodeAddress>,
}

impl Base {
/// Create a base node operator.
pub fn new(key_column: usize) -> Self {
Base {
key_column: Some(key_column),
us: None,
}
}
}

impl Default for Base {
fn default() -> Self {
Base {
key_column: None,
us: None,
}
}
}

use flow::prelude::*;

Expand All @@ -24,17 +46,36 @@ impl Ingredient for Base {
}

fn will_query(&self, _: bool) -> bool {
false
self.key_column.is_some()

This comment has been minimized.

Copy link
@jonhoo

jonhoo Feb 10, 2017

Contributor

Should be

!materialized && self.key_column.is_some()
}

fn on_connected(&mut self, _: &Graph) {}
fn on_commit(&mut self, _: NodeAddress, _: &HashMap<NodeAddress, NodeAddress>) {}
fn on_input(&mut self, _: NodeAddress, rs: Records, _: &DomainNodes, _: &StateMap) -> Records {
rs
fn on_commit(&mut self, us: NodeAddress, _: &HashMap<NodeAddress, NodeAddress>) {
self.us = Some(us);
}
fn on_input(&mut self, _: NodeAddress, rs: Records, _: &DomainNodes, state: &StateMap) -> Records {
rs.into_iter().map(|r| match r {
Record::Positive(u) => Record::Positive(u),
Record::Negative(u) => Record::Negative(u),
Record::DeleteRequest(col, key) => {
assert_eq!(self.key_column, Some(col));

let db = state.get(self.us.as_ref().unwrap().as_local())
.expect("base must have its own state materialized to support deletions");
let rows = db.lookup(col, &key);
assert_ne!(rows.len(), 0);

This comment has been minimized.

Copy link
@jonhoo

jonhoo Feb 10, 2017

Contributor

assert_eq!(rows.len(), 1);


Record::Negative(rows[0].clone())
}
}).collect()
}

fn suggest_indexes(&self, _: NodeAddress) -> HashMap<NodeAddress, usize> {
HashMap::new()
fn suggest_indexes(&self, n: NodeAddress) -> HashMap<NodeAddress, usize> {
if self.key_column.is_some() {
[(n, self.key_column.unwrap())].iter().cloned().collect()

This comment has been minimized.

Copy link
@jonhoo

jonhoo Feb 10, 2017

Contributor

Some((n, self.key_column.unwrap())).into_iter().collect()

This comment has been minimized.

Copy link
@fintelia

fintelia Feb 10, 2017

Author Member
Some((n, self.key_column.unwrap())).into_iter().collect()
[(n, self.key_column.unwrap())].iter().cloned().collect()

This comment has been minimized.

Copy link
@jonhoo

jonhoo Feb 10, 2017

Contributor

Also:

vec!((n, self.key_column.unwrap())).into_iter().collect()

This comment has been minimized.

Copy link
@jonhoo

jonhoo Feb 10, 2017

Contributor

FWIW, there's an RFC on providing a vec!-like macro for HashMap, but it hasn't seen any progress in a while.

} else {
HashMap::new()
}
}

fn resolve(&self, _: usize) -> Option<Vec<(NodeAddress, usize)>> {
Expand Down
8 changes: 7 additions & 1 deletion src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ use std::sync;
pub enum Record {
Positive(sync::Arc<Vec<query::DataType>>),
Negative(sync::Arc<Vec<query::DataType>>),
DeleteRequest(usize, query::DataType),
}

impl Record {
pub fn rec(&self) -> &[query::DataType] {
match *self {
Record::Positive(ref v) |
Record::Negative(ref v) => &v[..],
Record::DeleteRequest(..) => unreachable!(),
}
}

Expand All @@ -40,6 +42,7 @@ impl Record {
match self {
Record::Positive(v) => (v, true),
Record::Negative(v) => (v, false),
Record::DeleteRequest(..) => unreachable!(),
}
}
}
Expand All @@ -50,6 +53,7 @@ impl Deref for Record {
match *self {
Record::Positive(ref r) |
Record::Negative(ref r) => r,
Record::DeleteRequest(..) => unreachable!(),
}
}
}
Expand All @@ -59,6 +63,7 @@ impl DerefMut for Record {
match *self {
Record::Positive(ref mut r) |
Record::Negative(ref mut r) => r,
Record::DeleteRequest(..) => unreachable!(),
}
}
}
Expand Down Expand Up @@ -205,7 +210,7 @@ pub mod test {

pub fn add_base(&mut self, name: &str, fields: &[&str]) -> NodeAddress {
use ops::base::Base;
let mut i: node::Type = Base {}.into();
let mut i: node::Type = Base::default().into();
i.on_connected(&self.graph);
let ni = self.graph.add_node(Node::new(name, fields, i));
self.graph.add_edge(self.source, ni, false);
Expand Down Expand Up @@ -312,6 +317,7 @@ pub mod test {
match data.into() {
Record::Positive(r) => state.insert(r),
Record::Negative(_) => unreachable!(),
Record::DeleteRequest(..) => unreachable!(),
}
} else {
assert!(false,
Expand Down
Loading

0 comments on commit 76170fc

Please sign in to comment.