Implement file sending protocol
This commit is contained in:
parent
965ba76a76
commit
293d9fbbb0
116
src/client.rs
116
src/client.rs
|
@ -12,12 +12,20 @@
|
||||||
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||||
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
|
||||||
use crate::Config;
|
use crate::{server::read_line, Config};
|
||||||
use std::io::Result;
|
use std::{
|
||||||
use tokio::prelude::*;
|
io::{Error, ErrorKind, Result},
|
||||||
|
os::unix::fs::MetadataExt,
|
||||||
|
path::PathBuf,
|
||||||
|
};
|
||||||
|
use tokio::{
|
||||||
|
fs::{remove_file, File},
|
||||||
|
io::{copy, split, BufReader, BufWriter},
|
||||||
|
prelude::*,
|
||||||
|
};
|
||||||
use tokio_libtls::prelude::*;
|
use tokio_libtls::prelude::*;
|
||||||
|
|
||||||
pub(crate) async fn run(config: Config) -> Result<()> {
|
pub(crate) async fn run(config: Config, filename: PathBuf) -> Result<()> {
|
||||||
let (cert, key, ca) = config.load_keys()?;
|
let (cert, key, ca) = config.load_keys()?;
|
||||||
let mut options = config.load_client_options();
|
let mut options = config.load_client_options();
|
||||||
|
|
||||||
|
@ -27,11 +35,103 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let addr = config.address.unwrap();
|
let addr = config.address.unwrap();
|
||||||
let mut tls = AsyncTls::connect(&addr.to_string(), &tls_config, options.build())
|
let tls = AsyncTls::connect(&addr.to_string(), &tls_config, options.build())
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let _ = tls.write_all(b"OK\r\n").await;
|
let peer_addr: String = addr.to_string();
|
||||||
let mut buf = vec![0u8; 1024];
|
|
||||||
let _ = tls.read(&mut buf).await;
|
let (reader, writer) = split(tls);
|
||||||
|
let mut reader = BufReader::new(reader);
|
||||||
|
let mut writer = BufWriter::new(writer);
|
||||||
|
|
||||||
|
// Send the filename
|
||||||
|
let name = match filename
|
||||||
|
.file_name()
|
||||||
|
.ok_or_else(|| {
|
||||||
|
debug!("{} failed: file name ({})", peer_addr, filename.display());
|
||||||
|
Error::new(ErrorKind::Other, "file")
|
||||||
|
})?
|
||||||
|
.to_str()
|
||||||
|
{
|
||||||
|
Some(name) => name,
|
||||||
|
None => {
|
||||||
|
debug!(
|
||||||
|
"{} failed: filename format ({})",
|
||||||
|
peer_addr,
|
||||||
|
filename.display()
|
||||||
|
);
|
||||||
|
return Err(Error::new(ErrorKind::Other, "file format"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let _ = writer.write_all(format!("{}\n", name).as_bytes()).await;
|
||||||
|
|
||||||
|
// Send the file size
|
||||||
|
let file_size = filename.metadata()?.size();
|
||||||
|
let _ = writer
|
||||||
|
.write_all(format!("{}\n", file_size).as_bytes())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"{} status: sending {} ({} bytes)",
|
||||||
|
peer_addr,
|
||||||
|
filename.display(),
|
||||||
|
file_size
|
||||||
|
);
|
||||||
|
|
||||||
|
// Send the actual file
|
||||||
|
let file = match File::open(&filename).await {
|
||||||
|
Ok(f) => f,
|
||||||
|
Err(err) => {
|
||||||
|
debug!(
|
||||||
|
"{} failed {}: file ({})",
|
||||||
|
peer_addr,
|
||||||
|
filename.display(),
|
||||||
|
err
|
||||||
|
);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut file_reader = file.take(file_size);
|
||||||
|
let copied = match copy(&mut file_reader, &mut writer).await {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(err) => {
|
||||||
|
debug!("{} failed: I/O ({:?})", peer_addr, err);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if copied != file_size {
|
||||||
|
drop(file_reader);
|
||||||
|
let _ = remove_file(&filename).await;
|
||||||
|
warn!(
|
||||||
|
"{} failed: {} ({}/{} bytes)",
|
||||||
|
peer_addr,
|
||||||
|
filename.display(),
|
||||||
|
copied,
|
||||||
|
file_size
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"{} success: {} ({} bytes)",
|
||||||
|
peer_addr,
|
||||||
|
filename.display(),
|
||||||
|
copied
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the server result
|
||||||
|
match read_line(&peer_addr, &mut reader).await {
|
||||||
|
Ok(s) if s.starts_with("success") => s,
|
||||||
|
Ok(s) => {
|
||||||
|
debug!("server: {}", s);
|
||||||
|
return Err(Error::new(ErrorKind::Other, s));
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
debug!("{}", err);
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
28
src/main.rs
28
src/main.rs
|
@ -110,24 +110,38 @@ async fn main() {
|
||||||
"server name",
|
"server name",
|
||||||
&config.servername.as_ref().unwrap(),
|
&config.servername.as_ref().unwrap(),
|
||||||
);
|
);
|
||||||
|
opts.optopt("f", "file", "send file as client", "filename");
|
||||||
opts.optflag("s", "server", "run server");
|
opts.optflag("s", "server", "run server");
|
||||||
opts.optflag("c", "client", "connect as client");
|
|
||||||
opts.optflag("h", "help", "print this help menu");
|
opts.optflag("h", "help", "print this help menu");
|
||||||
let matches = match opts.parse(&args[1..]) {
|
let matches = match opts.parse(&args[1..]) {
|
||||||
Ok(m) => m,
|
Ok(m) => m,
|
||||||
Err(f) => panic!(f.to_string()),
|
Err(f) => panic!(f.to_string()),
|
||||||
};
|
};
|
||||||
if matches.opt_present("h") || (matches.opt_present("c") && matches.opt_present("s")) {
|
if matches.opt_present("h") || (matches.opt_present("f") && matches.opt_present("s")) {
|
||||||
usage(&program, opts);
|
usage(&program, opts);
|
||||||
}
|
}
|
||||||
|
if let Some(address) = matches.opt_str("a") {
|
||||||
|
let addr: SocketAddr = address.parse().unwrap();
|
||||||
|
config.address = Some(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
let addr = match config.address {
|
||||||
|
Some(SocketAddr::V6(addr)) => addr.clone(),
|
||||||
|
_ => panic!("invalid address: {:?}", config.address),
|
||||||
|
};
|
||||||
|
|
||||||
env_logger::builder()
|
env_logger::builder()
|
||||||
.filter_level(LevelFilter::Debug)
|
.filter_level(LevelFilter::Debug)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
let addr = match config.address {
|
let file = if let Some(file) = matches.opt_str("f") {
|
||||||
Some(SocketAddr::V6(addr)) => addr.clone(),
|
let file = PathBuf::from(file);
|
||||||
_ => panic!("invalid address: {:?}", config.address),
|
if !file.exists() {
|
||||||
|
panic!("invalid file: {}", file.display())
|
||||||
|
}
|
||||||
|
Some(file)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let keypair = KeyPair::new(&config);
|
let keypair = KeyPair::new(&config);
|
||||||
|
@ -140,8 +154,8 @@ async fn main() {
|
||||||
addr.port()
|
addr.port()
|
||||||
);
|
);
|
||||||
|
|
||||||
if matches.opt_present("c") {
|
if let Some(file) = file {
|
||||||
client::run(config).await.expect("client");
|
client::run(config, file).await.expect("client");
|
||||||
} else {
|
} else {
|
||||||
server::run(config).await.expect("server");
|
server::run(config).await.expect("server");
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,9 @@ use std::{
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::{remove_file, File},
|
fs::{remove_file, File},
|
||||||
io::{self, AsyncReadExt},
|
io::{
|
||||||
|
copy, split, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter,
|
||||||
|
},
|
||||||
net::TcpListener,
|
net::TcpListener,
|
||||||
};
|
};
|
||||||
use tokio_libtls::prelude::*;
|
use tokio_libtls::prelude::*;
|
||||||
|
@ -44,7 +46,7 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
let size_limit = config.size_limit as u64;
|
let size_limit = config.size_limit as u64;
|
||||||
let peer_addr: String = tcp.peer_addr()?.to_string();
|
let peer_addr: String = tcp.peer_addr()?.to_string();
|
||||||
let options = options.build();
|
let options = options.build();
|
||||||
let mut tls = match AsyncTls::accept_stream(tcp, &tls_config, options).await {
|
let tls = match AsyncTls::accept_stream(tcp, &tls_config, options).await {
|
||||||
Ok(tls) => {
|
Ok(tls) => {
|
||||||
debug!("{} status: connected", peer_addr);
|
debug!("{} status: connected", peer_addr);
|
||||||
tls
|
tls
|
||||||
|
@ -55,11 +57,15 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let (reader, mut writer) = split(tls);
|
||||||
|
let mut reader = BufReader::new(reader);
|
||||||
|
// let mut writer = BufWriter::new(writer);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut filename = PathBuf::from("/home/reyk/Downloads");
|
let mut filename = PathBuf::from("/home/reyk/Downloads");
|
||||||
|
|
||||||
// Read and validate the filename
|
// Read and validate the filename
|
||||||
let line = match read_line(&peer_addr, &mut tls).await {
|
let line = match read_line(&peer_addr, &mut reader).await {
|
||||||
Ok(s) if !(s.contains(MAIN_SEPARATOR) || s.contains('/') || s.contains('\\')) => s,
|
Ok(s) if !(s.contains(MAIN_SEPARATOR) || s.contains('/') || s.contains('\\')) => s,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("{}", err);
|
debug!("{}", err);
|
||||||
|
@ -89,7 +95,7 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the file size
|
// Read the file size
|
||||||
let line = match read_line(&peer_addr, &mut tls).await {
|
let line = match read_line(&peer_addr, &mut reader).await {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("{}", err);
|
debug!("{}", err);
|
||||||
|
@ -119,7 +125,7 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Create output file
|
// Create output file
|
||||||
let mut file = match File::create(&filename).await {
|
let file = match File::create(&filename).await {
|
||||||
Ok(f) => f,
|
Ok(f) => f,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -133,8 +139,9 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
};
|
};
|
||||||
|
|
||||||
// I/O
|
// I/O
|
||||||
let mut reader = tls.take(file_size);
|
let mut file_writer = BufWriter::new(file);
|
||||||
let copied = match io::copy(&mut reader, &mut file).await {
|
let mut reader = reader.take(file_size);
|
||||||
|
let copied = match copy(&mut reader, &mut file_writer).await {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("{} failed: I/O ({})", peer_addr, err);
|
debug!("{} failed: I/O ({})", peer_addr, err);
|
||||||
|
@ -142,9 +149,10 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if copied != file_size {
|
// Check result and send response
|
||||||
drop(file);
|
let result = if copied != file_size {
|
||||||
let _ = remove_file("a.txt").await.is_ok();
|
drop(file_writer);
|
||||||
|
let _ = remove_file(&filename).await.is_ok();
|
||||||
warn!(
|
warn!(
|
||||||
"{} failed: {} ({}/{} bytes)",
|
"{} failed: {} ({}/{} bytes)",
|
||||||
peer_addr,
|
peer_addr,
|
||||||
|
@ -152,6 +160,7 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
copied,
|
copied,
|
||||||
file_size
|
file_size
|
||||||
);
|
);
|
||||||
|
"failed: truncated file\n".to_string()
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"{} success: {} ({} bytes)",
|
"{} success: {} ({} bytes)",
|
||||||
|
@ -159,28 +168,34 @@ pub(crate) async fn run(config: Config) -> Result<()> {
|
||||||
filename.display(),
|
filename.display(),
|
||||||
copied
|
copied
|
||||||
);
|
);
|
||||||
}
|
"success\n".to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = writer.write_all(result.as_bytes()).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_line<T: io::AsyncRead + Unpin>(peer: &str, reader: &mut T) -> Result<String> {
|
pub async fn read_line<T: AsyncRead + AsyncBufReadExt + Unpin>(
|
||||||
let mut buf = vec![0u8; 1024];
|
peer: &str,
|
||||||
if let Err(err) = reader.read(&mut buf).await {
|
reader: &mut T,
|
||||||
return Err(Error::new(
|
) -> Result<String> {
|
||||||
ErrorKind::Other,
|
let mut line = String::with_capacity(1024);
|
||||||
format!("{} failed: read ({})", peer, err),
|
let mut len = 0;
|
||||||
));
|
|
||||||
}
|
// Ignore some empty lines
|
||||||
let line = match String::from_utf8(buf) {
|
for _ in 0..10 {
|
||||||
Ok(s) => s,
|
if let Err(err) = reader.read_line(&mut line).await {
|
||||||
Err(err) => {
|
|
||||||
return Err(Error::new(
|
return Err(Error::new(
|
||||||
ErrorKind::Other,
|
ErrorKind::Other,
|
||||||
format!("{} read failed: line ({})", peer, err),
|
format!("{} failed: read ({})", peer, err),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
};
|
len = line.find(|c: char| c == '\r' || c == '\n').unwrap_or(0);
|
||||||
let len = line.find(|c: char| c == '\r' || c == '\n').unwrap_or(0);
|
if len > 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok((&line[0..len]).to_owned())
|
Ok((&line[0..len]).to_owned())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue