Skip to content

Commit

Permalink
FIXED: the namespace seleted missing bug. add the test namespace set.…
Browse files Browse the repository at this point in the history
… change some channel
  • Loading branch information
calmkart committed Nov 22, 2019
1 parent edc1758 commit 446cdd0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/kubefwd/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (opts *FwdServiceOpts) StartListen(stopListenCh <-chan struct{}) {
options.FieldSelector = fields.Everything().String()
options.LabelSelector = opts.ListOptions.LabelSelector
}
watchlist := cache.NewFilteredListWatchFromClient(opts.RESTClient, "services", v1.NamespaceDefault, optionsModifier)
watchlist := cache.NewFilteredListWatchFromClient(opts.RESTClient, "services", opts.Namespace, optionsModifier)
_, controller := cache.NewInformer(watchlist, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: opts.AddServiceHandler,
DeleteFunc: opts.DeleteServiceHandler,
Expand Down
46 changes: 25 additions & 21 deletions cmd/kubefwd/services/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,25 @@ import (
// then test to http get the service.
func TestMainPipe(t *testing.T) {

opts := buildFwdServiceOpts(t)
// we can change the namespace here
namespace := "default"
opts := buildFwdServiceOpts(t, namespace)

stopListenCh := make(chan struct{})
defer close(stopListenCh)
defer deleteTestService(t, opts.ClientSet)
defer deleteTestService(t, opts.ClientSet, namespace)

go opts.StartListen(stopListenCh)

go testFwd(t, opts.ClientSet, opts.Wg)
go testFwd(t, opts.ClientSet, opts.Wg, namespace)

time.Sleep(2 * time.Second)
opts.Wg.Wait()

}

// build the FwdServiceOpts struct
func buildFwdServiceOpts(t *testing.T) *FwdServiceOpts {
func buildFwdServiceOpts(t *testing.T, namespace string) *FwdServiceOpts {

hasRoot, err := utils.CheckRoot()

Expand Down Expand Up @@ -103,7 +105,7 @@ func buildFwdServiceOpts(t *testing.T) *FwdServiceOpts {
t.Fatalf("Error creating k8s RestClient: %s\n", err.Error())
}
// create the test service
createTestService(t, clientSet)
createTestService(t, clientSet, namespace)

listOptions := metav1.ListOptions{
LabelSelector: "app=kubefwd-test-nginx-service",
Expand All @@ -115,7 +117,7 @@ func buildFwdServiceOpts(t *testing.T) *FwdServiceOpts {
Wg: wg,
ClientSet: clientSet,
Context: rawConfig.CurrentContext,
Namespace: "default",
Namespace: namespace,
ListOptions: listOptions,
Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile},
ClientConfig: restConfig,
Expand All @@ -130,14 +132,15 @@ func buildFwdServiceOpts(t *testing.T) *FwdServiceOpts {
}

// create a test nginx service and deployments
func createTestService(t *testing.T, clientset *kubernetes.Clientset) {
func createTestService(t *testing.T, clientset *kubernetes.Clientset, namespace string) {

// create the test nginx deployements
// default namespace is "default"

deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "kubefwd-test-nginx-deployment",
Name: "kubefwd-test-nginx-deployment",
Namespace: namespace,
Labels: map[string]string{
"app": "kubefwd-test-nginx-deployment",
},
Expand Down Expand Up @@ -174,7 +177,7 @@ func createTestService(t *testing.T, clientset *kubernetes.Clientset) {
},
}

_, err := clientset.AppsV1().Deployments("default").Create(deployment)
_, err := clientset.AppsV1().Deployments(namespace).Create(deployment)
if err != nil {
t.Fatalf("Error creating the test nginx deployment: %s\n", err.Error())
}
Expand All @@ -183,7 +186,8 @@ func createTestService(t *testing.T, clientset *kubernetes.Clientset) {
// default namespace is "default"
service := &apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "kubefwd-test-nginx-service",
Name: "kubefwd-test-nginx-service",
Namespace: namespace,
},
Spec: apiv1.ServiceSpec{
Selector: map[string]string{
Expand All @@ -201,24 +205,24 @@ func createTestService(t *testing.T, clientset *kubernetes.Clientset) {
},
}

_, err = clientset.CoreV1().Services("default").Create(service)
_, err = clientset.CoreV1().Services(namespace).Create(service)
if err != nil {
t.Fatalf("Error creating the test nginx deployment: %s\n", err.Error())
}
t.Log("Create test nginx service and deployment success")
}

// delete the test nginx service and deployments
func deleteTestService(t *testing.T, clientset *kubernetes.Clientset) {
clientset.AppsV1().Deployments("default").Delete("kubefwd-test-nginx-deployment", &metav1.DeleteOptions{})
clientset.CoreV1().Services("default").Delete("kubefwd-test-nginx-service", &metav1.DeleteOptions{})
func deleteTestService(t *testing.T, clientset *kubernetes.Clientset, namespace string) {
clientset.AppsV1().Deployments(namespace).Delete("kubefwd-test-nginx-deployment", &metav1.DeleteOptions{})
clientset.CoreV1().Services(namespace).Delete("kubefwd-test-nginx-service", &metav1.DeleteOptions{})
t.Log("Delete test nginx service and deployment success")
}

// http get to test if the forward is success
func testFwd(t *testing.T, clientset *kubernetes.Clientset, wg *sync.WaitGroup) {
pod := findFirstPodOfService(t, clientset)
if waitPodRunning(t, clientset, pod) {
func testFwd(t *testing.T, clientset *kubernetes.Clientset, wg *sync.WaitGroup, namespace string) {
pod := findFirstPodOfService(t, clientset, namespace)
if waitPodRunning(t, clientset, pod, namespace) {
resp, err := http.Get("http://kubefwd-test-nginx-service/")
if err != nil {
t.Fatalf("Forward Test nginx service faild, http get is err: %s", err.Error())
Expand All @@ -232,8 +236,8 @@ func testFwd(t *testing.T, clientset *kubernetes.Clientset, wg *sync.WaitGroup)

}

func findFirstPodOfService(t *testing.T, clientset *kubernetes.Clientset) *apiv1.Pod {
pods, err := clientset.CoreV1().Pods("default").List(metav1.ListOptions{
func findFirstPodOfService(t *testing.T, clientset *kubernetes.Clientset, namespace string) *apiv1.Pod {
pods, err := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{
LabelSelector: "app=kubefwd-test-nginx-deployment",
})
if err != nil {
Expand All @@ -242,13 +246,13 @@ func findFirstPodOfService(t *testing.T, clientset *kubernetes.Clientset) *apiv1
return &pods.Items[0]
}

func waitPodRunning(t *testing.T, clientset *kubernetes.Clientset, pod *apiv1.Pod) bool {
func waitPodRunning(t *testing.T, clientset *kubernetes.Clientset, pod *apiv1.Pod, namespace string) bool {

if pod.Status.Phase == apiv1.PodRunning {
return true
}

watcher, err := clientset.CoreV1().Pods("default").Watch(metav1.SingleObject(pod.ObjectMeta))
watcher, err := clientset.CoreV1().Pods(namespace).Watch(metav1.SingleObject(pod.ObjectMeta))
if err != nil {
t.Fatalf("error in create pod watcher, err: %s", err.Error())
}
Expand Down
34 changes: 25 additions & 9 deletions pkg/fwdport/fwdport.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (pfo *PortForwardOpts) PortForward() error {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
pfo.ManualStopChan = make(chan struct{})
signalChan := make(chan struct{})

defer signal.Stop(signals)

Expand All @@ -104,6 +105,7 @@ func (pfo *PortForwardOpts) PortForward() error {
case <-signals:
case <-pfo.ManualStopChan:
}
close(signalChan)
if pfStopChannel != nil {
pfo.removeHosts()
close(pfStopChannel)
Expand All @@ -122,14 +124,20 @@ func (pfo *PortForwardOpts) PortForward() error {
}

// Waiting for the pod is runnning
pod, err := pfo.WaitForPodRunning(signals)
pod, err := pfo.WaitForPodRunning(signalChan)
if err != nil {
pfo.Stop()
return err
}
// if err is not nil but pod is nil
// mean service deleted but pod is not runnning.
// the pfo.Stop() has called yet, needn't call again
if pod == nil {
return nil
}

// Listen for pod is deleted
go pfo.ListenUntilPodDeleted(signals, pod)
go pfo.ListenUntilPodDeleted(signalChan, pod)

fw, err := portforward.NewOnAddresses(dialer, address, fwdPorts, pfStopChannel, pfReadyChannel, &p, &p)
if err != nil {
Expand Down Expand Up @@ -239,7 +247,7 @@ func (pfo *PortForwardOpts) removeHosts() {
}

// Waiting for the pod running
func (pfo *PortForwardOpts) WaitForPodRunning(signals chan os.Signal) (*v1.Pod, error) {
func (pfo *PortForwardOpts) WaitForPodRunning(signalsChan chan struct{}) (*v1.Pod, error) {
pod, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Get(pfo.PodName, metav1.GetOptions{})
if err != nil {
return nil, err
Expand All @@ -266,7 +274,7 @@ func (pfo *PortForwardOpts) WaitForPodRunning(signals chan os.Signal) (*v1.Pod,
go func() {
defer watcher.Stop()
select {
case <-signals:
case <-signalsChan:
case <-RunningChannel:
case <-pfo.ManualStopChan:
case <-time.After(time.Second * 300):
Expand All @@ -275,18 +283,22 @@ func (pfo *PortForwardOpts) WaitForPodRunning(signals chan os.Signal) (*v1.Pod,

// watcher until the pod status is running
for {
event := <-watcher.ResultChan()
event, ok := <-watcher.ResultChan()
if !ok {
break
}
if event.Object != nil {
changedPod := event.Object.(*v1.Pod)
if changedPod.Status.Phase == v1.PodRunning {
return changedPod, nil
}
}
}
return nil, nil
}

// listen for pod is deleted
func (pfo *PortForwardOpts) ListenUntilPodDeleted(signals chan os.Signal, pod *v1.Pod) {
func (pfo *PortForwardOpts) ListenUntilPodDeleted(signalsChan chan struct{}, pod *v1.Pod) {

watcher, err := pfo.ClientSet.CoreV1().Pods(pfo.Namespace).Watch(metav1.SingleObject(pod.ObjectMeta))
if err != nil {
Expand All @@ -297,7 +309,7 @@ func (pfo *PortForwardOpts) ListenUntilPodDeleted(signals chan os.Signal, pod *v
defer watcher.Stop()
select {
case <-pfo.ManualStopChan:
case <-signals:
case <-signalsChan:
}
}()

Expand All @@ -311,15 +323,19 @@ func (pfo *PortForwardOpts) ListenUntilPodDeleted(signals chan os.Signal, pod *v
// we'll get a pure /etc/hosts few times after.
// maybe we can fix here later.
for {
event := <-watcher.ResultChan()
event, ok := <-watcher.ResultChan()
if !ok {
break
}
switch event.Type {
case watch.Deleted:
fmt.Println("Pod deleted!")
fmt.Printf("%s Pod deleted, restart the %s service portforward.", pod.ObjectMeta.Name, pfo.NativeServiceName)
pfo.ServiceOperator.UnForwardService(pfo.NativeServiceName, pfo.Namespace)
pfo.ServiceOperator.ForwardService(pfo.NativeServiceName, pfo.Namespace)
return
}
}
return
}

// this method to stop PortForward for the pfo
Expand Down

0 comments on commit 446cdd0

Please sign in to comment.