main
1package main
2
3import (
4 "fmt"
5 "image"
6 _ "image/jpeg"
7 "io/fs"
8 "log"
9 "os"
10 "path/filepath"
11 "sync"
12)
13
// Settings for the image scan.
const (
	// workerCount is the size of the worker pool started by main.
	workerCount = 16
	// srcDir is the root directory that is walked for files to check.
	srcDir = "./all/"
	// badDir is the directory that undecodable files are moved into.
	badDir = "./bad/"
)
18
19func main() {
20 filesChan := make(chan string, 100)
21 var wg sync.WaitGroup
22
23 for i := 0; i < workerCount; i++ {
24 wg.Add(1)
25 go worker(filesChan, &wg)
26 }
27
28 // Scan directory for files
29 go func() {
30 filepath.Walk(srcDir, func(path string, info fs.FileInfo, err error) error {
31 if err != nil {
32 return err
33 }
34 if !info.IsDir() {
35 // Only process files
36 filesChan <- path
37 }
38 return nil
39 })
40 fmt.Println("walk complete closing channel")
41 close(filesChan)
42 }()
43
44 wg.Wait()
45 fmt.Println("All files processed.")
46}
47
48// worker processes files received from the channel.
49func worker(filesChan chan string, wg *sync.WaitGroup) {
50 defer wg.Done()
51 for file := range filesChan {
52 err := processFile(file)
53 if err != nil {
54 log.Printf("Error processing file %s: %v\n", file, err)
55 }
56 }
57}
58
59func processFile(filePath string) error {
60
61 // Open the file
62 file, err := os.Open(filePath)
63 if err != nil {
64 return fmt.Errorf("failed to open file: %w", err)
65 }
66 defer file.Close()
67
68 // Decode the image to check for issues
69 _, _, err = image.Decode(file)
70 if err != nil {
71
72 file.Close()
73 moveDest := filepath.Join(badDir, filepath.Base(filePath))
74 os.Rename(filePath, moveDest)
75 log.Printf("moving bad file %s: %v\n", filePath, err)
76 return nil
77 }
78
79 return nil
80}