add: cancel transfer

This commit is contained in:
2026-02-04 15:06:41 +08:00
parent c2f3c2c3df
commit 68533dad31
9 changed files with 529 additions and 221 deletions

View File

@@ -3,7 +3,9 @@ package transfer
import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@@ -18,6 +20,16 @@ import (
)
func (s *Service) SendFile(target *discovery.Peer, targetIP string, filePath string) {
taskID := uuid.New().String()
ctx, cancel := context.WithCancel(context.Background())
s.cancelMap.Store(taskID, cancel)
// 任务结束后清理 ctx
defer func() {
s.cancelMap.Delete(taskID)
cancel()
}()
file, err := os.Open(filePath)
if err != nil {
slog.Error("Failed to open file", "path", filePath, "error", err, "component", "transfer-client")
@@ -31,7 +43,7 @@ func (s *Service) SendFile(target *discovery.Peer, targetIP string, filePath str
}
task := Transfer{
ID: uuid.New().String(),
ID: taskID,
FileName: filepath.Base(filePath),
FileSize: stat.Size(),
Sender: Sender{
@@ -43,28 +55,52 @@ func (s *Service) SendFile(target *discovery.Peer, targetIP string, filePath str
ContentType: ContentTypeFile,
}
s.processTransfer(target, targetIP, task, file)
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
askResp, err := s.ask(ctx, target, targetIP, task)
if err != nil {
if errors.Is(err, context.Canceled) {
task.Status = TransferStatusCanceled
} else {
// 如果请求发送失败,更新状态为 Error
task.Status = TransferStatusError
task.ErrorMsg = fmt.Sprintf("Failed to connect to receiver: %v", err)
}
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
if askResp.Accepted {
s.processTransfer(ctx, askResp, target, targetIP, task, file)
} else {
// 接收方拒绝
task.Status = TransferStatusRejected
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
}
func (s *Service) SendFolder(target *discovery.Peer, targetIP string, folderPath string) {
size, err := calculateTarSize(folderPath)
taskID := uuid.New().String()
ctx, cancel := context.WithCancel(context.Background())
s.cancelMap.Store(taskID, cancel)
// 任务结束后清理 ctx
defer func() {
s.cancelMap.Delete(taskID)
cancel()
}()
size, err := calculateTarSize(ctx, folderPath)
if err != nil {
slog.Error("Failed to calculate folder size", "path", folderPath, "error", err, "component", "transfer-client")
return
}
r, w := io.Pipe()
go func() {
defer w.Close()
if err := streamFolderToTar(w, folderPath); err != nil {
slog.Error("Failed to stream folder to tar", "error", err, "component", "transfer-client")
w.CloseWithError(err)
}
}()
task := Transfer{
ID: uuid.New().String(),
ID: taskID,
FileName: filepath.Base(folderPath),
FileSize: size,
Sender: Sender{
@@ -76,13 +112,51 @@ func (s *Service) SendFolder(target *discovery.Peer, targetIP string, folderPath
ContentType: ContentTypeFolder,
}
s.processTransfer(target, targetIP, task, r)
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
askResp, err := s.ask(ctx, target, targetIP, task)
if err != nil {
// 如果请求发送失败,更新状态为 Error
task.Status = TransferStatusError
task.ErrorMsg = fmt.Sprintf("Failed to connect to receiver: %v", err)
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
if askResp.Accepted {
r, w := io.Pipe()
go func(ctx context.Context) {
defer w.Close()
if err := streamFolderToTar(ctx, w, folderPath); err != nil {
slog.Error("Failed to stream folder to tar", "error", err, "component", "transfer-client")
w.CloseWithError(err)
}
}(ctx)
s.processTransfer(ctx, askResp, target, targetIP, task, r)
} else {
// 接收方拒绝
task.Status = TransferStatusRejected
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
}
func (s *Service) SendText(target *discovery.Peer, targetIP string, text string) {
reader := bytes.NewReader([]byte(text))
taskID := uuid.New().String()
ctx, cancel := context.WithCancel(context.Background())
s.cancelMap.Store(taskID, cancel)
// 任务结束后清理 ctx
defer func() {
s.cancelMap.Delete(taskID)
cancel()
}()
r := bytes.NewReader([]byte(text))
task := Transfer{
ID: uuid.New().String(),
ID: taskID,
FileName: "",
FileSize: int64(len(text)),
Sender: Sender{
@@ -94,7 +168,135 @@ func (s *Service) SendText(target *discovery.Peer, targetIP string, text string)
ContentType: ContentTypeText,
}
s.processTransfer(target, targetIP, task, reader)
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
askResp, err := s.ask(ctx, target, targetIP, task)
if err != nil {
// 如果请求发送失败,更新状态为 Error
task.Status = TransferStatusError
task.ErrorMsg = fmt.Sprintf("Failed to connect to receiver: %v", err)
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
if askResp.Accepted {
s.processTransfer(ctx, askResp, target, targetIP, task, r)
} else {
// 接收方拒绝
task.Status = TransferStatusRejected
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
}
// ask 向接收端发送传输请求
func (s *Service) ask(ctx context.Context, target *discovery.Peer, targetIP string, task Transfer) (TransferAskResponse, error) {
if err := ctx.Err(); err != nil {
return TransferAskResponse{}, err
}
// 发送请求
askBody, _ := json.Marshal(task)
askUrl := fmt.Sprintf("http://%s:%d/transfer/ask", targetIP, target.Port)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, askUrl, bytes.NewReader(askBody))
req.Header.Set("Content-Type", "application/json")
if err != nil {
return TransferAskResponse{}, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return TransferAskResponse{}, err
}
defer resp.Body.Close()
var askResp TransferAskResponse
if err := json.NewDecoder(resp.Body).Decode(&askResp); err != nil {
return TransferAskResponse{}, err
}
if resp.StatusCode != http.StatusOK {
return TransferAskResponse{}, errors.New(askResp.Message)
}
return askResp, nil
}
// processTransfer 传输数据
func (s *Service) processTransfer(ctx context.Context, askResp TransferAskResponse, target *discovery.Peer, targetIP string, task Transfer, payload io.Reader) {
if err := ctx.Err(); err != nil {
return
}
uploadUrl, _ := url.Parse(fmt.Sprintf("http://%s:%d/transfer/upload/%s", targetIP, target.Port, task.ID))
query := uploadUrl.Query()
query.Add("token", askResp.Token)
uploadUrl.RawQuery = query.Encode()
reader := &PassThroughReader{
Reader: payload,
total: task.FileSize,
callback: func(current, total int64, speed float64) {
task.Progress = Progress{
Current: current,
Total: total,
Speed: speed,
}
task.Status = TransferStatusActive
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
},
}
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl.String(), reader)
if err != nil {
return
}
req.ContentLength = task.FileSize
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
if err != nil {
if errors.Is(err, context.Canceled) {
task.Status = TransferStatusCanceled
} else {
task.Status = TransferStatusError
task.ErrorMsg = fmt.Sprintf("Failed to upload file: %v", err)
slog.Error("Failed to upload file", "url", uploadUrl.String(), "error", err, "component", "transfer-client")
}
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
defer resp.Body.Close()
var uploadResp TransferUploadResponse
if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil {
return
}
if resp.StatusCode != http.StatusOK {
task.Status = TransferStatusError
task.ErrorMsg = uploadResp.Message
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
// 判断任务完成还是被接收端取消
if uploadResp.Status == TransferStatusCanceled {
task.Status = TransferStatusCanceled
task.ErrorMsg = uploadResp.Message
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
// 传输成功,任务结束
task.Status = TransferStatusCompleted
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
}
type countWriter struct {
@@ -106,12 +308,15 @@ func (w *countWriter) Write(p []byte) (int, error) {
return len(p), nil
}
func calculateTarSize(srcPath string) (int64, error) {
func calculateTarSize(ctx context.Context, srcPath string) (int64, error) {
var size int64
err := filepath.Walk(srcPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
// 计算相对路径
relPath, err := filepath.Rel(srcPath, path)
@@ -158,7 +363,7 @@ func calculateTarSize(srcPath string) (int64, error) {
return size, err
}
func streamFolderToTar(w io.Writer, srcPath string) error {
func streamFolderToTar(ctx context.Context, w io.Writer, srcPath string) error {
tw := tar.NewWriter(w)
defer tw.Close()
@@ -166,6 +371,9 @@ func streamFolderToTar(w io.Writer, srcPath string) error {
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
relPath, err := filepath.Rel(srcPath, path)
if err != nil {
@@ -206,98 +414,3 @@ func streamFolderToTar(w io.Writer, srcPath string) error {
return nil
})
}
func (s *Service) processTransfer(target *discovery.Peer, targetIP string, task Transfer, payload io.Reader) {
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
// 发送请求
askBody, _ := json.Marshal(task)
askUrl := fmt.Sprintf("http://%s:%d/transfer/ask", targetIP, target.Port)
resp, err := http.Post(askUrl, "application/json", bytes.NewReader(askBody))
if err != nil {
slog.Error("Failed to send ask request", "url", askUrl, "error", err, "component", "transfer-client")
// 如果请求发送失败,更新状态为 Error
task.Status = TransferStatusError
task.ErrorMsg = fmt.Sprintf("Failed to connect to receiver: %v", err)
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
defer resp.Body.Close()
var askResp TransferAskResponse
if err := json.NewDecoder(resp.Body).Decode(&askResp); err != nil {
return
}
if resp.StatusCode != http.StatusOK {
task.Status = TransferStatusError
task.ErrorMsg = askResp.Message
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
if !askResp.Accepted {
// 接收方拒绝
task.Status = TransferStatusRejected
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
// 上传
uploadUrl, _ := url.Parse(fmt.Sprintf("http://%s:%d/transfer/upload/%s", targetIP, target.Port, task.ID))
query := uploadUrl.Query()
query.Add("token", askResp.Token)
uploadUrl.RawQuery = query.Encode()
reader := &PassThroughReader{
Reader: payload,
total: task.FileSize,
callback: func(current, total int64, speed float64) {
task.Progress = Progress{
Current: current,
Total: total,
Speed: speed,
}
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
},
}
req, err := http.NewRequest(http.MethodPut, uploadUrl.String(), reader)
if err != nil {
return
}
req.ContentLength = task.FileSize
req.Header.Set("Content-Type", "application/octet-stream")
resp, err = http.DefaultClient.Do(req)
if err != nil {
slog.Error("Failed to upload file", "url", uploadUrl.String(), "error", err, "component", "transfer-client")
return
}
defer resp.Body.Close()
var uploadResp TransferUploadResponse
if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil {
return
}
if resp.StatusCode != http.StatusOK {
task.Status = TransferStatusError
task.ErrorMsg = uploadResp.Message
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
return
}
// 传输成功,任务结束
task.Status = TransferStatusCompleted
s.transferList.Store(task.ID, task)
s.app.Event.Emit("transfer:refreshList")
}