2013-05-01 19 views
5

Çok parçalı bir sunucuyla ilgili bir sorun yaşıyorum, özellikle de incelikle kapatmak için bir bağlantı kurarak bir akademik egzersiz olarak yapıyorum.Engelleme kuyruğunda bekleyen bir iş parçacığının zarif bir şekilde sonlandırılması

Her bağlantı bir Oturum sınıfı tarafından yönetilir. Bu sınıf, bağlantı için 2 iş parçacığı, bir DownstreamThread ve bir UpstreamThread tutar.

UpstreamThread, istemci soketini engeller ve tüm gelen dizeleri, ele alınacak başka bir katmana geçirilecek iletilere kodlar. DownstreamThread, istemci için iletilerin eklendiği bir BlockingQueue üzerinde engeller. Kuyrukta bir ileti olduğunda, Aşağı iplik iş parçacığı sırayı alır, bir dizeye dönüştürür ve istemciye gönderir. Son sistemde, bir uygulama katmanı gelen mesajlar üzerinde harekete geçecek ve giden istemcileri uygun istemciye göndermek için sunucuya gönderecek, ancak şimdilik geri gelen bir mesajın yankılanmasından önce gelen bir mesajda bir saniye uyuyan basit bir uygulamaya sahibim. eklenen bir zaman damgası ile giden bir mesaj olarak.

Yaşadığım sorun, istemcinin bağlantısı kesildiğinde her şeyi incelikle kapatmaya çalışmak. Yarışmaya başladığım ilk konu, istemcinin sunucunun bir QUIT komutuyla bağlantıyı sonlandıracağını bilmesine izin verdiği normal bir bağlantı kesme işlemidir. Temel sözdizim kodu:

while (!quitting) { 
    inputString = socket.readLine() // blocks 
    if (inputString != "QUIT") { 
     // forward the message upstream 
     server.acceptMessage (inputString); 
    } else { 
     // Do cleanup 
     quitting = true; 
     socket.close(); 
    } 
} 

Yukarı akış iş parçacığının ana döngü giriş dizesine bakar. QUIT ise, iş parçacığı istemcinin iletişimi sonlandırdığını ve döngüsünden çıktığını belirtmek için bir bayrak ayarlar. Bu, yukarı doğru çalışan ipliği güzelce kapatır.

Bağlantı iş parçacığının ana döngüsü, bağlantı kapatma bayrağı ayarlanmadığı sürece, Engelleme Sorgulama'daki iletiler için bekler. Olduğu zaman, aşağı iplik parçasının da sonlanması gerekiyor. Ancak, öyle değil, sadece orada bekliyor. Onun psuedocode şuna benzer: Bu test ettiğinde

while (!quitting) { 
    outputMessage = messageQueue.take(); // blocks 
    sendMessageToClient (outputMessage); 
} 

, ben istemci çıktığınızda, memba iplik kapatıldı fark ettik ancak alt iplik yoktu.

Biraz kafa tırmaladıktan sonra, aşağı inen parçanın hala gelmeyecek gelen bir iletiyi beklerken BlockingQueue üzerinde engellendiğini fark ettim. Akış yukarı iş parçacığı, QUIT mesajını zincirden daha ileriye doğru iletmez.

Akış aşağı iş parçacığı düzgün bir şekilde nasıl kapatabilirim? Akla ilk gelen fikir, take() çağrısı üzerinde bir zaman aşımı ayarlıyordu. Bu fikre çok meraklı değilim, çünkü seçtiğiniz her ne olursa olsun, tamamen tatmin edici olmayacaktır. Ya çok uzun ve bir zombi ipliği, kapatılmadan önce uzun bir süre orada oturur ya da çok kısadır ve birkaç dakika boyunca boşta kalan ancak hala geçerli olan bağlantılar öldürülür. QUIT mesajını zincire göndermeyi düşünmüştüm, ancak sunucuya, sonra uygulamaya, sonra tekrar sunucuya ve son olarak seansa geri dönüş yapmasını gerektiriyordu. Bu da zarif bir çözüm gibi görünmüyor.

Thread.stop() için belgelere baktım ama görünüşe göre uygun bir şekilde işe yaramadı çünkü görünüşe göre zaten işe yaramadı, bu gerçekten de bir seçenek değil gibi görünüyor. Yaptığım bir diğer fikir ise, aşağı doğru olan bir iplik parçasında bir istisnanın bir şekilde tetiklenmesini ve nihayet son blokta temizlenmesine izin vermekti, ama bu bana korkunç ve küstah bir fikir olarak çarpıyor.

