2012-11-13 12 views
6

ile eşzamanlı yazma Bir OutputStream'e (bir Sokete ait) büyük miktarda veri parçaları yazmayı içeren bir uygulama yazıyorum. Bunu biraz karmaşık hale getiren şey, genellikle aynı OutputStream'e yazmayı deneyen çok sayıda iş parçacığı olmasıdır. Şu anda, verilerin yazılmakta olduğu OutputStream'in kendi iş parçacığı olması için tasarladım. İş parçacığı, bayt dizilerini yoklayan ve bunları en kısa zamanda yazan bir sıra (LinkedList) içerir.Standart bir OutputStream

private class OutputStreamWriter implements Runnable { 

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>(); 

    public void run() { 
     OutputStream outputStream = User.this.outputStream; 
     while (true) { 
      try { 
       if (chunkQueue.isEmpty()) { 
        Thread.sleep(100); 
        continue; 
       } 
       outputStream.write(chunkQueue.poll()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

bu tasarım ile sorun daha da yazıyor ortaya çıktıkça, daha fazla veri kuyruklar yukarı ve daha hızlı yazılı almıyor olmasıdır. Başlangıçta, veriler sıraya konduğunda, hemen anında yazılır. Daha sonra yaklaşık 15 saniye sonra, veriler geride kalmaya başlar; Bir gecikme, verilerin gerçekte yazıldığı tarihe sıraya konulduktan sonra gelişir. Zaman geçtikçe, bu gecikme daha uzun ve daha uzun olur. Bu çok fark edilir.

Bunu düzeltmenin bir yolu, verilerin engellenmeye başlamaması için yazımın engellenmeden gönderilmesini sağlayan bir tür ConcurrentOutputStream uygulaması olabilir (heck, sıra o zaman gereksiz olur). Böyle bir uygulama olup olmadığını bilmiyorum - bir tane bulamadım - ve kişisel olarak bir tane bile yazmanın mümkün olmadığını düşünüyorum.

Peki, bunu nasıl yeniden tasarlayabileceğimi öneren var mı?

+4

Bu çok yapıcı değil. Bunun nesi var? –

+0

BufferedOutputStream? –

+0

Bir kenara göre, bağlantılarınızı bağlı listenize senkronize ediyor musunuz? Çünkü tasarım tarafından iş parçacığı güvenli değildir. Ayrıca, soket çıkışının üstünde ne tür bir çıkış akışı var ve bunun üzerinden ne kadar veri aktarıyorsunuz? – Perception

cevap

4

Soketin çıkışı sınırlı; veri üretme işleminizden daha yavaşsa, verilerin arabelleğe alınması gerekir, bununla ilgili bir yol yoktur. "Eşzamanlı" yazılması hiç yardımcı olmaz.

Bellek tüketimini azaltmak için kuyruğa alınmış veriler belirli bir sınırı aştığında veri üretimini duraklatmayı düşünebilirsiniz.

+0

Ben sadece bir SocketChannel hakkında, burada ne duvara şeyler atmaya, ancak ediyorum? –

+0

Yardım edeceğini sanmıyorum.Darboğaz ağ bant genişliği. – irreputable

0

Eşzamanlı yazmanın en ufak bir yarar sağlamayacağına @irreputable ile katılıyorum. Bunun yerine, üretim tarafına, yani sahip olduğunuz ürüne bakmalısınız.

  1. Bir LinkedList yerine bir BlockingQueue kullanın.

  2. Kullanım sıranın engelleme anket çalışması oldukça tanım gereği ortalama% 50 zaman israf olacaktır 100msl için körlemesine bir uyku, daha. Uzun bir süre boyunca gerçekten ekleyebilir.

0

Ben bunların uygulanmasına daha yakından bakıldığında Başlangıçta Java boruları kullanıldı ama çok kısa zamanda DB bağlantıları kapatmak için gereken yavaş bağlantıları kesmek için bir filtre gerekli, hepsi ben kullanarak kendi QueueInputStream oluşturarak sona erdi yüzden senkronize olduğu küçük bir tampon ve Engelleme kuyruk kez kuyrukta tampon koymak doluydu için, küçük tampon yardımıyla ucuz olmalıdır LinkedBlockingQueue kullanılan kilit koşulları için, bu sınıf yalnızca olması amaçlanmıştır dışında serbest kilitlemek olduğunu örneğin başına tek üretici ve tüketici için kullanılan ve nihai OutputStream için sırada bekleyen bayt akışa başlayabileceğiniz ExecutorService geçmelidir:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 
İlgili konular