Spark本地提交Volcano调度支持方案

【Spark本地提交Volcano调度支持方案】重构Volcano客户端vcctl
1.1新增命令行
在 vcctl 的根命令(rootCmd)上注册两个新的子命令:

// Register the Spark-related subcommands on the vcctl root command.
rootCmd.AddCommand(buildSpark())
// The spark-operator command tree is registered after the plain spark one.
rootCmd.AddCommand(buildSparkOperator())

1.2创建命令行根及指令
func buildSparkOperator() *cobra.Command {// 创建根 sparkOperatorCmd := &cobra.Command{ Use:"spark-operator", Short: "spark-operator cmd", }sparkSubmitCmd := &cobra.Command{ Use:"spark-operator", Short: "spark operator", Run: func(cmd *cobra.Command, args []string) { checkError(cmd, spark_operator.RunSparkOperatorSubmit()) }, } // 初始化flag spark_operator.InitSparkOperatorFlags(sparkSubmitCmd) sparkOperatorCmd.AddCommand(sparkSubmitCmd)return sparkOperatorCmd }

1.3 构造 YAML 文件并上传本地 jar 包至文件服务器
func RunSparkOperatorSubmit() error {//获取文件名称 ///opt/spark-examples_2.11-2.4.4.jar filePathSplit := strings.Split(cf.FilePath, "/")cf.FileName = filePathSplit[len(filePathSplit)-1]//修改镜像内文件路径 sf.Spec.MainApplicationFile = "local:///opt/spark/examples/target/scala-2.11/jars/" + cf.FileNamesf.Spec.Volumes.Volume = []Volume{{Name: cf.VolumeName, HostPath: HostPath{cf.HostPath, cf.HostPathType}}}//构造标签 sf.Spec.Driver.Labels = map[string]string{cf.DriverLabel: cf.DriverLabelValue, "odin.k8s.io/spark": "true", "odin.io/filename": cf.FileName, "odin.registry/addr": "10.180.210.196"}sf.Spec.Driver.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.DriverVolumeMountName, MountPath: cf.DriverVolumeMountPath}}sf.Spec.Executor.Labels = map[string]string{cf.ExecutorLabel: cf.ExecutorLabelValue, "odin.k8s.io/spark": "true", "odin.io/filename": cf.FileName, "odin.registry/addr": "10.180.210.196"}sf.Spec.Executor.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.ExecutorVolumeMountName, MountPath: cf.ExecutorVolumeMountPath}}//构建yaml文件流 fs, err := yaml.Marshal(&sf) if err != nil { println(err.Error()) }//创建yaml文件 f, err := os.Create(sf.Metadata.Name + ".yaml") if err != nil { fmt.Println(err) }//删除多余标签行用于匹配api rmVolume := regexp.MustCompile("volume:\n") rmVolumeMount := regexp.MustCompile("volumeMount:\n") yamlString := string(fs) yamlString = rmVolume.ReplaceAllString(yamlString, "") yamlString = rmVolumeMount.ReplaceAllString(yamlString, "")//写入文件 _, err = f.WriteString(yamlString) if err != nil { fmt.Println(err) f.Close() }//上传jar包 uploadFile(cf.FilePath, "http://10.180.210.37:33332/upload") //执行命令行 cmd := exec.Command("/bin/bash", "-c", "kubectl apply -f "+f.Name()) output, err := cmd.Output() if err != nil { return err } fmt.Printf("Execute Shell:%s finished with output:\n%s", cmd, string(output))return err }

YAML 文件对应的结构体定义如下:
// sparkOperatorFlags is the in-memory form of the manifest that is marshalled
// to YAML and applied with kubectl.
//
// Every nested struct carries an explicit yaml tag. Without one, the
// gopkg.in/yaml marshaller falls back to the lower-cased field name, which
// would emit "restartpolicy" and "volumemounts" instead of the camelCase keys
// used by the rest of the manifest.
type sparkOperatorFlags struct {
	ApiVersion string `yaml:"apiVersion"`
	Kind       string `yaml:"kind"`
	Metadata   struct {
		Name      string `yaml:"name"`
		Namespace string `yaml:"namespace"`
	} `yaml:"metadata"`
	Spec struct {
		// Types is serialized as "type"; the field cannot be named Type's
		// lower-case form because "type" is a Go keyword.
		Types               string `yaml:"type"`
		Mode                string `yaml:"mode"`
		Image               string `yaml:"image"`
		ImagePullPolicy     string `yaml:"imagePullPolicy"`
		MainClass           string `yaml:"mainClass"`
		MainApplicationFile string `yaml:"mainApplicationFile"`
		SparkVersion        string `yaml:"sparkVersion"`
		BatchScheduler      string `yaml:"batchScheduler"`
		RestartPolicy       struct {
			Types string `yaml:"type"`
		} `yaml:"restartPolicy"`
		// Volumes wraps the list in an extra "volume" key; the submit code
		// strips that key from the generated YAML afterwards.
		Volumes struct {
			Volume []Volume `yaml:"volume"`
		} `yaml:"volumes"`
		Driver struct {
			Cores          int               `yaml:"cores"`
			CoreLimit      string            `yaml:"coreLimit"`
			Memory         string            `yaml:"memory"`
			Labels         map[string]string `yaml:"labels"`
			ServiceAccount string            `yaml:"serviceAccount"`
			// Same wrapper pattern as Volumes: "volumeMount" is stripped later.
			VolumeMounts struct {
				VolumeMount []VolumeMount `yaml:"volumeMount"`
			} `yaml:"volumeMounts"`
		} `yaml:"driver"`
		Executor struct {
			Cores     int               `yaml:"cores"`
			Instances int               `yaml:"instances"`
			Memory    string            `yaml:"memory"`
			Labels    map[string]string `yaml:"labels"`
			// Same wrapper pattern as Volumes: "volumeMount" is stripped later.
			VolumeMounts struct {
				VolumeMount []VolumeMount `yaml:"volumeMount"`
			} `yaml:"volumeMounts"`
		} `yaml:"executor"`
	} `yaml:"spec"`
}

// VolumeMount is one entry of a driver or executor volume-mount list.
type VolumeMount struct {
	Name      string `yaml:"name"`
	MountPath string `yaml:"mountPath"`
}

// Volume is one entry of the manifest's volume list, backed by a host path.
type Volume struct {
	Name     string   `yaml:"name"`
	HostPath HostPath `yaml:"hostPath"`
}

// HostPath describes the host directory or file behind a Volume.
// Field order matters: callers build it with an unkeyed literal
// (path first, then type).
type HostPath struct {
	Path  string `yaml:"path"`
	Types string `yaml:"type"`
}

1.4 修改 webhook,使 Volcano 能够拦截带有指定标签的 Pod 创建请求
const ( // DefaultQueue constant stores the name of the queue as "default" DefaultQueue = "default"defaultSchedulerName = "volcano"INIT_CONTAINER_NAME = "spark-init" ODIN_FILE_SERVER_ADDR = "10.180.210.37"//"odin-file-server" ODIN_FILE_SERVER_PORT = 80 ODIN_FILE_DOWNLOAD_KEY = "odin.io/filename" ODIN_IMAGE_REGISTRY_ADDR_KEY = "odin.registry/addr" ODIN_CONFIRM_SPARK_APP_KEY = "odin.k8s.io/spark" ODIN_APP_EXEC_PATH="/opt/spark/examples/target/scala-2.11/jars/" ODIN_BASE_IMAGE="library/centos-ssh:latest" )func init() { router.RegisterAdmission(service) }// 创建MutatingWebhookConfiguration对象 var service = &router.AdmissionService{ Path: "/pods/mutate", // 路由回调 Func: MutatePods,// 拦截匹配条件 MutatingConfig: &whv1beta1.MutatingWebhookConfiguration{ Webhooks: []whv1beta1.MutatingWebhook{{ Name: "mutatepod.volcano.sh", Rules: []whv1beta1.RuleWithOperations{ { Operations: []whv1beta1.OperationType{whv1beta1.Create}, Rule: whv1beta1.Rule{ APIGroups:[]string{""}, APIVersions: []string{"v1"}, Resources:[]string{"pods"}, }, }, }, }}, }, }

    推荐阅读