Her iki iş parçacığının da kendi başlarına zarafetle kapanabilmesi gerektiğini hissediyorum, ancak bir iş parçacığı bittiğinde, diğer iş parçacığının yalnızca diğerinin bayrağı ayarından daha proaktif bir şekilde bitmesini işaret etmesi gerektiğinden de şüpheleniyorum. kontrol etmek için iş parçacığı.Hala Java ile çok tecrübeli olmadığımdan, bu noktada fikirlerimden çok uzaktayım. Herhangi bir tavsiyesi varsa, çok takdir edilecektir.

Tamlık uğruna, yukarıdaki Session sınıfının gerçek kodunu ekledim, ancak yukarıdaki sözde kod parçacıklarının sorunun ilgili bölümlerini kapsadığına inanıyorum. Tam sınıf yaklaşık 250 satırdır. Bunun yerine Thread.stop kullanımını Thread.interrupt çağıran

import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.*; 

/** 
* Session class 
* 
* A session manages the individual connection between a client and the server. 
* It accepts input from the client and sends output to the client over the 
* provided socket. 
* 
*/ 
public class Session { 
    private Socket    clientSocket = null; 
    private Server    server   = null; 
    private Integer    sessionId  = 0; 
    private DownstreamThread downstream  = null; 
    private UpstreamThread  upstream  = null; 
    private boolean    sessionEnding = false; 

    /** 
    * This thread handles waiting for messages from the server and sending 
    * them to the client 
    */ 
    private class DownstreamThread implements Runnable { 
     private BlockingQueue<DownstreamMessage> incomingMessages = null; 
     private OutputStreamWriter     streamWriter  = null; 
     private Session        outer    = null; 

     @Override 
     public void run() { 
      DownstreamMessage message; 
      Thread.currentThread().setName ("DownstreamThread_" + outer.getId()); 

      try { 
       // Send connect message 
       this.sendMessageToClient ("Hello, you are client " + outer.getId()); 

       while (!outer.sessionEnding) { 
        message = this.incomingMessages.take(); 
        this.sendMessageToClient (message.getPayload()); 
       } 

       // Send disconnect message 
       this.sendMessageToClient ("Goodbye, client " + getId()); 

      } catch (InterruptedException | IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } finally { 
       this.terminate(); 
      } 
     } 

     /** 
     * Add a message to the downstream queue 
     * 
     * @param message 
     * @return 
     * @throws InterruptedException 
     */ 
     public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException { 
      if (!outer.sessionEnding) { 
       this.incomingMessages.put (message); 
      } 

      return this; 
     } 

     /** 
     * Send the given message to the client 
     * 
     * @param message 
     * @throws IOException 
     */ 
     private DownstreamThread sendMessageToClient (CharSequence message) throws IOException { 
      OutputStreamWriter osw; 
      // Output to client 
      if (null != (osw = this.getStreamWriter())) { 
       osw.write ((String) message); 
       osw.write ("\r\n"); 
       osw.flush(); 
      } 

      return this; 
     } 

     /** 
     * Perform session cleanup 
     * 
     * @return 
     */ 
     private DownstreamThread terminate() { 
      try { 
       this.streamWriter.close(); 
      } catch (IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } 
      this.streamWriter = null; 

      return this; 
     } 

     /** 
     * Get an output stream writer, initialize it if it's not active 
     * 
     * @return A configured OutputStreamWriter object 
     * @throws IOException 
     */ 
     private OutputStreamWriter getStreamWriter() throws IOException { 
      if ((null == this.streamWriter) 
      && (!outer.sessionEnding)) { 
       BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream()); 
       this.streamWriter  = new OutputStreamWriter (os, "UTF8"); 
      } 

      return this.streamWriter; 
     } 

