暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 client-go 实现 kubectl port-forward

ProdanLabs 2021-08-22
2733

一般来说, 外部访问运行在 kubernetes 的应用程序(Pod)有NodePorts,LoadBalancer 和 Ingress 三种模式,除此之外,还可以使用 kubectl 子命令 port-forward 通过端口转发映射本地端口到指定的 Pod 端口,从而访问 kubernetes 集群中的 Pod 。


01

kubectl port-forward 

将本地的 8080 端口转发到 nginx 容器的 80 端口。

root@k8s-dev-master01:~# kubectl get po 
NAME                     READY   STATUS    RESTARTS   AGE
ephemeral-demo           1/1     Running   0          26d
nginx-7885b78d89-89vm8   1/1     Running   0          9h
nginx-7885b78d89-kdtbl   1/1     Running   0          9h
root@k8s-dev-master01:~
root@k8s-dev-master01:~
root@k8s-dev-master01:~#  kubectl port-forward nginx-7885b78d89-89vm8 --address 0.0.0.0 8080:80
Forwarding from 0.0.0.0:8080 -> 80

复制


测试

root@k8s-dev-master01:~#  netstat -lnpt | grep 8080
tcp        0      0 0.0.0.0:8080            0.0.0.0:*               LISTEN      3775957/kubectl     
root@k8s-dev-master01:~# curl 127.0.0.1:8080
<!DOCTYPE html>
<html>
<head>
<title>Welcome to nginx!</title>
<style>
    body {
        width35em;
        margin0 auto;
        font-family: Tahoma, Verdana, Arial, sans-serif;
    }
</style>
</head>
<body>
<h1>Welcome to nginx!</h1>
<p>If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.</p>

<p>For online documentation and support please refer to
<a href="http://nginx.org/">nginx.org</a>.<br/>
Commercial support is available at
<a href="http://nginx.com/">nginx.com</a>.</p>

<p><em>Thank you for using nginx.</em></p>
</body>
</html>
root@k8s-dev-master01:~# 

复制


02

kubectl port-forward 工作过程

kubectl 通过 spdy 发送请求到 kube-apiserver ,kube-apiserver 再通过 spdy 发送请求到 kubelet 

func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
    transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
    if err != nil {
        return err
    }
    dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
    fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
    if err != nil {
        return err
    }
    return fw.ForwardPorts()
}

// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
    pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
    if err != nil {
        return err
    }

    if pod.Status.Phase != corev1.PodRunning {
        return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    defer signal.Stop(signals)

    go func() {
        <-signals
        if o.StopChannel != nil {
            close(o.StopChannel)
        }
    }()

    req := o.RESTClient.Post().
        Resource("pods").
        Namespace(o.Namespace).
        Name(pod.Name).
        SubResource("portforward")

    return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}

复制


kubelet 通过 cri api 请求 PortForward 准备一个流端点从 PodSandbox 转发端口,并返回 URL, 然后 kubelet 再请求这个 URL,通过 socat 监听此端口。

func (r *remoteRuntimeService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
    klog.V(10).InfoS("[RemoteRuntimeService] PortForward""podSandboxID", req.PodSandboxId, "port", req.Port, "timeout", r.timeout)
    ctx, cancel := getContextWithTimeout(r.timeout)
    defer cancel()

    resp, err := r.runtimeClient.PortForward(ctx, req)
    if err != nil {
        klog.ErrorS(err, "PortForward from runtime service failed""podSandboxID", req.PodSandboxId)
        return nil, err
    }
    klog.V(10).InfoS("[RemoteRuntimeService] PortForward Response""podSandboxID", req.PodSandboxId)

    if resp.Url == "" {
        errorMessage := "URL is not set"
        err := errors.New(errorMessage)
        klog.ErrorS(err, "PortForward failed")
        return nil, err
    }

    return resp, nil
}

复制


port-forward 会使用 socat 和 nsenter 命令工作。该函数检查 socat 和 nsenter 是否存在,故 Node 节点需要安装 socat 和 nsenter

