This commit is contained in:
2026-02-04 03:55:38 +08:00
parent 208786aa90
commit 62bc88477a
6 changed files with 302 additions and 72 deletions

View File

@@ -1,11 +1,13 @@
package transfer
import (
"archive/tar"
"bytes"
"encoding/json"
"fmt"
"io"
"log/slog"
"math"
"mesh-drop/internal/discovery"
"net/http"
"net/url"
@@ -44,6 +46,39 @@ func (s *Service) SendFile(target *discovery.Peer, targetIP string, filePath str
s.processTransfer(target, targetIP, task, file)
}
func (s *Service) SendFolder(target *discovery.Peer, targetIP string, folderPath string) {
size, err := calculateTarSize(folderPath)
if err != nil {
slog.Error("Failed to calculate folder size", "path", folderPath, "error", err, "component", "transfer-client")
return
}
r, w := io.Pipe()
go func() {
defer w.Close()
if err := streamFolderToTar(w, folderPath); err != nil {
slog.Error("Failed to stream folder to tar", "error", err, "component", "transfer-client")
w.CloseWithError(err)
}
}()
task := Transfer{
ID: uuid.New().String(),
FileName: filepath.Base(folderPath),
FileSize: size,
Sender: Sender{
ID: s.discoveryService.GetID(),
Name: s.discoveryService.GetName(),
},
Type: TransferTypeSend,
Status: TransferStatusPending,
ContentType: ContentTypeFolder,
}
s.processTransfer(target, targetIP, task, r)
}
func (s *Service) SendText(target *discovery.Peer, targetIP string, text string) {
reader := bytes.NewReader([]byte(text))
task := Transfer{
@@ -62,6 +97,116 @@ func (s *Service) SendText(target *discovery.Peer, targetIP string, text string)
s.processTransfer(target, targetIP, task, reader)
}
type countWriter struct {
n int64
}
func (w *countWriter) Write(p []byte) (int, error) {
w.n += int64(len(p))
return len(p), nil
}
func calculateTarSize(srcPath string) (int64, error) {
var size int64
err := filepath.Walk(srcPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 计算相对路径
relPath, err := filepath.Rel(srcPath, path)
if err != nil {
return err
}
if relPath == "." {
return nil
}
// 使用 tar.FileInfoHeader 计算 header
header, err := tar.FileInfoHeader(info, "")
if err != nil {
return err
}
// 保持与 streamFolderToTar 一致
header.Name = filepath.ToSlash(relPath)
if info.IsDir() {
header.Name += "/"
}
cw := &countWriter{}
tw := tar.NewWriter(cw)
if err := tw.WriteHeader(header); err != nil {
return err
}
// tw.WriteHeader 写入 header blocks包括扩展头
size += cw.n
if !info.IsDir() {
// 文件内容大小 + 填充
fileSize := info.Size()
blocks := math.Ceil(float64(fileSize) / 512)
size += int64(blocks) * 512
}
return nil
})
// 两个 512 字节的空块作为结束标记
size += 1024
return size, err
}
func streamFolderToTar(w io.Writer, srcPath string) error {
tw := tar.NewWriter(w)
defer tw.Close()
return filepath.Walk(srcPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
relPath, err := filepath.Rel(srcPath, path)
if err != nil {
return err
}
if relPath == "." {
return nil
}
slog.Debug("Processing file", "path", path, "relPath", relPath, "component", "transfer-client")
header, err := tar.FileInfoHeader(info, "")
if err != nil {
return err
}
// tar 文件名使用正斜杠
header.Name = filepath.ToSlash(relPath)
if info.IsDir() {
header.Name += "/"
}
if err := tw.WriteHeader(header); err != nil {
return err
}
if !info.IsDir() {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
if _, err := io.Copy(tw, file); err != nil {
return err
}
}
return nil
})
}
func (s *Service) processTransfer(target *discovery.Peer, targetIP string, task Transfer, payload io.Reader) {
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")

View File

@@ -1,6 +1,7 @@
package transfer
import (
"archive/tar"
"bytes"
"fmt"
"io"
@@ -9,6 +10,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"github.com/gin-gonic/gin"
@@ -215,7 +217,7 @@ func (s *Service) handleUpload(c *gin.Context) {
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
case ContentTypeFolder:
// s.receiveFolder(c, savePath, task)
s.receiveFolder(c, savePath, &task)
}
}
@@ -230,7 +232,7 @@ func (s *Service) receive(c *gin.Context, task *Transfer, writer io.Writer) {
Total: total,
Speed: speed,
}
s.transferList.Store(task.ID, task)
s.transferList.Store(task.ID, *task)
s.app.Event.Emit("transfer:refreshList")
},
}
@@ -245,7 +247,7 @@ func (s *Service) receive(c *gin.Context, task *Transfer, writer io.Writer) {
slog.Error("Failed to write file", "error", err, "component", "transfer")
task.Status = TransferStatusError
task.ErrorMsg = fmt.Errorf("failed to write file: %v", err).Error()
s.transferList.Store(task.ID, task)
s.transferList.Store(task.ID, *task)
// 通知前端传输失败
s.app.Event.Emit("transfer:refreshList")
return
@@ -257,7 +259,91 @@ func (s *Service) receive(c *gin.Context, task *Transfer, writer io.Writer) {
})
// 传输成功,任务结束
task.Status = TransferStatusCompleted
s.transferList.Store(task.ID, task)
s.transferList.Store(task.ID, *task)
s.app.Event.Emit("transfer:refreshList")
}
func (s *Service) receiveFolder(c *gin.Context, savePath string, task *Transfer) {
// 创建根目录
destPath := filepath.Join(savePath, task.FileName)
if err := os.MkdirAll(destPath, 0755); err != nil {
c.JSON(http.StatusInternalServerError, TransferUploadResponse{
ID: task.ID,
Message: "Receiver failed to create folder",
})
return
}
// 包装 reader用于计算进度
reader := &PassThroughReader{
Reader: c.Request.Body,
total: task.FileSize,
callback: func(current, total int64, speed float64) {
task.Progress = Progress{
Current: current,
Total: total,
Speed: speed,
}
s.transferList.Store(task.ID, *task)
s.app.Event.Emit("transfer:refreshList")
},
}
tr := tar.NewReader(reader)
for {
header, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
c.JSON(http.StatusInternalServerError, TransferUploadResponse{
ID: task.ID,
Message: "Stream error",
})
slog.Error("Tar stream error", "error", err)
return
}
target := filepath.Join(destPath, header.Name)
// 确保路径没有越界
if !strings.HasPrefix(target, filepath.Clean(destPath)+string(os.PathSeparator)) {
// 非法路径
continue
}
switch header.Typeflag {
case tar.TypeDir:
if err := os.MkdirAll(target, 0755); err != nil {
slog.Error("Failed to create dir", "path", target, "error", err)
}
case tar.TypeReg:
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
slog.Error("Failed to create file", "path", target, "error", err)
continue
}
if _, err := io.Copy(f, tr); err != nil {
f.Close()
slog.Error("Failed to write file", "path", target, "error", err)
c.JSON(http.StatusInternalServerError, TransferUploadResponse{
ID: task.ID,
Message: "Write error",
})
return
}
f.Close()
}
}
c.JSON(http.StatusOK, TransferUploadResponse{
ID: task.ID,
Message: "Folder received successfully",
})
task.Progress.Total = task.FileSize
task.Progress.Current = task.FileSize
task.Status = TransferStatusCompleted
s.transferList.Store(task.ID, *task)
s.app.Event.Emit("transfer:refreshList")
}