Skip to content

Commit

Permalink
[FAB-3003] Added join channel functionality
Browse files Browse the repository at this point in the history
Change-Id: I7eb223e4c27725fc55e37c7f6834424a3cf51d0c
Signed-off-by: Divyank Katira <[email protected]>
  • Loading branch information
d1vyank committed Apr 5, 2017
1 parent 17cd4e0 commit f731064
Show file tree
Hide file tree
Showing 14 changed files with 705 additions and 107 deletions.
146 changes: 143 additions & 3 deletions fabric-client/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-sdk-go/fabric-client/util"
"github.com/hyperledger/fabric/bccsp"
msp "github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -63,7 +64,8 @@ type Chain interface {
AddOrderer(orderer Orderer)
RemoveOrderer(orderer Orderer)
GetOrderers() []Orderer
CreateChannel(request CreateChannelRequest) error
CreateChannel(request *CreateChannelRequest) error
JoinChannel(request *JoinChannelRequest) error
UpdateChain() bool
IsReadonly() bool
QueryInfo() (*common.BlockchainInfo, error)
Expand Down Expand Up @@ -174,6 +176,13 @@ type CreateChannelRequest struct {
ConfigData []byte
}

// JoinChannelRequest allows a set of peers to transact on a channel on the network
type JoinChannelRequest struct {
Targets []Peer
TxID string
Nonce []byte
}

