add log exec and complate workloads
This commit is contained in:
@@ -1,10 +1,12 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"main/argohandler"
|
||||
"main/db"
|
||||
@@ -15,6 +17,7 @@ import (
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/kubectl/pkg/scheme"
|
||||
|
||||
// "github.com/gorilla/mux"
|
||||
|
||||
@@ -22,7 +25,9 @@ import (
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/tools/remotecommand"
|
||||
)
|
||||
|
||||
type Cluster struct {
|
||||
@@ -93,9 +98,9 @@ type Jobs struct {
|
||||
|
||||
type Replicaset struct {
|
||||
Name string `json:name`
|
||||
Desired string `json:desired`
|
||||
Current string `json:current`
|
||||
Ready string `json:Ready`
|
||||
Desired int32 `json:desired`
|
||||
Current int32 `json:current`
|
||||
Ready int32 `json:Ready`
|
||||
Age string `json:age`
|
||||
Namespace string `json:name`
|
||||
}
|
||||
@@ -103,9 +108,9 @@ type Replicaset struct {
|
||||
type ReplicationController struct {
|
||||
Namespace string `json:name`
|
||||
Name string `json:name`
|
||||
Desired string `json:desired`
|
||||
Current string `json:current`
|
||||
Ready string `json:Ready`
|
||||
Desired int32 `json:desired`
|
||||
Current int32 `json:current`
|
||||
Ready int32 `json:Ready`
|
||||
Age string `json:age`
|
||||
}
|
||||
|
||||
@@ -209,27 +214,28 @@ func CreateClusterHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func getClientset(w http.ResponseWriter, clustername string) (*kubernetes.Clientset, error) {
|
||||
func getClientset(w http.ResponseWriter, clustername string) (*kubernetes.Clientset, *rest.Config, error) {
|
||||
|
||||
kubeconfig, err := getClusterConfig(clustername)
|
||||
if err != nil {
|
||||
http.Error(w, "File to get kubeconfig", http.StatusInternalServerError)
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
kubeconfigbyte := []byte(kubeconfig)
|
||||
config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigbyte)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error creating clientSet:", err)
|
||||
log.Println("Error creating rest config:", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
log.Fatal("Error creating clientSet:", err)
|
||||
log.Println("Error creating clientSet:", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return clientset, nil
|
||||
return clientset, config, nil
|
||||
}
|
||||
|
||||
func ListUserClusters(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -250,7 +256,7 @@ func Cluster_namespaces(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting clientset: ", err)
|
||||
@@ -285,7 +291,7 @@ func Cluster_services(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting clientset: ", err)
|
||||
@@ -335,7 +341,7 @@ func Cluster_statefulset(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting clientset: ", err)
|
||||
@@ -383,7 +389,7 @@ func Cluster_daemonsets(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting clientset: ", err)
|
||||
@@ -434,7 +440,7 @@ func Cluster_deployments(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting clientset: ", err)
|
||||
@@ -494,7 +500,7 @@ func Cluster_pods(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting clientset: ", err)
|
||||
@@ -586,7 +592,7 @@ func Cluster_jobs(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting Jobs: ", err)
|
||||
@@ -605,6 +611,25 @@ func Cluster_jobs(w http.ResponseWriter, r *http.Request) {
|
||||
for _, d := range jobs.Items {
|
||||
job.Name = d.Name
|
||||
job.Namespace = d.Namespace
|
||||
|
||||
status := "Active"
|
||||
if d.Status.Succeeded > 0 {
|
||||
status = "Complete"
|
||||
} else if d.Status.Failed > 0 {
|
||||
status = "Failed"
|
||||
}
|
||||
job.Status = status
|
||||
|
||||
completions := fmt.Sprintf("%d/%d", d.Status.Succeeded, *d.Spec.Completions)
|
||||
job.Completion = completions
|
||||
|
||||
duration := "-"
|
||||
if d.Status.StartTime != nil && d.Status.CompletionTime != nil {
|
||||
d := d.Status.CompletionTime.Time.Sub(d.Status.StartTime.Time)
|
||||
duration = d.String()
|
||||
}
|
||||
job.Duration = duration
|
||||
|
||||
age := now.Sub(d.CreationTimestamp.Time) // same as kubectl AGE
|
||||
job.Age = human(age)
|
||||
|
||||
@@ -629,7 +654,7 @@ func Cluster_replicasets(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting replicasets: ", err)
|
||||
@@ -648,6 +673,9 @@ func Cluster_replicasets(w http.ResponseWriter, r *http.Request) {
|
||||
for _, d := range replicasets.Items {
|
||||
replicaset.Name = d.Name
|
||||
replicaset.Namespace = d.Namespace
|
||||
replicaset.Desired = *d.Spec.Replicas
|
||||
replicaset.Current = d.Status.Replicas
|
||||
replicaset.Ready = d.Status.ReadyReplicas
|
||||
age := now.Sub(d.CreationTimestamp.Time) // same as kubectl AGE
|
||||
replicaset.Age = human(age)
|
||||
Allreplicaset = append(Allreplicaset, replicaset)
|
||||
@@ -671,7 +699,7 @@ func Cluster_replicationcontrollers(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting Replicationcontrollers: ", err)
|
||||
@@ -713,7 +741,7 @@ func Cluster_cronjobs(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
clientset, err := getClientset(w, clustername)
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting Replicationcontrollers: ", err)
|
||||
@@ -734,6 +762,11 @@ func Cluster_cronjobs(w http.ResponseWriter, r *http.Request) {
|
||||
ReplicationController.Namespace = d.Namespace
|
||||
age := now.Sub(d.CreationTimestamp.Time) // same as kubectl AGE
|
||||
ReplicationController.Age = human(age)
|
||||
|
||||
ReplicationController.Desired = *d.Spec.Replicas
|
||||
ReplicationController.Current = d.Status.Replicas
|
||||
ReplicationController.Ready = d.Status.ReadyReplicas
|
||||
|
||||
AllreplicationController = append(AllreplicationController, ReplicationController)
|
||||
}
|
||||
|
||||
@@ -744,3 +777,100 @@ func Cluster_cronjobs(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(AllreplicationController)
|
||||
|
||||
}
|
||||
|
||||
func Pod_logs(w http.ResponseWriter, r *http.Request) {
|
||||
clustername := r.URL.Query().Get("Name")
|
||||
namespace := r.URL.Query().Get("Namespace")
|
||||
podName := r.URL.Query().Get("Pod")
|
||||
// containerName := podName
|
||||
if clustername == "" {
|
||||
http.Error(w, "Missing 'Name' parameter", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
clientset, _, err := getClientset(w, clustername)
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error getting Replicationcontrollers: ", err)
|
||||
}
|
||||
|
||||
podLogOpts := corev1.PodLogOptions{}
|
||||
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
|
||||
podLogs, err := req.Stream(context.TODO())
|
||||
if err != nil {
|
||||
http.Error(w, "an error happend in getting logs", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
defer podLogs.Close()
|
||||
|
||||
buf := new([]byte)
|
||||
*buf, err = io.ReadAll(podLogs)
|
||||
if err != nil {
|
||||
http.Error(w, "an error happend in getting logs", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(string(*buf))
|
||||
}
|
||||
|
||||
func Pod_exec(w http.ResponseWriter, r *http.Request) {
|
||||
clustername := r.URL.Query().Get("Name")
|
||||
namespace := r.URL.Query().Get("Namespace")
|
||||
podName := r.URL.Query().Get("Pod")
|
||||
command := r.URL.Query().Get("Command")
|
||||
|
||||
if clustername == "" || namespace == "" || podName == "" {
|
||||
http.Error(w, "Missing required parameters (Name, Namespace, Pod)", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
clientset, config, err := getClientset(w, clustername)
|
||||
if err != nil {
|
||||
http.Error(w, "Error getting Kubernetes clientset", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
cmd := []string{command}
|
||||
|
||||
req := clientset.CoreV1().RESTClient().
|
||||
Post().
|
||||
Resource("pods").
|
||||
Name(podName).
|
||||
Namespace(namespace).
|
||||
SubResource("exec").
|
||||
VersionedParams(&corev1.PodExecOptions{
|
||||
Command: cmd,
|
||||
Stdout: true,
|
||||
Stderr: true,
|
||||
Stdin: false,
|
||||
TTY: false,
|
||||
}, scheme.ParameterCodec)
|
||||
|
||||
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
|
||||
if err != nil {
|
||||
http.Error(w, "Error creating executor: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var stdout, stderr bytes.Buffer
|
||||
|
||||
err = exec.StreamWithContext(r.Context(), remotecommand.StreamOptions{
|
||||
Stdout: &stdout,
|
||||
Stderr: &stderr,
|
||||
Tty: false,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
http.Error(w, "Error streaming command output: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
output := map[string]interface{}{
|
||||
"stdout": strings.Split(strings.TrimSpace(stdout.String()), "\n"),
|
||||
"stderr": strings.TrimSpace(stderr.String()),
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(output)
|
||||
}
|
||||
|
||||
3
main.go
3
main.go
@@ -140,6 +140,9 @@ func main() {
|
||||
router.HandleFunc("/cluster_replicasets", handler.Cluster_replicasets)
|
||||
router.HandleFunc("/cluster_replicationcontrollers", handler.Cluster_replicationcontrollers)
|
||||
|
||||
router.HandleFunc("/pod_logs", handler.Pod_logs)
|
||||
router.HandleFunc("/pod_exec", handler.Pod_exec)
|
||||
|
||||
//handler.RegsiterClusterRoute(router)
|
||||
// Enable CORS
|
||||
c := cors.New(cors.Options{
|
||||
|
||||
Reference in New Issue
Block a user