Çok parçalı bir sunucuyla ilgili bir sorun yaşıyorum, özellikle de incelikle kapatmak için bir bağlantı kurarak bir akademik egzersiz olarak yapıyorum.Engelleme kuyruğunda bekleyen bir iş parçacığının zarif bir şekilde sonlandırılması
Her bağlantı bir Oturum sınıfı tarafından yönetilir. Bu sınıf, bağlantı için 2 iş parçacığı, bir DownstreamThread ve bir UpstreamThread tutar.
UpstreamThread, istemci soketini engeller ve tüm gelen dizeleri, ele alınacak başka bir katmana geçirilecek iletilere kodlar. DownstreamThread, istemci için iletilerin eklendiği bir BlockingQueue üzerinde engeller. Kuyrukta bir ileti olduğunda, Aşağı iplik iş parçacığı sırayı alır, bir dizeye dönüştürür ve istemciye gönderir. Son sistemde, bir uygulama katmanı gelen mesajlar üzerinde harekete geçecek ve giden istemcileri uygun istemciye göndermek için sunucuya gönderecek, ancak şimdilik geri gelen bir mesajın yankılanmasından önce gelen bir mesajda bir saniye uyuyan basit bir uygulamaya sahibim. eklenen bir zaman damgası ile giden bir mesaj olarak.
Yaşadığım sorun, istemcinin bağlantısı kesildiğinde her şeyi incelikle kapatmaya çalışmak. Yarışmaya başladığım ilk konu, istemcinin sunucunun bir QUIT komutuyla bağlantıyı sonlandıracağını bilmesine izin verdiği normal bir bağlantı kesme işlemidir. Temel sözdizim kodu:
while (!quitting) {
inputString = socket.readLine() // blocks
if (inputString != "QUIT") {
// forward the message upstream
server.acceptMessage (inputString);
} else {
// Do cleanup
quitting = true;
socket.close();
}
}
Yukarı akış iş parçacığının ana döngü giriş dizesine bakar. QUIT ise, iş parçacığı istemcinin iletişimi sonlandırdığını ve döngüsünden çıktığını belirtmek için bir bayrak ayarlar. Bu, yukarı doğru çalışan ipliği güzelce kapatır.
Bağlantı iş parçacığının ana döngüsü, bağlantı kapatma bayrağı ayarlanmadığı sürece, Engelleme Sorgulama'daki iletiler için bekler. Olduğu zaman, aşağı iplik parçasının da sonlanması gerekiyor. Ancak, öyle değil, sadece orada bekliyor. Onun psuedocode şuna benzer: Bu test ettiğinde
while (!quitting) {
outputMessage = messageQueue.take(); // blocks
sendMessageToClient (outputMessage);
}
, ben istemci çıktığınızda, memba iplik kapatıldı fark ettik ancak alt iplik yoktu.
Biraz kafa tırmaladıktan sonra, aşağı inen parçanın hala gelmeyecek gelen bir iletiyi beklerken BlockingQueue üzerinde engellendiğini fark ettim. Akış yukarı iş parçacığı, QUIT mesajını zincirden daha ileriye doğru iletmez.
Akış aşağı iş parçacığı düzgün bir şekilde nasıl kapatabilirim? Akla ilk gelen fikir, take() çağrısı üzerinde bir zaman aşımı ayarlıyordu. Bu fikre çok meraklı değilim, çünkü seçtiğiniz her ne olursa olsun, tamamen tatmin edici olmayacaktır. Ya çok uzun ve bir zombi ipliği, kapatılmadan önce uzun bir süre orada oturur ya da çok kısadır ve birkaç dakika boyunca boşta kalan ancak hala geçerli olan bağlantılar öldürülür. QUIT mesajını zincire göndermeyi düşünmüştüm, ancak sunucuya, sonra uygulamaya, sonra tekrar sunucuya ve son olarak seansa geri dönüş yapmasını gerektiriyordu. Bu da zarif bir çözüm gibi görünmüyor.
Thread.stop() için belgelere baktım ama görünüşe göre uygun bir şekilde işe yaramadı çünkü görünüşe göre zaten işe yaramadı, bu gerçekten de bir seçenek değil gibi görünüyor. Yaptığım bir diğer fikir ise, aşağı doğru olan bir iplik parçasında bir istisnanın bir şekilde tetiklenmesini ve nihayet son blokta temizlenmesine izin vermekti, ama bu bana korkunç ve küstah bir fikir olarak çarpıyor.
Her iki iş parçacığının da kendi başlarına zarafetle kapanabilmesi gerektiğini hissediyorum, ancak bir iş parçacığı bittiğinde, diğer iş parçacığının yalnızca diğerinin bayrağı ayarından daha proaktif bir şekilde bitmesini işaret etmesi gerektiğinden de şüpheleniyorum. kontrol etmek için iş parçacığı.Hala Java ile çok tecrübeli olmadığımdan, bu noktada fikirlerimden çok uzaktayım. Herhangi bir tavsiyesi varsa, çok takdir edilecektir.
Tamlık uğruna, yukarıdaki Session sınıfının gerçek kodunu ekledim, ancak yukarıdaki sözde kod parçacıklarının sorunun ilgili bölümlerini kapsadığına inanıyorum. Tam sınıf yaklaşık 250 satırdır. Bunun yerine Thread.stop
kullanımını Thread.interrupt
çağıran
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* Session class
*
* A session manages the individual connection between a client and the server.
* It accepts input from the client and sends output to the client over the
* provided socket.
*
*/
public class Session {
private Socket clientSocket = null;
private Server server = null;
private Integer sessionId = 0;
private DownstreamThread downstream = null;
private UpstreamThread upstream = null;
private boolean sessionEnding = false;
/**
* This thread handles waiting for messages from the server and sending
* them to the client
*/
private class DownstreamThread implements Runnable {
private BlockingQueue<DownstreamMessage> incomingMessages = null;
private OutputStreamWriter streamWriter = null;
private Session outer = null;
@Override
public void run() {
DownstreamMessage message;
Thread.currentThread().setName ("DownstreamThread_" + outer.getId());
try {
// Send connect message
this.sendMessageToClient ("Hello, you are client " + outer.getId());
while (!outer.sessionEnding) {
message = this.incomingMessages.take();
this.sendMessageToClient (message.getPayload());
}
// Send disconnect message
this.sendMessageToClient ("Goodbye, client " + getId());
} catch (InterruptedException | IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
} finally {
this.terminate();
}
}
/**
* Add a message to the downstream queue
*
* @param message
* @return
* @throws InterruptedException
*/
public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
if (!outer.sessionEnding) {
this.incomingMessages.put (message);
}
return this;
}
/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
OutputStreamWriter osw;
// Output to client
if (null != (osw = this.getStreamWriter())) {
osw.write ((String) message);
osw.write ("\r\n");
osw.flush();
}
return this;
}
/**
* Perform session cleanup
*
* @return
*/
private DownstreamThread terminate() {
try {
this.streamWriter.close();
} catch (IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
}
this.streamWriter = null;
return this;
}
/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter() throws IOException {
if ((null == this.streamWriter)
&& (!outer.sessionEnding)) {
BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
/**
*
* @param outer
*/
public DownstreamThread (Session outer) {
this.outer = outer;
this.incomingMessages = new LinkedBlockingQueue();
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* This thread handles waiting for client input and sending it upstream
*/
private class UpstreamThread implements Runnable {
private Session outer = null;
@Override
public void run() {
StringBuffer inputBuffer = new StringBuffer();
BufferedReader inReader;
Thread.currentThread().setName ("UpstreamThread_" + outer.getId());
try {
inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8"));
while (!outer.sessionEnding) {
// Read whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length());
inputBuffer.append (inReader.readLine());
System.out.println ("Input message was: " + inputBuffer);
if (!inputBuffer.toString().equals ("QUIT")) {
// Forward the message up the chain to the Server
outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString()));
} else {
// End the session
outer.sessionEnding = true;
}
}
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
} finally {
outer.terminate();
outer.server.deleteSession (outer.getId());
}
}
/**
* Class constructor
*
* The Core Java volume 1 book said that a constructor such as this
* should be implicitly created, but that doesn't seem to be the case!
*
* @param outer
*/
public UpstreamThread (Session outer) {
this.outer = outer;
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* Start the session threads
*/
public void run() //throws InterruptedException
{
Thread upThread = new Thread (this.upstream);
Thread downThread = new Thread (this.downstream);
upThread.start();
downThread.start();
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
this.downstream.acceptMessage (message);
return this;
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (String message) throws InterruptedException {
return this.acceptMessage (new DownstreamMessage (this.getId(), message));
}
/**
* Terminate the client connection
*/
private void terminate() {
try {
this.clientSocket.close();
} catch (IOException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
}
}
/**
* Get this Session's ID
*
* @return The ID of this session
*/
public Integer getId() {
return this.sessionId;
}
/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {
this.server = owner;
this.clientSocket = clientSocket;
this.sessionId = sessionId;
this.upstream = new UpstreamThread (this);
this.downstream = new DownstreamThread (this);
System.out.println ("Class " + this.getClass() + " created");
System.out.println ("Session ID is " + this.sessionId);
}
}
ben düşünmedim, ama normal kapatma parçası olarak bir kludge biraz bir istisna bir şey zorlama değil? – GordonM
Hayır, bu şekilde çalışmak üzere tasarlandı. Aksi takdirde, kesintiye uğrayabilecek her yöntem özel bir kesinti değeri döndürmek zorunda kalacaktır. Daha fazla bilgi için http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html adresini ziyaret edin. – Brigham
Java hakkında fazla bir şey bilmiyorum, ancak 'Thread.interrupt' dediğinizde ne olacak ve 'take' yöntemini engellemiyoruz? Kesintiyi yükseltmeyeceğimi düşünüyorum ve 'Thread.isInterrupted' kontrol etmeliyiz sanırım. – Zuljin