Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@

import java.util.List;

public record ChunkInferenceInput(String input, @Nullable ChunkingSettings chunkingSettings) {
import static org.elasticsearch.inference.InferenceString.DataType.TEXT;

public record ChunkInferenceInput(InferenceString input, @Nullable ChunkingSettings chunkingSettings) {

public ChunkInferenceInput(String input) {
this(input, null);
this(new InferenceString(input, TEXT), null);
}

public static List<String> inputs(List<ChunkInferenceInput> chunkInferenceInputs) {
public static List<InferenceString> inputs(List<ChunkInferenceInput> chunkInferenceInputs) {
return chunkInferenceInputs.stream().map(ChunkInferenceInput::input).toList();
}

public String inputText() {
assert input.isText();
return input.value();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.inference;

import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* This class represents a String which may be raw text, or the String representation of some other data such as an image in base64
*/
public record InferenceString(String value, DataType dataType) {
/**
* Describes the type of data represented by an {@link InferenceString}
*/
public enum DataType {
TEXT,
IMAGE_BASE64
}

private static final EnumSet<DataType> IMAGE_TYPES = EnumSet.of(DataType.IMAGE_BASE64);

/**
* Constructs an {@link InferenceString} with the given value and {@link DataType}
* @param value the String value
* @param dataType the type of data that the String represents
*/
public InferenceString(String value, DataType dataType) {
this.value = Objects.requireNonNull(value);
this.dataType = Objects.requireNonNull(dataType);
}

public boolean isImage() {
return IMAGE_TYPES.contains(dataType);
}

public boolean isText() {
return DataType.TEXT.equals(dataType);
}

public static List<String> toStringList(List<InferenceString> inferenceStrings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we filter out DataType.IMAGE_BASE64 items?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should just filter out non-text inputs, because if any manage to make it into one of the two places we call this method, then there's a problem somewhere. Maybe an assert like in EmbeddingsInput.getTextInputs() just for safety? The two classes where this method is called (in ElasticsearchInternalService and SageMakerService) don't use EmbeddingsInput, which is why there's a slightly different flow for them.

return inferenceStrings.stream().map(InferenceString::value).collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.inference.ChunkingSettings;
import org.elasticsearch.inference.ChunkingStrategy;
import org.elasticsearch.inference.InferenceServiceResults;
import org.elasticsearch.inference.InferenceString;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.inference.chunking.Chunker.ChunkOffset;
import org.elasticsearch.xpack.core.inference.results.ChunkedInferenceEmbedding;
Expand All @@ -36,22 +37,26 @@
* chunks. Multiple inputs may be fit into a single batch or
* a single large input that has been chunked may spread over
* multiple batches.
*
* <p>
* The final aspect is to gather the responses from the batch
* processing and map the results back to the original element
* in the input list.
*/
public class EmbeddingRequestChunker<E extends EmbeddingResults.Embedding<E>> {

// Visible for testing
record Request(int inputIndex, int chunkIndex, ChunkOffset chunk, String input) {
public String chunkText() {
return input.substring(chunk.start(), chunk.end());
record Request(int inputIndex, int chunkIndex, ChunkOffset chunk, InferenceString input) {
public InferenceString chunkText() {
if (chunk.start() == 0 && chunk.end() == input.value().length()) {
return input;
} else {
return new InferenceString(input.value().substring(chunk.start(), chunk.end()), input.dataType());
}
}
}

public record BatchRequest(List<Request> requests) {
public Supplier<List<String>> inputs() {
public Supplier<List<InferenceString>> inputs() {
return () -> requests.stream().map(Request::chunkText).collect(Collectors.toList());
}
}
Expand Down Expand Up @@ -107,21 +112,29 @@ public EmbeddingRequestChunker(

List<Request> allRequests = new ArrayList<>();
for (int inputIndex = 0; inputIndex < inputs.size(); inputIndex++) {
ChunkingSettings chunkingSettings = inputs.get(inputIndex).chunkingSettings();
ChunkInferenceInput chunkInferenceInput = inputs.get(inputIndex);
ChunkingSettings chunkingSettings = chunkInferenceInput.chunkingSettings();
if (chunkingSettings == null) {
chunkingSettings = defaultChunkingSettings;
}
Chunker chunker = chunkers.getOrDefault(chunkingSettings.getChunkingStrategy(), defaultChunker);
String inputString = inputs.get(inputIndex).input();
List<ChunkOffset> chunks = chunker.chunk(inputString, chunkingSettings);
Chunker chunker;
if (chunkInferenceInput.input().isText()) {
chunker = chunkers.getOrDefault(chunkingSettings.getChunkingStrategy(), defaultChunker);
} else {
// Do not chunk non-text inputs
chunker = NoopChunker.INSTANCE;
chunkingSettings = NoneChunkingSettings.INSTANCE;
}
InferenceString inputString = chunkInferenceInput.input();
List<ChunkOffset> chunks = chunker.chunk(inputString.value(), chunkingSettings);
int resultCount = Math.min(chunks.size(), MAX_CHUNKS);
resultEmbeddings.add(new AtomicReferenceArray<>(resultCount));
resultOffsetStarts.add(new ArrayList<>(resultCount));
resultOffsetEnds.add(new ArrayList<>(resultCount));

for (int chunkIndex = 0; chunkIndex < chunks.size(); chunkIndex++) {
// If the number of chunks is larger than the maximum allowed value,
// scale the indices to [0, MAX) with similar number of original
// scale the indices to [0, MAX] with similar number of original
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, the change here is because MAX is inclusive right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, my mistake, I thought this was just a typo rather than indicating inclusive/exclusive. I learned something new today!

// chunks in the final chunks.
int targetChunkIndex = chunks.size() <= MAX_CHUNKS ? chunkIndex : chunkIndex * MAX_CHUNKS / chunks.size();
if (resultOffsetStarts.getLast().size() <= targetChunkIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.core.inference.results;

import org.elasticsearch.inference.ChunkedInference;
import org.elasticsearch.inference.InferenceString;
import org.elasticsearch.xcontent.XContent;

import java.io.IOException;
Expand All @@ -19,15 +20,18 @@

public record ChunkedInferenceEmbedding(List<EmbeddingResults.Chunk> chunks) implements ChunkedInference {

public static List<ChunkedInference> listOf(List<String> inputs, SparseEmbeddingResults sparseEmbeddingResults) {
validateInputSizeAgainstEmbeddings(inputs, sparseEmbeddingResults.embeddings().size());
public static List<ChunkedInference> listOf(List<InferenceString> inputs, SparseEmbeddingResults sparseEmbeddingResults) {
validateInputSizeAgainstEmbeddings(inputs.size(), sparseEmbeddingResults.embeddings().size());

var results = new ArrayList<ChunkedInference>(inputs.size());
for (int i = 0; i < inputs.size(); i++) {
results.add(
new ChunkedInferenceEmbedding(
List.of(
new EmbeddingResults.Chunk(sparseEmbeddingResults.embeddings().get(i), new TextOffset(0, inputs.get(i).length()))
new EmbeddingResults.Chunk(
sparseEmbeddingResults.embeddings().get(i),
new TextOffset(0, inputs.get(i).value().length())
)
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@

import org.elasticsearch.common.Strings;

import java.util.List;

public class TextEmbeddingUtils {

/**
* Throws an exception if the number of elements in the input text list is different than the results in text embedding
* response.
*/
public static void validateInputSizeAgainstEmbeddings(List<String> inputs, int embeddingSize) {
if (inputs.size() != embeddingSize) {
public static void validateInputSizeAgainstEmbeddings(int inputsSize, int embeddingSize) {
if (inputsSize != embeddingSize) {
throw new IllegalArgumentException(
Strings.format("The number of inputs [%s] does not match the embeddings [%s]", inputs.size(), embeddingSize)
Strings.format("The number of inputs [%s] does not match the embeddings [%s]", inputsSize, embeddingSize)
);
}
}
Expand Down
Loading