Graphman new command: index create #3175

Merged · 17 commits · Jan 21, 2022
49 changes: 48 additions & 1 deletion node/src/bin/manager.rs
@@ -179,6 +179,9 @@ pub enum Command {
Chain(ChainCommand),
/// Manipulate internal subgraph statistics
Stats(StatsCommand),

/// Manage database indexes
Index(IndexCommand),
}

impl Command {
@@ -353,8 +356,37 @@ pub enum StatsCommand {
Analyze {
/// The id of the deployment
id: String,
/// The name of the Entity to ANALYZE
/// The name of the Entity to ANALYZE, in camel case
entity: String,
},
}

#[derive(Clone, Debug, StructOpt)]
pub enum IndexCommand {
/// Creates a new database index.
///
/// The new index will be created concurrently for the provided entity and its fields, whose
/// names must be declared in camel case, following GraphQL conventions.
///
/// The index will have its validity checked after the operation and will be dropped if it is
/// invalid.
///
/// This command may be time-consuming.
Create {
/// The id of the deployment
id: String,
/// The Entity name, in camel case.
#[structopt(empty_values = false)]
entity: String,
/// The Field names, in camel case.
#[structopt(min_values = 1, required = true)]
fields: Vec<String>,
/// The index method. Defaults to `btree`.
#[structopt(
short, long, default_value = "btree",
possible_values = &["btree", "hash", "gist", "spgist", "gin", "brin"]
)]
method: String,
},
}

@@ -709,6 +741,21 @@ async fn main() {
}
}
}
Index(cmd) => {
use IndexCommand::*;
match cmd {
Create {
id,
entity,
fields,
method,
} => {
let store = ctx.store();
let subgraph_store = store.subgraph_store();
commands::index::create(subgraph_store, id, entity, fields, method).await
}
}
}
};
if let Err(e) = result {
die!("error: {}", e)
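For reference, a hypothetical invocation of the new subcommand, following the `StructOpt` definitions above (the deployment hash, entity, and field names are made up, and the `--config` flag is assumed to be the usual way graphman is pointed at its configuration):

```
graphman --config config.toml index create \
    QmExampleDeploymentHash \
    Token \
    owner tokenId \
    --method gist
```

Entity and field names are given in camel case, as in the GraphQL schema; they are resolved to table and column names by the store layer below.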
43 changes: 43 additions & 0 deletions node/src/manager/commands/index.rs
@@ -0,0 +1,43 @@
use graph::{
components::store::EntityType,
prelude::{anyhow, DeploymentHash, StoreError},
};
use graph_store_postgres::SubgraphStore;
use std::{collections::HashSet, sync::Arc};

fn validate_fields<T: AsRef<str>>(fields: &[T]) -> Result<(), anyhow::Error> {
// Must be non-empty. Double checking, since [`StructOpt`] already checks this.
if fields.is_empty() {
anyhow::bail!("at least one field must be informed")
}
// All values must be unique
let unique: HashSet<_> = fields.iter().map(AsRef::as_ref).collect();
if fields.len() != unique.len() {
anyhow::bail!("entity fields must be unique")
}
Ok(())
}
pub async fn create(
store: Arc<SubgraphStore>,
id: String,
entity_name: String,
field_names: Vec<String>,
index_method: String,
) -> Result<(), anyhow::Error> {
validate_fields(&field_names)?;
let deployment_hash = DeploymentHash::new(id)
.map_err(|e| anyhow::anyhow!("Subgraph hash must be a valid IPFS hash: {}", e))?;
let entity_type = EntityType::new(entity_name);
println!("Index creation started. Please wait.");
match store
.create_manual_index(&deployment_hash, entity_type, field_names, index_method)
.await
{
Ok(()) => Ok(()),
Err(StoreError::Canceled) => {
eprintln!("Index creation attempt faield. Please retry.");
::std::process::exit(1);
}
Err(other) => Err(anyhow::anyhow!(other)),
}
}
1 change: 1 addition & 0 deletions node/src/manager/commands/mod.rs
@@ -3,6 +3,7 @@ pub mod chain;
pub mod config;
pub mod copy;
pub mod create;
pub mod index;
pub mod info;
pub mod listen;
pub mod query;
34 changes: 33 additions & 1 deletion store/postgres/src/catalog.rs
@@ -1,4 +1,4 @@
use diesel::sql_types::Integer;
use diesel::sql_types::{Bool, Integer};
use diesel::{connection::SimpleConnection, prelude::RunQueryDsl, select};
use diesel::{insert_into, OptionalExtension};
use diesel::{pg::PgConnection, sql_query};
@@ -382,3 +382,35 @@ pub fn create_foreign_table(
})?;
Ok(query)
}

/// Checks in the database if a given index is valid.
pub(crate) fn check_index_is_valid(
conn: &PgConnection,
schema_name: &str,
index_name: &str,
) -> Result<bool, StoreError> {
#[derive(Queryable, QueryableByName)]
struct ManualIndexCheck {
#[sql_type = "Bool"]
is_valid: bool,
}

let query = "
select
i.indisvalid as is_valid
from
pg_class c
join pg_index i on i.indexrelid = c.oid
join pg_namespace n on c.relnamespace = n.oid
where
n.nspname = $1
and c.relname = $2";
let result = sql_query(query)
.bind::<Text, _>(schema_name)
.bind::<Text, _>(index_name)
.get_result::<ManualIndexCheck>(conn)
.optional()
.map_err::<StoreError, _>(Into::into)?
.map(|check| check.is_valid);
Ok(matches!(result, Some(true)))
}
53 changes: 53 additions & 0 deletions store/postgres/src/deployment_store.rs
@@ -693,6 +693,59 @@ impl DeploymentStore {
})
.await
}

/// Creates a new index in the specified Entity table if it doesn't already exist.
///
/// This is a potentially time-consuming operation.
pub(crate) async fn create_manual_index(
&self,
site: Arc<Site>,
entity_type: EntityType,
field_names: Vec<String>,
index_method: String,
) -> Result<(), StoreError> {
let store = self.clone();

self.with_conn(move |conn, _| {
let schema_name = site.namespace.clone();
let layout = store.layout(conn, site)?;
let table = layout.table_for_entity(&entity_type)?;
let table_name = &table.name;

// resolve column names
let column_names = field_names
.iter()
.map(|f| table.column_for_field(f).map(|column| column.name.as_str()))
.collect::<Result<Vec<_>, _>>()?;

let column_names_sep_by_underscores = column_names.join("_");
let column_names_sep_by_commas = column_names.join(", ");
let index_name = format!("manual_{table_name}_{column_names_sep_by_underscores}");

let sql = format!(
"create index concurrently if not exists {index_name} \
on {schema_name}.{table_name} using {index_method} \
({column_names_sep_by_commas})"
);
// This might take a long time.
conn.execute(&sql)?;

// Check whether the index creation was successful
let index_is_valid =
catalog::check_index_is_valid(conn, schema_name.as_str(), &index_name)?;
if index_is_valid {
Ok(())
} else {
// Index creation failed. We should drop the index before returning.
let drop_index_sql =
format!("drop index concurrently if exists {schema_name}.{index_name}");
conn.execute(&drop_index_sql)?;
Err(StoreError::Canceled)
}
.map_err(Into::into)
})
.await
}
}

/// Methods that back the trait `graph::components::Store`, but have small
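To make the generated DDL concrete, this is the statement the method above would build for a hypothetical entity table `token` in schema `sgd42`, indexed on columns `owner` and `id` with the default `btree` method (all names are illustrative, not taken from a real deployment):

```sql
create index concurrently if not exists manual_token_owner_id
    on sgd42.token using btree (owner, id);

-- If the concurrent build is interrupted or fails, Postgres leaves the index
-- behind marked invalid; the code detects this via pg_index.indisvalid
-- (catalog::check_index_is_valid) and then cleans up with:
drop index concurrently if exists sgd42.manual_token_owner_id;
```

The `manual_` prefix in the index name makes these operator-created indexes easy to tell apart from the ones the layout generates automatically.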
13 changes: 13 additions & 0 deletions store/postgres/src/subgraph_store.rs
@@ -956,6 +956,19 @@ impl SubgraphStoreInner {
let (store, site) = self.store(&id)?;
store.analyze(site, entity_type).await
}

pub async fn create_manual_index(
&self,
id: &DeploymentHash,
entity_type: EntityType,
field_names: Vec<String>,
index_method: String,
) -> Result<(), StoreError> {
let (store, site) = self.store(&id)?;
store
.create_manual_index(site, entity_type, field_names, index_method)
.await
}
}

struct EnsLookup {