     /** 
     * 
     * @param outer 
     */ 
     public DownstreamThread (Session outer) { 
      this.outer    = outer; 
      this.incomingMessages = new LinkedBlockingQueue(); 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * This thread handles waiting for client input and sending it upstream 
    */ 
    private class UpstreamThread implements Runnable { 
     private Session outer = null; 

     @Override 
     public void run() { 
      StringBuffer inputBuffer = new StringBuffer(); 
      BufferedReader inReader; 

      Thread.currentThread().setName ("UpstreamThread_" + outer.getId()); 

      try { 
       inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8")); 

       while (!outer.sessionEnding) { 
        // Read whatever was in the input buffer 
        inputBuffer.delete (0, inputBuffer.length()); 
        inputBuffer.append (inReader.readLine()); 
        System.out.println ("Input message was: " + inputBuffer); 

        if (!inputBuffer.toString().equals ("QUIT")) { 
         // Forward the message up the chain to the Server 
         outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString())); 
        } else { 
         // End the session 
         outer.sessionEnding = true; 
        } 
       } 

      } catch (IOException | InterruptedException e) { 
       Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
      } finally { 
       outer.terminate(); 
       outer.server.deleteSession (outer.getId()); 
      } 
     } 

     /** 
     * Class constructor 
     * 
     * The Core Java volume 1 book said that a constructor such as this 
     * should be implicitly created, but that doesn't seem to be the case! 
     * 
     * @param outer 
     */ 
     public UpstreamThread (Session outer) { 
      this.outer = outer; 
      System.out.println ("Class " + this.getClass() + " created"); 
     } 
    } 

    /** 
    * Start the session threads 
    */ 
    public void run() //throws InterruptedException 
    { 
     Thread upThread  = new Thread (this.upstream); 
     Thread downThread = new Thread (this.downstream); 

     upThread.start(); 
     downThread.start(); 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException { 
     this.downstream.acceptMessage (message); 
     return this; 
    } 

    /** 
    * Accept a message to send to the client 
    * 
    * @param message 
    * @return 
    * @throws InterruptedException 
    */ 
    public Session acceptMessage (String message) throws InterruptedException { 
     return this.acceptMessage (new DownstreamMessage (this.getId(), message)); 
    } 

    /** 
    * Terminate the client connection 
    */ 
    private void terminate() { 
     try { 
      this.clientSocket.close(); 
     } catch (IOException e) { 
      Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
     } 
    } 

    /** 
    * Get this Session's ID 
    * 
    * @return The ID of this session 
    */ 
    public Integer getId() { 
     return this.sessionId; 
    } 

    /** 
    * Session constructor 
    * 
    * @param owner The Server object that owns this session 
    * @param sessionId The unique ID this session will be given 
    * @throws IOException 
    */ 
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException { 

     this.server   = owner; 
     this.clientSocket = clientSocket; 
     this.sessionId  = sessionId; 
     this.upstream  = new UpstreamThread (this); 
     this.downstream  = new DownstreamThread (this); 

     System.out.println ("Class " + this.getClass() + " created"); 
     System.out.println ("Session ID is " + this.sessionId); 
    } 
} 

cevap

3

. Bu take yönteminin kapatmanız gerektiğini bilmek için kullanabileceğiniz bir InterruptedException atmak neden olur.

+1

ben düşünmedim, ama normal kapatma parçası olarak bir kludge biraz bir istisna bir şey zorlama değil? – GordonM

+0

Hayır, bu şekilde çalışmak üzere tasarlandı. Aksi takdirde, kesintiye uğrayabilecek her yöntem özel bir kesinti değeri döndürmek zorunda kalacaktır. Daha fazla bilgi için http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html adresini ziyaret edin. – Brigham

+0

Java hakkında fazla bir şey bilmiyorum, ancak 'Thread.interrupt' dediğinizde ne olacak ve 'take' yöntemini engellemiyoruz? Kesintiyi yükseltmeyeceğimi düşünüyorum ve 'Thread.isInterrupted' kontrol etmeliyiz sanırım. – Zuljin

0

"QUIT" görüntülendiğinde, outer.sessionEnding değerini ayarlamak yerine "sahte" ileti bırakabilirsiniz. Bu sahte mesajı sıraya koymak, DownstreamThread'i uyandıracak ve bitirebilirsiniz. Bu durumda, bu sessionEnding değişkenini bile ortadan kaldırabilirsiniz. Sözde kodda

bu gibi görünebilir:

while (true) { 
    outputMessage = messageQueue.take(); // blocks 
    if (QUIT == outputMessage) 
     break 
    sendMessageToClient (outputMessage); 
} 
İlgili konular