The Joblet storage layer provides a comprehensive solution for managing persistent and temporary storage in isolated job environments. It supports multiple storage backends, enforces disk quotas, and ensures complete isolation between jobs while maintaining performance and security.
Storage Layer
├── Volume Management
│ ├── Volume Types (filesystem, memory)
│ ├── Volume Lifecycle (create, mount, unmount, delete)
│ └── Volume Store (state management)
├── Filesystem Isolation
│ ├── Chroot Environments
│ ├── Mount Namespaces
│ └── Bind Mount Management
├── Disk Quota Management
│ ├── Default Work Directory (1MB tmpfs)
│ ├── Volume Size Limits
│ └── I/O Bandwidth Throttling
└── Storage APIs
    ├── gRPC Volume Service
    ├── CLI Commands
    └── Internal Interfaces
/var/lib/joblet/volumes/<volume-id>
/var/lib/joblet/volumes/<volume-id>
(tmpfs)
// Volume creation flow
1. Client: rnx volume create mydata --size=1GB --type=filesystem
2. Server: Validates request and size constraints
3. VolumeManager: Creates volume directory/tmpfs
4. VolumeStore: Records volume metadata in memory
5. Server: Returns volume ID to client
// Volume mounting flow
1. Job execution request includes volume IDs
2. JobExecutor: Validates volume access permissions
3. VolumeManager: Prepares mount points
4. Isolation layer: Bind mounts volumes into job namespace
5. Job: Accesses volumes at /volumes/<name>
// Volume cleanup flow
1. Job completion triggers unmount
2. VolumeManager: Unmounts volumes from job namespace
3. Memory volumes: Data cleared automatically
4. Filesystem volumes: Data persists for next use
// VolumeStore keeps volume records in memory.
//
// NOTE(review): the map key is presumably the volume ID — confirm against
// the store's Add/List implementations, which are not shown here.
type VolumeStore struct {
// mu guards volumes.
mu sync.RWMutex
// volumes holds the stored volume records, keyed by string.
volumes map[string]*domain.Volume
}
// Volume is the record kept for a single storage volume.
//
// NOTE(review): CreateVolume in this document also populates a Status field
// that is not declared on this struct — confirm which definition is current.
type Volume struct {
	// Identity of the volume.
	ID, Name string

	Type VolumeType // FILESYSTEM or MEMORY
	Size int64      // Size in bytes

	Path      string // Host path
	MountPath string // Path inside container

	// Creation time and time of last use.
	CreatedAt, LastUsed time.Time

	InUse bool
	JobID string // Current job using volume
}
CLONE_NEWNS flag
/opt/joblet/jobs/<job-id>
// Standard job filesystem layout
/opt/joblet/jobs/<job-id>/
├── bin/ # Essential binaries (busybox)
├── lib/ # Required libraries
├── lib64/ # 64-bit libraries
├── etc/ # Minimal configuration
├── proc/ # Process information (read-only)
├── dev/ # Device files (minimal set)
├── tmp/ # Temporary space
├── work/ # Job workspace (1MB default)
└── volumes/ # Mounted volumes
    ├── data/ # Example: filesystem volume
    └── cache/ # Example: memory volume
// validatePath reports whether target, interpreted relative to base, stays
// inside base.
//
// It returns nil when the joined path is base itself or a descendant of it,
// ErrPathTraversal when the joined path lies outside base, and the underlying
// error if a path cannot be made absolute or relative.
//
// The containment test is done on path components via filepath.Rel rather
// than on raw string prefixes, so a sibling such as "/x/v1-evil" is not
// mistaken for a child of "/x/v1". base may be relative or carry a trailing
// separator; it is normalised with filepath.Abs first.
//
// This is a purely lexical check: symlinks are not resolved.
func validatePath(base, target string) error {
	absBase, err := filepath.Abs(base)
	if err != nil {
		return err
	}

	// Join cleans the result, so ".." elements in target are collapsed here.
	abs, err := filepath.Abs(filepath.Join(absBase, target))
	if err != nil {
		return err
	}

	rel, err := filepath.Rel(absBase, abs)
	if err != nil {
		return err
	}

	// Only a leading ".." path element means the path left base; a name that
	// merely starts with two dots (e.g. "..data") is a legitimate child.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return ErrPathTraversal
	}
	return nil
}
/proc: Read-only, filtered view
/sys: Not mounted (security)
// cgroup v2 I/O limits
// IOLimits carries the per-device throughput and operation-rate caps for a
// job. All four values are expressed per second.
type IOLimits struct {
	// Byte-rate caps: bytes read / written per second.
	ReadBPS, WriteBPS uint64
	// Operation-rate caps: read / write operations per second.
	ReadIOPS, WriteIOPS uint64
}

