diff --git a/src/main/java/org/opensearch/agent/tools/LogPatternTool.java b/src/main/java/org/opensearch/agent/tools/LogPatternTool.java
index a4ad832e..19e7b9ff 100644
--- a/src/main/java/org/opensearch/agent/tools/LogPatternTool.java
+++ b/src/main/java/org/opensearch/agent/tools/LogPatternTool.java
@@ -8,30 +8,41 @@ import static org.opensearch.ml.common.utils.StringUtils.gson;
 import java.security.AccessController;
+import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
-import java.util.regex.Pattern;
+import java.util.function.Function;
+import java.util.stream.IntStream;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
 import org.opensearch.action.search.SearchRequest;
 import org.opensearch.action.search.SearchResponse;
+import org.opensearch.agent.tools.utils.BrainLogParser;
+import org.opensearch.agent.tools.utils.ToolHelper;
 import org.opensearch.client.Client;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.logging.LoggerMessageFormat;
+import org.opensearch.core.common.util.CollectionUtils;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.ml.common.spi.tools.ToolAnnotation;
 import org.opensearch.search.SearchHit;
+import org.opensearch.sql.plugin.transport.PPLQueryAction;
+import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest;
+import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
+import org.opensearch.sql.ppl.domain.PPLQueryRequest;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
+import com.google.gson.reflect.TypeToken;
 import lombok.Builder;
-import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
@@ -39,12 +50,9 @@
 /**
  * This tool supports generating log patterns on the input dsl and index. It's implemented by
  * several steps:
- * 1. Retrival [[${DOC_SIZE_FIELD}]] logs from index
- * 2. Extract patterns for each retrieved log
- *   2.1 Find Pattern Field: If users provide parameter [[${PATTERN_FIELD}]], use it as the pattern
+ * 1. Retrieve [[${DOC_SIZE_FIELD}]] logs from the index by either a DSL or a PPL query
+ * 2. Extract patterns for input logs: If users provide parameter [[${PATTERN_FIELD}]], use it as the pattern
  *   field; Otherwise, find the string field with the longest length on the first log.
- *   2.2 Extract Pattern: If users provide parameter [[${PATTERN}]], compile it as a pattern;
- *   Otherwise, use [[${DEFAULT_IGNORED_CHARS}]]. It will remove all chars matching the pattern.
 * 3. Group logs by their extracted patterns.
 * 4. Find top N patterns with the largest sample log size.
 * 5. For each found top N patterns, return [[${SAMPLE_LOG_SIZE}]] sample logs.
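A minimal sketch of how the reworked tool is driven at runtime, following the five steps above (the index name, query body, and no-op listener are hypothetical; the parameter keys match the constants defined in this class):

    // Hypothetical invocation of the DSL branch; a "ppl" parameter can be passed instead of "input".
    Map<String, String> parameters = Map.of(
        "index", "my-log-index",                    // step 1: index to retrieve logs from
        "input", "{\"query\":{\"match_all\":{}}}",  // step 1: DSL query (aggregations are pruned)
        "pattern_field", "message",                 // step 2: field to extract patterns from
        "top_n_pattern", "3",                       // step 4: number of top pattern groups
        "sample_log_size", "20"                     // step 5: sample logs returned per pattern
    );
    logPatternTool.run(parameters, ActionListener.wrap(response -> {}, exception -> {}));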
@@ -60,18 +68,20 @@ public class LogPatternTool extends AbstractRetrieverTool { public static final String TOP_N_PATTERN = "top_n_pattern"; public static final String SAMPLE_LOG_SIZE = "sample_log_size"; public static final String PATTERN_FIELD = "pattern_field"; - public static final String PATTERN = "pattern"; + public static final String PPL_FIELD = "ppl"; + public static final String VARIABLE_COUNT_THRESHOLD = "variable_count_threshold"; public static final int LOG_PATTERN_DEFAULT_DOC_SIZE = 1000; public static final int DEFAULT_TOP_N_PATTERN = 3; public static final int DEFAULT_SAMPLE_LOG_SIZE = 20; - private static final ImmutableSet DEFAULT_IGNORED_CHARS = ImmutableSet - .copyOf("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".chars().mapToObj(c -> (char) c).toArray(Character[]::new)); + public static final int DEFAULT_VARIABLE_COUNT_THRESHOLD = 5; + + private static final float DEFAULT_THRESHOLD_PERCENTAGE = 0.3f; + private static final String PPL_SCHEMA_NAME = "name"; private String name = TYPE; private int topNPattern; private int sampleLogSize; - @EqualsAndHashCode.Exclude - private Pattern pattern; + private BrainLogParser logParser; @Builder public LogPatternTool( @@ -80,125 +90,98 @@ public LogPatternTool( int docSize, int topNPattern, int sampleLogSize, - String patternStr + int variableCountThreshold ) { super(client, xContentRegistry, null, null, docSize); checkPositive(topNPattern, TOP_N_PATTERN); checkPositive(sampleLogSize, SAMPLE_LOG_SIZE); + checkPositive(variableCountThreshold, VARIABLE_COUNT_THRESHOLD); this.topNPattern = topNPattern; this.sampleLogSize = sampleLogSize; - if (patternStr != null) - this.pattern = Pattern.compile(patternStr); + this.logParser = new BrainLogParser(variableCountThreshold, DEFAULT_THRESHOLD_PERCENTAGE); } @Override protected String getQueryBody(String queryText) { - return queryText; + return removeDSLAggregations(queryText); } @Override public void run(Map parameters, ActionListener listener) { - int topNPattern = parameters.containsKey(TOP_N_PATTERN) ? getPositiveInteger(parameters, TOP_N_PATTERN) : this.topNPattern; - int sampleLogSize = parameters.containsKey(SAMPLE_LOG_SIZE) ? getPositiveInteger(parameters, SAMPLE_LOG_SIZE) : this.sampleLogSize; - Pattern pattern = parameters.containsKey(PATTERN) ? Pattern.compile(parameters.get(PATTERN)) : this.pattern; + String dsl = parameters.get(INPUT_FIELD); + String ppl = parameters.get(PPL_FIELD); + if (!StringUtils.isBlank(dsl)) { + SearchRequest searchRequest; + try { + searchRequest = buildSearchRequest(parameters); + } catch (Exception e) { + log.error("Failed to build search request.", e); + listener.onFailure(e); + return; + } - SearchRequest searchRequest; - try { - searchRequest = buildSearchRequest(parameters); - } catch (Exception e) { - log.error("Failed to build search request.", e); - listener.onFailure(e); - return; - } + ActionListener actionListener = ActionListener.wrap(r -> { + SearchHit[] hits = r.getHits().getHits(); - ActionListener actionListener = ActionListener.wrap(r -> { - SearchHit[] hits = r.getHits().getHits(); - - if (hits != null && hits.length > 0) { - Map firstLogSource = hits[0].getSourceAsMap(); - String patternField = parameters.containsKey(PATTERN_FIELD) - ? 
parameters.get(PATTERN_FIELD) - : findLongestField(firstLogSource); - if (patternField == null) { - throw new IllegalArgumentException("Pattern field is not set and this index doesn't contain any string field"); - } else if (!firstLogSource.containsKey(patternField)) { - throw new IllegalArgumentException( - LoggerMessageFormat - .format( - null, - "Invalid parameter pattern_field: index {} does not have a field named {}", - parameters.getOrDefault(INDEX_FIELD, index), - patternField - ) - ); - } else if (!(firstLogSource.get(patternField) instanceof String)) { - throw new IllegalArgumentException( - LoggerMessageFormat - .format( - null, - "Invalid parameter pattern_field: pattern field {} in index {} is not type of String", - patternField, - parameters.getOrDefault(INDEX_FIELD, index) - ) - ); - } - Map>> patternGroups = new HashMap<>(); - for (SearchHit hit : hits) { - Map source = hit.getSourceAsMap(); - String patternValue = extractPattern((String) source.getOrDefault(patternField, ""), pattern); - List> group = patternGroups.computeIfAbsent(patternValue, k -> new ArrayList<>()); - group.add(source); + if (!CollectionUtils.isEmpty(hits)) { + Map firstLogSource = hits[0].getSourceAsMap(); + + Function> logMessagesProvider = (String patternField) -> Arrays.stream(hits).map(hit -> { + Map source = hit.getSourceAsMap(); + return (String) source.getOrDefault(patternField, ""); + }).collect(Collectors.toList()); + + onResponseSortedLogPatterns(parameters, listener, firstLogSource, logMessagesProvider); + } else { + listener.onResponse((T) "Can not get any match from search result."); } - List> sortedEntries = patternGroups - .entrySet() - .stream() - .sorted(Comparator.comparingInt(entry -> -entry.getValue().size())) - .limit(topNPattern) - .map( - entry -> Map - .of( - "total count", - entry.getValue().size(), - "pattern", - entry.getKey(), - "sample logs", - entry.getValue().subList(0, Math.min(entry.getValue().size(), sampleLogSize)) - ) - ) - .collect(Collectors.toList()); + }, e -> { + log.error("Failed to search index.", e); + listener.onFailure(e); + }); + client.search(searchRequest, actionListener); + } else if (!StringUtils.isBlank(ppl)) { + String prunedPPL = removePPLAggregations(ppl); + PPLQueryRequest pplQueryRequest = new PPLQueryRequest(prunedPPL, null, null, "jdbc"); + TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest(pplQueryRequest); - listener - .onResponse((T) AccessController.doPrivileged((PrivilegedExceptionAction) () -> gson.toJson(sortedEntries))); - } else { - listener.onResponse((T) "Can not get any match from search result."); - } - }, e -> { - log.error("Failed to search index.", e); - listener.onFailure(e); - }); - client.search(searchRequest, actionListener); - } + ActionListener actionListener = ActionListener.wrap(r -> { + String results = r.getResult(); + Map pplResult = gson.fromJson(results, new TypeToken>() { + }.getType()); + List> schema = (List>) pplResult.getOrDefault("schema", new ArrayList<>()); + List> dataRows = (List>) pplResult.getOrDefault("datarows", new ArrayList<>()); + List firstDataRow = dataRows.isEmpty() ? 
new ArrayList<>() : dataRows.getFirst();
+                if (!firstDataRow.isEmpty()) {
+                    Map<String, Object> firstLogSource = new HashMap<>();
+                    IntStream
+                        .range(0, schema.size())
+                        .boxed()
+                        .filter(i -> schema.get(i) != null && !StringUtils.isBlank(schema.get(i).get(PPL_SCHEMA_NAME)))
+                        .forEach(i -> firstLogSource.put(schema.get(i).get(PPL_SCHEMA_NAME), firstDataRow.get(i)));
-    /**
-     * Extract a pattern from the value of a field by removing chars in the pattern. This function
-     * imitates the same logic of Observability log pattern feature here:
-     * parseValue
-     * @param rawString string value of the field to generate a pattern
-     * @param pattern @Nullable the specified pattern to remove, use DEFAULT_IGNORED_CHARS if null
-     * @return the generated pattern value
-     */
-    @VisibleForTesting
-    static String extractPattern(String rawString, Pattern pattern) {
-        if (pattern != null)
-            return pattern.matcher(rawString).replaceAll("");
-        char[] chars = rawString.toCharArray();
-        int pos = 0;
-        for (int i = 0; i < chars.length; i++) {
-            if (!DEFAULT_IGNORED_CHARS.contains(chars[i])) {
-                chars[pos++] = chars[i];
-            }
+                    Function<String, List<String>> logMessagesProvider = (String patternField) -> IntStream
+                        .range(0, schema.size())
+                        .boxed()
+                        .filter(i -> schema.get(i) != null && patternField.equals(schema.get(i).get(PPL_SCHEMA_NAME)))
+                        .findFirst()
+                        .map(fieldIndex -> dataRows.stream().map(dataRow -> (String) dataRow.get(fieldIndex)).toList())
+                        .orElseGet(ArrayList::new);
+
+                    onResponseSortedLogPatterns(parameters, listener, firstLogSource, logMessagesProvider);
+                } else {
+                    listener.onResponse((T) "Can not get any data row from ppl response.");
+                }
+            }, e -> {
+                log.error("Failed to query ppl.", e);
+                listener.onFailure(e);
+            });
+            client.execute(PPLQueryAction.INSTANCE, transportPPLQueryRequest, ToolHelper.getPPLTransportActionListener(actionListener));
+        } else {
+            Exception e = new IllegalArgumentException("Both DSL and PPL input is null or empty, can not process it.");
+            log.error("Failed to find searchable query.", e);
+            listener.onFailure(e);
         }
-        return new String(chars, 0, pos);
     }
 
     /**
@@ -234,8 +217,106 @@ public String getType() {
 
     @Override
     public boolean validate(Map<String, String> parameters) {
-        // LogPatternTool needs to pass index and input as parameter in runtime.
-        return super.validate(parameters) && parameters.containsKey(INDEX_FIELD) && !StringUtils.isBlank(parameters.get(INDEX_FIELD));
+        // LogPatternTool needs to pass a DSL input with an index, or a PPL query, as parameters at runtime.
+        return parameters != null
+            && !parameters.isEmpty()
+            && ((!StringUtils.isBlank(parameters.get(INPUT_FIELD)) && !StringUtils.isBlank(parameters.get(INDEX_FIELD)))
+                || !StringUtils.isBlank(parameters.get(PPL_FIELD)));
+    }
+
+    private <T> void onResponseSortedLogPatterns(
+        Map<String, String> parameters,
+        ActionListener<T> listener,
+        Map<String, Object> firstLogSource,
+        Function<String, List<String>> logMessagesProvider
+    ) throws PrivilegedActionException {
+        String patternField = parameters.getOrDefault(PATTERN_FIELD, findLongestField(firstLogSource));
+        validatePatternFieldAndFirstLogSource(parameters, patternField, firstLogSource);
+        List<String> logMessages = logMessagesProvider.apply(patternField);
+        List<Map<String, Object>> sortedEntries = getTopNLogPatterns(parameters, logMessages);
+
+        listener.onResponse((T) AccessController.doPrivileged((PrivilegedExceptionAction<String>) () -> gson.toJson(sortedEntries)));
+    }
+
+    private List<Map<String, Object>> getTopNLogPatterns(Map<String, String> parameters, List<String> logMessages) {
+        int topNPattern = parameters.containsKey(TOP_N_PATTERN) ?
getPositiveInteger(parameters, TOP_N_PATTERN) : this.topNPattern;
+        int sampleLogSize = parameters.containsKey(SAMPLE_LOG_SIZE) ? getPositiveInteger(parameters, SAMPLE_LOG_SIZE) : this.sampleLogSize;
+
+        Map<String, List<String>> logPatternMap = logParser.parseAllLogPatterns(logMessages);
+
+        return logPatternMap
+            .entrySet()
+            .stream()
+            .sorted(Comparator.comparingInt(entry -> -entry.getValue().size()))
+            .limit(topNPattern)
+            .map(
+                entry -> Map
+                    .of(
+                        "total count",
+                        entry.getValue().size(),
+                        "pattern",
+                        entry.getKey(),
+                        "sample logs",
+                        entry
+                            .getValue()
+                            .subList(0, Math.min(entry.getValue().size(), sampleLogSize))
+                            .stream()
+                            .map(logId -> logMessages.get(Integer.parseInt(logId)))
+                            .toList()
+                    )
+            )
+            .toList();
+    }
+
+    private void validatePatternFieldAndFirstLogSource(
+        Map<String, String> parameters,
+        String patternField,
+        Map<String, Object> firstLogSource
+    ) {
+        if (patternField == null) {
+            throw new IllegalArgumentException("Pattern field is not set and this index doesn't contain any string field");
+        } else if (!firstLogSource.containsKey(patternField)) {
+            throw new IllegalArgumentException(
+                LoggerMessageFormat
+                    .format(
+                        null,
+                        "Invalid parameter pattern_field: index {} does not have a field named {}",
+                        parameters.getOrDefault(INDEX_FIELD, index),
+                        patternField
+                    )
+            );
+        } else if (!(firstLogSource.get(patternField) instanceof String)) {
+            throw new IllegalArgumentException(
+                LoggerMessageFormat
+                    .format(
+                        null,
+                        "Invalid parameter pattern_field: pattern field {} in index {} is not type of String",
+                        patternField,
+                        parameters.getOrDefault(INDEX_FIELD, index)
+                    )
+            );
+        }
+    }
+
+    private String removeDSLAggregations(String dsl) {
+        JSONObject dslObj = new JSONObject(dsl);
+        // A DSL request is a JSON blob; aggregations usually live under the keys 'aggs' or 'aggregations'
+        dslObj.remove("aggs");
+        dslObj.remove("aggregations");
+        return dslObj.toString();
+    }
+
+    private String removePPLAggregations(String ppl) {
+        String normPPL = ppl.replaceAll("\\s+", " ");
+        /*
+         * Remove everything from the first `| stats` clause onward, as those clauses rely on aggregation results.
+         * The match is done on an upper-cased copy while the returned query keeps its original casing,
+         * because some enum arguments of PPL functions are case-sensitive,
+         * i.e. TIMESTAMPADD(DAY, -1, '2025-01-01 00:00:00') is different from TIMESTAMPADD(day, -1, '2025-01-01 00:00:00');
+         * the latter is not parsed well by PPLService.
+         */
+        int idx = normPPL.toUpperCase(Locale.ROOT).indexOf("| STATS");
+        return idx != -1 ? normPPL.substring(0, idx).trim() : ppl;
+    }
 
     private static int getPositiveInteger(Map<String, ?> params, String paramName) {
@@ -285,7 +366,9 @@ public LogPatternTool create(Map<String, Object> params) {
         int docSize = params.containsKey(DOC_SIZE_FIELD) ? getPositiveInteger(params, DOC_SIZE_FIELD) : LOG_PATTERN_DEFAULT_DOC_SIZE;
         int topNPattern = params.containsKey(TOP_N_PATTERN) ? getPositiveInteger(params, TOP_N_PATTERN) : DEFAULT_TOP_N_PATTERN;
         int sampleLogSize = params.containsKey(SAMPLE_LOG_SIZE) ? getPositiveInteger(params, SAMPLE_LOG_SIZE) : DEFAULT_SAMPLE_LOG_SIZE;
-        String patternStr = params.containsKey(PATTERN) ? (String) params.get(PATTERN) : null;
+        int variableCountThreshold = params.containsKey(VARIABLE_COUNT_THRESHOLD)
+            ?
getPositiveInteger(params, VARIABLE_COUNT_THRESHOLD) + : DEFAULT_VARIABLE_COUNT_THRESHOLD; return LogPatternTool .builder() .client(client) @@ -293,7 +376,7 @@ public LogPatternTool create(Map params) { .docSize(docSize) .topNPattern(topNPattern) .sampleLogSize(sampleLogSize) - .patternStr(patternStr) + .variableCountThreshold(variableCountThreshold) .build(); } diff --git a/src/main/java/org/opensearch/agent/tools/PPLTool.java b/src/main/java/org/opensearch/agent/tools/PPLTool.java index 621426ac..a2f998cd 100644 --- a/src/main/java/org/opensearch/agent/tools/PPLTool.java +++ b/src/main/java/org/opensearch/agent/tools/PPLTool.java @@ -35,7 +35,6 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.action.ActionResponse; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.ml.common.FunctionName; import org.opensearch.ml.common.dataset.remote.RemoteInferenceInputDataSet; @@ -52,7 +51,6 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.plugin.transport.PPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest; -import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import com.google.gson.Gson; @@ -228,7 +226,7 @@ public void run(Map parameters, ActionListener listener) .execute( PPLQueryAction.INSTANCE, transportPPLQueryRequest, - getPPLTransportActionListener(ActionListener.wrap(transportPPLQueryResponse -> { + ToolHelper.getPPLTransportActionListener(ActionListener.wrap(transportPPLQueryResponse -> { String results = transportPPLQueryResponse.getResult(); Map returnResults = ImmutableMap.of("ppl", ppl, "executionResult", results); listener @@ -439,10 +437,6 @@ private static void extractSamples(Map sampleSource, Map ActionListener getPPLTransportActionListener(ActionListener listener) { - return ActionListener.wrap(r -> { listener.onResponse(TransportPPLQueryResponse.fromActionResponse(r)); }, listener::onFailure); - } - @SuppressWarnings("unchecked") private void extractFromChatParameters(Map parameters) { if (parameters.containsKey("input")) { diff --git a/src/main/java/org/opensearch/agent/tools/utils/BrainLogParser.java b/src/main/java/org/opensearch/agent/tools/utils/BrainLogParser.java new file mode 100644 index 00000000..fe9fc676 --- /dev/null +++ b/src/main/java/org/opensearch/agent/tools/utils/BrainLogParser.java @@ -0,0 +1,336 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.agent.tools.utils; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Log parser Brain algorithm implementation. 
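+ * In brief: the parser builds a per-position token frequency table over all preprocessed logs
+ * (preprocessAllLogs), selects each log's representative word combination as its group root
+ * (calculateGroupTokenFreq), and finally replaces positions whose token set varies within a
+ * group with the variable denoter (parseLogPattern).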
+ * See: https://ieeexplore.ieee.org/document/10109145
+ */
+public class BrainLogParser {
+
+    private static final String VARIABLE_DENOTER = "<*>";
+    private static final Map<Pattern, String> DEFAULT_FILTER_PATTERN_VARIABLE_MAP = new LinkedHashMap<>();
+    static {
+        // IPv4 address, optionally with a leading slash, a port, or a trailing colon
+        DEFAULT_FILTER_PATTERN_VARIABLE_MAP.put(Pattern.compile("(/|)([0-9]+\\.){3}[0-9]+(:[0-9]+|)(:|)"), "<*IP*>");
+        // Simple ISO date and time
+        DEFAULT_FILTER_PATTERN_VARIABLE_MAP
+            .put(Pattern.compile("(\\d{4}-\\d{2}-\\d{2})[T ]?(\\d{2}:\\d{2}:\\d{2})(\\.\\d{3})?(Z|([+-]\\d{2}:?\\d{2}))?"), "<*DATETIME*>");
+        // Hexadecimal numbers, letters followed by digits, floats, and integers of 4+ digits (3-digit values such as response codes are left alone)
+        DEFAULT_FILTER_PATTERN_VARIABLE_MAP
+            .put(Pattern.compile("((0x|0X)[0-9a-fA-F]+)|[a-zA-Z]+\\d+|([+-]?(?!\\d{3}$)\\d{4,}(\\.\\d*)?|\\.\\d+)"), VARIABLE_DENOTER);
+        // generic number surrounded by non-alphanumeric characters
+        DEFAULT_FILTER_PATTERN_VARIABLE_MAP.put(Pattern.compile("(?<=[^A-Za-z0-9 ])(-?\\+?\\d+)(?=[^A-Za-z0-9])"), VARIABLE_DENOTER);
+    }
+    private static final List<String> DEFAULT_DELIMITERS = List.of(",", "+");
+    // token frequency is counted per composite key of position and token string
+    private static final String POSITIONED_TOKEN_KEY_FORMAT = "%d-%s";
+    // Token sets are grouped by a composite key of token-list length per log message, word combination candidate and token position.
+    private static final String GROUP_TOKEN_SET_KEY_FORMAT = "%d-%s-%d";
+    // By default, a position with at least this many distinct tokens within a group is treated as a variable token
+    private static final int DEFAULT_VARIABLE_COUNT_THRESHOLD = 5;
+    /*
+     * The longest word combination is preferred as the group root, but only when its frequency is above
+     * (highest token frequency of the log * threshold percentage); otherwise shorter but more frequent
+     * combinations are considered. A threshold percentage of 0.0 always selects the longest word
+     * combination, regardless of its frequency.
+     */
+    private static final float DEFAULT_FREQUENCY_THRESHOLD_PERCENTAGE = 0.3f;
+
+    private final Map<String, Long> tokenFreqMap;
+    private final Map<String, Set<String>> groupTokenSetMap;
+    private final Map<String, String> logIdGroupCandidateMap;
+    private final int variableCountThreshold;
+    private final float thresholdPercentage;
+    private final Map<Pattern, String> filterPatternVariableMap;
+    private final List<String> delimiters;
+
+    /**
+     * Creates a new Brain log parser with default parameters
+     */
+    public BrainLogParser() {
+        this(
+            DEFAULT_VARIABLE_COUNT_THRESHOLD,
+            DEFAULT_FREQUENCY_THRESHOLD_PERCENTAGE,
+            DEFAULT_FILTER_PATTERN_VARIABLE_MAP,
+            DEFAULT_DELIMITERS
+        );
+    }
+
+    /**
+     * Creates a new Brain log parser with overridden variableCountThreshold and thresholdPercentage
+     * @param variableCountThreshold the threshold to decide whether a low-frequency token is a variable
+     * @param thresholdPercentage the percentage threshold used to decide which frequency is the representative
+     *                            frequency per log message
+     */
+    public BrainLogParser(int variableCountThreshold, float thresholdPercentage) {
+        this(variableCountThreshold, thresholdPercentage, DEFAULT_FILTER_PATTERN_VARIABLE_MAP, DEFAULT_DELIMITERS);
+    }
+
+    /**
+     * Creates a new Brain log parser with overridden filter patterns and delimiters
+     * @param filterPatternVariableMap a map from regex pattern to variable denoter; matched substrings are replaced
+     *                                 with the denoter. A LinkedHashMap is recommended so that patterns are applied in order
+     * @param delimiters a list of delimiters to be replaced with a space after the regex replacements
+     */
+    public BrainLogParser(Map<Pattern, String> filterPatternVariableMap, List<String> delimiters) {
+        this(DEFAULT_VARIABLE_COUNT_THRESHOLD,
DEFAULT_FREQUENCY_THRESHOLD_PERCENTAGE, filterPatternVariableMap, delimiters);
+    }
+
+    /**
+     * Creates a new Brain log parser with overridden variableCountThreshold and thresholdPercentage and
+     * overridden filter patterns and delimiters
+     * @param variableCountThreshold the threshold to decide whether a low-frequency token is a variable
+     * @param thresholdPercentage the percentage threshold used to decide which frequency is the representative
+     *                            frequency per log message
+     * @param filterPatternVariableMap a map from regex pattern to variable denoter; matched substrings are replaced
+     *                                 with the denoter. A LinkedHashMap is recommended so that patterns are applied in order
+     * @param delimiters a list of delimiters to be replaced with a space after the regex replacements
+     */
+    public BrainLogParser(
+        int variableCountThreshold,
+        float thresholdPercentage,
+        Map<Pattern, String> filterPatternVariableMap,
+        List<String> delimiters
+    ) {
+        if (thresholdPercentage < 0.0f || thresholdPercentage > 1.0f) {
+            throw new IllegalArgumentException("Threshold percentage must be between 0.0 and 1.0");
+        }
+        this.tokenFreqMap = new HashMap<>();
+        this.groupTokenSetMap = new HashMap<>();
+        this.logIdGroupCandidateMap = new HashMap<>();
+        this.variableCountThreshold = variableCountThreshold;
+        this.thresholdPercentage = thresholdPercentage;
+        this.filterPatternVariableMap = filterPatternVariableMap;
+        this.delimiters = delimiters;
+    }
+
+    /**
+     * Preprocesses a single log message together with its logId
+     * @param logMessage the log message body
+     * @param logId the logId of the log
+     * @return the list of tokens obtained by splitting the preprocessed log message
+     */
+    public List<String> preprocess(String logMessage, String logId) {
+        if (logMessage == null || logId == null) {
+            throw new IllegalArgumentException("log message or logId must not be null");
+        }
+        // apply each filter pattern in order, replacing matches with its variable denoter
+        for (Map.Entry<Pattern, String> patternVariablePair : filterPatternVariableMap.entrySet()) {
+            logMessage = patternVariablePair.getKey().matcher(logMessage).replaceAll(patternVariablePair.getValue());
+        }
+
+        for (String delimiter : delimiters) {
+            logMessage = logMessage.replace(delimiter, " ");
+        }
+
+        // Append the logId/docId so that it becomes the last token after splitting
+        logMessage = logMessage.trim() + " " + logId;
+
+        return Arrays.asList(logMessage.split("\\s+"));
+    }
+
+    /**
+     * Count token frequency per position/index in the token list
+     * @param tokens list of tokens from a preprocessed log message
+     */
+    public void processTokenHistogram(List<String> tokens) {
+        // Ignore the last element, which is the appended logId
+        for (int i = 0; i < tokens.size() - 1; i++) {
+            String tokenKey = String.format(Locale.ROOT, POSITIONED_TOKEN_KEY_FORMAT, i, tokens.get(i));
+            tokenFreqMap.compute(tokenKey, (k, v) -> v == null ? 1 : v + 1);
+        }
+    }
+
+    /**
+     * Preprocess all lines of log messages. The index of each message within the list is used as its logId.
+     * @param logMessages list of log messages
+     * @return list of token lists
+     */
+    public List<List<String>> preprocessAllLogs(List<String> logMessages) {
+        List<List<String>> preprocessedLogs = new ArrayList<>();
+
+        for (int i = 0; i < logMessages.size(); i++) {
+            String logId = String.valueOf(i);
+            List<String> tokens = this.preprocess(logMessages.get(i), logId);
+            preprocessedLogs.add(tokens);
+            this.processTokenHistogram(tokens);
+        }
+
+        return preprocessedLogs;
+    }
+
+    /**
+     * The second processing step: calculate initial token groups based on the token histogram from the previous step.
+     * Each group is represented by the representative word combination of a log message; usually the
+     * longest word combination whose frequency is above the designed threshold is selected.
+     *
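+     * For example, in a message whose tokens at six positions each occur 1000 times across the corpus,
+     * the word combination (freq=1000, count=6) is a strong group-root candidate, since the constant
+     * tokens of a template tend to share the same per-position frequency.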

+     * Within each initial group, a group-level token set is then collected per position for the final
+     * log pattern calculation.
+     * @param preprocessedLogs preprocessed list of log messages
+     */
+    public void calculateGroupTokenFreq(List<List<String>> preprocessedLogs) {
+        for (List<String> tokens : preprocessedLogs) {
+            Map<Long, Integer> wordOccurrences = this.getWordOccurrences(tokens);
+            List<WordCombination> sortedWordCombinations = wordOccurrences
+                .entrySet()
+                .stream()
+                .map(entry -> new WordCombination(entry.getKey(), entry.getValue()))
+                .sorted()
+                .toList();
+            WordCombination candidate = this.findCandidate(sortedWordCombinations);
+            String groupCandidateStr = String.format(Locale.ROOT, "%d,%d", candidate.wordFreq(), candidate.sameFreqCount());
+            this.logIdGroupCandidateMap.put(tokens.getLast(), groupCandidateStr);
+            this.updateGroupTokenFreqMap(tokens, groupCandidateStr);
+        }
+    }
+
+    /**
+     * Parse the log pattern of a single log message, after preprocess, processTokenHistogram and
+     * calculateGroupTokenFreq have run
+     * @param tokens list of tokens for a specific log message
+     * @return the parsed log pattern as a list of strings
+     */
+    public List<String> parseLogPattern(List<String> tokens) {
+        String logId = tokens.getLast();
+        String groupCandidateStr = this.logIdGroupCandidateMap.get(logId);
+        String[] groupCandidate = groupCandidateStr.split(",");
+        Long repFreq = Long.parseLong(groupCandidate[0]); // representative frequency of the group
+        return IntStream.range(0, tokens.size() - 1).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, tokens.get(i))).map(entry -> {
+            int index = entry.getKey();
+            String token = entry.getValue();
+            String tokenKey = String.format(Locale.ROOT, POSITIONED_TOKEN_KEY_FORMAT, index, token);
+            assert this.tokenFreqMap.get(tokenKey) != null : String.format(Locale.ROOT, "Not found token: %s on position %d", token, index);
+
+            boolean isHigherFrequency = this.tokenFreqMap.get(tokenKey) > repFreq;
+            boolean isLowerFrequency = this.tokenFreqMap.get(tokenKey) < repFreq;
+            String groupTokenKey = String.format(Locale.ROOT, GROUP_TOKEN_SET_KEY_FORMAT, tokens.size() - 1, groupCandidateStr, index);
+            assert this.groupTokenSetMap.get(groupTokenKey) != null : String
+                .format(Locale.ROOT, "Not found any token in group: %s", groupTokenKey);
+
+            if (isHigherFrequency) {
+                // A higher-frequency token that doesn't belong to the word combination is likely a constant
+                // token only if it is the unique token at that position within the group
+                boolean isUniqueToken = this.groupTokenSetMap.get(groupTokenKey).size() == 1;
+                if (!isUniqueToken) {
+                    return VARIABLE_DENOTER;
+                }
+            } else if (isLowerFrequency) {
+                // A lower-frequency token that doesn't belong to the word combination is likely a constant
+                // token only if the distinct-token count at its position stays below the preset variable count
+                // threshold. For example, some variables are a limited set of enums, and sometimes they can be
+                // treated as constant tokens.
+                if (this.groupTokenSetMap.get(groupTokenKey).size() >= variableCountThreshold) {
+                    return VARIABLE_DENOTER;
+                }
+            }
+            return token;
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * Parse all lines of log messages to generate the log pattern map.
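+     * For example, "Verification succeeded for blk_-1547954353065580372" and
+     * "Verification succeeded for blk_6996194389878584395" both map to the key
+     * "Verification succeeded for blk_<*>", with their logIds ["0", "1"] as the value.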
+     * @param logMessages all lines of log messages
+     * @return log pattern map with log pattern string as key, grouped logIds as value
+     */
+    public Map<String, List<String>> parseAllLogPatterns(List<String> logMessages) {
+        List<List<String>> processedMessages = this.preprocessAllLogs(logMessages);
+
+        this.calculateGroupTokenFreq(processedMessages);
+
+        Map<String, List<String>> logPatternMap = new HashMap<>();
+        for (List<String> processedMessage : processedMessages) {
+            String logId = processedMessage.getLast();
+            List<String> logPattern = this.parseLogPattern(processedMessage);
+            String patternKey = String.join(" ", logPattern);
+            logPatternMap.computeIfAbsent(patternKey, k -> new ArrayList<>()).add(logId);
+        }
+        return logPatternMap;
+    }
+
+    /**
+     * Get token histogram
+     * @return map from positioned-token key to its frequency
+     */
+    public Map<String, Long> getTokenFreqMap() {
+        return this.tokenFreqMap;
+    }
+
+    /**
+     * Get the map from group key (token-list length, group candidate, position) to its token set
+     * @return map from pattern-group key to its token set
+     */
+    public Map<String, Set<String>> getGroupTokenSetMap() {
+        return this.groupTokenSetMap;
+    }
+
+    /**
+     * Get logId to its group candidate map
+     * @return map of logId and group candidate
+     */
+    public Map<String, String> getLogIdGroupCandidateMap() {
+        return this.logIdGroupCandidateMap;
+    }
+
+    private Map<Long, Integer> getWordOccurrences(List<String> tokens) {
+        Map<Long, Integer> occurrences = new HashMap<>();
+        for (int i = 0; i < tokens.size() - 1; i++) {
+            String tokenKey = String.format(Locale.ROOT, POSITIONED_TOKEN_KEY_FORMAT, i, tokens.get(i));
+            Long tokenFreq = tokenFreqMap.get(tokenKey);
+            occurrences.compute(tokenFreq, (k, v) -> v == null ? 1 : v + 1);
+        }
+        return occurrences;
+    }
+
+    private WordCombination findCandidate(List<WordCombination> sortedWordCombinations) {
+        if (sortedWordCombinations.isEmpty()) {
+            throw new IllegalArgumentException("Sorted word combinations must be non empty");
+        }
+        OptionalLong maxFreqOptional = sortedWordCombinations.stream().mapToLong(WordCombination::wordFreq).max();
+        long maxFreq = maxFreqOptional.getAsLong();
+        float threshold = maxFreq * this.thresholdPercentage;
+        for (WordCombination wordCombination : sortedWordCombinations) {
+            if (wordCombination.wordFreq() > threshold) {
+                return wordCombination;
+            }
+        }
+        return sortedWordCombinations.getFirst();
+    }
+
+    private void updateGroupTokenFreqMap(List<String> tokens, String groupCandidateStr) {
+        int tokensLen = tokens.size() - 1;
+        for (int i = 0; i < tokensLen; i++) {
+            String groupTokenFreqKey = String.format(Locale.ROOT, GROUP_TOKEN_SET_KEY_FORMAT, tokensLen, groupCandidateStr, i);
+            this.groupTokenSetMap.computeIfAbsent(groupTokenFreqKey, k -> new HashSet<>()).add(tokens.get(i));
+        }
+    }
+
+    private record WordCombination(Long wordFreq, Integer sameFreqCount) implements Comparable<WordCombination> {
+
+        @Override
+        public int compareTo(WordCombination other) {
+            // Compare by sameFreqCount in descending order
+            int sameFreqCountComparison = other.sameFreqCount.compareTo(this.sameFreqCount);
+            if (sameFreqCountComparison != 0) {
+                return sameFreqCountComparison;
+            }
+
+            // If sameFreqCount are the same, compare by wordFreq in descending order
+            return other.wordFreq.compareTo(this.wordFreq);
+        }
+    }
+}
diff --git a/src/main/java/org/opensearch/agent/tools/utils/ToolHelper.java b/src/main/java/org/opensearch/agent/tools/utils/ToolHelper.java
index b60f46c9..e4ff38f8 100644
--- a/src/main/java/org/opensearch/agent/tools/utils/ToolHelper.java
+++ b/src/main/java/org/opensearch/agent/tools/utils/ToolHelper.java
@@ -11,7 +11,10 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.opensearch.core.action.ActionListener;
+import
org.opensearch.core.action.ActionResponse; import org.opensearch.ml.common.utils.StringUtils; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import lombok.extern.log4j.Log4j2; @@ -75,4 +78,15 @@ public static void extractFieldNamesTypes( } } } + + /** + * Wrapper to get PPL transport action listener + * @param listener input action listener + * @return wrapped action listener + */ + public static ActionListener getPPLTransportActionListener( + ActionListener listener + ) { + return ActionListener.wrap(r -> { listener.onResponse(TransportPPLQueryResponse.fromActionResponse(r)); }, listener::onFailure); + } } diff --git a/src/test/java/org/opensearch/agent/tools/LogPatternToolTests.java b/src/test/java/org/opensearch/agent/tools/LogPatternToolTests.java index 1f469b43..b8c8ddaf 100644 --- a/src/test/java/org/opensearch/agent/tools/LogPatternToolTests.java +++ b/src/test/java/org/opensearch/agent/tools/LogPatternToolTests.java @@ -13,14 +13,17 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.agent.tools.AbstractRetrieverTool.DOC_SIZE_FIELD; -import static org.opensearch.agent.tools.LogPatternTool.PATTERN; +import static org.opensearch.agent.tools.AbstractRetrieverTool.INDEX_FIELD; +import static org.opensearch.agent.tools.AbstractRetrieverTool.INPUT_FIELD; +import static org.opensearch.agent.tools.LogPatternTool.PPL_FIELD; import static org.opensearch.agent.tools.LogPatternTool.SAMPLE_LOG_SIZE; import static org.opensearch.agent.tools.LogPatternTool.TOP_N_PATTERN; -import static org.opensearch.integTest.BaseAgentToolsIT.gson; +import static org.opensearch.ml.common.utils.StringUtils.gson; import java.io.IOException; import java.nio.file.Files; @@ -28,9 +31,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; import org.hamcrest.MatcherAssert; +import org.json.JSONObject; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -43,6 +46,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.sql.plugin.transport.PPLQueryAction; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import com.google.common.collect.ImmutableMap; import com.google.gson.JsonElement; @@ -52,19 +57,26 @@ public class LogPatternToolTests { public static String responseBodyResourceFile = "org/opensearch/agent/tools/expected_flow_agent_of_log_pattern_tool_response_body.json"; - public static final String TEST_QUERY_TEXT = "123fsd23134sdfouh"; + public static final String TEST_QUERY_TEXT = + """ + {"size":2,"query":{"bool":{"filter":[{"range":{"timestamp":{"from":"1734404246000||-1d","to":"1734404246000","include_lower":true,"include_upper":true,"format":"epoch_millis","boost":1}}},{"range":{"bytes":{"from":0,"to":null,"include_lower":true,"include_upper":true,"boost":1}}}],"adjust_pure_negative":true,"boost":1}}}"""; private Map params = new HashMap<>(); private final Client client = mock(Client.class); @Mock private SearchResponse searchResponse; @Mock private SearchHits searchHits; + @Mock + private TransportPPLQueryResponse pplQueryResponse; @SneakyThrows @Before public void setup() { MockitoAnnotations.openMocks(this); 
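        // The factory is initialized once with the mocked client; the per-test DSL and PPL responses
        // are stubbed in mockDSLInvocation() and mockPPLInvocation() below.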
LogPatternTool.Factory.getInstance().init(client, null); + } + + private void mockDSLInvocation() throws IOException { List fields = List.of("field1", "field2", "field3"); SearchHit[] hits = new SearchHit[] { createHit(0, null, fields, List.of("123", "123.abc-AB * De /", 12345)), @@ -94,6 +106,19 @@ private BytesReference createSource(List fieldNames, List fieldC return (BytesReference.bytes(builder)); } + private void mockPPLInvocation() throws IOException { + String pplRawResponse = + """ + {"schema":[{"name":"field1","type":"string"},{"name":"field2","type":"string"},{"name":"field3","type":"long"}],"datarows":[["123","123.abc-AB * De /",12345],["123","45.abc-AB * De /",12345],["123","12.abc_AB * De /",12345],["123","45.ab_AB * De /",12345],["123",".abAB * De /",12345]],"total":5,"size":5} + """; + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(pplQueryResponse); + return null; + }).when(client).execute(eq(PPLQueryAction.INSTANCE), any(), any()); + when(pplQueryResponse.getResult()).thenReturn(pplRawResponse); + } + @Test @SneakyThrows public void testCreateTool() { @@ -101,7 +126,6 @@ public void testCreateTool() { assertEquals(LogPatternTool.LOG_PATTERN_DEFAULT_DOC_SIZE, (int) tool.docSize); assertEquals(LogPatternTool.DEFAULT_TOP_N_PATTERN, tool.getTopNPattern()); assertEquals(LogPatternTool.DEFAULT_SAMPLE_LOG_SIZE, tool.getSampleLogSize()); - assertNull(tool.getPattern()); assertEquals("LogPatternTool", tool.getType()); assertEquals("LogPatternTool", tool.getName()); assertEquals(LogPatternTool.DEFAULT_DESCRIPTION, LogPatternTool.Factory.getInstance().getDefaultDescription()); @@ -160,19 +184,20 @@ public void testCreateToolWithNonPositiveSize() { @Test public void testGetQueryBody() { LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); - assertEquals(TEST_QUERY_TEXT, tool.getQueryBody(TEST_QUERY_TEXT)); + assertEquals(new JSONObject(TEST_QUERY_TEXT).toString(), new JSONObject(tool.getQueryBody(TEST_QUERY_TEXT)).toString()); } @Test public void testValidate() { LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); - assertTrue(tool.validate(Map.of("index", "test1", "input", "input_value"))); + assertTrue(tool.validate(Map.of(INDEX_FIELD, "test1", INPUT_FIELD, "input_value"))); + assertTrue(tool.validate(Map.of(INDEX_FIELD, "test1", PPL_FIELD, "ppl_value"))); - // validate failure if no index - assertFalse(tool.validate(Map.of("input", "input_value"))); + // validate failure if no input or ppl + assertFalse(tool.validate(Map.of(INDEX_FIELD, "test1"))); - // validate failure if no - assertFalse(tool.validate(Map.of("index", "test1"))); + // validate failure if no index + assertFalse(tool.validate(Map.of(INPUT_FIELD, "input_value"))); } @Test @@ -180,16 +205,10 @@ public void testFindLongestField() { assertEquals("field2", LogPatternTool.findLongestField(Map.of("field1", "123", "field2", "1234", "filed3", 1234))); } - @Test - public void testExtractPattern() { - assertEquals("././", LogPatternTool.extractPattern("123.abc/.AB/", null)); - assertEquals("123.c/.AB/", LogPatternTool.extractPattern("123.abc/.AB/", Pattern.compile("ab"))); - assertEquals(".abc/.AB/", LogPatternTool.extractPattern("123.abc/.AB/", Pattern.compile("[0-9]"))); - } - @SneakyThrows @Test public void testExecutionDefault() { + mockDSLInvocation(); LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); JsonElement expected = gson .fromJson( @@ -210,12 +229,10 @@ public void 
testExecutionDefault() { @SneakyThrows @Test public void testExecutionWithSpecifiedPatternField() { + mockDSLInvocation(); LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); JsonElement expected = gson - .fromJson( - "[{\"total count\":5,\"sample logs\":[{\"field1\":\"123\",\"field3\":12345,\"field2\":\"123.abc-AB * De /\"},{\"field1\":\"123\",\"field3\":12345,\"field2\":\"45.abc-AB * De /\"}],\"pattern\":\"\"}]", - JsonElement.class - ); + .fromJson("[{\"pattern\":\"123\",\"total count\":5,\"sample logs\":[\"123\",\"123\"]}]", JsonElement.class); tool .run( ImmutableMap.of("index", "index_name", "input", "{}", "pattern_field", "field1", "sample_log_size", "2"), @@ -227,26 +244,6 @@ public void testExecutionWithSpecifiedPatternField() { ); } - @SneakyThrows - @Test - public void testExecutionWithSpecifiedPattern() { - LogPatternTool tool = LogPatternTool.Factory.getInstance().create(Map.of(PATTERN, "[a-zA-Z]")); - JsonElement expected = gson - .fromJson( - "[{\"pattern\":\"45.- * /\",\"sample logs\":[{\"field1\":\"123\",\"field3\":12345,\"field2\":\"45.abc-AB * De /\"}],\"total count\":1},{\"pattern\":\". * /\",\"sample logs\":[{\"field1\":\"123\",\"field3\":12345,\"field2\":\".abAB * De /\"}],\"total count\":1},{\"pattern\":\"123.- * /\",\"sample logs\":[{\"field1\":\"123\",\"field3\":12345,\"field2\":\"123.abc-AB * De /\"}],\"total count\":1}]", - JsonElement.class - ); - tool - .run( - ImmutableMap.of("index", "index_name", "input", "{}"), - ActionListener - .wrap( - response -> assertEquals(expected, gson.fromJson(response, JsonElement.class)), - e -> fail("Tool runs failed: " + e.getMessage()) - ) - ); - } - @SneakyThrows @Test public void testExecutionWithBlankInput() { @@ -257,7 +254,8 @@ public void testExecutionWithBlankInput() { ActionListener .wrap( response -> fail(), - e -> MatcherAssert.assertThat(e.getMessage(), containsString("[input] is null or empty, can not process it.")) + e -> MatcherAssert + .assertThat(e.getMessage(), containsString("Both DSL and PPL input is null or empty, can not process it.")) ) ); } @@ -375,4 +373,69 @@ public void testExecutionFailedInSearch() { ActionListener.wrap(response -> fail(), e -> assertEquals("Failed in Search", e.getMessage())) ); } + + @SneakyThrows + @Test + public void testExecutionWithPPLInput() { + mockPPLInvocation(); + LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); + JsonElement expected = gson + .fromJson( + Files.readString(Path.of(this.getClass().getClassLoader().getResource(responseBodyResourceFile).toURI())), + JsonElement.class + ); + tool + .run( + ImmutableMap.of(INDEX_FIELD, "index_name", PPL_FIELD, "source"), + ActionListener + .wrap( + response -> assertEquals(expected, gson.fromJson(response, JsonElement.class)), + e -> fail("Tool runs failed: " + e.getMessage()) + ) + ); + } + + @SneakyThrows + @Test + public void testExecutionWithPPLInputWhenNoDataIsReturned() { + String emptyDataPPLResponse = + """ + {"schema":[{"name":"field1","type":"string"},{"name":"field2","type":"string"},{"name":"field3","type":"long"}],"datarows":[],"total":0,"size":0} + """; + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(pplQueryResponse); + return null; + }).when(client).execute(eq(PPLQueryAction.INSTANCE), any(), any()); + when(pplQueryResponse.getResult()).thenReturn(emptyDataPPLResponse); + LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); + + tool + .run( + 
ImmutableMap.of(INDEX_FIELD, "index_name", PPL_FIELD, "source"), + ActionListener + .wrap( + response -> assertEquals("Can not get any data row from ppl response.", response), + e -> fail("Tool runs failed: " + e.getMessage()) + ) + ); + } + + @SneakyThrows + @Test + public void testExecutionWithPPLFailed() { + String pplFailureMessage = "Failed in execute ppl"; + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onFailure(new Exception(pplFailureMessage)); + return null; + }).when(client).search(any(), any()); + + LogPatternTool tool = LogPatternTool.Factory.getInstance().create(params); + tool + .run( + ImmutableMap.of(INDEX_FIELD, "index_name", PPL_FIELD, "source"), + ActionListener.wrap(response -> fail(), e -> assertEquals(pplFailureMessage, e.getMessage())) + ); + } } diff --git a/src/test/java/org/opensearch/agent/tools/utils/BrainLogParserTests.java b/src/test/java/org/opensearch/agent/tools/utils/BrainLogParserTests.java new file mode 100644 index 00000000..c4969183 --- /dev/null +++ b/src/test/java/org/opensearch/agent/tools/utils/BrainLogParserTests.java @@ -0,0 +1,233 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.agent.tools.utils; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.opensearch.test.OpenSearchTestCase; + +public class BrainLogParserTests extends OpenSearchTestCase { + + private static final List TEST_HDFS_LOGS = Arrays + .asList( + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864", + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296. blk_-6620182933895093708", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.7.244:50010 is added to blk_-6956067134432991406 size 67108864", + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_200811092030_0002_r_000230_0/part-00230. blk_559204981722276126", + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_200811092030_0002_r_000169_0/part-00169. blk_-7105305952901940477", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864", + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_200811092030_0002_r_000318_0/part-00318. blk_-207775976836691685", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.6.4:50010 is added to blk_5114010683183383297 size 67108864", + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_200811092030_0002_r_000318_0/part-00318. blk_2096692261399680562", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.15.240:50010 is added to blk_-1055254430948037872 size 67108864", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.7.146:50010 is added to blk_278357163850888 size 67108864", + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_200811092030_0002_r_000138_0/part-00138. blk_-210021574616486609", + "Verification succeeded for blk_-1547954353065580372", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.39.242:50010 is added to blk_-4110733372292809607 size 67108864", + "BLOCK* NameSystem.allocateBlock: /user/root/randtxt/_temporary/_task_200811092030_0003_m_000382_0/part-00382. 
blk_8935202950442998446", + "BLOCK* NameSystem.allocateBlock: /user/root/randtxt/_temporary/_task_200811092030_0003_m_000392_0/part-00392. blk_-3010126661650043258", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.25.237:50010 is added to blk_541463031152673662 size 67108864", + "Verification succeeded for blk_6996194389878584395", + "PacketResponder failed for blk_6996194389878584395", + "PacketResponder failed for blk_-1547954353065580372" + ); + + private BrainLogParser parser; + + @Override + public void setUp() throws Exception { + super.setUp(); + parser = new BrainLogParser(); + } + + public void testNewParserWithIllegalArgument() { + String exceptionMessage = "Threshold percentage must be between 0.0 and 1.0"; + Throwable throwable = assertThrows(IllegalArgumentException.class, () -> new BrainLogParser(2, -1.0f)); + assertEquals(exceptionMessage, throwable.getMessage()); + throwable = assertThrows(IllegalArgumentException.class, () -> new BrainLogParser(2, 1.1f)); + assertEquals(exceptionMessage, throwable.getMessage()); + } + + public void testPreprocess() { + String logMessage = "127.0.0.1 - 1234 something"; + String logId = "log1"; + List expectedResult = Arrays.asList("<*IP*>", "-", "<*>", "something", "log1"); + List result = parser.preprocess(logMessage, logId); + assertEquals(expectedResult, result); + + // Test with different delimiter + logMessage = "127.0.0.1=1234 something"; + logId = "log2"; + expectedResult = Arrays.asList("<*IP*>=<*>", "something", "log2"); + result = parser.preprocess(logMessage, logId); + assertEquals(expectedResult, result); + } + + public void testPreprocessWithIllegalInput() { + String logMessage = "127.0.0.1 - 1234 something"; + String logId = "log1"; + String exceptionMessage = "log message or logId must not be null"; + Throwable throwable = assertThrows(IllegalArgumentException.class, () -> parser.preprocess(null, logId)); + assertEquals(exceptionMessage, throwable.getMessage()); + throwable = assertThrows(IllegalArgumentException.class, () -> parser.preprocess(logMessage, null)); + assertEquals(exceptionMessage, throwable.getMessage()); + throwable = assertThrows(IllegalArgumentException.class, () -> parser.preprocess(null, null)); + assertEquals(exceptionMessage, throwable.getMessage()); + } + + public void testPreprocessAllLogs() { + List logMessages = Arrays.asList("127.0.0.1 - 1234 something", "192.168.0.1 - 5678 something_else"); + + List> result = parser.preprocessAllLogs(logMessages); + + assertEquals(2, result.size()); + assertEquals(Arrays.asList("<*IP*>", "-", "<*>", "something", "0"), result.get(0)); + assertEquals(Arrays.asList("<*IP*>", "-", "<*>", "something_else", "1"), result.get(1)); + } + + public void testProcessTokenHistogram() { + String something = String.format(Locale.ROOT, "%d-%s", 0, "something"); + String up = String.format(Locale.ROOT, "%d-%s", 1, "up"); + List firstTokens = Arrays.asList("something", "up", "0"); + parser.processTokenHistogram(firstTokens); + assertEquals(1L, parser.getTokenFreqMap().get(something).longValue()); + assertEquals(1L, parser.getTokenFreqMap().get(up).longValue()); + + List secondTokens = Arrays.asList("something", "down", "1"); + parser.processTokenHistogram(secondTokens); + assertEquals(2L, parser.getTokenFreqMap().get(something).longValue()); + assertEquals(1L, parser.getTokenFreqMap().get(up).longValue()); + } + + public void testCalculateGroupTokenFreq() { + List logMessages = Arrays + .asList("127.0.0.1 - 1234 something", "192.168.0.1:5678 something_else", "0.0.0.0:42 
something_else"); + List logIds = Arrays.asList("0", "1", "2"); + + List> preprocessedLogs = parser.preprocessAllLogs(logMessages); + parser.calculateGroupTokenFreq(preprocessedLogs); + + for (String logId : logIds) { + String groupCandidate = parser.getLogIdGroupCandidateMap().get(logId); + assertNotNull(groupCandidate); + } + assertTrue(parser.getGroupTokenSetMap().containsValue(Set.of("something"))); + assertTrue(parser.getGroupTokenSetMap().containsValue(Set.of("something_else"))); + String sampleGroupTokenKey = String.format(Locale.ROOT, "%d-%s-%d", 4, parser.getLogIdGroupCandidateMap().get("0"), 3); + assertTrue(parser.getGroupTokenSetMap().get(sampleGroupTokenKey).contains("something")); + } + + public void testCalculateGroupTokenFreqWithIllegalInput() { + List> preprocessedLogs = Arrays.asList(List.of()); + String exceptionMessage = "Sorted word combinations must be non empty"; + Throwable throwable = assertThrows(IllegalArgumentException.class, () -> parser.calculateGroupTokenFreq(preprocessedLogs)); + assertEquals(exceptionMessage, throwable.getMessage()); + } + + public void testParseLogPattern() { + List> preprocessedLogs = parser.preprocessAllLogs(TEST_HDFS_LOGS); + parser.calculateGroupTokenFreq(preprocessedLogs); + + List expectedLogPattern = Arrays + .asList( + "BLOCK*", + "NameSystem.addStoredBlock:", + "blockMap", + "updated:", + "<*IP*>", + "is", + "added", + "to", + "blk_<*>", + "size", + "<*>" + ); + List logPattern = parser.parseLogPattern(preprocessedLogs.get(0)); + assertEquals(expectedLogPattern, logPattern); + } + + public void testParseAllLogPatterns() { + Map> logPatternMap = parser.parseAllLogPatterns(TEST_HDFS_LOGS); + + Map expectedResult = Map + .of( + "PacketResponder failed for blk_<*>", + 2, + "Verification succeeded for blk_<*>", + 2, + "BLOCK* NameSystem.addStoredBlock: blockMap updated: <*IP*> is added to blk_<*> size <*>", + 8, + "BLOCK* NameSystem.allocateBlock: /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*> blk_<*>", + 6, + "BLOCK* NameSystem.allocateBlock: /user/root/randtxt/_temporary/_task_<*>_<*>_m_<*>_<*>/part<*> blk_<*>", + 2 + ); + Map logPatternByCountMap = logPatternMap + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size())); + assertEquals(expectedResult, logPatternByCountMap); + } + + public void testParseLogPatternWhenLowerFrequencyTokenIsVariable() { + int testVariableCountThreshold = 3; + parser = new BrainLogParser(testVariableCountThreshold, 0.0f); + List logMessages = Arrays + .asList( + "Verification succeeded a blk_-1547954353065580372", + "Verification succeeded b blk_6996194389878584395", + "Verification succeeded c blk_6996194389878584395", + "Verification succeeded d blk_6996194389878584395" + ); + + Map> expectedResult = Map.of("Verification succeeded <*> blk_<*>", Arrays.asList("0", "1", "2", "3")); + Map> logPatternMap = parser.parseAllLogPatterns(logMessages); + assertEquals(expectedResult, logPatternMap); + /* + * 'a', 'b', 'c' and 'd' token is on the 3rd position in the group 2,3, their frequency is lower than + * representative frequency. 
Since that position's distinct-token count exceeds the variable count threshold,
+         * the third position in this log group is treated as a variable.
+         */
+        assertTrue(parser.getTokenFreqMap().get("2-a") < parser.getTokenFreqMap().get("0-Verification"));
+        assertTrue(parser.getTokenFreqMap().get("2-b") < parser.getTokenFreqMap().get("0-Verification"));
+        assertTrue(testVariableCountThreshold <= parser.getGroupTokenSetMap().get("4-4,3-2").size());
+    }
+
+    public void testParseLogPatternWhenHigherFrequencyTokenIsVariable() {
+        List<String> logMessages = Arrays
+            .asList(
+                "Verification succeeded for blk_-1547954353065580372",
+                "Verification succeeded for blk_6996194389878584395",
+                "Test succeeded for blk_6996194389878584395",
+                "Verification",
+                "Verification"
+            );
+
+        Map<String, List<String>> expectedResult = Map
+            .of(
+                "<*> succeeded for blk_<*>",
+                Arrays.asList("0", "1"),
+                "Test succeeded for blk_<*>",
+                Arrays.asList("2"),
+                "Verification",
+                Arrays.asList("3", "4")
+            );
+        Map<String, List<String>> logPatternMap = parser.parseAllLogPatterns(logMessages);
+        assertEquals(expectedResult, logPatternMap);
+        /*
+         * The 'Verification' and 'Test' tokens are at the 1st position in the group 3,3; the frequency of
+         * 'Verification' is higher than the representative frequency because other groups also have the
+         * 'Verification' token at the 1st position. Since more than one distinct token occurs at the first
+         * position, 'Verification' is eventually treated as a variable.
+         */
+        assertTrue(parser.getTokenFreqMap().get("0-Verification") > parser.getTokenFreqMap().get("1-succeeded"));
+        assertTrue(parser.getGroupTokenSetMap().get("4-3,3-0").size() > 1);
+    }
+}
diff --git a/src/test/java/org/opensearch/integTest/LogPatternToolIT.java b/src/test/java/org/opensearch/integTest/LogPatternToolIT.java
index 39ce1e98..f4aced61 100644
--- a/src/test/java/org/opensearch/integTest/LogPatternToolIT.java
+++ b/src/test/java/org/opensearch/integTest/LogPatternToolIT.java
@@ -10,6 +10,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.Locale;
 
 import org.hamcrest.MatcherAssert;
 import org.junit.After;
@@ -86,7 +87,7 @@ public void testLogPatternToolDefault() {
             .fromJson(
                 executeAgent(
                     agentId,
-                    String.format("{\"parameters\": {\"index\": \"%s\", \"input\": \"%s\"}}", TEST_PATTERN_INDEX_NAME, "{}")
+                    String.format(Locale.ROOT, "{\"parameters\": {\"index\": \"%s\", \"input\": \"%s\"}}", TEST_PATTERN_INDEX_NAME, "{}")
                 ),
                 JsonElement.class
             );
@@ -96,10 +97,7 @@
    @SneakyThrows
    public void testLogPatternToolWithSpecifiedPatternField() {
        JsonElement expected = gson
-            .fromJson(
-                "[{\"total count\":5,\"sample logs\":[{\"field1\":\"123\",\"field3\":12345,\"field2\":\"123.abc-AB * De /\"},{\"field1\":\"123\",\"field3\":12345,\"field2\":\"45.abc-AB * De /\"}],\"pattern\":\"\"}]",
-                JsonElement.class
-            );
+            .fromJson("[{\"total count\":5,\"sample logs\":[\"123\", \"123\"],\"pattern\":\"123\"}]", JsonElement.class);
        JsonElement result = gson
            .fromJson(
                executeAgent(
@@ -190,4 +188,27 @@ public void testLogPatternToolWithNonPositiveSampleLogSize() {
            .assertThat(exception.getMessage(), containsString("\"Invalid value -1 for parameter sample_log_size, it should be positive"));
    }
 
+    @SneakyThrows
+    public void testLogPatternToolWithPPLInput() {
+        JsonElement expected = gson
+            .fromJson(
+                Files.readString(Path.of(this.getClass().getClassLoader().getResource(responseBodyResourceFile).toURI())),
+                JsonElement.class
+            );
+        JsonElement result = gson
+            .fromJson(
+                executeAgent(
+                    agentId,
+                    String
.format( + "{\"parameters\": {\"index\": \"%s\", \"ppl\": \"%s\"}}", + TEST_PATTERN_INDEX_NAME, + String.format(Locale.ROOT, "source=%s", TEST_PATTERN_INDEX_NAME) + ) + ), + JsonElement.class + ); + assertEquals(expected, result); + } + } diff --git a/src/test/resources/org/opensearch/agent/tools/expected_flow_agent_of_log_pattern_tool_response_body.json b/src/test/resources/org/opensearch/agent/tools/expected_flow_agent_of_log_pattern_tool_response_body.json index 7ba8659c..e2792f8c 100644 --- a/src/test/resources/org/opensearch/agent/tools/expected_flow_agent_of_log_pattern_tool_response_body.json +++ b/src/test/resources/org/opensearch/agent/tools/expected_flow_agent_of_log_pattern_tool_response_body.json @@ -1,45 +1,7 @@ [ { - "pattern": "._ * /", - "sample logs": [ - { - "field1": "123", - "field3": 12345, - "field2": "12.abc_AB * De /" - }, - { - "field1": "123", - "field3": 12345, - "field2": "45.ab_AB * De /" - } - ], - "total count": 2 - }, - { - "pattern": ".- * /", - "sample logs": [ - { - "field1": "123", - "field3": 12345, - "field2": "123.abc-AB * De /" - }, - { - "field1": "123", - "field3": 12345, - "field2": "45.abc-AB * De /" - } - ], - "total count": 2 - }, - { - "pattern": ". * /", - "sample logs": [ - { - "field1": "123", - "field3": 12345, - "field2": ".abAB * De /" - } - ], - "total count": 1 + "sample logs": ["123.abc-AB * De /", "45.abc-AB * De /", "12.abc_AB * De /", "45.ab_AB * De /", ".abAB * De /"], + "total count": 5, + "pattern": "<*> * De /" } -] +] \ No newline at end of file
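A minimal end-to-end sketch of the new parser (the variable names are illustrative; the expected output follows BrainLogParserTests above):

    // Default thresholds: variableCountThreshold = 5, thresholdPercentage = 0.3f.
    BrainLogParser parser = new BrainLogParser();
    List<String> logs = List.of(
        "Verification succeeded for blk_-1547954353065580372",
        "Verification succeeded for blk_6996194389878584395"
    );
    // Pattern string -> logIds of the messages that share it.
    Map<String, List<String>> patterns = parser.parseAllLogPatterns(logs);
    // Prints: Verification succeeded for blk_<*> -> [0, 1]
    patterns.forEach((pattern, logIds) -> System.out.println(pattern + " -> " + logIds));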