// NewChain ...
/**
* @param {string} name to identify different chain instances. The naming of chain instances
Expand Down Expand Up @@ -388,10 +397,10 @@ func (c *chain) GetOrderers() []Orderer {
// CreateChannel calls the an orderer to create a channel on the network
// @param {CreateChannelRequest} request Contains cofiguration information
// @returns {bool} result of the channel creation
func (c *chain) CreateChannel(request CreateChannelRequest) error {
func (c *chain) CreateChannel(request *CreateChannelRequest) error {
var failureCount int
// Validate request
if request.ConfigData == nil {
if request == nil || request.ConfigData == nil {
return fmt.Errorf("Configuration is required to create a chanel")
}

Expand Down Expand Up @@ -426,6 +435,89 @@ func (c *chain) CreateChannel(request CreateChannelRequest) error {
return nil
}

// JoinChannel instructs a set of peers to join the channel represented by
// this chain
// @param {JoinChannelRequest} Join channel request
// @returns error, if applicable
func (c *chain) JoinChannel(request *JoinChannelRequest) error {
joinCommand := "JoinChain"
err := validateJoinChannelRequest(request)
if err != nil {
return err
}
// Fetch genesis block
block, err := c.fetchGenesisBlock()
if err != nil {
return err
}
blockBytes, err := proto.Marshal(block)
if err != nil {
return fmt.Errorf("Error unmarshalling block: %s", err)
}
// Get user enrolment info and serialize for signing requests
user, err := c.clientContext.GetUserContext("")
if err != nil {
return fmt.Errorf("GetUserContext returned error: %s", err)
}
creatorID, err := getSerializedIdentity(user.GetEnrollmentCertificate())
if err != nil {
return err
}
// Create join channel transaction proposal for target peers
var args [][]byte
args = append(args, []byte(joinCommand))
args = append(args, blockBytes)
ccis := &pb.ChaincodeInvocationSpec{ChaincodeSpec: &pb.ChaincodeSpec{
Type: pb.ChaincodeSpec_GOLANG, ChaincodeId: &pb.ChaincodeID{Name: "cscc"},
Input: &pb.ChaincodeInput{Args: args}}}

// create a proposal from a ChaincodeInvocationSpec
proposal, _, err := protos_utils.
CreateChaincodeProposalWithTxIDNonceAndTransient(request.TxID,
common.HeaderType_ENDORSER_TRANSACTION, "", ccis,
request.Nonce, creatorID, nil)
if err != nil {
return fmt.Errorf("Could not create chaincode proposal, err %s", err)
}
// Serialize join proposal
proposalBytes, err := protos_utils.GetBytesProposal(proposal)
if err != nil {
return err
}
// Sign join proposal
signature, err := c.signObjectWithKey(proposalBytes, user.GetPrivateKey(),
&bccsp.SHAOpts{}, nil)
if err != nil {
return err
}
// Send join proposal
proposalResponses, err := c.SendTransactionProposal(&TransactionProposal{
TransactionID: request.TxID,
signedProposal: &pb.SignedProposal{
ProposalBytes: proposalBytes,
Signature: signature},
proposal: proposal,
}, 0, request.Targets)
if err != nil {
return fmt.Errorf("Error sending join transaction proposal: %s", err)
}

// Check responses from target peers for success/failure
var joinError string
for _, response := range proposalResponses {
if response.Err != nil {
joinError = joinError +
fmt.Sprintf("Join channel proposal response error: %s \n",
response.Err.Error())
}
}
if joinError != "" {
return fmt.Errorf(joinError)
}

return nil
}

// UpdateChain ...
/**
* Calls the orderer(s) to update an existing chain. This allows the addition and
Expand Down Expand Up @@ -1204,6 +1296,38 @@ func (c *chain) signProposal(proposal *pb.Proposal) (*pb.SignedProposal, error)
return &pb.SignedProposal{ProposalBytes: proposalBytes, Signature: signature}, nil
}

// fetchGenesisBlock fetches the configuration block for this channel
func (c *chain) fetchGenesisBlock() (*common.Block, error) {
// Get user enrolment info and serialize for signing requests
user, err := c.clientContext.GetUserContext("")
if err != nil {
return nil, fmt.Errorf("GetUserContext returned error: %s", err)
}
creatorID, err := getSerializedIdentity(user.GetEnrollmentCertificate())
if err != nil {
return nil, err
}
// Seek block zero (the configuration tx for this channel)
payload := util.CreateGenesisBlockRequest(c.name, creatorID)
blockRequest, err := c.SignPayload(payload)
if err != nil {
return nil, fmt.Errorf("Error signing payload: %s", err)
}
// Request genesis block from ordering service
var block *common.Block
// TODO: what if the primary orderer is down?
responses, errors := c.GetOrderers()[0].SendDeliver(blockRequest)
// Block on channels for genesis block or error
select {
case block = <-responses:
logger.Debugf("Got genesis block from ordering service: %#v", block)
case err = <-errors:
return nil, fmt.Errorf("Error from SendDeliver(): %s", err)
}

return block, nil
}

func getSerializedIdentity(userCertificate []byte) ([]byte, error) {
serializedIdentity := &msp.SerializedIdentity{Mspid: config.GetFabricCAID(),
IdBytes: userCertificate}
Expand Down Expand Up @@ -1242,3 +1366,19 @@ func buildChaincodePolicy(mspid string) (*common.SignaturePolicyEnvelope, error)
}
return p, nil
}
func validateJoinChannelRequest(request *JoinChannelRequest) error {
// Validate arguments
if request == nil {
return fmt.Errorf("JoinChannelRequest argument is required to join channel")
}
if request.Targets == nil || len(request.Targets) == 0 {
return fmt.Errorf("Atleast one target peer is required to join channel")
}
if request.TxID == "" {
return fmt.Errorf("Transaction ID is required to join channel")
}
if request.Nonce == nil {
return fmt.Errorf("Nonce is required to join channel")
}
return nil
}
73 changes: 71 additions & 2 deletions fabric-client/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ package fabricclient
import (
"fmt"
"io/ioutil"
"net"
"testing"

"google.golang.org/grpc"

mocks "github.com/hyperledger/fabric-sdk-go/fabric-client/mocks"
"github.com/hyperledger/fabric-sdk-go/fabric-client/util"
pb "github.com/hyperledger/fabric/protos/peer"
)

var testAddress = "0.0.0.0:5244"

func TestChainMethods(t *testing.T) {
client := NewClient()
chain, err := NewChain("testChain", client)
Expand Down Expand Up @@ -181,7 +187,7 @@ func TestCreateChain(t *testing.T) {
t.Fatalf(err.Error())
}
// Create channel without configuration
err = chain.CreateChannel(CreateChannelRequest{})
err = chain.CreateChannel(&CreateChannelRequest{})
if err == nil {
t.Fatalf("Expected error creating channel without config tx")
}
Expand All @@ -194,7 +200,7 @@ func TestCreateChain(t *testing.T) {
orderer := mockOrderer{fmt.Sprintf("0.0.0.0:1234"), nil}
chain.AddOrderer(&orderer)
// Test with valid cofiguration
err = chain.CreateChannel(CreateChannelRequest{ConfigData: configTx})
err = chain.CreateChannel(&CreateChannelRequest{ConfigData: configTx})
if err != nil {
t.Fatalf("Did not expect error from create channel. Got error: %s", err.Error())
}
Expand Down Expand Up @@ -238,6 +244,55 @@ func TestConcurrentOrderers(t *testing.T) {
}
}

func TestJoinChannel(t *testing.T) {
var peers []Peer
endorserServer := startEndorserServer(t)
chain, _ := setupTestChain()
peer, _ := CreateNewPeer(testAddress, "", "")
peers = append(peers, peer)
orderer := &mockOrderer{}
nonce, _ := util.GenerateRandomNonce()
txID, _ := util.ComputeTxID(nonce, []byte("testID"))
request := &JoinChannelRequest{Targets: peers, Nonce: nonce, TxID: txID}
chain.AddOrderer(orderer)
chain.AddPeer(peer)
// Test join channel with valid arguments
err := chain.JoinChannel(request)
if err != nil {
t.Fatalf("Did not expect error from join channel. Got: %s", err)
}
// Test join channel without request
err = chain.JoinChannel(nil)
if err.Error() != "JoinChannelRequest argument is required to join channel" {
t.Fatalf("Expected error without join channel request")
}
// Test join channel without target peers
request = &JoinChannelRequest{Targets: nil, Nonce: nonce, TxID: txID}
err = chain.JoinChannel(request)
if err.Error() != "Atleast one target peer is required to join channel" {
t.Fatalf("Expected error without target peers")
}
// Test join channel without nonce
request = &JoinChannelRequest{Targets: peers, Nonce: nil, TxID: txID}
err = chain.JoinChannel(request)
if err.Error() != "Nonce is required to join channel" {
t.Fatalf("Expected error without nonce")
}
// Test join channel without TxID
request = &JoinChannelRequest{Targets: peers, Nonce: nonce, TxID: ""}
err = chain.JoinChannel(request)
if err.Error() != "Transaction ID is required to join channel" {
t.Fatalf("Expected error without transaction ID")
}
// Test failed proposal error handling
endorserServer.ProposalError = fmt.Errorf("Test Error")
request = &JoinChannelRequest{Targets: peers, Nonce: nonce, TxID: txID}
err = chain.JoinChannel(request)
if err == nil {
t.Fatalf("Expected error")
}
}

func setupTestChain() (Chain, error) {
client := NewClient()
user := NewUser("test")
Expand Down Expand Up @@ -265,3 +320,17 @@ func setupMassiveTestChain(numberOfPeers int, numberOfOrderers int) (Chain, erro

return chain, error
}

func startEndorserServer(t *testing.T) *mocks.MockEndorserServer {
grpcServer := grpc.NewServer()
lis, err := net.Listen("tcp", testAddress)
endorserServer := &mocks.MockEndorserServer{}
pb.RegisterEndorserServer(grpcServer, endorserServer)
if err != nil {
fmt.Printf("Error starting test server %s", err)
t.FailNow()
}
fmt.Printf("Starting test server\n")
go grpcServer.Serve(lis)
return endorserServer
}
25 changes: 25 additions & 0 deletions fabric-client/mockorderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ limitations under the License.

package fabricclient

import (
"github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)

// TestBlock is a test block
var testBlock = &ab.DeliverResponse{
Type: &ab.DeliverResponse_Block{
Block: &common.Block{
Data: &common.BlockData{
Data: [][]byte{[]byte("test")},
},
},
},
}

// mockOrderer is a mock fabricclient.Orderer
type mockOrderer struct {
MockURL string
Expand All @@ -34,3 +50,12 @@ func (o *mockOrderer) GetURL() string {
func (o *mockOrderer) SendBroadcast(envelope *SignedEnvelope) error {
return o.MockError
}

// SendBroadcast mocks sending a deliver request to the ordering service
func (o *mockOrderer) SendDeliver(envelope *SignedEnvelope) (chan *common.Block,
chan error) {
responses := make(chan *common.Block, 1)
errors := make(chan error, 1)
responses <- testBlock.GetBlock()
return responses, errors
}
Loading

0 comments on commit f731064

Please sign in to comment.