Skip to content

Commit

Permalink
ARROW-18317: [Go] Dictionary replacement during IPC stream (#14636)
Browse files Browse the repository at this point in the history
Fix dictionary replacement for IPC streams. Currently they incorrectly get concatenated together instead of replaced when not using deltas. This will properly replace dictionaries when encountering a non-delta dictionary message.

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored and kou committed Nov 15, 2022
1 parent 1f9db90 commit 60e1d81
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
10 changes: 9 additions & 1 deletion go/arrow/internal/dictutils/dict.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,18 @@ func (memo *Memo) AddDelta(id int64, v arrow.ArrayData) {
memo.id2dict[id] = append(d, v)
}

// AddOrReplace puts the provided dictionary into the memo table. If it
// already exists, then the new data will replace it. Otherwise it is added
// to the memo table.
func (memo *Memo) AddOrReplace(id int64, v arrow.ArrayData) bool {
d, ok := memo.id2dict[id]
if ok {
d = append(d, v)
// replace the dictionary and release any existing ones
for _, dict := range d {
dict.Release()
}
d[0] = v
d = d[:1]
} else {
d = []arrow.ArrayData{v}
}
Expand Down
94 changes: 94 additions & 0 deletions go/arrow/ipc/ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,97 @@ func TestIPCTable(t *testing.T) {
n++
}
}

// ARROW-18317
func TestDictionary(t *testing.T) {
pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer pool.AssertSize(t, 0)

// A schema with a single dictionary field
schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{
IndexType: arrow.PrimitiveTypes.Uint16,
ValueType: arrow.BinaryTypes.String,
Ordered: false,
}}}, nil)

// IPC writer and reader
var bufWriter bytes.Buffer
ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema))
defer ipcWriter.Close()

bufReader := bytes.NewReader([]byte{})
var ipcReader *ipc.Reader

bldr := array.NewBuilder(pool, schema.Field(0).Type)
defer bldr.Release()
require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`)))

arr := bldr.NewArray()
defer arr.Release()
// Create a first record with field = "value_0"
record := array.NewRecord(schema, []arrow.Array{arr}, 1)
defer record.Release()

expectedJson, err := record.MarshalJSON()
require.NoError(t, err)
// Serialize and deserialize the record via an IPC stream
json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
require.NoError(t, err)
// Compare the expected JSON with the actual JSON
require.JSONEq(t, string(expectedJson), string(json))

// Create a second record with field = "value_1"
require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`)))
arr = bldr.NewArray()
defer arr.Release()
record = array.NewRecord(schema, []arrow.Array{arr}, 1)

// record, _, err = array.RecordFromJSON(pool, schema, strings.NewReader(`[{"field": ["value_1"]}]`))
// require.NoError(t, err)
defer record.Release()

expectedJson, err = record.MarshalJSON()
require.NoError(t, err)
// Serialize and deserialize the record via an IPC stream
json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader)
require.NoError(t, err)
// Compare the expected JSON with the actual JSON
// field = "value_0" but should be "value_1"
require.JSONEq(t, string(expectedJson), string(json))
require.NoError(t, ipcReader.Err())
ipcReader.Release()
}

// Encode and decode a record over a tuple of IPC writer and reader.
// IPC writer and reader are the same from one call to another.
func encodeDecodeIpcStream(t *testing.T,
record arrow.Record,
bufWriter *bytes.Buffer, ipcWriter *ipc.Writer,
bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) {

// Serialize the record via an ipc writer
if err := ipcWriter.Write(record); err != nil {
return nil, ipcReader, err
}
serializedRecord := bufWriter.Bytes()
bufWriter.Reset()

// Deserialize the record via an ipc reader
bufReader.Reset(serializedRecord)
if ipcReader == nil {
newIpcReader, err := ipc.NewReader(bufReader)
if err != nil {
return nil, newIpcReader, err
}
ipcReader = newIpcReader
}
ipcReader.Next()
record = ipcReader.Record()

// Return the decoded record as a json string
json, err := record.MarshalJSON()
if err != nil {
return nil, ipcReader, err
}
return json, ipcReader, nil
}

0 comments on commit 60e1d81

Please sign in to comment.