pty read should retry until it's empty

This commit is contained in:
Dongdong Zhou 2023-10-31 21:04:35 +00:00
parent b4a6ec5b04
commit 73414962b1
4 changed files with 78 additions and 39 deletions

View File

@ -275,6 +275,10 @@ pub fn insert_app_info(&self, info: AppInfo) -> Result<()> {
pub fn insert_app(&self, data: AppData) -> Result<()> {
let windows = data.windows.get_untracked();
if windows.is_empty() {
// insert_app is called after window is closed, so we don't want to store it
return Ok(());
}
for (_, window) in &windows {
let _ = self.insert_window(window.clone());
}

View File

@ -35,7 +35,10 @@ pub fn new(cx: Scope, common: Rc<CommonData>) -> Self {
root,
common,
};
data.toggle_expand(&path);
if data.common.workspace.path.is_some() {
// only fill in the child files if there is open folder
data.toggle_expand(&path);
}
data
}

View File

@ -37,7 +37,7 @@
use crate::{
buffer::{get_mod_time, load_file, Buffer},
plugin::{catalog::PluginCatalog, remove_volt, PluginCatalogRpcHandler},
terminal::Terminal,
terminal::{Terminal, TerminalSender},
watcher::{FileWatcher, Notify, WatchToken},
};
@ -50,7 +50,7 @@ pub struct Dispatcher {
core_rpc: CoreRpcHandler,
catalog_rpc: PluginCatalogRpcHandler,
buffers: HashMap<PathBuf, Buffer>,
terminals: HashMap<TermId, Sender<Msg>>,
terminals: HashMap<TermId, TerminalSender>,
file_watcher: FileWatcher,
window_id: usize,
tab_id: usize,
@ -135,7 +135,7 @@ fn handle_notification(&mut self, rpc: ProxyNotification) {
Shutdown {} => {
self.catalog_rpc.shutdown();
for (_, sender) in self.terminals.iter() {
let _ = sender.send(Msg::Shutdown);
sender.send(Msg::Shutdown);
}
self.proxy_rpc.shutdown();
}
@ -177,7 +177,9 @@ fn handle_notification(&mut self, rpc: ProxyNotification) {
self.core_rpc.terminal_process_id(term_id, child_id);
let tx = terminal.tx.clone();
self.terminals.insert(term_id, tx);
let poller = terminal.poller.clone();
let sender = TerminalSender::new(tx, poller);
self.terminals.insert(term_id, sender);
let rpc = self.core_rpc.clone();
thread::spawn(move || {
terminal.run(rpc);
@ -185,7 +187,7 @@ fn handle_notification(&mut self, rpc: ProxyNotification) {
}
TerminalWrite { term_id, content } => {
if let Some(tx) = self.terminals.get(&term_id) {
let _ = tx.send(Msg::Input(content.into_bytes().into()));
tx.send(Msg::Input(content.into_bytes().into()));
}
}
TerminalResize {
@ -201,12 +203,12 @@ fn handle_notification(&mut self, rpc: ProxyNotification) {
cell_height: 1,
};
let _ = tx.send(Msg::Resize(size));
tx.send(Msg::Resize(size));
}
}
TerminalClose { term_id } => {
if let Some(tx) = self.terminals.remove(&term_id) {
let _ = tx.send(Msg::Shutdown);
tx.send(Msg::Shutdown);
}
}
DapStart {

View File

@ -36,9 +36,25 @@
pub type TermConfig = alacritty_terminal::config::Config;
pub struct TerminalSender {
tx: Sender<Msg>,
poller: Arc<polling::Poller>,
}
impl TerminalSender {
pub fn new(tx: Sender<Msg>, poller: Arc<polling::Poller>) -> Self {
Self { tx, poller }
}
pub fn send(&self, msg: Msg) {
let _ = self.tx.send(msg);
let _ = self.poller.notify();
}
}
pub struct Terminal {
term_id: TermId,
poll: Arc<polling::Poller>,
pub(crate) poller: Arc<polling::Poller>,
pub(crate) pty: alacritty_terminal::tty::Pty,
rx: Receiver<Msg>,
pub tx: Sender<Msg>,
@ -78,7 +94,7 @@ pub fn new(
Ok(Terminal {
term_id,
poll,
poller: poll,
pty,
tx,
rx,
@ -94,7 +110,9 @@ pub fn run(&mut self, core_rpc: CoreRpcHandler) {
// Register TTY through EventedRW interface.
unsafe {
self.pty.register(&self.poll, interest, poll_opts).unwrap();
self.pty
.register(&self.poller, interest, poll_opts)
.unwrap();
}
let mut events =
@ -102,10 +120,7 @@ pub fn run(&mut self, core_rpc: CoreRpcHandler) {
'event_loop: loop {
events.clear();
if let Err(err) = self
.poll
.wait(&mut events, Some(std::time::Duration::from_millis(16)))
{
if let Err(err) = self.poller.wait(&mut events, None) {
match err.kind() {
ErrorKind::Interrupted => continue,
_ => panic!("EventLoop polling error: {err:?}"),
@ -134,30 +149,22 @@ pub fn run(&mut self, core_rpc: CoreRpcHandler) {
}
if event.readable {
match self.pty.reader().read(&mut buf) {
Ok(n) => {
core_rpc.update_terminal(
self.term_id,
buf[..n].to_vec(),
);
if let Err(err) = self.pty_read(&core_rpc, &mut buf) {
// On Linux, a `read` on the master side of a PTY can fail
// with `EIO` if the client side hangs up. In that case,
// just loop back round for the inevitable `Exited` event.
// This sucks, but checking the process is either racy or
// blocking.
#[cfg(target_os = "linux")]
if err.raw_os_error() == Some(libc::EIO) {
continue;
}
Err(err) => {
// On Linux, a `read` on the master side of a PTY can fail
// with `EIO` if the client side hangs up. In that case,
// just loop back round for the inevitable `Exited` event.
// This sucks, but checking the process is either racy or
// blocking.
#[cfg(target_os = "linux")]
if err.raw_os_error() == Some(libc::EIO) {
continue;
}
tracing::error!(
"Error reading from PTY in event loop: {}",
err
);
break 'event_loop;
}
tracing::error!(
"Error reading from PTY in event loop: {}",
err
);
break 'event_loop;
}
}
@ -182,12 +189,12 @@ pub fn run(&mut self, core_rpc: CoreRpcHandler) {
// Re-register with new interest.
self.pty
.reregister(&self.poll, interest, poll_opts)
.reregister(&self.poller, interest, poll_opts)
.unwrap();
}
}
core_rpc.terminal_process_stopped(self.term_id);
let _ = self.pty.deregister(&self.poll);
let _ = self.pty.deregister(&self.poller);
}
/// Drain the channel.
@ -205,6 +212,29 @@ fn drain_recv_channel(&mut self, state: &mut State) -> bool {
true
}
#[inline]
fn pty_read(
&mut self,
core_rpc: &CoreRpcHandler,
buf: &mut [u8],
) -> io::Result<()> {
loop {
match self.pty.reader().read(buf) {
Ok(0) => break,
Ok(n) => {
core_rpc.update_terminal(self.term_id, buf[..n].to_vec());
}
Err(err) => match err.kind() {
ErrorKind::Interrupted | ErrorKind::WouldBlock => {
break;
}
_ => return Err(err),
},
}
}
Ok(())
}
#[inline]
fn pty_write(&mut self, state: &mut State) -> io::Result<()> {
state.ensure_next();