// The limits are applied through the cgroup v2 io.max interface file.
// Example: "8:0 rbps=10485760 wbps=10485760"
// VolumeService is the gRPC API for managing volumes. Each RPC takes a
// dedicated request message and returns a dedicated response message.
service VolumeService {
// Volume lifecycle operations: create, enumerate, look up and delete volumes.
rpc CreateVolume(CreateVolumeRequest) returns (CreateVolumeResponse);
rpc ListVolumes(ListVolumesRequest) returns (ListVolumesResponse);
rpc GetVolume(GetVolumeRequest) returns (GetVolumeResponse);
rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse);
// Volume usage operations: attach a volume and detach it again.
rpc AttachVolume(AttachVolumeRequest) returns (AttachVolumeResponse);
rpc DetachVolume(DetachVolumeRequest) returns (DetachVolumeResponse);
}
// CreateVolumeRequest carries the parameters for VolumeService.CreateVolume.
message CreateVolumeRequest {
// Name of the volume to create.
string name = 1;
// Kind of volume to create.
VolumeType type = 2;
// Requested volume size, in bytes.
int64 size_bytes = 3;
// Key/value metadata labels attached to the volume.
map<string, string> labels = 4;
}
// Volume is the wire representation of a volume.
message Volume {
// Volume identifier.
string id = 1;
// Volume name.
string name = 2;
// Kind of volume.
VolumeType type = 3;
// Volume size, in bytes.
int64 size_bytes = 4;
// Status as a free-form string.
// NOTE(review): the set of valid values is not defined here — confirm.
string status = 5;
// When the volume was created.
google.protobuf.Timestamp created_at = 6;
// When the volume was last used.
google.protobuf.Timestamp last_used = 7;
// ID of the job currently associated with the volume.
// NOTE(review): presumably empty when no job is using it — confirm.
string current_job_id = 8;
}
# Volume management commands
rnx volume create <name> [options]
--size=SIZE Volume size (e.g., 1GB, 512MB)
--type=TYPE Volume type: filesystem|memory
--label=KEY=VAL Metadata labels
rnx volume list [options]
--filter=KEY=VAL Filter by label
--format=FORMAT Output format: table|json|yaml
rnx volume inspect <name>
Shows detailed volume information
rnx volume remove <name> [options]
--force Remove even if in use
# Job execution with volumes
rnx run --volume=data:/data --volume=cache:/cache <command>
// VolumeManager is the internal interface for volume lifecycle management
// and for wiring volumes into job execution. Every method takes a context as
// its first argument.
type VolumeManager interface {
// CreateVolume creates a volume described by req and returns it.
CreateVolume(ctx context.Context, req *CreateVolumeRequest) (*Volume, error)
// GetVolume returns the volume with the given id.
GetVolume(ctx context.Context, id string) (*Volume, error)
// ListVolumes returns the volumes selected by filter.
// NOTE(review): behaviour for a nil filter is not shown here — confirm.
ListVolumes(ctx context.Context, filter *VolumeFilter) ([]*Volume, error)
// DeleteVolume removes the volume with the given id.
DeleteVolume(ctx context.Context, id string) error
// Job integration
// PrepareVolumes readies the volumes named by volumeIDs for job jobID.
PrepareVolumes(ctx context.Context, jobID string, volumeIDs []string) error
// CleanupVolumes releases the volumes associated with job jobID.
CleanupVolumes(ctx context.Context, jobID string) error
}
// StorageProvider abstracts a storage backend so that additional backends
// can be plugged in (for extensibility). Each method operates on a single
// volume.
type StorageProvider interface {
// Create provisions backing storage for volume.
Create(ctx context.Context, volume *Volume) error
// Delete removes the backing storage of volume.
Delete(ctx context.Context, volume *Volume) error
// Mount makes volume available at target.
Mount(ctx context.Context, volume *Volume, target string) error
// Unmount reverses a previous Mount of volume.
Unmount(ctx context.Context, volume *Volume) error
// GetUsage reports usage information for volume.
GetUsage(ctx context.Context, volume *Volume) (*UsageInfo, error)
}
// CreateVolume validates req, provisions storage for the requested volume
// type and registers the resulting record in the volume store.
//
// Filesystem volumes get a directory under vm.baseDir named after a freshly
// generated volume ID, plus a disk quota when vm.quotaEnabled is set. Memory
// volumes are not materialised here (they are created on demand at mount
// time); only their size is checked against vm.maxMemoryVolumeSize, and
// ErrVolumeTooLarge is returned when it is exceeded.
//
// The returned record has CreatedAt set to the current time and Status set
// to VolumeStatusAvailable; LastUsed is left at its zero value. If the store
// rejects the record, the volume path is removed again and the store's error
// is returned.
func (vm *VolumeManager) CreateVolume(ctx context.Context, req *CreateVolumeRequest) (*Volume, error) {
	if err := validateVolumeRequest(req); err != nil {
		return nil, err
	}

	id := generateVolumeID()
	hostPath := filepath.Join(vm.baseDir, id)

	if req.Type == VolumeTypeFilesystem {
		if err := os.MkdirAll(hostPath, 0755); err != nil {
			return nil, err
		}
		// Quota is applied only when the manager reports support for it.
		if vm.quotaEnabled {
			setDiskQuota(hostPath, req.SizeBytes)
		}
	} else if req.Type == VolumeTypeMemory {
		// Nothing to provision yet; just enforce the system-wide size cap.
		if req.SizeBytes > vm.maxMemoryVolumeSize {
			return nil, ErrVolumeTooLarge
		}
	}

	created := &Volume{
		ID:        id,
		Name:      req.Name,
		Type:      req.Type,
		Size:      req.SizeBytes,
		Path:      hostPath,
		CreatedAt: time.Now(),
		Status:    VolumeStatusAvailable,
	}

	err := vm.store.Add(created)
	if err == nil {
		return created, nil
	}

	// Registration failed: undo any directory created above.
	os.RemoveAll(hostPath)
	return nil, err
}
// mountVolumes attaches every volume listed in job.VolumeMounts beneath
// <job.RootFS>/volumes and appends each successfully mounted target to
// job.mounts so it can be cleaned up later.
//
// Filesystem volumes are bind-mounted from volume.Path; memory volumes get a
// tmpfs mounted with the option "size=<volume.Size>".
//
// The first failure aborts the loop and is returned. Mounts completed before
// the failure remain recorded in job.mounts. An error is returned when:
//   - the volume cannot be looked up,
//   - the mount path would resolve outside <job.RootFS>/volumes,
//   - the mount point cannot be created,
//   - the volume type is not one this function knows how to mount, or
//   - the mount operation itself fails.
func (je *JobExecutor) mountVolumes(job *Job) error {
	volumesRoot := filepath.Join(job.RootFS, "volumes")

	for _, volumeMount := range job.VolumeMounts {
		volume, err := je.volumeManager.GetVolume(volumeMount.VolumeID)
		if err != nil {
			return fmt.Errorf("volume %s not found: %w", volumeMount.VolumeID, err)
		}

		targetPath := filepath.Join(volumesRoot, volumeMount.MountPath)

		// MountPath originates from the job request. Join collapses ".."
		// elements, so a crafted path could land outside the volumes
		// directory (or outside the job root altogether); refuse those.
		rel, err := filepath.Rel(volumesRoot, targetPath)
		if err != nil || rel == ".." || (len(rel) > 2 && rel[:3] == ".."+string(filepath.Separator)) {
			return fmt.Errorf("volume %s: mount path %q escapes %s",
				volumeMount.VolumeID, volumeMount.MountPath, volumesRoot)
		}

		// The mount target itself must exist, not merely its parent:
		// mounting onto a missing directory fails.
		if err := os.MkdirAll(targetPath, 0755); err != nil {
			return fmt.Errorf("creating mount point %s: %w", targetPath, err)
		}

		switch volume.Type {
		case VolumeTypeFilesystem:
			// Bind mount the volume's host directory into the job tree.
			// NOTE(review): the third argument is presumably a read-only
			// flag — confirm against mount.BindMount.
			if err := mount.BindMount(volume.Path, targetPath, false); err != nil {
				return fmt.Errorf("bind mounting volume %s at %s: %w", volumeMount.VolumeID, targetPath, err)
			}
		case VolumeTypeMemory:
			// Memory volumes are materialised here as a size-capped tmpfs.
			opts := fmt.Sprintf("size=%d", volume.Size)
			if err := mount.TmpfsMount(targetPath, opts); err != nil {
				return fmt.Errorf("mounting tmpfs for volume %s at %s: %w", volumeMount.VolumeID, targetPath, err)
			}
		default:
			// Nothing was mounted, so the target must not be recorded as a
			// mount; surface the problem instead of continuing silently.
			return fmt.Errorf("volume %s: unsupported volume type %v", volumeMount.VolumeID, volume.Type)
		}

		// Record mount for cleanup.
		job.mounts = append(job.mounts, targetPath)
	}
	return nil
}
// cleanupUnusedVolumes runs a periodic sweep, once per vm.cleanupInterval,
// until ctx is cancelled. It blocks for the lifetime of ctx.
//
// On every tick it lists volumes with status VolumeStatusAvailable and, for
// each one:
//   - if the volume is marked in use by a job that no longer exists in
//     vm.jobStore, detaches it (orphaned mount);
//   - otherwise, if it is an idle memory volume whose last activity is older
//     than vm.memoryVolumeTimeout, deletes it.
//
// Volumes that are in use are never deleted by the sweep. A failed listing
// skips that tick; the next tick retries.
func (vm *VolumeManager) cleanupUnusedVolumes(ctx context.Context) {
	ticker := time.NewTicker(vm.cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			volumes, err := vm.store.List(&VolumeFilter{
				Status: VolumeStatusAvailable,
			})
			if err != nil {
				// Nothing reliable to act on; try again on the next tick.
				continue
			}

			for _, volume := range volumes {
				if volume.InUse {
					// Clean up orphaned mounts: the owning job is gone.
					if volume.JobID != "" && !vm.jobStore.Exists(volume.JobID) {
						vm.DetachVolume(ctx, volume.ID)
					}
					// Never delete a volume that is (or just was) in use;
					// once detached, a later sweep can expire it.
					continue
				}

				if volume.Type != VolumeTypeMemory {
					continue
				}

				// A volume that has never been used has a zero LastUsed,
				// which would look infinitely old. Age it from creation
				// instead so new volumes get the full timeout.
				lastActive := volume.LastUsed
				if lastActive.IsZero() {
					lastActive = volume.CreatedAt
				}

				if time.Since(lastActive) > vm.memoryVolumeTimeout {
					// Best effort: a failed delete is retried next sweep.
					vm.DeleteVolume(ctx, volume.ID)
				}
			}
		}
	}
}
// Supported size formats
"512" -> 512 bytes
"10KB" -> 10 * 1024 bytes
"5MB" -> 5 * 1024 * 1024 bytes
"1GB" -> 1 * 1024 * 1024 * 1024 bytes
"100Mi" -> 100 * 1024 * 1024 bytes (IEC format)
"2Gi" -> 2 * 1024 * 1024 * 1024 bytes (IEC format)