From 97d1424fa7082e4412c57716192be5a475e69eed Mon Sep 17 00:00:00 2001 From: Ybehrooz Date: Thu, 31 Jul 2025 19:55:28 +0330 Subject: [PATCH] add log exec and complate workloads --- handler/handler.go | 174 +++++++++++++++++++++++++++++++++++++++------ main.go | 3 + 2 files changed, 155 insertions(+), 22 deletions(-) diff --git a/handler/handler.go b/handler/handler.go index 3c0c300..d00cf8a 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -1,10 +1,12 @@ package handler import ( + "bytes" "context" "encoding/base64" "encoding/json" "fmt" + "io" "log" "main/argohandler" "main/db" @@ -15,6 +17,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/kubectl/pkg/scheme" // "github.com/gorilla/mux" @@ -22,7 +25,9 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" ) type Cluster struct { @@ -93,9 +98,9 @@ type Jobs struct { type Replicaset struct { Name string `json:name` - Desired string `json:desired` - Current string `json:current` - Ready string `json:Ready` + Desired int32 `json:desired` + Current int32 `json:current` + Ready int32 `json:Ready` Age string `json:age` Namespace string `json:name` } @@ -103,9 +108,9 @@ type Replicaset struct { type ReplicationController struct { Namespace string `json:name` Name string `json:name` - Desired string `json:desired` - Current string `json:current` - Ready string `json:Ready` + Desired int32 `json:desired` + Current int32 `json:current` + Ready int32 `json:Ready` Age string `json:age` } @@ -209,27 +214,28 @@ func CreateClusterHandler(w http.ResponseWriter, r *http.Request) { } -func getClientset(w http.ResponseWriter, clustername string) (*kubernetes.Clientset, error) { +func getClientset(w http.ResponseWriter, clustername string) (*kubernetes.Clientset, *rest.Config, error) { kubeconfig, err := getClusterConfig(clustername) if err != nil { http.Error(w, "File to get kubeconfig", http.StatusInternalServerError) - return nil, err + return nil, nil, err } kubeconfigbyte := []byte(kubeconfig) config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigbyte) - if err != nil { - log.Fatal("Error creating clientSet:", err) + log.Println("Error creating rest config:", err) + return nil, nil, err } clientset, err := kubernetes.NewForConfig(config) if err != nil { - log.Fatal("Error creating clientSet:", err) + log.Println("Error creating clientSet:", err) + return nil, nil, err } - return clientset, nil + return clientset, config, nil } func ListUserClusters(w http.ResponseWriter, r *http.Request) { @@ -250,7 +256,7 @@ func Cluster_namespaces(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting clientset: ", err) @@ -285,7 +291,7 @@ func Cluster_services(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting clientset: ", err) @@ -335,7 +341,7 @@ func Cluster_statefulset(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting clientset: ", err) @@ -383,7 +389,7 @@ func Cluster_daemonsets(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting clientset: ", err) @@ -434,7 +440,7 @@ func Cluster_deployments(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting clientset: ", err) @@ -494,7 +500,7 @@ func Cluster_pods(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting clientset: ", err) @@ -586,7 +592,7 @@ func Cluster_jobs(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting Jobs: ", err) @@ -605,6 +611,25 @@ func Cluster_jobs(w http.ResponseWriter, r *http.Request) { for _, d := range jobs.Items { job.Name = d.Name job.Namespace = d.Namespace + + status := "Active" + if d.Status.Succeeded > 0 { + status = "Complete" + } else if d.Status.Failed > 0 { + status = "Failed" + } + job.Status = status + + completions := fmt.Sprintf("%d/%d", d.Status.Succeeded, *d.Spec.Completions) + job.Completion = completions + + duration := "-" + if d.Status.StartTime != nil && d.Status.CompletionTime != nil { + d := d.Status.CompletionTime.Time.Sub(d.Status.StartTime.Time) + duration = d.String() + } + job.Duration = duration + age := now.Sub(d.CreationTimestamp.Time) // same as kubectl AGE job.Age = human(age) @@ -629,7 +654,7 @@ func Cluster_replicasets(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting replicasets: ", err) @@ -648,6 +673,9 @@ func Cluster_replicasets(w http.ResponseWriter, r *http.Request) { for _, d := range replicasets.Items { replicaset.Name = d.Name replicaset.Namespace = d.Namespace + replicaset.Desired = *d.Spec.Replicas + replicaset.Current = d.Status.Replicas + replicaset.Ready = d.Status.ReadyReplicas age := now.Sub(d.CreationTimestamp.Time) // same as kubectl AGE replicaset.Age = human(age) Allreplicaset = append(Allreplicaset, replicaset) @@ -671,7 +699,7 @@ func Cluster_replicationcontrollers(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting Replicationcontrollers: ", err) @@ -713,7 +741,7 @@ func Cluster_cronjobs(w http.ResponseWriter, r *http.Request) { return } - clientset, err := getClientset(w, clustername) + clientset, _, err := getClientset(w, clustername) if err != nil { log.Fatal("Error getting Replicationcontrollers: ", err) @@ -734,6 +762,11 @@ func Cluster_cronjobs(w http.ResponseWriter, r *http.Request) { ReplicationController.Namespace = d.Namespace age := now.Sub(d.CreationTimestamp.Time) // same as kubectl AGE ReplicationController.Age = human(age) + + ReplicationController.Desired = *d.Spec.Replicas + ReplicationController.Current = d.Status.Replicas + ReplicationController.Ready = d.Status.ReadyReplicas + AllreplicationController = append(AllreplicationController, ReplicationController) } @@ -744,3 +777,100 @@ func Cluster_cronjobs(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(AllreplicationController) } + +func Pod_logs(w http.ResponseWriter, r *http.Request) { + clustername := r.URL.Query().Get("Name") + namespace := r.URL.Query().Get("Namespace") + podName := r.URL.Query().Get("Pod") + // containerName := podName + if clustername == "" { + http.Error(w, "Missing 'Name' parameter", http.StatusBadRequest) + return + } + + clientset, _, err := getClientset(w, clustername) + + if err != nil { + log.Fatal("Error getting Replicationcontrollers: ", err) + } + + podLogOpts := corev1.PodLogOptions{} + req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts) + podLogs, err := req.Stream(context.TODO()) + if err != nil { + http.Error(w, "an error happend in getting logs", http.StatusBadRequest) + return + } + defer podLogs.Close() + + buf := new([]byte) + *buf, err = io.ReadAll(podLogs) + if err != nil { + http.Error(w, "an error happend in getting logs", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(string(*buf)) +} + +func Pod_exec(w http.ResponseWriter, r *http.Request) { + clustername := r.URL.Query().Get("Name") + namespace := r.URL.Query().Get("Namespace") + podName := r.URL.Query().Get("Pod") + command := r.URL.Query().Get("Command") + + if clustername == "" || namespace == "" || podName == "" { + http.Error(w, "Missing required parameters (Name, Namespace, Pod)", http.StatusBadRequest) + return + } + + clientset, config, err := getClientset(w, clustername) + if err != nil { + http.Error(w, "Error getting Kubernetes clientset", http.StatusInternalServerError) + return + } + + cmd := []string{command} + + req := clientset.CoreV1().RESTClient(). + Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Command: cmd, + Stdout: true, + Stderr: true, + Stdin: false, + TTY: false, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + http.Error(w, "Error creating executor: "+err.Error(), http.StatusInternalServerError) + return + } + + var stdout, stderr bytes.Buffer + + err = exec.StreamWithContext(r.Context(), remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + + if err != nil { + http.Error(w, "Error streaming command output: "+err.Error(), http.StatusInternalServerError) + return + } + + output := map[string]interface{}{ + "stdout": strings.Split(strings.TrimSpace(stdout.String()), "\n"), + "stderr": strings.TrimSpace(stderr.String()), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(output) +} diff --git a/main.go b/main.go index da35ce4..1815cb0 100644 --- a/main.go +++ b/main.go @@ -140,6 +140,9 @@ func main() { router.HandleFunc("/cluster_replicasets", handler.Cluster_replicasets) router.HandleFunc("/cluster_replicationcontrollers", handler.Cluster_replicationcontrollers) + router.HandleFunc("/pod_logs", handler.Pod_logs) + router.HandleFunc("/pod_exec", handler.Pod_exec) + //handler.RegsiterClusterRoute(router) // Enable CORS c := cors.New(cors.Options{