func (r *streamingRuntime) portForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
    container, err := r.client.InspectContainer(podSandboxID)
    if err != nil {
        return err
    }

    if !container.State.Running {
        return fmt.Errorf("container not running (%s)", container.ID)
    }

    containerPid := container.State.Pid
    socatPath, lookupErr := exec.LookPath("socat")
    if lookupErr != nil {
        return fmt.Errorf("unable to do port forwarding: socat not found")
    }

    args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", socatPath, "-", fmt.Sprintf("TCP4:localhost:%d", port)}

    nsenterPath, lookupErr := exec.LookPath("nsenter")
    if lookupErr != nil {
        return fmt.Errorf("unable to do port forwarding: nsenter not found")
    }

    commandString := fmt.Sprintf("%s %s", nsenterPath, strings.Join(args, " "))
    klog.V(4).InfoS("Executing port forwarding command""command", commandString)

    command := exec.Command(nsenterPath, args...)
    command.Stdout = stream

    stderr := new(bytes.Buffer)
    command.Stderr = stderr

    // If we use Stdin, command.Run() won't return until the goroutine that's copying
    // from stream finishes. Unfortunately, if you have a client like telnet connected
    // via port forwarding, as long as the user's telnet client is connected to the user's
    // local listener that port forwarding sets up, the telnet session never exits. This
    // means that even if socat has finished running, command.Run() won't ever return
    // (because the client still has the connection and stream open).
    //
    // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
    // when the command (socat) exits.
    inPipe, err := command.StdinPipe()
    if err != nil {
        return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
    }
    go func() {
        io.Copy(inPipe, stream)
        inPipe.Close()
    }()

    if err := command.Run(); err != nil {
        return fmt.Errorf("%v: %s", err, stderr.String())
    }

    return nil
}

复制


03

使用 client-go 实现 port-forward

参考 kubectl port-forward 源码实现。

package main

import (
    "io"
    "net/http"
    "net/url"
    "os"
    "os/signal"
    "syscall"

    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/portforward"
    "k8s.io/client-go/transport/spdy"
    "k8s.io/klog/v2"

    "github.com/prodanlabs/client-go-examples/client"
)

func main() {
    // 实例化 k8s 客户端
    kubeConfig, err := client.InitKubeConfig(false)
    if err != nil {
        klog.Fatal("kubernetes Config failed to initialize.", err)
    }

    clientSet, err := client.NewClientSet(kubeConfig)
    if err != nil {
        klog.Fatal("kubernetes clientSet failed.", err)
    }

    req := clientSet.CoreV1().RESTClient().Post().Namespace("default").
        Resource("pods").Name("nginx-7885b78d89-89vm8").SubResource("portforward")

    klog.Info(req.URL())

    signals := make(chan os.Signal, 1)
    StopChannel := make(chan struct{}, 1)
    ReadyChannel := make(chan struct{})

    defer signal.Stop(signals)

    signal.Notify(signals, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)

    go func() {
        <-signals
        if StopChannel != nil {
            close(StopChannel)
        }
    }()

    if err := ForwardPorts("POST", req.URL(), kubeConfig, StopChannel,ReadyChannel); err != nil {
        klog.Fatalln(err)
    }

}

func ForwardPorts(method string, url *url.URL, config *rest.Config, StopChannel, ReadyChannel chan struct{}) error {
    transport, upgrader, err := spdy.RoundTripperFor(config)
    if err != nil {
        return err
    }
    address := []string{"0.0.0.0"}
    ports := []string{"8080:80"}

    IOStreams := struct {
        // In think, os.Stdin
        In io.Reader
        // Out think, os.Stdout
        Out io.Writer
        // ErrOut think, os.Stderr
        ErrOut io.Writer
    }{os.Stdin, os.Stdout, os.Stderr}

    dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
    fw, err := portforward.NewOnAddresses(dialer, address, ports, StopChannel, ReadyChannel, IOStreams.Out, IOStreams.ErrOut)
    if err != nil {
        return err
    }
    return fw.ForwardPorts()
}

复制




使用 client-go 实现 kubectl debug 命令

使用 client-go 实现 kubectl cp 命令

文章转载自ProdanLabs,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论