2010-09-08 5 views
1

J'ai une application F # qui communique avec une application Java via le canal nommé. Où F # agit en tant que serveur et java agit en tant que client. L'application fonctionne pour la plupart sauf que le F # se produit dans l'erreur "System.IO.IOException: Toutes les instances de tuyauterie sont occupées" à l'occasion. Vous trouverez ci-dessous la trace de la pile complète de l'exception et du code snip de F # et de Java. Toute aide est appréciée dans la résolution de ce problèmeF #, System.IO.IOException: toutes les instances de pipeline sont occupées

Merci, Sudaly

trace complète de la pile:

Unhandled Exception: System.IO.IOException: All pipe instances are busy. 
    at [email protected]e(Exception e) 
    at <StartupCode$FSharp-Core>[email protected](Trampoline this, FSharpFunc`2 action) 
    at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction) 
    at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction) 
    at <StartupCode$FSharp-Core>[email protected](Object state) 
    at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx) 
    at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem() 
    at System.Threading.ThreadPoolWorkQueue.Dispatch() 
    at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() 

F # Code:

[<DataContract>] 
type Quote = { 
    [<field: DataMember(Name="securityIdentifier") >] 
    RicCode:string 
    [<field: DataMember(Name="madeOn") >] 
    MadeOn:DateTime 
    [<field: DataMember(Name="closePrice") >] 
    Price:int 
    } 

let globalPriceCache = new Dictionary<string, Quote>() 

let ParseQuoteString (quoteString:string) = 
    let data = Encoding.Unicode.GetBytes(quoteString) 
    let stream = new MemoryStream() 
    stream.Write(data, 0, data.Length); 
    stream.Position <- 0L 
    let ser = Json.DataContractJsonSerializer(typeof<Quote array>) 
    let results:Quote array = ser.ReadObject(stream) :?> Quote array 
    results 

let RefreshCache quoteList = 
    globalPriceCache.Clear() 
    quoteList 
    |> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result)) 

let EstablishConnection() = 
    let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4) 
    pipeServer.WaitForConnection() 
    try 
     Some(new StreamReader(pipeServer)) 
    with e -> 
     None 

let rec MarketPriceCache() = 
    match EstablishConnection() with 
    |Some(sr) -> 
     // Read request from the stream. 
     let m_cache = 
      sr.ReadLine() 
      |> ParseQuoteString 
      |> RefreshCache 

     MarketPriceCache() 
    | _ ->() 


[<EntryPoint>] 
let main args= 
    try 
     async { 
      MarketPriceCache() 
     } |> Async.Start 

     while true do 
      if globalPriceCache.Count > 0 then 
    //Business logic 
       System.Threading.Thread.Sleep(1000 * 50) 
      else 

       ignore(logInfo(sprintf "%s" "Price Cache is empty")) 
       System.Threading.Thread.Sleep(1000 * 65) 

    with e -> 
     ignore(logError e.Message) 
     ignore(logError e.StackTrace)  
    0 

code Java:

public void WatchForPrice() 
{ 
    while (true) 
    { 
    try 
    { 
    Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap(); 
    List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>(); 
    if(priceMap != null) 
    { 

    Set<String> keySet = priceMap.keySet(); 
    System.out.println("Key Size: " + keySet.size()); 
    for(String key : keySet) 
    { 
     SecurityQuote quote = priceMap.get(key).getQuote(); 
     if(quote != null) 
     { 
     LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote(); 
     localSecurityQuote.setClosePrice(quote.getClosePrice()); 
     localSecurityQuote.setMadeOn(quote.getMadeOn());  
     localSecurityQuote.setSecurityIdentifier(key); 
     localSecurityQuotes.add(localSecurityQuote); 
     } 

    } 

    JSONSerializer serializer = new JSONSerializer(); 
    String jsonString = serializer.serialize(localSecurityQuotes); 

    // Connect to the pipe 
    RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw"); 
    if (pipe != null) 
    { 
     // write to pipe 
     pipe.write(jsonString.getBytes()); 
     pipe.close(); 

    } 
    else 
     System.out.println("Pipe not found"); 
    doPeriodicWait(); 
    } 
    else 
    System.out.println("No Price data found"); 
    } 
    catch (Exception e) 
    { 
    e.printStackTrace(); 
    System.out.println(e.getMessage()); 
    doPeriodicWait(); 
    } 
    } 
} 

Répondre

4

Ceci est un pressentiment, mais peut-être le problème est que vous ne fermez pas votre lecteur de flux de pipe?

let rec MarketPriceCache() = 
match EstablishConnection() with 
|Some(sr) -> 
    // Read request from the stream.   
    try   
     sr.ReadLine() 
     |> ParseQuoteString 
     |> RefreshCache 
    finally 
     sr.Close() 
    MarketPriceCache() 
| _ ->() 

(m_cache variable n'est pas nécessaire - vous ne l'utilisez pas partout)

+0

Merci Mitya, je vais essayer votre solution et vous faire connaître le résultat – sudaly

3

Vous devez disposer du NamedPipeServerStream chaque fois que vous créez un. La meilleure façon de le faire dans votre code est de disposer de la StreamReader à l'intérieur MarketPriceCache en mettant une déclaration use autour:

let rec MarketPriceCache() = 
    match EstablishConnection() with 
    | Some(sr) -> 
     // Read request from the stream. 
     use reader = sr in 
     (
      let m_cache = 
       reader.ReadLine() 
       |> ParseQuoteString 
       |> RefreshCache 
     ) 
     MarketPriceCache() 
    | _ ->() 

La syntaxe avec using ... in est là pour éviter que la portée du lecteur se termine après la récursif appelez au MarketPriceCache.

+0

Merci Ronald, je vais essayer votre solution et vous faire connaître le résultat – sudaly

+0

En fait, à la fois ma solution et celle par Mitya sont corrects. La liaison 'use' est traduite par le compilateur en une instruction try/finally qui appelle' Dispose' dans la branche finally. Une liaison 'use' est considérée comme une construction de plus haut niveau. –

Questions connexes